zentol commented on code in PR #20343:
URL: https://github.com/apache/flink/pull/20343#discussion_r988711185


##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/ReducingUpsertWriterTest.java:
##########
@@ -214,6 +282,73 @@ public void testWriteData() throws Exception {
         assertThat(writer.rowDataCollectors).isEmpty();
     }
 
+    @Test
+    public void testWriteDataWithNullTimestamp() throws Exception {
+        final MockedSinkWriter writer = new MockedSinkWriter();
+        final ReducingUpsertWriter<?> bufferedWriter = createBufferedWriter(writer);
+
+        // write 4 records which doesn't trigger batch size
+        writeDataWithNullTimestamp(bufferedWriter, new ReusableIteratorWithNullTimestamp(0, 4));
+        assertThat(writer.rowDataCollectors).isEmpty();
+
+        // write one more record, and should flush the buffer
+        writeDataWithNullTimestamp(bufferedWriter, new ReusableIteratorWithNullTimestamp(7, 1));

Review Comment:
   Is it necessary to split this into 2 calls? What do we gain?
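   If the goal is only to cross the batch-size threshold, a single call could do it. A rough sketch (the iterator offsets are illustrative and assume the threshold is 5 records; the identifiers are the test's own helpers):
   ```java
   // hypothetical single-call variant of the two writes above
   writeDataWithNullTimestamp(bufferedWriter, new ReusableIteratorWithNullTimestamp(0, 5));
   assertThat(writer.rowDataCollectors).isNotEmpty();
   ```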



##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/ReducingUpsertWriterTest.java:
##########
@@ -141,6 +142,73 @@ public static Object[] enableObjectReuse() {
                 TimestampData.fromInstant(Instant.parse("2021-03-30T21:00:00Z")))
     };
 
+    public static final RowData[] TEST_DATA_WITH_NULL_TIMESTAMP = {
+        GenericRowData.ofKind(

Review Comment:
   I don't see why we need so many records. This should already fail even with a single record.
   Reducing the number of required records would remove a lot of noise from the test.
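   For illustration, a single-row fixture (values copied from the existing test data; the RowKind and schema are assumed from the surrounding file) would already exercise the null-timestamp path:
   ```java
   // minimal sketch of the fixture, not the PR's actual array
   public static final RowData[] TEST_DATA_WITH_NULL_TIMESTAMP = {
       GenericRowData.ofKind(
               INSERT,
               1001,
               StringData.fromString("Java public for dummies"),
               StringData.fromString("Tan Ah Teck"),
               11.11,
               11,
               null) // null timestamp is the case under test
   };
   ```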



##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/ReducingUpsertWriterTest.java:
##########
@@ -214,6 +282,73 @@ public void testWriteData() throws Exception {
         assertThat(writer.rowDataCollectors).isEmpty();
     }
 
+    @Test
+    public void testWriteDataWithNullTimestamp() throws Exception {
+        final MockedSinkWriter writer = new MockedSinkWriter();
+        final ReducingUpsertWriter<?> bufferedWriter = createBufferedWriter(writer);
+
+        // write 4 records which doesn't trigger batch size
+        writeDataWithNullTimestamp(bufferedWriter, new ReusableIteratorWithNullTimestamp(0, 4));
+        assertThat(writer.rowDataCollectors).isEmpty();
+
+        // write one more record, and should flush the buffer
+        writeDataWithNullTimestamp(bufferedWriter, new ReusableIteratorWithNullTimestamp(7, 1));
+
+        HashMap<Integer, List<RowData>> expected = new HashMap<>();
+        expected.put(
+                1001,
+                Collections.singletonList(
+                        GenericRowData.ofKind(
+                                UPDATE_AFTER,
+                                1001,
+                                StringData.fromString("Java public for dummies"),
+                                StringData.fromString("Tan Ah Teck"),
+                                11.11,
+                                11,
+                                null)));
+        expected.put(
+                1002,
+                Collections.singletonList(
+                        GenericRowData.ofKind(
+                                UPDATE_AFTER,
+                                1002,
+                                StringData.fromString("More Java for dummies"),
+                                StringData.fromString("Tan Ah Teck"),
+                                22.22,
+                                22,
+                                null)));
+        expected.put(
+                1004,
+                Collections.singletonList(
+                        GenericRowData.ofKind(
+                                UPDATE_AFTER,
+                                1004,
+                                StringData.fromString("A Teaspoon of Java"),
+                                StringData.fromString("Kevin Jones"),
+                                55.55,
+                                55,
+                                null)));
+
+        expected.put(
+                1005,
+                Collections.singletonList(
+                        GenericRowData.ofKind(
+                                DELETE,
+                                1005,
+                                StringData.fromString("A Teaspoon of Java 1.8"),
+                                StringData.fromString("Kevin Jones"),
+                                null,
+                                1010,
+                                null)));
+
+        compareCompactedResult(expected, writer.rowDataCollectors);
+
+        writer.rowDataCollectors.clear();
+        // write remaining data, and they are still buffered
+        writeDataWithNullTimestamp(bufferedWriter, new ReusableIteratorWithNullTimestamp(4, 3));
+        assertThat(writer.rowDataCollectors).isEmpty();

Review Comment:
   Why do we keep testing the buffering?



##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/ReducingUpsertWriterTest.java:
##########
@@ -214,6 +282,73 @@ public void testWriteData() throws Exception {
         assertThat(writer.rowDataCollectors).isEmpty();
     }
 
+    @Test
+    public void testWriteDataWithNullTimestamp() throws Exception {
+        final MockedSinkWriter writer = new MockedSinkWriter();
+        final ReducingUpsertWriter<?> bufferedWriter = createBufferedWriter(writer);
+
+        // write 4 records which doesn't trigger batch size
+        writeDataWithNullTimestamp(bufferedWriter, new ReusableIteratorWithNullTimestamp(0, 4));
+        assertThat(writer.rowDataCollectors).isEmpty();
+
+        // write one more record, and should flush the buffer
+        writeDataWithNullTimestamp(bufferedWriter, new ReusableIteratorWithNullTimestamp(7, 1));
+
+        HashMap<Integer, List<RowData>> expected = new HashMap<>();

Review Comment:
   ```suggestion
           final Map<Integer, List<RowData>> expected = new HashMap<>();
   ```



##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/ReducingUpsertWriterTest.java:
##########
@@ -214,6 +282,73 @@ public void testWriteData() throws Exception {
         assertThat(writer.rowDataCollectors).isEmpty();
     }
 
+    @Test
+    public void testWriteDataWithNullTimestamp() throws Exception {
+        final MockedSinkWriter writer = new MockedSinkWriter();
+        final ReducingUpsertWriter<?> bufferedWriter = createBufferedWriter(writer);
+
+        // write 4 records which doesn't trigger batch size

Review Comment:
   Why doesn't this trigger batching? Are we relying on some default config value here?
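   If it is a default, passing the threshold explicitly would make the intent visible. A sketch (assumes a createBufferedWriter overload taking a flush mode, which may not match the actual helper, and a SinkBufferFlushMode(batchSize, intervalMs) constructor):
   ```java
   // hypothetical overload of the test helper; batch size 5 matches the
   // "4 buffered, 5th flushes" pattern used in this test
   final ReducingUpsertWriter<?> bufferedWriter =
           createBufferedWriter(writer, new SinkBufferFlushMode(5, Long.MAX_VALUE));
   ```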



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
