This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch branch-1.4
in repository https://gitbox.apache.org/repos/asf/inlong.git

commit f30dec91e613dc8058ad8c077b63eebe965d8d5f
Author: thesumery <[email protected]>
AuthorDate: Tue Nov 8 21:37:30 2022 +0800

    [INLONG-6382][Sort] Iceberg data is messed up when the source table has no 
primary key in multiple-sink scenarios (#6461)
    
    Co-authored-by: thesumery <[email protected]>
---
 .../org/apache/inlong/sort/base/Constants.java     |  7 ++++++
 .../inlong/sort/base/sink/MultipleSinkOption.java  | 26 +++++++++++++++++-----
 .../sort/iceberg/FlinkDynamicTableFactory.java     |  2 ++
 .../inlong/sort/iceberg/IcebergTableSink.java      |  2 ++
 .../apache/inlong/sort/iceberg/sink/FlinkSink.java |  3 +--
 .../sink/multiple/IcebergMultipleStreamWriter.java |  8 +++++--
 6 files changed, 38 insertions(+), 10 deletions(-)

diff --git 
a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/Constants.java
 
b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/Constants.java
index 9cc3c979d..9780370f2 100644
--- 
a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/Constants.java
+++ 
b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/Constants.java
@@ -164,6 +164,13 @@ public final class Constants {
                     .defaultValue(true)
                     .withDescription("Whether ignore the single table erros 
when multiple sink writing scenario.");
 
+    public static final ConfigOption<Boolean> SINK_MULTIPLE_PK_AUTO_GENERATED =
+            ConfigOptions.key("sink.multiple.pk-auto-generated")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription("Whether generated pk fields as whole 
data when source table does not have a "
+                            + "primary key.");
+
     public static final ConfigOption<Boolean> 
SINK_MULTIPLE_TYPE_MAP_COMPATIBLE_WITH_SPARK =
             ConfigOptions.key("sink.multiple.typemap-compatible-with-spark")
                     .booleanType()
diff --git 
a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/sink/MultipleSinkOption.java
 
b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/sink/MultipleSinkOption.java
index 10bc3f3d1..3d5663b74 100644
--- 
a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/sink/MultipleSinkOption.java
+++ 
b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/sink/MultipleSinkOption.java
@@ -38,26 +38,29 @@ public class MultipleSinkOption implements Serializable {
     private static final long serialVersionUID = 1L;
     private static final Logger LOG = 
LoggerFactory.getLogger(MultipleSinkOption.class);
 
-    private String format;
+    private final String format;
 
     private boolean sparkEngineEnable;
 
-    private SchemaUpdateExceptionPolicy schemaUpdatePolicy;
+    private final SchemaUpdateExceptionPolicy schemaUpdatePolicy;
+    private final String databasePattern;
 
-    private String databasePattern;
+    private final String tablePattern;
 
-    private String tablePattern;
+    private final boolean pkAutoGenerated;
 
     public MultipleSinkOption(String format,
             boolean sparkEngineEnable,
             SchemaUpdateExceptionPolicy schemaUpdatePolicy,
             String databasePattern,
-            String tablePattern) {
+            String tablePattern,
+            boolean pkAutoGenerated) {
         this.format = format;
         this.sparkEngineEnable = sparkEngineEnable;
         this.schemaUpdatePolicy = schemaUpdatePolicy;
         this.databasePattern = databasePattern;
         this.tablePattern = tablePattern;
+        this.pkAutoGenerated = pkAutoGenerated;
     }
 
     public String getFormat() {
@@ -85,6 +88,10 @@ public class MultipleSinkOption implements Serializable {
         return tablePattern;
     }
 
+    public boolean isPkAutoGenerated() {
+        return pkAutoGenerated;
+    }
+
     public static Builder builder() {
         return new Builder();
     }
@@ -95,6 +102,7 @@ public class MultipleSinkOption implements Serializable {
         private SchemaUpdateExceptionPolicy schemaUpdatePolicy;
         private String databasePattern;
         private String tablePattern;
+        private boolean pkAutoGenerated;
 
         public MultipleSinkOption.Builder withFormat(String format) {
             this.format = format;
@@ -121,8 +129,14 @@ public class MultipleSinkOption implements Serializable {
             return this;
         }
 
+        public MultipleSinkOption.Builder withPkAutoGenerated(boolean 
pkAutoGenerated) {
+            this.pkAutoGenerated = pkAutoGenerated;
+            return this;
+        }
+
         public MultipleSinkOption build() {
-            return new MultipleSinkOption(format, sparkEngineEnable, 
schemaUpdatePolicy, databasePattern, tablePattern);
+            return new MultipleSinkOption(
+                    format, sparkEngineEnable, schemaUpdatePolicy, 
databasePattern, tablePattern, pkAutoGenerated);
         }
     }
 
diff --git 
a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/FlinkDynamicTableFactory.java
 
b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/FlinkDynamicTableFactory.java
index 040e73df6..94a7cc305 100644
--- 
a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/FlinkDynamicTableFactory.java
+++ 
b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/FlinkDynamicTableFactory.java
@@ -53,6 +53,7 @@ import static 
org.apache.inlong.sort.base.Constants.INLONG_METRIC;
 import static 
org.apache.inlong.sort.base.Constants.SINK_MULTIPLE_DATABASE_PATTERN;
 import static org.apache.inlong.sort.base.Constants.SINK_MULTIPLE_ENABLE;
 import static org.apache.inlong.sort.base.Constants.SINK_MULTIPLE_FORMAT;
+import static 
org.apache.inlong.sort.base.Constants.SINK_MULTIPLE_PK_AUTO_GENERATED;
 import static 
org.apache.inlong.sort.base.Constants.SINK_MULTIPLE_SCHEMA_UPDATE_POLICY;
 import static 
org.apache.inlong.sort.base.Constants.SINK_MULTIPLE_TABLE_PATTERN;
 import static 
org.apache.inlong.sort.base.Constants.SINK_MULTIPLE_TYPE_MAP_COMPATIBLE_WITH_SPARK;
@@ -242,6 +243,7 @@ public class FlinkDynamicTableFactory implements 
DynamicTableSinkFactory, Dynami
         options.add(SINK_MULTIPLE_DATABASE_PATTERN);
         options.add(SINK_MULTIPLE_TABLE_PATTERN);
         options.add(SINK_MULTIPLE_SCHEMA_UPDATE_POLICY);
+        options.add(SINK_MULTIPLE_PK_AUTO_GENERATED);
         options.add(SINK_MULTIPLE_TYPE_MAP_COMPATIBLE_WITH_SPARK);
         return options;
     }
diff --git 
a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/IcebergTableSink.java
 
b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/IcebergTableSink.java
index 67ca5fdf2..367727309 100644
--- 
a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/IcebergTableSink.java
+++ 
b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/IcebergTableSink.java
@@ -47,6 +47,7 @@ import static 
org.apache.inlong.sort.base.Constants.INLONG_METRIC;
 import static 
org.apache.inlong.sort.base.Constants.SINK_MULTIPLE_DATABASE_PATTERN;
 import static org.apache.inlong.sort.base.Constants.SINK_MULTIPLE_ENABLE;
 import static org.apache.inlong.sort.base.Constants.SINK_MULTIPLE_FORMAT;
+import static 
org.apache.inlong.sort.base.Constants.SINK_MULTIPLE_PK_AUTO_GENERATED;
 import static 
org.apache.inlong.sort.base.Constants.SINK_MULTIPLE_SCHEMA_UPDATE_POLICY;
 import static 
org.apache.inlong.sort.base.Constants.SINK_MULTIPLE_TABLE_PATTERN;
 import static 
org.apache.inlong.sort.base.Constants.SINK_MULTIPLE_TYPE_MAP_COMPATIBLE_WITH_SPARK;
@@ -109,6 +110,7 @@ public class IcebergTableSink implements DynamicTableSink, 
SupportsPartitioning,
                             
.withDatabasePattern(tableOptions.get(SINK_MULTIPLE_DATABASE_PATTERN))
                             
.withTablePattern(tableOptions.get(SINK_MULTIPLE_TABLE_PATTERN))
                             
.withSchemaUpdatePolicy(tableOptions.get(SINK_MULTIPLE_SCHEMA_UPDATE_POLICY))
+                            
.withPkAutoGenerated(tableOptions.get(SINK_MULTIPLE_PK_AUTO_GENERATED))
                             .build())
                     .append();
         } else {
diff --git 
a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/FlinkSink.java
 
b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/FlinkSink.java
index 25f9e963b..9191aa575 100644
--- 
a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/FlinkSink.java
+++ 
b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/FlinkSink.java
@@ -400,7 +400,6 @@ public class FlinkSink {
         }
 
 
-
         /**
          * Append the iceberg sink operators to write records to iceberg table.
          *
@@ -529,7 +528,7 @@ public class FlinkSink {
 
             IcebergProcessOperator streamWriter =
                     new IcebergProcessOperator(new IcebergMultipleStreamWriter(
-                            appendMode, catalogLoader, inlongMetric, 
auditHostAndPorts));
+                            appendMode, catalogLoader, inlongMetric, 
auditHostAndPorts, multipleSinkOption));
             SingleOutputStreamOperator<MultipleWriteResult> writerStream = 
routeStream
                     
.transform(operatorName(ICEBERG_MULTIPLE_STREAM_WRITER_NAME),
                             TypeInformation.of(IcebergProcessOperator.class),
diff --git 
a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/IcebergMultipleStreamWriter.java
 
b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/IcebergMultipleStreamWriter.java
index 617eb6d69..2c67cbd51 100644
--- 
a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/IcebergMultipleStreamWriter.java
+++ 
b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/IcebergMultipleStreamWriter.java
@@ -43,6 +43,7 @@ import 
org.apache.inlong.sort.base.metric.MetricOption.RegisteredMetric;
 import org.apache.inlong.sort.base.metric.MetricState;
 import org.apache.inlong.sort.base.metric.SinkMetricData;
 import org.apache.inlong.sort.base.util.MetricStateUtils;
+import org.apache.inlong.sort.base.sink.MultipleSinkOption;
 import org.apache.inlong.sort.iceberg.sink.RowDataTaskWriterFactory;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -76,6 +77,7 @@ public class IcebergMultipleStreamWriter extends 
IcebergProcessFunction<RecordWi
 
     private final boolean appendMode;
     private final CatalogLoader catalogLoader;
+    private final MultipleSinkOption multipleSinkOption;
 
     private transient Catalog catalog;
     private transient Map<TableIdentifier, IcebergSingleStreamWriter<RowData>> 
multipleWriters;
@@ -95,11 +97,13 @@ public class IcebergMultipleStreamWriter extends 
IcebergProcessFunction<RecordWi
             boolean appendMode,
             CatalogLoader catalogLoader,
             String inlongMetric,
-            String auditHostAndPorts) {
+            String auditHostAndPorts,
+            MultipleSinkOption multipleSinkOption) {
         this.appendMode = appendMode;
         this.catalogLoader = catalogLoader;
         this.inlongMetric = inlongMetric;
         this.auditHostAndPorts = auditHostAndPorts;
+        this.multipleSinkOption = multipleSinkOption;
     }
 
     @Override
@@ -170,7 +174,7 @@ public class IcebergMultipleStreamWriter extends 
IcebergProcessFunction<RecordWi
                     .map(pk -> 
recordWithSchema.getSchema().findField(pk).fieldId())
                     .collect(Collectors.toList());
             // if physical primary key not exist, put all field to logical 
primary key
-            if (equalityFieldIds.isEmpty()) {
+            if (equalityFieldIds.isEmpty() && 
multipleSinkOption.isPkAutoGenerated()) {
                 equalityFieldIds = 
recordWithSchema.getSchema().columns().stream()
                         .map(NestedField::fieldId)
                         .collect(Collectors.toList());

Reply via email to