This is an automated email from the ASF dual-hosted git repository.
zirui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 8626f28761 [INLONG-9377][Sort] Fix init iceberg sink failed with upsert mode (#9379)
8626f28761 is described below
commit 8626f287613d674c05a38931d9d6974baa5d91d6
Author: vernedeng <[email protected]>
AuthorDate: Tue Dec 5 10:02:24 2023 +0800
[INLONG-9377][Sort] Fix init iceberg sink failed with upsert mode (#9379)
---
.../apache/inlong/sort/protocol/constant/IcebergConstant.java | 3 +--
.../apache/inlong/sort/protocol/node/load/IcebergLoadNode.java | 10 ++++++++--
2 files changed, 9 insertions(+), 4 deletions(-)
diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/constant/IcebergConstant.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/constant/IcebergConstant.java
index ce9b81b8a9..cf12115080 100644
--- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/constant/IcebergConstant.java
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/constant/IcebergConstant.java
@@ -35,8 +35,7 @@ public class IcebergConstant {
public static final String START_SNAPSHOT_ID = "start-snapshot-id";
public static final String STREAMING = "streaming";
public static final String STARTING_STRATEGY_KEY = "starting-strategy";
-
- public static final String APPEND_MODE_KEY = "appendMode";
+ public static final String UPSERT_ENABLED_KEY = "upsert-enabled";
/**
* Iceberg supported catalog type
diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/IcebergLoadNode.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/IcebergLoadNode.java
index 9e3fb9e8dd..3b08fa490f 100644
--- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/IcebergLoadNode.java
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/IcebergLoadNode.java
@@ -180,7 +180,13 @@ public class IcebergLoadNode extends LoadNode implements InlongMetric, Metadata,
options.put(IcebergConstant.DEFAULT_DATABASE_KEY, dbName);
options.put(IcebergConstant.CATALOG_TYPE_KEY, catalogType.name());
options.put(IcebergConstant.CATALOG_NAME_KEY, catalogType.name());
- options.put(IcebergConstant.APPEND_MODE_KEY, appendMode);
+
+ if ("upsert".equals(appendMode)) {
+ options.put(IcebergConstant.UPSERT_ENABLED_KEY, Boolean.TRUE.toString());
+ } else {
+ options.put(IcebergConstant.UPSERT_ENABLED_KEY, Boolean.FALSE.toString());
+ }
+
if (null != uri) {
options.put(IcebergConstant.URI_KEY, uri);
}
@@ -194,7 +200,7 @@ public class IcebergLoadNode extends LoadNode implements InlongMetric, Metadata,
options.put(SINK_MULTIPLE_DATABASE_PATTERN, databasePattern);
options.put(SINK_MULTIPLE_TABLE_PATTERN, tablePattern);
} else {
- options.put(SINK_MULTIPLE_ENABLE, "false");
+ options.put(SINK_MULTIPLE_ENABLE, Boolean.FALSE.toString());
}
return options;