This is an automated email from the ASF dual-hosted git repository.
wenjun pushed a commit to branch api-draft
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/api-draft by this push:
new b16f382c fix seatunnel source parallelism (#1892)
b16f382c is described below
commit b16f382c390dfecd17a10ba35290a81f2d306214
Author: Wenjun Ruan <[email protected]>
AuthorDate: Tue May 17 12:24:54 2022 +0800
fix seatunnel source parallelism (#1892)
---
.../org/apache/seatunnel/flink/FlinkEnvironment.java | 2 +-
.../seatunnel/console/sink/ConsoleSinkWriter.java | 4 +++-
.../seatunnel/fake/source/FakeSourceReader.java | 19 +++++++++++--------
.../flink/command/SeaTunnelApiTaskExecuteCommand.java | 5 +++--
4 files changed, 18 insertions(+), 12 deletions(-)
diff --git
a/seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/FlinkEnvironment.java
b/seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/FlinkEnvironment.java
index 5df4a8be..a2d57234 100644
---
a/seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/FlinkEnvironment.java
+++
b/seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/FlinkEnvironment.java
@@ -121,7 +121,7 @@ public class FlinkEnvironment implements RuntimeEnv {
@Override
public void registerPlugin(List<URL> pluginPaths) {
- LOGGER.info("register plugins :" + pluginPaths);
+ pluginPaths.forEach(url -> LOGGER.info("register plugins : {}", url));
Configuration configuration;
try {
if (isStreaming()) {
diff --git
a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-console/src/main/java/org/apache/seatunnel/connectors/seatunnel/console/sink/ConsoleSinkWriter.java
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-console/src/main/java/org/apache/seatunnel/connectors/seatunnel/console/sink/ConsoleSinkWriter.java
index e395ad3e..af54609d 100644
---
a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-console/src/main/java/org/apache/seatunnel/connectors/seatunnel/console/sink/ConsoleSinkWriter.java
+++
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-console/src/main/java/org/apache/seatunnel/connectors/seatunnel/console/sink/ConsoleSinkWriter.java
@@ -24,6 +24,8 @@ import
org.apache.seatunnel.connectors.seatunnel.console.state.ConsoleState;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.Arrays;
+
public class ConsoleSinkWriter implements SinkWriter<SeaTunnelRow,
ConsoleCommitInfo, ConsoleState> {
private static final Logger LOGGER =
LoggerFactory.getLogger(ConsoleSinkWriter.class);
@@ -31,7 +33,7 @@ public class ConsoleSinkWriter implements
SinkWriter<SeaTunnelRow, ConsoleCommit
@Override
@SuppressWarnings("checkstyle:RegexpSingleline")
public void write(SeaTunnelRow element) {
- System.out.println(element.toString());
+ System.out.println(Arrays.toString(element.getFields()));
}
@Override
diff --git
a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSourceReader.java
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSourceReader.java
index e6bdd0d4..be8b04f1 100644
---
a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSourceReader.java
+++
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSourceReader.java
@@ -38,7 +38,7 @@ public class FakeSourceReader implements
SourceReader<SeaTunnelRow, FakeSourceSp
private final SourceReader.Context context;
private final String[] names = {"Wenjun", "Fanjia", "Zongwen",
"CalvinKirs"};
- private final int[] ages = {1024, 2048, 4096, 8192};
+ private final int[] ages = {11, 22, 33, 44};
private final Random random = ThreadLocalRandom.current();
public FakeSourceReader(SourceReader.Context context) {
@@ -58,13 +58,16 @@ public class FakeSourceReader implements
SourceReader<SeaTunnelRow, FakeSourceSp
@Override
@SuppressWarnings("magicnumber")
public void pollNext(Collector<SeaTunnelRow> output) throws
InterruptedException {
- int i = random.nextInt(names.length);
- Map<String, Object> fieldMap = new HashMap<>(4);
- fieldMap.put("name", names[i]);
- fieldMap.put("age", ages[i]);
- fieldMap.put("timestamp", System.currentTimeMillis());
- SeaTunnelRow seaTunnelRow = new SeaTunnelRow(new Object[]{names[i],
ages[i], System.currentTimeMillis()}, fieldMap);
- output.collect(seaTunnelRow);
+ // Generate a random number of rows to emit.
+ for (int i = 0; i < random.nextInt(10); i++) {
+ int randomIndex = random.nextInt(names.length);
+ Map<String, Object> fieldMap = new HashMap<>(4);
+ fieldMap.put("name", names[randomIndex]);
+ fieldMap.put("age", ages[randomIndex]);
+ fieldMap.put("timestamp", System.currentTimeMillis());
+ SeaTunnelRow seaTunnelRow = new SeaTunnelRow(new
Object[]{names[randomIndex], ages[randomIndex], System.currentTimeMillis()},
fieldMap);
+ output.collect(seaTunnelRow);
+ }
if (Boundedness.BOUNDED.equals(context.getBoundedness())) {
// signal to the source that we have reached the end of the data.
context.signalNoMoreElement();
diff --git
a/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/command/SeaTunnelApiTaskExecuteCommand.java
b/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/command/SeaTunnelApiTaskExecuteCommand.java
index e563a3d9..14a23cd5 100644
---
a/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/command/SeaTunnelApiTaskExecuteCommand.java
+++
b/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/command/SeaTunnelApiTaskExecuteCommand.java
@@ -68,11 +68,12 @@ public class SeaTunnelApiTaskExecuteCommand implements
Command<FlinkCommandArgs>
Path configFile = FileUtils.getConfigPath(flinkCommandArgs);
Config config = new ConfigBuilder(configFile).getConfig();
+ FlinkEnvironment flinkEnvironment = getFlinkEnvironment(config);
+
SeaTunnelParallelSource source = getSource(config);
// todo: add basic type
Sink<WrappedRow, Object, Object, Object> flinkSink = getSink(config);
- FlinkEnvironment flinkEnvironment = getFlinkEnvironment(config);
registerPlugins(flinkEnvironment);
StreamExecutionEnvironment streamExecutionEnvironment =
flinkEnvironment.getStreamExecutionEnvironment();
@@ -129,7 +130,7 @@ public class SeaTunnelApiTaskExecuteCommand implements
Command<FlinkCommandArgs>
private FlinkEnvironment getFlinkEnvironment(Config config) {
FlinkEnvironment flinkEnvironment = new FlinkEnvironment();
flinkEnvironment.setJobMode(JobMode.STREAMING);
- flinkEnvironment.setConfig(config);
+ flinkEnvironment.setConfig(config.getConfig("env"));
flinkEnvironment.prepare();
return flinkEnvironment;