This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new a1fc17cbf [INLONG-6382][Sort] Iceberg data is messed up when the
source table has no primary key in multiple sink scenarios (#6461)
a1fc17cbf is described below
commit a1fc17cbfbab05bb076bf5620e34634aaa9d182b
Author: thesumery <[email protected]>
AuthorDate: Tue Nov 8 21:37:30 2022 +0800
[INLONG-6382][Sort] Iceberg data is messed up when the source table has no
primary key in multiple sink scenarios (#6461)
Co-authored-by: thesumery <[email protected]>
---
.../org/apache/inlong/sort/base/Constants.java | 7 ++++++
.../inlong/sort/base/sink/MultipleSinkOption.java | 26 +++++++++++++++++-----
.../sort/iceberg/FlinkDynamicTableFactory.java | 2 ++
.../inlong/sort/iceberg/IcebergTableSink.java | 2 ++
.../apache/inlong/sort/iceberg/sink/FlinkSink.java | 3 +--
.../sink/multiple/IcebergMultipleStreamWriter.java | 8 +++++--
6 files changed, 38 insertions(+), 10 deletions(-)
diff --git a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/Constants.java b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/Constants.java
index 9cc3c979d..9780370f2 100644
--- a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/Constants.java
+++ b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/Constants.java
@@ -164,6 +164,13 @@ public final class Constants {
.defaultValue(true)
.withDescription("Whether ignore the single table erros
when multiple sink writing scenario.");
+ /**
+ * When the source table of a multiple-sink job has no primary key, controls whether all of
+ * its columns are used as the primary key (equality) fields of the sink table. Defaults to
+ * {@code false}, i.e. no primary key fields are generated.
+ */
+ public static final ConfigOption<Boolean> SINK_MULTIPLE_PK_AUTO_GENERATED =
+ ConfigOptions.key("sink.multiple.pk-auto-generated")
+ .booleanType()
+ .defaultValue(false)
+ .withDescription("Whether to generate primary key fields from all columns "
+ + "when the source table does not have a primary key.");
+
public static final ConfigOption<Boolean>
SINK_MULTIPLE_TYPE_MAP_COMPATIBLE_WITH_SPARK =
ConfigOptions.key("sink.multiple.typemap-compatible-with-spark")
.booleanType()
diff --git a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/sink/MultipleSinkOption.java b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/sink/MultipleSinkOption.java
index 10bc3f3d1..3d5663b74 100644
--- a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/sink/MultipleSinkOption.java
+++ b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/sink/MultipleSinkOption.java
@@ -38,26 +38,29 @@ public class MultipleSinkOption implements Serializable {
private static final long serialVersionUID = 1L;
private static final Logger LOG =
LoggerFactory.getLogger(MultipleSinkOption.class);
- private String format;
+ private final String format;
+ // NOTE(review): the only field not made final by this change — confirm whether something mutates it.
private boolean sparkEngineEnable;
- private SchemaUpdateExceptionPolicy schemaUpdatePolicy;
+ private final SchemaUpdateExceptionPolicy schemaUpdatePolicy;
+ private final String databasePattern;
- private String databasePattern;
+ private final String tablePattern;
- private String tablePattern;
+ /** Whether to use all columns as primary key fields when the source table has none. */
+ private final boolean pkAutoGenerated;
+ /**
+ * Creates a {@link MultipleSinkOption}; {@link Builder#build()} delegates to this constructor.
+ *
+ * @param pkAutoGenerated whether all columns should be used as primary key fields when the
+ *        source table does not define a primary key
+ */
public MultipleSinkOption(String format,
boolean sparkEngineEnable,
SchemaUpdateExceptionPolicy schemaUpdatePolicy,
String databasePattern,
- String tablePattern) {
+ String tablePattern,
+ boolean pkAutoGenerated) {
this.format = format;
this.sparkEngineEnable = sparkEngineEnable;
this.schemaUpdatePolicy = schemaUpdatePolicy;
this.databasePattern = databasePattern;
this.tablePattern = tablePattern;
+ this.pkAutoGenerated = pkAutoGenerated;
}
public String getFormat() {
@@ -85,6 +88,10 @@ public class MultipleSinkOption implements Serializable {
return tablePattern;
}
+ /**
+ * Returns whether all columns should be used as primary key fields when the source table
+ * does not define a primary key.
+ *
+ * @return {@code true} to auto-generate primary key fields from all columns
+ */
+ public boolean isPkAutoGenerated() {
+ return pkAutoGenerated;
+ }
+
+ /** Returns a new {@link Builder} for assembling a {@link MultipleSinkOption}. */
public static Builder builder() {
return new Builder();
}
@@ -95,6 +102,7 @@ public class MultipleSinkOption implements Serializable {
private SchemaUpdateExceptionPolicy schemaUpdatePolicy;
private String databasePattern;
private String tablePattern;
+ private boolean pkAutoGenerated;
public MultipleSinkOption.Builder withFormat(String format) {
this.format = format;
@@ -121,8 +129,14 @@ public class MultipleSinkOption implements Serializable {
return this;
}
+ /**
+ * Sets whether all columns should be used as primary key fields when the source table has
+ * no primary key.
+ *
+ * @param pkAutoGenerated {@code true} to auto-generate primary key fields
+ * @return this builder, for chaining
+ */
+ public MultipleSinkOption.Builder withPkAutoGenerated(boolean
pkAutoGenerated) {
+ this.pkAutoGenerated = pkAutoGenerated;
+ return this;
+ }
+
+ /** Creates a {@link MultipleSinkOption} from the values set on this builder. */
public MultipleSinkOption build() {
- return new MultipleSinkOption(format, sparkEngineEnable,
schemaUpdatePolicy, databasePattern, tablePattern);
+ return new MultipleSinkOption(
+ format, sparkEngineEnable, schemaUpdatePolicy,
databasePattern, tablePattern, pkAutoGenerated);
}
}
diff --git a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/FlinkDynamicTableFactory.java b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/FlinkDynamicTableFactory.java
index 040e73df6..94a7cc305 100644
--- a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/FlinkDynamicTableFactory.java
+++ b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/FlinkDynamicTableFactory.java
@@ -53,6 +53,7 @@ import static
org.apache.inlong.sort.base.Constants.INLONG_METRIC;
import static
org.apache.inlong.sort.base.Constants.SINK_MULTIPLE_DATABASE_PATTERN;
import static org.apache.inlong.sort.base.Constants.SINK_MULTIPLE_ENABLE;
import static org.apache.inlong.sort.base.Constants.SINK_MULTIPLE_FORMAT;
+import static
org.apache.inlong.sort.base.Constants.SINK_MULTIPLE_PK_AUTO_GENERATED;
import static
org.apache.inlong.sort.base.Constants.SINK_MULTIPLE_SCHEMA_UPDATE_POLICY;
import static
org.apache.inlong.sort.base.Constants.SINK_MULTIPLE_TABLE_PATTERN;
import static
org.apache.inlong.sort.base.Constants.SINK_MULTIPLE_TYPE_MAP_COMPATIBLE_WITH_SPARK;
@@ -242,6 +243,7 @@ public class FlinkDynamicTableFactory implements
DynamicTableSinkFactory, Dynami
options.add(SINK_MULTIPLE_DATABASE_PATTERN);
options.add(SINK_MULTIPLE_TABLE_PATTERN);
options.add(SINK_MULTIPLE_SCHEMA_UPDATE_POLICY);
+ options.add(SINK_MULTIPLE_PK_AUTO_GENERATED);
options.add(SINK_MULTIPLE_TYPE_MAP_COMPATIBLE_WITH_SPARK);
return options;
}
diff --git a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/IcebergTableSink.java b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/IcebergTableSink.java
index 67ca5fdf2..367727309 100644
--- a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/IcebergTableSink.java
+++ b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/IcebergTableSink.java
@@ -47,6 +47,7 @@ import static
org.apache.inlong.sort.base.Constants.INLONG_METRIC;
import static
org.apache.inlong.sort.base.Constants.SINK_MULTIPLE_DATABASE_PATTERN;
import static org.apache.inlong.sort.base.Constants.SINK_MULTIPLE_ENABLE;
import static org.apache.inlong.sort.base.Constants.SINK_MULTIPLE_FORMAT;
+import static
org.apache.inlong.sort.base.Constants.SINK_MULTIPLE_PK_AUTO_GENERATED;
import static
org.apache.inlong.sort.base.Constants.SINK_MULTIPLE_SCHEMA_UPDATE_POLICY;
import static
org.apache.inlong.sort.base.Constants.SINK_MULTIPLE_TABLE_PATTERN;
import static
org.apache.inlong.sort.base.Constants.SINK_MULTIPLE_TYPE_MAP_COMPATIBLE_WITH_SPARK;
@@ -109,6 +110,7 @@ public class IcebergTableSink implements DynamicTableSink,
SupportsPartitioning,
.withDatabasePattern(tableOptions.get(SINK_MULTIPLE_DATABASE_PATTERN))
.withTablePattern(tableOptions.get(SINK_MULTIPLE_TABLE_PATTERN))
.withSchemaUpdatePolicy(tableOptions.get(SINK_MULTIPLE_SCHEMA_UPDATE_POLICY))
+
.withPkAutoGenerated(tableOptions.get(SINK_MULTIPLE_PK_AUTO_GENERATED))
.build())
.append();
} else {
diff --git a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/FlinkSink.java b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/FlinkSink.java
index 25f9e963b..9191aa575 100644
--- a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/FlinkSink.java
+++ b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/FlinkSink.java
@@ -400,7 +400,6 @@ public class FlinkSink {
}
-
/**
* Append the iceberg sink operators to write records to iceberg table.
*
@@ -529,7 +528,7 @@ public class FlinkSink {
IcebergProcessOperator streamWriter =
new IcebergProcessOperator(new IcebergMultipleStreamWriter(
- appendMode, catalogLoader, inlongMetric,
auditHostAndPorts));
+ appendMode, catalogLoader, inlongMetric,
auditHostAndPorts, multipleSinkOption));
SingleOutputStreamOperator<MultipleWriteResult> writerStream =
routeStream
.transform(operatorName(ICEBERG_MULTIPLE_STREAM_WRITER_NAME),
TypeInformation.of(IcebergProcessOperator.class),
diff --git a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/IcebergMultipleStreamWriter.java b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/IcebergMultipleStreamWriter.java
index 617eb6d69..2c67cbd51 100644
--- a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/IcebergMultipleStreamWriter.java
+++ b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/IcebergMultipleStreamWriter.java
@@ -43,6 +43,7 @@ import
org.apache.inlong.sort.base.metric.MetricOption.RegisteredMetric;
import org.apache.inlong.sort.base.metric.MetricState;
import org.apache.inlong.sort.base.metric.SinkMetricData;
import org.apache.inlong.sort.base.util.MetricStateUtils;
+import org.apache.inlong.sort.base.sink.MultipleSinkOption;
import org.apache.inlong.sort.iceberg.sink.RowDataTaskWriterFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -76,6 +77,7 @@ public class IcebergMultipleStreamWriter extends
IcebergProcessFunction<RecordWi
private final boolean appendMode;
private final CatalogLoader catalogLoader;
+ private final MultipleSinkOption multipleSinkOption;
private transient Catalog catalog;
private transient Map<TableIdentifier, IcebergSingleStreamWriter<RowData>>
multipleWriters;
@@ -95,11 +97,13 @@ public class IcebergMultipleStreamWriter extends
IcebergProcessFunction<RecordWi
boolean appendMode,
CatalogLoader catalogLoader,
String inlongMetric,
- String auditHostAndPorts) {
+ String auditHostAndPorts,
+ MultipleSinkOption multipleSinkOption) {
this.appendMode = appendMode;
this.catalogLoader = catalogLoader;
this.inlongMetric = inlongMetric;
this.auditHostAndPorts = auditHostAndPorts;
+ this.multipleSinkOption = multipleSinkOption;
}
@Override
@@ -170,7 +174,7 @@ public class IcebergMultipleStreamWriter extends
IcebergProcessFunction<RecordWi
.map(pk ->
recordWithSchema.getSchema().findField(pk).fieldId())
.collect(Collectors.toList());
// if physical primary key not exist, put all field to logical
primary key
- if (equalityFieldIds.isEmpty()) {
+ if (equalityFieldIds.isEmpty() &&
multipleSinkOption.isPkAutoGenerated()) {
equalityFieldIds =
recordWithSchema.getSchema().columns().stream()
.map(NestedField::fieldId)
.collect(Collectors.toList());