This is an automated email from the ASF dual-hosted git repository. guoweijie pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 4762311e284fadd0779c266f0f09c72585fe03ef Author: Wencong Liu <[email protected]> AuthorDate: Wed Mar 6 20:34:45 2024 +0800 [FLINK-34543][datastream] Introduce the PartitionWindowedStream --- .../flink/streaming/api/datastream/DataStream.java | 12 +++++++ .../streaming/api/datastream/IterativeStream.java | 6 ++++ .../datastream/KeyedPartitionWindowedStream.java | 40 ++++++++++++++++++++++ .../streaming/api/datastream/KeyedStream.java | 13 +++++++ .../NonKeyedPartitionWindowedStream.java | 40 ++++++++++++++++++++++ .../api/datastream/PartitionWindowedStream.java | 32 +++++++++++++++++ .../scala/StreamingScalaAPICompletenessTest.scala | 7 +++- 7 files changed, 149 insertions(+), 1 deletion(-) diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java index 179a3cb13f2..22be1f71f7a 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java @@ -1459,6 +1459,18 @@ public class DataStream<T> { collector.setIterator(iterator); } + /** + * Collect records from each partition into a separate full window. The window emission will be + * triggered at the end of inputs. For this non-keyed data stream(each record has no key), a + * partition contains all records of a subtask. + * + * @return The full windowed data stream on partition. + */ + @PublicEvolving + public PartitionWindowedStream<T> fullWindowPartition() { + return new NonKeyedPartitionWindowedStream<>(environment, this); + } + /** * This class acts as an accessor to elements collected via {@link #collectAsync(Collector)}. * diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/IterativeStream.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/IterativeStream.java index 6b53493cb2e..3a3b5de755e 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/IterativeStream.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/IterativeStream.java @@ -126,6 +126,12 @@ public class IterativeStream<T> extends SingleOutputStreamOperator<T> { return new ConnectedIterativeStreams<>(originalInput, feedbackType, maxWaitTime); } + @Override + public PartitionWindowedStream<T> fullWindowPartition() { + throw new UnsupportedOperationException( + "The fullWindowPartition is not supported because the IterativeStream has been deprecated since Flink 1.19."); + } + /** * The {@link ConnectedIterativeStreams} represent a start of an iterative part of a streaming * program, where the original input of the iteration and the feedback of the iteration are diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedPartitionWindowedStream.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedPartitionWindowedStream.java new file mode 100644 index 00000000000..ecb9db8563b --- /dev/null +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedPartitionWindowedStream.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.datastream; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; + +/** + * {@link KeyedPartitionWindowedStream} represents a data stream that collects all records with the + * same key separately into a full window. + */ +@Internal +public class KeyedPartitionWindowedStream<T, KEY> implements PartitionWindowedStream<T> { + + private final StreamExecutionEnvironment environment; + + private final KeyedStream<T, KEY> input; + + public KeyedPartitionWindowedStream( + StreamExecutionEnvironment environment, KeyedStream<T, KEY> input) { + this.environment = environment; + this.input = input; + } +} diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java index 82925a83896..3dae2b7bcbc 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java @@ -1071,6 +1071,19 @@ public class KeyedStream<T, KEY> extends DataStream<T> { return reduce(aggregate).name("Keyed Aggregation"); } + /** + * Collect records from each partition into a separate full window. The window emission will be + * triggered at the end of inputs. For this keyed data stream(each record has a key), a + * partition only contains all records with the same key. + * + * @return The full windowed data stream on partition. + */ + @PublicEvolving + @Override + public PartitionWindowedStream<T> fullWindowPartition() { + return new KeyedPartitionWindowedStream<>(environment, this); + } + /** * Publishes the keyed stream as queryable ValueState instance. * diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/NonKeyedPartitionWindowedStream.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/NonKeyedPartitionWindowedStream.java new file mode 100644 index 00000000000..9454f04267d --- /dev/null +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/NonKeyedPartitionWindowedStream.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.datastream; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; + +/** + * {@link NonKeyedPartitionWindowedStream} represents a data stream that collects all records of + * each subtask separately into a full window. + */ +@Internal +public class NonKeyedPartitionWindowedStream<T> implements PartitionWindowedStream<T> { + + private final StreamExecutionEnvironment environment; + + private final DataStream<T> input; + + public NonKeyedPartitionWindowedStream( + StreamExecutionEnvironment environment, DataStream<T> input) { + this.environment = environment; + this.input = input; + } +} diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/PartitionWindowedStream.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/PartitionWindowedStream.java new file mode 100644 index 00000000000..8c3caf97982 --- /dev/null +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/PartitionWindowedStream.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.datastream; + +import org.apache.flink.annotation.PublicEvolving; + +/** + * {@link PartitionWindowedStream} represents a data stream that collects all records of each + * partition separately into a full window. Window emission will be triggered at the end of inputs. + * For non-keyed {@link DataStream}, a partition contains all records of a subtask. For {@link + * KeyedStream}, a partition contains all records of a key. + * + * @param <T> The type of the elements in this stream. + */ +@PublicEvolving +public interface PartitionWindowedStream<T> {} diff --git a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StreamingScalaAPICompletenessTest.scala b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StreamingScalaAPICompletenessTest.scala index 8eb144635d3..d54e6817b1c 100644 --- a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StreamingScalaAPICompletenessTest.scala +++ b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StreamingScalaAPICompletenessTest.scala @@ -79,7 +79,12 @@ class StreamingScalaAPICompletenessTest extends ScalaAPICompletenessTestBase { // Deactivated until Scala API has new windowing API "org.apache.flink.streaming.api.datastream.DataStream.timeWindowAll", - "org.apache.flink.streaming.api.datastream.DataStream.windowAll" + "org.apache.flink.streaming.api.datastream.DataStream.windowAll", + + // The following newly added DataStream APIs should be ignored because all Scala APIs + // have been deprecated since Flink 1.18. + "org.apache.flink.streaming.api.datastream.DataStream.fullWindowPartition", + "org.apache.flink.streaming.api.datastream.KeyedStream.fullWindowPartition" ) val excludedPatterns = Seq( // We don't have project on tuples in the Scala API
