This is an automated email from the ASF dual-hosted git repository.
fanjia pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new e99e6ee72 Remove same code to independent method in HiveSinkWriter
(#2307)
e99e6ee72 is described below
commit e99e6ee726acfd48dc2a5809f0d6b58b0e1e8356
Author: Xiao Zhao <[email protected]>
AuthorDate: Mon Aug 1 10:35:13 2022 +0800
Remove same code to independent method in HiveSinkWriter (#2307)
---
.../connectors/seatunnel/hive/sink/HiveSink.java | 2 +-
.../seatunnel/hive/sink/HiveSinkWriter.java | 85 +++++++++-------------
2 files changed, 36 insertions(+), 51 deletions(-)
diff --git
a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSink.java
b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSink.java
index 4df91b1a5..bcdca3d49 100644
---
a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSink.java
+++
b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSink.java
@@ -113,7 +113,7 @@ public class HiveSink implements
SeaTunnelSink<SeaTunnelRow, HiveSinkState, Hive
@Override
public Optional<Serializer<HiveAggregatedCommitInfo>>
getAggregatedCommitInfoSerializer() {
- return Optional.of(new DefaultSerializer<HiveAggregatedCommitInfo>());
+ return Optional.of(new DefaultSerializer<>());
}
@Override
diff --git
a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSinkWriter.java
b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSinkWriter.java
index 4aff955cd..9192a2738 100644
---
a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSinkWriter.java
+++
b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSinkWriter.java
@@ -63,31 +63,7 @@ public class HiveSinkWriter implements
SinkWriter<SeaTunnelRow, HiveCommitInfo,
this.context = context;
this.jobId = jobId;
this.hiveSinkConfig = hiveSinkConfig;
-
- SinkFileSystemPlugin sinkFileSystemPlugin = new HdfsFileSinkPlugin();
- Optional<TransactionStateFileWriter> transactionStateFileWriter =
sinkFileSystemPlugin.getTransactionStateFileWriter(this.seaTunnelRowTypeInfo,
- new FileSinkTransactionFileNameGenerator(
- this.hiveSinkConfig.getTextFileSinkConfig().getFileFormat(),
-
this.hiveSinkConfig.getTextFileSinkConfig().getFileNameExpression(),
-
this.hiveSinkConfig.getTextFileSinkConfig().getFileNameTimeFormat()),
- new FileSinkPartitionDirNameGenerator(
-
this.hiveSinkConfig.getTextFileSinkConfig().getPartitionFieldList(),
-
this.hiveSinkConfig.getTextFileSinkConfig().getPartitionFieldsIndexInRow(),
-
this.hiveSinkConfig.getTextFileSinkConfig().getPartitionDirExpression()),
-
this.hiveSinkConfig.getTextFileSinkConfig().getSinkColumnsIndexInRow(),
- this.hiveSinkConfig.getTextFileSinkConfig().getTmpPath(),
- this.hiveSinkConfig.getTextFileSinkConfig().getPath(),
- this.jobId,
- this.context.getIndexOfSubtask(),
- this.hiveSinkConfig.getTextFileSinkConfig().getFieldDelimiter(),
- this.hiveSinkConfig.getTextFileSinkConfig().getRowDelimiter(),
- sinkFileSystemPlugin.getFileSystem().get());
-
- if (!transactionStateFileWriter.isPresent()) {
- throw new RuntimeException("A TransactionStateFileWriter is need");
- }
-
- this.fileWriter = transactionStateFileWriter.get();
+ this.fileWriter = createFileWriter();
fileWriter.beginTransaction(1L);
}
@@ -103,31 +79,7 @@ public class HiveSinkWriter implements
SinkWriter<SeaTunnelRow, HiveCommitInfo,
this.context = context;
this.jobId = jobId;
this.hiveSinkConfig = hiveSinkConfig;
-
- SinkFileSystemPlugin sinkFileSystemPlugin = new HdfsFileSinkPlugin();
- Optional<TransactionStateFileWriter> transactionStateFileWriter =
sinkFileSystemPlugin.getTransactionStateFileWriter(this.seaTunnelRowTypeInfo,
- new FileSinkTransactionFileNameGenerator(
- this.hiveSinkConfig.getTextFileSinkConfig().getFileFormat(),
-
this.hiveSinkConfig.getTextFileSinkConfig().getFileNameExpression(),
-
this.hiveSinkConfig.getTextFileSinkConfig().getFileNameTimeFormat()),
- new FileSinkPartitionDirNameGenerator(
-
this.hiveSinkConfig.getTextFileSinkConfig().getPartitionFieldList(),
-
this.hiveSinkConfig.getTextFileSinkConfig().getPartitionFieldsIndexInRow(),
-
this.hiveSinkConfig.getTextFileSinkConfig().getPartitionDirExpression()),
-
this.hiveSinkConfig.getTextFileSinkConfig().getSinkColumnsIndexInRow(),
- this.hiveSinkConfig.getTextFileSinkConfig().getTmpPath(),
- this.hiveSinkConfig.getTextFileSinkConfig().getPath(),
- this.jobId,
- this.context.getIndexOfSubtask(),
- this.hiveSinkConfig.getTextFileSinkConfig().getFieldDelimiter(),
- this.hiveSinkConfig.getTextFileSinkConfig().getRowDelimiter(),
- sinkFileSystemPlugin.getFileSystem().get());
-
- if (!transactionStateFileWriter.isPresent()) {
- throw new RuntimeException("A TransactionStateFileWriter is need");
- }
-
- this.fileWriter = transactionStateFileWriter.get();
+ this.fileWriter = createFileWriter();
// Rollback dirty transaction
if (hiveSinkStates.size() > 0) {
@@ -172,4 +124,37 @@ public class HiveSinkWriter implements
SinkWriter<SeaTunnelRow, HiveCommitInfo,
public void abortPrepare() {
fileWriter.abortTransaction();
}
+
+ private TransactionStateFileWriter createFileWriter() {
+ SinkFileSystemPlugin sinkFileSystemPlugin = new HdfsFileSinkPlugin();
+ Optional<TransactionStateFileWriter> transactionStateFileWriterOpt =
sinkFileSystemPlugin.getTransactionStateFileWriter(this.seaTunnelRowTypeInfo,
+ getFilenameGenerator(),
+ getPartitionDirNameGenerator(),
+
this.hiveSinkConfig.getTextFileSinkConfig().getSinkColumnsIndexInRow(),
+ this.hiveSinkConfig.getTextFileSinkConfig().getTmpPath(),
+ this.hiveSinkConfig.getTextFileSinkConfig().getPath(),
+ this.jobId,
+ this.context.getIndexOfSubtask(),
+
this.hiveSinkConfig.getTextFileSinkConfig().getFieldDelimiter(),
+ this.hiveSinkConfig.getTextFileSinkConfig().getRowDelimiter(),
+ sinkFileSystemPlugin.getFileSystem().get());
+ if (!transactionStateFileWriterOpt.isPresent()) {
+ throw new RuntimeException("A TransactionStateFileWriter is need");
+ }
+ return transactionStateFileWriterOpt.get();
+ }
+
+ private FileSinkTransactionFileNameGenerator getFilenameGenerator() {
+ return new FileSinkTransactionFileNameGenerator(
+ this.hiveSinkConfig.getTextFileSinkConfig().getFileFormat(),
+
this.hiveSinkConfig.getTextFileSinkConfig().getFileNameExpression(),
+
this.hiveSinkConfig.getTextFileSinkConfig().getFileNameTimeFormat());
+ }
+
+ private FileSinkPartitionDirNameGenerator getPartitionDirNameGenerator() {
+ return new FileSinkPartitionDirNameGenerator(
+
this.hiveSinkConfig.getTextFileSinkConfig().getPartitionFieldList(),
+
this.hiveSinkConfig.getTextFileSinkConfig().getPartitionFieldsIndexInRow(),
+
this.hiveSinkConfig.getTextFileSinkConfig().getPartitionDirExpression());
+ }
}