This is an automated email from the ASF dual-hosted git repository.
wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new b6a702b58f [Improve] hbase options (#8923)
b6a702b58f is described below
commit b6a702b58f0503faf2d127332c4386c075006073
Author: Jarvis <[email protected]>
AuthorDate: Mon Mar 10 20:06:16 2025 +0800
[Improve] hbase options (#8923)
---
.../seatunnel/api/ConnectorOptionCheckTest.java | 2 -
.../seatunnel/hbase/config/HbaseBaseOptions.java | 41 ++++++++++
.../seatunnel/hbase/config/HbaseParameters.java | 95 +++++++++-------------
.../{HbaseConfig.java => HbaseSinkOptions.java} | 17 +---
.../seatunnel/hbase/config/HbaseSourceOptions.java | 20 +++++
.../connectors/seatunnel/hbase/sink/HbaseSink.java | 6 +-
.../seatunnel/hbase/sink/HbaseSinkFactory.java | 43 ++++------
.../seatunnel/hbase/source/HbaseSource.java | 36 ++------
.../seatunnel/hbase/source/HbaseSourceFactory.java | 12 ++-
.../seatunnel/e2e/connector/hbase/HbaseIT.java | 11 +--
10 files changed, 142 insertions(+), 141 deletions(-)
diff --git
a/seatunnel-ci-tools/src/test/java/org/apache/seatunnel/api/ConnectorOptionCheckTest.java
b/seatunnel-ci-tools/src/test/java/org/apache/seatunnel/api/ConnectorOptionCheckTest.java
index d301086222..0df7f57c1e 100644
---
a/seatunnel-ci-tools/src/test/java/org/apache/seatunnel/api/ConnectorOptionCheckTest.java
+++
b/seatunnel-ci-tools/src/test/java/org/apache/seatunnel/api/ConnectorOptionCheckTest.java
@@ -183,12 +183,10 @@ public class ConnectorOptionCheckTest {
whiteList.add("TDengineSourceOptions");
whiteList.add("PulsarSourceOptions");
whiteList.add("FakeSourceOptions");
- whiteList.add("HbaseSinkOptions");
whiteList.add("MongodbSinkOptions");
whiteList.add("IoTDBSinkOptions");
whiteList.add("EasysearchSourceOptions");
whiteList.add("IcebergSourceOptions");
- whiteList.add("HbaseSourceOptions");
whiteList.add("PaimonSourceOptions");
whiteList.add("IoTDBSourceOptions");
whiteList.add("SlsSourceOptions");
diff --git
a/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/config/HbaseBaseOptions.java
b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/config/HbaseBaseOptions.java
new file mode 100644
index 0000000000..46bb9cb0f6
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/config/HbaseBaseOptions.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.hbase.config;
+
+import org.apache.seatunnel.api.configuration.Option;
+import org.apache.seatunnel.api.configuration.Options;
+
+import java.util.List;
+
+public class HbaseBaseOptions {
+
+ public static final Option<String> ZOOKEEPER_QUORUM =
+ Options.key("zookeeper_quorum")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("Hbase zookeeper quorum");
+
+ public static final Option<String> TABLE =
+ Options.key("table").stringType().noDefaultValue().withDescription("Hbase table name");
+
+ public static final Option<List<String>> ROWKEY_COLUMNS =
+ Options.key("rowkey_column")
+ .listType()
+ .noDefaultValue()
+ .withDescription("Hbase rowkey column");
+}
diff --git
a/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/config/HbaseParameters.java
b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/config/HbaseParameters.java
index 66b4eb967b..35d9fbfcbe 100644
---
a/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/config/HbaseParameters.java
+++
b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/config/HbaseParameters.java
@@ -17,10 +17,7 @@
package org.apache.seatunnel.connectors.seatunnel.hbase.config;
-import org.apache.seatunnel.shade.com.typesafe.config.Config;
-
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
-import org.apache.seatunnel.common.config.TypesafeConfigUtils;
import lombok.Builder;
import lombok.Getter;
@@ -29,22 +26,6 @@ import java.io.Serializable;
import java.util.List;
import java.util.Map;
-import static
org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseConfig.ENCODING;
-import static
org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseConfig.FAMILY_NAME;
-import static
org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseConfig.HBASE_BATCH_CONFIG;
-import static
org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseConfig.HBASE_CACHE_BLOCKS_CONFIG;
-import static
org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseConfig.HBASE_CACHING_CONFIG;
-import static
org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseConfig.HBASE_EXTRA_CONFIG;
-import static
org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseConfig.HBASE_TTL_CONFIG;
-import static
org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseConfig.NULL_MODE;
-import static
org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseConfig.ROWKEY_COLUMNS;
-import static
org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseConfig.ROWKEY_DELIMITER;
-import static
org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseConfig.TABLE;
-import static
org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseConfig.VERSION_COLUMN;
-import static
org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseConfig.WAL_WRITE;
-import static
org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseConfig.WRITE_BUFFER_SIZE;
-import static
org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseConfig.ZOOKEEPER_QUORUM;
-
@Builder
@Getter
public class HbaseParameters implements Serializable {
@@ -65,27 +46,32 @@ public class HbaseParameters implements Serializable {
private Map<String, String> hbaseExtraConfig;
- @Builder.Default private int caching = HBASE_CACHING_CONFIG.defaultValue();
+ @Builder.Default private int caching =
HbaseSinkOptions.HBASE_CACHING_CONFIG.defaultValue();
- @Builder.Default private int batch = HBASE_BATCH_CONFIG.defaultValue();
+ @Builder.Default private int batch =
HbaseSinkOptions.HBASE_BATCH_CONFIG.defaultValue();
- @Builder.Default private Long ttl = HBASE_TTL_CONFIG.defaultValue();
+ @Builder.Default private Long ttl =
HbaseSinkOptions.HBASE_TTL_CONFIG.defaultValue();
- @Builder.Default private boolean cacheBlocks =
HBASE_CACHE_BLOCKS_CONFIG.defaultValue();
+ @Builder.Default
+ private boolean cacheBlocks =
HbaseSinkOptions.HBASE_CACHE_BLOCKS_CONFIG.defaultValue();
- @Builder.Default private String rowkeyDelimiter =
ROWKEY_DELIMITER.defaultValue();
+ @Builder.Default
+ private String rowkeyDelimiter =
HbaseSinkOptions.ROWKEY_DELIMITER.defaultValue();
- @Builder.Default private HbaseConfig.NullMode nullMode =
NULL_MODE.defaultValue();
+ @Builder.Default
+ private HbaseSinkOptions.NullMode nullMode =
HbaseSinkOptions.NULL_MODE.defaultValue();
- @Builder.Default private boolean walWrite = WAL_WRITE.defaultValue();
+ @Builder.Default private boolean walWrite =
HbaseSinkOptions.WAL_WRITE.defaultValue();
- @Builder.Default private int writeBufferSize =
WRITE_BUFFER_SIZE.defaultValue();
+ @Builder.Default
+ private int writeBufferSize =
HbaseSinkOptions.WRITE_BUFFER_SIZE.defaultValue();
- @Builder.Default private HbaseConfig.EnCoding enCoding =
ENCODING.defaultValue();
+ @Builder.Default
+ private HbaseSinkOptions.EnCoding enCoding =
HbaseSinkOptions.ENCODING.defaultValue();
public static HbaseParameters buildWithConfig(ReadonlyConfig config) {
HbaseParametersBuilder builder = HbaseParameters.builder();
- String table = config.get(TABLE);
+ String table = config.get(HbaseBaseOptions.TABLE);
int colonIndex = table.indexOf(':');
if (colonIndex != -1) {
String namespace = table.substring(0, colonIndex);
@@ -97,29 +83,29 @@ public class HbaseParameters implements Serializable {
}
// required parameters
- builder.zookeeperQuorum(config.get(ZOOKEEPER_QUORUM));
- builder.rowkeyColumns(config.get(ROWKEY_COLUMNS));
- builder.familyNames(config.get(FAMILY_NAME));
-
- builder.rowkeyDelimiter(config.get(ROWKEY_DELIMITER));
- builder.versionColumn(config.get(VERSION_COLUMN));
- String nullMode = String.valueOf(config.get(NULL_MODE));
- builder.nullMode(HbaseConfig.NullMode.valueOf(nullMode.toUpperCase()));
- builder.walWrite(config.get(WAL_WRITE));
- builder.writeBufferSize(config.get(WRITE_BUFFER_SIZE));
- String encoding = String.valueOf(config.get(ENCODING));
- builder.enCoding(HbaseConfig.EnCoding.valueOf(encoding.toUpperCase()));
- builder.hbaseExtraConfig(config.get(HBASE_EXTRA_CONFIG));
- builder.ttl(config.get(HBASE_TTL_CONFIG));
+ builder.zookeeperQuorum(config.get(HbaseBaseOptions.ZOOKEEPER_QUORUM));
+ builder.rowkeyColumns(config.get(HbaseBaseOptions.ROWKEY_COLUMNS));
+ builder.familyNames(config.get(HbaseSinkOptions.FAMILY_NAME));
+
+ builder.rowkeyDelimiter(config.get(HbaseSinkOptions.ROWKEY_DELIMITER));
+ builder.versionColumn(config.get(HbaseSinkOptions.VERSION_COLUMN));
+ String nullMode =
String.valueOf(config.get(HbaseSinkOptions.NULL_MODE));
+
builder.nullMode(HbaseSinkOptions.NullMode.valueOf(nullMode.toUpperCase()));
+ builder.walWrite(config.get(HbaseSinkOptions.WAL_WRITE));
+
builder.writeBufferSize(config.get(HbaseSinkOptions.WRITE_BUFFER_SIZE));
+ String encoding =
String.valueOf(config.get(HbaseSinkOptions.ENCODING));
+
builder.enCoding(HbaseSinkOptions.EnCoding.valueOf(encoding.toUpperCase()));
+
builder.hbaseExtraConfig(config.get(HbaseSinkOptions.HBASE_EXTRA_CONFIG));
+ builder.ttl(config.get(HbaseSinkOptions.HBASE_TTL_CONFIG));
return builder.build();
}
- public static HbaseParameters buildWithSourceConfig(Config pluginConfig) {
+ public static HbaseParameters buildWithSourceConfig(ReadonlyConfig
pluginConfig) {
HbaseParametersBuilder builder = HbaseParameters.builder();
// required parameters
-
builder.zookeeperQuorum(pluginConfig.getString(ZOOKEEPER_QUORUM.key()));
- String table = pluginConfig.getString(TABLE.key());
+
builder.zookeeperQuorum(pluginConfig.get(HbaseBaseOptions.ZOOKEEPER_QUORUM));
+ String table = pluginConfig.get(HbaseBaseOptions.TABLE);
int colonIndex = table.indexOf(':');
if (colonIndex != -1) {
String namespace = table.substring(0, colonIndex);
@@ -129,18 +115,17 @@ public class HbaseParameters implements Serializable {
builder.table(table);
}
- if (pluginConfig.hasPath(HBASE_EXTRA_CONFIG.key())) {
- Config extraConfig =
pluginConfig.getConfig(HBASE_EXTRA_CONFIG.key());
-
builder.hbaseExtraConfig(TypesafeConfigUtils.configToMap(extraConfig));
+ if
(pluginConfig.getOptional(HbaseSinkOptions.HBASE_EXTRA_CONFIG).isPresent()) {
+
builder.hbaseExtraConfig(pluginConfig.get(HbaseSinkOptions.HBASE_EXTRA_CONFIG));
}
- if (pluginConfig.hasPath(HBASE_CACHING_CONFIG.key())) {
- builder.caching(pluginConfig.getInt(HBASE_CACHING_CONFIG.key()));
+ if
(pluginConfig.getOptional(HbaseSinkOptions.HBASE_CACHING_CONFIG).isPresent()) {
+
builder.caching(pluginConfig.get(HbaseSinkOptions.HBASE_CACHING_CONFIG));
}
- if (pluginConfig.hasPath(HBASE_BATCH_CONFIG.key())) {
- builder.batch(pluginConfig.getInt(HBASE_BATCH_CONFIG.key()));
+ if
(pluginConfig.getOptional(HbaseSinkOptions.HBASE_BATCH_CONFIG).isPresent()) {
+
builder.batch(pluginConfig.get(HbaseSinkOptions.HBASE_BATCH_CONFIG));
}
- if (pluginConfig.hasPath(HBASE_CACHE_BLOCKS_CONFIG.key())) {
-
builder.cacheBlocks(pluginConfig.getBoolean(HBASE_CACHE_BLOCKS_CONFIG.key()));
+ if
(pluginConfig.getOptional(HbaseSinkOptions.HBASE_CACHE_BLOCKS_CONFIG).isPresent())
{
+
builder.cacheBlocks(pluginConfig.get(HbaseSinkOptions.HBASE_CACHE_BLOCKS_CONFIG));
}
return builder.build();
}
diff --git
a/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/config/HbaseConfig.java
b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/config/HbaseSinkOptions.java
similarity index 93%
rename from
seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/config/HbaseConfig.java
rename to
seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/config/HbaseSinkOptions.java
index 2921e1f91c..7a520ee5ff 100644
---
a/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/config/HbaseConfig.java
+++
b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/config/HbaseSinkOptions.java
@@ -30,18 +30,7 @@ import static
org.apache.seatunnel.api.sink.DataSaveMode.APPEND_DATA;
import static org.apache.seatunnel.api.sink.DataSaveMode.DROP_DATA;
import static
org.apache.seatunnel.api.sink.DataSaveMode.ERROR_WHEN_DATA_EXISTS;
-public class HbaseConfig {
-
- private static final Integer DEFAULT_BUFFER_SIZE = 8 * 1024 * 1024;
-
- public static final Option<String> ZOOKEEPER_QUORUM =
- Options.key("zookeeper_quorum")
- .stringType()
- .noDefaultValue()
- .withDescription("Hbase zookeeper quorum");
-
- public static final Option<String> TABLE =
- Options.key("table").stringType().noDefaultValue().withDescription("Hbase table name");
+public class HbaseSinkOptions extends HbaseBaseOptions {
public static final Option<List<String>> ROWKEY_COLUMNS =
Options.key("rowkey_column")
@@ -49,6 +38,8 @@ public class HbaseConfig {
.noDefaultValue()
.withDescription("Hbase rowkey column");
+ private static final Integer DEFAULT_BUFFER_SIZE = 8 * 1024 * 1024;
+
public static final Option<String> ROWKEY_DELIMITER =
Options.key("rowkey_delimiter")
.stringType()
@@ -149,6 +140,4 @@ public class HbaseConfig {
UTF8,
GBK;
}
-
- private HbaseConfig() {}
}
diff --git
a/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/config/HbaseSourceOptions.java
b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/config/HbaseSourceOptions.java
new file mode 100644
index 0000000000..e1f151054d
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/config/HbaseSourceOptions.java
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.hbase.config;
+
+public class HbaseSourceOptions extends HbaseBaseOptions {}
diff --git
a/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/sink/HbaseSink.java
b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/sink/HbaseSink.java
index e8d7b8b205..9cd37f9986 100644
---
a/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/sink/HbaseSink.java
+++
b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/sink/HbaseSink.java
@@ -32,8 +32,8 @@ import org.apache.seatunnel.api.table.catalog.TablePath;
import org.apache.seatunnel.api.table.factory.CatalogFactory;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
-import org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseConfig;
import org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseParameters;
+import org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseSinkOptions;
import
org.apache.seatunnel.connectors.seatunnel.hbase.constant.HbaseIdentifier;
import
org.apache.seatunnel.connectors.seatunnel.hbase.state.HbaseAggregatedCommitInfo;
import org.apache.seatunnel.connectors.seatunnel.hbase.state.HbaseCommitInfo;
@@ -102,8 +102,8 @@ public class HbaseSink
return Optional.empty();
}
Catalog catalog =
catalogFactory.createCatalog(catalogFactory.factoryIdentifier(), config);
- SchemaSaveMode schemaSaveMode =
config.get(HbaseConfig.SCHEMA_SAVE_MODE);
- DataSaveMode dataSaveMode = config.get(HbaseConfig.DATA_SAVE_MODE);
+ SchemaSaveMode schemaSaveMode =
config.get(HbaseSinkOptions.SCHEMA_SAVE_MODE);
+ DataSaveMode dataSaveMode =
config.get(HbaseSinkOptions.DATA_SAVE_MODE);
TablePath tablePath =
TablePath.of(hbaseParameters.getNamespace(),
hbaseParameters.getTable());
return Optional.of(
diff --git
a/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/sink/HbaseSinkFactory.java
b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/sink/HbaseSinkFactory.java
index 9f3b2fdd5e..d265c4f431 100644
---
a/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/sink/HbaseSinkFactory.java
+++
b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/sink/HbaseSinkFactory.java
@@ -24,29 +24,14 @@ import org.apache.seatunnel.api.table.connector.TableSink;
import org.apache.seatunnel.api.table.factory.Factory;
import org.apache.seatunnel.api.table.factory.TableSinkFactory;
import org.apache.seatunnel.api.table.factory.TableSinkFactoryContext;
+import org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseSinkOptions;
import
org.apache.seatunnel.connectors.seatunnel.hbase.constant.HbaseIdentifier;
import com.google.auto.service.AutoService;
-import static
org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseConfig.DATA_SAVE_MODE;
-import static
org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseConfig.ENCODING;
-import static
org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseConfig.FAMILY_NAME;
-import static
org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseConfig.HBASE_EXTRA_CONFIG;
-import static
org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseConfig.NULL_MODE;
-import static
org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseConfig.ROWKEY_COLUMNS;
-import static
org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseConfig.ROWKEY_DELIMITER;
-import static
org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseConfig.SCHEMA_SAVE_MODE;
-import static
org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseConfig.TABLE;
-import static
org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseConfig.VERSION_COLUMN;
-import static
org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseConfig.WAL_WRITE;
-import static
org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseConfig.WRITE_BUFFER_SIZE;
-import static
org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseConfig.ZOOKEEPER_QUORUM;
-
@AutoService(Factory.class)
public class HbaseSinkFactory implements TableSinkFactory {
- public static final String IDENTIFIER = "Hbase";
-
@Override
public String factoryIdentifier() {
return HbaseIdentifier.IDENTIFIER_NAME;
@@ -56,20 +41,20 @@ public class HbaseSinkFactory implements TableSinkFactory {
public OptionRule optionRule() {
return OptionRule.builder()
.required(
- ZOOKEEPER_QUORUM,
- TABLE,
- ROWKEY_COLUMNS,
- FAMILY_NAME,
- SCHEMA_SAVE_MODE,
- DATA_SAVE_MODE)
+ HbaseSinkOptions.ZOOKEEPER_QUORUM,
+ HbaseSinkOptions.TABLE,
+ HbaseSinkOptions.ROWKEY_COLUMNS,
+ HbaseSinkOptions.FAMILY_NAME,
+ HbaseSinkOptions.SCHEMA_SAVE_MODE,
+ HbaseSinkOptions.DATA_SAVE_MODE)
.optional(
- ROWKEY_DELIMITER,
- VERSION_COLUMN,
- NULL_MODE,
- WAL_WRITE,
- WRITE_BUFFER_SIZE,
- ENCODING,
- HBASE_EXTRA_CONFIG,
+ HbaseSinkOptions.ROWKEY_DELIMITER,
+ HbaseSinkOptions.VERSION_COLUMN,
+ HbaseSinkOptions.NULL_MODE,
+ HbaseSinkOptions.WAL_WRITE,
+ HbaseSinkOptions.WRITE_BUFFER_SIZE,
+ HbaseSinkOptions.ENCODING,
+ HbaseSinkOptions.HBASE_EXTRA_CONFIG,
SinkConnectorCommonOptions.MULTI_TABLE_SINK_REPLICA)
.build();
}
diff --git
a/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/source/HbaseSource.java
b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/source/HbaseSource.java
index 1178878aa7..1ff95b4d97 100644
---
a/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/source/HbaseSource.java
+++
b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/source/HbaseSource.java
@@ -19,9 +19,7 @@
package org.apache.seatunnel.connectors.seatunnel.hbase.source;
import org.apache.seatunnel.shade.com.google.common.collect.Lists;
-import org.apache.seatunnel.shade.com.typesafe.config.Config;
-import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
import org.apache.seatunnel.api.source.Boundedness;
import org.apache.seatunnel.api.source.SeaTunnelSource;
import org.apache.seatunnel.api.source.SourceReader;
@@ -29,48 +27,27 @@ import
org.apache.seatunnel.api.source.SourceSplitEnumerator;
import org.apache.seatunnel.api.source.SupportColumnProjection;
import org.apache.seatunnel.api.source.SupportParallelism;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
-import org.apache.seatunnel.api.table.catalog.CatalogTableUtil;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
-import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
-import org.apache.seatunnel.common.config.CheckConfigUtil;
-import org.apache.seatunnel.common.config.CheckResult;
-import org.apache.seatunnel.common.constants.PluginType;
import org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseParameters;
import
org.apache.seatunnel.connectors.seatunnel.hbase.constant.HbaseIdentifier;
-import
org.apache.seatunnel.connectors.seatunnel.hbase.exception.HbaseConnectorException;
import java.util.List;
-import static
org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseConfig.TABLE;
-import static
org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseConfig.ZOOKEEPER_QUORUM;
-
public class HbaseSource
implements SeaTunnelSource<SeaTunnelRow, HbaseSourceSplit,
HbaseSourceState>,
SupportParallelism,
SupportColumnProjection {
- private SeaTunnelRowType seaTunnelRowType;
- private HbaseParameters hbaseParameters;
-
- private CatalogTable catalogTable;
+ private final CatalogTable catalogTable;
+ private final HbaseParameters hbaseParameters;
@Override
public String getPluginName() {
return HbaseIdentifier.IDENTIFIER_NAME;
}
- HbaseSource(Config pluginConfig) {
- CheckResult result =
- CheckConfigUtil.checkAllExists(pluginConfig,
ZOOKEEPER_QUORUM.key(), TABLE.key());
- if (!result.isSuccess()) {
- throw new HbaseConnectorException(
- SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
- String.format(
- "PluginName: %s, PluginType: %s, Message: %s",
- getPluginName(), PluginType.SOURCE,
result.getMsg()));
- }
- this.hbaseParameters =
HbaseParameters.buildWithSourceConfig(pluginConfig);
- this.catalogTable = CatalogTableUtil.buildWithConfig(pluginConfig);
- this.seaTunnelRowType = catalogTable.getSeaTunnelRowType();
+ HbaseSource(HbaseParameters hbaseParameters, CatalogTable catalogTable) {
+ this.hbaseParameters = hbaseParameters;
+ this.catalogTable = catalogTable;
}
@Override
@@ -86,7 +63,8 @@ public class HbaseSource
@Override
public SourceReader<SeaTunnelRow, HbaseSourceSplit> createReader(
SourceReader.Context readerContext) throws Exception {
- return new HbaseSourceReader(hbaseParameters, readerContext,
seaTunnelRowType);
+ return new HbaseSourceReader(
+ hbaseParameters, readerContext,
catalogTable.getSeaTunnelRowType());
}
@Override
diff --git
a/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/source/HbaseSourceFactory.java
b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/source/HbaseSourceFactory.java
index 5e250337d7..70dcdab41e 100644
---
a/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/source/HbaseSourceFactory.java
+++
b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/source/HbaseSourceFactory.java
@@ -21,11 +21,13 @@ package
org.apache.seatunnel.connectors.seatunnel.hbase.source;
import org.apache.seatunnel.api.configuration.util.OptionRule;
import org.apache.seatunnel.api.source.SeaTunnelSource;
import org.apache.seatunnel.api.source.SourceSplit;
+import org.apache.seatunnel.api.table.catalog.CatalogTableUtil;
import org.apache.seatunnel.api.table.connector.TableSource;
import org.apache.seatunnel.api.table.factory.Factory;
import org.apache.seatunnel.api.table.factory.TableSourceFactory;
import org.apache.seatunnel.api.table.factory.TableSourceFactoryContext;
-import org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseConfig;
+import org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseParameters;
+import
org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseSourceOptions;
import
org.apache.seatunnel.connectors.seatunnel.hbase.constant.HbaseIdentifier;
import com.google.auto.service.AutoService;
@@ -42,8 +44,8 @@ public class HbaseSourceFactory implements TableSourceFactory
{
@Override
public OptionRule optionRule() {
return OptionRule.builder()
- .required(HbaseConfig.ZOOKEEPER_QUORUM)
- .required(HbaseConfig.TABLE)
+ .required(HbaseSourceOptions.ZOOKEEPER_QUORUM)
+ .required(HbaseSourceOptions.TABLE)
.build();
}
@@ -57,6 +59,8 @@ public class HbaseSourceFactory implements TableSourceFactory
{
TableSource<T, SplitT, StateT>
createSource(TableSourceFactoryContext context) {
return () ->
(SeaTunnelSource<T, SplitT, StateT>)
- new HbaseSource(context.getOptions().toConfig());
+ new HbaseSource(
+
HbaseParameters.buildWithSourceConfig(context.getOptions()),
+
CatalogTableUtil.buildWithConfig(context.getOptions()));
}
}
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hbase-e2e/src/test/java/org/apache/seatunnel/e2e/connector/hbase/HbaseIT.java
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hbase-e2e/src/test/java/org/apache/seatunnel/e2e/connector/hbase/HbaseIT.java
index 5274c1a8c9..48f3e48eee 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hbase-e2e/src/test/java/org/apache/seatunnel/e2e/connector/hbase/HbaseIT.java
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hbase-e2e/src/test/java/org/apache/seatunnel/e2e/connector/hbase/HbaseIT.java
@@ -23,8 +23,9 @@ import org.apache.seatunnel.api.table.catalog.TablePath;
import
org.apache.seatunnel.api.table.catalog.exception.TableAlreadyExistException;
import org.apache.seatunnel.api.table.catalog.exception.TableNotExistException;
import org.apache.seatunnel.connectors.seatunnel.hbase.catalog.HbaseCatalog;
-import org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseConfig;
+import org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseBaseOptions;
import org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseParameters;
+import org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseSinkOptions;
import org.apache.seatunnel.e2e.common.TestResource;
import org.apache.seatunnel.e2e.common.TestSuiteBase;
import org.apache.seatunnel.e2e.common.container.EngineType;
@@ -109,10 +110,10 @@ public class HbaseIT extends TestSuiteBase implements
TestResource {
hbaseCluster.createTable(MULTI_TABLE_TWO_NAME,
Arrays.asList(FAMILY_NAME));
Map<String, Object> config = new HashMap<>();
- config.put(HbaseConfig.ZOOKEEPER_QUORUM.key(),
hbaseCluster.getZookeeperQuorum());
- config.put(HbaseConfig.ROWKEY_COLUMNS.key(), "id");
- config.put(HbaseConfig.FAMILY_NAME.key(), Maps.of("all_columns",
FAMILY_NAME));
- config.put(HbaseConfig.TABLE.key(), TABLE_NAME);
+ config.put(HbaseBaseOptions.ZOOKEEPER_QUORUM.key(),
hbaseCluster.getZookeeperQuorum());
+ config.put(HbaseBaseOptions.ROWKEY_COLUMNS.key(), "id");
+ config.put(HbaseSinkOptions.FAMILY_NAME.key(), Maps.of("all_columns",
FAMILY_NAME));
+ config.put(HbaseBaseOptions.TABLE.key(), TABLE_NAME);
// config.put(HbaseConfig.)
catalog =