This is an automated email from the ASF dual-hosted git repository.
wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 085e0e5fc3 Add multiple table file sink to base (#6049)
085e0e5fc3 is described below
commit 085e0e5fc354e9dcbed2096b319d538e6671ec47
Author: Eric <[email protected]>
AuthorDate: Fri Dec 22 16:12:15 2023 +0800
Add multiple table file sink to base (#6049)
---
.../seatunnel/file/sink/BaseFileSinkWriter.java | 6 +-
.../file/sink/BaseMultipleTableFileSink.java} | 23 +++----
.../seatunnel/file/local/sink/LocalFileSink.java | 79 +---------------------
.../local/sink/writter/LocalFileSinkWriter.java | 51 --------------
4 files changed, 17 insertions(+), 142 deletions(-)
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/BaseFileSinkWriter.java
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/BaseFileSinkWriter.java
index 27a36e245b..017b6d80c1 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/BaseFileSinkWriter.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/BaseFileSinkWriter.java
@@ -18,6 +18,7 @@
package org.apache.seatunnel.connectors.seatunnel.file.sink;
import org.apache.seatunnel.api.sink.SinkWriter;
+import org.apache.seatunnel.api.sink.SupportMultiTableSinkWriter;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated;
import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
@@ -40,7 +41,10 @@ import java.util.Optional;
import java.util.UUID;
import java.util.stream.Collectors;
-public class BaseFileSinkWriter implements SinkWriter<SeaTunnelRow,
FileCommitInfo, FileSinkState> {
+public class BaseFileSinkWriter
+ implements SinkWriter<SeaTunnelRow, FileCommitInfo, FileSinkState>,
+ SupportMultiTableSinkWriter<WriteStrategy> {
+
protected final WriteStrategy writeStrategy;
public BaseFileSinkWriter(
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/sink/LocalFileSink.java
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/BaseMultipleTableFileSink.java
similarity index 84%
copy from
seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/sink/LocalFileSink.java
copy to
seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/BaseMultipleTableFileSink.java
index 308a989008..fee66ad467 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/sink/LocalFileSink.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/BaseMultipleTableFileSink.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.seatunnel.connectors.seatunnel.file.local.sink;
+package org.apache.seatunnel.connectors.seatunnel.file.sink;
import org.apache.seatunnel.api.common.JobContext;
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
@@ -27,11 +27,8 @@ import org.apache.seatunnel.api.sink.SinkWriter;
import org.apache.seatunnel.api.sink.SupportMultiTableSink;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
-import org.apache.seatunnel.connectors.seatunnel.file.config.FileSystemType;
import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
import
org.apache.seatunnel.connectors.seatunnel.file.hadoop.HadoopFileSystemProxy;
-import
org.apache.seatunnel.connectors.seatunnel.file.local.config.LocalFileHadoopConf;
-import
org.apache.seatunnel.connectors.seatunnel.file.local.sink.writter.LocalFileSinkWriter;
import
org.apache.seatunnel.connectors.seatunnel.file.sink.commit.FileAggregatedCommitInfo;
import
org.apache.seatunnel.connectors.seatunnel.file.sink.commit.FileCommitInfo;
import
org.apache.seatunnel.connectors.seatunnel.file.sink.commit.FileSinkAggregatedCommitter;
@@ -43,7 +40,7 @@ import
org.apache.seatunnel.connectors.seatunnel.file.sink.writer.WriteStrategyF
import java.util.List;
import java.util.Optional;
-public class LocalFileSink
+public abstract class BaseMultipleTableFileSink
implements SeaTunnelSink<
SeaTunnelRow, FileSinkState, FileCommitInfo,
FileAggregatedCommitInfo>,
SupportMultiTableSink {
@@ -54,8 +51,11 @@ public class LocalFileSink
private final WriteStrategy writeStrategy;
private String jobId;
- public LocalFileSink(ReadonlyConfig readonlyConfig, CatalogTable
catalogTable) {
- this.hadoopConf = new LocalFileHadoopConf();
+ public abstract String getPluginName();
+
+ public BaseMultipleTableFileSink(
+ HadoopConf hadoopConf, ReadonlyConfig readonlyConfig, CatalogTable
catalogTable) {
+ this.hadoopConf = hadoopConf;
this.fileSinkConfig =
new FileSinkConfig(readonlyConfig.toConfig(),
catalogTable.getSeaTunnelRowType());
this.writeStrategy =
@@ -72,7 +72,7 @@ public class LocalFileSink
@Override
public SinkWriter<SeaTunnelRow, FileCommitInfo, FileSinkState>
restoreWriter(
SinkWriter.Context context, List<FileSinkState> states) {
- return new LocalFileSinkWriter(writeStrategy, hadoopConf, context,
jobId, states);
+ return new BaseFileSinkWriter(writeStrategy, hadoopConf, context,
jobId, states);
}
@Override
@@ -84,7 +84,7 @@ public class LocalFileSink
@Override
public SinkWriter<SeaTunnelRow, FileCommitInfo, FileSinkState>
createWriter(
SinkWriter.Context context) {
- return new LocalFileSinkWriter(writeStrategy, hadoopConf, context,
jobId);
+ return new BaseFileSinkWriter(writeStrategy, hadoopConf, context,
jobId);
}
@Override
@@ -101,9 +101,4 @@ public class LocalFileSink
public Optional<Serializer<FileSinkState>> getWriterStateSerializer() {
return Optional.of(new DefaultSerializer<>());
}
-
- @Override
- public String getPluginName() {
- return FileSystemType.LOCAL.getFileSystemPluginName();
- }
}
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/sink/LocalFileSink.java
b/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/sink/LocalFileSink.java
index 308a989008..94741941bf 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/sink/LocalFileSink.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/sink/LocalFileSink.java
@@ -17,89 +17,16 @@
package org.apache.seatunnel.connectors.seatunnel.file.local.sink;
-import org.apache.seatunnel.api.common.JobContext;
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
-import org.apache.seatunnel.api.serialization.DefaultSerializer;
-import org.apache.seatunnel.api.serialization.Serializer;
-import org.apache.seatunnel.api.sink.SeaTunnelSink;
-import org.apache.seatunnel.api.sink.SinkAggregatedCommitter;
-import org.apache.seatunnel.api.sink.SinkWriter;
-import org.apache.seatunnel.api.sink.SupportMultiTableSink;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
-import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.connectors.seatunnel.file.config.FileSystemType;
-import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
-import
org.apache.seatunnel.connectors.seatunnel.file.hadoop.HadoopFileSystemProxy;
import
org.apache.seatunnel.connectors.seatunnel.file.local.config.LocalFileHadoopConf;
-import
org.apache.seatunnel.connectors.seatunnel.file.local.sink.writter.LocalFileSinkWriter;
-import
org.apache.seatunnel.connectors.seatunnel.file.sink.commit.FileAggregatedCommitInfo;
-import
org.apache.seatunnel.connectors.seatunnel.file.sink.commit.FileCommitInfo;
-import
org.apache.seatunnel.connectors.seatunnel.file.sink.commit.FileSinkAggregatedCommitter;
-import
org.apache.seatunnel.connectors.seatunnel.file.sink.config.FileSinkConfig;
-import org.apache.seatunnel.connectors.seatunnel.file.sink.state.FileSinkState;
-import
org.apache.seatunnel.connectors.seatunnel.file.sink.writer.WriteStrategy;
-import
org.apache.seatunnel.connectors.seatunnel.file.sink.writer.WriteStrategyFactory;
+import
org.apache.seatunnel.connectors.seatunnel.file.sink.BaseMultipleTableFileSink;
-import java.util.List;
-import java.util.Optional;
-
-public class LocalFileSink
- implements SeaTunnelSink<
- SeaTunnelRow, FileSinkState, FileCommitInfo,
FileAggregatedCommitInfo>,
- SupportMultiTableSink {
-
- private final HadoopConf hadoopConf;
- private final HadoopFileSystemProxy hadoopFileSystemProxy;
- private final FileSinkConfig fileSinkConfig;
- private final WriteStrategy writeStrategy;
- private String jobId;
+public class LocalFileSink extends BaseMultipleTableFileSink {
public LocalFileSink(ReadonlyConfig readonlyConfig, CatalogTable
catalogTable) {
- this.hadoopConf = new LocalFileHadoopConf();
- this.fileSinkConfig =
- new FileSinkConfig(readonlyConfig.toConfig(),
catalogTable.getSeaTunnelRowType());
- this.writeStrategy =
- WriteStrategyFactory.of(fileSinkConfig.getFileFormat(),
fileSinkConfig);
- this.hadoopFileSystemProxy = new HadoopFileSystemProxy(hadoopConf);
-
this.writeStrategy.setSeaTunnelRowTypeInfo(catalogTable.getSeaTunnelRowType());
- }
-
- @Override
- public void setJobContext(JobContext jobContext) {
- this.jobId = jobContext.getJobId();
- }
-
- @Override
- public SinkWriter<SeaTunnelRow, FileCommitInfo, FileSinkState>
restoreWriter(
- SinkWriter.Context context, List<FileSinkState> states) {
- return new LocalFileSinkWriter(writeStrategy, hadoopConf, context,
jobId, states);
- }
-
- @Override
- public Optional<SinkAggregatedCommitter<FileCommitInfo,
FileAggregatedCommitInfo>>
- createAggregatedCommitter() {
- return Optional.of(new FileSinkAggregatedCommitter(hadoopConf));
- }
-
- @Override
- public SinkWriter<SeaTunnelRow, FileCommitInfo, FileSinkState>
createWriter(
- SinkWriter.Context context) {
- return new LocalFileSinkWriter(writeStrategy, hadoopConf, context,
jobId);
- }
-
- @Override
- public Optional<Serializer<FileCommitInfo>> getCommitInfoSerializer() {
- return Optional.of(new DefaultSerializer<>());
- }
-
- @Override
- public Optional<Serializer<FileAggregatedCommitInfo>>
getAggregatedCommitInfoSerializer() {
- return Optional.of(new DefaultSerializer<>());
- }
-
- @Override
- public Optional<Serializer<FileSinkState>> getWriterStateSerializer() {
- return Optional.of(new DefaultSerializer<>());
+ super(new LocalFileHadoopConf(), readonlyConfig, catalogTable);
}
@Override
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/sink/writter/LocalFileSinkWriter.java
b/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/sink/writter/LocalFileSinkWriter.java
deleted file mode 100644
index 88de32f820..0000000000
---
a/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/sink/writter/LocalFileSinkWriter.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.connectors.seatunnel.file.local.sink.writter;
-
-import org.apache.seatunnel.api.sink.SinkWriter;
-import org.apache.seatunnel.api.sink.SupportMultiTableSinkWriter;
-import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
-import org.apache.seatunnel.connectors.seatunnel.file.sink.BaseFileSinkWriter;
-import org.apache.seatunnel.connectors.seatunnel.file.sink.state.FileSinkState;
-import
org.apache.seatunnel.connectors.seatunnel.file.sink.writer.WriteStrategy;
-
-import java.util.Collections;
-import java.util.List;
-
-public class LocalFileSinkWriter extends BaseFileSinkWriter
- implements SupportMultiTableSinkWriter<WriteStrategy> {
-
- public LocalFileSinkWriter(
- WriteStrategy writeStrategy,
- HadoopConf hadoopConf,
- Context context,
- String jobId,
- List<FileSinkState> fileSinkStates) {
- // todo: do we need to set writeStrategy as share resource? then how
to deal with the pre
- // fileSinkStates?
- super(writeStrategy, hadoopConf, context, jobId, fileSinkStates);
- }
-
- public LocalFileSinkWriter(
- WriteStrategy writeStrategy,
- HadoopConf hadoopConf,
- SinkWriter.Context context,
- String jobId) {
- this(writeStrategy, hadoopConf, context, jobId,
Collections.emptyList());
- }
-}