This is an automated email from the ASF dual-hosted git repository.

gaojun2048 pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 7b2f538310 [feature][connector-file-local] add save mode function for 
localfile (#7080)
7b2f538310 is described below

commit 7b2f53831025c5ac1c74c05fec3b88128f705ea6
Author: 老王 <[email protected]>
AuthorDate: Tue Jul 9 10:32:09 2024 +0800

    [feature][connector-file-local] add save mode function for localfile (#7080)
---
 docs/en/connector-v2/sink/LocalFile.md             |  18 +++
 .../file/catalog/AbstractFileCatalog.java          | 131 +++++++++++++++++++++
 .../seatunnel/file/config/BaseSinkConfig.java      |  22 ++++
 .../file/sink/BaseMultipleTableFileSink.java       |  34 +++++-
 .../file/local/catalog/LocalFileCatalog.java       |  29 +++++
 .../local/catalog/LocalFileCatalogFactory.java     |  53 +++++++++
 .../file/local/sink/LocalFileSinkFactory.java      |   2 +
 .../e2e/connector/file/local/LocalFileIT.java      |  87 +++++++++++++-
 .../resources/json/fake_to_local_file_json.conf    |   2 +
 ...conf => fake_to_local_file_json_save_mode.conf} |  32 ++---
 .../resources/batch_last_checkpoint_error.conf     |  14 +--
 11 files changed, 385 insertions(+), 39 deletions(-)

diff --git a/docs/en/connector-v2/sink/LocalFile.md 
b/docs/en/connector-v2/sink/LocalFile.md
index b0d6b45bd1..b0d41419d5 100644
--- a/docs/en/connector-v2/sink/LocalFile.md
+++ b/docs/en/connector-v2/sink/LocalFile.md
@@ -60,6 +60,8 @@ By default, we use 2PC commit to ensure `exactly-once`
 | parquet_avro_write_fixed_as_int96     | array   | no       | -               
                           | Only used when file_format is parquet.             
                                               |
 | enable_header_write                   | boolean | no       | false           
                           | Only used when file_format_type is text,csv.<br/> 
false:don't write header,true:write header.     |
 | encoding                              | string  | no       | "UTF-8"         
                           | Only used when file_format_type is 
json,text,csv,xml.                                             |
+| schema_save_mode                      | string  | no       | 
CREATE_SCHEMA_WHEN_NOT_EXIST               | Existing dir processing method     
                                                               |
+| data_save_mode                        | string  | no       | APPEND_DATA     
                           | Existing data processing method                    
                                               |
 
 ### path [string]
 
@@ -205,6 +207,20 @@ Only used when file_format_type is text,csv.false:don't 
write header,true:write
 Only used when file_format_type is json,text,csv,xml.
 The encoding of the file to write. This param will be parsed by 
`Charset.forName(encoding)`.
 
+### schema_save_mode [string]
+
+Existing dir processing method.
+- RECREATE_SCHEMA: will create when the dir does not exist, delete and 
recreate when the dir exists
+- CREATE_SCHEMA_WHEN_NOT_EXIST: will create when the dir does not exist, 
skipped when the dir exists
+- ERROR_WHEN_SCHEMA_NOT_EXIST: error will be reported when the dir does not 
exist
+
+### data_save_mode [string]
+
+Existing data processing method.
+- DROP_DATA: preserve dir and delete data files
+- APPEND_DATA: preserve dir, preserve data files
+- ERROR_WHEN_DATA_EXISTS: when there are data files, an error is reported
+
 ## Example
 
 For orc file format simple config
@@ -278,6 +294,8 @@ LocalFile {
     file_format_type="excel"
     filename_time_format="yyyy.MM.dd"
     is_enable_transaction=true
+    schema_save_mode=RECREATE_SCHEMA
+    data_save_mode=DROP_DATA
   }
 
 ```
diff --git 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/catalog/AbstractFileCatalog.java
 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/catalog/AbstractFileCatalog.java
new file mode 100644
index 0000000000..f7a1b46a8b
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/catalog/AbstractFileCatalog.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.file.catalog;
+
+import org.apache.seatunnel.api.table.catalog.Catalog;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.TablePath;
+import org.apache.seatunnel.api.table.catalog.exception.CatalogException;
+import 
org.apache.seatunnel.api.table.catalog.exception.DatabaseAlreadyExistException;
+import 
org.apache.seatunnel.api.table.catalog.exception.DatabaseNotExistException;
+import 
org.apache.seatunnel.api.table.catalog.exception.TableAlreadyExistException;
+import org.apache.seatunnel.api.table.catalog.exception.TableNotExistException;
+import 
org.apache.seatunnel.connectors.seatunnel.file.hadoop.HadoopFileSystemProxy;
+
+import org.apache.commons.collections4.CollectionUtils;
+import org.apache.hadoop.fs.LocatedFileStatus;
+
+import lombok.SneakyThrows;
+
+import java.util.List;
+
+public abstract class AbstractFileCatalog implements Catalog {
+
+    protected final String catalogName;
+    private final HadoopFileSystemProxy hadoopFileSystemProxy;
+    private final String filePath;
+
+    protected AbstractFileCatalog(
+            HadoopFileSystemProxy hadoopFileSystemProxy, String filePath, 
String catalogName) {
+        this.catalogName = catalogName;
+        this.filePath = filePath;
+        this.hadoopFileSystemProxy = hadoopFileSystemProxy;
+    }
+
+    @Override
+    public void open() throws CatalogException {}
+
+    @Override
+    public void close() throws CatalogException {}
+
+    @Override
+    public String name() {
+        return catalogName;
+    }
+
+    @Override
+    public String getDefaultDatabase() throws CatalogException {
+        return null;
+    }
+
+    @Override
+    public boolean databaseExists(String databaseName) throws CatalogException 
{
+        return false;
+    }
+
+    @Override
+    public List<String> listDatabases() throws CatalogException {
+        return null;
+    }
+
+    @Override
+    public List<String> listTables(String databaseName)
+            throws CatalogException, DatabaseNotExistException {
+        return null;
+    }
+
+    @SneakyThrows
+    @Override
+    public boolean tableExists(TablePath tablePath) throws CatalogException {
+        return hadoopFileSystemProxy.fileExist(filePath);
+    }
+
+    @Override
+    public CatalogTable getTable(TablePath tablePath)
+            throws CatalogException, TableNotExistException {
+        return null;
+    }
+
+    @SneakyThrows
+    @Override
+    public void createTable(TablePath tablePath, CatalogTable table, boolean 
ignoreIfExists)
+            throws TableAlreadyExistException, DatabaseNotExistException, 
CatalogException {
+        hadoopFileSystemProxy.createDir(filePath);
+    }
+
+    @SneakyThrows
+    @Override
+    public void dropTable(TablePath tablePath, boolean ignoreIfNotExists)
+            throws TableNotExistException, CatalogException {
+        hadoopFileSystemProxy.deleteFile(filePath);
+    }
+
+    @Override
+    public void createDatabase(TablePath tablePath, boolean ignoreIfExists)
+            throws DatabaseAlreadyExistException, CatalogException {}
+
+    @Override
+    public void dropDatabase(TablePath tablePath, boolean ignoreIfNotExists)
+            throws DatabaseNotExistException, CatalogException {}
+
+    @SneakyThrows
+    @Override
+    public void truncateTable(TablePath tablePath, boolean ignoreIfNotExists)
+            throws TableNotExistException, CatalogException {
+        hadoopFileSystemProxy.deleteFile(filePath);
+        hadoopFileSystemProxy.createDir(filePath);
+    }
+
+    @SneakyThrows
+    @Override
+    public boolean isExistsData(TablePath tablePath) {
+        final List<LocatedFileStatus> locatedFileStatuses =
+                hadoopFileSystemProxy.listFile(filePath);
+        return CollectionUtils.isNotEmpty(locatedFileStatuses);
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseSinkConfig.java
 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseSinkConfig.java
index 48a2d2b436..0759baf9e4 100644
--- 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseSinkConfig.java
+++ 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseSinkConfig.java
@@ -19,6 +19,8 @@ package org.apache.seatunnel.connectors.seatunnel.file.config;
 
 import org.apache.seatunnel.api.configuration.Option;
 import org.apache.seatunnel.api.configuration.Options;
+import org.apache.seatunnel.api.sink.DataSaveMode;
+import org.apache.seatunnel.api.sink.SchemaSaveMode;
 import org.apache.seatunnel.common.utils.DateTimeUtils;
 import org.apache.seatunnel.common.utils.DateUtils;
 import org.apache.seatunnel.common.utils.TimeUtils;
@@ -28,6 +30,10 @@ import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 
+import static org.apache.seatunnel.api.sink.DataSaveMode.APPEND_DATA;
+import static org.apache.seatunnel.api.sink.DataSaveMode.DROP_DATA;
+import static 
org.apache.seatunnel.api.sink.DataSaveMode.ERROR_WHEN_DATA_EXISTS;
+
 public class BaseSinkConfig {
     public static final String SEATUNNEL = "seatunnel";
     public static final String NON_PARTITION = "NON_PARTITION";
@@ -293,4 +299,20 @@ public class BaseSinkConfig {
                     .defaultValue(Collections.emptyList())
                     .withDescription(
                             "Support writing Parquet INT96 from a 12-byte 
field, only valid for parquet files.");
+
+    public static final Option<SchemaSaveMode> SCHEMA_SAVE_MODE =
+            Options.key("schema_save_mode")
+                    .enumType(SchemaSaveMode.class)
+                    .defaultValue(SchemaSaveMode.CREATE_SCHEMA_WHEN_NOT_EXIST)
+                    .withDescription(
+                            "Before the synchronization task begins, process 
the existing path");
+
+    public static final Option<DataSaveMode> DATA_SAVE_MODE =
+            Options.key("data_save_mode")
+                    .singleChoice(
+                            DataSaveMode.class,
+                            Arrays.asList(DROP_DATA, APPEND_DATA, 
ERROR_WHEN_DATA_EXISTS))
+                    .defaultValue(APPEND_DATA)
+                    .withDescription(
+                            "Before the synchronization task begins, different 
processing of data files that already exist in the directory");
 }
diff --git 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/BaseMultipleTableFileSink.java
 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/BaseMultipleTableFileSink.java
index 6beb62d7e8..1ae4b84029 100644
--- 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/BaseMultipleTableFileSink.java
+++ 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/BaseMultipleTableFileSink.java
@@ -21,12 +21,20 @@ import org.apache.seatunnel.api.common.JobContext;
 import org.apache.seatunnel.api.configuration.ReadonlyConfig;
 import org.apache.seatunnel.api.serialization.DefaultSerializer;
 import org.apache.seatunnel.api.serialization.Serializer;
+import org.apache.seatunnel.api.sink.DataSaveMode;
+import org.apache.seatunnel.api.sink.DefaultSaveModeHandler;
+import org.apache.seatunnel.api.sink.SaveModeHandler;
+import org.apache.seatunnel.api.sink.SchemaSaveMode;
 import org.apache.seatunnel.api.sink.SeaTunnelSink;
 import org.apache.seatunnel.api.sink.SinkAggregatedCommitter;
 import org.apache.seatunnel.api.sink.SinkWriter;
 import org.apache.seatunnel.api.sink.SupportMultiTableSink;
+import org.apache.seatunnel.api.sink.SupportSaveMode;
+import org.apache.seatunnel.api.table.catalog.Catalog;
 import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.factory.CatalogFactory;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.connectors.seatunnel.file.config.BaseSinkConfig;
 import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
 import 
org.apache.seatunnel.connectors.seatunnel.file.sink.commit.FileAggregatedCommitInfo;
 import 
org.apache.seatunnel.connectors.seatunnel.file.sink.commit.FileCommitInfo;
@@ -39,20 +47,25 @@ import 
org.apache.seatunnel.connectors.seatunnel.file.sink.writer.WriteStrategyF
 import java.util.List;
 import java.util.Optional;
 
+import static 
org.apache.seatunnel.api.table.factory.FactoryUtil.discoverFactory;
+
 public abstract class BaseMultipleTableFileSink
         implements SeaTunnelSink<
                         SeaTunnelRow, FileSinkState, FileCommitInfo, 
FileAggregatedCommitInfo>,
-                SupportMultiTableSink {
+                SupportMultiTableSink,
+                SupportSaveMode {
 
     private final HadoopConf hadoopConf;
     private final CatalogTable catalogTable;
     private final FileSinkConfig fileSinkConfig;
     private String jobId;
+    private final ReadonlyConfig readonlyConfig;
 
     public abstract String getPluginName();
 
     public BaseMultipleTableFileSink(
             HadoopConf hadoopConf, ReadonlyConfig readonlyConfig, CatalogTable 
catalogTable) {
+        this.readonlyConfig = readonlyConfig;
         this.hadoopConf = hadoopConf;
         this.fileSinkConfig =
                 new FileSinkConfig(readonlyConfig.toConfig(), 
catalogTable.getSeaTunnelRowType());
@@ -103,4 +116,23 @@ public abstract class BaseMultipleTableFileSink
         
writeStrategy.setSeaTunnelRowTypeInfo(catalogTable.getSeaTunnelRowType());
         return writeStrategy;
     }
+
+    @Override
+    public Optional<SaveModeHandler> getSaveModeHandler() {
+
+        CatalogFactory catalogFactory =
+                discoverFactory(
+                        Thread.currentThread().getContextClassLoader(),
+                        CatalogFactory.class,
+                        getPluginName());
+        if (catalogFactory == null) {
+            return Optional.empty();
+        }
+        final Catalog catalog = catalogFactory.createCatalog(getPluginName(), 
readonlyConfig);
+        SchemaSaveMode schemaSaveMode = 
readonlyConfig.get(BaseSinkConfig.SCHEMA_SAVE_MODE);
+        DataSaveMode dataSaveMode = 
readonlyConfig.get(BaseSinkConfig.DATA_SAVE_MODE);
+        return Optional.of(
+                new DefaultSaveModeHandler(
+                        schemaSaveMode, dataSaveMode, catalog, catalogTable, 
null));
+    }
 }
diff --git 
a/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/catalog/LocalFileCatalog.java
 
b/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/catalog/LocalFileCatalog.java
new file mode 100644
index 0000000000..ab784f56ea
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/catalog/LocalFileCatalog.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.file.local.catalog;
+
+import 
org.apache.seatunnel.connectors.seatunnel.file.catalog.AbstractFileCatalog;
+import 
org.apache.seatunnel.connectors.seatunnel.file.hadoop.HadoopFileSystemProxy;
+
+public class LocalFileCatalog extends AbstractFileCatalog {
+
+    public LocalFileCatalog(
+            HadoopFileSystemProxy hadoopFileSystemProxy, String filePath, 
String catalogName) {
+        super(hadoopFileSystemProxy, filePath, catalogName);
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/catalog/LocalFileCatalogFactory.java
 
b/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/catalog/LocalFileCatalogFactory.java
new file mode 100644
index 0000000000..ac1c6b37ba
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/catalog/LocalFileCatalogFactory.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.file.local.catalog;
+
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.api.configuration.util.OptionRule;
+import org.apache.seatunnel.api.table.catalog.Catalog;
+import org.apache.seatunnel.api.table.factory.CatalogFactory;
+import org.apache.seatunnel.api.table.factory.Factory;
+import 
org.apache.seatunnel.connectors.seatunnel.file.config.BaseSourceConfigOptions;
+import org.apache.seatunnel.connectors.seatunnel.file.config.FileSystemType;
+import 
org.apache.seatunnel.connectors.seatunnel.file.hadoop.HadoopFileSystemProxy;
+import 
org.apache.seatunnel.connectors.seatunnel.file.local.config.LocalFileHadoopConf;
+
+import com.google.auto.service.AutoService;
+
+@AutoService(Factory.class)
+public class LocalFileCatalogFactory implements CatalogFactory {
+    @Override
+    public Catalog createCatalog(String catalogName, ReadonlyConfig options) {
+        HadoopFileSystemProxy fileSystemUtils =
+                new HadoopFileSystemProxy(new LocalFileHadoopConf());
+        return new LocalFileCatalog(
+                fileSystemUtils,
+                options.get(BaseSourceConfigOptions.FILE_PATH),
+                factoryIdentifier());
+    }
+
+    @Override
+    public String factoryIdentifier() {
+        return FileSystemType.LOCAL.getFileSystemPluginName();
+    }
+
+    @Override
+    public OptionRule optionRule() {
+        return OptionRule.builder().build();
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/sink/LocalFileSinkFactory.java
 
b/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/sink/LocalFileSinkFactory.java
index 01d5fb1060..fc699b4296 100644
--- 
a/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/sink/LocalFileSinkFactory.java
+++ 
b/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/sink/LocalFileSinkFactory.java
@@ -46,6 +46,8 @@ public class LocalFileSinkFactory extends 
BaseMultipleTableFileSinkFactory {
         return OptionRule.builder()
                 .required(BaseSinkConfig.FILE_PATH)
                 .optional(BaseSinkConfig.FILE_FORMAT_TYPE)
+                .optional(BaseSinkConfig.SCHEMA_SAVE_MODE)
+                .optional(BaseSinkConfig.DATA_SAVE_MODE)
                 .conditional(
                         BaseSinkConfig.FILE_FORMAT_TYPE,
                         FileFormat.TEXT,
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/local/LocalFileIT.java
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/local/LocalFileIT.java
index 51494b5059..e278159efa 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/local/LocalFileIT.java
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/local/LocalFileIT.java
@@ -17,6 +17,11 @@
 
 package org.apache.seatunnel.e2e.connector.file.local;
 
+import org.apache.seatunnel.api.table.catalog.TablePath;
+import org.apache.seatunnel.connectors.seatunnel.file.config.FileSystemType;
+import 
org.apache.seatunnel.connectors.seatunnel.file.hadoop.HadoopFileSystemProxy;
+import 
org.apache.seatunnel.connectors.seatunnel.file.local.catalog.LocalFileCatalog;
+import 
org.apache.seatunnel.connectors.seatunnel.file.local.config.LocalFileHadoopConf;
 import org.apache.seatunnel.e2e.common.TestSuiteBase;
 import org.apache.seatunnel.e2e.common.container.ContainerExtendedFactory;
 import org.apache.seatunnel.e2e.common.container.EngineType;
@@ -27,27 +32,43 @@ import 
org.apache.seatunnel.e2e.common.junit.DisabledOnContainer;
 import org.apache.seatunnel.e2e.common.junit.TestContainerExtension;
 import org.apache.seatunnel.e2e.common.util.ContainerUtil;
 
+import org.apache.commons.lang3.StringUtils;
+
+import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.TestTemplate;
+import org.testcontainers.containers.GenericContainer;
+import 
org.testcontainers.shaded.com.github.dockerjava.core.command.ExecStartResultCallback;
 
+import com.github.dockerjava.api.command.ExecCreateCmdResponse;
 import io.airlift.compress.lzo.LzopCodec;
+import lombok.SneakyThrows;
+import lombok.extern.slf4j.Slf4j;
 
+import java.io.ByteArrayOutputStream;
 import java.io.File;
 import java.io.IOException;
 import java.io.OutputStream;
+import java.nio.charset.StandardCharsets;
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.List;
 
 @DisabledOnContainer(
         value = {TestContainerId.SPARK_2_4},
         type = {},
         disabledReason = "The apache-compress version is not compatible with 
apache-poi")
+@Slf4j
 public class LocalFileIT extends TestSuiteBase {
 
+    private GenericContainer<?> baseContainer;
+
     /** Copy data files to container */
     @TestContainerExtension
     private final ContainerExtendedFactory extendedFactory =
             container -> {
+                this.baseContainer = container;
                 ContainerUtil.copyFileIntoContainers(
                         "/json/e2e.json",
                         
"/seatunnel/read/json/name=tyrantlucifer/hobby=coding/e2e.json",
@@ -121,7 +142,6 @@ public class LocalFileIT extends TestSuiteBase {
     public void testLocalFileReadAndWrite(TestContainer container)
             throws IOException, InterruptedException {
         TestHelper helper = new TestHelper(container);
-
         helper.execute("/excel/fake_to_local_excel.conf");
         helper.execute("/excel/local_excel_to_assert.conf");
         helper.execute("/excel/local_excel_projection_to_assert.conf");
@@ -181,6 +201,71 @@ public class LocalFileIT extends TestSuiteBase {
         }
     }
 
+    @TestTemplate
+    @DisabledOnContainer(
+            value = {TestContainerId.SPARK_2_4},
+            type = {EngineType.FLINK},
+            disabledReason =
+                    "Flink test is multi-node, LocalFile connector will use 
different containers for obtaining files")
+    public void testLocalFileReadAndWriteWithSaveMode(TestContainer container)
+            throws IOException, InterruptedException {
+        TestHelper helper = new TestHelper(container);
+        // test save_mode
+        String path = "/tmp/seatunnel/localfile/json/fake";
+        Assertions.assertEquals(getFileListFromContainer(path).size(), 0);
+        helper.execute("/json/fake_to_local_file_json_save_mode.conf");
+        Assertions.assertEquals(getFileListFromContainer(path).size(), 1);
+        helper.execute("/json/fake_to_local_file_json_save_mode.conf");
+        Assertions.assertEquals(getFileListFromContainer(path).size(), 1);
+    }
+
+    @SneakyThrows
+    private List<String> getFileListFromContainer(String path) {
+        String command = "ls -1 " + path;
+        ExecCreateCmdResponse execCreateCmdResponse =
+                dockerClient
+                        .execCreateCmd(baseContainer.getContainerId())
+                        .withCmd("sh", "-c", command)
+                        .withAttachStdout(true)
+                        .withAttachStderr(true)
+                        .exec();
+
+        ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
+        dockerClient
+                .execStartCmd(execCreateCmdResponse.getId())
+                .exec(new ExecStartResultCallback(outputStream, System.err))
+                .awaitCompletion();
+
+        String output = new String(outputStream.toByteArray(), 
StandardCharsets.UTF_8).trim();
+        List<String> fileList = new ArrayList<>();
+        log.info("container path file list is :{}", output);
+        String[] files = output.split("\n");
+        for (String file : files) {
+            if (StringUtils.isNotEmpty(file)) {
+                log.info("container path file name is :{}", file);
+                fileList.add(file);
+            }
+        }
+        return fileList;
+    }
+
+    @TestTemplate
+    public void testLocalFileCatalog(TestContainer container)
+            throws IOException, InterruptedException {
+        final LocalFileCatalog localFileCatalog =
+                new LocalFileCatalog(
+                        new HadoopFileSystemProxy(new LocalFileHadoopConf()),
+                        "/tmp/seatunnel/json/test1",
+                        FileSystemType.LOCAL.getFileSystemPluginName());
+        final TablePath tablePath = TablePath.DEFAULT;
+        Assertions.assertFalse(localFileCatalog.tableExists(tablePath));
+        localFileCatalog.createTable(null, null, false);
+        Assertions.assertTrue(localFileCatalog.tableExists(tablePath));
+        Assertions.assertFalse(localFileCatalog.isExistsData(tablePath));
+        localFileCatalog.dropTable(tablePath, false);
+        Assertions.assertFalse(localFileCatalog.tableExists(tablePath));
+    }
+
     private Path convertToLzoFile(File file) throws IOException {
         LzopCodec lzo = new LzopCodec();
         Path path = Paths.get(file.getAbsolutePath() + ".lzo");
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/json/fake_to_local_file_json.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/json/fake_to_local_file_json.conf
index c5ea2a734e..458bb05258 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/json/fake_to_local_file_json.conf
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/json/fake_to_local_file_json.conf
@@ -77,5 +77,7 @@ sink {
     file_format_type = "json"
     filename_time_format = "yyyy.MM.dd"
     is_enable_transaction = true
+    "schema_save_mode"="RECREATE_SCHEMA"
+    "data_save_mode"="DROP_DATA"
   }
 }
\ No newline at end of file
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/json/fake_to_local_file_json.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/json/fake_to_local_file_json_save_mode.conf
similarity index 68%
copy from 
seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/json/fake_to_local_file_json.conf
copy to 
seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/json/fake_to_local_file_json_save_mode.conf
index c5ea2a734e..087bf3f29b 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/json/fake_to_local_file_json.conf
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/json/fake_to_local_file_json_save_mode.conf
@@ -31,45 +31,25 @@ source {
   FakeSource {
     schema = {
       fields {
-        c_map = "map<string, string>"
-        c_array = "array<int>"
         c_string = string
-        c_boolean = boolean
         c_tinyint = tinyint
         c_smallint = smallint
         c_int = int
         c_bigint = bigint
         c_float = float
-        c_double = double
-        c_bytes = bytes
-        c_date = date
-        c_decimal = "decimal(38, 18)"
-        c_timestamp = timestamp
-        c_row = {
-          c_map = "map<string, string>"
-          c_array = "array<int>"
-          c_string = string
-          c_boolean = boolean
-          c_tinyint = tinyint
-          c_smallint = smallint
-          c_int = int
-          c_bigint = bigint
-          c_float = float
-          c_double = double
-          c_bytes = bytes
-          c_date = date
-          c_decimal = "decimal(38, 18)"
-          c_timestamp = timestamp
-        }
       }
     }
     result_table_name = "fake"
+    rows = [
+       {fields = ["1",1,1,123,42543,1.2], kind = INSERT}
+       {fields = ["2",1,1,123,42543,1.2], kind = INSERT}
+    ]
   }
 }
 
 sink {
   LocalFile {
-    path = "/tmp/seatunnel/json"
+    path = "/tmp/seatunnel/localfile/json/${table_name}"
     row_delimiter = "\n"
     partition_dir_expression = "${k0}=${v0}"
     is_partition_field_write_in_file = true
@@ -77,5 +57,7 @@ sink {
     file_format_type = "json"
     filename_time_format = "yyyy.MM.dd"
     is_enable_transaction = true
+    "schema_save_mode"="RECREATE_SCHEMA"
+    "data_save_mode"="DROP_DATA"
   }
 }
\ No newline at end of file
diff --git 
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/batch_last_checkpoint_error.conf
 
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/batch_last_checkpoint_error.conf
index 84356210ea..910484f65b 100644
--- 
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/batch_last_checkpoint_error.conf
+++ 
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/batch_last_checkpoint_error.conf
@@ -75,17 +75,7 @@ transform {
 }
 
 sink {
-  LocalFile {
-    path = "/hive/warehouse/test1"
-    field_delimiter = "\t"
-    row_delimiter = "\n"
-    partition_by = ["c_string"]
-    partition_dir_expression = "${k0}=${v0}"
-    is_partition_field_write_in_file = true
-    file_name_expression = "${transactionId}_${now}"
-    file_format_type = "text"
-    filename_time_format = "yyyy.MM.dd"
-    is_enable_transaction = true
-    save_mode = "error"
+  InMemory {
+    throw_exception = true
   }
 }

Reply via email to