XComp commented on code in PR #23490:
URL: https://github.com/apache/flink/pull/23490#discussion_r1368358256


##########
flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/JavaRecordHelper.java:
##########
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils.runtime;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.flink.shaded.guava31.com.google.common.base.Defaults;
+
+import javax.annotation.Nullable;
+
+import java.lang.reflect.Constructor;
+import java.lang.reflect.Field;
+import java.lang.reflect.Method;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Objects;
+import java.util.stream.Collectors;
+
+/** Utilities for handling Java records nicely in the {@link PojoSerializer}. 
*/
+@Internal
+final class JavaRecordHelper<T> {
+
+    /** Record canonical constructor. */
+    private final Constructor<T> recordConstructor;
+
+    /**
+     * Record constructor parameter in case the new constructor has a 
different parameter order then
+     * the serialized data. Used for schema evolution. Null when not needed.

Review Comment:
   ```suggestion
        * the serialized data. Used for schema evolution or `null` if no schema 
evolution is applied for that record class.
   ```
   nit: Sorry for being picky here. I burned myself on this topic in the past 
when working on the FileSystem delete logic. :innocent: What about making the 
`null` contract more explicit? `not needed` still sounds a bit ambiguous 
(even with the schema evolution being mentioned in the sentence before). Feel 
free to not follow it if you disagree.



##########
flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/JavaRecordHelper.java:
##########
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils.runtime;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.flink.shaded.guava31.com.google.common.base.Defaults;
+
+import javax.annotation.Nullable;
+
+import java.lang.reflect.Constructor;
+import java.lang.reflect.Field;
+import java.lang.reflect.Method;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Objects;
+import java.util.stream.Collectors;
+
+/** Utilities for handling Java records nicely in the {@link PojoSerializer}. 
*/
+@Internal
+final class JavaRecordHelper<T> {

Review Comment:
   ```suggestion
   final class JavaRecordBuilderFactory<T> {
   ```
   Essentially it's a factory for `JavaRecordBuilder` instances. WDYT?



##########
flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/JavaRecordHelper.java:
##########
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils.runtime;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.flink.shaded.guava31.com.google.common.base.Defaults;
+
+import javax.annotation.Nullable;
+
+import java.lang.reflect.Constructor;
+import java.lang.reflect.Field;
+import java.lang.reflect.Method;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Objects;
+import java.util.stream.Collectors;
+
+/** Utilities for handling Java records nicely in the {@link PojoSerializer}. 
*/
+@Internal
+final class JavaRecordHelper<T> {
+
+    /** Record canonical constructor. */
+    private final Constructor<T> recordConstructor;
+
+    /**
+     * Record constructor parameter in case the new constructor has a 
different parameter order then
+     * the serialized data. Used for schema evolution. Null when not needed.
+     */
+    @Nullable private final int[] paramIndexMapping;
+    /**
+     * Default record args used for newly introduced primitive fields during 
schema evolution. Null
+     * when not needed.
+     */
+    @Nullable private final Object[] defaultConstructorArgs;
+
+    private JavaRecordHelper(Constructor<T> recordConstructor) {
+        this(recordConstructor, null, null);

Review Comment:
   ```suggestion
           this(recordConstructor, null, new 
Object[recordConstructor.getParameterCount()]);
   ```
   Creating the defaultArgs even in this case would remove the need for 
`null` handling of that parameter. WDYT?
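
   A minimal sketch of the idea, assuming a simplified stand-in class 
(`DefaultArgsSketch` and `DemoPoint` are hypothetical names, not part of the 
PR): if the defaults array is always allocated, the builder can copy it 
unconditionally. The existing `checkArgument((argIndexMapping == null) == 
(defaultConstructorArgs == null))` precondition would presumably have to be 
relaxed as well.

```java
import java.lang.reflect.Constructor;
import java.util.Arrays;

/** Hypothetical target type; a plain class stands in for a record here. */
final class DemoPoint {
    final int x;
    final String label;

    DemoPoint(int x, String label) {
        this.x = x;
        this.label = label;
    }
}

/** Hypothetical, simplified stand-in for the class under review. */
final class DefaultArgsSketch<T> {
    private final Constructor<T> recordConstructor;

    // Never null: an array of nulls simply means "no defaults configured".
    private final Object[] defaultConstructorArgs;

    DefaultArgsSketch(Constructor<T> recordConstructor) {
        this(recordConstructor, new Object[recordConstructor.getParameterCount()]);
    }

    DefaultArgsSketch(Constructor<T> recordConstructor, Object[] defaultConstructorArgs) {
        this.recordConstructor = recordConstructor;
        this.defaultConstructorArgs = defaultConstructorArgs;
    }

    /** Fresh argument array for one instance; no null branch required. */
    Object[] newArgs() {
        return Arrays.copyOf(defaultConstructorArgs, defaultConstructorArgs.length);
    }

    T build(Object[] args) {
        try {
            return recordConstructor.newInstance(args);
        } catch (ReflectiveOperationException e) {
            throw new RuntimeException("Could not instantiate record", e);
        }
    }

    /** Usage example: builds DemoPoint(7, "a") through the sketch. */
    static DemoPoint demo() {
        try {
            Constructor<DemoPoint> ctor =
                    DemoPoint.class.getDeclaredConstructor(int.class, String.class);
            ctor.setAccessible(true);
            DefaultArgsSketch<DemoPoint> sketch = new DefaultArgsSketch<>(ctor);
            Object[] args = sketch.newArgs();
            args[0] = 7;
            args[1] = "a";
            return sketch.build(args);
        } catch (NoSuchMethodException e) {
            throw new IllegalStateException(e);
        }
    }
}
```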



##########
flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/JavaRecordHelper.java:
##########
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils.runtime;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.flink.shaded.guava31.com.google.common.base.Defaults;
+
+import javax.annotation.Nullable;
+
+import java.lang.reflect.Constructor;
+import java.lang.reflect.Field;
+import java.lang.reflect.Method;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Objects;
+import java.util.stream.Collectors;
+
+/** Utilities for handling Java records nicely in the {@link PojoSerializer}. 
*/
+@Internal
+final class JavaRecordHelper<T> {
+
+    /** Record canonical constructor. */

Review Comment:
   ```suggestion
   ```
   nit: I think that this comment isn't adding any value. We could rename the 
member variable if you think that the word `canonical` is needed. Feel free to 
keep the comment if you disagree.



##########
flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/JavaRecordHelper.java:
##########
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils.runtime;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.flink.shaded.guava31.com.google.common.base.Defaults;
+
+import javax.annotation.Nullable;
+
+import java.lang.reflect.Constructor;
+import java.lang.reflect.Field;
+import java.lang.reflect.Method;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Objects;
+import java.util.stream.Collectors;
+
+/** Utilities for handling Java records nicely in the {@link PojoSerializer}. 
*/
+@Internal
+final class JavaRecordHelper<T> {
+
+    /** Record canonical constructor. */
+    private final Constructor<T> recordConstructor;
+
+    /**
+     * Record constructor parameter in case the new constructor has a 
different parameter order then

Review Comment:
   ```suggestion
        * Record constructor parameter in case the new constructor has a 
different parameter order than
   ```
   nit: one less typo PR in the future :innocent: 



##########
flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/JavaRecordHelper.java:
##########
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils.runtime;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.flink.shaded.guava31.com.google.common.base.Defaults;
+
+import javax.annotation.Nullable;
+
+import java.lang.reflect.Constructor;
+import java.lang.reflect.Field;
+import java.lang.reflect.Method;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Objects;
+import java.util.stream.Collectors;
+
+/** Utilities for handling Java records nicely in the {@link PojoSerializer}. 
*/
+@Internal
+final class JavaRecordHelper<T> {
+
+    /** Record canonical constructor. */
+    private final Constructor<T> recordConstructor;
+
+    /**
+     * Record constructor parameter in case the new constructor has a 
different parameter order then
+     * the serialized data. Used for schema evolution. Null when not needed.
+     */
+    @Nullable private final int[] paramIndexMapping;
+    /**
+     * Default record args used for newly introduced primitive fields during 
schema evolution. Null
+     * when not needed.
+     */
+    @Nullable private final Object[] defaultConstructorArgs;
+
+    private JavaRecordHelper(Constructor<T> recordConstructor) {
+        this(recordConstructor, null, null);
+    }
+
+    private JavaRecordHelper(
+            Constructor<T> recordConstructor,
+            int[] argIndexMapping,
+            Object[] defaultConstructorArgs) {
+        Preconditions.checkArgument((argIndexMapping == null) == 
(defaultConstructorArgs == null));
+        this.recordConstructor = recordConstructor;
+        this.paramIndexMapping = argIndexMapping;
+        this.defaultConstructorArgs = defaultConstructorArgs;
+    }
+
+    public JavaRecordBuilder newBuilder() {

Review Comment:
   ```suggestion
       public JavaRecordBuilder create() {
   ```
   ...if we go with `JavaRecordBuilderFactory`. Usually, `newBuilder()` returns 
a Builder for the class itself (i.e. `JavaRecordHelper`). The Factory pattern 
makes the relationship clearer.
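
   To illustrate the naming, here is a rough sketch with hypothetical names 
(`BuilderFactorySketch`, `Builder`, and the `Function`-based instantiator are 
assumptions for illustration, not the PR's API). The factory's `create()` 
hands out a fresh builder for `T` on each call, rather than a builder for the 
factory itself:

```java
import java.util.function.Function;

/** Hypothetical sketch of a factory that creates builders for T. */
final class BuilderFactorySketch<T> {
    private final int parameterCount;
    private final Function<Object[], T> instantiator;

    BuilderFactorySketch(int parameterCount, Function<Object[], T> instantiator) {
        this.parameterCount = parameterCount;
        this.instantiator = instantiator;
    }

    /** Factory method: every call returns a new, independent builder. */
    Builder create() {
        return new Builder();
    }

    /** Collects constructor arguments by index, in any order. */
    final class Builder {
        private final Object[] args = new Object[parameterCount];

        void setParam(int i, Object value) {
            args[i] = value;
        }

        T build() {
            return instantiator.apply(args);
        }
    }

    /** Usage example: joins two parameters into a String. */
    static String demo() {
        BuilderFactorySketch<String> factory =
                new BuilderFactorySketch<>(2, a -> a[0] + ":" + a[1]);
        BuilderFactorySketch<String>.Builder builder = factory.create();
        // Parameters are addressed by index, so the call order is irrelevant.
        builder.setParam(1, 100);
        builder.setParam(0, 50);
        return builder.build();
    }
}
```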



##########
flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/JavaRecordHelper.java:
##########
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils.runtime;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.flink.shaded.guava31.com.google.common.base.Defaults;
+
+import javax.annotation.Nullable;
+
+import java.lang.reflect.Constructor;
+import java.lang.reflect.Field;
+import java.lang.reflect.Method;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Objects;
+import java.util.stream.Collectors;
+
+/** Utilities for handling Java records nicely in the {@link PojoSerializer}. 
*/
+@Internal
+final class JavaRecordHelper<T> {
+
+    /** Record canonical constructor. */
+    private final Constructor<T> recordConstructor;
+
+    /**
+     * Record constructor parameter in case the new constructor has a 
different parameter order then
+     * the serialized data. Used for schema evolution. Null when not needed.
+     */
+    @Nullable private final int[] paramIndexMapping;
+    /**
+     * Default record args used for newly introduced primitive fields during 
schema evolution. Null
+     * when not needed.
+     */
+    @Nullable private final Object[] defaultConstructorArgs;
+
+    private JavaRecordHelper(Constructor<T> recordConstructor) {
+        this(recordConstructor, null, null);
+    }
+
+    private JavaRecordHelper(
+            Constructor<T> recordConstructor,
+            int[] argIndexMapping,
+            Object[] defaultConstructorArgs) {
+        Preconditions.checkArgument((argIndexMapping == null) == 
(defaultConstructorArgs == null));
+        this.recordConstructor = recordConstructor;
+        this.paramIndexMapping = argIndexMapping;
+        this.defaultConstructorArgs = defaultConstructorArgs;
+    }
+
+    public JavaRecordBuilder newBuilder() {
+        return new JavaRecordBuilder();
+    }
+
+    /** Builder class for incremental record construction. */
+    @Internal
+    final class JavaRecordBuilder {
+        private final Object[] args;
+
+        JavaRecordBuilder() {
+            if (defaultConstructorArgs == null) {
+                args = new Object[recordConstructor.getParameterCount()];
+            } else {
+                args = Arrays.copyOf(defaultConstructorArgs, 
defaultConstructorArgs.length);
+            }
+        }
+
+        T build() {
+            try {
+                return recordConstructor.newInstance(args);
+            } catch (Exception e) {
+                throw new RuntimeException("Could not instantiate record", e);
+            }
+        }
+
+        void setParam(int i, Object field) {
+            if (paramIndexMapping != null) {
+                args[paramIndexMapping[i]] = field;
+            } else {
+                args[i] = field;
+            }
+        }
+    }
+
+    public static <T> JavaRecordHelper<T> create(Class<T> clazz, Field[] 
fields) {
+        try {
+            Object[] recordComponents =
+                    (Object[]) 
Class.class.getMethod("getRecordComponents").invoke(clazz);
+
+            Class<?>[] componentTypes = new Class[recordComponents.length];
+            List<String> componentNames = new 
ArrayList<>(recordComponents.length);
+
+            // We need to use reflection to access record components as they 
are not available in
+            // before Java 14
+            Method getType =
+                    
Class.forName("java.lang.reflect.RecordComponent").getMethod("getType");
+            Method getName =
+                    
Class.forName("java.lang.reflect.RecordComponent").getMethod("getName");
+            for (int i = 0; i < recordComponents.length; i++) {
+                componentNames.add((String) 
getName.invoke(recordComponents[i]));
+                componentTypes[i] = (Class<?>) 
getType.invoke(recordComponents[i]);
+            }
+            Constructor<T> recordConstructor = 
clazz.getDeclaredConstructor(componentTypes);
+            recordConstructor.setAccessible(true);
+
+            List<String> previousFields =
+                    Arrays.stream(fields)
+                            // There may be (removed) null fields due to 
schema evolution
+                            .filter(Objects::nonNull)
+                            .map(Field::getName)
+                            .collect(Collectors.toList());
+
+            // If the field names / order changed we know that we are 
migrating the records and arg
+            // index remapping may be necessary
+            boolean migrating = !previousFields.equals(componentNames);
+            if (migrating) {

Review Comment:
   ```suggestion
               if (!previousFields.equals(componentNames)) {
   ```
   we don't need the local variable anymore



##########
flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java:
##########
@@ -89,6 +89,10 @@ public final class PojoSerializer<T> extends 
TypeSerializer<T> {
 
     private transient ClassLoader cl;
 
+    private final boolean isRecord;

Review Comment:
   ```suggestion
       private final boolean isRecord;
   ```
   Do we gain anything from having this final field during deserialization 
instead of doing a `null` check on the transient field `recordHelper`? The 
constructor is called, anyway, which would initialize `recordHelper` properly 
based on the `clazz` parameter. ...or am I missing something here? :thinking: 



##########
flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/Java17RecordHelperTest.java:
##########
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils.runtime;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.lang.reflect.Field;
+import java.util.Arrays;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/** Tests for the @{@link JavaRecordHelper}. */
+class Java17RecordHelperTest {
+
+    Field[] fields;
+
+    record TestRecord(int i1, int i2, String s1, String s2) {}
+
+    @BeforeEach
+    void setup() {
+        fields = TestRecord.class.getDeclaredFields();
+    }
+
+    @Test
+    void testNoDefaultOrParamMapping() {
+        JavaRecordHelper<TestRecord> helper = 
JavaRecordHelper.create(TestRecord.class, fields);
+        JavaRecordHelper<TestRecord>.JavaRecordBuilder builder = 
helper.newBuilder();
+        builder.setParam(1, 100);
+        builder.setParam(0, 50);

Review Comment:
   ```suggestion
           builder.setParam(0, 50);
           builder.setParam(1, 100);
   ```
   nit: maybe, let's not confuse the reader more than necessary by switching 
orders. The order doesn't matter here, does it?



##########
pom.xml:
##########
@@ -1102,6 +1103,12 @@ under the License.
 
                <profile>
                        <id>java17-target</id>
+
+                       <!-- Include Java 17 specific tests (by not excluding 
them) -->
+                       <properties>

Review Comment:
   I see. Thanks for clarification :+1: 



##########
flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java:
##########
@@ -118,6 +122,9 @@ public PojoSerializer(
                 createRegisteredSubclassSerializers(registeredSubclasses, 
executionConfig);
 
         this.subclassSerializerCache = new HashMap<>();
+        if (this.isRecord = TypeExtractor.isRecord(clazz)) {

Review Comment:
   Do we gain anything from having the final field `isRecord` during 
deserialization instead of doing a `null` check on the transient field 
`recordHelper`? AFAIU, the constructor is called, anyway, which would 
initialize `recordHelper` properly based on the clazz parameter. ...or am I 
missing something here? 🤔
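
   For context, a small sketch of how a `transient` field behaves under plain 
Java serialization (all names are hypothetical; whether `PojoSerializer` is 
restored this way, and whether it could equally derive the flag from `clazz`, 
is an assumption left open here). The constructor is not run when a 
`Serializable` object is deserialized, so a transient helper has to be rebuilt 
in `readObject` from state that was actually written:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

/** Hypothetical sketch: a persisted flag used to rebuild a transient helper. */
final class TransientFieldSketch implements Serializable {
    private static final long serialVersionUID = 1L;

    private final boolean isRecord; // written to the stream
    private transient Object recordHelper; // skipped by Java serialization

    TransientFieldSketch(boolean isRecord) {
        this.isRecord = isRecord;
        this.recordHelper = isRecord ? new Object() : null;
    }

    private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
        in.defaultReadObject();
        // The constructor did not run, so the transient field is null here.
        if (isRecord) {
            recordHelper = new Object();
        }
    }

    boolean hasHelper() {
        return recordHelper != null;
    }

    /** Serializes and deserializes the given instance in memory. */
    static TransientFieldSketch roundTrip(TransientFieldSketch original) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(original);
            }
            try (ObjectInputStream in =
                    new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (TransientFieldSketch) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }
}
```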



##########
tools/maven/suppressions-core.xml:
##########
@@ -121,4 +121,7 @@ under the License.
        <suppress
                files="(.*)test[/\\](.*)testutils[/\\](.*)"
                
checks="RedundantModifier|JavadocParagraph|JavadocType|JavadocStyle|StaticVariableNameCheck|LocalFinalVariableName|EmptyLineSeparator"/>
+
+       <!-- suppress check java 17 tests -->
+       <suppress files="(.*)test[/\\].*[/\\]Java17.*.java" checks=".*"/>

Review Comment:
   Any opinion on specifying the test classes explicitly rather than using a 
regex? ...since we don't have that many classes, yet :thinking: I'm fine with 
keeping it as it is now, though.



##########
flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/Java17RecordHelperTest.java:
##########
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils.runtime;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.lang.reflect.Field;
+import java.util.Arrays;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/** Tests for the @{@link JavaRecordHelper}. */
+class Java17RecordHelperTest {
+
+    Field[] fields;
+
+    record TestRecord(int i1, int i2, String s1, String s2) {}
+
+    @BeforeEach
+    void setup() {
+        fields = TestRecord.class.getDeclaredFields();
+    }
+
+    @Test
+    void testNoDefaultOrParamMapping() {
+        JavaRecordHelper<TestRecord> helper = 
JavaRecordHelper.create(TestRecord.class, fields);
+        JavaRecordHelper<TestRecord>.JavaRecordBuilder builder = 
helper.newBuilder();
+        builder.setParam(1, 100);
+        builder.setParam(0, 50);
+        builder.setParam(3, "test");
+
+        assertThat(builder.build()).isEqualTo(new TestRecord(50, 100, null, 
"test"));
+    }
+
+    @Test
+    void testNewFieldsAdded() {
+        // Test restoring from fields [i2, s1]
+        JavaRecordHelper<TestRecord> helper =
+                JavaRecordHelper.create(TestRecord.class, 
Arrays.copyOfRange(fields, 1, 3));
+
+        JavaRecordHelper<TestRecord>.JavaRecordBuilder builder = 
helper.newBuilder();
+        builder.setParam(0, 100);
+        builder.setParam(1, "test");
+
+        assertThat(builder.build()).isEqualTo(new TestRecord(0, 100, "test", 
null));
+    }
+
+    @Test
+    void testFieldsAddedRemovedAndRearranged() {
+        Field[] oldFields = new Field[] {fields[3], null, fields[0]};
+        JavaRecordHelper<TestRecord> helper = 
JavaRecordHelper.create(TestRecord.class, oldFields);
+
+        JavaRecordHelper<TestRecord>.JavaRecordBuilder builder = 
helper.newBuilder();
+        builder.setParam(0, "test");
+        builder.setParam(2, 100);
+
+        assertThat(builder.build()).isEqualTo(new TestRecord(100, 0, null, 
"test"));
+    }
+
+    @Test
+    void testReorderFields() {
+        // Swap first and last field
+        Field temp = fields[0];
+        fields[0] = fields[3];
+        fields[3] = temp;
+
+        JavaRecordHelper<TestRecord> helper = 
JavaRecordHelper.create(TestRecord.class, fields);
+
+        JavaRecordHelper<TestRecord>.JavaRecordBuilder builder = 
helper.newBuilder();
+        builder.setParam(0, "4");
+        builder.setParam(1, 2);
+        builder.setParam(2, "3");
+        builder.setParam(3, 1);
+
+        assertThat(builder.build()).isEqualTo(new TestRecord(1, 2, "3", "4"));
+    }
+
+    @Test
+    void testMissingRequiredField() {
+        JavaRecordHelper<TestRecord> helper = 
JavaRecordHelper.create(TestRecord.class, fields);
+        JavaRecordHelper<TestRecord>.JavaRecordBuilder builder = 
helper.newBuilder();
+
+        builder.setParam(0, 50);
+        // Do not set required param 1
+
+        assertThatThrownBy(() -> builder.build())

Review Comment:
   ```suggestion
           assertThatThrownBy(builder::build)
   ```
   nit
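
   A short sketch of why the two forms are interchangeable, using a 
hypothetical `ThrowingAction` interface as a stand-in for the functional 
interface that AssertJ's `assertThatThrownBy` accepts (`FailingBuilder` and 
`catchFrom` are likewise made up for illustration):

```java
/** Hypothetical sketch comparing a lambda with a method reference. */
final class MethodReferenceSketch {

    /** Stand-in for a "callable that may throw" functional interface. */
    @FunctionalInterface
    interface ThrowingAction {
        void call() throws Throwable;
    }

    /** Hypothetical builder whose build() always fails. */
    static final class FailingBuilder {
        Object build() {
            throw new IllegalStateException("missing required param");
        }
    }

    /** Runs the action and returns what it threw, or null if it completed. */
    static Throwable catchFrom(ThrowingAction action) {
        try {
            action.call();
            return null;
        } catch (Throwable t) {
            return t;
        }
    }

    /** Usage example: both forms surface the same exception type. */
    static boolean demo() {
        FailingBuilder builder = new FailingBuilder();
        Throwable viaLambda = catchFrom(() -> builder.build());
        Throwable viaReference = catchFrom(builder::build);
        return viaLambda instanceof IllegalStateException
                && viaReference instanceof IllegalStateException;
    }
}
```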



##########
flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/JavaRecordHelper.java:
##########
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils.runtime;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.flink.shaded.guava31.com.google.common.base.Defaults;
+
+import javax.annotation.Nullable;
+
+import java.lang.reflect.Constructor;
+import java.lang.reflect.Field;
+import java.lang.reflect.Method;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Objects;
+import java.util.stream.Collectors;
+
+/** Utilities for handling Java records nicely in the {@link PojoSerializer}. 
*/
+@Internal
+final class JavaRecordHelper<T> {
+
+    /** Record canonical constructor. */
+    private final Constructor<T> recordConstructor;
+
+    /**
+     * Record constructor parameter in case the new constructor has a 
different parameter order then
+     * the serialized data. Used for schema evolution. Null when not needed.
+     */
+    @Nullable private final int[] paramIndexMapping;
+    /**
+     * Default record args used for newly introduced primitive fields during 
schema evolution. Null
+     * when not needed.
+     */
+    @Nullable private final Object[] defaultConstructorArgs;
+
+    private JavaRecordHelper(Constructor<T> recordConstructor) {
+        this(recordConstructor, null, null);
+    }
+
+    private JavaRecordHelper(
+            Constructor<T> recordConstructor,
+            int[] argIndexMapping,
+            Object[] defaultConstructorArgs) {
+        Preconditions.checkArgument((argIndexMapping == null) == 
(defaultConstructorArgs == null));
+        this.recordConstructor = recordConstructor;
+        this.paramIndexMapping = argIndexMapping;
+        this.defaultConstructorArgs = defaultConstructorArgs;
+    }
+
+    public JavaRecordBuilder newBuilder() {
+        return new JavaRecordBuilder();
+    }
+
+    /** Builder class for incremental record construction. */
+    @Internal
+    final class JavaRecordBuilder {
+        private final Object[] args;
+
+        JavaRecordBuilder() {
+            if (defaultConstructorArgs == null) {
+                args = new Object[recordConstructor.getParameterCount()];
+            } else {
+                args = Arrays.copyOf(defaultConstructorArgs, 
defaultConstructorArgs.length);
+            }
+        }
+
+        T build() {
+            try {
+                return recordConstructor.newInstance(args);
+            } catch (Exception e) {
+                throw new RuntimeException("Could not instantiate record", e);
+            }
+        }
+
+        void setParam(int i, Object field) {
+            if (paramIndexMapping != null) {
+                args[paramIndexMapping[i]] = field;
+            } else {
+                args[i] = field;
+            }
+        }
+    }
+
+    public static <T> JavaRecordHelper<T> create(Class<T> clazz, Field[] 
fields) {
+        try {
+            Object[] recordComponents =
+                    (Object[]) 
Class.class.getMethod("getRecordComponents").invoke(clazz);
+
+            Class<?>[] componentTypes = new Class[recordComponents.length];
+            List<String> componentNames = new 
ArrayList<>(recordComponents.length);
+
+            // We need to use reflection to access record components as they 
are not available in
+            // before Java 14
+            Method getType =
+                    
Class.forName("java.lang.reflect.RecordComponent").getMethod("getType");
+            Method getName =
+                    
Class.forName("java.lang.reflect.RecordComponent").getMethod("getName");
+            for (int i = 0; i < recordComponents.length; i++) {
+                componentNames.add((String) 
getName.invoke(recordComponents[i]));
+                componentTypes[i] = (Class<?>) 
getType.invoke(recordComponents[i]);
+            }
+            Constructor<T> recordConstructor = 
clazz.getDeclaredConstructor(componentTypes);
+            recordConstructor.setAccessible(true);
+
+            List<String> previousFields =
+                    Arrays.stream(fields)
+                            // There may be (removed) null fields due to schema evolution

Review Comment:
   ```suggestion
                               // There may be removed fields due to schema evolution still listed
   ```
   Another nit: maybe it's just my English, but I could read "(removed) null fields"
   as saying that the null fields have already been removed.



##########
flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/JavaRecordSerializationHelper.java:
##########
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils.runtime;
+
+import org.apache.flink.annotation.Internal;
+
+import org.apache.flink.shaded.guava31.com.google.common.base.Defaults;
+
+import java.lang.reflect.Constructor;
+import java.lang.reflect.Field;
+import java.lang.reflect.Method;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
+
+/** Utilities for handling Java records nicely in the {@link PojoSerializer}. */

Review Comment:
   Renaming the class to `JavaRecordBuilderFactory` would be good enough to
   convey its purpose without touching the JavaDoc, I guess :thinking:



##########
flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java:
##########
@@ -118,6 +122,9 @@ public PojoSerializer(
                 createRegisteredSubclassSerializers(registeredSubclasses, executionConfig);
 
         this.subclassSerializerCache = new HashMap<>();
+        if (this.isRecord = TypeExtractor.isRecord(clazz)) {

Review Comment:
   Do we gain anything from having the final field `isRecord` during
   deserialization instead of doing a `null` check on the transient field
   `recordHelper`? AFAIU, the constructor is called anyway, which would
   initialize `recordHelper` properly based on the `clazz` parameter. Or am I
   missing something here? 🤔
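   
   A possibly relevant detail for this question, as a hedged sketch rather than
   a statement about `PojoSerializer` itself: when an object is restored through
   plain Java serialization, the constructor of the `Serializable` class is not
   invoked and `transient` fields come back as `null`, while a non-transient
   `final boolean` is restored from the stream. The class below is a
   hypothetical stand-in (`TransientFieldSketch` and its fields are illustrative
   only); whether `PojoSerializer` re-creates `recordHelper` elsewhere, for
   example in a `readObject` method, is not visible in this diff.

   ```java
   import java.io.ByteArrayInputStream;
   import java.io.ByteArrayOutputStream;
   import java.io.IOException;
   import java.io.ObjectInputStream;
   import java.io.ObjectOutputStream;
   import java.io.Serializable;

   // Hypothetical stand-in for a serializer with a transient helper; not Flink code.
   class TransientFieldSketch implements Serializable {
       private static final long serialVersionUID = 1L;

       // Non-transient: written to and restored from the serialized stream.
       final boolean isRecord;
       // Transient: skipped by Java serialization, so it is null after a round trip.
       transient Object recordHelper;

       TransientFieldSketch(boolean isRecord) {
           this.isRecord = isRecord;
           this.recordHelper = isRecord ? new Object() : null;
       }

       // Serializes and deserializes the instance in memory.
       static TransientFieldSketch roundTrip(TransientFieldSketch original) {
           try {
               ByteArrayOutputStream bytes = new ByteArrayOutputStream();
               try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                   out.writeObject(original);
               }
               try (ObjectInputStream in =
                       new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                   return (TransientFieldSketch) in.readObject();
               }
           } catch (IOException | ClassNotFoundException e) {
               throw new IllegalStateException("In-memory round trip failed", e);
           }
       }

       public static void main(String[] args) {
           TransientFieldSketch copy = roundTrip(new TransientFieldSketch(true));
           System.out.println(copy.isRecord); // true
           System.out.println(copy.recordHelper); // null
       }
   }
   ```

   If the serializer is only ever created through its constructor, a `null`
   check on the helper would carry the same information as the flag; the sketch
   only shows why the two can diverge after a Java-serialization round trip.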



##########
flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/JavaRecordHelper.java:
##########
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils.runtime;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.flink.shaded.guava31.com.google.common.base.Defaults;
+
+import javax.annotation.Nullable;
+
+import java.lang.reflect.Constructor;
+import java.lang.reflect.Field;
+import java.lang.reflect.Method;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Objects;
+import java.util.stream.Collectors;
+
+/** Utilities for handling Java records nicely in the {@link PojoSerializer}. 
*/
+@Internal
+final class JavaRecordHelper<T> {
+
+    /** Record canonical constructor. */
+    private final Constructor<T> recordConstructor;
+
+    /**
+     * Record constructor parameter in case the new constructor has a 
different parameter order then
+     * the serialized data. Used for schema evolution. Null when not needed.
+     */
+    @Nullable private final int[] paramIndexMapping;
+    /**
+     * Default record args used for newly introduced primitive fields during 
schema evolution. Null
+     * when not needed.
+     */
+    @Nullable private final Object[] defaultConstructorArgs;
+
+    private JavaRecordHelper(Constructor<T> recordConstructor) {
+        this(recordConstructor, null, null);
+    }
+
+    private JavaRecordHelper(
+            Constructor<T> recordConstructor,
+            int[] argIndexMapping,
+            Object[] defaultConstructorArgs) {
+        Preconditions.checkArgument((argIndexMapping == null) == 
(defaultConstructorArgs == null));
+        this.recordConstructor = recordConstructor;
+        this.paramIndexMapping = argIndexMapping;
+        this.defaultConstructorArgs = defaultConstructorArgs;
+    }
+
+    public JavaRecordBuilder newBuilder() {
+        return new JavaRecordBuilder();
+    }
+
+    /** Builder class for incremental record construction. */
+    @Internal
+    final class JavaRecordBuilder {
+        private final Object[] args;
+
+        JavaRecordBuilder() {
+            if (defaultConstructorArgs == null) {
+                args = new Object[recordConstructor.getParameterCount()];
+            } else {
+                args = Arrays.copyOf(defaultConstructorArgs, 
defaultConstructorArgs.length);
+            }
+        }
+
+        T build() {
+            try {
+                return recordConstructor.newInstance(args);
+            } catch (Exception e) {
+                throw new RuntimeException("Could not instantiate record", e);
+            }
+        }
+
+        void setParam(int i, Object field) {
+            if (paramIndexMapping != null) {
+                args[paramIndexMapping[i]] = field;
+            } else {
+                args[i] = field;
+            }
+        }
+    }
+
+    public static <T> JavaRecordHelper<T> create(Class<T> clazz, Field[] 
fields) {
+        try {
+            Object[] recordComponents =
+                    (Object[]) 
Class.class.getMethod("getRecordComponents").invoke(clazz);
+
+            Class<?>[] componentTypes = new Class[recordComponents.length];
+            List<String> componentNames = new 
ArrayList<>(recordComponents.length);
+
+            // We need to use reflection to access record components as they 
are not available in
+            // before Java 14
+            Method getType =
+                    
Class.forName("java.lang.reflect.RecordComponent").getMethod("getType");
+            Method getName =
+                    
Class.forName("java.lang.reflect.RecordComponent").getMethod("getName");
+            for (int i = 0; i < recordComponents.length; i++) {
+                componentNames.add((String) 
getName.invoke(recordComponents[i]));
+                componentTypes[i] = (Class<?>) 
getType.invoke(recordComponents[i]);
+            }
+            Constructor<T> recordConstructor = 
clazz.getDeclaredConstructor(componentTypes);
+            recordConstructor.setAccessible(true);
+
+            List<String> previousFields =
+                    Arrays.stream(fields)
+                            // There may be (removed) null fields due to schema evolution
+                            .filter(Objects::nonNull)
+                            .map(Field::getName)
+                            .collect(Collectors.toList());
+
+            // If the field names / order changed we know that we are migrating the records and arg
+            // index remapping may be necessary
+            boolean migrating = !previousFields.equals(componentNames);
+            if (migrating) {
+                // If the order / index of arguments changed in the new record class we have to map
+                // it, otherwise we pass the wrong arguments to the constructor
+                int[] argIndexMapping = new int[fields.length];
+                for (int i = 0; i < fields.length; i++) {
+                    Field field = fields[i];
+                    // There may be (removed) null fields due to schema evolution

Review Comment:
   ```suggestion
                       // There may be removed fields due to schema evolution still listed
   ```
   nit: same reasoning as above



##########
flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/Java17RecordHelperTest.java:
##########
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils.runtime;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.lang.reflect.Field;
+import java.util.Arrays;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/** Tests for the @{@link JavaRecordHelper}. */
+class Java17RecordHelperTest {
+
+    Field[] fields;
+
+    record TestRecord(int i1, int i2, String s1, String s2) {}
+
+    @BeforeEach
+    void setup() {
+        fields = TestRecord.class.getDeclaredFields();
+    }
+
+    @Test
+    void testNoDefaultOrParamMapping() {
+        JavaRecordHelper<TestRecord> helper = JavaRecordHelper.create(TestRecord.class, fields);
+        JavaRecordHelper<TestRecord>.JavaRecordBuilder builder = helper.newBuilder();
+        builder.setParam(1, 100);
+        builder.setParam(0, 50);
+        builder.setParam(3, "test");
+
+        assertThat(builder.build()).isEqualTo(new TestRecord(50, 100, null, "test"));
+    }
+
+    @Test
+    void testNewFieldsAdded() {
+        // Test restoring from fields [i2, s1]
+        JavaRecordHelper<TestRecord> helper =
+                JavaRecordHelper.create(TestRecord.class, Arrays.copyOfRange(fields, 1, 3));
+
+        JavaRecordHelper<TestRecord>.JavaRecordBuilder builder = helper.newBuilder();
+        builder.setParam(0, 100);
+        builder.setParam(1, "test");

Review Comment:
   It took me a bit to figure out whether we should use the "new" or "old"
   parameter indexes here. That's an indication that we could make the method
   name or the parameter names of `setParam` more explicit, but I cannot come up
   with a better name. `setValueBasedOnPreviousParamIndex(..)` sounds
   reasonable, but I'm not a fan of the "previous" label in this context.
   :thinking: Maybe you have a better idea.
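   
   To make the index question concrete, here is a minimal sketch, assuming that
   the index passed to `setParam` refers to the position in the serialized (old)
   field list and that `paramIndexMapping` translates it to the position in the
   new constructor. `IndexMappingSketch` and `restoreFromI2AndS1` are
   hypothetical, simplified stand-ins for `JavaRecordBuilder` and the test case
   above; the sketch only fills an argument array and never calls a real record
   constructor.

   ```java
   import java.util.Arrays;

   // Hypothetical, simplified stand-in for the builder discussed above.
   final class IndexMappingSketch {
       // Maps a serialized (old) field index to a constructor (new) parameter index.
       private final int[] paramIndexMapping;
       private final Object[] args;

       IndexMappingSketch(int[] paramIndexMapping, Object[] defaultArgs) {
           this.paramIndexMapping = paramIndexMapping;
           // Copy the defaults so that every builder starts from a fresh array.
           this.args = Arrays.copyOf(defaultArgs, defaultArgs.length);
       }

       // The index is expressed in the order of the serialized data, not the constructor.
       void setParam(int serializedFieldIndex, Object value) {
           args[paramIndexMapping[serializedFieldIndex]] = value;
       }

       Object[] constructorArgs() {
           return args.clone();
       }

       // New shape: (int i1, int i2, String s1, String s2); serialized data holds [i2, s1].
       static Object[] restoreFromI2AndS1() {
           int[] mapping = {1, 2}; // old index 0 (i2) -> new 1, old index 1 (s1) -> new 2
           Object[] defaults = {0, 0, null, null}; // assumed defaults for missing fields
           IndexMappingSketch builder = new IndexMappingSketch(mapping, defaults);
           builder.setParam(0, 100);
           builder.setParam(1, "test");
           return builder.constructorArgs();
       }

       public static void main(String[] args) {
           System.out.println(Arrays.toString(restoreFromI2AndS1())); // [0, 100, test, null]
       }
   }
   ```

   With a parameter name along the lines of `serializedFieldIndex`, the
   old-versus-new question would arguably be answered by the signature alone.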



##########
flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/JavaRecordHelper.java:
##########
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils.runtime;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.flink.shaded.guava31.com.google.common.base.Defaults;
+
+import javax.annotation.Nullable;
+
+import java.lang.reflect.Constructor;
+import java.lang.reflect.Field;
+import java.lang.reflect.Method;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Objects;
+import java.util.stream.Collectors;
+
+/** Utilities for handling Java records nicely in the {@link PojoSerializer}. 
*/
+@Internal
+final class JavaRecordHelper<T> {
+
+    /** Record canonical constructor. */
+    private final Constructor<T> recordConstructor;
+
+    /**
+     * Record constructor parameter in case the new constructor has a 
different parameter order then
+     * the serialized data. Used for schema evolution. Null when not needed.
+     */
+    @Nullable private final int[] paramIndexMapping;
+    /**
+     * Default record args used for newly introduced primitive fields during 
schema evolution. Null
+     * when not needed.
+     */
+    @Nullable private final Object[] defaultConstructorArgs;
+
+    private JavaRecordHelper(Constructor<T> recordConstructor) {
+        this(recordConstructor, null, null);
+    }
+
+    private JavaRecordHelper(
+            Constructor<T> recordConstructor,
+            int[] argIndexMapping,
+            Object[] defaultConstructorArgs) {
+        Preconditions.checkArgument((argIndexMapping == null) == 
(defaultConstructorArgs == null));
+        this.recordConstructor = recordConstructor;
+        this.paramIndexMapping = argIndexMapping;
+        this.defaultConstructorArgs = defaultConstructorArgs;
+    }
+
+    public JavaRecordBuilder newBuilder() {
+        return new JavaRecordBuilder();
+    }
+
+    /** Builder class for incremental record construction. */
+    @Internal
+    final class JavaRecordBuilder {
+        private final Object[] args;
+
+        JavaRecordBuilder() {
+            if (defaultConstructorArgs == null) {
+                args = new Object[recordConstructor.getParameterCount()];
+            } else {
+                args = Arrays.copyOf(defaultConstructorArgs, 
defaultConstructorArgs.length);
+            }
+        }
+
+        T build() {
+            try {
+                return recordConstructor.newInstance(args);
+            } catch (Exception e) {
+                throw new RuntimeException("Could not instantiate record", e);
+            }
+        }
+
+        void setParam(int i, Object field) {

Review Comment:
   ```suggestion
           void setParam(int parameterIndex, Object parameterValue) {
   ```



##########
flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/JavaRecordHelper.java:
##########
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils.runtime;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.flink.shaded.guava31.com.google.common.base.Defaults;
+
+import javax.annotation.Nullable;
+
+import java.lang.reflect.Constructor;
+import java.lang.reflect.Field;
+import java.lang.reflect.Method;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Objects;
+import java.util.stream.Collectors;
+
+/** Utilities for handling Java records nicely in the {@link PojoSerializer}. 
*/
+@Internal
+final class JavaRecordHelper<T> {
+
+    /** Record canonical constructor. */
+    private final Constructor<T> recordConstructor;
+
+    /**
+     * Record constructor parameter in case the new constructor has a 
different parameter order then
+     * the serialized data. Used for schema evolution. Null when not needed.
+     */
+    @Nullable private final int[] paramIndexMapping;
+    /**
+     * Default record args used for newly introduced primitive fields during 
schema evolution. Null
+     * when not needed.
+     */
+    @Nullable private final Object[] defaultConstructorArgs;
+
+    private JavaRecordHelper(Constructor<T> recordConstructor) {
+        this(recordConstructor, null, null);
+    }
+
+    private JavaRecordHelper(
+            Constructor<T> recordConstructor,
+            int[] argIndexMapping,

Review Comment:
   ```suggestion
               @Nullable int[] argIndexMapping,
   ```



##########
flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/JavaRecordHelper.java:
##########
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils.runtime;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.flink.shaded.guava31.com.google.common.base.Defaults;
+
+import javax.annotation.Nullable;
+
+import java.lang.reflect.Constructor;
+import java.lang.reflect.Field;
+import java.lang.reflect.Method;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Objects;
+import java.util.stream.Collectors;
+
+/** Utilities for handling Java records nicely in the {@link PojoSerializer}. 
*/
+@Internal
+final class JavaRecordHelper<T> {
+
+    /** Record canonical constructor. */
+    private final Constructor<T> recordConstructor;
+
+    /**
+     * Record constructor parameter in case the new constructor has a 
different parameter order then
+     * the serialized data. Used for schema evolution. Null when not needed.
+     */
+    @Nullable private final int[] paramIndexMapping;
+    /**
+     * Default record args used for newly introduced primitive fields during 
schema evolution. Null
+     * when not needed.
+     */
+    @Nullable private final Object[] defaultConstructorArgs;
+
+    private JavaRecordHelper(Constructor<T> recordConstructor) {
+        this(recordConstructor, null, null);
+    }
+
+    private JavaRecordHelper(
+            Constructor<T> recordConstructor,
+            int[] argIndexMapping,
+            Object[] defaultConstructorArgs) {
+        Preconditions.checkArgument((argIndexMapping == null) == 
(defaultConstructorArgs == null));
+        this.recordConstructor = recordConstructor;
+        this.paramIndexMapping = argIndexMapping;
+        this.defaultConstructorArgs = defaultConstructorArgs;
+    }
+
+    public JavaRecordBuilder newBuilder() {
+        return new JavaRecordBuilder();
+    }
+
+    /** Builder class for incremental record construction. */
+    @Internal
+    final class JavaRecordBuilder {
+        private final Object[] args;
+
+        JavaRecordBuilder() {
+            if (defaultConstructorArgs == null) {
+                args = new Object[recordConstructor.getParameterCount()];
+            } else {
+                args = Arrays.copyOf(defaultConstructorArgs, 
defaultConstructorArgs.length);
+            }
+        }
+
+        T build() {
+            try {
+                return recordConstructor.newInstance(args);
+            } catch (Exception e) {
+                throw new RuntimeException("Could not instantiate record", e);
+            }
+        }
+
+        void setParam(int i, Object field) {
+            if (paramIndexMapping != null) {
+                args[paramIndexMapping[i]] = field;
+            } else {
+                args[i] = field;
+            }
+        }
+    }
+
+    public static <T> JavaRecordHelper<T> create(Class<T> clazz, Field[] 
fields) {
+        try {
+            Object[] recordComponents =
+                    (Object[]) 
Class.class.getMethod("getRecordComponents").invoke(clazz);
+
+            Class<?>[] componentTypes = new Class[recordComponents.length];
+            List<String> componentNames = new 
ArrayList<>(recordComponents.length);
+
+            // We need to use reflection to access record components as they 
are not available in
+            // before Java 14
+            Method getType =
+                    
Class.forName("java.lang.reflect.RecordComponent").getMethod("getType");
+            Method getName =
+                    
Class.forName("java.lang.reflect.RecordComponent").getMethod("getName");
+            for (int i = 0; i < recordComponents.length; i++) {
+                componentNames.add((String) 
getName.invoke(recordComponents[i]));
+                componentTypes[i] = (Class<?>) 
getType.invoke(recordComponents[i]);
+            }
+            Constructor<T> recordConstructor = 
clazz.getDeclaredConstructor(componentTypes);
+            recordConstructor.setAccessible(true);
+
+            List<String> previousFields =
+                    Arrays.stream(fields)
+                            // There may be (removed) null fields due to schema evolution
+                            .filter(Objects::nonNull)
+                            .map(Field::getName)
+                            .collect(Collectors.toList());

Review Comment:
   ```suggestion
                               .toList();
   ```
   nit: Feel free to ignore this kind of suggestion if you think it's too
   much. I'd totally understand. I'm only mentioning it because IntelliJ raises
   the warning (at least in my setup).
   
   I haven't changed IntelliJ's default inspection settings. Fixing the
   warning right away makes me hope that fewer mini PRs get created in the
   future because someone found a (kind of reasonable) warning raised by
   IntelliJ.
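   
   One caveat worth checking before applying the suggestion: `Stream.toList()`
   only exists from Java 16 on and returns an unmodifiable list, whereas
   `Collectors.toList()` has been available since Java 8. The sketch below is a
   hypothetical, self-contained illustration (the class and method names are
   made up, and plain strings stand in for `Field` objects) of the
   null-filtering step, written so that it still compiles on older JDKs.

   ```java
   import java.util.Arrays;
   import java.util.List;
   import java.util.Objects;
   import java.util.stream.Collectors;

   // Hypothetical illustration of the null-filtering step; not Flink code.
   final class FieldNameListSketch {

       // Drops null entries (standing in for removed fields) and keeps the order.
       static List<String> nonNullNames(String[] names) {
           return Arrays.stream(names)
                   .filter(Objects::nonNull)
                   // On Java 16+ this line could be replaced with .toList(),
                   // which returns an unmodifiable list instead.
                   .collect(Collectors.toList());
       }

       public static void main(String[] args) {
           System.out.println(nonNullNames(new String[] {"i1", null, "s1"})); // [i1, s1]
       }
   }
   ```

   Since the resulting list is only compared with `equals` in the diff above,
   either variant should behave the same there, assuming the module is compiled
   with a JDK that provides `Stream.toList()`.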



##########
flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java:
##########
@@ -89,6 +89,10 @@ public final class PojoSerializer<T> extends TypeSerializer<T> {
 
     private transient ClassLoader cl;
 
+    private final boolean isRecord;
+
+    private transient JavaRecordHelper<T> recordHelper;

Review Comment:
   ```suggestion
       @Nullable private transient JavaRecordHelper<T> recordHelper;
   ```



##########
flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java:
##########
@@ -89,6 +89,10 @@ public final class PojoSerializer<T> extends TypeSerializer<T> {
 
     private transient ClassLoader cl;
 
+    private final boolean isRecord;
+
+    private transient JavaRecordSerializationHelper<T> recordHelper;
+
     /** Constructor to create a new {@link PojoSerializer}. */
     @SuppressWarnings("unchecked")
     public PojoSerializer(

Review Comment:
   :+1: I saw your JavaDoc extension for `Types.POJO`. Unfortunately, we
   have redundant documentation in `PojoTypeInfo` for this one. What about
   updating `PojoTypeInfo` and replacing the redundant docs with a
   `@see PojoTypeInfo` in the JavaDoc of `Types.POJO`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
