This is an automated email from the ASF dual-hosted git repository.

gaojun2048 pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 49d397c61d [Improve][hbase] The specified column is written to the specified column family (#5234)
49d397c61d is described below

commit 49d397c61df50e2dc5bc779284de0b7935cdd888
Author: ZhiLin Li <[email protected]>
AuthorDate: Wed Jul 10 11:20:18 2024 +0800

    [Improve][hbase] The specified column is written to the specified column family (#5234)
---
 docs/en/connector-v2/sink/Hbase.md                 | 13 ++++++
 docs/zh/connector-v2/sink/Hbase.md                 | 14 +++++++
 .../connectors/seatunnel/hbase/sink/HbaseSink.java |  5 +++
 .../seatunnel/hbase/sink/HbaseSinkWriter.java      | 13 ++++--
 .../seatunnel/e2e/connector/hbase/HbaseIT.java     | 44 ++++++++++++++++++++
 .../test/resources/fake-to-assign-cf-hbase.conf    | 47 ++++++++++++++++++++++
 6 files changed, 133 insertions(+), 3 deletions(-)

diff --git a/docs/en/connector-v2/sink/Hbase.md b/docs/en/connector-v2/sink/Hbase.md
index 51cb4b3362..dd75d21f0b 100644
--- a/docs/en/connector-v2/sink/Hbase.md
+++ b/docs/en/connector-v2/sink/Hbase.md
@@ -116,7 +116,20 @@ Hbase {
     all_columns = seatunnel
   }
 }
+```
+
+## Writes To The Specified Column Family
 
+```hocon
+Hbase {
+  zookeeper_quorum = "hbase_e2e:2181"
+  table = "assign_cf_table"
+  rowkey_column = ["id"]
+  family_name {
+    c_double = "cf1"
+    c_bigint = "cf2"
+  }
+}
 ```
 
 ## Changelog
diff --git a/docs/zh/connector-v2/sink/Hbase.md b/docs/zh/connector-v2/sink/Hbase.md
index a9839dbafa..871cad206c 100644
--- a/docs/zh/connector-v2/sink/Hbase.md
+++ b/docs/zh/connector-v2/sink/Hbase.md
@@ -119,6 +119,20 @@ Hbase {
 
 ```
 
+## 写入指定列族
+
+```hocon
+Hbase {
+  zookeeper_quorum = "hbase_e2e:2181"
+  table = "assign_cf_table"
+  rowkey_column = ["id"]
+  family_name {
+    c_double = "cf1"
+    c_bigint = "cf2"
+  }
+}
+```
+
 ## 更改日志
 
 ### 下一个版本
diff --git a/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/sink/HbaseSink.java b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/sink/HbaseSink.java
index 81452eb989..848e1e8205 100644
--- a/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/sink/HbaseSink.java
+++ b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/sink/HbaseSink.java
@@ -80,6 +80,11 @@ public class HbaseSink extends AbstractSimpleSink<SeaTunnelRow, Void> {
                             getPluginName(), PluginType.SINK, result.getMsg()));
         }
         this.hbaseParameters = HbaseParameters.buildWithConfig(pluginConfig);
+        if (hbaseParameters.getFamilyNames().size() == 0) {
+            throw new HbaseConnectorException(
+                    SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
+                    "The corresponding field options should be configured and should not be empty Refer to the hbase sink document");
+        }
     }
 
     @Override
diff --git a/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/sink/HbaseSinkWriter.java b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/sink/HbaseSinkWriter.java
index 72722e582e..7683d6aab0 100644
--- a/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/sink/HbaseSinkWriter.java
+++ b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/sink/HbaseSinkWriter.java
@@ -41,6 +41,7 @@ import org.apache.hadoop.hbase.util.Bytes;
 import java.io.IOException;
 import java.nio.charset.Charset;
 import java.util.List;
