This is an automated email from the ASF dual-hosted git repository.

kirs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 660d715  [Feature][Connector] add console limit and batch flink fake 
source (#1507)
660d715 is described below

commit 660d7158958bac1a5412c97d4420501f50324e47
Author: TrickyZerg <[email protected]>
AuthorDate: Sat Mar 19 20:56:46 2022 +0800

    [Feature][Connector] add console limit and batch flink fake source (#1507)
    
    * add console limit and batch flink fake
    * add age limit
---
 .../en/flink/configuration/sink-plugins/Console.md |  7 ++-
 docs/en/flink/configuration/source-plugins/Fake.md | 14 ++++-
 .../apache/seatunnel/flink/sink/ConsoleSink.java   |  6 +-
 .../apache/seatunnel/flink/source/FakeSource.java  | 70 ++++++++++++++++++++++
 .../org.apache.seatunnel.flink.BaseFlinkSource     |  1 +
 5 files changed, 94 insertions(+), 4 deletions(-)

diff --git a/docs/en/flink/configuration/sink-plugins/Console.md 
b/docs/en/flink/configuration/sink-plugins/Console.md
index 47d0af2..9ae4ee2 100644
--- a/docs/en/flink/configuration/sink-plugins/Console.md
+++ b/docs/en/flink/configuration/sink-plugins/Console.md
@@ -9,9 +9,14 @@ Used for functional testing and debugging, the results will be 
output in the std
 ## Options
 
 | name           | type   | required | default value |
-| -------------- | ------ | -------- | ------------- |
+|----------------|--------| -------- |---------------|
+| limit          | int    | no       | INT_MAX       |
 | common-options | string | no       | -             |
 
+### limit [int]
+
+limit console result lines
+
 ### common options [string]
 
 Sink plugin common parameters, please refer to [Sink Plugin](./sink-plugin.md) 
for details
diff --git a/docs/en/flink/configuration/source-plugins/Fake.md 
b/docs/en/flink/configuration/source-plugins/Fake.md
index df5cc74..efd5e7f 100644
--- a/docs/en/flink/configuration/source-plugins/Fake.md
+++ b/docs/en/flink/configuration/source-plugins/Fake.md
@@ -1,6 +1,7 @@
 # Fake
 
-> Source plugin : Fake [Flink]
+> Source plugin : FakeSource [Flink]  
+> Source plugin : FakeSourceStream [Flink]
 
 ## Description
 
@@ -15,7 +16,7 @@
 
 ### parallelism [`Int`]
 
-The parallelism of an individual operator, for Fake Source
+The parallelism of an individual operator, for Fake Source Stream
 
 ### common options [`string`]
 
@@ -31,3 +32,12 @@ source {
     }
 }
 ```
+
+```bash
+source {
+    FakeSource {
+      result_table_name = "fake"
+      field_name = "name,age"
+    }
+}
+```
diff --git 
a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-console/src/main/java/org/apache/seatunnel/flink/sink/ConsoleSink.java
 
b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-console/src/main/java/org/apache/seatunnel/flink/sink/ConsoleSink.java
index e57d72e..0304088 100644
--- 
a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-console/src/main/java/org/apache/seatunnel/flink/sink/ConsoleSink.java
+++ 
b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-console/src/main/java/org/apache/seatunnel/flink/sink/ConsoleSink.java
@@ -37,6 +37,7 @@ import org.slf4j.LoggerFactory;
 public class ConsoleSink extends RichOutputFormat<Row> implements 
FlinkBatchSink, FlinkStreamSink {
 
     private static final Logger LOGGER = 
LoggerFactory.getLogger(ConsoleSink.class);
+    private Integer limit = Integer.MAX_VALUE;
 
     private static final long serialVersionUID = 3482649370594181723L;
     private Config config;
@@ -44,7 +45,7 @@ public class ConsoleSink extends RichOutputFormat<Row> 
implements FlinkBatchSink
     @Override
     public DataSink<Row> outputBatch(FlinkEnvironment env, DataSet<Row> 
rowDataSet) {
         try {
-            rowDataSet.print();
+            rowDataSet.first(limit).print();
         } catch (Exception e) {
             LOGGER.error("Failed to print result! ", e);
         }
@@ -68,6 +69,9 @@ public class ConsoleSink extends RichOutputFormat<Row> 
implements FlinkBatchSink
 
     @Override
     public CheckResult checkConfig() {
+        if (config.hasPath("limit") && config.getInt("limit") >= -1) {
+            limit = config.getInt("limit");
+        }
         return CheckResult.success();
     }
 
diff --git 
a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-fake/src/main/java/org/apache/seatunnel/flink/source/FakeSource.java
 
b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-fake/src/main/java/org/apache/seatunnel/flink/source/FakeSource.java
new file mode 100644
index 0000000..54e48fd
--- /dev/null
+++ 
b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-fake/src/main/java/org/apache/seatunnel/flink/source/FakeSource.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.flink.source;
+
+import org.apache.seatunnel.common.config.CheckResult;
+import org.apache.seatunnel.flink.FlinkEnvironment;
+import org.apache.seatunnel.flink.batch.FlinkBatchSource;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.types.Row;
+
+import java.util.Arrays;
+import java.util.Random;
+import java.util.stream.Collectors;
+
+public class FakeSource implements FlinkBatchSource {
+
+    private static final String[] NAME_ARRAY = new String[]{"Gary", "Ricky 
Huo", "Kid Xiong"};
+    private Config config;
+    private static final int AGE_LIMIT = 100;
+
+    @Override
+    public void setConfig(Config config) {
+        this.config = config;
+    }
+
+    @Override
+    public Config getConfig() {
+        return config;
+    }
+
+    @Override
+    public CheckResult checkConfig() {
+        return CheckResult.success();
+    }
+
+    @Override
+    public void prepare(FlinkEnvironment prepareEnv) {
+
+    }
+
+    @Override
+    public DataSet<Row> getData(FlinkEnvironment env) {
+        Random random = new Random();
+        return env.getBatchTableEnvironment().toDataSet(
+                env.getBatchTableEnvironment().fromValues(
+                        DataTypes.ROW(DataTypes.FIELD("name", 
DataTypes.STRING()),
+                                DataTypes.FIELD("age", DataTypes.INT())),
+                        Arrays.stream(NAME_ARRAY).map(n -> Row.of(n, 
random.nextInt(AGE_LIMIT)))
+                                .collect(Collectors.toList())), Row.class);
+    }
+}
diff --git 
a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-fake/src/main/resources/META-INF/services/org.apache.seatunnel.flink.BaseFlinkSource
 
b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-fake/src/main/resources/META-INF/services/org.apache.seatunnel.flink.BaseFlinkSource
index 5045e50..337ef61 100644
--- 
a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-fake/src/main/resources/META-INF/services/org.apache.seatunnel.flink.BaseFlinkSource
+++ 
b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-fake/src/main/resources/META-INF/services/org.apache.seatunnel.flink.BaseFlinkSource
@@ -15,4 +15,5 @@
 # limitations under the License.
 #
 
+org.apache.seatunnel.flink.source.FakeSource
 org.apache.seatunnel.flink.source.FakeSourceStream

Reply via email to