reswqa commented on code in PR #26118:
URL: https://github.com/apache/flink/pull/26118#discussion_r1950176974


##########
flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/examples/DSv2ExamplesITCase.java:
##########
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.test.examples;
+
+import org.apache.flink.streaming.examples.dsv2.eventtime.CountNewsClicks;
+import 
org.apache.flink.streaming.examples.dsv2.eventtime.util.CountNewsClicksData;
+import org.apache.flink.streaming.examples.dsv2.join.util.JoinData;
+import 
org.apache.flink.streaming.examples.dsv2.windowing.util.CountProductSalesWindowingData;
+import org.apache.flink.test.testdata.WordCountData;
+import org.apache.flink.test.util.AbstractTestBase;
+
+import org.junit.jupiter.api.Test;
+
+import static 
org.apache.flink.test.util.TestBaseUtils.compareResultsByLinesInMemory;
+
+/** Integration test for DataStream API V2 examples. */
+class DSv2ExamplesITCase extends AbstractTestBase {
+
+    @Test
+    void testStreamingWordCount() throws Exception {

Review Comment:
   ```suggestion
       void testWordCount() throws Exception {
   ```



##########
flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/dsv2/watermark/CountSales.java:
##########
@@ -0,0 +1,284 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.examples.dsv2.watermark;
+
+import org.apache.flink.api.common.state.StateDeclaration;
+import org.apache.flink.api.common.state.StateDeclarations;
+import org.apache.flink.api.common.state.ValueStateDeclaration;
+import org.apache.flink.api.common.state.v2.ValueState;
+import org.apache.flink.api.common.typeinfo.TypeDescriptors;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.watermark.LongWatermark;
+import org.apache.flink.api.common.watermark.LongWatermarkDeclaration;
+import org.apache.flink.api.common.watermark.Watermark;
+import org.apache.flink.api.common.watermark.WatermarkDeclaration;
+import org.apache.flink.api.common.watermark.WatermarkDeclarations;
+import org.apache.flink.api.common.watermark.WatermarkHandlingResult;
+import org.apache.flink.api.connector.dsv2.WrappedSink;
+import org.apache.flink.api.connector.dsv2.WrappedSource;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.connector.datagen.source.DataGeneratorSource;
+import org.apache.flink.connector.datagen.source.GeneratorFunction;
+import org.apache.flink.datastream.api.ExecutionEnvironment;
+import org.apache.flink.datastream.api.common.Collector;
+import org.apache.flink.datastream.api.context.NonPartitionedContext;
+import org.apache.flink.datastream.api.context.PartitionedContext;
+import org.apache.flink.datastream.api.function.ApplyPartitionFunction;
+import org.apache.flink.datastream.api.function.OneInputStreamProcessFunction;
+import org.apache.flink.datastream.api.stream.NonKeyedPartitionStream;
+import org.apache.flink.streaming.api.functions.sink.PrintSink;
+import org.apache.flink.streaming.examples.dsv2.eventtime.CountNewsClicks;
+import org.apache.flink.util.ParameterTool;
+
+import java.time.Duration;
+import java.util.Random;
+import java.util.Set;
+
+/**
+ * This example shows how to count the cumulative sales of each product at the 
moment. In this
+ * example, we simulate the creation and propagation of event time watermarks 
through {@link
+ * Watermark}.
+ *
+ * <p>Please note that Flink provides users with an event time extension to 
support event time. You
+ * can refer to the {@link CountNewsClicks} example.
+ *
+ * <p>The example uses a {@link DataGeneratorSource} as input source and a 
custom {@link Watermark}
+ * to propagate the event time in the stream.
+ *
+ * <p>Usage:
+ *
+ * <ul>
+ *   <li><code>--parallelism &lt;int&gt;</code> The parallelism of the source. 
The default value is
+ *       5.
+ * </ul>
+ *
+ * <p>This example shows how to:
+ *
+ * <ul>
+ *   <li>Define and propagate a custom event (Watermark) in DataStream 
API V2
+ * </ul>
+ *
+ * <p>Please note that if you intend to run this example in an IDE, you must 
first add the following
+ * VM options: "--add-opens=java.base/java.util=ALL-UNNAMED". This is 
necessary because the module
+ * system in JDK 17+ restricts some reflection operations.
+ *
+ * <p>Please note that the DataStream API V2 is a new set of APIs, to 
gradually replace the original
+ * DataStream API. It is currently in the experimental stage and is not fully 
available for
+ * production.
+ */
+public class CountSales {
+
+    /** Pojo class for Order. */
+    public static class Order {
+        public long productId;
+        public double totalPrice;
+        public long timestamp;
+
+        public Order(long productId, double price, long timestamp) {
+            this.productId = productId;
+            this.totalPrice = price;
+            this.timestamp = timestamp;
+        }
+    }
+
+    /** {@link CumulativeSales} represents the cumulative sales at a certain 
moment of a product. */
+    public static class CumulativeSales {
+        public long productId;
+        public long timestamp;
+        public double sales;
+
+        public CumulativeSales(long productId, long timestamp, double sales) {
+            this.productId = productId;
+            this.timestamp = timestamp;
+            this.sales = sales;
+        }
+
+        @Override
+        public String toString() {
+            return String.format("%d,%d,%.2f", this.productId, this.timestamp, 
this.sales);
+        }
+    }
+
+    /**
+     * Firstly, we define an event time watermark, which represents the time 
of the event
+     * currently being processed. Since the watermark needs to convey the timestamp, its data type 
is long. To determine
+     * the minimum event time across all watermarks, we utilize the 
combineFunctionMin() method to
+     * combine the watermarks. The default handling strategy is forward, 
meaning that the watermark
+     * will typically be advanced to downstream operators in most scenarios. 
Thus, we create a
+     * WatermarkDeclaration instance that can be used to declare and generate 
the watermark.
+     */
+    public static final LongWatermarkDeclaration 
EVENT_TIME_WATERMARK_DECLARATION =
+            WatermarkDeclarations.newBuilder("EVENT_TIME")
+                    .typeLong()
+                    .combineFunctionMin()
+                    .combineWaitForAllChannels(true)
+                    .defaultHandlingStrategyForward()
+                    .build();
+
+    public static void main(String[] args) throws Exception {
+        // parse the parameters
+        final ParameterTool params = ParameterTool.fromArgs(args);
+        final int parallelism = params.getInt("parallelism", 5);
+
+        // obtain execution environment
+        ExecutionEnvironment env = ExecutionEnvironment.getInstance();
+
+        // Create a source that simulates a message queue with a parallelism 
set to user-specified
+        // {@code parallelism}, indicating
+        // that the consumed topic has {@code parallelism} partitions.
+        // The source will declare and generate offset watermarks.

Review Comment:
   ?



##########
flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/dsv2/eventtime/CountNewsClicks.java:
##########
@@ -0,0 +1,381 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.examples.dsv2.eventtime;
+
+import org.apache.flink.api.common.serialization.SimpleStringEncoder;
+import org.apache.flink.api.common.state.ListStateDeclaration;
+import org.apache.flink.api.common.state.StateDeclaration;
+import org.apache.flink.api.common.state.StateDeclarations;
+import org.apache.flink.api.common.state.ValueStateDeclaration;
+import org.apache.flink.api.common.typeinfo.TypeDescriptors;
+import org.apache.flink.api.connector.dsv2.WrappedSink;
+import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.connector.file.sink.FileSink;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.datastream.api.ExecutionEnvironment;
+import org.apache.flink.datastream.api.common.Collector;
+import org.apache.flink.datastream.api.context.PartitionedContext;
+import org.apache.flink.datastream.api.context.StateManager;
+import org.apache.flink.datastream.api.extension.eventtime.EventTimeExtension;
+import 
org.apache.flink.datastream.api.extension.eventtime.function.OneInputEventTimeStreamProcessFunction;
+import 
org.apache.flink.datastream.api.extension.eventtime.timer.EventTimeManager;
+import org.apache.flink.datastream.api.stream.NonKeyedPartitionStream;
+import org.apache.flink.streaming.api.functions.sink.PrintSink;
+import 
org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;
+import 
org.apache.flink.streaming.examples.dsv2.eventtime.util.CountNewsClicksData;
+import org.apache.flink.util.ParameterTool;
+
+import java.time.Duration;
+import java.util.Set;
+
+/**
+ * This example illustrates how to count the number of clicks on each news at 
1 hour after news
+ * publication.
+ *
+ * <p>The input consists of a series of {@link NewsEvent}s, which fall into 
two categories: news
+ * publish and news click. Each {@link NewsEvent} contains three components: 
the event type, the
+ * news ID and the timestamp. Notably, there is only one event of type {@link 
NewsEventType#PUBLISH}
+ * for each news.
+ *
+ * <p>Usage:
+ *
+ * <ul>
+ *   <li><code>--output &lt;path&gt;</code>The output directory where the Job 
will write the
+ *       results. If no output path is provided, the Job will print the 
results to <code>stdout
+ *       </code>.
+ * </ul>
+ *
+ * <p>This example shows how to:
+ *
+ * <ul>
+ *   <li>Usage of Event Time extension in DataStream API V2
+ * </ul>
+ *
+ * <p>Please note that if you intend to run this example in an IDE, you must 
first add the following
+ * VM options: "--add-opens=java.base/java.util=ALL-UNNAMED". This is 
necessary because the module
+ * system in JDK 17+ restricts some reflection operations.
+ *
+ * <p>Please note that the DataStream API V2 is a new set of APIs, to 
gradually replace the original
+ * DataStream API. It is currently in the experimental stage and is not fully 
available for
+ * production.
+ */
+public class CountNewsClicks {
+
+    /**
+     * The type of {@link NewsEvent}. Note that there is only one event of type {@link 
NewsEventType#PUBLISH}
+     * for each news.
+     */
+    public enum NewsEventType {
+        PUBLISH,
+        CLICK
+    }
+
+    /**
+     * The {@link NewsEvent} represents an event on news, containing the event 
type, news id and the
+     * timestamp.
+     */
+    public static class NewsEvent {
+        public long newsId;
+        public long timestamp;
+        public NewsEventType type;
+    }
+
+    /**
+     * The {@link NewsClicks} represents the number of clicks on news within 
one hour following its
+     * publication.
+     */
+    public static class NewsClicks {
+        public long newsId;
+        public long clicks;
+
+        public NewsClicks(long newsId, long clicks) {
+            this.newsId = newsId;
+            this.clicks = clicks;
+        }
+
+        @Override
+        public String toString() {
+            return String.format("%d,%d", this.newsId, this.clicks);
+        }
+    }
+
+    public static void main(String[] args) throws Exception {
+        // parse the parameters
+        final ParameterTool params = ParameterTool.fromArgs(args);
+        final boolean fileOutput = params.has("output");
+
+        // obtain execution environment
+        ExecutionEnvironment env = ExecutionEnvironment.getInstance();
+
+        // the input consists of a series of {@code NewsEvent}s, which include 
two types: news
+        // publish event and news click event.
+        NonKeyedPartitionStream<NewsEvent> source = 
CountNewsClicksData.createNewsEventSource(env);
+
+        NonKeyedPartitionStream<NewsClicks> clicksStream =
+                // extract event time and generate the event time watermark
+                source.process(
+                                // the timestamp field of the input is 
considered to be the
+                                // event time
+                                
EventTimeExtension.<NewsEvent>newWatermarkGeneratorBuilder(
+                                                event -> event.timestamp)
+                                        // generate event time watermarks 
every 200ms
+                                        
.periodicWatermark(Duration.ofMillis(200))
+                                        // if the input is idle for more than 
30 seconds, it
+                                        // is ignored during the event time 
watermark
+                                        // combination process
+                                        .withIdleness(Duration.ofSeconds(30))
+                                        // set the maximum out-of-order time 
of the event to
+                                        // 10 seconds, meaning that if an 
event is received
+                                        // at 12:00:00, then no further events 
should be
+                                        // received earlier than 11:59:50
+                                        
.withMaxOutOfOrderTime(Duration.ofSeconds(10))
+                                        // build the event time watermark 
generator as
+                                        // ProcessFunction
+                                        .buildAsProcessFunction())
+                        // key by the news id
+                        .keyBy(event -> event.newsId)
+                        // count the click number of each news
+                        .process(
+                                EventTimeExtension.wrapProcessFunction(
+                                        new CountNewsClicksProcessFunction()));
+
+        if (fileOutput) {
+            // write results to file
+            clicksStream
+                    .toSink(
+                            new WrappedSink<>(
+                                    FileSink.<NewsClicks>forRowFormat(
+                                                    new 
Path(params.get("output")),
+                                                    new 
SimpleStringEncoder<>())
+                                            .withRollingPolicy(
+                                                    
DefaultRollingPolicy.builder()
+                                                            .withMaxPartSize(
+                                                                    
MemorySize.ofMebiBytes(1))
+                                                            
.withRolloverInterval(
+                                                                    
Duration.ofSeconds(10))
+                                                            .build())
+                                            .build()))
+                    .withName("output");
+        } else {
+            // Print the results to the STDOUT.
+            clicksStream.toSink(new WrappedSink<>(new 
PrintSink<>())).withName("print-sink");
+        }
+
+        env.execute("CountNewsClicks");
+    }
+
+    /**
+     * This process function will consume {@link NewsEvent} and count the 
number of clicks within 1
+     * hour of the news publication and send the results {@link NewsClicks} to 
the output.
+     *
+     * <p>To achieve the goal, we will register an event timer for each news, 
which will be
+     * triggered at the time of the news's release time + 1 hour, and record a 
click count of each
+     * news. In the timer callback {@code onEventTimer}, we will output the 
number of clicks.
+     *
+     * <p>To handle the potential disorder between news publication event and 
click events, we will
+     * use a ListState to store the timestamps of click events that occur 
prior to their
+     * corresponding publication events.
+     *
+     * <p>To handle the potential missing news publication event, we will 
register an event timer
+     * for the first click event and set the timer to the timestamp of the 
click event plus one
+     * hour.
+     */
+    public static class CountNewsClicksProcessFunction
+            implements OneInputEventTimeStreamProcessFunction<NewsEvent, 
NewsClicks> {
+
+        private static final long ONE_HOUR_IN_MS = 
Duration.ofHours(1).toMillis();
+
+        private EventTimeManager eventTimeManager;
+
+        // uses a ValueState to store the publishing time of each news
+        private final ValueStateDeclaration<Long> publishTimeStateDeclaration =
+                StateDeclarations.valueState("publish_time", 
TypeDescriptors.LONG);
+
+        // uses a ValueState to store the click count of each news
+        private final ValueStateDeclaration<Long> clickCountStateDeclaration =
+                StateDeclarations.valueState("click_count", 
TypeDescriptors.LONG);
+
+        // uses a ListState to store the timestamp of pending clicks for each 
news.
+        // If a click {@code NewsEvent} occurs before the corresponding 
publication {@link
+        // NewsEvent}, we cannot process this click event because we do not 
know the publication
+        // time of the news.
+        // Therefore, we store the timestamp of this pending {@code NewsEvent} 
click in a list
+        // state.
+        // Once the publication {@link NewsEvent} arrives, we can proceed to 
process the pending
+        // clicks.
+        private final ListStateDeclaration<Long> pendingClicksStateDeclaration 
=
+                StateDeclarations.listState("pending_clicks", 
TypeDescriptors.LONG);
+
+        // uses a ValueState to indicate whether the news has been processed 
(either outputted or
+        // ignored) in timer callback {@link onEventTimer}
+        private final ValueStateDeclaration<Boolean> processedStateDeclaration 
=
+                StateDeclarations.valueState("processed", 
TypeDescriptors.BOOLEAN);
+
+        // uses a ValueState to indicate whether the timer has been registered
+        private final ValueStateDeclaration<Boolean> 
timerRegisteredStateDeclaration =
+                StateDeclarations.valueState("timer_registered", 
TypeDescriptors.BOOLEAN);
+
+        @Override
+        public void initEventTimeProcessFunction(EventTimeManager 
eventTimeManager) {
+            this.eventTimeManager = eventTimeManager;
+        }
+
+        @Override
+        public Set<StateDeclaration> usesStates() {
+            return Set.of(
+                    publishTimeStateDeclaration,
+                    clickCountStateDeclaration,
+                    pendingClicksStateDeclaration,
+                    processedStateDeclaration,
+                    timerRegisteredStateDeclaration);
+        }
+
+        @Override
+        public void processRecord(
+                NewsEvent record, Collector<NewsClicks> output, 
PartitionedContext<NewsClicks> ctx)
+                throws Exception {
+
+            Boolean processed =
+                    
ctx.getStateManager().getState(processedStateDeclaration).orElseThrow().value();
+            if (processed != null && processed) {
+                // if the news has been processed, ignore this record
+                return;
+            }
+
+            if (record.type == NewsEventType.PUBLISH) {
+                // for the news publication event
+
+                // 1. record the publishing time
+                long publishTime = record.timestamp;
+                ctx.getStateManager()
+                        .getState(publishTimeStateDeclaration)
+                        .orElseThrow()
+                        .update(publishTime);
+
+                // 2. process pending clicks
+                Iterable<Long> pendingClicks =
+                        ctx.getStateManager()
+                                .getState(pendingClicksStateDeclaration)
+                                .orElseThrow()
+                                .get();
+                if (pendingClicks != null) {
+                    long addition = 0;
+                    for (Long clickTime : pendingClicks) {
+                        if (clickTime <= publishTime + ONE_HOUR_IN_MS) {
+                            addition++;
+                        }
+                    }
+                    addClickCount(ctx.getStateManager(), addition);
+                }
+
+                // 3. register timer with the publishing time + 1 hour
+                tryRegisterTimer(ctx.getStateManager(), publishTime + 
ONE_HOUR_IN_MS);
+            } else {
+                // for the news click event
+                Long publishTime =
+                        ctx.getStateManager()
+                                .getState(publishTimeStateDeclaration)
+                                .orElseThrow()
+                                .value();
+                if (publishTime == null) {
+                    // If the news publication event has not yet arrived
+                    // 1. record the click timestamp in the pending clicks 
list.
+                    ctx.getStateManager()
+                            .getState(pendingClicksStateDeclaration)
+                            .orElseThrow()
+                            .add(record.timestamp);
+                    // 2. To avoid the news not being processed for a long 
time when there is no
+                    // publish event, register timer with the click timestamp 
+ 1 hour.
+                    tryRegisterTimer(ctx.getStateManager(), record.timestamp + 
ONE_HOUR_IN_MS);
+                } else if (record.timestamp <= publishTime + ONE_HOUR_IN_MS) {
+                    // If the click event occurs within one hour of the news 
publication, update the
+                    // click count.
+                    addClickCount(ctx.getStateManager(), 1);
+                }
+            }
+        }
+
+        @Override
+        public void onEventTimer(
+                long timestamp, Collector<NewsClicks> output, 
PartitionedContext<NewsClicks> ctx)
+                throws Exception {
+
+            Boolean processed =
+                    
ctx.getStateManager().getState(processedStateDeclaration).orElseThrow().value();
+            if (processed != null && processed) {
+                // if the news has been processed, ignore this timer
+                return;
+            }
+
+            Long publishTime =
+                    ctx.getStateManager()
+                            .getState(publishTimeStateDeclaration)
+                            .orElseThrow()
+                            .value();
+
+            if (publishTime == null) {
+                // If the publishing time is null, it indicates that there is 
no publish event for
+                // this news, so it should be ignored.
+                clearStateAndMarkProcessed(ctx.getStateManager());
+                return;
+            }
+
+            // get the news that the current event timer belongs to
+            long newsId = ctx.getStateManager().getCurrentKey();
+            // get the click count of the news
+            Long clickCount =
+                    ctx.getStateManager()
+                            .getState(clickCountStateDeclaration)
+                            .orElseThrow()
+                            .value();
+            clickCount = clickCount == null ? 0 : clickCount;
+
+            // send the result to output
+            output.collect(new NewsClicks(newsId, clickCount));

Review Comment:
   clear state?



##########
flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/examples/DSv2ExamplesITCase.java:
##########
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.test.examples;
+
+import org.apache.flink.streaming.examples.dsv2.eventtime.CountNewsClicks;
+import 
org.apache.flink.streaming.examples.dsv2.eventtime.util.CountNewsClicksData;
+import org.apache.flink.streaming.examples.dsv2.join.util.JoinData;
+import 
org.apache.flink.streaming.examples.dsv2.windowing.util.CountProductSalesWindowingData;
+import org.apache.flink.test.testdata.WordCountData;
+import org.apache.flink.test.util.AbstractTestBase;
+
+import org.junit.jupiter.api.Test;
+
+import static 
org.apache.flink.test.util.TestBaseUtils.compareResultsByLinesInMemory;
+
+/** Integration test for DataStream API V2 examples. */
+class DSv2ExamplesITCase extends AbstractTestBase {
+
+    @Test
+    void testStreamingWordCount() throws Exception {
+        final String textPath = createTempFile("text.txt", WordCountData.TEXT);
+        final String resultPath = getTempDirPath("result");
+
+        org.apache.flink.streaming.examples.dsv2.wordcount.WordCount.main(
+                new String[] {
+                    "--input", textPath,
+                    "--output", resultPath,
+                    "--execution-mode", "streaming"

Review Comment:
   ```suggestion
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to