Reo-LEI commented on a change in pull request #2898:
URL: https://github.com/apache/iceberg/pull/2898#discussion_r697887709
##########
File path:
flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkV2.java
##########
@@ -83,29 +83,39 @@
private final FileFormat format;
private final int parallelism;
private final boolean partitioned;
+ private final String distributionMode;
private StreamExecutionEnvironment env;
private TestTableLoader tableLoader;
- @Parameterized.Parameters(name = "FileFormat = {0}, Parallelism = {1}, Partitioned={2}")
+ @Parameterized.Parameters(name = "FileFormat = {0}, Parallelism = {1}, Partitioned={2}, Distribution={3}")
public static Object[][] parameters() {
return new Object[][] {
- new Object[] {"avro", 1, true},
- new Object[] {"avro", 1, false},
- new Object[] {"avro", 2, true},
- new Object[] {"avro", 2, false},
- new Object[] {"parquet", 1, true},
- new Object[] {"parquet", 1, false},
- new Object[] {"parquet", 2, true},
- new Object[] {"parquet", 2, false}
+ new Object[] {"avro", 1, true, TableProperties.WRITE_DISTRIBUTION_MODE_NONE},
+ new Object[] {"avro", 1, false, TableProperties.WRITE_DISTRIBUTION_MODE_NONE},
+ new Object[] {"avro", 4, true, TableProperties.WRITE_DISTRIBUTION_MODE_NONE},
Review comment:
Because a parallelism of 2 cannot reproduce the problem.
Suppose we have the following changelog:
```
+I<1, "aaa">
-U<1, "aaa">
+U<1, "bbb">
```
When the parallelism is 2, we get a DAG like this:
```
                               +---> Map ---forward---> IcebergStreamWriter-1 ---+
ChangelogStream ---rebalance---+                                                 +---rebalance---> IcebergFilesCommitter
                               +---> Map ---forward---> IcebergStreamWriter-2 ---+
```
Here `+I` and `+U` are distributed to `IcebergStreamWriter-1`, and `-U` is
distributed to `IcebergStreamWriter-2`. The processing is wrong, but we still
get the correct result, because in `BaseEqualityDeltaWriter.write` the
`IcebergStreamWriter` writes a pos-delete record for any row with the same key
that it has already written in the current transaction. So the `+U` replaces
the row written by `+I` inside the same writer.
If we increase the parallelism beyond 2, for example to 3, we get a DAG
like this:
```
                               +---> Map ---forward---> IcebergStreamWriter-1 ---+
ChangelogStream ---rebalance---+---> Map ---forward---> IcebergStreamWriter-2 ---+---rebalance---> IcebergFilesCommitter
                               +---> Map ---forward---> IcebergStreamWriter-3 ---+
```
Now `+I` is distributed to `IcebergStreamWriter-1`, `+U` to
`IcebergStreamWriter-2`, and `-U` to `IcebergStreamWriter-3`. No writer sees
more than one record for the key, so we get the duplicate rows, as expected.
That is why I increased the parallelism to 4: to cover this situation.
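
To make the difference concrete, here is a small toy simulation of the two cases. It is only a sketch and does not use any Flink or Iceberg classes: `ChangelogRebalanceSketch`, `Change`, `Writer` and `simulate` are hypothetical names. It assumes that rebalance hands out records round-robin starting at the first writer, and that a delete arriving at a writer that did not write the row in the current transaction has no effect on rows written by the other writers in that transaction.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model only: none of these types come from Flink or Iceberg.
class ChangelogRebalanceSketch {

  /** One changelog record; kind is "+I", "-U", "+U" or "-D". */
  static final class Change {
    final String kind;
    final int id;
    final String data;

    Change(String kind, int id, String data) {
      this.kind = kind;
      this.id = id;
      this.data = data;
    }
  }

  /** Stand-in for one writer subtask within a single transaction. */
  static final class Writer {
    // key -> live row written by THIS writer in the current transaction
    private final Map<Integer, String> insertedRows = new HashMap<>();

    void write(Change change) {
      if (change.kind.equals("+I") || change.kind.equals("+U")) {
        // Overwriting an existing key models the pos-delete of the earlier row.
        insertedRows.put(change.id, change.data);
      } else {
        // -U / -D: only a row written by this same writer is removed.
        // Assumption: a delete for a row this writer never saw does not
        // affect rows written by other writers in the same transaction.
        insertedRows.remove(change.id);
      }
    }
  }

  /** Returns the rows that survive after one transaction, in writer order. */
  static List<String> simulate(int parallelism) {
    List<Change> changelog = List.of(
        new Change("+I", 1, "aaa"),
        new Change("-U", 1, "aaa"),
        new Change("+U", 1, "bbb"));

    List<Writer> writers = new ArrayList<>();
    for (int i = 0; i < parallelism; i++) {
      writers.add(new Writer());
    }

    // Assumed round-robin distribution, starting at the first writer.
    for (int i = 0; i < changelog.size(); i++) {
      writers.get(i % parallelism).write(changelog.get(i));
    }

    List<String> rows = new ArrayList<>();
    for (Writer writer : writers) {
      rows.addAll(writer.insertedRows.values());
    }
    return rows;
  }

  public static void main(String[] args) {
    System.out.println("parallelism 2 -> " + simulate(2));
    System.out.println("parallelism 3 -> " + simulate(3));
  }
}
```

In this model, parallelism 2 leaves a single row because `+I` and `+U` meet in the same writer, while parallelism 3 or 4 leaves both `aaa` and `bbb`. Which writer receives which record may differ in a real job, but the outcome only depends on whether `+I` and `+U` land in the same writer.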
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]