This is an automated email from the ASF dual-hosted git repository. dwysakowicz pushed a commit to branch release-1.20 in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.20 by this push: new 7bd8ba4913a [FLINK-37557] Fix ResolvedSchema#getPrimaryKeyIndexes 7bd8ba4913a is described below commit 7bd8ba4913a92293285ca44bdbd1621c9740aa22 Author: Dawid Wysakowicz <dwysakow...@apache.org> AuthorDate: Tue Mar 25 14:10:26 2025 +0100 [FLINK-37557] Fix ResolvedSchema#getPrimaryKeyIndexes --- .../apache/flink/table/catalog/SchemaResolutionTest.java | 14 ++++++++++++++ .../org/apache/flink/table/catalog/ResolvedSchema.java | 11 +++++++++-- 2 files changed, 23 insertions(+), 2 deletions(-) diff --git a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/SchemaResolutionTest.java b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/SchemaResolutionTest.java index 75f6fce9d3b..5d5c4975ef5 100644 --- a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/SchemaResolutionTest.java +++ b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/SchemaResolutionTest.java @@ -427,6 +427,20 @@ class SchemaResolutionTest { .isFalse(); } + @Test + void testPrimaryKeyIndices() { + final ResolvedSchema resolvedSchema = + resolveSchema( + Schema.newBuilder() + .columnByMetadata("orig_ts", DataTypes.TIMESTAMP(3), "timestamp") + .column("id", DataTypes.INT().notNull()) + .column("counter", DataTypes.INT().notNull()) + .primaryKey("id") + .build()); + + assertThat(resolvedSchema.getPrimaryKeyIndexes()).isEqualTo(new int[] {0}); + } + // -------------------------------------------------------------------------------------------- private static void testError(Schema schema, String errorMessage) { diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/ResolvedSchema.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/ResolvedSchema.java index 5fab67459b1..f5285f5f428 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/ResolvedSchema.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/ResolvedSchema.java @@ -163,9 +163,16 @@ public final class ResolvedSchema { return Optional.ofNullable(primaryKey); } - /** Returns the primary key indexes, if any, otherwise returns an empty array. */ + /** + * Returns the primary key indexes in the {@link #toPhysicalRowDataType()}, if any, otherwise + * returns an empty array. + */ public int[] getPrimaryKeyIndexes() { - final List<String> columns = getColumnNames(); + final List<String> columns = + getColumns().stream() + .filter(Column::isPhysical) + .map(Column::getName) + .collect(Collectors.toList()); return getPrimaryKey() .map(UniqueConstraint::getColumns) .map(pkColumns -> pkColumns.stream().mapToInt(columns::indexOf).toArray())