This is an automated email from the ASF dual-hosted git repository.
wuchunfu pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 80cc9fa6ff [improve] update S3File connector config option (#8615)
80cc9fa6ff is described below
commit 80cc9fa6ffdd7286e4ffff64154a1bd446c10287
Author: litiliu <[email protected]>
AuthorDate: Sat Feb 8 13:46:25 2025 +0800
[improve] update S3File connector config option (#8615)
Co-authored-by: litiliu <[email protected]>
Co-authored-by: Jia Fan <[email protected]>
---
.../seatunnel/file/s3/config/S3ConfigOptions.java | 23 -----------
.../file/s3/config/S3FileSinkOptions.java | 48 ++++++++++++++++++++++
.../file/s3/config/S3FileSourceOptions.java | 20 +++++++++
.../seatunnel/file/s3/sink/S3FileSink.java | 10 ++---
.../seatunnel/file/s3/sink/S3FileSinkFactory.java | 24 +++++------
.../file/s3/source/S3FileSourceFactory.java | 22 +++++-----
6 files changed, 96 insertions(+), 51 deletions(-)
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/config/S3ConfigOptions.java
b/seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/config/S3ConfigOptions.java
index a7231b6b2b..476f9f12ca 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/config/S3ConfigOptions.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/config/S3ConfigOptions.java
@@ -19,17 +19,10 @@ package
org.apache.seatunnel.connectors.seatunnel.file.s3.config;
import org.apache.seatunnel.api.configuration.Option;
import org.apache.seatunnel.api.configuration.Options;
-import org.apache.seatunnel.api.sink.DataSaveMode;
-import org.apache.seatunnel.api.sink.SchemaSaveMode;
import
org.apache.seatunnel.connectors.seatunnel.file.config.BaseSourceConfigOptions;
-import java.util.Arrays;
import java.util.Map;
-import static org.apache.seatunnel.api.sink.DataSaveMode.APPEND_DATA;
-import static org.apache.seatunnel.api.sink.DataSaveMode.DROP_DATA;
-import static
org.apache.seatunnel.api.sink.DataSaveMode.ERROR_WHEN_DATA_EXISTS;
-
public class S3ConfigOptions extends BaseSourceConfigOptions {
public static final Option<String> S3_ACCESS_KEY =
Options.key("access_key")
@@ -55,22 +48,6 @@ public class S3ConfigOptions extends BaseSourceConfigOptions
{
.defaultValue(S3aAwsCredentialsProvider.InstanceProfileCredentialsProvider)
.withDescription("s3a aws credentials provider");
- public static final Option<SchemaSaveMode> SCHEMA_SAVE_MODE =
- Options.key("schema_save_mode")
- .enumType(SchemaSaveMode.class)
- .defaultValue(SchemaSaveMode.CREATE_SCHEMA_WHEN_NOT_EXIST)
- .withDescription(
- "Before the synchronization task begins, process
the existing path");
-
- public static final Option<DataSaveMode> DATA_SAVE_MODE =
- Options.key("data_save_mode")
- .singleChoice(
- DataSaveMode.class,
- Arrays.asList(DROP_DATA, APPEND_DATA,
ERROR_WHEN_DATA_EXISTS))
- .defaultValue(APPEND_DATA)
- .withDescription(
- "Before the synchronization task begins, different
processing of data files that already exist in the directory");
-
/**
* The current key for that config option. if you need to add a new
option, you can add it here
* and refer to this:
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/config/S3FileSinkOptions.java
b/seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/config/S3FileSinkOptions.java
new file mode 100644
index 0000000000..a8ac9e1a12
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/config/S3FileSinkOptions.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.file.s3.config;
+
+import org.apache.seatunnel.api.configuration.Option;
+import org.apache.seatunnel.api.configuration.Options;
+import org.apache.seatunnel.api.sink.DataSaveMode;
+import org.apache.seatunnel.api.sink.SchemaSaveMode;
+
+import java.util.Arrays;
+
+import static org.apache.seatunnel.api.sink.DataSaveMode.APPEND_DATA;
+import static org.apache.seatunnel.api.sink.DataSaveMode.DROP_DATA;
+import static
org.apache.seatunnel.api.sink.DataSaveMode.ERROR_WHEN_DATA_EXISTS;
+
+public class S3FileSinkOptions extends S3ConfigOptions {
+
+ public static final Option<SchemaSaveMode> SCHEMA_SAVE_MODE =
+ Options.key("schema_save_mode")
+ .enumType(SchemaSaveMode.class)
+ .defaultValue(SchemaSaveMode.CREATE_SCHEMA_WHEN_NOT_EXIST)
+ .withDescription(
+ "Before the synchronization task begins, process
the existing path");
+
+ public static final Option<DataSaveMode> DATA_SAVE_MODE =
+ Options.key("data_save_mode")
+ .singleChoice(
+ DataSaveMode.class,
+ Arrays.asList(DROP_DATA, APPEND_DATA,
ERROR_WHEN_DATA_EXISTS))
+ .defaultValue(APPEND_DATA)
+ .withDescription(
+ "Before the synchronization task begins, different
processing of data files that already exist in the directory");
+}
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/config/S3FileSourceOptions.java
b/seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/config/S3FileSourceOptions.java
new file mode 100644
index 0000000000..801a89ec0e
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/config/S3FileSourceOptions.java
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.file.s3.config;
+
+public class S3FileSourceOptions extends S3ConfigOptions {}
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/sink/S3FileSink.java
b/seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/sink/S3FileSink.java
index b0b6d9fbbb..e62e5ea593 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/sink/S3FileSink.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/sink/S3FileSink.java
@@ -34,7 +34,7 @@ import org.apache.seatunnel.common.config.CheckResult;
import org.apache.seatunnel.common.constants.PluginType;
import org.apache.seatunnel.connectors.seatunnel.file.config.FileSystemType;
import
org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorException;
-import
org.apache.seatunnel.connectors.seatunnel.file.s3.config.S3ConfigOptions;
+import
org.apache.seatunnel.connectors.seatunnel.file.s3.config.S3FileSinkOptions;
import org.apache.seatunnel.connectors.seatunnel.file.s3.config.S3HadoopConf;
import
org.apache.seatunnel.connectors.seatunnel.file.sink.BaseMultipleTableFileSink;
@@ -62,8 +62,8 @@ public class S3FileSink extends BaseMultipleTableFileSink
implements SupportSave
CheckResult result =
CheckConfigUtil.checkAllExists(
pluginConfig,
- S3ConfigOptions.FILE_PATH.key(),
- S3ConfigOptions.S3_BUCKET.key());
+ S3FileSinkOptions.FILE_PATH.key(),
+ S3FileSinkOptions.S3_BUCKET.key());
if (!result.isSuccess()) {
throw new FileConnectorException(
SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
@@ -83,8 +83,8 @@ public class S3FileSink extends BaseMultipleTableFileSink
implements SupportSave
return Optional.empty();
}
final Catalog catalog = catalogFactory.createCatalog(S3,
readonlyConfig);
- SchemaSaveMode schemaSaveMode =
readonlyConfig.get(S3ConfigOptions.SCHEMA_SAVE_MODE);
- DataSaveMode dataSaveMode =
readonlyConfig.get(S3ConfigOptions.DATA_SAVE_MODE);
+ SchemaSaveMode schemaSaveMode =
readonlyConfig.get(S3FileSinkOptions.SCHEMA_SAVE_MODE);
+ DataSaveMode dataSaveMode =
readonlyConfig.get(S3FileSinkOptions.DATA_SAVE_MODE);
return Optional.of(
new DefaultSaveModeHandler(
schemaSaveMode, dataSaveMode, catalog, catalogTable,
null));
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/sink/S3FileSinkFactory.java
b/seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/sink/S3FileSinkFactory.java
index 492605b874..85b58c1745 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/sink/S3FileSinkFactory.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/sink/S3FileSinkFactory.java
@@ -28,7 +28,7 @@ import
org.apache.seatunnel.api.table.factory.TableSinkFactoryContext;
import org.apache.seatunnel.connectors.seatunnel.file.config.BaseSinkConfig;
import org.apache.seatunnel.connectors.seatunnel.file.config.FileFormat;
import org.apache.seatunnel.connectors.seatunnel.file.config.FileSystemType;
-import
org.apache.seatunnel.connectors.seatunnel.file.s3.config.S3ConfigOptions;
+import
org.apache.seatunnel.connectors.seatunnel.file.s3.config.S3FileSinkOptions;
import com.google.auto.service.AutoService;
@@ -42,18 +42,18 @@ public class S3FileSinkFactory implements TableSinkFactory {
@Override
public OptionRule optionRule() {
return OptionRule.builder()
- .required(S3ConfigOptions.FILE_PATH)
- .required(S3ConfigOptions.S3_BUCKET)
- .required(S3ConfigOptions.FS_S3A_ENDPOINT)
- .required(S3ConfigOptions.S3A_AWS_CREDENTIALS_PROVIDER)
- .required(S3ConfigOptions.SCHEMA_SAVE_MODE)
- .required(S3ConfigOptions.DATA_SAVE_MODE)
+ .required(S3FileSinkOptions.FILE_PATH)
+ .required(S3FileSinkOptions.S3_BUCKET)
+ .required(S3FileSinkOptions.FS_S3A_ENDPOINT)
+ .required(S3FileSinkOptions.S3A_AWS_CREDENTIALS_PROVIDER)
+ .required(S3FileSinkOptions.SCHEMA_SAVE_MODE)
+ .required(S3FileSinkOptions.DATA_SAVE_MODE)
.conditional(
- S3ConfigOptions.S3A_AWS_CREDENTIALS_PROVIDER,
-
S3ConfigOptions.S3aAwsCredentialsProvider.SimpleAWSCredentialsProvider,
- S3ConfigOptions.S3_ACCESS_KEY,
- S3ConfigOptions.S3_SECRET_KEY)
- .optional(S3ConfigOptions.S3_PROPERTIES)
+ S3FileSinkOptions.S3A_AWS_CREDENTIALS_PROVIDER,
+
S3FileSinkOptions.S3aAwsCredentialsProvider.SimpleAWSCredentialsProvider,
+ S3FileSinkOptions.S3_ACCESS_KEY,
+ S3FileSinkOptions.S3_SECRET_KEY)
+ .optional(S3FileSinkOptions.S3_PROPERTIES)
.optional(BaseSinkConfig.FILE_FORMAT_TYPE)
.conditional(
BaseSinkConfig.FILE_FORMAT_TYPE,
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/source/S3FileSourceFactory.java
b/seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/source/S3FileSourceFactory.java
index a3376e745e..5ec1684894 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/source/S3FileSourceFactory.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/source/S3FileSourceFactory.java
@@ -28,7 +28,7 @@ import
org.apache.seatunnel.api.table.factory.TableSourceFactoryContext;
import
org.apache.seatunnel.connectors.seatunnel.file.config.BaseSourceConfigOptions;
import org.apache.seatunnel.connectors.seatunnel.file.config.FileFormat;
import org.apache.seatunnel.connectors.seatunnel.file.config.FileSystemType;
-import
org.apache.seatunnel.connectors.seatunnel.file.s3.config.S3ConfigOptions;
+import
org.apache.seatunnel.connectors.seatunnel.file.s3.config.S3FileSourceOptions;
import com.google.auto.service.AutoService;
@@ -51,17 +51,17 @@ public class S3FileSourceFactory implements
TableSourceFactory {
@Override
public OptionRule optionRule() {
return OptionRule.builder()
- .required(S3ConfigOptions.FILE_PATH)
- .required(S3ConfigOptions.FILE_FORMAT_TYPE)
- .required(S3ConfigOptions.S3_BUCKET)
- .required(S3ConfigOptions.FS_S3A_ENDPOINT)
- .required(S3ConfigOptions.S3A_AWS_CREDENTIALS_PROVIDER)
+ .required(S3FileSourceOptions.FILE_PATH)
+ .required(S3FileSourceOptions.FILE_FORMAT_TYPE)
+ .required(S3FileSourceOptions.S3_BUCKET)
+ .required(S3FileSourceOptions.FS_S3A_ENDPOINT)
+ .required(S3FileSourceOptions.S3A_AWS_CREDENTIALS_PROVIDER)
.conditional(
- S3ConfigOptions.S3A_AWS_CREDENTIALS_PROVIDER,
-
S3ConfigOptions.S3aAwsCredentialsProvider.SimpleAWSCredentialsProvider,
- S3ConfigOptions.S3_ACCESS_KEY,
- S3ConfigOptions.S3_SECRET_KEY)
- .optional(S3ConfigOptions.S3_PROPERTIES)
+ S3FileSourceOptions.S3A_AWS_CREDENTIALS_PROVIDER,
+
S3FileSourceOptions.S3aAwsCredentialsProvider.SimpleAWSCredentialsProvider,
+ S3FileSourceOptions.S3_ACCESS_KEY,
+ S3FileSourceOptions.S3_SECRET_KEY)
+ .optional(S3FileSourceOptions.S3_PROPERTIES)
.conditional(
BaseSourceConfigOptions.FILE_FORMAT_TYPE,
FileFormat.TEXT,