This is an automated email from the ASF dual-hosted git repository.
gyfora pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new ba752b9e8b3 [FLINK-32380] Support Java Records with
PojoTypeInfo/Serializer
ba752b9e8b3 is described below
commit ba752b9e8b3fa0fbbe67d6d1bd70cccbc74e6ca0
Author: Gyula Fora <[email protected]>
AuthorDate: Thu Nov 9 23:54:15 2023 -0800
[FLINK-32380] Support Java Records with PojoTypeInfo/Serializer
---
.../apache/flink/api/common/typeinfo/Types.java | 4 +
.../flink/api/java/typeutils/PojoTypeInfo.java | 4 +
.../flink/api/java/typeutils/TypeExtractor.java | 20 +-
.../runtime/JavaRecordBuilderFactory.java | 171 +++++++++++++++
.../api/java/typeutils/runtime/PojoSerializer.java | 164 +++++++++++----
.../api/common/typeutils/SerializerTestBase.java | 8 +-
.../runtime/Java17PojoRecordSerializerTest.java | 230 +++++++++++++++++++++
.../Java17PojoRecordSerializerUpgradeTest.java | 63 ++++++
...oRecordSerializerUpgradeTestSpecifications.java | 151 ++++++++++++++
.../runtime/Java17RecordBuilderFactoryTest.java | 112 ++++++++++
.../serializer-snapshot | Bin 0 -> 389 bytes
.../test-data | Bin 0 -> 14 bytes
.../serializer-snapshot | Bin 0 -> 380 bytes
.../pojo-serializer-to-record-1.19/test-data | Bin 0 -> 14 bytes
pom.xml | 10 +
tools/maven/suppressions-core.xml | 3 +
16 files changed, 894 insertions(+), 46 deletions(-)
diff --git
a/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/Types.java
b/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/Types.java
index a5c509b9ad0..a66e3be2b13 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/Types.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/Types.java
@@ -269,6 +269,10 @@ public class Types {
*
* <p>The generic types for all fields of the POJO can be defined in a
hierarchy of subclasses.
*
+ * <p>Java Record classes can also be used as valid POJOs (even though
they don't fulfill some
+ * of the above criteria). In this case Flink will use the record's
canonical constructor to
+ * create the objects.
+ *
* <p>If Flink's type analyzer is unable to extract a valid POJO type
information with type
* information for all fields, an {@link
* org.apache.flink.api.common.functions.InvalidTypesException} is thrown.
Alternatively, you
diff --git
a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/PojoTypeInfo.java
b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/PojoTypeInfo.java
index 021f9dfe333..97aa069b778 100644
---
a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/PojoTypeInfo.java
+++
b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/PojoTypeInfo.java
@@ -59,6 +59,10 @@ import static org.apache.flink.util.Preconditions.checkState;
* field can be null independent of the field's type.
* </ul>
*
+ * Java Record classes can also be used as valid POJOs (even though they don't
fulfill some of the
+ * above criteria). In this case Flink will use the record's canonical
constructor to create the
+ * objects.
+ *
* @param <T> The type represented by this type information.
*/
@Public
diff --git
a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
index 939c988df62..6fc71cdd5bc 100644
---
a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
+++
b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
@@ -2072,10 +2072,11 @@ public class TypeExtractor {
return new GenericTypeInfo<>(clazz);
}
+ boolean isRecord = isRecord(clazz);
List<PojoField> pojoFields = new ArrayList<>();
for (Field field : fields) {
Type fieldType = field.getGenericType();
- if (!isValidPojoField(field, clazz, typeHierarchy) && clazz !=
Row.class) {
+ if (!isRecord && !isValidPojoField(field, clazz, typeHierarchy) &&
clazz != Row.class) {
LOG.info(
"Class "
+ clazz
@@ -2140,6 +2141,11 @@ public class TypeExtractor {
}
}
+ if (isRecord) {
+ // no default constructor extraction needs to be applied for Java
records
+ return pojoType;
+ }
+
// Try retrieving the default constructor, if it does not have one
// we cannot use this because the serializer uses it.
Constructor<OUT> defaultConstructor = null;
@@ -2174,6 +2180,18 @@ public class TypeExtractor {
return pojoType;
}
+ /**
+ * Determine whether the given class is a valid Java record.
+ *
+ * @param clazz class to check
+ * @return True if the class is a Java record
+ */
+ @PublicEvolving
+ public static boolean isRecord(Class<?> clazz) {
+ return clazz.getSuperclass().getName().equals("java.lang.Record")
+ && (clazz.getModifiers() & Modifier.FINAL) != 0;
+ }
+
/**
* Recursively determine all declared fields This is required because
class.getFields() is not
* returning fields defined in parent classes.
diff --git
a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/JavaRecordBuilderFactory.java
b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/JavaRecordBuilderFactory.java
new file mode 100644
index 00000000000..ff5eefe2ee1
--- /dev/null
+++
b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/JavaRecordBuilderFactory.java
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils.runtime;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.flink.shaded.guava31.com.google.common.base.Defaults;
+
+import javax.annotation.Nullable;
+
+import java.lang.reflect.Constructor;
+import java.lang.reflect.Field;
+import java.lang.reflect.Method;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Objects;
+import java.util.stream.Collectors;
+
+/** Utility class for constructing Java records in the {@link PojoSerializer}.
*/
+@Internal
+final class JavaRecordBuilderFactory<T> {
+
+ private final Constructor<T> canonicalConstructor;
+
+ /**
+ * Record constructor parameter index mapping in case the new constructor
has a different
+ * parameter order than the serialized data. Used for schema evolution or
`null` if no schema
+ * evolution is applied for that record class.
+ */
+ @Nullable private final int[] paramIndexMapping;
+
+ /**
+ * Default record args used for newly introduced primitive fields during
schema evolution.
+ * `null` if no schema evolution is applied for that record class.
+ */
+ @Nullable private final Object[] defaultConstructorArgs;
+
+ private JavaRecordBuilderFactory(Constructor<T> canonicalConstructor) {
+ this(canonicalConstructor, null, null);
+ }
+
+ private JavaRecordBuilderFactory(
+ Constructor<T> canonicalConstructor,
+ @Nullable int[] argIndexMapping,
+ @Nullable Object[] defaultConstructorArgs) {
+ Preconditions.checkArgument((argIndexMapping == null) ==
(defaultConstructorArgs == null));
+ this.canonicalConstructor = canonicalConstructor;
+ this.paramIndexMapping = argIndexMapping;
+ this.defaultConstructorArgs = defaultConstructorArgs;
+ }
+
+ JavaRecordBuilder newBuilder() {
+ return new JavaRecordBuilder();
+ }
+
+ /** Builder class for incremental record construction. */
+ @Internal
+ final class JavaRecordBuilder {
+ private final Object[] args;
+
+ JavaRecordBuilder() {
+ if (defaultConstructorArgs == null) {
+ args = new Object[canonicalConstructor.getParameterCount()];
+ } else {
+ args = Arrays.copyOf(defaultConstructorArgs,
defaultConstructorArgs.length);
+ }
+ }
+
+ T build() {
+ try {
+ return canonicalConstructor.newInstance(args);
+ } catch (Exception e) {
+ throw new RuntimeException("Could not instantiate record", e);
+ }
+ }
+
+ /**
+ * Set record field by index. If parameter index mapping is provided,
the index is mapped,
+ * otherwise it is used as is.
+ *
+ * @param i index of field to be set
+ * @param value field value
+ */
+ void setField(int i, Object value) {
+ if (paramIndexMapping != null) {
+ args[paramIndexMapping[i]] = value;
+ } else {
+ args[i] = value;
+ }
+ }
+ }
+
+ static <T> JavaRecordBuilderFactory<T> create(Class<T> clazz, Field[]
fields) {
+ try {
+ Object[] recordComponents =
+ (Object[])
Class.class.getMethod("getRecordComponents").invoke(clazz);
+
+ Class<?>[] componentTypes = new Class[recordComponents.length];
+ List<String> componentNames = new
ArrayList<>(recordComponents.length);
+
+ // We need to use reflection to access record components as they
are not available
+ // before Java 14
+ Method getType =
+
Class.forName("java.lang.reflect.RecordComponent").getMethod("getType");
+ Method getName =
+
Class.forName("java.lang.reflect.RecordComponent").getMethod("getName");
+ for (int i = 0; i < recordComponents.length; i++) {
+ componentNames.add((String)
getName.invoke(recordComponents[i]));
+ componentTypes[i] = (Class<?>)
getType.invoke(recordComponents[i]);
+ }
+ Constructor<T> recordConstructor =
clazz.getDeclaredConstructor(componentTypes);
+ recordConstructor.setAccessible(true);
+
+ List<String> previousFields =
+ Arrays.stream(fields)
+ // There may be (removed) null fields due to
schema evolution
+ .filter(Objects::nonNull)
+ .map(Field::getName)
+ .collect(Collectors.toList());
+
+ // If the field names / order changed we know that we are
migrating the records and arg
+ // index remapping may be necessary
+ boolean migrating = !previousFields.equals(componentNames);
+ if (migrating) {
+ // If the order / index of arguments changed in the new record
class we have to map
+ // it, otherwise we pass the wrong arguments to the constructor
+ int[] argIndexMapping = new int[fields.length];
+ for (int i = 0; i < fields.length; i++) {
+ Field field = fields[i];
+ // There may be (removed) null fields due to schema
evolution
+ argIndexMapping[i] =
+ field == null ? -1 :
componentNames.indexOf(fields[i].getName());
+ }
+
+ // We have to initialize newly added primitive fields to their
correct default value
+ Object[] defaultValues = new Object[componentNames.size()];
+ for (int i = 0; i < componentNames.size(); i++) {
+ Class<?> fieldType = componentTypes[i];
+ boolean newPrimitive =
+ fieldType.isPrimitive()
+ &&
!previousFields.contains(componentNames.get(i));
+ defaultValues[i] = newPrimitive ?
Defaults.defaultValue(fieldType) : null;
+ }
+ return new JavaRecordBuilderFactory<>(
+ recordConstructor, argIndexMapping, defaultValues);
+ } else {
+ return new JavaRecordBuilderFactory<>(recordConstructor);
+ }
+ } catch (Exception e) {
+ throw new RuntimeException("Could not find record canonical
constructor", e);
+ }
+ }
+}
diff --git
a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java
b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java
index d7475ea7b11..0531af8a0e1 100644
---
a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java
+++
b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java
@@ -26,6 +26,8 @@ import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.util.CollectionUtil;
+import javax.annotation.Nullable;
+
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
@@ -89,6 +91,8 @@ public final class PojoSerializer<T> extends
TypeSerializer<T> {
private transient ClassLoader cl;
+ @Nullable private transient JavaRecordBuilderFactory<T> recordFactory;
+
/** Constructor to create a new {@link PojoSerializer}. */
@SuppressWarnings("unchecked")
public PojoSerializer(
@@ -118,6 +122,9 @@ public final class PojoSerializer<T> extends
TypeSerializer<T> {
createRegisteredSubclassSerializers(registeredSubclasses,
executionConfig);
this.subclassSerializerCache = new HashMap<>();
+ if (TypeExtractor.isRecord(clazz)) {
+ this.recordFactory = JavaRecordBuilderFactory.create(clazz,
fields);
+ }
}
/**
@@ -142,6 +149,9 @@ public final class PojoSerializer<T> extends
TypeSerializer<T> {
this.subclassSerializerCache = checkNotNull(subclassSerializerCache);
this.executionConfig = checkNotNull(executionConfig);
this.cl = Thread.currentThread().getContextClassLoader();
+ if (TypeExtractor.isRecord(clazz)) {
+ this.recordFactory = JavaRecordBuilderFactory.create(clazz,
fields);
+ }
}
@Override
@@ -149,6 +159,10 @@ public final class PojoSerializer<T> extends
TypeSerializer<T> {
return false;
}
+ private boolean isRecord() {
+ return this.recordFactory != null;
+ }
+
@Override
public PojoSerializer<T> duplicate() {
TypeSerializer<Object>[] duplicateFieldSerializers =
duplicateSerializers(fieldSerializers);
@@ -189,7 +203,7 @@ public final class PojoSerializer<T> extends
TypeSerializer<T> {
@Override
public T createInstance() {
- if (clazz.isInterface() || Modifier.isAbstract(clazz.getModifiers())) {
+ if (clazz.isInterface() || Modifier.isAbstract(clazz.getModifiers())
|| isRecord()) {
return null;
}
try {
@@ -221,7 +235,20 @@ public final class PojoSerializer<T> extends
TypeSerializer<T> {
}
Class<?> actualType = from.getClass();
- if (actualType == clazz) {
+ if (isRecord()) {
+ try {
+ JavaRecordBuilderFactory<T>.JavaRecordBuilder builder =
recordFactory.newBuilder();
+ for (int i = 0; i < numFields; i++) {
+ if (fields[i] != null) {
+ builder.setField(i, copyField(i, from));
+ }
+ }
+ return builder.build();
+ } catch (IllegalAccessException e) {
+ throw new RuntimeException(
+ "Error during POJO copy, this should not happen since
we check the fields before.");
+ }
+ } else if (actualType == clazz) {
T target;
try {
target = (T) from.getClass().newInstance();
@@ -232,13 +259,7 @@ public final class PojoSerializer<T> extends
TypeSerializer<T> {
try {
for (int i = 0; i < numFields; i++) {
if (fields[i] != null) {
- Object value = fields[i].get(from);
- if (value != null) {
- Object copy = fieldSerializers[i].copy(value);
- fields[i].set(target, copy);
- } else {
- fields[i].set(target, null);
- }
+ fields[i].set(target, copyField(i, from));
}
}
} catch (IllegalAccessException e) {
@@ -266,23 +287,24 @@ public final class PojoSerializer<T> extends
TypeSerializer<T> {
return copy(from);
}
- if (actualType == clazz) {
+ if (isRecord()) {
try {
+ JavaRecordBuilderFactory<T>.JavaRecordBuilder builder =
recordFactory.newBuilder();
for (int i = 0; i < numFields; i++) {
if (fields[i] != null) {
- Object value = fields[i].get(from);
- if (value != null) {
- Object reuseValue = fields[i].get(reuse);
- Object copy;
- if (reuseValue != null) {
- copy = fieldSerializers[i].copy(value,
reuseValue);
- } else {
- copy = fieldSerializers[i].copy(value);
- }
- fields[i].set(reuse, copy);
- } else {
- fields[i].set(reuse, null);
- }
+ builder.setField(i, copyField(reuse, i, from));
+ }
+ }
+ return builder.build();
+ } catch (IllegalAccessException e) {
+ throw new RuntimeException(
+ "Error during POJO copy, this should not happen since
we check the fields before.");
+ }
+ } else if (actualType == clazz) {
+ try {
+ for (int i = 0; i < numFields; i++) {
+ if (fields[i] != null) {
+ fields[i].set(reuse, copyField(reuse, i, from));
}
}
} catch (IllegalAccessException e) {
@@ -298,6 +320,30 @@ public final class PojoSerializer<T> extends
TypeSerializer<T> {
return reuse;
}
+ private Object copyField(int i, Object from) throws IllegalAccessException
{
+ Object value = fields[i].get(from);
+ if (value != null) {
+ return fieldSerializers[i].copy(value);
+ } else {
+ return null;
+ }
+ }
+
+ private Object copyField(T reuse, int i, Object from) throws
IllegalAccessException {
+ Object value = fields[i].get(from);
+ if (value != null) {
+ Object reuseValue = fields[i].get(reuse);
+
+ if (reuseValue != null) {
+ return fieldSerializers[i].copy(value, reuseValue);
+ } else {
+ return fieldSerializers[i].copy(value);
+ }
+ } else {
+ return null;
+ }
+ }
+
@Override
public int getLength() {
return -1;
@@ -400,21 +446,23 @@ public final class PojoSerializer<T> extends
TypeSerializer<T> {
target = createInstance();
}
- if ((flags & NO_SUBCLASS) != 0) {
+ if (isRecord()) {
+ JavaRecordBuilderFactory<T>.JavaRecordBuilder builder =
recordFactory.newBuilder();
+ for (int i = 0; i < numFields; i++) {
+ boolean isNull = source.readBoolean();
+ Object fieldValue = isNull ? null :
fieldSerializers[i].deserialize(source);
+ if (fields[i] != null) {
+ builder.setField(i, fieldValue);
+ }
+ }
+ target = builder.build();
+ } else if ((flags & NO_SUBCLASS) != 0) {
try {
for (int i = 0; i < numFields; i++) {
boolean isNull = source.readBoolean();
-
+ Object fieldValue = isNull ? null :
fieldSerializers[i].deserialize(source);
if (fields[i] != null) {
- if (isNull) {
- fields[i].set(target, null);
- } else {
- Object field =
fieldSerializers[i].deserialize(source);
- fields[i].set(target, field);
- }
- } else if (!isNull) {
- // read and dump a pre-existing field value
- fieldSerializers[i].deserialize(source);
+ fields[i].set(target, fieldValue);
}
}
} catch (IllegalAccessException e) {
@@ -473,25 +521,41 @@ public final class PojoSerializer<T> extends
TypeSerializer<T> {
}
}
- if ((flags & NO_SUBCLASS) != 0) {
+ if (isRecord()) {
try {
+ JavaRecordBuilderFactory<T>.JavaRecordBuilder builder =
recordFactory.newBuilder();
for (int i = 0; i < numFields; i++) {
boolean isNull = source.readBoolean();
if (fields[i] != null) {
if (isNull) {
- fields[i].set(reuse, null);
+ builder.setField(i, null);
} else {
- Object field;
+ Object reuseField = reuse == null ? null :
fields[i].get(reuse);
+ builder.setField(i, deserializeField(reuseField,
i, source));
+ }
+ } else if (!isNull) {
+ // read and dump a pre-existing field value
+ fieldSerializers[i].deserialize(source);
+ }
+ }
- Object reuseField = fields[i].get(reuse);
- if (reuseField != null) {
- field =
fieldSerializers[i].deserialize(reuseField, source);
- } else {
- field =
fieldSerializers[i].deserialize(source);
- }
+ reuse = builder.build();
+ } catch (IllegalAccessException e) {
+ throw new RuntimeException(
+ "Error during POJO copy, this should not happen since
we check the fields before.",
+ e);
+ }
+ } else if ((flags & NO_SUBCLASS) != 0) {
+ try {
+ for (int i = 0; i < numFields; i++) {
+ boolean isNull = source.readBoolean();
- fields[i].set(reuse, field);
+ if (fields[i] != null) {
+ if (isNull) {
+ fields[i].set(reuse, null);
+ } else {
+ fields[i].set(reuse,
deserializeField(fields[i].get(reuse), i, source));
}
} else if (!isNull) {
// read and dump a pre-existing field value
@@ -512,6 +576,15 @@ public final class PojoSerializer<T> extends
TypeSerializer<T> {
return reuse;
}
+ private Object deserializeField(Object reuseField, int i, DataInputView
source)
+ throws IllegalAccessException, IOException {
+ if (reuseField != null) {
+ return fieldSerializers[i].deserialize(reuseField, source);
+ } else {
+ return fieldSerializers[i].deserialize(source);
+ }
+ }
+
@Override
public void copy(DataInputView source, DataOutputView target) throws
IOException {
// copy the flags
@@ -617,6 +690,9 @@ public final class PojoSerializer<T> extends
TypeSerializer<T> {
cl = Thread.currentThread().getContextClassLoader();
subclassSerializerCache = new HashMap<>();
+ if (TypeExtractor.isRecord(clazz)) {
+ this.recordFactory = JavaRecordBuilderFactory.create(clazz,
fields);
+ }
}
//
--------------------------------------------------------------------------------------------
diff --git
a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/SerializerTestBase.java
b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/SerializerTestBase.java
index e495fb4ff68..6dfdde50e6f 100644
---
a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/SerializerTestBase.java
+++
b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/SerializerTestBase.java
@@ -186,8 +186,11 @@ public abstract class SerializerTestBase<T> {
@Test
protected void testCopy() {
+ testCopy(getSerializer());
+ }
+
+ protected void testCopy(TypeSerializer<T> serializer) {
try {
- TypeSerializer<T> serializer = getSerializer();
T[] testData = getData();
for (T datum : testData) {
@@ -445,6 +448,9 @@ public abstract class SerializerTestBase<T> {
assertEquals(
"The copy of the serializer is not equal to the original
one.", ser1, ser2);
+
+ // Make sure the serializer can be used after cloning
+ testCopy(ser2);
} catch (Exception e) {
System.err.println(e.getMessage());
e.printStackTrace();
diff --git
a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/Java17PojoRecordSerializerTest.java
b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/Java17PojoRecordSerializerTest.java
new file mode 100644
index 00000000000..58e2c2a8535
--- /dev/null
+++
b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/Java17PojoRecordSerializerTest.java
@@ -0,0 +1,230 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils.runtime;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.operators.Keys.ExpressionKeys;
+import org.apache.flink.api.common.operators.Keys.IncompatibleKeysException;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.CompositeType.FlatFieldDescriptor;
+import org.apache.flink.api.common.typeutils.SerializerTestBase;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple1;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.typeutils.PojoTypeInfo;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+import java.util.Objects;
+import java.util.Random;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** A test for the {@link PojoSerializer} with Java Records. */
+class Java17PojoRecordSerializerTest
+ extends
SerializerTestBase<Java17PojoRecordSerializerTest.TestUserClass> {
+ private final TypeInformation<TestUserClass> type =
+ TypeExtractor.getForClass(TestUserClass.class);
+
+ @Override
+ protected TypeSerializer<TestUserClass> createSerializer() {
+ TypeSerializer<TestUserClass> serializer = type.createSerializer(new
ExecutionConfig());
+ assertThat(serializer).isInstanceOf(PojoSerializer.class);
+ return serializer;
+ }
+
+ @Override
+ protected boolean allowNullInstances(TypeSerializer<TestUserClass>
serializer) {
+ return true;
+ }
+
+ @Override
+ protected int getLength() {
+ return -1;
+ }
+
+ @Override
+ protected Class<TestUserClass> getTypeClass() {
+ return TestUserClass.class;
+ }
+
+ @Override
+ protected TestUserClass[] getTestData() {
+ Random rnd = new Random(874597969123412341L);
+
+ return new TestUserClass[] {
+ new TestUserClass(
+ rnd.nextInt(),
+ "foo",
+ rnd.nextDouble(),
+ new Date(),
+ new NestedTestUserClass(
+ rnd.nextInt(), "foo@boo", rnd.nextDouble(), new
int[] {10, 11, 12})),
+ new TestUserClass(
+ rnd.nextInt(),
+ "bar",
+ rnd.nextDouble(),
+ null,
+ new NestedTestUserClass(
+ rnd.nextInt(), "bar@bas", rnd.nextDouble(), new
int[] {20, 21, 22})),
+ new TestUserClass(rnd.nextInt(), null, rnd.nextDouble(), null,
null),
+ new TestUserClass(
+ rnd.nextInt(),
+ "bar",
+ rnd.nextDouble(),
+ new Date(),
+ new NestedTestUserClass(
+ rnd.nextInt(), "bar@bas", rnd.nextDouble(), new
int[] {20, 21, 22}))
+ };
+ }
+
+ // User code class for testing the serializer
+ public record TestUserClass(
+ int dumm1, String dumm2, double dumm3, Date dumm5,
NestedTestUserClass nestedClass) {}
+
+ public static class NestedTestUserClass {
+ public int dumm1;
+ public String dumm2;
+ public double dumm3;
+ public int[] dumm4;
+
+ public NestedTestUserClass() {}
+
+ public NestedTestUserClass(int dumm1, String dumm2, double dumm3,
int[] dumm4) {
+ this.dumm1 = dumm1;
+ this.dumm2 = dumm2;
+ this.dumm3 = dumm3;
+ this.dumm4 = dumm4;
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(dumm1, dumm2, dumm3, dumm4);
+ }
+
+ @Override
+ public boolean equals(Object other) {
+ if (!(other instanceof NestedTestUserClass)) {
+ return false;
+ }
+ NestedTestUserClass otherTUC = (NestedTestUserClass) other;
+ if (dumm1 != otherTUC.dumm1) {
+ return false;
+ }
+ if (!dumm2.equals(otherTUC.dumm2)) {
+ return false;
+ }
+ if (dumm3 != otherTUC.dumm3) {
+ return false;
+ }
+ if (dumm4.length != otherTUC.dumm4.length) {
+ return false;
+ }
+ for (int i = 0; i < dumm4.length; i++) {
+ if (dumm4[i] != otherTUC.dumm4[i]) {
+ return false;
+ }
+ }
+ return true;
+ }
+ }
+
+ /** This tests if the hashes returned by the pojo and tuple comparators
are the same. */
+ @SuppressWarnings({"rawtypes", "unchecked"})
+ @Test
+ void testTuplePojoTestEquality() throws IncompatibleKeysException {
+
+ // test with a simple, string-key first.
+ PojoTypeInfo<TestUserClass> pType = (PojoTypeInfo<TestUserClass>) type;
+ List<FlatFieldDescriptor> result = new ArrayList<>();
+ pType.getFlatFields("nestedClass.dumm2", 0, result);
+ int[] fields = new int[1]; // see below
+ fields[0] = result.get(0).getPosition();
+ TypeComparator<TestUserClass> pojoComp =
+ pType.createComparator(fields, new boolean[] {true}, 0, new
ExecutionConfig());
+
+ TestUserClass pojoTestRecord =
+ new TestUserClass(
+ 0,
+ "abc",
+ 3d,
+ new Date(),
+ new NestedTestUserClass(1, "haha", 4d, new int[] {5,
4, 3}));
+ int pHash = pojoComp.hash(pojoTestRecord);
+
+ Tuple1<String> tupleTest = new Tuple1<>("haha");
+ TupleTypeInfo<Tuple1<String>> tType =
+ (TupleTypeInfo<Tuple1<String>>)
TypeExtractor.getForObject(tupleTest);
+ TypeComparator<Tuple1<String>> tupleComp =
+ tType.createComparator(
+ new int[] {0}, new boolean[] {true}, 0, new
ExecutionConfig());
+
+ int tHash = tupleComp.hash(tupleTest);
+
+ assertThat(tHash)
+ .isEqualTo(pHash)
+ .withFailMessage(
+ "The hashing for tuples and pojos must be the same, so
that they are mixable");
+
+ Tuple3<Integer, String, Double> multiTupleTest =
+ new Tuple3<>(1, "haha", 4d); // its important here to use the
same values.
+ TupleTypeInfo<Tuple3<Integer, String, Double>> multiTupleType =
+ (TupleTypeInfo<Tuple3<Integer, String, Double>>)
+ TypeExtractor.getForObject(multiTupleTest);
+
+ ExpressionKeys fieldKey = new ExpressionKeys(new int[] {1, 0, 2},
multiTupleType);
+ ExpressionKeys expressKey =
+ new ExpressionKeys(
+ new String[] {
+ "nestedClass.dumm2", "nestedClass.dumm1",
"nestedClass.dumm3"
+ },
+ pType);
+
+ assertThat(fieldKey.areCompatible(expressKey))
+ .isTrue()
+ .withFailMessage("Expecting the keys to be compatible");
+ TypeComparator<TestUserClass> multiPojoComp =
+ pType.createComparator(
+ expressKey.computeLogicalKeyPositions(),
+ new boolean[] {true, true, true},
+ 0,
+ new ExecutionConfig());
+ int multiPojoHash = multiPojoComp.hash(pojoTestRecord);
+
+ // pojo order is: dumm2 (str), dumm1 (int), dumm3 (double).
+ TypeComparator<Tuple3<Integer, String, Double>> multiTupleComp =
+ multiTupleType.createComparator(
+ fieldKey.computeLogicalKeyPositions(),
+ new boolean[] {true, true, true},
+ 0,
+ new ExecutionConfig());
+ int multiTupleHash = multiTupleComp.hash(multiTupleTest);
+
+ assertThat(multiPojoHash)
+ .isEqualTo(multiTupleHash)
+ .withFailMessage(
+ "The hashing for tuples and pojos must be the same, so
that they are mixable. Also for those with multiple key fields");
+ }
+}
diff --git
a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/Java17PojoRecordSerializerUpgradeTest.java
b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/Java17PojoRecordSerializerUpgradeTest.java
new file mode 100644
index 00000000000..455c2bda826
--- /dev/null
+++
b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/Java17PojoRecordSerializerUpgradeTest.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils.runtime;
+
+import org.apache.flink.FlinkVersion;
+import org.apache.flink.api.common.typeutils.TypeSerializerUpgradeTestBase;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+/** A {@link TypeSerializerUpgradeTestBase} for the {@link PojoSerializer}. */
+class Java17PojoRecordSerializerUpgradeTest
+ extends TypeSerializerUpgradeTestBase<
+
Java17PojoRecordSerializerUpgradeTestSpecifications.RecordMigrationSetup
+ .RecordBeforeMigration,
+
Java17PojoRecordSerializerUpgradeTestSpecifications.RecordMigrationVerifier
+ .RecordAfterSchemaUpgrade> {
+
+ @Override
+ public Collection<FlinkVersion> getMigrationVersions() {
+ List<FlinkVersion> testVersions = new ArrayList<>();
+ testVersions.add(FlinkVersion.v1_19);
+ return testVersions;
+ }
+
+ public Collection<TestSpecification<?, ?>>
createTestSpecifications(FlinkVersion flinkVersion)
+ throws Exception {
+ Collection<TestSpecification<?, ?>> testSpecifications = new
ArrayList<>();
+ testSpecifications.add(
+ new TestSpecification<>(
+ "pojo-serializer-to-record",
+ flinkVersion,
+
Java17PojoRecordSerializerUpgradeTestSpecifications.PojoToRecordSetup.class,
+
Java17PojoRecordSerializerUpgradeTestSpecifications.PojoToRecordVerifier
+ .class));
+ testSpecifications.add(
+ new TestSpecification<>(
+ "pojo-serializer-record-migration",
+ flinkVersion,
+
Java17PojoRecordSerializerUpgradeTestSpecifications.RecordMigrationSetup
+ .class,
+
Java17PojoRecordSerializerUpgradeTestSpecifications.RecordMigrationVerifier
+ .class));
+ return testSpecifications;
+ }
+}
diff --git
a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/Java17PojoRecordSerializerUpgradeTestSpecifications.java
b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/Java17PojoRecordSerializerUpgradeTestSpecifications.java
new file mode 100644
index 00000000000..158795a2125
--- /dev/null
+++
b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/Java17PojoRecordSerializerUpgradeTestSpecifications.java
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils.runtime;
+
+import org.apache.flink.FlinkVersion;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeutils.ClassRelocator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerMatchers;
+import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility;
+import org.apache.flink.api.common.typeutils.TypeSerializerUpgradeTestBase;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+
+import org.hamcrest.Matcher;
+
+import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.assertSame;
+
+/** Test specifications (setups and verifiers) for {@link Java17PojoRecordSerializerUpgradeTest}. */
+class Java17PojoRecordSerializerUpgradeTestSpecifications {
+
+ public static final class PojoToRecordSetup
+ implements TypeSerializerUpgradeTestBase.PreUpgradeSetup<
+ PojoToRecordSetup.PojoBeforeUpgrade> {
+
+ @ClassRelocator.RelocateClass("TestPojoToRecord")
+ @SuppressWarnings("WeakerAccess")
+ public static class PojoBeforeUpgrade {
+ public int id;
+ public String name;
+
+ public PojoBeforeUpgrade() {}
+
+ public PojoBeforeUpgrade(int id, String name) {
+ this.id = id;
+ this.name = name;
+ }
+ }
+
+ @Override
+ public TypeSerializer<PojoBeforeUpgrade> createPriorSerializer() {
+ TypeSerializer<PojoBeforeUpgrade> serializer =
+ TypeExtractor.createTypeInfo(PojoBeforeUpgrade.class)
+ .createSerializer(new ExecutionConfig());
+ assertSame(PojoSerializer.class, serializer.getClass());
+ return serializer;
+ }
+
+ @Override
+ public PojoBeforeUpgrade createTestData() {
+ return new PojoBeforeUpgrade(911108, "Gordon");
+ }
+ }
+
+ public static final class PojoToRecordVerifier
+ implements TypeSerializerUpgradeTestBase.UpgradeVerifier<
+ PojoToRecordVerifier.PojoAfterUpgrade> {
+
+ @ClassRelocator.RelocateClass("TestPojoToRecord")
+ @SuppressWarnings("WeakerAccess")
+ public record PojoAfterUpgrade(int id, String name) {}
+
+ @Override
+ public TypeSerializer<PojoAfterUpgrade> createUpgradedSerializer() {
+ TypeSerializer<PojoAfterUpgrade> serializer =
+ TypeExtractor.createTypeInfo(PojoAfterUpgrade.class)
+ .createSerializer(new ExecutionConfig());
+ assertSame(PojoSerializer.class, serializer.getClass());
+ return serializer;
+ }
+
+ @Override
+ public Matcher<PojoAfterUpgrade> testDataMatcher() {
+ return is(new PojoAfterUpgrade(911108, "Gordon"));
+ }
+
+ @Override
+ public Matcher<TypeSerializerSchemaCompatibility<PojoAfterUpgrade>>
+ schemaCompatibilityMatcher(FlinkVersion version) {
+ return TypeSerializerMatchers.isCompatibleAsIs();
+ }
+ }
+
+ public static final class RecordMigrationSetup
+ implements TypeSerializerUpgradeTestBase.PreUpgradeSetup<
+ RecordMigrationSetup.RecordBeforeMigration> {
+
+ @ClassRelocator.RelocateClass("TestRecordMigration")
+ @SuppressWarnings("WeakerAccess")
+ public record RecordBeforeMigration(int id, String name) {}
+
+ @Override
+ public TypeSerializer<RecordBeforeMigration> createPriorSerializer() {
+ TypeSerializer<RecordBeforeMigration> serializer =
+ TypeExtractor.createTypeInfo(RecordBeforeMigration.class)
+ .createSerializer(new ExecutionConfig());
+ assertSame(PojoSerializer.class, serializer.getClass());
+ return serializer;
+ }
+
+ @Override
+ public RecordBeforeMigration createTestData() {
+ return new RecordBeforeMigration(911108, "Gordon");
+ }
+ }
+
+ public static final class RecordMigrationVerifier
+ implements TypeSerializerUpgradeTestBase.UpgradeVerifier<
+ RecordMigrationVerifier.RecordAfterSchemaUpgrade> {
+
+ @ClassRelocator.RelocateClass("TestRecordMigration")
+ @SuppressWarnings("WeakerAccess")
+ public record RecordAfterSchemaUpgrade(String name, int age, String
newField) {}
+
+ @Override
+ public TypeSerializer<RecordAfterSchemaUpgrade>
createUpgradedSerializer() {
+ TypeSerializer<RecordAfterSchemaUpgrade> serializer =
+
TypeExtractor.createTypeInfo(RecordAfterSchemaUpgrade.class)
+ .createSerializer(new ExecutionConfig());
+ assertSame(PojoSerializer.class, serializer.getClass());
+ return serializer;
+ }
+
+ @Override
+ public Matcher<RecordAfterSchemaUpgrade> testDataMatcher() {
+ return is(new RecordAfterSchemaUpgrade("Gordon", 0, null));
+ }
+
+ @Override
+ public
Matcher<TypeSerializerSchemaCompatibility<RecordAfterSchemaUpgrade>>
+ schemaCompatibilityMatcher(FlinkVersion version) {
+ return TypeSerializerMatchers.isCompatibleAfterMigration();
+ }
+ }
+}
diff --git
a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/Java17RecordBuilderFactoryTest.java
b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/Java17RecordBuilderFactoryTest.java
new file mode 100644
index 00000000000..f40f5b02eae
--- /dev/null
+++
b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/Java17RecordBuilderFactoryTest.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils.runtime;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.lang.reflect.Field;
+import java.util.Arrays;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/** Tests for the {@link JavaRecordBuilderFactory}. */
+class Java17RecordBuilderFactoryTest {
+
+ Field[] fields;
+
+ record TestRecord(int i1, int i2, String s1, String s2) {}
+
+ @BeforeEach
+ void setup() {
+ fields = TestRecord.class.getDeclaredFields();
+ }
+
+ @Test
+ void testNoDefaultOrParamMapping() {
+ JavaRecordBuilderFactory<TestRecord> helper =
+ JavaRecordBuilderFactory.create(TestRecord.class, fields);
+ JavaRecordBuilderFactory<TestRecord>.JavaRecordBuilder builder =
helper.newBuilder();
+ builder.setField(1, 100);
+ builder.setField(0, 50);
+ builder.setField(3, "test");
+
+ assertThat(builder.build()).isEqualTo(new TestRecord(50, 100, null,
"test"));
+ }
+
+ @Test
+ void testNewFieldsAdded() {
+ // Test restoring from fields [i2, s1]
+ JavaRecordBuilderFactory<TestRecord> helper =
+ JavaRecordBuilderFactory.create(TestRecord.class,
Arrays.copyOfRange(fields, 1, 3));
+
+ JavaRecordBuilderFactory<TestRecord>.JavaRecordBuilder builder =
helper.newBuilder();
+ builder.setField(0, 100);
+ builder.setField(1, "test");
+
+ assertThat(builder.build()).isEqualTo(new TestRecord(0, 100, "test",
null));
+ }
+
+ @Test
+ void testFieldsAddedRemovedAndRearranged() {
+ Field[] oldFields = new Field[] {fields[3], null, fields[0]};
+ JavaRecordBuilderFactory<TestRecord> helper =
+ JavaRecordBuilderFactory.create(TestRecord.class, oldFields);
+
+ JavaRecordBuilderFactory<TestRecord>.JavaRecordBuilder builder =
helper.newBuilder();
+ builder.setField(0, "test");
+ builder.setField(2, 100);
+
+ assertThat(builder.build()).isEqualTo(new TestRecord(100, 0, null,
"test"));
+ }
+
+ @Test
+ void testReorderFields() {
+ // Swap first and last field
+ Field temp = fields[0];
+ fields[0] = fields[3];
+ fields[3] = temp;
+
+ JavaRecordBuilderFactory<TestRecord> helper =
+ JavaRecordBuilderFactory.create(TestRecord.class, fields);
+
+ JavaRecordBuilderFactory<TestRecord>.JavaRecordBuilder builder =
helper.newBuilder();
+ builder.setField(0, "4");
+ builder.setField(1, 2);
+ builder.setField(2, "3");
+ builder.setField(3, 1);
+
+ assertThat(builder.build()).isEqualTo(new TestRecord(1, 2, "3", "4"));
+ }
+
+ @Test
+ void testMissingRequiredField() {
+ JavaRecordBuilderFactory<TestRecord> helper =
+ JavaRecordBuilderFactory.create(TestRecord.class, fields);
+ JavaRecordBuilderFactory<TestRecord>.JavaRecordBuilder builder =
helper.newBuilder();
+
+ builder.setField(0, 50);
+ // Do not set required param 1
+
+ assertThatThrownBy(builder::build)
+ .hasMessage("Could not instantiate record")
+ .hasCause(new IllegalArgumentException());
+ }
+}
diff --git
a/flink-core/src/test/resources/pojo-serializer-record-migration-1.19/serializer-snapshot
b/flink-core/src/test/resources/pojo-serializer-record-migration-1.19/serializer-snapshot
new file mode 100644
index 00000000000..ef1b4cf9203
Binary files /dev/null and
b/flink-core/src/test/resources/pojo-serializer-record-migration-1.19/serializer-snapshot
differ
diff --git
a/flink-core/src/test/resources/pojo-serializer-record-migration-1.19/test-data
b/flink-core/src/test/resources/pojo-serializer-record-migration-1.19/test-data
new file mode 100644
index 00000000000..c817b845cdb
Binary files /dev/null and
b/flink-core/src/test/resources/pojo-serializer-record-migration-1.19/test-data
differ
diff --git
a/flink-core/src/test/resources/pojo-serializer-to-record-1.19/serializer-snapshot
b/flink-core/src/test/resources/pojo-serializer-to-record-1.19/serializer-snapshot
new file mode 100644
index 00000000000..ab14aa6a500
Binary files /dev/null and
b/flink-core/src/test/resources/pojo-serializer-to-record-1.19/serializer-snapshot
differ
diff --git
a/flink-core/src/test/resources/pojo-serializer-to-record-1.19/test-data
b/flink-core/src/test/resources/pojo-serializer-to-record-1.19/test-data
new file mode 100644
index 00000000000..c817b845cdb
Binary files /dev/null and
b/flink-core/src/test/resources/pojo-serializer-to-record-1.19/test-data differ
diff --git a/pom.xml b/pom.xml
index 66ceba2669c..dedac06e23e 100644
--- a/pom.xml
+++ b/pom.xml
@@ -207,6 +207,7 @@ under the License.
<!-- Can be set to any value to reproduce a specific build. -->
<test.randomization.seed/>
<test.unit.pattern>**/*Test.*</test.unit.pattern>
+ <test.exclusion.pattern>**/Java17*.java</test.exclusion.pattern>
</properties>
<dependencies>
@@ -1104,6 +1105,12 @@ under the License.
<profile>
<id>java17-target</id>
+
+ <!-- Include Java 17 specific tests (by not excluding
them) -->
+ <properties>
+
<test.exclusion.pattern>nothing</test.exclusion.pattern>
+ </properties>
+
<build>
<plugins>
<plugin>
@@ -2060,6 +2067,9 @@ under the License.
<!-- Prevents
recompilation due to missing package-info.class, see MCOMPILER-205 -->
<arg>-Xpkginfo:always</arg>
</compilerArgs>
+ <testExcludes>
+
<testExclude>${test.exclusion.pattern}</testExclude>
+ </testExcludes>
</configuration>
</plugin>
diff --git a/tools/maven/suppressions-core.xml
b/tools/maven/suppressions-core.xml
index dc6e275dc62..fffe480a956 100644
--- a/tools/maven/suppressions-core.xml
+++ b/tools/maven/suppressions-core.xml
@@ -121,4 +121,7 @@ under the License.
<suppress
files="(.*)test[/\\](.*)testutils[/\\](.*)"
checks="RedundantModifier|JavadocParagraph|JavadocType|JavadocStyle|StaticVariableNameCheck|LocalFinalVariableName|EmptyLineSeparator"/>
+
+ <!-- Suppress for Java17-specific test classes where Java records are
used. -->
+ <suppress files="(.*)test[/\\].*[/\\]Java17.*.java"
checks="MethodNameCheck"/>
</suppressions>