This is an automated email from the ASF dual-hosted git repository. dockerzhang pushed a commit to branch branch-1.4 in repository https://gitbox.apache.org/repos/asf/inlong.git
commit f30dec91e613dc8058ad8c077b63eebe965d8d5f Author: thesumery <[email protected]> AuthorDate: Tue Nov 8 21:37:30 2022 +0800 [INLONG-6382][Sort] Iceberg data is messed up when the source table has no primary key in multiple sink scenes (#6461) Co-authored-by: thesumery <[email protected]> --- .../org/apache/inlong/sort/base/Constants.java | 7 ++++++ .../inlong/sort/base/sink/MultipleSinkOption.java | 26 +++++++++++++++++----- .../sort/iceberg/FlinkDynamicTableFactory.java | 2 ++ .../inlong/sort/iceberg/IcebergTableSink.java | 2 ++ .../apache/inlong/sort/iceberg/sink/FlinkSink.java | 3 +-- .../sink/multiple/IcebergMultipleStreamWriter.java | 8 +++++-- 6 files changed, 38 insertions(+), 10 deletions(-) diff --git a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/Constants.java b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/Constants.java index 9cc3c979d..9780370f2 100644 --- a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/Constants.java +++ b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/Constants.java @@ -164,6 +164,13 @@ public final class Constants { .defaultValue(true) .withDescription("Whether ignore the single table erros when multiple sink writing scenario."); + public static final ConfigOption<Boolean> SINK_MULTIPLE_PK_AUTO_GENERATED = + ConfigOptions.key("sink.multiple.pk-auto-generated") + .booleanType() + .defaultValue(false) + .withDescription("Whether generated pk fields as whole data when source table does not have a " + + "primary key."); + public static final ConfigOption<Boolean> SINK_MULTIPLE_TYPE_MAP_COMPATIBLE_WITH_SPARK = ConfigOptions.key("sink.multiple.typemap-compatible-with-spark") .booleanType() diff --git a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/sink/MultipleSinkOption.java b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/sink/MultipleSinkOption.java index 10bc3f3d1..3d5663b74 100644 --- a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/sink/MultipleSinkOption.java +++ b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/sink/MultipleSinkOption.java @@ -38,26 +38,29 @@ public class MultipleSinkOption implements Serializable { private static final long serialVersionUID = 1L; private static final Logger LOG = LoggerFactory.getLogger(MultipleSinkOption.class); - private String format; + private final String format; private boolean sparkEngineEnable; - private SchemaUpdateExceptionPolicy schemaUpdatePolicy; + private final SchemaUpdateExceptionPolicy schemaUpdatePolicy; + private final String databasePattern; - private String databasePattern; + private final String tablePattern; - private String tablePattern; + private final boolean pkAutoGenerated; public MultipleSinkOption(String format, boolean sparkEngineEnable, SchemaUpdateExceptionPolicy schemaUpdatePolicy, String databasePattern, - String tablePattern) { + String tablePattern, + boolean pkAutoGenerated) { this.format = format; this.sparkEngineEnable = sparkEngineEnable; this.schemaUpdatePolicy = schemaUpdatePolicy; this.databasePattern = databasePattern; this.tablePattern = tablePattern; + this.pkAutoGenerated = pkAutoGenerated; } public String getFormat() { @@ -85,6 +88,10 @@ public class MultipleSinkOption implements Serializable { return tablePattern; } + public boolean isPkAutoGenerated() { + return pkAutoGenerated; + } + public static Builder builder() { return new Builder(); } @@ -95,6 +102,7 @@ public class MultipleSinkOption implements Serializable { private SchemaUpdateExceptionPolicy schemaUpdatePolicy; private String databasePattern; private String tablePattern; + private boolean pkAutoGenerated; public MultipleSinkOption.Builder withFormat(String format) { this.format = format; @@ -121,8 +129,14 @@ public class MultipleSinkOption implements Serializable { return this; } + public MultipleSinkOption.Builder withPkAutoGenerated(boolean pkAutoGenerated) { + this.pkAutoGenerated = pkAutoGenerated; + return this; + } + public MultipleSinkOption build() { - return new MultipleSinkOption(format, sparkEngineEnable, schemaUpdatePolicy, databasePattern, tablePattern); + return new MultipleSinkOption( + format, sparkEngineEnable, schemaUpdatePolicy, databasePattern, tablePattern, pkAutoGenerated); } } diff --git a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/FlinkDynamicTableFactory.java b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/FlinkDynamicTableFactory.java index 040e73df6..94a7cc305 100644 --- a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/FlinkDynamicTableFactory.java +++ b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/FlinkDynamicTableFactory.java @@ -53,6 +53,7 @@ import static org.apache.inlong.sort.base.Constants.INLONG_METRIC; import static org.apache.inlong.sort.base.Constants.SINK_MULTIPLE_DATABASE_PATTERN; import static org.apache.inlong.sort.base.Constants.SINK_MULTIPLE_ENABLE; import static org.apache.inlong.sort.base.Constants.SINK_MULTIPLE_FORMAT; +import static org.apache.inlong.sort.base.Constants.SINK_MULTIPLE_PK_AUTO_GENERATED; import static org.apache.inlong.sort.base.Constants.SINK_MULTIPLE_SCHEMA_UPDATE_POLICY; import static org.apache.inlong.sort.base.Constants.SINK_MULTIPLE_TABLE_PATTERN; import static org.apache.inlong.sort.base.Constants.SINK_MULTIPLE_TYPE_MAP_COMPATIBLE_WITH_SPARK; @@ -242,6 +243,7 @@ public class FlinkDynamicTableFactory implements DynamicTableSinkFactory, Dynami options.add(SINK_MULTIPLE_DATABASE_PATTERN); options.add(SINK_MULTIPLE_TABLE_PATTERN); options.add(SINK_MULTIPLE_SCHEMA_UPDATE_POLICY); + options.add(SINK_MULTIPLE_PK_AUTO_GENERATED); options.add(SINK_MULTIPLE_TYPE_MAP_COMPATIBLE_WITH_SPARK); return options; } diff --git a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/IcebergTableSink.java b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/IcebergTableSink.java index 67ca5fdf2..367727309 100644 --- a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/IcebergTableSink.java +++ b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/IcebergTableSink.java @@ -47,6 +47,7 @@ import static org.apache.inlong.sort.base.Constants.INLONG_METRIC; import static org.apache.inlong.sort.base.Constants.SINK_MULTIPLE_DATABASE_PATTERN; import static org.apache.inlong.sort.base.Constants.SINK_MULTIPLE_ENABLE; import static org.apache.inlong.sort.base.Constants.SINK_MULTIPLE_FORMAT; +import static org.apache.inlong.sort.base.Constants.SINK_MULTIPLE_PK_AUTO_GENERATED; import static org.apache.inlong.sort.base.Constants.SINK_MULTIPLE_SCHEMA_UPDATE_POLICY; import static org.apache.inlong.sort.base.Constants.SINK_MULTIPLE_TABLE_PATTERN; import static org.apache.inlong.sort.base.Constants.SINK_MULTIPLE_TYPE_MAP_COMPATIBLE_WITH_SPARK; @@ -109,6 +110,7 @@ public class IcebergTableSink implements DynamicTableSink, SupportsPartitioning, .withDatabasePattern(tableOptions.get(SINK_MULTIPLE_DATABASE_PATTERN)) .withTablePattern(tableOptions.get(SINK_MULTIPLE_TABLE_PATTERN)) .withSchemaUpdatePolicy(tableOptions.get(SINK_MULTIPLE_SCHEMA_UPDATE_POLICY)) + .withPkAutoGenerated(tableOptions.get(SINK_MULTIPLE_PK_AUTO_GENERATED)) .build()) .append(); } else { diff --git a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/FlinkSink.java b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/FlinkSink.java index 25f9e963b..9191aa575 100644 --- a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/FlinkSink.java +++ b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/FlinkSink.java @@ -400,7 +400,6 @@ public class FlinkSink { } - /** * Append the iceberg sink operators to write records to iceberg table. * @@ -529,7 +528,7 @@ public class FlinkSink { IcebergProcessOperator streamWriter = new IcebergProcessOperator(new IcebergMultipleStreamWriter( - appendMode, catalogLoader, inlongMetric, auditHostAndPorts)); + appendMode, catalogLoader, inlongMetric, auditHostAndPorts, multipleSinkOption)); SingleOutputStreamOperator<MultipleWriteResult> writerStream = routeStream .transform(operatorName(ICEBERG_MULTIPLE_STREAM_WRITER_NAME), TypeInformation.of(IcebergProcessOperator.class), diff --git a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/IcebergMultipleStreamWriter.java b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/IcebergMultipleStreamWriter.java index 617eb6d69..2c67cbd51 100644 --- a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/IcebergMultipleStreamWriter.java +++ b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/IcebergMultipleStreamWriter.java @@ -43,6 +43,7 @@ import org.apache.inlong.sort.base.metric.MetricOption.RegisteredMetric; import org.apache.inlong.sort.base.metric.MetricState; import org.apache.inlong.sort.base.metric.SinkMetricData; import org.apache.inlong.sort.base.util.MetricStateUtils; +import org.apache.inlong.sort.base.sink.MultipleSinkOption; import org.apache.inlong.sort.iceberg.sink.RowDataTaskWriterFactory; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -76,6 +77,7 @@ public class IcebergMultipleStreamWriter extends IcebergProcessFunction<RecordWi private final boolean appendMode; private final CatalogLoader catalogLoader; + private final MultipleSinkOption multipleSinkOption; private transient Catalog catalog; private transient Map<TableIdentifier, IcebergSingleStreamWriter<RowData>> multipleWriters; @@ -95,11 +97,13 @@ public class IcebergMultipleStreamWriter extends IcebergProcessFunction<RecordWi boolean appendMode, CatalogLoader catalogLoader, String inlongMetric, - String auditHostAndPorts) { + String auditHostAndPorts, + MultipleSinkOption multipleSinkOption) { this.appendMode = appendMode; this.catalogLoader = catalogLoader; this.inlongMetric = inlongMetric; this.auditHostAndPorts = auditHostAndPorts; + this.multipleSinkOption = multipleSinkOption; } @Override @@ -170,7 +174,7 @@ public class IcebergMultipleStreamWriter extends IcebergProcessFunction<RecordWi .map(pk -> recordWithSchema.getSchema().findField(pk).fieldId()) .collect(Collectors.toList()); // if physical primary key not exist, put all field to logical primary key - if (equalityFieldIds.isEmpty()) { + if (equalityFieldIds.isEmpty() && multipleSinkOption.isPkAutoGenerated()) { equalityFieldIds = recordWithSchema.getSchema().columns().stream() .map(NestedField::fieldId) .collect(Collectors.toList());
