This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch release-1.13
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.13 by this push:
     new 4b682e8  [FLINK-23920][table-common] Keep primary key when inferring schema with SchemaTranslator
4b682e8 is described below

commit 4b682e88d093c2d610f00e0c827a48c7408ee83d
Author: Timo Walther <[email protected]>
AuthorDate: Mon Aug 23 16:04:44 2021 +0200

    [FLINK-23920][table-common] Keep primary key when inferring schema with SchemaTranslator
    
    This closes #16944.
---
 .../org/apache/flink/table/catalog/ExternalSchemaTranslator.java  | 7 ++++---
 .../apache/flink/table/catalog/ExternalSchemaTranslatorTest.java  | 8 +++++---
 2 files changed, 9 insertions(+), 6 deletions(-)

diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/ExternalSchemaTranslator.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/ExternalSchemaTranslator.java
index 5c47c88..7190cd1 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/ExternalSchemaTranslator.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/ExternalSchemaTranslator.java
@@ -66,7 +66,8 @@ public final class ExternalSchemaTranslator {
      *
      * <ul>
      *   <li>1. Derive physical columns from the input schema.
-     *   <li>2. Derive physical columns from the input schema but enrich with metadata column.
+     *   <li>2. Derive physical columns from the input schema but enrich with metadata column and
+     *       primary key.
      *   <li>3. Entirely use declared schema.
      * </ul>
      */
@@ -85,7 +86,7 @@ public final class ExternalSchemaTranslator {
         final List<UnresolvedColumn> declaredColumns = declaredSchema.getColumns();
 
         // the declared schema does not contain physical information,
-        // thus, it only replaces physical columns with metadata rowtime
+        // thus, it only replaces physical columns with metadata rowtime or adds a primary key
         if (declaredColumns.stream().noneMatch(ExternalSchemaTranslator::isPhysical)) {
             // go through data type to erase time attributes
             final DataType sourceDataType = inputSchema.toSourceRowDataType();
@@ -93,7 +94,7 @@ public final class ExternalSchemaTranslator {
                     patchDataTypeWithoutMetadataRowtime(sourceDataType, declaredColumns);
             final Schema.Builder builder = Schema.newBuilder();
             builder.fromRowDataType(physicalDataType);
-            builder.fromColumns(declaredColumns);
+            builder.fromSchema(declaredSchema);
             return new OutputResult(null, builder.build(), null);
         }
 
diff --git a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/ExternalSchemaTranslatorTest.java b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/ExternalSchemaTranslatorTest.java
index 7c565d0..3fdd9dc 100644
--- a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/ExternalSchemaTranslatorTest.java
+++ b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/ExternalSchemaTranslatorTest.java
@@ -312,10 +312,10 @@ public class ExternalSchemaTranslatorTest {
     }
 
     @Test
-    public void testOutputToMetadataSchema() {
+    public void testOutputToPartialSchema() {
         final ResolvedSchema tableSchema =
                 ResolvedSchema.of(
-                        Column.physical("id", DataTypes.BIGINT()),
+                        Column.physical("id", DataTypes.BIGINT().notNull()),
                         Column.physical("name", DataTypes.STRING()),
                         Column.metadata("rowtime", DataTypes.TIMESTAMP_LTZ(3), null, false));
 
@@ -325,14 +325,16 @@ public class ExternalSchemaTranslatorTest {
                         Schema.newBuilder()
                                 .columnByExpression("computed", "f1 + 42")
                                 .columnByMetadata("rowtime", DataTypes.TIMESTAMP_LTZ(3))
+                                .primaryKey("id")
                                 .build());
 
         assertEquals(
                 Schema.newBuilder()
-                        .column("id", DataTypes.BIGINT())
+                        .column("id", DataTypes.BIGINT().notNull())
                         .column("name", DataTypes.STRING())
                         .columnByExpression("computed", "f1 + 42")
                         .columnByMetadata("rowtime", DataTypes.TIMESTAMP_LTZ(3)) // becomes metadata
+                        .primaryKey("id")
                         .build(),
                 result.getSchema());
     }

Reply via email to