This is an automated email from the ASF dual-hosted git repository.
luchunliang pushed a commit to branch branch-1.9
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/branch-1.9 by this push:
new 4ef2b3d1e6 [INLONG-8953][Sort] Fix IcebergSource defaults the StartSnapshot to the latest (#8954)
4ef2b3d1e6 is described below
commit 4ef2b3d1e61f6e3195b11a4477d92db01c150528
Author: vernedeng <[email protected]>
AuthorDate: Thu Sep 21 10:10:14 2023 +0800
[INLONG-8953][Sort] Fix IcebergSource defaults the StartSnapshot to the latest (#8954)
---
.../apache/inlong/sort/protocol/constant/IcebergConstant.java | 10 ++++++++++
.../inlong/sort/protocol/node/extract/IcebergExtractNode.java | 3 +++
2 files changed, 13 insertions(+)
diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/constant/IcebergConstant.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/constant/IcebergConstant.java
index 676f7f4435..2cce35fb9b 100644
--- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/constant/IcebergConstant.java
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/constant/IcebergConstant.java
@@ -34,6 +34,7 @@ public class IcebergConstant {
public static final String WAREHOUSE_KEY = "warehouse";
public static final String START_SNAPSHOT_ID = "start-snapshot-id";
public static final String STREAMING = "streaming";
+ public static final String STARTING_STRATEGY_KEY = "starting-strategy";
/**
* Iceberg supported catalog type
@@ -65,4 +66,13 @@ public class IcebergConstant {
throw new IllegalArgumentException(String.format("Unsupport catalogType:%s", name));
}
}
+
+ public enum StreamingStartingStrategy {
+ TABLE_SCAN_THEN_INCREMENTAL,
+ INCREMENTAL_FROM_LATEST_SNAPSHOT,
+ INCREMENTAL_FROM_EARLIEST_SNAPSHOT,
+ INCREMENTAL_FROM_SNAPSHOT_ID,
+ INCREMENTAL_FROM_SNAPSHOT_TIMESTAMP;
+
+ }
}
diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/IcebergExtractNode.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/IcebergExtractNode.java
index cbceb5485a..e87c111743 100644
--- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/IcebergExtractNode.java
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/IcebergExtractNode.java
@@ -115,6 +115,7 @@ public class IcebergExtractNode extends ExtractNode implements InlongMetric, Met
options.put(IcebergConstant.TABLE_KEY, tableName);
options.put(IcebergConstant.CATALOG_TYPE_KEY, catalogType.name());
options.put(IcebergConstant.CATALOG_NAME_KEY, catalogName);
+ // support streaming only
options.put(IcebergConstant.STREAMING, "true");
options.put(IcebergConstant.STARTING_STRATEGY_KEY,
IcebergConstant.StreamingStartingStrategy.TABLE_SCAN_THEN_INCREMENTAL.name());
@@ -126,6 +127,8 @@ public class IcebergExtractNode extends ExtractNode implements InlongMetric, Met
}
if (null != startSnapShotId) {
options.put(IcebergConstant.START_SNAPSHOT_ID, startSnapShotId.toString());
+ options.put(IcebergConstant.STARTING_STRATEGY_KEY,
+ IcebergConstant.StreamingStartingStrategy.INCREMENTAL_FROM_SNAPSHOT_ID.name());
}
return options;
}