This is an automated email from the ASF dual-hosted git repository.
gaojun2048 pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new a357dca36 [Improve][Connector-V2] Refactor hive source & sink
connector (#2708)
a357dca36 is described below
commit a357dca36503c9ef49bdfb15c7a3bfeaa50ca0b8
Author: TyrantLucifer <[email protected]>
AuthorDate: Thu Sep 15 13:49:02 2022 +0800
[Improve][Connector-V2] Refactor hive source & sink connector (#2708)
---
docs/en/connector-v2/sink/Hive.md | 34 ++--
docs/en/connector-v2/source/Hive.md | 47 ++++++
seatunnel-connectors-v2/connector-hive/pom.xml | 131 +++------------
.../connectors/seatunnel/hive/config/Config.java | 21 ---
.../connectors/seatunnel/hive/config/Constant.java | 23 ---
.../seatunnel/hive/config/HiveConfig.java | 47 ++++++
.../seatunnel/hive/config/SourceConfig.java | 29 ----
.../hive/exception/HivePluginException.java | 29 ----
.../seatunnel/hive/sink/BaseHiveCommitInfo.java | 36 ----
.../hive/sink/HiveAggregatedCommitInfo.java | 36 ----
.../seatunnel/hive/sink/HiveCommitInfo.java | 36 ----
.../connectors/seatunnel/hive/sink/HiveSink.java | 166 +++++++++---------
.../hive/sink/HiveSinkAggregatedCommitter.java | 139 ----------------
.../seatunnel/hive/sink/HiveSinkConfig.java | 155 -----------------
.../seatunnel/hive/sink/HiveSinkState.java | 30 ----
.../seatunnel/hive/sink/HiveSinkWriter.java | 160 ------------------
.../seatunnel/hive/source/HadoopConf.java | 34 ----
.../seatunnel/hive/source/HiveSource.java | 100 +++++------
.../seatunnel/hive/source/HiveSourceReader.java | 98 -----------
.../seatunnel/hive/source/HiveSourceSplit.java | 36 ----
.../hive/source/HiveSourceSplitEnumerator.java | 126 --------------
.../seatunnel/hive/source/HiveSourceState.java | 35 ----
.../file/reader/format/AbstractReadStrategy.java | 84 ----------
.../source/file/reader/format/OrcReadStrategy.java | 168 -------------------
.../file/reader/format/ParquetReadStrategy.java | 185 ---------------------
.../source/file/reader/format/ReadStrategy.java | 42 -----
.../file/reader/format/ReadStrategyFactory.java | 38 -----
.../file/reader/format/TextReadStrategy.java | 56 -------
.../hive/source/file/reader/type/FileTypeEnum.java | 48 ------
.../seatunnel/hive/utils/HiveMetaStoreProxy.java | 45 ++++-
.../seatunnel/hive/sink/HiveSinkConfigTest.java | 51 ------
31 files changed, 293 insertions(+), 1972 deletions(-)
diff --git a/docs/en/connector-v2/sink/Hive.md
b/docs/en/connector-v2/sink/Hive.md
index 56b49ad7b..59d8dc5c3 100644
--- a/docs/en/connector-v2/sink/Hive.md
+++ b/docs/en/connector-v2/sink/Hive.md
@@ -22,22 +22,22 @@ By default, we use 2PC commit to ensure `exactly-once`
## Options
-| name | type | required | default value
|
-| --------------------------------- | ------ | -------- |
------------------------------------------------------------- |
-| hive_table_name | string | yes | -
|
-| hive_metastore_uris | string | yes | -
|
-| partition_by | array | no | -
|
-| sink_columns | array | no | When this parameter
is empty, all fields are sink columns |
-| is_enable_transaction | boolean| no | true
|
-| save_mode | string | no | "append"
|
+| name                  | type    | required | default value                                             |
+|-----------------------|---------|----------|-----------------------------------------------------------|
+| table_name            | string  | yes      | -                                                         |
+| metastore_uri         | string  | yes      | -                                                         |
+| partition_by          | array   | no       | -                                                         |
+| sink_columns          | array   | no       | When this parameter is empty, all fields are sink columns |
+| is_enable_transaction | boolean | no       | true                                                      |
+| save_mode             | string  | no       | "append"                                                  |
-### hive_table_name [string]
+### table_name [string]
Target Hive table name eg: db1.table1
-### hive_metastore_uris [string]
+### metastore_uri [string]
-Hive metastore uris
+Hive metastore uri
### partition_by [array]
@@ -64,13 +64,9 @@ Streaming Job not support `overwrite`.
```bash
-Hive {
- hive_table_name="db1.table1"
- hive_metastore_uris="thrift://localhost:9083"
- partition_by=["age"]
- sink_columns=["name","age"]
- is_enable_transaction=true
- save_mode="append"
-}
+ Hive {
+ table_name = "default.seatunnel_orc"
+ metastore_uri = "thrift://namenode001:9083"
+ }
```
diff --git a/docs/en/connector-v2/source/Hive.md
b/docs/en/connector-v2/source/Hive.md
new file mode 100644
index 000000000..dcc4030f5
--- /dev/null
+++ b/docs/en/connector-v2/source/Hive.md
@@ -0,0 +1,47 @@
+# Hive
+
+> Hive source connector
+
+## Description
+
+Read data from Hive.
+
+In order to use this connector, you must ensure that your Spark/Flink cluster is already integrated with Hive. The tested Hive version is 2.3.9.
+
+## Key features
+
+- [x] [exactly-once](../../concept/connector-v2-features.md)
+
+By default, we use 2PC commit to ensure `exactly-once`
+
+- [ ] [schema projection](../../concept/connector-v2-features.md)
+- [x] file format
+ - [x] text
+ - [x] parquet
+ - [x] orc
+
+## Options
+
+| name          | type   | required | default value |
+|---------------|--------|----------|---------------|
+| table_name    | string | yes      | -             |
+| metastore_uri | string | yes      | -             |
+
+### table_name [string]
+
+Target Hive table name, e.g. db1.table1
+
+### metastore_uri [string]
+
+Hive metastore uri
+
+## Example
+
+```bash
+
+ Hive {
+ table_name = "default.seatunnel_orc"
+ metastore_uri = "thrift://namenode001:9083"
+ }
+
+```
diff --git a/seatunnel-connectors-v2/connector-hive/pom.xml
b/seatunnel-connectors-v2/connector-hive/pom.xml
index a00016488..39c6f6429 100644
--- a/seatunnel-connectors-v2/connector-hive/pom.xml
+++ b/seatunnel-connectors-v2/connector-hive/pom.xml
@@ -30,146 +30,61 @@
<artifactId>connector-hive</artifactId>
<properties>
- <hive.exec.version>2.3.9</hive.exec.version>
- <flink.hadoop.version>2.7.5-7.0</flink.hadoop.version>
- <orc.version>1.5.6</orc.version>
- <parquet-avro.version>1.10.0</parquet-avro.version>
- <commons.collecton4.version>4.4</commons.collecton4.version>
- <commons.lang3.version>3.4</commons.lang3.version>
- <janino.version>3.1.6</janino.version>
+ <hive.metastore.version>2.3.9</hive.metastore.version>
</properties>
<dependencies>
<dependency>
- <groupId>org.apache.hive</groupId>
- <artifactId>hive-exec</artifactId>
- <version>${hive.exec.version}</version>
- <scope>provided</scope>
+ <groupId>org.apache.seatunnel</groupId>
+ <artifactId>connector-file-hadoop</artifactId>
+ <version>${project.version}</version>
<exclusions>
<exclusion>
- <groupId>org.pentaho</groupId>
- <artifactId>pentaho-aggdesigner-algorithm</artifactId>
- </exclusion>
- <exclusion>
- <groupId>javax.servlet</groupId>
- <artifactId>servlet-api</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.apache.logging.log4j</groupId>
- <artifactId>log4j-1.2-api</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.apache.logging.log4j</groupId>
- <artifactId>log4j-web</artifactId>
- </exclusion>
- <exclusion>
- <groupId>com.fasterxml.jackson.core</groupId>
- <artifactId>*</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.apapche.hadoop</groupId>
- <artifactId>*</artifactId>
- </exclusion>
- <exclusion>
- <groupId>com.github.joshelser</groupId>
-
<artifactId>dropwizard-metrics-hadoop-metrics2-reporter</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.apache.logging.log4j</groupId>
- <artifactId>*</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.apache.zookeeper</groupId>
- <artifactId>zookeeper</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-yarn-server-resourcemanager</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-hdfs</artifactId>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-shaded-hadoop-2</artifactId>
</exclusion>
</exclusions>
</dependency>
-
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-shaded-hadoop-2</artifactId>
- <version>${flink.hadoop.version}</version>
<scope>provided</scope>
</dependency>
-
- <dependency>
- <groupId>org.apache.seatunnel</groupId>
- <artifactId>seatunnel-core-base</artifactId>
- <version>${project.version}</version>
- <scope>test</scope>
- </dependency>
-
<dependency>
- <groupId>org.apache.commons</groupId>
- <artifactId>commons-lang3</artifactId>
- <version>${commons.lang3.version}</version>
- </dependency>
- <dependency>
- <groupId>org.apache.orc</groupId>
- <artifactId>orc-core</artifactId>
- <version>${orc.version}</version>
+ <groupId>org.apache.hive</groupId>
+ <artifactId>hive-metastore</artifactId>
+ <version>${hive.metastore.version}</version>
<exclusions>
<exclusion>
- <groupId>javax.servlet</groupId>
- <artifactId>servlet-api</artifactId>
+ <artifactId>log4j</artifactId>
+ <groupId>log4j</groupId>
</exclusion>
<exclusion>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-common</artifactId>
+ <artifactId>log4j-1.2-api</artifactId>
+ <groupId>org.apache.logging.log4j</groupId>
</exclusion>
<exclusion>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-hdfs</artifactId>
+ <artifactId>log4j-slf4j-impl</artifactId>
+ <groupId>org.apache.logging.log4j</groupId>
</exclusion>
<exclusion>
+ <artifactId>log4j-web</artifactId>
<groupId>org.apache.logging.log4j</groupId>
- <artifactId>*</artifactId>
</exclusion>
<exclusion>
- <groupId>com.fasterxml.jackson.core</groupId>
- <artifactId>*</artifactId>
+ <artifactId>slf4j-log4j12</artifactId>
+ <groupId>org.slf4j</groupId>
</exclusion>
<exclusion>
- <groupId>org.apapche.hadoop</groupId>
- <artifactId>*</artifactId>
+ <artifactId>parquet-hadoop-bundle</artifactId>
+ <groupId>org.apache.parquet</groupId>
</exclusion>
<exclusion>
- <groupId>org.apache.curator</groupId>
- <artifactId>*</artifactId>
+ <artifactId>jdk.tools</artifactId>
+ <groupId>jdk.tools</groupId>
</exclusion>
</exclusions>
</dependency>
-
- <dependency>
- <groupId>org.apache.parquet</groupId>
- <artifactId>parquet-avro</artifactId>
- <version>${parquet-avro.version}</version>
- </dependency>
-
- <dependency>
- <groupId>org.apache.commons</groupId>
- <artifactId>commons-collections4</artifactId>
- <version>${commons.collecton4.version}</version>
- </dependency>
-
- <dependency>
- <groupId>org.apache.seatunnel</groupId>
- <artifactId>connector-file-hadoop</artifactId>
- <version>${project.version}</version>
- </dependency>
-
- <dependency>
- <groupId>org.codehaus.janino</groupId>
- <artifactId>janino</artifactId>
- <version>${janino.version}</version>
- </dependency>
</dependencies>
+
</project>
diff --git
a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/config/Config.java
b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/config/Config.java
deleted file mode 100644
index 2177ffbde..000000000
---
a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/config/Config.java
+++ /dev/null
@@ -1,21 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.connectors.seatunnel.hive.config;
-
-public class Config {
-}
diff --git
a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/config/Constant.java
b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/config/Constant.java
deleted file mode 100644
index dd0b2ab54..000000000
---
a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/config/Constant.java
+++ /dev/null
@@ -1,23 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.connectors.seatunnel.hive.config;
-
-public class Constant {
- public static final String HIVE_RESULT_TABLE_NAME = "hive_table_name";
- public static final String HIVE_METASTORE_URIS = "hive_metastore_uris";
-}
diff --git
a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/config/HiveConfig.java
b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/config/HiveConfig.java
new file mode 100644
index 000000000..16ab34b5e
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/config/HiveConfig.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.hive.config;
+
+import org.apache.seatunnel.connectors.seatunnel.hive.utils.HiveMetaStoreProxy;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.hadoop.hive.metastore.api.Table;
+
+public class HiveConfig {
+ public static final String TABLE_NAME = "table_name";
+ public static final String METASTORE_URI = "metastore_uri";
+ public static final String TEXT_INPUT_FORMAT_CLASSNAME =
"org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextInputFormat";
+ public static final String TEXT_OUTPUT_FORMAT_CLASSNAME =
"org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat";
+ public static final String PARQUET_INPUT_FORMAT_CLASSNAME =
"org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat";
+ public static final String PARQUET_OUTPUT_FORMAT_CLASSNAME =
"org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat";
+ public static final String ORC_INPUT_FORMAT_CLASSNAME =
"org.apache.hadoop.hive.ql.io.orc.OrcInputFormat";
+ public static final String ORC_OUTPUT_FORMAT_CLASSNAME =
"org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat";
+
+ public static Pair<String[], Table> getTableInfo(Config config) {
+ String table = config.getString(TABLE_NAME);
+ String[] splits = table.split("\\.");
+ if (splits.length != 2) {
+ throw new RuntimeException("Please config " + TABLE_NAME + " as
db.table format");
+ }
+ HiveMetaStoreProxy hiveMetaStoreProxy =
HiveMetaStoreProxy.getInstance(config);
+ Table tableInformation = hiveMetaStoreProxy.getTable(splits[0],
splits[1]);
+ return Pair.of(splits, tableInformation);
+ }
+}
diff --git
a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/config/SourceConfig.java
b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/config/SourceConfig.java
deleted file mode 100644
index 705440316..000000000
---
a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/config/SourceConfig.java
+++ /dev/null
@@ -1,29 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.connectors.seatunnel.hive.config;
-
-import static org.apache.hadoop.fs.FileSystem.FS_DEFAULT_NAME_KEY;
-
-public class SourceConfig {
-
- public static final String FILE_TYPE = "file.type";
-
- public static final String DEFAULT_FS = FS_DEFAULT_NAME_KEY;
-
- public static final String FILE_PATH = "file.path";
-}
diff --git
a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/exception/HivePluginException.java
b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/exception/HivePluginException.java
deleted file mode 100644
index 8e1c22548..000000000
---
a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/exception/HivePluginException.java
+++ /dev/null
@@ -1,29 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.connectors.seatunnel.hive.exception;
-
-public class HivePluginException extends Exception{
-
- public HivePluginException(String message) {
- super(message);
- }
-
- public HivePluginException(String message, Throwable cause) {
- super(message, cause);
- }
-}
diff --git
a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/BaseHiveCommitInfo.java
b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/BaseHiveCommitInfo.java
deleted file mode 100644
index 7422ecf4c..000000000
---
a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/BaseHiveCommitInfo.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.connectors.seatunnel.hive.sink;
-
-import lombok.AllArgsConstructor;
-import lombok.Data;
-import lombok.NoArgsConstructor;
-import org.apache.hadoop.hive.metastore.api.Table;
-
-import java.io.Serializable;
-
-@Data
-@AllArgsConstructor
-@NoArgsConstructor
-public class BaseHiveCommitInfo implements Serializable {
-
- private String hiveMetastoreUris;
-
- private Table table;
-
-}
diff --git
a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveAggregatedCommitInfo.java
b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveAggregatedCommitInfo.java
deleted file mode 100644
index 1311c925d..000000000
---
a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveAggregatedCommitInfo.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.connectors.seatunnel.hive.sink;
-
-import
org.apache.seatunnel.connectors.seatunnel.file.sink.FileAggregatedCommitInfo;
-
-import lombok.AllArgsConstructor;
-import lombok.Data;
-import org.apache.hadoop.hive.metastore.api.Table;
-
-@Data
-@AllArgsConstructor
-public class HiveAggregatedCommitInfo extends BaseHiveCommitInfo {
-
- private FileAggregatedCommitInfo fileAggregatedCommitInfo;
-
- public HiveAggregatedCommitInfo(FileAggregatedCommitInfo
fileAggregatedCommitInfo, String hiveMetastoreUris, Table table) {
- super(hiveMetastoreUris, table);
- this.fileAggregatedCommitInfo = fileAggregatedCommitInfo;
- }
-}
diff --git
a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveCommitInfo.java
b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveCommitInfo.java
deleted file mode 100644
index 509352f46..000000000
---
a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveCommitInfo.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.connectors.seatunnel.hive.sink;
-
-import org.apache.seatunnel.connectors.seatunnel.file.sink.FileCommitInfo;
-
-import lombok.AllArgsConstructor;
-import lombok.Data;
-import org.apache.hadoop.hive.metastore.api.Table;
-
-@Data
-@AllArgsConstructor
-public class HiveCommitInfo extends BaseHiveCommitInfo {
-
- private FileCommitInfo fileCommitInfo;
-
- public HiveCommitInfo(FileCommitInfo fileCommitInfo, String
hiveMetastoreUris, Table table) {
- super(hiveMetastoreUris, table);
- this.fileCommitInfo = fileCommitInfo;
- }
-}
diff --git
a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSink.java
b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSink.java
index 36bd50ca3..9adae1158 100644
---
a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSink.java
+++
b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSink.java
@@ -17,40 +17,50 @@
package org.apache.seatunnel.connectors.seatunnel.hive.sink;
-import org.apache.seatunnel.api.common.JobContext;
+import static
org.apache.seatunnel.connectors.seatunnel.file.config.Constant.FIELD_DELIMITER;
+import static
org.apache.seatunnel.connectors.seatunnel.file.config.Constant.FILE_FORMAT;
+import static
org.apache.seatunnel.connectors.seatunnel.file.config.Constant.FILE_NAME_EXPRESSION;
+import static
org.apache.seatunnel.connectors.seatunnel.file.config.Constant.IS_PARTITION_FIELD_WRITE_IN_FILE;
+import static
org.apache.seatunnel.connectors.seatunnel.file.config.Constant.PATH;
+import static
org.apache.seatunnel.connectors.seatunnel.file.config.Constant.ROW_DELIMITER;
+import static
org.apache.seatunnel.connectors.seatunnel.file.config.Constant.SAVE_MODE;
+import static
org.apache.seatunnel.connectors.seatunnel.hive.config.HiveConfig.ORC_OUTPUT_FORMAT_CLASSNAME;
+import static
org.apache.seatunnel.connectors.seatunnel.hive.config.HiveConfig.PARQUET_OUTPUT_FORMAT_CLASSNAME;
+import static
org.apache.seatunnel.connectors.seatunnel.hive.config.HiveConfig.TEXT_OUTPUT_FORMAT_CLASSNAME;
+
import org.apache.seatunnel.api.common.PrepareFailException;
-import org.apache.seatunnel.api.serialization.DefaultSerializer;
-import org.apache.seatunnel.api.serialization.Serializer;
import org.apache.seatunnel.api.sink.SeaTunnelSink;
-import org.apache.seatunnel.api.sink.SinkAggregatedCommitter;
-import org.apache.seatunnel.api.sink.SinkWriter;
-import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
-import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
-import org.apache.seatunnel.common.constants.JobMode;
+import org.apache.seatunnel.common.config.CheckConfigUtil;
+import org.apache.seatunnel.common.config.CheckResult;
+import org.apache.seatunnel.common.constants.PluginType;
+import org.apache.seatunnel.connectors.seatunnel.file.config.FileFormat;
+import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
+import org.apache.seatunnel.connectors.seatunnel.file.sink.BaseFileSink;
import org.apache.seatunnel.connectors.seatunnel.file.sink.config.SaveMode;
+import org.apache.seatunnel.connectors.seatunnel.hive.config.HiveConfig;
+import org.apache.seatunnel.connectors.seatunnel.hive.utils.HiveMetaStoreProxy;
import org.apache.seatunnel.shade.com.typesafe.config.Config;
+import org.apache.seatunnel.shade.com.typesafe.config.ConfigValueFactory;
import com.google.auto.service.AutoService;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.Table;
-import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
import java.util.List;
-import java.util.Optional;
+import java.util.Map;
+import java.util.stream.Collectors;
-/**
- * Hive Sink implementation by using SeaTunnel sink API.
- * This class contains the method to create {@link HiveSinkWriter} and {@link
HiveSinkAggregatedCommitter}.
- */
@AutoService(SeaTunnelSink.class)
-public class HiveSink implements SeaTunnelSink<SeaTunnelRow, HiveSinkState,
HiveCommitInfo, HiveAggregatedCommitInfo> {
-
- private Config config;
- private String jobId;
- private Long checkpointId;
- private SeaTunnelRowType seaTunnelRowTypeInfo;
- private JobContext jobContext;
- private HiveSinkConfig hiveSinkConfig;
+public class HiveSink extends BaseFileSink {
+ private String dbName;
+ private String tableName;
+ private Table tableInformation;
@Override
public String getPluginName() {
@@ -58,73 +68,67 @@ public class HiveSink implements
SeaTunnelSink<SeaTunnelRow, HiveSinkState, Hive
}
@Override
- public void setTypeInfo(SeaTunnelRowType seaTunnelRowTypeInfo) {
- this.seaTunnelRowTypeInfo = seaTunnelRowTypeInfo;
- this.hiveSinkConfig = new HiveSinkConfig(config, seaTunnelRowTypeInfo);
- }
+ public void setTypeInfo(SeaTunnelRowType seaTunnelRowType) {
+ super.setTypeInfo(seaTunnelRowType);
+ HiveMetaStoreProxy hiveMetaStoreProxy =
HiveMetaStoreProxy.getInstance(pluginConfig);
+ // --------------------Check textFileSinkConfig with the hive table
info-------------------
+ List<FieldSchema> fields = hiveMetaStoreProxy.getTableFields(dbName,
tableName);
+ List<FieldSchema> partitionKeys = tableInformation.getPartitionKeys();
+
+ // Remove partitionKeys from table fields
+ List<FieldSchema> fieldNotContainPartitionKey =
fields.stream().filter(filed ->
!partitionKeys.contains(filed)).collect(Collectors.toList());
+
+ // check fields size must same as sinkColumnList size
+ if (fieldNotContainPartitionKey.size() !=
textFileSinkConfig.getSinkColumnList().size()) {
+ throw new RuntimeException("sink columns size must same as hive
table field size");
+ }
- @Override
- public SeaTunnelDataType<SeaTunnelRow> getConsumedType() {
- return this.seaTunnelRowTypeInfo;
+ // check hivePartitionFieldList size must same as partitionFieldList
size
+ if (partitionKeys.size() !=
textFileSinkConfig.getPartitionFieldList().size()) {
+ throw new RuntimeException("partition by columns size must same as
hive table partition columns size");
+ }
+ hiveMetaStoreProxy.close();
}
@Override
public void prepare(Config pluginConfig) throws PrepareFailException {
- this.config = pluginConfig;
- this.checkpointId = 1L;
- }
-
- @Override
- public SinkWriter<SeaTunnelRow, HiveCommitInfo, HiveSinkState>
createWriter(SinkWriter.Context context) throws IOException {
- if (!jobContext.getJobMode().equals(JobMode.BATCH) &&
hiveSinkConfig.getTextFileSinkConfig().getSaveMode().equals(SaveMode.OVERWRITE))
{
- throw new RuntimeException("only batch job can overwrite hive
table");
+ CheckResult result = CheckConfigUtil.checkAllExists(pluginConfig,
HiveConfig.METASTORE_URI, HiveConfig.TABLE_NAME);
+ if (!result.isSuccess()) {
+ throw new PrepareFailException(getPluginName(), PluginType.SINK,
result.getMsg());
}
-
- if
(!this.getSinkConfig().getTextFileSinkConfig().isEnableTransaction()) {
- throw new RuntimeException("Hive Sink Connector only support
transaction now");
+ Pair<String[], Table> tableInfo =
HiveConfig.getTableInfo(pluginConfig);
+ dbName = tableInfo.getLeft()[0];
+ tableName = tableInfo.getLeft()[1];
+ tableInformation = tableInfo.getRight();
+ String outputFormat = tableInformation.getSd().getOutputFormat();
+ if (TEXT_OUTPUT_FORMAT_CLASSNAME.equals(outputFormat)) {
+ Map<String, String> parameters =
tableInformation.getSd().getSerdeInfo().getParameters();
+ pluginConfig = pluginConfig.withValue(FILE_FORMAT,
ConfigValueFactory.fromAnyRef(FileFormat.TEXT.toString()))
+ .withValue(FIELD_DELIMITER,
ConfigValueFactory.fromAnyRef(parameters.get("field.delim")))
+ .withValue(ROW_DELIMITER,
ConfigValueFactory.fromAnyRef(parameters.get("line.delim")));
+ } else if (PARQUET_OUTPUT_FORMAT_CLASSNAME.equals(outputFormat)) {
+ pluginConfig = pluginConfig.withValue(FILE_FORMAT,
ConfigValueFactory.fromAnyRef(FileFormat.PARQUET.toString()));
+ } else if (ORC_OUTPUT_FORMAT_CLASSNAME.equals(outputFormat)) {
+ pluginConfig = pluginConfig.withValue(FILE_FORMAT,
ConfigValueFactory.fromAnyRef(FileFormat.ORC.toString()));
+ } else {
+ throw new RuntimeException("Only support [text parquet orc] file
now");
}
- return new HiveSinkWriter(seaTunnelRowTypeInfo,
- config,
- context,
- getSinkConfig(),
- jobId);
- }
+ pluginConfig =
pluginConfig.withValue(IS_PARTITION_FIELD_WRITE_IN_FILE,
ConfigValueFactory.fromAnyRef(false))
+ .withValue(FILE_NAME_EXPRESSION,
ConfigValueFactory.fromAnyRef("${transactionId}"))
+ .withValue(PATH,
ConfigValueFactory.fromAnyRef(tableInformation.getSd().getLocation()));
- @Override
- public SinkWriter<SeaTunnelRow, HiveCommitInfo, HiveSinkState>
restoreWriter(SinkWriter.Context context, List<HiveSinkState> states) throws
IOException {
- return new HiveSinkWriter(seaTunnelRowTypeInfo, config, context,
hiveSinkConfig, jobId, states);
- }
-
- @Override
- public void setJobContext(JobContext jobContext) {
- this.jobContext = jobContext;
- this.jobId = jobContext.getJobId();
- }
-
- @Override
- public Optional<SinkAggregatedCommitter<HiveCommitInfo,
HiveAggregatedCommitInfo>> createAggregatedCommitter() throws IOException {
- return Optional.of(new HiveSinkAggregatedCommitter());
- }
-
- @Override
- public Optional<Serializer<HiveSinkState>> getWriterStateSerializer() {
- return Optional.of(new DefaultSerializer<>());
- }
-
- @Override
- public Optional<Serializer<HiveAggregatedCommitInfo>>
getAggregatedCommitInfoSerializer() {
- return Optional.of(new DefaultSerializer<>());
- }
-
- @Override
- public Optional<Serializer<HiveCommitInfo>> getCommitInfoSerializer() {
- return Optional.of(new DefaultSerializer<>());
- }
-
- private HiveSinkConfig getSinkConfig() {
- if (this.hiveSinkConfig == null && (this.seaTunnelRowTypeInfo != null
&& this.config != null)) {
- this.hiveSinkConfig = new HiveSinkConfig(config,
seaTunnelRowTypeInfo);
+ if (!pluginConfig.hasPath(SAVE_MODE) ||
StringUtils.isBlank(pluginConfig.getString(SAVE_MODE))) {
+ pluginConfig = pluginConfig.withValue(SAVE_MODE,
ConfigValueFactory.fromAnyRef(SaveMode.APPEND.toString()));
+ }
+ String hdfsLocation = tableInformation.getSd().getLocation();
+ try {
+ URI uri = new URI(hdfsLocation);
+ String path = uri.getPath();
+ pluginConfig = pluginConfig.withValue(PATH,
ConfigValueFactory.fromAnyRef(path));
+ hadoopConf = new HadoopConf(hdfsLocation.replace(path, ""));
+ } catch (URISyntaxException e) {
+ throw new RuntimeException("Get hdfs cluster address failed,
please check.", e);
}
- return this.hiveSinkConfig;
+ super.prepare(pluginConfig);
}
}
diff --git
a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSinkAggregatedCommitter.java
b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSinkAggregatedCommitter.java
deleted file mode 100644
index 6532bad07..000000000
---
a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSinkAggregatedCommitter.java
+++ /dev/null
@@ -1,139 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.connectors.seatunnel.hive.sink;
-
-import org.apache.seatunnel.api.sink.SinkAggregatedCommitter;
-import org.apache.seatunnel.connectors.seatunnel.file.hdfs.sink.util.HdfsUtils;
-import
org.apache.seatunnel.connectors.seatunnel.file.sink.FileAggregatedCommitInfo;
-import org.apache.seatunnel.connectors.seatunnel.hive.utils.HiveMetaStoreProxy;
-
-import org.apache.commons.collections4.CollectionUtils;
-import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
-import org.apache.hadoop.hive.metastore.api.Partition;
-import org.apache.thrift.TException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-public class HiveSinkAggregatedCommitter implements
SinkAggregatedCommitter<HiveCommitInfo, HiveAggregatedCommitInfo> {
- private static final Logger LOGGER =
LoggerFactory.getLogger(HiveSinkAggregatedCommitter.class);
-
- @Override
- public List<HiveAggregatedCommitInfo>
commit(List<HiveAggregatedCommitInfo> aggregatedCommitInfoList) throws
IOException {
- LOGGER.info("=============================agg
commit=================================");
- if (CollectionUtils.isEmpty(aggregatedCommitInfoList)) {
- return null;
- }
- List<HiveAggregatedCommitInfo> errorAggregatedCommitInfoList = new
ArrayList<>();
- HiveMetaStoreProxy hiveMetaStoreProxy = new
HiveMetaStoreProxy(aggregatedCommitInfoList.get(0).getHiveMetastoreUris());
- HiveMetaStoreClient hiveMetaStoreClient =
hiveMetaStoreProxy.getHiveMetaStoreClient();
- try {
- aggregatedCommitInfoList.forEach(aggregateCommitInfo -> {
- try {
- for (Map.Entry<String, Map<String, String>> entry :
aggregateCommitInfo.getFileAggregatedCommitInfo().getTransactionMap().entrySet())
{
- // rollback the file
- for (Map.Entry<String, String> mvFileEntry :
entry.getValue().entrySet()) {
- HdfsUtils.renameFile(mvFileEntry.getKey(),
mvFileEntry.getValue(), true);
- }
- // delete the transaction dir
- HdfsUtils.deleteFile(entry.getKey());
- }
- // add hive partition
-
aggregateCommitInfo.getFileAggregatedCommitInfo().getPartitionDirAndValsMap().entrySet().forEach(entry
-> {
- Partition part = new Partition();
-
part.setDbName(aggregateCommitInfo.getTable().getDbName());
-
part.setTableName(aggregateCommitInfo.getTable().getTableName());
- part.setValues(entry.getValue());
- part.setParameters(new HashMap<>());
-
part.setSd(aggregateCommitInfo.getTable().getSd().deepCopy());
-
part.getSd().setSerdeInfo(aggregateCommitInfo.getTable().getSd().getSerdeInfo());
-
part.getSd().setLocation(aggregateCommitInfo.getTable().getSd().getLocation() +
"/" + entry.getKey());
- try {
- hiveMetaStoreClient.add_partition(part);
- } catch (TException e) {
- throw new RuntimeException(e);
- }
- });
- } catch (Exception e) {
- LOGGER.error("commit aggregateCommitInfo error ", e);
- errorAggregatedCommitInfoList.add(aggregateCommitInfo);
- }
- });
- } finally {
- hiveMetaStoreClient.close();
- }
-
- return errorAggregatedCommitInfoList;
- }
-
- @Override
- public HiveAggregatedCommitInfo combine(List<HiveCommitInfo> commitInfos) {
- if (CollectionUtils.isEmpty(commitInfos)) {
- return null;
- }
- Map<String, Map<String, String>> aggregateCommitInfo = new HashMap<>();
- Map<String, List<String>> partitionDirAndValsMap = new HashMap<>();
- commitInfos.forEach(commitInfo -> {
- Map<String, String> needMoveFileMap =
aggregateCommitInfo.computeIfAbsent(commitInfo.getFileCommitInfo().getTransactionDir(),
k -> new HashMap<>());
-
needMoveFileMap.putAll(commitInfo.getFileCommitInfo().getNeedMoveFiles());
- Set<Map.Entry<String, List<String>>> entries =
commitInfo.getFileCommitInfo().getPartitionDirAndValsMap().entrySet();
- if (!CollectionUtils.isEmpty(entries)) {
-
partitionDirAndValsMap.putAll(commitInfo.getFileCommitInfo().getPartitionDirAndValsMap());
- }
- });
- return new HiveAggregatedCommitInfo(
- new FileAggregatedCommitInfo(aggregateCommitInfo,
partitionDirAndValsMap),
- commitInfos.get(0).getHiveMetastoreUris(),
- commitInfos.get(0).getTable());
- }
-
- @Override
- public void abort(List<HiveAggregatedCommitInfo> aggregatedCommitInfoList)
throws Exception {
- if (aggregatedCommitInfoList == null ||
aggregatedCommitInfoList.size() == 0) {
- return;
- }
- aggregatedCommitInfoList.stream().forEach(aggregateCommitInfo -> {
- try {
- for (Map.Entry<String, Map<String, String>> entry :
aggregateCommitInfo.getFileAggregatedCommitInfo().getTransactionMap().entrySet())
{
- // rollback the file
- for (Map.Entry<String, String> mvFileEntry :
entry.getValue().entrySet()) {
- if (HdfsUtils.fileExist(mvFileEntry.getValue()) &&
!HdfsUtils.fileExist(mvFileEntry.getKey())) {
- HdfsUtils.renameFile(mvFileEntry.getValue(),
mvFileEntry.getKey(), true);
- }
- }
- // delete the transaction dir
- HdfsUtils.deleteFile(entry.getKey());
-
- // The partitions that have been added will be preserved
and will not be deleted
- }
- } catch (IOException e) {
- LOGGER.error("abort aggregateCommitInfo error ", e);
- }
- });
- }
-
- @Override
- public void close() throws IOException {
- }
-}
diff --git
a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSinkConfig.java
b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSinkConfig.java
deleted file mode 100644
index b05b8d2e4..000000000
---
a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSinkConfig.java
+++ /dev/null
@@ -1,155 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.connectors.seatunnel.hive.sink;
-
-import static
org.apache.seatunnel.connectors.seatunnel.file.config.Constant.FIELD_DELIMITER;
-import static
org.apache.seatunnel.connectors.seatunnel.file.config.Constant.FILE_FORMAT;
-import static
org.apache.seatunnel.connectors.seatunnel.file.config.Constant.FILE_NAME_EXPRESSION;
-import static
org.apache.seatunnel.connectors.seatunnel.file.config.Constant.IS_PARTITION_FIELD_WRITE_IN_FILE;
-import static
org.apache.seatunnel.connectors.seatunnel.file.config.Constant.PATH;
-import static
org.apache.seatunnel.connectors.seatunnel.file.config.Constant.ROW_DELIMITER;
-import static
org.apache.seatunnel.connectors.seatunnel.file.config.Constant.SAVE_MODE;
-import static
org.apache.seatunnel.connectors.seatunnel.hive.config.Constant.HIVE_METASTORE_URIS;
-import static
org.apache.seatunnel.connectors.seatunnel.hive.config.Constant.HIVE_RESULT_TABLE_NAME;
-import static com.google.common.base.Preconditions.checkArgument;
-import static com.google.common.base.Preconditions.checkNotNull;
-
-import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
-import org.apache.seatunnel.connectors.seatunnel.file.config.Constant;
-import org.apache.seatunnel.connectors.seatunnel.file.config.FileFormat;
-import org.apache.seatunnel.connectors.seatunnel.file.sink.config.SaveMode;
-import
org.apache.seatunnel.connectors.seatunnel.file.sink.config.TextFileSinkConfig;
-import org.apache.seatunnel.connectors.seatunnel.hive.utils.HiveMetaStoreProxy;
-
-import org.apache.seatunnel.shade.com.typesafe.config.Config;
-import org.apache.seatunnel.shade.com.typesafe.config.ConfigValueFactory;
-
-import lombok.Data;
-import lombok.NonNull;
-import org.apache.commons.collections.CollectionUtils;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
-import org.apache.hadoop.hive.metastore.api.FieldSchema;
-import org.apache.hadoop.hive.metastore.api.Table;
-import org.apache.thrift.TException;
-
-import java.io.Serializable;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Map;
-import java.util.stream.Collectors;
-
-@Data
-public class HiveSinkConfig implements Serializable {
- private static final String TEXT_FORMAT_CLASSNAME =
"org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat";
- private static final String PARQUET_FORMAT_CLASSNAME =
"org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat";
- private static final String ORC_FORMAT_CLASSNAME =
"org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat";
- private String hiveTableName;
- private List<String> hivePartitionFieldList;
- private String hiveMetaUris;
-
- private String dbName;
-
- private String tableName;
-
- private Table table;
-
- private TextFileSinkConfig textFileSinkConfig;
-
- public HiveSinkConfig(@NonNull Config config, @NonNull SeaTunnelRowType
seaTunnelRowTypeInfo) {
-
checkArgument(!CollectionUtils.isEmpty(Arrays.asList(seaTunnelRowTypeInfo.getFieldNames())));
-
- if (config.hasPath(HIVE_RESULT_TABLE_NAME) &&
!StringUtils.isBlank(config.getString(HIVE_RESULT_TABLE_NAME))) {
- this.hiveTableName = config.getString(HIVE_RESULT_TABLE_NAME);
- }
- checkNotNull(hiveTableName);
-
- if (config.hasPath(HIVE_METASTORE_URIS) &&
!StringUtils.isBlank(config.getString(HIVE_METASTORE_URIS))) {
- this.hiveMetaUris = config.getString(HIVE_METASTORE_URIS);
- }
- checkNotNull(hiveMetaUris);
-
- String[] dbAndTableName = hiveTableName.split("\\.");
- if (dbAndTableName == null || dbAndTableName.length != 2) {
- throw new RuntimeException("Please config " +
HIVE_RESULT_TABLE_NAME + " as db.table format");
- }
- this.dbName = dbAndTableName[0];
- this.tableName = dbAndTableName[1];
- HiveMetaStoreProxy hiveMetaStoreProxy = new
HiveMetaStoreProxy(hiveMetaUris);
- HiveMetaStoreClient hiveMetaStoreClient =
hiveMetaStoreProxy.getHiveMetaStoreClient();
-
- try {
- table = hiveMetaStoreClient.getTable(dbName, tableName);
- String outputFormat = table.getSd().getOutputFormat();
- Map<String, String> parameters =
table.getSd().getSerdeInfo().getParameters();
- if (TEXT_FORMAT_CLASSNAME.equals(outputFormat)) {
- config = config.withValue(FILE_FORMAT,
ConfigValueFactory.fromAnyRef(FileFormat.TEXT.toString()))
- .withValue(FIELD_DELIMITER,
ConfigValueFactory.fromAnyRef(parameters.get("field.delim")))
- .withValue(ROW_DELIMITER,
ConfigValueFactory.fromAnyRef(parameters.get("line.delim")));
- } else if (PARQUET_FORMAT_CLASSNAME.equals(outputFormat)) {
- config = config.withValue(FILE_FORMAT,
ConfigValueFactory.fromAnyRef(FileFormat.PARQUET.toString()));
- } else if (ORC_FORMAT_CLASSNAME.equals(outputFormat)) {
- config = config.withValue(FILE_FORMAT,
ConfigValueFactory.fromAnyRef(FileFormat.ORC.toString()));
- } else {
- throw new RuntimeException("Only support [text parquet orc]
file now");
- }
-
- config = config.withValue(IS_PARTITION_FIELD_WRITE_IN_FILE,
ConfigValueFactory.fromAnyRef(false))
- .withValue(FILE_NAME_EXPRESSION,
ConfigValueFactory.fromAnyRef("${transactionId}"))
- .withValue(PATH,
ConfigValueFactory.fromAnyRef(table.getSd().getLocation()));
-
- if (!config.hasPath(SAVE_MODE) ||
StringUtils.isBlank(config.getString(Constant.SAVE_MODE))) {
- config = config.withValue(SAVE_MODE,
ConfigValueFactory.fromAnyRef(SaveMode.APPEND.toString()));
- }
-
- this.textFileSinkConfig = new TextFileSinkConfig(config,
seaTunnelRowTypeInfo);
-
- // --------------------Check textFileSinkConfig with the hive
table info-------------------
- List<FieldSchema> fields =
hiveMetaStoreClient.getFields(dbAndTableName[0], dbAndTableName[1]);
- List<FieldSchema> partitionKeys = table.getPartitionKeys();
-
- // Remove partitionKeys from table fields
- List<FieldSchema> fieldNotContainPartitionKey =
fields.stream().filter(filed ->
!partitionKeys.contains(filed)).collect(Collectors.toList());
-
- // check fields size must same as sinkColumnList size
- if (fieldNotContainPartitionKey.size() !=
textFileSinkConfig.getSinkColumnList().size()) {
- throw new RuntimeException("sink columns size must same as
hive table field size");
- }
-
- // check hivePartitionFieldList size must same as
partitionFieldList size
- if (partitionKeys.size() !=
textFileSinkConfig.getPartitionFieldList().size()) {
- throw new RuntimeException("partition by columns size must
same as hive table partition columns size");
- }
-
- // --------------------Check textFileSinkConfig with the hive
table info end----------------
- } catch (TException e) {
- throw new RuntimeException(e);
- } finally {
- hiveMetaStoreClient.close();
- }
-
- // hive only support append or overwrite
- if (!this.textFileSinkConfig.getSaveMode().equals(SaveMode.APPEND) &&
!this.textFileSinkConfig.getSaveMode().equals(SaveMode.OVERWRITE)) {
- throw new RuntimeException("hive only support append or overwrite
save mode");
- }
- }
-
- public TextFileSinkConfig getTextFileSinkConfig() {
- return textFileSinkConfig;
- }
-}
diff --git
a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSinkState.java
b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSinkState.java
deleted file mode 100644
index a104151c3..000000000
---
a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSinkState.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.connectors.seatunnel.hive.sink;
-
-import lombok.AllArgsConstructor;
-import lombok.Data;
-
-import java.io.Serializable;
-
-@Data
-@AllArgsConstructor
-public class HiveSinkState implements Serializable {
- private String transactionId;
- private Long checkpointId;
-}
diff --git
a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSinkWriter.java
b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSinkWriter.java
deleted file mode 100644
index 69283a55a..000000000
---
a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSinkWriter.java
+++ /dev/null
@@ -1,160 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.connectors.seatunnel.hive.sink;
-
-import org.apache.seatunnel.api.sink.SinkWriter;
-import org.apache.seatunnel.api.table.type.SeaTunnelRow;
-import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
-import
org.apache.seatunnel.connectors.seatunnel.file.hdfs.sink.HdfsFileSinkPlugin;
-import org.apache.seatunnel.connectors.seatunnel.file.sink.FileCommitInfo;
-import org.apache.seatunnel.connectors.seatunnel.file.sink.FileSinkState;
-import
org.apache.seatunnel.connectors.seatunnel.file.sink.spi.SinkFileSystemPlugin;
-import
org.apache.seatunnel.connectors.seatunnel.file.sink.transaction.TransactionStateFileWriter;
-import
org.apache.seatunnel.connectors.seatunnel.file.sink.writer.FileSinkPartitionDirNameGenerator;
-import
org.apache.seatunnel.connectors.seatunnel.file.sink.writer.FileSinkTransactionFileNameGenerator;
-
-import org.apache.seatunnel.shade.com.typesafe.config.Config;
-
-import lombok.NonNull;
-import org.apache.commons.collections4.CollectionUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.Collections;
-import java.util.List;
-import java.util.Optional;
-import java.util.stream.Collectors;
-
-public class HiveSinkWriter implements SinkWriter<SeaTunnelRow,
HiveCommitInfo, HiveSinkState> {
- private static final Logger LOGGER =
LoggerFactory.getLogger(HiveSinkWriter.class);
-
- private SeaTunnelRowType seaTunnelRowTypeInfo;
- private Config pluginConfig;
- private Context context;
- private String jobId;
-
- private TransactionStateFileWriter fileWriter;
-
- private HiveSinkConfig hiveSinkConfig;
-
- public HiveSinkWriter(@NonNull SeaTunnelRowType seaTunnelRowTypeInfo,
- @NonNull Config pluginConfig,
- @NonNull SinkWriter.Context context,
- @NonNull HiveSinkConfig hiveSinkConfig,
- @NonNull String jobId) {
- this.seaTunnelRowTypeInfo = seaTunnelRowTypeInfo;
- this.pluginConfig = pluginConfig;
- this.context = context;
- this.jobId = jobId;
- this.hiveSinkConfig = hiveSinkConfig;
- this.fileWriter = createFileWriter();
-
- fileWriter.beginTransaction(1L);
- }
-
- public HiveSinkWriter(@NonNull SeaTunnelRowType seaTunnelRowTypeInfo,
- @NonNull Config pluginConfig,
- @NonNull SinkWriter.Context context,
- @NonNull HiveSinkConfig hiveSinkConfig,
- @NonNull String jobId,
- @NonNull List<HiveSinkState> hiveSinkStates) {
- this.seaTunnelRowTypeInfo = seaTunnelRowTypeInfo;
- this.pluginConfig = pluginConfig;
- this.context = context;
- this.jobId = jobId;
- this.hiveSinkConfig = hiveSinkConfig;
- this.fileWriter = createFileWriter();
-
- // Rollback dirty transaction
- if (hiveSinkStates.size() > 0) {
- List<String> transactionAfter =
fileWriter.getTransactionAfter(hiveSinkStates.get(0).getTransactionId());
- fileWriter.abortTransactions(transactionAfter);
- }
- fileWriter.beginTransaction(hiveSinkStates.get(0).getCheckpointId() +
1);
- }
-
- @Override
- public void write(SeaTunnelRow element) throws IOException {
- fileWriter.write(element);
- }
-
- @Override
- public Optional<HiveCommitInfo> prepareCommit() throws IOException {
- Optional<FileCommitInfo> fileCommitInfoOptional =
fileWriter.prepareCommit();
- if (fileCommitInfoOptional.isPresent()) {
- FileCommitInfo fileCommitInfo = fileCommitInfoOptional.get();
- return Optional.of(new HiveCommitInfo(fileCommitInfo,
hiveSinkConfig.getHiveMetaUris(), this.hiveSinkConfig.getTable()));
- }
- return Optional.empty();
- }
-
- @Override
- public void close() throws IOException {
- fileWriter.finishAndCloseWriteFile();
- }
-
- @Override
- public List<HiveSinkState> snapshotState(long checkpointId) throws
IOException {
- List<FileSinkState> fileSinkStates =
fileWriter.snapshotState(checkpointId);
- if (!CollectionUtils.isEmpty(fileSinkStates)) {
- return fileSinkStates.stream().map(state ->
- new HiveSinkState(state.getTransactionId(),
state.getCheckpointId()))
- .collect(Collectors.toList());
- }
- return Collections.emptyList();
- }
-
- @Override
- public void abortPrepare() {
- fileWriter.abortTransaction();
- }
-
- private TransactionStateFileWriter createFileWriter() {
- SinkFileSystemPlugin sinkFileSystemPlugin = new HdfsFileSinkPlugin();
- Optional<TransactionStateFileWriter> transactionStateFileWriterOpt =
sinkFileSystemPlugin.getTransactionStateFileWriter(this.seaTunnelRowTypeInfo,
- getFilenameGenerator(),
- getPartitionDirNameGenerator(),
-
this.hiveSinkConfig.getTextFileSinkConfig().getSinkColumnsIndexInRow(),
- this.hiveSinkConfig.getTextFileSinkConfig().getTmpPath(),
- this.hiveSinkConfig.getTextFileSinkConfig().getPath(),
- this.jobId,
- this.context.getIndexOfSubtask(),
-
this.hiveSinkConfig.getTextFileSinkConfig().getFieldDelimiter(),
- this.hiveSinkConfig.getTextFileSinkConfig().getRowDelimiter(),
- sinkFileSystemPlugin.getFileSystem().get());
- if (!transactionStateFileWriterOpt.isPresent()) {
- throw new RuntimeException("A TransactionStateFileWriter is need");
- }
- return transactionStateFileWriterOpt.get();
- }
-
- private FileSinkTransactionFileNameGenerator getFilenameGenerator() {
- return new FileSinkTransactionFileNameGenerator(
- this.hiveSinkConfig.getTextFileSinkConfig().getFileFormat(),
-
this.hiveSinkConfig.getTextFileSinkConfig().getFileNameExpression(),
-
this.hiveSinkConfig.getTextFileSinkConfig().getFileNameTimeFormat());
- }
-
- private FileSinkPartitionDirNameGenerator getPartitionDirNameGenerator() {
- return new FileSinkPartitionDirNameGenerator(
-
this.hiveSinkConfig.getTextFileSinkConfig().getPartitionFieldList(),
-
this.hiveSinkConfig.getTextFileSinkConfig().getPartitionFieldsIndexInRow(),
-
this.hiveSinkConfig.getTextFileSinkConfig().getPartitionDirExpression());
- }
-}
diff --git
a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/source/HadoopConf.java
b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/source/HadoopConf.java
deleted file mode 100644
index 31dddf7b4..000000000
---
a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/source/HadoopConf.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.connectors.seatunnel.hive.source;
-
-import lombok.Data;
-
-import java.io.Serializable;
-
-@Data
-public class HadoopConf implements Serializable {
-
- private String hdfsNameKey;
-
- private String fsHdfsImpl = "org.apache.hadoop.hdfs.DistributedFileSystem";
-
- public HadoopConf(String hdfsNameKey) {
- this.hdfsNameKey = hdfsNameKey;
- }
-}
diff --git
a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/source/HiveSource.java
b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/source/HiveSource.java
index 1f1a8c3fb..92173892e 100644
---
a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/source/HiveSource.java
+++
b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/source/HiveSource.java
@@ -17,42 +17,34 @@
package org.apache.seatunnel.connectors.seatunnel.hive.source;
-import static
org.apache.seatunnel.connectors.seatunnel.hive.config.SourceConfig.FILE_PATH;
-import static org.apache.hadoop.fs.FileSystem.FS_DEFAULT_NAME_KEY;
+import static
org.apache.seatunnel.connectors.seatunnel.hive.config.HiveConfig.ORC_INPUT_FORMAT_CLASSNAME;
+import static
org.apache.seatunnel.connectors.seatunnel.hive.config.HiveConfig.PARQUET_INPUT_FORMAT_CLASSNAME;
+import static
org.apache.seatunnel.connectors.seatunnel.hive.config.HiveConfig.TEXT_INPUT_FORMAT_CLASSNAME;
+import static
org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY;
import org.apache.seatunnel.api.common.PrepareFailException;
-import org.apache.seatunnel.api.source.Boundedness;
import org.apache.seatunnel.api.source.SeaTunnelSource;
-import org.apache.seatunnel.api.source.SourceReader;
-import org.apache.seatunnel.api.source.SourceSplitEnumerator;
-import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
-import org.apache.seatunnel.api.table.type.SeaTunnelRow;
-import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.common.config.CheckConfigUtil;
import org.apache.seatunnel.common.config.CheckResult;
import org.apache.seatunnel.common.constants.PluginType;
-import org.apache.seatunnel.connectors.seatunnel.hive.config.SourceConfig;
-import
org.apache.seatunnel.connectors.seatunnel.hive.exception.HivePluginException;
-import
org.apache.seatunnel.connectors.seatunnel.hive.source.file.reader.format.ReadStrategy;
-import
org.apache.seatunnel.connectors.seatunnel.hive.source.file.reader.format.ReadStrategyFactory;
+import org.apache.seatunnel.connectors.seatunnel.file.config.BaseSourceConfig;
+import org.apache.seatunnel.connectors.seatunnel.file.config.FileFormat;
+import
org.apache.seatunnel.connectors.seatunnel.file.hdfs.source.HdfsFileSource;
+import org.apache.seatunnel.connectors.seatunnel.hive.config.HiveConfig;
import org.apache.seatunnel.shade.com.typesafe.config.Config;
+import org.apache.seatunnel.shade.com.typesafe.config.ConfigValueFactory;
import com.google.auto.service.AutoService;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.hadoop.hive.metastore.api.Table;
-import java.io.IOException;
-import java.util.List;
+import java.net.URI;
+import java.net.URISyntaxException;
@AutoService(SeaTunnelSource.class)
-public class HiveSource implements SeaTunnelSource<SeaTunnelRow,
HiveSourceSplit, HiveSourceState> {
-
- private SeaTunnelRowType typeInfo;
-
- private ReadStrategy readStrategy;
-
- private HadoopConf hadoopConf;
-
- private List<String> filesPath;
+public class HiveSource extends HdfsFileSource {
+ private Table tableInformation;
@Override
public String getPluginName() {
@@ -60,51 +52,33 @@ public class HiveSource implements
SeaTunnelSource<SeaTunnelRow, HiveSourceSplit
}
@Override
- public void prepare(Config pluginConfig) {
- CheckResult result = CheckConfigUtil.checkAllExists(pluginConfig,
FILE_PATH, FS_DEFAULT_NAME_KEY);
+ public void prepare(Config pluginConfig) throws PrepareFailException {
+ CheckResult result = CheckConfigUtil.checkAllExists(pluginConfig,
HiveConfig.METASTORE_URI, HiveConfig.TABLE_NAME);
if (!result.isSuccess()) {
throw new PrepareFailException(getPluginName(), PluginType.SOURCE,
result.getMsg());
}
- // use factory to generate readStrategy
- readStrategy =
ReadStrategyFactory.of(pluginConfig.getString(SourceConfig.FILE_TYPE));
- String path = pluginConfig.getString(FILE_PATH);
- hadoopConf = new
HadoopConf(pluginConfig.getString(FS_DEFAULT_NAME_KEY));
- try {
- filesPath = readStrategy.getFileNamesByPath(hadoopConf, path);
- } catch (IOException e) {
- throw new PrepareFailException(getPluginName(), PluginType.SOURCE,
"Check file path fail.");
+ Pair<String[], Table> tableInfo =
HiveConfig.getTableInfo(pluginConfig);
+ tableInformation = tableInfo.getRight();
+ String inputFormat = tableInformation.getSd().getInputFormat();
+ if (TEXT_INPUT_FORMAT_CLASSNAME.equals(inputFormat)) {
+ pluginConfig = pluginConfig.withValue(BaseSourceConfig.FILE_TYPE,
ConfigValueFactory.fromAnyRef(FileFormat.TEXT.toString()));
+ } else if (PARQUET_INPUT_FORMAT_CLASSNAME.equals(inputFormat)) {
+ pluginConfig = pluginConfig.withValue(BaseSourceConfig.FILE_TYPE,
ConfigValueFactory.fromAnyRef(FileFormat.PARQUET.toString()));
+ } else if (ORC_INPUT_FORMAT_CLASSNAME.equals(inputFormat)) {
+ pluginConfig = pluginConfig.withValue(BaseSourceConfig.FILE_TYPE,
ConfigValueFactory.fromAnyRef(FileFormat.ORC.toString()));
+ } else {
+ throw new RuntimeException("Only support [text parquet orc] file
now");
}
+ String hdfsLocation = tableInformation.getSd().getLocation();
try {
- // should read from config or read from hive metadata( wait catlog
done)
- this.typeInfo = readStrategy.getSeaTunnelRowTypeInfo(hadoopConf,
filesPath.get(0));
- } catch (HivePluginException e) {
- throw new PrepareFailException(getPluginName(), PluginType.SOURCE,
"Read hive file type error.", e);
+ URI uri = new URI(hdfsLocation);
+ String path = uri.getPath();
+ String defaultFs = hdfsLocation.replace(path, "");
+ pluginConfig = pluginConfig.withValue(BaseSourceConfig.FILE_PATH,
ConfigValueFactory.fromAnyRef(path))
+ .withValue(FS_DEFAULT_NAME_KEY,
ConfigValueFactory.fromAnyRef(defaultFs));
+ } catch (URISyntaxException e) {
+ throw new RuntimeException("Get hdfs cluster address failed,
please check.", e);
}
+ super.prepare(pluginConfig);
}
-
- @Override
- public SeaTunnelDataType<SeaTunnelRow> getProducedType() {
- return this.typeInfo;
- }
-
- @Override
- public SourceReader<SeaTunnelRow, HiveSourceSplit>
createReader(SourceReader.Context readerContext) throws Exception {
- return new HiveSourceReader(this.readStrategy, this.hadoopConf,
readerContext);
- }
-
- @Override
- public Boundedness getBoundedness() {
- return Boundedness.BOUNDED;
- }
-
- @Override
- public SourceSplitEnumerator<HiveSourceSplit, HiveSourceState>
createEnumerator(SourceSplitEnumerator.Context<HiveSourceSplit>
enumeratorContext) throws Exception {
- return new HiveSourceSplitEnumerator(enumeratorContext, filesPath);
- }
-
- @Override
- public SourceSplitEnumerator<HiveSourceSplit, HiveSourceState>
restoreEnumerator(SourceSplitEnumerator.Context<HiveSourceSplit>
enumeratorContext, HiveSourceState checkpointState) throws Exception {
- return new HiveSourceSplitEnumerator(enumeratorContext, filesPath,
checkpointState);
- }
-
}
diff --git
a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/source/HiveSourceReader.java
b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/source/HiveSourceReader.java
deleted file mode 100644
index 570f48e35..000000000
---
a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/source/HiveSourceReader.java
+++ /dev/null
@@ -1,98 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.connectors.seatunnel.hive.source;
-
-import org.apache.seatunnel.api.source.Collector;
-import org.apache.seatunnel.api.source.SourceReader;
-import org.apache.seatunnel.api.table.type.SeaTunnelRow;
-import
org.apache.seatunnel.connectors.seatunnel.hive.source.file.reader.format.ReadStrategy;
-
-import lombok.extern.slf4j.Slf4j;
-
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-
-@Slf4j
-public class HiveSourceReader implements SourceReader<SeaTunnelRow,
HiveSourceSplit> {
-
- private static final long THREAD_WAIT_TIME = 500L;
-
- private ReadStrategy readStrategy;
-
- private HadoopConf hadoopConf;
-
- private Set<HiveSourceSplit> sourceSplits;
-
- private final SourceReader.Context context;
-
- public HiveSourceReader(ReadStrategy readStrategy, HadoopConf hadoopConf,
SourceReader.Context context) {
- this.readStrategy = readStrategy;
- this.hadoopConf = hadoopConf;
- this.context = context;
- this.sourceSplits = new HashSet<>();
- }
-
- @Override
- public void open() {
- readStrategy.init(hadoopConf);
- }
-
- @Override
- public void close() {
-
- }
-
- @Override
- public void pollNext(Collector<SeaTunnelRow> output) throws Exception {
- if (sourceSplits.isEmpty()) {
- Thread.sleep(THREAD_WAIT_TIME);
- return;
- }
- sourceSplits.forEach(source -> {
- try {
- readStrategy.read(source.splitId(), output);
- } catch (Exception e) {
- throw new RuntimeException("Hive source read error", e);
- }
-
- });
- context.signalNoMoreElement();
- }
-
- @Override
- public List<HiveSourceSplit> snapshotState(long checkpointId) {
- return new ArrayList<>(sourceSplits);
- }
-
- @Override
- public void addSplits(List<HiveSourceSplit> splits) {
- sourceSplits.addAll(splits);
- }
-
- @Override
- public void handleNoMoreSplits() {
-
- }
-
- @Override
- public void notifyCheckpointComplete(long checkpointId) {
-
- }
-}
diff --git
a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/source/HiveSourceSplit.java
b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/source/HiveSourceSplit.java
deleted file mode 100644
index 44e062e54..000000000
---
a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/source/HiveSourceSplit.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.connectors.seatunnel.hive.source;
-
-import org.apache.seatunnel.api.source.SourceSplit;
-
-public class HiveSourceSplit implements SourceSplit {
-
- private static final long serialVersionUID = -1L;
-
- private String splitId;
-
- public HiveSourceSplit(String splitId) {
- this.splitId = splitId;
- }
-
- @Override
- public String splitId() {
- return this.splitId;
- }
-}
diff --git
a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/source/HiveSourceSplitEnumerator.java
b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/source/HiveSourceSplitEnumerator.java
deleted file mode 100644
index 301b1506f..000000000
---
a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/source/HiveSourceSplitEnumerator.java
+++ /dev/null
@@ -1,126 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.connectors.seatunnel.hive.source;
-
-import org.apache.seatunnel.api.source.SourceSplitEnumerator;
-import org.apache.seatunnel.common.config.Common;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-public class HiveSourceSplitEnumerator implements
SourceSplitEnumerator<HiveSourceSplit, HiveSourceState> {
-
- private final Context<HiveSourceSplit> context;
- private Set<HiveSourceSplit> pendingSplit;
- private Set<HiveSourceSplit> assignedSplit;
- private List<String> filePaths;
-
- public
HiveSourceSplitEnumerator(SourceSplitEnumerator.Context<HiveSourceSplit>
context, List<String> filePaths) {
- this.context = context;
- this.filePaths = filePaths;
- }
-
- public
HiveSourceSplitEnumerator(SourceSplitEnumerator.Context<HiveSourceSplit>
context, List<String> filePaths,
- HiveSourceState sourceState) {
- this(context, filePaths);
- this.assignedSplit = sourceState.getAssignedSplit();
- }
-
- @Override
- public void open() {
- this.assignedSplit = new HashSet<>();
- this.pendingSplit = new HashSet<>();
- }
-
- @Override
- public void run() {
- pendingSplit = getHiveFileSplit();
- assignSplit(context.registeredReaders());
- }
-
- private Set<HiveSourceSplit> getHiveFileSplit() {
- Set<HiveSourceSplit> hiveSourceSplits = new HashSet<>();
- filePaths.forEach(k -> hiveSourceSplits.add(new HiveSourceSplit(k)));
- return hiveSourceSplits;
-
- }
-
- @Override
- public void close() throws IOException {
-
- }
-
- @Override
- public void addSplitsBack(List<HiveSourceSplit> splits, int subtaskId) {
- if (!splits.isEmpty()) {
- pendingSplit.addAll(splits);
- assignSplit(Collections.singletonList(subtaskId));
- }
- }
-
- private void assignSplit(Collection<Integer> taskIDList) {
- Map<Integer, List<HiveSourceSplit>> readySplit = new
HashMap<>(Common.COLLECTION_SIZE);
- for (int taskID : taskIDList) {
- readySplit.computeIfAbsent(taskID, id -> new ArrayList<>());
- }
-
- pendingSplit.forEach(s -> readySplit.get(getSplitOwner(s.splitId(),
taskIDList.size()))
- .add(s));
- readySplit.forEach(context::assignSplit);
- assignedSplit.addAll(pendingSplit);
- pendingSplit.clear();
- }
-
- private static int getSplitOwner(String tp, int numReaders) {
- return tp.hashCode() % numReaders;
- }
-
- @Override
- public int currentUnassignedSplitSize() {
- return pendingSplit.size();
- }
-
- @Override
- public void registerReader(int subtaskId) {
- if (!pendingSplit.isEmpty()) {
- assignSplit(Collections.singletonList(subtaskId));
- }
- }
-
- @Override
- public HiveSourceState snapshotState(long checkpointId) {
- return new HiveSourceState(assignedSplit);
- }
-
- @Override
- public void notifyCheckpointComplete(long checkpointId) {
-
- }
-
- @Override
- public void handleSplitRequest(int subtaskId) {
-
- }
-}
diff --git
a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/source/HiveSourceState.java
b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/source/HiveSourceState.java
deleted file mode 100644
index f982a71cb..000000000
---
a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/source/HiveSourceState.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.connectors.seatunnel.hive.source;
-
-import java.io.Serializable;
-import java.util.Set;
-
-public class HiveSourceState implements Serializable {
-
-
- private Set<HiveSourceSplit> assignedSplit;
-
- public HiveSourceState(Set<HiveSourceSplit> assignedSplit) {
- this.assignedSplit = assignedSplit;
- }
-
- public Set<HiveSourceSplit> getAssignedSplit() {
- return assignedSplit;
- }
-}
diff --git
a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/source/file/reader/format/AbstractReadStrategy.java
b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/source/file/reader/format/AbstractReadStrategy.java
deleted file mode 100644
index 2df0a21f7..000000000
---
a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/source/file/reader/format/AbstractReadStrategy.java
+++ /dev/null
@@ -1,84 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package
org.apache.seatunnel.connectors.seatunnel.hive.source.file.reader.format;
-
-import static org.apache.hadoop.fs.FileSystem.FS_DEFAULT_NAME_KEY;
-
-import
org.apache.seatunnel.connectors.seatunnel.hive.exception.HivePluginException;
-import org.apache.seatunnel.connectors.seatunnel.hive.source.HadoopConf;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-
-public abstract class AbstractReadStrategy implements ReadStrategy {
-
- HadoopConf hadoopConf;
-
- @Override
- public void init(HadoopConf conf) {
- this.hadoopConf = conf;
- }
-
- @Override
- public Configuration getConfiguration(HadoopConf hadoopConf) {
- Configuration configuration = new Configuration();
- configuration.set(FS_DEFAULT_NAME_KEY, hadoopConf.getHdfsNameKey());
- configuration.set("fs.hdfs.impl", hadoopConf.getFsHdfsImpl());
- return configuration;
- }
-
- Configuration getConfiguration() throws HivePluginException {
- if (null == hadoopConf) {
- throw new HivePluginException("Not init read config");
- }
- return getConfiguration(hadoopConf);
- }
-
- boolean checkFileType(String path) {
- return true;
- }
-
- @Override
- public List<String> getFileNamesByPath(HadoopConf hadoopConf, String path)
throws IOException {
- Configuration configuration = getConfiguration(hadoopConf);
- List<String> fileNames = new ArrayList<>();
- FileSystem hdfs = FileSystem.get(configuration);
- Path listFiles = new Path(path);
- FileStatus[] stats = hdfs.listStatus(listFiles);
- for (FileStatus fileStatus : stats) {
- if (fileStatus.isDirectory()) {
- fileNames.addAll(getFileNamesByPath(hadoopConf,
fileStatus.getPath().toString()));
- continue;
- }
- if (fileStatus.isFile()) {
- // filter '_SUCCESS' file
- if (!fileStatus.getPath().getName().equals("_SUCCESS")) {
- fileNames.add(fileStatus.getPath().toString());
- }
- }
- }
- return fileNames;
- }
-
-}
diff --git
a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/source/file/reader/format/OrcReadStrategy.java
b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/source/file/reader/format/OrcReadStrategy.java
deleted file mode 100644
index 96ff1e983..000000000
---
a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/source/file/reader/format/OrcReadStrategy.java
+++ /dev/null
@@ -1,168 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package
org.apache.seatunnel.connectors.seatunnel.hive.source.file.reader.format;
-
-import org.apache.seatunnel.api.source.Collector;
-import org.apache.seatunnel.api.table.type.BasicType;
-import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
-import org.apache.seatunnel.api.table.type.SeaTunnelRow;
-import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
-import
org.apache.seatunnel.connectors.seatunnel.hive.exception.HivePluginException;
-import org.apache.seatunnel.connectors.seatunnel.hive.source.HadoopConf;
-
-import lombok.extern.slf4j.Slf4j;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.ql.io.orc.OrcFile;
-import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat;
-import org.apache.hadoop.hive.ql.io.orc.OrcSerde;
-import org.apache.hadoop.hive.ql.io.orc.OrcStruct;
-import org.apache.hadoop.hive.serde2.objectinspector.StructField;
-import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapred.FileInputFormat;
-import org.apache.hadoop.mapred.InputFormat;
-import org.apache.hadoop.mapred.InputSplit;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.RecordReader;
-import org.apache.hadoop.mapred.Reporter;
-import org.apache.orc.Reader;
-import org.apache.orc.TypeDescription;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.List;
-import java.util.Properties;
-
-@Slf4j
-public class OrcReadStrategy extends AbstractReadStrategy {
-
- private SeaTunnelRowType seaTunnelRowTypeInfo;
- private static final long MIN_SIZE = 16 * 1024;
-
- @Override
- public void read(String path, Collector<SeaTunnelRow> output) throws
Exception {
- if (Boolean.FALSE.equals(checkFileType(path))) {
- throw new Exception("please check file type");
- }
- JobConf conf = new JobConf();
- Path filePath = new Path(path);
- Properties p = new Properties();
- OrcSerde serde = new OrcSerde();
- String columns = String.join(",",
seaTunnelRowTypeInfo.getFieldNames());
- p.setProperty("columns", columns);
- //support types
- serde.initialize(conf, p);
- StructObjectInspector inspector = (StructObjectInspector)
serde.getObjectInspector();
- InputFormat<NullWritable, OrcStruct> in = new OrcInputFormat();
- FileInputFormat.setInputPaths(conf, filePath);
- InputSplit[] splits = in.getSplits(conf, 1);
-
- conf.set("hive.io.file.readcolumn.ids", "1");
- RecordReader<NullWritable, OrcStruct> reader =
in.getRecordReader(splits[0], conf, Reporter.NULL);
- NullWritable key = reader.createKey();
- OrcStruct value = reader.createValue();
- List<? extends StructField> fields = inspector.getAllStructFieldRefs();
- while (reader.next(key, value)) {
- Object[] datas = new Object[fields.size()];
- for (int i = 0; i < fields.size(); i++) {
- Object data = inspector.getStructFieldData(value,
fields.get(i));
- if (null != data) {
- datas[i] = String.valueOf(data);
- } else {
- datas[i] = null;
- }
- }
- output.collect(new SeaTunnelRow(datas));
- }
- reader.close();
- }
-
- @Override
- public SeaTunnelRowType getSeaTunnelRowTypeInfo(HadoopConf hadoopConf,
String path) throws HivePluginException {
-
- if (null != seaTunnelRowTypeInfo) {
- return seaTunnelRowTypeInfo;
- }
- Configuration configuration = getConfiguration(hadoopConf);
- Path dstDir = new Path(path);
- Reader reader;
- try {
- reader = OrcFile.createReader(FileSystem.get(configuration),
dstDir);
- } catch (IOException e) {
- throw new HivePluginException("Create OrcReader Fail", e);
- }
-
- TypeDescription schema = reader.getSchema();
- String[] fields = new String[schema.getFieldNames().size()];
- SeaTunnelDataType[] types = new
SeaTunnelDataType[schema.getFieldNames().size()];
-
- for (int i = 0; i < schema.getFieldNames().size(); i++) {
- fields[i] = schema.getFieldNames().get(i);
- types[i] = BasicType.STRING_TYPE;
- }
- seaTunnelRowTypeInfo = new SeaTunnelRowType(fields, types);
- return seaTunnelRowTypeInfo;
- }
-
- @SuppressWarnings("checkstyle:MagicNumber")
- @Override
- boolean checkFileType(String path) {
- try {
- boolean checkResult;
- Configuration configuration = getConfiguration();
- FileSystem fileSystem = FileSystem.get(configuration);
- Path filePath = new Path(path);
- FSDataInputStream in = fileSystem.open(filePath);
- // try to get Postscript in orc file
- long size = fileSystem.getFileStatus(filePath).getLen();
- int readSize = (int) Math.min(size, MIN_SIZE);
- in.seek(size - readSize);
- ByteBuffer buffer = ByteBuffer.allocate(readSize);
- in.readFully(buffer.array(), buffer.arrayOffset() +
buffer.position(), buffer.remaining());
- int psLen = buffer.get(readSize - 1) & 0xff;
- int len = OrcFile.MAGIC.length();
- if (psLen < len + 1) {
- in.close();
- return false;
- }
- int offset = buffer.arrayOffset() + buffer.position() +
buffer.limit() - 1 - len;
- byte[] array = buffer.array();
- if (Text.decode(array, offset, len).equals(OrcFile.MAGIC)) {
- checkResult = true;
- } else {
- // If it isn't there, this may be the 0.11.0 version of ORC.
- // Read the first 3 bytes of the file to check for the header
- in.seek(0);
- byte[] header = new byte[len];
- in.readFully(header, 0, len);
- // if it isn't there, this isn't an ORC file
- checkResult = Text.decode(header, 0,
len).equals(OrcFile.MAGIC);
- }
- in.close();
- return checkResult;
- } catch (HivePluginException | IOException e) {
- String errorMsg = String.format("Check orc file [%s] error", path);
- throw new RuntimeException(errorMsg, e);
- }
- }
-}
-
diff --git
a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/source/file/reader/format/ParquetReadStrategy.java
b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/source/file/reader/format/ParquetReadStrategy.java
deleted file mode 100644
index 35d70e3ea..000000000
---
a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/source/file/reader/format/ParquetReadStrategy.java
+++ /dev/null
@@ -1,185 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package
org.apache.seatunnel.connectors.seatunnel.hive.source.file.reader.format;
-
-import org.apache.seatunnel.api.source.Collector;
-import org.apache.seatunnel.api.table.type.BasicType;
-import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
-import org.apache.seatunnel.api.table.type.SeaTunnelRow;
-import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
-import
org.apache.seatunnel.connectors.seatunnel.hive.exception.HivePluginException;
-import org.apache.seatunnel.connectors.seatunnel.hive.source.HadoopConf;
-
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.avro.Schema;
-import org.apache.avro.generic.GenericData;
-import org.apache.avro.generic.GenericRecord;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.parquet.avro.AvroParquetReader;
-import org.apache.parquet.hadoop.ParquetFileReader;
-import org.apache.parquet.hadoop.ParquetReader;
-import org.apache.parquet.hadoop.metadata.FileMetaData;
-import org.apache.parquet.hadoop.metadata.ParquetMetadata;
-import org.apache.parquet.hadoop.util.HadoopInputFile;
-import org.apache.parquet.io.api.Binary;
-import org.apache.parquet.schema.MessageType;
-
-import java.io.IOException;
-import java.math.BigDecimal;
-import java.math.BigInteger;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-import java.util.stream.Collectors;
-
-@Slf4j
-public class ParquetReadStrategy extends AbstractReadStrategy {
-
- private SeaTunnelRowType seaTunnelRowType;
-
- private static final byte[] PARQUET_MAGIC = new byte[]{(byte) 'P', (byte)
'A', (byte) 'R', (byte) '1'};
-
- @Override
- public void read(String path, Collector<SeaTunnelRow> output) throws
Exception {
- if (Boolean.FALSE.equals(checkFileType(path))) {
- throw new Exception("please check file type");
- }
- Path filePath = new Path(path);
- HadoopInputFile hadoopInputFile = HadoopInputFile.fromPath(filePath,
getConfiguration());
- int fieldsCount = seaTunnelRowType.getTotalFields();
- GenericRecord record;
- try (ParquetReader<GenericData.Record> reader =
AvroParquetReader.<GenericData.Record>builder(hadoopInputFile).build()) {
- while ((record = reader.read()) != null) {
- Object[] fields = new Object[fieldsCount];
- for (int i = 0; i < fieldsCount; i++) {
- Object data = record.get(i);
- try {
- if (data instanceof GenericData.Fixed) {
- // judge the data in upstream is or not decimal
type
- data = fixed2String((GenericData.Fixed) data);
- } else if (data instanceof ArrayList) {
- // judge the data in upstream is or not array type
- data =
array2String((ArrayList<GenericData.Record>) data);
- }
- } catch (Exception e) {
- data = record.get(i);
- } finally {
- fields[i] = data.toString();
- }
- }
- output.collect(new SeaTunnelRow(fields));
- }
- }
- }
-
- @Override
- public SeaTunnelRowType getSeaTunnelRowTypeInfo(HadoopConf hadoopConf,
String path) throws HivePluginException {
- if (seaTunnelRowType != null) {
- return seaTunnelRowType;
- }
- Configuration configuration = getConfiguration(hadoopConf);
- Path filePath = new Path(path);
- ParquetMetadata metadata;
- try {
- metadata = ParquetFileReader.readFooter(configuration, filePath);
- } catch (IOException e) {
- throw new HivePluginException("Create parquet reader failed", e);
- }
- FileMetaData fileMetaData = metadata.getFileMetaData();
- MessageType schema = fileMetaData.getSchema();
- int fieldCount = schema.getFieldCount();
- String[] fields = new String[fieldCount];
- SeaTunnelDataType[] types = new SeaTunnelDataType[fieldCount];
- for (int i = 0; i < fieldCount; i++) {
- fields[i] = schema.getFieldName(i);
- // Temporarily each field is treated as a string type
- // I think we can use the schema information to build seatunnel
column type
- types[i] = BasicType.STRING_TYPE;
- }
- seaTunnelRowType = new SeaTunnelRowType(fields, types);
- return seaTunnelRowType;
- }
-
- private String fixed2String(GenericData.Fixed fixed) {
- Schema schema = fixed.getSchema();
- byte[] bytes = fixed.bytes();
- int precision =
Integer.parseInt(schema.getObjectProps().get("precision").toString());
- int scale =
Integer.parseInt(schema.getObjectProps().get("scale").toString());
- BigDecimal bigDecimal = bytes2Decimal(bytes, precision, scale);
- return bigDecimal.toString();
- }
-
- @SuppressWarnings("checkstyle:MagicNumber")
- private BigDecimal bytes2Decimal(byte[] bytesArray, int precision, int
scale) {
- Binary value = Binary.fromConstantByteArray(bytesArray);
- if (precision <= 18) {
- ByteBuffer buffer = value.toByteBuffer();
- byte[] bytes = buffer.array();
- int start = buffer.arrayOffset() + buffer.position();
- int end = buffer.arrayOffset() + buffer.limit();
- long unscaled = 0L;
- int i = start;
- while (i < end) {
- unscaled = unscaled << 8 | bytes[i] & 0xff;
- i++;
- }
- int bits = 8 * (end - start);
- long unscaledNew = (unscaled << (64 - bits)) >> (64 - bits);
- if (unscaledNew <= -Math.pow(10, 18) || unscaledNew >=
Math.pow(10, 18)) {
- return new BigDecimal(unscaledNew);
- } else {
- return BigDecimal.valueOf(unscaledNew / Math.pow(10, scale));
- }
- } else {
- return new BigDecimal(new BigInteger(value.getBytes()), scale);
- }
- }
-
- @Override
- boolean checkFileType(String path) {
- boolean checkResult;
- byte[] magic = new byte[PARQUET_MAGIC.length];
- try {
- Configuration configuration = getConfiguration();
- FileSystem fileSystem = FileSystem.get(configuration);
- Path filePath = new Path(path);
- FSDataInputStream in = fileSystem.open(filePath);
- // try to get header information in a parquet file
- in.seek(0);
- in.readFully(magic);
- checkResult = Arrays.equals(magic, PARQUET_MAGIC);
- in.close();
- return checkResult;
- } catch (HivePluginException | IOException e) {
- String errorMsg = String.format("Check parquet file [%s] error",
path);
- throw new RuntimeException(errorMsg, e);
- }
- }
-
- private String array2String(ArrayList<GenericData.Record> data) throws
JsonProcessingException {
- ObjectMapper objectMapper = new ObjectMapper();
- List<String> values = data.stream().map(record ->
record.get(0).toString()).collect(Collectors.toList());
- return objectMapper.writeValueAsString(values);
- }
-}
diff --git
a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/source/file/reader/format/ReadStrategy.java
b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/source/file/reader/format/ReadStrategy.java
deleted file mode 100644
index 5e7301914..000000000
---
a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/source/file/reader/format/ReadStrategy.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package
org.apache.seatunnel.connectors.seatunnel.hive.source.file.reader.format;
-
-import org.apache.seatunnel.api.source.Collector;
-import org.apache.seatunnel.api.table.type.SeaTunnelRow;
-import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
-import
org.apache.seatunnel.connectors.seatunnel.hive.exception.HivePluginException;
-import org.apache.seatunnel.connectors.seatunnel.hive.source.HadoopConf;
-
-import org.apache.hadoop.conf.Configuration;
-
-import java.io.IOException;
-import java.io.Serializable;
-import java.util.List;
-
-public interface ReadStrategy extends Serializable {
- void init(HadoopConf conf);
-
- Configuration getConfiguration(HadoopConf conf);
-
- void read(String path, Collector<SeaTunnelRow> output) throws Exception;
-
- SeaTunnelRowType getSeaTunnelRowTypeInfo(HadoopConf hadoopConf, String
path) throws HivePluginException;
-
- List<String> getFileNamesByPath(HadoopConf hadoopConf, String path) throws
IOException;
-}
diff --git
a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/source/file/reader/format/ReadStrategyFactory.java
b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/source/file/reader/format/ReadStrategyFactory.java
deleted file mode 100644
index 56e88aa44..000000000
---
a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/source/file/reader/format/ReadStrategyFactory.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package
org.apache.seatunnel.connectors.seatunnel.hive.source.file.reader.format;
-
-import
org.apache.seatunnel.connectors.seatunnel.hive.source.file.reader.type.FileTypeEnum;
-
-import lombok.extern.slf4j.Slf4j;
-
-@Slf4j
-public class ReadStrategyFactory {
-
- private ReadStrategyFactory() {}
-
- public static ReadStrategy of(String fileType) {
- try {
- FileTypeEnum fileTypeEnum =
FileTypeEnum.valueOf(fileType.toUpperCase());
- return fileTypeEnum.getReadStrategy();
- } catch (IllegalArgumentException e) {
- log.warn("Hive plugin not support this file type [{}], it will be
treated as a plain text file", fileType);
- return new TextReadStrategy();
- }
- }
-}
diff --git
a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/source/file/reader/format/TextReadStrategy.java
b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/source/file/reader/format/TextReadStrategy.java
deleted file mode 100644
index 6b014d737..000000000
---
a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/source/file/reader/format/TextReadStrategy.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package
org.apache.seatunnel.connectors.seatunnel.hive.source.file.reader.format;
-
-import org.apache.seatunnel.api.source.Collector;
-import org.apache.seatunnel.api.table.type.BasicType;
-import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
-import org.apache.seatunnel.api.table.type.SeaTunnelRow;
-import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
-import
org.apache.seatunnel.connectors.seatunnel.hive.exception.HivePluginException;
-import org.apache.seatunnel.connectors.seatunnel.hive.source.HadoopConf;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-
-import java.io.BufferedReader;
-import java.io.IOException;
-import java.io.InputStreamReader;
-import java.nio.charset.StandardCharsets;
-
-public class TextReadStrategy extends AbstractReadStrategy {
-
- private static final String TEXT_FIELD_NAME = "lines";
-
- @Override
- public void read(String path, Collector<SeaTunnelRow> output) throws
IOException, HivePluginException {
- Configuration conf = getConfiguration();
- FileSystem fs = FileSystem.get(conf);
- Path filePath = new Path(path);
- try (BufferedReader reader = new BufferedReader(new
InputStreamReader(fs.open(filePath), StandardCharsets.UTF_8))) {
- reader.lines().forEach(line -> output.collect(new SeaTunnelRow(new
String[]{line})));
- }
- }
-
- @Override
- public SeaTunnelRowType getSeaTunnelRowTypeInfo(HadoopConf hadoopConf,
String path) {
- return new SeaTunnelRowType(new String[]{TEXT_FIELD_NAME},
- new SeaTunnelDataType<?>[]{BasicType.STRING_TYPE});
- }
-}
diff --git
a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/source/file/reader/type/FileTypeEnum.java
b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/source/file/reader/type/FileTypeEnum.java
deleted file mode 100644
index 3cf986b18..000000000
---
a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/source/file/reader/type/FileTypeEnum.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.connectors.seatunnel.hive.source.file.reader.type;
-
-import
org.apache.seatunnel.connectors.seatunnel.hive.source.file.reader.format.OrcReadStrategy;
-import
org.apache.seatunnel.connectors.seatunnel.hive.source.file.reader.format.ParquetReadStrategy;
-import
org.apache.seatunnel.connectors.seatunnel.hive.source.file.reader.format.ReadStrategy;
-import
org.apache.seatunnel.connectors.seatunnel.hive.source.file.reader.format.TextReadStrategy;
-
-public enum FileTypeEnum {
- ORC {
- @Override
- public ReadStrategy getReadStrategy() {
- return new OrcReadStrategy();
- }
- },
- PARQUET {
- @Override
- public ReadStrategy getReadStrategy() {
- return new ParquetReadStrategy();
- }
- },
- TEXT {
- @Override
- public ReadStrategy getReadStrategy() {
- return new TextReadStrategy();
- }
- };
-
- public ReadStrategy getReadStrategy() {
- return null;
- }
-}
diff --git
a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/utils/HiveMetaStoreProxy.java
b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/utils/HiveMetaStoreProxy.java
index 30c9a2eba..d813a03a3 100644
---
a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/utils/HiveMetaStoreProxy.java
+++
b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/utils/HiveMetaStoreProxy.java
@@ -17,18 +17,26 @@
package org.apache.seatunnel.connectors.seatunnel.hive.utils;
+import org.apache.seatunnel.connectors.seatunnel.hive.config.HiveConfig;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
import lombok.NonNull;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.thrift.TException;
-public class HiveMetaStoreProxy {
+import java.util.List;
+import java.util.Objects;
- private HiveMetaStoreClient hiveMetaStoreClient;
+public class HiveMetaStoreProxy {
+ private final HiveMetaStoreClient hiveMetaStoreClient;
+ private static volatile HiveMetaStoreProxy INSTANCE = null;
- public HiveMetaStoreProxy(@NonNull String uris) {
+ private HiveMetaStoreProxy(@NonNull String uris) {
HiveConf hiveConf = new HiveConf();
hiveConf.set("hive.metastore.uris", uris);
try {
@@ -38,15 +46,40 @@ public class HiveMetaStoreProxy {
}
}
+ public static HiveMetaStoreProxy getInstance(Config config) {
+ if (INSTANCE == null) {
+ synchronized (HiveMetaStoreProxy.class) {
+ if (INSTANCE == null) {
+ String metastoreUri =
config.getString(HiveConfig.METASTORE_URI);
+ INSTANCE = new HiveMetaStoreProxy(metastoreUri);
+ }
+ }
+ }
+ return INSTANCE;
+ }
+
public Table getTable(@NonNull String dbName, @NonNull String tableName) {
try {
return hiveMetaStoreClient.getTable(dbName, tableName);
} catch (TException e) {
- throw new RuntimeException(e);
+ String errorMsg = String.format("Get table [%s.%s] information
failed", dbName, tableName);
+ throw new RuntimeException(errorMsg, e);
}
}
- public HiveMetaStoreClient getHiveMetaStoreClient() {
- return hiveMetaStoreClient;
+ public List<FieldSchema> getTableFields(@NonNull String dbName, @NonNull
String tableName) {
+ try {
+ return hiveMetaStoreClient.getFields(dbName, tableName);
+ } catch (TException e) {
+ String errorMsg = String.format("Get table [%s.%s] fields
information failed", dbName, tableName);
+ throw new RuntimeException(errorMsg, e);
+ }
+ }
+
+ public synchronized void close() {
+ if (Objects.nonNull(hiveMetaStoreClient)) {
+ hiveMetaStoreClient.close();
+ HiveMetaStoreProxy.INSTANCE = null;
+ }
}
}
diff --git
a/seatunnel-connectors-v2/connector-hive/src/test/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSinkConfigTest.java
b/seatunnel-connectors-v2/connector-hive/src/test/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSinkConfigTest.java
deleted file mode 100644
index e822a49c6..000000000
---
a/seatunnel-connectors-v2/connector-hive/src/test/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSinkConfigTest.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.connectors.seatunnel.hive.sink;
-
-import org.apache.seatunnel.api.table.type.BasicType;
-import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
-import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
-
-import org.apache.seatunnel.shade.com.typesafe.config.Config;
-import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory;
-import org.apache.seatunnel.shade.com.typesafe.config.ConfigResolveOptions;
-
-import java.io.File;
-import java.util.List;
-
-public class HiveSinkConfigTest {
-
- /**
- * test hive sink config.
- * <p> TODO: Uncouple from the hive environment
- */
- public void testCreateHiveSinkConfig() {
- String[] fieldNames = new String[]{"name", "age"};
- SeaTunnelDataType[] seaTunnelDataTypes = new
SeaTunnelDataType[]{BasicType.STRING_TYPE, BasicType.INT_TYPE};
- SeaTunnelRowType seaTunnelRowTypeInfo = new
SeaTunnelRowType(fieldNames, seaTunnelDataTypes);
- String configFile = "fakesource_to_hive.conf";
- String configFilePath = System.getProperty("user.dir") +
"/src/test/resources/" + configFile;
- Config config = ConfigFactory
- .parseFile(new File(configFilePath))
- .resolve(ConfigResolveOptions.defaults().setAllowUnresolved(true))
- .resolveWith(ConfigFactory.systemProperties(),
- ConfigResolveOptions.defaults().setAllowUnresolved(true));
- List<? extends Config> sink = config.getConfigList("sink");
- HiveSinkConfig hiveSinkConfig = new HiveSinkConfig(sink.get(0),
seaTunnelRowTypeInfo);
- }
-}