[ https://issues.apache.org/jira/browse/FLINK-23850?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17409453#comment-17409453 ]
Matthias commented on FLINK-23850:
----------------------------------

I created a data generator to emit random data and ran the following job:
{code:java}
final StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

tableEnv.createTemporaryTable("T1", TableDescriptor.forConnector("upsert-kafka")
        .schema(Schema.newBuilder()
                .column("pk", DataTypes.STRING().notNull())
                .column("x", DataTypes.STRING().notNull())
                .primaryKey("pk")
                .build())
        .option("topic", "flink-23850-in1")
        .option("properties.bootstrap.servers", FLINK23850Utils.BOOTSTRAP_SERVERS)
        .option("key.format", "csv")
        .option("value.format", "csv")
        .build());

tableEnv.createTemporaryTable("T2", TableDescriptor.forConnector("upsert-kafka")
        .schema(Schema.newBuilder()
                .column("pk", DataTypes.STRING().notNull())
                .column("y", DataTypes.STRING().notNull())
                .column("some_value", DataTypes.STRING().notNull())
                .primaryKey("pk")
                .build())
        .option("topic", "flink-23850-in2")
        .option("properties.bootstrap.servers", FLINK23850Utils.BOOTSTRAP_SERVERS)
        .option("key.format", "csv")
        .option("value.format", "csv")
        .build());

tableEnv.createTemporaryTable("T3", TableDescriptor.forConnector("upsert-kafka")
        .schema(Schema.newBuilder()
                .column("pk1", DataTypes.STRING().notNull())
                .column("pk2", DataTypes.STRING().notNull())
                .column("some_other_value", DataTypes.STRING().notNull())
                .primaryKey("pk1", "pk2")
                .build())
        .option("topic", "flink-23850-in3")
        .option("properties.bootstrap.servers", FLINK23850Utils.BOOTSTRAP_SERVERS)
        .option("key.format", "csv")
        .option("value.format", "csv")
        .build());

final Table resultTable = tableEnv.sqlQuery(
        "SELECT "
                + "T1.pk, "
                + "T2.some_value, "
                + "T3.pk1, "
                + "T3.pk2 "
                + "FROM T1 "
                + "LEFT JOIN T2 ON T1.pk = T2.pk "
                + "LEFT JOIN T3 ON T1.x = T3.pk1 AND T2.y = T3.pk2");

tableEnv.createTemporaryTable("T4", TableDescriptor.forConnector("upsert-kafka")
        .schema(Schema.newBuilder()
                .column("pk", DataTypes.STRING().notNull())
                .column("some_calculated_value", DataTypes.STRING())
                .column("pk1", DataTypes.STRING())
                .column("pk2", DataTypes.STRING())
                .primaryKey("pk")
                .build())
        .option("topic", "flink-23850-out")
        .option("properties.bootstrap.servers", FLINK23850Utils.BOOTSTRAP_SERVERS)
        .option("key.format", "csv")
        .option("value.format", "csv")
        .build());

resultTable.executeInsert("T4");
{code}
I played around with the configuration:
* parallelism: 4 and 6
* restart strategy: fixed delay of 2000ms with 20 retries
* enabling checkpointing with {{at-least-once-strategy}}

I stopped and restarted TaskManagers without any problems popping up (besides exceptions related to the missing TaskManagers).

> Test Kafka table connector with new runtime provider
> ----------------------------------------------------
>
>                 Key: FLINK-23850
>                 URL: https://issues.apache.org/jira/browse/FLINK-23850
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Connectors / Kafka
>    Affects Versions: 1.14.0
>            Reporter: Qingsheng Ren
>            Assignee: Matthias
>            Priority: Blocker
>              Labels: release-testing
>             Fix For: 1.14.0
>
>
> The runtime provider of the Kafka table connector has been replaced with the new
> KafkaSource and KafkaSink. The table connector needs to be tested to make
> sure nothing surprises Table/SQL API users.

--
This message was sent by Atlassian Jira
(v8.3.4#803005)