openinx commented on a change in pull request #2898:
URL: https://github.com/apache/iceberg/pull/2898#discussion_r814414515
##########
File path: flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/sink/EqualityFieldKeySelector.java
##########
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink.sink;
+
+import java.util.List;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.types.TypeUtil;
+import org.apache.iceberg.util.StructLikeWrapper;
+import org.apache.iceberg.util.StructProjection;
+
+/**
+ * Create a {@link KeySelector} to shuffle by equality fields, to ensure same equality fields record will be emitted to
+ * same writer. That can prevent create duplicate record when insert and delete one row which have same equality field
+ * values on different writer in one transaction, and guarantee pos-delete will take effect.
+ */
+class EqualityFieldKeySelector extends BaseKeySelector<RowData, Integer> {
+
+  private final Schema schema;
+  private final Schema deleteSchema;
+
+  private transient StructProjection structProjection;
+  private transient StructLikeWrapper structLikeWrapper;
+
+  EqualityFieldKeySelector(List<Integer> equalityFieldIds, Schema schema, RowType flinkSchema) {
+    super(schema, flinkSchema);
+    this.schema = schema;

Review comment:
This `schema` is already defined in the parent class, right? Then we don't need to add an extra `schema` field in this child class. Personally, I think we can just move all the logic from `BaseKeySelector` into `EqualityFieldKeySelector`, because the current `BaseKeySelector` abstraction only seems to provide a `lazyRowDataWrapper` method. Simply removing `BaseKeySelector` looks clearer to me.
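To illustrate, here is a rough sketch of the flattened class with `BaseKeySelector` removed (just a sketch, not the final code; it assumes `org.apache.iceberg.flink.RowDataWrapper` exposes a `(RowType, Types.StructType)` constructor, which is what the existing `lazyRowDataWrapper()` helper appears to build):

```java
// Hypothetical merged selector: BaseKeySelector removed, lazyRowDataWrapper() pulled in.
// Would additionally need: import org.apache.iceberg.flink.RowDataWrapper;
class EqualityFieldKeySelector implements KeySelector<RowData, Integer> {

  private final Schema schema;
  private final RowType flinkSchema;
  private final Schema deleteSchema;

  private transient RowDataWrapper rowDataWrapper;
  private transient StructProjection structProjection;
  private transient StructLikeWrapper structLikeWrapper;

  EqualityFieldKeySelector(List<Integer> equalityFieldIds, Schema schema, RowType flinkSchema) {
    this.schema = schema;
    this.flinkSchema = flinkSchema;
    this.deleteSchema = TypeUtil.select(schema, Sets.newHashSet(equalityFieldIds));
  }

  /**
   * Construct the {@link RowDataWrapper} lazily because it is not serializable,
   * same as the other lazy helpers below.
   */
  protected RowDataWrapper lazyRowDataWrapper() {
    if (rowDataWrapper == null) {
      rowDataWrapper = new RowDataWrapper(flinkSchema, schema.asStruct());
    }
    return rowDataWrapper;
  }

  protected StructProjection lazyStructProjection() {
    if (structProjection == null) {
      structProjection = StructProjection.create(schema, deleteSchema);
    }
    return structProjection;
  }

  protected StructLikeWrapper lazyStructLikeWrapper() {
    if (structLikeWrapper == null) {
      structLikeWrapper = StructLikeWrapper.forType(deleteSchema.asStruct());
    }
    return structLikeWrapper;
  }

  @Override
  public Integer getKey(RowData row) {
    return lazyStructLikeWrapper().set(lazyStructProjection().wrap(lazyRowDataWrapper().wrap(row))).hashCode();
  }
}
```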
##########
File path: flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/sink/EqualityFieldKeySelector.java
##########
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink.sink;
+
+import java.util.List;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.types.TypeUtil;
+import org.apache.iceberg.util.StructLikeWrapper;
+import org.apache.iceberg.util.StructProjection;
+
+/**
+ * Create a {@link KeySelector} to shuffle by equality fields, to ensure same equality fields record will be emitted to
+ * same writer. That can prevent create duplicate record when insert and delete one row which have same equality field
+ * values on different writer in one transaction, and guarantee pos-delete will take effect.
+ */
+class EqualityFieldKeySelector extends BaseKeySelector<RowData, Integer> {
+
+  private final Schema schema;
+  private final Schema deleteSchema;
+
+  private transient StructProjection structProjection;
+  private transient StructLikeWrapper structLikeWrapper;
+
+  EqualityFieldKeySelector(List<Integer> equalityFieldIds, Schema schema, RowType flinkSchema) {
+    super(schema, flinkSchema);
+    this.schema = schema;
+    this.deleteSchema = TypeUtil.select(schema, Sets.newHashSet(equalityFieldIds));
+  }
+
+  /**
+   * Construct the {@link StructProjection} lazily because it is not serializable.
+   */
+  protected StructProjection lazyStructProjection() {
+    if (structProjection == null) {
+      structProjection = StructProjection.create(schema, deleteSchema);
+    }
+    return structProjection;
+  }
+
+  /**
+   * Construct the {@link StructLikeWrapper} lazily because it is not serializable.
+   */
+  protected StructLikeWrapper lazyStructLikeWrapper() {
+    if (structLikeWrapper == null) {
+      structLikeWrapper = StructLikeWrapper.forType(deleteSchema.asStruct());
+    }
+    return structLikeWrapper;
+  }
+
+  @Override
+  public Integer getKey(RowData row) {
+    return lazyStructLikeWrapper().set(lazyStructProjection().wrap(lazyRowDataWrapper().wrap(row))).hashCode();

Review comment:
Nit: It's clearer to me to read the unfolded version of this code, like the following:
```java
public Integer getKey(RowData row) {
  RowDataWrapper rowDataWrapper = lazyRowDataWrapper().wrap(row);
  StructProjection structProjection = lazyStructProjection().wrap(rowDataWrapper);
  StructLikeWrapper structLikeWrapper = lazyStructLikeWrapper().set(structProjection);
  return structLikeWrapper.hashCode();
}
```

##########
File path: flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java
##########
@@ -422,21 +436,50 @@ private String operatorName(String suffix) {
       writeMode = distributionMode;
     }
 
+    LOG.info("Write distribution mode is '{}'", writeMode.modeName());
     switch (writeMode) {
       case NONE:
-        return input;
+        if (equalityFieldIds.isEmpty()) {
+          return input;
+        } else {
+          LOG.info("Distribute rows by equality fields, because there are equality fields set");
+          return input.keyBy(new EqualityFieldKeySelector(equalityFieldIds, iSchema, flinkRowType));
+        }
 
       case HASH:
-        if (partitionSpec.isUnpartitioned()) {
-          return input;
+        if (equalityFieldIds.isEmpty()) {
+          if (partitionSpec.isUnpartitioned()) {
+            LOG.warn("Fallback to use 'none' distribution mode, because there are no equality fields set " +
+                "and table is unpartitioned");
+            return input;
+          } else {
+            return input.keyBy(new PartitionKeySelector(partitionSpec, iSchema, flinkRowType));
+          }
         } else {
-          return input.keyBy(new PartitionKeySelector(partitionSpec, iSchema, flinkRowType));
+          if (partitionSpec.isUnpartitioned()) {
+            LOG.info("Distribute rows by equality fields, because there are equality fields set " +
+                "and table is unpartitioned");
+            return input.keyBy(new EqualityFieldKeySelector(equalityFieldIds, iSchema, flinkRowType));
+          } else {
+            for (PartitionField partitionField : partitionSpec.fields()) {
+              Preconditions.checkState(equalityFieldIds.contains(partitionField.sourceId()),
+                  "In 'hash' distribution mode with equality fields set, partition field '%s' " +
+                      "should be included in equality fields: '%s'", partitionField, equalityFieldColumns);
+            }
+            return input.keyBy(new PartitionKeySelector(partitionSpec, iSchema, flinkRowType));
+          }
         }
 
       case RANGE:
-        LOG.warn("Fallback to use 'none' distribution mode, because {}={} is not supported in flink now",
-            WRITE_DISTRIBUTION_MODE, DistributionMode.RANGE.modeName());
-        return input;
+        if (equalityFieldIds.isEmpty()) {
+          LOG.warn("Fallback to use 'none' distribution mode, because there are no equality fields set " +
+              "and write.distribution-mode=range is not supported yet in flink");

Review comment:
Nit: Keep using the `WRITE_DISTRIBUTION_MODE` and `DistributionMode.RANGE` constants, for refactoring-safety purposes.
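For example, the new warn message could keep referencing the same constants that the removed lines already used, instead of hard-coding the property name (a sketch of the suggested wording only):

```java
LOG.warn("Fallback to use 'none' distribution mode, because there are no equality fields set " +
    "and {}={} is not supported yet in flink", WRITE_DISTRIBUTION_MODE, DistributionMode.RANGE.modeName());
```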
##########
File path: flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkV2.java
##########
@@ -84,36 +84,39 @@
   private final FileFormat format;
   private final int parallelism;
   private final boolean partitioned;
+  private final String distributionMode;
 
   private StreamExecutionEnvironment env;
   private TestTableLoader tableLoader;
 
-  @Parameterized.Parameters(name = "FileFormat = {0}, Parallelism = {1}, Partitioned={2}")
+  @Parameterized.Parameters(name = "FileFormat = {0}, Parallelism = {1}, Partitioned={2}, Distribution={3}")

Review comment:
Nit: Use the full name `WriteDistributionMode`, please.

##########
File path: flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/sink/EqualityFieldKeySelector.java
##########
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink.sink;
+
+import java.util.List;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.types.TypeUtil;
+import org.apache.iceberg.util.StructLikeWrapper;
+import org.apache.iceberg.util.StructProjection;
+
+/**
+ * Create a {@link KeySelector} to shuffle by equality fields, to ensure same equality fields record will be emitted to
+ * same writer. That can prevent create duplicate record when insert and delete one row which have same equality field

Review comment:
Nit: I think adding `will be emitted to same writer in order` will make this clearer. The next sentence, `That can prevent ... will take effect`, can be removed.

##########
File path: flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkV2.java
##########
@@ -155,10 +159,6 @@ private void testChangeLogs(List<String> equalityFieldColumns,
                               List<List<Record>> expectedRecordsPerCheckpoint) throws Exception {
     DataStream<Row> dataStream = env.addSource(new BoundedTestSource<>(elementsPerCheckpoint), ROW_TYPE_INFO);
 
-    // Shuffle by the equality key, so that different operations of the same key could be wrote in order when
-    // executing tasks in parallel.
-    dataStream = dataStream.keyBy(keySelector);

Review comment:
Then the `keySelector` argument of the `testChangeLogs` method can be removed now?
##########
File path: flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java
##########
@@ -422,21 +436,50 @@ private String operatorName(String suffix) {
       writeMode = distributionMode;
     }
 
+    LOG.info("Write distribution mode is '{}'", writeMode.modeName());
     switch (writeMode) {
      case NONE:
-        return input;
+        if (equalityFieldIds.isEmpty()) {
+          return input;
+        } else {
+          LOG.info("Distribute rows by equality fields, because there are equality fields set");
+          return input.keyBy(new EqualityFieldKeySelector(equalityFieldIds, iSchema, flinkRowType));
+        }
 
      case HASH:
-        if (partitionSpec.isUnpartitioned()) {
-          return input;
+        if (equalityFieldIds.isEmpty()) {
+          if (partitionSpec.isUnpartitioned()) {
+            LOG.warn("Fallback to use 'none' distribution mode, because there are no equality fields set " +
+                "and table is unpartitioned");
+            return input;
+          } else {
+            return input.keyBy(new PartitionKeySelector(partitionSpec, iSchema, flinkRowType));
+          }
         } else {
-          return input.keyBy(new PartitionKeySelector(partitionSpec, iSchema, flinkRowType));
+          if (partitionSpec.isUnpartitioned()) {
+            LOG.info("Distribute rows by equality fields, because there are equality fields set " +
+                "and table is unpartitioned");
+            return input.keyBy(new EqualityFieldKeySelector(equalityFieldIds, iSchema, flinkRowType));
+          } else {
+            for (PartitionField partitionField : partitionSpec.fields()) {
+              Preconditions.checkState(equalityFieldIds.contains(partitionField.sourceId()),
+                  "In 'hash' distribution mode with equality fields set, partition field '%s' " +
+                      "should be included in equality fields: '%s'", partitionField, equalityFieldColumns);
+            }
+            return input.keyBy(new PartitionKeySelector(partitionSpec, iSchema, flinkRowType));
+          }
         }
 
      case RANGE:
-        LOG.warn("Fallback to use 'none' distribution mode, because {}={} is not supported in flink now",
-            WRITE_DISTRIBUTION_MODE, DistributionMode.RANGE.modeName());
-        return input;
+        if (equalityFieldIds.isEmpty()) {
+          LOG.warn("Fallback to use 'none' distribution mode, because there are no equality fields set " +
+              "and write.distribution-mode=range is not supported yet in flink");
+          return input;
+        } else {
+          LOG.info("Distribute rows by equality fields, because there are equality fields set " +
+              "and write.distribution-mode=range is not supported yet in flink");

Review comment:
Ditto.

##########
File path: flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java
##########
@@ -295,15 +298,34 @@ public Builder setSnapshotProperty(String property, String value) {
         }
       }
 
+      // Find out the equality field id list based on the user-provided equality field column names.
+      List<Integer> equalityFieldIds = Lists.newArrayList(table.schema().identifierFieldIds());
+      if (equalityFieldColumns != null && equalityFieldColumns.size() > 0) {
+        Set<Integer> equalityFieldSet = Sets.newHashSetWithExpectedSize(equalityFieldColumns.size());
+        for (String column : equalityFieldColumns) {
+          org.apache.iceberg.types.Types.NestedField field = table.schema().findField(column);
+          Preconditions.checkNotNull(field, "Missing required equality field column '%s' in table schema %s",
+              column, table.schema());
+          equalityFieldSet.add(field.fieldId());
+        }
+
+        if (!equalityFieldSet.equals(table.schema().identifierFieldIds())) {
+          LOG.warn("The configured equality field columns are not match with the identifier fields of schema, " +

Review comment:
Nit: `match` -> `matched`. It's better to log the two different field id sets in this message.
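For example, something like the following (a sketch only; both id sets are already in scope at this point):

```java
LOG.warn("The configured equality field column IDs {} are not matched with the schema identifier field IDs {}, " +
        "use job specified equality field columns as the equality fields by default.",
    equalityFieldSet, table.schema().identifierFieldIds());
```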
##########
File path: flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java
##########
@@ -146,13 +148,14 @@ private Builder forRowData(DataStream<RowData> newRowDataInput) {
                                             MapFunction<T, RowData> mapper,
                                             TypeInformation<RowData> outputType) {
       this.inputCreator = newUidPrefix -> {
+        // Input stream order is crucial for some situation(e.g. in cdc case). Therefore, we need to set the parallelism
+        // of map operator same as its input to keep map operator chaining its input, and avoid rebalanced by default.
+        SingleOutputStreamOperator<RowData> inputStream = input.map(mapper, outputType)

Review comment:
Nice catch. Is it possible to add a unit test to cover this case?

##########
File path: flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java
##########
@@ -295,15 +298,34 @@ public Builder setSnapshotProperty(String property, String value) {
         }
       }
 
+      // Find out the equality field id list based on the user-provided equality field column names.
+      List<Integer> equalityFieldIds = Lists.newArrayList(table.schema().identifierFieldIds());
+      if (equalityFieldColumns != null && equalityFieldColumns.size() > 0) {
+        Set<Integer> equalityFieldSet = Sets.newHashSetWithExpectedSize(equalityFieldColumns.size());
+        for (String column : equalityFieldColumns) {
+          org.apache.iceberg.types.Types.NestedField field = table.schema().findField(column);
+          Preconditions.checkNotNull(field, "Missing required equality field column '%s' in table schema %s",
+              column, table.schema());
+          equalityFieldSet.add(field.fieldId());
+        }
+
+        if (!equalityFieldSet.equals(table.schema().identifierFieldIds())) {
+          LOG.warn("The configured equality field columns are not match with the identifier fields of schema, " +
+              "use job specified equality field columns as the equality fields by default.");
+        }
+        equalityFieldIds = Lists.newArrayList(equalityFieldSet);
+      }

Review comment:
Let's extract this into a separate small method named `checkAndGetEqualityFieldIds`, to keep the core logic of `build` clearer.
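A possible shape for that helper, sketched directly from the block above (only the method name comes from this suggestion; the body mirrors the existing logic, with the warn message reworded per the earlier nit):

```java
private List<Integer> checkAndGetEqualityFieldIds() {
  // Start from the schema's identifier field ids, then let user-specified columns override them.
  List<Integer> equalityFieldIds = Lists.newArrayList(table.schema().identifierFieldIds());
  if (equalityFieldColumns != null && equalityFieldColumns.size() > 0) {
    Set<Integer> equalityFieldSet = Sets.newHashSetWithExpectedSize(equalityFieldColumns.size());
    for (String column : equalityFieldColumns) {
      org.apache.iceberg.types.Types.NestedField field = table.schema().findField(column);
      Preconditions.checkNotNull(field, "Missing required equality field column '%s' in table schema %s",
          column, table.schema());
      equalityFieldSet.add(field.fieldId());
    }

    if (!equalityFieldSet.equals(table.schema().identifierFieldIds())) {
      LOG.warn("The configured equality field columns are not matched with the identifier fields of schema, " +
          "use job specified equality field columns as the equality fields by default.");
    }
    equalityFieldIds = Lists.newArrayList(equalityFieldSet);
  }
  return equalityFieldIds;
}
```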
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]