This is an automated email from the ASF dual-hosted git repository.
twalthr pushed a commit to branch release-1.13
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.13 by this push:
new 4b682e8 [FLINK-23920][table-common] Keep primary key when inferring schema with SchemaTranslator
4b682e8 is described below
commit 4b682e88d093c2d610f00e0c827a48c7408ee83d
Author: Timo Walther <[email protected]>
AuthorDate: Mon Aug 23 16:04:44 2021 +0200
[FLINK-23920][table-common] Keep primary key when inferring schema with SchemaTranslator
This closes #16944.
---
.../org/apache/flink/table/catalog/ExternalSchemaTranslator.java | 7 ++++---
.../apache/flink/table/catalog/ExternalSchemaTranslatorTest.java | 8 +++++---
2 files changed, 9 insertions(+), 6 deletions(-)
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/ExternalSchemaTranslator.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/ExternalSchemaTranslator.java
index 5c47c88..7190cd1 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/ExternalSchemaTranslator.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/ExternalSchemaTranslator.java
@@ -66,7 +66,8 @@ public final class ExternalSchemaTranslator {
*
* <ul>
* <li>1. Derive physical columns from the input schema.
- * <li>2. Derive physical columns from the input schema but enrich with metadata column.
+ * <li>2. Derive physical columns from the input schema but enrich with metadata column and
+ * primary key.
* <li>3. Entirely use declared schema.
* </ul>
*/
@@ -85,7 +86,7 @@ public final class ExternalSchemaTranslator {
final List<UnresolvedColumn> declaredColumns =
declaredSchema.getColumns();
// the declared schema does not contain physical information,
- // thus, it only replaces physical columns with metadata rowtime
+ // thus, it only replaces physical columns with metadata rowtime or adds a primary key
if (declaredColumns.stream().noneMatch(ExternalSchemaTranslator::isPhysical)) {
// go through data type to erase time attributes
final DataType sourceDataType = inputSchema.toSourceRowDataType();
@@ -93,7 +94,7 @@ public final class ExternalSchemaTranslator {
patchDataTypeWithoutMetadataRowtime(sourceDataType,
declaredColumns);
final Schema.Builder builder = Schema.newBuilder();
builder.fromRowDataType(physicalDataType);
- builder.fromColumns(declaredColumns);
+ builder.fromSchema(declaredSchema);
return new OutputResult(null, builder.build(), null);
}
diff --git a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/ExternalSchemaTranslatorTest.java b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/ExternalSchemaTranslatorTest.java
index 7c565d0..3fdd9dc 100644
--- a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/ExternalSchemaTranslatorTest.java
+++ b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/ExternalSchemaTranslatorTest.java
@@ -312,10 +312,10 @@ public class ExternalSchemaTranslatorTest {
}
@Test
- public void testOutputToMetadataSchema() {
+ public void testOutputToPartialSchema() {
final ResolvedSchema tableSchema =
ResolvedSchema.of(
- Column.physical("id", DataTypes.BIGINT()),
+ Column.physical("id", DataTypes.BIGINT().notNull()),
Column.physical("name", DataTypes.STRING()),
Column.metadata("rowtime", DataTypes.TIMESTAMP_LTZ(3),
null, false));
@@ -325,14 +325,16 @@ public class ExternalSchemaTranslatorTest {
Schema.newBuilder()
.columnByExpression("computed", "f1 + 42")
.columnByMetadata("rowtime",
DataTypes.TIMESTAMP_LTZ(3))
+ .primaryKey("id")
.build());
assertEquals(
Schema.newBuilder()
- .column("id", DataTypes.BIGINT())
+ .column("id", DataTypes.BIGINT().notNull())
.column("name", DataTypes.STRING())
.columnByExpression("computed", "f1 + 42")
.columnByMetadata("rowtime",
DataTypes.TIMESTAMP_LTZ(3)) // becomes metadata
+ .primaryKey("id")
.build(),
result.getSchema());
}