This is an automated email from the ASF dual-hosted git repository. tzulitai pushed a commit to branch v3.0 in repository https://gitbox.apache.org/repos/asf/flink-connector-kafka.git
commit 5bb239111586531fdc1594ddbf5076efdf43ead2 Author: zhang_yao <[email protected]> AuthorDate: Fri Feb 10 14:40:37 2023 +0800 [FLINK-25916][connector-kafka] Using upsert-kafka with a flush buffer results in Null Pointer Exception This closes #5. --- .../kafka/table/ReducingUpsertWriter.java | 5 +- .../kafka/table/ReducingUpsertWriterTest.java | 53 +++++++++++++++++- .../kafka/table/UpsertKafkaTableITCase.java | 65 ++++++++++++++++++++++ 3 files changed, 118 insertions(+), 5 deletions(-) diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/ReducingUpsertWriter.java b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/ReducingUpsertWriter.java index 67df4a69..91487466 100644 --- a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/ReducingUpsertWriter.java +++ b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/ReducingUpsertWriter.java @@ -158,7 +158,7 @@ class ReducingUpsertWriter<WriterState> * ReducingUpsertWriter} will emit the records in the buffer with memorized timestamp. */ private static class WrappedContext implements SinkWriter.Context { - private long timestamp; + private Long timestamp; private SinkWriter.Context context; @Override @@ -169,11 +169,10 @@ class ReducingUpsertWriter<WriterState> @Override public Long timestamp() { - checkNotNull(timestamp, "timestamp must to be set before retrieving it."); return timestamp; } - public void setTimestamp(long timestamp) { + public void setTimestamp(Long timestamp) { this.timestamp = timestamp; } diff --git a/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/ReducingUpsertWriterTest.java b/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/ReducingUpsertWriterTest.java index 1ad9d094..5ef36e75 100644 --- a/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/ReducingUpsertWriterTest.java +++ b/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/ReducingUpsertWriterTest.java @@ -262,6 +262,50 @@ public class ReducingUpsertWriterTest { compareCompactedResult(expected, writer.rowDataCollectors); } + @Test + public void testWriteDataWithNullTimestamp() throws Exception { + final MockedSinkWriter writer = new MockedSinkWriter(); + final ReducingUpsertWriter<?> bufferedWriter = createBufferedWriter(writer); + + bufferedWriter.write( + GenericRowData.ofKind( + INSERT, + 1001, + StringData.fromString("Java public for dummies"), + StringData.fromString("Tan Ah Teck"), + 11.11, + 11, + null), + new org.apache.flink.api.connector.sink2.SinkWriter.Context() { + @Override + public long currentWatermark() { + throw new UnsupportedOperationException("Not implemented."); + } + + @Override + public Long timestamp() { + return null; + } + }); + + bufferedWriter.flush(true); + + final HashMap<Integer, List<RowData>> expected = new HashMap<>(); + expected.put( + 1001, + Collections.singletonList( + GenericRowData.ofKind( + UPDATE_AFTER, + 1001, + StringData.fromString("Java public for dummies"), + StringData.fromString("Tan Ah Teck"), + 11.11, + 11, + null))); + + compareCompactedResult(expected, writer.rowDataCollectors); + } + private void compareCompactedResult( Map<Integer, List<RowData>> expected, List<RowData> actual) { Map<Integer, List<RowData>> actualMap = new HashMap<>(); @@ -340,8 +384,13 @@ public class ReducingUpsertWriterTest { @Override public void write(RowData element, Context context) throws IOException, InterruptedException { - assertThat(Instant.ofEpochMilli(context.timestamp())) - .isEqualTo(element.getTimestamp(TIMESTAMP_INDICES, 3).toInstant()); + // Allow comparison between null timestamps + if (context.timestamp() == null) { + assertThat(element.getTimestamp(TIMESTAMP_INDICES, 3)).isNull(); + } else { + assertThat(Instant.ofEpochMilli(context.timestamp())) + .isEqualTo(element.getTimestamp(TIMESTAMP_INDICES, 3).toInstant()); + } rowDataCollectors.add(element); } diff --git a/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/UpsertKafkaTableITCase.java b/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/UpsertKafkaTableITCase.java index 109f4402..d82e64cf 100644 --- a/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/UpsertKafkaTableITCase.java +++ b/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/UpsertKafkaTableITCase.java @@ -195,6 +195,71 @@ public class UpsertKafkaTableITCase extends KafkaTableTestBase { deleteTestTopic(topic); } + @Test + public void testBufferedUpsertSinkWithoutAssigningWatermark() throws Exception { + final String topic = "buffered_upsert_topic_without_assigning_watermark_" + format; + createTestTopic(topic, 1, 1); + String bootstraps = getBootstrapServers(); + env.setParallelism(1); + + Table table = + tEnv.fromDataStream( + env.fromElements( + Row.of(1, null, "payload 1"), + Row.of(2, null, "payload 2"), + Row.of(3, null, "payload 3"), + Row.of(3, null, "payload")) + .returns( + ROW_NAMED( + new String[] {"k_id", "ts", "payload"}, + INT, + LOCAL_DATE_TIME, + STRING)), + Schema.newBuilder() + .column("k_id", DataTypes.INT()) + .column("ts", DataTypes.TIMESTAMP(3)) + .column("payload", DataTypes.STRING()) + .build()); + + final String createTable = + String.format( + "CREATE TABLE upsert_kafka (\n" + + " `k_id` INTEGER,\n" + + " `ts` TIMESTAMP(3),\n" + + " `payload` STRING,\n" + + " PRIMARY KEY (k_id) NOT ENFORCED" + + ") WITH (\n" + + " 'connector' = 'upsert-kafka',\n" + + " 'topic' = '%s',\n" + + " 'properties.bootstrap.servers' = '%s',\n" + + " 'key.format' = '%s',\n" + + " 'sink.buffer-flush.max-rows' = '2',\n" + + " 'sink.buffer-flush.interval' = '100000',\n" + + " 'value.format' = '%s',\n" + + " 'value.fields-include' = 'ALL',\n" + + " 'key.csv.null-literal' = '<NULL>',\n" + + " 'value.csv.null-literal' = '<NULL>'\n" + + ")", + topic, bootstraps, "csv", "csv"); + + tEnv.executeSql(createTable); + + table.executeInsert("upsert_kafka").await(); + + final List<Row> result = collectRows(tEnv.sqlQuery("SELECT * FROM upsert_kafka"), 3); + final List<Row> expected = + Arrays.asList( + changelogRow("+I", 1, null, "payload 1"), + changelogRow("+I", 2, null, "payload 2"), + changelogRow("+I", 3, null, "payload")); + + assertThat(result).satisfies(matching(deepEqualTo(expected, true))); + + // ------------- cleanup ------------------- + + deleteTestTopic(topic); + } + @Test public void testSourceSinkWithKeyAndPartialValue() throws Exception { // we always use a different topic name for each parameterized topic,
