kbendick commented on a change in pull request #1515:
URL: https://github.com/apache/iceberg/pull/1515#discussion_r495639861



##########
File path: flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSink.java
##########
@@ -145,6 +147,40 @@ public void testWriteRowData() throws Exception {
     SimpleDataUtil.assertTableRows(tablePath, expectedRows);
   }
 
+  @Test
+  public void testWriteRowDataWithoutCheckpoint() throws Exception {
+    List<Row> rows = Lists.newArrayList(
+        Row.of(1, "hello"),
+        Row.of(2, "world"),
+        Row.of(3, "foo")
+    );
+
+    env = StreamExecutionEnvironment.getExecutionEnvironment()
+        .setParallelism(parallelism)
+        .setMaxParallelism(parallelism);
+
+    DataStream<RowData> dataStream = env.addSource(new FiniteTestSourceNoCheckpoint<>(rows), ROW_TYPE_INFO)
+        .map(CONVERTER::toInternal, RowDataTypeInfo.of(SimpleDataUtil.ROW_TYPE));
+
+    org.apache.flink.configuration.Configuration flinkConf = new org.apache.flink.configuration.Configuration();
+    flinkConf.setLong(FlinkSink.FLINK_ICEBERG_SINK_FLUSHINTERVAL, 100L);
+
+    FlinkSink.forRowData(dataStream)
+        .table(table)
+        .tableLoader(tableLoader)
+        .flinkConf(flinkConf)
+        .hadoopConf(CONF)
+        .build();
+
+    // Execute the program.
+    env.execute("Test Iceberg DataStream");
+
+    // Assert the iceberg table's records. NOTICE: the FiniteTestSource will checkpoint the same rows twice, so it will
+    // commit the same row list into iceberg twice.

Review comment:
       Possible misdocumentation? This comment references `FiniteTestSource` and 
says that it checkpoints the same rows twice. However, this test uses 
`FiniteTestSourceNoCheckpoint`, which _should_, by definition, not checkpoint.
   
   Additionally, it seems that `FiniteTestSource` will emit the same rows twice 
based on the [behavior in its `run` 
function](https://github.com/apache/flink/blob/0bd4e46a6596c9d9ea8096f63dc3f6fd85132a7e/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/FiniteTestSource.java#L59-L65).
 Unless I'm mistaken, it seems like the `FiniteTestSourceNoCheckpoint` does not 
have this same behavior in its run method and should therefore only emit the 
data one time.
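
To make the expectation concrete, here is a rough plain-Java sketch of the two behaviors as I read them. It is not the Flink or PR code: `EmitCountSketch`, `emitTwice`, and `emitOnce` are hypothetical names, and the checkpoint wait that I believe the real `FiniteTestSource` performs between rounds is left out.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class EmitCountSketch {

    // Hypothetical stand-in for a source that emits its elements in two rounds,
    // which is how I read FiniteTestSource#run (checkpoint waiting omitted).
    public static <T> List<T> emitTwice(List<T> elements) {
        List<T> out = new ArrayList<>();
        out.addAll(elements); // first round
        out.addAll(elements); // second round
        return out;
    }

    // Hypothetical stand-in for a source that emits its elements a single time.
    public static <T> List<T> emitOnce(List<T> elements) {
        return new ArrayList<>(elements);
    }

    public static void main(String[] args) {
        List<String> rows = Arrays.asList("1,hello", "2,world", "3,foo");
        System.out.println("two rounds: " + emitTwice(rows).size()); // prints "two rounds: 6"
        System.out.println("one round:  " + emitOnce(rows).size());  // prints "one round:  3"
    }
}
```

If the new source follows the single-round pattern, I would expect the table to hold each of the three rows once, not twice.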
   
   If this test is currently passing, I wonder if that's indicative of a bug or 
a flaky test. Could you help me better understand why your 
`FiniteTestSourceNoCheckpoint` emits and commits the same rows twice?






