This is an automated email from the ASF dual-hosted git repository.
kirs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 660d715 [Feature][Connector] add console limit and batch flink fake
source (#1507)
660d715 is described below
commit 660d7158958bac1a5412c97d4420501f50324e47
Author: TrickyZerg <[email protected]>
AuthorDate: Sat Mar 19 20:56:46 2022 +0800
[Feature][Connector] add console limit and batch flink fake source (#1507)
* add console limit and batch flink fake
* add age limit
---
.../en/flink/configuration/sink-plugins/Console.md | 7 ++-
docs/en/flink/configuration/source-plugins/Fake.md | 14 ++++-
.../apache/seatunnel/flink/sink/ConsoleSink.java | 6 +-
.../apache/seatunnel/flink/source/FakeSource.java | 70 ++++++++++++++++++++++
.../org.apache.seatunnel.flink.BaseFlinkSource | 1 +
5 files changed, 94 insertions(+), 4 deletions(-)
diff --git a/docs/en/flink/configuration/sink-plugins/Console.md
b/docs/en/flink/configuration/sink-plugins/Console.md
index 47d0af2..9ae4ee2 100644
--- a/docs/en/flink/configuration/sink-plugins/Console.md
+++ b/docs/en/flink/configuration/sink-plugins/Console.md
@@ -9,9 +9,14 @@ Used for functional testing and debugging, the results will be
output in the std
## Options
| name | type | required | default value |
-| -------------- | ------ | -------- | ------------- |
+|----------------|--------| -------- |---------------|
+| limit | int | no | Integer.MAX_VALUE |
| common-options | string | no | - |
+### limit [int]
+
+Limits the number of result lines printed to the console. Defaults to `Integer.MAX_VALUE`, i.e. effectively no limit.
+
### common options [string]
Sink plugin common parameters, please refer to [Sink Plugin](./sink-plugin.md)
for details
diff --git a/docs/en/flink/configuration/source-plugins/Fake.md
b/docs/en/flink/configuration/source-plugins/Fake.md
index df5cc74..efd5e7f 100644
--- a/docs/en/flink/configuration/source-plugins/Fake.md
+++ b/docs/en/flink/configuration/source-plugins/Fake.md
@@ -1,6 +1,7 @@
# Fake
-> Source plugin : Fake [Flink]
+> Source plugin : FakeSource [Flink]
+>
+> Source plugin : FakeSourceStream [Flink]
## Description
@@ -15,7 +16,7 @@
### parallelism [`Int`]
-The parallelism of an individual operator, for Fake Source
+The parallelism of an individual operator, for Fake Source Stream
### common options [`string`]
@@ -31,3 +32,12 @@ source {
}
}
```
+
+```bash
+source {
+ FakeSource {
+ result_table_name = "fake"
+ field_name = "name,age"
+ }
+}
+```
diff --git
a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-console/src/main/java/org/apache/seatunnel/flink/sink/ConsoleSink.java
b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-console/src/main/java/org/apache/seatunnel/flink/sink/ConsoleSink.java
index e57d72e..0304088 100644
---
a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-console/src/main/java/org/apache/seatunnel/flink/sink/ConsoleSink.java
+++
b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-console/src/main/java/org/apache/seatunnel/flink/sink/ConsoleSink.java
@@ -37,6 +37,7 @@ import org.slf4j.LoggerFactory;
public class ConsoleSink extends RichOutputFormat<Row> implements
FlinkBatchSink, FlinkStreamSink {
private static final Logger LOGGER =
LoggerFactory.getLogger(ConsoleSink.class);
+ private Integer limit = Integer.MAX_VALUE;
private static final long serialVersionUID = 3482649370594181723L;
private Config config;
@@ -44,7 +45,7 @@ public class ConsoleSink extends RichOutputFormat<Row>
implements FlinkBatchSink
@Override
public DataSink<Row> outputBatch(FlinkEnvironment env, DataSet<Row>
rowDataSet) {
try {
- rowDataSet.print();
+ rowDataSet.first(limit).print();
} catch (Exception e) {
LOGGER.error("Failed to print result! ", e);
}
@@ -68,6 +69,9 @@ public class ConsoleSink extends RichOutputFormat<Row>
implements FlinkBatchSink
@Override
public CheckResult checkConfig() {
+ if (config.hasPath("limit") && config.getInt("limit") >= -1) {
+ limit = config.getInt("limit");
+ }
return CheckResult.success();
}
diff --git
a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-fake/src/main/java/org/apache/seatunnel/flink/source/FakeSource.java
b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-fake/src/main/java/org/apache/seatunnel/flink/source/FakeSource.java
new file mode 100644
index 0000000..54e48fd
--- /dev/null
+++
b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-fake/src/main/java/org/apache/seatunnel/flink/source/FakeSource.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.flink.source;
+
+import org.apache.seatunnel.common.config.CheckResult;
+import org.apache.seatunnel.flink.FlinkEnvironment;
+import org.apache.seatunnel.flink.batch.FlinkBatchSource;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.types.Row;
+
+import java.util.Arrays;
+import java.util.Random;
+import java.util.stream.Collectors;
+
+public class FakeSource implements FlinkBatchSource {
+ /*
+ * Fake batch source: getData() produces a small in-memory data set with one row per
+ * NAME_ARRAY entry. The Config is only stored and returned; no option is read from it here.
+ */
+ private static final String[] NAME_ARRAY = new String[]{"Gary", "Ricky
Huo", "Kid Xiong"};
+ private Config config;
+ private static final int AGE_LIMIT = 100; // bound passed to Random#nextInt, so generated ages are 0..99
+ /** Stores the given plugin configuration. */
+ @Override
+ public void setConfig(Config config) {
+ this.config = config;
+ }
+ /** Returns the configuration previously passed to setConfig (null if none was set). */
+ @Override
+ public Config getConfig() {
+ return config;
+ }
+ /** Performs no validation; always reports success. */
+ @Override
+ public CheckResult checkConfig() {
+ return CheckResult.success();
+ }
+ /** No-op: the method body is empty. */
+ @Override
+ public void prepare(FlinkEnvironment prepareEnv) {
+ // Nothing is prepared for this source.
+ }
+ /**
+ * Builds a table with the columns name (STRING) and age (INT) holding one row per
+ * NAME_ARRAY entry, each paired with a random age below AGE_LIMIT, and converts it
+ * to a DataSet of Row through the batch table environment of the given env.
+ * A new unseeded Random is created on every call, so the ages vary between calls.
+ */
+ @Override
+ public DataSet<Row> getData(FlinkEnvironment env) {
+ Random random = new Random();
+ return env.getBatchTableEnvironment().toDataSet(
+ env.getBatchTableEnvironment().fromValues(
+ DataTypes.ROW(DataTypes.FIELD("name",
DataTypes.STRING()),
+ DataTypes.FIELD("age", DataTypes.INT())),
+ Arrays.stream(NAME_ARRAY).map(n -> Row.of(n,
random.nextInt(AGE_LIMIT)))
+ .collect(Collectors.toList())), Row.class);
+ }
+}
diff --git
a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-fake/src/main/resources/META-INF/services/org.apache.seatunnel.flink.BaseFlinkSource
b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-fake/src/main/resources/META-INF/services/org.apache.seatunnel.flink.BaseFlinkSource
index 5045e50..337ef61 100644
---
a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-fake/src/main/resources/META-INF/services/org.apache.seatunnel.flink.BaseFlinkSource
+++
b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-fake/src/main/resources/META-INF/services/org.apache.seatunnel.flink.BaseFlinkSource
@@ -15,4 +15,5 @@
# limitations under the License.
#
+org.apache.seatunnel.flink.source.FakeSource
org.apache.seatunnel.flink.source.FakeSourceStream