This is an automated email from the ASF dual-hosted git repository.

yunqing pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new a970fa616 [INLONG-6382][Sort] Iceberg data is messed up when the 
source table has no primary key in multiple sink scenes (#6383)
a970fa616 is described below

commit a970fa6167790cfd9d0797e92b5ea26d5f4712e3
Author: thesumery <[email protected]>
AuthorDate: Mon Nov 7 11:22:30 2022 +0800

    [INLONG-6382][Sort] Iceberg data is messed up when the source table has no 
primary key in multiple sink scenes (#6383)
    
    * [INLONG-6379][Sort] Bugfix: Iceberg misses metric data in multiple sink (#6381)
    
    * [INLONG-6382][Sort] Bugfix: Iceberg misses data when the source table
    does not have a primary key in multiple sink scenes
    
    * [INLONG-6382][Sort] Introduce an auto-generated primary key config option
    in multiple sink scenes
    
    Co-authored-by: thesumery <[email protected]>
---
 .../org/apache/inlong/sort/base/Constants.java     |  9 ++++++++
 .../inlong/sort/base/sink/MultipleSinkOption.java  | 26 +++++++++++++++++-----
 .../sort/iceberg/FlinkDynamicTableFactory.java     |  2 ++
 .../inlong/sort/iceberg/IcebergTableSink.java      |  2 ++
 .../apache/inlong/sort/iceberg/sink/FlinkSink.java |  3 +--
 .../sink/multiple/IcebergMultipleStreamWriter.java |  8 +++++--
 6 files changed, 40 insertions(+), 10 deletions(-)

diff --git 
a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/Constants.java
 
b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/Constants.java
index 19c58e9c1..c0c7f8808 100644
--- 
a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/Constants.java
+++ 
b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/Constants.java
@@ -163,4 +163,13 @@ public final class Constants {
                     .booleanType()
                     .defaultValue(true)
                     .withDescription("Whether ignore the single table erros 
when multiple sink writing scenario.");
+
+    public static final ConfigOption<Boolean> SINK_MULTIPLE_PK_AUTO_GENERATED =
+            ConfigOptions.key("sink.multiple.pk-auto-generated")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription("Whether generated pk fields as whole 
data when source table does not have a "
+                            + "primary key.");
+
+
 }
diff --git 
a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/sink/MultipleSinkOption.java
 
b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/sink/MultipleSinkOption.java
index 77c924b95..69ce3fc68 100644
--- 
a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/sink/MultipleSinkOption.java
+++ 
b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/sink/MultipleSinkOption.java
@@ -35,22 +35,26 @@ public class MultipleSinkOption implements Serializable {
     private static final long serialVersionUID = 1L;
     private static final Logger LOG = 
LoggerFactory.getLogger(MultipleSinkOption.class);
 
-    private String format;
+    private final String format;
 
-    private SchemaUpdateExceptionPolicy schemaUpdatePolicy;
+    private final SchemaUpdateExceptionPolicy schemaUpdatePolicy;
 
-    private String databasePattern;
+    private final String databasePattern;
 
-    private String tablePattern;
+    private final String tablePattern;
+
+    private final boolean pkAutoGenerated;
 
     public MultipleSinkOption(String format,
             SchemaUpdateExceptionPolicy schemaUpdatePolicy,
             String databasePattern,
-            String tablePattern) {
+            String tablePattern,
+            boolean pkAutoGenerated) {
         this.format = format;
         this.schemaUpdatePolicy = schemaUpdatePolicy;
         this.databasePattern = databasePattern;
         this.tablePattern = tablePattern;
+        this.pkAutoGenerated = pkAutoGenerated;
     }
 
     public String getFormat() {
@@ -69,6 +73,10 @@ public class MultipleSinkOption implements Serializable {
         return tablePattern;
     }
 
+    public boolean isPkAutoGenerated() {
+        return pkAutoGenerated;
+    }
+
     public static Builder builder() {
         return new Builder();
     }
@@ -78,6 +86,7 @@ public class MultipleSinkOption implements Serializable {
         private SchemaUpdateExceptionPolicy schemaUpdatePolicy;
         private String databasePattern;
         private String tablePattern;
+        private boolean pkAutoGenerated;
 
         public MultipleSinkOption.Builder withFormat(String format) {
             this.format = format;
@@ -99,8 +108,13 @@ public class MultipleSinkOption implements Serializable {
             return this;
         }
 
+        public MultipleSinkOption.Builder withPkAutoGenerated(boolean 
pkAutoGenerated) {
+            this.pkAutoGenerated = pkAutoGenerated;
+            return this;
+        }
+
         public MultipleSinkOption build() {
-            return new MultipleSinkOption(format, schemaUpdatePolicy, 
databasePattern, tablePattern);
+            return new MultipleSinkOption(format, schemaUpdatePolicy, 
databasePattern, tablePattern, pkAutoGenerated);
         }
     }
 
diff --git 
a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/FlinkDynamicTableFactory.java
 
b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/FlinkDynamicTableFactory.java
index 4852bca33..3a0e3fb01 100644
--- 
a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/FlinkDynamicTableFactory.java
+++ 
b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/FlinkDynamicTableFactory.java
@@ -53,6 +53,7 @@ import static 
org.apache.inlong.sort.base.Constants.INLONG_METRIC;
 import static 
org.apache.inlong.sort.base.Constants.SINK_MULTIPLE_DATABASE_PATTERN;
 import static org.apache.inlong.sort.base.Constants.SINK_MULTIPLE_ENABLE;
 import static org.apache.inlong.sort.base.Constants.SINK_MULTIPLE_FORMAT;
+import static 
org.apache.inlong.sort.base.Constants.SINK_MULTIPLE_PK_AUTO_GENERATED;
 import static 
org.apache.inlong.sort.base.Constants.SINK_MULTIPLE_SCHEMA_UPDATE_POLICY;
 import static 
org.apache.inlong.sort.base.Constants.SINK_MULTIPLE_TABLE_PATTERN;
 import static 
org.apache.inlong.sort.iceberg.FlinkConfigOptions.ICEBERG_IGNORE_ALL_CHANGELOG;
@@ -241,6 +242,7 @@ public class FlinkDynamicTableFactory implements 
DynamicTableSinkFactory, Dynami
         options.add(SINK_MULTIPLE_DATABASE_PATTERN);
         options.add(SINK_MULTIPLE_TABLE_PATTERN);
         options.add(SINK_MULTIPLE_SCHEMA_UPDATE_POLICY);
+        options.add(SINK_MULTIPLE_PK_AUTO_GENERATED);
         return options;
     }
 
diff --git 
a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/IcebergTableSink.java
 
b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/IcebergTableSink.java
index ee7cf2c89..6a96d4b72 100644
--- 
a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/IcebergTableSink.java
+++ 
b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/IcebergTableSink.java
@@ -47,6 +47,7 @@ import static 
org.apache.inlong.sort.base.Constants.INLONG_METRIC;
 import static 
org.apache.inlong.sort.base.Constants.SINK_MULTIPLE_DATABASE_PATTERN;
 import static org.apache.inlong.sort.base.Constants.SINK_MULTIPLE_ENABLE;
 import static org.apache.inlong.sort.base.Constants.SINK_MULTIPLE_FORMAT;
+import static 
org.apache.inlong.sort.base.Constants.SINK_MULTIPLE_PK_AUTO_GENERATED;
 import static 
org.apache.inlong.sort.base.Constants.SINK_MULTIPLE_SCHEMA_UPDATE_POLICY;
 import static 
org.apache.inlong.sort.base.Constants.SINK_MULTIPLE_TABLE_PATTERN;
 import static 
org.apache.inlong.sort.iceberg.FlinkConfigOptions.ICEBERG_IGNORE_ALL_CHANGELOG;
@@ -107,6 +108,7 @@ public class IcebergTableSink implements DynamicTableSink, 
SupportsPartitioning,
                             
.withDatabasePattern(tableOptions.get(SINK_MULTIPLE_DATABASE_PATTERN))
                             
.withTablePattern(tableOptions.get(SINK_MULTIPLE_TABLE_PATTERN))
                             
.withSchemaUpdatePolicy(tableOptions.get(SINK_MULTIPLE_SCHEMA_UPDATE_POLICY))
+                            
.withPkAutoGenerated(tableOptions.get(SINK_MULTIPLE_PK_AUTO_GENERATED))
                             .build())
                     .append();
         } else {
diff --git 
a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/FlinkSink.java
 
b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/FlinkSink.java
index 25f9e963b..9191aa575 100644
--- 
a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/FlinkSink.java
+++ 
b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/FlinkSink.java
@@ -400,7 +400,6 @@ public class FlinkSink {
         }
 
 
-
         /**
          * Append the iceberg sink operators to write records to iceberg table.
          *
@@ -529,7 +528,7 @@ public class FlinkSink {
 
             IcebergProcessOperator streamWriter =
                     new IcebergProcessOperator(new IcebergMultipleStreamWriter(
-                            appendMode, catalogLoader, inlongMetric, 
auditHostAndPorts));
+                            appendMode, catalogLoader, inlongMetric, 
auditHostAndPorts, multipleSinkOption));
             SingleOutputStreamOperator<MultipleWriteResult> writerStream = 
routeStream
                     
.transform(operatorName(ICEBERG_MULTIPLE_STREAM_WRITER_NAME),
                             TypeInformation.of(IcebergProcessOperator.class),
diff --git 
a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/IcebergMultipleStreamWriter.java
 
b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/IcebergMultipleStreamWriter.java
index 617eb6d69..2c67cbd51 100644
--- 
a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/IcebergMultipleStreamWriter.java
+++ 
b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/IcebergMultipleStreamWriter.java
@@ -43,6 +43,7 @@ import 
org.apache.inlong.sort.base.metric.MetricOption.RegisteredMetric;
 import org.apache.inlong.sort.base.metric.MetricState;
 import org.apache.inlong.sort.base.metric.SinkMetricData;
 import org.apache.inlong.sort.base.util.MetricStateUtils;
+import org.apache.inlong.sort.base.sink.MultipleSinkOption;
 import org.apache.inlong.sort.iceberg.sink.RowDataTaskWriterFactory;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -76,6 +77,7 @@ public class IcebergMultipleStreamWriter extends 
IcebergProcessFunction<RecordWi
 
     private final boolean appendMode;
     private final CatalogLoader catalogLoader;
+    private final MultipleSinkOption multipleSinkOption;
 
     private transient Catalog catalog;
     private transient Map<TableIdentifier, IcebergSingleStreamWriter<RowData>> 
multipleWriters;
@@ -95,11 +97,13 @@ public class IcebergMultipleStreamWriter extends 
IcebergProcessFunction<RecordWi
             boolean appendMode,
             CatalogLoader catalogLoader,
             String inlongMetric,
-            String auditHostAndPorts) {
+            String auditHostAndPorts,
+            MultipleSinkOption multipleSinkOption) {
         this.appendMode = appendMode;
         this.catalogLoader = catalogLoader;
         this.inlongMetric = inlongMetric;
         this.auditHostAndPorts = auditHostAndPorts;
+        this.multipleSinkOption = multipleSinkOption;
     }
 
     @Override
@@ -170,7 +174,7 @@ public class IcebergMultipleStreamWriter extends 
IcebergProcessFunction<RecordWi
                     .map(pk -> 
recordWithSchema.getSchema().findField(pk).fieldId())
                     .collect(Collectors.toList());
             // if physical primary key not exist, put all field to logical 
primary key
-            if (equalityFieldIds.isEmpty()) {
+            if (equalityFieldIds.isEmpty() && 
multipleSinkOption.isPkAutoGenerated()) {
                 equalityFieldIds = 
recordWithSchema.getSchema().columns().stream()
                         .map(NestedField::fieldId)
                         .collect(Collectors.toList());

Reply via email to