+import java.util.Map;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
@@ -62,7 +63,7 @@ public class HbaseSinkWriter extends AbstractSinkWriter<SeaTunnelRow, Void> {
 
     private final int versionColumnIndex;
 
-    private String defaultFamilyName = "value";
+    private String writeAllColumnFamily;
 
     public HbaseSinkWriter(
             SeaTunnelRowType seaTunnelRowType,
@@ -76,7 +77,7 @@ public class HbaseSinkWriter extends AbstractSinkWriter<SeaTunnelRow, Void> {
         this.versionColumnIndex = versionColumnIndex;
 
         if (hbaseParameters.getFamilyNames().size() == 1) {
-            defaultFamilyName = hbaseParameters.getFamilyNames().getOrDefault(ALL_COLUMNS, "value");
+            this.writeAllColumnFamily = hbaseParameters.getFamilyNames().get(ALL_COLUMNS);
         }
 
         // initialize hbase configuration
@@ -131,8 +132,14 @@ public class HbaseSinkWriter extends AbstractSinkWriter<SeaTunnelRow, Void> {
                         .collect(Collectors.toList());
         for (Integer writeColumnIndex : writeColumnIndexes) {
             String fieldName = seaTunnelRowType.getFieldName(writeColumnIndex);
+            // This is the family of columns that we define to be written through the.conf file
+            Map<String, String> configurationFamilyNames = hbaseParameters.getFamilyNames();
             String familyName =
-                    hbaseParameters.getFamilyNames().getOrDefault(fieldName, defaultFamilyName);
+                    configurationFamilyNames.getOrDefault(fieldName, writeAllColumnFamily);
+            if (!configurationFamilyNames.containsKey(ALL_COLUMNS)
+                    && !configurationFamilyNames.containsKey(fieldName)) {
+                continue;
+            }
             byte[] bytes = convertColumnToBytes(row, writeColumnIndex);
             if (bytes != null) {
                 put.addColumn(Bytes.toBytes(familyName), Bytes.toBytes(fieldName), bytes);
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hbase-e2e/src/test/java/org/apache/seatunnel/e2e/connector/hbase/HbaseIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hbase-e2e/src/test/java/org/apache/seatunnel/e2e/connector/hbase/HbaseIT.java
index d3cd57b326..13a7a8805a 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hbase-e2e/src/test/java/org/apache/seatunnel/e2e/connector/hbase/HbaseIT.java
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hbase-e2e/src/test/java/org/apache/seatunnel/e2e/connector/hbase/HbaseIT.java
@@ -56,6 +56,7 @@ import java.util.Objects;
 public class HbaseIT extends TestSuiteBase implements TestResource {
 
     private static final String TABLE_NAME = "seatunnel_test";
+    private static final String ASSIGN_CF_TABLE_NAME = "assign_cf_table";
 
     private static final String FAMILY_NAME = "info";
 
@@ -64,6 +65,7 @@ public class HbaseIT extends TestSuiteBase implements TestResource {
     private Admin admin;
 
     private TableName table;
+    private TableName tableAssign;
 
     private HbaseCluster hbaseCluster;
 
@@ -75,7 +77,9 @@ public class HbaseIT extends TestSuiteBase implements TestResource {
         // Create table for hbase sink test
         log.info("initial");
         hbaseCluster.createTable(TABLE_NAME, Arrays.asList(FAMILY_NAME));
+        hbaseCluster.createTable(ASSIGN_CF_TABLE_NAME, Arrays.asList("cf1", "cf2"));
         table = TableName.valueOf(TABLE_NAME);
+        tableAssign = TableName.valueOf(ASSIGN_CF_TABLE_NAME);
     }
 
     @AfterAll
@@ -133,6 +137,46 @@ public class HbaseIT extends TestSuiteBase implements TestResource {
         scanner.close();
     }
 
+    @TestTemplate
+    public void testHbaseSinkAssignCfSink(TestContainer container)
+            throws IOException, InterruptedException {
+        deleteData(tableAssign);
+
+        Container.ExecResult sinkExecResult = container.executeJob("/fake-to-assign-cf-hbase.conf");
+        Assertions.assertEquals(0, sinkExecResult.getExitCode());
+
+        Table hbaseTable = hbaseConnection.getTable(tableAssign);
+        Scan scan = new Scan();
+        ResultScanner scanner = hbaseTable.getScanner(scan);
+        ArrayList<Result> results = new ArrayList<>();
+        for (Result result : scanner) {
+            results.add(result);
+        }
+
+        Assertions.assertEquals(results.size(), 5);
+
+        if (scanner != null) {
+            scanner.close();
+        }
+        int cf1Count = 0;
+        int cf2Count = 0;
+
+        for (Result result : results) {
+            for (Cell cell : result.listCells()) {
+                String family = Bytes.toString(CellUtil.cloneFamily(cell));
+                if ("cf1".equals(family)) {
+                    cf1Count++;
+                }
+                if ("cf2".equals(family)) {
+                    cf2Count++;
+                }
+            }
+        }
+        // check cf1 and cf2
+        Assertions.assertEquals(cf1Count, 5);
+        Assertions.assertEquals(cf2Count, 5);
+    }
+
     private void deleteData(TableName table) throws IOException {
         Table hbaseTable = hbaseConnection.getTable(table);
         Scan scan = new Scan();
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hbase-e2e/src/test/resources/fake-to-assign-cf-hbase.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hbase-e2e/src/test/resources/fake-to-assign-cf-hbase.conf
new file mode 100644
index 0000000000..26f2307dfd
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hbase-e2e/src/test/resources/fake-to-assign-cf-hbase.conf
@@ -0,0 +1,47 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+
+env {
+  parallelism = 1
+  job.mode = "BATCH"
+}
+
+source {
+  # This is a example source plugin **only for test and demonstrate the feature source plugin**
+  FakeSource {
+    schema = {
+      fields {
+        id = int
+        c_double = double
+        c_bigint = bigint
+      }
+    }
+  }
+}
+
+sink {
+  Hbase {
+    zookeeper_quorum = "hbase_e2e:2181"
+    table = "assign_cf_table"
+    rowkey_column = ["id"]
+    family_name {
+      c_double = "cf1"
+      c_bigint = "cf2"
+    }
+  }
+}
\ No newline at end of file

Reply via email to