This is an automated email from the ASF dual-hosted git repository.
yunqing pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new a970fa616 [INLONG-6382][Sort] Iceberg data is messed up when the
source table has no primary key in multiple sink scenes (#6383)
a970fa616 is described below
commit a970fa6167790cfd9d0797e92b5ea26d5f4712e3
Author: thesumery <[email protected]>
AuthorDate: Mon Nov 7 11:22:30 2022 +0800
[INLONG-6382][Sort] Iceberg data is messed up when the source table has no
primary key in multiple sink scenes (#6383)
* [INLONG-6379][Sort] Bugfix: Iceberg misses metric data in multiple sink
(#6381)
* [INLONG-6382][Sort] Bugfix: Iceberg misses data when the source table does
not have a primary key in multiple sink scenes
* [INLONG-6382][Sort] Import auto-generated primary key config in multiple
sink scenes
Co-authored-by: thesumery <[email protected]>
---
.../org/apache/inlong/sort/base/Constants.java | 9 ++++++++
.../inlong/sort/base/sink/MultipleSinkOption.java | 26 +++++++++++++++++-----
.../sort/iceberg/FlinkDynamicTableFactory.java | 2 ++
.../inlong/sort/iceberg/IcebergTableSink.java | 2 ++
.../apache/inlong/sort/iceberg/sink/FlinkSink.java | 3 +--
.../sink/multiple/IcebergMultipleStreamWriter.java | 8 +++++--
6 files changed, 40 insertions(+), 10 deletions(-)
diff --git
a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/Constants.java
b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/Constants.java
index 19c58e9c1..c0c7f8808 100644
---
a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/Constants.java
+++
b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/Constants.java
@@ -163,4 +163,13 @@ public final class Constants {
.booleanType()
.defaultValue(true)
.withDescription("Whether ignore the single table erros
when multiple sink writing scenario.");
+
+ public static final ConfigOption<Boolean> SINK_MULTIPLE_PK_AUTO_GENERATED =
+ ConfigOptions.key("sink.multiple.pk-auto-generated")
+ .booleanType()
+ .defaultValue(false)
+ .withDescription("Whether generated pk fields as whole
data when source table does not have a "
+ + "primary key.");
+
+
}
diff --git
a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/sink/MultipleSinkOption.java
b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/sink/MultipleSinkOption.java
index 77c924b95..69ce3fc68 100644
---
a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/sink/MultipleSinkOption.java
+++
b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/sink/MultipleSinkOption.java
@@ -35,22 +35,26 @@ public class MultipleSinkOption implements Serializable {
private static final long serialVersionUID = 1L;
private static final Logger LOG =
LoggerFactory.getLogger(MultipleSinkOption.class);
- private String format;
+ private final String format;
- private SchemaUpdateExceptionPolicy schemaUpdatePolicy;
+ private final SchemaUpdateExceptionPolicy schemaUpdatePolicy;
- private String databasePattern;
+ private final String databasePattern;
- private String tablePattern;
+ private final String tablePattern;
+
+ private final boolean pkAutoGenerated;
public MultipleSinkOption(String format,
SchemaUpdateExceptionPolicy schemaUpdatePolicy,
String databasePattern,
- String tablePattern) {
+ String tablePattern,
+ boolean pkAutoGenerated) {
this.format = format;
this.schemaUpdatePolicy = schemaUpdatePolicy;
this.databasePattern = databasePattern;
this.tablePattern = tablePattern;
+ this.pkAutoGenerated = pkAutoGenerated;
}
public String getFormat() {
@@ -69,6 +73,10 @@ public class MultipleSinkOption implements Serializable {
return tablePattern;
}
+ public boolean isPkAutoGenerated() {
+ return pkAutoGenerated;
+ }
+
public static Builder builder() {
return new Builder();
}
@@ -78,6 +86,7 @@ public class MultipleSinkOption implements Serializable {
private SchemaUpdateExceptionPolicy schemaUpdatePolicy;
private String databasePattern;
private String tablePattern;
+ private boolean pkAutoGenerated;
public MultipleSinkOption.Builder withFormat(String format) {
this.format = format;
@@ -99,8 +108,13 @@ public class MultipleSinkOption implements Serializable {
return this;
}
+ public MultipleSinkOption.Builder withPkAutoGenerated(boolean
pkAutoGenerated) {
+ this.pkAutoGenerated = pkAutoGenerated;
+ return this;
+ }
+
public MultipleSinkOption build() {
- return new MultipleSinkOption(format, schemaUpdatePolicy,
databasePattern, tablePattern);
+ return new MultipleSinkOption(format, schemaUpdatePolicy,
databasePattern, tablePattern, pkAutoGenerated);
}
}
diff --git
a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/FlinkDynamicTableFactory.java
b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/FlinkDynamicTableFactory.java
index 4852bca33..3a0e3fb01 100644
---
a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/FlinkDynamicTableFactory.java
+++
b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/FlinkDynamicTableFactory.java
@@ -53,6 +53,7 @@ import static
org.apache.inlong.sort.base.Constants.INLONG_METRIC;
import static
org.apache.inlong.sort.base.Constants.SINK_MULTIPLE_DATABASE_PATTERN;
import static org.apache.inlong.sort.base.Constants.SINK_MULTIPLE_ENABLE;
import static org.apache.inlong.sort.base.Constants.SINK_MULTIPLE_FORMAT;
+import static
org.apache.inlong.sort.base.Constants.SINK_MULTIPLE_PK_AUTO_GENERATED;
import static
org.apache.inlong.sort.base.Constants.SINK_MULTIPLE_SCHEMA_UPDATE_POLICY;
import static
org.apache.inlong.sort.base.Constants.SINK_MULTIPLE_TABLE_PATTERN;
import static
org.apache.inlong.sort.iceberg.FlinkConfigOptions.ICEBERG_IGNORE_ALL_CHANGELOG;
@@ -241,6 +242,7 @@ public class FlinkDynamicTableFactory implements
DynamicTableSinkFactory, Dynami
options.add(SINK_MULTIPLE_DATABASE_PATTERN);
options.add(SINK_MULTIPLE_TABLE_PATTERN);
options.add(SINK_MULTIPLE_SCHEMA_UPDATE_POLICY);
+ options.add(SINK_MULTIPLE_PK_AUTO_GENERATED);
return options;
}
diff --git
a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/IcebergTableSink.java
b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/IcebergTableSink.java
index ee7cf2c89..6a96d4b72 100644
---
a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/IcebergTableSink.java
+++
b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/IcebergTableSink.java
@@ -47,6 +47,7 @@ import static
org.apache.inlong.sort.base.Constants.INLONG_METRIC;
import static
org.apache.inlong.sort.base.Constants.SINK_MULTIPLE_DATABASE_PATTERN;
import static org.apache.inlong.sort.base.Constants.SINK_MULTIPLE_ENABLE;
import static org.apache.inlong.sort.base.Constants.SINK_MULTIPLE_FORMAT;
+import static
org.apache.inlong.sort.base.Constants.SINK_MULTIPLE_PK_AUTO_GENERATED;
import static
org.apache.inlong.sort.base.Constants.SINK_MULTIPLE_SCHEMA_UPDATE_POLICY;
import static
org.apache.inlong.sort.base.Constants.SINK_MULTIPLE_TABLE_PATTERN;
import static
org.apache.inlong.sort.iceberg.FlinkConfigOptions.ICEBERG_IGNORE_ALL_CHANGELOG;
@@ -107,6 +108,7 @@ public class IcebergTableSink implements DynamicTableSink,
SupportsPartitioning,
.withDatabasePattern(tableOptions.get(SINK_MULTIPLE_DATABASE_PATTERN))
.withTablePattern(tableOptions.get(SINK_MULTIPLE_TABLE_PATTERN))
.withSchemaUpdatePolicy(tableOptions.get(SINK_MULTIPLE_SCHEMA_UPDATE_POLICY))
+
.withPkAutoGenerated(tableOptions.get(SINK_MULTIPLE_PK_AUTO_GENERATED))
.build())
.append();
} else {
diff --git
a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/FlinkSink.java
b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/FlinkSink.java
index 25f9e963b..9191aa575 100644
---
a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/FlinkSink.java
+++
b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/FlinkSink.java
@@ -400,7 +400,6 @@ public class FlinkSink {
}
-
/**
* Append the iceberg sink operators to write records to iceberg table.
*
@@ -529,7 +528,7 @@ public class FlinkSink {
IcebergProcessOperator streamWriter =
new IcebergProcessOperator(new IcebergMultipleStreamWriter(
- appendMode, catalogLoader, inlongMetric,
auditHostAndPorts));
+ appendMode, catalogLoader, inlongMetric,
auditHostAndPorts, multipleSinkOption));
SingleOutputStreamOperator<MultipleWriteResult> writerStream =
routeStream
.transform(operatorName(ICEBERG_MULTIPLE_STREAM_WRITER_NAME),
TypeInformation.of(IcebergProcessOperator.class),
diff --git
a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/IcebergMultipleStreamWriter.java
b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/IcebergMultipleStreamWriter.java
index 617eb6d69..2c67cbd51 100644
---
a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/IcebergMultipleStreamWriter.java
+++
b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/IcebergMultipleStreamWriter.java
@@ -43,6 +43,7 @@ import
org.apache.inlong.sort.base.metric.MetricOption.RegisteredMetric;
import org.apache.inlong.sort.base.metric.MetricState;
import org.apache.inlong.sort.base.metric.SinkMetricData;
import org.apache.inlong.sort.base.util.MetricStateUtils;
+import org.apache.inlong.sort.base.sink.MultipleSinkOption;
import org.apache.inlong.sort.iceberg.sink.RowDataTaskWriterFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -76,6 +77,7 @@ public class IcebergMultipleStreamWriter extends
IcebergProcessFunction<RecordWi
private final boolean appendMode;
private final CatalogLoader catalogLoader;
+ private final MultipleSinkOption multipleSinkOption;
private transient Catalog catalog;
private transient Map<TableIdentifier, IcebergSingleStreamWriter<RowData>>
multipleWriters;
@@ -95,11 +97,13 @@ public class IcebergMultipleStreamWriter extends
IcebergProcessFunction<RecordWi
boolean appendMode,
CatalogLoader catalogLoader,
String inlongMetric,
- String auditHostAndPorts) {
+ String auditHostAndPorts,
+ MultipleSinkOption multipleSinkOption) {
this.appendMode = appendMode;
this.catalogLoader = catalogLoader;
this.inlongMetric = inlongMetric;
this.auditHostAndPorts = auditHostAndPorts;
+ this.multipleSinkOption = multipleSinkOption;
}
@Override
@@ -170,7 +174,7 @@ public class IcebergMultipleStreamWriter extends
IcebergProcessFunction<RecordWi
.map(pk ->
recordWithSchema.getSchema().findField(pk).fieldId())
.collect(Collectors.toList());
// if physical primary key not exist, put all field to logical
primary key
- if (equalityFieldIds.isEmpty()) {
+ if (equalityFieldIds.isEmpty() &&
multipleSinkOption.isPkAutoGenerated()) {
equalityFieldIds =
recordWithSchema.getSchema().columns().stream()
.map(NestedField::fieldId)
.collect(Collectors.toList());