This is an automated email from the ASF dual-hosted git repository.

wenjun pushed a commit to branch api-draft
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/api-draft by this push:
     new b16f382c fix seatunnel source parallelism (#1892)
b16f382c is described below

commit b16f382c390dfecd17a10ba35290a81f2d306214
Author: Wenjun Ruan <[email protected]>
AuthorDate: Tue May 17 12:24:54 2022 +0800

    fix seatunnel source parallelism (#1892)
---
 .../org/apache/seatunnel/flink/FlinkEnvironment.java  |  2 +-
 .../seatunnel/console/sink/ConsoleSinkWriter.java     |  4 +++-
 .../seatunnel/fake/source/FakeSourceReader.java       | 19 +++++++++++--------
 .../flink/command/SeaTunnelApiTaskExecuteCommand.java |  5 +++--
 4 files changed, 18 insertions(+), 12 deletions(-)

diff --git 
a/seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/FlinkEnvironment.java
 
b/seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/FlinkEnvironment.java
index 5df4a8be..a2d57234 100644
--- 
a/seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/FlinkEnvironment.java
+++ 
b/seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/FlinkEnvironment.java
@@ -121,7 +121,7 @@ public class FlinkEnvironment implements RuntimeEnv {
 
     @Override
     public void registerPlugin(List<URL> pluginPaths) {
-        LOGGER.info("register plugins :" + pluginPaths);
+        pluginPaths.forEach(url -> LOGGER.info("register plugins : {}", url));
         Configuration configuration;
         try {
             if (isStreaming()) {
diff --git 
a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-console/src/main/java/org/apache/seatunnel/connectors/seatunnel/console/sink/ConsoleSinkWriter.java
 
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-console/src/main/java/org/apache/seatunnel/connectors/seatunnel/console/sink/ConsoleSinkWriter.java
index e395ad3e..af54609d 100644
--- 
a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-console/src/main/java/org/apache/seatunnel/connectors/seatunnel/console/sink/ConsoleSinkWriter.java
+++ 
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-console/src/main/java/org/apache/seatunnel/connectors/seatunnel/console/sink/ConsoleSinkWriter.java
@@ -24,6 +24,8 @@ import 
org.apache.seatunnel.connectors.seatunnel.console.state.ConsoleState;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.Arrays;
+
 public class ConsoleSinkWriter implements SinkWriter<SeaTunnelRow, 
ConsoleCommitInfo, ConsoleState> {
 
     private static final Logger LOGGER = 
LoggerFactory.getLogger(ConsoleSinkWriter.class);
@@ -31,7 +33,7 @@ public class ConsoleSinkWriter implements 
SinkWriter<SeaTunnelRow, ConsoleCommit
     @Override
     @SuppressWarnings("checkstyle:RegexpSingleline")
     public void write(SeaTunnelRow element) {
-        System.out.println(element.toString());
+        System.out.println(Arrays.toString(element.getFields()));
     }
 
     @Override
diff --git 
a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSourceReader.java
 
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSourceReader.java
index e6bdd0d4..be8b04f1 100644
--- 
a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSourceReader.java
+++ 
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSourceReader.java
@@ -38,7 +38,7 @@ public class FakeSourceReader implements 
SourceReader<SeaTunnelRow, FakeSourceSp
     private final SourceReader.Context context;
 
     private final String[] names = {"Wenjun", "Fanjia", "Zongwen", 
"CalvinKirs"};
-    private final int[] ages = {1024, 2048, 4096, 8192};
+    private final int[] ages = {11, 22, 33, 44};
     private final Random random = ThreadLocalRandom.current();
 
     public FakeSourceReader(SourceReader.Context context) {
@@ -58,13 +58,16 @@ public class FakeSourceReader implements 
SourceReader<SeaTunnelRow, FakeSourceSp
     @Override
     @SuppressWarnings("magicnumber")
     public void pollNext(Collector<SeaTunnelRow> output) throws 
InterruptedException {
-        int i = random.nextInt(names.length);
-        Map<String, Object> fieldMap = new HashMap<>(4);
-        fieldMap.put("name", names[i]);
-        fieldMap.put("age", ages[i]);
-        fieldMap.put("timestamp", System.currentTimeMillis());
-        SeaTunnelRow seaTunnelRow = new SeaTunnelRow(new Object[]{names[i], 
ages[i], System.currentTimeMillis()}, fieldMap);
-        output.collect(seaTunnelRow);
+        // Generate a random number of rows to emit.
+        for (int i = 0; i < random.nextInt(10); i++) {
+            int randomIndex = random.nextInt(names.length);
+            Map<String, Object> fieldMap = new HashMap<>(4);
+            fieldMap.put("name", names[randomIndex]);
+            fieldMap.put("age", ages[randomIndex]);
+            fieldMap.put("timestamp", System.currentTimeMillis());
+            SeaTunnelRow seaTunnelRow = new SeaTunnelRow(new 
Object[]{names[randomIndex], ages[randomIndex], System.currentTimeMillis()}, 
fieldMap);
+            output.collect(seaTunnelRow);
+        }
         if (Boundedness.BOUNDED.equals(context.getBoundedness())) {
             // signal to the source that we have reached the end of the data.
             context.signalNoMoreElement();
diff --git 
a/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/command/SeaTunnelApiTaskExecuteCommand.java
 
b/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/command/SeaTunnelApiTaskExecuteCommand.java
index e563a3d9..14a23cd5 100644
--- 
a/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/command/SeaTunnelApiTaskExecuteCommand.java
+++ 
b/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/command/SeaTunnelApiTaskExecuteCommand.java
@@ -68,11 +68,12 @@ public class SeaTunnelApiTaskExecuteCommand implements 
Command<FlinkCommandArgs>
         Path configFile = FileUtils.getConfigPath(flinkCommandArgs);
 
         Config config = new ConfigBuilder(configFile).getConfig();
+        FlinkEnvironment flinkEnvironment = getFlinkEnvironment(config);
+
         SeaTunnelParallelSource source = getSource(config);
         // todo: add basic type
         Sink<WrappedRow, Object, Object, Object> flinkSink = getSink(config);
 
-        FlinkEnvironment flinkEnvironment = getFlinkEnvironment(config);
         registerPlugins(flinkEnvironment);
 
         StreamExecutionEnvironment streamExecutionEnvironment = 
flinkEnvironment.getStreamExecutionEnvironment();
@@ -129,7 +130,7 @@ public class SeaTunnelApiTaskExecuteCommand implements 
Command<FlinkCommandArgs>
     private FlinkEnvironment getFlinkEnvironment(Config config) {
         FlinkEnvironment flinkEnvironment = new FlinkEnvironment();
         flinkEnvironment.setJobMode(JobMode.STREAMING);
-        flinkEnvironment.setConfig(config);
+        flinkEnvironment.setConfig(config.getConfig("env"));
         flinkEnvironment.prepare();
 
         return flinkEnvironment;

Reply via email to