xintongsong commented on code in PR #26118:
URL: https://github.com/apache/flink/pull/26118#discussion_r1953709967


##########
flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/dsv2/eventtime/util/CountNewsClicksData.java:
##########
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.examples.dsv2.eventtime.util;
+
+import org.apache.flink.datastream.api.ExecutionEnvironment;
+import org.apache.flink.datastream.api.stream.NonKeyedPartitionStream;
+import org.apache.flink.streaming.examples.dsv2.eventtime.CountNewsClicks;
+import 
org.apache.flink.streaming.examples.dsv2.eventtime.CountNewsClicks.NewsEvent;
+import org.apache.flink.streaming.examples.dsv2.util.IOUtil;
+
+import java.net.URL;
+import java.util.Objects;
+
+/** Sample data for the {@link CountNewsClicks} example. */
+public class CountNewsClicksData {

Review Comment:
   We should not introduce such a custom class. Instead, we can use a file 
source with the CSV format.



##########
flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/dsv2/watermark/CountSales.java:
##########
@@ -0,0 +1,281 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.examples.dsv2.watermark;
+
+import org.apache.flink.api.common.state.StateDeclaration;
+import org.apache.flink.api.common.state.StateDeclarations;
+import org.apache.flink.api.common.state.ValueStateDeclaration;
+import org.apache.flink.api.common.state.v2.ValueState;
+import org.apache.flink.api.common.typeinfo.TypeDescriptors;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.watermark.LongWatermark;
+import org.apache.flink.api.common.watermark.LongWatermarkDeclaration;
+import org.apache.flink.api.common.watermark.Watermark;
+import org.apache.flink.api.common.watermark.WatermarkDeclaration;
+import org.apache.flink.api.common.watermark.WatermarkDeclarations;
+import org.apache.flink.api.common.watermark.WatermarkHandlingResult;
+import org.apache.flink.api.connector.dsv2.WrappedSink;
+import org.apache.flink.api.connector.dsv2.WrappedSource;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.connector.datagen.source.DataGeneratorSource;
+import org.apache.flink.connector.datagen.source.GeneratorFunction;
+import org.apache.flink.datastream.api.ExecutionEnvironment;
+import org.apache.flink.datastream.api.common.Collector;
+import org.apache.flink.datastream.api.context.NonPartitionedContext;
+import org.apache.flink.datastream.api.context.PartitionedContext;
+import org.apache.flink.datastream.api.function.ApplyPartitionFunction;
+import org.apache.flink.datastream.api.function.OneInputStreamProcessFunction;
+import org.apache.flink.datastream.api.stream.NonKeyedPartitionStream;
+import org.apache.flink.streaming.api.functions.sink.PrintSink;
+import org.apache.flink.streaming.examples.dsv2.eventtime.CountNewsClicks;
+import org.apache.flink.util.ParameterTool;
+
+import java.time.Duration;
+import java.util.Random;
+import java.util.Set;
+
+/**
+ * This example shows how to count the cumulative sales of each product at the 
moment. In this
+ * example, we simulate the creation and propagation of event time watermarks 
through {@link
+ * Watermark}.
+ *
+ * <p>Please note that Flink provides users with an event time extension to 
support event time. You
+ * can refer to the {@link CountNewsClicks} example.
+ *
+ * <p>The example uses a {@link DataGeneratorSource} as input source and a 
custom {@link Watermark}
+ * to propagate the event time in the stream.
+ *
+ * <p>Usage:
+ *
+ * <ul>
+ *   <li><code>--parallelism &lt;path&gt;</code>The parallelism of the source. 
The default value is
+ *       5.
+ * </ul>
+ *
+ * <p>This example shows how to:
+ *
+ * <ul>
+ *   <li>How to define and propagate custom event (Watermark) in DataStream 
API V2
+ * </ul>
+ *
+ * <p>Please note that if you intend to run this example in an IDE, you must 
first add the following
+ * VM options: "--add-opens=java.base/java.util=ALL-UNNAMED". This is 
necessary because the module
+ * system in JDK 17+ restricts some reflection operations.
+ *
+ * <p>Please note that the DataStream API V2 is a new set of APIs, to 
gradually replace the original
+ * DataStream API. It is currently in the experimental stage and is not fully 
available for
+ * production.
+ */
+public class CountSales {
+
+    /** Pojo class for Order. */
+    public static class Order {
+        public long productId;
+        public double totalPrice;
+        public long timestamp;
+
+        public Order(long productId, double price, long timestamp) {
+            this.productId = productId;
+            this.totalPrice = price;
+            this.timestamp = timestamp;
+        }
+    }
+
+    /** {@link CumulativeSales} represents the cumulative sales at a certain 
moment of a product. */
+    public static class CumulativeSales {
+        public long productId;
+        public long timestamp;
+        public double sales;
+
+        public CumulativeSales(long productId, long timestamp, double sales) {
+            this.productId = productId;
+            this.timestamp = timestamp;
+            this.sales = sales;
+        }
+
+        @Override
+        public String toString() {
+            return String.format("%d,%d,%.2f", this.productId, this.timestamp, 
this.sales);
+        }
+    }
+
+    /**
+     * Firstly, we define an event time watermark, which represents the time 
of currently processing
+     * event. Since the watermark needs to convey the timestamp, its data type 
is long. To determine
+     * the minimum event time across all watermarks, we utilize the 
combineFunctionMin() method to
+     * combine the watermarks. The default handling strategy is forward, 
meaning that the watermark
+     * will typically be advanced to downstream operators in most scenarios. 
Thus, we create a
+     * WatermarkDeclaration instance that can be used to declare and generate 
the watermark.
+     */
+    public static final LongWatermarkDeclaration 
EVENT_TIME_WATERMARK_DECLARATION =
+            WatermarkDeclarations.newBuilder("EVENT_TIME")
+                    .typeLong()
+                    .combineFunctionMin()
+                    .combineWaitForAllChannels(true)
+                    .defaultHandlingStrategyForward()
+                    .build();
+
+    public static void main(String[] args) throws Exception {
+        // parse the parameters
+        final ParameterTool params = ParameterTool.fromArgs(args);
+        final int parallelism = params.getInt("parallelism", 5);
+
+        // obtain execution environment
+        ExecutionEnvironment env = ExecutionEnvironment.getInstance();
+
+        // Create the Order source, the source will declare and generate event 
time watermarks.
+        NonKeyedPartitionStream<Order> source =
+                env.fromSource(new WrappedSource<>(new OrderSource()), "order 
source")
+                        .withParallelism(parallelism);
+
+        source
+                // key by product id
+                .keyBy(order -> order.productId)
+                .process(
+                        // handle event time watermark in downstream
+                        new CountSalesProcessFunction())
+                .toSink(new WrappedSink<>(new PrintSink<>()));
+
+        // execute program
+        env.execute("Count Sales");
+    }
+
+    /** Source of Orders. We will declare and generate the event time 
watermark in this source. */
+    private static class OrderSource extends DataGeneratorSource<Order> {
+
+        public OrderSource() {
+            super(new CustomGeneratorFunction(), 1_000_000L, 
TypeInformation.of(Order.class));
+        }
+
+        @Override
+        public Set<? extends WatermarkDeclaration> declareWatermarks() {
+            // Declare the event time watermark.
+            return Set.of(EVENT_TIME_WATERMARK_DECLARATION);
+        }
+    }
+
+    /**
+     * Generator function for the Order source. We will generate event time 
watermark every second
+     * within this generator function.
+     */
+    private static class CustomGeneratorFunction implements 
GeneratorFunction<Long, Order> {
+
+        private final Random random = new Random();
+
+        private SourceReaderContext readerContext;
+
+        private long previousTimestamp = 0;
+
+        @Override
+        public void open(SourceReaderContext readerContext) throws Exception {
+            this.readerContext = readerContext;
+        }
+
+        @Override
+        public Order map(Long value) throws Exception {
+            long productId = random.nextLong(1, 100);
+            double price = random.nextDouble(1, 1000);

Review Comment:
   These bounded random-generation APIs (`nextLong(origin, bound)` and 
`nextDouble(origin, bound)`) were introduced in Java 17. There will be 
compilation errors on Java 11.



##########
flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/dsv2/wordcount/WordCount.java:
##########
@@ -0,0 +1,274 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.examples.dsv2.wordcount;
+
+import org.apache.flink.api.common.RuntimeExecutionMode;
+import org.apache.flink.api.common.serialization.SimpleStringEncoder;
+import org.apache.flink.api.common.state.StateDeclaration;
+import org.apache.flink.api.common.state.StateDeclarations;
+import org.apache.flink.api.common.state.ValueStateDeclaration;
+import org.apache.flink.api.common.typeinfo.TypeDescriptors;
+import org.apache.flink.api.connector.dsv2.DataStreamV2SourceUtils;
+import org.apache.flink.api.connector.dsv2.WrappedSink;
+import org.apache.flink.api.connector.dsv2.WrappedSource;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.connector.file.sink.FileSink;
+import org.apache.flink.connector.file.src.FileSource;
+import org.apache.flink.connector.file.src.reader.TextLineInputFormat;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.datastream.api.ExecutionEnvironment;
+import org.apache.flink.datastream.api.common.Collector;
+import org.apache.flink.datastream.api.context.PartitionedContext;
+import org.apache.flink.datastream.api.function.OneInputStreamProcessFunction;
+import org.apache.flink.datastream.api.stream.KeyedPartitionStream;
+import org.apache.flink.datastream.api.stream.NonKeyedPartitionStream;
+import org.apache.flink.streaming.api.functions.sink.PrintSink;
+import 
org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;
+import org.apache.flink.streaming.examples.wordcount.util.WordCountData;
+import org.apache.flink.util.ParameterTool;
+import org.apache.flink.util.TimeUtils;
+
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.Set;
+
+/**
+ * Implements the "WordCount" program by DataStream API V2 that computes a 
simple word occurrence
+ * histogram over text files. The job will currently be executed in streaming 
mode, and will support
+ * batch mode execution in the future.
+ *
+ * <p>The input is a [list of] plain text file[s] with lines separated by a 
newline character.
+ *
+ * <p>Usage:
+ *
+ * <ul>
+ *   <li><code>--input &lt;path&gt;</code>A list of input files and / or 
directories to read. If no
+ *       input is provided, the program is run with default data from {@link 
WordCountData}.
+ *   <li><code>--discovery-interval &lt;duration&gt;</code>Turns the file 
reader into a continuous
+ *       source that will monitor the provided input directories every 
interval and read any new
+ *       files.
+ *   <li><code>--output &lt;path&gt;</code>The output directory where the Job 
will write the
+ *       results. If no output path is provided, the Job will print the 
results to <code>stdout
+ *       </code>.
+ * </ul>
+ *
+ * <p>This example shows how to:
+ *
+ * <ul>
+ *   <li>Write a simple Flink program by DataStream API V2
+ *   <li>Use tuple data types
+ *   <li>Write and use a user-defined process function
+ * </ul>
+ *
+ * <p>Please note that if you intend to run this example in an IDE, you must 
first add the following
+ * VM options: "--add-opens=java.base/java.util=ALL-UNNAMED". This is 
necessary because the module
+ * system in JDK 17+ restricts some reflection operations.
+ *
+ * <p>Please note that the DataStream API V2 is a new set of APIs, to 
gradually replace the original
+ * DataStream API. It is currently in the experimental stage and is not fully 
available for
+ * production.
+ */
+public class WordCount {
+
+    // 
*************************************************************************
+    // PROGRAM
+    // 
*************************************************************************
+
+    public static void main(String[] args) throws Exception {
+        // parse the parameters
+        final ParameterTool params = ParameterTool.fromArgs(args);
+
+        // Get the execution environment instance. This is the main entrypoint
+        // to building a Flink application.
+        final ExecutionEnvironment env = ExecutionEnvironment.getInstance();
+
+        // Apache Flink’s unified approach to stream and batch processing 
means that a DataStream
+        // application executed over bounded input will produce the same final 
results regardless
+        // of the configured execution mode. It is important to note what 
final means here: a job
+        // executing in STREAMING mode might produce incremental updates 
(think upserts in
+        // a database) while in BATCH mode, it would only produce one final 
result at the end. The
+        // final result will be the same if interpreted correctly, but getting 
there can be
+        // different.
+        //
+        // The “classic” execution behavior of the DataStream API is called 
STREAMING execution
+        // mode. Applications should use streaming execution for unbounded 
jobs that require
+        // continuous incremental processing and are expected to stay online 
indefinitely.
+        //
+        // By enabling BATCH execution, we allow Flink to apply additional 
optimizations that we
+        // can only do when we know that our input is bounded. For example, 
different
+        // join/aggregation strategies can be used, in addition to a different 
shuffle
+        // implementation that allows more efficient task scheduling and 
failure recovery behavior.
+        //
+        // By setting the runtime mode to AUTOMATIC, Flink will choose BATCH 
if all sources
+        // are bounded and otherwise STREAMING.
+        //
+        // This job will currently be executed in streaming mode, and will 
support batch mode
+        // execution in the future.

Review Comment:
   Let's use `// TODO` and point to the Jira ticket that blocks supporting 
batch mode.



##########
flink-datastream-api/src/main/java/org/apache/flink/datastream/api/extension/eventtime/function/OneInputEventTimeStreamProcessFunction.java:
##########
@@ -43,5 +43,6 @@ default void onEventTimeWatermark(
      * Invoked when an event-time timer fires. Note that it is only used in 
{@link
      * KeyedPartitionStream}.
      */
-    default void onEventTimer(long timestamp, Collector<OUT> output, 
PartitionedContext<OUT> ctx) {}
+    default void onEventTimer(long timestamp, Collector<OUT> output, 
PartitionedContext<OUT> ctx)
+            throws Exception {}

Review Comment:
   I think throwing an exception is currently the only way for a process 
function to tell the framework to fail the task. That reminds me that we should 
add the `throws Exception` clause wherever a process function may want to fail, 
e.g., endInput / onProcessingTimer / onWatermark, etc.



##########
flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/dsv2/eventtime/CountNewsClicks.java:
##########
@@ -0,0 +1,384 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.examples.dsv2.eventtime;
+
+import org.apache.flink.api.common.serialization.SimpleStringEncoder;
+import org.apache.flink.api.common.state.ListStateDeclaration;
+import org.apache.flink.api.common.state.StateDeclaration;
+import org.apache.flink.api.common.state.StateDeclarations;
+import org.apache.flink.api.common.state.ValueStateDeclaration;
+import org.apache.flink.api.common.typeinfo.TypeDescriptors;
+import org.apache.flink.api.connector.dsv2.WrappedSink;
+import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.connector.file.sink.FileSink;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.datastream.api.ExecutionEnvironment;
+import org.apache.flink.datastream.api.common.Collector;
+import org.apache.flink.datastream.api.context.PartitionedContext;
+import org.apache.flink.datastream.api.context.StateManager;
+import org.apache.flink.datastream.api.extension.eventtime.EventTimeExtension;
+import 
org.apache.flink.datastream.api.extension.eventtime.function.OneInputEventTimeStreamProcessFunction;
+import 
org.apache.flink.datastream.api.extension.eventtime.timer.EventTimeManager;
+import org.apache.flink.datastream.api.stream.NonKeyedPartitionStream;
+import org.apache.flink.streaming.api.functions.sink.PrintSink;
+import 
org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;
+import 
org.apache.flink.streaming.examples.dsv2.eventtime.util.CountNewsClicksData;
+import org.apache.flink.util.ParameterTool;
+
+import java.time.Duration;
+import java.util.Set;
+
+/**
+ * This example illustrates how to count the number of clicks on each news at 
1 hour after news
+ * publication.
+ *
+ * <p>The input consists of a series of {@link NewsEvent}s, which fall into 
two categories: news
+ * publish and news click. Each {@link NewsEvent} contains three components: 
the event type, the
+ * news ID and the timestamp. Notably, there is only one event of type {@link 
NewsEventType#PUBLISH}
+ * for each news.
+ *
+ * <p>Usage:
+ *
+ * <ul>
+ *   <li><code>--output &lt;path&gt;</code>The output directory where the Job 
will write the
+ *       results. If no output path is provided, the Job will print the 
results to <code>stdout
+ *       </code>.
+ * </ul>
+ *
+ * <p>This example shows how to:
+ *
+ * <ul>
+ *   <li>Usage of Event Time extension in DataStream API V2
+ * </ul>
+ *
+ * <p>Please note that if you intend to run this example in an IDE, you must 
first add the following
+ * VM options: "--add-opens=java.base/java.util=ALL-UNNAMED". This is 
necessary because the module
+ * system in JDK 17+ restricts some reflection operations.
+ *
+ * <p>Please note that the DataStream API V2 is a new set of APIs, to 
gradually replace the original
+ * DataStream API. It is currently in the experimental stage and is not fully 
available for
+ * production.
+ */
+public class CountNewsClicks {
+
+    /**
+     * The type of {@link NewsEvent}, note that only one event of type {@link 
NewsEventType#PUBLISH}
+     * for each news.
+     */
+    public enum NewsEventType {
+        PUBLISH,
+        CLICK
+    }
+
+    /**
+     * The {@link NewsEvent} represents an event on news, containing the event 
type, news id and the
+     * timestamp.
+     */
+    public static class NewsEvent {
+        public long newsId;
+        public long timestamp;
+        public NewsEventType type;
+    }
+
+    /**
+     * The {@link NewsClicks} represents the number of clicks on news within 
one hour following its
+     * publication.
+     */
+    public static class NewsClicks {
+        public long newsId;
+        public long clicks;
+
+        public NewsClicks(long newsId, long clicks) {
+            this.newsId = newsId;
+            this.clicks = clicks;
+        }
+
+        @Override
+        public String toString() {
+            return String.format("%d,%d", this.newsId, this.clicks);
+        }
+    }
+
+    public static void main(String[] args) throws Exception {
+        // parse the parameters
+        final ParameterTool params = ParameterTool.fromArgs(args);
+        final boolean fileOutput = params.has("output");
+
+        // obtain execution environment
+        ExecutionEnvironment env = ExecutionEnvironment.getInstance();
+
+        // the input consists of a series of {code NewsEvent}s, which include 
two types: news
+        // publish event and news click event.
+        NonKeyedPartitionStream<NewsEvent> source = 
CountNewsClicksData.createNewsEventSource(env);
+
+        NonKeyedPartitionStream<NewsClicks> clicksStream =
+                // extract event time and generate the event time watermark
+                source.process(
+                                // the timestamp field of the input is 
considered to be the
+                                // event time
+                                
EventTimeExtension.<NewsEvent>newWatermarkGeneratorBuilder(
+                                                event -> event.timestamp)
+                                        // generate event time watermarks 
every 200ms
+                                        
.periodicWatermark(Duration.ofMillis(200))
+                                        // if the input is idle for more than 
30 seconds, it
+                                        // is ignored during the event time 
watermark
+                                        // combination process
+                                        .withIdleness(Duration.ofSeconds(30))
+                                        // set the maximum out-of-order time 
of the event to
+                                        // 30 seconds, meaning that if an 
event is received
+                                        // at 12:00:00, then no further events 
should be
+                                        // received earlier than 11:59:30
+                                        
.withMaxOutOfOrderTime(Duration.ofSeconds(10))
+                                        // build the event time watermark 
generator as
+                                        // ProcessFunction
+                                        .buildAsProcessFunction())
+                        // key by the news id
+                        .keyBy(event -> event.newsId)
+                        // count the click number of each news
+                        .process(
+                                EventTimeExtension.wrapProcessFunction(
+                                        new CountNewsClicksProcessFunction()));
+
+        if (fileOutput) {
+            // write results to file
+            clicksStream
+                    .toSink(
+                            new WrappedSink<>(
+                                    FileSink.<NewsClicks>forRowFormat(
+                                                    new 
Path(params.get("output")),
+                                                    new 
SimpleStringEncoder<>())
+                                            .withRollingPolicy(
+                                                    
DefaultRollingPolicy.builder()
+                                                            .withMaxPartSize(
+                                                                    
MemorySize.ofMebiBytes(1))
+                                                            
.withRolloverInterval(
+                                                                    
Duration.ofSeconds(10))
+                                                            .build())
+                                            .build()))
+                    .withName("output");
+        } else {
+            // Print the results to the STDOUT.
+            clicksStream.toSink(new WrappedSink<>(new 
PrintSink<>())).withName("print-sink");
+        }
+
+        env.execute("CountNewsClicks");
+    }
+
+    /**
+     * This process function will consume {@link NewsEvent} and count the 
number of clicks within 1
+     * hour of the news publication and send the results {@link NewsClicks} to 
the output.
+     *
+     * <p>To achieve the goal, we will register an event timer for each news, 
which will be
+     * triggered at the time of the news's release time + 1 hour, and record a 
click count of each
+     * news. In the timer callback {@code onEventTimer}, we will output the 
number of clicks.
+     *
+     * <p>To handle the potential disorder between news publication event and 
click events, we will
+     * use a ListState to store the timestamps of click events that occur 
prior to their
+     * corresponding publication events.
+     *
+     * <p>To handle the potential missing news publication event, we will 
register an event timer
+     * for the first click event and set the timer to the timestamp of the 
click event plus one
+     * hour.
+     */
+    public static class CountNewsClicksProcessFunction
+            implements OneInputEventTimeStreamProcessFunction<NewsEvent, 
NewsClicks> {
+
+        private static final long ONE_HOUR_IN_MS = 
Duration.ofHours(1).toMillis();
+
+        private EventTimeManager eventTimeManager;
+
+        // uses a ValueState to store the publishing time of each news
+        private final ValueStateDeclaration<Long> publishTimeStateDeclaration =
+                StateDeclarations.valueState("publish_time", 
TypeDescriptors.LONG);
+
+        // uses a ValueState to store the click count of each news
+        private final ValueStateDeclaration<Long> clickCountStateDeclaration =
+                StateDeclarations.valueState("click_count", 
TypeDescriptors.LONG);
+
+        // uses a ListState to store the timestamp of pending clicks for each 
news.
+        // If a click {@code NewsEvent} occurs before the corresponding 
publication {@link
+        // NewsEvent}, we cannot process this click event because we do not 
know the publication
+        // time of the news.
+        // Therefore, we store the timestamp of this pending {@code NewsEvent} 
click in a list
+        // state.
+        // Once the publication {@link NewsEvent} arrives, we can proceed to 
process the pending
+        // clicks.
+        private final ListStateDeclaration<Long> pendingClicksStateDeclaration 
=
+                StateDeclarations.listState("pending_clicks", 
TypeDescriptors.LONG);
+
+        // uses a ValueState to indicate whether the news has been processed 
(either outputted or
+        // ignored) in timer callback {@link onEventTimer}
+        private final ValueStateDeclaration<Boolean> processedStateDeclaration 
=
+                StateDeclarations.valueState("processed", 
TypeDescriptors.BOOLEAN);
+
+        // uses a ValueState to indicate whether the timer has been registered
+        private final ValueStateDeclaration<Boolean> 
timerRegisteredStateDeclaration =
+                StateDeclarations.valueState("timer_registered", 
TypeDescriptors.BOOLEAN);

Review Comment:
   I think we don't need these two states.
   1. We can compare the publish time with the current event time when 
receiving the publish event. If it's too late, we can just discard it.
   2. What's the problem with having multiple timers?



##########
flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/dsv2/wordcount/WordCount.java:
##########
@@ -0,0 +1,274 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.examples.dsv2.wordcount;
+
+import org.apache.flink.api.common.RuntimeExecutionMode;
+import org.apache.flink.api.common.serialization.SimpleStringEncoder;
+import org.apache.flink.api.common.state.StateDeclaration;
+import org.apache.flink.api.common.state.StateDeclarations;
+import org.apache.flink.api.common.state.ValueStateDeclaration;
+import org.apache.flink.api.common.typeinfo.TypeDescriptors;
+import org.apache.flink.api.connector.dsv2.DataStreamV2SourceUtils;
+import org.apache.flink.api.connector.dsv2.WrappedSink;
+import org.apache.flink.api.connector.dsv2.WrappedSource;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.connector.file.sink.FileSink;
+import org.apache.flink.connector.file.src.FileSource;
+import org.apache.flink.connector.file.src.reader.TextLineInputFormat;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.datastream.api.ExecutionEnvironment;
+import org.apache.flink.datastream.api.common.Collector;
+import org.apache.flink.datastream.api.context.PartitionedContext;
+import org.apache.flink.datastream.api.function.OneInputStreamProcessFunction;
+import org.apache.flink.datastream.api.stream.KeyedPartitionStream;
+import org.apache.flink.datastream.api.stream.NonKeyedPartitionStream;
+import org.apache.flink.streaming.api.functions.sink.PrintSink;
+import 
org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;
+import org.apache.flink.streaming.examples.wordcount.util.WordCountData;
+import org.apache.flink.util.ParameterTool;
+import org.apache.flink.util.TimeUtils;
+
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.Set;
+
+/**
+ * Implements the "WordCount" program using the DataStream API V2. It computes
+ * a simple word occurrence histogram over text files. The job is currently
+ * executed in streaming mode; batch mode execution will be supported in the
+ * future.
+ *
+ * <p>The input is a [list of] plain text file[s] with lines separated by a 
newline character.
+ *
+ * <p>Usage:
+ *
+ * <ul>
+ *   <li><code>--input &lt;path&gt;</code> A list of input files and/or
+ *       directories to read. If no input is provided, the program is run
+ *       with default data from {@link WordCountData}.
+ *   <li><code>--discovery-interval &lt;duration&gt;</code> Turns the file
+ *       reader into a continuous source that will monitor the provided input
+ *       directories every interval and read any new files.
+ *   <li><code>--output &lt;path&gt;</code> The output directory where the
+ *       Job will write the results. If no output path is provided, the Job
+ *       will print the results to <code>stdout</code>.
+ * </ul>
+ *
+ * <p>This example shows how to:
+ *
+ * <ul>
+ *   <li>Write a simple Flink program by DataStream API V2
+ *   <li>Use tuple data types
+ *   <li>Write and use a user-defined process function
+ * </ul>
+ *
+ * <p>Please note that if you intend to run this example in an IDE, you must 
first add the following
+ * VM options: "--add-opens=java.base/java.util=ALL-UNNAMED". This is 
necessary because the module
+ * system in JDK 17+ restricts some reflection operations.
+ *
+ * <p>Please note that the DataStream API V2 is a new set of APIs intended to
+ * gradually replace the original DataStream API. It is currently experimental
+ * and not yet fully ready for production use.
+ */
+public class WordCount {
+
+    // 
*************************************************************************
+    // PROGRAM
+    // 
*************************************************************************
+
+    public static void main(String[] args) throws Exception {
+        // Parse the command-line parameters (--input, --discovery-interval, --output).
+        final ParameterTool params = ParameterTool.fromArgs(args);
+
+        // Get the execution environment instance. This is the main entrypoint
+        // to building a Flink application.
+        final ExecutionEnvironment env = ExecutionEnvironment.getInstance();
+
+        // Apache Flink’s unified approach to stream and batch processing 
means that a DataStream
+        // application executed over bounded input will produce the same final 
results regardless
+        // of the configured execution mode. It is important to note what 
final means here: a job
+        // executing in STREAMING mode might produce incremental updates 
(think upserts in
+        // a database) while in BATCH mode, it would only produce one final 
result at the end. The
+        // final result will be the same if interpreted correctly, but getting 
there can be
+        // different.
+        //
+        // The “classic” execution behavior of the DataStream API is called 
STREAMING execution
+        // mode. Applications should use streaming execution for unbounded 
jobs that require
+        // continuous incremental processing and are expected to stay online 
indefinitely.
+        //
+        // By enabling BATCH execution, we allow Flink to apply additional 
optimizations that we
+        // can only do when we know that our input is bounded. For example, 
different
+        // join/aggregation strategies can be used, in addition to a different 
shuffle
+        // implementation that allows more efficient task scheduling and 
failure recovery behavior.
+        //
+        // By setting the runtime mode to AUTOMATIC, Flink will choose BATCH 
if all sources
+        // are bounded and otherwise STREAMING.
+        //
+        // This job will currently be executed in streaming mode, and will 
support batch mode
+        // execution in the future.
+        env.setExecutionMode(RuntimeExecutionMode.STREAMING);
+
+        NonKeyedPartitionStream<String> text;
+
+        if (params.has("input")) {
+            // Create a new file source that will read files from a given set 
of directories.
+            // Each file will be processed as plain text and split based on 
newlines.
+            FileSource.FileSourceBuilder<String> builder =
+                    FileSource.forRecordStreamFormat(
+                            new TextLineInputFormat(), new 
Path(params.get("input")));
+
+            // If a discovery interval is provided, the source will
+            // continuously watch the given directories for new files.
+            if (params.has("discovery-interval")) {
+                Duration discoveryInterval =
+                        
TimeUtils.parseDuration(params.get("discovery-interval"));
+                builder.monitorContinuously(discoveryInterval);
+            }
+
+            text = env.fromSource(new WrappedSource<>(builder.build()), 
"file-input");
+        } else {
+            // Create an in-memory source from the default data in {@code 
WordCountData}.
+            text =
+                    env.fromSource(
+                            
DataStreamV2SourceUtils.fromData(Arrays.asList(WordCountData.WORDS)),
+                            "in-memory-input");
+        }
+
+        KeyedPartitionStream<String, Tuple2<String, Integer>> keyedStream =
+                // The text lines read from the source are split into words
+                // using a user-defined process function. The tokenizer, 
implemented below,
+                // will output each word as a (2-tuple) containing (word, 1)
+                text.process(new Tokenizer())
+                        .withName("tokenizer")
+                        // keyBy groups tuples based on the first field, the 
word.
+                        // Using a keyBy allows performing aggregations and 
other
+                        // stateful transformations over data on a per-key 
basis.
+                        // This is similar to a GROUP BY clause in a SQL query.
+                        .keyBy(value -> value.f0);
+
+        // For each key, we perform a simple sum of the second field, the 
count by user-defined
+        // Counter process function. It will continuously output updates each 
time it sees
+        // a new instance of each word in the stream.
+        // Flink will provide more convenient built-in functions, such as 
aggregation, in the
+        // future. This will
+        // allow users to avoid having to write their own Counter process
+        // functions in this application.
+        NonKeyedPartitionStream<Tuple2<String, Integer>> counts =
+                keyedStream.process(new Counter());
+
+        if (params.has("output")) {
+            // Given an output directory, Flink will write the results to a 
file
+            // using a simple string encoding. In a production environment, 
this might
+            // be something more structured like CSV, Avro, JSON, or Parquet.
+            counts.toSink(
+                            new WrappedSink<>(
+                                    FileSink.<Tuple2<String, 
Integer>>forRowFormat(
+                                                    new 
Path(params.get("output")),
+                                                    new 
SimpleStringEncoder<>())
+                                            .withRollingPolicy(
+                                                    
DefaultRollingPolicy.builder()
+                                                            .withMaxPartSize(
+                                                                    
MemorySize.ofMebiBytes(1))
+                                                            
.withRolloverInterval(
+                                                                    
Duration.ofSeconds(10))
+                                                            .build())
+                                            .build()))
+                    .withName("file-sink");
+        } else {
+            // No output path given: print the results to STDOUT.
+            counts.toSink(new WrappedSink<>(new 
PrintSink<>())).withName("print-sink");
+        }
+
+        // Apache Flink applications are composed lazily. Calling execute
+        // submits the Job and begins processing.
+        env.execute("WordCount");
+    }
+
+    // 
*************************************************************************
+    // USER PROCESS FUNCTIONS
+    // 
*************************************************************************
+
+    /**
+     * Implements the string tokenizer that splits sentences into words as a
+     * user-defined ProcessFunction. The process function takes a line
+     * (String), lower-cases it, splits it on runs of non-word characters
+     * ({@code \W+}) and emits one pair in the form of "(word,1)"
+     * ({@code Tuple2<String, Integer>}) for every non-empty token.
+     */
+    public static final class Tokenizer
+            implements OneInputStreamProcessFunction<String, Tuple2<String, 
Integer>> {
+
+        @Override
+        public void processRecord(
+                String record,
+                Collector<Tuple2<String, Integer>> output,
+                PartitionedContext<Tuple2<String, Integer>> ctx)
+                throws Exception {
+            // lower-case the line and split it on runs of non-word characters
+            String[] tokens = record.toLowerCase().split("\\W+");
+
+            // emit the pairs, skipping any empty token produced by split()
+            for (String token : tokens) {
+                if (!token.isEmpty()) {
+                    output.collect(new Tuple2<>(token, 1));
+                }
+            }
+        }
+    }
+
+    /**
+     * Implements a word counter as a user-defined ProcessFunction that
+     * counts received words in streaming mode. The function uses a
+     * ValueState to store the count of each word; whenever it receives a
+     * record "(word,1)", it updates the count of that word and outputs the
+     * result.
+     *
+     * <p>Note that this is just an example of how to code a streaming job 
using the DataStream API
+     * V2. It currently involves some complexity. In the future, we will 
provide more user-friendly
+     * APIs and extensions to simplify the process.
+     */
+    public static final class Counter
+            implements OneInputStreamProcessFunction<
+                    Tuple2<String, Integer>, Tuple2<String, Integer>> {
+
+        // uses a ValueState to store the count of each word
+        private final ValueStateDeclaration<Integer> countStateDeclaration =
+                StateDeclarations.valueState("count", TypeDescriptors.INT);
+
+        @Override
+        public Set<StateDeclaration> usesStates() {
+            // declare a ValueState to store the count of each word
+            return Set.of(countStateDeclaration);
+        }
+
+        @Override
+        public void processRecord(
+                Tuple2<String, Integer> record,
+                Collector<Tuple2<String, Integer>> output,
+                PartitionedContext<Tuple2<String, Integer>> ctx)
+                throws Exception {
+            // calculate the new count of the word
+            String word = record.f0;
+            Integer count = record.f1;
+            Integer previousCount =
+                    
ctx.getStateManager().getState(countStateDeclaration).orElseThrow().value();

Review Comment:
   `orElseThrow` should include an explicit error message



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to