This is an automated email from the ASF dual-hosted git repository.
kirs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 72af94e fix FileSink cannot work in flink stream mode (#1528)
72af94e is described below
commit 72af94eefedca888e96210564f0518db142b48eb
Author: Wenjun Ruan <[email protected]>
AuthorDate: Mon Mar 21 23:23:43 2022 +0800
fix FileSink cannot work in flink stream mode (#1528)
---
docs/en/flink/configuration/sink-plugins/File.md | 38 ++++++++++++++------
.../common/config/TypesafeConfigUtils.java | 14 ++++++++
.../common/config/TypesafeConfigUtilsTest.java | 10 +++++-
.../org/apache/seatunnel/flink/sink/FileSink.java | 40 +++++++++++++++++-----
4 files changed, 82 insertions(+), 20 deletions(-)
diff --git a/docs/en/flink/configuration/sink-plugins/File.md
b/docs/en/flink/configuration/sink-plugins/File.md
index fd2da5d..ba41462 100644
--- a/docs/en/flink/configuration/sink-plugins/File.md
+++ b/docs/en/flink/configuration/sink-plugins/File.md
@@ -8,15 +8,18 @@ Write data to the file system
## Options
-| name | type | required | default value |
-| -------------- | ------ | -------- | ------------- |
-| format | string | yes | - |
-| path | string | yes | - |
-| path_time_format | string | no | yyyyMMddHHmmss |
-| write_mode | string | no | - |
-| common-options | string | no | - |
-| parallelism | int | no | - |
-
+| name | type | required | default value |
+|-------------------|--------| -------- |----------------|
+| format | string | yes | - |
+| path | string | yes | - |
+| path_time_format | string | no | yyyyMMddHHmmss |
+| write_mode | string | no | - |
+| common-options | string | no | - |
+| parallelism | int | no | - |
+| rollover_interval | long | no | 60 |
+| max_part_size | long | no | 1024 |
+| prefix | string | no | seatunnel |
+| suffix | string | no | .ext |
### format [string]
@@ -51,7 +54,7 @@ See [Java
SimpleDateFormat](https://docs.oracle.com/javase/tutorial/i18n/format/
- OVERWRITE
- - Overwrite, delete and then write if the path exists
+ - Overwrite, delete and then write if the path exists
### common options [string]
@@ -61,6 +64,21 @@ Sink plugin common parameters, please refer to [Sink
Plugin](./sink-plugin.md) f
The parallelism of an individual operator, for FileSink
+### rollover_interval [long]
+
+The interval, in minutes, after which the current file part is rolled over and a new part is started. Only used in stream mode.
+
+### max_part_size [long]
+
+The maximum size of each file part, in MB; once a part reaches this size, a new part is started. Only used in stream mode.
+
+### prefix [string]
+
+The prefix added to the file name of each file part. Only used in stream mode.
+
+### suffix [string]
+
+The suffix added to the file name of each file part. Only used in stream mode.
## Examples
diff --git
a/seatunnel-common/src/main/java/org/apache/seatunnel/common/config/TypesafeConfigUtils.java
b/seatunnel-common/src/main/java/org/apache/seatunnel/common/config/TypesafeConfigUtils.java
index ef4ec5e..3c294c8 100644
---
a/seatunnel-common/src/main/java/org/apache/seatunnel/common/config/TypesafeConfigUtils.java
+++
b/seatunnel-common/src/main/java/org/apache/seatunnel/common/config/TypesafeConfigUtils.java
@@ -91,4 +91,18 @@ public final class TypesafeConfigUtils {
return config;
}
+
+ @SuppressWarnings("unchecked")
+ public static <T> T getConfig(final Config config, final String configKey,
final T defaultValue) {
+ if (defaultValue.getClass().equals(Long.class)) {
+ return config.hasPath(configKey) ? (T)
Long.valueOf(config.getString(configKey)) : defaultValue;
+ }
+ if (defaultValue.getClass().equals(Integer.class)) {
+ return config.hasPath(configKey) ? (T)
Integer.valueOf(config.getString(configKey)) : defaultValue;
+ }
+ if (defaultValue.getClass().equals(String.class)) {
+ return config.hasPath(configKey) ? (T) config.getString(configKey)
: defaultValue;
+ }
+ throw new RuntimeException("Unsupported config type, configKey: " +
configKey);
+ }
}
diff --git
a/seatunnel-common/src/test/java/org/apache/seatunnel/common/config/TypesafeConfigUtilsTest.java
b/seatunnel-common/src/test/java/org/apache/seatunnel/common/config/TypesafeConfigUtilsTest.java
index 4ae5da8..29a778f 100644
---
a/seatunnel-common/src/test/java/org/apache/seatunnel/common/config/TypesafeConfigUtilsTest.java
+++
b/seatunnel-common/src/test/java/org/apache/seatunnel/common/config/TypesafeConfigUtilsTest.java
@@ -76,11 +76,19 @@ public class TypesafeConfigUtilsTest {
}
public Config getConfig() {
- Map<String, String> configMap = new HashMap<>();
+ Map<String, Object> configMap = new HashMap<>();
configMap.put("test.t0", "v0");
configMap.put("test.t1", "v1");
configMap.put("k0", "v2");
configMap.put("k1", "v3");
+ configMap.put("l1", Long.parseLong("100"));
return ConfigFactory.parseMap(configMap);
}
+
+ @Test
+ public void testGetConfig() {
+ Config config = getConfig();
+ Assert.assertEquals(Long.parseLong("100"), (long)
TypesafeConfigUtils.getConfig(config, "l1", Long.parseLong("101")));
+ Assert.assertEquals(Long.parseLong("100"), (long)
TypesafeConfigUtils.getConfig(config, "l2", Long.parseLong("100")));
+ }
}
diff --git
a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-file/src/main/java/org/apache/seatunnel/flink/sink/FileSink.java
b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-file/src/main/java/org/apache/seatunnel/flink/sink/FileSink.java
index defaef7..9e730d0 100644
---
a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-file/src/main/java/org/apache/seatunnel/flink/sink/FileSink.java
+++
b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-file/src/main/java/org/apache/seatunnel/flink/sink/FileSink.java
@@ -19,6 +19,7 @@ package org.apache.seatunnel.flink.sink;
import org.apache.seatunnel.common.config.CheckConfigUtil;
import org.apache.seatunnel.common.config.CheckResult;
+import org.apache.seatunnel.common.config.TypesafeConfigUtils;
import org.apache.seatunnel.common.utils.StringTemplate;
import org.apache.seatunnel.flink.FlinkEnvironment;
import org.apache.seatunnel.flink.batch.FlinkBatchSink;
@@ -28,7 +29,7 @@ import org.apache.seatunnel.flink.stream.FlinkStreamSink;
import org.apache.seatunnel.shade.com.typesafe.config.Config;
import org.apache.flink.api.common.io.FileOutputFormat;
-import org.apache.flink.api.common.serialization.Encoder;
+import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.io.TextOutputFormat;
import org.apache.flink.api.java.operators.DataSink;
@@ -37,12 +38,14 @@ import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
+import
org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig;
import
org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
+import
org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;
import org.apache.flink.types.Row;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.PrintStream;
+import java.util.concurrent.TimeUnit;
public class FileSink implements FlinkStreamSink, FlinkBatchSink {
@@ -56,6 +59,18 @@ public class FileSink implements FlinkStreamSink,
FlinkBatchSink {
private static final String PARALLELISM = "parallelism";
private static final String PATH_TIME_FORMAT = "path_time_format";
private static final String DEFAULT_TIME_FORMAT = "yyyyMMddHHmmss";
+ // *********************** For stream mode config ************************
+ private static final String ROLLOVER_INTERVAL = "rollover_interval";
+ private static final long DEFAULE_ROLLOVER_INTERVAL = 60;
+ private static final String MAX_PART_SIZE = "max_part_size";
+ private static final long DEFAULT_MAX_PART_SIZE = 1024;
+ private static final String PART_PREFIX = "prefix";
+ private static final String DEFAULT_PART_PREFIX = "seatunnel";
+ private static final String PART_SUFFIX = "suffix";
+ private static final String DEFAULT_PART_SUFFIX = ".ext";
+ private static final long MB = 1024 * 1024;
+ // ***********************************************************************
+
private Config config;
private FileOutputFormat<Row> outputFormat;
@@ -64,14 +79,21 @@ public class FileSink implements FlinkStreamSink,
FlinkBatchSink {
@Override
public DataStreamSink<Row> outputStream(FlinkEnvironment env,
DataStream<Row> dataStream) {
+ final DefaultRollingPolicy<Row, String> rollingPolicy =
DefaultRollingPolicy.builder()
+ .withMaxPartSize(MB * TypesafeConfigUtils.getConfig(config,
MAX_PART_SIZE, DEFAULT_MAX_PART_SIZE))
+ .withRolloverInterval(
+
TimeUnit.MINUTES.toMillis(TypesafeConfigUtils.getConfig(config,
ROLLOVER_INTERVAL, DEFAULE_ROLLOVER_INTERVAL)))
+ .build();
+ OutputFileConfig outputFileConfig = OutputFileConfig.builder()
+ .withPartPrefix(TypesafeConfigUtils.getConfig(config, PART_PREFIX,
DEFAULT_PART_PREFIX))
+ .withPartSuffix(TypesafeConfigUtils.getConfig(config, PART_SUFFIX,
DEFAULT_PART_SUFFIX))
+ .build();
final StreamingFileSink<Row> sink = StreamingFileSink
- .forRowFormat(filePath, (Encoder<Row>) (element, stream) -> {
- try (PrintStream out = new PrintStream(stream)) {
- out.println(element);
- }
- })
- .build();
+ .forRowFormat(filePath, new SimpleStringEncoder<Row>())
+ .withRollingPolicy(rollingPolicy)
+ .withOutputFileConfig(outputFileConfig)
+ .build();
return dataStream.addSink(sink);
}
@@ -124,7 +146,7 @@ public class FileSink implements FlinkStreamSink,
FlinkBatchSink {
@Override
public void prepare(FlinkEnvironment env) {
- String format = config.hasPath(PATH_TIME_FORMAT) ?
config.getString(PATH_TIME_FORMAT) : DEFAULT_TIME_FORMAT;
+ String format = TypesafeConfigUtils.getConfig(config,
PATH_TIME_FORMAT, DEFAULT_TIME_FORMAT);
String path = StringTemplate.substitute(config.getString(PATH),
format);
filePath = new Path(path);
}