This is an automated email from the ASF dual-hosted git repository.
wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 4d9287fce4 [Feature] Hive Source/Sink support multiple table (#5929)
4d9287fce4 is described below
commit 4d9287fce4943a30e31e5dbda013c1cee17034ab
Author: Wenjun Ruan <[email protected]>
AuthorDate: Tue Apr 30 14:41:44 2024 +0800
[Feature] Hive Source/Sink support multiple table (#5929)
---
docs/en/connector-v2/sink/Hive.md | 52 +++-
docs/en/connector-v2/source/Hive.md | 25 +-
.../api/table/catalog/CatalogTableUtil.java | 39 +++
.../hive/commit/HiveSinkAggregatedCommitter.java | 22 +-
.../seatunnel/hive/config/BaseHiveOptions.java | 43 +++
.../seatunnel/hive/config/HiveConfig.java | 31 ---
.../seatunnel/hive/config/HiveConstants.java | 36 +++
.../hive/exception/HiveConnectorErrorCode.java | 4 +-
.../connectors/seatunnel/hive/sink/HiveSink.java | 298 +++++++++++---------
.../seatunnel/hive/sink/HiveSinkFactory.java | 85 +++++-
.../{HiveSinkFactory.java => HiveSinkOptions.java} | 32 +--
.../hive/sink/writter/HiveSinkWriter.java | 51 ++++
.../seatunnel/hive/source/HiveSource.java | 238 ++++------------
.../seatunnel/hive/source/HiveSourceFactory.java | 23 +-
.../hive/source/config/HiveSourceConfig.java | 299 +++++++++++++++++++++
.../hive/source/config/HiveSourceOptions.java | 36 +++
.../config/MultipleTableHiveSourceConfig.java | 55 ++++
.../reader/MultipleTableHiveSourceReader.java | 129 +++++++++
.../hive/source/split/HiveSourceSplit.java | 40 +++
.../MultipleTableHiveSourceSplitEnumerator.java | 162 +++++++++++
.../hive/source/state/HiveSourceState.java | 38 +++
.../seatunnel/hive/utils/HiveMetaStoreProxy.java | 42 ++-
.../hive/utils/HiveMetaStoreProxyUtils.java | 13 +-
.../seatunnel/hive/utils/HiveTableUtils.java | 63 +++++
.../seatunnel/hive/utils/HiveTypeConvertor.java | 59 ++++
.../seatunnel/hive/source/TypeConvertTest.java | 51 ----
.../hive/utils/HiveMetaStoreProxyUtilsTest.java | 9 +-
.../hive/utils/HiveTypeConvertorTest.java | 54 ++++
.../connector/ConnectorSpecificationCheckTest.java | 1 +
.../seatunnel-engine-examples/pom.xml | 50 ++++
30 files changed, 1590 insertions(+), 490 deletions(-)
diff --git a/docs/en/connector-v2/sink/Hive.md
b/docs/en/connector-v2/sink/Hive.md
index 245cd5b271..dac4a814c2 100644
--- a/docs/en/connector-v2/sink/Hive.md
+++ b/docs/en/connector-v2/sink/Hive.md
@@ -47,7 +47,7 @@ By default, we use 2PC commit to ensure `exactly-once`
### table_name [string]
-Target Hive table name eg: db1.table1
+Target Hive table name, e.g. `db1.table1`. If the source is in multiple-table
mode, you can use `${database_name}.${table_name}` to generate the table name;
the `${database_name}` and `${table_name}` placeholders will be replaced with
the values from the CatalogTable generated by the source (see Example 2 below).
### metastore_uri [string]
@@ -343,6 +343,56 @@ sink {
}
```
+### Example 2: Multiple tables
+
+We have multiple source tables like this:
+
+```bash
+create table test_1(
+)
+PARTITIONED BY (xx);
+
+create table test_2(
+)
+PARTITIONED BY (xx);
+...
+```
+
+We need to read data from these source tables and write it to other tables:
+
+The job config file can look like this:
+
+```bash
+env {
+ # You can set flink configuration here
+ parallelism = 3
+ job.name="test_hive_source_to_hive"
+}
+
+source {
+ Hive {
+ tables_configs = [
+ {
+ table_name = "test_hive.test_1"
+ metastore_uri = "thrift://ctyun6:9083"
+ },
+ {
+ table_name = "test_hive.test_2"
+ metastore_uri = "thrift://ctyun7:9083"
+ }
+ ]
+ }
+}
+
+sink {
+    # write each source table to the corresponding target Hive table
+ Hive {
+ table_name = "${database_name}.${table_name}"
+ metastore_uri = "thrift://ctyun7:9083"
+ }
+}
+```
+
## Changelog
### 2.2.0-beta 2022-09-26
diff --git a/docs/en/connector-v2/source/Hive.md
b/docs/en/connector-v2/source/Hive.md
index 5273ca2ee5..bb7b851409 100644
--- a/docs/en/connector-v2/source/Hive.md
+++ b/docs/en/connector-v2/source/Hive.md
@@ -59,10 +59,6 @@ Hive metastore uri
The path of `hdfs-site.xml`, used to load ha configuration of namenodes
-### hive_site_path [string]
-
-The path of `hive-site.xml`, used to authentication hive metastore
-
### read_partitions [list]
The target partitions that the user wants to read from the hive table; if this
parameter is not set, it will read all the data from the hive table.
@@ -102,6 +98,8 @@ Source plugin common parameters, please refer to [Source
Common Options](common-
## Example
+### Example 1: Single table
+
```bash
Hive {
@@ -111,6 +109,25 @@ Source plugin common parameters, please refer to [Source
Common Options](common-
```
+### Example 2: Multiple tables
+
+```bash
+
+ Hive {
+ tables_configs = [
+ {
+ table_name = "default.seatunnel_orc_1"
+ metastore_uri = "thrift://namenode001:9083"
+ },
+ {
+ table_name = "default.seatunnel_orc_2"
+ metastore_uri = "thrift://namenode001:9083"
+ }
+ ]
+ }
+
+```
+
## Changelog
### 2.2.0-beta 2022-09-26
diff --git
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/CatalogTableUtil.java
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/CatalogTableUtil.java
index b268fe612e..eafaedf05d 100644
---
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/CatalogTableUtil.java
+++
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/CatalogTableUtil.java
@@ -42,6 +42,8 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
+import java.util.function.Function;
+import java.util.stream.Collectors;
/** Utils contains some common methods for construct CatalogTable. */
@Slf4j
@@ -234,4 +236,41 @@ public class CatalogTableUtil implements Serializable {
public static CatalogTable buildSimpleTextTable() {
return getCatalogTable("default", buildSimpleTextSchema());
}
+
+ public static CatalogTable newCatalogTable(
+ CatalogTable catalogTable, SeaTunnelRowType seaTunnelRowType) {
+ TableSchema tableSchema = catalogTable.getTableSchema();
+
+ Map<String, Column> columnMap =
+ tableSchema.getColumns().stream()
+ .collect(Collectors.toMap(Column::getName,
Function.identity()));
+ String[] fieldNames = seaTunnelRowType.getFieldNames();
+ SeaTunnelDataType<?>[] fieldTypes = seaTunnelRowType.getFieldTypes();
+
+ List<Column> finalColumns = new ArrayList<>();
+ for (int i = 0; i < fieldNames.length; i++) {
+ Column column = columnMap.get(fieldNames[i]);
+ if (column != null) {
+ finalColumns.add(column);
+ } else {
+ finalColumns.add(
+ PhysicalColumn.of(fieldNames[i], fieldTypes[i], 0,
false, null, null));
+ }
+ }
+
+ TableSchema finalSchema =
+ TableSchema.builder()
+ .columns(finalColumns)
+ .primaryKey(tableSchema.getPrimaryKey())
+ .constraintKey(tableSchema.getConstraintKeys())
+ .build();
+
+ return CatalogTable.of(
+ catalogTable.getTableId(),
+ finalSchema,
+ catalogTable.getOptions(),
+ catalogTable.getPartitionKeys(),
+ catalogTable.getComment(),
+ catalogTable.getCatalogName());
+ }
}
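
The new `newCatalogTable` helper above rebuilds a table schema from a row type,
reusing the existing column metadata where the field names match. A minimal
usage sketch (the two-field row type and the `align` wrapper are hypothetical,
not part of this commit):

```java
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.catalog.CatalogTableUtil;
import org.apache.seatunnel.api.table.type.BasicType;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;

class NewCatalogTableSketch {
    // Align an existing CatalogTable with the row type a source actually emits.
    static CatalogTable align(CatalogTable sourceTable) {
        SeaTunnelRowType rowType =
                new SeaTunnelRowType(
                        new String[] {"id", "name"},
                        new SeaTunnelDataType<?>[] {
                            BasicType.LONG_TYPE, BasicType.STRING_TYPE
                        });
        // Fields whose names match keep their original Column metadata;
        // unknown names become plain physical columns (PhysicalColumn.of above).
        return CatalogTableUtil.newCatalogTable(sourceTable, rowType);
    }
}
```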
diff --git
a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/commit/HiveSinkAggregatedCommitter.java
b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/commit/HiveSinkAggregatedCommitter.java
index 2f2bc972a0..db4340b373 100644
---
a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/commit/HiveSinkAggregatedCommitter.java
+++
b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/commit/HiveSinkAggregatedCommitter.java
@@ -17,11 +17,11 @@
package org.apache.seatunnel.connectors.seatunnel.hive.commit;
-import org.apache.seatunnel.shade.com.typesafe.config.Config;
-
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
import
org.apache.seatunnel.connectors.seatunnel.file.sink.commit.FileAggregatedCommitInfo;
import
org.apache.seatunnel.connectors.seatunnel.file.sink.commit.FileSinkAggregatedCommitter;
+import org.apache.seatunnel.connectors.seatunnel.hive.sink.HiveSinkOptions;
import org.apache.seatunnel.connectors.seatunnel.hive.utils.HiveMetaStoreProxy;
import org.apache.thrift.TException;
@@ -33,33 +33,31 @@ import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
-import static
org.apache.seatunnel.connectors.seatunnel.hive.config.HiveConfig.ABORT_DROP_PARTITION_METADATA;
-
@Slf4j
public class HiveSinkAggregatedCommitter extends FileSinkAggregatedCommitter {
- private final Config pluginConfig;
private final String dbName;
private final String tableName;
private final boolean abortDropPartitionMetadata;
+ private final ReadonlyConfig readonlyConfig;
+
public HiveSinkAggregatedCommitter(
- Config pluginConfig, String dbName, String tableName, HadoopConf
hadoopConf) {
+ ReadonlyConfig readonlyConfig, String dbName, String tableName,
HadoopConf hadoopConf) {
super(hadoopConf);
- this.pluginConfig = pluginConfig;
+ this.readonlyConfig = readonlyConfig;
this.dbName = dbName;
this.tableName = tableName;
this.abortDropPartitionMetadata =
- pluginConfig.hasPath(ABORT_DROP_PARTITION_METADATA.key())
- ?
pluginConfig.getBoolean(ABORT_DROP_PARTITION_METADATA.key())
- : ABORT_DROP_PARTITION_METADATA.defaultValue();
+
readonlyConfig.get(HiveSinkOptions.ABORT_DROP_PARTITION_METADATA);
}
@Override
public List<FileAggregatedCommitInfo> commit(
List<FileAggregatedCommitInfo> aggregatedCommitInfos) throws
IOException {
+
List<FileAggregatedCommitInfo> errorCommitInfos =
super.commit(aggregatedCommitInfos);
if (errorCommitInfos.isEmpty()) {
- HiveMetaStoreProxy hiveMetaStore =
HiveMetaStoreProxy.getInstance(pluginConfig);
+ HiveMetaStoreProxy hiveMetaStore =
HiveMetaStoreProxy.getInstance(readonlyConfig);
try {
for (FileAggregatedCommitInfo aggregatedCommitInfo :
aggregatedCommitInfos) {
Map<String, List<String>> partitionDirAndValuesMap =
@@ -87,7 +85,7 @@ public class HiveSinkAggregatedCommitter extends
FileSinkAggregatedCommitter {
public void abort(List<FileAggregatedCommitInfo> aggregatedCommitInfos)
throws Exception {
super.abort(aggregatedCommitInfos);
if (abortDropPartitionMetadata) {
- HiveMetaStoreProxy hiveMetaStore =
HiveMetaStoreProxy.getInstance(pluginConfig);
+ HiveMetaStoreProxy hiveMetaStore =
HiveMetaStoreProxy.getInstance(readonlyConfig);
for (FileAggregatedCommitInfo aggregatedCommitInfo :
aggregatedCommitInfos) {
Map<String, List<String>> partitionDirAndValuesMap =
aggregatedCommitInfo.getPartitionDirAndValuesMap();
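
For reference, the option-reading change above replaces the manual
hasPath/getBoolean fallback with `ReadonlyConfig.get`, which applies the
option's declared default automatically. A minimal sketch of the pattern (the
wrapper class is hypothetical):

```java
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.connectors.seatunnel.hive.sink.HiveSinkOptions;

class OptionReadSketch {
    static boolean abortDropPartitionMetadata(ReadonlyConfig config) {
        // get() returns the configured value, or the Option's default
        // (false for ABORT_DROP_PARTITION_METADATA) when the key is absent.
        return config.get(HiveSinkOptions.ABORT_DROP_PARTITION_METADATA);
    }
}
```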
diff --git
a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/config/BaseHiveOptions.java
b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/config/BaseHiveOptions.java
new file mode 100644
index 0000000000..efed4e91c5
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/config/BaseHiveOptions.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.hive.config;
+
+import org.apache.seatunnel.api.configuration.Option;
+import org.apache.seatunnel.api.configuration.Options;
+import
org.apache.seatunnel.connectors.seatunnel.file.config.BaseSourceConfigOptions;
+
+public class BaseHiveOptions extends BaseSourceConfigOptions {
+
+ public static final Option<String> TABLE_NAME =
+ Options.key("table_name")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("Hive table name");
+
+ public static final Option<String> METASTORE_URI =
+ Options.key("metastore_uri")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("Hive metastore uri");
+
+ public static final Option<String> HIVE_SITE_PATH =
+ Options.key("hive_site_path")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("The path of hive-site.xml");
+}
diff --git
a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/config/HiveConfig.java
b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/config/HiveConfig.java
index 0afadc64d8..714be58619 100644
---
a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/config/HiveConfig.java
+++
b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/config/HiveConfig.java
@@ -17,14 +17,8 @@
package org.apache.seatunnel.connectors.seatunnel.hive.config;
-import org.apache.seatunnel.shade.com.typesafe.config.Config;
-
import org.apache.seatunnel.api.configuration.Option;
import org.apache.seatunnel.api.configuration.Options;
-import org.apache.seatunnel.connectors.seatunnel.hive.utils.HiveMetaStoreProxy;
-
-import org.apache.commons.lang3.tuple.Pair;
-import org.apache.hadoop.hive.metastore.api.Table;
import java.util.HashMap;
import java.util.Map;
@@ -66,29 +60,4 @@ public class HiveConfig {
.noDefaultValue()
.withDescription(
"The specified loading path for the
'core-site.xml', 'hdfs-site.xml' files");
-
- public static final String TEXT_INPUT_FORMAT_CLASSNAME =
- "org.apache.hadoop.mapred.TextInputFormat";
- public static final String TEXT_OUTPUT_FORMAT_CLASSNAME =
- "org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat";
- public static final String PARQUET_INPUT_FORMAT_CLASSNAME =
- "org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat";
- public static final String PARQUET_OUTPUT_FORMAT_CLASSNAME =
- "org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat";
- public static final String ORC_INPUT_FORMAT_CLASSNAME =
- "org.apache.hadoop.hive.ql.io.orc.OrcInputFormat";
- public static final String ORC_OUTPUT_FORMAT_CLASSNAME =
- "org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat";
-
- public static Pair<String[], Table> getTableInfo(Config config) {
- String table = config.getString(TABLE_NAME.key());
- String[] splits = table.split("\\.");
- if (splits.length != 2) {
- throw new RuntimeException("Please config " + TABLE_NAME + " as
db.table format");
- }
- HiveMetaStoreProxy hiveMetaStoreProxy =
HiveMetaStoreProxy.getInstance(config);
- Table tableInformation = hiveMetaStoreProxy.getTable(splits[0],
splits[1]);
- hiveMetaStoreProxy.close();
- return Pair.of(splits, tableInformation);
- }
}
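
The removed `getTableInfo` helper carried the `db.table` name validation; that
responsibility moves to `HiveTableUtils` (paired with the new
`HIVE_TABLE_NAME_ERROR` code below). A minimal stand-alone sketch of the
parsing rule, with a generic exception standing in for the connector's own:

```java
// Hypothetical stand-alone version of the db.table parsing rule.
class TableNameParseSketch {
    static String[] parse(String fullTableName) {
        String[] splits = fullTableName.split("\\.");
        if (splits.length != 2) {
            // The connector raises HiveConnectorErrorCode.HIVE_TABLE_NAME_ERROR here.
            throw new IllegalArgumentException(
                    "Please config table_name as db.table format, got: " + fullTableName);
        }
        return splits; // [database, table]
    }
}
```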
diff --git
a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/config/HiveConstants.java
b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/config/HiveConstants.java
new file mode 100644
index 0000000000..8539df68a8
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/config/HiveConstants.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.hive.config;
+
+public class HiveConstants {
+
+ public static final String CONNECTOR_NAME = "Hive";
+
+ public static final String TEXT_INPUT_FORMAT_CLASSNAME =
+ "org.apache.hadoop.mapred.TextInputFormat";
+ public static final String TEXT_OUTPUT_FORMAT_CLASSNAME =
+ "org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat";
+ public static final String PARQUET_INPUT_FORMAT_CLASSNAME =
+ "org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat";
+ public static final String PARQUET_OUTPUT_FORMAT_CLASSNAME =
+ "org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat";
+ public static final String ORC_INPUT_FORMAT_CLASSNAME =
+ "org.apache.hadoop.hive.ql.io.orc.OrcInputFormat";
+ public static final String ORC_OUTPUT_FORMAT_CLASSNAME =
+ "org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat";
+}
diff --git
a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/exception/HiveConnectorErrorCode.java
b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/exception/HiveConnectorErrorCode.java
index 205a9b5dec..a0923acc1b 100644
---
a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/exception/HiveConnectorErrorCode.java
+++
b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/exception/HiveConnectorErrorCode.java
@@ -23,7 +23,9 @@ public enum HiveConnectorErrorCode implements
SeaTunnelErrorCode {
GET_HDFS_NAMENODE_HOST_FAILED("HIVE-01", "Get name node host from table
location failed"),
INITIALIZE_HIVE_METASTORE_CLIENT_FAILED("HIVE-02", "Initialize hive
metastore client failed"),
GET_HIVE_TABLE_INFORMATION_FAILED(
- "HIVE-03", "Get hive table information from hive metastore service
failed");
+ "HIVE-03", "Get hive table information from hive metastore service
failed"),
+ HIVE_TABLE_NAME_ERROR("HIVE-04", "Hive table name is invalid"),
+ ;
private final String code;
private final String description;
diff --git
a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSink.java
b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSink.java
index 078ce83fd1..b91c65de9b 100644
---
a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSink.java
+++
b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSink.java
@@ -20,110 +20,78 @@ package
org.apache.seatunnel.connectors.seatunnel.hive.sink;
import org.apache.seatunnel.shade.com.typesafe.config.Config;
import org.apache.seatunnel.shade.com.typesafe.config.ConfigValueFactory;
-import org.apache.seatunnel.api.common.PrepareFailException;
-import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
+import org.apache.seatunnel.api.common.JobContext;
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.api.serialization.DefaultSerializer;
+import org.apache.seatunnel.api.serialization.Serializer;
import org.apache.seatunnel.api.sink.SeaTunnelSink;
import org.apache.seatunnel.api.sink.SinkAggregatedCommitter;
-import org.apache.seatunnel.common.config.CheckConfigUtil;
-import org.apache.seatunnel.common.config.CheckResult;
-import org.apache.seatunnel.common.constants.PluginType;
+import org.apache.seatunnel.api.sink.SinkWriter;
+import org.apache.seatunnel.api.sink.SupportMultiTableSink;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated;
import org.apache.seatunnel.connectors.seatunnel.file.config.FileFormat;
-import
org.apache.seatunnel.connectors.seatunnel.file.hdfs.sink.BaseHdfsFileSink;
+import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
import
org.apache.seatunnel.connectors.seatunnel.file.sink.commit.FileAggregatedCommitInfo;
import
org.apache.seatunnel.connectors.seatunnel.file.sink.commit.FileCommitInfo;
+import
org.apache.seatunnel.connectors.seatunnel.file.sink.config.FileSinkConfig;
+import org.apache.seatunnel.connectors.seatunnel.file.sink.state.FileSinkState;
+import
org.apache.seatunnel.connectors.seatunnel.file.sink.writer.WriteStrategy;
+import
org.apache.seatunnel.connectors.seatunnel.file.sink.writer.WriteStrategyFactory;
import
org.apache.seatunnel.connectors.seatunnel.hive.commit.HiveSinkAggregatedCommitter;
-import org.apache.seatunnel.connectors.seatunnel.hive.config.HiveConfig;
-import
org.apache.seatunnel.connectors.seatunnel.hive.exception.HiveConnectorErrorCode;
+import org.apache.seatunnel.connectors.seatunnel.hive.config.HiveConstants;
import
org.apache.seatunnel.connectors.seatunnel.hive.exception.HiveConnectorException;
+import
org.apache.seatunnel.connectors.seatunnel.hive.sink.writter.HiveSinkWriter;
+import
org.apache.seatunnel.connectors.seatunnel.hive.source.config.HiveSourceOptions;
import org.apache.seatunnel.connectors.seatunnel.hive.storage.StorageFactory;
+import org.apache.seatunnel.connectors.seatunnel.hive.utils.HiveTableUtils;
-import org.apache.commons.lang3.tuple.Pair;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.Table;
-import com.google.auto.service.AutoService;
-
-import java.net.URI;
-import java.net.URISyntaxException;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;
-import static
org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY;
import static
org.apache.seatunnel.connectors.seatunnel.file.config.BaseSinkConfig.FIELD_DELIMITER;
import static
org.apache.seatunnel.connectors.seatunnel.file.config.BaseSinkConfig.FILE_FORMAT_TYPE;
import static
org.apache.seatunnel.connectors.seatunnel.file.config.BaseSinkConfig.FILE_NAME_EXPRESSION;
import static
org.apache.seatunnel.connectors.seatunnel.file.config.BaseSinkConfig.FILE_PATH;
-import static
org.apache.seatunnel.connectors.seatunnel.file.config.BaseSinkConfig.HAVE_PARTITION;
import static
org.apache.seatunnel.connectors.seatunnel.file.config.BaseSinkConfig.IS_PARTITION_FIELD_WRITE_IN_FILE;
import static
org.apache.seatunnel.connectors.seatunnel.file.config.BaseSinkConfig.PARTITION_BY;
-import static
org.apache.seatunnel.connectors.seatunnel.file.config.BaseSinkConfig.PARTITION_DIR_EXPRESSION;
import static
org.apache.seatunnel.connectors.seatunnel.file.config.BaseSinkConfig.ROW_DELIMITER;
import static
org.apache.seatunnel.connectors.seatunnel.file.config.BaseSinkConfig.SINK_COLUMNS;
-import static
org.apache.seatunnel.connectors.seatunnel.hive.config.HiveConfig.METASTORE_URI;
-import static
org.apache.seatunnel.connectors.seatunnel.hive.config.HiveConfig.ORC_OUTPUT_FORMAT_CLASSNAME;
-import static
org.apache.seatunnel.connectors.seatunnel.hive.config.HiveConfig.PARQUET_OUTPUT_FORMAT_CLASSNAME;
-import static
org.apache.seatunnel.connectors.seatunnel.hive.config.HiveConfig.TABLE_NAME;
-import static
org.apache.seatunnel.connectors.seatunnel.hive.config.HiveConfig.TEXT_OUTPUT_FORMAT_CLASSNAME;
-
-@AutoService(SeaTunnelSink.class)
-public class HiveSink extends BaseHdfsFileSink {
- private String dbName;
- private String tableName;
- private Table tableInformation;
- @Override
- public String getPluginName() {
- return "Hive";
+public class HiveSink
+ implements SeaTunnelSink<
+ SeaTunnelRow, FileSinkState, FileCommitInfo,
FileAggregatedCommitInfo>,
+ SupportMultiTableSink {
+
+    // Since Table might contain some unserializable fields, we make it
transient
+    // and use getTableInformation() to lazily reload it when needed
+ private transient Table tableInformation;
+ private final CatalogTable catalogTable;
+ private final ReadonlyConfig readonlyConfig;
+ private final HadoopConf hadoopConf;
+ private final FileSinkConfig fileSinkConfig;
+ private transient WriteStrategy writeStrategy;
+ private String jobId;
+
+ public HiveSink(ReadonlyConfig readonlyConfig, CatalogTable catalogTable) {
+ this.readonlyConfig = readonlyConfig;
+ this.catalogTable = catalogTable;
+ this.tableInformation = getTableInformation();
+ this.hadoopConf = createHadoopConf(readonlyConfig);
+ this.fileSinkConfig = generateFileSinkConfig(readonlyConfig,
catalogTable);
+ this.writeStrategy = getWriteStrategy();
}
- @Override
- public void prepare(Config pluginConfig) throws PrepareFailException {
- CheckResult result =
- CheckConfigUtil.checkAllExists(pluginConfig,
METASTORE_URI.key(), TABLE_NAME.key());
- if (!result.isSuccess()) {
- throw new HiveConnectorException(
- SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
- String.format(
- "PluginName: %s, PluginType: %s, Message: %s",
- getPluginName(), PluginType.SINK,
result.getMsg()));
- }
- result =
- CheckConfigUtil.checkAtLeastOneExists(
- pluginConfig,
- FILE_FORMAT_TYPE.key(),
- FILE_PATH.key(),
- FIELD_DELIMITER.key(),
- ROW_DELIMITER.key(),
- IS_PARTITION_FIELD_WRITE_IN_FILE.key(),
- PARTITION_DIR_EXPRESSION.key(),
- HAVE_PARTITION.key(),
- SINK_COLUMNS.key(),
- PARTITION_BY.key());
- if (result.isSuccess()) {
- throw new HiveConnectorException(
- SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
- String.format(
- "Hive sink connector does not support these
setting [%s]",
- String.join(
- ",",
- FILE_FORMAT_TYPE.key(),
- FILE_PATH.key(),
- FIELD_DELIMITER.key(),
- ROW_DELIMITER.key(),
- IS_PARTITION_FIELD_WRITE_IN_FILE.key(),
- PARTITION_DIR_EXPRESSION.key(),
- HAVE_PARTITION.key(),
- SINK_COLUMNS.key(),
- PARTITION_BY.key())));
- }
- Pair<String[], Table> tableInfo =
HiveConfig.getTableInfo(pluginConfig);
- dbName = tableInfo.getLeft()[0];
- tableName = tableInfo.getLeft()[1];
- tableInformation = tableInfo.getRight();
+ private FileSinkConfig generateFileSinkConfig(
+ ReadonlyConfig readonlyConfig, CatalogTable catalogTable) {
+ Table tableInformation = getTableInformation();
+ Config pluginConfig = readonlyConfig.toConfig();
List<String> sinkFields =
tableInformation.getSd().getCols().stream()
.map(FieldSchema::getName)
@@ -133,35 +101,42 @@ public class HiveSink extends BaseHdfsFileSink {
.map(FieldSchema::getName)
.collect(Collectors.toList());
sinkFields.addAll(partitionKeys);
- String outputFormat = tableInformation.getSd().getOutputFormat();
- if (TEXT_OUTPUT_FORMAT_CLASSNAME.equals(outputFormat)) {
- Map<String, String> parameters =
- tableInformation.getSd().getSerdeInfo().getParameters();
- pluginConfig =
- pluginConfig
- .withValue(
- FILE_FORMAT_TYPE.key(),
-
ConfigValueFactory.fromAnyRef(FileFormat.TEXT.toString()))
- .withValue(
- FIELD_DELIMITER.key(),
-
ConfigValueFactory.fromAnyRef(parameters.get("field.delim")))
- .withValue(
- ROW_DELIMITER.key(),
-
ConfigValueFactory.fromAnyRef(parameters.get("line.delim")));
- } else if (PARQUET_OUTPUT_FORMAT_CLASSNAME.equals(outputFormat)) {
- pluginConfig =
- pluginConfig.withValue(
- FILE_FORMAT_TYPE.key(),
-
ConfigValueFactory.fromAnyRef(FileFormat.PARQUET.toString()));
- } else if (ORC_OUTPUT_FORMAT_CLASSNAME.equals(outputFormat)) {
- pluginConfig =
- pluginConfig.withValue(
- FILE_FORMAT_TYPE.key(),
-
ConfigValueFactory.fromAnyRef(FileFormat.ORC.toString()));
- } else {
- throw new HiveConnectorException(
- CommonErrorCodeDeprecated.ILLEGAL_ARGUMENT,
- "Hive connector only support [text parquet orc] table
now");
+
+ FileFormat fileFormat =
HiveTableUtils.parseFileFormat(tableInformation);
+ switch (fileFormat) {
+ case TEXT:
+ Map<String, String> parameters =
+
tableInformation.getSd().getSerdeInfo().getParameters();
+ pluginConfig =
+ pluginConfig
+ .withValue(
+ FILE_FORMAT_TYPE.key(),
+
ConfigValueFactory.fromAnyRef(FileFormat.TEXT.toString()))
+ .withValue(
+ FIELD_DELIMITER.key(),
+ ConfigValueFactory.fromAnyRef(
+ parameters.get("field.delim")))
+ .withValue(
+ ROW_DELIMITER.key(),
+ ConfigValueFactory.fromAnyRef(
+ parameters.get("line.delim")));
+ break;
+ case PARQUET:
+ pluginConfig =
+ pluginConfig.withValue(
+ FILE_FORMAT_TYPE.key(),
+
ConfigValueFactory.fromAnyRef(FileFormat.PARQUET.toString()));
+ break;
+ case ORC:
+ pluginConfig =
+ pluginConfig.withValue(
+ FILE_FORMAT_TYPE.key(),
+
ConfigValueFactory.fromAnyRef(FileFormat.ORC.toString()));
+ break;
+ default:
+ throw new HiveConnectorException(
+ CommonErrorCodeDeprecated.ILLEGAL_ARGUMENT,
+ "Hive connector only support [text parquet orc] table
now");
}
pluginConfig =
pluginConfig
@@ -178,42 +153,97 @@ public class HiveSink extends BaseHdfsFileSink {
.withValue(SINK_COLUMNS.key(),
ConfigValueFactory.fromAnyRef(sinkFields))
.withValue(
PARTITION_BY.key(),
ConfigValueFactory.fromAnyRef(partitionKeys));
- String hiveSdLocation = tableInformation.getSd().getLocation();
- try {
- /**
- * Build hadoop conf(support s3、cos、oss、hdfs). The returned hadoop
conf can be
- * CosConf、OssConf、S3Conf、HadoopConf so that HadoopFileSystemProxy
can obtain the
- * correct Schema and FsHdfsImpl that can be filled into hadoop
configuration in {@link
- *
org.apache.seatunnel.connectors.seatunnel.file.hadoop.HadoopFileSystemProxy#createConfiguration()}
- */
- hadoopConf =
- StorageFactory.getStorageType(hiveSdLocation)
- .buildHadoopConfWithReadOnlyConfig(
- ReadonlyConfig.fromConfig(pluginConfig));
- String path = new URI(hiveSdLocation).getPath();
- pluginConfig =
- pluginConfig
- .withValue(FILE_PATH.key(),
ConfigValueFactory.fromAnyRef(path))
- .withValue(
- FS_DEFAULT_NAME_KEY,
-
ConfigValueFactory.fromAnyRef(hadoopConf.getHdfsNameKey()));
- } catch (URISyntaxException e) {
- String errorMsg =
- String.format(
- "Get hdfs namenode host from table location [%s]
failed,"
- + "please check it",
- hiveSdLocation);
- throw new HiveConnectorException(
- HiveConnectorErrorCode.GET_HDFS_NAMENODE_HOST_FAILED,
errorMsg, e);
- }
- this.pluginConfig = pluginConfig;
- super.prepare(pluginConfig);
+
+ return new FileSinkConfig(pluginConfig,
catalogTable.getSeaTunnelRowType());
+ }
+
+ @Override
+ public String getPluginName() {
+ return HiveConstants.CONNECTOR_NAME;
}
@Override
public Optional<SinkAggregatedCommitter<FileCommitInfo,
FileAggregatedCommitInfo>>
createAggregatedCommitter() {
return Optional.of(
- new HiveSinkAggregatedCommitter(pluginConfig, dbName,
tableName, hadoopConf));
+ new HiveSinkAggregatedCommitter(
+ readonlyConfig,
+ getTableInformation().getDbName(),
+ getTableInformation().getTableName(),
+ hadoopConf));
+ }
+
+ @Override
+ public void setJobContext(JobContext jobContext) {
+ this.jobId = jobContext.getJobId();
+ }
+
+ @Override
+ public SinkWriter<SeaTunnelRow, FileCommitInfo, FileSinkState>
restoreWriter(
+ SinkWriter.Context context, List<FileSinkState> states) {
+ return new HiveSinkWriter(getWriteStrategy(), hadoopConf, context,
jobId, states);
+ }
+
+ @Override
+ public SinkWriter<SeaTunnelRow, FileCommitInfo, FileSinkState>
createWriter(
+ SinkWriter.Context context) {
+ return new HiveSinkWriter(getWriteStrategy(), hadoopConf, context,
jobId);
+ }
+
+ @Override
+ public Optional<Serializer<FileCommitInfo>> getCommitInfoSerializer() {
+ return Optional.of(new DefaultSerializer<>());
+ }
+
+ @Override
+ public Optional<Serializer<FileAggregatedCommitInfo>>
getAggregatedCommitInfoSerializer() {
+ return Optional.of(new DefaultSerializer<>());
+ }
+
+ @Override
+ public Optional<Serializer<FileSinkState>> getWriterStateSerializer() {
+ return Optional.of(new DefaultSerializer<>());
+ }
+
+ private HadoopConf createHadoopConf(ReadonlyConfig readonlyConfig) {
+ String hdfsLocation = getTableInformation().getSd().getLocation();
+
+    /**
+     * Build hadoop conf (supports s3, cos, oss, hdfs). The returned hadoop
conf can be
+     * CosConf, OssConf, S3Conf or HadoopConf so that HadoopFileSystemProxy can
obtain the correct
+     * Schema and FsHdfsImpl that can be filled into hadoop configuration
in {@link
+     *
org.apache.seatunnel.connectors.seatunnel.file.hadoop.HadoopFileSystemProxy#createConfiguration()}
+     */
+ HadoopConf hadoopConf =
+ StorageFactory.getStorageType(hdfsLocation)
+ .buildHadoopConfWithReadOnlyConfig(readonlyConfig);
+ readonlyConfig
+ .getOptional(HiveSourceOptions.HDFS_SITE_PATH)
+ .ifPresent(hadoopConf::setHdfsSitePath);
+ readonlyConfig
+ .getOptional(HiveSourceOptions.REMOTE_USER)
+ .ifPresent(hadoopConf::setRemoteUser);
+ readonlyConfig
+ .getOptional(HiveSourceOptions.KERBEROS_PRINCIPAL)
+ .ifPresent(hadoopConf::setKerberosPrincipal);
+ readonlyConfig
+ .getOptional(HiveSourceOptions.KERBEROS_KEYTAB_PATH)
+ .ifPresent(hadoopConf::setKerberosKeytabPath);
+ return hadoopConf;
+ }
+
+ private Table getTableInformation() {
+ if (tableInformation == null) {
+ tableInformation = HiveTableUtils.getTableInfo(readonlyConfig);
+ }
+ return tableInformation;
+ }
+
+ private WriteStrategy getWriteStrategy() {
+ if (writeStrategy == null) {
+ writeStrategy =
WriteStrategyFactory.of(fileSinkConfig.getFileFormat(), fileSinkConfig);
+
writeStrategy.setSeaTunnelRowTypeInfo(catalogTable.getSeaTunnelRowType());
+ }
+ return writeStrategy;
}
}
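
A note on the transient fields above: the metastore `Table` and the
`WriteStrategy` are not reliably serializable, so `HiveSink` keeps them
transient and rebuilds them on first access after deserialization. A minimal
sketch of that lazy-reload pattern (`ExpensiveHandle` is a hypothetical
stand-in for the `Table` or the `WriteStrategy`):

```java
import java.io.Serializable;

class LazyReloadSketch implements Serializable {
    private transient ExpensiveHandle handle; // skipped during serialization

    private ExpensiveHandle getHandle() {
        if (handle == null) {
            // Rebuilt on the worker after the sink is deserialized.
            handle = ExpensiveHandle.open();
        }
        return handle;
    }

    static class ExpensiveHandle {
        static ExpensiveHandle open() {
            return new ExpensiveHandle();
        }
    }
}
```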
diff --git
a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSinkFactory.java
b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSinkFactory.java
index e40864517f..e53aed86fc 100644
---
a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSinkFactory.java
+++
b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSinkFactory.java
@@ -17,19 +17,32 @@
package org.apache.seatunnel.connectors.seatunnel.hive.sink;
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.configuration.util.OptionRule;
+import org.apache.seatunnel.api.sink.SinkReplaceNameConstant;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.TableIdentifier;
+import org.apache.seatunnel.api.table.connector.TableSink;
import org.apache.seatunnel.api.table.factory.Factory;
import org.apache.seatunnel.api.table.factory.TableSinkFactory;
+import org.apache.seatunnel.api.table.factory.TableSinkFactoryContext;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.connectors.seatunnel.file.config.BaseSinkConfig;
+import
org.apache.seatunnel.connectors.seatunnel.file.sink.commit.FileAggregatedCommitInfo;
+import
org.apache.seatunnel.connectors.seatunnel.file.sink.commit.FileCommitInfo;
+import org.apache.seatunnel.connectors.seatunnel.file.sink.state.FileSinkState;
import org.apache.seatunnel.connectors.seatunnel.hive.config.HiveConfig;
+import org.apache.seatunnel.connectors.seatunnel.hive.config.HiveConstants;
import com.google.auto.service.AutoService;
+import java.util.HashMap;
+import java.util.Map;
+
@AutoService(Factory.class)
-public class HiveSinkFactory implements TableSinkFactory {
- @Override
- public String factoryIdentifier() {
- return "Hive";
- }
+public class HiveSinkFactory
+ implements TableSinkFactory<
+ SeaTunnelRow, FileSinkState, FileCommitInfo,
FileAggregatedCommitInfo> {
@Override
public OptionRule optionRule() {
@@ -37,8 +50,70 @@ public class HiveSinkFactory implements TableSinkFactory {
.required(HiveConfig.TABLE_NAME)
.required(HiveConfig.METASTORE_URI)
.optional(HiveConfig.ABORT_DROP_PARTITION_METADATA)
+ .optional(BaseSinkConfig.KERBEROS_PRINCIPAL)
+ .optional(BaseSinkConfig.KERBEROS_KEYTAB_PATH)
+ .optional(BaseSinkConfig.REMOTE_USER)
.optional(HiveConfig.HADOOP_CONF)
.optional(HiveConfig.HADOOP_CONF_PATH)
.build();
}
+
+ @Override
+ public TableSink<SeaTunnelRow, FileSinkState, FileCommitInfo,
FileAggregatedCommitInfo>
+ createSink(TableSinkFactoryContext context) {
+ ReadonlyConfig readonlyConfig = context.getOptions();
+ CatalogTable catalogTable = context.getCatalogTable();
+
+ ReadonlyConfig finalReadonlyConfig =
+ generateCurrentReadonlyConfig(readonlyConfig, catalogTable);
+ return () -> new HiveSink(finalReadonlyConfig, catalogTable);
+ }
+
+ @Override
+ public String factoryIdentifier() {
+ return HiveConstants.CONNECTOR_NAME;
+ }
+
+ private ReadonlyConfig generateCurrentReadonlyConfig(
+ ReadonlyConfig readonlyConfig, CatalogTable catalogTable) {
+
+ Map<String, String> configMap = readonlyConfig.toMap();
+
+ readonlyConfig
+ .getOptional(HiveSinkOptions.TABLE_NAME)
+ .ifPresent(
+ tableName -> {
+ String replacedPath =
+ replaceCatalogTableInPath(tableName,
catalogTable);
+ configMap.put(HiveSinkOptions.TABLE_NAME.key(),
replacedPath);
+ });
+
+ return ReadonlyConfig.fromMap(new HashMap<>(configMap));
+ }
+
+ private String replaceCatalogTableInPath(String originTableName,
CatalogTable catalogTable) {
+ String tableName = originTableName;
+ TableIdentifier tableIdentifier = catalogTable.getTableId();
+ if (tableIdentifier != null) {
+ if (tableIdentifier.getDatabaseName() != null) {
+ tableName =
+ tableName.replace(
+
SinkReplaceNameConstant.REPLACE_DATABASE_NAME_KEY,
+ tableIdentifier.getDatabaseName());
+ }
+ if (tableIdentifier.getSchemaName() != null) {
+ tableName =
+ tableName.replace(
+
SinkReplaceNameConstant.REPLACE_SCHEMA_NAME_KEY,
+ tableIdentifier.getSchemaName());
+ }
+ if (tableIdentifier.getTableName() != null) {
+ tableName =
+ tableName.replace(
+ SinkReplaceNameConstant.REPLACE_TABLE_NAME_KEY,
+ tableIdentifier.getTableName());
+ }
+ }
+ return tableName;
+ }
}
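
The `replaceCatalogTableInPath` logic above is what makes the sink's
`table_name = "${database_name}.${table_name}"` template work in multi-table
jobs. A minimal sketch of the substitution, assuming the replace keys are the
literal `${database_name}` and `${table_name}` strings (the factory also
handles `${schema_name}`):

```java
// Hypothetical stand-alone version of the placeholder substitution.
class TableNameTemplateSketch {
    static String resolve(String template, String database, String table) {
        return template
                .replace("${database_name}", database)
                .replace("${table_name}", table);
    }

    public static void main(String[] args) {
        // A source table test_hive.test_1 with the template above resolves to:
        System.out.println(
                resolve("${database_name}.${table_name}", "test_hive", "test_1"));
        // -> test_hive.test_1
    }
}
```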
diff --git
a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSinkFactory.java
b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSinkOptions.java
similarity index 50%
copy from
seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSinkFactory.java
copy to
seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSinkOptions.java
index e40864517f..a241717a44 100644
---
a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSinkFactory.java
+++
b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSinkOptions.java
@@ -17,28 +17,16 @@
package org.apache.seatunnel.connectors.seatunnel.hive.sink;
-import org.apache.seatunnel.api.configuration.util.OptionRule;
-import org.apache.seatunnel.api.table.factory.Factory;
-import org.apache.seatunnel.api.table.factory.TableSinkFactory;
-import org.apache.seatunnel.connectors.seatunnel.hive.config.HiveConfig;
+import org.apache.seatunnel.api.configuration.Option;
+import org.apache.seatunnel.api.configuration.Options;
+import org.apache.seatunnel.connectors.seatunnel.hive.config.BaseHiveOptions;
-import com.google.auto.service.AutoService;
+public class HiveSinkOptions extends BaseHiveOptions {
-@AutoService(Factory.class)
-public class HiveSinkFactory implements TableSinkFactory {
- @Override
- public String factoryIdentifier() {
- return "Hive";
- }
-
- @Override
- public OptionRule optionRule() {
- return OptionRule.builder()
- .required(HiveConfig.TABLE_NAME)
- .required(HiveConfig.METASTORE_URI)
- .optional(HiveConfig.ABORT_DROP_PARTITION_METADATA)
- .optional(HiveConfig.HADOOP_CONF)
- .optional(HiveConfig.HADOOP_CONF_PATH)
- .build();
- }
+ public static final Option<Boolean> ABORT_DROP_PARTITION_METADATA =
+ Options.key("abort_drop_partition_metadata")
+ .booleanType()
+ .defaultValue(false)
+ .withDescription(
+ "Flag to decide whether to drop partition metadata
from Hive Metastore during an abort operation. Note: this only affects the
metadata in the metastore, the data in the partition will always be
deleted(data generated during the synchronization process).");
}
diff --git
a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/writter/HiveSinkWriter.java
b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/writter/HiveSinkWriter.java
new file mode 100644
index 0000000000..a00f559082
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/writter/HiveSinkWriter.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.hive.sink.writter;
+
+import org.apache.seatunnel.api.sink.SinkWriter;
+import org.apache.seatunnel.api.sink.SupportMultiTableSinkWriter;
+import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
+import org.apache.seatunnel.connectors.seatunnel.file.sink.BaseFileSinkWriter;
+import org.apache.seatunnel.connectors.seatunnel.file.sink.state.FileSinkState;
+import
org.apache.seatunnel.connectors.seatunnel.file.sink.writer.WriteStrategy;
+
+import java.util.Collections;
+import java.util.List;
+
+public class HiveSinkWriter extends BaseFileSinkWriter
+ implements SupportMultiTableSinkWriter<WriteStrategy> {
+
+ public HiveSinkWriter(
+ WriteStrategy writeStrategy,
+ HadoopConf hadoopConf,
+ Context context,
+ String jobId,
+ List<FileSinkState> fileSinkStates) {
+        // todo: do we need to set writeStrategy as a shared resource? And how
should we deal with
+        // the previous fileSinkStates?
+ super(writeStrategy, hadoopConf, context, jobId, fileSinkStates);
+ }
+
+ public HiveSinkWriter(
+ WriteStrategy writeStrategy,
+ HadoopConf hadoopConf,
+ SinkWriter.Context context,
+ String jobId) {
+ this(writeStrategy, hadoopConf, context, jobId,
Collections.emptyList());
+ }
+}
diff --git
a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/source/HiveSource.java
b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/source/HiveSource.java
index f7642e3611..00f2bd0a8a 100644
---
a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/source/HiveSource.java
+++
b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/source/HiveSource.java
@@ -17,213 +17,67 @@
package org.apache.seatunnel.connectors.seatunnel.hive.source;
-import
org.apache.seatunnel.shade.com.fasterxml.jackson.databind.node.ObjectNode;
-import org.apache.seatunnel.shade.com.typesafe.config.Config;
-import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory;
-import org.apache.seatunnel.shade.com.typesafe.config.ConfigRenderOptions;
-import org.apache.seatunnel.shade.com.typesafe.config.ConfigValueFactory;
-
-import org.apache.seatunnel.api.common.PrepareFailException;
-import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
-import org.apache.seatunnel.api.source.SeaTunnelSource;
-import org.apache.seatunnel.api.table.catalog.schema.TableSchemaOptions;
-import org.apache.seatunnel.api.table.type.SqlType;
-import org.apache.seatunnel.common.config.CheckConfigUtil;
-import org.apache.seatunnel.common.config.CheckResult;
-import org.apache.seatunnel.common.constants.PluginType;
-import org.apache.seatunnel.common.exception.CommonError;
-import org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated;
-import org.apache.seatunnel.common.utils.JsonUtils;
-import
org.apache.seatunnel.connectors.seatunnel.file.config.BaseSourceConfigOptions;
-import org.apache.seatunnel.connectors.seatunnel.file.config.FileFormat;
+import org.apache.seatunnel.api.source.Boundedness;
+import org.apache.seatunnel.api.source.SourceReader;
+import org.apache.seatunnel.api.source.SourceSplitEnumerator;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import
org.apache.seatunnel.connectors.seatunnel.file.hdfs.source.BaseHdfsFileSource;
-import org.apache.seatunnel.connectors.seatunnel.hive.config.HiveConfig;
-import
org.apache.seatunnel.connectors.seatunnel.hive.exception.HiveConnectorErrorCode;
-import
org.apache.seatunnel.connectors.seatunnel.hive.exception.HiveConnectorException;
-import org.apache.seatunnel.connectors.seatunnel.hive.storage.StorageFactory;
-
-import org.apache.commons.lang3.tuple.Pair;
-import org.apache.hadoop.hive.metastore.api.FieldSchema;
-import org.apache.hadoop.hive.metastore.api.Table;
+import
org.apache.seatunnel.connectors.seatunnel.file.source.split.FileSourceSplit;
+import
org.apache.seatunnel.connectors.seatunnel.file.source.state.FileSourceState;
+import org.apache.seatunnel.connectors.seatunnel.hive.config.HiveConstants;
+import
org.apache.seatunnel.connectors.seatunnel.hive.source.config.HiveSourceConfig;
+import
org.apache.seatunnel.connectors.seatunnel.hive.source.config.MultipleTableHiveSourceConfig;
+import
org.apache.seatunnel.connectors.seatunnel.hive.source.reader.MultipleTableHiveSourceReader;
+import
org.apache.seatunnel.connectors.seatunnel.hive.source.split.MultipleTableHiveSourceSplitEnumerator;
-import com.google.auto.service.AutoService;
-
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.util.LinkedHashMap;
import java.util.List;
-import java.util.Map;
-
-import static
org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY;
-import static
org.apache.seatunnel.connectors.seatunnel.file.config.BaseSourceConfigOptions.FILE_FORMAT_TYPE;
-import static
org.apache.seatunnel.connectors.seatunnel.file.config.BaseSourceConfigOptions.FILE_PATH;
-import static
org.apache.seatunnel.connectors.seatunnel.hive.config.HiveConfig.ORC_INPUT_FORMAT_CLASSNAME;
-import static
org.apache.seatunnel.connectors.seatunnel.hive.config.HiveConfig.PARQUET_INPUT_FORMAT_CLASSNAME;
-import static
org.apache.seatunnel.connectors.seatunnel.hive.config.HiveConfig.TEXT_INPUT_FORMAT_CLASSNAME;
+import java.util.stream.Collectors;
-@AutoService(SeaTunnelSource.class)
public class HiveSource extends BaseHdfsFileSource {
- private Table tableInformation;
+
+ private final MultipleTableHiveSourceConfig multipleTableHiveSourceConfig;
+
+ public HiveSource(ReadonlyConfig readonlyConfig) {
+ this.multipleTableHiveSourceConfig = new
MultipleTableHiveSourceConfig(readonlyConfig);
+ }
@Override
public String getPluginName() {
- return "Hive";
+ return HiveConstants.CONNECTOR_NAME;
}
@Override
- public void prepare(Config pluginConfig) throws PrepareFailException {
- CheckResult result =
- CheckConfigUtil.checkAllExists(
- pluginConfig, HiveConfig.METASTORE_URI.key(),
HiveConfig.TABLE_NAME.key());
- if (!result.isSuccess()) {
- throw new HiveConnectorException(
- SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
- String.format(
- "PluginName: %s, PluginType: %s, Message: %s",
- getPluginName(), PluginType.SOURCE,
result.getMsg()));
- }
- result =
- CheckConfigUtil.checkAtLeastOneExists(
- pluginConfig,
- TableSchemaOptions.SCHEMA.key(),
- FILE_FORMAT_TYPE.key(),
- FILE_PATH.key(),
- FS_DEFAULT_NAME_KEY);
- if (result.isSuccess()) {
- throw new HiveConnectorException(
- SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
- String.format(
- "Hive source connector does not support these
setting [%s]",
- String.join(
- ",",
- TableSchemaOptions.SCHEMA.key(),
- FILE_FORMAT_TYPE.key(),
- FILE_PATH.key(),
- FS_DEFAULT_NAME_KEY)));
- }
- if
(pluginConfig.hasPath(BaseSourceConfigOptions.READ_PARTITIONS.key())) {
- // verify partition list
- List<String> partitionsList =
-
pluginConfig.getStringList(BaseSourceConfigOptions.READ_PARTITIONS.key());
- if (partitionsList.isEmpty()) {
- throw new HiveConnectorException(
- SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
- "Partitions list is empty, please check");
- }
- int depth = partitionsList.get(0).replaceAll("\\\\",
"/").split("/").length;
- long count =
- partitionsList.stream()
- .map(partition -> partition.replaceAll("\\\\",
"/").split("/").length)
- .filter(length -> length != depth)
- .count();
- if (count > 0) {
- throw new HiveConnectorException(
- SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
- "Every partition that in partition list should has the
same directory depth");
- }
- }
- Pair<String[], Table> tableInfo =
HiveConfig.getTableInfo(pluginConfig);
- tableInformation = tableInfo.getRight();
- String inputFormat = tableInformation.getSd().getInputFormat();
- if (TEXT_INPUT_FORMAT_CLASSNAME.equals(inputFormat)) {
- pluginConfig =
- pluginConfig.withValue(
- FILE_FORMAT_TYPE.key(),
-
ConfigValueFactory.fromAnyRef(FileFormat.TEXT.toString()));
- // Build schema from hive table information
- // Because the entrySet in typesafe config couldn't keep key-value
order
- // So use jackson to keep key-value order
- Map<String, Object> schema = parseSchema(tableInformation);
- ConfigRenderOptions options = ConfigRenderOptions.concise();
- String render = pluginConfig.root().render(options);
- ObjectNode jsonNodes = JsonUtils.parseObject(render);
- jsonNodes.putPOJO(TableSchemaOptions.SCHEMA.key(), schema);
- pluginConfig = ConfigFactory.parseString(jsonNodes.toString());
- } else if (PARQUET_INPUT_FORMAT_CLASSNAME.equals(inputFormat)) {
- pluginConfig =
- pluginConfig.withValue(
- FILE_FORMAT_TYPE.key(),
-
ConfigValueFactory.fromAnyRef(FileFormat.PARQUET.toString()));
- } else if (ORC_INPUT_FORMAT_CLASSNAME.equals(inputFormat)) {
- pluginConfig =
- pluginConfig.withValue(
- FILE_FORMAT_TYPE.key(),
-
ConfigValueFactory.fromAnyRef(FileFormat.ORC.toString()));
- } else {
- throw new HiveConnectorException(
- CommonErrorCodeDeprecated.ILLEGAL_ARGUMENT,
- "Hive connector only support [text parquet orc] table
now");
- }
- String hiveSdLocation = tableInformation.getSd().getLocation();
- try {
- URI uri = new URI(hiveSdLocation);
- String path = uri.getPath();
- String defaultFs = hiveSdLocation.replace(path, "");
- pluginConfig =
- pluginConfig
- .withValue(
- BaseSourceConfigOptions.FILE_PATH.key(),
- ConfigValueFactory.fromAnyRef(path))
- .withValue(
- FS_DEFAULT_NAME_KEY,
ConfigValueFactory.fromAnyRef(defaultFs));
- /**
- * Build hadoop conf(support s3、cos、oss、hdfs). The returned hadoop
conf can be
- * CosConf、OssConf、S3Conf、HadoopConf so that HadoopFileSystemProxy
can obtain the
- * correct Schema and FsHdfsImpl that can be filled into hadoop
configuration in {@link
- *
org.apache.seatunnel.connectors.seatunnel.file.hadoop.HadoopFileSystemProxy#createConfiguration()}
- */
- hadoopConf =
- StorageFactory.getStorageType(hiveSdLocation)
- .buildHadoopConfWithReadOnlyConfig(
- ReadonlyConfig.fromConfig(pluginConfig));
- } catch (URISyntaxException e) {
- String errorMsg =
- String.format(
- "Get hdfs namenode host from table location [%s]
failed,"
- + "please check it",
- hiveSdLocation);
- throw new HiveConnectorException(
- HiveConnectorErrorCode.GET_HDFS_NAMENODE_HOST_FAILED,
errorMsg, e);
- }
- super.prepare(pluginConfig);
+ public Boundedness getBoundedness() {
+ return Boundedness.BOUNDED;
}
- private Map<String, Object> parseSchema(Table table) {
- LinkedHashMap<String, Object> fields = new LinkedHashMap<>();
- LinkedHashMap<String, Object> schema = new LinkedHashMap<>();
- List<FieldSchema> cols = table.getSd().getCols();
- for (FieldSchema col : cols) {
- String name = col.getName();
- String type = col.getType();
- fields.put(name, covertHiveTypeToSeaTunnelType(name, type));
- }
- schema.put("fields", fields);
- return schema;
+ @Override
+ public List<CatalogTable> getProducedCatalogTables() {
+ return multipleTableHiveSourceConfig.getHiveSourceConfigs().stream()
+ .map(HiveSourceConfig::getCatalogTable)
+ .collect(Collectors.toList());
}
- private Object covertHiveTypeToSeaTunnelType(String name, String hiveType)
{
- if (hiveType.contains("varchar")) {
- return SqlType.STRING;
- }
- if (hiveType.contains("char")) {
- throw CommonError.convertToSeaTunnelTypeError(
- getPluginName(), PluginType.SOURCE, hiveType, name);
- }
- if (hiveType.contains("binary")) {
- return SqlType.BYTES.name();
- }
- if (hiveType.contains("struct")) {
- LinkedHashMap<String, Object> fields = new LinkedHashMap<>();
- int start = hiveType.indexOf("<");
- int end = hiveType.lastIndexOf(">");
- String[] columns = hiveType.substring(start + 1, end).split(",");
- for (String column : columns) {
- String[] splits = column.split(":");
- fields.put(splits[0], covertHiveTypeToSeaTunnelType(splits[0],
splits[1]));
- }
- return fields;
- }
- return hiveType;
+ @Override
+ public SourceReader<SeaTunnelRow, FileSourceSplit> createReader(
+ SourceReader.Context readerContext) {
+ return new MultipleTableHiveSourceReader(readerContext,
multipleTableHiveSourceConfig);
+ }
+
+ @Override
+ public SourceSplitEnumerator<FileSourceSplit, FileSourceState>
createEnumerator(
+ SourceSplitEnumerator.Context<FileSourceSplit> enumeratorContext) {
+ return new MultipleTableHiveSourceSplitEnumerator(
+ enumeratorContext, multipleTableHiveSourceConfig);
+ }
+
+ @Override
+ public SourceSplitEnumerator<FileSourceSplit, FileSourceState>
restoreEnumerator(
+ SourceSplitEnumerator.Context<FileSourceSplit> enumeratorContext,
+ FileSourceState checkpointState) {
+ return new MultipleTableHiveSourceSplitEnumerator(
+ enumeratorContext, multipleTableHiveSourceConfig,
checkpointState);
}
}
diff --git
a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/source/HiveSourceFactory.java
b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/source/HiveSourceFactory.java
index afa1ae0e36..07adfef106 100644
---
a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/source/HiveSourceFactory.java
+++
b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/source/HiveSourceFactory.java
@@ -19,27 +19,44 @@ package
org.apache.seatunnel.connectors.seatunnel.hive.source;
import org.apache.seatunnel.api.configuration.util.OptionRule;
import org.apache.seatunnel.api.source.SeaTunnelSource;
+import org.apache.seatunnel.api.source.SourceSplit;
+import org.apache.seatunnel.api.table.connector.TableSource;
import org.apache.seatunnel.api.table.factory.Factory;
import org.apache.seatunnel.api.table.factory.TableSourceFactory;
+import org.apache.seatunnel.api.table.factory.TableSourceFactoryContext;
import
org.apache.seatunnel.connectors.seatunnel.file.config.BaseSourceConfigOptions;
import org.apache.seatunnel.connectors.seatunnel.hive.config.HiveConfig;
+import org.apache.seatunnel.connectors.seatunnel.hive.config.HiveConstants;
+import
org.apache.seatunnel.connectors.seatunnel.hive.source.config.HiveSourceOptions;
import com.google.auto.service.AutoService;
+import java.io.Serializable;
+
@AutoService(Factory.class)
public class HiveSourceFactory implements TableSourceFactory {
@Override
public String factoryIdentifier() {
- return "Hive";
+ return HiveConstants.CONNECTOR_NAME;
+ }
+
+ @Override
+ public <T, SplitT extends SourceSplit, StateT extends Serializable>
+ TableSource<T, SplitT, StateT>
createSource(TableSourceFactoryContext context) {
+ return () -> (SeaTunnelSource<T, SplitT, StateT>) new
HiveSource(context.getOptions());
}
@Override
public OptionRule optionRule() {
return OptionRule.builder()
- .required(HiveConfig.TABLE_NAME)
- .required(HiveConfig.METASTORE_URI)
+ .optional(HiveConfig.TABLE_NAME)
+ .optional(HiveConfig.METASTORE_URI)
+ .optional(HiveSourceOptions.TABLE_CONFIGS)
.optional(BaseSourceConfigOptions.READ_PARTITIONS)
.optional(BaseSourceConfigOptions.READ_COLUMNS)
+ .optional(BaseSourceConfigOptions.KERBEROS_PRINCIPAL)
+ .optional(BaseSourceConfigOptions.KERBEROS_KEYTAB_PATH)
+ .optional(BaseSourceConfigOptions.REMOTE_USER)
.optional(HiveConfig.HADOOP_CONF)
.optional(HiveConfig.HADOOP_CONF_PATH)
.build();
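
With multiple-table support, `table_name` and `metastore_uri` move from required to optional at the factory level: a job either sets them at the top level for a single table, or supplies a `tables_configs` list whose entries each carry their own. A minimal sketch of that fallback decision, mirroring `MultipleTableHiveSourceConfig` further down (the map contents here are illustrative):

```java
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.connectors.seatunnel.hive.source.config.HiveSourceOptions;

import java.util.HashMap;
import java.util.Map;

public class TablesConfigsFallbackSketch {
    public static void main(String[] args) {
        // A top-level config describing a single table; no tables_configs key.
        Map<String, Object> raw = new HashMap<>();
        raw.put("table_name", "test_hive.test_1");
        ReadonlyConfig config = ReadonlyConfig.fromMap(raw);

        // Present -> parse each list entry into its own HiveSourceConfig;
        // absent -> treat the top-level options as one single-table config.
        boolean multiTable =
                config.getOptional(HiveSourceOptions.TABLE_CONFIGS).isPresent();
        System.out.println(multiTable ? "multiple tables" : "single table");
    }
}
```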
diff --git
a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/source/config/HiveSourceConfig.java
b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/source/config/HiveSourceConfig.java
new file mode 100644
index 0000000000..203491dcf9
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/source/config/HiveSourceConfig.java
@@ -0,0 +1,299 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.hive.source.config;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+import org.apache.seatunnel.shade.com.typesafe.config.ConfigValueFactory;
+
+import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.CatalogTableUtil;
+import org.apache.seatunnel.api.table.catalog.TableIdentifier;
+import org.apache.seatunnel.api.table.catalog.TablePath;
+import org.apache.seatunnel.api.table.catalog.TableSchema;
+import org.apache.seatunnel.api.table.catalog.schema.TableSchemaOptions;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated;
+import org.apache.seatunnel.connectors.seatunnel.file.config.FileFormat;
+import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
+import
org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorErrorCode;
+import
org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorException;
+import
org.apache.seatunnel.connectors.seatunnel.file.hdfs.source.config.HdfsSourceConfigOptions;
+import
org.apache.seatunnel.connectors.seatunnel.file.source.reader.ReadStrategy;
+import
org.apache.seatunnel.connectors.seatunnel.file.source.reader.ReadStrategyFactory;
+import org.apache.seatunnel.connectors.seatunnel.hive.config.HiveConstants;
+import
org.apache.seatunnel.connectors.seatunnel.hive.exception.HiveConnectorErrorCode;
+import
org.apache.seatunnel.connectors.seatunnel.hive.exception.HiveConnectorException;
+import org.apache.seatunnel.connectors.seatunnel.hive.storage.StorageFactory;
+import org.apache.seatunnel.connectors.seatunnel.hive.utils.HiveTableUtils;
+import org.apache.seatunnel.connectors.seatunnel.hive.utils.HiveTypeConvertor;
+
+import org.apache.commons.collections4.CollectionUtils;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.Table;
+
+import lombok.Getter;
+import lombok.SneakyThrows;
+
+import java.io.Serializable;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static
org.apache.seatunnel.connectors.seatunnel.file.config.BaseSinkConfig.FIELD_DELIMITER;
+import static
org.apache.seatunnel.connectors.seatunnel.file.config.BaseSinkConfig.FILE_FORMAT_TYPE;
+import static
org.apache.seatunnel.connectors.seatunnel.file.config.BaseSinkConfig.ROW_DELIMITER;
+
+@Getter
+public class HiveSourceConfig implements Serializable {
+
+ private static final long serialVersionUID = 1L;
+
+ private final Table table;
+ private final CatalogTable catalogTable;
+ private final FileFormat fileFormat;
+ private final ReadStrategy readStrategy;
+ private final List<String> filePaths;
+ private final HadoopConf hadoopConf;
+
+ @SneakyThrows
+ public HiveSourceConfig(ReadonlyConfig readonlyConfig) {
+ readonlyConfig
+ .getOptional(HdfsSourceConfigOptions.READ_PARTITIONS)
+ .ifPresent(this::validatePartitions);
+ this.table = HiveTableUtils.getTableInfo(readonlyConfig);
+ this.hadoopConf = parseHiveHadoopConfig(readonlyConfig, table);
+ this.fileFormat = HiveTableUtils.parseFileFormat(table);
+ this.readStrategy = parseReadStrategy(table, readonlyConfig,
fileFormat, hadoopConf);
+ this.filePaths = parseFilePaths(table, readStrategy);
+ this.catalogTable =
+ parseCatalogTable(
+ readonlyConfig, readStrategy, fileFormat, hadoopConf,
filePaths, table);
+ }
+
+ private void validatePartitions(List<String> partitionsList) {
+ if (CollectionUtils.isEmpty(partitionsList)) {
+ throw new HiveConnectorException(
+ SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
+ "Partitions list is empty, please check");
+ }
+ int depth = partitionsList.get(0).replaceAll("\\\\",
"/").split("/").length;
+ long count =
+ partitionsList.stream()
+ .map(partition -> partition.replaceAll("\\\\",
"/").split("/").length)
+ .filter(length -> length != depth)
+ .count();
+ if (count > 0) {
+ throw new HiveConnectorException(
+ SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
+ "Every partition that in partition list should has the
same directory depth");
+ }
+ }
+
+ private ReadStrategy parseReadStrategy(
+ Table table,
+ ReadonlyConfig readonlyConfig,
+ FileFormat fileFormat,
+ HadoopConf hadoopConf) {
+
+ ReadStrategy readStrategy = ReadStrategyFactory.of(fileFormat.name());
+ Config config = readonlyConfig.toConfig();
+
+ switch (fileFormat) {
+ case TEXT:
+ // for text tables, pick up the field/row delimiters from the serde properties.
+ Map<String, String> parameters =
table.getSd().getSerdeInfo().getParameters();
+ config =
+ config.withValue(
+ FIELD_DELIMITER.key(),
+ ConfigValueFactory.fromAnyRef(
+ parameters.get("field.delim")))
+ .withValue(
+ ROW_DELIMITER.key(),
+
ConfigValueFactory.fromAnyRef(parameters.get("line.delim")))
+ .withValue(
+ FILE_FORMAT_TYPE.key(),
+
ConfigValueFactory.fromAnyRef(FileFormat.TEXT.name()));
+ break;
+ case ORC:
+ config =
+ config.withValue(
+ FILE_FORMAT_TYPE.key(),
+
ConfigValueFactory.fromAnyRef(FileFormat.ORC.name()));
+ break;
+ case PARQUET:
+ config =
+ config.withValue(
+ FILE_FORMAT_TYPE.key(),
+
ConfigValueFactory.fromAnyRef(FileFormat.PARQUET.name()));
+ break;
+ default:
+ }
+ readStrategy.setPluginConfig(config);
+ readStrategy.init(hadoopConf);
+ return readStrategy;
+ }
+
+ private HadoopConf parseHiveHadoopConfig(ReadonlyConfig readonlyConfig,
Table table) {
+ String hiveSdLocation = table.getSd().getLocation();
+ /**
+ * Build the hadoop conf (supports s3, cos, oss and hdfs). The returned
conf can be a
+ * CosConf, OssConf, S3Conf or HadoopConf, so that HadoopFileSystemProxy
can obtain the
+ * correct schema and FsHdfsImpl to fill into the hadoop configuration
in {@link
+ *
org.apache.seatunnel.connectors.seatunnel.file.hadoop.HadoopFileSystemProxy#createConfiguration()}
+ */
+ HadoopConf hadoopConf =
+ StorageFactory.getStorageType(hiveSdLocation)
+ .buildHadoopConfWithReadOnlyConfig(readonlyConfig);
+ readonlyConfig
+ .getOptional(HdfsSourceConfigOptions.HDFS_SITE_PATH)
+ .ifPresent(hadoopConf::setHdfsSitePath);
+ readonlyConfig
+ .getOptional(HdfsSourceConfigOptions.KERBEROS_PRINCIPAL)
+ .ifPresent(hadoopConf::setKerberosPrincipal);
+ readonlyConfig
+ .getOptional(HdfsSourceConfigOptions.KERBEROS_KEYTAB_PATH)
+ .ifPresent(hadoopConf::setKerberosKeytabPath);
+ readonlyConfig
+ .getOptional(HdfsSourceConfigOptions.REMOTE_USER)
+ .ifPresent(hadoopConf::setRemoteUser);
+ return hadoopConf;
+ }
+
+ private List<String> parseFilePaths(Table table, ReadStrategy
readStrategy) {
+ String hdfsPath = parseHdfsPath(table);
+ try {
+ return readStrategy.getFileNamesByPath(hdfsPath);
+ } catch (Exception e) {
+ String errorMsg = String.format("Get file list from this path [%s]
failed", hdfsPath);
+ throw new FileConnectorException(
+ FileConnectorErrorCode.FILE_LIST_GET_FAILED, errorMsg, e);
+ }
+ }
+
+ private String parseFsDefaultName(Table table) {
+ String hdfsLocation = table.getSd().getLocation();
+ try {
+ URI uri = new URI(hdfsLocation);
+ String path = uri.getPath();
+ return hdfsLocation.replace(path, "");
+ } catch (URISyntaxException e) {
+ String errorMsg =
+ String.format(
+ "Get hdfs namenode host from table location [%s]
failed,"
+ + "please check it",
+ hdfsLocation);
+ throw new HiveConnectorException(
+ HiveConnectorErrorCode.GET_HDFS_NAMENODE_HOST_FAILED,
errorMsg, e);
+ }
+ }
+
+ private String parseHdfsPath(Table table) {
+ String hdfsLocation = table.getSd().getLocation();
+ try {
+ URI uri = new URI(hdfsLocation);
+ return uri.getPath();
+ } catch (URISyntaxException e) {
+ String errorMsg =
+ String.format(
+ "Get hdfs namenode host from table location [%s]
failed,"
+ + "please check it",
+ hdfsLocation);
+ throw new HiveConnectorException(
+ HiveConnectorErrorCode.GET_HDFS_NAMENODE_HOST_FAILED,
errorMsg, e);
+ }
+ }
+
+ private CatalogTable parseCatalogTable(
+ ReadonlyConfig readonlyConfig,
+ ReadStrategy readStrategy,
+ FileFormat fileFormat,
+ HadoopConf hadoopConf,
+ List<String> filePaths,
+ Table table) {
+ switch (fileFormat) {
+ case PARQUET:
+ case ORC:
+ return parseCatalogTableFromRemotePath(
+ readonlyConfig, hadoopConf, filePaths, table);
+ case TEXT:
+ return parseCatalogTableFromTable(readonlyConfig,
readStrategy, table);
+ default:
+ throw new HiveConnectorException(
+ CommonErrorCodeDeprecated.ILLEGAL_ARGUMENT,
+ "Hive connector only support [text parquet orc] table
now");
+ }
+ }
+
+ private CatalogTable parseCatalogTableFromRemotePath(
+ ReadonlyConfig readonlyConfig,
+ HadoopConf hadoopConf,
+ List<String> filePaths,
+ Table table) {
+ if (CollectionUtils.isEmpty(filePaths)) {
+ // When the directory is empty, fall back to an empty default schema
+ return buildEmptyCatalogTable(readonlyConfig, table);
+ }
+ CatalogTable catalogTable = buildEmptyCatalogTable(readonlyConfig,
table);
+ try {
+ SeaTunnelRowType seaTunnelRowTypeInfo =
+ readStrategy.getSeaTunnelRowTypeInfo(filePaths.get(0));
+ return CatalogTableUtil.newCatalogTable(catalogTable,
seaTunnelRowTypeInfo);
+ } catch (FileConnectorException e) {
+ String errorMsg =
+ String.format("Get table schema from file [%s] failed",
filePaths.get(0));
+ throw new FileConnectorException(
+ CommonErrorCodeDeprecated.TABLE_SCHEMA_GET_FAILED,
errorMsg, e);
+ }
+ }
+
+ private CatalogTable parseCatalogTableFromTable(
+ ReadonlyConfig readonlyConfig, ReadStrategy readStrategy, Table
table) {
+ List<FieldSchema> cols = table.getSd().getCols();
+ String[] fieldNames = new String[cols.size()];
+ SeaTunnelDataType<?>[] fieldTypes = new SeaTunnelDataType[cols.size()];
+ for (int i = 0; i < cols.size(); i++) {
+ FieldSchema col = cols.get(i);
+ fieldNames[i] = col.getName();
+ fieldTypes[i] =
+
HiveTypeConvertor.covertHiveTypeToSeaTunnelType(col.getName(), col.getType());
+ }
+
+ SeaTunnelRowType seaTunnelRowType = new SeaTunnelRowType(fieldNames,
fieldTypes);
+ readStrategy.setSeaTunnelRowTypeInfo(seaTunnelRowType);
+ final SeaTunnelRowType finalSeatunnelRowType =
readStrategy.getActualSeaTunnelRowTypeInfo();
+
+ CatalogTable catalogTable = buildEmptyCatalogTable(readonlyConfig,
table);
+ return CatalogTableUtil.newCatalogTable(catalogTable,
finalSeatunnelRowType);
+ }
+
+ private CatalogTable buildEmptyCatalogTable(ReadonlyConfig readonlyConfig,
Table table) {
+ TablePath tablePath = TablePath.of(table.getDbName(),
table.getTableName());
+ return CatalogTable.of(
+ TableIdentifier.of(HiveConstants.CONNECTOR_NAME, tablePath),
+ TableSchema.builder().build(),
+ new HashMap<>(),
+ new ArrayList<>(),
+
readonlyConfig.get(TableSchemaOptions.TableIdentifierOptions.COMMENT));
+ }
+}
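
The two URI helpers above split the table's storage-descriptor location into the filesystem authority and the path handed to the read strategy. A standalone sketch of that split, with an illustrative location (only `java.net.URI` is needed):

```java
import java.net.URI;

public class HiveLocationSketch {
    public static void main(String[] args) throws Exception {
        // Illustrative table location from a Hive storage descriptor.
        String location = "hdfs://namenode:8020/warehouse/test_hive.db/test_1";
        URI uri = new URI(location);
        String hdfsPath = uri.getPath();                       // the part listed for splits
        String fsDefaultName = location.replace(hdfsPath, ""); // scheme + authority
        System.out.println(fsDefaultName); // hdfs://namenode:8020
        System.out.println(hdfsPath);      // /warehouse/test_hive.db/test_1
    }
}
```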
diff --git
a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/source/config/HiveSourceOptions.java
b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/source/config/HiveSourceOptions.java
new file mode 100644
index 0000000000..c30cb1783d
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/source/config/HiveSourceOptions.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.hive.source.config;
+
+import
org.apache.seatunnel.shade.com.fasterxml.jackson.core.type.TypeReference;
+
+import org.apache.seatunnel.api.configuration.Option;
+import org.apache.seatunnel.api.configuration.Options;
+import org.apache.seatunnel.connectors.seatunnel.hive.config.BaseHiveOptions;
+
+import java.util.List;
+import java.util.Map;
+
+public class HiveSourceOptions extends BaseHiveOptions {
+ public static final Option<List<Map<String, Object>>> TABLE_CONFIGS =
+ Options.key("tables_configs")
+ .type(new TypeReference<List<Map<String, Object>>>() {})
+ .noDefaultValue()
+ .withDescription(
+ "Local file source configs, used to create
multiple local file source.");
+}
diff --git
a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/source/config/MultipleTableHiveSourceConfig.java
b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/source/config/MultipleTableHiveSourceConfig.java
new file mode 100644
index 0000000000..9db899ca8c
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/source/config/MultipleTableHiveSourceConfig.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.hive.source.config;
+
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+
+import com.google.common.collect.Lists;
+import lombok.Getter;
+
+import java.io.Serializable;
+import java.util.List;
+import java.util.stream.Collectors;
+
+public class MultipleTableHiveSourceConfig implements Serializable {
+
+ private static final long serialVersionUID = 1L;
+
+ @Getter private List<HiveSourceConfig> hiveSourceConfigs;
+
+ public MultipleTableHiveSourceConfig(ReadonlyConfig readonlyConfig) {
+ if
(readonlyConfig.getOptional(HiveSourceOptions.TABLE_CONFIGS).isPresent()) {
+ parseFromTableConfigs(readonlyConfig);
+ } else {
+ parseFromSingleTableConfig(readonlyConfig);
+ }
+ }
+
+ private void parseFromTableConfigs(ReadonlyConfig readonlyConfig) {
+ this.hiveSourceConfigs =
+ readonlyConfig.get(HiveSourceOptions.TABLE_CONFIGS).stream()
+ .map(ReadonlyConfig::fromMap)
+ .map(HiveSourceConfig::new)
+ .collect(Collectors.toList());
+ }
+
+ private void parseFromSingleTableConfig(ReadonlyConfig readonlyConfig) {
+ HiveSourceConfig hiveSourceConfig = new HiveSourceConfig(readonlyConfig);
+ this.hiveSourceConfigs = Lists.newArrayList(hiveSourceConfig);
+ }
+}
diff --git
a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/source/reader/MultipleTableHiveSourceReader.java
b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/source/reader/MultipleTableHiveSourceReader.java
new file mode 100644
index 0000000000..9ea6f1e632
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/source/reader/MultipleTableHiveSourceReader.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.hive.source.reader;
+
+import org.apache.seatunnel.api.source.Collector;
+import org.apache.seatunnel.api.source.SourceReader;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import
org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorException;
+import
org.apache.seatunnel.connectors.seatunnel.file.source.reader.ReadStrategy;
+import
org.apache.seatunnel.connectors.seatunnel.file.source.split.FileSourceSplit;
+import
org.apache.seatunnel.connectors.seatunnel.hive.source.config.HiveSourceConfig;
+import
org.apache.seatunnel.connectors.seatunnel.hive.source.config.MultipleTableHiveSourceConfig;
+
+import lombok.extern.slf4j.Slf4j;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Deque;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentLinkedDeque;
+import java.util.stream.Collectors;
+
+import static
org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorErrorCode.FILE_READ_FAILED;
+import static
org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorErrorCode.FILE_READ_STRATEGY_NOT_SUPPORT;
+
+@Slf4j
+public class MultipleTableHiveSourceReader implements
SourceReader<SeaTunnelRow, FileSourceSplit> {
+
+ private final SourceReader.Context context;
+ private volatile boolean noMoreSplit;
+
+ private final Deque<FileSourceSplit> sourceSplits = new
ConcurrentLinkedDeque<>();
+
+ private final Map<String, ReadStrategy> readStrategyMap;
+
+ public MultipleTableHiveSourceReader(
+ SourceReader.Context context,
+ MultipleTableHiveSourceConfig multipleTableHiveSourceConfig) {
+ this.context = context;
+ this.readStrategyMap =
+ multipleTableHiveSourceConfig.getHiveSourceConfigs().stream()
+ .collect(
+ Collectors.toMap(
+ hiveSourceConfig ->
+ hiveSourceConfig
+ .getCatalogTable()
+ .getTableId()
+ .toTablePath()
+ .toString(),
+ HiveSourceConfig::getReadStrategy));
+ }
+
+ @Override
+ public void pollNext(Collector<SeaTunnelRow> output) {
+ synchronized (output.getCheckpointLock()) {
+ FileSourceSplit split = sourceSplits.poll();
+ if (null != split) {
+ ReadStrategy readStrategy =
readStrategyMap.get(split.getTableId());
+ if (readStrategy == null) {
+ throw new FileConnectorException(
+ FILE_READ_STRATEGY_NOT_SUPPORT,
+ "Cannot found the read strategy for this table: ["
+ + split.getTableId()
+ + "]");
+ }
+ try {
+ readStrategy.read(split.getFilePath(), split.getTableId(),
output);
+ } catch (Exception e) {
+ String errorMsg =
+ String.format("Read data from this file [%s]
failed", split.splitId());
+ throw new FileConnectorException(FILE_READ_FAILED,
errorMsg, e);
+ }
+ } else if (noMoreSplit && sourceSplits.isEmpty()) {
+ // signal to the source that we have reached the end of the
data.
+ log.info(
+ "There is no more element for the bounded
MultipleTableLocalFileSourceReader");
+ context.signalNoMoreElement();
+ }
+ }
+ }
+
+ @Override
+ public List<FileSourceSplit> snapshotState(long checkpointId) {
+ return new ArrayList<>(sourceSplits);
+ }
+
+ @Override
+ public void addSplits(List<FileSourceSplit> splits) {
+ sourceSplits.addAll(splits);
+ }
+
+ @Override
+ public void handleNoMoreSplits() {
+ noMoreSplit = true;
+ }
+
+ @Override
+ public void notifyCheckpointComplete(long checkpointId) {
+ // do nothing
+ }
+
+ @Override
+ public void open() throws Exception {
+ // nothing to initialize
+ log.info("Opened the MultipleTableHiveSourceReader");
+ }
+
+ @Override
+ public void close() throws IOException {
+ // no resources to release
+ log.info("Closed the MultipleTableHiveSourceReader");
+ }
+}
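
The reader above routes every polled split by its table id, so the id stored in a `FileSourceSplit` must equal the `TablePath` string used as the key of `readStrategyMap`. A toy sketch of that routing contract (the strategy names here are just placeholder strings):

```java
import java.util.HashMap;
import java.util.Map;

public class SplitRoutingSketch {
    public static void main(String[] args) {
        // Key format is "database.table", as produced by TablePath.toString().
        Map<String, String> readStrategyMap = new HashMap<>();
        readStrategyMap.put("test_hive.test_1", "OrcReadStrategy");

        String splitTableId = "test_hive.test_1";
        String strategy = readStrategyMap.get(splitTableId);
        if (strategy == null) {
            throw new IllegalStateException(
                    "Cannot find the read strategy for this table: [" + splitTableId + "]");
        }
        System.out.println(splitTableId + " -> " + strategy);
    }
}
```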
diff --git
a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/source/split/HiveSourceSplit.java
b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/source/split/HiveSourceSplit.java
new file mode 100644
index 0000000000..58ebb808d5
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/source/split/HiveSourceSplit.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.hive.source.split;
+
+import org.apache.seatunnel.api.source.SourceSplit;
+
+import lombok.Getter;
+
+public class HiveSourceSplit implements SourceSplit {
+
+ private static final long serialVersionUID = 1L;
+
+ @Getter private final String tableId;
+ @Getter private final String filePath;
+
+ public HiveSourceSplit(String tableId, String filePath) {
+ this.tableId = tableId;
+ this.filePath = filePath;
+ }
+
+ @Override
+ public String splitId() {
+ return tableId + "_" + filePath;
+ }
+}
diff --git
a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/source/split/MultipleTableHiveSourceSplitEnumerator.java
b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/source/split/MultipleTableHiveSourceSplitEnumerator.java
new file mode 100644
index 0000000000..aa4701e37b
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/source/split/MultipleTableHiveSourceSplitEnumerator.java
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.hive.source.split;
+
+import org.apache.seatunnel.api.source.SourceSplitEnumerator;
+import
org.apache.seatunnel.connectors.seatunnel.file.source.split.FileSourceSplit;
+import
org.apache.seatunnel.connectors.seatunnel.file.source.state.FileSourceState;
+import
org.apache.seatunnel.connectors.seatunnel.hive.source.config.HiveSourceConfig;
+import
org.apache.seatunnel.connectors.seatunnel.hive.source.config.MultipleTableHiveSourceConfig;
+
+import org.apache.commons.collections4.CollectionUtils;
+
+import lombok.extern.slf4j.Slf4j;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+@Slf4j
+public class MultipleTableHiveSourceSplitEnumerator
+ implements SourceSplitEnumerator<FileSourceSplit, FileSourceState> {
+
+ private final SourceSplitEnumerator.Context<FileSourceSplit> context;
+ private final Set<FileSourceSplit> pendingSplit;
+ private final Set<FileSourceSplit> assignedSplit;
+ private final Map<String, List<String>> filePathMap;
+
+ public MultipleTableHiveSourceSplitEnumerator(
+ SourceSplitEnumerator.Context<FileSourceSplit> context,
+ MultipleTableHiveSourceConfig multipleTableHiveSourceConfig) {
+ this.context = context;
+ this.filePathMap =
+
multipleTableHiveSourceConfig.getHiveSourceConfigs().stream()
+ .collect(
+ Collectors.toMap(
+ hiveSourceConfig ->
+ hiveSourceConfig
+ .getCatalogTable()
+ .getTableId()
+ .toTablePath()
+ .toString(),
+ HiveSourceConfig::getFilePaths));
+ this.assignedSplit = new HashSet<>();
+ this.pendingSplit = new HashSet<>();
+ }
+
+ public MultipleTableHiveSourceSplitEnumerator(
+ SourceSplitEnumerator.Context<FileSourceSplit> context,
+ MultipleTableHiveSourceConfig multipleTableHiveSourceConfig,
+ FileSourceState fileSourceState) {
+ this(context, multipleTableHiveSourceConfig);
+ this.assignedSplit.addAll(fileSourceState.getAssignedSplit());
+ }
+
+ @Override
+ public void addSplitsBack(List<FileSourceSplit> splits, int subtaskId) {
+ if (CollectionUtils.isEmpty(splits)) {
+ return;
+ }
+ pendingSplit.addAll(splits);
+ assignSplit(subtaskId);
+ }
+
+ @Override
+ public int currentUnassignedSplitSize() {
+ return pendingSplit.size();
+ }
+
+ @Override
+ public void handleSplitRequest(int subtaskId) {}
+
+ @Override
+ public void registerReader(int subtaskId) {
+ for (Map.Entry<String, List<String>> filePathEntry :
filePathMap.entrySet()) {
+ String tableId = filePathEntry.getKey();
+ List<String> filePaths = filePathEntry.getValue();
+ for (String filePath : filePaths) {
+ pendingSplit.add(new FileSourceSplit(tableId, filePath));
+ }
+ }
+ assignSplit(subtaskId);
+ }
+
+ @Override
+ public FileSourceState snapshotState(long checkpointId) {
+ return new FileSourceState(assignedSplit);
+ }
+
+ @Override
+ public void notifyCheckpointComplete(long checkpointId) {
+ // do nothing.
+ }
+
+ private void assignSplit(int taskId) {
+ List<FileSourceSplit> currentTaskSplits = new ArrayList<>();
+ if (context.currentParallelism() == 1) {
+ // if parallelism == 1, assign all the splits to the single reader
+ currentTaskSplits.addAll(pendingSplit);
+ } else {
+ // if parallelism > 1, use the hashCode of the split's id to
decide whether the
+ // split belongs to the current task (a standalone sketch follows this file)
+ for (FileSourceSplit fileSourceSplit : pendingSplit) {
+ int splitOwner =
+ getSplitOwner(fileSourceSplit.splitId(),
context.currentParallelism());
+ if (splitOwner == taskId) {
+ currentTaskSplits.add(fileSourceSplit);
+ }
+ }
+ }
+ // assign splits
+ context.assignSplit(taskId, currentTaskSplits);
+ // save the state of assigned splits
+ assignedSplit.addAll(currentTaskSplits);
+ // remove the assigned splits from pending splits
+ currentTaskSplits.forEach(pendingSplit::remove);
+ log.info(
+ "SubTask {} is assigned to [{}]",
+ taskId,
+ currentTaskSplits.stream()
+ .map(FileSourceSplit::splitId)
+ .collect(Collectors.joining(",")));
+ context.signalNoMoreSplits(taskId);
+ }
+
+ private static int getSplitOwner(String splitId, int numReaders) {
+ return (splitId.hashCode() & Integer.MAX_VALUE) % numReaders;
+ }
+
+ @Override
+ public void open() {
+ // do nothing
+ }
+
+ @Override
+ public void run() throws Exception {
+ // do nothing
+ }
+
+ @Override
+ public void close() throws IOException {
+ // do nothing
+ }
+}
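
A minimal sketch of the split-ownership rule used in `assignSplit` above: the owning subtask is the non-negative hash of the split id modulo the parallelism, so the mapping is deterministic and a restored job routes each split to the same subtask index:

```java
public class SplitOwnerSketch {

    // Same arithmetic as getSplitOwner above; masking with
    // Integer.MAX_VALUE keeps the hash non-negative.
    private static int getSplitOwner(String splitId, int numReaders) {
        return (splitId.hashCode() & Integer.MAX_VALUE) % numReaders;
    }

    public static void main(String[] args) {
        // Illustrative split id: tableId + "_" + filePath.
        String splitId = "test_hive.test_1_/warehouse/test_hive.db/test_1/part-00000";
        System.out.println(getSplitOwner(splitId, 3)); // stable value in [0, 3)
    }
}
```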
diff --git
a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/source/state/HiveSourceState.java
b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/source/state/HiveSourceState.java
new file mode 100644
index 0000000000..cd9da5351e
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/source/state/HiveSourceState.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.hive.source.state;
+
+import
org.apache.seatunnel.connectors.seatunnel.hive.source.split.HiveSourceSplit;
+
+import java.io.Serializable;
+import java.util.Set;
+
+public class HiveSourceState implements Serializable {
+
+ private static final long serialVersionUID = 1L;
+
+ private final Set<HiveSourceSplit> assignedSplit;
+
+ public HiveSourceState(Set<HiveSourceSplit> assignedSplit) {
+ this.assignedSplit = assignedSplit;
+ }
+
+ public Set<HiveSourceSplit> getAssignedSplit() {
+ return assignedSplit;
+ }
+}
diff --git
a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/utils/HiveMetaStoreProxy.java
b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/utils/HiveMetaStoreProxy.java
index 6a1288b661..b3c463d804 100644
---
a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/utils/HiveMetaStoreProxy.java
+++
b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/utils/HiveMetaStoreProxy.java
@@ -17,15 +17,14 @@
package org.apache.seatunnel.connectors.seatunnel.hive.utils;
-import org.apache.seatunnel.shade.com.typesafe.config.Config;
-
-import org.apache.seatunnel.common.config.TypesafeConfigUtils;
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import
org.apache.seatunnel.connectors.seatunnel.file.config.BaseSourceConfigOptions;
import
org.apache.seatunnel.connectors.seatunnel.file.hadoop.HadoopLoginFactory;
-import org.apache.seatunnel.connectors.seatunnel.hive.config.HiveConfig;
import
org.apache.seatunnel.connectors.seatunnel.hive.exception.HiveConnectorErrorCode;
import
org.apache.seatunnel.connectors.seatunnel.hive.exception.HiveConnectorException;
+import
org.apache.seatunnel.connectors.seatunnel.hive.source.config.HiveSourceOptions;
+import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
@@ -47,36 +46,31 @@ public class HiveMetaStoreProxy {
private HiveMetaStoreClient hiveMetaStoreClient;
private static volatile HiveMetaStoreProxy INSTANCE = null;
- private HiveMetaStoreProxy(Config config) {
- String metastoreUri = config.getString(HiveConfig.METASTORE_URI.key());
-
+ private HiveMetaStoreProxy(ReadonlyConfig readonlyConfig) {
+ String metastoreUri =
readonlyConfig.get(HiveSourceOptions.METASTORE_URI);
+ HiveConf hiveConf = new HiveConf();
+ hiveConf.set("hive.metastore.uris", metastoreUri);
try {
- HiveConf hiveConf = new HiveConf();
- hiveConf.set("hive.metastore.uris", metastoreUri);
- if (config.hasPath(HiveConfig.HIVE_SITE_PATH.key())) {
- String hiveSitePath =
config.getString(HiveConfig.HIVE_SITE_PATH.key());
+ if
(StringUtils.isNotEmpty(readonlyConfig.get(HiveSourceOptions.HIVE_SITE_PATH))) {
+ String hiveSitePath =
readonlyConfig.get(HiveSourceOptions.HIVE_SITE_PATH);
hiveConf.addResource(new File(hiveSitePath).toURI().toURL());
}
- if (HiveMetaStoreProxyUtils.enableKerberos(config)) {
+ if (HiveMetaStoreProxyUtils.enableKerberos(readonlyConfig)) {
this.hiveMetaStoreClient =
HadoopLoginFactory.loginWithKerberos(
new Configuration(),
- TypesafeConfigUtils.getConfig(
- config,
-
BaseSourceConfigOptions.KRB5_PATH.key(),
-
BaseSourceConfigOptions.KRB5_PATH.defaultValue()),
-
config.getString(BaseSourceConfigOptions.KERBEROS_PRINCIPAL.key()),
- config.getString(
-
BaseSourceConfigOptions.KERBEROS_KEYTAB_PATH.key()),
+
readonlyConfig.get(BaseSourceConfigOptions.KRB5_PATH),
+
readonlyConfig.get(BaseSourceConfigOptions.KERBEROS_PRINCIPAL),
+
readonlyConfig.get(BaseSourceConfigOptions.KERBEROS_KEYTAB_PATH),
(configuration, userGroupInformation) ->
new HiveMetaStoreClient(hiveConf));
return;
}
- if (HiveMetaStoreProxyUtils.enableRemoteUser(config)) {
+ if (HiveMetaStoreProxyUtils.enableRemoteUser(readonlyConfig)) {
this.hiveMetaStoreClient =
HadoopLoginFactory.loginWithRemoteUser(
new Configuration(),
-
config.getString(BaseSourceConfigOptions.REMOTE_USER.key()),
+
readonlyConfig.get(BaseSourceConfigOptions.REMOTE_USER),
(configuration, userGroupInformation) ->
new HiveMetaStoreClient(hiveConf));
return;
@@ -95,7 +89,7 @@ public class HiveMetaStoreProxy {
String.format(
"Using this hive uris [%s], hive conf [%s] to
initialize "
+ "hive metastore client instance failed",
- metastoreUri,
config.getString(HiveConfig.HIVE_SITE_PATH.key()));
+ metastoreUri,
readonlyConfig.get(HiveSourceOptions.HIVE_SITE_PATH));
throw new HiveConnectorException(
HiveConnectorErrorCode.INITIALIZE_HIVE_METASTORE_CLIENT_FAILED, errorMsg, e);
} catch (Exception e) {
@@ -106,11 +100,11 @@ public class HiveMetaStoreProxy {
}
}
- public static HiveMetaStoreProxy getInstance(Config config) {
+ public static HiveMetaStoreProxy getInstance(ReadonlyConfig
readonlyConfig) {
if (INSTANCE == null) {
synchronized (HiveMetaStoreProxy.class) {
if (INSTANCE == null) {
- INSTANCE = new HiveMetaStoreProxy(config);
+ INSTANCE = new HiveMetaStoreProxy(readonlyConfig);
}
}
}
diff --git
a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/utils/HiveMetaStoreProxyUtils.java
b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/utils/HiveMetaStoreProxyUtils.java
index fda221d886..f1474f7694 100644
---
a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/utils/HiveMetaStoreProxyUtils.java
+++
b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/utils/HiveMetaStoreProxyUtils.java
@@ -17,8 +17,7 @@
package org.apache.seatunnel.connectors.seatunnel.hive.utils;
-import org.apache.seatunnel.shade.com.typesafe.config.Config;
-
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import
org.apache.seatunnel.connectors.seatunnel.file.config.BaseSourceConfigOptions;
import lombok.experimental.UtilityClass;
@@ -26,11 +25,11 @@ import lombok.experimental.UtilityClass;
@UtilityClass
public class HiveMetaStoreProxyUtils {
- public boolean enableKerberos(Config config) {
+ public boolean enableKerberos(ReadonlyConfig config) {
boolean kerberosPrincipalEmpty =
-
config.hasPath(BaseSourceConfigOptions.KERBEROS_PRINCIPAL.key());
+
config.getOptional(BaseSourceConfigOptions.KERBEROS_PRINCIPAL).isPresent();
boolean kerberosKeytabPathEmpty =
-
config.hasPath(BaseSourceConfigOptions.KERBEROS_KEYTAB_PATH.key());
+
config.getOptional(BaseSourceConfigOptions.KERBEROS_KEYTAB_PATH).isPresent();
if (kerberosKeytabPathEmpty && kerberosPrincipalEmpty) {
return true;
}
@@ -43,7 +42,7 @@ public class HiveMetaStoreProxyUtils {
throw new IllegalArgumentException("Please set kerberosKeytabPath");
}
- public boolean enableRemoteUser(Config config) {
- return config.hasPath(BaseSourceConfigOptions.REMOTE_USER.key());
+ public boolean enableRemoteUser(ReadonlyConfig config) {
+ return
config.getOptional(BaseSourceConfigOptions.REMOTE_USER).isPresent();
}
}
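
The kerberos toggle in `enableKerberos` above has three outcomes: both principal and keytab set enables kerberos login, both unset skips it, and a mixed configuration fails fast. A small sketch of that decision table (standalone, not the real class):

```java
public class KerberosToggleSketch {

    // principal  keytab   result
    // set        set      true  -> login with kerberos
    // unset      unset    false -> skip kerberos
    // mixed      -        IllegalArgumentException
    static boolean enableKerberos(boolean principalSet, boolean keytabSet) {
        if (principalSet && keytabSet) {
            return true;
        }
        if (!principalSet && !keytabSet) {
            return false;
        }
        throw new IllegalArgumentException(
                principalSet ? "Please set kerberosKeytabPath" : "Please set kerberosPrincipal");
    }

    public static void main(String[] args) {
        System.out.println(enableKerberos(true, true));   // true
        System.out.println(enableKerberos(false, false)); // false
    }
}
```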
diff --git
a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/utils/HiveTableUtils.java
b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/utils/HiveTableUtils.java
new file mode 100644
index 0000000000..e4282db204
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/utils/HiveTableUtils.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.hive.utils;
+
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.api.table.catalog.TablePath;
+import org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated;
+import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException;
+import org.apache.seatunnel.connectors.seatunnel.file.config.FileFormat;
+import org.apache.seatunnel.connectors.seatunnel.hive.config.HiveConstants;
+import
org.apache.seatunnel.connectors.seatunnel.hive.exception.HiveConnectorErrorCode;
+import
org.apache.seatunnel.connectors.seatunnel.hive.exception.HiveConnectorException;
+import
org.apache.seatunnel.connectors.seatunnel.hive.source.config.HiveSourceOptions;
+
+import org.apache.hadoop.hive.metastore.api.Table;
+
+public class HiveTableUtils {
+
+ public static Table getTableInfo(ReadonlyConfig readonlyConfig) {
+ String table = readonlyConfig.get(HiveSourceOptions.TABLE_NAME);
+ TablePath tablePath = TablePath.of(table);
+ if (tablePath.getDatabaseName() == null || tablePath.getTableName() ==
null) {
+ throw new SeaTunnelRuntimeException(
+ HiveConnectorErrorCode.HIVE_TABLE_NAME_ERROR, "Current
table name is " + table);
+ }
+ HiveMetaStoreProxy hiveMetaStoreProxy =
HiveMetaStoreProxy.getInstance(readonlyConfig);
+ Table tableInformation =
+ hiveMetaStoreProxy.getTable(tablePath.getDatabaseName(),
tablePath.getTableName());
+ hiveMetaStoreProxy.close();
+ return tableInformation;
+ }
+
+ public static FileFormat parseFileFormat(Table table) {
+ String inputFormat = table.getSd().getInputFormat();
+ if (HiveConstants.TEXT_INPUT_FORMAT_CLASSNAME.equals(inputFormat)) {
+ return FileFormat.TEXT;
+ }
+ if (HiveConstants.PARQUET_INPUT_FORMAT_CLASSNAME.equals(inputFormat)) {
+ return FileFormat.PARQUET;
+ }
+ if (HiveConstants.ORC_INPUT_FORMAT_CLASSNAME.equals(inputFormat)) {
+ return FileFormat.ORC;
+ }
+ throw new HiveConnectorException(
+ CommonErrorCodeDeprecated.ILLEGAL_ARGUMENT,
+ "Hive connector only support [text parquet orc] table now");
+ }
+}
diff --git
a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/utils/HiveTypeConvertor.java
b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/utils/HiveTypeConvertor.java
new file mode 100644
index 0000000000..7aac96c978
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/utils/HiveTypeConvertor.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.hive.utils;
+
+import org.apache.seatunnel.api.table.catalog.SeaTunnelDataTypeConvertorUtil;
+import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.PrimitiveByteArrayType;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.common.constants.PluginType;
+import org.apache.seatunnel.common.exception.CommonError;
+import org.apache.seatunnel.common.utils.JsonUtils;
+import org.apache.seatunnel.connectors.seatunnel.hive.config.HiveConstants;
+
+import java.util.LinkedHashMap;
+
+public class HiveTypeConvertor {
+
+ public static SeaTunnelDataType<?> covertHiveTypeToSeaTunnelType(String
name, String hiveType) {
+ if (hiveType.contains("varchar")) {
+ return BasicType.STRING_TYPE;
+ }
+ if (hiveType.contains("char")) {
+ throw CommonError.convertToSeaTunnelTypeError(
+ HiveConstants.CONNECTOR_NAME, PluginType.SOURCE, hiveType,
name);
+ }
+ if (hiveType.contains("binary")) {
+ return PrimitiveByteArrayType.INSTANCE;
+ }
+ if (hiveType.contains("struct")) {
+ LinkedHashMap<String, Object> fields = new LinkedHashMap<>();
+ int start = hiveType.indexOf("<");
+ int end = hiveType.lastIndexOf(">");
+ String[] columns = hiveType.substring(start + 1, end).split(",");
+ for (String column : columns) {
+ String[] splits = column.split(":");
+ fields.put(
+ splits[0], covertHiveTypeToSeaTunnelType(splits[0],
splits[1]).toString());
+ }
+ return SeaTunnelDataTypeConvertorUtil.deserializeSeaTunnelDataType(
+ name, JsonUtils.toJsonString(fields));
+ }
+ return
SeaTunnelDataTypeConvertorUtil.deserializeSeaTunnelDataType(name, hiveType);
+ }
+}
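
A quick illustration of the struct handling above: the field list between `<` and `>` is split on `,` and each `name:type` pair is converted recursively. Note the simple split assumes the struct's field types contain no nested commas, which matches the flat structs exercised in `HiveTypeConvertorTest`:

```java
public class StructSplitSketch {
    public static void main(String[] args) {
        // Illustrative Hive struct type string.
        String hiveType = "struct<country:string,city:string>";
        int start = hiveType.indexOf('<');
        int end = hiveType.lastIndexOf('>');
        // Split the field list, then each "name:type" pair.
        for (String column : hiveType.substring(start + 1, end).split(",")) {
            String[] parts = column.split(":");
            System.out.println(parts[0] + " -> " + parts[1]);
        }
    }
}
```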
diff --git
a/seatunnel-connectors-v2/connector-hive/src/test/java/org/apache/seatunnel/connectors/seatunnel/hive/source/TypeConvertTest.java
b/seatunnel-connectors-v2/connector-hive/src/test/java/org/apache/seatunnel/connectors/seatunnel/hive/source/TypeConvertTest.java
deleted file mode 100644
index 3810085b36..0000000000
---
a/seatunnel-connectors-v2/connector-hive/src/test/java/org/apache/seatunnel/connectors/seatunnel/hive/source/TypeConvertTest.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.connectors.seatunnel.hive.source;
-
-import org.apache.seatunnel.common.utils.ReflectionUtils;
-
-import org.apache.hadoop.hive.metastore.api.FieldSchema;
-import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
-import org.apache.hadoop.hive.metastore.api.Table;
-
-import org.junit.jupiter.api.Assertions;
-import org.junit.jupiter.api.Test;
-
-import java.lang.reflect.InvocationTargetException;
-import java.lang.reflect.Method;
-import java.util.Optional;
-
-public class TypeConvertTest {
-
- @Test
- void testWithUnsupportedType() {
- Optional<Method> parseSchema =
- ReflectionUtils.getDeclaredMethod(HiveSource.class,
"parseSchema", Table.class);
- Assertions.assertTrue(parseSchema.isPresent());
- Table table = new Table();
- table.setSd(new StorageDescriptor());
- table.getSd().addToCols(new FieldSchema("test", "char", null));
- InvocationTargetException exception =
- Assertions.assertThrows(
- InvocationTargetException.class,
- () -> parseSchema.get().invoke(new HiveSource(),
table));
- Assertions.assertEquals(
- "ErrorCode:[COMMON-16], ErrorDescription:['Hive' source
unsupported convert type 'char' of 'test' to SeaTunnel data type.]",
- exception.getCause().getMessage());
- }
-}
diff --git
a/seatunnel-connectors-v2/connector-hive/src/test/java/org/apache/seatunnel/connectors/seatunnel/hive/utils/HiveMetaStoreProxyUtilsTest.java
b/seatunnel-connectors-v2/connector-hive/src/test/java/org/apache/seatunnel/connectors/seatunnel/hive/utils/HiveMetaStoreProxyUtilsTest.java
index ad952112c4..eb0afe9d4a 100644
---
a/seatunnel-connectors-v2/connector-hive/src/test/java/org/apache/seatunnel/connectors/seatunnel/hive/utils/HiveMetaStoreProxyUtilsTest.java
+++
b/seatunnel-connectors-v2/connector-hive/src/test/java/org/apache/seatunnel/connectors/seatunnel/hive/utils/HiveMetaStoreProxyUtilsTest.java
@@ -20,6 +20,8 @@ package org.apache.seatunnel.connectors.seatunnel.hive.utils;
import org.apache.seatunnel.shade.com.typesafe.config.Config;
import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory;
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+
import org.junit.jupiter.api.Test;
import lombok.SneakyThrows;
@@ -35,7 +37,7 @@ class HiveMetaStoreProxyUtilsTest {
@Test
void enableKerberos() {
- Config config = parseConfig("/hive_without_kerberos.conf");
+ ReadonlyConfig config = parseConfig("/hive_without_kerberos.conf");
assertFalse(HiveMetaStoreProxyUtils.enableKerberos(config));
assertFalse(HiveMetaStoreProxyUtils.enableRemoteUser(config));
@@ -48,9 +50,10 @@ class HiveMetaStoreProxyUtilsTest {
}
@SneakyThrows
- private Config parseConfig(String configFile) {
+ private ReadonlyConfig parseConfig(String configFile) {
URL resource =
HiveMetaStoreProxyUtilsTest.class.getResource(configFile);
String filePath = Paths.get(resource.toURI()).toString();
- return ConfigFactory.parseFile(new File(filePath));
+ Config config = ConfigFactory.parseFile(new File(filePath));
+ return ReadonlyConfig.fromConfig(config);
}
}
diff --git
a/seatunnel-connectors-v2/connector-hive/src/test/java/org/apache/seatunnel/connectors/seatunnel/hive/utils/HiveTypeConvertorTest.java
b/seatunnel-connectors-v2/connector-hive/src/test/java/org/apache/seatunnel/connectors/seatunnel/hive/utils/HiveTypeConvertorTest.java
new file mode 100644
index 0000000000..ef87e083a4
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-hive/src/test/java/org/apache/seatunnel/connectors/seatunnel/hive/utils/HiveTypeConvertorTest.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.hive.utils;
+
+import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.api.table.type.SqlType;
+import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+class HiveTypeConvertorTest {
+
+ @Test
+ void covertHiveTypeToSeaTunnelType() {
+ SeaTunnelRuntimeException exception =
+ Assertions.assertThrows(
+ SeaTunnelRuntimeException.class,
+ () ->
HiveTypeConvertor.covertHiveTypeToSeaTunnelType("test", "char"));
+ assertEquals(
+ "ErrorCode:[COMMON-16], ErrorDescription:['Hive' source
unsupported convert type 'char' of 'test' to SeaTunnel data type.]",
+ exception.getMessage());
+ }
+
+ @Test
+ void convertHiveStructType() {
+ SeaTunnelDataType<?> structType =
+ HiveTypeConvertor.covertHiveTypeToSeaTunnelType(
+ "structType", "struct<country:String,city:String>");
+ assertEquals(SqlType.ROW, structType.getSqlType());
+ SeaTunnelRowType seaTunnelRowType = (SeaTunnelRowType) structType;
+ assertEquals(BasicType.STRING_TYPE, seaTunnelRowType.getFieldType(0));
+ assertEquals(BasicType.STRING_TYPE, seaTunnelRowType.getFieldType(1));
+ }
+}
diff --git
a/seatunnel-dist/src/test/java/org/apache/seatunnel/api/connector/ConnectorSpecificationCheckTest.java
b/seatunnel-dist/src/test/java/org/apache/seatunnel/api/connector/ConnectorSpecificationCheckTest.java
index 5b46d81201..6d59ff27f5 100644
---
a/seatunnel-dist/src/test/java/org/apache/seatunnel/api/connector/ConnectorSpecificationCheckTest.java
+++
b/seatunnel-dist/src/test/java/org/apache/seatunnel/api/connector/ConnectorSpecificationCheckTest.java
@@ -64,6 +64,7 @@ public class ConnectorSpecificationCheckTest {
// hive-exec.jar. We need to check manually.
List<String> blockList = new ArrayList<>();
blockList.add("HiveSourceFactory");
+ blockList.add("HiveSinkFactory");
for (TableSourceFactory factory : sourceFactories) {
if (ReflectionUtils.getDeclaredMethod(
diff --git a/seatunnel-examples/seatunnel-engine-examples/pom.xml
b/seatunnel-examples/seatunnel-engine-examples/pom.xml
index 3256bdde88..1f22a56658 100644
--- a/seatunnel-examples/seatunnel-engine-examples/pom.xml
+++ b/seatunnel-examples/seatunnel-engine-examples/pom.xml
@@ -67,5 +67,55 @@
<artifactId>connector-assert</artifactId>
<version>${project.version}</version>
</dependency>
+
+ <dependency>
+ <groupId>org.apache.seatunnel</groupId>
+ <artifactId>connector-hive</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.hive</groupId>
+ <artifactId>hive-exec</artifactId>
+ <version>2.3.9</version>
+ <exclusions>
+ <exclusion>
+ <groupId>log4j</groupId>
+ <artifactId>log4j</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.logging.log4j</groupId>
+ <artifactId>log4j-1.2-api</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.logging.log4j</groupId>
+ <artifactId>log4j-slf4j-impl</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.logging.log4j</groupId>
+ <artifactId>log4j-web</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-log4j12</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.parquet</groupId>
+ <artifactId>parquet-hadoop-bundle</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>jdk.tools</groupId>
+ <artifactId>jdk.tools</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.pentaho</groupId>
+ <artifactId>pentaho-aggdesigner-algorithm</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.avro</groupId>
+ <artifactId>avro</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
</dependencies>
</project>