This is an automated email from the ASF dual-hosted git repository.
twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new be24d8fff6c [FLINK-39008][table-common] Support record types natively
in DataTypeExtractor
be24d8fff6c is described below
commit be24d8fff6c97d7a2561deee5be4ebc1b92095af
Author: Timo Walther <[email protected]>
AuthorDate: Tue Feb 3 11:29:32 2026 +0100
[FLINK-39008][table-common] Support record types natively in
DataTypeExtractor
This closes #27500.
---
.../table/types/extraction/ExtractionUtils.java | 75 ++++++++++++--
flink-tests-java17/pom.xml | 15 +++
.../extraction/DataTypeExtractorJava17Test.java | 109 +++++++++++++++++++++
3 files changed, 193 insertions(+), 6 deletions(-)
diff --git
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/extraction/ExtractionUtils.java
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/extraction/ExtractionUtils.java
index e7be9df3d57..c788a71905e 100644
---
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/extraction/ExtractionUtils.java
+++
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/extraction/ExtractionUtils.java
@@ -718,7 +718,7 @@ public final class ExtractionUtils {
continue;
}
final List<String> parameterNames =
- extractConstructorParameterNames(constructor, fields);
+ extractConstructorParameterNames(clazz, constructor,
fields);
if (parameterNames != null) {
if (foundConstructor != null) {
throw extractionError(
@@ -741,14 +741,19 @@ public final class ExtractionUtils {
* matching (possibly primitive and lenient) type and name.
*/
private static @Nullable List<String> extractConstructorParameterNames(
- Constructor<?> constructor, List<Field> fields) {
- final Type[] parameterTypes = constructor.getGenericParameterTypes();
-
- List<String> parameterNames = extractExecutableNames(constructor);
+ Class<?> clazz, Constructor<?> constructor, List<Field> fields) {
+ // Java Records are preferred as they have perfect field order and
naming
+ List<String> parameterNames = extractRecordComponentNames(clazz);
if (parameterNames == null) {
- return null;
+ // Fallback to inspecting an all-arg constructor
+ parameterNames = extractExecutableNames(constructor);
+ if (parameterNames == null) {
+ return null;
+ }
}
+ final Type[] parameterTypes = constructor.getGenericParameterTypes();
+
final Map<String, Field> fieldMap =
fields.stream()
.collect(
@@ -777,6 +782,64 @@ public final class ExtractionUtils {
return fieldNames;
}
+ // We check Java record methods reflectively
+ // so the code compiles on Java 11
+ private static @Nullable Method isRecordMethod;
+ private static @Nullable Method getRecordComponentsMethod;
+ private static @Nullable Method getRecordComponentNameMethod;
+
+ static {
+ try {
+ isRecordMethod = Class.class.getMethod("isRecord");
+ getRecordComponentsMethod =
Class.class.getMethod("getRecordComponents");
+ getRecordComponentNameMethod =
+
Class.forName("java.lang.reflect.RecordComponent").getMethod("getName");
+ } catch (Throwable t) {
+ // For older Java version or to catch any kind of reflective issues
+ isRecordMethod = null;
+ getRecordComponentsMethod = null;
+ getRecordComponentNameMethod = null;
+ }
+ }
+
+ /**
+ * Returns the record component names for the given class if the current
JDK supports records
+ * and the given class is actually a record.
+ */
+ static @Nullable List<String> extractRecordComponentNames(Class<?> type) {
+ if (isRecordMethod == null
+ || getRecordComponentsMethod == null
+ || getRecordComponentNameMethod == null) {
+ return null;
+ }
+ try {
+ final boolean isRecord = (Boolean) isRecordMethod.invoke(type);
+ if (!isRecord) {
+ return null;
+ }
+ final Object componentsArray =
getRecordComponentsMethod.invoke(type);
+ if (componentsArray == null) {
+ return null;
+ }
+
+ final int length = Array.getLength(componentsArray);
+ final List<String> names = new ArrayList<>(length);
+ for (int i = 0; i < length; i++) {
+ final Object component = Array.get(componentsArray, i);
+ if (component == null) {
+ continue;
+ }
+ final String name = (String)
getRecordComponentNameMethod.invoke(component);
+ names.add(name);
+ }
+
+ return names;
+ } catch (Throwable t) {
+ // Catch any kind of reflective issues
+ return null;
+ }
+ }
+
@VisibleForTesting
static @Nullable List<String> extractExecutableNames(Executable
executable) {
final int offset;
diff --git a/flink-tests-java17/pom.xml b/flink-tests-java17/pom.xml
index 8facb299517..7719823dcfd 100644
--- a/flink-tests-java17/pom.xml
+++ b/flink-tests-java17/pom.xml
@@ -63,6 +63,21 @@
<scope>test</scope>
</dependency>
+ <!-- For Table API Java record type extraction -->
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-table-common</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-table-common</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ <type>test-jar</type>
+ </dependency>
+
<!-- Base testing tools -->
<dependency>
<groupId>org.apache.flink</groupId>
diff --git
a/flink-tests-java17/src/test/java/org/apache/flink/table/types/extraction/DataTypeExtractorJava17Test.java
b/flink-tests-java17/src/test/java/org/apache/flink/table/types/extraction/DataTypeExtractorJava17Test.java
new file mode 100644
index 00000000000..e770ae41ab8
--- /dev/null
+++
b/flink-tests-java17/src/test/java/org/apache/flink/table/types/extraction/DataTypeExtractorJava17Test.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.types.extraction;
+
+import org.apache.flink.table.annotation.DataTypeHint;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.types.extraction.DataTypeExtractorTest.TestSpec;
+
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import java.util.stream.Stream;
+
+import static org.apache.flink.core.testutils.FlinkAssertions.anyCauseMatches;
+import static
org.apache.flink.table.types.extraction.DataTypeExtractorTest.runExtraction;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/**
+ * Tests for {@link DataTypeExtractor} on Java record classes.
+ *
+ * <p>Move this to {@link DataTypeExtractorTest} once we support Java 17
language level.
+ */
+class DataTypeExtractorJava17Test {
+
+ private static Stream<TestSpec> testData() {
+ return Stream.of(
+ TestSpec.forType(PojoByRecord.class)
+ .expectDataType(
+ DataTypes.STRUCTURED(
+ PojoByRecord.class,
+ DataTypes.FIELD(
+ "record1",
+
DataTypes.INT().notNull().bridgedTo(int.class)),
+ DataTypes.FIELD("record2",
DataTypes.STRING()),
+ DataTypes.FIELD(
+ "record3",
+ DataTypes.DOUBLE()
+ .notNull()
+
.bridgedTo(double.class)),
+ DataTypes.FIELD(
+ "nestedRecord",
+ DataTypes.STRUCTURED(
+
PojoByRecordNested.class,
+ DataTypes.FIELD(
+ "nested1",
+ DataTypes.INT()
+
.notNull()
+
.bridgedTo(int.class)),
+ DataTypes.FIELD(
+ "nested2",
+
DataTypes.VARCHAR(200)))))));
+ }
+
+ @ParameterizedTest(name = "{index}: {0}")
+ @MethodSource("testData")
+ void testExtraction(TestSpec testSpec) {
+ if (testSpec.expectedErrorMessage != null) {
+ assertThatThrownBy(() -> runExtraction(testSpec))
+ .isInstanceOf(ValidationException.class)
+ .satisfies(
+ anyCauseMatches(
+ ValidationException.class,
testSpec.expectedErrorMessage));
+ } else {
+ runExtraction(testSpec);
+ }
+ }
+
+ //
--------------------------------------------------------------------------------------------
+ // Test utilities
+ //
--------------------------------------------------------------------------------------------
+
+ /** Basic test record. */
+ public record PojoByRecord(
+ int record1, String record2, double record3, PojoByRecordNested
nestedRecord) {}
+
+ /** Complex test record. */
+ public record PojoByRecordNested(
+ int nested1,
+ // Record signature has precedence over the
+ // custom constructor below
+ @DataTypeHint("VARCHAR(200)") String nested2) {
+
+ public PojoByRecordNested(int nested1, String nested2) {
+ this.nested1 = nested1;
+ this.nested2 = nested2;
+ }
+
+ public PojoByRecordNested(int nested1) {
+ this(nested1, "UNKNOWN");
+ }
+ }
+}