reswqa commented on code in PR #26118:
URL: https://github.com/apache/flink/pull/26118#discussion_r1947422299


##########
flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/dsv2/watermark/SyncOffsetByWatermark.java:
##########
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.examples.dsv2.watermark;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.watermark.LongWatermark;
+import org.apache.flink.api.common.watermark.LongWatermarkDeclaration;
+import org.apache.flink.api.common.watermark.Watermark;
+import org.apache.flink.api.common.watermark.WatermarkDeclaration;
+import org.apache.flink.api.common.watermark.WatermarkDeclarations;
+import org.apache.flink.api.common.watermark.WatermarkHandlingResult;
+import org.apache.flink.api.connector.dsv2.WrappedSource;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.connector.datagen.source.DataGeneratorSource;
+import org.apache.flink.connector.datagen.source.GeneratorFunction;
+import org.apache.flink.datastream.api.ExecutionEnvironment;
+import org.apache.flink.datastream.api.common.Collector;
+import org.apache.flink.datastream.api.context.NonPartitionedContext;
+import org.apache.flink.datastream.api.context.PartitionedContext;
+import org.apache.flink.datastream.api.function.OneInputStreamProcessFunction;
+import org.apache.flink.datastream.api.stream.NonKeyedPartitionStream;
+
+import java.util.Set;
+
+/**
+ * The example simulates Flink reading a message queue. Each partition of the 
topic has its own
+ * consumption offset, and the example shows how to synchronise these offsets.
+ *
+ * <p>The example uses a {@link DataGeneratorSource} to simulate a message 
queue and a custom {@link
+ * Watermark} to propagate the offset in the stream.
+ *
+ * <p>This example shows how to:
+ *
+ * <ul>
+ *   <li>Define and propagate a custom event (Watermark) in DataStream 
API V2
+ * </ul>
+ *
+ * <p>Please note that if you intend to run this example in an IDE, you must 
first add the following
+ * VM options: "--add-opens=java.base/java.util=ALL-UNNAMED". This is 
necessary because the module
+ * system in JDK 17+ restricts some reflection operations.
+ *
+ * <p>Please note that the DataStream API V2 is a new set of APIs, to 
gradually replace the original
+ * DataStream API. It is currently in the experimental stage and is not fully 
available for
+ * production.
+ */
+public class SyncOffsetByWatermark {
+
+    // 
*************************************************************************
+    // PROGRAM
+    // 
*************************************************************************
+
+    /**
+     * The {@link Message} represents the message in the message queue. Each 
message has a topic,
+     * partition, offset and associated data.
+     */
+    public static class Message {
+        public String topic;
+        public long partition;
+        public long offset;
+        public String data;
+
+        public Message(String topic, long partition, long offset, String data) 
{
+            this.topic = topic;
+            this.partition = partition;
+            this.offset = offset;
+            this.data = data;
+        }
+    }
+
+    /**
+     * Firstly, we define an offset watermark, which represents the currently 
consumed offset of the
+     * partition. Since the watermark needs to convey the offset, its data 
type is long. To
+     * determine the minimum offset across all partitions, we utilize the 
combineFunctionMin()
+     * method to combine the watermarks. The default handling strategy is 
forward, meaning that the
+     * watermark will typically be advanced to downstream operators in most 
scenarios. Thus, we
+     * create a WatermarkDeclaration instance that can be used to declare and 
generate the
+     * watermark.
+     */
+    public static final LongWatermarkDeclaration OFFSET_WATERMARK_DECLARATION =
+            WatermarkDeclarations.newBuilder("OFFSET_WATERMARK")
+                    .typeLong()
+                    .combineFunctionMin()
+                    .combineWaitForAllChannels(true)
+                    .defaultHandlingStrategyForward()
+                    .build();
+
+    public static void main(String[] args) throws Exception {
+        // obtain execution environment
+        ExecutionEnvironment env = ExecutionEnvironment.getInstance();
+
+        // Create a source that simulates a message queue with a parallelism 
set to 5, indicating
+        // that the consumed topic has 5 partitions.
+        // The source will declare and generate offset watermarks.
+        NonKeyedPartitionStream<Message> source =
+                env.fromSource(
+                                new WrappedSource<>(new MessageQueueSource()),
+                                "message queue source")
+                        .withParallelism(5);

Review Comment:
   Please introduce a parameter to control the parallelism instead of hard-coding it to 5.



##########
flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/dsv2/watermark/SyncOffsetByWatermark.java:
##########
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.examples.dsv2.watermark;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.watermark.LongWatermark;
+import org.apache.flink.api.common.watermark.LongWatermarkDeclaration;
+import org.apache.flink.api.common.watermark.Watermark;
+import org.apache.flink.api.common.watermark.WatermarkDeclaration;
+import org.apache.flink.api.common.watermark.WatermarkDeclarations;
+import org.apache.flink.api.common.watermark.WatermarkHandlingResult;
+import org.apache.flink.api.connector.dsv2.WrappedSource;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.connector.datagen.source.DataGeneratorSource;
+import org.apache.flink.connector.datagen.source.GeneratorFunction;
+import org.apache.flink.datastream.api.ExecutionEnvironment;
+import org.apache.flink.datastream.api.common.Collector;
+import org.apache.flink.datastream.api.context.NonPartitionedContext;
+import org.apache.flink.datastream.api.context.PartitionedContext;
+import org.apache.flink.datastream.api.function.OneInputStreamProcessFunction;
+import org.apache.flink.datastream.api.stream.NonKeyedPartitionStream;
+
+import java.util.Set;
+
+/**
+ * The example simulates Flink reading a message queue. Each partition of the 
topic has its own
+ * consumption offset, and the example shows how to synchronise these offsets.
+ *
+ * <p>The example uses a {@link DataGeneratorSource} to simulate a message 
queue and a custom {@link
+ * Watermark} to propagate the offset in the stream.
+ *
+ * <p>This example shows how to:
+ *
+ * <ul>
+ *   <li>Define and propagate a custom event (Watermark) in DataStream 
API V2
+ * </ul>
+ *
+ * <p>Please note that if you intend to run this example in an IDE, you must 
first add the following
+ * VM options: "--add-opens=java.base/java.util=ALL-UNNAMED". This is 
necessary because the module
+ * system in JDK 17+ restricts some reflection operations.
+ *
+ * <p>Please note that the DataStream API V2 is a new set of APIs, to 
gradually replace the original
+ * DataStream API. It is currently in the experimental stage and is not fully 
available for
+ * production.
+ */
+public class SyncOffsetByWatermark {
+
+    // 
*************************************************************************
+    // PROGRAM
+    // 
*************************************************************************
+
+    /**
+     * The {@link Message} represents the message in the message queue. Each 
message has a topic,
+     * partition, offset and associated data.
+     */
+    public static class Message {
+        public String topic;
+        public long partition;
+        public long offset;
+        public String data;
+
+        public Message(String topic, long partition, long offset, String data) 
{
+            this.topic = topic;
+            this.partition = partition;
+            this.offset = offset;
+            this.data = data;
+        }
+    }
+
+    /**
+     * Firstly, we define an offset watermark, which represents the currently 
consumed offset of the
+     * partition. Since the watermark needs to convey the offset, its data 
type is long. To
+     * determine the minimum offset across all partitions, we utilize the 
combineFunctionMin()
+     * method to combine the watermarks. The default handling strategy is 
forward, meaning that the
+     * watermark will typically be advanced to downstream operators in most 
scenarios. Thus, we
+     * create a WatermarkDeclaration instance that can be used to declare and 
generate the
+     * watermark.
+     */
+    public static final LongWatermarkDeclaration OFFSET_WATERMARK_DECLARATION =
+            WatermarkDeclarations.newBuilder("OFFSET_WATERMARK")
+                    .typeLong()
+                    .combineFunctionMin()
+                    .combineWaitForAllChannels(true)
+                    .defaultHandlingStrategyForward()
+                    .build();
+
+    public static void main(String[] args) throws Exception {
+        // obtain execution environment
+        ExecutionEnvironment env = ExecutionEnvironment.getInstance();
+
+        // Create a source that simulates a message queue with a parallelism 
set to 5, indicating
+        // that the consumed topic has 5 partitions.
+        // The source will declare and generate offset watermarks.
+        NonKeyedPartitionStream<Message> source =
+                env.fromSource(
+                                new WrappedSource<>(new MessageQueueSource()),
+                                "message queue source")
+                        .withParallelism(5);
+
+        source.process(
+                // handle offset watermark in downstream
+                new OneInputStreamProcessFunction<>() {
+                    @Override
+                    public void processRecord(
+                            Message record,
+                            Collector<Object> output,
+                            PartitionedContext<Object> ctx)
+                            throws Exception {
+                        // do something
+                    }
+
+                    @Override
+                    public WatermarkHandlingResult onWatermark(
+                            Watermark watermark,
+                            Collector<Object> output,
+                            NonPartitionedContext<Object> ctx) {
+                        if (watermark
+                                .getIdentifier()
+                                
.equals(OFFSET_WATERMARK_DECLARATION.getIdentifier())) {
+                            // For our custom offset watermark, we can obtain 
the combined offset
+                            // here and take further action with it.
+                            long syncedOffset = ((LongWatermark) 
watermark).getValue();
+                            System.out.println(
+                                    "All partitions have been consumed up to 
the "
+                                            + syncedOffset
+                                            + " offset.");
+                            return WatermarkHandlingResult.PEEK;
+                        }
+
+                        return WatermarkHandlingResult.PEEK;
+                    }
+                });
+
+        // execute program
+        env.execute("SyncOffsetByWatermark Example");
+    }
+
+    private static class MessageQueueSource extends 
DataGeneratorSource<Message> {

Review Comment:
   Please add Javadoc for this class.



##########
flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/dsv2/windowing/CountProductSalesWindowing.java:
##########
@@ -0,0 +1,235 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.examples.dsv2.windowing;
+
+import org.apache.flink.api.common.serialization.SimpleStringEncoder;
+import org.apache.flink.api.common.state.StateDeclaration;
+import org.apache.flink.api.common.state.StateDeclarations;
+import org.apache.flink.api.common.state.ValueStateDeclaration;
+import org.apache.flink.api.common.state.v2.ValueState;
+import org.apache.flink.api.common.typeinfo.TypeDescriptors;
+import org.apache.flink.api.connector.dsv2.DataStreamV2SourceUtils;
+import org.apache.flink.api.connector.dsv2.WrappedSink;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.connector.file.sink.FileSink;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.datastream.api.ExecutionEnvironment;
+import org.apache.flink.datastream.api.builtin.BuiltinFuncs;
+import org.apache.flink.datastream.api.common.Collector;
+import org.apache.flink.datastream.api.context.PartitionedContext;
+import org.apache.flink.datastream.api.extension.eventtime.EventTimeExtension;
+import 
org.apache.flink.datastream.api.extension.window.context.OneInputWindowContext;
+import 
org.apache.flink.datastream.api.extension.window.function.OneInputWindowStreamProcessFunction;
+import 
org.apache.flink.datastream.api.extension.window.strategy.WindowStrategy;
+import org.apache.flink.datastream.api.stream.NonKeyedPartitionStream;
+import org.apache.flink.streaming.api.functions.sink.PrintSink;
+import 
org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;
+import 
org.apache.flink.streaming.examples.dsv2.windowing.util.CountProductSalesWindowingData;
+import org.apache.flink.util.ParameterTool;
+
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.Set;
+
+/**
+ * Example illustrating how to use Window to count the sales of each product 
in each hour by
+ * DataStream API V2.
+ *
+ * <p>The input is a [list of] order, each order is a tuple of productId and 
orderTime.
+ *
+ * <p>Usage:
+ *
+ * <ul>
+ *   <li><code>--output &lt;path&gt;</code>The output directory where the Job 
will write the
+ *       results. If no output path is provided, the Job will print the 
results to <code>stdout
+ *       </code>.
+ * </ul>
+ *
+ * <p>This example shows how to:
+ *
+ * <ul>
+ *   <li>Usage of Window extension in DataStream API V2
+ * </ul>
+ *
+ * <p>Please note that if you intend to run this example in an IDE, you must 
first add the following
+ * VM options: "--add-opens=java.base/java.util=ALL-UNNAMED". This is 
necessary because the module
+ * system in JDK 17+ restricts some reflection operations.
+ *
+ * <p>Please note that the DataStream API V2 is a new set of APIs, to 
gradually replace the original
+ * DataStream API. It is currently in the experimental stage and is not fully 
available for
+ * production.
+ */
+public class CountProductSalesWindowing {
+
+    public static void main(String[] args) throws Exception {
+        // parse the parameters
+        final ParameterTool params = ParameterTool.fromArgs(args);
+        final boolean fileOutput = params.has("output");
+
+        // Get the execution environment instance. This is the main entrypoint
+        // to building a Flink application.
+        final ExecutionEnvironment env = ExecutionEnvironment.getInstance();
+
+        // create order source stream
+        NonKeyedPartitionStream<Tuple2<Integer, Long>> orders =
+                env.fromSource(
+                        DataStreamV2SourceUtils.fromData(
+                                
Arrays.asList(CountProductSalesWindowingData.ORDERS)),
+                        "order source");
+
+        // extract and propagate event time from order
+        NonKeyedPartitionStream<Tuple2<Integer, Long>> orderStream =
+                orders.process(
+                        EventTimeExtension.<Tuple2<Integer, 
Long>>newWatermarkGeneratorBuilder(
+                                        order -> order.f1)
+                                .periodicWatermark(Duration.ofMillis(200))
+                                .buildAsProcessFunction());
+
+        NonKeyedPartitionStream<Tuple3<Integer, Long, Long>> 
productSalesQuantityStream =
+                orderStream
+                        // key by productId
+                        .keyBy(order -> order.f0)
+                        .process(
+                                BuiltinFuncs.window(
+                                        // declare tumbling window with window 
size 1 hour
+                                        WindowStrategy.tumbling(
+                                                Duration.ofHours(1), 
WindowStrategy.EVENT_TIME),
+                                        // define window process function to 
calculate total sales
+                                        // quantity per product per window.
+                                        // you can choose to use either 
CountSalesQuantity or
+                                        // 
CountSalesQuantityWithPreAggregation; both
+                                        // implementations are equivalent, but 
the latter is more
+                                        // efficient.
+                                        new CountSalesQuantity()));
+
+        if (fileOutput) {
+            productSalesQuantityStream
+                    .toSink(
+                            new WrappedSink<>(
+                                    FileSink.<Tuple3<Integer, Long, 
Long>>forRowFormat(
+                                                    new 
Path(params.get("output")),
+                                                    new 
SimpleStringEncoder<>())
+                                            .withRollingPolicy(
+                                                    
DefaultRollingPolicy.builder()
+                                                            .withMaxPartSize(
+                                                                    
MemorySize.ofMebiBytes(1))
+                                                            
.withRolloverInterval(
+                                                                    
Duration.ofSeconds(10))
+                                                            .build())
+                                            .build()))
+                    .withName("file-sink");
+        } else {
+            // Print the results to the STDOUT.
+            productSalesQuantityStream
+                    .toSink(new WrappedSink<>(new PrintSink<>()))
+                    .withName("print-sink");
+        }
+
+        env.execute("CountProductSalesWindowing example");
+    }
+
+    /**
+     * Count sales quantity per product.
+     *
+     * <p>We will obtain all orders and calculate the total sales quantity for 
each product when
+     * the window triggers.
+     */
+    public static class CountSalesQuantity
+            implements OneInputWindowStreamProcessFunction<
+                    Tuple2<Integer, Long>, Tuple3<Integer, Long, Long>> {
+
+        @Override
+        public void onTrigger(
+                Collector<Tuple3<Integer, Long, Long>> output,
+                PartitionedContext<Tuple3<Integer, Long, Long>> ctx,
+                OneInputWindowContext<Tuple2<Integer, Long>> windowContext)
+                throws Exception {
+            // get current productId
+            int productId = ctx.getStateManager().getCurrentKey();
+            // calculate total sales quantity
+            long totalSalesQuantity = 0;
+            for (Tuple2<Integer, Long> ignored : 
windowContext.getAllRecords()) {
+                totalSalesQuantity += 1;
+            }
+            // emit result
+            output.collect(Tuple3.of(productId, windowContext.getStartTime(), 
totalSalesQuantity));
+        }
+    }
+
+    /**
+     * Count sales quantity per product.
+     *
+     * <p>Firstly, we declare a state to store the sales quantity for each 
product.
+     *
+     * <p>When a record is received, we update the sales quantity in state.
+     *
+     * <p>When the window triggers, we emit the result.
+     */
+    public static class CountSalesQuantityWithPreAggregation

Review Comment:
   I think we could remove this unused function. Users can learn how to use 
a pre-aggregation window from the documentation.



##########
flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/dsv2/windowing/util/CountProductSalesWindowingData.java:
##########
@@ -0,0 +1,648 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.examples.dsv2.windowing.util;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+
+public class CountProductSalesWindowingData {

Review Comment:
   Please add Javadoc for this class.



##########
flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/dsv2/join/Join.java:
##########
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.examples.dsv2.join;
+
+import org.apache.flink.api.common.serialization.SimpleStringEncoder;
+import org.apache.flink.api.connector.dsv2.DataStreamV2SourceUtils;
+import org.apache.flink.api.connector.dsv2.WrappedSink;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.connector.file.sink.FileSink;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.datastream.api.ExecutionEnvironment;
+import org.apache.flink.datastream.api.builtin.BuiltinFuncs;
+import org.apache.flink.datastream.api.common.Collector;
+import org.apache.flink.datastream.api.context.RuntimeContext;
+import org.apache.flink.datastream.api.extension.join.JoinFunction;
+import org.apache.flink.datastream.api.stream.NonKeyedPartitionStream;
+import org.apache.flink.streaming.api.functions.sink.PrintSink;
+import 
org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;
+import org.apache.flink.streaming.examples.dsv2.join.util.JoinData;
+import org.apache.flink.util.ParameterTool;
+
+import java.time.Duration;
+import java.util.Arrays;
+
+/**
+ * Example illustrating a join between two data streams. The example works on 
two input streams with
+ * pairs (name, grade) and (name, salary) respectively. It joins the streams 
based on "name".
+ *
+ * <p>The example uses a built-in sample data {@link JoinData} as inputs.
+ *
+ * <p>Usage:
+ *
+ * <ul>
+ *   <li><code>--output &lt;path&gt;</code>The output directory where the Job 
will write the
+ *       results. If no output path is provided, the Job will print the 
results to <code>stdout
+ *       </code>.
+ * </ul>
+ *
+ * <p>This example shows how to:
+ *
+ * <ul>
+ *   <li>Usage of Join extension in DataStream API V2
+ * </ul>
+ *
+ * <p>Please note that if you intend to run this example in an IDE, you must 
first add the following
+ * VM options: "--add-opens=java.base/java.util=ALL-UNNAMED". This is 
necessary because the module
+ * system in JDK 17+ restricts some reflection operations.
+ *
+ * <p>Please note that the DataStream API V2 is a new set of APIs, to 
gradually replace the original
+ * DataStream API. It is currently in the experimental stage and is not fully 
available for
+ * production.
+ */
+public class Join {
+
+    // 
*************************************************************************
+    // PROGRAM
+    // 
*************************************************************************
+
+    public static void main(String[] args) throws Exception {
+        // parse the parameters
+        final ParameterTool params = ParameterTool.fromArgs(args);
+        final boolean fileOutput = params.has("output");
+
+        // obtain execution environment
+        ExecutionEnvironment env = ExecutionEnvironment.getInstance();
+
+        // create the data sources for both grades and salaries
+        NonKeyedPartitionStream<Tuple2<String, Integer>> grades =
+                env.fromSource(
+                        
DataStreamV2SourceUtils.fromData(Arrays.asList(JoinData.GRADE_DATAS)),
+                        "grades source");
+        NonKeyedPartitionStream<Tuple2<String, Long>> salaries =
+                env.fromSource(
+                        
DataStreamV2SourceUtils.fromData(Arrays.asList(JoinData.SALARY_DATAS)),
+                        "salaries source");
+
+        // joining two DataStreams
+        NonKeyedPartitionStream<Tuple3<String, Integer, Long>> joinedStream =
+                BuiltinFuncs.join(
+                        grades,
+                        new GradeKeySelector(),
+                        salaries,
+                        new SalaryKeySelector(),
+                        new JoinGradeAndSalaryFunction());
+
+        if (fileOutput) {
+            // write joined results to file
+            joinedStream
+                    .toSink(
+                            new WrappedSink<>(
+                                    FileSink.<Tuple3<String, Integer, 
Long>>forRowFormat(
+                                                    new 
Path(params.get("output")),
+                                                    new 
SimpleStringEncoder<>())
+                                            .withRollingPolicy(
+                                                    
DefaultRollingPolicy.builder()
+                                                            .withMaxPartSize(
+                                                                    
MemorySize.ofMebiBytes(1))
+                                                            
.withRolloverInterval(
+                                                                    
Duration.ofSeconds(10))
+                                                            .build())
+                                            .build()))
+                    .withName("output");
+        } else {
+            // Print the results to the STDOUT.
+            joinedStream.toSink(new WrappedSink<>(new 
PrintSink<>())).withName("print-sink");
+        }
+
+        // execute program
+        env.execute("Join Example");
+    }
+
+    private static class GradeKeySelector implements 
KeySelector<Tuple2<String, Integer>, String> {
+        @Override
+        public String getKey(Tuple2<String, Integer> value) {
+            return value.f0;
+        }
+    }
+
+    private static class SalaryKeySelector implements 
KeySelector<Tuple2<String, Long>, String> {
+        @Override
+        public String getKey(Tuple2<String, Long> value) {
+            return value.f0;
+        }
+    }
+
+    private static class JoinGradeAndSalaryFunction
+            implements JoinFunction<
+                    Tuple2<String, Integer>, Tuple2<String, Long>, 
Tuple3<String, Integer, Long>> {
+
+        @Override
+        public void processRecord(
+                Tuple2<String, Integer> leftRecord,
+                Tuple2<String, Long> rightRecord,
+                Collector<Tuple3<String, Integer, Long>> output,
+                RuntimeContext ctx)
+                throws Exception {
+            output.collect(Tuple3.of(leftRecord.f0, leftRecord.f1, 
rightRecord.f1));
+        }
+    }

Review Comment:
   Please add Javadoc.



##########
flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/dsv2/windowing/CountProductSalesWindowing.java:
##########
@@ -0,0 +1,235 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.examples.dsv2.windowing;
+
+import org.apache.flink.api.common.serialization.SimpleStringEncoder;
+import org.apache.flink.api.common.state.StateDeclaration;
+import org.apache.flink.api.common.state.StateDeclarations;
+import org.apache.flink.api.common.state.ValueStateDeclaration;
+import org.apache.flink.api.common.state.v2.ValueState;
+import org.apache.flink.api.common.typeinfo.TypeDescriptors;
+import org.apache.flink.api.connector.dsv2.DataStreamV2SourceUtils;
+import org.apache.flink.api.connector.dsv2.WrappedSink;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.connector.file.sink.FileSink;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.datastream.api.ExecutionEnvironment;
+import org.apache.flink.datastream.api.builtin.BuiltinFuncs;
+import org.apache.flink.datastream.api.common.Collector;
+import org.apache.flink.datastream.api.context.PartitionedContext;
+import org.apache.flink.datastream.api.extension.eventtime.EventTimeExtension;
+import 
org.apache.flink.datastream.api.extension.window.context.OneInputWindowContext;
+import 
org.apache.flink.datastream.api.extension.window.function.OneInputWindowStreamProcessFunction;
+import 
org.apache.flink.datastream.api.extension.window.strategy.WindowStrategy;
+import org.apache.flink.datastream.api.stream.NonKeyedPartitionStream;
+import org.apache.flink.streaming.api.functions.sink.PrintSink;
+import 
org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;
+import 
org.apache.flink.streaming.examples.dsv2.windowing.util.CountProductSalesWindowingData;
+import org.apache.flink.util.ParameterTool;
+
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.Set;
+
+/**
+ * Example illustrating how to use Window to count the sales of each product 
in each hour by
+ * DataStream API V2.
+ *
+ * <p>The input is a [list of] order, each order is a tuple of productId and 
orderTime.

Review Comment:
   ```suggestion
    * <p>The input is a list of order, each order is a tuple of productId and 
orderTime.
   ```



##########
flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/dsv2/watermark/SyncOffsetByWatermark.java:
##########
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.examples.dsv2.watermark;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.watermark.LongWatermark;
+import org.apache.flink.api.common.watermark.LongWatermarkDeclaration;
+import org.apache.flink.api.common.watermark.Watermark;
+import org.apache.flink.api.common.watermark.WatermarkDeclaration;
+import org.apache.flink.api.common.watermark.WatermarkDeclarations;
+import org.apache.flink.api.common.watermark.WatermarkHandlingResult;
+import org.apache.flink.api.connector.dsv2.WrappedSource;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.connector.datagen.source.DataGeneratorSource;
+import org.apache.flink.connector.datagen.source.GeneratorFunction;
+import org.apache.flink.datastream.api.ExecutionEnvironment;
+import org.apache.flink.datastream.api.common.Collector;
+import org.apache.flink.datastream.api.context.NonPartitionedContext;
+import org.apache.flink.datastream.api.context.PartitionedContext;
+import org.apache.flink.datastream.api.function.OneInputStreamProcessFunction;
+import org.apache.flink.datastream.api.stream.NonKeyedPartitionStream;
+
+import java.util.Set;
+
+/**
+ * The example simulates Flink reading a message queue. Each partition of the 
topic has its own
+ * consumption offset, and the example shows how to synchronise these offsets.
+ *
+ * <p>The example uses a {@link DataGeneratorSource} to simulate a message 
queue and a custom {@link
+ * Watermark} to propagate the offset in the stream.
+ *
+ * <p>This example shows how to:
+ *
+ * <ul>
+ *   <li>Define and propagate a custom event (Watermark) in DataStream 
API V2
+ * </ul>
+ *
+ * <p>Please note that if you intend to run this example in an IDE, you must 
first add the following
+ * VM options: "--add-opens=java.base/java.util=ALL-UNNAMED". This is 
necessary because the module
+ * system in JDK 17+ restricts some reflection operations.
+ *
+ * <p>Please note that the DataStream API V2 is a new set of APIs intended to 
gradually replace the original
+ * DataStream API. It is currently in the experimental stage and is not fully 
available for
+ * production.
+ */
+public class SyncOffsetByWatermark {
+
+    // 
*************************************************************************
+    // PROGRAM
+    // 
*************************************************************************
+
+    /**
+     * A message in the simulated message queue. Each message has a topic, partition, offset and
+     * associated data.
+     *
+     * <p>This is a plain data holder: all four fields are public and non-final, and the
+     * constructor assigns each argument to the field of the same name.
+     */
+    public static class Message {
+        public String topic;
+        public long partition;
+        public long offset;
+        public String data;
+
+        public Message(String topic, long partition, long offset, String data) 
{
+            this.topic = topic;
+            this.partition = partition;
+            this.offset = offset;
+            this.data = data;
+        }
+    }
+
+    /**
+     * Declaration of the offset watermark, which represents the currently consumed offset of a
+     * partition. It is built with the following settings:
+     *
+     * <ul>
+     *   <li>{@code typeLong()}: the watermark needs to convey the offset, so its data type is
+     *       long.
+     *   <li>{@code combineFunctionMin()}: watermarks are combined with the minimum function in
+     *       order to determine the minimum offset across all partitions.
+     *   <li>{@code combineWaitForAllChannels(true)}: presumably delays combining until every
+     *       input channel has reported a value; TODO confirm against the WatermarkDeclarations
+     *       documentation.
+     *   <li>{@code defaultHandlingStrategyForward()}: the default handling strategy is forward,
+     *       meaning that the watermark will typically be forwarded to downstream operators.
+     * </ul>
+     *
+     * <p>The resulting declaration is returned by {@code MessageQueueSource#declareWatermarks()}
+     * and its identifier is compared against incoming watermarks in {@code main}.
+     */
+    public static final LongWatermarkDeclaration OFFSET_WATERMARK_DECLARATION =
+            WatermarkDeclarations.newBuilder("OFFSET_WATERMARK")
+                    .typeLong()
+                    .combineFunctionMin()
+                    .combineWaitForAllChannels(true)
+                    .defaultHandlingStrategyForward()
+                    .build();
+    /**
+     * Builds and runs the example job.
+     *
+     * <p>A source wrapping {@code MessageQueueSource} is created with parallelism 5 and fed into a
+     * process function. That function leaves records unprocessed and, for every watermark whose
+     * identifier equals the identifier of {@code OFFSET_WATERMARK_DECLARATION}, prints the long
+     * value carried by the watermark. Both branches of {@code onWatermark} return
+     * {@code WatermarkHandlingResult.PEEK}.
+     *
+     * @param args command-line arguments; not read by this method
+     * @throws Exception declared on the signature; presumably raised by job execution
+     */
+    public static void main(String[] args) throws Exception {
+        // obtain execution environment
+        ExecutionEnvironment env = ExecutionEnvironment.getInstance();
+
+        // Create a source that simulates a message queue with a parallelism 
set to 5, indicating
+        // that the consumed topic has 5 partitions.
+        // The source will declare and generate offset watermarks.
+        NonKeyedPartitionStream<Message> source =
+                env.fromSource(
+                                new WrappedSource<>(new MessageQueueSource()),
+                                "message queue source")
+                        .withParallelism(5);
+
+        source.process(
+                // handle offset watermark in downstream
+                new OneInputStreamProcessFunction<>() {
+                    @Override
+                    public void processRecord(
+                            Message record,
+                            Collector<Object> output,
+                            PartitionedContext<Object> ctx)
+                            throws Exception {
+                        // Placeholder: no per-record processing is done in this example.
+                    }
+
+                    @Override
+                    public WatermarkHandlingResult onWatermark(
+                            Watermark watermark,
+                            Collector<Object> output,
+                            NonPartitionedContext<Object> ctx) {
+                        if (watermark
+                                .getIdentifier()
+                                
.equals(OFFSET_WATERMARK_DECLARATION.getIdentifier())) {
+                            // For our custom offset watermark, we can obtain 
the combined offset
+                            // here and take further action with it.
+                            long syncedOffset = ((LongWatermark) 
watermark).getValue();
+                            System.out.println(
+                                    "All partitions have been consumed up to 
the "
+                                            + syncedOffset
+                                            + " offset.");
+                            return WatermarkHandlingResult.PEEK;
+                        }
+
+                        return WatermarkHandlingResult.PEEK;
+                    }
+                });
+
+        // execute program
+        env.execute("SyncOffsetByWatermark Example");
+    }
+    /**
+     * Source used by {@code main} to simulate the message queue.
+     *
+     * <p>It is a {@code DataGeneratorSource} of {@code Message} constructed from a new
+     * {@code CustomGeneratorFunction}, the value {@code 1_000_000L} (presumably the number of
+     * records to generate; TODO confirm against the DataGeneratorSource constructor) and the type
+     * information of {@code Message}. It additionally declares
+     * {@code OFFSET_WATERMARK_DECLARATION} as its only watermark.
+     */
+    private static class MessageQueueSource extends 
DataGeneratorSource<Message> {
+
+        public MessageQueueSource() {
+            super(new CustomGeneratorFunction(), 1_000_000L, 
TypeInformation.of(Message.class));
+        }
+
+        @Override
+        public Set<? extends WatermarkDeclaration> declareWatermarks() {
+            // Declare the offset watermark.
+            return Set.of(OFFSET_WATERMARK_DECLARATION);
+        }
+    }
+
+    private static class CustomGeneratorFunction implements 
GeneratorFunction<Long, Message> {

Review Comment:
   doc



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to