reswqa commented on code in PR #21774:
URL: https://github.com/apache/flink/pull/21774#discussion_r1239979075


##########
flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/util/CarGeneratorFunction.java:
##########
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.examples.windowing.util;
+
+import org.apache.flink.api.java.tuple.Tuple4;
+import org.apache.flink.connector.datagen.source.GeneratorFunction;
+
+import java.util.Arrays;
+import java.util.Random;
+
+/** A simple in-memory source. */

Review Comment:
   Strictly speaking, this is not a `source`. Btw, maybe we should add a more
detailed description of this function.



##########
flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/TopSpeedWindowing.java:
##########
@@ -99,7 +104,20 @@ public static void main(String[] args) throws Exception {
                             .map(new ParseCarData())
                             .name("parse-input");
         } else {
-            carData = env.addSource(CarSource.create(2)).name("in-memory-source");
+            CarGeneratorFunction carGenerator = new CarGeneratorFunction(2);
+            DataGeneratorSource<Tuple4<Integer, Integer, Double, Long>> carGeneratorSource =
+                    new DataGeneratorSource<>(
+                            carGenerator,
+                            Long.MAX_VALUE,
+                            parallelismIgnored -> new GuavaRateLimiter(1),
+                            TypeInformation.of(
+                                    new TypeHint<Tuple4<Integer, Integer, Double, Long>>() {}));
+            carData =
+                    env.fromSource(
+                            carGeneratorSource,
+                            WatermarkStrategy.noWatermarks(),

Review Comment:
   Just to confirm: was the `emitWatermark` call in the previous `CarSource`
redundant (or rather meaningless), since we assign watermarks at line `126` anyway?



##########
flink-examples/flink-examples-table/src/main/java/org/apache/flink/table/examples/java/connectors/SocketSource.java:
##########
@@ -0,0 +1,235 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.examples.java.connectors;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.connector.source.Boundedness;
+import org.apache.flink.api.connector.source.ReaderOutput;
+import org.apache.flink.api.connector.source.Source;
+import org.apache.flink.api.connector.source.SourceReader;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.api.connector.source.SourceSplit;
+import org.apache.flink.api.connector.source.SplitEnumerator;
+import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.core.io.InputStatus;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.examples.java.connectors.SocketSource.DummyCheckpoint;
+import org.apache.flink.table.examples.java.connectors.SocketSource.DummySplit;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * The {@link SocketSource} opens a socket and consumes bytes.
+ *
+ * <p>It splits records by the given byte delimiter (`\n` by default) and delegates the decoding to
+ * a pluggable {@link DeserializationSchema}.
+ *
+ * <p>Note: This is only an example and should not be used in production. The source is not
+ * fault-tolerant and can only work with a parallelism of 1.
+ */
+public final class SocketSource
+        implements Source<RowData, DummySplit, DummyCheckpoint>, ResultTypeQueryable<RowData> {
+
+    private final String hostname;
+    private final int port;
+    private final byte byteDelimiter;
+    private final DeserializationSchema<RowData> deserializer;
+
+    public SocketSource(
+            String hostname,
+            int port,
+            byte byteDelimiter,
+            DeserializationSchema<RowData> deserializer) {
+        this.hostname = hostname;
+        this.port = port;
+        this.byteDelimiter = byteDelimiter;
+        this.deserializer = deserializer;
+    }
+
+    @Override
+    public TypeInformation<RowData> getProducedType() {
+        return deserializer.getProducedType();
+    }
+
+    @Override
+    public Boundedness getBoundedness() {
+        return Boundedness.CONTINUOUS_UNBOUNDED;
+    }
+
+    @Override
+    public SplitEnumerator<DummySplit, DummyCheckpoint> createEnumerator(
+            SplitEnumeratorContext<DummySplit> enumContext) throws Exception {
+        // The socket itself implicitly represents the only split and the enumerator is not
+        // utilized.
+        return null;
+    }
+
+    @Override
+    public SplitEnumerator<DummySplit, DummyCheckpoint> restoreEnumerator(
+            SplitEnumeratorContext<DummySplit> enumContext, DummyCheckpoint checkpoint)
+            throws Exception {
+        // This source is not fault-tolerant.
+        return null;
+    }
+
+    @Override
+    public SimpleVersionedSerializer<DummySplit> getSplitSerializer() {
+        return new NoOpDummySplitSerializer();
+    }
+
+    @Override
+    public SimpleVersionedSerializer<DummyCheckpoint> getEnumeratorCheckpointSerializer() {
+        // This source is not fault-tolerant.
+        return null;
+    }
+
+    @Override
+    public SourceReader<RowData, DummySplit> createReader(SourceReaderContext readerContext)
+            throws Exception {
+        return new SocketReader();
+    }
+
+    /**
+     * Placeholder because the socket itself implicitly represents the only split and does not
+     * require an actual split object.
+     */
+    public static class DummySplit implements SourceSplit {
+        @Override
+        public String splitId() {
+            return "dummy";
+        }
+    }
+
+    /**
+     * Placeholder because the SocketSource does not support fault-tolerance and thus does not
+     * require actual checkpointing.
+     */
+    public static class DummyCheckpoint {}
+
+    private class SocketReader implements SourceReader<RowData, DummySplit> {
+
+        private Socket socket;
+        private ByteArrayOutputStream buffer;
+        private InputStream stream;
+        int b;
+
+        @Override
+        public void start() {
+            try {
+                socket = new Socket();
+                socket.connect(new InetSocketAddress(hostname, port), 0);
+                buffer = new ByteArrayOutputStream();
+                stream = socket.getInputStream();
+            } catch (Throwable t) {
+                t.printStackTrace(); // print and continue

Review Comment:
   If we run a job with this source without first setting up a socket bound to
the specified port, the job fails with:
   ```
   java.lang.NullPointerException: null
        at org.apache.flink.table.examples.java.connectors.SocketSource$SocketReader.pollNext(SocketSource.java:156) ~[classes/:?]
   ```
   This is very unfriendly to the user. The old source function always waited
for the connection to be established.
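
   One possible way to avoid the `NullPointerException` is to retry the
connection and fail with a descriptive error instead of swallowing the
exception. The following is only a plain-Java sketch, not the PR's code: the
class `SocketConnector`, the method `connectWithRetry`, and the parameters
`maxAttempts` and `backoffMillis` are hypothetical names chosen for
illustration.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

/** Hypothetical helper (not part of Flink): connects to a socket, retrying on failure. */
public final class SocketConnector {

    private SocketConnector() {}

    /**
     * Tries to connect up to maxAttempts times, sleeping backoffMillis between attempts.
     * Throws an IOException with the last failure as its cause if no attempt succeeds,
     * so callers never continue with an unconnected (or null) socket.
     */
    public static Socket connectWithRetry(
            String hostname, int port, int maxAttempts, long backoffMillis)
            throws IOException, InterruptedException {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        IOException lastFailure = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            Socket socket = new Socket();
            try {
                // A timeout of 0 means "no connect timeout", as in the example source.
                socket.connect(new InetSocketAddress(hostname, port), 0);
                return socket;
            } catch (IOException e) {
                lastFailure = e;
                try {
                    socket.close();
                } catch (IOException ignored) {
                    // Nothing useful can be done if closing a failed socket fails.
                }
                if (attempt < maxAttempts) {
                    Thread.sleep(backoffMillis);
                }
            }
        }
        throw new IOException(
                "Could not connect to " + hostname + ":" + port
                        + " after " + maxAttempts + " attempts",
                lastFailure);
    }
}
```

   A reader's `start()` could call such a helper and rethrow on failure, so
that `pollNext` never sees an uninitialized stream. Whether to retry forever
(as the old function effectively did) or to give up after a bound is a design
choice for the example.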



##########
flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/async/AsyncIOExample.java:
##########
@@ -76,8 +79,19 @@ public static void main(String[] args) throws Exception {
         // obtain execution environment
         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 
+        DataGeneratorSource<Integer> generatorSource =
+                new DataGeneratorSource<>(
+                        Long::intValue,
+                        Integer.MAX_VALUE,
+                        RateLimiterStrategy.perSecond(100),
+                        Types.INT);
+
         // create input stream of a single integer
-        DataStream<Integer> inputStream = env.addSource(new SimpleSource());
+        DataStream<Integer> inputStream =
+                env.fromSource(
+                        generatorSource,
+                        WatermarkStrategy.noWatermarks(),
+                        "Generated tuples Source");

Review Comment:
   Why is this called a `tuples` source? It emits plain `Integer` values.



##########
flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/TopSpeedWindowing.java:
##########
@@ -99,7 +104,20 @@ public static void main(String[] args) throws Exception {
                             .map(new ParseCarData())
                             .name("parse-input");
         } else {
-            carData = env.addSource(CarSource.create(2)).name("in-memory-source");
+            CarGeneratorFunction carGenerator = new CarGeneratorFunction(2);
+            DataGeneratorSource<Tuple4<Integer, Integer, Double, Long>> carGeneratorSource =
+                    new DataGeneratorSource<>(
+                            carGenerator,
+                            Long.MAX_VALUE,
+                            parallelismIgnored -> new GuavaRateLimiter(1),

Review Comment:
   Does this mean that data generation is now slower than with the previous
`CarSource`? I have not carefully checked whether this affects the example.



##########
flink-examples/flink-examples-table/src/main/java/org/apache/flink/table/examples/java/connectors/SocketSource.java:
##########
@@ -0,0 +1,235 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.examples.java.connectors;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.connector.source.Boundedness;
+import org.apache.flink.api.connector.source.ReaderOutput;
+import org.apache.flink.api.connector.source.Source;
+import org.apache.flink.api.connector.source.SourceReader;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.api.connector.source.SourceSplit;
+import org.apache.flink.api.connector.source.SplitEnumerator;
+import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.core.io.InputStatus;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.examples.java.connectors.SocketSource.DummyCheckpoint;
+import org.apache.flink.table.examples.java.connectors.SocketSource.DummySplit;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * The {@link SocketSource} opens a socket and consumes bytes.
+ *
+ * <p>It splits records by the given byte delimiter (`\n` by default) and delegates the decoding to
+ * a pluggable {@link DeserializationSchema}.
+ *
+ * <p>Note: This is only an example and should not be used in production. The source is not
+ * fault-tolerant and can only work with a parallelism of 1.
+ */
+public final class SocketSource
+        implements Source<RowData, DummySplit, DummyCheckpoint>, ResultTypeQueryable<RowData> {
+
+    private final String hostname;
+    private final int port;
+    private final byte byteDelimiter;
+    private final DeserializationSchema<RowData> deserializer;
+
+    public SocketSource(
+            String hostname,
+            int port,
+            byte byteDelimiter,
+            DeserializationSchema<RowData> deserializer) {

Review Comment:
   I can't see where we call `open()` on the deserializer, but that call is
necessary for some `DeserializationSchema` implementations.



##########
flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/SessionWindowing.java:
##########
@@ -64,24 +67,19 @@ public static void main(String[] args) throws Exception {
         // We expect to detect session "b" and "c" at this point as well
         input.add(new Tuple3<>("c", 11L, 1));
 
-        DataStream<Tuple3<String, Long, Integer>> source =
-                env.addSource(
-                        new SourceFunction<Tuple3<String, Long, Integer>>() {
-                            private static final long serialVersionUID = 1L;
-
-                            @Override
-                            public void run(SourceContext<Tuple3<String, Long, Integer>> ctx)
-                                    throws Exception {
-                                for (Tuple3<String, Long, Integer> value : input) {
-                                    ctx.collectWithTimestamp(value, value.f1);

Review Comment:
   The new implementation does not seem to contain any code that extracts
timestamps. Did I miss something?
   
   I ran both the pre-change and post-change code, and their outputs seem to
differ. 🤔 



##########
flink-examples/flink-examples-table/src/main/java/org/apache/flink/table/examples/java/connectors/SocketSourceFunction.java:
##########
@@ -1,115 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.examples.java.connectors;
-
-import org.apache.flink.api.common.serialization.DeserializationSchema;
-import org.apache.flink.api.common.serialization.RuntimeContextInitializationContextAdapters;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
-import org.apache.flink.table.data.RowData;
-
-import java.io.ByteArrayOutputStream;
-import java.io.InputStream;
-import java.net.InetSocketAddress;
-import java.net.Socket;
-
-/**
- * The {@link SocketSourceFunction} opens a socket and consumes bytes.
- *
- * <p>It splits records by the given byte delimiter (`\n` by default) and delegates the decoding to
- * a pluggable {@link DeserializationSchema}.
- *
- * <p>Note: This is only an example and should not be used in production. The source function is not
- * fault-tolerant and can only work with a parallelism of 1.
- */
-public final class SocketSourceFunction extends RichSourceFunction<RowData>
-        implements ResultTypeQueryable<RowData> {
-
-    private final String hostname;
-    private final int port;
-    private final byte byteDelimiter;
-    private final DeserializationSchema<RowData> deserializer;
-
-    private volatile boolean isRunning = true;
-    private Socket currentSocket;
-
-    public SocketSourceFunction(
-            String hostname,
-            int port,
-            byte byteDelimiter,
-            DeserializationSchema<RowData> deserializer) {
-        this.hostname = hostname;
-        this.port = port;
-        this.byteDelimiter = byteDelimiter;
-        this.deserializer = deserializer;
-    }
-
-    @Override
-    public TypeInformation<RowData> getProducedType() {
-        return deserializer.getProducedType();
-    }
-
-    @Override
-    public void open(Configuration parameters) throws Exception {
-        deserializer.open(
-                RuntimeContextInitializationContextAdapters.deserializationAdapter(
-                        getRuntimeContext()));
-    }
-
-    @Override
-    public void run(SourceContext<RowData> ctx) throws Exception {
-        while (isRunning) {
-            // open and consume from socket
-            try (final Socket socket = new Socket()) {
-                currentSocket = socket;
-                socket.connect(new InetSocketAddress(hostname, port), 0);
-                try (InputStream stream = socket.getInputStream()) {
-                    ByteArrayOutputStream buffer = new ByteArrayOutputStream();
-                    int b;
-                    while ((b = stream.read()) >= 0) {
-                        // buffer until delimiter
-                        if (b != byteDelimiter) {
-                            buffer.write(b);
-                        }
-                        // decode and emit record
-                        else {
-                            ctx.collect(deserializer.deserialize(buffer.toByteArray()));
-                            buffer.reset();
-                        }
-                    }
-                }
-            } catch (Throwable t) {
-                t.printStackTrace(); // print and continue

Review Comment:
   This does not fail the source task if we get a malformed record from the
socket; it only prints the stack trace. The new implementation, however,
triggers a failover. I prefer the old behavior.
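
   To illustrate the "print and continue" behavior, here is a minimal
plain-Java sketch, not the PR's code. The class `LenientRecordReader` and the
`Decoder` interface are hypothetical stand-ins (the latter for
`DeserializationSchema#deserialize`), and unlike the old function, which
dropped the connection and reconnected after an error, this sketch only skips
the offending record.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper (not part of Flink): splits a byte stream into records leniently. */
public final class LenientRecordReader {

    private LenientRecordReader() {}

    /** Hypothetical stand-in for a deserializer that may reject malformed input. */
    @FunctionalInterface
    public interface Decoder<T> {
        T decode(byte[] bytes) throws IOException;
    }

    /**
     * Reads the stream until it ends, splitting on the delimiter byte. Records that fail
     * to decode are logged via printStackTrace and skipped, so one malformed record does
     * not abort the whole read. Bytes after the last delimiter are not emitted.
     */
    public static <T> List<T> readAll(InputStream stream, byte delimiter, Decoder<T> decoder)
            throws IOException {
        List<T> records = new ArrayList<>();
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        int b;
        while ((b = stream.read()) >= 0) {
            if (b != delimiter) {
                // Buffer until the delimiter is seen.
                buffer.write(b);
                continue;
            }
            try {
                records.add(decoder.decode(buffer.toByteArray()));
            } catch (Exception e) {
                e.printStackTrace(); // print and continue with the next record
            }
            buffer.reset();
        }
        return records;
    }
}
```

   With a decoder that parses integers, the input `1\nabc\n3\n` would yield the
records `1` and `3`, and the failure for `abc` would only be printed.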



##########
flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/iteration/IterateExample.java:
##########
@@ -96,7 +100,21 @@ public static void main(String[] args) throws Exception {
         } else {
             System.out.println("Executing Iterate example with default input data set.");
             System.out.println("Use --input to specify file input.");
-            inputStream = env.addSource(new RandomFibonacciSource());
+
+            GeneratorFunction<Long, Tuple2<Integer, Integer>> dataGenerator =
+                    new RandomFibonacciGenerator();
+            DataGeneratorSource<Tuple2<Integer, Integer>> generatorSource =
+                    new DataGeneratorSource<>(
+                            dataGenerator,
+                            BOUND,
+                            RateLimiterStrategy.perSecond(20),
+                            TypeInformation.of(new TypeHint<Tuple2<Integer, Integer>>() {}));

Review Comment:
   Could `Types.TUPLE(Types.INT, Types.INT)` work here? If so, it would be more
concise.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]