This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 52498f1d1b [INLONG-9240][Manager][Sort] Add append mode for the Iceberg connector (#9242)
52498f1d1b is described below
commit 52498f1d1bc9038b3c567353fdf37685d2112142
Author: vernedeng <[email protected]>
AuthorDate: Wed Nov 15 15:51:19 2023 +0800
[INLONG-9240][Manager][Sort] Add append mode for the Iceberg connector (#9242)
---
.../apache/inlong/manager/pojo/sink/iceberg/IcebergSink.java | 3 +++
.../inlong/manager/pojo/sink/iceberg/IcebergSinkRequest.java | 6 ++++++
.../manager/pojo/sort/node/provider/IcebergProvider.java | 3 ++-
.../inlong/sort/protocol/constant/IcebergConstant.java | 2 ++
.../inlong/sort/protocol/node/load/IcebergLoadNode.java | 12 +++++++++---
.../inlong/sort/protocol/node/load/IcebergLoadNodeTest.java | 3 ++-
.../apache/inlong/sort/parser/IcebergNodeSqlParserTest.java | 6 ++++--
7 files changed, 28 insertions(+), 7 deletions(-)
diff --git
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/iceberg/IcebergSink.java
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/iceberg/IcebergSink.java
index e7bb7325d0..19b988b79d 100644
---
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/iceberg/IcebergSink.java
+++
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/iceberg/IcebergSink.java
@@ -72,6 +72,9 @@ public class IcebergSink extends StreamSink {
@ApiModelProperty("Primary key")
private String primaryKey;
+ @ApiModelProperty("append mode, UPSERT or APPEND")
+ private String appendMode;
+
public IcebergSink() {
this.setSinkType(SinkType.ICEBERG);
}
diff --git
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/iceberg/IcebergSinkRequest.java
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/iceberg/IcebergSinkRequest.java
index 129790f90c..aa3c606b3f 100644
---
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/iceberg/IcebergSinkRequest.java
+++
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/iceberg/IcebergSinkRequest.java
@@ -27,6 +27,8 @@ import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.ToString;
+import javax.validation.constraints.Pattern;
+
/**
* Iceberg sink request.
*/
@@ -64,4 +66,8 @@ public class IcebergSinkRequest extends SinkRequest {
@ApiModelProperty("Primary key")
private String primaryKey;
+ @ApiModelProperty("append mode, UPSERT or APPEND")
+ @Pattern(regexp = "(?i)(UPSERT|APPEND)", message = "Invalid append mode")
+ private String appendMode;
+
}
diff --git
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/IcebergProvider.java
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/IcebergProvider.java
index 4af912f347..2147c3159a 100644
---
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/IcebergProvider.java
+++
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/IcebergProvider.java
@@ -97,7 +97,8 @@ public class IcebergProvider implements ExtractNodeProvider,
LoadNodeProvider {
icebergSink.getPrimaryKey(),
catalogType,
icebergSink.getCatalogUri(),
- icebergSink.getWarehouse());
+ icebergSink.getWarehouse(),
+ icebergSink.getAppendMode());
}
@Override
diff --git
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/constant/IcebergConstant.java
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/constant/IcebergConstant.java
index 2cce35fb9b..ce9b81b8a9 100644
---
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/constant/IcebergConstant.java
+++
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/constant/IcebergConstant.java
@@ -36,6 +36,8 @@ public class IcebergConstant {
public static final String STREAMING = "streaming";
public static final String STARTING_STRATEGY_KEY = "starting-strategy";
+ public static final String APPEND_MODE_KEY = "appendMode";
+
/**
* Iceberg supported catalog type
*/
diff --git
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/IcebergLoadNode.java
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/IcebergLoadNode.java
index 6ffaf5f2da..7f76891dfd 100644
---
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/IcebergLoadNode.java
+++
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/IcebergLoadNode.java
@@ -73,6 +73,9 @@ public class IcebergLoadNode extends LoadNode implements
InlongMetric, Metadata,
@JsonProperty("warehouse")
private String warehouse;
+ @JsonProperty("appendMode")
+ private String appendMode;
+
@JsonCreator
public IcebergLoadNode(@JsonProperty("id") String id,
@JsonProperty("name") String name,
@@ -87,7 +90,8 @@ public class IcebergLoadNode extends LoadNode implements
InlongMetric, Metadata,
@JsonProperty("primaryKey") String primaryKey,
@JsonProperty("catalogType") IcebergConstant.CatalogType
catalogType,
@JsonProperty("uri") String uri,
- @JsonProperty("warehouse") String warehouse) {
+ @JsonProperty("warehouse") String warehouse,
+ @JsonProperty("appendMode") String appendMode) {
super(id, name, fields, fieldRelations, filters, filterStrategy,
sinkParallelism, properties);
this.tableName = Preconditions.checkNotNull(tableName, "table name is
null");
this.dbName = Preconditions.checkNotNull(dbName, "db name is null");
@@ -95,6 +99,7 @@ public class IcebergLoadNode extends LoadNode implements
InlongMetric, Metadata,
this.catalogType = catalogType == null ? CatalogType.HIVE :
catalogType;
this.uri = uri;
this.warehouse = warehouse;
+ this.appendMode = appendMode;
}
@Override
@@ -108,11 +113,12 @@ public class IcebergLoadNode extends LoadNode implements
InlongMetric, Metadata,
options.put(IcebergConstant.DEFAULT_DATABASE_KEY, dbName);
options.put(IcebergConstant.CATALOG_TYPE_KEY, catalogType.name());
options.put(IcebergConstant.CATALOG_NAME_KEY, catalogType.name());
+ options.put(IcebergConstant.APPEND_MODE_KEY, appendMode);
if (null != uri) {
- options.put("uri", uri);
+ options.put(IcebergConstant.URI_KEY, uri);
}
if (null != warehouse) {
- options.put("warehouse", warehouse);
+ options.put(IcebergConstant.WAREHOUSE_KEY, warehouse);
}
return options;
}
diff --git
a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/load/IcebergLoadNodeTest.java
b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/load/IcebergLoadNodeTest.java
index 69f2031235..24d5b39730 100644
---
a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/load/IcebergLoadNodeTest.java
+++
b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/load/IcebergLoadNodeTest.java
@@ -45,6 +45,7 @@ public class IcebergLoadNodeTest extends
SerializeBaseTest<IcebergLoadNode> {
"id",
CatalogType.HIVE,
"thrift://localhost:9083",
- "hdfs://localhost:9000/user/iceberg/warehouse");
+ "hdfs://localhost:9000/user/iceberg/warehouse",
+ null);
}
}
diff --git
a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/IcebergNodeSqlParserTest.java
b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/IcebergNodeSqlParserTest.java
index 41d5701fdf..fc0b889db0 100644
---
a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/IcebergNodeSqlParserTest.java
+++
b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/IcebergNodeSqlParserTest.java
@@ -100,7 +100,8 @@ public class IcebergNodeSqlParserTest extends
AbstractTestBase {
null,
CatalogType.HADOOP,
null,
- "hdfs://localhost:9000/iceberg/warehouse");
+ "hdfs://localhost:9000/iceberg/warehouse",
+ null);
}
private IcebergLoadNode buildIcebergLoadNodeWithHiveCatalog() {
@@ -139,7 +140,8 @@ public class IcebergNodeSqlParserTest extends
AbstractTestBase {
null,
CatalogType.HIVE,
"thrift://localhost:9083",
- "/hive/warehouse");
+ "/hive/warehouse",
+ null);
}
/**