kbendick commented on a change in pull request #2898:
URL: https://github.com/apache/iceberg/pull/2898#discussion_r680590851
##########
File path: flink/src/main/java/org/apache/iceberg/flink/IcebergTableSink.java
##########
@@ -59,12 +60,22 @@ public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
         .map(UniqueConstraint::getColumns)
         .orElseGet(ImmutableList::of);
-    return (DataStreamSinkProvider) dataStream -> FlinkSink.forRowData(dataStream)
-        .tableLoader(tableLoader)
-        .tableSchema(tableSchema)
-        .equalityFieldColumns(equalityColumns)
-        .overwrite(overwrite)
-        .build();
+    return (DataStreamSinkProvider) dataStream -> {
+      // For the CDC case in Flink SQL, the changelog is distributed to the Filter operator with the
+      // default rebalance partitioning strategy when the job's default parallelism is greater than 1.
+      // That puts the changelog out of order and produces wrong results for Iceberg (e.g. +U arriving
+      // before -U). Here we set the Filter operator's parallelism to match its input, so the Filter
+      // chains with its input and the rebalance is avoided.
+      Transformation<?> forwardOpr = dataStream.getTransformation();
+      if (forwardOpr.getName().equals("Filter") && forwardOpr.getInputs().size() == 1) {
+        forwardOpr.setParallelism(forwardOpr.getInputs().get(0).getParallelism());
Review comment:
How does setting the parallelism here interplay with the parallelism of the writer?
I notice there's one other place where we set the parallelism based on some feature of the input, but we also have a write parallelism that I've commented on below. Is it possible for `writeParallelism` in the section below to be different from what users have, say in the case of the CDC stream? For example, could you draw the DAG if a user's input `Filter` has p=2 and `writeParallelism` is set to `p=3`? Or is that not a possible case?
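To make the question concrete, here's a tiny standalone sketch of the mismatch I'm asking about (purely illustrative, not part of this PR; the class name and element values are made up):
```java
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ParallelismMismatchSketch {
  public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    // Upstream filter pinned to p=2, standing in for the user's Filter operator.
    DataStream<String> filtered = env
        .fromElements("+I", "-U", "+U")
        .filter(row -> !row.isEmpty())
        .setParallelism(2);

    // Sink stage forced to p=3, standing in for writeParallelism=3. Flink cannot
    // chain operators with different parallelism, so it connects them with a
    // rebalance (round-robin), which interleaves the two upstream subtasks'
    // records nondeterministically -- the exact reordering hazard for a changelog.
    filtered.print().setParallelism(3);

    // Print the generated plan to inspect the DAG and the rebalance edge.
    System.out.println(env.getExecutionPlan());
  }
}
```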
##########
File path: flink/src/main/java/org/apache/iceberg/flink/IcebergTableSink.java
##########
@@ -59,12 +60,22 @@ public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
         .map(UniqueConstraint::getColumns)
         .orElseGet(ImmutableList::of);
-    return (DataStreamSinkProvider) dataStream -> FlinkSink.forRowData(dataStream)
-        .tableLoader(tableLoader)
-        .tableSchema(tableSchema)
-        .equalityFieldColumns(equalityColumns)
-        .overwrite(overwrite)
-        .build();
+    return (DataStreamSinkProvider) dataStream -> {
+      // For the CDC case in Flink SQL, the changelog is distributed to the Filter operator with the
+      // default rebalance partitioning strategy when the job's default parallelism is greater than 1.
+      // That puts the changelog out of order and produces wrong results for Iceberg (e.g. +U arriving
+      // before -U). Here we set the Filter operator's parallelism to match its input, so the Filter
+      // chains with its input and the rebalance is avoided.
+      Transformation<?> forwardOpr = dataStream.getTransformation();
+      if (forwardOpr.getName().equals("Filter") && forwardOpr.getInputs().size() == 1) {
Review comment:
Correct me if I'm wrong, but can't users set the name of the operators
themselves?
Is there a way (possibly in the Flink SDK) that we can determine if we're in
CDC mode (or more generally, if we have a retract stream) that is more specific
than this? I'm worried we might catch more cases here than we mean to and
unnecessarily force parallelism on jobs that don't require it.
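For instance, the mode that the planner negotiates with the sink already tells us whether the input is insert-only, which seems more robust than matching operator names. A sketch of what I mean (the helper class and method are hypothetical, not in this PR):
```java
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.types.RowKind;

// Hypothetical helper: detect a changelog (CDC/retract) input from the
// ChangelogMode Flink passes to the sink, instead of matching operator names.
public class ChangelogDetection {

  /** Returns true if the requested mode carries anything beyond plain inserts. */
  static boolean isChangelogStream(ChangelogMode requestedMode) {
    return !requestedMode.containsOnly(RowKind.INSERT);
  }
}
```
Since `IcebergTableSink#getChangelogMode` already receives the upstream mode from the planner, the flag could presumably be captured there and consulted later in `getSinkRuntimeProvider`.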
##########
File path: flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java
##########
@@ -260,18 +263,21 @@ public Builder uidPrefix(String newPrefix) {
       // Convert the requested flink table schema to flink row type.
       RowType flinkRowType = toFlinkRowType(table.schema(), tableSchema);
-      // Distribute the records from input data stream based on the write.distribution-mode.
-      rowDataInput = distributeDataStream(rowDataInput, table.properties(), table.spec(), table.schema(), flinkRowType);
+      // Distribute the records from input data stream based on the write.distribution-mode and equality fields.
+      rowDataInput = distributeDataStream(rowDataInput, table.properties(), equalityFieldIds, table.spec(),
+          table.schema(), flinkRowType);
       // Chain the iceberg stream writer and committer operator.
       IcebergStreamWriter<RowData> streamWriter = createStreamWriter(table, flinkRowType, equalityFieldIds);
       IcebergFilesCommitter filesCommitter = new IcebergFilesCommitter(tableLoader, overwrite);
-      this.writeParallelism = writeParallelism == null ? rowDataInput.getParallelism() : writeParallelism;
-
       SingleOutputStreamOperator<WriteResult> writerStream = rowDataInput
-          .transform(ICEBERG_STREAM_WRITER_NAME, TypeInformation.of(WriteResult.class), streamWriter)
-          .setParallelism(writeParallelism);
+          .transform(ICEBERG_STREAM_WRITER_NAME, TypeInformation.of(WriteResult.class), streamWriter);
+
+      if (this.writeParallelism != null) {
+        writerStream.setParallelism(writeParallelism);
+      }
Review comment:
Is it possible for this to be `null`? I notice the null check is new.
Also, in the places where you've added `setParallelism` calls, what happens if this `writeParallelism` is also set? Is that a situation that could be encountered?
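For what it's worth, one way to keep the old defaulting behavior without branching on null at the transform site would be to resolve the value up front. A minimal sketch, with a made-up helper name:
```java
public class WriterParallelismSketch {
  /**
   * Hypothetical helper, not in this PR: pick the writer parallelism, falling
   * back to the upstream operator's parallelism when the user did not set one,
   * so the transform site never has to handle null.
   */
  static int resolveWriterParallelism(Integer userWriteParallelism, int inputParallelism) {
    return userWriteParallelism != null ? userWriteParallelism : inputParallelism;
  }
}
```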
##########
File path:
flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkV2.java
##########
@@ -83,29 +83,39 @@
   private final FileFormat format;
   private final int parallelism;
   private final boolean partitioned;
+  private final String distributionMode;
   private StreamExecutionEnvironment env;
   private TestTableLoader tableLoader;
-  @Parameterized.Parameters(name = "FileFormat = {0}, Parallelism = {1}, Partitioned={2}")
+  @Parameterized.Parameters(name = "FileFormat = {0}, Parallelism = {1}, Partitioned={2}, Distribution={3}")
   public static Object[][] parameters() {
     return new Object[][] {
-        new Object[] {"avro", 1, true},
-        new Object[] {"avro", 1, false},
-        new Object[] {"avro", 2, true},
-        new Object[] {"avro", 2, false},
-        new Object[] {"parquet", 1, true},
-        new Object[] {"parquet", 1, false},
-        new Object[] {"parquet", 2, true},
-        new Object[] {"parquet", 2, false}
+        new Object[] {"avro", 1, true, TableProperties.WRITE_DISTRIBUTION_MODE_NONE},
+        new Object[] {"avro", 1, false, TableProperties.WRITE_DISTRIBUTION_MODE_NONE},
+        new Object[] {"avro", 4, true, TableProperties.WRITE_DISTRIBUTION_MODE_NONE},
+        new Object[] {"avro", 4, false, TableProperties.WRITE_DISTRIBUTION_MODE_NONE},
+        new Object[] {"parquet", 1, true, TableProperties.WRITE_DISTRIBUTION_MODE_NONE},
+        new Object[] {"parquet", 1, false, TableProperties.WRITE_DISTRIBUTION_MODE_NONE},
+        new Object[] {"parquet", 4, true, TableProperties.WRITE_DISTRIBUTION_MODE_NONE},
+        new Object[] {"parquet", 4, false, TableProperties.WRITE_DISTRIBUTION_MODE_NONE},
+        new Object[] {"avro", 1, true, TableProperties.WRITE_DISTRIBUTION_MODE_HASH},
+        new Object[] {"avro", 1, false, TableProperties.WRITE_DISTRIBUTION_MODE_HASH},
+        new Object[] {"avro", 4, true, TableProperties.WRITE_DISTRIBUTION_MODE_HASH},
+        new Object[] {"avro", 4, false, TableProperties.WRITE_DISTRIBUTION_MODE_HASH},
+        new Object[] {"parquet", 1, true, TableProperties.WRITE_DISTRIBUTION_MODE_HASH},
+        new Object[] {"parquet", 1, false, TableProperties.WRITE_DISTRIBUTION_MODE_HASH},
+        new Object[] {"parquet", 4, true, TableProperties.WRITE_DISTRIBUTION_MODE_HASH},
+        new Object[] {"parquet", 4, false, TableProperties.WRITE_DISTRIBUTION_MODE_HASH}
Review comment:
Are there any tests that set `write.distribution-mode` to values other than `none` and `hash`?
Given that Flink CDC is a pretty common use case for Iceberg, it would be great to also cover `range` here if possible, now that the distribution mode has been added to the test parameters.
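Something like the following rows is what I have in mind (sketch only; it assumes `TableProperties.WRITE_DISTRIBUTION_MODE_RANGE` is the constant for that mode, and a few representative combinations may be enough rather than the full cross product):
```java
new Object[] {"avro", 1, true, TableProperties.WRITE_DISTRIBUTION_MODE_RANGE},
new Object[] {"avro", 4, false, TableProperties.WRITE_DISTRIBUTION_MODE_RANGE},
new Object[] {"parquet", 1, false, TableProperties.WRITE_DISTRIBUTION_MODE_RANGE},
new Object[] {"parquet", 4, true, TableProperties.WRITE_DISTRIBUTION_MODE_RANGE}
```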
##########
File path: flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java
##########
@@ -313,13 +320,21 @@ public Builder uidPrefix(String newPrefix) {
       switch (writeMode) {
         case NONE:
-          return input;
+          if (!equalityFieldIds.isEmpty()) {
+            return input.keyBy(new EqualityFieldKeySelector(equalityFieldIds, table.schema(), flinkRowType));
+          } else {
+            return input;
+          }
         case HASH:
-          if (partitionSpec.isUnpartitioned()) {
-            return input;
-          } else {
+          if (!partitionSpec.isUnpartitioned() && !equalityFieldIds.isEmpty()) {
+            return input.keyBy(new HybridKeySelector(partitionSpec, equalityFieldIds, iSchema, flinkRowType));
+          } else if (!partitionSpec.isUnpartitioned() && equalityFieldIds.isEmpty()) {
             return input.keyBy(new PartitionKeySelector(partitionSpec, iSchema, flinkRowType));
+          } else if (partitionSpec.isUnpartitioned() && !equalityFieldIds.isEmpty()) {
+            return input.keyBy(new EqualityFieldKeySelector(equalityFieldIds, table.schema(), flinkRowType));
+          } else {
+            return input;
Review comment:
It seems like we've gotten into a lot of `if` nesting. Would it make sense to either add a utility function or a method that returns a more reader-friendly value derived from `equalityFieldIds`? For me, `isEmpty()` is not as clear to read as, for example, `isUnpartitioned()`.
Can we add a utility or a well-named method that helps developers more easily understand the different cases, especially given how much `if else` nesting we have?
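Something along these lines is what I have in mind (sketch only; the helper names are mine, and it assumes the surrounding `Builder` fields like `equalityFieldIds`, `table`, and `flinkRowType` are in scope):
```java
// Name the predicate once so each branch reads as a sentence.
private boolean hasEqualityFields() {
  return !equalityFieldIds.isEmpty();
}

// Factor the repeated keyBy into a well-named helper.
private DataStream<RowData> keyByEqualityFields(DataStream<RowData> input) {
  return input.keyBy(new EqualityFieldKeySelector(equalityFieldIds, table.schema(), flinkRowType));
}
```
The `NONE` case would then collapse to `return hasEqualityFields() ? keyByEqualityFields(input) : input;`, and the `HASH` case would read similarly.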
##########
File path:
flink/src/main/java/org/apache/iceberg/flink/sink/HybridKeySelector.java
##########
@@ -0,0 +1,40 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.iceberg.flink.sink;
+
+import java.util.List;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+
+public class HybridKeySelector implements KeySelector<RowData, String> {
Review comment:
Nit, but also an important question to help me understand better =)
The name `HybridKeySelector` is admittedly somewhat confusing to me at first. Once I see the class definition it's more understandable, but I do have to look through the source to see what `Hybrid` means.
Which leads me to my question: in the case of the current `HybridKeySelector`, we're using the partition key and the equality-field key. This seems to work for `hash` distribution (at least per the updated tests), but what about `range` and `none`? Which key selectors would they use?
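If it helps the naming discussion, here's how I'm picturing the composition (purely illustrative; the class name and the delegation structure are mine, not this PR's actual implementation):
```java
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.table.data.RowData;

// Sketch of what I understand "Hybrid" to mean: the key is the partition key
// and the equality-field key concatenated, so rows land on a subtask that is
// stable with respect to both.
public class PartitionAndEqualityKeySelector implements KeySelector<RowData, String> {

  private final KeySelector<RowData, String> partitionKeySelector;
  private final KeySelector<RowData, String> equalityKeySelector;

  PartitionAndEqualityKeySelector(KeySelector<RowData, String> partitionKeySelector,
                                  KeySelector<RowData, String> equalityKeySelector) {
    this.partitionKeySelector = partitionKeySelector;
    this.equalityKeySelector = equalityKeySelector;
  }

  @Override
  public String getKey(RowData row) throws Exception {
    return partitionKeySelector.getKey(row) + "/" + equalityKeySelector.getKey(row);
  }
}
```
If that reading is right, a more descriptive name along these lines might save readers the source dive.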
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]