[ 
https://issues.apache.org/jira/browse/FLINK-23835?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17409530#comment-17409530
 ] 

Matthias commented on FLINK-23835:
----------------------------------

The following job was used:
{code:java}
final StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

tableEnv.createTemporaryTable("T1",
        TableDescriptor.forConnector("upsert-kafka")
                .schema(Schema.newBuilder()
                        .column("pk", DataTypes.INT().notNull())
                        .column("x", DataTypes.STRING().notNull())
                        .primaryKey("pk")
                        .build())
                .option("topic", "flink-23850-in1")
                .option("properties.bootstrap.servers", 
FLINK23850Utils.BOOTSTRAP_SERVERS)
                .option("key.format", "csv")
                .option("value.format", "csv")
                .build());

tableEnv.createTemporaryTable("T2",
        TableDescriptor.forConnector("upsert-kafka")
                .schema(Schema.newBuilder()
                        .column("pk", DataTypes.INT().notNull())
                        .column("y", DataTypes.STRING().notNull())
                        .column("some_value", DataTypes.DOUBLE().notNull())
                        .primaryKey("pk")
                        .build())
                .option("topic", "flink-23850-in2")
                .option("properties.bootstrap.servers", 
FLINK23850Utils.BOOTSTRAP_SERVERS)
                .option("key.format", "csv")
                .option("value.format", "csv")
                .build());

tableEnv.createTemporaryTable("T3",
        TableDescriptor.forConnector("upsert-kafka")
                .schema(Schema.newBuilder()
                        .column("pk1", DataTypes.STRING().notNull())
                        .column("pk2", DataTypes.STRING().notNull())
                        .column("some_other_value", 
DataTypes.DOUBLE().notNull())
                        .primaryKey("pk1", "pk2")
                        .build())
                .option("topic", "flink-23850-in3")
                .option("properties.bootstrap.servers", 
FLINK23850Utils.BOOTSTRAP_SERVERS)
                .option("key.format", "csv")
                .option("value.format", "csv")
                .build());

final Table resultTable =
        tableEnv.sqlQuery(
                "SELECT "
                        + "T1.pk, "
                        + "T2.some_value * T3.some_other_value, "
                        + "T3.pk1, "
                        + "T3.pk2 "
                        + "FROM T1 "
                        + "LEFT JOIN T2 on T1.pk = T2.pk "
                        + "LEFT JOIN T3 ON T1.x = T3.pk1 AND T2.y = T3.pk2");

tableEnv.createTemporaryTable("T4",
        TableDescriptor.forConnector("upsert-kafka")
                .schema(Schema.newBuilder()
                        .column("pk", DataTypes.INT().notNull())
                        .column("some_calculated_value", DataTypes.DOUBLE())
                        .column("pk1", DataTypes.STRING())
                        .column("pk2", DataTypes.STRING())
                        .primaryKey("pk")
                        .build())
                .option("topic", "flink-23850-out")
                .option("properties.bootstrap.servers", 
FLINK23850Utils.BOOTSTRAP_SERVERS)
                .option("key.format", "csv")
                .option("value.format", "csv")
                .build());

resultTable.executeInsert("T4");
{code}

> Test upsert sink with upsert keys
> ---------------------------------
>
>                 Key: FLINK-23835
>                 URL: https://issues.apache.org/jira/browse/FLINK-23835
>             Project: Flink
>          Issue Type: Improvement
>          Components: Tests
>            Reporter: Jingsong Lee
>            Assignee: Timo Walther
>            Priority: Major
>             Fix For: 1.14.0
>
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to