This is an automated email from the ASF dual-hosted git repository.

guoweijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 4762311e284fadd0779c266f0f09c72585fe03ef
Author: Wencong Liu <[email protected]>
AuthorDate: Wed Mar 6 20:34:45 2024 +0800

    [FLINK-34543][datastream] Introduce the PartitionWindowedStream
---
 .../flink/streaming/api/datastream/DataStream.java | 12 +++++++
 .../streaming/api/datastream/IterativeStream.java  |  6 ++++
 .../datastream/KeyedPartitionWindowedStream.java   | 40 ++++++++++++++++++++++
 .../streaming/api/datastream/KeyedStream.java      | 13 +++++++
 .../NonKeyedPartitionWindowedStream.java           | 40 ++++++++++++++++++++++
 .../api/datastream/PartitionWindowedStream.java    | 32 +++++++++++++++++
 .../scala/StreamingScalaAPICompletenessTest.scala  |  7 +++-
 7 files changed, 149 insertions(+), 1 deletion(-)

diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
index 179a3cb13f2..22be1f71f7a 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
@@ -1459,6 +1459,18 @@ public class DataStream<T> {
         collector.setIterator(iterator);
     }
 
+    /**
+     * Collects records from each partition into a separate full window. The window emission will be
+     * triggered at the end of inputs. For this non-keyed data stream (each record has no key), a
+     * partition contains all records of a subtask.
+     *
+     * @return The {@link PartitionWindowedStream} collecting each partition into a full window.
+     */
+    @PublicEvolving
+    public PartitionWindowedStream<T> fullWindowPartition() {
+        return new NonKeyedPartitionWindowedStream<>(environment, this);
+    }
+
     /**
      * This class acts as an accessor to elements collected via {@link 
#collectAsync(Collector)}.
      *
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/IterativeStream.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/IterativeStream.java
index 6b53493cb2e..3a3b5de755e 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/IterativeStream.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/IterativeStream.java
@@ -126,6 +126,12 @@ public class IterativeStream<T> extends SingleOutputStreamOperator<T> {
         return new ConnectedIterativeStreams<>(originalInput, feedbackType, 
maxWaitTime);
     }
 
+    @Override
+    public PartitionWindowedStream<T> fullWindowPartition() {
+        throw new UnsupportedOperationException(
+                "The fullWindowPartition is not supported because the 
IterativeStream has been deprecated since Flink 1.19.");
+    }
+
     /**
      * The {@link ConnectedIterativeStreams} represent a start of an iterative 
part of a streaming
      * program, where the original input of the iteration and the feedback of 
the iteration are
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedPartitionWindowedStream.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedPartitionWindowedStream.java
new file mode 100644
index 00000000000..ecb9db8563b
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedPartitionWindowedStream.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.datastream;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+
+/**
+ * {@link KeyedPartitionWindowedStream} represents a data stream that collects all records with the
+ * same key separately into a full window.
+ */
+@Internal
+public class KeyedPartitionWindowedStream<T, KEY> implements 
PartitionWindowedStream<T> {
+
+    private final StreamExecutionEnvironment environment;
+
+    private final KeyedStream<T, KEY> input;
+
+    public KeyedPartitionWindowedStream(
+            StreamExecutionEnvironment environment, KeyedStream<T, KEY> input) 
{
+        this.environment = environment;
+        this.input = input;
+    }
+}
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
index 82925a83896..3dae2b7bcbc 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
@@ -1071,6 +1071,19 @@ public class KeyedStream<T, KEY> extends DataStream<T> {
         return reduce(aggregate).name("Keyed Aggregation");
     }
 
+    /**
+     * Collects records from each partition into a separate full window. The window emission will be
+     * triggered at the end of inputs. For this keyed data stream (each record has a key), a
+     * partition contains all records with the same key.
+     *
+     * @return The {@link PartitionWindowedStream} collecting each partition into a full window.
+     */
+    @PublicEvolving
+    @Override
+    public PartitionWindowedStream<T> fullWindowPartition() {
+        return new KeyedPartitionWindowedStream<>(environment, this);
+    }
+
     /**
      * Publishes the keyed stream as queryable ValueState instance.
      *
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/NonKeyedPartitionWindowedStream.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/NonKeyedPartitionWindowedStream.java
new file mode 100644
index 00000000000..9454f04267d
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/NonKeyedPartitionWindowedStream.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.datastream;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+
+/**
+ * {@link NonKeyedPartitionWindowedStream} represents a data stream that collects all records of
+ * each subtask separately into a full window.
+ */
+@Internal
+public class NonKeyedPartitionWindowedStream<T> implements 
PartitionWindowedStream<T> {
+
+    private final StreamExecutionEnvironment environment;
+
+    private final DataStream<T> input;
+
+    public NonKeyedPartitionWindowedStream(
+            StreamExecutionEnvironment environment, DataStream<T> input) {
+        this.environment = environment;
+        this.input = input;
+    }
+}
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/PartitionWindowedStream.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/PartitionWindowedStream.java
new file mode 100644
index 00000000000..8c3caf97982
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/PartitionWindowedStream.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.datastream;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+/**
+ * {@link PartitionWindowedStream} represents a data stream that collects all records of each
+ * partition separately into a full window. Window emission will be triggered at the end of inputs.
+ * For non-keyed {@link DataStream}, a partition contains all records of a subtask. For {@link
+ * KeyedStream}, a partition contains all records with the same key.
+ *
+ * @param <T> The type of the elements in this stream.
+ */
+@PublicEvolving
+public interface PartitionWindowedStream<T> {}
diff --git a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StreamingScalaAPICompletenessTest.scala b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StreamingScalaAPICompletenessTest.scala
index 8eb144635d3..d54e6817b1c 100644
--- a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StreamingScalaAPICompletenessTest.scala
+++ b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StreamingScalaAPICompletenessTest.scala
@@ -79,7 +79,12 @@ class StreamingScalaAPICompletenessTest extends ScalaAPICompletenessTestBase {
 
       // Deactivated until Scala API has new windowing API
       "org.apache.flink.streaming.api.datastream.DataStream.timeWindowAll",
-      "org.apache.flink.streaming.api.datastream.DataStream.windowAll"
+      "org.apache.flink.streaming.api.datastream.DataStream.windowAll",
+
+      // The following newly added DataStream APIs are excluded because all Scala APIs have been
+      // deprecated since Flink 1.18.
+      
"org.apache.flink.streaming.api.datastream.DataStream.fullWindowPartition",
+      
"org.apache.flink.streaming.api.datastream.KeyedStream.fullWindowPartition"
     )
     val excludedPatterns = Seq(
       // We don't have project on tuples in the Scala API

Reply via email to