This is an automated email from the ASF dual-hosted git repository.

tzulitai pushed a commit to branch v3.0
in repository https://gitbox.apache.org/repos/asf/flink-connector-kafka.git

commit 5bb239111586531fdc1594ddbf5076efdf43ead2
Author: zhang_yao <[email protected]>
AuthorDate: Fri Feb 10 14:40:37 2023 +0800

    [FLINK-25916][connector-kafka] Using upsert-kafka with a flush buffer results in Null Pointer Exception
    
    This closes #5.
---
 .../kafka/table/ReducingUpsertWriter.java          |  5 +-
 .../kafka/table/ReducingUpsertWriterTest.java      | 53 +++++++++++++++++-
 .../kafka/table/UpsertKafkaTableITCase.java        | 65 ++++++++++++++++++++++
 3 files changed, 118 insertions(+), 5 deletions(-)

diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/ReducingUpsertWriter.java b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/ReducingUpsertWriter.java
index 67df4a69..91487466 100644
--- a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/ReducingUpsertWriter.java
+++ b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/ReducingUpsertWriter.java
@@ -158,7 +158,7 @@ class ReducingUpsertWriter<WriterState>
      * ReducingUpsertWriter} will emit the records in the buffer with memorized timestamp.
      */
     private static class WrappedContext implements SinkWriter.Context {
-        private long timestamp;
+        private Long timestamp;
         private SinkWriter.Context context;
 
         @Override
@@ -169,11 +169,10 @@ class ReducingUpsertWriter<WriterState>
 
         @Override
         public Long timestamp() {
-            checkNotNull(timestamp, "timestamp must to be set before retrieving it.");
             return timestamp;
         }
 
-        public void setTimestamp(long timestamp) {
+        public void setTimestamp(Long timestamp) {
             this.timestamp = timestamp;
         }
 
diff --git a/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/ReducingUpsertWriterTest.java b/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/ReducingUpsertWriterTest.java
index 1ad9d094..5ef36e75 100644
--- a/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/ReducingUpsertWriterTest.java
+++ b/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/ReducingUpsertWriterTest.java
@@ -262,6 +262,50 @@ public class ReducingUpsertWriterTest {
         compareCompactedResult(expected, writer.rowDataCollectors);
     }
 
+    @Test
+    public void testWriteDataWithNullTimestamp() throws Exception {
+        final MockedSinkWriter writer = new MockedSinkWriter();
+        final ReducingUpsertWriter<?> bufferedWriter = createBufferedWriter(writer);
+
+        bufferedWriter.write(
+                GenericRowData.ofKind(
+                        INSERT,
+                        1001,
+                        StringData.fromString("Java public for dummies"),
+                        StringData.fromString("Tan Ah Teck"),
+                        11.11,
+                        11,
+                        null),
+                new org.apache.flink.api.connector.sink2.SinkWriter.Context() {
+                    @Override
+                    public long currentWatermark() {
+                        throw new UnsupportedOperationException("Not implemented.");
+                    }
+
+                    @Override
+                    public Long timestamp() {
+                        return null;
+                    }
+                });
+
+        bufferedWriter.flush(true);
+
+        final HashMap<Integer, List<RowData>> expected = new HashMap<>();
+        expected.put(
+                1001,
+                Collections.singletonList(
+                        GenericRowData.ofKind(
+                                UPDATE_AFTER,
+                                1001,
+                                StringData.fromString("Java public for dummies"),
+                                StringData.fromString("Tan Ah Teck"),
+                                11.11,
+                                11,
+                                null)));
+
+        compareCompactedResult(expected, writer.rowDataCollectors);
+    }
+
     private void compareCompactedResult(
             Map<Integer, List<RowData>> expected, List<RowData> actual) {
         Map<Integer, List<RowData>> actualMap = new HashMap<>();
@@ -340,8 +384,13 @@ public class ReducingUpsertWriterTest {
         @Override
         public void write(RowData element, Context context)
                 throws IOException, InterruptedException {
-            assertThat(Instant.ofEpochMilli(context.timestamp()))
-                    .isEqualTo(element.getTimestamp(TIMESTAMP_INDICES, 3).toInstant());
+            // Allow comparison between null timestamps
+            if (context.timestamp() == null) {
+                assertThat(element.getTimestamp(TIMESTAMP_INDICES, 3)).isNull();
+            } else {
+                assertThat(Instant.ofEpochMilli(context.timestamp()))
+                        .isEqualTo(element.getTimestamp(TIMESTAMP_INDICES, 3).toInstant());
+            }
             rowDataCollectors.add(element);
         }
 
diff --git a/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/UpsertKafkaTableITCase.java b/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/UpsertKafkaTableITCase.java
index 109f4402..d82e64cf 100644
--- a/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/UpsertKafkaTableITCase.java
+++ b/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/UpsertKafkaTableITCase.java
@@ -195,6 +195,71 @@ public class UpsertKafkaTableITCase extends KafkaTableTestBase {
         deleteTestTopic(topic);
     }
 
+    @Test
+    public void testBufferedUpsertSinkWithoutAssigningWatermark() throws Exception {
+        final String topic = "buffered_upsert_topic_without_assigning_watermark_" + format;
+        createTestTopic(topic, 1, 1);
+        String bootstraps = getBootstrapServers();
+        env.setParallelism(1);
+
+        Table table =
+                tEnv.fromDataStream(
+                        env.fromElements(
+                                        Row.of(1, null, "payload 1"),
+                                        Row.of(2, null, "payload 2"),
+                                        Row.of(3, null, "payload 3"),
+                                        Row.of(3, null, "payload"))
+                                .returns(
+                                        ROW_NAMED(
+                                                new String[] {"k_id", "ts", "payload"},
+                                                INT,
+                                                LOCAL_DATE_TIME,
+                                                STRING)),
+                        Schema.newBuilder()
+                                .column("k_id", DataTypes.INT())
+                                .column("ts", DataTypes.TIMESTAMP(3))
+                                .column("payload", DataTypes.STRING())
+                                .build());
+
+        final String createTable =
+                String.format(
+                        "CREATE TABLE upsert_kafka (\n"
+                                + "  `k_id` INTEGER,\n"
+                                + "  `ts` TIMESTAMP(3),\n"
+                                + "  `payload` STRING,\n"
+                                + "  PRIMARY KEY (k_id) NOT ENFORCED"
+                                + ") WITH (\n"
+                                + "  'connector' = 'upsert-kafka',\n"
+                                + "  'topic' = '%s',\n"
+                                + "  'properties.bootstrap.servers' = '%s',\n"
+                                + "  'key.format' = '%s',\n"
+                                + "  'sink.buffer-flush.max-rows' = '2',\n"
+                                + "  'sink.buffer-flush.interval' = '100000',\n"
+                                + "  'value.format' = '%s',\n"
+                                + "  'value.fields-include' = 'ALL',\n"
+                                + "  'key.csv.null-literal' = '<NULL>',\n"
+                                + "  'value.csv.null-literal' = '<NULL>'\n"
+                                + ")",
+                        topic, bootstraps, "csv", "csv");
+
+        tEnv.executeSql(createTable);
+
+        table.executeInsert("upsert_kafka").await();
+
+        final List<Row> result = collectRows(tEnv.sqlQuery("SELECT * FROM upsert_kafka"), 3);
+        final List<Row> expected =
+                Arrays.asList(
+                        changelogRow("+I", 1, null, "payload 1"),
+                        changelogRow("+I", 2, null, "payload 2"),
+                        changelogRow("+I", 3, null, "payload"));
+
+        assertThat(result).satisfies(matching(deepEqualTo(expected, true)));
+
+        // ------------- cleanup -------------------
+
+        deleteTestTopic(topic);
+    }
+
     @Test
     public void testSourceSinkWithKeyAndPartialValue() throws Exception {
         // we always use a different topic name for each parameterized topic,

Reply via email to