mas-chen commented on code in PR #20343:
URL: https://github.com/apache/flink/pull/20343#discussion_r991847890
##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/ReducingUpsertWriterTest.java:
##########
@@ -141,6 +142,73 @@ public static Object[] enableObjectReuse() {
TimestampData.fromInstant(Instant.parse("2021-03-30T21:00:00Z")))
};
+ public static final RowData[] TEST_DATA_WITH_NULL_TIMESTAMP = {
+ GenericRowData.ofKind(
Review Comment:
+1, I believe you can reuse the `TEST_DATA` array already defined in the class.
`TEST_DATA` also has many records, but that's a separate issue.
##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/ReducingUpsertWriterTest.java:
##########
@@ -382,4 +543,33 @@ public RowData next() {
}
}
}
+
+ private class ReusableIteratorWithNullTimestamp implements Iterator<RowData> {
Review Comment:
I think the iterator is overkill here; you can test this functionality with a
single record. Essentially, you want to confirm that a record with a null
timestamp can be flushed.
##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/ReducingUpsertWriterTest.java:
##########
@@ -297,6 +432,26 @@ public Long timestamp() {
}
}
+ private void writeDataWithNullTimestamp(
+ ReducingUpsertWriter<?> writer, Iterator<RowData> iterator) throws Exception {
+ while (iterator.hasNext()) {
+ RowData next = iterator.next();
+ writer.write(
Review Comment:
You can just call `write()` once and invoke `flush()` afterwards. There's no
need to test the buffering logic, as that is already covered by other tests.
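To illustrate the suggested shape of the test, here is a minimal self-contained sketch of the write-once-then-flush pattern. The `Record` and `BufferingWriter` types below are hypothetical stand-ins, not Flink's actual `RowData` or `ReducingUpsertWriter` API; the point is only that a single record with a null timestamp, one `write()`, and one `flush()` are enough to exercise the behavior under test.

```java
import java.util.ArrayList;
import java.util.List;

public class NullTimestampFlushSketch {

    // Hypothetical stand-in for a row with an optional event timestamp.
    static class Record {
        final String key;
        final Long timestamp; // null models a record without a timestamp

        Record(String key, Long timestamp) {
            this.key = key;
            this.timestamp = timestamp;
        }
    }

    // Minimal buffering writer: write() buffers, flush() hands records on.
    static class BufferingWriter {
        private final List<Record> buffer = new ArrayList<>();
        final List<Record> flushed = new ArrayList<>();

        void write(Record r) {
            buffer.add(r);
        }

        void flush() {
            flushed.addAll(buffer);
            buffer.clear();
        }
    }

    // The pattern the review suggests: one record, one write(), one flush().
    public static int writeOnceAndFlush() {
        BufferingWriter writer = new BufferingWriter();
        writer.write(new Record("key1", null)); // null timestamp is the case under test
        writer.flush();
        return writer.flushed.size();
    }

    public static void main(String[] args) {
        System.out.println(writeOnceAndFlush()); // prints 1
    }
}
```

In the real test this would translate to a single `writer.write(rowWithNullTimestamp, context)` followed by `writer.flush(...)`, then asserting the record reached the wrapped sink.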
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]