This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a commit to branch release-1.20
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.20 by this push:
     new 7bd8ba4913a [FLINK-37557] Fix ResolvedSchema#getPrimaryKeyIndexes
7bd8ba4913a is described below

commit 7bd8ba4913a92293285ca44bdbd1621c9740aa22
Author: Dawid Wysakowicz <dwysakow...@apache.org>
AuthorDate: Tue Mar 25 14:10:26 2025 +0100

    [FLINK-37557] Fix ResolvedSchema#getPrimaryKeyIndexes
---
 .../apache/flink/table/catalog/SchemaResolutionTest.java   | 14 ++++++++++++++
 .../org/apache/flink/table/catalog/ResolvedSchema.java     | 11 +++++++++--
 2 files changed, 23 insertions(+), 2 deletions(-)

diff --git 
a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/SchemaResolutionTest.java
 
b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/SchemaResolutionTest.java
index 75f6fce9d3b..5d5c4975ef5 100644
--- 
a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/SchemaResolutionTest.java
+++ 
b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/SchemaResolutionTest.java
@@ -427,6 +427,20 @@ class SchemaResolutionTest {
                 .isFalse();
     }
 
+    @Test
+    void testPrimaryKeyIndices() {
+        final ResolvedSchema resolvedSchema =
+                resolveSchema(
+                        Schema.newBuilder()
+                                .columnByMetadata("orig_ts", 
DataTypes.TIMESTAMP(3), "timestamp")
+                                .column("id", DataTypes.INT().notNull())
+                                .column("counter", DataTypes.INT().notNull())
+                                .primaryKey("id")
+                                .build());
+
+        assertThat(resolvedSchema.getPrimaryKeyIndexes()).isEqualTo(new int[] 
{0});
+    }
+
     // 
--------------------------------------------------------------------------------------------
 
     private static void testError(Schema schema, String errorMessage) {
diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/ResolvedSchema.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/ResolvedSchema.java
index 5fab67459b1..f5285f5f428 100644
--- 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/ResolvedSchema.java
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/ResolvedSchema.java
@@ -163,9 +163,16 @@ public final class ResolvedSchema {
         return Optional.ofNullable(primaryKey);
     }
 
-    /** Returns the primary key indexes, if any, otherwise returns an empty 
array. */
+    /**
+     * Returns the primary key indexes in the {@link 
#toPhysicalRowDataType()}, if any, otherwise
+     * returns an empty array.
+     */
     public int[] getPrimaryKeyIndexes() {
-        final List<String> columns = getColumnNames();
+        final List<String> columns =
+                getColumns().stream()
+                        .filter(Column::isPhysical)
+                        .map(Column::getName)
+                        .collect(Collectors.toList());
         return getPrimaryKey()
                 .map(UniqueConstraint::getColumns)
                 .map(pkColumns -> 
pkColumns.stream().mapToInt(columns::indexOf).toArray())

Reply via email to