This is an automated email from the ASF dual-hosted git repository.
kirs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 23ad4ee73 [Connector-V2] Add Hive sink connector v2 (#2158)
23ad4ee73 is described below
commit 23ad4ee735b68c097b2564269580c6c584044f62
Author: Eric <[email protected]>
AuthorDate: Sat Jul 16 10:12:22 2022 +0800
[Connector-V2] Add Hive sink connector v2 (#2158)
* tmp commit
* add hadoop2 and hadoop3 shade jar
* add hadoop2 and hadoop3 shade jar
* add license head
* change known dependencies
* tmp commit
* tmp commit
* change hadoop dependency scope to provided
* back pom
* fix checkstyle
* add example
* fix example bug
* remove file connector from example and e2e because hadoop2 cannot compile with jdk11
* no need jdk8 and jdk11 profile because we don't use hadoop shade jar
* change hadoop jar dependency scope to provided
* back
* file connector can not build in jdk11
* drop hadoop shade
* add gitignore item
* add hadoop and local file sink
* fix pom error
* fix pom error
* fix pom error
* implement new interface
* fix UT error
* fix e2e error
* update build timeout from 30min to 40min
* fix e2e error
* remove auto service
* fix e2e error
* fix e2e error
* fix e2e error
* found e2e error
* fix e2e error
* fix e2e error
* fix e2e error
* merge from upstream
* merge from upstream
* merge from upstream
* merge from upstream
* merge from upstream
* add mvn jvm option
* add mvn jvm option
* add license
* add license
* add license
* fix dependency
* fix build jvm oom
* fix build jvm oom
* fix build jvm oom
* fix dependency
* fix dependency
* fix e2e error
* update codeql check timeout from 30min to 60min
* merge from dev
* merge from dev
* fix ci error
* fix checkstyle
* fix ci
* fix ci
* aa
* aa
* aa
* add .idea
* del .idea
* del .idea
* del .idea
* del .idea
* remove unused license
* remove unused before and after methods in test
* fix license; remove dependency
* fix review
* fix build order
* fix license
* fix license
* fix review
* fix review
* fix review
* fix review
* fix review
* fix review
* fix review
* fix review
* fix review
* add code-analysis timeout to 120
* retry ci
* update license and remove unused jars from LICENSE file
* retry ci
* add hive sink
* add hive sink connector doc
* add hive sink connector doc
* fix checkstyle error.
* fix bug
* tmp
* fix hive shade error
* fix hive shade error
* fix commit bug
* optimize doc
* optimize doc
* optimize doc
* optimize code
---
docs/en/connector-v2/sink/File.mdx | 2 +
docs/en/connector-v2/sink/Hive.md | 62 ++++++++
docs/en/connector-v2/source/Hudi.md | 2 +
pom.xml | 4 +-
...TextFileConfig.java => BaseTextFileConfig.java} | 10 +-
.../seatunnel/file/sink/AbstractFileSink.java | 3 +-
.../file/sink/FileAggregatedCommitInfo.java | 3 +
.../seatunnel/file/sink/FileCommitInfo.java | 3 +
.../file/sink/FileSinkAggregatedCommitter.java | 9 +-
.../file/sink/FileSinkWriterWithTransaction.java | 160 +++++++++++++++++++++
.../file/sink/config/TextFileSinkConfig.java | 10 +-
.../file/sink/transaction/Transaction.java | 5 +-
.../writer/AbstractTransactionStateFileWriter.java | 14 +-
.../writer/FileSinkPartitionDirNameGenerator.java | 18 ++-
.../sink/writer/PartitionDirNameGenerator.java | 4 +-
.../TestFileSinkPartitionDirNameGenerator.java | 6 +-
.../sink/hdfs/FileSinkAggregatedCommitterTest.java | 36 +++--
.../TestHdfsTxtTransactionStateFileWriter.java | 10 ++
.../local/FileSinkAggregatedCommitterTest.java | 39 +++--
.../TestLocalTxtTransactionStateFileWriter.java | 10 ++
seatunnel-connectors-v2/connector-hive/pom.xml | 22 ++-
.../HiveSinkState.java => config/Constant.java} | 14 +-
.../hive/sink/HiveAggregatedCommitInfo.java | 14 +-
.../seatunnel/hive/sink/HiveCommitInfo.java | 16 ++-
.../connectors/seatunnel/hive/sink/HiveSink.java | 56 ++++++--
.../hive/sink/HiveSinkAggregatedCommitter.java | 93 +++++++++---
.../seatunnel/hive/sink/HiveSinkConfig.java | 148 ++++++++++++-------
.../seatunnel/hive/sink/HiveSinkState.java | 3 +-
.../seatunnel/hive/sink/HiveSinkWriter.java | 137 ++++++++++++++----
.../hive/sink/file/writer/AbstractFileWriter.java | 155 --------------------
.../hive/sink/file/writer/FileWriter.java | 49 -------
.../hive/sink/file/writer/HdfsTxtFileWriter.java | 151 -------------------
.../seatunnel/hive/sink/file/writer/HdfsUtils.java | 96 -------------
.../seatunnel/hive/utils/HiveMetaStoreProxy.java | 52 +++++++
.../seatunnel/hive/sink/TestHiveSinkConfig.java | 53 +++++++
.../src/test/resources/fakesource_to_hive.conf | 23 +--
seatunnel-connectors-v2/connector-hudi/pom.xml | 6 +-
seatunnel-dependency-shade/pom.xml | 39 -----
.../seatunnel-hive-shade/pom.xml | 69 ---------
.../seatunnel-flink-connector-v2-example/pom.xml | 11 ++
.../resources/examples/fakesource_to_file.conf | 6 +-
41 files changed, 851 insertions(+), 772 deletions(-)
diff --git a/docs/en/connector-v2/sink/File.mdx
b/docs/en/connector-v2/sink/File.mdx
index cf92d188b..7e2e3efd7 100644
--- a/docs/en/connector-v2/sink/File.mdx
+++ b/docs/en/connector-v2/sink/File.mdx
@@ -111,6 +111,8 @@ Streaming Job not support `overwrite`.
</TabItem>
<TabItem value="HdfsFile">
+In order to use this connector, you must ensure your Spark/Flink cluster has already integrated Hadoop. The tested Hadoop version is 2.x.
+
| name | type   | required | default value |
| ---- | ------ | -------- | ------------- |
| path | string | yes      | -             |
diff --git a/docs/en/connector-v2/sink/Hive.md
b/docs/en/connector-v2/sink/Hive.md
new file mode 100644
index 000000000..1794633f1
--- /dev/null
+++ b/docs/en/connector-v2/sink/Hive.md
@@ -0,0 +1,62 @@
+# Hive
+
+## Description
+
+Write data to Hive.
+
+In order to use this connector, you must ensure your Spark/Flink cluster has already integrated Hive. The tested Hive version is 2.3.9.
+
+## Options
+
+| name                  | type    | required | default value                                             |
+| --------------------- | ------- | -------- | --------------------------------------------------------- |
+| hive_table_name       | string  | yes      | -                                                         |
+| hive_metastore_uris   | string  | yes      | -                                                         |
+| partition_by          | array   | no       | -                                                         |
+| sink_columns          | array   | no       | When this parameter is empty, all fields are sink columns |
+| is_enable_transaction | boolean | no       | true                                                      |
+| save_mode             | string  | no       | "append"                                                  |
+
+### hive_table_name [string]
+
+Target Hive table name, e.g. `db1.table1`.
+
+### hive_metastore_uris [string]
+
+Hive metastore URIs, e.g. `thrift://localhost:9083`.
+
+### partition_by [array]
+
+Partition data based on selected fields
+
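+For example, a hypothetical `partition_by=["age"]` combined with the default directory expression `${k0}=${v0}/...` would lay files out like this (illustrative paths, not output from a real run):
+
+```bash
+/tmp/hive/warehouse/db1.table1/age=20/<file>
+/tmp/hive/warehouse/db1.table1/age=30/<file>
+```
+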
+### sink_columns [array]
+
+Which columns need to be written to Hive; the default is all of the columns obtained from `Transform` or `Source`.
+The order of the fields determines the order in which data is actually written to the file.
+
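+For instance, assuming the upstream produces `name`, `age`, and `address` fields, this hypothetical setting writes only `name` and `age`, in that order:
+
+```bash
+sink_columns=["name","age"]
+```
+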
+### is_enable_transaction [boolean]
+
+If `is_enable_transaction` is true, we ensure that data will not be lost or duplicated when it is written to the target directory.
+
+Only `true` is supported now.
+
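+Under the hood, each writer first writes files into a per-checkpoint transaction directory, and the aggregated committer moves them into the target directory on commit. A transaction directory looks roughly like the path below (pattern taken from the tests in this change; the job id placeholder is illustrative):
+
+```bash
+/tmp/seatunnel/seatunnel/${jobId}/T_${jobId}_0_1
+```
+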
+### save_mode [string]
+
+Storage mode. We plan to support `overwrite` and `append`; only `append` is supported now.
+
+Streaming jobs do not support `overwrite`.
+
+## Example
+
+```bash
+
+Hive {
+ hive_table_name="db1.table1"
+ hive_metastore_uris="thrift://localhost:9083"
+ partition_by=["age"]
+ sink_columns=["name","age"]
+ is_enable_transaction=true
+ save_mode="append"
+}
+
+```
diff --git a/docs/en/connector-v2/source/Hudi.md
b/docs/en/connector-v2/source/Hudi.md
index 4803b1f44..4851d61ef 100644
--- a/docs/en/connector-v2/source/Hudi.md
+++ b/docs/en/connector-v2/source/Hudi.md
@@ -4,6 +4,8 @@
Used to read data from Hudi. Currently, only supports hudi cow table and
Snapshot Query with Batch Mode.
+In order to use this connector, you must ensure your Spark/Flink cluster has already integrated Hive. The tested Hive version is 2.3.9.
+
## Options
| name | type | required | default value |
diff --git a/pom.xml b/pom.xml
index fe73780d2..17aa94ed4 100644
--- a/pom.xml
+++ b/pom.xml
@@ -87,7 +87,7 @@
<module>seatunnel-plugin-discovery</module>
<module>seatunnel-formats</module>
<module>seatunnel-dist</module>
- <module>seatunnel-server</module>
+ <module>seatunnel-server</module>
</modules>
<profiles>
@@ -97,7 +97,6 @@
<activeByDefault>true</activeByDefault>
</activation>
<modules>
- <module>seatunnel-dependency-shade</module>
<module>seatunnel-connectors-v2</module>
<module>seatunnel-connectors-v2-dist</module>
<module>seatunnel-examples</module>
@@ -928,7 +927,6 @@
<artifactId>hibernate-validator</artifactId>
<version>${hibernate.validator.version}</version>
</dependency>
-
</dependencies>
</dependencyManagement>
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/AbstractTextFileConfig.java
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseTextFileConfig.java
similarity index 91%
rename from
seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/AbstractTextFileConfig.java
rename to
seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseTextFileConfig.java
index 32672066d..d6fd26d1b 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/AbstractTextFileConfig.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseTextFileConfig.java
@@ -29,7 +29,7 @@ import java.io.Serializable;
import java.util.Locale;
@Data
-public class AbstractTextFileConfig implements DelimiterConfig,
CompressConfig, Serializable {
+public class BaseTextFileConfig implements DelimiterConfig, CompressConfig,
Serializable {
private static final long serialVersionUID = 1L;
protected String compressCodec;
@@ -42,9 +42,7 @@ public class AbstractTextFileConfig implements
DelimiterConfig, CompressConfig,
protected String fileNameExpression;
protected FileFormat fileFormat = FileFormat.TEXT;
- public AbstractTextFileConfig(@NonNull Config config) {
- checkNotNull(config.getString(Constant.PATH));
-
+ public BaseTextFileConfig(@NonNull Config config) {
if (config.hasPath(Constant.COMPRESS_CODEC)) {
throw new RuntimeException("compress not support now");
}
@@ -60,6 +58,7 @@ public class AbstractTextFileConfig implements
DelimiterConfig, CompressConfig,
if (config.hasPath(Constant.PATH) &&
!StringUtils.isBlank(config.getString(Constant.PATH))) {
this.path = config.getString(Constant.PATH);
}
+ checkNotNull(path);
if (config.hasPath(Constant.FILE_NAME_EXPRESSION) &&
!StringUtils.isBlank(config.getString(Constant.FILE_NAME_EXPRESSION))) {
this.fileNameExpression =
config.getString(Constant.FILE_NAME_EXPRESSION);
@@ -70,6 +69,5 @@ public class AbstractTextFileConfig implements
DelimiterConfig, CompressConfig,
}
}
- protected AbstractTextFileConfig() {
- }
+ public BaseTextFileConfig() {}
}
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/AbstractFileSink.java
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/AbstractFileSink.java
index a296eea53..77b72f004 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/AbstractFileSink.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/AbstractFileSink.java
@@ -41,7 +41,6 @@ import java.util.Optional;
/**
* Hive Sink implementation by using SeaTunnel sink API.
- * This class contains the method to create {@link
TransactionStateFileSinkWriter} and {@link FileSinkAggregatedCommitter}.
*/
public abstract class AbstractFileSink implements SeaTunnelSink<SeaTunnelRow,
FileSinkState, FileCommitInfo, FileAggregatedCommitInfo> {
private Config config;
@@ -92,7 +91,7 @@ public abstract class AbstractFileSink implements
SeaTunnelSink<SeaTunnelRow, Fi
@Override
public SinkWriter<SeaTunnelRow, FileCommitInfo, FileSinkState>
restoreWriter(SinkWriter.Context context, List<FileSinkState> states) throws
IOException {
if (this.getSinkConfig().isEnableTransaction()) {
- return new TransactionStateFileSinkWriter(seaTunnelRowTypeInfo,
+ return new FileSinkWriterWithTransaction(seaTunnelRowTypeInfo,
config,
context,
textFileSinkConfig,
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/FileAggregatedCommitInfo.java
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/FileAggregatedCommitInfo.java
index 1036c3a59..c847ff659 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/FileAggregatedCommitInfo.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/FileAggregatedCommitInfo.java
@@ -21,6 +21,7 @@ import lombok.AllArgsConstructor;
import lombok.Data;
import java.io.Serializable;
+import java.util.List;
import java.util.Map;
@Data
@@ -33,4 +34,6 @@ public class FileAggregatedCommitInfo implements Serializable
{
* V is the target file path of the data file.
*/
private Map<String, Map<String, String>> transactionMap;
+
+ private Map<String, List<String>> partitionDirAndValsMap;
}
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/FileCommitInfo.java
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/FileCommitInfo.java
index 689b85ebf..0fcb04a03 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/FileCommitInfo.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/FileCommitInfo.java
@@ -21,6 +21,7 @@ import lombok.AllArgsConstructor;
import lombok.Data;
import java.io.Serializable;
+import java.util.List;
import java.util.Map;
@Data
@@ -34,5 +35,7 @@ public class FileCommitInfo implements Serializable {
*/
private Map<String, String> needMoveFiles;
+ private Map<String, List<String>> partitionDirAndValsMap;
+
private String transactionDir;
}
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/FileSinkAggregatedCommitter.java
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/FileSinkAggregatedCommitter.java
index cc8ff2404..3c7c8cf9c 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/FileSinkAggregatedCommitter.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/FileSinkAggregatedCommitter.java
@@ -21,6 +21,7 @@ import org.apache.seatunnel.api.sink.SinkAggregatedCommitter;
import
org.apache.seatunnel.connectors.seatunnel.file.sink.spi.FileSystemCommitter;
import lombok.NonNull;
+import org.apache.commons.collections4.CollectionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -29,6 +30,7 @@ import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Set;
public class FileSinkAggregatedCommitter implements
SinkAggregatedCommitter<FileCommitInfo, FileAggregatedCommitInfo> {
private static final Logger LOGGER =
LoggerFactory.getLogger(FileSinkAggregatedCommitter.class);
@@ -63,6 +65,7 @@ public class FileSinkAggregatedCommitter implements
SinkAggregatedCommitter<File
return null;
}
Map<String, Map<String, String>> aggregateCommitInfo = new HashMap<>();
+ Map<String, List<String>> partitionDirAndValsMap = new HashMap<>();
commitInfos.stream().forEach(commitInfo -> {
Map<String, String> needMoveFileMap =
aggregateCommitInfo.get(commitInfo.getTransactionDir());
if (needMoveFileMap == null) {
@@ -70,8 +73,12 @@ public class FileSinkAggregatedCommitter implements
SinkAggregatedCommitter<File
aggregateCommitInfo.put(commitInfo.getTransactionDir(),
needMoveFileMap);
}
needMoveFileMap.putAll(commitInfo.getNeedMoveFiles());
+ Set<Map.Entry<String, List<String>>> entries =
commitInfo.getPartitionDirAndValsMap().entrySet();
+ if (!CollectionUtils.isEmpty(entries)) {
+
partitionDirAndValsMap.putAll(commitInfo.getPartitionDirAndValsMap());
+ }
});
- return new FileAggregatedCommitInfo(aggregateCommitInfo);
+ return new FileAggregatedCommitInfo(aggregateCommitInfo,
partitionDirAndValsMap);
}
@Override
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/FileSinkWriterWithTransaction.java
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/FileSinkWriterWithTransaction.java
new file mode 100644
index 000000000..83e51d1bc
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/FileSinkWriterWithTransaction.java
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.file.sink;
+
+import org.apache.seatunnel.api.sink.SinkWriter;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import
org.apache.seatunnel.connectors.seatunnel.file.sink.config.TextFileSinkConfig;
+import
org.apache.seatunnel.connectors.seatunnel.file.sink.spi.SinkFileSystemPlugin;
+import
org.apache.seatunnel.connectors.seatunnel.file.sink.transaction.TransactionStateFileWriter;
+import
org.apache.seatunnel.connectors.seatunnel.file.sink.writer.FileSinkPartitionDirNameGenerator;
+import
org.apache.seatunnel.connectors.seatunnel.file.sink.writer.FileSinkTransactionFileNameGenerator;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
+import lombok.NonNull;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Optional;
+
+public class FileSinkWriterWithTransaction implements SinkWriter<SeaTunnelRow,
FileCommitInfo, FileSinkState> {
+ private static final Logger LOGGER =
LoggerFactory.getLogger(FileSinkWriterWithTransaction.class);
+
+ private SeaTunnelRowType seaTunnelRowTypeInfo;
+ private Config pluginConfig;
+ private Context context;
+ private String jobId;
+
+ private TransactionStateFileWriter fileWriter;
+
+ private TextFileSinkConfig textFileSinkConfig;
+
+ public FileSinkWriterWithTransaction(@NonNull SeaTunnelRowType
seaTunnelRowTypeInfo,
+ @NonNull Config pluginConfig,
+ @NonNull SinkWriter.Context context,
+ @NonNull TextFileSinkConfig
textFileSinkConfig,
+ @NonNull String jobId,
+ @NonNull SinkFileSystemPlugin
sinkFileSystemPlugin) {
+ this.seaTunnelRowTypeInfo = seaTunnelRowTypeInfo;
+ this.pluginConfig = pluginConfig;
+ this.context = context;
+ this.jobId = jobId;
+ this.textFileSinkConfig = textFileSinkConfig;
+
+ Optional<TransactionStateFileWriter> transactionStateFileWriter =
sinkFileSystemPlugin.getTransactionStateFileWriter(this.seaTunnelRowTypeInfo,
+ new FileSinkTransactionFileNameGenerator(
+ this.textFileSinkConfig.getFileFormat(),
+ this.textFileSinkConfig.getFileNameExpression(),
+ this.textFileSinkConfig.getFileNameTimeFormat()),
+ new FileSinkPartitionDirNameGenerator(
+ this.textFileSinkConfig.getPartitionFieldList(),
+ this.textFileSinkConfig.getPartitionFieldsIndexInRow(),
+ this.textFileSinkConfig.getPartitionDirExpression()),
+ this.textFileSinkConfig.getSinkColumnsIndexInRow(),
+ this.textFileSinkConfig.getTmpPath(),
+ this.textFileSinkConfig.getPath(),
+ this.jobId,
+ this.context.getIndexOfSubtask(),
+ this.textFileSinkConfig.getFieldDelimiter(),
+ this.textFileSinkConfig.getRowDelimiter(),
+ sinkFileSystemPlugin.getFileSystem().get());
+
+ if (!transactionStateFileWriter.isPresent()) {
+            throw new RuntimeException("A TransactionStateFileWriter is needed");
+ }
+
+ this.fileWriter = transactionStateFileWriter.get();
+
+ fileWriter.beginTransaction(1L);
+ }
+
+ public FileSinkWriterWithTransaction(@NonNull SeaTunnelRowType
seaTunnelRowTypeInfo,
+ @NonNull Config pluginConfig,
+ @NonNull SinkWriter.Context context,
+ @NonNull TextFileSinkConfig
textFileSinkConfig,
+ @NonNull String jobId,
+ @NonNull List<FileSinkState>
fileSinkStates,
+ @NonNull SinkFileSystemPlugin
sinkFileSystemPlugin) {
+        this.seaTunnelRowTypeInfo = seaTunnelRowTypeInfo;
+        this.pluginConfig = pluginConfig;
+        this.context = context;
+        this.jobId = jobId;
+        this.textFileSinkConfig = textFileSinkConfig;
+
+ Optional<TransactionStateFileWriter> transactionStateFileWriter =
sinkFileSystemPlugin.getTransactionStateFileWriter(this.seaTunnelRowTypeInfo,
+ new FileSinkTransactionFileNameGenerator(
+ this.textFileSinkConfig.getFileFormat(),
+ this.textFileSinkConfig.getFileNameExpression(),
+ this.textFileSinkConfig.getFileNameTimeFormat()),
+ new FileSinkPartitionDirNameGenerator(
+ this.textFileSinkConfig.getPartitionFieldList(),
+ this.textFileSinkConfig.getPartitionFieldsIndexInRow(),
+ this.textFileSinkConfig.getPartitionDirExpression()),
+ this.textFileSinkConfig.getSinkColumnsIndexInRow(),
+ this.textFileSinkConfig.getTmpPath(),
+ this.textFileSinkConfig.getPath(),
+ this.jobId,
+ this.context.getIndexOfSubtask(),
+ this.textFileSinkConfig.getFieldDelimiter(),
+ this.textFileSinkConfig.getRowDelimiter(),
+ sinkFileSystemPlugin.getFileSystem().get());
+
+ if (!transactionStateFileWriter.isPresent()) {
+            throw new RuntimeException("A TransactionStateFileWriter is needed");
+ }
+
+ this.fileWriter = transactionStateFileWriter.get();
+
+ // Rollback dirty transaction
+ if (fileSinkStates.size() > 0) {
+ List<String> transactionAfter =
fileWriter.getTransactionAfter(fileSinkStates.get(0).getTransactionId());
+ fileWriter.abortTransactions(transactionAfter);
+ }
+ fileWriter.beginTransaction(fileSinkStates.get(0).getCheckpointId() +
1);
+ }
+
+ @Override
+ public void write(SeaTunnelRow element) throws IOException {
+ fileWriter.write(element);
+ }
+
+ @Override
+ public Optional<FileCommitInfo> prepareCommit() throws IOException {
+ return fileWriter.prepareCommit();
+ }
+
+ @Override
+ public void abortPrepare() {
+ fileWriter.abortTransaction();
+ }
+
+ @Override
+ public void close() throws IOException {
+ fileWriter.finishAndCloseWriteFile();
+ }
+
+ @Override
+ public List<FileSinkState> snapshotState(long checkpointId) throws
IOException {
+ List<FileSinkState> fileSinkStates =
fileWriter.snapshotState(checkpointId);
+ fileWriter.beginTransaction(checkpointId);
+ return fileSinkStates;
+ }
+}
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/config/TextFileSinkConfig.java
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/config/TextFileSinkConfig.java
index 63b747bf2..ce94ff847 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/config/TextFileSinkConfig.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/config/TextFileSinkConfig.java
@@ -19,10 +19,12 @@ package
org.apache.seatunnel.connectors.seatunnel.file.sink.config;
import static com.google.common.base.Preconditions.checkArgument;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
-import
org.apache.seatunnel.connectors.seatunnel.file.config.AbstractTextFileConfig;
+import
org.apache.seatunnel.connectors.seatunnel.file.config.BaseTextFileConfig;
import org.apache.seatunnel.connectors.seatunnel.file.config.Constant;
import org.apache.seatunnel.connectors.seatunnel.file.config.PartitionConfig;
+import
org.apache.seatunnel.connectors.seatunnel.file.sink.writer.FileSinkPartitionDirNameGenerator;
import org.apache.seatunnel.shade.com.typesafe.config.Config;
@@ -38,14 +40,14 @@ import java.util.Map;
import java.util.stream.Collectors;
@Data
-public class TextFileSinkConfig extends AbstractTextFileConfig implements
PartitionConfig {
+public class TextFileSinkConfig extends BaseTextFileConfig implements
PartitionConfig {
private List<String> sinkColumnList;
private List<String> partitionFieldList;
/**
- * default is ${k1}=${v1}/${k2}=${v2}/...
+     * default is ${k0}=${v0}/${k1}=${v1}/... {@link FileSinkPartitionDirNameGenerator#generatorPartitionDir(SeaTunnelRow)}
*/
private String partitionDirExpression;
@@ -69,7 +71,7 @@ public class TextFileSinkConfig extends
AbstractTextFileConfig implements Partit
super(config);
checkArgument(!CollectionUtils.isEmpty(Arrays.asList(seaTunnelRowTypeInfo.getFieldNames())));
- if (config.hasPath(Constant.FILE_FORMAT) &&
!CollectionUtils.isEmpty(config.getStringList(Constant.SINK_COLUMNS))) {
+ if (config.hasPath(Constant.SINK_COLUMNS) &&
!CollectionUtils.isEmpty(config.getStringList(Constant.SINK_COLUMNS))) {
this.sinkColumnList = config.getStringList(Constant.SINK_COLUMNS);
}
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/transaction/Transaction.java
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/transaction/Transaction.java
index dd8d41bf6..d9a39c5df 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/transaction/Transaction.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/transaction/Transaction.java
@@ -22,7 +22,6 @@ import
org.apache.seatunnel.connectors.seatunnel.file.sink.AbstractFileSink;
import org.apache.seatunnel.connectors.seatunnel.file.sink.FileCommitInfo;
import
org.apache.seatunnel.connectors.seatunnel.file.sink.FileSinkAggregatedCommitter;
import org.apache.seatunnel.connectors.seatunnel.file.sink.FileSinkState;
-import
org.apache.seatunnel.connectors.seatunnel.file.sink.TransactionStateFileSinkWriter;
import lombok.NonNull;
@@ -40,7 +39,7 @@ public interface Transaction extends Serializable {
String beginTransaction(@NonNull Long checkpointId);
/**
- * Abort current Transaction, called when {@link
TransactionStateFileSinkWriter#prepareCommit()} or {@link
TransactionStateFileSinkWriter#snapshotState(long)} failed
+ * Abort current Transaction, called when {@link
org.apache.seatunnel.connectors.seatunnel.file.sink.TransactionStateFileSinkWriter#prepareCommit()}
or {@link
org.apache.seatunnel.connectors.seatunnel.file.sink.TransactionStateFileSinkWriter#snapshotState(long)}
failed
*/
void abortTransaction();
@@ -56,7 +55,7 @@ public interface Transaction extends Serializable {
List<String> getTransactionAfter(@NonNull String transactionId);
/**
- * Called by {@link TransactionStateFileSinkWriter#prepareCommit()}
+ * Called by {@link
org.apache.seatunnel.connectors.seatunnel.file.sink.TransactionStateFileSinkWriter#prepareCommit()}
* We should end the transaction in this method. After this method is
called, the transaction will no longer accept data writing
*
* @return Return the commit information that can be commit in {@link
FileSinkAggregatedCommitter#commit(List)}
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/AbstractTransactionStateFileWriter.java
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/AbstractTransactionStateFileWriter.java
index 784d87046..b14827b56 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/AbstractTransactionStateFileWriter.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/AbstractTransactionStateFileWriter.java
@@ -66,6 +66,8 @@ public abstract class AbstractTransactionStateFileWriter
implements TransactionS
private FileSystem fileSystem;
+ private Map<String, List<String>> partitionDirAndValsMap;
+
public AbstractTransactionStateFileWriter(@NonNull SeaTunnelRowType
seaTunnelRowTypeInfo,
@NonNull
TransactionFileNameGenerator transactionFileNameGenerator,
@NonNull
PartitionDirNameGenerator partitionDirNameGenerator,
@@ -89,7 +91,8 @@ public abstract class AbstractTransactionStateFileWriter
implements TransactionS
}
public String getOrCreateFilePathBeingWritten(@NonNull SeaTunnelRow
seaTunnelRow) {
- String beingWrittenFileKey =
this.partitionDirNameGenerator.generatorPartitionDir(seaTunnelRow);
+ Map<String, List<String>> dataPartitionDirAndValsMap =
this.partitionDirNameGenerator.generatorPartitionDir(seaTunnelRow);
+ String beingWrittenFileKey =
dataPartitionDirAndValsMap.keySet().toArray()[0].toString();
// get filePath from beingWrittenFile
String beingWrittenFilePath =
beingWrittenFile.get(beingWrittenFileKey);
if (beingWrittenFilePath != null) {
@@ -99,6 +102,9 @@ public abstract class AbstractTransactionStateFileWriter
implements TransactionS
sbf.append("/").append(beingWrittenFileKey).append("/").append(transactionFileNameGenerator.generateFileName(this.transactionId));
String newBeingWrittenFilePath = sbf.toString();
beingWrittenFile.put(beingWrittenFileKey, newBeingWrittenFilePath);
+ if
(!Constant.NON_PARTITION.equals(dataPartitionDirAndValsMap.keySet().toArray()[0].toString())){
+ partitionDirAndValsMap.putAll(dataPartitionDirAndValsMap);
+ }
return newBeingWrittenFilePath;
}
}
@@ -114,6 +120,7 @@ public abstract class AbstractTransactionStateFileWriter
implements TransactionS
this.transactionId = "T" + Constant.TRANSACTION_ID_SPLIT + jobId +
Constant.TRANSACTION_ID_SPLIT + subTaskIndex + Constant.TRANSACTION_ID_SPLIT +
checkpointId;
this.transactionDir = getTransactionDir(this.transactionId);
this.needMoveFiles = new HashMap<>();
+ this.partitionDirAndValsMap = new HashMap<>();
this.beingWrittenFile = new HashMap<>();
this.beginTransaction(this.transactionId);
this.checkpointId = checkpointId;
@@ -164,7 +171,10 @@ public abstract class AbstractTransactionStateFileWriter
implements TransactionS
// this.needMoveFiles will be clear when beginTransaction, so we need
copy the needMoveFiles.
Map<String, String> commitMap = new HashMap<>();
commitMap.putAll(this.needMoveFiles);
- return Optional.of(new FileCommitInfo(commitMap, this.transactionDir));
+
+ Map<String, List<String>> copyMap =
this.partitionDirAndValsMap.entrySet().stream()
+ .collect(Collectors.toMap(e -> e.getKey(), e -> new
ArrayList<String>(e.getValue())));
+ return Optional.of(new FileCommitInfo(commitMap, copyMap,
this.transactionDir));
}
@Override
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/FileSinkPartitionDirNameGenerator.java
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/FileSinkPartitionDirNameGenerator.java
index aa8a3656b..a9175409f 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/FileSinkPartitionDirNameGenerator.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/FileSinkPartitionDirNameGenerator.java
@@ -25,6 +25,7 @@ import lombok.Data;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
+import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -59,11 +60,15 @@ public class FileSinkPartitionDirNameGenerator implements
PartitionDirNameGenera
}
@Override
- public String generatorPartitionDir(SeaTunnelRow seaTunnelRow) {
+ public Map<String, List<String>> generatorPartitionDir(SeaTunnelRow
seaTunnelRow) {
+ Map<String, List<String>> partitionDirAndValsMap = new HashMap<>(1);
if (CollectionUtils.isEmpty(this.partitionFieldsIndexInRow)) {
- return Constant.NON_PARTITION;
+ partitionDirAndValsMap.put(Constant.NON_PARTITION, null);
+ return partitionDirAndValsMap;
}
+ List<String> vals = new ArrayList<>(partitionFieldsIndexInRow.size());
+ String partitionDir;
if (StringUtils.isBlank(partitionDirExpression)) {
StringBuilder sbd = new StringBuilder();
for (int i = 0; i < partitionFieldsIndexInRow.size(); i++) {
@@ -71,15 +76,20 @@ public class FileSinkPartitionDirNameGenerator implements
PartitionDirNameGenera
.append("=")
.append(seaTunnelRow.getFields()[partitionFieldsIndexInRow.get(i)])
.append("/");
+
vals.add(seaTunnelRow.getFields()[partitionFieldsIndexInRow.get(i)].toString());
}
- return sbd.toString();
+ partitionDir = sbd.toString();
} else {
Map<String, String> valueMap = new
HashMap<>(partitionFieldList.size() * 2);
for (int i = 0; i < partitionFieldsIndexInRow.size(); i++) {
valueMap.put(keys[i], partitionFieldList.get(i));
valueMap.put(values[i],
seaTunnelRow.getFields()[partitionFieldsIndexInRow.get(i)].toString());
+
vals.add(seaTunnelRow.getFields()[partitionFieldsIndexInRow.get(i)].toString());
}
- return VariablesSubstitute.substitute(partitionDirExpression,
valueMap);
+ partitionDir =
VariablesSubstitute.substitute(partitionDirExpression, valueMap);
}
+
+ partitionDirAndValsMap.put(partitionDir, vals);
+ return partitionDirAndValsMap;
}
}
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/PartitionDirNameGenerator.java
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/PartitionDirNameGenerator.java
index 1145e847d..05c90256b 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/PartitionDirNameGenerator.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/PartitionDirNameGenerator.java
@@ -20,7 +20,9 @@ package
org.apache.seatunnel.connectors.seatunnel.file.sink.writer;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import java.io.Serializable;
+import java.util.List;
+import java.util.Map;
public interface PartitionDirNameGenerator extends Serializable {
- String generatorPartitionDir(SeaTunnelRow seaTunnelRow);
+ Map<String, List<String>> generatorPartitionDir(SeaTunnelRow seaTunnelRow);
}
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/writer/TestFileSinkPartitionDirNameGenerator.java
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/writer/TestFileSinkPartitionDirNameGenerator.java
index 1989275f4..0867f104a 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/writer/TestFileSinkPartitionDirNameGenerator.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/writer/TestFileSinkPartitionDirNameGenerator.java
@@ -55,15 +55,15 @@ public class TestFileSinkPartitionDirNameGenerator {
partitionFieldsIndexInRow.add(3);
PartitionDirNameGenerator partitionDirNameGenerator = new
FileSinkPartitionDirNameGenerator(partitionFieldList,
partitionFieldsIndexInRow, "${v0}/${v1}");
- String partitionDir =
partitionDirNameGenerator.generatorPartitionDir(seaTunnelRow);
+ String partitionDir =
partitionDirNameGenerator.generatorPartitionDir(seaTunnelRow).keySet().toArray()[0].toString();
Assert.assertEquals("test/3", partitionDir);
partitionDirNameGenerator = new
FileSinkPartitionDirNameGenerator(partitionFieldList,
partitionFieldsIndexInRow, "${k0}=${v0}/${k1}=${v1}");
- partitionDir =
partitionDirNameGenerator.generatorPartitionDir(seaTunnelRow);
+ partitionDir =
partitionDirNameGenerator.generatorPartitionDir(seaTunnelRow).keySet().toArray()[0].toString();
Assert.assertEquals("c3=test/c4=3", partitionDir);
partitionDirNameGenerator = new
FileSinkPartitionDirNameGenerator(null, null, "${k0}=${v0}/${k1}=${v1}");
- partitionDir =
partitionDirNameGenerator.generatorPartitionDir(seaTunnelRow);
+ partitionDir =
partitionDirNameGenerator.generatorPartitionDir(seaTunnelRow).keySet().toArray()[0].toString();
Assert.assertEquals(Constant.NON_PARTITION, partitionDir);
}
}
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/sink/hdfs/FileSinkAggregatedCommitterTest.java
b/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/sink/hdfs/FileSinkAggregatedCommitterTest.java
index 79c54fcc9..7554cee09 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/sink/hdfs/FileSinkAggregatedCommitterTest.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/sink/hdfs/FileSinkAggregatedCommitterTest.java
@@ -25,12 +25,15 @@ import org.junit.Assert;
import org.junit.Test;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
+import java.util.stream.Collectors;
public class FileSinkAggregatedCommitterTest {
+ @SuppressWarnings("checkstyle:UnnecessaryParentheses")
@Test
public void testCommit() throws Exception {
FileSinkAggregatedCommitter fileSinkAggregatedCommitter = new
FileSinkAggregatedCommitter(new HdfsFileSystemCommitter());
@@ -46,7 +49,12 @@ public class FileSinkAggregatedCommitterTest {
HdfsUtils.createFile(transactionDir + "/c3=4/c4=bbb/test1.txt");
transactionFiles.put(transactionDir, needMoveFiles);
- FileAggregatedCommitInfo fileAggregatedCommitInfo = new
FileAggregatedCommitInfo(transactionFiles);
+
+ Map<String, List<String>> partitionDirAndVals = new HashMap<>();
+ partitionDirAndVals.put("/c3=4/c4=rrr", Arrays.stream((new
String[]{"4", "rrr"})).collect(Collectors.toList()));
+ partitionDirAndVals.put("/c3=4/c4=bbb", Arrays.stream((new
String[]{"4", "bbb"})).collect(Collectors.toList()));
+
+ FileAggregatedCommitInfo fileAggregatedCommitInfo = new
FileAggregatedCommitInfo(transactionFiles, partitionDirAndVals);
List<FileAggregatedCommitInfo> fileAggregatedCommitInfoList = new
ArrayList<>();
fileAggregatedCommitInfoList.add(fileAggregatedCommitInfo);
fileSinkAggregatedCommitter.commit(fileAggregatedCommitInfoList);
@@ -56,7 +64,7 @@ public class FileSinkAggregatedCommitterTest {
Assert.assertTrue(!HdfsUtils.fileExist(transactionDir));
}
- @SuppressWarnings("checkstyle:MagicNumber")
+ @SuppressWarnings({"checkstyle:MagicNumber",
"checkstyle:UnnecessaryParentheses"})
@Test
public void testCombine() throws Exception {
FileSinkAggregatedCommitter fileSinkAggregatedCommitter = new
FileSinkAggregatedCommitter(new HdfsFileSystemCommitter());
@@ -66,28 +74,37 @@ public class FileSinkAggregatedCommitterTest {
String transactionDir =
String.format("/tmp/seatunnel/seatunnel/%s/T_%s_0_1", jobId, jobId);
String targetDir = String.format("/tmp/hive/warehouse/%s", jobId);
Map<String, String> needMoveFiles = new HashMap<>();
- needMoveFiles.put(transactionDir + "/c3=4/c4=rrr/test1.txt", targetDir
+ "/c3=4/c4=rrr/test1.txt");
+ needMoveFiles.put(transactionDir + "/c3=3/c4=rrr/test1.txt", targetDir
+ "/c3=3/c4=rrr/test1.txt");
needMoveFiles.put(transactionDir + "/c3=4/c4=bbb/test1.txt", targetDir
+ "/c3=4/c4=bbb/test1.txt");
- HdfsUtils.createFile(transactionDir + "/c3=4/c4=rrr/test1.txt");
+ Map<String, List<String>> partitionDirAndVals = new HashMap<>();
+ partitionDirAndVals.put("/c3=3/c4=rrr", Arrays.stream((new
String[]{"3", "rrr"})).collect(Collectors.toList()));
+ partitionDirAndVals.put("/c3=4/c4=bbb", Arrays.stream((new
String[]{"4", "bbb"})).collect(Collectors.toList()));
+ FileCommitInfo fileCommitInfo = new FileCommitInfo(needMoveFiles,
partitionDirAndVals, transactionDir);
+ HdfsUtils.createFile(transactionDir + "/c3=3/c4=rrr/test1.txt");
HdfsUtils.createFile(transactionDir + "/c3=4/c4=bbb/test1.txt");
Map<String, String> needMoveFiles1 = new HashMap<>();
needMoveFiles1.put(transactionDir + "/c3=4/c4=rrr/test2.txt",
targetDir + "/c3=4/c4=rrr/test2.txt");
needMoveFiles1.put(transactionDir + "/c3=4/c4=bbb/test2.txt",
targetDir + "/c3=4/c4=bbb/test2.txt");
- FileCommitInfo fileCommitInfo = new FileCommitInfo(needMoveFiles,
transactionDir);
- FileCommitInfo fileCommitInfo1 = new FileCommitInfo(needMoveFiles1,
transactionDir);
+ Map<String, List<String>> partitionDirAndVals1 = new HashMap<>();
+        partitionDirAndVals1.put("/c3=4/c4=rrr", Arrays.stream((new String[]{"4", "rrr"})).collect(Collectors.toList()));
+        partitionDirAndVals1.put("/c3=4/c4=bbb", Arrays.stream((new String[]{"4", "bbb"})).collect(Collectors.toList()));
+ FileCommitInfo fileCommitInfo1 = new FileCommitInfo(needMoveFiles1,
partitionDirAndVals1, transactionDir);
List<FileCommitInfo> fileCommitInfoList = new ArrayList<>();
fileCommitInfoList.add(fileCommitInfo);
fileCommitInfoList.add(fileCommitInfo1);
+
FileAggregatedCommitInfo combine =
fileSinkAggregatedCommitter.combine(fileCommitInfoList);
Assert.assertEquals(1, combine.getTransactionMap().size());
Assert.assertEquals(4,
combine.getTransactionMap().get(transactionDir).size());
- Assert.assertEquals(targetDir + "/c3=4/c4=rrr/test1.txt",
combine.getTransactionMap().get(transactionDir).get(transactionDir +
"/c3=4/c4=rrr/test1.txt"));
+ Assert.assertEquals(targetDir + "/c3=3/c4=rrr/test1.txt",
combine.getTransactionMap().get(transactionDir).get(transactionDir +
"/c3=3/c4=rrr/test1.txt"));
Assert.assertEquals(targetDir + "/c3=4/c4=bbb/test1.txt",
combine.getTransactionMap().get(transactionDir).get(transactionDir +
"/c3=4/c4=bbb/test1.txt"));
Assert.assertEquals(targetDir + "/c3=4/c4=rrr/test2.txt",
combine.getTransactionMap().get(transactionDir).get(transactionDir +
"/c3=4/c4=rrr/test2.txt"));
Assert.assertEquals(targetDir + "/c3=4/c4=bbb/test2.txt",
combine.getTransactionMap().get(transactionDir).get(transactionDir +
"/c3=4/c4=bbb/test2.txt"));
+ Assert.assertEquals(3,
combine.getPartitionDirAndValsMap().keySet().size());
}
+ @SuppressWarnings("checkstyle:UnnecessaryParentheses")
@Test
public void testAbort() throws Exception {
FileSinkAggregatedCommitter fileSinkAggregatedCommitter = new
FileSinkAggregatedCommitter(new HdfsFileSystemCommitter());
@@ -99,11 +116,14 @@ public class FileSinkAggregatedCommitterTest {
Map<String, String> needMoveFiles = new HashMap<>();
needMoveFiles.put(transactionDir + "/c3=4/c4=rrr/test1.txt", targetDir
+ "/c3=4/c4=rrr/test1.txt");
needMoveFiles.put(transactionDir + "/c3=4/c4=bbb/test1.txt", targetDir
+ "/c3=4/c4=bbb/test1.txt");
+ Map<String, List<String>> partitionDirAndVals = new HashMap<>();
+ partitionDirAndVals.put("/c3=4/c4=rrr", Arrays.stream((new
String[]{"4", "rrr"})).collect(Collectors.toList()));
+ partitionDirAndVals.put("/c3=4/c4=bbb", Arrays.stream((new
String[]{"4", "bbb"})).collect(Collectors.toList()));
HdfsUtils.createFile(transactionDir + "/c3=4/c4=rrr/test1.txt");
HdfsUtils.createFile(transactionDir + "/c3=4/c4=bbb/test1.txt");
transactionFiles.put(transactionDir, needMoveFiles);
- FileAggregatedCommitInfo fileAggregatedCommitInfo = new
FileAggregatedCommitInfo(transactionFiles);
+ FileAggregatedCommitInfo fileAggregatedCommitInfo = new
FileAggregatedCommitInfo(transactionFiles, partitionDirAndVals);
List<FileAggregatedCommitInfo> fileAggregatedCommitInfoList = new
ArrayList<>();
fileAggregatedCommitInfoList.add(fileAggregatedCommitInfo);
fileSinkAggregatedCommitter.commit(fileAggregatedCommitInfoList);
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/sink/hdfs/TestHdfsTxtTransactionStateFileWriter.java
b/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/sink/hdfs/TestHdfsTxtTransactionStateFileWriter.java
index f3e1847f4..d8633e86d 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/sink/hdfs/TestHdfsTxtTransactionStateFileWriter.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/sink/hdfs/TestHdfsTxtTransactionStateFileWriter.java
@@ -93,5 +93,15 @@ public class TestHdfsTxtTransactionStateFileWriter {
Map<String, String> needMoveFiles = fileCommitInfo.getNeedMoveFiles();
Assert.assertEquals(targetPath + "/c3=str1/c4=str2/" + transactionId +
".txt", needMoveFiles.get(transactionDir + "/c3=str1/c4=str2/" + transactionId
+ ".txt"));
Assert.assertEquals(targetPath + "/c3=str1/c4=str3/" + transactionId +
".txt", needMoveFiles.get(transactionDir + "/c3=str1/c4=str3/" + transactionId
+ ".txt"));
+
+ Map<String, List<String>> partitionDirAndValsMap =
fileCommitInfo.getPartitionDirAndValsMap();
+ Assert.assertEquals(2, partitionDirAndValsMap.size());
+
Assert.assertTrue(partitionDirAndValsMap.keySet().contains("c3=str1/c4=str2"));
+
Assert.assertTrue(partitionDirAndValsMap.keySet().contains("c3=str1/c4=str3"));
+ Assert.assertTrue(partitionDirAndValsMap.get("c3=str1/c4=str2").size()
== 2);
+ Assert.assertEquals("str1",
partitionDirAndValsMap.get("c3=str1/c4=str2").get(0));
+ Assert.assertEquals("str2",
partitionDirAndValsMap.get("c3=str1/c4=str2").get(1));
+ Assert.assertEquals("str1",
partitionDirAndValsMap.get("c3=str1/c4=str3").get(0));
+ Assert.assertEquals("str3",
partitionDirAndValsMap.get("c3=str1/c4=str3").get(1));
}
}
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-local/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/sink/local/FileSinkAggregatedCommitterTest.java
b/seatunnel-connectors-v2/connector-file/connector-file-local/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/sink/local/FileSinkAggregatedCommitterTest.java
index 89524aa5f..6b82041b1 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-local/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/sink/local/FileSinkAggregatedCommitterTest.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-local/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/sink/local/FileSinkAggregatedCommitterTest.java
@@ -25,13 +25,15 @@ import org.junit.Assert;
import org.junit.Test;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
+import java.util.stream.Collectors;
public class FileSinkAggregatedCommitterTest {
-    @Test
+    @SuppressWarnings("checkstyle:UnnecessaryParentheses")
+    @Test
public void testCommit() throws Exception {
FileSinkAggregatedCommitter fileSinkAggregatedCommitter = new
FileSinkAggregatedCommitter(new LocalFileSystemCommitter());
Map<String, Map<String, String>> transactionFiles = new HashMap<>();
@@ -47,7 +49,12 @@ public class FileSinkAggregatedCommitterTest {
FileUtils.createFile(transactionDir + "/c3=4/c4=bbb/test1.txt");
transactionFiles.put(transactionDir, needMoveFiles);
- FileAggregatedCommitInfo fileAggregatedCommitInfo = new
FileAggregatedCommitInfo(transactionFiles);
+
+ Map<String, List<String>> partitionDirAndVals = new HashMap<>();
+ partitionDirAndVals.put("/c3=4/c4=rrr", Arrays.stream((new
String[]{"4", "rrr"})).collect(Collectors.toList()));
+ partitionDirAndVals.put("/c3=4/c4=bbb", Arrays.stream((new
String[]{"4", "bbb"})).collect(Collectors.toList()));
+
+ FileAggregatedCommitInfo fileAggregatedCommitInfo = new
FileAggregatedCommitInfo(transactionFiles, partitionDirAndVals);
List<FileAggregatedCommitInfo> fileAggregatedCommitInfoList = new
ArrayList<>();
fileAggregatedCommitInfoList.add(fileAggregatedCommitInfo);
fileSinkAggregatedCommitter.commit(fileAggregatedCommitInfoList);
@@ -57,7 +64,7 @@ public class FileSinkAggregatedCommitterTest {
Assert.assertTrue(!FileUtils.fileExist(transactionDir));
}
- @SuppressWarnings("checkstyle:MagicNumber")
+ @SuppressWarnings({"checkstyle:UnnecessaryParentheses",
"checkstyle:MagicNumber"})
@Test
public void testCombine() throws Exception {
FileSinkAggregatedCommitter fileSinkAggregatedCommitter = new
FileSinkAggregatedCommitter(new LocalFileSystemCommitter());
@@ -68,28 +75,37 @@ public class FileSinkAggregatedCommitterTest {
String transactionDir =
String.format("/tmp/seatunnel/seatunnel/%s/T_%s_0_1", jobId, jobId);
String targetDir = String.format("/tmp/hive/warehouse/%s", jobId);
Map<String, String> needMoveFiles = new HashMap<>();
- needMoveFiles.put(transactionDir + "/c3=4/c4=rrr/test1.txt", targetDir
+ "/c3=4/c4=rrr/test1.txt");
+ needMoveFiles.put(transactionDir + "/c3=3/c4=rrr/test1.txt", targetDir
+ "/c3=3/c4=rrr/test1.txt");
needMoveFiles.put(transactionDir + "/c3=4/c4=bbb/test1.txt", targetDir
+ "/c3=4/c4=bbb/test1.txt");
- FileUtils.createFile(transactionDir + "/c3=4/c4=rrr/test1.txt");
+ Map<String, List<String>> partitionDirAndVals = new HashMap<>();
+ partitionDirAndVals.put("/c3=3/c4=rrr", Arrays.stream((new
String[]{"3", "rrr"})).collect(Collectors.toList()));
+ partitionDirAndVals.put("/c3=4/c4=bbb", Arrays.stream((new
String[]{"4", "bbb"})).collect(Collectors.toList()));
+ FileCommitInfo fileCommitInfo = new FileCommitInfo(needMoveFiles,
partitionDirAndVals, transactionDir);
+ FileUtils.createFile(transactionDir + "/c3=3/c4=rrr/test1.txt");
FileUtils.createFile(transactionDir + "/c3=4/c4=bbb/test1.txt");
Map<String, String> needMoveFiles1 = new HashMap<>();
needMoveFiles1.put(transactionDir + "/c3=4/c4=rrr/test2.txt",
targetDir + "/c3=4/c4=rrr/test2.txt");
needMoveFiles1.put(transactionDir + "/c3=4/c4=bbb/test2.txt",
targetDir + "/c3=4/c4=bbb/test2.txt");
- FileCommitInfo fileCommitInfo = new FileCommitInfo(needMoveFiles,
transactionDir);
- FileCommitInfo fileCommitInfo1 = new FileCommitInfo(needMoveFiles1,
transactionDir);
+ Map<String, List<String>> partitionDirAndVals1 = new HashMap<>();
+        partitionDirAndVals1.put("/c3=4/c4=rrr", Arrays.stream((new String[]{"4", "rrr"})).collect(Collectors.toList()));
+        partitionDirAndVals1.put("/c3=4/c4=bbb", Arrays.stream((new String[]{"4", "bbb"})).collect(Collectors.toList()));
+ FileCommitInfo fileCommitInfo1 = new FileCommitInfo(needMoveFiles1,
partitionDirAndVals1, transactionDir);
List<FileCommitInfo> fileCommitInfoList = new ArrayList<>();
fileCommitInfoList.add(fileCommitInfo);
fileCommitInfoList.add(fileCommitInfo1);
+
FileAggregatedCommitInfo combine =
fileSinkAggregatedCommitter.combine(fileCommitInfoList);
Assert.assertEquals(1, combine.getTransactionMap().size());
Assert.assertEquals(4,
combine.getTransactionMap().get(transactionDir).size());
- Assert.assertEquals(targetDir + "/c3=4/c4=rrr/test1.txt",
combine.getTransactionMap().get(transactionDir).get(transactionDir +
"/c3=4/c4=rrr/test1.txt"));
+ Assert.assertEquals(targetDir + "/c3=3/c4=rrr/test1.txt",
combine.getTransactionMap().get(transactionDir).get(transactionDir +
"/c3=3/c4=rrr/test1.txt"));
Assert.assertEquals(targetDir + "/c3=4/c4=bbb/test1.txt",
combine.getTransactionMap().get(transactionDir).get(transactionDir +
"/c3=4/c4=bbb/test1.txt"));
Assert.assertEquals(targetDir + "/c3=4/c4=rrr/test2.txt",
combine.getTransactionMap().get(transactionDir).get(transactionDir +
"/c3=4/c4=rrr/test2.txt"));
Assert.assertEquals(targetDir + "/c3=4/c4=bbb/test2.txt",
combine.getTransactionMap().get(transactionDir).get(transactionDir +
"/c3=4/c4=bbb/test2.txt"));
+ Assert.assertEquals(3,
combine.getPartitionDirAndValsMap().keySet().size());
}
+ @SuppressWarnings("checkstyle:UnnecessaryParentheses")
@Test
public void testAbort() throws Exception {
FileSinkAggregatedCommitter fileSinkAggregatedCommitter = new
FileSinkAggregatedCommitter(new LocalFileSystemCommitter());
@@ -102,18 +118,21 @@ public class FileSinkAggregatedCommitterTest {
Map<String, String> needMoveFiles = new HashMap<>();
needMoveFiles.put(transactionDir + "/c3=4/c4=rrr/test1.txt", targetDir
+ "/c3=4/c4=rrr/test1.txt");
needMoveFiles.put(transactionDir + "/c3=4/c4=bbb/test1.txt", targetDir
+ "/c3=4/c4=bbb/test1.txt");
+ Map<String, List<String>> partitionDirAndVals = new HashMap<>();
+ partitionDirAndVals.put("/c3=4/c4=rrr", Arrays.stream((new
String[]{"4", "rrr"})).collect(Collectors.toList()));
+ partitionDirAndVals.put("/c3=4/c4=bbb", Arrays.stream((new
String[]{"4", "bbb"})).collect(Collectors.toList()));
FileUtils.createFile(transactionDir + "/c3=4/c4=rrr/test1.txt");
FileUtils.createFile(transactionDir + "/c3=4/c4=bbb/test1.txt");
transactionFiles.put(transactionDir, needMoveFiles);
- FileAggregatedCommitInfo fileAggregatedCommitInfo = new
FileAggregatedCommitInfo(transactionFiles);
+ FileAggregatedCommitInfo fileAggregatedCommitInfo = new
FileAggregatedCommitInfo(transactionFiles, partitionDirAndVals);
List<FileAggregatedCommitInfo> fileAggregatedCommitInfoList = new
ArrayList<>();
fileAggregatedCommitInfoList.add(fileAggregatedCommitInfo);
fileSinkAggregatedCommitter.commit(fileAggregatedCommitInfoList);
Assert.assertTrue(FileUtils.fileExist(targetDir +
"/c3=4/c4=bbb/test1.txt"));
Assert.assertTrue(FileUtils.fileExist(targetDir +
"/c3=4/c4=rrr/test1.txt"));
- Assert.assertFalse(FileUtils.fileExist(transactionDir));
+ Assert.assertTrue(!FileUtils.fileExist(transactionDir));
fileSinkAggregatedCommitter.abort(fileAggregatedCommitInfoList);
Assert.assertTrue(!FileUtils.fileExist(targetDir +
"/c3=4/c4=bbb/test1.txt"));
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-local/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/sink/local/TestLocalTxtTransactionStateFileWriter.java
b/seatunnel-connectors-v2/connector-file/connector-file-local/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/sink/local/TestLocalTxtTransactionStateFileWriter.java
index 007ea39ef..d739ac41c 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-local/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/sink/local/TestLocalTxtTransactionStateFileWriter.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-local/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/sink/local/TestLocalTxtTransactionStateFileWriter.java
@@ -93,5 +93,15 @@ public class TestLocalTxtTransactionStateFileWriter {
Map<String, String> needMoveFiles = fileCommitInfo.getNeedMoveFiles();
Assert.assertEquals(targetPath + "/c3=str1/c4=str2/" + transactionId +
".txt", needMoveFiles.get(transactionDir + "/c3=str1/c4=str2/" + transactionId
+ ".txt"));
Assert.assertEquals(targetPath + "/c3=str1/c4=str3/" + transactionId +
".txt", needMoveFiles.get(transactionDir + "/c3=str1/c4=str3/" + transactionId
+ ".txt"));
+
+ Map<String, List<String>> partitionDirAndValsMap =
fileCommitInfo.getPartitionDirAndValsMap();
+ Assert.assertEquals(2, partitionDirAndValsMap.size());
+
Assert.assertTrue(partitionDirAndValsMap.keySet().contains("c3=str1/c4=str2"));
+
Assert.assertTrue(partitionDirAndValsMap.keySet().contains("c3=str1/c4=str3"));
+ Assert.assertTrue(partitionDirAndValsMap.get("c3=str1/c4=str2").size()
== 2);
+ Assert.assertEquals("str1",
partitionDirAndValsMap.get("c3=str1/c4=str2").get(0));
+ Assert.assertEquals("str2",
partitionDirAndValsMap.get("c3=str1/c4=str2").get(1));
+ Assert.assertEquals("str1",
partitionDirAndValsMap.get("c3=str1/c4=str3").get(0));
+ Assert.assertEquals("str3",
partitionDirAndValsMap.get("c3=str1/c4=str3").get(1));
}
}
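
The new assertions pin down the shape of the partition metadata the writer now
reports: each relative partition directory maps to its partition values in
key order. A minimal, JDK-only sketch of that structure (values here are
illustrative, not taken from the test):

    import java.util.Arrays;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    public class PartitionMapSketch {
        public static void main(String[] args) {
            // "c3=str1/c4=str2" is the relative partition dir; the list
            // holds the partition values in partition-key order.
            Map<String, List<String>> partitionDirAndVals = new HashMap<>();
            partitionDirAndVals.put("c3=str1/c4=str2", Arrays.asList("str1", "str2"));
            partitionDirAndVals.put("c3=str1/c4=str3", Arrays.asList("str1", "str3"));
            System.out.println(partitionDirAndVals.get("c3=str1/c4=str2")); // [str1, str2]
        }
    }
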
diff --git a/seatunnel-connectors-v2/connector-hive/pom.xml
b/seatunnel-connectors-v2/connector-hive/pom.xml
index 62a2eed1f..342a01792 100644
--- a/seatunnel-connectors-v2/connector-hive/pom.xml
+++ b/seatunnel-connectors-v2/connector-hive/pom.xml
@@ -31,9 +31,9 @@
<dependencies>
<dependency>
- <groupId>org.apache.seatunnel</groupId>
- <artifactId>seatunnel-hive-shade</artifactId>
- <version>${project.version}</version>
+ <groupId>org.apache.hive</groupId>
+ <artifactId>hive-exec</artifactId>
+ <scope>provided</scope>
</dependency>
<dependency>
@@ -80,9 +80,25 @@
<artifactId>commons-collections4</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.apache.seatunnel</groupId>
+ <artifactId>connector-file-hadoop</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.powermock</groupId>
+ <artifactId>powermock-module-junit4</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.powermock</groupId>
+ <artifactId>powermock-api-mockito2</artifactId>
+ <scope>test</scope>
+ </dependency>
</dependencies>
</project>
diff --git
a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSinkState.java
b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/config/Constant.java
similarity index 75%
copy from
seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSinkState.java
copy to
seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/config/Constant.java
index 4f9f5d12e..dd0b2ab54 100644
---
a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSinkState.java
+++
b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/config/Constant.java
@@ -15,15 +15,9 @@
* limitations under the License.
*/
-package org.apache.seatunnel.connectors.seatunnel.hive.sink;
+package org.apache.seatunnel.connectors.seatunnel.hive.config;
-import lombok.AllArgsConstructor;
-import lombok.Data;
-
-import java.io.Serializable;
-
-@Data
-@AllArgsConstructor
-public class HiveSinkState implements Serializable {
- private HiveSinkConfig hiveSinkConfig;
+public class Constant {
+ public static final String HIVE_RESULT_TABLE_NAME = "hive_table_name";
+ public static final String HIVE_METASTORE_URIS = "hive_metastore_uris";
}
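
A hedged sketch of how the two new keys are looked up, assuming the shaded
typesafe-config parseString API behaves like the upstream library; the URI
and table name below are placeholders:

    import org.apache.seatunnel.connectors.seatunnel.hive.config.Constant;
    import org.apache.seatunnel.shade.com.typesafe.config.Config;
    import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory;

    public class ConstantUsageSketch {
        public static void main(String[] args) {
            // Placeholder values; a real job reads these from the sink {} block.
            Config config = ConfigFactory.parseString(
                "hive_table_name = \"default.test_fake_to_hive\"\n"
                    + "hive_metastore_uris = \"thrift://localhost:9083\"");
            System.out.println(config.getString(Constant.HIVE_RESULT_TABLE_NAME));
            System.out.println(config.getString(Constant.HIVE_METASTORE_URIS));
        }
    }
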
diff --git
a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveAggregatedCommitInfo.java
b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveAggregatedCommitInfo.java
index 025fbefbf..6259389c4 100644
---
a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveAggregatedCommitInfo.java
+++
b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveAggregatedCommitInfo.java
@@ -17,20 +17,18 @@
package org.apache.seatunnel.connectors.seatunnel.hive.sink;
+import
org.apache.seatunnel.connectors.seatunnel.file.sink.FileAggregatedCommitInfo;
+
import lombok.AllArgsConstructor;
import lombok.Data;
+import org.apache.hadoop.hive.metastore.api.Table;
import java.io.Serializable;
-import java.util.Map;
@Data
@AllArgsConstructor
public class HiveAggregatedCommitInfo implements Serializable {
-
- /**
- * Storage the commit info in map.
- * K is the file path need to be moved to hive data dir.
- * V is the target file path of the data file.
- */
- private Map<String, String> needMoveFiles;
+ private FileAggregatedCommitInfo fileAggregatedCommitInfo;
+ private String hiveMetastoreUris;
+ private Table table;
}
diff --git
a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveCommitInfo.java
b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveCommitInfo.java
index 0dd58f8f4..002beea32 100644
---
a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveCommitInfo.java
+++
b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveCommitInfo.java
@@ -17,20 +17,22 @@
package org.apache.seatunnel.connectors.seatunnel.hive.sink;
+import org.apache.seatunnel.connectors.seatunnel.file.sink.FileCommitInfo;
+
import lombok.AllArgsConstructor;
import lombok.Data;
+import org.apache.hadoop.hive.metastore.api.Table;
import java.io.Serializable;
-import java.util.Map;
@Data
@AllArgsConstructor
public class HiveCommitInfo implements Serializable {
- /**
- * Storage the commit info in map.
- * K is the file path need to be moved to hive data dir.
- * V is the target file path of the data file.
- */
- private Map<String, String> needMoveFiles;
+ private FileCommitInfo fileCommitInfo;
+
+ private String hiveMetastoreUris;
+
+ private Table table;
+
}
diff --git
a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSink.java
b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSink.java
index f1ba12edd..4df91b1a5 100644
---
a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSink.java
+++
b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSink.java
@@ -18,6 +18,7 @@
package org.apache.seatunnel.connectors.seatunnel.hive.sink;
import org.apache.seatunnel.api.common.PrepareFailException;
+import org.apache.seatunnel.api.common.SeaTunnelContext;
import org.apache.seatunnel.api.serialization.DefaultSerializer;
import org.apache.seatunnel.api.serialization.Serializer;
import org.apache.seatunnel.api.sink.SeaTunnelSink;
@@ -26,6 +27,8 @@ import org.apache.seatunnel.api.sink.SinkWriter;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.common.constants.JobMode;
+import org.apache.seatunnel.connectors.seatunnel.file.sink.config.SaveMode;
import org.apache.seatunnel.shade.com.typesafe.config.Config;
@@ -43,8 +46,11 @@ import java.util.Optional;
public class HiveSink implements SeaTunnelSink<SeaTunnelRow, HiveSinkState,
HiveCommitInfo, HiveAggregatedCommitInfo> {
private Config config;
- private long jobId;
- private SeaTunnelRowType seaTunnelRowType;
+ private String jobId;
+ private Long checkpointId;
+ private SeaTunnelRowType seaTunnelRowTypeInfo;
+ private SeaTunnelContext seaTunnelContext;
+ private HiveSinkConfig hiveSinkConfig;
@Override
public String getPluginName() {
@@ -52,34 +58,47 @@ public class HiveSink implements
SeaTunnelSink<SeaTunnelRow, HiveSinkState, Hive
}
@Override
- public void setTypeInfo(SeaTunnelRowType seaTunnelRowType) {
- this.seaTunnelRowType = seaTunnelRowType;
+ public void setTypeInfo(SeaTunnelRowType seaTunnelRowTypeInfo) {
+ this.seaTunnelRowTypeInfo = seaTunnelRowTypeInfo;
+ this.hiveSinkConfig = new HiveSinkConfig(config, seaTunnelRowTypeInfo);
}
@Override
public SeaTunnelDataType<SeaTunnelRow> getConsumedType() {
- return this.seaTunnelRowType;
+ return this.seaTunnelRowTypeInfo;
}
@Override
public void prepare(Config pluginConfig) throws PrepareFailException {
this.config = pluginConfig;
- this.jobId = System.currentTimeMillis();
+ this.checkpointId = 1L;
}
@Override
public SinkWriter<SeaTunnelRow, HiveCommitInfo, HiveSinkState>
createWriter(SinkWriter.Context context) throws IOException {
- return new HiveSinkWriter(seaTunnelRowType, config, context,
System.currentTimeMillis());
+ if (!seaTunnelContext.getJobMode().equals(JobMode.BATCH) &&
hiveSinkConfig.getTextFileSinkConfig().getSaveMode().equals(SaveMode.OVERWRITE))
{
+ throw new RuntimeException("only batch jobs can overwrite a hive
table");
+ }
+
+ if
(!this.getSinkConfig().getTextFileSinkConfig().isEnableTransaction()) {
+ throw new RuntimeException("Hive Sink Connector only support
transaction now");
+ }
+ return new HiveSinkWriter(seaTunnelRowTypeInfo,
+ config,
+ context,
+ getSinkConfig(),
+ jobId);
}
@Override
public SinkWriter<SeaTunnelRow, HiveCommitInfo, HiveSinkState>
restoreWriter(SinkWriter.Context context, List<HiveSinkState> states) throws
IOException {
- return new HiveSinkWriter(seaTunnelRowType, config, context,
System.currentTimeMillis());
+ return new HiveSinkWriter(seaTunnelRowTypeInfo, config, context,
hiveSinkConfig, jobId, states);
}
@Override
- public Optional<Serializer<HiveCommitInfo>> getCommitInfoSerializer() {
- return Optional.of(new DefaultSerializer<>());
+ public void setSeaTunnelContext(SeaTunnelContext seaTunnelContext) {
+ this.seaTunnelContext = seaTunnelContext;
+ this.jobId = seaTunnelContext.getJobId();
}
@Override
@@ -87,8 +106,25 @@ public class HiveSink implements
SeaTunnelSink<SeaTunnelRow, HiveSinkState, Hive
return Optional.of(new HiveSinkAggregatedCommitter());
}
+ @Override
+ public Optional<Serializer<HiveSinkState>> getWriterStateSerializer() {
+ return Optional.of(new DefaultSerializer<>());
+ }
+
@Override
public Optional<Serializer<HiveAggregatedCommitInfo>>
getAggregatedCommitInfoSerializer() {
+ return Optional.of(new DefaultSerializer<HiveAggregatedCommitInfo>());
+ }
+
+ @Override
+ public Optional<Serializer<HiveCommitInfo>> getCommitInfoSerializer() {
return Optional.of(new DefaultSerializer<>());
}
+
+ private HiveSinkConfig getSinkConfig() {
+ if (this.hiveSinkConfig == null && (this.seaTunnelRowTypeInfo != null
&& this.config != null)) {
+ this.hiveSinkConfig = new HiveSinkConfig(config,
seaTunnelRowTypeInfo);
+ }
+ return this.hiveSinkConfig;
+ }
}
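
createWriter() now fails fast on two unsupported combinations: OVERWRITE
outside batch mode, and a disabled transaction. A self-contained sketch of
those guards, with local enums standing in for the real JobMode and SaveMode
types (a simplified illustration, not the connector's actual API):

    public class CreateWriterGuardSketch {
        enum JobMode { BATCH, STREAMING }
        enum SaveMode { APPEND, OVERWRITE }

        static void check(JobMode jobMode, SaveMode saveMode, boolean transactional) {
            // Overwriting a Hive table is only safe when the job terminates.
            if (jobMode != JobMode.BATCH && saveMode == SaveMode.OVERWRITE) {
                throw new IllegalStateException("only batch jobs can overwrite a hive table");
            }
            // The two-phase commit flow relies on the transactional file writer.
            if (!transactional) {
                throw new IllegalStateException("the Hive sink currently requires transactions");
            }
        }

        public static void main(String[] args) {
            check(JobMode.BATCH, SaveMode.OVERWRITE, true);      // passes
            check(JobMode.STREAMING, SaveMode.OVERWRITE, true);  // throws
        }
    }
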
diff --git
a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSinkAggregatedCommitter.java
b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSinkAggregatedCommitter.java
index 673923a51..3a0448179 100644
---
a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSinkAggregatedCommitter.java
+++
b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSinkAggregatedCommitter.java
@@ -18,8 +18,14 @@
package org.apache.seatunnel.connectors.seatunnel.hive.sink;
import org.apache.seatunnel.api.sink.SinkAggregatedCommitter;
-import
org.apache.seatunnel.connectors.seatunnel.hive.sink.file.writer.HdfsUtils;
+import
org.apache.seatunnel.connectors.seatunnel.file.sink.FileAggregatedCommitInfo;
+import org.apache.seatunnel.connectors.seatunnel.file.sink.hdfs.HdfsUtils;
+import org.apache.seatunnel.connectors.seatunnel.hive.utils.HiveMetaStoreProxy;
+import org.apache.commons.collections4.CollectionUtils;
+import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -28,41 +34,82 @@ import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Set;
public class HiveSinkAggregatedCommitter implements
SinkAggregatedCommitter<HiveCommitInfo, HiveAggregatedCommitInfo> {
private static final Logger LOGGER =
LoggerFactory.getLogger(HiveSinkAggregatedCommitter.class);
@Override
public List<HiveAggregatedCommitInfo>
commit(List<HiveAggregatedCommitInfo> aggregatedCommitInfoList) throws
IOException {
- if (aggregatedCommitInfoList == null ||
aggregatedCommitInfoList.size() == 0) {
+ LOGGER.info("=============================agg
commit=================================");
+ if (CollectionUtils.isEmpty(aggregatedCommitInfoList)) {
return null;
}
List errorAggregatedCommitInfoList = new ArrayList();
- aggregatedCommitInfoList.stream().forEach(aggregateCommitInfo -> {
- try {
- Map<String, String> needMoveFiles =
aggregateCommitInfo.getNeedMoveFiles();
- for (Map.Entry<String, String> entry :
needMoveFiles.entrySet()) {
- HdfsUtils.renameFile(entry.getKey(), entry.getValue(),
true);
+ HiveMetaStoreProxy hiveMetaStoreProxy = new
HiveMetaStoreProxy(aggregatedCommitInfoList.get(0).getHiveMetastoreUris());
+ HiveMetaStoreClient hiveMetaStoreClient =
hiveMetaStoreProxy.getHiveMetaStoreClient();
+ try {
+ aggregatedCommitInfoList.stream().forEach(aggregateCommitInfo -> {
+ try {
+ for (Map.Entry<String, Map<String, String>> entry :
aggregateCommitInfo.getFileAggregatedCommitInfo().getTransactionMap().entrySet())
{
+ // move each file from the transaction dir to its target path
+ for (Map.Entry<String, String> mvFileEntry :
entry.getValue().entrySet()) {
+ HdfsUtils.renameFile(mvFileEntry.getKey(),
mvFileEntry.getValue(), true);
+ }
+ // delete the transaction dir
+ HdfsUtils.deleteFile(entry.getKey());
+ }
+ // add hive partition
+
aggregateCommitInfo.getFileAggregatedCommitInfo().getPartitionDirAndValsMap().entrySet().forEach(entry
-> {
+ Partition part = new Partition();
+
part.setDbName(aggregateCommitInfo.getTable().getDbName());
+
part.setTableName(aggregateCommitInfo.getTable().getTableName());
+ part.setValues(entry.getValue());
+ part.setParameters(new HashMap<>());
+
part.setSd(aggregateCommitInfo.getTable().getSd().deepCopy());
+
part.getSd().setSerdeInfo(aggregateCommitInfo.getTable().getSd().getSerdeInfo());
+
part.getSd().setLocation(aggregateCommitInfo.getTable().getSd().getLocation() +
"/" + entry.getKey());
+ try {
+ hiveMetaStoreClient.add_partition(part);
+ } catch (TException e) {
+ throw new RuntimeException(e);
+ }
+ });
+ } catch (Exception e) {
+ LOGGER.error("commit aggregateCommitInfo error ", e);
+ errorAggregatedCommitInfoList.add(aggregateCommitInfo);
}
- } catch (IOException e) {
- LOGGER.error("commit aggregateCommitInfo error ", e);
- errorAggregatedCommitInfoList.add(aggregateCommitInfo);
- }
- });
+ });
+ } finally {
+ hiveMetaStoreClient.close();
+ }
return errorAggregatedCommitInfoList;
}
@Override
public HiveAggregatedCommitInfo combine(List<HiveCommitInfo> commitInfos) {
- if (commitInfos == null || commitInfos.size() == 0) {
+ if (CollectionUtils.isEmpty(commitInfos)) {
return null;
}
- Map<String, String> aggregateCommitInfo = new HashMap<>();
+ Map<String, Map<String, String>> aggregateCommitInfo = new HashMap<>();
+ Map<String, List<String>> partitionDirAndValsMap = new HashMap<>();
commitInfos.stream().forEach(commitInfo -> {
- aggregateCommitInfo.putAll(commitInfo.getNeedMoveFiles());
+ Map<String, String> needMoveFileMap =
aggregateCommitInfo.get(commitInfo.getFileCommitInfo().getTransactionDir());
+ if (needMoveFileMap == null) {
+ needMoveFileMap = new HashMap<>();
+
aggregateCommitInfo.put(commitInfo.getFileCommitInfo().getTransactionDir(),
needMoveFileMap);
+ }
+
needMoveFileMap.putAll(commitInfo.getFileCommitInfo().getNeedMoveFiles());
+ Set<Map.Entry<String, List<String>>> entries =
commitInfo.getFileCommitInfo().getPartitionDirAndValsMap().entrySet();
+ if (!CollectionUtils.isEmpty(entries)) {
+
partitionDirAndValsMap.putAll(commitInfo.getFileCommitInfo().getPartitionDirAndValsMap());
+ }
});
- return new HiveAggregatedCommitInfo(aggregateCommitInfo);
+ return new HiveAggregatedCommitInfo(
+ new FileAggregatedCommitInfo(aggregateCommitInfo,
partitionDirAndValsMap),
+ commitInfos.get(0).getHiveMetastoreUris(),
+ commitInfos.get(0).getTable());
}
@Override
@@ -72,9 +119,17 @@ public class HiveSinkAggregatedCommitter implements
SinkAggregatedCommitter<Hive
}
aggregatedCommitInfoList.stream().forEach(aggregateCommitInfo -> {
try {
- Map<String, String> needMoveFiles =
aggregateCommitInfo.getNeedMoveFiles();
- for (Map.Entry<String, String> entry :
needMoveFiles.entrySet()) {
- HdfsUtils.renameFile(entry.getValue(), entry.getKey(),
true);
+ for (Map.Entry<String, Map<String, String>> entry :
aggregateCommitInfo.getFileAggregatedCommitInfo().getTransactionMap().entrySet())
{
+ // rollback the file
+ for (Map.Entry<String, String> mvFileEntry :
entry.getValue().entrySet()) {
+ if (HdfsUtils.fileExist(mvFileEntry.getValue()) &&
!HdfsUtils.fileExist(mvFileEntry.getKey())) {
+ HdfsUtils.renameFile(mvFileEntry.getValue(),
mvFileEntry.getKey(), true);
+ }
+ }
+ // delete the transaction dir
+ HdfsUtils.deleteFile(entry.getKey());
+
+ // The partitions that have been added will be preserved
and will not be deleted
}
} catch (IOException e) {
LOGGER.error("abort aggregateCommitInfo error ", e);
diff --git
a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSinkConfig.java
b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSinkConfig.java
index e4495121b..a37dd3a51 100644
---
a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSinkConfig.java
+++
b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSinkConfig.java
@@ -17,90 +17,132 @@
package org.apache.seatunnel.connectors.seatunnel.hive.sink;
+import static
org.apache.seatunnel.connectors.seatunnel.file.config.Constant.FIELD_DELIMITER;
+import static
org.apache.seatunnel.connectors.seatunnel.file.config.Constant.FILE_FORMAT;
+import static
org.apache.seatunnel.connectors.seatunnel.file.config.Constant.FILE_NAME_EXPRESSION;
+import static
org.apache.seatunnel.connectors.seatunnel.file.config.Constant.IS_PARTITION_FIELD_WRITE_IN_FILE;
+import static
org.apache.seatunnel.connectors.seatunnel.file.config.Constant.PATH;
+import static
org.apache.seatunnel.connectors.seatunnel.file.config.Constant.ROW_DELIMITER;
+import static
org.apache.seatunnel.connectors.seatunnel.file.config.Constant.SAVE_MODE;
+import static
org.apache.seatunnel.connectors.seatunnel.hive.config.Constant.HIVE_METASTORE_URIS;
+import static
org.apache.seatunnel.connectors.seatunnel.hive.config.Constant.HIVE_RESULT_TABLE_NAME;
+import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.connectors.seatunnel.file.config.Constant;
+import org.apache.seatunnel.connectors.seatunnel.file.config.FileFormat;
+import org.apache.seatunnel.connectors.seatunnel.file.sink.config.SaveMode;
+import
org.apache.seatunnel.connectors.seatunnel.file.sink.config.TextFileSinkConfig;
+import org.apache.seatunnel.connectors.seatunnel.hive.utils.HiveMetaStoreProxy;
+
import org.apache.seatunnel.shade.com.typesafe.config.Config;
+import org.apache.seatunnel.shade.com.typesafe.config.ConfigValueFactory;
import lombok.Data;
import lombok.NonNull;
-import org.apache.commons.collections4.CollectionUtils;
+import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.thrift.TException;
+import java.io.Serializable;
+import java.util.Arrays;
import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
@Data
-public class HiveSinkConfig {
-
- private static final String HIVE_SAVE_MODE = "save_mode";
-
- private static final String HIVE_SINK_COLUMNS = "sink_columns";
-
- private static final String HIVE_PARTITION_BY = "partition_by";
-
- private static final String HIVE_RESULT_TABLE_NAME = "result_table_name";
+public class HiveSinkConfig implements Serializable {
+ private String hiveTableName;
+ private List<String> hivePartitionFieldList;
+ private String hiveMetaUris;
- private static final String SINK_TMP_FS_ROOT_PATH =
"sink_tmp_fs_root_path";
+ private String dbName;
- private static final String HIVE_TABLE_FS_PATH = "hive_table_fs_path";
+ private String tableName;
- private static final String HIVE_TXT_FILE_FIELD_DELIMITER =
"hive_txt_file_field_delimiter";
+ private Table table;
- private static final String HIVE_TXT_FILE_LINE_DELIMITER =
"hive_txt_file_line_delimiter";
+ private TextFileSinkConfig textFileSinkConfig;
- private SaveMode saveMode = SaveMode.APPEND;
+ public HiveSinkConfig(@NonNull Config config, @NonNull SeaTunnelRowType
seaTunnelRowTypeInfo) {
+
checkArgument(!CollectionUtils.isEmpty(Arrays.asList(seaTunnelRowTypeInfo.getFieldNames())));
- private String sinkTmpFsRootPath = "/tmp/seatunnel";
+ if (config.hasPath(HIVE_RESULT_TABLE_NAME) &&
!StringUtils.isBlank(config.getString(HIVE_RESULT_TABLE_NAME))) {
+ this.hiveTableName = config.getString(HIVE_RESULT_TABLE_NAME);
+ }
+ checkNotNull(hiveTableName);
- private List<String> partitionFieldNames;
+ if (config.hasPath(HIVE_METASTORE_URIS) &&
!StringUtils.isBlank(config.getString(HIVE_METASTORE_URIS))) {
+ this.hiveMetaUris = config.getString(HIVE_METASTORE_URIS);
+ }
+ checkNotNull(hiveMetaUris);
- private String hiveTableName;
+ String[] dbAndTableName = hiveTableName.split("\\.");
+ if (dbAndTableName == null || dbAndTableName.length != 2) {
+ throw new RuntimeException("Please config " +
HIVE_RESULT_TABLE_NAME + " as db.table format");
+ }
+ this.dbName = dbAndTableName[0];
+ this.tableName = dbAndTableName[1];
+ HiveMetaStoreProxy hiveMetaStoreProxy = new
HiveMetaStoreProxy(hiveMetaUris);
+ HiveMetaStoreClient hiveMetaStoreClient =
hiveMetaStoreProxy.getHiveMetaStoreClient();
+
+ try {
+ table = hiveMetaStoreClient.getTable(dbName, tableName);
+ String inputFormat = table.getSd().getInputFormat();
+ if
("org.apache.hadoop.mapred.TextInputFormat".equals(inputFormat)) {
+ config = config.withValue(FILE_FORMAT,
ConfigValueFactory.fromAnyRef(FileFormat.TEXT.toString()));
+ } else {
+ throw new RuntimeException("Only support text file now");
+ }
- private List<String> sinkColumns;
+ Map<String, String> parameters =
table.getSd().getSerdeInfo().getParameters();
+ config = config.withValue(IS_PARTITION_FIELD_WRITE_IN_FILE,
ConfigValueFactory.fromAnyRef(false))
+ .withValue(FIELD_DELIMITER,
ConfigValueFactory.fromAnyRef(parameters.get("field.delim")))
+ .withValue(ROW_DELIMITER,
ConfigValueFactory.fromAnyRef(parameters.get("line.delim")))
+ .withValue(FILE_NAME_EXPRESSION,
ConfigValueFactory.fromAnyRef("${transactionId}"))
+ .withValue(PATH,
ConfigValueFactory.fromAnyRef(table.getSd().getLocation()));
- private String hiveTableFsPath;
+ if (!config.hasPath(SAVE_MODE) ||
StringUtils.isBlank(config.getString(Constant.SAVE_MODE))) {
+ config = config.withValue(SAVE_MODE,
ConfigValueFactory.fromAnyRef(SaveMode.APPEND.toString()));
+ }
- private String hiveTxtFileFieldDelimiter = String.valueOf('\001');
+ this.textFileSinkConfig = new TextFileSinkConfig(config,
seaTunnelRowTypeInfo);
- private String hiveTxtFileLineDelimiter = "\n";
+ // --------------------Check textFileSinkConfig with the hive
table info-------------------
+ List<FieldSchema> fields =
hiveMetaStoreClient.getFields(dbAndTableName[0], dbAndTableName[1]);
+ List<FieldSchema> partitionKeys = table.getPartitionKeys();
- public enum SaveMode {
- APPEND(),
- OVERWRITE();
+ // Remove partitionKeys from table fields
+ List<FieldSchema> fieldNotContainPartitionKey =
fields.stream().filter(field ->
!partitionKeys.contains(field)).collect(Collectors.toList());
- public static SaveMode fromStr(String str) {
- if ("overwrite".equals(str)) {
- return OVERWRITE;
- } else {
- return APPEND;
+ // the field count (partition keys excluded) must match the
sink column count
+ if (fieldNotContainPartitionKey.size() !=
textFileSinkConfig.getSinkColumnList().size()) {
+ throw new RuntimeException("sink columns size must be the same
as the hive table field size");
}
- }
- }
-
- public HiveSinkConfig(@NonNull Config pluginConfig) {
- checkNotNull(pluginConfig.getString(HIVE_RESULT_TABLE_NAME));
- checkNotNull(pluginConfig.getString(HIVE_TABLE_FS_PATH));
- this.hiveTableName = pluginConfig.getString(HIVE_RESULT_TABLE_NAME);
- this.hiveTableFsPath = pluginConfig.getString(HIVE_TABLE_FS_PATH);
- this.saveMode =
StringUtils.isBlank(pluginConfig.getString(HIVE_SAVE_MODE)) ? SaveMode.APPEND :
SaveMode.fromStr(pluginConfig.getString(HIVE_SAVE_MODE));
- if
(!StringUtils.isBlank(pluginConfig.getString(SINK_TMP_FS_ROOT_PATH))) {
- this.sinkTmpFsRootPath =
pluginConfig.getString(SINK_TMP_FS_ROOT_PATH);
- }
-
- this.partitionFieldNames =
pluginConfig.getStringList(HIVE_PARTITION_BY);
- this.sinkColumns = pluginConfig.getStringList(HIVE_SINK_COLUMNS);
+ // the partition_by column count must match the hive table
partition column count
+ if (partitionKeys.size() !=
textFileSinkConfig.getPartitionFieldList().size()) {
+ throw new RuntimeException("partition by columns size must be
the same as the hive table partition columns size");
+ }
- if
(!StringUtils.isBlank(pluginConfig.getString(HIVE_TXT_FILE_FIELD_DELIMITER))) {
- this.hiveTxtFileFieldDelimiter =
pluginConfig.getString(HIVE_TXT_FILE_FIELD_DELIMITER);
+ // --------------------Check textFileSinkConfig with the hive
table info end----------------
+ } catch (TException e) {
+ throw new RuntimeException(e);
+ } finally {
+ hiveMetaStoreClient.close();
}
- if
(!StringUtils.isBlank(pluginConfig.getString(HIVE_TXT_FILE_LINE_DELIMITER))) {
- this.hiveTxtFileLineDelimiter =
pluginConfig.getString(HIVE_TXT_FILE_LINE_DELIMITER);
+ // hive only supports append or overwrite
+ if (!this.textFileSinkConfig.getSaveMode().equals(SaveMode.APPEND) &&
!this.textFileSinkConfig.getSaveMode().equals(SaveMode.OVERWRITE)) {
+ throw new RuntimeException("hive only supports append or overwrite
save mode");
}
+ }
- // partition fields must in sink columns
- if (!CollectionUtils.isEmpty(this.sinkColumns) &&
!CollectionUtils.isEmpty(this.partitionFieldNames) &&
!this.sinkColumns.containsAll(this.partitionFieldNames)) {
- throw new RuntimeException("partition fields must in sink
columns");
- }
+ public TextFileSinkConfig getTextFileSinkConfig() {
+ return textFileSinkConfig;
}
}
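
The constructor insists on a two-part db.table name before contacting the
metastore. A minimal sketch of that parse (the table name is a placeholder):

    public class TableNameSketch {
        public static void main(String[] args) {
            String hiveTableName = "default.test_fake_to_hive"; // placeholder
            String[] dbAndTableName = hiveTableName.split("\\.");
            if (dbAndTableName.length != 2) {
                throw new IllegalArgumentException("expected db.table format");
            }
            System.out.println("db: " + dbAndTableName[0]
                + ", table: " + dbAndTableName[1]);
        }
    }
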
diff --git
a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSinkState.java
b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSinkState.java
index 4f9f5d12e..a104151c3 100644
---
a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSinkState.java
+++
b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSinkState.java
@@ -25,5 +25,6 @@ import java.io.Serializable;
@Data
@AllArgsConstructor
public class HiveSinkState implements Serializable {
- private HiveSinkConfig hiveSinkConfig;
+ private String transactionId;
+ private Long checkpointId;
}
diff --git
a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSinkWriter.java
b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSinkWriter.java
index e04f57729..4bdeae478 100644
---
a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSinkWriter.java
+++
b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSinkWriter.java
@@ -20,48 +20,120 @@ package
org.apache.seatunnel.connectors.seatunnel.hive.sink;
import org.apache.seatunnel.api.sink.SinkWriter;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
-import
org.apache.seatunnel.connectors.seatunnel.hive.sink.file.writer.FileWriter;
-import
org.apache.seatunnel.connectors.seatunnel.hive.sink.file.writer.HdfsTxtFileWriter;
+import org.apache.seatunnel.connectors.seatunnel.file.sink.FileCommitInfo;
+import org.apache.seatunnel.connectors.seatunnel.file.sink.FileSinkState;
+import
org.apache.seatunnel.connectors.seatunnel.file.sink.hdfs.HdfsFileSinkPlugin;
+import
org.apache.seatunnel.connectors.seatunnel.file.sink.spi.SinkFileSystemPlugin;
+import
org.apache.seatunnel.connectors.seatunnel.file.sink.transaction.TransactionStateFileWriter;
+import
org.apache.seatunnel.connectors.seatunnel.file.sink.writer.FileSinkPartitionDirNameGenerator;
+import
org.apache.seatunnel.connectors.seatunnel.file.sink.writer.FileSinkTransactionFileNameGenerator;
import org.apache.seatunnel.shade.com.typesafe.config.Config;
-import com.google.common.collect.Lists;
import lombok.NonNull;
+import org.apache.commons.collections4.CollectionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
-import java.util.HashMap;
import java.util.List;
-import java.util.Map;
import java.util.Optional;
+import java.util.stream.Collectors;
public class HiveSinkWriter implements SinkWriter<SeaTunnelRow,
HiveCommitInfo, HiveSinkState> {
private static final Logger LOGGER =
LoggerFactory.getLogger(HiveSinkWriter.class);
- private SeaTunnelRowType seaTunnelRowType;
+ private SeaTunnelRowType seaTunnelRowTypeInfo;
private Config pluginConfig;
- private SinkWriter.Context context;
- private long jobId;
+ private Context context;
+ private String jobId;
- private FileWriter fileWriter;
+ private TransactionStateFileWriter fileWriter;
private HiveSinkConfig hiveSinkConfig;
- public HiveSinkWriter(@NonNull SeaTunnelRowType seaTunnelRowType,
+ public HiveSinkWriter(@NonNull SeaTunnelRowType seaTunnelRowTypeInfo,
@NonNull Config pluginConfig,
@NonNull SinkWriter.Context context,
- long jobId) {
- this.seaTunnelRowType = seaTunnelRowType;
+ @NonNull HiveSinkConfig hiveSinkConfig,
+ @NonNull String jobId) {
+ this.seaTunnelRowTypeInfo = seaTunnelRowTypeInfo;
this.pluginConfig = pluginConfig;
this.context = context;
this.jobId = jobId;
+ this.hiveSinkConfig = hiveSinkConfig;
+
+ SinkFileSystemPlugin sinkFileSystemPlugin = new HdfsFileSinkPlugin();
+ Optional<TransactionStateFileWriter> transactionStateFileWriter =
sinkFileSystemPlugin.getTransactionStateFileWriter(this.seaTunnelRowTypeInfo,
+ new FileSinkTransactionFileNameGenerator(
+ this.hiveSinkConfig.getTextFileSinkConfig().getFileFormat(),
+
this.hiveSinkConfig.getTextFileSinkConfig().getFileNameExpression(),
+
this.hiveSinkConfig.getTextFileSinkConfig().getFileNameTimeFormat()),
+ new FileSinkPartitionDirNameGenerator(
+
this.hiveSinkConfig.getTextFileSinkConfig().getPartitionFieldList(),
+
this.hiveSinkConfig.getTextFileSinkConfig().getPartitionFieldsIndexInRow(),
+
this.hiveSinkConfig.getTextFileSinkConfig().getPartitionDirExpression()),
+
this.hiveSinkConfig.getTextFileSinkConfig().getSinkColumnsIndexInRow(),
+ this.hiveSinkConfig.getTextFileSinkConfig().getTmpPath(),
+ this.hiveSinkConfig.getTextFileSinkConfig().getPath(),
+ this.jobId,
+ this.context.getIndexOfSubtask(),
+ this.hiveSinkConfig.getTextFileSinkConfig().getFieldDelimiter(),
+ this.hiveSinkConfig.getTextFileSinkConfig().getRowDelimiter(),
+ sinkFileSystemPlugin.getFileSystem().get());
+
+ if (!transactionStateFileWriter.isPresent()) {
+ throw new RuntimeException("A TransactionStateFileWriter is need");
+ }
+
+ this.fileWriter = transactionStateFileWriter.get();
+
+ fileWriter.beginTransaction(1L);
+ }
- hiveSinkConfig = new HiveSinkConfig(this.pluginConfig);
- fileWriter = new HdfsTxtFileWriter(this.seaTunnelRowType,
- hiveSinkConfig,
+ public HiveSinkWriter(@NonNull SeaTunnelRowType seaTunnelRowTypeInfo,
+ @NonNull Config pluginConfig,
+ @NonNull SinkWriter.Context context,
+ @NonNull HiveSinkConfig hiveSinkConfig,
+ @NonNull String jobId,
+ @NonNull List<HiveSinkState> hiveSinkStates) {
+ this.seaTunnelRowTypeInfo = seaTunnelRowTypeInfo;
+ this.pluginConfig = pluginConfig;
+ this.context = context;
+ this.jobId = jobId;
+ this.hiveSinkConfig = hiveSinkConfig;
+
+ SinkFileSystemPlugin sinkFileSystemPlugin = new HdfsFileSinkPlugin();
+ Optional<TransactionStateFileWriter> transactionStateFileWriter =
sinkFileSystemPlugin.getTransactionStateFileWriter(this.seaTunnelRowTypeInfo,
+ new FileSinkTransactionFileNameGenerator(
+ this.hiveSinkConfig.getTextFileSinkConfig().getFileFormat(),
+
this.hiveSinkConfig.getTextFileSinkConfig().getFileNameExpression(),
+
this.hiveSinkConfig.getTextFileSinkConfig().getFileNameTimeFormat()),
+ new FileSinkPartitionDirNameGenerator(
+
this.hiveSinkConfig.getTextFileSinkConfig().getPartitionFieldList(),
+
this.hiveSinkConfig.getTextFileSinkConfig().getPartitionFieldsIndexInRow(),
+
this.hiveSinkConfig.getTextFileSinkConfig().getPartitionDirExpression()),
+
this.hiveSinkConfig.getTextFileSinkConfig().getSinkColumnsIndexInRow(),
+ this.hiveSinkConfig.getTextFileSinkConfig().getTmpPath(),
+ this.hiveSinkConfig.getTextFileSinkConfig().getPath(),
this.jobId,
- this.context.getIndexOfSubtask());
+ this.context.getIndexOfSubtask(),
+ this.hiveSinkConfig.getTextFileSinkConfig().getFieldDelimiter(),
+ this.hiveSinkConfig.getTextFileSinkConfig().getRowDelimiter(),
+ sinkFileSystemPlugin.getFileSystem().get());
+
+ if (!transactionStateFileWriter.isPresent()) {
+ throw new RuntimeException("A TransactionStateFileWriter is need");
+ }
+
+ this.fileWriter = transactionStateFileWriter.get();
+
+ // Roll back dirty transactions
+ if (hiveSinkStates.size() > 0) {
+ List<String> transactionAfter =
fileWriter.getTransactionAfter(hiveSinkStates.get(0).getTransactionId());
+ fileWriter.abortTransactions(transactionAfter);
+ }
+ fileWriter.beginTransaction(hiveSinkStates.get(0).getCheckpointId() +
1);
}
@Override
@@ -71,18 +143,12 @@ public class HiveSinkWriter implements
SinkWriter<SeaTunnelRow, HiveCommitInfo,
@Override
public Optional<HiveCommitInfo> prepareCommit() throws IOException {
- fileWriter.finishAndCloseWriteFile();
- /**
- * We will clear the needMoveFiles in {@link #snapshotState()}, So we
need copy the needMoveFiles map here.
- */
- Map<String, String> commitInfoMap = new
HashMap<>(fileWriter.getNeedMoveFiles().size());
- commitInfoMap.putAll(fileWriter.getNeedMoveFiles());
- return Optional.of(new HiveCommitInfo(commitInfoMap));
- }
-
- @Override
- public void abortPrepare() {
- fileWriter.abort();
+ Optional<FileCommitInfo> fileCommitInfoOptional =
fileWriter.prepareCommit();
+ if (fileCommitInfoOptional.isPresent()) {
+ FileCommitInfo fileCommitInfo = fileCommitInfoOptional.get();
+ return Optional.of(new HiveCommitInfo(fileCommitInfo,
hiveSinkConfig.getHiveMetaUris(), this.hiveSinkConfig.getTable()));
+ }
+ return Optional.empty();
}
@Override
@@ -92,8 +158,17 @@ public class HiveSinkWriter implements
SinkWriter<SeaTunnelRow, HiveCommitInfo,
@Override
public List<HiveSinkState> snapshotState(long checkpointId) throws
IOException {
- //reset FileWrite
- fileWriter.resetFileWriter(System.currentTimeMillis() + "");
- return Lists.newArrayList(new HiveSinkState(hiveSinkConfig));
+ List<FileSinkState> fileSinkStates =
fileWriter.snapshotState(checkpointId);
+ if (!CollectionUtils.isEmpty(fileSinkStates)) {
+ return fileSinkStates.stream().map(state ->
+ new HiveSinkState(state.getTransactionId(),
state.getCheckpointId()))
+ .collect(Collectors.toList());
+ }
+ return null;
+ }
+
+ @Override
+ public void abortPrepare() {
+ fileWriter.abortTransaction();
}
}
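
On restore, the writer aborts every transaction recorded after the
checkpointed one, then begins a fresh transaction at checkpointId + 1. A
JDK-only sketch of the idea, with made-up transaction ids (the real lookup
goes through getTransactionAfter on the file writer):

    import java.util.Arrays;
    import java.util.List;
    import java.util.stream.Collectors;

    public class RestoreSketch {
        public static void main(String[] args) {
            // Hypothetical transaction ids found under the tmp path; tx_3
            // is the one carried in the restored HiveSinkState (checkpoint 3).
            List<String> all = Arrays.asList("tx_1", "tx_2", "tx_3", "tx_4", "tx_5");
            String checkpointed = "tx_3";
            long checkpointId = 3L;
            // Everything recorded after the checkpointed transaction is dirty.
            List<String> dirty = all.stream()
                    .filter(tx -> tx.compareTo(checkpointed) > 0)
                    .collect(Collectors.toList());
            System.out.println("abort: " + dirty);                 // abort: [tx_4, tx_5]
            System.out.println("begin: tx_" + (checkpointId + 1)); // begin: tx_4
        }
    }
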
diff --git
a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/file/writer/AbstractFileWriter.java
b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/file/writer/AbstractFileWriter.java
deleted file mode 100644
index 57ca3c901..000000000
---
a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/file/writer/AbstractFileWriter.java
+++ /dev/null
@@ -1,155 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.connectors.seatunnel.hive.sink.file.writer;
-
-import static com.google.common.base.Preconditions.checkArgument;
-
-import org.apache.seatunnel.api.table.type.SeaTunnelRow;
-import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
-import org.apache.seatunnel.connectors.seatunnel.hive.sink.HiveSinkConfig;
-
-import lombok.NonNull;
-
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.stream.Collectors;
-import java.util.stream.IntStream;
-
-public abstract class AbstractFileWriter implements FileWriter {
- protected Map<String, String> needMoveFiles;
- protected SeaTunnelRowType seaTunnelRowType;
- protected long jobId;
- protected int subTaskIndex;
- protected HiveSinkConfig hiveSinkConfig;
-
- private static final String SEATUNNEL = "seatunnel";
- private static final String NON_PARTITION = "NON_PARTITION";
-
- protected Map<String, String> beingWrittenFile;
-
- protected String checkpointId;
- protected final int[] partitionKeyIndexes;
-
- public AbstractFileWriter(@NonNull SeaTunnelRowType seaTunnelRowType,
- @NonNull HiveSinkConfig hiveSinkConfig,
- long jobId,
- int subTaskIndex) {
- checkArgument(jobId > 0);
- checkArgument(subTaskIndex > -1);
-
- this.needMoveFiles = new HashMap<>();
- this.seaTunnelRowType = seaTunnelRowType;
- this.jobId = jobId;
- this.subTaskIndex = subTaskIndex;
- this.hiveSinkConfig = hiveSinkConfig;
-
- this.beingWrittenFile = new HashMap<>();
- if (this.hiveSinkConfig.getPartitionFieldNames() == null) {
- this.partitionKeyIndexes = new int[0];
- } else {
- this.partitionKeyIndexes = IntStream.range(0,
seaTunnelRowType.getTotalFields())
- .filter(i ->
hiveSinkConfig.getPartitionFieldNames().contains(seaTunnelRowType.getFieldName(i)))
- .toArray();
- }
- }
-
- public String getOrCreateFilePathBeingWritten(@NonNull SeaTunnelRow
seaTunnelRow) {
- String beingWrittenFileKey = getBeingWrittenFileKey(seaTunnelRow);
- // get filePath from beingWrittenFile
- String beingWrittenFilePath =
beingWrittenFile.get(beingWrittenFileKey);
- if (beingWrittenFilePath != null) {
- return beingWrittenFilePath;
- } else {
- StringBuilder sbf = new
StringBuilder(hiveSinkConfig.getSinkTmpFsRootPath());
- sbf.append("/")
- .append(SEATUNNEL)
- .append("/")
- .append(jobId)
- .append("/")
- .append(checkpointId)
- .append("/")
- .append(hiveSinkConfig.getHiveTableName())
- .append("/")
- .append(beingWrittenFileKey)
- .append("/")
- .append(jobId)
- .append("_")
- .append(subTaskIndex)
- .append(".")
- .append(getFileSuffix());
- String newBeingWrittenFilePath = sbf.toString();
- beingWrittenFile.put(beingWrittenFileKey, newBeingWrittenFilePath);
- return newBeingWrittenFilePath;
- }
- }
-
- private String getBeingWrittenFileKey(@NonNull SeaTunnelRow seaTunnelRow) {
- if (partitionKeyIndexes.length > 0) {
- return Arrays.stream(partitionKeyIndexes)
- .boxed()
- .map(i -> seaTunnelRowType.getFieldName(i) + "=" +
seaTunnelRow.getField(i))
- .collect(Collectors.joining("/"));
- } else {
- // If there is no partition field in data, We use the fixed value
NON_PARTITION as the partition directory
- return NON_PARTITION;
- }
- }
-
- /**
- * FileWriter need return the file suffix. eg: tex, orc, parquet
- *
- * @return
- */
- @NonNull
- public abstract String getFileSuffix();
-
- public String getHiveLocation(@NonNull String seaTunnelFilePath) {
- StringBuilder sbf = new
StringBuilder(hiveSinkConfig.getSinkTmpFsRootPath());
- sbf.append("/")
- .append(SEATUNNEL)
- .append("/")
- .append(jobId)
- .append("/")
- .append(checkpointId)
- .append("/")
- .append(hiveSinkConfig.getHiveTableName());
- String seaTunnelPath = sbf.toString();
- String tmpPath = seaTunnelFilePath.replaceAll(seaTunnelPath,
hiveSinkConfig.getHiveTableFsPath());
- return tmpPath.replaceAll(NON_PARTITION + "/", "");
- }
-
- @Override
- public void resetFileWriter(@NonNull String checkpointId) {
- this.checkpointId = checkpointId;
- this.needMoveFiles = new HashMap<>();
- this.beingWrittenFile = new HashMap<>();
- this.resetMoreFileWriter(checkpointId);
- }
-
- public abstract void resetMoreFileWriter(@NonNull String checkpointId);
-
- @Override
- public void abort() {
- this.needMoveFiles = new HashMap<>();
- this.beingWrittenFile = new HashMap<>();
- this.abortMore();
- }
-
- public abstract void abortMore();
-}
diff --git
a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/file/writer/FileWriter.java
b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/file/writer/FileWriter.java
deleted file mode 100644
index 8ee8777a1..000000000
---
a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/file/writer/FileWriter.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.connectors.seatunnel.hive.sink.file.writer;
-
-import org.apache.seatunnel.api.table.type.SeaTunnelRow;
-
-import lombok.NonNull;
-
-import java.util.Map;
-
-public interface FileWriter {
-
- void write(@NonNull SeaTunnelRow seaTunnelRow);
-
- @NonNull
- Map<String, String> getNeedMoveFiles();
-
- /**
- * In this method we need finish write the file. The following operations
are often required:
- * 1. Flush memory to disk.
- * 2. Close output stream.
- * 3. Add the mapping relationship between seatunnel file path and hive
file path to needMoveFiles.
- */
- void finishAndCloseWriteFile();
-
- /**
- * The writer needs to be reset after each checkpoint is completed
- *
- * @param checkpointId checkpointId
- */
- void resetFileWriter(@NonNull String checkpointId);
-
- void abort();
-}
diff --git
a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/file/writer/HdfsTxtFileWriter.java
b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/file/writer/HdfsTxtFileWriter.java
deleted file mode 100644
index 71b26568f..000000000
---
a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/file/writer/HdfsTxtFileWriter.java
+++ /dev/null
@@ -1,151 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.connectors.seatunnel.hive.sink.file.writer;
-
-import org.apache.seatunnel.api.table.type.SeaTunnelRow;
-import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
-import org.apache.seatunnel.connectors.seatunnel.hive.sink.HiveSinkConfig;
-
-import lombok.Lombok;
-import lombok.NonNull;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.stream.Collectors;
-import java.util.stream.IntStream;
-
-public class HdfsTxtFileWriter extends AbstractFileWriter {
- private static final Logger LOGGER =
LoggerFactory.getLogger(HdfsTxtFileWriter.class);
- private Map<String, FSDataOutputStream> beingWrittenOutputStream;
- protected final int[] sinkColumnIndexes;
-
- public HdfsTxtFileWriter(SeaTunnelRowType seaTunnelRowType,
- HiveSinkConfig hiveSinkConfig,
- long sinkId,
- int subTaskIndex) {
- super(seaTunnelRowType, hiveSinkConfig, sinkId, subTaskIndex);
- beingWrittenOutputStream = new HashMap<>();
- List<String> sinkColumns = hiveSinkConfig.getSinkColumns();
- if (sinkColumns == null || sinkColumns.size() == 0) {
- this.sinkColumnIndexes = IntStream.range(0,
seaTunnelRowType.getTotalFields()).toArray();
- } else {
- this.sinkColumnIndexes = IntStream.range(0,
seaTunnelRowType.getTotalFields())
- .filter(i ->
sinkColumns.contains(seaTunnelRowType.getFieldName(i)))
- .toArray();
- }
- }
-
- @Override
- @NonNull
- public String getFileSuffix() {
- return "txt";
- }
-
- @Override
- public void resetMoreFileWriter(@NonNull String checkpointId) {
- this.beingWrittenOutputStream = new HashMap<>();
- }
-
- @Override
- public void abortMore() {
- // delete files
- beingWrittenOutputStream.keySet().stream().forEach(file -> {
- try {
- boolean deleted = HdfsUtils.deleteFile(file);
- if (!deleted) {
- LOGGER.error("delete file {} error", file);
- throw new IOException(String.format("delete file {}
error", file));
- }
- } catch (IOException e) {
- LOGGER.error("delete file {} error", file);
- throw new RuntimeException(e);
- }
- });
-
- this.beingWrittenOutputStream = new HashMap<>();
- }
-
- @Override
- public void write(@NonNull SeaTunnelRow seaTunnelRow) {
- Lombok.checkNotNull(seaTunnelRow, "seaTunnelRow is null");
- String filePath = getOrCreateFilePathBeingWritten(seaTunnelRow);
- FSDataOutputStream fsDataOutputStream =
getOrCreateOutputStream(filePath);
- String line = transformRowToLine(seaTunnelRow);
- try {
- fsDataOutputStream.write(line.getBytes());
-
fsDataOutputStream.write(hiveSinkConfig.getHiveTxtFileLineDelimiter().getBytes());
- } catch (IOException e) {
- LOGGER.error("write data to file {} error", filePath);
- throw new RuntimeException(e);
- }
- }
-
- @NonNull
- @Override
- public Map<String, String> getNeedMoveFiles() {
- return this.needMoveFiles;
- }
-
- @Override
- public void finishAndCloseWriteFile() {
- beingWrittenOutputStream.entrySet().forEach(entry -> {
- try {
- entry.getValue().flush();
- } catch (IOException e) {
- LOGGER.error("error when flush file {}", entry.getKey());
- throw new RuntimeException(e);
- } finally {
- try {
- entry.getValue().close();
- } catch (IOException e) {
- LOGGER.error("error when close output stream {}",
entry.getKey());
- }
- }
-
- needMoveFiles.put(entry.getKey(), getHiveLocation(entry.getKey()));
- });
- }
-
- private FSDataOutputStream getOrCreateOutputStream(@NonNull String
filePath) {
- FSDataOutputStream fsDataOutputStream =
beingWrittenOutputStream.get(filePath);
- if (fsDataOutputStream == null) {
- try {
- fsDataOutputStream = HdfsUtils.getOutputStream(filePath);
- beingWrittenOutputStream.put(filePath, fsDataOutputStream);
- } catch (IOException e) {
- LOGGER.error("can not get output file stream");
- throw new RuntimeException(e);
- }
- }
- return fsDataOutputStream;
- }
-
- private String transformRowToLine(@NonNull SeaTunnelRow seaTunnelRow) {
- return Arrays.stream(sinkColumnIndexes)
- .boxed()
- .map(seaTunnelRow::getField)
- .map(value -> value == null ? "" : value.toString())
-
.collect(Collectors.joining(hiveSinkConfig.getHiveTxtFileFieldDelimiter()));
- }
-}
diff --git
a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/file/writer/HdfsUtils.java
b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/file/writer/HdfsUtils.java
deleted file mode 100644
index 23b1e5843..000000000
---
a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/file/writer/HdfsUtils.java
+++ /dev/null
@@ -1,96 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.connectors.seatunnel.hive.sink.file.writer;
-
-import lombok.NonNull;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-
-public class HdfsUtils {
- private static final Logger LOGGER =
LoggerFactory.getLogger(HdfsUtils.class);
-
- public static final int WRITE_BUFFER_SIZE = 2048;
-
- public static FileSystem getHdfsFs(@NonNull String path)
- throws IOException {
- Configuration conf = new Configuration();
- conf.set("fs.hdfs.impl",
"org.apache.hadoop.hdfs.DistributedFileSystem");
- conf.set("fs.defaultFs", path);
- return FileSystem.get(conf);
- }
-
- public static FSDataOutputStream getOutputStream(@NonNull String
outFilePath) throws IOException {
- FileSystem hdfsFs = getHdfsFs(outFilePath);
- Path path = new Path(outFilePath);
- FSDataOutputStream fsDataOutputStream = hdfsFs.create(path, true,
WRITE_BUFFER_SIZE);
- return fsDataOutputStream;
- }
-
- public static boolean deleteFile(@NonNull String file) throws IOException {
- FileSystem hdfsFs = getHdfsFs(file);
- return hdfsFs.delete(new Path(file), true);
- }
-
- /**
- * rename file
- *
- * @param oldName old file name
- * @param newName target file name
- * @param rmWhenExist if this is true, we will delete the target file when
it already exists
- * @throws IOException throw IOException
- */
- public static void renameFile(@NonNull String oldName, @NonNull String
newName, boolean rmWhenExist) throws IOException {
- FileSystem hdfsFs = getHdfsFs(newName);
- LOGGER.info("begin rename file oldName :[" + oldName + "] to newName
:[" + newName + "]");
-
- Path oldPath = new Path(oldName);
- Path newPath = new Path(newName);
- if (rmWhenExist) {
- if (fileExist(newName) && fileExist(oldName)) {
- hdfsFs.delete(newPath, true);
- }
- }
- if (!fileExist(newName.substring(0, newName.lastIndexOf("/")))) {
- createDir(newName.substring(0, newName.lastIndexOf("/")));
- }
- LOGGER.info("rename file :[" + oldPath + "] to [" + newPath + "]
finish");
-
- hdfsFs.rename(oldPath, newPath);
- }
-
- public static boolean createDir(@NonNull String filePath)
- throws IOException {
-
- FileSystem hdfsFs = getHdfsFs(filePath);
- Path dfs = new Path(filePath);
- return hdfsFs.mkdirs(dfs);
- }
-
- public static boolean fileExist(@NonNull String filePath)
- throws IOException {
- FileSystem hdfsFs = getHdfsFs(filePath);
- Path fileName = new Path(filePath);
- return hdfsFs.exists(fileName);
- }
-}
diff --git
a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/utils/HiveMetaStoreProxy.java
b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/utils/HiveMetaStoreProxy.java
new file mode 100644
index 000000000..30c9a2eba
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/utils/HiveMetaStoreProxy.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.hive.utils;
+
+import lombok.NonNull;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.thrift.TException;
+
+public class HiveMetaStoreProxy {
+
+ private HiveMetaStoreClient hiveMetaStoreClient;
+
+ public HiveMetaStoreProxy(@NonNull String uris) {
+ HiveConf hiveConf = new HiveConf();
+ hiveConf.set("hive.metastore.uris", uris);
+ try {
+ hiveMetaStoreClient = new HiveMetaStoreClient(hiveConf);
+ } catch (MetaException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ public Table getTable(@NonNull String dbName, @NonNull String tableName) {
+ try {
+ return hiveMetaStoreClient.getTable(dbName, tableName);
+ } catch (TException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ public HiveMetaStoreClient getHiveMetaStoreClient() {
+ return hiveMetaStoreClient;
+ }
+}
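
A hedged usage sketch of the new proxy, assuming a metastore is reachable at
the placeholder URI and the table exists; close() on the underlying
HiveMetaStoreClient releases the thrift connection:

    import org.apache.hadoop.hive.metastore.api.Table;
    import org.apache.seatunnel.connectors.seatunnel.hive.utils.HiveMetaStoreProxy;

    public class HiveMetaStoreProxySketch {
        public static void main(String[] args) {
            // Placeholder URI and table; both must exist for this to run.
            HiveMetaStoreProxy proxy = new HiveMetaStoreProxy("thrift://localhost:9083");
            try {
                Table table = proxy.getTable("default", "test_fake_to_hive");
                System.out.println(table.getSd().getLocation());
            } finally {
                proxy.getHiveMetaStoreClient().close();
            }
        }
    }
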
diff --git
a/seatunnel-connectors-v2/connector-hive/src/test/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/TestHiveSinkConfig.java
b/seatunnel-connectors-v2/connector-hive/src/test/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/TestHiveSinkConfig.java
new file mode 100644
index 000000000..92e5a6931
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-hive/src/test/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/TestHiveSinkConfig.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.hive.sink;
+
+import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory;
+import org.apache.seatunnel.shade.com.typesafe.config.ConfigResolveOptions;
+
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+import java.io.File;
+import java.util.List;
+
+@RunWith(JUnit4.class)
+public class TestHiveSinkConfig {
+
+ @Test
+ public void testCreateHiveSinkConfig() {
+ String[] fieldNames = new String[]{"name", "age"};
+ SeaTunnelDataType[] seaTunnelDataTypes = new SeaTunnelDataType[]{BasicType.STRING_TYPE, BasicType.INT_TYPE};
+ SeaTunnelRowType seaTunnelRowTypeInfo = new SeaTunnelRowType(fieldNames, seaTunnelDataTypes);
+ String configFile = "fakesource_to_hive.conf";
+ String configFilePath = System.getProperty("user.dir") + "/src/test/resources/" + configFile;
+ Config config = ConfigFactory
+ .parseFile(new File(configFilePath))
+ .resolve(ConfigResolveOptions.defaults().setAllowUnresolved(true))
+ .resolveWith(ConfigFactory.systemProperties(), ConfigResolveOptions.defaults().setAllowUnresolved(true));
+ List<? extends Config> sink = config.getConfigList("sink");
+ HiveSinkConfig hiveSinkConfig = new HiveSinkConfig(sink.get(0), seaTunnelRowTypeInfo);
+ }
+}
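The test above only verifies that HiveSinkConfig construction succeeds. A hedged sketch of assertions one could add, using only the Typesafe Config accessors already in scope; the expected values are the key/value pairs from the fakesource_to_hive.conf file below, and JUnit 4's Assert is already on the test classpath:

    // Hypothetical follow-up inside testCreateHiveSinkConfig(), after `sink` is parsed.
    org.junit.Assert.assertEquals("default.test_fake_to_hive",
            sink.get(0).getString("hive_table_name"));
    org.junit.Assert.assertEquals("thrift://localhost:9083",
            sink.get(0).getString("hive_metastore_uris"));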
diff --git a/seatunnel-examples/seatunnel-flink-connector-v2-example/src/main/resources/examples/fakesource_to_file.conf b/seatunnel-connectors-v2/connector-hive/src/test/resources/fakesource_to_hive.conf
similarity index 80%
copy from seatunnel-examples/seatunnel-flink-connector-v2-example/src/main/resources/examples/fakesource_to_file.conf
copy to seatunnel-connectors-v2/connector-hive/src/test/resources/fakesource_to_hive.conf
index c1ce63055..3412ea663 100644
--- a/seatunnel-examples/seatunnel-flink-connector-v2-example/src/main/resources/examples/fakesource_to_file.conf
+++ b/seatunnel-connectors-v2/connector-hive/src/test/resources/fakesource_to_hive.conf
@@ -21,8 +21,8 @@
env {
# You can set flink configuration here
execution.parallelism = 1
- job.mode = "STREAMING"
- execution.checkpoint.interval = 5000
+ job.mode = "BATCH"
+ #execution.checkpoint.interval = 10000
#execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint"
}
@@ -38,29 +38,16 @@ source {
}
transform {
-
- sql {
- sql = "select name,age from fake"
- }
# If you would like to get more information about how to configure seatunnel and see full list of transform plugins,
# please go to https://seatunnel.apache.org/docs/flink/configuration/transform-plugins/Sql
}
sink {
- File {
- path="file:///tmp/hive/warehouse/test2"
- field_delimiter="\t"
- row_delimiter="\n"
+ Hive {
+ hive_table_name="default.test_fake_to_hive"
+ hive_metastore_uris="thrift://localhost:9083"
partition_by=["age"]
- partition_dir_expression="${k0}=${v0}"
- is_partition_field_write_in_file=true
- file_name_expression="${transactionId}_${now}"
- file_format="text"
sink_columns=["name","age"]
- filename_time_format="yyyy.MM.dd"
- is_enable_transaction=true
- save_mode="error"
-
}
# If you would like to get more information about how to configure seatunnel and see full list of sink plugins,
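Reconstructed from the hunks above, the sink block of fakesource_to_hive.conf after this patch reads:

    sink {
      Hive {
        hive_table_name="default.test_fake_to_hive"
        hive_metastore_uris="thrift://localhost:9083"
        partition_by=["age"]
        sink_columns=["name","age"]
      }
    }

Compared with the File sink it was copied from, the Hive sink declares no path, delimiter, or file-format options, presumably because those can be resolved from the metastore table definition (see HiveMetaStoreProxy above).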
diff --git a/seatunnel-connectors-v2/connector-hudi/pom.xml b/seatunnel-connectors-v2/connector-hudi/pom.xml
index 26b3c7b7c..0b2ee9ce8 100644
--- a/seatunnel-connectors-v2/connector-hudi/pom.xml
+++ b/seatunnel-connectors-v2/connector-hudi/pom.xml
@@ -32,9 +32,9 @@
<dependencies>
<dependency>
- <groupId>org.apache.seatunnel</groupId>
- <artifactId>seatunnel-hive-shade</artifactId>
- <version>${project.version}</version>
+ <groupId>org.apache.hive</groupId>
+ <artifactId>hive-exec</artifactId>
+ <scope>provided</scope>
</dependency>
<dependency>
diff --git a/seatunnel-dependency-shade/pom.xml b/seatunnel-dependency-shade/pom.xml
deleted file mode 100644
index 1e6259991..000000000
--- a/seatunnel-dependency-shade/pom.xml
+++ /dev/null
@@ -1,39 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
-
- Licensed to the Apache Software Foundation (ASF) under one or more
- contributor license agreements. See the NOTICE file distributed with
- this work for additional information regarding copyright ownership.
- The ASF licenses this file to You under the Apache License, Version 2.0
- (the "License"); you may not use this file except in compliance with
- the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
-
--->
-<project xmlns="http://maven.apache.org/POM/4.0.0"
- xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
-
- <parent>
- <artifactId>seatunnel</artifactId>
- <groupId>org.apache.seatunnel</groupId>
- <version>${revision}</version>
- <relativePath>../pom.xml</relativePath>
- </parent>
- <modelVersion>4.0.0</modelVersion>
-
- <artifactId>seatunnel-dependency-shade</artifactId>
- <packaging>pom</packaging>
-
- <modules>
- <module>seatunnel-hive-shade</module>
- </modules>
-
-</project>
\ No newline at end of file
diff --git a/seatunnel-dependency-shade/seatunnel-hive-shade/pom.xml
b/seatunnel-dependency-shade/seatunnel-hive-shade/pom.xml
deleted file mode 100644
index ddab451b6..000000000
--- a/seatunnel-dependency-shade/seatunnel-hive-shade/pom.xml
+++ /dev/null
@@ -1,69 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
-
- Licensed to the Apache Software Foundation (ASF) under one or more
- contributor license agreements. See the NOTICE file distributed with
- this work for additional information regarding copyright ownership.
- The ASF licenses this file to You under the Apache License, Version 2.0
- (the "License"); you may not use this file except in compliance with
- the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
-
--->
-<project xmlns="http://maven.apache.org/POM/4.0.0"
- xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
- <parent>
- <artifactId>seatunnel-dependency-shade</artifactId>
- <groupId>org.apache.seatunnel</groupId>
- <version>${revision}</version>
- </parent>
- <modelVersion>4.0.0</modelVersion>
- <artifactId>seatunnel-hive-shade</artifactId>
-
- <dependencies>
- <dependency>
- <groupId>org.apache.hive</groupId>
- <artifactId>hive-exec</artifactId>
- </dependency>
- </dependencies>
- <build>
- <finalName>${project.artifactId}-${project.version}</finalName>
- <plugins>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-shade-plugin</artifactId>
- <executions>
- <execution>
- <phase>package</phase>
- <goals>
- <goal>shade</goal>
- </goals>
- <configuration>
- <filters>
- <filter>
- <artifact>*:*</artifact>
- <excludes>
- <exclude>META-INF/*.SF</exclude>
- <exclude>META-INF/*.DSA</exclude>
- <exclude>META-INF/*.RSA</exclude>
- <exclude>org/apache/calcite/**</exclude>
- <exclude>org/htrace/**</exclude>
- </excludes>
- </filter>
- </filters>
- </configuration>
- </execution>
- </executions>
- </plugin>
-
- </plugins>
- </build>
-</project>
\ No newline at end of file
diff --git a/seatunnel-examples/seatunnel-flink-connector-v2-example/pom.xml b/seatunnel-examples/seatunnel-flink-connector-v2-example/pom.xml
index 68e0ad2e8..f2d18d62f 100644
--- a/seatunnel-examples/seatunnel-flink-connector-v2-example/pom.xml
+++ b/seatunnel-examples/seatunnel-flink-connector-v2-example/pom.xml
@@ -56,6 +56,11 @@
<artifactId>connector-console</artifactId>
<version>${project.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.seatunnel</groupId>
+ <artifactId>connector-file-local</artifactId>
+ <version>${project.version}</version>
+ </dependency>
<dependency>
<groupId>org.apache.seatunnel</groupId>
<artifactId>connector-socket</artifactId>
@@ -101,6 +106,12 @@
<scope>${flink.scope}</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.seatunnel</groupId>
+ <artifactId>connector-fake</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
</dependencies>
</project>
\ No newline at end of file
diff --git a/seatunnel-examples/seatunnel-flink-connector-v2-example/src/main/resources/examples/fakesource_to_file.conf b/seatunnel-examples/seatunnel-flink-connector-v2-example/src/main/resources/examples/fakesource_to_file.conf
index c1ce63055..f7b790c40 100644
--- a/seatunnel-examples/seatunnel-flink-connector-v2-example/src/main/resources/examples/fakesource_to_file.conf
+++ b/seatunnel-examples/seatunnel-flink-connector-v2-example/src/main/resources/examples/fakesource_to_file.conf
@@ -21,8 +21,8 @@
env {
# You can set flink configuration here
execution.parallelism = 1
- job.mode = "STREAMING"
- execution.checkpoint.interval = 5000
+ job.mode = "BATCH"
+ execution.checkpoint.interval = 10
#execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint"
}
@@ -47,7 +47,7 @@ transform {
}
sink {
- File {
+ LocalFile {
path="file:///tmp/hive/warehouse/test2"
field_delimiter="\t"
row_delimiter="\n"