This is an automated email from the ASF dual-hosted git repository.

wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 085e0e5fc3 Add multiple table file sink to base (#6049)
085e0e5fc3 is described below

commit 085e0e5fc354e9dcbed2096b319d538e6671ec47
Author: Eric <[email protected]>
AuthorDate: Fri Dec 22 16:12:15 2023 +0800

    Add multiple table file sink to base (#6049)
---
 .../seatunnel/file/sink/BaseFileSinkWriter.java    |  6 +-
 .../file/sink/BaseMultipleTableFileSink.java}      | 23 +++----
 .../seatunnel/file/local/sink/LocalFileSink.java   | 79 +---------------------
 .../local/sink/writter/LocalFileSinkWriter.java    | 51 --------------
 4 files changed, 17 insertions(+), 142 deletions(-)

diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/BaseFileSinkWriter.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/BaseFileSinkWriter.java
index 27a36e245b..017b6d80c1 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/BaseFileSinkWriter.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/BaseFileSinkWriter.java
@@ -18,6 +18,7 @@
 package org.apache.seatunnel.connectors.seatunnel.file.sink;
 
 import org.apache.seatunnel.api.sink.SinkWriter;
+import org.apache.seatunnel.api.sink.SupportMultiTableSinkWriter;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated;
 import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
@@ -40,7 +41,10 @@ import java.util.Optional;
 import java.util.UUID;
 import java.util.stream.Collectors;
 
-public class BaseFileSinkWriter implements SinkWriter<SeaTunnelRow, FileCommitInfo, FileSinkState> {
+public class BaseFileSinkWriter
+        implements SinkWriter<SeaTunnelRow, FileCommitInfo, FileSinkState>,
+                SupportMultiTableSinkWriter<WriteStrategy> {
+
     protected final WriteStrategy writeStrategy;
 
     public BaseFileSinkWriter(
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/sink/LocalFileSink.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/BaseMultipleTableFileSink.java
similarity index 84%
copy from seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/sink/LocalFileSink.java
copy to seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/BaseMultipleTableFileSink.java
index 308a989008..fee66ad467 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/sink/LocalFileSink.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/BaseMultipleTableFileSink.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.connectors.seatunnel.file.local.sink;
+package org.apache.seatunnel.connectors.seatunnel.file.sink;
 
 import org.apache.seatunnel.api.common.JobContext;
 import org.apache.seatunnel.api.configuration.ReadonlyConfig;
@@ -27,11 +27,8 @@ import org.apache.seatunnel.api.sink.SinkWriter;
 import org.apache.seatunnel.api.sink.SupportMultiTableSink;
 import org.apache.seatunnel.api.table.catalog.CatalogTable;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
-import org.apache.seatunnel.connectors.seatunnel.file.config.FileSystemType;
 import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
 import org.apache.seatunnel.connectors.seatunnel.file.hadoop.HadoopFileSystemProxy;
-import org.apache.seatunnel.connectors.seatunnel.file.local.config.LocalFileHadoopConf;
-import org.apache.seatunnel.connectors.seatunnel.file.local.sink.writter.LocalFileSinkWriter;
 import org.apache.seatunnel.connectors.seatunnel.file.sink.commit.FileAggregatedCommitInfo;
 import org.apache.seatunnel.connectors.seatunnel.file.sink.commit.FileCommitInfo;
 import org.apache.seatunnel.connectors.seatunnel.file.sink.commit.FileSinkAggregatedCommitter;
@@ -43,7 +40,7 @@ import org.apache.seatunnel.connectors.seatunnel.file.sink.writer.WriteStrategyF
 import java.util.List;
 import java.util.Optional;
 
-public class LocalFileSink
+public abstract class BaseMultipleTableFileSink
         implements SeaTunnelSink<
                         SeaTunnelRow, FileSinkState, FileCommitInfo, FileAggregatedCommitInfo>,
                 SupportMultiTableSink {
@@ -54,8 +51,11 @@ public class LocalFileSink
     private final WriteStrategy writeStrategy;
     private String jobId;
 
-    public LocalFileSink(ReadonlyConfig readonlyConfig, CatalogTable catalogTable) {
-        this.hadoopConf = new LocalFileHadoopConf();
+    public abstract String getPluginName();
+
+    public BaseMultipleTableFileSink(
+            HadoopConf hadoopConf, ReadonlyConfig readonlyConfig, CatalogTable catalogTable) {
+        this.hadoopConf = hadoopConf;
         this.fileSinkConfig =
                 new FileSinkConfig(readonlyConfig.toConfig(), catalogTable.getSeaTunnelRowType());
         this.writeStrategy =
@@ -72,7 +72,7 @@ public class LocalFileSink
     @Override
     public SinkWriter<SeaTunnelRow, FileCommitInfo, FileSinkState> restoreWriter(
             SinkWriter.Context context, List<FileSinkState> states) {
-        return new LocalFileSinkWriter(writeStrategy, hadoopConf, context, jobId, states);
+        return new BaseFileSinkWriter(writeStrategy, hadoopConf, context, jobId, states);
     }
 
     @Override
@@ -84,7 +84,7 @@ public class LocalFileSink
     @Override
     public SinkWriter<SeaTunnelRow, FileCommitInfo, FileSinkState> createWriter(
             SinkWriter.Context context) {
-        return new LocalFileSinkWriter(writeStrategy, hadoopConf, context, jobId);
+        return new BaseFileSinkWriter(writeStrategy, hadoopConf, context, jobId);
     }
 
     @Override
@@ -101,9 +101,4 @@ public class LocalFileSink
     public Optional<Serializer<FileSinkState>> getWriterStateSerializer() {
         return Optional.of(new DefaultSerializer<>());
     }
-
-    @Override
-    public String getPluginName() {
-        return FileSystemType.LOCAL.getFileSystemPluginName();
-    }
 }
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/sink/LocalFileSink.java b/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/sink/LocalFileSink.java
index 308a989008..94741941bf 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/sink/LocalFileSink.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/sink/LocalFileSink.java
@@ -17,89 +17,16 @@
 
 package org.apache.seatunnel.connectors.seatunnel.file.local.sink;
 
-import org.apache.seatunnel.api.common.JobContext;
 import org.apache.seatunnel.api.configuration.ReadonlyConfig;
-import org.apache.seatunnel.api.serialization.DefaultSerializer;
-import org.apache.seatunnel.api.serialization.Serializer;
-import org.apache.seatunnel.api.sink.SeaTunnelSink;
-import org.apache.seatunnel.api.sink.SinkAggregatedCommitter;
-import org.apache.seatunnel.api.sink.SinkWriter;
-import org.apache.seatunnel.api.sink.SupportMultiTableSink;
 import org.apache.seatunnel.api.table.catalog.CatalogTable;
-import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.connectors.seatunnel.file.config.FileSystemType;
-import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
-import org.apache.seatunnel.connectors.seatunnel.file.hadoop.HadoopFileSystemProxy;
 import org.apache.seatunnel.connectors.seatunnel.file.local.config.LocalFileHadoopConf;
-import org.apache.seatunnel.connectors.seatunnel.file.local.sink.writter.LocalFileSinkWriter;
-import org.apache.seatunnel.connectors.seatunnel.file.sink.commit.FileAggregatedCommitInfo;
-import org.apache.seatunnel.connectors.seatunnel.file.sink.commit.FileCommitInfo;
-import org.apache.seatunnel.connectors.seatunnel.file.sink.commit.FileSinkAggregatedCommitter;
-import org.apache.seatunnel.connectors.seatunnel.file.sink.config.FileSinkConfig;
-import org.apache.seatunnel.connectors.seatunnel.file.sink.state.FileSinkState;
-import org.apache.seatunnel.connectors.seatunnel.file.sink.writer.WriteStrategy;
-import org.apache.seatunnel.connectors.seatunnel.file.sink.writer.WriteStrategyFactory;
+import org.apache.seatunnel.connectors.seatunnel.file.sink.BaseMultipleTableFileSink;
 
-import java.util.List;
-import java.util.Optional;
-
-public class LocalFileSink
-        implements SeaTunnelSink<
-                        SeaTunnelRow, FileSinkState, FileCommitInfo, FileAggregatedCommitInfo>,
-                SupportMultiTableSink {
-
-    private final HadoopConf hadoopConf;
-    private final HadoopFileSystemProxy hadoopFileSystemProxy;
-    private final FileSinkConfig fileSinkConfig;
-    private final WriteStrategy writeStrategy;
-    private String jobId;
+public class LocalFileSink extends BaseMultipleTableFileSink {
 
     public LocalFileSink(ReadonlyConfig readonlyConfig, CatalogTable catalogTable) {
-        this.hadoopConf = new LocalFileHadoopConf();
-        this.fileSinkConfig =
-                new FileSinkConfig(readonlyConfig.toConfig(), catalogTable.getSeaTunnelRowType());
-        this.writeStrategy =
-                WriteStrategyFactory.of(fileSinkConfig.getFileFormat(), fileSinkConfig);
-        this.hadoopFileSystemProxy = new HadoopFileSystemProxy(hadoopConf);
-        this.writeStrategy.setSeaTunnelRowTypeInfo(catalogTable.getSeaTunnelRowType());
-    }
-
-    @Override
-    public void setJobContext(JobContext jobContext) {
-        this.jobId = jobContext.getJobId();
-    }
-
-    @Override
-    public SinkWriter<SeaTunnelRow, FileCommitInfo, FileSinkState> restoreWriter(
-            SinkWriter.Context context, List<FileSinkState> states) {
-        return new LocalFileSinkWriter(writeStrategy, hadoopConf, context, jobId, states);
-    }
-
-    @Override
-    public Optional<SinkAggregatedCommitter<FileCommitInfo, FileAggregatedCommitInfo>>
-            createAggregatedCommitter() {
-        return Optional.of(new FileSinkAggregatedCommitter(hadoopConf));
-    }
-
-    @Override
-    public SinkWriter<SeaTunnelRow, FileCommitInfo, FileSinkState> createWriter(
-            SinkWriter.Context context) {
-        return new LocalFileSinkWriter(writeStrategy, hadoopConf, context, jobId);
-    }
-
-    @Override
-    public Optional<Serializer<FileCommitInfo>> getCommitInfoSerializer() {
-        return Optional.of(new DefaultSerializer<>());
-    }
-
-    @Override
-    public Optional<Serializer<FileAggregatedCommitInfo>> getAggregatedCommitInfoSerializer() {
-        return Optional.of(new DefaultSerializer<>());
-    }
-
-    @Override
-    public Optional<Serializer<FileSinkState>> getWriterStateSerializer() {
-        return Optional.of(new DefaultSerializer<>());
+        super(new LocalFileHadoopConf(), readonlyConfig, catalogTable);
     }
 
     @Override
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/sink/writter/LocalFileSinkWriter.java b/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/sink/writter/LocalFileSinkWriter.java
deleted file mode 100644
index 88de32f820..0000000000
--- a/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/sink/writter/LocalFileSinkWriter.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.connectors.seatunnel.file.local.sink.writter;
-
-import org.apache.seatunnel.api.sink.SinkWriter;
-import org.apache.seatunnel.api.sink.SupportMultiTableSinkWriter;
-import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
-import org.apache.seatunnel.connectors.seatunnel.file.sink.BaseFileSinkWriter;
-import org.apache.seatunnel.connectors.seatunnel.file.sink.state.FileSinkState;
-import org.apache.seatunnel.connectors.seatunnel.file.sink.writer.WriteStrategy;
-
-import java.util.Collections;
-import java.util.List;
-
-public class LocalFileSinkWriter extends BaseFileSinkWriter
-        implements SupportMultiTableSinkWriter<WriteStrategy> {
-
-    public LocalFileSinkWriter(
-            WriteStrategy writeStrategy,
-            HadoopConf hadoopConf,
-            Context context,
-            String jobId,
-            List<FileSinkState> fileSinkStates) {
-        // todo: do we need to set writeStrategy as share resource? then how to deal with the pre
-        // fileSinkStates?
-        super(writeStrategy, hadoopConf, context, jobId, fileSinkStates);
-    }
-
-    public LocalFileSinkWriter(
-            WriteStrategy writeStrategy,
-            HadoopConf hadoopConf,
-            SinkWriter.Context context,
-            String jobId) {
-        this(writeStrategy, hadoopConf, context, jobId, Collections.emptyList());
-    }
-}

Reply via email to