This is an automated email from the ASF dual-hosted git repository.

kirs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 72af94e  fix FileSink cannot work in flink stream mode (#1528)
72af94e is described below

commit 72af94eefedca888e96210564f0518db142b48eb
Author: Wenjun Ruan <[email protected]>
AuthorDate: Mon Mar 21 23:23:43 2022 +0800

    fix FileSink cannot work in flink stream mode (#1528)
---
 docs/en/flink/configuration/sink-plugins/File.md   | 38 ++++++++++++++------
 .../common/config/TypesafeConfigUtils.java         | 14 ++++++++
 .../common/config/TypesafeConfigUtilsTest.java     | 10 +++++-
 .../org/apache/seatunnel/flink/sink/FileSink.java  | 40 +++++++++++++++++-----
 4 files changed, 82 insertions(+), 20 deletions(-)

diff --git a/docs/en/flink/configuration/sink-plugins/File.md 
b/docs/en/flink/configuration/sink-plugins/File.md
index fd2da5d..ba41462 100644
--- a/docs/en/flink/configuration/sink-plugins/File.md
+++ b/docs/en/flink/configuration/sink-plugins/File.md
@@ -8,15 +8,18 @@ Write data to the file system
 
 ## Options
 
-| name           | type   | required | default value |
-| -------------- | ------ | -------- | ------------- |
-| format         | string | yes      | -             |
-| path           | string | yes      | -             |
-| path_time_format | string | no       | yyyyMMddHHmmss |
-| write_mode     | string | no       | -             |
-| common-options | string | no       | -             |
-| parallelism    | int    | no       | -             |
-
+| name              | type   | required | default value  |
+|-------------------|--------| -------- |----------------|
+| format            | string | yes      | -              |
+| path              | string | yes      | -              |
+| path_time_format  | string | no       | yyyyMMddHHmmss |
+| write_mode        | string | no       | -              |
+| common-options    | string | no       | -              |
+| parallelism       | int    | no       | -              |
+| rollover_interval | long   | no       | 60             |
+| max_part_size     | long   | no       | 1024           |
+| prefix            | string | no       | seatunnel      |
+| suffix            | string | no       | .ext           |
 
 ### format [string]
 
@@ -51,7 +54,7 @@ See [Java 
SimpleDateFormat](https://docs.oracle.com/javase/tutorial/i18n/format/
 
 - OVERWRITE
 
-    - Overwrite, delete and then write if the path exists
+  - Overwrite, delete and then write if the path exists
 
 ### common options [string]
 
@@ -61,6 +64,21 @@ Sink plugin common parameters, please refer to [Sink 
Plugin](./sink-plugin.md) f
 
 The parallelism of an individual operator, for FileSink
 
+### rollover_interval [long]
+
+The interval after which the current file part is rolled over and a new part is started, in minutes.
+
+### max_part_size [long]
+
+The maximum size of each file part, in MB. When a part reaches this size, it is rolled over.
+
+### prefix [string]
+
+The prefix of each file part.
+
+### suffix [string]
+
+The suffix of each file part.
 
 ## Examples
 
diff --git 
a/seatunnel-common/src/main/java/org/apache/seatunnel/common/config/TypesafeConfigUtils.java
 
b/seatunnel-common/src/main/java/org/apache/seatunnel/common/config/TypesafeConfigUtils.java
index ef4ec5e..3c294c8 100644
--- 
a/seatunnel-common/src/main/java/org/apache/seatunnel/common/config/TypesafeConfigUtils.java
+++ 
b/seatunnel-common/src/main/java/org/apache/seatunnel/common/config/TypesafeConfigUtils.java
@@ -91,4 +91,18 @@ public final class TypesafeConfigUtils {
 
         return config;
     }
+
+    @SuppressWarnings("unchecked")
+    public static <T> T getConfig(final Config config, final String configKey, 
final T defaultValue) {
+        if (defaultValue.getClass().equals(Long.class)) {
+            return config.hasPath(configKey) ? (T) 
Long.valueOf(config.getString(configKey)) : defaultValue;
+        }
+        if (defaultValue.getClass().equals(Integer.class)) {
+            return config.hasPath(configKey) ? (T) 
Integer.valueOf(config.getString(configKey)) : defaultValue;
+        }
+        if (defaultValue.getClass().equals(String.class)) {
+            return config.hasPath(configKey) ? (T) config.getString(configKey) 
: defaultValue;
+        }
+        throw new RuntimeException("Unsupported config type, configKey: " + 
configKey);
+    }
 }
diff --git 
a/seatunnel-common/src/test/java/org/apache/seatunnel/common/config/TypesafeConfigUtilsTest.java
 
b/seatunnel-common/src/test/java/org/apache/seatunnel/common/config/TypesafeConfigUtilsTest.java
index 4ae5da8..29a778f 100644
--- 
a/seatunnel-common/src/test/java/org/apache/seatunnel/common/config/TypesafeConfigUtilsTest.java
+++ 
b/seatunnel-common/src/test/java/org/apache/seatunnel/common/config/TypesafeConfigUtilsTest.java
@@ -76,11 +76,19 @@ public class TypesafeConfigUtilsTest {
     }
 
     public Config getConfig() {
-        Map<String, String> configMap = new HashMap<>();
+        Map<String, Object> configMap = new HashMap<>();
         configMap.put("test.t0", "v0");
         configMap.put("test.t1", "v1");
         configMap.put("k0", "v2");
         configMap.put("k1", "v3");
+        configMap.put("l1", Long.parseLong("100"));
         return ConfigFactory.parseMap(configMap);
     }
+
+    @Test
+    public void testGetConfig() {
+        Config config = getConfig();
+        Assert.assertEquals(Long.parseLong("100"), (long) 
TypesafeConfigUtils.getConfig(config, "l1", Long.parseLong("101")));
+        Assert.assertEquals(Long.parseLong("100"), (long) 
TypesafeConfigUtils.getConfig(config, "l2", Long.parseLong("100")));
+    }
 }
diff --git 
a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-file/src/main/java/org/apache/seatunnel/flink/sink/FileSink.java
 
b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-file/src/main/java/org/apache/seatunnel/flink/sink/FileSink.java
index defaef7..9e730d0 100644
--- 
a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-file/src/main/java/org/apache/seatunnel/flink/sink/FileSink.java
+++ 
b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-file/src/main/java/org/apache/seatunnel/flink/sink/FileSink.java
@@ -19,6 +19,7 @@ package org.apache.seatunnel.flink.sink;
 
 import org.apache.seatunnel.common.config.CheckConfigUtil;
 import org.apache.seatunnel.common.config.CheckResult;
+import org.apache.seatunnel.common.config.TypesafeConfigUtils;
 import org.apache.seatunnel.common.utils.StringTemplate;
 import org.apache.seatunnel.flink.FlinkEnvironment;
 import org.apache.seatunnel.flink.batch.FlinkBatchSink;
@@ -28,7 +29,7 @@ import org.apache.seatunnel.flink.stream.FlinkStreamSink;
 import org.apache.seatunnel.shade.com.typesafe.config.Config;
 
 import org.apache.flink.api.common.io.FileOutputFormat;
-import org.apache.flink.api.common.serialization.Encoder;
+import org.apache.flink.api.common.serialization.SimpleStringEncoder;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.io.TextOutputFormat;
 import org.apache.flink.api.java.operators.DataSink;
@@ -37,12 +38,14 @@ import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.DataStreamSink;
+import 
org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig;
 import 
org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
+import 
org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;
 import org.apache.flink.types.Row;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.PrintStream;
+import java.util.concurrent.TimeUnit;
 
 public class FileSink implements FlinkStreamSink, FlinkBatchSink {
 
@@ -56,6 +59,18 @@ public class FileSink implements FlinkStreamSink, 
FlinkBatchSink {
     private static final String PARALLELISM = "parallelism";
     private static final String PATH_TIME_FORMAT = "path_time_format";
     private static final String DEFAULT_TIME_FORMAT = "yyyyMMddHHmmss";
+    // *********************** For stream mode config ************************
+    private static final String ROLLOVER_INTERVAL = "rollover_interval";
+    private static final long DEFAULE_ROLLOVER_INTERVAL = 60;
+    private static final String MAX_PART_SIZE = "max_part_size";
+    private static final long DEFAULT_MAX_PART_SIZE = 1024;
+    private static final String PART_PREFIX = "prefix";
+    private static final String DEFAULT_PART_PREFIX = "seatunnel";
+    private static final String PART_SUFFIX = "suffix";
+    private static final String DEFAULT_PART_SUFFIX = ".ext";
+    private static final long MB = 1024 * 1024;
+    // ***********************************************************************
+
     private Config config;
 
     private FileOutputFormat<Row> outputFormat;
@@ -64,14 +79,21 @@ public class FileSink implements FlinkStreamSink, 
FlinkBatchSink {
 
     @Override
     public DataStreamSink<Row> outputStream(FlinkEnvironment env, 
DataStream<Row> dataStream) {
+        final DefaultRollingPolicy<Row, String> rollingPolicy = 
DefaultRollingPolicy.builder()
+            .withMaxPartSize(MB * TypesafeConfigUtils.getConfig(config, 
MAX_PART_SIZE, DEFAULT_MAX_PART_SIZE))
+            .withRolloverInterval(
+                
TimeUnit.MINUTES.toMillis(TypesafeConfigUtils.getConfig(config, 
ROLLOVER_INTERVAL, DEFAULE_ROLLOVER_INTERVAL)))
+            .build();
+        OutputFileConfig outputFileConfig = OutputFileConfig.builder()
+            .withPartPrefix(TypesafeConfigUtils.getConfig(config, PART_PREFIX, 
DEFAULT_PART_PREFIX))
+            .withPartSuffix(TypesafeConfigUtils.getConfig(config, PART_SUFFIX, 
DEFAULT_PART_SUFFIX))
+            .build();
 
         final StreamingFileSink<Row> sink = StreamingFileSink
-                .forRowFormat(filePath, (Encoder<Row>) (element, stream) -> {
-                    try (PrintStream out = new PrintStream(stream)) {
-                        out.println(element);
-                    }
-                })
-                .build();
+            .forRowFormat(filePath, new SimpleStringEncoder<Row>())
+            .withRollingPolicy(rollingPolicy)
+            .withOutputFileConfig(outputFileConfig)
+            .build();
         return dataStream.addSink(sink);
     }
 
@@ -124,7 +146,7 @@ public class FileSink implements FlinkStreamSink, 
FlinkBatchSink {
 
     @Override
     public void prepare(FlinkEnvironment env) {
-        String format = config.hasPath(PATH_TIME_FORMAT) ? 
config.getString(PATH_TIME_FORMAT) : DEFAULT_TIME_FORMAT;
+        String format = TypesafeConfigUtils.getConfig(config, 
PATH_TIME_FORMAT, DEFAULT_TIME_FORMAT);
         String path = StringTemplate.substitute(config.getString(PATH), 
format);
         filePath = new Path(path);
     }

Reply via email to