This is an automated email from the ASF dual-hosted git repository.

wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 025fa3bb88 [Feature][Connector-V2] Support multi-table sink feature 
for HBase (#7169)
025fa3bb88 is described below

commit 025fa3bb880f0ac25bff764b51ec7c9e15b489ca
Author: Bruce Wong <[email protected]>
AuthorDate: Fri Aug 30 17:23:08 2024 +0800

    [Feature][Connector-V2] Support multi-table sink feature for HBase (#7169)
---
 docs/en/connector-v2/sink/Hbase.md                 |  73 ++++++++++++
 docs/zh/connector-v2/sink/Hbase.md                 |  72 ++++++++++++
 .../seatunnel/hbase/config/HbaseParameters.java    |  52 +++------
 .../connectors/seatunnel/hbase/sink/HbaseSink.java |  55 ++-------
 .../seatunnel/hbase/sink/HbaseSinkFactory.java     |  13 +++
 .../seatunnel/hbase/sink/HbaseSinkWriter.java      |  18 ++-
 .../seatunnel/e2e/connector/hbase/HbaseIT.java     |  47 +++++++-
 .../fake-to-hbase-with-multipletable.conf          |  86 ++++++++++++++
 .../hbase-to-assert-with-multipletable.conf        | 129 +++++++++++++++++++++
 9 files changed, 451 insertions(+), 94 deletions(-)

diff --git a/docs/en/connector-v2/sink/Hbase.md 
b/docs/en/connector-v2/sink/Hbase.md
index 0f808f5e52..3ceba0982d 100644
--- a/docs/en/connector-v2/sink/Hbase.md
+++ b/docs/en/connector-v2/sink/Hbase.md
@@ -116,6 +116,79 @@ Hbase {
     all_columns = seatunnel
   }
 }
+
+```
+
+### Multiple Tables
+
+```hocon
+env {
+  # You can set engine configuration here
+  execution.parallelism = 1
+  job.mode = "BATCH"
+}
+
+source {
+  FakeSource {
+    tables_configs = [
+       {
+        schema = {
+          table = "hbase_sink_1"
+         fields {
+                    name = STRING
+                    c_string = STRING
+                    c_double = DOUBLE
+                    c_bigint = BIGINT
+                    c_float = FLOAT
+                    c_int = INT
+                    c_smallint = SMALLINT
+                    c_boolean = BOOLEAN
+                    time = BIGINT
+           }
+        }
+            rows = [
+              {
+                kind = INSERT
+                fields = ["label_1", "sink_1", 4.3, 200, 2.5, 2, 5, true, 
1627529632356]
+              }
+              ]
+       },
+       {
+       schema = {
+         table = "hbase_sink_2"
+              fields {
+                    name = STRING
+                    c_string = STRING
+                    c_double = DOUBLE
+                    c_bigint = BIGINT
+                    c_float = FLOAT
+                    c_int = INT
+                    c_smallint = SMALLINT
+                    c_boolean = BOOLEAN
+                    time = BIGINT
+              }
+       }
+           rows = [
+             {
+               kind = INSERT
+               fields = ["label_2", "sink_2", 4.3, 200, 2.5, 2, 5, true, 
1627529632357]
+             }
+             ]
+      }
+    ]
+  }
+}
+
+sink {
+  Hbase {
+    zookeeper_quorum = "hadoop001:2181,hadoop002:2181,hadoop003:2181"
+    table = "${table_name}"
+    rowkey_column = ["name"]
+    family_name {
+      all_columns = info
+    }
+  }
+}
 ```
 
 ## Writes To The Specified Column Family
diff --git a/docs/zh/connector-v2/sink/Hbase.md 
b/docs/zh/connector-v2/sink/Hbase.md
index edc9e48510..f028a8c93e 100644
--- a/docs/zh/connector-v2/sink/Hbase.md
+++ b/docs/zh/connector-v2/sink/Hbase.md
@@ -119,6 +119,78 @@ Hbase {
 
 ```
 
+### 写入多表
+
+```hocon
+env {
+  # You can set engine configuration here
+  execution.parallelism = 1
+  job.mode = "BATCH"
+}
+
+source {
+  FakeSource {
+    tables_configs = [
+       {
+        schema = {
+          table = "hbase_sink_1"
+         fields {
+                    name = STRING
+                    c_string = STRING
+                    c_double = DOUBLE
+                    c_bigint = BIGINT
+                    c_float = FLOAT
+                    c_int = INT
+                    c_smallint = SMALLINT
+                    c_boolean = BOOLEAN
+                    time = BIGINT
+           }
+        }
+            rows = [
+              {
+                kind = INSERT
+                fields = ["label_1", "sink_1", 4.3, 200, 2.5, 2, 5, true, 
1627529632356]
+              }
+              ]
+       },
+       {
+       schema = {
+         table = "hbase_sink_2"
+              fields {
+                    name = STRING
+                    c_string = STRING
+                    c_double = DOUBLE
+                    c_bigint = BIGINT
+                    c_float = FLOAT
+                    c_int = INT
+                    c_smallint = SMALLINT
+                    c_boolean = BOOLEAN
+                    time = BIGINT
+              }
+       }
+           rows = [
+             {
+               kind = INSERT
+               fields = ["label_2", "sink_2", 4.3, 200, 2.5, 2, 5, true, 
1627529632357]
+             }
+             ]
+      }
+    ]
+  }
+}
+
+sink {
+  Hbase {
+    zookeeper_quorum = "hadoop001:2181,hadoop002:2181,hadoop003:2181"
+    table = "${table_name}"
+    rowkey_column = ["name"]
+    family_name {
+      all_columns = info
+    }
+  }
+}
+```
+
 ## 写入指定列族
 
 ```hocon
diff --git 
a/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/config/HbaseParameters.java
 
b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/config/HbaseParameters.java
index c25f04b375..4d020700ad 100644
--- 
a/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/config/HbaseParameters.java
+++ 
b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/config/HbaseParameters.java
@@ -19,6 +19,7 @@ package 
org.apache.seatunnel.connectors.seatunnel.hbase.config;
 
 import org.apache.seatunnel.shade.com.typesafe.config.Config;
 
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
 import org.apache.seatunnel.common.config.TypesafeConfigUtils;
 
 import lombok.Builder;
@@ -80,44 +81,25 @@ public class HbaseParameters implements Serializable {
 
     @Builder.Default private HbaseConfig.EnCoding enCoding = 
ENCODING.defaultValue();
 
-    public static HbaseParameters buildWithSinkConfig(Config pluginConfig) {
+    public static HbaseParameters buildWithConfig(ReadonlyConfig config) {
         HbaseParametersBuilder builder = HbaseParameters.builder();
 
         // required parameters
-        
builder.zookeeperQuorum(pluginConfig.getString(ZOOKEEPER_QUORUM.key()));
-        builder.table(pluginConfig.getString(TABLE.key()));
-        
builder.rowkeyColumns(pluginConfig.getStringList(ROWKEY_COLUMNS.key()));
-        builder.familyNames(
-                
TypesafeConfigUtils.configToMap(pluginConfig.getConfig(FAMILY_NAME.key())));
-
-        // optional parameters
-        if (pluginConfig.hasPath(HBASE_TTL_CONFIG.key())) {
-            builder.ttl(pluginConfig.getLong(HBASE_TTL_CONFIG.key()));
-        }
-        if (pluginConfig.hasPath(ROWKEY_DELIMITER.key())) {
-            
builder.rowkeyDelimiter(pluginConfig.getString(ROWKEY_DELIMITER.key()));
-        }
-        if (pluginConfig.hasPath(VERSION_COLUMN.key())) {
-            
builder.versionColumn(pluginConfig.getString(VERSION_COLUMN.key()));
-        }
-        if (pluginConfig.hasPath(NULL_MODE.key())) {
-            String nullMode = pluginConfig.getString(NULL_MODE.key());
-            
builder.nullMode(HbaseConfig.NullMode.valueOf(nullMode.toUpperCase()));
-        }
-        if (pluginConfig.hasPath(WAL_WRITE.key())) {
-            builder.walWrite(pluginConfig.getBoolean(WAL_WRITE.key()));
-        }
-        if (pluginConfig.hasPath(WRITE_BUFFER_SIZE.key())) {
-            
builder.writeBufferSize(pluginConfig.getInt(WRITE_BUFFER_SIZE.key()));
-        }
-        if (pluginConfig.hasPath(ENCODING.key())) {
-            String encoding = pluginConfig.getString(ENCODING.key());
-            
builder.enCoding(HbaseConfig.EnCoding.valueOf(encoding.toUpperCase()));
-        }
-        if (pluginConfig.hasPath(HBASE_EXTRA_CONFIG.key())) {
-            Config extraConfig = 
pluginConfig.getConfig(HBASE_EXTRA_CONFIG.key());
-            
builder.hbaseExtraConfig(TypesafeConfigUtils.configToMap(extraConfig));
-        }
+        builder.zookeeperQuorum(config.get(ZOOKEEPER_QUORUM));
+        builder.rowkeyColumns(config.get(ROWKEY_COLUMNS));
+        builder.familyNames(config.get(FAMILY_NAME));
+
+        builder.table(config.get(TABLE));
+        builder.rowkeyDelimiter(config.get(ROWKEY_DELIMITER));
+        builder.versionColumn(config.get(VERSION_COLUMN));
+        String nullMode = String.valueOf(config.get(NULL_MODE));
+        builder.nullMode(HbaseConfig.NullMode.valueOf(nullMode.toUpperCase()));
+        builder.walWrite(config.get(WAL_WRITE));
+        builder.writeBufferSize(config.get(WRITE_BUFFER_SIZE));
+        String encoding = String.valueOf(config.get(ENCODING));
+        builder.enCoding(HbaseConfig.EnCoding.valueOf(encoding.toUpperCase()));
+        builder.hbaseExtraConfig(config.get(HBASE_EXTRA_CONFIG));
+        builder.ttl(config.get(HBASE_TTL_CONFIG));
         return builder.build();
     }
 
diff --git 
a/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/sink/HbaseSink.java
 
b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/sink/HbaseSink.java
index 4f7b929223..0c592dd65a 100644
--- 
a/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/sink/HbaseSink.java
+++ 
b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/sink/HbaseSink.java
@@ -19,33 +19,20 @@ package 
org.apache.seatunnel.connectors.seatunnel.hbase.sink;
 
 import org.apache.seatunnel.shade.com.typesafe.config.Config;
 
-import org.apache.seatunnel.api.common.PrepareFailException;
-import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
-import org.apache.seatunnel.api.sink.SeaTunnelSink;
 import org.apache.seatunnel.api.sink.SinkWriter;
+import org.apache.seatunnel.api.sink.SupportMultiTableSink;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
-import org.apache.seatunnel.common.config.CheckConfigUtil;
-import org.apache.seatunnel.common.config.CheckResult;
-import org.apache.seatunnel.common.constants.PluginType;
 import 
org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSimpleSink;
-import 
org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter;
 import org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseParameters;
-import 
org.apache.seatunnel.connectors.seatunnel.hbase.exception.HbaseConnectorException;
-
-import com.google.auto.service.AutoService;
 
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 
-import static 
org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseConfig.FAMILY_NAME;
-import static 
org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseConfig.ROWKEY_COLUMNS;
-import static 
org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseConfig.TABLE;
-import static 
org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseConfig.ZOOKEEPER_QUORUM;
-
-@AutoService(SeaTunnelSink.class)
-public class HbaseSink extends AbstractSimpleSink<SeaTunnelRow, Void> {
+public class HbaseSink extends AbstractSimpleSink<SeaTunnelRow, Void>
+        implements SupportMultiTableSink {
 
     private Config pluginConfig;
 
@@ -62,34 +49,9 @@ public class HbaseSink extends 
AbstractSimpleSink<SeaTunnelRow, Void> {
         return HbaseSinkFactory.IDENTIFIER;
     }
 
-    @Override
-    public void prepare(Config pluginConfig) throws PrepareFailException {
-        this.pluginConfig = pluginConfig;
-        CheckResult result =
-                CheckConfigUtil.checkAllExists(
-                        pluginConfig,
-                        ZOOKEEPER_QUORUM.key(),
-                        TABLE.key(),
-                        ROWKEY_COLUMNS.key(),
-                        FAMILY_NAME.key());
-        if (!result.isSuccess()) {
-            throw new HbaseConnectorException(
-                    SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
-                    String.format(
-                            "PluginName: %s, PluginType: %s, Message: %s",
-                            getPluginName(), PluginType.SINK, 
result.getMsg()));
-        }
-        this.hbaseParameters = 
HbaseParameters.buildWithSinkConfig(pluginConfig);
-        if (hbaseParameters.getFamilyNames().size() == 0) {
-            throw new HbaseConnectorException(
-                    SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
-                    "The corresponding field options should be configured and 
should not be empty Refer to the hbase sink document");
-        }
-    }
-
-    @Override
-    public void setTypeInfo(SeaTunnelRowType seaTunnelRowType) {
-        this.seaTunnelRowType = seaTunnelRowType;
+    public HbaseSink(HbaseParameters hbaseParameters, CatalogTable 
catalogTable) {
+        this.hbaseParameters = hbaseParameters;
+        this.seaTunnelRowType = 
catalogTable.getTableSchema().toPhysicalRowDataType();
         for (String rowkeyColumn : hbaseParameters.getRowkeyColumns()) {
             
this.rowkeyColumnIndexes.add(seaTunnelRowType.indexOf(rowkeyColumn));
         }
@@ -99,8 +61,7 @@ public class HbaseSink extends 
AbstractSimpleSink<SeaTunnelRow, Void> {
     }
 
     @Override
-    public AbstractSinkWriter<SeaTunnelRow, Void> 
createWriter(SinkWriter.Context context)
-            throws IOException {
+    public HbaseSinkWriter createWriter(SinkWriter.Context context) throws 
IOException {
         return new HbaseSinkWriter(
                 seaTunnelRowType, hbaseParameters, rowkeyColumnIndexes, 
versionColumnIndex);
     }
diff --git 
a/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/sink/HbaseSinkFactory.java
 
b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/sink/HbaseSinkFactory.java
index 3038473c4e..1bbeb43f4e 100644
--- 
a/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/sink/HbaseSinkFactory.java
+++ 
b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/sink/HbaseSinkFactory.java
@@ -18,8 +18,13 @@
 package org.apache.seatunnel.connectors.seatunnel.hbase.sink;
 
 import org.apache.seatunnel.api.configuration.util.OptionRule;
+import org.apache.seatunnel.api.sink.SinkCommonOptions;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.connector.TableSink;
 import org.apache.seatunnel.api.table.factory.Factory;
 import org.apache.seatunnel.api.table.factory.TableSinkFactory;
+import org.apache.seatunnel.api.table.factory.TableSinkFactoryContext;
+import org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseParameters;
 
 import com.google.auto.service.AutoService;
 
@@ -50,6 +55,7 @@ public class HbaseSinkFactory implements TableSinkFactory {
         return OptionRule.builder()
                 .required(ZOOKEEPER_QUORUM, TABLE, ROWKEY_COLUMNS, FAMILY_NAME)
                 .optional(
+                        SinkCommonOptions.MULTI_TABLE_SINK_REPLICA,
                         ROWKEY_DELIMITER,
                         VERSION_COLUMN,
                         NULL_MODE,
@@ -59,4 +65,11 @@ public class HbaseSinkFactory implements TableSinkFactory {
                         HBASE_EXTRA_CONFIG)
                 .build();
     }
+
+    @Override
+    public TableSink createSink(TableSinkFactoryContext context) {
+        HbaseParameters hbaseParameters = 
HbaseParameters.buildWithConfig(context.getOptions());
+        CatalogTable catalogTable = context.getCatalogTable();
+        return () -> new HbaseSink(hbaseParameters, catalogTable);
+    }
 }
diff --git 
a/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/sink/HbaseSinkWriter.java
 
b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/sink/HbaseSinkWriter.java
index 7683d6aab0..e1e312d305 100644
--- 
a/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/sink/HbaseSinkWriter.java
+++ 
b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/sink/HbaseSinkWriter.java
@@ -17,6 +17,7 @@
 
 package org.apache.seatunnel.connectors.seatunnel.hbase.sink;
 
+import org.apache.seatunnel.api.sink.SupportMultiTableSinkWriter;
 import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
@@ -41,11 +42,11 @@ import org.apache.hadoop.hbase.util.Bytes;
 import java.io.IOException;
 import java.nio.charset.Charset;
 import java.util.List;
-import java.util.Map;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
-public class HbaseSinkWriter extends AbstractSinkWriter<SeaTunnelRow, Void> {
+public class HbaseSinkWriter extends AbstractSinkWriter<SeaTunnelRow, Void>
+        implements SupportMultiTableSinkWriter<Void> {
 
     private static final String ALL_COLUMNS = "all_columns";
 
@@ -63,7 +64,7 @@ public class HbaseSinkWriter extends 
AbstractSinkWriter<SeaTunnelRow, Void> {
 
     private final int versionColumnIndex;
 
-    private String writeAllColumnFamily;
+    private String defaultFamilyName = "value";
 
     public HbaseSinkWriter(
             SeaTunnelRowType seaTunnelRowType,
@@ -77,7 +78,8 @@ public class HbaseSinkWriter extends 
AbstractSinkWriter<SeaTunnelRow, Void> {
         this.versionColumnIndex = versionColumnIndex;
 
         if (hbaseParameters.getFamilyNames().size() == 1) {
-            this.writeAllColumnFamily = 
hbaseParameters.getFamilyNames().get(ALL_COLUMNS);
+            defaultFamilyName =
+                    hbaseParameters.getFamilyNames().getOrDefault(ALL_COLUMNS, 
defaultFamilyName);
         }
 
         // initialize hbase configuration
@@ -132,14 +134,8 @@ public class HbaseSinkWriter extends 
AbstractSinkWriter<SeaTunnelRow, Void> {
                         .collect(Collectors.toList());
         for (Integer writeColumnIndex : writeColumnIndexes) {
             String fieldName = seaTunnelRowType.getFieldName(writeColumnIndex);
-            // This is the family of columns that we define to be written 
through the.conf file
-            Map<String, String> configurationFamilyNames = 
hbaseParameters.getFamilyNames();
             String familyName =
-                    configurationFamilyNames.getOrDefault(fieldName, 
writeAllColumnFamily);
-            if (!configurationFamilyNames.containsKey(ALL_COLUMNS)
-                    && !configurationFamilyNames.containsKey(fieldName)) {
-                continue;
-            }
+                    hbaseParameters.getFamilyNames().getOrDefault(fieldName, 
defaultFamilyName);
             byte[] bytes = convertColumnToBytes(row, writeColumnIndex);
             if (bytes != null) {
                 put.addColumn(Bytes.toBytes(familyName), 
Bytes.toBytes(fieldName), bytes);
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hbase-e2e/src/test/java/org/apache/seatunnel/e2e/connector/hbase/HbaseIT.java
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hbase-e2e/src/test/java/org/apache/seatunnel/e2e/connector/hbase/HbaseIT.java
index 85ceef9235..fe736f965e 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hbase-e2e/src/test/java/org/apache/seatunnel/e2e/connector/hbase/HbaseIT.java
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hbase-e2e/src/test/java/org/apache/seatunnel/e2e/connector/hbase/HbaseIT.java
@@ -56,8 +56,13 @@ import java.util.Objects;
 public class HbaseIT extends TestSuiteBase implements TestResource {
 
     private static final String TABLE_NAME = "seatunnel_test";
+
     private static final String ASSIGN_CF_TABLE_NAME = "assign_cf_table";
 
+    private static final String MULTI_TABLE_ONE_NAME = "hbase_sink_1";
+
+    private static final String MULTI_TABLE_TWO_NAME = "hbase_sink_2";
+
     private static final String FAMILY_NAME = "info";
 
     private Connection hbaseConnection;
@@ -77,9 +82,14 @@ public class HbaseIT extends TestSuiteBase implements 
TestResource {
         // Create table for hbase sink test
         log.info("initial");
         hbaseCluster.createTable(TABLE_NAME, Arrays.asList(FAMILY_NAME));
+        // Create table for hbase assign cf table sink test
         hbaseCluster.createTable(ASSIGN_CF_TABLE_NAME, Arrays.asList("cf1", 
"cf2"));
         table = TableName.valueOf(TABLE_NAME);
         tableAssign = TableName.valueOf(ASSIGN_CF_TABLE_NAME);
+
+        // Create table for hbase multi-table sink test
+        hbaseCluster.createTable(MULTI_TABLE_ONE_NAME, 
Arrays.asList(FAMILY_NAME));
+        hbaseCluster.createTable(MULTI_TABLE_TWO_NAME, 
Arrays.asList(FAMILY_NAME));
     }
 
     @AfterAll
@@ -93,7 +103,11 @@ public class HbaseIT extends TestSuiteBase implements 
TestResource {
 
     @TestTemplate
     public void testHbaseSink(TestContainer container) throws IOException, 
InterruptedException {
-        fakeToHbase(container);
+        deleteData(table);
+        Container.ExecResult sinkExecResult = 
container.executeJob("/fake-to-hbase.conf");
+        Assertions.assertEquals(0, sinkExecResult.getExitCode());
+        ArrayList<Result> results = readData(table);
+        Assertions.assertEquals(results.size(), 5);
         Container.ExecResult sourceExecResult = 
container.executeJob("/hbase-to-assert.conf");
         Assertions.assertEquals(0, sourceExecResult.getExitCode());
     }
@@ -166,6 +180,25 @@ public class HbaseIT extends TestSuiteBase implements 
TestResource {
         Assertions.assertEquals(cf2Count, 5);
     }
 
+    @DisabledOnContainer(
+            value = {},
+            type = {EngineType.FLINK},
+            disabledReason = "Currently FLINK does not support multiple table 
write")
+    public void testHbaseMultiTableSink(TestContainer container)
+            throws IOException, InterruptedException {
+        TableName multiTable1 = TableName.valueOf(MULTI_TABLE_ONE_NAME);
+        TableName multiTable2 = TableName.valueOf(MULTI_TABLE_TWO_NAME);
+        deleteData(multiTable1);
+        deleteData(multiTable2);
+        Container.ExecResult sinkExecResult =
+                container.executeJob("/fake-to-hbase-with-multipletable.conf");
+        Assertions.assertEquals(0, sinkExecResult.getExitCode());
+        ArrayList<Result> results = readData(multiTable1);
+        Assertions.assertEquals(results.size(), 1);
+        results = readData(multiTable2);
+        Assertions.assertEquals(results.size(), 1);
+    }
+
     @TestTemplate
     public void testHbaseSourceWithBatchQuery(TestContainer container)
             throws IOException, InterruptedException {
@@ -200,4 +233,16 @@ public class HbaseIT extends TestSuiteBase implements 
TestResource {
             hbaseTable.delete(deleteRow);
         }
     }
+
+    public ArrayList<Result> readData(TableName table) throws IOException {
+        Table hbaseTable = hbaseConnection.getTable(table);
+        Scan scan = new Scan();
+        ResultScanner scanner = hbaseTable.getScanner(scan);
+        ArrayList<Result> results = new ArrayList<>();
+        for (Result result : scanner) {
+            results.add(result);
+        }
+        scanner.close();
+        return results;
+    }
 }
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hbase-e2e/src/test/resources/fake-to-hbase-with-multipletable.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hbase-e2e/src/test/resources/fake-to-hbase-with-multipletable.conf
new file mode 100644
index 0000000000..8972bf1324
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hbase-e2e/src/test/resources/fake-to-hbase-with-multipletable.conf
@@ -0,0 +1,86 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of batch processing in seatunnel config
+######
+
+env {
+  # You can set engine configuration here
+  parallelism = 1
+  job.mode = "BATCH"
+}
+
+source {
+  FakeSource {
+    tables_configs = [
+       {
+        schema = {
+          table = "hbase_sink_1"
+         fields {
+                    name = STRING
+                    c_string = STRING
+                    c_double = DOUBLE
+                    c_bigint = BIGINT
+                    c_float = FLOAT
+                    c_int = INT
+                    c_smallint = SMALLINT
+                    c_boolean = BOOLEAN
+                    time = BIGINT
+           }
+        }
+            rows = [
+              {
+                kind = INSERT
+                fields = ["label_1", "sink_1", 4.3, 200, 2.5, 2, 5, true, 
1627529632356]
+              }
+              ]
+       },
+       {
+       schema = {
+         table = "hbase_sink_2"
+              fields {
+                    name = STRING
+                    c_string = STRING
+                    c_double = DOUBLE
+                    c_bigint = BIGINT
+                    c_float = FLOAT
+                    c_int = INT
+                    c_smallint = SMALLINT
+                    c_boolean = BOOLEAN
+              }
+       }
+           rows = [
+             {
+               kind = INSERT
+               fields = ["label_2", "sink_2", 4.3, 200, 2.5, 2, 5, true]
+             }
+             ]
+      }
+    ]
+  }
+}
+
+sink {
+  Hbase {
+    zookeeper_quorum = "hbase_e2e:2181"
+    table = "${table_name}"
+    rowkey_column = ["name"]
+    family_name {
+      all_columns = info
+    }
+  }
+}
\ No newline at end of file
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hbase-e2e/src/test/resources/hbase-to-assert-with-multipletable.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hbase-e2e/src/test/resources/hbase-to-assert-with-multipletable.conf
new file mode 100644
index 0000000000..6dc7530b4b
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hbase-e2e/src/test/resources/hbase-to-assert-with-multipletable.conf
@@ -0,0 +1,129 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+  parallelism = 1
+  job.mode = "BATCH"
+}
+
+source {
+  Hbase {
+      zookeeper_quorum = "hbase_e2e:2181"
+      table = "seatunnel_test"
+      query_columns=["rowkey", "info:age", "info:c_double", 
"info:c_boolean","info:c_bigint","info:c_smallint","info:c_tinyint","info:c_float"]
+      schema = {
+            columns = [
+                  {
+                     name = rowkey
+                     type = string
+                  },
+                  {
+                     name = "info:age"
+                     type = int
+                  },
+                  {
+                     name = "info:c_double"
+                     type = double
+                  },
+                  {
+                     name = "info:c_boolean"
+                     type = boolean
+                  },
+                  {
+                     name = "info:c_bigint"
+                     type = bigint
+                  },
+                  {
+                     name = "info:c_smallint"
+                     type = smallint
+                  },
+                  {
+                     name = "info:c_tinyint"
+                     type = tinyint
+                  },
+                  {
+                     name = "info:c_float"
+                     type = float
+                  }
+             ]
+       }
+    }
+}
+
+sink {
+  Assert {
+    rules {
+      row_rules = [
+        {
+          rule_type = MAX_ROW
+          rule_value = 11
+        },
+        {
+          rule_type = MIN_ROW
+          rule_value = 11
+        }
+      ],
+      field_rules = [
+        {
+          field_name = rowkey
+          field_type = string
+          field_value = [
+            {
+              rule_type = NOT_NULL
+            }
+          ]
+        },
+        {
+          field_name = "info:c_boolean"
+          field_type = boolean
+          field_value = [
+            {
+              rule_type = NOT_NULL
+            }
+          ]
+        },
+        {
+          field_name = "info:c_double"
+          field_type = double
+          field_value = [
+            {
+              rule_type = NOT_NULL
+            }
+          ]
+        },
+        {
+          field_name = "info:c_bigint"
+          field_type = bigint
+          field_value = [
+            {
+              rule_type = NOT_NULL
+            }
+          ]
+        },
+        {
+          field_name = "info:age"
+          field_type = int
+          field_value = [
+            {
+              rule_type = NOT_NULL
+            }
+          ]
+        }
+      ]
+    }
+  }
+}
\ No newline at end of file

Reply via email to