This is an automated email from the ASF dual-hosted git repository.
wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 1da9bd6ce4 [E2E][HBase]Refactor hbase e2e (#6859)
1da9bd6ce4 is described below
commit 1da9bd6ce475a012a702a4a864d5307303ed0660
Author: TaoZex <[email protected]>
AuthorDate: Tue Jun 4 09:57:09 2024 +0800
[E2E][HBase]Refactor hbase e2e (#6859)
---
.../hbase/format/HBaseDeserializationFormat.java | 2 +
.../connector-hbase-e2e/pom.xml | 14 ++-
.../e2e/connector/hbase/HbaseCluster.java | 138 +++++++++++++++++++++
.../seatunnel/e2e/connector/hbase/HbaseIT.java | 109 ++++++----------
.../src/test/resources/fake-to-hbase-array.conf | 2 +-
.../src/test/resources/fake-to-hbase.conf | 2 +-
.../src/test/resources/hbase-to-assert.conf | 48 ++++---
7 files changed, 220 insertions(+), 95 deletions(-)
diff --git
a/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/format/HBaseDeserializationFormat.java
b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/format/HBaseDeserializationFormat.java
index 8d7a1bcbe1..578df7101c 100644
---
a/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/format/HBaseDeserializationFormat.java
+++
b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/format/HBaseDeserializationFormat.java
@@ -58,7 +58,9 @@ public class HBaseDeserializationFormat {
switch (typeInfo.getSqlType()) {
case TINYINT:
+ return cell[0];
case SMALLINT:
+ return (short) ((cell[0] & 0xFF) << 8 | (cell[1] & 0xFF));
case INT:
return Bytes.toInt(cell);
case BOOLEAN:
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hbase-e2e/pom.xml
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hbase-e2e/pom.xml
index 93ba8de981..b9fdd76f6f 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hbase-e2e/pom.xml
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hbase-e2e/pom.xml
@@ -26,14 +26,24 @@
<name>SeaTunnel : E2E : Connector V2 : Hbase</name>
<dependencies>
-
+ <dependency>
+ <groupId>org.apache.seatunnel</groupId>
+ <artifactId>connector-fake</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
<dependency>
<groupId>org.apache.seatunnel</groupId>
<artifactId>connector-hbase</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>
-
+ <dependency>
+ <groupId>org.apache.seatunnel</groupId>
+ <artifactId>connector-assert</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
</dependencies>
</project>
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hbase-e2e/src/test/java/org/apache/seatunnel/e2e/connector/hbase/HbaseCluster.java
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hbase-e2e/src/test/java/org/apache/seatunnel/e2e/connector/hbase/HbaseCluster.java
new file mode 100644
index 0000000000..3b54c35592
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hbase-e2e/src/test/java/org/apache/seatunnel/e2e/connector/hbase/HbaseCluster.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.e2e.connector.hbase;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
+import org.apache.hadoop.hbase.util.Bytes;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.lifecycle.Startables;
+import org.testcontainers.utility.DockerImageName;
+import org.testcontainers.utility.DockerLoggerFactory;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Objects;
+import java.util.stream.Stream;
+
+import static org.apache.seatunnel.e2e.common.container.TestContainer.NETWORK;
+
+/**
+ * Test helper that runs a standalone HBase instance in a Docker container and provides a client
+ * {@link Connection} to it.
+ *
+ * <p>The HBase master, region server and ZooKeeper ports are bound to identical port numbers on
+ * the Docker host, and the container joins the shared e2e test network under the alias
+ * {@code hbase_e2e}.
+ */
+public class HbaseCluster {
+
+    private static final Logger LOG = LoggerFactory.getLogger(HbaseCluster.class);
+
+    private static final int ZOOKEEPER_PORT = 2181;
+    private static final int MASTER_PORT = 16000;
+    private static final int REGION_PORT = 16020;
+    // Network alias of the container; also passed to it as its ZooKeeper quorum.
+    private static final String HOST = "hbase_e2e";
+
+    private static final String DOCKER_NAME = "jcjabouille/hbase-standalone:2.4.9";
+    private static final DockerImageName HBASE_DOCKER_IMAGE = DockerImageName.parse(DOCKER_NAME);
+
+    private Connection connection;
+    private GenericContainer<?> hbaseContainer;
+
+    /**
+     * Starts the HBase container and opens a client connection to it.
+     *
+     * @return the connection to the started HBase instance; it is also kept by this object and
+     *     closed by {@link #stopService()}
+     * @throws IOException if the local host name cannot be resolved or the HBase connection
+     *     cannot be created
+     */
+    public Connection startService() throws IOException {
+        // The container is given the local machine's host name.
+        // NOTE(review): presumably so that the server address HBase registers in ZooKeeper
+        // resolves from the test JVM - confirm.
+        String hostname = InetAddress.getLocalHost().getHostName();
+        hbaseContainer =
+                new GenericContainer<>(HBASE_DOCKER_IMAGE)
+                        .withNetwork(NETWORK)
+                        .withNetworkAliases(HOST)
+                        // withExposedPorts replaces the previously configured list, so all
+                        // ports have to be passed in a single call.
+                        .withExposedPorts(MASTER_PORT, REGION_PORT, ZOOKEEPER_PORT)
+                        .withCreateContainerCmdModifier(cmd -> cmd.withHostName(hostname))
+                        .withEnv("HBASE_MASTER_PORT", String.valueOf(MASTER_PORT))
+                        .withEnv("HBASE_REGION_PORT", String.valueOf(REGION_PORT))
+                        .withEnv(
+                                "HBASE_ZOOKEEPER_PROPERTY_CLIENTPORT",
+                                String.valueOf(ZOOKEEPER_PORT))
+                        .withEnv("HBASE_ZOOKEEPER_QUORUM", HOST)
+                        .withLogConsumer(
+                                new Slf4jLogConsumer(DockerLoggerFactory.getLogger(DOCKER_NAME)));
+        // Bind every port to the same number on the Docker host.
+        hbaseContainer.setPortBindings(
+                Arrays.asList(
+                        String.format("%s:%s", MASTER_PORT, MASTER_PORT),
+                        String.format("%s:%s", REGION_PORT, REGION_PORT),
+                        String.format("%s:%s", ZOOKEEPER_PORT, ZOOKEEPER_PORT)));
+        Startables.deepStart(Stream.of(hbaseContainer)).join();
+        LOG.info("HBase container started");
+
+        String zookeeperQuorum = getZookeeperQuorum();
+        LOG.info("Successfully start hbase service, zookeeper quorum: {}", zookeeperQuorum);
+        Configuration configuration = HBaseConfiguration.create();
+        configuration.set("hbase.zookeeper.quorum", zookeeperQuorum);
+        configuration.set("hbase.security.authentication", "simple");
+        configuration.set("hbase.rpc.timeout", "10000");
+        configuration.set("hbase.master.port", String.valueOf(MASTER_PORT));
+        configuration.set("hbase.regionserver.port", String.valueOf(REGION_PORT));
+        connection = ConnectionFactory.createConnection(configuration);
+        return connection;
+    }
+
+    /**
+     * Creates a table with the given column families, using the connection opened by {@link
+     * #startService()}.
+     *
+     * @param tableName name of the table to create
+     * @param list names of the column families the table should have
+     * @throws IOException if the table cannot be created
+     */
+    public void createTable(String tableName, List<String> list) throws IOException {
+        TableDescriptorBuilder tableDesc =
+                TableDescriptorBuilder.newBuilder(TableName.valueOf(tableName));
+
+        List<ColumnFamilyDescriptor> colFamilyList = new ArrayList<>();
+        for (String columnFamily : list) {
+            colFamilyList.add(
+                    ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes(columnFamily)).build());
+        }
+        tableDesc.setColumnFamilies(colFamilyList);
+        // Admin is Closeable; release it as soon as the table has been created.
+        try (Admin hbaseAdmin = connection.getAdmin()) {
+            hbaseAdmin.createTable(tableDesc.build());
+        }
+    }
+
+    /**
+     * Closes the HBase connection and shuts the container down.
+     *
+     * @throws IOException if closing the connection fails; the container is closed regardless
+     */
+    public void stopService() throws IOException {
+        try {
+            if (Objects.nonNull(connection)) {
+                connection.close();
+            }
+        } finally {
+            // Always tear the container down, even when closing the connection failed.
+            if (Objects.nonNull(hbaseContainer)) {
+                hbaseContainer.close();
+            }
+            hbaseContainer = null;
+        }
+    }
+
+    /**
+     * Builds the ZooKeeper quorum string from the local host's IP address and the ZooKeeper port.
+     *
+     * @return the quorum in {@code host:port} form
+     * @throws RuntimeException if the local host address cannot be resolved
+     */
+    public static String getZookeeperQuorum() {
+        try {
+            String host = InetAddress.getLocalHost().getHostAddress();
+            return String.format("%s:%s", host, ZOOKEEPER_PORT);
+        } catch (UnknownHostException e) {
+            throw new RuntimeException(e);
+        }
+    }
+}
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hbase-e2e/src/test/java/org/apache/seatunnel/e2e/connector/hbase/HbaseIT.java
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hbase-e2e/src/test/java/org/apache/seatunnel/e2e/connector/hbase/HbaseIT.java
index e27e5c715e..d3cd57b326 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hbase-e2e/src/test/java/org/apache/seatunnel/e2e/connector/hbase/HbaseIT.java
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hbase-e2e/src/test/java/org/apache/seatunnel/e2e/connector/hbase/HbaseIT.java
@@ -19,88 +19,63 @@ package org.apache.seatunnel.e2e.connector.hbase;
import org.apache.seatunnel.e2e.common.TestResource;
import org.apache.seatunnel.e2e.common.TestSuiteBase;
+import org.apache.seatunnel.e2e.common.container.EngineType;
import org.apache.seatunnel.e2e.common.container.TestContainer;
+import org.apache.seatunnel.e2e.common.junit.DisabledOnContainer;
-import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
-import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
-import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
-import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Connection;
-import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
-import org.apache.hadoop.hbase.client.TableDescriptor;
-import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
-import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
-import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.TestTemplate;
import org.testcontainers.containers.Container;
-import org.testcontainers.containers.GenericContainer;
-import org.testcontainers.containers.output.Slf4jLogConsumer;
-import org.testcontainers.containers.wait.strategy.HostPortWaitStrategy;
-import org.testcontainers.lifecycle.Startables;
-import org.testcontainers.utility.DockerImageName;
-import org.testcontainers.utility.DockerLoggerFactory;
import lombok.extern.slf4j.Slf4j;
import java.io.IOException;
-import java.time.Duration;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Objects;
-import java.util.stream.Stream;
@Slf4j
-@Disabled(
- "Hbase docker e2e case need user add mapping information of between
container id and ip address in hosts file")
+@DisabledOnContainer(
+ value = {},
+ type = {EngineType.SEATUNNEL},
+ disabledReason = "The hbase container authentication configuration is
incorrect.")
public class HbaseIT extends TestSuiteBase implements TestResource {
- private static final String IMAGE = "harisekhon/hbase:latest";
-
- private static final int PORT = 2181;
-
- private static final String HOST = "hbase-e2e";
-
private static final String TABLE_NAME = "seatunnel_test";
private static final String FAMILY_NAME = "info";
- private final Configuration hbaseConfiguration =
HBaseConfiguration.create();
-
private Connection hbaseConnection;
private Admin admin;
private TableName table;
- private GenericContainer<?> hbaseContainer;
+ private HbaseCluster hbaseCluster;
@BeforeAll
@Override
public void startUp() throws Exception {
- hbaseContainer =
- new GenericContainer<>(DockerImageName.parse(IMAGE))
- .withNetwork(NETWORK)
- .withNetworkAliases(HOST)
- .withExposedPorts(PORT)
- .withLogConsumer(new
Slf4jLogConsumer(DockerLoggerFactory.getLogger(IMAGE)))
- .waitingFor(
- new HostPortWaitStrategy()
-
.withStartupTimeout(Duration.ofMinutes(2)));
- Startables.deepStart(Stream.of(hbaseContainer)).join();
- log.info("Hbase container started");
- this.initialize();
+ hbaseCluster = new HbaseCluster();
+ hbaseConnection = hbaseCluster.startService();
+ // Create table for hbase sink test
+ log.info("initial");
+ hbaseCluster.createTable(TABLE_NAME, Arrays.asList(FAMILY_NAME));
+ table = TableName.valueOf(TABLE_NAME);
}
@AfterAll
@@ -109,59 +84,37 @@ public class HbaseIT extends TestSuiteBase implements
TestResource {
if (Objects.nonNull(admin)) {
admin.close();
}
- if (Objects.nonNull(hbaseConnection)) {
- hbaseConnection.close();
- }
- if (Objects.nonNull(hbaseContainer)) {
- hbaseContainer.close();
- }
- }
-
- private void initialize() throws IOException {
- hbaseConfiguration.set("hbase.zookeeper.quorum", HOST + ":" + PORT);
- hbaseConnection =
ConnectionFactory.createConnection(hbaseConfiguration);
- admin = hbaseConnection.getAdmin();
- table = TableName.valueOf(TABLE_NAME);
- ColumnFamilyDescriptor familyDescriptor =
-
ColumnFamilyDescriptorBuilder.newBuilder(FAMILY_NAME.getBytes())
- .setCompressionType(Compression.Algorithm.SNAPPY)
-
.setCompactionCompressionType(Compression.Algorithm.SNAPPY)
- .build();
- TableDescriptor tableDescriptor =
-
TableDescriptorBuilder.newBuilder(table).setColumnFamily(familyDescriptor).build();
- admin.createTable(tableDescriptor);
- log.info("Hbase table has been initialized");
+ hbaseCluster.stopService();
}
@TestTemplate
public void testHbaseSink(TestContainer container) throws IOException,
InterruptedException {
- Container.ExecResult execResult =
container.executeJob("/fake-to-hbase.conf");
- Assertions.assertEquals(0, execResult.getExitCode());
+ deleteData(table);
+ Container.ExecResult sinkExecResult =
container.executeJob("/fake-to-hbase.conf");
+ Assertions.assertEquals(0, sinkExecResult.getExitCode());
Table hbaseTable = hbaseConnection.getTable(table);
Scan scan = new Scan();
- ArrayList<Result> results = new ArrayList<>();
ResultScanner scanner = hbaseTable.getScanner(scan);
+ ArrayList<Result> results = new ArrayList<>();
for (Result result : scanner) {
results.add(result);
}
Assertions.assertEquals(results.size(), 5);
- }
-
- @TestTemplate
- public void testHbaseSource(TestContainer container) throws IOException,
InterruptedException {
- Container.ExecResult execResult =
container.executeJob("/hbase-to-assert.conf");
- Assertions.assertEquals(0, execResult.getExitCode());
+ scanner.close();
+ Container.ExecResult sourceExecResult =
container.executeJob("/hbase-to-assert.conf");
+ Assertions.assertEquals(0, sourceExecResult.getExitCode());
}
@TestTemplate
public void testHbaseSinkWithArray(TestContainer container)
throws IOException, InterruptedException {
+ deleteData(table);
Container.ExecResult execResult =
container.executeJob("/fake-to-hbase-array.conf");
Assertions.assertEquals(0, execResult.getExitCode());
Table hbaseTable = hbaseConnection.getTable(table);
Scan scan = new Scan();
- ArrayList<Result> results = new ArrayList<>();
ResultScanner scanner = hbaseTable.getScanner(scan);
+ ArrayList<Result> results = new ArrayList<>();
for (Result result : scanner) {
String rowKey = Bytes.toString(result.getRow());
for (Cell cell : result.listCells()) {
@@ -177,5 +130,17 @@ public class HbaseIT extends TestSuiteBase implements
TestResource {
results.add(result);
}
Assertions.assertEquals(results.size(), 3);
+ scanner.close();
+ }
+
+    /**
+     * Removes every row currently stored in the given table.
+     *
+     * @param table name of the table to empty
+     * @throws IOException if scanning the table or deleting a row fails
+     */
+    private void deleteData(TableName table) throws IOException {
+        // Both the table handle and the scanner are Closeable; release them when done.
+        try (Table hbaseTable = hbaseConnection.getTable(table);
+                ResultScanner scanner = hbaseTable.getScanner(new Scan())) {
+            // Delete the data generated by the test
+            for (Result result = scanner.next(); result != null; result = scanner.next()) {
+                hbaseTable.delete(new Delete(result.getRow()));
+            }
+        }
+    }
}
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hbase-e2e/src/test/resources/fake-to-hbase-array.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hbase-e2e/src/test/resources/fake-to-hbase-array.conf
index 5cf1896ea2..9da70ea80a 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hbase-e2e/src/test/resources/fake-to-hbase-array.conf
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hbase-e2e/src/test/resources/fake-to-hbase-array.conf
@@ -49,7 +49,7 @@ source {
sink {
Hbase {
- zookeeper_quorum = "hbase-e2e:2181"
+ zookeeper_quorum = "hbase_e2e:2181"
table = "seatunnel_test"
rowkey_column = ["name"]
family_name {
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hbase-e2e/src/test/resources/fake-to-hbase.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hbase-e2e/src/test/resources/fake-to-hbase.conf
index f3e58ec008..be99bf43fe 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hbase-e2e/src/test/resources/fake-to-hbase.conf
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hbase-e2e/src/test/resources/fake-to-hbase.conf
@@ -40,7 +40,7 @@ source {
sink {
Hbase {
- zookeeper_quorum = "hbase-e2e:2181"
+ zookeeper_quorum = "hbase_e2e:2181"
table = "seatunnel_test"
rowkey_column = ["name"]
family_name {
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hbase-e2e/src/test/resources/hbase-to-assert.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hbase-e2e/src/test/resources/hbase-to-assert.conf
index f209875745..c8b750738d 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hbase-e2e/src/test/resources/hbase-to-assert.conf
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hbase-e2e/src/test/resources/hbase-to-assert.conf
@@ -22,9 +22,9 @@ env {
source {
Hbase {
- zookeeper_quorum = "hbase-e2e:2181"
+ zookeeper_quorum = "hbase_e2e:2181"
table = "seatunnel_test"
- query_columns=["rowkey", "cf1:col1", "cf1:col2", "cf2:col1", "cf2:col2"]
+ query_columns=["rowkey", "info:age", "info:c_double",
"info:c_boolean","info:c_bigint","info:c_smallint","info:c_tinyint","info:c_float"]
schema = {
columns = [
{
@@ -32,39 +32,49 @@ source {
type = string
},
{
- name = "cf1:col1"
- type = boolean
+ name = "info:age"
+ type = int
},
{
- name = "cf1:col2"
+ name = "info:c_double"
type = double
},
{
- name = "cf2:col1"
+ name = "info:c_boolean"
+ type = boolean
+ },
+ {
+ name = "info:c_bigint"
type = bigint
},
{
- name = "cf2:col2"
- type = int
+ name = "info:c_smallint"
+ type = smallint
+ },
+ {
+ name = "info:c_tinyint"
+ type = tinyint
+ },
+ {
+ name = "info:c_float"
+ type = float
}
- ]
- }
- result_table_name = hbase_source
- }
+ ]
+ }
+ }
}
sink {
Assert {
- source_table_name = hbase_source
rules {
row_rules = [
{
rule_type = MAX_ROW
- rule_value = 10000
+ rule_value = 5
},
{
rule_type = MIN_ROW
- rule_value = 10000
+ rule_value = 5
}
],
field_rules = [
@@ -78,7 +88,7 @@ sink {
]
},
{
- field_name = "cf1:col1"
+ field_name = "info:c_boolean"
field_type = boolean
field_value = [
{
@@ -87,7 +97,7 @@ sink {
]
},
{
- field_name = "cf1:col2"
+ field_name = "info:c_double"
field_type = double
field_value = [
{
@@ -96,7 +106,7 @@ sink {
]
},
{
- field_name = "cf2:col1"
+ field_name = "info:c_bigint"
field_type = bigint
field_value = [
{
@@ -105,7 +115,7 @@ sink {
]
},
{
- field_name = "cf2:col2"
+ field_name = "info:age"
field_type = int
field_value = [
{