xintongsong commented on code in PR #26118:
URL: https://github.com/apache/flink/pull/26118#discussion_r1948044491


##########
flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/dsv2/wordcount/WordCount.java:
##########
@@ -0,0 +1,335 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.examples.dsv2.wordcount;
+
+import org.apache.flink.api.common.RuntimeExecutionMode;
+import org.apache.flink.api.common.serialization.SimpleStringEncoder;
+import org.apache.flink.api.common.state.StateDeclaration;
+import org.apache.flink.api.common.state.StateDeclarations;
+import org.apache.flink.api.common.state.ValueStateDeclaration;
+import org.apache.flink.api.common.typeinfo.TypeDescriptors;
+import org.apache.flink.api.connector.dsv2.DataStreamV2SourceUtils;
+import org.apache.flink.api.connector.dsv2.WrappedSink;
+import org.apache.flink.api.connector.dsv2.WrappedSource;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.connector.file.sink.FileSink;
+import org.apache.flink.connector.file.src.FileSource;
+import org.apache.flink.connector.file.src.reader.TextLineInputFormat;
+import org.apache.flink.datastream.api.ExecutionEnvironment;
+import org.apache.flink.datastream.api.common.Collector;
+import org.apache.flink.datastream.api.context.PartitionedContext;
+import org.apache.flink.datastream.api.function.OneInputStreamProcessFunction;
+import org.apache.flink.datastream.api.stream.KeyedPartitionStream;
+import org.apache.flink.datastream.api.stream.NonKeyedPartitionStream;
+import org.apache.flink.streaming.api.functions.sink.PrintSink;
+import 
org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;
+import org.apache.flink.streaming.examples.wordcount.util.CLI;
+import org.apache.flink.streaming.examples.wordcount.util.WordCountData;
+
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.Set;
+
+/**
+ * Implements the "WordCount" program by DataStream API V2 that computes a 
simple word occurrence
+ * histogram over text files. This Job can be executed in both streaming and 
batch execution modes.
+ *
+ * <p>The input is a [list of] plain text file[s] with lines separated by a 
newline character.
+ *
+ * <p>Usage:
+ *
+ * <ul>
+ *   <li><code>--input &lt;path&gt;</code>A list of input files and / or 
directories to read. If no
+ *       input is provided, the program is run with default data from {@link 
WordCountData}.
+ *   <li><code>--discovery-interval &lt;duration&gt;</code>Turns the file 
reader into a continuous
+ *       source that will monitor the provided input directories every 
interval and read any new
+ *       files.
+ *   <li><code>--output &lt;path&gt;</code>The output directory where the Job 
will write the
+ *       results. If no output path is provided, the Job will print the 
results to <code>stdout
+ *       </code>.
+ *   <li><code>--execution-mode &lt;mode&gt;</code>The execution mode (BATCH, 
STREAMING, or
+ *       AUTOMATIC) of this pipeline.
+ * </ul>
+ *
+ * <p>This example shows how to:
+ *
+ * <ul>
+ *   <li>Write a simple Flink program by DataStream API V2
+ *   <li>Use tuple data types
+ *   <li>Write and use a user-defined process function
+ * </ul>
+ *
+ * <p>Please note that if you intend to run this example in an IDE, you must 
first add the following
+ * VM options: "--add-opens=java.base/java.util=ALL-UNNAMED". This is 
necessary because the module
+ * system in JDK 17+ restricts some reflection operations.
+ *
+ * <p>Please note that the DataStream API V2 is a new set of APIs, to 
gradually replace the original
+ * DataStream API. It is currently in the experimental stage and is not fully 
available for
+ * production.
+ */
+public class WordCount {
+
+    // 
*************************************************************************
+    // PROGRAM
+    // 
*************************************************************************
+
+    public static void main(String[] args) throws Exception {
+        final CLI params = CLI.fromArgs(args);
+
+        // Get the execution environment instance. This is the main entrypoint
+        // to building a Flink application.
+        final ExecutionEnvironment env = ExecutionEnvironment.getInstance();
+
+        // Apache Flink’s unified approach to stream and batch processing 
means that a DataStream
+        // application executed over bounded input will produce the same final 
results regardless
+        // of the configured execution mode. It is important to note what 
final means here: a job
+        // executing in STREAMING mode might produce incremental updates 
(think upserts in
+        // a database) while in BATCH mode, it would only produce one final 
result at the end. The
+        // final result will be the same if interpreted correctly, but getting 
there can be
+        // different.
+        //
+        // The “classic” execution behavior of the DataStream API is called 
STREAMING execution
+        // mode. Applications should use streaming execution for unbounded 
jobs that require
+        // continuous incremental processing and are expected to stay online 
indefinitely.
+        //
+        // By enabling BATCH execution, we allow Flink to apply additional 
optimizations that we
+        // can only do when we know that our input is bounded. For example, 
different
+        // join/aggregation strategies can be used, in addition to a different 
shuffle
+        // implementation that allows more efficient task scheduling and 
failure recovery behavior.
+        //
+        // By setting the runtime mode to AUTOMATIC, Flink will choose BATCH 
if all sources
+        // are bounded and otherwise STREAMING.
+        env.setExecutionMode(params.getExecutionMode());
+
+        NonKeyedPartitionStream<String> text;
+
+        if (params.getInputs().isPresent()) {
+            // Create a new file source that will read files from a given set 
of directories.
+            // Each file will be processed as plain text and split based on 
newlines.
+            FileSource.FileSourceBuilder<String> builder =
+                    FileSource.forRecordStreamFormat(
+                            new TextLineInputFormat(), 
params.getInputs().get());
+
+            // If a discovery interval is provided, the source will
+            // continuously watch the given directories for new files.
+            
params.getDiscoveryInterval().ifPresent(builder::monitorContinuously);
+
+            text = env.fromSource(new WrappedSource<>(builder.build()), 
"file-input");
+        } else {
+            // Create a new from data source with default data {@code 
WordCountData}.
+            text =
+                    env.fromSource(
+                            
DataStreamV2SourceUtils.fromData(Arrays.asList(WordCountData.WORDS)),
+                            "in-memory-input");
+        }
+
+        KeyedPartitionStream<String, Tuple2<String, Integer>> keyedStream =
+                // The text lines read from the source are split into words
+                // using a user-defined process function. The tokenizer, 
implemented below,
+                // will output each word as a (2-tuple) containing (word, 1)
+                text.process(new Tokenizer())
+                        .withName("tokenizer")
+                        // keyBy groups tuples based on the first field, the 
word.
+                        // Using a keyBy allows performing aggregations and 
other
+                        // stateful transformations over data on a per-key 
basis.
+                        // This is similar to a GROUP BY clause in a SQL query.
+                        .keyBy(value -> value.f0);
+
+        // For each key, we perform a simple sum of the second field, the 
count by user-defined
+        // StreamingCounter/BatchCounter process function.
+        // If the execution mode is streaming, it will continuously output 
updates each time it sees
+        // a new instance of each word in the stream.
+        // If the execution mode is batch, sum will output a final count for
+        // each word.
+        // Flink will provide more built-in functions, such as aggregation, in 
the future. This will
+        // allow users to avoid having to write their own StreamingCounter or 
BatchCounter process
+        // functions in this application.
+        NonKeyedPartitionStream<Tuple2<String, Integer>> counts;
+        if (params.getExecutionMode() == RuntimeExecutionMode.STREAMING) {
+            counts = keyedStream.process(new StreamingCounter());
+        } else {
+            counts = keyedStream.process(new BatchCounter());
+        }

Review Comment:
   We should not have different process functions for streaming / batch. 
Instead, we can use `context.getJobInfo().getExecutionMode()` to get the 
execution mode in `ProcessFunction.open()`, to make the process function behave 
differently in streaming / batch modes.



##########
flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/dsv2/eventtime/StatisticNewsClickNumber.java:
##########
@@ -0,0 +1,278 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.examples.dsv2.eventtime;
+
+import org.apache.flink.api.common.serialization.SimpleStringEncoder;
+import org.apache.flink.api.connector.dsv2.DataStreamV2SourceUtils;
+import org.apache.flink.api.connector.dsv2.WrappedSink;
+import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.connector.file.sink.FileSink;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.datastream.api.ExecutionEnvironment;
+import org.apache.flink.datastream.api.common.Collector;
+import org.apache.flink.datastream.api.context.PartitionedContext;
+import org.apache.flink.datastream.api.extension.eventtime.EventTimeExtension;
+import 
org.apache.flink.datastream.api.extension.eventtime.function.OneInputEventTimeStreamProcessFunction;
+import 
org.apache.flink.datastream.api.extension.eventtime.timer.EventTimeManager;
+import org.apache.flink.datastream.api.stream.NonKeyedPartitionStream;
+import org.apache.flink.streaming.api.functions.sink.PrintSink;
+import 
org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;
+import 
org.apache.flink.streaming.examples.dsv2.eventtime.util.StatisticNewsClickNumberData;
+import org.apache.flink.util.ParameterTool;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * This example illustrates how to count the number of clicks on each news at 
1 minute, 5 minutes,
+ * 10 minutes, and 1 hour after news publication.
+ *
+ * <p>The input consists of a series of {@link NewsEvent}s, which fall into 
two categories: news
+ * releases and news clicks. Each {@link NewsEvent} contains three components: 
the event type, the
+ * news ID and the timestamp. Notably, there is only one event of type {@link 
NewsEventType#RELEASE}
+ * for each news.
+ *
+ * <p>Usage:
+ *
+ * <ul>
+ *   <li><code>--output &lt;path&gt;</code>The output directory where the Job 
will write the
+ *       results. If no output path is provided, the Job will print the 
results to <code>stdout
+ *       </code>.
+ * </ul>
+ *
+ * <p>This example shows how to:
+ *
+ * <ul>
+ *   <li>Usage of Event Time extension in DataStream API V2
+ * </ul>
+ *
+ * <p>Please note that if you intend to run this example in an IDE, you must 
first add the following
+ * VM options: "--add-opens=java.base/java.util=ALL-UNNAMED". This is 
necessary because the module
+ * system in JDK 17+ restricts some reflection operations.
+ *
+ * <p>Please note that the DataStream API V2 is a new set of APIs, to 
gradually replace the original
+ * DataStream API. It is currently in the experimental stage and is not fully 
available for
+ * production.
+ */
+public class StatisticNewsClickNumber {
+
+    // we will count the number of clicks within 1min/5min/10min/30min/1hour 
after the news release
+    private static final Duration[] TIMES_AFTER_NEWS_RELEASE =
+            new Duration[] {
+                Duration.ofMinutes(1),
+                Duration.ofMinutes(5),
+                Duration.ofMinutes(10),
+                Duration.ofMinutes(30),
+                Duration.ofHours(1)
+            };
+
+    /**
+     * The type of {@link NewsEvent}, note that only one event of type {@link 
NewsEventType#RELEASE}
+     * for each news.
+     */
+    public enum NewsEventType {
+        RELEASE,
+        CLICK
+    }
+
+    /**
+     * The {@link NewsEvent} represents an event on news, containing the event 
type, news id and the
+     * timestamp.
+     */
+    public static class NewsEvent {
+        public NewsEventType type;
+        public long newsId;
+        public long timestamp;
+
+        public NewsEvent(NewsEventType type, long newsId, long timestamp) {
+            this.type = type;
+            this.newsId = newsId;
+            this.timestamp = timestamp;
+        }
+    }
+
+    /**
+     * The {@link NewsClickNumber} represents the number of clicks on news 
within a specified
+     * duration following its release. For example, 
NewsClickNumber{newsId="12345678",
+     * timeAfterRelease=60000, clickNumber=132} indicates that the news 
"12345678" has been clicked
+     * 132 times within 60,000 milliseconds after its release.
+     */
+    public static class NewsClickNumber {
+        public long newsId;
+        public long timeAfterRelease;
+        public long clickNumber;
+
+        public NewsClickNumber(long newsId, long timeAfterRelease, long 
clickCount) {
+            this.newsId = newsId;
+            this.timeAfterRelease = timeAfterRelease;
+            this.clickNumber = clickCount;
+        }
+
+        public String toString() {
+            return String.format(
+                    "(%d,%d,%d)", this.newsId, this.timeAfterRelease, 
this.clickNumber);
+        }
+    }
+
+    public static void main(String[] args) throws Exception {
+        // parse the parameters
+        final ParameterTool params = ParameterTool.fromArgs(args);
+        final boolean fileOutput = params.has("output");
+
+        // obtain execution environment
+        ExecutionEnvironment env = ExecutionEnvironment.getInstance();
+
+        // the input consists of a series of {code NewsEvent}s, which include 
two types: news
+        // release event and news click event.
+        NonKeyedPartitionStream<NewsEvent> source =
+                env.fromSource(
+                        DataStreamV2SourceUtils.fromData(
+                                
Arrays.asList(StatisticNewsClickNumberData.NEWS_EVENTS)),
+                        "news event source");
+
+        NonKeyedPartitionStream<NewsClickNumber> clickNumberStream =
+                // extract event time and generate the event time watermark
+                source.process(
+                                // the timestamp field of the input is 
considered to be the
+                                // event time
+                                
EventTimeExtension.<NewsEvent>newWatermarkGeneratorBuilder(
+                                                event -> event.timestamp)
+                                        // generate event time watermarks 
every 200ms
+                                        
.periodicWatermark(Duration.ofMillis(200))
+                                        // if the input is idle for more than 
30 seconds, it
+                                        // is ignored during the event time 
watermark
+                                        // combination process
+                                        .withIdleness(Duration.ofSeconds(30))
+                                        // set the maximum out-of-order time 
of the event to
+                                        // 30 seconds, meaning that if an 
event is received
+                                        // at 12:00:00, then no further events 
should be
+                                        // received earlier than 11:59:30
+                                        
.withMaxOutOfOrderTime(Duration.ofSeconds(10))
+                                        // build the event time watermark 
generator as
+                                        // ProcessFunction
+                                        .buildAsProcessFunction())
+                        // key by the news id
+                        .keyBy(event -> event.newsId)
+                        // count the click number of each news
+                        .process(
+                                EventTimeExtension.wrapProcessFunction(
+                                        new 
CountNewsClickNumberProcessFunction()));
+
+        if (fileOutput) {
+            // write results to file
+            clickNumberStream
+                    .toSink(
+                            new WrappedSink<>(
+                                    FileSink.<NewsClickNumber>forRowFormat(
+                                                    new 
Path(params.get("output")),
+                                                    new 
SimpleStringEncoder<>())
+                                            .withRollingPolicy(
+                                                    
DefaultRollingPolicy.builder()
+                                                            .withMaxPartSize(
+                                                                    
MemorySize.ofMebiBytes(1))
+                                                            
.withRolloverInterval(
+                                                                    
Duration.ofSeconds(10))
+                                                            .build())
+                                            .build()))
+                    .withName("output");
+        } else {
+            // Print the results to the STDOUT.
+            clickNumberStream.toSink(new WrappedSink<>(new 
PrintSink<>())).withName("print-sink");
+        }
+
+        env.execute("StatisticNewsClickNumberExample");
+    }
+
+    /**
+     * This process function will consume {@link NewsEvent} and count the 
number of clicks within 1
+     * minute, 5 minutes, 10 minutes, 30 minutes and 1 hour of the news 
releasing and send the
+     * results {@link NewsClickNumber} to the output.
+     *
+     * <p>To achieve the goal, we will register a series of timers for the 
news, which will be
+     * triggered at the time of the news's release time + 1 minute/5 
minutes/10 minutes/30 minutes/1
+     * hour, and record a list of the click times of each news. In the timer 
callback {@code
+     * onEventTimer}, we will count the number of clicks between the news 
release time and the timer
+     * trigger timer and send the result to the output.
+     */
+    public static class CountNewsClickNumberProcessFunction
+            implements OneInputEventTimeStreamProcessFunction<NewsEvent, 
NewsClickNumber> {
+
+        private EventTimeManager eventTimeManager;
+
+        // news id to release time
+        private final Map<Long, Long> releaseTimeOfNews = new HashMap<>();
+
+        // news id to click time list
+        private final Map<Long, List<Long>> clickTimeListOfNews = new 
HashMap<>();

Review Comment:
   Use states.



##########
flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/dsv2/watermark/SyncOffsetByWatermark.java:
##########
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.examples.dsv2.watermark;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.watermark.LongWatermark;
+import org.apache.flink.api.common.watermark.LongWatermarkDeclaration;
+import org.apache.flink.api.common.watermark.Watermark;
+import org.apache.flink.api.common.watermark.WatermarkDeclaration;
+import org.apache.flink.api.common.watermark.WatermarkDeclarations;
+import org.apache.flink.api.common.watermark.WatermarkHandlingResult;
+import org.apache.flink.api.connector.dsv2.WrappedSource;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.connector.datagen.source.DataGeneratorSource;
+import org.apache.flink.connector.datagen.source.GeneratorFunction;
+import org.apache.flink.datastream.api.ExecutionEnvironment;
+import org.apache.flink.datastream.api.common.Collector;
+import org.apache.flink.datastream.api.context.NonPartitionedContext;
+import org.apache.flink.datastream.api.context.PartitionedContext;
+import org.apache.flink.datastream.api.function.OneInputStreamProcessFunction;
+import org.apache.flink.datastream.api.stream.NonKeyedPartitionStream;
+import org.apache.flink.util.ParameterTool;
+
+import java.util.Set;
+
+/**
+ * The example simulates Flink reading a message queue. Each partition of the 
topic has its own
+ * consumption offset, and the example shows how to synchronise these offsets.
+ *
+ * <p>The example uses a {@link DataGeneratorSource} to simulate a message 
queue and a custom {@link
+ * Watermark} to propagate the offset in the stream.
+ *
+ * <p>Usage:
+ *
+ * <ul>
+ *   <li><code>--parallelism &lt;path&gt;</code>The parallelism of the 
simulated message queue
+ *       source, it is equal to the number of consumed topic partitions in 
this example. The default
+ *       value is 5.
+ * </ul>
+ *
+ * <p>This example shows how to:
+ *
+ * <ul>
+ *   <li>How to define and propagate custom event (Watermark) in DataStream 
API V2
+ * </ul>
+ *
+ * <p>Please note that if you intend to run this example in an IDE, you must 
first add the following
+ * VM options: "--add-opens=java.base/java.util=ALL-UNNAMED". This is 
necessary because the module
+ * system in JDK 17+ restricts some reflection operations.
+ *
+ * <p>Please note that the DataStream API V2 is a new set of APIs, to 
gradually replace the original
+ * DataStream API. It is currently in the experimental stage and is not fully 
available for
+ * production.
+ */
+public class SyncOffsetByWatermark {

Review Comment:
   The use case doesn't really make sense. Why synchronizing the offsets? What 
is the point of logging a combined offset?
   
   I would suggest a different use case. E.g., rate / timeout control. 



##########
flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/dsv2/eventtime/StatisticNewsClickNumber.java:
##########
@@ -0,0 +1,278 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.examples.dsv2.eventtime;
+
+import org.apache.flink.api.common.serialization.SimpleStringEncoder;
+import org.apache.flink.api.connector.dsv2.DataStreamV2SourceUtils;
+import org.apache.flink.api.connector.dsv2.WrappedSink;
+import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.connector.file.sink.FileSink;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.datastream.api.ExecutionEnvironment;
+import org.apache.flink.datastream.api.common.Collector;
+import org.apache.flink.datastream.api.context.PartitionedContext;
+import org.apache.flink.datastream.api.extension.eventtime.EventTimeExtension;
+import 
org.apache.flink.datastream.api.extension.eventtime.function.OneInputEventTimeStreamProcessFunction;
+import 
org.apache.flink.datastream.api.extension.eventtime.timer.EventTimeManager;
+import org.apache.flink.datastream.api.stream.NonKeyedPartitionStream;
+import org.apache.flink.streaming.api.functions.sink.PrintSink;
+import 
org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;
+import 
org.apache.flink.streaming.examples.dsv2.eventtime.util.StatisticNewsClickNumberData;
+import org.apache.flink.util.ParameterTool;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * This example illustrates how to count the number of clicks on each news at 
1 minute, 5 minutes,
+ * 10 minutes, and 1 hour after news publication.
+ *
+ * <p>The input consists of a series of {@link NewsEvent}s, which fall into 
two categories: news
+ * releases and news clicks. Each {@link NewsEvent} contains three components: 
the event type, the
+ * news ID and the timestamp. Notably, there is only one event of type {@link 
NewsEventType#RELEASE}
+ * for each news.
+ *
+ * <p>Usage:
+ *
+ * <ul>
+ *   <li><code>--output &lt;path&gt;</code>The output directory where the Job 
will write the
+ *       results. If no output path is provided, the Job will print the 
results to <code>stdout
+ *       </code>.
+ * </ul>
+ *
+ * <p>This example shows how to:
+ *
+ * <ul>
+ *   <li>Usage of Event Time extension in DataStream API V2
+ * </ul>
+ *
+ * <p>Please note that if you intend to run this example in an IDE, you must 
first add the following
+ * VM options: "--add-opens=java.base/java.util=ALL-UNNAMED". This is 
necessary because the module
+ * system in JDK 17+ restricts some reflection operations.
+ *
+ * <p>Please note that the DataStream API V2 is a new set of APIs, to 
gradually replace the original
+ * DataStream API. It is currently in the experimental stage and is not fully 
available for
+ * production.
+ */
+public class StatisticNewsClickNumber {
+
+    // we will count the number of clicks within 1min/5min/10min/30min/1hour 
after the news release
+    private static final Duration[] TIMES_AFTER_NEWS_RELEASE =
+            new Duration[] {
+                Duration.ofMinutes(1),
+                Duration.ofMinutes(5),
+                Duration.ofMinutes(10),
+                Duration.ofMinutes(30),
+                Duration.ofHours(1)
+            };
+
+    /**
+     * The type of {@link NewsEvent}, note that only one event of type {@link 
NewsEventType#RELEASE}
+     * for each news.
+     */
+    public enum NewsEventType {
+        RELEASE,

Review Comment:
   ```suggestion
           PUBLISH,
   ```



##########
flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/dsv2/eventtime/StatisticNewsClickNumber.java:
##########
@@ -0,0 +1,278 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.examples.dsv2.eventtime;
+
+import org.apache.flink.api.common.serialization.SimpleStringEncoder;
+import org.apache.flink.api.connector.dsv2.DataStreamV2SourceUtils;
+import org.apache.flink.api.connector.dsv2.WrappedSink;
+import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.connector.file.sink.FileSink;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.datastream.api.ExecutionEnvironment;
+import org.apache.flink.datastream.api.common.Collector;
+import org.apache.flink.datastream.api.context.PartitionedContext;
+import org.apache.flink.datastream.api.extension.eventtime.EventTimeExtension;
+import 
org.apache.flink.datastream.api.extension.eventtime.function.OneInputEventTimeStreamProcessFunction;
+import 
org.apache.flink.datastream.api.extension.eventtime.timer.EventTimeManager;
+import org.apache.flink.datastream.api.stream.NonKeyedPartitionStream;
+import org.apache.flink.streaming.api.functions.sink.PrintSink;
+import 
org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;
+import 
org.apache.flink.streaming.examples.dsv2.eventtime.util.StatisticNewsClickNumberData;
+import org.apache.flink.util.ParameterTool;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * This example illustrates how to count the number of clicks on each news at 
1 minute, 5 minutes,
+ * 10 minutes, and 1 hour after news publication.
+ *
+ * <p>The input consists of a series of {@link NewsEvent}s, which fall into 
two categories: news
+ * releases and news clicks. Each {@link NewsEvent} contains three components: 
the event type, the
+ * news ID and the timestamp. Notably, there is only one event of type {@link 
NewsEventType#RELEASE}
+ * for each news.
+ *
+ * <p>Usage:
+ *
+ * <ul>
+ *   <li><code>--output &lt;path&gt;</code>The output directory where the Job 
will write the
+ *       results. If no output path is provided, the Job will print the 
results to <code>stdout
+ *       </code>.
+ * </ul>
+ *
+ * <p>This example shows how to:
+ *
+ * <ul>
+ *   <li>Usage of Event Time extension in DataStream API V2
+ * </ul>
+ *
+ * <p>Please note that if you intend to run this example in an IDE, you must 
first add the following
+ * VM options: "--add-opens=java.base/java.util=ALL-UNNAMED". This is 
necessary because the module
+ * system in JDK 17+ restricts some reflection operations.
+ *
+ * <p>Please note that the DataStream API V2 is a new set of APIs, to 
gradually replace the original
+ * DataStream API. It is currently in the experimental stage and is not fully 
available for
+ * production.
+ */
+public class StatisticNewsClickNumber {

Review Comment:
   ```suggestion
   public class CountNewsClicks {
   ```



##########
flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/dsv2/eventtime/util/StatisticNewsClickNumberData.java:
##########
@@ -0,0 +1,633 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.examples.dsv2.eventtime.util;
+
+import 
org.apache.flink.streaming.examples.dsv2.eventtime.StatisticNewsClickNumber;
+import 
org.apache.flink.streaming.examples.dsv2.eventtime.StatisticNewsClickNumber.NewsEvent;
+import 
org.apache.flink.streaming.examples.dsv2.eventtime.StatisticNewsClickNumber.NewsEventType;
+
+/** Sample data for the {@link StatisticNewsClickNumber} example. */
+public class StatisticNewsClickNumberData {

Review Comment:
   Same for other examples.



##########
flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/dsv2/wordcount/WordCount.java:
##########
@@ -0,0 +1,335 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.examples.dsv2.wordcount;
+
+import org.apache.flink.api.common.RuntimeExecutionMode;
+import org.apache.flink.api.common.serialization.SimpleStringEncoder;
+import org.apache.flink.api.common.state.StateDeclaration;
+import org.apache.flink.api.common.state.StateDeclarations;
+import org.apache.flink.api.common.state.ValueStateDeclaration;
+import org.apache.flink.api.common.typeinfo.TypeDescriptors;
+import org.apache.flink.api.connector.dsv2.DataStreamV2SourceUtils;
+import org.apache.flink.api.connector.dsv2.WrappedSink;
+import org.apache.flink.api.connector.dsv2.WrappedSource;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.connector.file.sink.FileSink;
+import org.apache.flink.connector.file.src.FileSource;
+import org.apache.flink.connector.file.src.reader.TextLineInputFormat;
+import org.apache.flink.datastream.api.ExecutionEnvironment;
+import org.apache.flink.datastream.api.common.Collector;
+import org.apache.flink.datastream.api.context.PartitionedContext;
+import org.apache.flink.datastream.api.function.OneInputStreamProcessFunction;
+import org.apache.flink.datastream.api.stream.KeyedPartitionStream;
+import org.apache.flink.datastream.api.stream.NonKeyedPartitionStream;
+import org.apache.flink.streaming.api.functions.sink.PrintSink;
+import 
org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;
+import org.apache.flink.streaming.examples.wordcount.util.CLI;
+import org.apache.flink.streaming.examples.wordcount.util.WordCountData;
+
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.Set;
+
+/**
+ * Implements the "WordCount" program by DataStream API V2 that computes a 
simple word occurrence
+ * histogram over text files. This Job can be executed in both streaming and 
batch execution modes.
+ *
+ * <p>The input is a [list of] plain text file[s] with lines separated by a 
newline character.
+ *
+ * <p>Usage:
+ *
+ * <ul>
+ *   <li><code>--input &lt;path&gt;</code>A list of input files and / or 
directories to read. If no
+ *       input is provided, the program is run with default data from {@link 
WordCountData}.
+ *   <li><code>--discovery-interval &lt;duration&gt;</code>Turns the file 
reader into a continuous
+ *       source that will monitor the provided input directories every 
interval and read any new
+ *       files.
+ *   <li><code>--output &lt;path&gt;</code>The output directory where the Job 
will write the
+ *       results. If no output path is provided, the Job will print the 
results to <code>stdout
+ *       </code>.
+ *   <li><code>--execution-mode &lt;mode&gt;</code>The execution mode (BATCH, 
STREAMING, or
+ *       AUTOMATIC) of this pipeline.
+ * </ul>
+ *
+ * <p>This example shows how to:
+ *
+ * <ul>
+ *   <li>Write a simple Flink program by DataStream API V2
+ *   <li>Use tuple data types
+ *   <li>Write and use a user-defined process function
+ * </ul>
+ *
+ * <p>Please note that if you intend to run this example in an IDE, you must 
first add the following
+ * VM options: "--add-opens=java.base/java.util=ALL-UNNAMED". This is 
necessary because the module
+ * system in JDK 17+ restricts some reflection operations.
+ *
+ * <p>Please note that the DataStream API V2 is a new set of APIs, to 
gradually replace the original
+ * DataStream API. It is currently in the experimental stage and is not fully 
available for
+ * production.
+ */
+public class WordCount {
+
+    // 
*************************************************************************
+    // PROGRAM
+    // 
*************************************************************************
+
+    public static void main(String[] args) throws Exception {
+        final CLI params = CLI.fromArgs(args);
+
+        // Get the execution environment instance. This is the main entrypoint
+        // to building a Flink application.
+        final ExecutionEnvironment env = ExecutionEnvironment.getInstance();
+
+        // Apache Flink’s unified approach to stream and batch processing 
means that a DataStream
+        // application executed over bounded input will produce the same final 
results regardless
+        // of the configured execution mode. It is important to note what 
final means here: a job
+        // executing in STREAMING mode might produce incremental updates 
(think upserts in
+        // a database) while in BATCH mode, it would only produce one final 
result at the end. The
+        // final result will be the same if interpreted correctly, but getting 
there can be
+        // different.
+        //
+        // The “classic” execution behavior of the DataStream API is called 
STREAMING execution
+        // mode. Applications should use streaming execution for unbounded 
jobs that require
+        // continuous incremental processing and are expected to stay online 
indefinitely.
+        //
+        // By enabling BATCH execution, we allow Flink to apply additional 
optimizations that we
+        // can only do when we know that our input is bounded. For example, 
different
+        // join/aggregation strategies can be used, in addition to a different 
shuffle
+        // implementation that allows more efficient task scheduling and 
failure recovery behavior.
+        //
+        // By setting the runtime mode to AUTOMATIC, Flink will choose BATCH 
if all sources
+        // are bounded and otherwise STREAMING.
+        env.setExecutionMode(params.getExecutionMode());
+
+        NonKeyedPartitionStream<String> text;
+
+        if (params.getInputs().isPresent()) {
+            // Create a new file source that will read files from a given set 
of directories.
+            // Each file will be processed as plain text and split based on 
newlines.
+            FileSource.FileSourceBuilder<String> builder =
+                    FileSource.forRecordStreamFormat(
+                            new TextLineInputFormat(), 
params.getInputs().get());
+
+            // If a discovery interval is provided, the source will
+            // continuously watch the given directories for new files.
+            
params.getDiscoveryInterval().ifPresent(builder::monitorContinuously);
+
+            text = env.fromSource(new WrappedSource<>(builder.build()), 
"file-input");
+        } else {
+            // Create a new from data source with default data {@code 
WordCountData}.
+            text =
+                    env.fromSource(
+                            
DataStreamV2SourceUtils.fromData(Arrays.asList(WordCountData.WORDS)),
+                            "in-memory-input");
+        }
+
+        KeyedPartitionStream<String, Tuple2<String, Integer>> keyedStream =
+                // The text lines read from the source are split into words
+                // using a user-defined process function. The tokenizer, 
implemented below,
+                // will output each word as a (2-tuple) containing (word, 1)
+                text.process(new Tokenizer())
+                        .withName("tokenizer")
+                        // keyBy groups tuples based on the first field, the 
word.
+                        // Using a keyBy allows performing aggregations and 
other
+                        // stateful transformations over data on a per-key 
basis.
+                        // This is similar to a GROUP BY clause in a SQL query.
+                        .keyBy(value -> value.f0);
+
+        // For each key, we perform a simple sum of the second field, the 
count by user-defined
+        // StreamingCounter/BatchCounter process function.
+        // If the execution mode is streaming, it will continuously output 
updates each time it sees
+        // a new instance of each word in the stream.
+        // If the execution mode is batch, sum will output a final count for
+        // each word.
+        // Flink will provide more built-in functions, such as aggregation, in 
the future. This will
+        // allow users to avoid having to write their own StreamingCounter or 
BatchCounter process
+        // functions in this application.
+        NonKeyedPartitionStream<Tuple2<String, Integer>> counts;
+        if (params.getExecutionMode() == RuntimeExecutionMode.STREAMING) {
+            counts = keyedStream.process(new StreamingCounter());
+        } else {
+            counts = keyedStream.process(new BatchCounter());
+        }
+
+        if (params.getOutput().isPresent()) {
+            // Given an output directory, Flink will write the results to a 
file
+            // using a simple string encoding. In a production environment, 
this might
+            // be something more structured like CSV, Avro, JSON, or Parquet.
+            counts.toSink(
+                            new WrappedSink<>(
+                                    FileSink.<Tuple2<String, 
Integer>>forRowFormat(
+                                                    params.getOutput().get(),
+                                                    new 
SimpleStringEncoder<>())
+                                            .withRollingPolicy(
+                                                    
DefaultRollingPolicy.builder()
+                                                            .withMaxPartSize(
+                                                                    
MemorySize.ofMebiBytes(1))
+                                                            
.withRolloverInterval(
+                                                                    
Duration.ofSeconds(10))
+                                                            .build())
+                                            .build()))
+                    .withName("file-sink");
+        } else {
+            // Print the results to the STDOUT.
+            counts.toSink(new WrappedSink<>(new 
PrintSink<>())).withName("print-sink");
+        }
+
+        // Apache Flink applications are composed lazily. Calling execute
+        // submits the Job and begins processing.
+        env.execute("WordCount");
+    }
+
+    // 
*************************************************************************
+    // USER PROCESS FUNCTIONS
+    // 
*************************************************************************
+
+    /**
+     * Implements the string tokenizer that splits sentences into words as a 
user-defined
+     * ProcessFunction. The process function takes a line (String) and splits 
it into multiple pairs
+     * in the form of "(word,1)" ({@code Tuple2<String, Integer>}).
+     */
+    public static final class Tokenizer
+            implements OneInputStreamProcessFunction<String, Tuple2<String, 
Integer>> {
+
+        @Override
+        public void processRecord(
+                String record,
+                Collector<Tuple2<String, Integer>> output,
+                PartitionedContext<Tuple2<String, Integer>> ctx)
+                throws Exception {
+            // normalize and split the line
+            String[] tokens = record.toLowerCase().split("\\W+");
+
+            // emit the pairs
+            for (String token : tokens) {
+                if (!token.isEmpty()) {
+                    output.collect(new Tuple2<>(token, 1));
+                }
+            }
+        }
+    }
+
+    /**
+     * Implements a word counter as a user-defined ProcessFunction that counts 
received words in
+     * streaming mode. The function uses a ValueState to store the count of 
each word, it will
+     * update the count of word and output the result when receive a record 
"(word,1)".
+     *
+     * <p>Note that this is just an example of how to code a streaming job 
using the DataStream API
+     * V2. It currently involves some complexity. In the future, we will 
provide more user-friendly
+     * APIs and extensions to simplify the process, eliminating the need for 
users to distinguish
+     * between process functions in streaming and batch execution modes.
+     */
+    public static final class StreamingCounter
+            implements OneInputStreamProcessFunction<
+                    Tuple2<String, Integer>, Tuple2<String, Integer>> {
+
+        // uses a ValueState to store the count of each word
+        private final ValueStateDeclaration<Integer> countStateDeclaration =
+                StateDeclarations.valueState("count", TypeDescriptors.INT);
+
+        @Override
+        public Set<StateDeclaration> usesStates() {
+            // declare a ValueState to store the count of each word
+            return Set.of(countStateDeclaration);
+        }
+
+        @Override
+        public void processRecord(
+                Tuple2<String, Integer> record,
+                Collector<Tuple2<String, Integer>> output,
+                PartitionedContext<Tuple2<String, Integer>> ctx)
+                throws Exception {
+            // calculate the new count of the word
+            String word = record.f0;
+            Integer count = record.f1;
+            Integer previousCount =
+                    
ctx.getStateManager().getState(countStateDeclaration).get().value();
+            Integer newlyCount = previousCount == null ? count : previousCount 
+ count;
+
+            // update the count of the word
+            
ctx.getStateManager().getState(countStateDeclaration).get().update(newlyCount);
+
+            // output the result
+            output.collect(Tuple2.of(word, newlyCount));
+        }
+    }
+
+    /**
+     * Implements a word counter as a user-defined ProcessFunction that counts 
received words in
+     * batch mode. The function uses a ValueState to store the count of each 
word and registers a
+     * processing timer with Long.MAX_VALUE. This timer will be invoked when 
all data for the same
+     * key has been processed, at which point it outputs the result.
+     *
+     * <p>Note that this is just an example of how to code a batch job using 
the DataStream API V2.
+     * It currently involves some complexity. In the future, we will provide 
more user-friendly APIs
+     * and extensions to simplify the process, eliminating the need for users 
to distinguish between
+     * process functions in streaming and batch execution modes.
+     */
+    public static final class BatchCounter
+            implements OneInputStreamProcessFunction<
+                    Tuple2<String, Integer>, Tuple2<String, Integer>> {
+
+        // uses a ValueState to store the count of each word
+        private final ValueStateDeclaration<Integer> countStateDeclaration =
+                StateDeclarations.valueState("count", TypeDescriptors.INT);
+
+        @Override
+        public Set<StateDeclaration> usesStates() {
+            // declare a ValueState to store the count of each word
+            return Set.of(countStateDeclaration);
+        }
+
+        @Override
+        public void processRecord(
+                Tuple2<String, Integer> record,
+                Collector<Tuple2<String, Integer>> output,
+                PartitionedContext<Tuple2<String, Integer>> ctx)
+                throws Exception {
+            // calculate the new count of the word
+            Integer count = record.f1;
+            Integer previousCount =
+                    
ctx.getStateManager().getState(countStateDeclaration).get().value();
+            Integer newlyCount = previousCount == null ? count : previousCount 
+ count;
+
+            // update the count of the word
+            
ctx.getStateManager().getState(countStateDeclaration).get().update(newlyCount);
+
+            // register a timer to output the result
+            ctx.getProcessingTimeManager().registerTimer(Long.MAX_VALUE);

Review Comment:
   1. Should not register a new timer for every record.
   2. Should not have this special timer. Instead, we can do:
   ```
   public void endInput(NonPartitionedContext ctx) {
       ctx.applyToAllPartitions((output, parCtx) -> {
           parCtx.getStateManager().getState(xxx);
           output.collect(xxx);
       });
   }
   ```



##########
flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/dsv2/eventtime/util/StatisticNewsClickNumberData.java:
##########
@@ -0,0 +1,633 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.examples.dsv2.eventtime.util;
+
+import 
org.apache.flink.streaming.examples.dsv2.eventtime.StatisticNewsClickNumber;
+import 
org.apache.flink.streaming.examples.dsv2.eventtime.StatisticNewsClickNumber.NewsEvent;
+import 
org.apache.flink.streaming.examples.dsv2.eventtime.StatisticNewsClickNumber.NewsEventType;
+
+/** Sample data for the {@link StatisticNewsClickNumber} example. */
+public class StatisticNewsClickNumberData {

Review Comment:
   Use a csv file.



##########
flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/dsv2/wordcount/WordCount.java:
##########
@@ -0,0 +1,335 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.examples.dsv2.wordcount;
+
+import org.apache.flink.api.common.RuntimeExecutionMode;
+import org.apache.flink.api.common.serialization.SimpleStringEncoder;
+import org.apache.flink.api.common.state.StateDeclaration;
+import org.apache.flink.api.common.state.StateDeclarations;
+import org.apache.flink.api.common.state.ValueStateDeclaration;
+import org.apache.flink.api.common.typeinfo.TypeDescriptors;
+import org.apache.flink.api.connector.dsv2.DataStreamV2SourceUtils;
+import org.apache.flink.api.connector.dsv2.WrappedSink;
+import org.apache.flink.api.connector.dsv2.WrappedSource;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.connector.file.sink.FileSink;
+import org.apache.flink.connector.file.src.FileSource;
+import org.apache.flink.connector.file.src.reader.TextLineInputFormat;
+import org.apache.flink.datastream.api.ExecutionEnvironment;
+import org.apache.flink.datastream.api.common.Collector;
+import org.apache.flink.datastream.api.context.PartitionedContext;
+import org.apache.flink.datastream.api.function.OneInputStreamProcessFunction;
+import org.apache.flink.datastream.api.stream.KeyedPartitionStream;
+import org.apache.flink.datastream.api.stream.NonKeyedPartitionStream;
+import org.apache.flink.streaming.api.functions.sink.PrintSink;
+import 
org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;
+import org.apache.flink.streaming.examples.wordcount.util.CLI;
+import org.apache.flink.streaming.examples.wordcount.util.WordCountData;
+
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.Set;
+
+/**
+ * Implements the "WordCount" program by DataStream API V2 that computes a 
simple word occurrence
+ * histogram over text files. This Job can be executed in both streaming and 
batch execution modes.
+ *
+ * <p>The input is a [list of] plain text file[s] with lines separated by a 
newline character.
+ *
+ * <p>Usage:
+ *
+ * <ul>
+ *   <li><code>--input &lt;path&gt;</code>A list of input files and / or 
directories to read. If no
+ *       input is provided, the program is run with default data from {@link 
WordCountData}.
+ *   <li><code>--discovery-interval &lt;duration&gt;</code>Turns the file 
reader into a continuous
+ *       source that will monitor the provided input directories every 
interval and read any new
+ *       files.
+ *   <li><code>--output &lt;path&gt;</code>The output directory where the Job 
will write the
+ *       results. If no output path is provided, the Job will print the 
results to <code>stdout
+ *       </code>.
+ *   <li><code>--execution-mode &lt;mode&gt;</code>The execution mode (BATCH, 
STREAMING, or
+ *       AUTOMATIC) of this pipeline.
+ * </ul>
+ *
+ * <p>This example shows how to:
+ *
+ * <ul>
+ *   <li>Write a simple Flink program by DataStream API V2
+ *   <li>Use tuple data types
+ *   <li>Write and use a user-defined process function
+ * </ul>
+ *
+ * <p>Please note that if you intend to run this example in an IDE, you must 
first add the following
+ * VM options: "--add-opens=java.base/java.util=ALL-UNNAMED". This is 
necessary because the module
+ * system in JDK 17+ restricts some reflection operations.
+ *
+ * <p>Please note that the DataStream API V2 is a new set of APIs, to 
gradually replace the original
+ * DataStream API. It is currently in the experimental stage and is not fully 
available for
+ * production.
+ */
+public class WordCount {
+
+    // 
*************************************************************************
+    // PROGRAM
+    // 
*************************************************************************
+
+    public static void main(String[] args) throws Exception {
+        final CLI params = CLI.fromArgs(args);
+
+        // Get the execution environment instance. This is the main entrypoint
+        // to building a Flink application.
+        final ExecutionEnvironment env = ExecutionEnvironment.getInstance();
+
+        // Apache Flink’s unified approach to stream and batch processing 
means that a DataStream
+        // application executed over bounded input will produce the same final 
results regardless
+        // of the configured execution mode. It is important to note what 
final means here: a job
+        // executing in STREAMING mode might produce incremental updates 
(think upserts in
+        // a database) while in BATCH mode, it would only produce one final 
result at the end. The
+        // final result will be the same if interpreted correctly, but getting 
there can be
+        // different.
+        //
+        // The “classic” execution behavior of the DataStream API is called 
STREAMING execution
+        // mode. Applications should use streaming execution for unbounded 
jobs that require
+        // continuous incremental processing and are expected to stay online 
indefinitely.
+        //
+        // By enabling BATCH execution, we allow Flink to apply additional 
optimizations that we
+        // can only do when we know that our input is bounded. For example, 
different
+        // join/aggregation strategies can be used, in addition to a different 
shuffle
+        // implementation that allows more efficient task scheduling and 
failure recovery behavior.
+        //
+        // By setting the runtime mode to AUTOMATIC, Flink will choose BATCH 
if all sources
+        // are bounded and otherwise STREAMING.
+        env.setExecutionMode(params.getExecutionMode());
+
+        NonKeyedPartitionStream<String> text;
+
+        if (params.getInputs().isPresent()) {
+            // Create a new file source that will read files from a given set 
of directories.
+            // Each file will be processed as plain text and split based on 
newlines.
+            FileSource.FileSourceBuilder<String> builder =
+                    FileSource.forRecordStreamFormat(
+                            new TextLineInputFormat(), 
params.getInputs().get());
+
+            // If a discovery interval is provided, the source will
+            // continuously watch the given directories for new files.
+            
params.getDiscoveryInterval().ifPresent(builder::monitorContinuously);
+
+            text = env.fromSource(new WrappedSource<>(builder.build()), 
"file-input");
+        } else {
+            // Create a new from data source with default data {@code 
WordCountData}.
+            text =
+                    env.fromSource(
+                            
DataStreamV2SourceUtils.fromData(Arrays.asList(WordCountData.WORDS)),
+                            "in-memory-input");
+        }
+
+        KeyedPartitionStream<String, Tuple2<String, Integer>> keyedStream =
+                // The text lines read from the source are split into words
+                // using a user-defined process function. The tokenizer, 
implemented below,
+                // will output each word as a (2-tuple) containing (word, 1)
+                text.process(new Tokenizer())
+                        .withName("tokenizer")
+                        // keyBy groups tuples based on the first field, the 
word.
+                        // Using a keyBy allows performing aggregations and 
other
+                        // stateful transformations over data on a per-key 
basis.
+                        // This is similar to a GROUP BY clause in a SQL query.
+                        .keyBy(value -> value.f0);
+
+        // For each key, we perform a simple sum of the second field, the 
count by user-defined
+        // StreamingCounter/BatchCounter process function.
+        // If the execution mode is streaming, it will continuously output 
updates each time it sees
+        // a new instance of each word in the stream.
+        // If the execution mode is batch, sum will output a final count for
+        // each word.
+        // Flink will provide more built-in functions, such as aggregation, in 
the future. This will
+        // allow users to avoid having to write their own StreamingCounter or 
BatchCounter process
+        // functions in this application.
+        NonKeyedPartitionStream<Tuple2<String, Integer>> counts;
+        if (params.getExecutionMode() == RuntimeExecutionMode.STREAMING) {
+            counts = keyedStream.process(new StreamingCounter());
+        } else {
+            counts = keyedStream.process(new BatchCounter());
+        }
+
+        if (params.getOutput().isPresent()) {
+            // Given an output directory, Flink will write the results to a 
file
+            // using a simple string encoding. In a production environment, 
this might
+            // be something more structured like CSV, Avro, JSON, or Parquet.
+            counts.toSink(
+                            new WrappedSink<>(
+                                    FileSink.<Tuple2<String, 
Integer>>forRowFormat(
+                                                    params.getOutput().get(),
+                                                    new 
SimpleStringEncoder<>())
+                                            .withRollingPolicy(
+                                                    
DefaultRollingPolicy.builder()
+                                                            .withMaxPartSize(
+                                                                    
MemorySize.ofMebiBytes(1))
+                                                            
.withRolloverInterval(
+                                                                    
Duration.ofSeconds(10))
+                                                            .build())
+                                            .build()))
+                    .withName("file-sink");
+        } else {
+            // Print the results to the STDOUT.
+            counts.toSink(new WrappedSink<>(new 
PrintSink<>())).withName("print-sink");
+        }
+
+        // Apache Flink applications are composed lazily. Calling execute
+        // submits the Job and begins processing.
+        env.execute("WordCount");
+    }
+
+    // 
*************************************************************************
+    // USER PROCESS FUNCTIONS
+    // 
*************************************************************************
+
+    /**
+     * Implements the string tokenizer that splits sentences into words as a 
user-defined
+     * ProcessFunction. The process function takes a line (String) and splits 
it into multiple pairs
+     * in the form of "(word,1)" ({@code Tuple2<String, Integer>}).
+     */
+    public static final class Tokenizer
+            implements OneInputStreamProcessFunction<String, Tuple2<String, 
Integer>> {
+
+        @Override
+        public void processRecord(
+                String record,
+                Collector<Tuple2<String, Integer>> output,
+                PartitionedContext<Tuple2<String, Integer>> ctx)
+                throws Exception {
+            // normalize and split the line
+            String[] tokens = record.toLowerCase().split("\\W+");
+
+            // emit the pairs
+            for (String token : tokens) {
+                if (!token.isEmpty()) {
+                    output.collect(new Tuple2<>(token, 1));
+                }
+            }
+        }
+    }
+
+    /**
+     * Implements a word counter as a user-defined ProcessFunction that counts 
received words in
+     * streaming mode. The function uses a ValueState to store the count of 
each word, it will
+     * update the count of word and output the result when receive a record 
"(word,1)".
+     *
+     * <p>Note that this is just an example of how to code a streaming job 
using the DataStream API
+     * V2. It currently involves some complexity. In the future, we will 
provide more user-friendly
+     * APIs and extensions to simplify the process, eliminating the need for 
users to distinguish
+     * between process functions in streaming and batch execution modes.
+     */
+    public static final class StreamingCounter
+            implements OneInputStreamProcessFunction<
+                    Tuple2<String, Integer>, Tuple2<String, Integer>> {
+
+        // uses a ValueState to store the count of each word
+        private final ValueStateDeclaration<Integer> countStateDeclaration =
+                StateDeclarations.valueState("count", TypeDescriptors.INT);
+
+        @Override
+        public Set<StateDeclaration> usesStates() {
+            // declare a ValueState to store the count of each word
+            return Set.of(countStateDeclaration);
+        }
+
+        @Override
+        public void processRecord(
+                Tuple2<String, Integer> record,
+                Collector<Tuple2<String, Integer>> output,
+                PartitionedContext<Tuple2<String, Integer>> ctx)
+                throws Exception {
+            // calculate the new count of the word
+            String word = record.f0;
+            Integer count = record.f1;
+            Integer previousCount =
+                    
ctx.getStateManager().getState(countStateDeclaration).get().value();

Review Comment:
   `orElseThrow()` is preferred over `get()`.



##########
flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/dsv2/eventtime/StatisticNewsClickNumber.java:
##########
@@ -0,0 +1,278 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.examples.dsv2.eventtime;
+
+import org.apache.flink.api.common.serialization.SimpleStringEncoder;
+import org.apache.flink.api.connector.dsv2.DataStreamV2SourceUtils;
+import org.apache.flink.api.connector.dsv2.WrappedSink;
+import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.connector.file.sink.FileSink;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.datastream.api.ExecutionEnvironment;
+import org.apache.flink.datastream.api.common.Collector;
+import org.apache.flink.datastream.api.context.PartitionedContext;
+import org.apache.flink.datastream.api.extension.eventtime.EventTimeExtension;
+import 
org.apache.flink.datastream.api.extension.eventtime.function.OneInputEventTimeStreamProcessFunction;
+import 
org.apache.flink.datastream.api.extension.eventtime.timer.EventTimeManager;
+import org.apache.flink.datastream.api.stream.NonKeyedPartitionStream;
+import org.apache.flink.streaming.api.functions.sink.PrintSink;
+import 
org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;
+import 
org.apache.flink.streaming.examples.dsv2.eventtime.util.StatisticNewsClickNumberData;
+import org.apache.flink.util.ParameterTool;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * This example illustrates how to count the number of clicks on each news at 
1 minute, 5 minutes,
+ * 10 minutes, and 1 hour after news publication.
+ *
+ * <p>The input consists of a series of {@link NewsEvent}s, which fall into 
two categories: news
+ * releases and news clicks. Each {@link NewsEvent} contains three components: 
the event type, the
+ * news ID and the timestamp. Notably, there is only one event of type {@link 
NewsEventType#RELEASE}
+ * for each news.
+ *
+ * <p>Usage:
+ *
+ * <ul>
+ *   <li><code>--output &lt;path&gt;</code>The output directory where the Job 
will write the
+ *       results. If no output path is provided, the Job will print the 
results to <code>stdout
+ *       </code>.
+ * </ul>
+ *
+ * <p>This example shows how to:
+ *
+ * <ul>
+ *   <li>Usage of Event Time extension in DataStream API V2
+ * </ul>
+ *
+ * <p>Please note that if you intend to run this example in an IDE, you must 
first add the following
+ * VM options: "--add-opens=java.base/java.util=ALL-UNNAMED". This is 
necessary because the module
+ * system in JDK 17+ restricts some reflection operations.
+ *
+ * <p>Please note that the DataStream API V2 is a new set of APIs, to 
gradually replace the original
+ * DataStream API. It is currently in the experimental stage and is not fully 
available for
+ * production.
+ */
+public class StatisticNewsClickNumber {
+
+    // we will count the number of clicks within 1min/5min/10min/30min/1hour 
after the news release
+    private static final Duration[] TIMES_AFTER_NEWS_RELEASE =
+            new Duration[] {
+                Duration.ofMinutes(1),
+                Duration.ofMinutes(5),
+                Duration.ofMinutes(10),
+                Duration.ofMinutes(30),
+                Duration.ofHours(1)
+            };
+
+    /**
+     * The type of {@link NewsEvent}, note that only one event of type {@link 
NewsEventType#RELEASE}
+     * for each news.
+     */
+    public enum NewsEventType {
+        RELEASE,
+        CLICK
+    }
+
+    /**
+     * The {@link NewsEvent} represents an event on news, containing the event 
type, news id and the
+     * timestamp.
+     */
+    public static class NewsEvent {
+        public NewsEventType type;
+        public long newsId;
+        public long timestamp;
+
+        public NewsEvent(NewsEventType type, long newsId, long timestamp) {
+            this.type = type;
+            this.newsId = newsId;
+            this.timestamp = timestamp;
+        }
+    }
+
+    /**
+     * The {@link NewsClickNumber} represents the number of clicks on news 
within a specified
+     * duration following its release. For example, 
NewsClickNumber{newsId="12345678",
+     * timeAfterRelease=60000, clickNumber=132} indicates that the news 
"12345678" has been clicked
+     * 132 times within 60,000 milliseconds after its release.
+     */
+    public static class NewsClickNumber {
+        public long newsId;
+        public long timeAfterRelease;
+        public long clickNumber;
+
+        public NewsClickNumber(long newsId, long timeAfterRelease, long 
clickCount) {
+            this.newsId = newsId;
+            this.timeAfterRelease = timeAfterRelease;
+            this.clickNumber = clickCount;
+        }
+
+        public String toString() {
+            return String.format(
+                    "(%d,%d,%d)", this.newsId, this.timeAfterRelease, 
this.clickNumber);
+        }
+    }
+
+    public static void main(String[] args) throws Exception {
+        // parse the parameters
+        final ParameterTool params = ParameterTool.fromArgs(args);
+        final boolean fileOutput = params.has("output");
+
+        // obtain execution environment
+        ExecutionEnvironment env = ExecutionEnvironment.getInstance();
+
+        // the input consists of a series of {code NewsEvent}s, which include 
two types: news
+        // release event and news click event.
+        NonKeyedPartitionStream<NewsEvent> source =
+                env.fromSource(
+                        DataStreamV2SourceUtils.fromData(
+                                
Arrays.asList(StatisticNewsClickNumberData.NEWS_EVENTS)),
+                        "news event source");
+
+        NonKeyedPartitionStream<NewsClickNumber> clickNumberStream =
+                // extract event time and generate the event time watermark
+                source.process(
+                                // the timestamp field of the input is 
considered to be the
+                                // event time
+                                
EventTimeExtension.<NewsEvent>newWatermarkGeneratorBuilder(
+                                                event -> event.timestamp)
+                                        // generate event time watermarks 
every 200ms
+                                        
.periodicWatermark(Duration.ofMillis(200))
+                                        // if the input is idle for more than 
30 seconds, it
+                                        // is ignored during the event time 
watermark
+                                        // combination process
+                                        .withIdleness(Duration.ofSeconds(30))
+                                        // set the maximum out-of-order time 
of the event to
+                                        // 30 seconds, meaning that if an 
event is received
+                                        // at 12:00:00, then no further events 
should be
+                                        // received earlier than 11:59:30
+                                        
.withMaxOutOfOrderTime(Duration.ofSeconds(10))
+                                        // build the event time watermark 
generator as
+                                        // ProcessFunction
+                                        .buildAsProcessFunction())
+                        // key by the news id
+                        .keyBy(event -> event.newsId)
+                        // count the click number of each news
+                        .process(
+                                EventTimeExtension.wrapProcessFunction(
+                                        new 
CountNewsClickNumberProcessFunction()));
+
+        if (fileOutput) {
+            // write results to file
+            clickNumberStream
+                    .toSink(
+                            new WrappedSink<>(
+                                    FileSink.<NewsClickNumber>forRowFormat(
+                                                    new 
Path(params.get("output")),
+                                                    new 
SimpleStringEncoder<>())
+                                            .withRollingPolicy(
+                                                    
DefaultRollingPolicy.builder()
+                                                            .withMaxPartSize(
+                                                                    
MemorySize.ofMebiBytes(1))
+                                                            
.withRolloverInterval(
+                                                                    
Duration.ofSeconds(10))
+                                                            .build())
+                                            .build()))
+                    .withName("output");
+        } else {
+            // Print the results to the STDOUT.
+            clickNumberStream.toSink(new WrappedSink<>(new 
PrintSink<>())).withName("print-sink");
+        }
+
+        env.execute("StatisticNewsClickNumberExample");
+    }
+
+    /**
+     * This process function will consume {@link NewsEvent} and count the 
number of clicks within 1
+     * minute, 5 minutes, 10 minutes, 30 minutes and 1 hour of the news 
releasing and send the
+     * results {@link NewsClickNumber} to the output.
+     *
+     * <p>To achieve the goal, we will register a series of timers for the 
news, which will be
+     * triggered at the time of the news's release time + 1 minute/5 
minutes/10 minutes/30 minutes/1
+     * hour, and record a list of the click times of each news. In the timer 
callback {@code
+     * onEventTimer}, we will count the number of clicks between the news 
release time and the timer
+     * trigger timer and send the result to the output.
+     */
+    public static class CountNewsClickNumberProcessFunction
+            implements OneInputEventTimeStreamProcessFunction<NewsEvent, 
NewsClickNumber> {
+
+        private EventTimeManager eventTimeManager;
+
+        // news id to release time
+        private final Map<Long, Long> releaseTimeOfNews = new HashMap<>();
+
+        // news id to click time list
+        private final Map<Long, List<Long>> clickTimeListOfNews = new 
HashMap<>();
+
+        @Override
+        public void initEventTimeProcessFunction(EventTimeManager 
eventTimeManager) {
+            this.eventTimeManager = eventTimeManager;
+        }
+
+        @Override
+        public void processRecord(
+                NewsEvent record,
+                Collector<NewsClickNumber> output,
+                PartitionedContext<NewsClickNumber> ctx)
+                throws Exception {
+            if (record.type == NewsEventType.RELEASE) {
+                // for the news release event, record the release time and 
register timers
+                long releaseTime = record.timestamp;
+                releaseTimeOfNews.put(record.newsId, releaseTime);
+                for (Duration targetTime : TIMES_AFTER_NEWS_RELEASE) {
+                    eventTimeManager.registerTimer(releaseTime + 
targetTime.toMillis());
+                }
+            } else {
+                // for the news click event, record the click time
+                clickTimeListOfNews
+                        .computeIfAbsent(record.newsId, k -> new ArrayList<>())
+                        .add(record.timestamp);
+            }
+        }
+
+        @Override
+        public void onEventTimer(
+                long timestamp,
+                Collector<NewsClickNumber> output,
+                PartitionedContext<NewsClickNumber> ctx) {
+            // get the news that the current event timer belongs to
+            long newsId = ctx.getStateManager().getCurrentKey();
+
+            // calculate the difference between the current time and the news 
release time
+            Duration diffTime = Duration.ofMillis(timestamp - 
releaseTimeOfNews.get(newsId));

Review Comment:
   Maybe for simplicity, we just need one counting period.
   1. We maintain a counter for each news id. When receiving new clicks, we 
check whether it falls into the desired time period, and increase the counter 
only if it does.
   2. When the event timer triggers, we output the counter and clear the states.
   3. We should also consider the case where publishing event is received after 
clicking events. In that case, we need to cache the early arrived clicking 
events somewhere, and handle and clear them once the publishing event is 
received.
   4. We should also consider the case where publishing event is never 
received, and have a timer to check and clear the cached events.



##########
flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/dsv2/eventtime/StatisticNewsClickNumber.java:
##########
@@ -0,0 +1,278 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.examples.dsv2.eventtime;
+
+import org.apache.flink.api.common.serialization.SimpleStringEncoder;
+import org.apache.flink.api.connector.dsv2.DataStreamV2SourceUtils;
+import org.apache.flink.api.connector.dsv2.WrappedSink;
+import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.connector.file.sink.FileSink;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.datastream.api.ExecutionEnvironment;
+import org.apache.flink.datastream.api.common.Collector;
+import org.apache.flink.datastream.api.context.PartitionedContext;
+import org.apache.flink.datastream.api.extension.eventtime.EventTimeExtension;
+import 
org.apache.flink.datastream.api.extension.eventtime.function.OneInputEventTimeStreamProcessFunction;
+import 
org.apache.flink.datastream.api.extension.eventtime.timer.EventTimeManager;
+import org.apache.flink.datastream.api.stream.NonKeyedPartitionStream;
+import org.apache.flink.streaming.api.functions.sink.PrintSink;
+import 
org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;
+import 
org.apache.flink.streaming.examples.dsv2.eventtime.util.StatisticNewsClickNumberData;
+import org.apache.flink.util.ParameterTool;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * This example illustrates how to count the number of clicks on each news at 
1 minute, 5 minutes,
+ * 10 minutes, and 1 hour after news publication.
+ *
+ * <p>The input consists of a series of {@link NewsEvent}s, which fall into 
two categories: news
+ * releases and news clicks. Each {@link NewsEvent} contains three components: 
the event type, the
+ * news ID and the timestamp. Notably, there is only one event of type {@link 
NewsEventType#RELEASE}
+ * for each news.
+ *
+ * <p>Usage:
+ *
+ * <ul>
+ *   <li><code>--output &lt;path&gt;</code>The output directory where the Job 
will write the
+ *       results. If no output path is provided, the Job will print the 
results to <code>stdout
+ *       </code>.
+ * </ul>
+ *
+ * <p>This example shows how to:
+ *
+ * <ul>
+ *   <li>Usage of Event Time extension in DataStream API V2
+ * </ul>
+ *
+ * <p>Please note that if you intend to run this example in an IDE, you must 
first add the following
+ * VM options: "--add-opens=java.base/java.util=ALL-UNNAMED". This is 
necessary because the module
+ * system in JDK 17+ restricts some reflection operations.
+ *
+ * <p>Please note that the DataStream API V2 is a new set of APIs, to 
gradually replace the original
+ * DataStream API. It is currently in the experimental stage and is not fully 
available for
+ * production.
+ */
+public class StatisticNewsClickNumber {
+
+    // we will count the number of clicks within 1min/5min/10min/30min/1hour 
after the news release
+    private static final Duration[] TIMES_AFTER_NEWS_RELEASE =
+            new Duration[] {
+                Duration.ofMinutes(1),
+                Duration.ofMinutes(5),
+                Duration.ofMinutes(10),
+                Duration.ofMinutes(30),
+                Duration.ofHours(1)
+            };
+
+    /**
+     * The type of {@link NewsEvent}, note that only one event of type {@link 
NewsEventType#RELEASE}
+     * for each news.
+     */
+    public enum NewsEventType {
+        RELEASE,
+        CLICK
+    }
+
+    /**
+     * The {@link NewsEvent} represents an event on news, containing the event 
type, news id and the
+     * timestamp.
+     */
+    public static class NewsEvent {
+        public NewsEventType type;
+        public long newsId;
+        public long timestamp;
+
+        public NewsEvent(NewsEventType type, long newsId, long timestamp) {
+            this.type = type;
+            this.newsId = newsId;
+            this.timestamp = timestamp;
+        }
+    }
+
+    /**
+     * The {@link NewsClickNumber} represents the number of clicks on news 
within a specified
+     * duration following its release. For example, 
NewsClickNumber{newsId="12345678",
+     * timeAfterRelease=60000, clickNumber=132} indicates that the news 
"12345678" has been clicked
+     * 132 times within 60,000 milliseconds after its release.
+     */
+    public static class NewsClickNumber {
+        public long newsId;
+        public long timeAfterRelease;
+        public long clickNumber;
+
+        public NewsClickNumber(long newsId, long timeAfterRelease, long 
clickCount) {
+            this.newsId = newsId;
+            this.timeAfterRelease = timeAfterRelease;
+            this.clickNumber = clickCount;
+        }
+
+        public String toString() {
+            return String.format(
+                    "(%d,%d,%d)", this.newsId, this.timeAfterRelease, 
this.clickNumber);
+        }
+    }
+
+    public static void main(String[] args) throws Exception {
+        // parse the parameters
+        final ParameterTool params = ParameterTool.fromArgs(args);
+        final boolean fileOutput = params.has("output");
+
+        // obtain execution environment
+        ExecutionEnvironment env = ExecutionEnvironment.getInstance();
+
+        // the input consists of a series of {code NewsEvent}s, which include 
two types: news
+        // release event and news click event.
+        NonKeyedPartitionStream<NewsEvent> source =
+                env.fromSource(
+                        DataStreamV2SourceUtils.fromData(
+                                
Arrays.asList(StatisticNewsClickNumberData.NEWS_EVENTS)),
+                        "news event source");
+
+        NonKeyedPartitionStream<NewsClickNumber> clickNumberStream =
+                // extract event time and generate the event time watermark
+                source.process(
+                                // the timestamp field of the input is 
considered to be the
+                                // event time
+                                
EventTimeExtension.<NewsEvent>newWatermarkGeneratorBuilder(
+                                                event -> event.timestamp)
+                                        // generate event time watermarks 
every 200ms
+                                        
.periodicWatermark(Duration.ofMillis(200))
+                                        // if the input is idle for more than 
30 seconds, it
+                                        // is ignored during the event time 
watermark
+                                        // combination process
+                                        .withIdleness(Duration.ofSeconds(30))
+                                        // set the maximum out-of-order time 
of the event to
+                                        // 30 seconds, meaning that if an 
event is received
+                                        // at 12:00:00, then no further events 
should be
+                                        // received earlier than 11:59:30
+                                        
.withMaxOutOfOrderTime(Duration.ofSeconds(10))
+                                        // build the event time watermark 
generator as
+                                        // ProcessFunction
+                                        .buildAsProcessFunction())
+                        // key by the news id
+                        .keyBy(event -> event.newsId)
+                        // count the click number of each news
+                        .process(
+                                EventTimeExtension.wrapProcessFunction(
+                                        new 
CountNewsClickNumberProcessFunction()));
+
+        if (fileOutput) {
+            // write results to file
+            clickNumberStream
+                    .toSink(
+                            new WrappedSink<>(
+                                    FileSink.<NewsClickNumber>forRowFormat(
+                                                    new 
Path(params.get("output")),
+                                                    new 
SimpleStringEncoder<>())
+                                            .withRollingPolicy(
+                                                    
DefaultRollingPolicy.builder()
+                                                            .withMaxPartSize(
+                                                                    
MemorySize.ofMebiBytes(1))
+                                                            
.withRolloverInterval(
+                                                                    
Duration.ofSeconds(10))
+                                                            .build())
+                                            .build()))
+                    .withName("output");
+        } else {
+            // Print the results to the STDOUT.
+            clickNumberStream.toSink(new WrappedSink<>(new 
PrintSink<>())).withName("print-sink");
+        }
+
+        env.execute("StatisticNewsClickNumberExample");
+    }
+
+    /**
+     * This process function will consume {@link NewsEvent} and count the 
number of clicks within 1
+     * minute, 5 minutes, 10 minutes, 30 minutes and 1 hour of the news 
releasing and send the
+     * results {@link NewsClickNumber} to the output.
+     *
+     * <p>To achieve the goal, we will register a series of timers for the 
news, which will be
+     * triggered at the time of the news's release time + 1 minute/5 
minutes/10 minutes/30 minutes/1
+     * hour, and record a list of the click times of each news. In the timer 
callback {@code
+     * onEventTimer}, we will count the number of clicks between the news 
release time and the timer
+     * trigger timer and send the result to the output.
+     */
+    public static class CountNewsClickNumberProcessFunction
+            implements OneInputEventTimeStreamProcessFunction<NewsEvent, 
NewsClickNumber> {
+
+        private EventTimeManager eventTimeManager;
+
+        // news id to release time
+        private final Map<Long, Long> releaseTimeOfNews = new HashMap<>();
+
+        // news id to click time list
+        private final Map<Long, List<Long>> clickTimeListOfNews = new 
HashMap<>();
+
+        @Override
+        public void initEventTimeProcessFunction(EventTimeManager 
eventTimeManager) {
+            this.eventTimeManager = eventTimeManager;
+        }
+
+        @Override
+        public void processRecord(
+                NewsEvent record,
+                Collector<NewsClickNumber> output,
+                PartitionedContext<NewsClickNumber> ctx)
+                throws Exception {
+            if (record.type == NewsEventType.RELEASE) {
+                // for the news release event, record the release time and 
register timers
+                long releaseTime = record.timestamp;
+                releaseTimeOfNews.put(record.newsId, releaseTime);
+                for (Duration targetTime : TIMES_AFTER_NEWS_RELEASE) {
+                    eventTimeManager.registerTimer(releaseTime + 
targetTime.toMillis());
+                }
+            } else {
+                // for the news click event, record the click time
+                clickTimeListOfNews
+                        .computeIfAbsent(record.newsId, k -> new ArrayList<>())
+                        .add(record.timestamp);
+            }
+        }
+
+        @Override
+        public void onEventTimer(
+                long timestamp,
+                Collector<NewsClickNumber> output,
+                PartitionedContext<NewsClickNumber> ctx) {
+            // get the news that the current event timer belongs to
+            long newsId = ctx.getStateManager().getCurrentKey();
+
+            // calculate the difference between the current time and the news 
release time
+            Duration diffTime = Duration.ofMillis(timestamp - 
releaseTimeOfNews.get(newsId));

Review Comment:
   Need normalization.



##########
flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/dsv2/wordcount/WordCount.java:
##########
@@ -0,0 +1,335 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.examples.dsv2.wordcount;
+
+import org.apache.flink.api.common.RuntimeExecutionMode;
+import org.apache.flink.api.common.serialization.SimpleStringEncoder;
+import org.apache.flink.api.common.state.StateDeclaration;
+import org.apache.flink.api.common.state.StateDeclarations;
+import org.apache.flink.api.common.state.ValueStateDeclaration;
+import org.apache.flink.api.common.typeinfo.TypeDescriptors;
+import org.apache.flink.api.connector.dsv2.DataStreamV2SourceUtils;
+import org.apache.flink.api.connector.dsv2.WrappedSink;
+import org.apache.flink.api.connector.dsv2.WrappedSource;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.connector.file.sink.FileSink;
+import org.apache.flink.connector.file.src.FileSource;
+import org.apache.flink.connector.file.src.reader.TextLineInputFormat;
+import org.apache.flink.datastream.api.ExecutionEnvironment;
+import org.apache.flink.datastream.api.common.Collector;
+import org.apache.flink.datastream.api.context.PartitionedContext;
+import org.apache.flink.datastream.api.function.OneInputStreamProcessFunction;
+import org.apache.flink.datastream.api.stream.KeyedPartitionStream;
+import org.apache.flink.datastream.api.stream.NonKeyedPartitionStream;
+import org.apache.flink.streaming.api.functions.sink.PrintSink;
+import 
org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;
+import org.apache.flink.streaming.examples.wordcount.util.CLI;
+import org.apache.flink.streaming.examples.wordcount.util.WordCountData;
+
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.Set;
+
+/**
+ * Implements the "WordCount" program by DataStream API V2 that computes a 
simple word occurrence
+ * histogram over text files. This Job can be executed in both streaming and 
batch execution modes.
+ *
+ * <p>The input is a [list of] plain text file[s] with lines separated by a 
newline character.
+ *
+ * <p>Usage:
+ *
+ * <ul>
+ *   <li><code>--input &lt;path&gt;</code>A list of input files and / or 
directories to read. If no
+ *       input is provided, the program is run with default data from {@link 
WordCountData}.
+ *   <li><code>--discovery-interval &lt;duration&gt;</code>Turns the file 
reader into a continuous
+ *       source that will monitor the provided input directories every 
interval and read any new
+ *       files.
+ *   <li><code>--output &lt;path&gt;</code>The output directory where the Job 
will write the
+ *       results. If no output path is provided, the Job will print the 
results to <code>stdout
+ *       </code>.
+ *   <li><code>--execution-mode &lt;mode&gt;</code>The execution mode (BATCH, 
STREAMING, or
+ *       AUTOMATIC) of this pipeline.
+ * </ul>
+ *
+ * <p>This example shows how to:
+ *
+ * <ul>
+ *   <li>Write a simple Flink program by DataStream API V2
+ *   <li>Use tuple data types
+ *   <li>Write and use a user-defined process function
+ * </ul>
+ *
+ * <p>Please note that if you intend to run this example in an IDE, you must 
first add the following
+ * VM options: "--add-opens=java.base/java.util=ALL-UNNAMED". This is 
necessary because the module
+ * system in JDK 17+ restricts some reflection operations.
+ *
+ * <p>Please note that the DataStream API V2 is a new set of APIs, to 
gradually replace the original
+ * DataStream API. It is currently in the experimental stage and is not fully 
available for
+ * production.
+ */
+public class WordCount {
+
+    // 
*************************************************************************
+    // PROGRAM
+    // 
*************************************************************************
+
+    public static void main(String[] args) throws Exception {
+        final CLI params = CLI.fromArgs(args);
+
+        // Get the execution environment instance. This is the main entrypoint
+        // to building a Flink application.
+        final ExecutionEnvironment env = ExecutionEnvironment.getInstance();
+
+        // Apache Flink’s unified approach to stream and batch processing 
means that a DataStream
+        // application executed over bounded input will produce the same final 
results regardless
+        // of the configured execution mode. It is important to note what 
final means here: a job
+        // executing in STREAMING mode might produce incremental updates 
(think upserts in
+        // a database) while in BATCH mode, it would only produce one final 
result at the end. The
+        // final result will be the same if interpreted correctly, but getting 
there can be
+        // different.
+        //
+        // The “classic” execution behavior of the DataStream API is called 
STREAMING execution
+        // mode. Applications should use streaming execution for unbounded 
jobs that require
+        // continuous incremental processing and are expected to stay online 
indefinitely.
+        //
+        // By enabling BATCH execution, we allow Flink to apply additional 
optimizations that we
+        // can only do when we know that our input is bounded. For example, 
different
+        // join/aggregation strategies can be used, in addition to a different 
shuffle
+        // implementation that allows more efficient task scheduling and 
failure recovery behavior.
+        //
+        // By setting the runtime mode to AUTOMATIC, Flink will choose BATCH 
if all sources
+        // are bounded and otherwise STREAMING.
+        env.setExecutionMode(params.getExecutionMode());
+
+        NonKeyedPartitionStream<String> text;
+
+        if (params.getInputs().isPresent()) {
+            // Create a new file source that will read files from a given set 
of directories.
+            // Each file will be processed as plain text and split based on 
newlines.
+            FileSource.FileSourceBuilder<String> builder =
+                    FileSource.forRecordStreamFormat(
+                            new TextLineInputFormat(), 
params.getInputs().get());
+
+            // If a discovery interval is provided, the source will
+            // continuously watch the given directories for new files.
+            
params.getDiscoveryInterval().ifPresent(builder::monitorContinuously);
+
+            text = env.fromSource(new WrappedSource<>(builder.build()), 
"file-input");
+        } else {
+            // Create a new from data source with default data {@code 
WordCountData}.
+            text =
+                    env.fromSource(
+                            
DataStreamV2SourceUtils.fromData(Arrays.asList(WordCountData.WORDS)),
+                            "in-memory-input");
+        }
+
+        KeyedPartitionStream<String, Tuple2<String, Integer>> keyedStream =
+                // The text lines read from the source are split into words
+                // using a user-defined process function. The tokenizer, 
implemented below,
+                // will output each word as a (2-tuple) containing (word, 1)
+                text.process(new Tokenizer())
+                        .withName("tokenizer")
+                        // keyBy groups tuples based on the first field, the 
word.
+                        // Using a keyBy allows performing aggregations and 
other
+                        // stateful transformations over data on a per-key 
basis.
+                        // This is similar to a GROUP BY clause in a SQL query.
+                        .keyBy(value -> value.f0);
+
+        // For each key, we perform a simple sum of the second field, the 
count by user-defined
+        // StreamingCounter/BatchCounter process function.
+        // If the execution mode is streaming, it will continuously output 
updates each time it sees
+        // a new instance of each word in the stream.
+        // If the execution mode is batch, sum will output a final count for
+        // each word.
+        // Flink will provide more built-in functions, such as aggregation, in 
the future. This will
+        // allow users to avoid having to write their own StreamingCounter or 
BatchCounter process
+        // functions in this application.
+        NonKeyedPartitionStream<Tuple2<String, Integer>> counts;
+        if (params.getExecutionMode() == RuntimeExecutionMode.STREAMING) {
+            counts = keyedStream.process(new StreamingCounter());
+        } else {
+            counts = keyedStream.process(new BatchCounter());
+        }

Review Comment:
   And we should comment that, once FLIP-466 is completed, such differences can 
be removed.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to