This is an automated email from the ASF dual-hosted git repository.
wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 025fa3bb88 [Feature][Connector-V2] Support multi-table sink feature
for HBase (#7169)
025fa3bb88 is described below
commit 025fa3bb880f0ac25bff764b51ec7c9e15b489ca
Author: Bruce Wong <[email protected]>
AuthorDate: Fri Aug 30 17:23:08 2024 +0800
[Feature][Connector-V2] Support multi-table sink feature for HBase (#7169)
---
docs/en/connector-v2/sink/Hbase.md | 73 ++++++++++++
docs/zh/connector-v2/sink/Hbase.md | 72 ++++++++++++
.../seatunnel/hbase/config/HbaseParameters.java | 52 +++------
.../connectors/seatunnel/hbase/sink/HbaseSink.java | 55 ++-------
.../seatunnel/hbase/sink/HbaseSinkFactory.java | 13 +++
.../seatunnel/hbase/sink/HbaseSinkWriter.java | 18 ++-
.../seatunnel/e2e/connector/hbase/HbaseIT.java | 47 +++++++-
.../fake-to-hbase-with-multipletable.conf | 86 ++++++++++++++
.../hbase-to-assert-with-multipletable.conf | 129 +++++++++++++++++++++
9 files changed, 451 insertions(+), 94 deletions(-)
diff --git a/docs/en/connector-v2/sink/Hbase.md
b/docs/en/connector-v2/sink/Hbase.md
index 0f808f5e52..3ceba0982d 100644
--- a/docs/en/connector-v2/sink/Hbase.md
+++ b/docs/en/connector-v2/sink/Hbase.md
@@ -116,6 +116,79 @@ Hbase {
all_columns = seatunnel
}
}
+
+```
+
+### Multiple Table
+
+```hocon
+env {
+ # You can set engine configuration here
+ execution.parallelism = 1
+ job.mode = "BATCH"
+}
+
+source {
+ FakeSource {
+ tables_configs = [
+ {
+ schema = {
+ table = "hbase_sink_1"
+ fields {
+ name = STRING
+ c_string = STRING
+ c_double = DOUBLE
+ c_bigint = BIGINT
+ c_float = FLOAT
+ c_int = INT
+ c_smallint = SMALLINT
+ c_boolean = BOOLEAN
+ time = BIGINT
+ }
+ }
+ rows = [
+ {
+ kind = INSERT
+ fields = ["label_1", "sink_1", 4.3, 200, 2.5, 2, 5, true, 1627529632356]
+ }
+ ]
+ },
+ {
+ schema = {
+ table = "hbase_sink_2"
+ fields {
+ name = STRING
+ c_string = STRING
+ c_double = DOUBLE
+ c_bigint = BIGINT
+ c_float = FLOAT
+ c_int = INT
+ c_smallint = SMALLINT
+ c_boolean = BOOLEAN
+ time = BIGINT
+ }
+ }
+ rows = [
+ {
+ kind = INSERT
+ fields = ["label_2", "sink_2", 4.3, 200, 2.5, 2, 5, true, 1627529632357]
+ }
+ ]
+ }
+ ]
+ }
+}
+
+sink {
+ Hbase {
+ zookeeper_quorum = "hadoop001:2181,hadoop002:2181,hadoop003:2181"
+ table = "${table_name}"
+ rowkey_column = ["name"]
+ family_name {
+ all_columns = info
+ }
+ }
+}
```
## Writes To The Specified Column Family
diff --git a/docs/zh/connector-v2/sink/Hbase.md
b/docs/zh/connector-v2/sink/Hbase.md
index edc9e48510..f028a8c93e 100644
--- a/docs/zh/connector-v2/sink/Hbase.md
+++ b/docs/zh/connector-v2/sink/Hbase.md
@@ -119,6 +119,78 @@ Hbase {
```
+### 写入多表
+
+```hocon
+env {
+ # You can set engine configuration here
+ execution.parallelism = 1
+ job.mode = "BATCH"
+}
+
+source {
+ FakeSource {
+ tables_configs = [
+ {
+ schema = {
+ table = "hbase_sink_1"
+ fields {
+ name = STRING
+ c_string = STRING
+ c_double = DOUBLE
+ c_bigint = BIGINT
+ c_float = FLOAT
+ c_int = INT
+ c_smallint = SMALLINT
+ c_boolean = BOOLEAN
+ time = BIGINT
+ }
+ }
+ rows = [
+ {
+ kind = INSERT
+ fields = ["label_1", "sink_1", 4.3, 200, 2.5, 2, 5, true, 1627529632356]
+ }
+ ]
+ },
+ {
+ schema = {
+ table = "hbase_sink_2"
+ fields {
+ name = STRING
+ c_string = STRING
+ c_double = DOUBLE
+ c_bigint = BIGINT
+ c_float = FLOAT
+ c_int = INT
+ c_smallint = SMALLINT
+ c_boolean = BOOLEAN
+ time = BIGINT
+ }
+ }
+ rows = [
+ {
+ kind = INSERT
+ fields = ["label_2", "sink_2", 4.3, 200, 2.5, 2, 5, true, 1627529632357]
+ }
+ ]
+ }
+ ]
+ }
+}
+
+sink {
+ Hbase {
+ zookeeper_quorum = "hadoop001:2181,hadoop002:2181,hadoop003:2181"
+ table = "${table_name}"
+ rowkey_column = ["name"]
+ family_name {
+ all_columns = info
+ }
+ }
+}
+```
+
## 写入指定列族
```hocon
diff --git
a/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/config/HbaseParameters.java
b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/config/HbaseParameters.java
index c25f04b375..4d020700ad 100644
---
a/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/config/HbaseParameters.java
+++
b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/config/HbaseParameters.java
@@ -19,6 +19,7 @@ package
org.apache.seatunnel.connectors.seatunnel.hbase.config;
import org.apache.seatunnel.shade.com.typesafe.config.Config;
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.common.config.TypesafeConfigUtils;
import lombok.Builder;
@@ -80,44 +81,25 @@ public class HbaseParameters implements Serializable {
@Builder.Default private HbaseConfig.EnCoding enCoding =
ENCODING.defaultValue();
- public static HbaseParameters buildWithSinkConfig(Config pluginConfig) {
+ public static HbaseParameters buildWithConfig(ReadonlyConfig config) {
HbaseParametersBuilder builder = HbaseParameters.builder();
// required parameters
-
builder.zookeeperQuorum(pluginConfig.getString(ZOOKEEPER_QUORUM.key()));
- builder.table(pluginConfig.getString(TABLE.key()));
-
builder.rowkeyColumns(pluginConfig.getStringList(ROWKEY_COLUMNS.key()));
- builder.familyNames(
-
TypesafeConfigUtils.configToMap(pluginConfig.getConfig(FAMILY_NAME.key())));
-
- // optional parameters
- if (pluginConfig.hasPath(HBASE_TTL_CONFIG.key())) {
- builder.ttl(pluginConfig.getLong(HBASE_TTL_CONFIG.key()));
- }
- if (pluginConfig.hasPath(ROWKEY_DELIMITER.key())) {
-
builder.rowkeyDelimiter(pluginConfig.getString(ROWKEY_DELIMITER.key()));
- }
- if (pluginConfig.hasPath(VERSION_COLUMN.key())) {
-
builder.versionColumn(pluginConfig.getString(VERSION_COLUMN.key()));
- }
- if (pluginConfig.hasPath(NULL_MODE.key())) {
- String nullMode = pluginConfig.getString(NULL_MODE.key());
-
builder.nullMode(HbaseConfig.NullMode.valueOf(nullMode.toUpperCase()));
- }
- if (pluginConfig.hasPath(WAL_WRITE.key())) {
- builder.walWrite(pluginConfig.getBoolean(WAL_WRITE.key()));
- }
- if (pluginConfig.hasPath(WRITE_BUFFER_SIZE.key())) {
-
builder.writeBufferSize(pluginConfig.getInt(WRITE_BUFFER_SIZE.key()));
- }
- if (pluginConfig.hasPath(ENCODING.key())) {
- String encoding = pluginConfig.getString(ENCODING.key());
-
builder.enCoding(HbaseConfig.EnCoding.valueOf(encoding.toUpperCase()));
- }
- if (pluginConfig.hasPath(HBASE_EXTRA_CONFIG.key())) {
- Config extraConfig =
pluginConfig.getConfig(HBASE_EXTRA_CONFIG.key());
-
builder.hbaseExtraConfig(TypesafeConfigUtils.configToMap(extraConfig));
- }
+ builder.zookeeperQuorum(config.get(ZOOKEEPER_QUORUM));
+ builder.rowkeyColumns(config.get(ROWKEY_COLUMNS));
+ builder.familyNames(config.get(FAMILY_NAME));
+
+ builder.table(config.get(TABLE));
+ builder.rowkeyDelimiter(config.get(ROWKEY_DELIMITER));
+ builder.versionColumn(config.get(VERSION_COLUMN));
+ String nullMode = String.valueOf(config.get(NULL_MODE));
+ builder.nullMode(HbaseConfig.NullMode.valueOf(nullMode.toUpperCase()));
+ builder.walWrite(config.get(WAL_WRITE));
+ builder.writeBufferSize(config.get(WRITE_BUFFER_SIZE));
+ String encoding = String.valueOf(config.get(ENCODING));
+ builder.enCoding(HbaseConfig.EnCoding.valueOf(encoding.toUpperCase()));
+ builder.hbaseExtraConfig(config.get(HBASE_EXTRA_CONFIG));
+ builder.ttl(config.get(HBASE_TTL_CONFIG));
return builder.build();
}
diff --git
a/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/sink/HbaseSink.java
b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/sink/HbaseSink.java
index 4f7b929223..0c592dd65a 100644
---
a/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/sink/HbaseSink.java
+++
b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/sink/HbaseSink.java
@@ -19,33 +19,20 @@ package
org.apache.seatunnel.connectors.seatunnel.hbase.sink;
import org.apache.seatunnel.shade.com.typesafe.config.Config;
-import org.apache.seatunnel.api.common.PrepareFailException;
-import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
-import org.apache.seatunnel.api.sink.SeaTunnelSink;
import org.apache.seatunnel.api.sink.SinkWriter;
+import org.apache.seatunnel.api.sink.SupportMultiTableSink;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
-import org.apache.seatunnel.common.config.CheckConfigUtil;
-import org.apache.seatunnel.common.config.CheckResult;
-import org.apache.seatunnel.common.constants.PluginType;
import
org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSimpleSink;
-import
org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter;
import org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseParameters;
-import
org.apache.seatunnel.connectors.seatunnel.hbase.exception.HbaseConnectorException;
-
-import com.google.auto.service.AutoService;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
-import static
org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseConfig.FAMILY_NAME;
-import static
org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseConfig.ROWKEY_COLUMNS;
-import static
org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseConfig.TABLE;
-import static
org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseConfig.ZOOKEEPER_QUORUM;
-
-@AutoService(SeaTunnelSink.class)
-public class HbaseSink extends AbstractSimpleSink<SeaTunnelRow, Void> {
+public class HbaseSink extends AbstractSimpleSink<SeaTunnelRow, Void>
+ implements SupportMultiTableSink {
private Config pluginConfig;
@@ -62,34 +49,9 @@ public class HbaseSink extends
AbstractSimpleSink<SeaTunnelRow, Void> {
return HbaseSinkFactory.IDENTIFIER;
}
- @Override
- public void prepare(Config pluginConfig) throws PrepareFailException {
- this.pluginConfig = pluginConfig;
- CheckResult result =
- CheckConfigUtil.checkAllExists(
- pluginConfig,
- ZOOKEEPER_QUORUM.key(),
- TABLE.key(),
- ROWKEY_COLUMNS.key(),
- FAMILY_NAME.key());
- if (!result.isSuccess()) {
- throw new HbaseConnectorException(
- SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
- String.format(
- "PluginName: %s, PluginType: %s, Message: %s",
- getPluginName(), PluginType.SINK,
result.getMsg()));
- }
- this.hbaseParameters =
HbaseParameters.buildWithSinkConfig(pluginConfig);
- if (hbaseParameters.getFamilyNames().size() == 0) {
- throw new HbaseConnectorException(
- SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
- "The corresponding field options should be configured and
should not be empty Refer to the hbase sink document");
- }
- }
-
- @Override
- public void setTypeInfo(SeaTunnelRowType seaTunnelRowType) {
- this.seaTunnelRowType = seaTunnelRowType;
+ public HbaseSink(HbaseParameters hbaseParameters, CatalogTable
catalogTable) {
+ this.hbaseParameters = hbaseParameters;
+ this.seaTunnelRowType =
catalogTable.getTableSchema().toPhysicalRowDataType();
for (String rowkeyColumn : hbaseParameters.getRowkeyColumns()) {
this.rowkeyColumnIndexes.add(seaTunnelRowType.indexOf(rowkeyColumn));
}
@@ -99,8 +61,7 @@ public class HbaseSink extends
AbstractSimpleSink<SeaTunnelRow, Void> {
}
@Override
- public AbstractSinkWriter<SeaTunnelRow, Void>
createWriter(SinkWriter.Context context)
- throws IOException {
+ public HbaseSinkWriter createWriter(SinkWriter.Context context) throws
IOException {
return new HbaseSinkWriter(
seaTunnelRowType, hbaseParameters, rowkeyColumnIndexes,
versionColumnIndex);
}
diff --git
a/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/sink/HbaseSinkFactory.java
b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/sink/HbaseSinkFactory.java
index 3038473c4e..1bbeb43f4e 100644
---
a/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/sink/HbaseSinkFactory.java
+++
b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/sink/HbaseSinkFactory.java
@@ -18,8 +18,13 @@
package org.apache.seatunnel.connectors.seatunnel.hbase.sink;
import org.apache.seatunnel.api.configuration.util.OptionRule;
+import org.apache.seatunnel.api.sink.SinkCommonOptions;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.connector.TableSink;
import org.apache.seatunnel.api.table.factory.Factory;
import org.apache.seatunnel.api.table.factory.TableSinkFactory;
+import org.apache.seatunnel.api.table.factory.TableSinkFactoryContext;
+import org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseParameters;
import com.google.auto.service.AutoService;
@@ -50,6 +55,7 @@ public class HbaseSinkFactory implements TableSinkFactory {
return OptionRule.builder()
.required(ZOOKEEPER_QUORUM, TABLE, ROWKEY_COLUMNS, FAMILY_NAME)
.optional(
+ SinkCommonOptions.MULTI_TABLE_SINK_REPLICA,
ROWKEY_DELIMITER,
VERSION_COLUMN,
NULL_MODE,
@@ -59,4 +65,11 @@ public class HbaseSinkFactory implements TableSinkFactory {
HBASE_EXTRA_CONFIG)
.build();
}
+
+ @Override
+ public TableSink createSink(TableSinkFactoryContext context) {
+ HbaseParameters hbaseParameters =
HbaseParameters.buildWithConfig(context.getOptions());
+ CatalogTable catalogTable = context.getCatalogTable();
+ return () -> new HbaseSink(hbaseParameters, catalogTable);
+ }
}
diff --git
a/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/sink/HbaseSinkWriter.java
b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/sink/HbaseSinkWriter.java
index 7683d6aab0..e1e312d305 100644
---
a/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/sink/HbaseSinkWriter.java
+++
b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/sink/HbaseSinkWriter.java
@@ -17,6 +17,7 @@
package org.apache.seatunnel.connectors.seatunnel.hbase.sink;
+import org.apache.seatunnel.api.sink.SupportMultiTableSinkWriter;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
@@ -41,11 +42,11 @@ import org.apache.hadoop.hbase.util.Bytes;
import java.io.IOException;
import java.nio.charset.Charset;
import java.util.List;
-import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
-public class HbaseSinkWriter extends AbstractSinkWriter<SeaTunnelRow, Void> {
+public class HbaseSinkWriter extends AbstractSinkWriter<SeaTunnelRow, Void>
+ implements SupportMultiTableSinkWriter<Void> {
private static final String ALL_COLUMNS = "all_columns";
@@ -63,7 +64,7 @@ public class HbaseSinkWriter extends
AbstractSinkWriter<SeaTunnelRow, Void> {
private final int versionColumnIndex;
- private String writeAllColumnFamily;
+ private String defaultFamilyName = "value";
public HbaseSinkWriter(
SeaTunnelRowType seaTunnelRowType,
@@ -77,7 +78,8 @@ public class HbaseSinkWriter extends
AbstractSinkWriter<SeaTunnelRow, Void> {
this.versionColumnIndex = versionColumnIndex;
if (hbaseParameters.getFamilyNames().size() == 1) {
- this.writeAllColumnFamily =
hbaseParameters.getFamilyNames().get(ALL_COLUMNS);
+ defaultFamilyName =
+ hbaseParameters.getFamilyNames().getOrDefault(ALL_COLUMNS,
defaultFamilyName);
}
// initialize hbase configuration
@@ -132,14 +134,8 @@ public class HbaseSinkWriter extends
AbstractSinkWriter<SeaTunnelRow, Void> {
.collect(Collectors.toList());
for (Integer writeColumnIndex : writeColumnIndexes) {
String fieldName = seaTunnelRowType.getFieldName(writeColumnIndex);
- // This is the family of columns that we define to be written
through the.conf file
- Map<String, String> configurationFamilyNames =
hbaseParameters.getFamilyNames();
String familyName =
- configurationFamilyNames.getOrDefault(fieldName,
writeAllColumnFamily);
- if (!configurationFamilyNames.containsKey(ALL_COLUMNS)
- && !configurationFamilyNames.containsKey(fieldName)) {
- continue;
- }
+ hbaseParameters.getFamilyNames().getOrDefault(fieldName,
defaultFamilyName);
byte[] bytes = convertColumnToBytes(row, writeColumnIndex);
if (bytes != null) {
put.addColumn(Bytes.toBytes(familyName),
Bytes.toBytes(fieldName), bytes);
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hbase-e2e/src/test/java/org/apache/seatunnel/e2e/connector/hbase/HbaseIT.java
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hbase-e2e/src/test/java/org/apache/seatunnel/e2e/connector/hbase/HbaseIT.java
index 85ceef9235..fe736f965e 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hbase-e2e/src/test/java/org/apache/seatunnel/e2e/connector/hbase/HbaseIT.java
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hbase-e2e/src/test/java/org/apache/seatunnel/e2e/connector/hbase/HbaseIT.java
@@ -56,8 +56,13 @@ import java.util.Objects;
public class HbaseIT extends TestSuiteBase implements TestResource {
private static final String TABLE_NAME = "seatunnel_test";
+
private static final String ASSIGN_CF_TABLE_NAME = "assign_cf_table";
+ private static final String MULTI_TABLE_ONE_NAME = "hbase_sink_1";
+
+ private static final String MULTI_TABLE_TWO_NAME = "hbase_sink_2";
+
private static final String FAMILY_NAME = "info";
private Connection hbaseConnection;
@@ -77,9 +82,14 @@ public class HbaseIT extends TestSuiteBase implements
TestResource {
// Create table for hbase sink test
log.info("initial");
hbaseCluster.createTable(TABLE_NAME, Arrays.asList(FAMILY_NAME));
+ // Create table for hbase assign cf table sink test
hbaseCluster.createTable(ASSIGN_CF_TABLE_NAME, Arrays.asList("cf1",
"cf2"));
table = TableName.valueOf(TABLE_NAME);
tableAssign = TableName.valueOf(ASSIGN_CF_TABLE_NAME);
+
+ // Create table for hbase multi-table sink test
+ hbaseCluster.createTable(MULTI_TABLE_ONE_NAME,
Arrays.asList(FAMILY_NAME));
+ hbaseCluster.createTable(MULTI_TABLE_TWO_NAME,
Arrays.asList(FAMILY_NAME));
}
@AfterAll
@@ -93,7 +103,11 @@ public class HbaseIT extends TestSuiteBase implements
TestResource {
@TestTemplate
public void testHbaseSink(TestContainer container) throws IOException,
InterruptedException {
- fakeToHbase(container);
+ deleteData(table);
+ Container.ExecResult sinkExecResult =
container.executeJob("/fake-to-hbase.conf");
+ Assertions.assertEquals(0, sinkExecResult.getExitCode());
+ ArrayList<Result> results = readData(table);
+ Assertions.assertEquals(results.size(), 5);
Container.ExecResult sourceExecResult =
container.executeJob("/hbase-to-assert.conf");
Assertions.assertEquals(0, sourceExecResult.getExitCode());
}
@@ -166,6 +180,25 @@ public class HbaseIT extends TestSuiteBase implements
TestResource {
Assertions.assertEquals(cf2Count, 5);
}
+ @DisabledOnContainer(
+ value = {},
+ type = {EngineType.FLINK},
+ disabledReason = "Currently FLINK does not support multiple table
write")
+ public void testHbaseMultiTableSink(TestContainer container)
+ throws IOException, InterruptedException {
+ TableName multiTable1 = TableName.valueOf(MULTI_TABLE_ONE_NAME);
+ TableName multiTable2 = TableName.valueOf(MULTI_TABLE_TWO_NAME);
+ deleteData(multiTable1);
+ deleteData(multiTable2);
+ Container.ExecResult sinkExecResult =
+ container.executeJob("/fake-to-hbase-with-multipletable.conf");
+ Assertions.assertEquals(0, sinkExecResult.getExitCode());
+ ArrayList<Result> results = readData(multiTable1);
+ Assertions.assertEquals(results.size(), 1);
+ results = readData(multiTable2);
+ Assertions.assertEquals(results.size(), 1);
+ }
+
@TestTemplate
public void testHbaseSourceWithBatchQuery(TestContainer container)
throws IOException, InterruptedException {
@@ -200,4 +233,16 @@ public class HbaseIT extends TestSuiteBase implements
TestResource {
hbaseTable.delete(deleteRow);
}
}
+
+ public ArrayList<Result> readData(TableName table) throws IOException {
+ Table hbaseTable = hbaseConnection.getTable(table);
+ Scan scan = new Scan();
+ ResultScanner scanner = hbaseTable.getScanner(scan);
+ ArrayList<Result> results = new ArrayList<>();
+ for (Result result : scanner) {
+ results.add(result);
+ }
+ scanner.close();
+ return results;
+ }
}
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hbase-e2e/src/test/resources/fake-to-hbase-with-multipletable.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hbase-e2e/src/test/resources/fake-to-hbase-with-multipletable.conf
new file mode 100644
index 0000000000..8972bf1324
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hbase-e2e/src/test/resources/fake-to-hbase-with-multipletable.conf
@@ -0,0 +1,86 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of batch processing in seatunnel config
+######
+
+env {
+ # You can set engine configuration here
+ parallelism = 1
+ job.mode = "BATCH"
+}
+
+source {
+ FakeSource {
+ tables_configs = [
+ {
+ schema = {
+ table = "hbase_sink_1"
+ fields {
+ name = STRING
+ c_string = STRING
+ c_double = DOUBLE
+ c_bigint = BIGINT
+ c_float = FLOAT
+ c_int = INT
+ c_smallint = SMALLINT
+ c_boolean = BOOLEAN
+ time = BIGINT
+ }
+ }
+ rows = [
+ {
+ kind = INSERT
+ fields = ["label_1", "sink_1", 4.3, 200, 2.5, 2, 5, true, 1627529632356]
+ }
+ ]
+ },
+ {
+ schema = {
+ table = "hbase_sink_2"
+ fields {
+ name = STRING
+ c_string = STRING
+ c_double = DOUBLE
+ c_bigint = BIGINT
+ c_float = FLOAT
+ c_int = INT
+ c_smallint = SMALLINT
+ c_boolean = BOOLEAN
+ }
+ }
+ rows = [
+ {
+ kind = INSERT
+ fields = ["label_2", "sink_2", 4.3, 200, 2.5, 2, 5, true]
+ }
+ ]
+ }
+ ]
+ }
+}
+
+sink {
+ Hbase {
+ zookeeper_quorum = "hbase_e2e:2181"
+ table = "${table_name}"
+ rowkey_column = ["name"]
+ family_name {
+ all_columns = info
+ }
+ }
+}
\ No newline at end of file
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hbase-e2e/src/test/resources/hbase-to-assert-with-multipletable.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hbase-e2e/src/test/resources/hbase-to-assert-with-multipletable.conf
new file mode 100644
index 0000000000..6dc7530b4b
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hbase-e2e/src/test/resources/hbase-to-assert-with-multipletable.conf
@@ -0,0 +1,129 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+ parallelism = 1
+ job.mode = "BATCH"
+}
+
+source {
+ Hbase {
+ zookeeper_quorum = "hbase_e2e:2181"
+ table = "seatunnel_test"
+ query_columns=["rowkey", "info:age", "info:c_double",
"info:c_boolean","info:c_bigint","info:c_smallint","info:c_tinyint","info:c_float"]
+ schema = {
+ columns = [
+ {
+ name = rowkey
+ type = string
+ },
+ {
+ name = "info:age"
+ type = int
+ },
+ {
+ name = "info:c_double"
+ type = double
+ },
+ {
+ name = "info:c_boolean"
+ type = boolean
+ },
+ {
+ name = "info:c_bigint"
+ type = bigint
+ },
+ {
+ name = "info:c_smallint"
+ type = smallint
+ },
+ {
+ name = "info:c_tinyint"
+ type = tinyint
+ },
+ {
+ name = "info:c_float"
+ type = float
+ }
+ ]
+ }
+ }
+}
+
+sink {
+ Assert {
+ rules {
+ row_rules = [
+ {
+ rule_type = MAX_ROW
+ rule_value = 11
+ },
+ {
+ rule_type = MIN_ROW
+ rule_value = 11
+ }
+ ],
+ field_rules = [
+ {
+ field_name = rowkey
+ field_type = string
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ }
+ ]
+ },
+ {
+ field_name = "info:c_boolean"
+ field_type = boolean
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ }
+ ]
+ },
+ {
+ field_name = "info:c_double"
+ field_type = double
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ }
+ ]
+ },
+ {
+ field_name = "info:c_bigint"
+ field_type = bigint
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ }
+ ]
+ },
+ {
+ field_name = "info:age"
+ field_type = int
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ }
+ ]
+ }
+ ]
+ }
+ }
+}
\ No newline at end of file