This is an automated email from the ASF dual-hosted git repository.
gaojun2048 pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 49d397c61d [Improve][hbase] The specified column is written to the
specified column family (#5234)
49d397c61d is described below
commit 49d397c61df50e2dc5bc779284de0b7935cdd888
Author: ZhiLin Li <[email protected]>
AuthorDate: Wed Jul 10 11:20:18 2024 +0800
[Improve][hbase] The specified column is written to the specified column
family (#5234)
---
docs/en/connector-v2/sink/Hbase.md | 13 ++++++
docs/zh/connector-v2/sink/Hbase.md | 14 +++++++
.../connectors/seatunnel/hbase/sink/HbaseSink.java | 5 +++
.../seatunnel/hbase/sink/HbaseSinkWriter.java | 13 ++++--
.../seatunnel/e2e/connector/hbase/HbaseIT.java | 44 ++++++++++++++++++++
.../test/resources/fake-to-assign-cf-hbase.conf | 47 ++++++++++++++++++++++
6 files changed, 133 insertions(+), 3 deletions(-)
diff --git a/docs/en/connector-v2/sink/Hbase.md
b/docs/en/connector-v2/sink/Hbase.md
index 51cb4b3362..dd75d21f0b 100644
--- a/docs/en/connector-v2/sink/Hbase.md
+++ b/docs/en/connector-v2/sink/Hbase.md
@@ -116,7 +116,20 @@ Hbase {
all_columns = seatunnel
}
}
+```
+
+## Writes To The Specified Column Family
+```hocon
+Hbase {
+ zookeeper_quorum = "hbase_e2e:2181"
+ table = "assign_cf_table"
+ rowkey_column = ["id"]
+ family_name {
+ c_double = "cf1"
+ c_bigint = "cf2"
+ }
+}
```
## Changelog
diff --git a/docs/zh/connector-v2/sink/Hbase.md
b/docs/zh/connector-v2/sink/Hbase.md
index a9839dbafa..871cad206c 100644
--- a/docs/zh/connector-v2/sink/Hbase.md
+++ b/docs/zh/connector-v2/sink/Hbase.md
@@ -119,6 +119,20 @@ Hbase {
```
+## 写入指定列族
+
+```hocon
+Hbase {
+ zookeeper_quorum = "hbase_e2e:2181"
+ table = "assign_cf_table"
+ rowkey_column = ["id"]
+ family_name {
+ c_double = "cf1"
+ c_bigint = "cf2"
+ }
+}
+```
+
## 更改日志
### 下一个版本
diff --git
a/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/sink/HbaseSink.java
b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/sink/HbaseSink.java
index 81452eb989..848e1e8205 100644
---
a/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/sink/HbaseSink.java
+++
b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/sink/HbaseSink.java
@@ -80,6 +80,11 @@ public class HbaseSink extends
AbstractSimpleSink<SeaTunnelRow, Void> {
getPluginName(), PluginType.SINK,
result.getMsg()));
}
this.hbaseParameters = HbaseParameters.buildWithConfig(pluginConfig);
+ if (hbaseParameters.getFamilyNames().size() == 0) {
+ throw new HbaseConnectorException(
+ SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
+ "The corresponding field options should be configured and
should not be empty Refer to the hbase sink document");
+ }
}
@Override
diff --git
a/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/sink/HbaseSinkWriter.java
b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/sink/HbaseSinkWriter.java
index 72722e582e..7683d6aab0 100644
---
a/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/sink/HbaseSinkWriter.java
+++
b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/sink/HbaseSinkWriter.java
@@ -41,6 +41,7 @@ import org.apache.hadoop.hbase.util.Bytes;
import java.io.IOException;
import java.nio.charset.Charset;
import java.util.List;
+import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
@@ -62,7 +63,7 @@ public class HbaseSinkWriter extends
AbstractSinkWriter<SeaTunnelRow, Void> {
private final int versionColumnIndex;
- private String defaultFamilyName = "value";
+ private String writeAllColumnFamily;
public HbaseSinkWriter(
SeaTunnelRowType seaTunnelRowType,
@@ -76,7 +77,7 @@ public class HbaseSinkWriter extends
AbstractSinkWriter<SeaTunnelRow, Void> {
this.versionColumnIndex = versionColumnIndex;
if (hbaseParameters.getFamilyNames().size() == 1) {
- defaultFamilyName =
hbaseParameters.getFamilyNames().getOrDefault(ALL_COLUMNS, "value");
+ this.writeAllColumnFamily =
hbaseParameters.getFamilyNames().get(ALL_COLUMNS);
}
// initialize hbase configuration
@@ -131,8 +132,14 @@ public class HbaseSinkWriter extends
AbstractSinkWriter<SeaTunnelRow, Void> {
.collect(Collectors.toList());
for (Integer writeColumnIndex : writeColumnIndexes) {
String fieldName = seaTunnelRowType.getFieldName(writeColumnIndex);
+ // This is the family of columns that we define to be written
through the .conf file
+ Map<String, String> configurationFamilyNames =
hbaseParameters.getFamilyNames();
String familyName =
- hbaseParameters.getFamilyNames().getOrDefault(fieldName,
defaultFamilyName);
+ configurationFamilyNames.getOrDefault(fieldName,
writeAllColumnFamily);
+ if (!configurationFamilyNames.containsKey(ALL_COLUMNS)
+ && !configurationFamilyNames.containsKey(fieldName)) {
+ continue;
+ }
byte[] bytes = convertColumnToBytes(row, writeColumnIndex);
if (bytes != null) {
put.addColumn(Bytes.toBytes(familyName),
Bytes.toBytes(fieldName), bytes);
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hbase-e2e/src/test/java/org/apache/seatunnel/e2e/connector/hbase/HbaseIT.java
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hbase-e2e/src/test/java/org/apache/seatunnel/e2e/connector/hbase/HbaseIT.java
index d3cd57b326..13a7a8805a 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hbase-e2e/src/test/java/org/apache/seatunnel/e2e/connector/hbase/HbaseIT.java
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hbase-e2e/src/test/java/org/apache/seatunnel/e2e/connector/hbase/HbaseIT.java
@@ -56,6 +56,7 @@ import java.util.Objects;
public class HbaseIT extends TestSuiteBase implements TestResource {
private static final String TABLE_NAME = "seatunnel_test";
+ private static final String ASSIGN_CF_TABLE_NAME = "assign_cf_table";
private static final String FAMILY_NAME = "info";
@@ -64,6 +65,7 @@ public class HbaseIT extends TestSuiteBase implements
TestResource {
private Admin admin;
private TableName table;
+ private TableName tableAssign;
private HbaseCluster hbaseCluster;
@@ -75,7 +77,9 @@ public class HbaseIT extends TestSuiteBase implements
TestResource {
// Create table for hbase sink test
log.info("initial");
hbaseCluster.createTable(TABLE_NAME, Arrays.asList(FAMILY_NAME));
+ hbaseCluster.createTable(ASSIGN_CF_TABLE_NAME, Arrays.asList("cf1",
"cf2"));
table = TableName.valueOf(TABLE_NAME);
+ tableAssign = TableName.valueOf(ASSIGN_CF_TABLE_NAME);
}
@AfterAll
@@ -133,6 +137,46 @@ public class HbaseIT extends TestSuiteBase implements
TestResource {
scanner.close();
}
+ @TestTemplate
+ public void testHbaseSinkAssignCfSink(TestContainer container)
+ throws IOException, InterruptedException {
+ deleteData(tableAssign);
+
+ Container.ExecResult sinkExecResult =
container.executeJob("/fake-to-assign-cf-hbase.conf");
+ Assertions.assertEquals(0, sinkExecResult.getExitCode());
+
+ Table hbaseTable = hbaseConnection.getTable(tableAssign);
+ Scan scan = new Scan();
+ ResultScanner scanner = hbaseTable.getScanner(scan);
+ ArrayList<Result> results = new ArrayList<>();
+ for (Result result : scanner) {
+ results.add(result);
+ }
+
+ Assertions.assertEquals(results.size(), 5);
+
+ if (scanner != null) {
+ scanner.close();
+ }
+ int cf1Count = 0;
+ int cf2Count = 0;
+
+ for (Result result : results) {
+ for (Cell cell : result.listCells()) {
+ String family = Bytes.toString(CellUtil.cloneFamily(cell));
+ if ("cf1".equals(family)) {
+ cf1Count++;
+ }
+ if ("cf2".equals(family)) {
+ cf2Count++;
+ }
+ }
+ }
+ // check cf1 and cf2
+ Assertions.assertEquals(cf1Count, 5);
+ Assertions.assertEquals(cf2Count, 5);
+ }
+
private void deleteData(TableName table) throws IOException {
Table hbaseTable = hbaseConnection.getTable(table);
Scan scan = new Scan();
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hbase-e2e/src/test/resources/fake-to-assign-cf-hbase.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hbase-e2e/src/test/resources/fake-to-assign-cf-hbase.conf
new file mode 100644
index 0000000000..26f2307dfd
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hbase-e2e/src/test/resources/fake-to-assign-cf-hbase.conf
@@ -0,0 +1,47 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+
+env {
+ parallelism = 1
+ job.mode = "BATCH"
+}
+
+source {
+ # This is an example source plugin **only for testing and demonstrating the
feature source plugin**
+ FakeSource {
+ schema = {
+ fields {
+ id = int
+ c_double = double
+ c_bigint = bigint
+ }
+ }
+ }
+}
+
+sink {
+ Hbase {
+ zookeeper_quorum = "hbase_e2e:2181"
+ table = "assign_cf_table"
+ rowkey_column = ["id"]
+ family_name {
+ c_double = "cf1"
+ c_bigint = "cf2"
+ }
+ }
+}
\ No newline at end of file