[
https://issues.apache.org/jira/browse/FLINK-23835?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17409530#comment-17409530
]
Matthias commented on FLINK-23835:
----------------------------------
The following job was used:
{code:java}
final StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
tableEnv.createTemporaryTable("T1",
TableDescriptor.forConnector("upsert-kafka")
.schema(Schema.newBuilder()
.column("pk", DataTypes.INT().notNull())
.column("x", DataTypes.STRING().notNull())
.primaryKey("pk")
.build())
.option("topic", "flink-23850-in1")
.option("properties.bootstrap.servers",
FLINK23850Utils.BOOTSTRAP_SERVERS)
.option("key.format", "csv")
.option("value.format", "csv")
.build());
tableEnv.createTemporaryTable("T2",
TableDescriptor.forConnector("upsert-kafka")
.schema(Schema.newBuilder()
.column("pk", DataTypes.INT().notNull())
.column("y", DataTypes.STRING().notNull())
.column("some_value", DataTypes.DOUBLE().notNull())
.primaryKey("pk")
.build())
.option("topic", "flink-23850-in2")
.option("properties.bootstrap.servers",
FLINK23850Utils.BOOTSTRAP_SERVERS)
.option("key.format", "csv")
.option("value.format", "csv")
.build());
tableEnv.createTemporaryTable("T3",
TableDescriptor.forConnector("upsert-kafka")
.schema(Schema.newBuilder()
.column("pk1", DataTypes.STRING().notNull())
.column("pk2", DataTypes.STRING().notNull())
.column("some_other_value",
DataTypes.DOUBLE().notNull())
.primaryKey("pk1", "pk2")
.build())
.option("topic", "flink-23850-in3")
.option("properties.bootstrap.servers",
FLINK23850Utils.BOOTSTRAP_SERVERS)
.option("key.format", "csv")
.option("value.format", "csv")
.build());
final Table resultTable =
tableEnv.sqlQuery(
"SELECT "
+ "T1.pk, "
+ "T2.some_value * T3.some_other_value, "
+ "T3.pk1, "
+ "T3.pk2 "
+ "FROM T1 "
+ "LEFT JOIN T2 on T1.pk = T2.pk "
+ "LEFT JOIN T3 ON T1.x = T3.pk1 AND T2.y = T3.pk2");
tableEnv.createTemporaryTable("T4",
TableDescriptor.forConnector("upsert-kafka")
.schema(Schema.newBuilder()
.column("pk", DataTypes.INT().notNull())
.column("some_calculated_value", DataTypes.DOUBLE())
.column("pk1", DataTypes.STRING())
.column("pk2", DataTypes.STRING())
.primaryKey("pk")
.build())
.option("topic", "flink-23850-out")
.option("properties.bootstrap.servers",
FLINK23850Utils.BOOTSTRAP_SERVERS)
.option("key.format", "csv")
.option("value.format", "csv")
.build());
resultTable.executeInsert("T4");
{code}
> Test upsert sink with upsert keys
> ---------------------------------
>
> Key: FLINK-23835
> URL: https://issues.apache.org/jira/browse/FLINK-23835
> Project: Flink
> Issue Type: Improvement
> Components: Tests
> Reporter: Jingsong Lee
> Assignee: Timo Walther
> Priority: Major
> Fix For: 1.14.0
>
>
--
This message was sent by Atlassian Jira
(v8.3.4#803005)