[ 
https://issues.apache.org/jira/browse/FLINK-23850?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17409453#comment-17409453
 ] 

Matthias commented on FLINK-23850:
----------------------------------

I created a data generator to emit random data and ran the following job:
{code:java}
        // env is the job's StreamExecutionEnvironment; its setup is not part of this snippet.
        final StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        tableEnv.createTemporaryTable("T1",
                TableDescriptor.forConnector("upsert-kafka")
                        .schema(Schema.newBuilder()
                                .column("pk", DataTypes.STRING().notNull())
                                .column("x", DataTypes.STRING().notNull())
                                .primaryKey("pk")
                                .build())
                        .option("topic", "flink-23850-in1")
                        .option("properties.bootstrap.servers", FLINK23850Utils.BOOTSTRAP_SERVERS)
                        .option("key.format", "csv")
                        .option("value.format", "csv")
                        .build());

        tableEnv.createTemporaryTable("T2",
                TableDescriptor.forConnector("upsert-kafka")
                        .schema(Schema.newBuilder()
                                .column("pk", DataTypes.STRING().notNull())
                                .column("y", DataTypes.STRING().notNull())
                                .column("some_value", DataTypes.STRING().notNull())
                                .primaryKey("pk")
                                .build())
                        .option("topic", "flink-23850-in2")
                        .option("properties.bootstrap.servers", FLINK23850Utils.BOOTSTRAP_SERVERS)
                        .option("key.format", "csv")
                        .option("value.format", "csv")
                        .build());

        tableEnv.createTemporaryTable("T3",
                TableDescriptor.forConnector("upsert-kafka")
                        .schema(Schema.newBuilder()
                                .column("pk1", DataTypes.STRING().notNull())
                                .column("pk2", DataTypes.STRING().notNull())
                                .column("some_other_value", DataTypes.STRING().notNull())
                                .primaryKey("pk1", "pk2")
                                .build())
                        .option("topic", "flink-23850-in3")
                        .option("properties.bootstrap.servers", FLINK23850Utils.BOOTSTRAP_SERVERS)
                        .option("key.format", "csv")
                        .option("value.format", "csv")
                        .build());

        final Table resultTable =
                tableEnv.sqlQuery(
                        "SELECT "
                                + "T1.pk, "
                                + "T2.some_value, "
                                + "T3.pk1, "
                                + "T3.pk2 "
                                + "FROM T1 "
                                + "LEFT JOIN T2 ON T1.pk = T2.pk "
                                + "LEFT JOIN T3 ON T1.x = T3.pk1 AND T2.y = T3.pk2");

        tableEnv.createTemporaryTable("T4",
                TableDescriptor.forConnector("upsert-kafka")
                        .schema(Schema.newBuilder()
                                .column("pk", DataTypes.STRING().notNull())
                                .column("some_calculated_value", DataTypes.STRING())
                                .column("pk1", DataTypes.STRING())
                                .column("pk2", DataTypes.STRING())
                                .primaryKey("pk")
                                .build())
                        .option("topic", "flink-23850-out")
                        .option("properties.bootstrap.servers", FLINK23850Utils.BOOTSTRAP_SERVERS)
                        .option("key.format", "csv")
                        .option("value.format", "csv")
                        .build());

        resultTable.executeInsert("T4");
{code}

I played around with the configuration:
* parallelism: 4 and 6
* restart strategy: fixed delay of 2000ms with 20 retries
* checkpointing: enabled in {{AT_LEAST_ONCE}} mode
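
For reference, the settings listed above could be applied to the execution environment roughly as follows. This is only a sketch, not the code actually used for the test: the class name {{Flink23850EnvSetup}}, the helper {{createEnv}} and the 10 second checkpoint interval are assumptions (the interval is not stated in this comment), and the same settings could equally be set in the cluster configuration instead.

{code:java}
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

// Hypothetical helper class, named here for illustration only.
public class Flink23850EnvSetup {

    // Assumed value: the checkpoint interval is not given in the comment.
    private static final long CHECKPOINT_INTERVAL_MS = 10_000L;

    // parallelism was 4 or 6 in the test runs.
    static StreamExecutionEnvironment createEnv(int parallelism) {
        final StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();

        env.setParallelism(parallelism);

        // Fixed-delay restart strategy: up to 20 restart attempts, 2000 ms apart.
        env.setRestartStrategy(
                RestartStrategies.fixedDelayRestart(20, Time.milliseconds(2000)));

        // Checkpointing in at-least-once mode.
        env.enableCheckpointing(CHECKPOINT_INTERVAL_MS, CheckpointingMode.AT_LEAST_ONCE);

        return env;
    }
}
{code}

The returned environment would then be passed to {{StreamTableEnvironment.create(env)}} as in the job above.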

I stopped and restarted TaskManagers, and no problems showed up apart from 
exceptions related to the missing TaskManagers.

> Test Kafka table connector with new runtime provider
> ----------------------------------------------------
>
>                 Key: FLINK-23850
>                 URL: https://issues.apache.org/jira/browse/FLINK-23850
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Connectors / Kafka
>    Affects Versions: 1.14.0
>            Reporter: Qingsheng Ren
>            Assignee: Matthias
>            Priority: Blocker
>              Labels: release-testing
>             Fix For: 1.14.0
>
>
> The runtime provider of the Kafka table connector has been replaced with the new 
> KafkaSource and KafkaSink. The table connector needs to be tested to make 
> sure nothing surprises Table/SQL API users. 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
