This is an automated email from the ASF dual-hosted git repository.
wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new b978792cb1 [Feature][Connector-V2][Hbase] implement hbase catalog
(#7516)
b978792cb1 is described below
commit b978792cb1a3ec5e49a953da92a62bee3b446a3d
Author: Jast <[email protected]>
AuthorDate: Mon Sep 9 13:31:42 2024 +0800
[Feature][Connector-V2][Hbase] implement hbase catalog (#7516)
---
seatunnel-connectors-v2/connector-hbase/pom.xml | 6 +
.../seatunnel/hbase/catalog/HbaseCatalog.java | 194 ++++++++++
.../hbase/catalog/HbaseCatalogFactory.java | 50 +++
.../seatunnel/hbase/client/HbaseClient.java | 393 +++++++++++++++++++++
.../seatunnel/hbase/config/HbaseConfig.java | 21 ++
.../seatunnel/hbase/config/HbaseParameters.java | 23 +-
.../seatunnel/hbase/constant/HbaseIdentifier.java | 22 ++
.../hbase/exception/HbaseConnectorErrorCode.java | 13 +-
.../connectors/seatunnel/hbase/sink/HbaseSink.java | 75 +++-
.../seatunnel/hbase/sink/HbaseSinkFactory.java | 25 +-
.../seatunnel/hbase/sink/HbaseSinkWriter.java | 65 ++--
.../seatunnel/hbase/source/HbaseSource.java | 10 +-
.../seatunnel/hbase/source/HbaseSourceFactory.java | 5 +-
.../seatunnel/hbase/source/HbaseSourceReader.java | 36 +-
.../hbase/source/HbaseSourceSplitEnumerator.java | 29 +-
.../hbase/state/HbaseAggregatedCommitInfo.java | 22 ++
.../seatunnel/hbase/state/HbaseCommitInfo.java | 22 ++
.../seatunnel/hbase/state/HbaseSinkState.java | 22 ++
.../seatunnel/hbase/utils/HbaseConnectionUtil.java | 48 ---
.../seatunnel/e2e/connector/hbase/HbaseIT.java | 118 +++++++
.../resources/fake_to_hbase_with_append_data.conf | 52 +++
.../fake_to_hbase_with_create_when_not_exists.conf | 51 +++
.../resources/fake_to_hbase_with_drop_data.conf | 52 +++
.../fake_to_hbase_with_error_when_data_exists.conf | 52 +++
.../fake_to_hbase_with_error_when_not_exists.conf | 51 +++
.../fake_to_hbase_with_recreate_schema.conf | 51 +++
26 files changed, 1339 insertions(+), 169 deletions(-)
diff --git a/seatunnel-connectors-v2/connector-hbase/pom.xml
b/seatunnel-connectors-v2/connector-hbase/pom.xml
index 663bdcfdd3..bda49ade0e 100644
--- a/seatunnel-connectors-v2/connector-hbase/pom.xml
+++ b/seatunnel-connectors-v2/connector-hbase/pom.xml
@@ -47,6 +47,12 @@
<version>${hbase.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.seatunnel</groupId>
+ <artifactId>seatunnel-format-json</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
</dependencies>
</project>
diff --git
a/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/catalog/HbaseCatalog.java
b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/catalog/HbaseCatalog.java
new file mode 100644
index 0000000000..f6a4815073
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/catalog/HbaseCatalog.java
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.hbase.catalog;
+
+import org.apache.seatunnel.api.configuration.util.ConfigUtil;
+import org.apache.seatunnel.api.table.catalog.Catalog;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.InfoPreviewResult;
+import org.apache.seatunnel.api.table.catalog.PreviewResult;
+import org.apache.seatunnel.api.table.catalog.TablePath;
+import org.apache.seatunnel.api.table.catalog.exception.CatalogException;
+import
org.apache.seatunnel.api.table.catalog.exception.DatabaseAlreadyExistException;
+import
org.apache.seatunnel.api.table.catalog.exception.DatabaseNotExistException;
+import
org.apache.seatunnel.api.table.catalog.exception.TableAlreadyExistException;
+import org.apache.seatunnel.api.table.catalog.exception.TableNotExistException;
+import org.apache.seatunnel.connectors.seatunnel.hbase.client.HbaseClient;
+import org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseParameters;
+
+import lombok.extern.slf4j.Slf4j;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.stream.Collectors;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/** Hbase catalog implementation. */
+@Slf4j
+public class HbaseCatalog implements Catalog {
+
+ private final String catalogName;
+ private final String defaultDatabase;
+ private final HbaseParameters hbaseParameters;
+
+ private HbaseClient hbaseClient;
+
+ public HbaseCatalog(
+ String catalogName, String defaultDatabase, HbaseParameters
hbaseParameters) {
+ this.catalogName = checkNotNull(catalogName, "catalogName cannot be
null");
+ this.defaultDatabase = defaultDatabase;
+ this.hbaseParameters = checkNotNull(hbaseParameters, "Hbase Config
cannot be null");
+ }
+
+ @Override
+ public void open() throws CatalogException {
+ try {
+ hbaseClient = HbaseClient.createInstance(hbaseParameters);
+ } catch (Exception e) {
+ throw new CatalogException(String.format("Failed to open catalog
%s", catalogName), e);
+ }
+ }
+
+ @Override
+ public void close() throws CatalogException {
+ hbaseClient.close();
+ }
+
+ @Override
+ public String name() {
+ return catalogName;
+ }
+
+ @Override
+ public String getDefaultDatabase() throws CatalogException {
+ return defaultDatabase;
+ }
+
+ @Override
+ public boolean databaseExists(String databaseName) throws CatalogException
{
+ return hbaseClient.databaseExists(databaseName);
+ }
+
+ @Override
+ public List<String> listDatabases() throws CatalogException {
+ return hbaseClient.listDatabases();
+ }
+
+ @Override
+ public List<String> listTables(String databaseName)
+ throws CatalogException, DatabaseNotExistException {
+ if (!databaseExists(databaseName)) {
+ throw new DatabaseNotExistException(catalogName, databaseName);
+ }
+ return hbaseClient.listTables(databaseName);
+ }
+
+ @Override
+ public boolean tableExists(TablePath tablePath) throws CatalogException {
+ checkNotNull(tablePath);
+ return hbaseClient.tableExists(tablePath.getTableName());
+ }
+
+ @Override
+ public CatalogTable getTable(TablePath tablePath)
+ throws CatalogException, TableNotExistException {
+ throw new UnsupportedOperationException("Not implement");
+ }
+
+ @Override
+ public void createTable(TablePath tablePath, CatalogTable table, boolean
ignoreIfExists)
+ throws TableAlreadyExistException, DatabaseNotExistException,
CatalogException {
+ checkNotNull(tablePath, "tablePath cannot be null");
+ hbaseClient.createTable(
+ tablePath.getDatabaseName(),
+ tablePath.getTableName(),
+ hbaseParameters.getFamilyNames().values().stream()
+ .filter(value -> !"all_columns".equals(value))
+ .collect(Collectors.toList()),
+ ignoreIfExists);
+ }
+
+ @Override
+ public void dropTable(TablePath tablePath, boolean ignoreIfNotExists)
+ throws TableNotExistException, CatalogException {
+ checkNotNull(tablePath);
+ if (!tableExists(tablePath) && !ignoreIfNotExists) {
+ throw new TableNotExistException(catalogName, tablePath);
+ }
+ hbaseClient.dropTable(tablePath.getDatabaseName(),
tablePath.getTableName());
+ }
+
+ @Override
+ public void createDatabase(TablePath tablePath, boolean ignoreIfExists)
+ throws DatabaseAlreadyExistException, CatalogException {
+ if (databaseExists(tablePath.getDatabaseName()) && !ignoreIfExists) {
+ throw new DatabaseAlreadyExistException(catalogName,
tablePath.getDatabaseName());
+ }
+ hbaseClient.createNamespace(tablePath.getDatabaseName());
+ }
+
+ @Override
+ public void dropDatabase(TablePath tablePath, boolean ignoreIfNotExists)
+ throws DatabaseNotExistException, CatalogException {
+ if (!databaseExists(tablePath.getDatabaseName()) &&
!ignoreIfNotExists) {
+ throw new DatabaseNotExistException(catalogName,
tablePath.getDatabaseName());
+ }
+ hbaseClient.deleteNamespace(tablePath.getDatabaseName());
+ }
+
+ @Override
+ public void truncateTable(TablePath tablePath, boolean ignoreIfNotExists) {
+ if (!tableExists(tablePath) && !ignoreIfNotExists) {
+ throw new TableNotExistException(catalogName, tablePath);
+ }
+ hbaseClient.truncateTable(tablePath.getDatabaseName(),
tablePath.getTableName());
+ }
+
+ @Override
+ public boolean isExistsData(TablePath tablePath) {
+ return hbaseClient.isExistsData(tablePath.getDatabaseName(),
tablePath.getTableName());
+ }
+
+ private Map<String, String> buildTableOptions(TablePath tablePath) {
+ Map<String, String> options = new HashMap<>();
+ options.put("connector", "hbase");
+ options.put("config", ConfigUtil.convertToJsonString(tablePath));
+ return options;
+ }
+
+ @Override
+ public PreviewResult previewAction(
+ ActionType actionType, TablePath tablePath, Optional<CatalogTable>
catalogTable) {
+ if (actionType == ActionType.CREATE_TABLE) {
+ return new InfoPreviewResult("create index " +
tablePath.getTableName());
+ } else if (actionType == ActionType.DROP_TABLE) {
+ return new InfoPreviewResult("delete index " +
tablePath.getTableName());
+ } else if (actionType == ActionType.TRUNCATE_TABLE) {
+ return new InfoPreviewResult("delete and create index " +
tablePath.getTableName());
+ } else if (actionType == ActionType.CREATE_DATABASE) {
+ return new InfoPreviewResult("create index " +
tablePath.getTableName());
+ } else if (actionType == ActionType.DROP_DATABASE) {
+ return new InfoPreviewResult("delete index " +
tablePath.getTableName());
+ } else {
+ throw new UnsupportedOperationException("Unsupported action type:
" + actionType);
+ }
+ }
+}
diff --git
a/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/catalog/HbaseCatalogFactory.java
b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/catalog/HbaseCatalogFactory.java
new file mode 100644
index 0000000000..b9a3fc25fd
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/catalog/HbaseCatalogFactory.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.hbase.catalog;
+
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.api.configuration.util.OptionRule;
+import org.apache.seatunnel.api.table.catalog.Catalog;
+import org.apache.seatunnel.api.table.factory.CatalogFactory;
+import org.apache.seatunnel.api.table.factory.Factory;
+import org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseParameters;
+import
org.apache.seatunnel.connectors.seatunnel.hbase.constant.HbaseIdentifier;
+
+import com.google.auto.service.AutoService;
+
+@AutoService(Factory.class)
+public class HbaseCatalogFactory implements CatalogFactory {
+
+ @Override
+ public Catalog createCatalog(String catalogName, ReadonlyConfig options) {
+ // Create an instance of HbaseCatalog, passing in the catalog name,
namespace, and Hbase
+ // parameters
+ HbaseParameters hbaseParameters =
HbaseParameters.buildWithConfig(options);
+ return new HbaseCatalog(catalogName, hbaseParameters.getNamespace(),
hbaseParameters);
+ }
+
+ @Override
+ public String factoryIdentifier() {
+ return HbaseIdentifier.IDENTIFIER_NAME;
+ }
+
+ @Override
+ public OptionRule optionRule() {
+ return OptionRule.builder().build();
+ }
+}
diff --git
a/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/client/HbaseClient.java
b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/client/HbaseClient.java
new file mode 100644
index 0000000000..aec64bf7cf
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/client/HbaseClient.java
@@ -0,0 +1,393 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.hbase.client;
+
+import org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseParameters;
+import
org.apache.seatunnel.connectors.seatunnel.hbase.exception.HbaseConnectorErrorCode;
+import
org.apache.seatunnel.connectors.seatunnel.hbase.exception.HbaseConnectorException;
+import org.apache.seatunnel.connectors.seatunnel.hbase.source.HbaseSourceSplit;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.NamespaceDescriptor;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.BufferedMutator;
+import org.apache.hadoop.hbase.client.BufferedMutatorParams;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.RegionLocator;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
+import org.apache.hadoop.hbase.util.Bytes;
+
+import lombok.extern.slf4j.Slf4j;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static
org.apache.seatunnel.connectors.seatunnel.hbase.exception.HbaseConnectorErrorCode.CONNECTION_FAILED_FOR_ADMIN;
+
+@Slf4j
+public class HbaseClient {
+
+ private final Connection connection;
+ private final Admin admin;
+ private final BufferedMutator hbaseMutator;
+ public static Configuration hbaseConfiguration;
+
+ /**
+ * Constructor for HbaseClient.
+ *
+ * @param connection Hbase connection
+ * @param hbaseParameters Hbase parameters
+ */
+ private HbaseClient(Connection connection, HbaseParameters
hbaseParameters) {
+ this.connection = connection;
+ try {
+ this.admin = connection.getAdmin();
+
+ BufferedMutatorParams bufferedMutatorParams =
+ new BufferedMutatorParams(
+ TableName.valueOf(
+ hbaseParameters.getNamespace(),
+ hbaseParameters.getTable()))
+
.pool(HTable.getDefaultExecutor(hbaseConfiguration))
+
.writeBufferSize(hbaseParameters.getWriteBufferSize());
+ hbaseMutator =
connection.getBufferedMutator(bufferedMutatorParams);
+ } catch (IOException e) {
+ throw new HbaseConnectorException(
+ CONNECTION_FAILED_FOR_ADMIN,
CONNECTION_FAILED_FOR_ADMIN.getDescription(), e);
+ }
+ }
+
+ /**
+ * Create a new instance of HbaseClient.
+ *
+ * @param hbaseParameters Hbase parameters
+ * @return HbaseClient
+ */
+ public static HbaseClient createInstance(HbaseParameters hbaseParameters) {
+ return new HbaseClient(getHbaseConnection(hbaseParameters),
hbaseParameters);
+ }
+
+ /**
+ * Get Hbase connection.
+ *
+ * @param hbaseParameters Hbase parameters
+ * @return Hbase connection
+ */
+ private static Connection getHbaseConnection(HbaseParameters
hbaseParameters) {
+ hbaseConfiguration = HBaseConfiguration.create();
+ hbaseConfiguration.set("hbase.zookeeper.quorum",
hbaseParameters.getZookeeperQuorum());
+ if (hbaseParameters.getHbaseExtraConfig() != null) {
+
hbaseParameters.getHbaseExtraConfig().forEach(hbaseConfiguration::set);
+ }
+ try {
+ Connection connection =
ConnectionFactory.createConnection(hbaseConfiguration);
+ return connection;
+ } catch (IOException e) {
+ String errorMsg = "Build Hbase connection failed.";
+ throw new HbaseConnectorException(
+ HbaseConnectorErrorCode.CONNECTION_FAILED, errorMsg, e);
+ }
+ }
+
+ /**
+ * Check if a database exists.
+ *
+ * @param databaseName database name
+ * @return true if the database exists, false otherwise
+ */
+ public boolean databaseExists(String databaseName) {
+ try {
+ return Arrays.stream(admin.listNamespaceDescriptors())
+ .anyMatch(descriptor ->
descriptor.getName().equals(databaseName));
+ } catch (IOException e) {
+ throw new HbaseConnectorException(
+ HbaseConnectorErrorCode.DATABASE_QUERY_EXCEPTION,
+
HbaseConnectorErrorCode.DATABASE_QUERY_EXCEPTION.getErrorMessage(),
+ e);
+ }
+ }
+
+ /**
+ * List all databases.
+ *
+ * @return List of database names
+ */
+ public List<String> listDatabases() {
+ try {
+ return Arrays.stream(admin.listNamespaceDescriptors())
+ .map(NamespaceDescriptor::getName)
+ .collect(Collectors.toList());
+ } catch (IOException e) {
+ throw new HbaseConnectorException(
+ HbaseConnectorErrorCode.DATABASE_QUERY_EXCEPTION,
+
HbaseConnectorErrorCode.DATABASE_QUERY_EXCEPTION.getErrorMessage(),
+ e);
+ }
+ }
+
+ /**
+ * List all tables in a database.
+ *
+ * @param databaseName database name
+ * @return List of table names
+ */
+ public List<String> listTables(String databaseName) {
+ try {
+ return Arrays.stream(admin.listTableNamesByNamespace(databaseName))
+ .map(tableName -> tableName.getNameAsString())
+ .collect(Collectors.toList());
+ } catch (IOException e) {
+ throw new HbaseConnectorException(
+ HbaseConnectorErrorCode.DATABASE_QUERY_EXCEPTION,
+
HbaseConnectorErrorCode.DATABASE_QUERY_EXCEPTION.getErrorMessage(),
+ e);
+ }
+ }
+
+ /**
+ * Check if a table exists.
+ *
+ * @param tableName table name
+ * @return true if the table exists, false otherwise
+ */
+ public boolean tableExists(String tableName) {
+ try {
+ return admin.tableExists(TableName.valueOf(tableName));
+ } catch (IOException e) {
+ throw new HbaseConnectorException(
+ HbaseConnectorErrorCode.TABLE_QUERY_EXCEPTION,
+
HbaseConnectorErrorCode.TABLE_QUERY_EXCEPTION.getErrorMessage(),
+ e);
+ }
+ }
+
+ /**
+ * Create a table.
+ *
+ * @param databaseName database name
+ * @param tableName table name
+ * @param columnFamilies column families
+ * @param ignoreIfExists ignore if the table already exists
+ */
+ public void createTable(
+ String databaseName,
+ String tableName,
+ List<String> columnFamilies,
+ boolean ignoreIfExists) {
+ try {
+ if (!databaseExists(databaseName)) {
+
admin.createNamespace(NamespaceDescriptor.create(databaseName).build());
+ }
+ TableName table = TableName.valueOf(databaseName, tableName);
+ if (tableExists(table.getNameAsString())) {
+ log.info("Table {} already exists.", table.getNameAsString());
+ if (!ignoreIfExists) {
+ throw new HbaseConnectorException(
+ HbaseConnectorErrorCode.TABLE_EXISTS_EXCEPTION,
+
HbaseConnectorErrorCode.TABLE_EXISTS_EXCEPTION.getErrorMessage());
+ }
+ return;
+ }
+ TableDescriptorBuilder hbaseTableDescriptor =
TableDescriptorBuilder.newBuilder(table);
+ columnFamilies.forEach(
+ family ->
+ hbaseTableDescriptor.setColumnFamily(
+
ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes(family))
+ .build()));
+ admin.createTable(hbaseTableDescriptor.build());
+ } catch (IOException e) {
+ throw new HbaseConnectorException(
+ HbaseConnectorErrorCode.TABLE_CREATE_EXCEPTION,
+
HbaseConnectorErrorCode.TABLE_CREATE_EXCEPTION.getErrorMessage(),
+ e);
+ }
+ }
+
+ /**
+ * Drop a table.
+ *
+ * @param databaseName database name
+ * @param tableName table name
+ */
+ public void dropTable(String databaseName, String tableName) {
+ try {
+ TableName table = TableName.valueOf(databaseName, tableName);
+ admin.disableTable(table);
+ admin.deleteTable(table);
+ } catch (IOException e) {
+ throw new HbaseConnectorException(
+ HbaseConnectorErrorCode.TABLE_DELETE_EXCEPTION,
+
HbaseConnectorErrorCode.TABLE_DELETE_EXCEPTION.getErrorMessage(),
+ e);
+ }
+ }
+
+ /**
+ * Create a namespace.
+ *
+ * @param namespace namespace name
+ */
+ public void createNamespace(String namespace) {
+ try {
+
admin.createNamespace(NamespaceDescriptor.create(namespace).build());
+ } catch (IOException e) {
+ throw new HbaseConnectorException(
+ HbaseConnectorErrorCode.NAMESPACE_CREATE_EXCEPTION,
+
HbaseConnectorErrorCode.NAMESPACE_CREATE_EXCEPTION.getErrorMessage(),
+ e);
+ }
+ }
+
+ /**
+ * Drop a namespace.
+ *
+ * @param namespace namespace name
+ */
+ public void deleteNamespace(String namespace) {
+ try {
+ admin.deleteNamespace(namespace);
+ } catch (IOException e) {
+ throw new HbaseConnectorException(
+ HbaseConnectorErrorCode.NAMESPACE_DELETE_EXCEPTION,
+
HbaseConnectorErrorCode.NAMESPACE_DELETE_EXCEPTION.getErrorMessage(),
+ e);
+ }
+ }
+
+ /**
+ * Truncate a table.
+ *
+ * @param databaseName database name
+ * @param tableName table name
+ */
+ public void truncateTable(String databaseName, String tableName) {
+ try {
+ TableName table = TableName.valueOf(databaseName, tableName);
+ admin.disableTable(table);
+ admin.truncateTable(table, true);
+ } catch (IOException e) {
+ throw new HbaseConnectorException(
+ HbaseConnectorErrorCode.TABLE_TRUNCATE_EXCEPTION,
+
HbaseConnectorErrorCode.TABLE_TRUNCATE_EXCEPTION.getErrorMessage(),
+ e);
+ }
+ }
+
+ /**
+ * Check if a table has data.
+ *
+ * @param databaseName database name
+ * @param tableName table name
+ * @return true if the table has data, false otherwise
+ */
+ public boolean isExistsData(String databaseName, String tableName) {
+ try {
+ Table table = connection.getTable(TableName.valueOf(databaseName,
tableName));
+ Scan scan = new Scan();
+ scan.setCaching(1);
+ scan.setLimit(1);
+ try (ResultScanner scanner = table.getScanner(scan)) {
+ Result result = scanner.next();
+ return !result.isEmpty();
+ }
+ } catch (IOException e) {
+ throw new HbaseConnectorException(
+ HbaseConnectorErrorCode.TABLE_QUERY_EXCEPTION,
+
HbaseConnectorErrorCode.TABLE_QUERY_EXCEPTION.getErrorMessage(),
+ e);
+ }
+ }
+
+ /** Close Hbase connection. */
+ public void close() {
+ try {
+ if (hbaseMutator != null) {
+ hbaseMutator.flush();
+ hbaseMutator.close();
+ }
+ if (admin != null) {
+ admin.close();
+ }
+ if (connection != null) {
+ connection.close();
+ }
+ } catch (IOException e) {
+ log.error("Close Hbase connection failed.", e);
+ }
+ }
+
+ /**
+ * Mutate a Put.
+ *
+ * @param put Hbase put
+ * @throws IOException exception
+ */
+ public void mutate(Put put) throws IOException {
+ hbaseMutator.mutate(put);
+ }
+
+ /**
+ * Scan a table.
+ *
+ * @param split Hbase source split
+ * @param hbaseParameters Hbase parameters
+ * @param columnNames column names
+ * @return ResultScanner
+ * @throws IOException exception
+ */
+ public ResultScanner scan(
+ HbaseSourceSplit split, HbaseParameters hbaseParameters,
List<String> columnNames)
+ throws IOException {
+ Scan scan = new Scan();
+ scan.withStartRow(split.getStartRow(), true);
+ scan.withStopRow(split.getEndRow(), true);
+ scan.setCacheBlocks(hbaseParameters.isCacheBlocks());
+ scan.setCaching(hbaseParameters.getCaching());
+ scan.setBatch(hbaseParameters.getBatch());
+ for (String columnName : columnNames) {
+ String[] columnNameSplit = columnName.split(":");
+ scan.addColumn(Bytes.toBytes(columnNameSplit[0]),
Bytes.toBytes(columnNameSplit[1]));
+ }
+ return this.connection
+ .getTable(TableName.valueOf(hbaseParameters.getTable()))
+ .getScanner(scan);
+ }
+
+ /**
+ * Get a RegionLocator.
+ *
+ * @param tableName table name
+ * @return RegionLocator
+ * @throws IOException exception
+ */
+ public RegionLocator getRegionLocator(String tableName) throws IOException
{
+ return this.connection.getRegionLocator(TableName.valueOf(tableName));
+ }
+}
diff --git
a/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/config/HbaseConfig.java
b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/config/HbaseConfig.java
index 44a5640ffe..2921e1f91c 100644
---
a/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/config/HbaseConfig.java
+++
b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/config/HbaseConfig.java
@@ -19,10 +19,17 @@ package
org.apache.seatunnel.connectors.seatunnel.hbase.config;
import org.apache.seatunnel.api.configuration.Option;
import org.apache.seatunnel.api.configuration.Options;
+import org.apache.seatunnel.api.sink.DataSaveMode;
+import org.apache.seatunnel.api.sink.SchemaSaveMode;
+import java.util.Arrays;
import java.util.List;
import java.util.Map;
+import static org.apache.seatunnel.api.sink.DataSaveMode.APPEND_DATA;
+import static org.apache.seatunnel.api.sink.DataSaveMode.DROP_DATA;
+import static
org.apache.seatunnel.api.sink.DataSaveMode.ERROR_WHEN_DATA_EXISTS;
+
public class HbaseConfig {
private static final Integer DEFAULT_BUFFER_SIZE = 8 * 1024 * 1024;
@@ -119,6 +126,20 @@ public class HbaseConfig {
.withDescription(
"Set the batch size to control the maximum number
of cells returned each time, thereby controlling the amount of data returned by
a single RPC call. The default value is -1.");
+ public static final Option<SchemaSaveMode> SCHEMA_SAVE_MODE =
+ Options.key("schema_save_mode")
+ .enumType(SchemaSaveMode.class)
+ .defaultValue(SchemaSaveMode.CREATE_SCHEMA_WHEN_NOT_EXIST)
+ .withDescription("schema_save_mode");
+
+ public static final Option<DataSaveMode> DATA_SAVE_MODE =
+ Options.key("data_save_mode")
+ .singleChoice(
+ DataSaveMode.class,
+ Arrays.asList(DROP_DATA, APPEND_DATA,
ERROR_WHEN_DATA_EXISTS))
+ .defaultValue(APPEND_DATA)
+ .withDescription("data_save_mode");
+
public enum NullMode {
SKIP,
EMPTY;
diff --git
a/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/config/HbaseParameters.java
b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/config/HbaseParameters.java
index 4d020700ad..66b4eb967b 100644
---
a/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/config/HbaseParameters.java
+++
b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/config/HbaseParameters.java
@@ -51,6 +51,8 @@ public class HbaseParameters implements Serializable {
private String zookeeperQuorum;
+ private String namespace;
+
private String table;
private List<String> rowkeyColumns;
@@ -83,13 +85,22 @@ public class HbaseParameters implements Serializable {
public static HbaseParameters buildWithConfig(ReadonlyConfig config) {
HbaseParametersBuilder builder = HbaseParameters.builder();
+ String table = config.get(TABLE);
+ int colonIndex = table.indexOf(':');
+ if (colonIndex != -1) {
+ String namespace = table.substring(0, colonIndex);
+ builder.namespace(namespace);
+ builder.table(table.substring(colonIndex + 1));
+ } else {
+ builder.table(table);
+ builder.namespace("default");
+ }
// required parameters
builder.zookeeperQuorum(config.get(ZOOKEEPER_QUORUM));
builder.rowkeyColumns(config.get(ROWKEY_COLUMNS));
builder.familyNames(config.get(FAMILY_NAME));
- builder.table(config.get(TABLE));
builder.rowkeyDelimiter(config.get(ROWKEY_DELIMITER));
builder.versionColumn(config.get(VERSION_COLUMN));
String nullMode = String.valueOf(config.get(NULL_MODE));
@@ -108,7 +119,15 @@ public class HbaseParameters implements Serializable {
// required parameters
builder.zookeeperQuorum(pluginConfig.getString(ZOOKEEPER_QUORUM.key()));
- builder.table(pluginConfig.getString(TABLE.key()));
+ String table = pluginConfig.getString(TABLE.key());
+ int colonIndex = table.indexOf(':');
+ if (colonIndex != -1) {
+ String namespace = table.substring(0, colonIndex);
+ builder.namespace(namespace);
+ builder.table(table.substring(colonIndex + 1));
+ } else {
+ builder.table(table);
+ }
if (pluginConfig.hasPath(HBASE_EXTRA_CONFIG.key())) {
Config extraConfig =
pluginConfig.getConfig(HBASE_EXTRA_CONFIG.key());
diff --git
a/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/constant/HbaseIdentifier.java
b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/constant/HbaseIdentifier.java
new file mode 100644
index 0000000000..3d84216d66
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/constant/HbaseIdentifier.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.hbase.constant;
+
/** Identifier constants shared by the HBase connector's factories. */
public final class HbaseIdentifier {

    /** Connector identifier used for factory discovery and in job configurations. */
    public static final String IDENTIFIER_NAME = "Hbase";

    // Constant holder: prevent instantiation.
    private HbaseIdentifier() {}
}
diff --git
a/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/exception/HbaseConnectorErrorCode.java
b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/exception/HbaseConnectorErrorCode.java
index 5717c933b0..7f6a60f955 100644
---
a/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/exception/HbaseConnectorErrorCode.java
+++
b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/exception/HbaseConnectorErrorCode.java
@@ -21,8 +21,17 @@ package
org.apache.seatunnel.connectors.seatunnel.hbase.exception;
import org.apache.seatunnel.common.exception.SeaTunnelErrorCode;
public enum HbaseConnectorErrorCode implements SeaTunnelErrorCode {
- CONNECTION_FAILED("Hbase-01", "Build Hbase connection failed");
-
+ CONNECTION_FAILED("Hbase-01", "Build Hbase connection failed"),
+ CONNECTION_FAILED_FOR_ADMIN("Hbase-02", "Build Hbase Admin failed"),
+ DATABASE_QUERY_EXCEPTION("Hbase-03", "Hbase namespace query failed"),
+ TABLE_QUERY_EXCEPTION("Hbase-04", "Hbase table query failed"),
+ TABLE_CREATE_EXCEPTION("Hbase-05", "Hbase table create failed"),
+ TABLE_DELETE_EXCEPTION("Hbase-06", "Hbase table delete failed"),
+ TABLE_EXISTS_EXCEPTION("Hbase-07", "Hbase table exists failed"),
+ NAMESPACE_CREATE_EXCEPTION("Hbase-08", "Hbase namespace create failed"),
+ NAMESPACE_DELETE_EXCEPTION("Hbase-09", "Hbase namespace delete failed"),
+ TABLE_TRUNCATE_EXCEPTION("Hbase-10", "Hbase table truncate failed"),
+ ;
private final String code;
private final String description;
diff --git
a/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/sink/HbaseSink.java
b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/sink/HbaseSink.java
index 0c592dd65a..0a46b1baef 100644
---
a/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/sink/HbaseSink.java
+++
b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/sink/HbaseSink.java
@@ -17,52 +17,97 @@
package org.apache.seatunnel.connectors.seatunnel.hbase.sink;
-import org.apache.seatunnel.shade.com.typesafe.config.Config;
-
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.api.sink.DataSaveMode;
+import org.apache.seatunnel.api.sink.DefaultSaveModeHandler;
+import org.apache.seatunnel.api.sink.SaveModeHandler;
+import org.apache.seatunnel.api.sink.SchemaSaveMode;
+import org.apache.seatunnel.api.sink.SeaTunnelSink;
import org.apache.seatunnel.api.sink.SinkWriter;
import org.apache.seatunnel.api.sink.SupportMultiTableSink;
+import org.apache.seatunnel.api.sink.SupportSaveMode;
+import org.apache.seatunnel.api.table.catalog.Catalog;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.TablePath;
+import org.apache.seatunnel.api.table.factory.CatalogFactory;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
-import
org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSimpleSink;
+import org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseConfig;
import org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseParameters;
+import
org.apache.seatunnel.connectors.seatunnel.hbase.constant.HbaseIdentifier;
+import
org.apache.seatunnel.connectors.seatunnel.hbase.state.HbaseAggregatedCommitInfo;
+import org.apache.seatunnel.connectors.seatunnel.hbase.state.HbaseCommitInfo;
+import org.apache.seatunnel.connectors.seatunnel.hbase.state.HbaseSinkState;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
+import java.util.Optional;
-public class HbaseSink extends AbstractSimpleSink<SeaTunnelRow, Void>
- implements SupportMultiTableSink {
+import static
org.apache.seatunnel.api.table.factory.FactoryUtil.discoverFactory;
- private Config pluginConfig;
+public class HbaseSink
+ implements SeaTunnelSink<
+ SeaTunnelRow, HbaseSinkState, HbaseCommitInfo,
HbaseAggregatedCommitInfo>,
+ SupportMultiTableSink,
+ SupportSaveMode {
- private SeaTunnelRowType seaTunnelRowType;
+ private ReadonlyConfig config;
+
+ private CatalogTable catalogTable;
- private HbaseParameters hbaseParameters;
+ private final HbaseParameters hbaseParameters;
+
+ private SeaTunnelRowType seaTunnelRowType;
private List<Integer> rowkeyColumnIndexes = new ArrayList<>();
private int versionColumnIndex = -1;
+ public HbaseSink(ReadonlyConfig config, CatalogTable catalogTable) {
+ this.hbaseParameters = HbaseParameters.buildWithConfig(config);
+ this.config = config;
+ this.catalogTable = catalogTable;
+ this.seaTunnelRowType = catalogTable.getSeaTunnelRowType();
+ if (hbaseParameters.getVersionColumn() != null) {
+ this.versionColumnIndex =
seaTunnelRowType.indexOf(hbaseParameters.getVersionColumn());
+ }
+ }
+
@Override
public String getPluginName() {
- return HbaseSinkFactory.IDENTIFIER;
+ return HbaseIdentifier.IDENTIFIER_NAME;
}
- public HbaseSink(HbaseParameters hbaseParameters, CatalogTable
catalogTable) {
- this.hbaseParameters = hbaseParameters;
- this.seaTunnelRowType =
catalogTable.getTableSchema().toPhysicalRowDataType();
+ @Override
+ public HbaseSinkWriter createWriter(SinkWriter.Context context) throws
IOException {
for (String rowkeyColumn : hbaseParameters.getRowkeyColumns()) {
this.rowkeyColumnIndexes.add(seaTunnelRowType.indexOf(rowkeyColumn));
}
if (hbaseParameters.getVersionColumn() != null) {
this.versionColumnIndex =
seaTunnelRowType.indexOf(hbaseParameters.getVersionColumn());
}
+ return new HbaseSinkWriter(
+ seaTunnelRowType, hbaseParameters, rowkeyColumnIndexes,
versionColumnIndex);
}
@Override
- public HbaseSinkWriter createWriter(SinkWriter.Context context) throws
IOException {
- return new HbaseSinkWriter(
- seaTunnelRowType, hbaseParameters, rowkeyColumnIndexes,
versionColumnIndex);
+ public Optional<SaveModeHandler> getSaveModeHandler() {
+ CatalogFactory catalogFactory =
+ discoverFactory(
+ Thread.currentThread().getContextClassLoader(),
+ CatalogFactory.class,
+ getPluginName());
+ if (catalogFactory == null) {
+ return Optional.empty();
+ }
+ Catalog catalog =
catalogFactory.createCatalog(catalogFactory.factoryIdentifier(), config);
+ SchemaSaveMode schemaSaveMode =
config.get(HbaseConfig.SCHEMA_SAVE_MODE);
+ DataSaveMode dataSaveMode = config.get(HbaseConfig.DATA_SAVE_MODE);
+ TablePath tablePath =
+ TablePath.of(hbaseParameters.getNamespace(),
hbaseParameters.getTable());
+ return Optional.of(
+ new DefaultSaveModeHandler(
+ schemaSaveMode, dataSaveMode, catalog, tablePath,
null, null));
}
}
diff --git
a/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/sink/HbaseSinkFactory.java
b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/sink/HbaseSinkFactory.java
index 1bbeb43f4e..0992b11d71 100644
---
a/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/sink/HbaseSinkFactory.java
+++
b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/sink/HbaseSinkFactory.java
@@ -17,23 +17,25 @@
package org.apache.seatunnel.connectors.seatunnel.hbase.sink;
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.configuration.util.OptionRule;
import org.apache.seatunnel.api.sink.SinkCommonOptions;
-import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.connector.TableSink;
import org.apache.seatunnel.api.table.factory.Factory;
import org.apache.seatunnel.api.table.factory.TableSinkFactory;
import org.apache.seatunnel.api.table.factory.TableSinkFactoryContext;
-import org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseParameters;
+import
org.apache.seatunnel.connectors.seatunnel.hbase.constant.HbaseIdentifier;
import com.google.auto.service.AutoService;
+import static
org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseConfig.DATA_SAVE_MODE;
import static
org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseConfig.ENCODING;
import static
org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseConfig.FAMILY_NAME;
import static
org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseConfig.HBASE_EXTRA_CONFIG;
import static
org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseConfig.NULL_MODE;
import static
org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseConfig.ROWKEY_COLUMNS;
import static
org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseConfig.ROWKEY_DELIMITER;
+import static
org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseConfig.SCHEMA_SAVE_MODE;
import static
org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseConfig.TABLE;
import static
org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseConfig.VERSION_COLUMN;
import static
org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseConfig.WAL_WRITE;
@@ -47,29 +49,34 @@ public class HbaseSinkFactory implements TableSinkFactory {
@Override
public String factoryIdentifier() {
- return IDENTIFIER;
+ return HbaseIdentifier.IDENTIFIER_NAME;
}
@Override
public OptionRule optionRule() {
return OptionRule.builder()
- .required(ZOOKEEPER_QUORUM, TABLE, ROWKEY_COLUMNS, FAMILY_NAME)
+ .required(
+ ZOOKEEPER_QUORUM,
+ TABLE,
+ ROWKEY_COLUMNS,
+ FAMILY_NAME,
+ SCHEMA_SAVE_MODE,
+ DATA_SAVE_MODE)
.optional(
- SinkCommonOptions.MULTI_TABLE_SINK_REPLICA,
ROWKEY_DELIMITER,
VERSION_COLUMN,
NULL_MODE,
WAL_WRITE,
WRITE_BUFFER_SIZE,
ENCODING,
- HBASE_EXTRA_CONFIG)
+ HBASE_EXTRA_CONFIG,
+ SinkCommonOptions.MULTI_TABLE_SINK_REPLICA)
.build();
}
@Override
public TableSink createSink(TableSinkFactoryContext context) {
- HbaseParameters hbaseParameters =
HbaseParameters.buildWithConfig(context.getOptions());
- CatalogTable catalogTable = context.getCatalogTable();
- return () -> new HbaseSink(hbaseParameters, catalogTable);
+ ReadonlyConfig readonlyConfig = context.getOptions();
+ return () -> new HbaseSink(readonlyConfig, context.getCatalogTable());
}
}
diff --git
a/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/sink/HbaseSinkWriter.java
b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/sink/HbaseSinkWriter.java
index e1e312d305..73ee19f936 100644
---
a/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/sink/HbaseSinkWriter.java
+++
b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/sink/HbaseSinkWriter.java
@@ -17,52 +17,46 @@
package org.apache.seatunnel.connectors.seatunnel.hbase.sink;
+import org.apache.seatunnel.api.sink.SinkWriter;
import org.apache.seatunnel.api.sink.SupportMultiTableSinkWriter;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated;
-import
org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter;
+import org.apache.seatunnel.connectors.seatunnel.hbase.client.HbaseClient;
import org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseParameters;
import
org.apache.seatunnel.connectors.seatunnel.hbase.exception.HbaseConnectorException;
+import org.apache.seatunnel.connectors.seatunnel.hbase.state.HbaseCommitInfo;
+import org.apache.seatunnel.connectors.seatunnel.hbase.state.HbaseSinkState;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.client.BufferedMutator;
-import org.apache.hadoop.hbase.client.BufferedMutatorParams;
-import org.apache.hadoop.hbase.client.Connection;
-import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Durability;
-import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;
import java.io.IOException;
import java.nio.charset.Charset;
import java.util.List;
+import java.util.Map;
+import java.util.Optional;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
-public class HbaseSinkWriter extends AbstractSinkWriter<SeaTunnelRow, Void>
- implements SupportMultiTableSinkWriter<Void> {
+public class HbaseSinkWriter
+ implements SinkWriter<SeaTunnelRow, HbaseCommitInfo, HbaseSinkState>,
+ SupportMultiTableSinkWriter<Void> {
private static final String ALL_COLUMNS = "all_columns";
- private final Configuration hbaseConfiguration =
HBaseConfiguration.create();
-
- private final Connection hbaseConnection;
-
- private final BufferedMutator hbaseMutator;
+ private final HbaseClient hbaseClient;
private final SeaTunnelRowType seaTunnelRowType;
private final HbaseParameters hbaseParameters;
- private final List<Integer> rowkeyColumnIndexes;
+ private List<Integer> rowkeyColumnIndexes;
- private final int versionColumnIndex;
+ private int versionColumnIndex;
private String defaultFamilyName = "value";
@@ -70,8 +64,7 @@ public class HbaseSinkWriter extends
AbstractSinkWriter<SeaTunnelRow, Void>
SeaTunnelRowType seaTunnelRowType,
HbaseParameters hbaseParameters,
List<Integer> rowkeyColumnIndexes,
- int versionColumnIndex)
- throws IOException {
+ int versionColumnIndex) {
this.seaTunnelRowType = seaTunnelRowType;
this.hbaseParameters = hbaseParameters;
this.rowkeyColumnIndexes = rowkeyColumnIndexes;
@@ -82,34 +75,27 @@ public class HbaseSinkWriter extends
AbstractSinkWriter<SeaTunnelRow, Void>
hbaseParameters.getFamilyNames().getOrDefault(ALL_COLUMNS,
defaultFamilyName);
}
- // initialize hbase configuration
- hbaseConfiguration.set("hbase.zookeeper.quorum",
hbaseParameters.getZookeeperQuorum());
- if (hbaseParameters.getHbaseExtraConfig() != null) {
-
hbaseParameters.getHbaseExtraConfig().forEach(hbaseConfiguration::set);
- }
- // initialize hbase connection
- hbaseConnection =
ConnectionFactory.createConnection(hbaseConfiguration);
- // initialize hbase mutator
- BufferedMutatorParams bufferedMutatorParams =
- new
BufferedMutatorParams(TableName.valueOf(hbaseParameters.getTable()))
- .pool(HTable.getDefaultExecutor(hbaseConfiguration))
- .writeBufferSize(hbaseParameters.getWriteBufferSize());
- hbaseMutator =
hbaseConnection.getBufferedMutator(bufferedMutatorParams);
+ this.hbaseClient = HbaseClient.createInstance(hbaseParameters);
}
@Override
public void write(SeaTunnelRow element) throws IOException {
Put put = convertRowToPut(element);
- hbaseMutator.mutate(put);
+ hbaseClient.mutate(put);
}
+ @Override
+ public Optional<HbaseCommitInfo> prepareCommit() throws IOException {
+ return Optional.empty();
+ }
+
+ @Override
+ public void abortPrepare() {}
+
@Override
public void close() throws IOException {
- if (hbaseMutator != null) {
- hbaseMutator.close();
- }
- if (hbaseConnection != null) {
- hbaseConnection.close();
+ if (hbaseClient != null) {
+ hbaseClient.close();
}
}
@@ -134,6 +120,7 @@ public class HbaseSinkWriter extends
AbstractSinkWriter<SeaTunnelRow, Void>
.collect(Collectors.toList());
for (Integer writeColumnIndex : writeColumnIndexes) {
String fieldName = seaTunnelRowType.getFieldName(writeColumnIndex);
+ Map<String, String> configurationFamilyNames =
hbaseParameters.getFamilyNames();
String familyName =
hbaseParameters.getFamilyNames().getOrDefault(fieldName,
defaultFamilyName);
byte[] bytes = convertColumnToBytes(row, writeColumnIndex);
diff --git
a/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/source/HbaseSource.java
b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/source/HbaseSource.java
index 3aca316151..1a597eea13 100644
---
a/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/source/HbaseSource.java
+++
b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/source/HbaseSource.java
@@ -35,11 +35,9 @@ import org.apache.seatunnel.common.config.CheckConfigUtil;
import org.apache.seatunnel.common.config.CheckResult;
import org.apache.seatunnel.common.constants.PluginType;
import org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseParameters;
+import
org.apache.seatunnel.connectors.seatunnel.hbase.constant.HbaseIdentifier;
import
org.apache.seatunnel.connectors.seatunnel.hbase.exception.HbaseConnectorException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
import com.google.common.collect.Lists;
import java.util.List;
@@ -51,9 +49,6 @@ public class HbaseSource
implements SeaTunnelSource<SeaTunnelRow, HbaseSourceSplit,
HbaseSourceState>,
SupportParallelism,
SupportColumnProjection {
- private static final Logger LOG =
LoggerFactory.getLogger(HbaseSource.class);
- public static final String PLUGIN_NAME = "Hbase";
- private Config pluginConfig;
private SeaTunnelRowType seaTunnelRowType;
private HbaseParameters hbaseParameters;
@@ -61,11 +56,10 @@ public class HbaseSource
@Override
public String getPluginName() {
- return PLUGIN_NAME;
+ return HbaseIdentifier.IDENTIFIER_NAME;
}
HbaseSource(Config pluginConfig) {
- this.pluginConfig = pluginConfig;
CheckResult result =
CheckConfigUtil.checkAllExists(pluginConfig,
ZOOKEEPER_QUORUM.key(), TABLE.key());
if (!result.isSuccess()) {
diff --git
a/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/source/HbaseSourceFactory.java
b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/source/HbaseSourceFactory.java
index 2de385dbd1..5e250337d7 100644
---
a/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/source/HbaseSourceFactory.java
+++
b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/source/HbaseSourceFactory.java
@@ -26,18 +26,17 @@ import org.apache.seatunnel.api.table.factory.Factory;
import org.apache.seatunnel.api.table.factory.TableSourceFactory;
import org.apache.seatunnel.api.table.factory.TableSourceFactoryContext;
import org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseConfig;
+import
org.apache.seatunnel.connectors.seatunnel.hbase.constant.HbaseIdentifier;
import com.google.auto.service.AutoService;
import java.io.Serializable;
-import static
org.apache.seatunnel.connectors.seatunnel.hbase.sink.HbaseSinkFactory.IDENTIFIER;
-
@AutoService(Factory.class)
public class HbaseSourceFactory implements TableSourceFactory {
@Override
public String factoryIdentifier() {
- return IDENTIFIER;
+ return HbaseIdentifier.IDENTIFIER_NAME;
}
@Override
diff --git
a/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/source/HbaseSourceReader.java
b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/source/HbaseSourceReader.java
index 526ac826db..2f78fb280c 100644
---
a/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/source/HbaseSourceReader.java
+++
b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/source/HbaseSourceReader.java
@@ -22,16 +22,12 @@ import org.apache.seatunnel.api.source.Collector;
import org.apache.seatunnel.api.source.SourceReader;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.connectors.seatunnel.hbase.client.HbaseClient;
import org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseParameters;
import
org.apache.seatunnel.connectors.seatunnel.hbase.format.HBaseDeserializationFormat;
-import
org.apache.seatunnel.connectors.seatunnel.hbase.utils.HbaseConnectionUtil;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
-import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.util.Bytes;
import com.google.common.base.Preconditions;
import com.google.common.collect.Maps;
@@ -55,13 +51,13 @@ public class HbaseSourceReader implements
SourceReader<SeaTunnelRow, HbaseSource
private final transient Map<String, byte[][]> namesMap;
- private final SourceReader.Context context;
+ private final Context context;
private final SeaTunnelRowType seaTunnelRowType;
private volatile boolean noMoreSplit = false;
+ private final HbaseClient hbaseClient;
private HbaseParameters hbaseParameters;
private final List<String> columnNames;
- private final transient Connection connection;
private HBaseDeserializationFormat hbaseDeserializationFormat =
new HBaseDeserializationFormat();
@@ -85,8 +81,7 @@ public class HbaseSourceReader implements
SourceReader<SeaTunnelRow, HbaseSource
Preconditions.checkArgument(
column.contains(":") &&
column.split(":").length == 2,
"Invalid column names, it should be
[ColumnFamily:Column] format"));
-
- connection = HbaseConnectionUtil.getHbaseConnection(hbaseParameters);
+ hbaseClient = HbaseClient.createInstance(hbaseParameters);
}
@Override
@@ -103,9 +98,9 @@ public class HbaseSourceReader implements
SourceReader<SeaTunnelRow, HbaseSource
throw new IOException("Failed to close HBase Scanner.", e);
}
}
- if (this.connection != null) {
+ if (this.hbaseClient != null) {
try {
- this.connection.close();
+ this.hbaseClient.close();
} catch (Exception e) {
throw new IOException("Failed to close HBase connection.", e);
}
@@ -119,23 +114,8 @@ public class HbaseSourceReader implements
SourceReader<SeaTunnelRow, HbaseSource
final HbaseSourceSplit split = sourceSplits.poll();
if (Objects.nonNull(split)) {
// read logic
- if (this.currentScanner == null) {
- Scan scan = new Scan();
- scan.withStartRow(split.getStartRow(), true);
- scan.withStopRow(split.getEndRow(), true);
- scan.setCacheBlocks(hbaseParameters.isCacheBlocks());
- scan.setCaching(hbaseParameters.getCaching());
- scan.setBatch(hbaseParameters.getBatch());
- for (String columnName : this.columnNames) {
- String[] columnNameSplit = columnName.split(":");
- scan.addColumn(
- Bytes.toBytes(columnNameSplit[0]),
- Bytes.toBytes(columnNameSplit[1]));
- }
- this.currentScanner =
- this.connection
-
.getTable(TableName.valueOf(hbaseParameters.getTable()))
- .getScanner(scan);
+ if (currentScanner == null) {
+ currentScanner = hbaseClient.scan(split, hbaseParameters,
this.columnNames);
}
for (Result result : currentScanner) {
SeaTunnelRow seaTunnelRow =
diff --git
a/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/source/HbaseSourceSplitEnumerator.java
b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/source/HbaseSourceSplitEnumerator.java
index 094128b174..f5508c9037 100644
---
a/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/source/HbaseSourceSplitEnumerator.java
+++
b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/source/HbaseSourceSplitEnumerator.java
@@ -18,14 +18,10 @@
package org.apache.seatunnel.connectors.seatunnel.hbase.source;
-import org.apache.seatunnel.shade.com.typesafe.config.Config;
-
import org.apache.seatunnel.api.source.SourceSplitEnumerator;
+import org.apache.seatunnel.connectors.seatunnel.hbase.client.HbaseClient;
import org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseParameters;
-import
org.apache.seatunnel.connectors.seatunnel.hbase.utils.HbaseConnectionUtil;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.RegionLocator;
import lombok.extern.slf4j.Slf4j;
@@ -43,7 +39,6 @@ public class HbaseSourceSplitEnumerator
/** Source split enumerator context */
private final Context<HbaseSourceSplit> context;
- private Config pluginConfig;
/** The splits that has assigned */
private final Set<HbaseSourceSplit> assignedSplit;
@@ -51,24 +46,29 @@ public class HbaseSourceSplitEnumerator
private Set<HbaseSourceSplit> pendingSplit;
private HbaseParameters hbaseParameters;
- private Connection connection;
+
+ private HbaseClient hbaseClient;
public HbaseSourceSplitEnumerator(
Context<HbaseSourceSplit> context, HbaseParameters
hbaseParameters) {
- this.context = context;
- this.hbaseParameters = hbaseParameters;
- this.assignedSplit = new HashSet<>();
- connection = HbaseConnectionUtil.getHbaseConnection(hbaseParameters);
+ this(context, hbaseParameters, new HashSet<>());
}
public HbaseSourceSplitEnumerator(
Context<HbaseSourceSplit> context,
HbaseParameters hbaseParameters,
HbaseSourceState sourceState) {
+ this(context, hbaseParameters, sourceState.getAssignedSplits());
+ }
+
+ private HbaseSourceSplitEnumerator(
+ Context<HbaseSourceSplit> context,
+ HbaseParameters hbaseParameters,
+ Set<HbaseSourceSplit> assignedSplit) {
this.context = context;
this.hbaseParameters = hbaseParameters;
- this.assignedSplit = sourceState.getAssignedSplits();
- connection = HbaseConnectionUtil.getHbaseConnection(hbaseParameters);
+ this.assignedSplit = assignedSplit;
+ this.hbaseClient = HbaseClient.createInstance(hbaseParameters);
}
@Override
@@ -157,8 +157,7 @@ public class HbaseSourceSplitEnumerator
List<HbaseSourceSplit> splits = new ArrayList<>();
try {
- RegionLocator regionLocator =
-
connection.getRegionLocator(TableName.valueOf(hbaseParameters.getTable()));
+ RegionLocator regionLocator =
hbaseClient.getRegionLocator(hbaseParameters.getTable());
byte[][] startKeys = regionLocator.getStartKeys();
byte[][] endKeys = regionLocator.getEndKeys();
if (startKeys.length != endKeys.length) {
diff --git
a/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/state/HbaseAggregatedCommitInfo.java
b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/state/HbaseAggregatedCommitInfo.java
new file mode 100644
index 0000000000..c1996dc057
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/state/HbaseAggregatedCommitInfo.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.hbase.state;
+
+import java.io.Serializable;
+
+public class HbaseAggregatedCommitInfo implements Serializable {}
diff --git
a/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/state/HbaseCommitInfo.java
b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/state/HbaseCommitInfo.java
new file mode 100644
index 0000000000..39999ceddc
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/state/HbaseCommitInfo.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.hbase.state;
+
+import java.io.Serializable;
+
+public class HbaseCommitInfo implements Serializable {}
diff --git
a/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/state/HbaseSinkState.java
b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/state/HbaseSinkState.java
new file mode 100644
index 0000000000..6e1f068cf6
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/state/HbaseSinkState.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.hbase.state;
+
+import java.io.Serializable;
+
+public class HbaseSinkState implements Serializable {}
diff --git
a/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/utils/HbaseConnectionUtil.java
b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/utils/HbaseConnectionUtil.java
deleted file mode 100644
index f006986e66..0000000000
---
a/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/utils/HbaseConnectionUtil.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.connectors.seatunnel.hbase.utils;
-
-import org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseParameters;
-import
org.apache.seatunnel.connectors.seatunnel.hbase.exception.HbaseConnectorErrorCode;
-import
org.apache.seatunnel.connectors.seatunnel.hbase.exception.HbaseConnectorException;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.client.Connection;
-import org.apache.hadoop.hbase.client.ConnectionFactory;
-
-import java.io.IOException;
-
-public class HbaseConnectionUtil {
- public static Connection getHbaseConnection(HbaseParameters
hbaseParameters) {
- Configuration hbaseConfiguration = HBaseConfiguration.create();
- hbaseConfiguration.set("hbase.zookeeper.quorum",
hbaseParameters.getZookeeperQuorum());
- if (hbaseParameters.getHbaseExtraConfig() != null) {
-
hbaseParameters.getHbaseExtraConfig().forEach(hbaseConfiguration::set);
- }
- // initialize hbase connection
- try {
- Connection connection =
ConnectionFactory.createConnection(hbaseConfiguration);
- return connection;
- } catch (IOException e) {
- String errorMsg = "Build Hbase connection failed.";
- throw new
HbaseConnectorException(HbaseConnectorErrorCode.CONNECTION_FAILED, errorMsg);
- }
- }
-}
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hbase-e2e/src/test/java/org/apache/seatunnel/e2e/connector/hbase/HbaseIT.java
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hbase-e2e/src/test/java/org/apache/seatunnel/e2e/connector/hbase/HbaseIT.java
index fe736f965e..1957e1bd08 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hbase-e2e/src/test/java/org/apache/seatunnel/e2e/connector/hbase/HbaseIT.java
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hbase-e2e/src/test/java/org/apache/seatunnel/e2e/connector/hbase/HbaseIT.java
@@ -29,10 +29,12 @@ import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.jupiter.api.AfterAll;
@@ -47,6 +49,7 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Objects;
+import java.util.UUID;
@Slf4j
@DisabledOnContainer(
@@ -79,6 +82,7 @@ public class HbaseIT extends TestSuiteBase implements
TestResource {
public void startUp() throws Exception {
hbaseCluster = new HbaseCluster();
hbaseConnection = hbaseCluster.startService();
+ admin = hbaseConnection.getAdmin();
// Create table for hbase sink test
log.info("initial");
hbaseCluster.createTable(TABLE_NAME, Arrays.asList(FAMILY_NAME));
@@ -112,6 +116,87 @@ public class HbaseIT extends TestSuiteBase implements
TestResource {
Assertions.assertEquals(0, sourceExecResult.getExitCode());
}
+ @TestTemplate
+ public void testHbaseSinkWithErrorWhenDataExists(TestContainer container)
+ throws IOException, InterruptedException {
+ deleteData(table);
+ insertData(table);
+ Assertions.assertEquals(5, countData(table));
+ Container.ExecResult execResult =
+
container.executeJob("/fake_to_hbase_with_error_when_data_exists.conf");
+ Assertions.assertEquals(1, execResult.getExitCode());
+ }
+
+ @TestTemplate
+ public void testHbaseSinkWithRecreateSchema(TestContainer container)
+ throws IOException, InterruptedException {
+ String tableName = "seatunnel_test_with_recreate_schema";
+ TableName table = TableName.valueOf(tableName);
+ dropTable(table);
+ hbaseCluster.createTable(tableName, Arrays.asList("test_rs"));
+ TableDescriptor descriptorBefore =
hbaseConnection.getTable(table).getDescriptor();
+ String[] familiesBefore =
+ Arrays.stream(descriptorBefore.getColumnFamilies())
+ .map(f -> f.getNameAsString())
+ .toArray(String[]::new);
+ Assertions.assertTrue(Arrays.equals(familiesBefore, new String[]
{"test_rs"}));
+ Container.ExecResult execResult =
+
container.executeJob("/fake_to_hbase_with_recreate_schema.conf");
+ Assertions.assertEquals(0, execResult.getExitCode());
+ TableDescriptor descriptorAfter =
hbaseConnection.getTable(table).getDescriptor();
+ String[] familiesAfter =
+ Arrays.stream(descriptorAfter.getColumnFamilies())
+ .map(f -> f.getNameAsString())
+ .toArray(String[]::new);
+ Assertions.assertTrue(!Arrays.equals(familiesBefore, familiesAfter));
+ }
+
+ @TestTemplate
+ public void testHbaseSinkWithDropData(TestContainer container)
+ throws IOException, InterruptedException {
+ deleteData(table);
+ insertData(table);
+ countData(table);
+ Assertions.assertEquals(5, countData(table));
+ Container.ExecResult execResult =
+ container.executeJob("/fake_to_hbase_with_drop_data.conf");
+ Assertions.assertEquals(0, execResult.getExitCode());
+ Assertions.assertEquals(5, countData(table));
+ }
+
+ @TestTemplate
+ public void testHbaseSinkWithCreateWhenNotExists(TestContainer container)
+ throws IOException, InterruptedException {
+ TableName seatunnelTestWithCreateWhenNotExists =
+
TableName.valueOf("seatunnel_test_with_create_when_not_exists");
+ dropTable(seatunnelTestWithCreateWhenNotExists);
+ Container.ExecResult execResult =
+
container.executeJob("/fake_to_hbase_with_create_when_not_exists.conf");
+ Assertions.assertEquals(0, execResult.getExitCode());
+ Assertions.assertEquals(5,
countData(seatunnelTestWithCreateWhenNotExists));
+ }
+
+ @TestTemplate
+ public void testHbaseSinkWithAppendData(TestContainer container)
+ throws IOException, InterruptedException {
+ deleteData(table);
+ insertData(table);
+ countData(table);
+ Assertions.assertEquals(5, countData(table));
+ Container.ExecResult execResult =
+ container.executeJob("/fake_to_hbase_with_append_data.conf");
+ Assertions.assertEquals(0, execResult.getExitCode());
+ Assertions.assertEquals(10, countData(table));
+ }
+
+ @TestTemplate
+ public void testHbaseSinkWithErrorWhenNotExists(TestContainer container)
+ throws IOException, InterruptedException {
+ Container.ExecResult execResult =
+
container.executeJob("/fake_to_hbase_with_error_when_not_exists.conf");
+ Assertions.assertEquals(1, execResult.getExitCode());
+ }
+
@TestTemplate
public void testHbaseSinkWithArray(TestContainer container)
throws IOException, InterruptedException {
@@ -223,6 +308,13 @@ public class HbaseIT extends TestSuiteBase implements
TestResource {
scanner.close();
}
+ private void dropTable(TableName tableName) throws IOException {
+ if (admin.tableExists(tableName)) {
+ admin.disableTable(tableName);
+ admin.deleteTable(tableName);
+ }
+ }
+
private void deleteData(TableName table) throws IOException {
Table hbaseTable = hbaseConnection.getTable(table);
Scan scan = new Scan();
@@ -234,6 +326,32 @@ public class HbaseIT extends TestSuiteBase implements
TestResource {
}
}
+ private void insertData(TableName table) throws IOException {
+ Table hbaseTable = hbaseConnection.getTable(table);
+ for (int i = 0; i < 5; i++) {
+ String rowKey = "row" + UUID.randomUUID();
+ String value = "value" + i;
+ hbaseTable.put(
+ new Put(Bytes.toBytes(rowKey))
+ .addColumn(
+ Bytes.toBytes(FAMILY_NAME),
+ Bytes.toBytes("name"),
+ Bytes.toBytes(value)));
+ }
+ }
+
+ private int countData(TableName table) throws IOException {
+ Table hbaseTable = hbaseConnection.getTable(table);
+ Scan scan = new Scan();
+ ResultScanner scanner = hbaseTable.getScanner(scan);
+ int count = 0;
+ for (Result result = scanner.next(); result != null; result =
scanner.next()) {
+ count++;
+ }
+ scanner.close();
+ return count;
+ }
+
public ArrayList<Result> readData(TableName table) throws IOException {
Table hbaseTable = hbaseConnection.getTable(table);
Scan scan = new Scan();
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hbase-e2e/src/test/resources/fake_to_hbase_with_append_data.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hbase-e2e/src/test/resources/fake_to_hbase_with_append_data.conf
new file mode 100644
index 0000000000..0778d8cb36
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hbase-e2e/src/test/resources/fake_to_hbase_with_append_data.conf
@@ -0,0 +1,52 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+ parallelism = 1
+ job.mode = "BATCH"
+}
+
+source {
+ FakeSource {
+ row.num = 5
+ schema {
+ fields {
+ name = string
+ age = int
+ c_tinyint = tinyint
+ c_smallint = smallint
+ c_bigint = bigint
+ c_float = float
+ c_double = double
+ c_boolean = boolean
+ }
+ }
+ }
+}
+
+sink {
+ Hbase {
+ zookeeper_quorum = "hbase_e2e:2181"
+ table = "seatunnel_test"
+ rowkey_column = ["name"]
+ family_name {
+ all_columns = info
+ }
+ schema_save_mode = "CREATE_SCHEMA_WHEN_NOT_EXIST"
+ data_save_mode = "APPEND_DATA"
+ }
+}
\ No newline at end of file
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hbase-e2e/src/test/resources/fake_to_hbase_with_create_when_not_exists.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hbase-e2e/src/test/resources/fake_to_hbase_with_create_when_not_exists.conf
new file mode 100644
index 0000000000..2132717082
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hbase-e2e/src/test/resources/fake_to_hbase_with_create_when_not_exists.conf
@@ -0,0 +1,51 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+ parallelism = 1
+ job.mode = "BATCH"
+}
+
+source {
+ FakeSource {
+ row.num = 5
+ schema {
+ fields {
+ name = string
+ age = int
+ c_tinyint = tinyint
+ c_smallint = smallint
+ c_bigint = bigint
+ c_float = float
+ c_double = double
+ c_boolean = boolean
+ }
+ }
+ }
+}
+
+sink {
+ Hbase {
+ zookeeper_quorum = "hbase_e2e:2181"
+ table = "seatunnel_test_with_create_when_not_exists"
+ rowkey_column = ["name"]
+ family_name {
+ all_columns = info
+ }
+ schema_save_mode = "CREATE_SCHEMA_WHEN_NOT_EXIST"
+ }
+}
\ No newline at end of file
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hbase-e2e/src/test/resources/fake_to_hbase_with_drop_data.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hbase-e2e/src/test/resources/fake_to_hbase_with_drop_data.conf
new file mode 100644
index 0000000000..66b3981206
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hbase-e2e/src/test/resources/fake_to_hbase_with_drop_data.conf
@@ -0,0 +1,52 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+ parallelism = 1
+ job.mode = "BATCH"
+}
+
+source {
+ FakeSource {
+ row.num = 5
+ schema {
+ fields {
+ name = string
+ age = int
+ c_tinyint = tinyint
+ c_smallint = smallint
+ c_bigint = bigint
+ c_float = float
+ c_double = double
+ c_boolean = boolean
+ }
+ }
+ }
+}
+
+sink {
+ Hbase {
+ zookeeper_quorum = "hbase_e2e:2181"
+    table = "seatunnel_test"
+ rowkey_column = ["name"]
+ family_name {
+ all_columns = info
+ }
+ schema_save_mode = "CREATE_SCHEMA_WHEN_NOT_EXIST"
+ data_save_mode = "DROP_DATA"
+ }
+}
\ No newline at end of file
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hbase-e2e/src/test/resources/fake_to_hbase_with_error_when_data_exists.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hbase-e2e/src/test/resources/fake_to_hbase_with_error_when_data_exists.conf
new file mode 100644
index 0000000000..00e0485e3d
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hbase-e2e/src/test/resources/fake_to_hbase_with_error_when_data_exists.conf
@@ -0,0 +1,52 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+ parallelism = 1
+ job.mode = "BATCH"
+}
+
+source {
+ FakeSource {
+ row.num = 5
+ schema {
+ fields {
+ name = string
+ age = int
+ c_tinyint = tinyint
+ c_smallint = smallint
+ c_bigint = bigint
+ c_float = float
+ c_double = double
+ c_boolean = boolean
+ }
+ }
+ }
+}
+
+sink {
+ Hbase {
+ zookeeper_quorum = "hbase_e2e:2181"
+ table = "seatunnel_test"
+ rowkey_column = ["name"]
+ family_name {
+ all_columns = info
+ }
+ schema_save_mode = "CREATE_SCHEMA_WHEN_NOT_EXIST"
+ data_save_mode = "ERROR_WHEN_DATA_EXISTS"
+ }
+}
\ No newline at end of file
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hbase-e2e/src/test/resources/fake_to_hbase_with_error_when_not_exists.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hbase-e2e/src/test/resources/fake_to_hbase_with_error_when_not_exists.conf
new file mode 100644
index 0000000000..359b71b79f
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hbase-e2e/src/test/resources/fake_to_hbase_with_error_when_not_exists.conf
@@ -0,0 +1,51 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+ parallelism = 1
+ job.mode = "BATCH"
+}
+
+source {
+ FakeSource {
+ row.num = 5
+ schema {
+ fields {
+ name = string
+ age = int
+ c_tinyint = tinyint
+ c_smallint = smallint
+ c_bigint = bigint
+ c_float = float
+ c_double = double
+ c_boolean = boolean
+ }
+ }
+ }
+}
+
+sink {
+ Hbase {
+ zookeeper_quorum = "hbase_e2e:2181"
+ table = "seatunnel_test_with_error_when_not_exists"
+ rowkey_column = ["name"]
+ family_name {
+ all_columns = info
+ }
+ schema_save_mode = "ERROR_WHEN_SCHEMA_NOT_EXIST"
+ }
+}
\ No newline at end of file
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hbase-e2e/src/test/resources/fake_to_hbase_with_recreate_schema.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hbase-e2e/src/test/resources/fake_to_hbase_with_recreate_schema.conf
new file mode 100644
index 0000000000..c8a8c43d9c
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hbase-e2e/src/test/resources/fake_to_hbase_with_recreate_schema.conf
@@ -0,0 +1,51 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+ parallelism = 1
+ job.mode = "BATCH"
+}
+
+source {
+ FakeSource {
+ row.num = 5
+ schema {
+ fields {
+ name = string
+ age = int
+ c_tinyint = tinyint
+ c_smallint = smallint
+ c_bigint = bigint
+ c_float = float
+ c_double = double
+ c_boolean = boolean
+ }
+ }
+ }
+}
+
+sink {
+ Hbase {
+ zookeeper_quorum = "hbase_e2e:2181"
+ table = "seatunnel_test_with_recreate_schema"
+ rowkey_column = ["name"]
+ family_name {
+ all_columns = info
+ }
+ schema_save_mode = "RECREATE_SCHEMA"
+ }
+}
\ No newline at end of file