This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new be24d8fff6c [FLINK-39008][table-common] Support record types natively 
in DataTypeExtractor
be24d8fff6c is described below

commit be24d8fff6c97d7a2561deee5be4ebc1b92095af
Author: Timo Walther <[email protected]>
AuthorDate: Tue Feb 3 11:29:32 2026 +0100

    [FLINK-39008][table-common] Support record types natively in 
DataTypeExtractor
    
    This closes #27500.
---
 .../table/types/extraction/ExtractionUtils.java    |  75 ++++++++++++--
 flink-tests-java17/pom.xml                         |  15 +++
 .../extraction/DataTypeExtractorJava17Test.java    | 109 +++++++++++++++++++++
 3 files changed, 193 insertions(+), 6 deletions(-)

diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/extraction/ExtractionUtils.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/extraction/ExtractionUtils.java
index e7be9df3d57..c788a71905e 100644
--- 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/extraction/ExtractionUtils.java
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/extraction/ExtractionUtils.java
@@ -718,7 +718,7 @@ public final class ExtractionUtils {
                 continue;
             }
             final List<String> parameterNames =
-                    extractConstructorParameterNames(constructor, fields);
+                    extractConstructorParameterNames(clazz, constructor, 
fields);
             if (parameterNames != null) {
                 if (foundConstructor != null) {
                     throw extractionError(
@@ -741,14 +741,19 @@ public final class ExtractionUtils {
      * matching (possibly primitive and lenient) type and name.
      */
     private static @Nullable List<String> extractConstructorParameterNames(
-            Constructor<?> constructor, List<Field> fields) {
-        final Type[] parameterTypes = constructor.getGenericParameterTypes();
-
-        List<String> parameterNames = extractExecutableNames(constructor);
+            Class<?> clazz, Constructor<?> constructor, List<Field> fields) {
+        // Java Records are preferred as they have perfect field order and 
naming
+        List<String> parameterNames = extractRecordComponentNames(clazz);
         if (parameterNames == null) {
-            return null;
+            // Fallback to inspecting an all-arg constructor
+            parameterNames = extractExecutableNames(constructor);
+            if (parameterNames == null) {
+                return null;
+            }
         }
 
+        final Type[] parameterTypes = constructor.getGenericParameterTypes();
+
         final Map<String, Field> fieldMap =
                 fields.stream()
                         .collect(
@@ -777,6 +782,64 @@ public final class ExtractionUtils {
         return fieldNames;
     }
 
+    // We check Java record methods reflectively
+    // so the code compiles on Java 11
+    private static @Nullable Method isRecordMethod;
+    private static @Nullable Method getRecordComponentsMethod;
+    private static @Nullable Method getRecordComponentNameMethod;
+
+    static {
+        try {
+            isRecordMethod = Class.class.getMethod("isRecord");
+            getRecordComponentsMethod = 
Class.class.getMethod("getRecordComponents");
+            getRecordComponentNameMethod =
+                    
Class.forName("java.lang.reflect.RecordComponent").getMethod("getName");
+        } catch (Throwable t) {
+            // For older Java version or to catch any kind of reflective issues
+            isRecordMethod = null;
+            getRecordComponentsMethod = null;
+            getRecordComponentNameMethod = null;
+        }
+    }
+
+    /**
+     * Returns the record component names for the given class if the current 
JDK supports records
+     * and the given class is actually a record.
+     */
+    static @Nullable List<String> extractRecordComponentNames(Class<?> type) {
+        if (isRecordMethod == null
+                || getRecordComponentsMethod == null
+                || getRecordComponentNameMethod == null) {
+            return null;
+        }
+        try {
+            final boolean isRecord = (Boolean) isRecordMethod.invoke(type);
+            if (!isRecord) {
+                return null;
+            }
+            final Object componentsArray = 
getRecordComponentsMethod.invoke(type);
+            if (componentsArray == null) {
+                return null;
+            }
+
+            final int length = Array.getLength(componentsArray);
+            final List<String> names = new ArrayList<>(length);
+            for (int i = 0; i < length; i++) {
+                final Object component = Array.get(componentsArray, i);
+                if (component == null) {
+                    continue;
+                }
+                final String name = (String) 
getRecordComponentNameMethod.invoke(component);
+                names.add(name);
+            }
+
+            return names;
+        } catch (Throwable t) {
+            // Catch any kind of reflective issues
+            return null;
+        }
+    }
+
     @VisibleForTesting
     static @Nullable List<String> extractExecutableNames(Executable 
executable) {
         final int offset;
diff --git a/flink-tests-java17/pom.xml b/flink-tests-java17/pom.xml
index 8facb299517..7719823dcfd 100644
--- a/flink-tests-java17/pom.xml
+++ b/flink-tests-java17/pom.xml
@@ -63,6 +63,21 @@
                        <scope>test</scope>
                </dependency>
 
+               <!-- For Table API Java record type extraction -->
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-table-common</artifactId>
+                       <version>${project.version}</version>
+                       <scope>test</scope>
+               </dependency>
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-table-common</artifactId>
+                       <version>${project.version}</version>
+                       <scope>test</scope>
+                       <type>test-jar</type>
+               </dependency>
+
                <!-- Base testing tools -->
                <dependency>
                        <groupId>org.apache.flink</groupId>
diff --git 
a/flink-tests-java17/src/test/java/org/apache/flink/table/types/extraction/DataTypeExtractorJava17Test.java
 
b/flink-tests-java17/src/test/java/org/apache/flink/table/types/extraction/DataTypeExtractorJava17Test.java
new file mode 100644
index 00000000000..e770ae41ab8
--- /dev/null
+++ 
b/flink-tests-java17/src/test/java/org/apache/flink/table/types/extraction/DataTypeExtractorJava17Test.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.types.extraction;
+
+import org.apache.flink.table.annotation.DataTypeHint;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.types.extraction.DataTypeExtractorTest.TestSpec;
+
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import java.util.stream.Stream;
+
+import static org.apache.flink.core.testutils.FlinkAssertions.anyCauseMatches;
+import static 
org.apache.flink.table.types.extraction.DataTypeExtractorTest.runExtraction;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/**
+ * Tests for {@link DataTypeExtractor} on Java record classes.
+ *
+ * <p>Move this to {@link DataTypeExtractorTest} once we support Java 17 
language level.
+ */
+class DataTypeExtractorJava17Test {
+
+    private static Stream<TestSpec> testData() {
+        return Stream.of(
+                TestSpec.forType(PojoByRecord.class)
+                        .expectDataType(
+                                DataTypes.STRUCTURED(
+                                        PojoByRecord.class,
+                                        DataTypes.FIELD(
+                                                "record1",
+                                                
DataTypes.INT().notNull().bridgedTo(int.class)),
+                                        DataTypes.FIELD("record2", 
DataTypes.STRING()),
+                                        DataTypes.FIELD(
+                                                "record3",
+                                                DataTypes.DOUBLE()
+                                                        .notNull()
+                                                        
.bridgedTo(double.class)),
+                                        DataTypes.FIELD(
+                                                "nestedRecord",
+                                                DataTypes.STRUCTURED(
+                                                        
PojoByRecordNested.class,
+                                                        DataTypes.FIELD(
+                                                                "nested1",
+                                                                DataTypes.INT()
+                                                                        
.notNull()
+                                                                        
.bridgedTo(int.class)),
+                                                        DataTypes.FIELD(
+                                                                "nested2",
+                                                                
DataTypes.VARCHAR(200)))))));
+    }
+
+    @ParameterizedTest(name = "{index}: {0}")
+    @MethodSource("testData")
+    void testExtraction(TestSpec testSpec) {
+        if (testSpec.expectedErrorMessage != null) {
+            assertThatThrownBy(() -> runExtraction(testSpec))
+                    .isInstanceOf(ValidationException.class)
+                    .satisfies(
+                            anyCauseMatches(
+                                    ValidationException.class, 
testSpec.expectedErrorMessage));
+        } else {
+            runExtraction(testSpec);
+        }
+    }
+
+    // 
--------------------------------------------------------------------------------------------
+    // Test utilities
+    // 
--------------------------------------------------------------------------------------------
+
+    /** Basic test record. */
+    public record PojoByRecord(
+            int record1, String record2, double record3, PojoByRecordNested 
nestedRecord) {}
+
+    /** Complex test record. */
+    public record PojoByRecordNested(
+            int nested1,
+            // Record signature has precedence over the
+            // custom constructor below
+            @DataTypeHint("VARCHAR(200)") String nested2) {
+
+        public PojoByRecordNested(int nested1, String nested2) {
+            this.nested1 = nested1;
+            this.nested2 = nested2;
+        }
+
+        public PojoByRecordNested(int nested1) {
+            this(nested1, "UNKNOWN");
+        }
+    }
+}

Reply via email to