This is an automated email from the ASF dual-hosted git repository.
wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new adafd80255 [feature][connector-v2-hbase-sink] Support Connector v2
HBase sink TTL data writing (#7116)
adafd80255 is described below
commit adafd802551b9acf9c942965e4f67986a92b07a8
Author: Jast <[email protected]>
AuthorDate: Sat Jul 6 10:00:04 2024 +0800
[feature][connector-v2-hbase-sink] Support Connector v2 HBase sink TTL data
writing (#7116)
---
docs/en/connector-v2/sink/Hbase.md | 5 +++++
docs/zh/connector-v2/sink/Hbase.md | 5 +++++
.../seatunnel/connectors/seatunnel/hbase/config/HbaseConfig.java | 7 +++++++
.../connectors/seatunnel/hbase/config/HbaseParameters.java | 6 ++++++
.../seatunnel/connectors/seatunnel/hbase/sink/HbaseSinkWriter.java | 3 +++
5 files changed, 26 insertions(+)
diff --git a/docs/en/connector-v2/sink/Hbase.md
b/docs/en/connector-v2/sink/Hbase.md
index 58c0a16c34..51cb4b3362 100644
--- a/docs/en/connector-v2/sink/Hbase.md
+++ b/docs/en/connector-v2/sink/Hbase.md
@@ -26,6 +26,7 @@ Output data to Hbase
| encoding | string | no | utf8 |
| hbase_extra_config | string | no | - |
| common-options | | no | - |
+| ttl | long | no | - |
### zookeeper_quorum [string]
@@ -95,6 +96,10 @@ The encoding of string field, support [`utf8`, `gbk`],
default `utf8`
The extra configuration of hbase
+### ttl [long]
+
+The TTL (time to live) applied to data written to HBase, in milliseconds. If not set,
the TTL configured on the table is used.
+
### common options
Sink plugin common parameters, please refer to [Sink Common
Options](common-options.md) for details
diff --git a/docs/zh/connector-v2/sink/Hbase.md
b/docs/zh/connector-v2/sink/Hbase.md
index 9e79ed9799..a9839dbafa 100644
--- a/docs/zh/connector-v2/sink/Hbase.md
+++ b/docs/zh/connector-v2/sink/Hbase.md
@@ -26,6 +26,7 @@
| encoding | string | no | utf8 |
| hbase_extra_config | string | no | - |
| common-options | | no | - |
+| ttl | long | no | - |
### zookeeper_quorum [string]
@@ -95,6 +96,10 @@ hbase 客户端的写入缓冲区大小,默认 8 * 1024 * 1024
hbase扩展配置
+### ttl [long]
+
+hbase 写入数据 TTL 时间,默认以表设置的TTL为准,单位毫秒
+
### 常见选项
Sink 插件常用参数,详见 Sink 常用选项 [Sink Common Options](common-options.md)
diff --git
a/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/config/HbaseConfig.java
b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/config/HbaseConfig.java
index 565f1b4b48..88c068bee1 100644
---
a/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/config/HbaseConfig.java
+++
b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/config/HbaseConfig.java
@@ -97,6 +97,13 @@ public class HbaseConfig {
.noDefaultValue()
.withDescription("Hbase extra config");
+ public static final Option<Long> HBASE_TTL_CONFIG =
+ Options.key("ttl")
+ .longType()
+ .defaultValue(-1L)
+ .withDescription(
+ "The expiration time configuration for writing
hbase data. The default value is -1, indicating no expiration time.");
+
public enum NullMode {
SKIP,
EMPTY;
diff --git
a/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/config/HbaseParameters.java
b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/config/HbaseParameters.java
index 858030fe2a..490e248107 100644
---
a/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/config/HbaseParameters.java
+++
b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/config/HbaseParameters.java
@@ -31,6 +31,7 @@ import java.util.Map;
import static
org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseConfig.ENCODING;
import static
org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseConfig.FAMILY_NAME;
import static
org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseConfig.HBASE_EXTRA_CONFIG;
+import static
org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseConfig.HBASE_TTL_CONFIG;
import static
org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseConfig.NULL_MODE;
import static
org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseConfig.QUERY_COLUMNS;
import static
org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseConfig.ROWKEY_COLUMNS;
@@ -59,6 +60,8 @@ public class HbaseParameters implements Serializable {
private Map<String, String> hbaseExtraConfig;
+ @Builder.Default private Long ttl = HBASE_TTL_CONFIG.defaultValue();
+
@Builder.Default private String rowkeyDelimiter =
ROWKEY_DELIMITER.defaultValue();
@Builder.Default private HbaseConfig.NullMode nullMode =
NULL_MODE.defaultValue();
@@ -80,6 +83,9 @@ public class HbaseParameters implements Serializable {
TypesafeConfigUtils.configToMap(pluginConfig.getConfig(FAMILY_NAME.key())));
// optional parameters
+ if (pluginConfig.hasPath(HBASE_TTL_CONFIG.key())) {
+ builder.ttl(pluginConfig.getLong(HBASE_TTL_CONFIG.key()));
+ }
if (pluginConfig.hasPath(ROWKEY_DELIMITER.key())) {
builder.rowkeyDelimiter(pluginConfig.getString(ROWKEY_DELIMITER.key()));
}
diff --git
a/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/sink/HbaseSinkWriter.java
b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/sink/HbaseSinkWriter.java
index 0455245c67..72722e582e 100644
---
a/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/sink/HbaseSinkWriter.java
+++
b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/sink/HbaseSinkWriter.java
@@ -117,6 +117,9 @@ public class HbaseSinkWriter extends
AbstractSinkWriter<SeaTunnelRow, Void> {
timestamp = (Long) row.getField(versionColumnIndex);
}
Put put = new Put(rowkey, timestamp);
+ if (hbaseParameters.getTtl() != -1 && hbaseParameters.getTtl() > 0) {
+ put.setTTL(hbaseParameters.getTtl());
+ }
if (!hbaseParameters.isWalWrite()) {
put.setDurability(Durability.SKIP_WAL);
}