This is an automated email from the ASF dual-hosted git repository.
yunqing pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 21bfeb155 [INLONG-6654][Sort] Supports s3 side-output for dirty data
(#6655)
21bfeb155 is described below
commit 21bfeb1555f17a49c40a771f3fc52e44c048a20d
Author: yunqingmoswu <[email protected]>
AuthorDate: Tue Nov 29 15:46:47 2022 +0800
[INLONG-6654][Sort] Supports s3 side-output for dirty data (#6655)
---
inlong-sort/sort-connectors/base/pom.xml | 5 +
.../sort/base/dirty/sink/DirtySinkFactory.java | 7 +-
.../sort/base/dirty/sink/log/LogDirtySink.java | 4 +-
.../base/dirty/sink/log/LogDirtySinkFactory.java | 10 +-
.../sort/base/dirty/sink/s3/S3DirtySink.java | 279 +++++++++++++++++++++
.../base/dirty/sink/s3/S3DirtySinkFactory.java | 148 +++++++++++
.../inlong/sort/base/dirty/sink/s3/S3Helper.java | 100 ++++++++
.../inlong/sort/base/dirty/sink/s3/S3Options.java | 241 ++++++++++++++++++
.../org.apache.flink.table.factories.Factory | 3 +-
licenses/inlong-sort-connectors/LICENSE | 2 +-
pom.xml | 7 +
11 files changed, 795 insertions(+), 11 deletions(-)
diff --git a/inlong-sort/sort-connectors/base/pom.xml
b/inlong-sort/sort-connectors/base/pom.xml
index 8e4d2701e..675190c51 100644
--- a/inlong-sort/sort-connectors/base/pom.xml
+++ b/inlong-sort/sort-connectors/base/pom.xml
@@ -42,6 +42,11 @@
<version>${project.version}</version>
<scope>provided</scope>
</dependency>
+
+ <dependency>
+ <groupId>com.amazonaws</groupId>
+ <artifactId>aws-java-sdk-s3</artifactId>
+ </dependency>
</dependencies>
</project>
diff --git
a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/DirtySinkFactory.java
b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/DirtySinkFactory.java
index b6725ddd8..07784b443 100644
---
a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/DirtySinkFactory.java
+++
b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/DirtySinkFactory.java
@@ -17,12 +17,13 @@
package org.apache.inlong.sort.base.dirty.sink;
-import org.apache.flink.table.factories.DynamicTableFactory;
+import org.apache.flink.table.factories.DynamicTableFactory.Context;
+import org.apache.flink.table.factories.Factory;
/**
* Dirty sink factory class, it is used to create dirty sink
*/
-public interface DirtySinkFactory extends DynamicTableFactory {
+public interface DirtySinkFactory extends Factory {
/**
* Create dirty sink
@@ -31,6 +32,6 @@ public interface DirtySinkFactory extends DynamicTableFactory
{
* @param <T> The data mode that is handled by the dirty sink
* @return A dirty sink
*/
- <T> DirtySink<T> createDirtySink(DynamicTableFactory.Context context);
+ <T> DirtySink<T> createDirtySink(Context context);
}
diff --git
a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/log/LogDirtySink.java
b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/log/LogDirtySink.java
index bf5a4f135..a57c981fe 100644
---
a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/log/LogDirtySink.java
+++
b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/log/LogDirtySink.java
@@ -46,7 +46,7 @@ public class LogDirtySink<T> implements DirtySink<T> {
private static final long serialVersionUID = 1L;
- private static final Logger LOG =
LoggerFactory.getLogger(LogDirtySink.class);
+ private static final Logger LOGGER =
LoggerFactory.getLogger(LogDirtySink.class);
private final RowData.FieldGetter[] fieldGetters;
private final String format;
@@ -85,7 +85,7 @@ public class LogDirtySink<T> implements DirtySink<T> {
// Only support csv format when the row is not a 'RowData' and
'JsonNode'
value = FormatUtils.csvFormat(data, labelMap, fieldDelimiter);
}
- LOG.info("[{}] {}", dirtyData.getLogTag(), value);
+ LOGGER.info("[{}] {}", dirtyData.getLogTag(), value);
}
private String format(RowData data, Map<String, String> labels) throws
JsonProcessingException {
diff --git
a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/log/LogDirtySinkFactory.java
b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/log/LogDirtySinkFactory.java
index 93a12f584..c3720a93d 100644
---
a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/log/LogDirtySinkFactory.java
+++
b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/log/LogDirtySinkFactory.java
@@ -18,12 +18,14 @@
package org.apache.inlong.sort.base.dirty.sink.log;
import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.table.factories.DynamicTableFactory.Context;
import org.apache.flink.table.factories.FactoryUtil;
import org.apache.inlong.sort.base.dirty.sink.DirtySink;
import org.apache.inlong.sort.base.dirty.sink.DirtySinkFactory;
import java.util.HashSet;
import java.util.Set;
+import static org.apache.inlong.sort.base.Constants.DIRTY_IDENTIFIER;
import static
org.apache.inlong.sort.base.Constants.DIRTY_SIDE_OUTPUT_FIELD_DELIMITER;
import static org.apache.inlong.sort.base.Constants.DIRTY_SIDE_OUTPUT_FORMAT;
@@ -36,10 +38,9 @@ public class LogDirtySinkFactory implements DirtySinkFactory
{
@Override
public <T> DirtySink<T> createDirtySink(Context context) {
- final FactoryUtil.TableFactoryHelper helper =
FactoryUtil.createTableFactoryHelper(this, context);
- FactoryUtil.validateFactoryOptions(this, helper.getOptions());
- String format = helper.getOptions().get(DIRTY_SIDE_OUTPUT_FORMAT);
- String fieldDelimiter =
helper.getOptions().get(DIRTY_SIDE_OUTPUT_FIELD_DELIMITER);
+ FactoryUtil.validateFactoryOptions(this, context.getConfiguration());
+ String format =
context.getConfiguration().get(DIRTY_SIDE_OUTPUT_FORMAT);
+ String fieldDelimiter =
context.getConfiguration().get(DIRTY_SIDE_OUTPUT_FIELD_DELIMITER);
return new LogDirtySink<>(format, fieldDelimiter,
context.getCatalogTable().getResolvedSchema().toPhysicalRowDataType());
}
@@ -59,6 +60,7 @@ public class LogDirtySinkFactory implements DirtySinkFactory {
final Set<ConfigOption<?>> options = new HashSet<>();
options.add(DIRTY_SIDE_OUTPUT_FORMAT);
options.add(DIRTY_SIDE_OUTPUT_FIELD_DELIMITER);
+ options.add(DIRTY_IDENTIFIER);
return options;
}
}
diff --git
a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/s3/S3DirtySink.java
b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/s3/S3DirtySink.java
new file mode 100644
index 000000000..3cb20c7b8
--- /dev/null
+++
b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/s3/S3DirtySink.java
@@ -0,0 +1,279 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.base.dirty.sink.s3;
+
+import com.amazonaws.auth.AWSStaticCredentialsProvider;
+import com.amazonaws.auth.BasicAWSCredentials;
+import com.amazonaws.client.builder.AwsClientBuilder;
+import com.amazonaws.services.s3.AmazonS3;
+import com.amazonaws.services.s3.AmazonS3ClientBuilder;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.formats.common.TimestampFormat;
+import org.apache.flink.formats.json.JsonOptions.MapNullKeyMode;
+import org.apache.flink.formats.json.RowDataToJsonConverters;
+import
org.apache.flink.formats.json.RowDataToJsonConverters.RowDataToJsonConverter;
+import org.apache.flink.runtime.util.ExecutorThreadFactory;
+import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
+import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.inlong.sort.base.dirty.DirtyData;
+import org.apache.inlong.sort.base.dirty.sink.DirtySink;
+import org.apache.inlong.sort.base.dirty.utils.FormatUtils;
+import org.apache.inlong.sort.base.util.LabelUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.apache.flink.table.data.RowData.createFieldGetter;
+
+/**
+ * S3 dirty sink that is used to sink dirty data to s3
+ *
+ * @param <T>
+ */
+public class S3DirtySink<T> implements DirtySink<T> {
+
+ private static final long serialVersionUID = 1L;
+
+ private static final Logger LOGGER =
LoggerFactory.getLogger(S3DirtySink.class);
+
+ private final Map<String, List<String>> batchMap = new HashMap<>();
+ private final S3Options s3Options;
+ private final AtomicLong readInNum = new AtomicLong(0);
+ private final AtomicLong writeOutNum = new AtomicLong(0);
+ private final AtomicLong errorNum = new AtomicLong(0);
+ private final DataType physicalRowDataType;
+ private final RowData.FieldGetter[] fieldGetters;
+ private RowDataToJsonConverter converter;
+ private long batchBytes = 0L;
+ private int size;
+ private transient volatile boolean closed = false;
+ private transient volatile boolean flushing = false;
+ private transient ScheduledExecutorService scheduler;
+ private transient ScheduledFuture<?> scheduledFuture;
+ private transient S3Helper s3Helper;
+
+ public S3DirtySink(S3Options s3Options, DataType physicalRowDataType) {
+ this.s3Options = s3Options;
+ this.physicalRowDataType = physicalRowDataType;
+ final LogicalType[] logicalTypes = physicalRowDataType.getChildren()
+
.stream().map(DataType::getLogicalType).toArray(LogicalType[]::new);
+ this.fieldGetters = new RowData.FieldGetter[logicalTypes.length];
+ for (int i = 0; i < logicalTypes.length; i++) {
+ fieldGetters[i] = createFieldGetter(logicalTypes[i], i);
+ }
+ }
+
+ @Override
+ public void open(Configuration configuration) throws Exception {
+ converter = new RowDataToJsonConverters(TimestampFormat.SQL,
MapNullKeyMode.DROP, null)
+ .createConverter(physicalRowDataType.getLogicalType());
+ AmazonS3 s3Client;
+ if (s3Options.getAccessKeyId() != null && s3Options.getSecretKeyId()
!= null) {
+ BasicAWSCredentials awsCreds =
+ new BasicAWSCredentials(s3Options.getAccessKeyId(),
s3Options.getSecretKeyId());
+ s3Client = AmazonS3ClientBuilder.standard().withCredentials(new
AWSStaticCredentialsProvider(awsCreds))
+ .withEndpointConfiguration(new
AwsClientBuilder.EndpointConfiguration(
+ s3Options.getEndpoint(),
+ s3Options.getRegion()))
+ .build();
+ } else {
+ s3Client =
AmazonS3ClientBuilder.standard().withEndpointConfiguration(
+ new
AwsClientBuilder.EndpointConfiguration(s3Options.getEndpoint(),
s3Options.getRegion())).build();
+ }
+ s3Helper = new S3Helper(s3Client, s3Options);
+ this.scheduler = new ScheduledThreadPoolExecutor(1,
+ new ExecutorThreadFactory("s3-dirty-sink"));
+ this.scheduledFuture = this.scheduler.scheduleWithFixedDelay(() -> {
+ if (!closed && !flushing) {
+ flush();
+ }
+ }, s3Options.getBatchIntervalMs(), s3Options.getBatchIntervalMs(),
TimeUnit.MILLISECONDS);
+ }
+
+ @Override
+ public synchronized void invoke(DirtyData<T> dirtyData) throws Exception {
+ try {
+ addBatch(dirtyData);
+ } catch (Exception e) {
+ if (!s3Options.ignoreSideOutputErrors()) {
+ throw new RuntimeException(String.format("Add batch to
identifier:%s failed, the dirty data: %s.",
+ dirtyData.getIdentifier(), dirtyData.toString()), e);
+ }
+ LOGGER.warn("Add batch to identifier:{} failed "
+ + "and the dirty data will be throw away in the future"
+ + " because the option 'dirty.side-output.ignore-errors'
is 'true'", dirtyData.getIdentifier());
+ }
+ if (valid() && !flushing) {
+ flush();
+ }
+ }
+
+ private boolean valid() {
+ return (s3Options.getBatchSize() > 0 && size >=
s3Options.getBatchSize())
+ || batchBytes >= s3Options.getMaxBatchBytes();
+ }
+
+ private void addBatch(DirtyData<T> dirtyData) throws IOException {
+ readInNum.incrementAndGet();
+ String value;
+ Map<String, String> labelMap =
LabelUtils.parseLabels(dirtyData.getLabels());
+ T data = dirtyData.getData();
+ if (data instanceof RowData) {
+ value = format((RowData) data, labelMap);
+ } else if (data instanceof JsonNode) {
+ value = format((JsonNode) data, labelMap);
+ } else {
+ // Only support csv format when the row is not a 'RowData' and
'JsonNode'
+ value = FormatUtils.csvFormat(data, labelMap,
s3Options.getFieldDelimiter());
+ }
+ if (s3Options.enableDirtyLog()) {
+ LOGGER.info("[{}] {}", dirtyData.getLogTag(), value);
+ }
+ batchBytes += value.getBytes(UTF_8).length;
+ size++;
+ batchMap.computeIfAbsent(dirtyData.getIdentifier(), k -> new
ArrayList<>()).add(value);
+ }
+
+ private String format(RowData data, Map<String, String> labels) throws
JsonProcessingException {
+ String value;
+ switch (s3Options.getFormat()) {
+ case "csv":
+ value = FormatUtils.csvFormat(data, fieldGetters, labels,
s3Options.getFieldDelimiter());
+ break;
+ case "json":
+ value = FormatUtils.jsonFormat(data, converter, labels);
+ break;
+ default:
+ throw new UnsupportedOperationException(
+ String.format("Unsupported format for: %s",
s3Options.getFormat()));
+ }
+ return value;
+ }
+
+ private String format(JsonNode data, Map<String, String> labels) throws
JsonProcessingException {
+ String value;
+ switch (s3Options.getFormat()) {
+ case "csv":
+ value = FormatUtils.csvFormat(data, labels,
s3Options.getFieldDelimiter());
+ break;
+ case "json":
+ value = FormatUtils.jsonFormat(data, labels);
+ break;
+ default:
+ throw new UnsupportedOperationException(
+ String.format("Unsupported format for: %s",
s3Options.getFormat()));
+ }
+ return value;
+ }
+
+ @Override
+ public synchronized void close() throws IOException {
+ if (!closed) {
+ closed = true;
+ if (this.scheduledFuture != null) {
+ scheduledFuture.cancel(false);
+ this.scheduler.shutdown();
+ }
+ try {
+ flush();
+ } catch (Exception e) {
+ LOGGER.warn("Writing records to s3 failed.", e);
+ throw new RuntimeException("Writing records to s3 failed.", e);
+ }
+ }
+ }
+
+ /**
+ * Flush data to s3
+ */
+ public synchronized void flush() {
+ flushing = true;
+ if (!hasRecords()) {
+ flushing = false;
+ return;
+ }
+ for (Entry<String, List<String>> kvs : batchMap.entrySet()) {
+ flushSingleIdentifier(kvs.getKey(), kvs.getValue());
+ }
+ batchMap.clear();
+ batchBytes = 0;
+ size = 0;
+ flushing = false;
+ LOGGER.info("S3 dirty sink statistics: readInNum: {}, writeOutNum: {},
errorNum: {}",
+ readInNum.get(), writeOutNum.get(), errorNum.get());
+ }
+
+ /**
+ * Flush data of single identifier to s3
+ *
+ * @param identifier The identifier of dirty data
+ * @param values The values of the identifier
+ */
+ private void flushSingleIdentifier(String identifier, List<String> values)
{
+ if (values == null || values.isEmpty()) {
+ return;
+ }
+ String content = null;
+ try {
+ content = StringUtils.join(values, s3Options.getLineDelimiter());
+ s3Helper.upload(identifier, content);
+ LOGGER.info("Write {} records to s3 of identifier: {}",
values.size(), identifier);
+ writeOutNum.addAndGet(values.size());
+ // Clean the data that has been loaded.
+ values.clear();
+ } catch (Exception e) {
+ errorNum.addAndGet(values.size());
+ if (!s3Options.ignoreSideOutputErrors()) {
+ throw new RuntimeException(
+ String.format("Writing records to s3 of identifier:%s
failed, the value: %s.",
+ identifier, content),
+ e);
+ }
+ LOGGER.warn("Writing records to s3 of identifier:{} failed "
+ + "and the dirty data will be throw away in the future"
+ + " because the option 'dirty.side-output.ignore-errors'
is 'true'", identifier);
+ }
+ }
+
+ private boolean hasRecords() {
+ if (batchMap.isEmpty()) {
+ return false;
+ }
+ for (List<String> value : batchMap.values()) {
+ if (!value.isEmpty()) {
+ return true;
+ }
+ }
+ return false;
+ }
+}
diff --git
a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/s3/S3DirtySinkFactory.java
b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/s3/S3DirtySinkFactory.java
new file mode 100644
index 000000000..d9ec26434
--- /dev/null
+++
b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/s3/S3DirtySinkFactory.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.base.dirty.sink.s3;
+
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.factories.DynamicTableFactory.Context;
+import org.apache.flink.table.factories.FactoryUtil;
+import org.apache.inlong.sort.base.dirty.sink.DirtySink;
+import org.apache.inlong.sort.base.dirty.sink.DirtySinkFactory;
+
+import java.util.HashSet;
+import java.util.Set;
+import static org.apache.inlong.sort.base.Constants.DIRTY_IDENTIFIER;
+import static
org.apache.inlong.sort.base.Constants.DIRTY_SIDE_OUTPUT_BATCH_BYTES;
+import static
org.apache.inlong.sort.base.Constants.DIRTY_SIDE_OUTPUT_BATCH_INTERVAL;
+import static
org.apache.inlong.sort.base.Constants.DIRTY_SIDE_OUTPUT_BATCH_SIZE;
+import static
org.apache.inlong.sort.base.Constants.DIRTY_SIDE_OUTPUT_FIELD_DELIMITER;
+import static org.apache.inlong.sort.base.Constants.DIRTY_SIDE_OUTPUT_FORMAT;
+import static
org.apache.inlong.sort.base.Constants.DIRTY_SIDE_OUTPUT_IGNORE_ERRORS;
+import static
org.apache.inlong.sort.base.Constants.DIRTY_SIDE_OUTPUT_LINE_DELIMITER;
+import static
org.apache.inlong.sort.base.Constants.DIRTY_SIDE_OUTPUT_LOG_ENABLE;
+import static org.apache.inlong.sort.base.Constants.DIRTY_SIDE_OUTPUT_RETRIES;
+
+/**
+ * S3 dirty sink factory
+ */
+public class S3DirtySinkFactory implements DirtySinkFactory {
+
+ private static final String IDENTIFIER = "s3";
+
+ private static final ConfigOption<String> DIRTY_SIDE_OUTPUT_ENDPOINT =
+ ConfigOptions.key("dirty.side-output.s3.endpoint")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("The endpoint of s3");
+ private static final ConfigOption<String> DIRTY_SIDE_OUTPUT_REGION =
+ ConfigOptions.key("dirty.side-output.s3.region")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("The region of s3");
+ private static final ConfigOption<String> DIRTY_SIDE_OUTPUT_BUCKET =
+ ConfigOptions.key("dirty.side-output.s3.bucket")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("The bucket of s3");
+ private static final ConfigOption<String> DIRTY_SIDE_OUTPUT_KEY =
+ ConfigOptions.key("dirty.side-output.s3.key")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("The key of s3");
+ private static final ConfigOption<String> DIRTY_SIDE_OUTPUT_ACCESS_KEY_ID =
+ ConfigOptions.key("dirty.side-output.s3.access-key-id")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("The access key of s3");
+ private static final ConfigOption<String> DIRTY_SIDE_OUTPUT_SECRET_KEY_ID =
+ ConfigOptions.key("dirty.side-output.s3.secret-key-id")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("The secret key of s3");
+
+ @Override
+ public <T> DirtySink<T> createDirtySink(Context context) {
+ FactoryUtil.validateFactoryOptions(this, context.getConfiguration());
+ validate(context.getConfiguration());
+ return new S3DirtySink<>(getS3Options(context.getConfiguration()),
+
context.getCatalogTable().getResolvedSchema().toPhysicalRowDataType());
+ }
+
+ private void validate(ReadableConfig config) {
+ String identifier = config.getOptional(DIRTY_IDENTIFIER).orElse(null);
+ if (identifier == null || identifier.trim().length() == 0) {
+ throw new ValidationException(
+ "The option 'dirty.identifier' is not allowed to be
empty.");
+ }
+ }
+
+ private S3Options getS3Options(ReadableConfig config) {
+ final S3Options.Builder builder = S3Options.builder()
+
.setEndpoint(config.getOptional(DIRTY_SIDE_OUTPUT_ENDPOINT).orElse(null))
+
.setRegion(config.getOptional(DIRTY_SIDE_OUTPUT_REGION).orElse(null))
+
.setBucket(config.getOptional(DIRTY_SIDE_OUTPUT_BUCKET).orElse(null))
+ .setKey(config.getOptional(DIRTY_SIDE_OUTPUT_KEY).orElse(null))
+ .setBatchSize(config.get(DIRTY_SIDE_OUTPUT_BATCH_SIZE))
+ .setMaxRetries(config.get(DIRTY_SIDE_OUTPUT_RETRIES))
+
.setBatchIntervalMs(config.get(DIRTY_SIDE_OUTPUT_BATCH_INTERVAL))
+ .setMaxBatchBytes(config.get(DIRTY_SIDE_OUTPUT_BATCH_BYTES))
+ .setFormat(config.get(DIRTY_SIDE_OUTPUT_FORMAT))
+
.setIgnoreSideOutputErrors(config.get(DIRTY_SIDE_OUTPUT_IGNORE_ERRORS))
+ .setEnableDirtyLog(config.get(DIRTY_SIDE_OUTPUT_LOG_ENABLE))
+
.setFieldDelimiter(config.get(DIRTY_SIDE_OUTPUT_FIELD_DELIMITER))
+ .setLineDelimiter(config.get(DIRTY_SIDE_OUTPUT_LINE_DELIMITER))
+
.setAccessKeyId(config.getOptional(DIRTY_SIDE_OUTPUT_ACCESS_KEY_ID).orElse(null))
+
.setSecretKeyId(config.getOptional(DIRTY_SIDE_OUTPUT_SECRET_KEY_ID).orElse(null));
+ return builder.build();
+ }
+
+ @Override
+ public String factoryIdentifier() {
+ return IDENTIFIER;
+ }
+
+ @Override
+ public Set<ConfigOption<?>> requiredOptions() {
+ final Set<ConfigOption<?>> options = new HashSet<>();
+ options.add(DIRTY_SIDE_OUTPUT_ENDPOINT);
+ options.add(DIRTY_SIDE_OUTPUT_REGION);
+ options.add(DIRTY_SIDE_OUTPUT_BUCKET);
+ options.add(DIRTY_SIDE_OUTPUT_KEY);
+ options.add(DIRTY_IDENTIFIER);
+ return options;
+ }
+
+ @Override
+ public Set<ConfigOption<?>> optionalOptions() {
+ final Set<ConfigOption<?>> options = new HashSet<>();
+ options.add(DIRTY_SIDE_OUTPUT_BATCH_SIZE);
+ options.add(DIRTY_SIDE_OUTPUT_RETRIES);
+ options.add(DIRTY_SIDE_OUTPUT_BATCH_INTERVAL);
+ options.add(DIRTY_SIDE_OUTPUT_BATCH_BYTES);
+ options.add(DIRTY_SIDE_OUTPUT_FORMAT);
+ options.add(DIRTY_SIDE_OUTPUT_IGNORE_ERRORS);
+ options.add(DIRTY_SIDE_OUTPUT_LOG_ENABLE);
+ options.add(DIRTY_SIDE_OUTPUT_FIELD_DELIMITER);
+ options.add(DIRTY_SIDE_OUTPUT_LINE_DELIMITER);
+ options.add(DIRTY_SIDE_OUTPUT_ACCESS_KEY_ID);
+ options.add(DIRTY_SIDE_OUTPUT_SECRET_KEY_ID);
+ return options;
+ }
+}
diff --git
a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/s3/S3Helper.java
b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/s3/S3Helper.java
new file mode 100644
index 000000000..d79b8aecd
--- /dev/null
+++
b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/s3/S3Helper.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.base.dirty.sink.s3;
+
+import com.amazonaws.services.s3.AmazonS3;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.time.LocalDateTime;
+import java.time.format.DateTimeFormatter;
+import java.util.Random;
+
+/**
+ * S3 helper class, it helps write to s3
+ */
+public class S3Helper implements Serializable {
+
+ private static final long serialVersionUID = 1L;
+
+ private static final Logger LOG =
LoggerFactory.getLogger(S3DirtySink.class);
+
+ private static final DateTimeFormatter DATE_TIME_FORMAT =
DateTimeFormatter.ofPattern("yyyyMMddHHmmss");
+
+ private static final int SEQUENCE_LENGTH = 4;
+ private static final String ESCAPE_PATTERN = "[\\pP\\p{Punct}\\s]";
+ private static final String FILE_NAME_SUFFIX = ".txt";
+ private final Random r = new Random();
+ private final AmazonS3 s3Client;
+ private final S3Options s3Options;
+
+ S3Helper(AmazonS3 s3Client, S3Options s3Options) {
+ this.s3Client = s3Client;
+ this.s3Options = s3Options;
+ }
+
+ /**
+ * Upload data to s3
+ *
+ * @param identifier The identifier of dirty data
+ * @param content The content that will be upload
+ * @throws IOException The exception may be thrown when executing
+ */
+ public void upload(String identifier, String content) throws IOException {
+ String path = genFileName(identifier);
+ for (int i = 0; i < s3Options.getMaxRetries(); i++) {
+ try {
+ s3Client.putObject(s3Options.getBucket(), path, content);
+ break;
+ } catch (Exception e) {
+ LOG.error("s3 dirty sink error, retry times = {}", i, e);
+ if (i >= s3Options.getMaxRetries()) {
+ throw new IOException(e);
+ }
+ try {
+ Thread.sleep(1000L * i);
+ } catch (InterruptedException ex) {
+ Thread.currentThread().interrupt();
+ throw new IOException("unable to flush; interrupted while
doing another attempt", e);
+ }
+ }
+ }
+ }
+
+ /**
+ * Generate the file name for s3
+ *
+ * @param identifier The identifier of dirty data
+ * @return File name of s3
+ */
+ private String genFileName(String identifier) {
+ return String.format("%s/%s-%s%s", s3Options.getKey(),
+ identifier.replaceAll(ESCAPE_PATTERN, ""), generateSequence(),
FILE_NAME_SUFFIX);
+ }
+
+ private String generateSequence() {
+ StringBuilder sb = new
StringBuilder(DATE_TIME_FORMAT.format(LocalDateTime.now()));
+ for (int i = 0; i < SEQUENCE_LENGTH; i++) {
+ sb.append(r.nextInt(10));
+ }
+ return sb.toString();
+ }
+
+}
diff --git
a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/s3/S3Options.java
b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/s3/S3Options.java
new file mode 100644
index 000000000..f507b4802
--- /dev/null
+++
b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/s3/S3Options.java
@@ -0,0 +1,241 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.base.dirty.sink.s3;
+
+import org.apache.flink.util.Preconditions;
+
+import java.io.Serializable;
+
/**
 * S3 options.
 *
 * <p>Immutable holder for the configuration of the s3 dirty sink (connection
 * coordinates, credentials, batching thresholds and serialization format).
 * Instances are created via {@link #builder()}.
 */
public class S3Options implements Serializable {

    private static final long serialVersionUID = 1L;

    // Defaults applied by the Builder when a value is not set explicitly
    private static final int DEFAULT_BATCH_SIZE = 100;
    private static final int DEFAULT_MAX_RETRY_TIMES = 3;
    private static final long DEFAULT_MAX_BATCH_BYTES = 1024 * 10L;
    private static final long DEFAULT_INTERVAL_MILLIS = 10000L;
    private static final String DEFAULT_FIELD_DELIMITER = ",";
    private static final String DEFAULT_LINE_DELIMITER = "\n";
    private static final String DEFAULT_FORMAT = "csv";

    private final Integer batchSize;
    private final Integer maxRetries;
    private final Long batchIntervalMs;
    private final Long maxBatchBytes;
    private final boolean ignoreSideOutputErrors;
    private final boolean enableDirtyLog;
    private final String format;
    private final String fieldDelimiter;
    private final String lineDelimiter;
    private final String endpoint;
    private final String region;
    private final String bucket;
    private final String key;
    // Credentials may both be null, in which case the sink falls back to the
    // AWS SDK's default credential resolution.
    private final String accessKeyId;
    private final String secretKeyId;

    /**
     * Private constructor; use {@link #builder()}.
     *
     * @throws IllegalArgumentException if maxRetries or maxBatchBytes is negative
     * @throws NullPointerException if endpoint, region, bucket or key is null
     */
    private S3Options(Integer batchSize, Integer maxRetries, Long batchIntervalMs, Long maxBatchBytes,
            String format, boolean ignoreSideOutputErrors, boolean enableDirtyLog, String fieldDelimiter,
            String lineDelimiter, String endpoint, String region, String bucket, String key,
            String accessKeyId, String secretKeyId) {
        Preconditions.checkArgument(maxRetries >= 0);
        Preconditions.checkArgument(maxBatchBytes >= 0);
        this.batchSize = batchSize;
        this.maxRetries = maxRetries;
        this.batchIntervalMs = batchIntervalMs;
        this.maxBatchBytes = maxBatchBytes;
        this.format = format;
        this.ignoreSideOutputErrors = ignoreSideOutputErrors;
        this.enableDirtyLog = enableDirtyLog;
        this.fieldDelimiter = fieldDelimiter;
        this.lineDelimiter = lineDelimiter;
        this.endpoint = Preconditions.checkNotNull(endpoint, "endpoint is null");
        this.region = Preconditions.checkNotNull(region, "region is null");
        this.bucket = Preconditions.checkNotNull(bucket, "bucket is null");
        this.key = Preconditions.checkNotNull(key, "key is null");
        this.accessKeyId = accessKeyId;
        this.secretKeyId = secretKeyId;
    }

    /** Creates a new {@link Builder} with all defaults pre-populated. */
    public static Builder builder() {
        return new Builder();
    }

    /** Maximum number of records per flush batch. */
    public Integer getBatchSize() {
        return batchSize;
    }

    /** Maximum number of upload attempts per batch. */
    public Integer getMaxRetries() {
        return maxRetries;
    }

    /** Interval in milliseconds between scheduled flushes. */
    public Long getBatchIntervalMs() {
        return batchIntervalMs;
    }

    /** Maximum batch size in bytes before a flush is triggered. */
    public Long getMaxBatchBytes() {
        return maxBatchBytes;
    }

    /** Serialization format of dirty records ('csv' or 'json'). */
    public String getFormat() {
        return format;
    }

    /** Whether side-output errors are logged and swallowed instead of rethrown. */
    public boolean ignoreSideOutputErrors() {
        return ignoreSideOutputErrors;
    }

    /** Whether each dirty record is additionally written to the log. */
    public boolean enableDirtyLog() {
        return enableDirtyLog;
    }

    /** Field delimiter used by the csv format. */
    public String getFieldDelimiter() {
        return fieldDelimiter;
    }

    /** Delimiter joining records within one uploaded object. */
    public String getLineDelimiter() {
        return lineDelimiter;
    }

    /** S3 service endpoint. */
    public String getEndpoint() {
        return endpoint;
    }

    /** S3 region. */
    public String getRegion() {
        return region;
    }

    /** Target S3 bucket. */
    public String getBucket() {
        return bucket;
    }

    /** Key prefix under which dirty-data objects are written. */
    public String getKey() {
        return key;
    }

    /** Access key id; may be null (default credential chain is used). */
    public String getAccessKeyId() {
        return accessKeyId;
    }

    /** Secret key id; may be null (default credential chain is used). */
    public String getSecretKeyId() {
        return secretKeyId;
    }

    /** Fluent builder for {@link S3Options}. */
    public static class Builder {

        private Integer batchSize = DEFAULT_BATCH_SIZE;
        private Integer maxRetries = DEFAULT_MAX_RETRY_TIMES;
        private Long batchIntervalMs = DEFAULT_INTERVAL_MILLIS;
        private Long maxBatchBytes = DEFAULT_MAX_BATCH_BYTES;
        private String format = DEFAULT_FORMAT;
        private boolean ignoreSideOutputErrors;
        private boolean enableDirtyLog;
        private String fieldDelimiter = DEFAULT_FIELD_DELIMITER;
        private String lineDelimiter = DEFAULT_LINE_DELIMITER;
        private String endpoint;
        private String region;
        private String bucket;
        private String key;
        private String accessKeyId;
        private String secretKeyId;

        public Builder setBatchSize(Integer batchSize) {
            this.batchSize = batchSize;
            return this;
        }

        public Builder setMaxRetries(Integer maxRetries) {
            this.maxRetries = maxRetries;
            return this;
        }

        public Builder setBatchIntervalMs(Long batchIntervalMs) {
            this.batchIntervalMs = batchIntervalMs;
            return this;
        }

        public Builder setMaxBatchBytes(Long maxBatchBytes) {
            this.maxBatchBytes = maxBatchBytes;
            return this;
        }

        public Builder setFormat(String format) {
            this.format = format;
            return this;
        }

        public Builder setIgnoreSideOutputErrors(boolean ignoreSideOutputErrors) {
            this.ignoreSideOutputErrors = ignoreSideOutputErrors;
            return this;
        }

        public Builder setEnableDirtyLog(boolean enableDirtyLog) {
            this.enableDirtyLog = enableDirtyLog;
            return this;
        }

        public Builder setFieldDelimiter(String fieldDelimiter) {
            this.fieldDelimiter = fieldDelimiter;
            return this;
        }

        public Builder setLineDelimiter(String lineDelimiter) {
            this.lineDelimiter = lineDelimiter;
            return this;
        }

        public Builder setEndpoint(String endpoint) {
            this.endpoint = endpoint;
            return this;
        }

        public Builder setRegion(String region) {
            this.region = region;
            return this;
        }

        public Builder setBucket(String bucket) {
            this.bucket = bucket;
            return this;
        }

        public Builder setKey(String key) {
            this.key = key;
            return this;
        }

        public Builder setAccessKeyId(String accessKeyId) {
            this.accessKeyId = accessKeyId;
            return this;
        }

        public Builder setSecretKeyId(String secretKeyId) {
            this.secretKeyId = secretKeyId;
            return this;
        }

        /**
         * Build the immutable {@link S3Options}.
         *
         * @throws IllegalArgumentException if maxRetries or maxBatchBytes is negative
         * @throws NullPointerException if endpoint, region, bucket or key was not set
         */
        public S3Options build() {
            return new S3Options(batchSize, maxRetries, batchIntervalMs, maxBatchBytes, format,
                    ignoreSideOutputErrors, enableDirtyLog, fieldDelimiter, lineDelimiter, endpoint,
                    region, bucket, key, accessKeyId, secretKeyId);
        }
    }
}
diff --git
a/inlong-sort/sort-connectors/base/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
b/inlong-sort/sort-connectors/base/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
index 412dedf67..a83e4d025 100644
---
a/inlong-sort/sort-connectors/base/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
+++
b/inlong-sort/sort-connectors/base/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
@@ -13,4 +13,5 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-org.apache.inlong.sort.base.dirty.sink.log.LogDirtySinkFactory
\ No newline at end of file
+org.apache.inlong.sort.base.dirty.sink.log.LogDirtySinkFactory
+org.apache.inlong.sort.base.dirty.sink.s3.S3DirtySinkFactory
\ No newline at end of file
diff --git a/licenses/inlong-sort-connectors/LICENSE
b/licenses/inlong-sort-connectors/LICENSE
index 95793e251..5a0d591b5 100644
--- a/licenses/inlong-sort-connectors/LICENSE
+++ b/licenses/inlong-sort-connectors/LICENSE
@@ -844,7 +844,7 @@ The text of each license is the standard Apache 2.0 license.
com.tencentcloudapi:tencentcloud-sdk-java:3.1.545 - tencentcloud-sdk-java
(https://github.com/TencentCloud/tencentcloud-sdk-java), (The Apache Software
License, Version 2.0)
com.qcloud:dlc-data-catalog-metastore-client:1.1.1 -
dlc-data-catalog-metastore-client
(https://mvnrepository.com/artifact/com.qcloud/dlc-data-catalog-metastore-client/1.1),
(The Apache Software License, Version 2.0)
org.apache.doris:flink-doris-connector-1.13_2.11:1.0.3 - Flink Connector for
Apache Doris
(https://github.com/apache/doris-flink-connector/tree/1.13_2.11-1.0.3), (The
Apache Software License, Version 2.0)
-
com.amazonaws:aws-java-sdk-s3:1.12.346 - AWS Java SDK for Amazon S3
(https://aws.amazon.com/sdkforjava), (The Apache Software License, Version 2.0)
========================================================================
Apache 2.0 licenses
diff --git a/pom.xml b/pom.xml
index 6ecfe05a2..a3897cc04 100644
--- a/pom.xml
+++ b/pom.xml
@@ -148,6 +148,7 @@
<ini4j.version>0.5.4</ini4j.version>
<dom4j.version>2.1.3</dom4j.version>
+ <aws.sdk.version>1.12.346</aws.sdk.version>
<zookeeper.version>3.6.3</zookeeper.version>
<pulsar.version>2.8.1</pulsar.version>
<pulsar.testcontainers.version>1.15.3</pulsar.testcontainers.version>
@@ -1073,6 +1074,12 @@
<version>${flink.version}</version>
</dependency>
+ <dependency>
+ <groupId>com.amazonaws</groupId>
+ <artifactId>aws-java-sdk-s3</artifactId>
+ <version>${aws.sdk.version}</version>
+ </dependency>
+
<dependency>
<groupId>io.streamnative.connectors</groupId>
<artifactId>pulsar-flink-connector_${scala.binary.version}</artifactId>