This is an automated email from the ASF dual-hosted git repository.

guoweijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 4d131705b19d06f55f94cc6e19728488fa5448f1
Author: codenohup <[email protected]>
AuthorDate: Wed Sep 11 16:44:15 2024 +0800

    [FLINK-35795][API] Introduce the framework of ProcessFunction Attribute
---
 .../flink/api/common/attribute/Attribute.java      |  57 +++++
 .../org/apache/flink/api/dag/Transformation.java   |  11 +
 .../api/attribute/NoOutputUntilEndOfInput.java     |  36 +++
 .../datastream/impl/attribute/AttributeParser.java |  36 +++
 .../impl/stream/BroadcastStreamImpl.java           |   4 +
 .../datastream/impl/stream/GlobalStreamImpl.java   |  21 +-
 .../impl/stream/KeyedPartitionStreamImpl.java      |  21 +-
 .../impl/stream/NonKeyedPartitionStreamImpl.java   |   5 +
 ...treamingJobGraphGeneratorWithAttributeTest.java | 244 +++++++++++++++++++++
 .../flink/streaming/api/graph/StreamConfig.java    |  18 ++
 .../flink/streaming/api/graph/StreamGraph.java     |   7 +
 .../flink/streaming/api/graph/StreamNode.java      |  11 +
 .../api/graph/StreamingJobGraphGenerator.java      |  35 +--
 .../AbstractOneInputTransformationTranslator.java  |   2 +-
 .../AbstractTwoInputTransformationTranslator.java  |   2 +-
 .../MultiInputTransformationTranslator.java        |   2 +-
 .../OneInputTransformationTranslator.java          |   3 +-
 ...obGraphGeneratorWithOperatorAttributesTest.java | 101 ++++++++-
 18 files changed, 581 insertions(+), 35 deletions(-)

diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/attribute/Attribute.java 
b/flink-core/src/main/java/org/apache/flink/api/common/attribute/Attribute.java
new file mode 100644
index 00000000000..3812a6c6e51
--- /dev/null
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/attribute/Attribute.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.attribute;
+
+import org.apache.flink.annotation.Internal;
+
+import java.io.Serializable;
+
+/** {@link Attribute} contains the information about the process logic of a 
process function. */
+@Internal
+public class Attribute implements Serializable {
+
+    private boolean isNoOutputUntilEndOfInput;
+
+    private Attribute(boolean isNoOutputUntilEndOfInput) {
+        this.isNoOutputUntilEndOfInput = isNoOutputUntilEndOfInput;
+    }
+
+    public boolean isNoOutputUntilEndOfInput() {
+        return isNoOutputUntilEndOfInput;
+    }
+
+    public void setNoOutputUntilEndOfInput(boolean noOutputUntilEndOfInput) {
+        isNoOutputUntilEndOfInput = noOutputUntilEndOfInput;
+    }
+
+    @Internal
+    public static class Builder {
+
+        private boolean isNoOutputUntilEndOfInput = false;
+
+        public Builder setNoOutputUntilEndOfInput(boolean 
isNoOutputUntilEndOfInput) {
+            this.isNoOutputUntilEndOfInput = isNoOutputUntilEndOfInput;
+            return this;
+        }
+
+        public Attribute build() {
+            return new Attribute(isNoOutputUntilEndOfInput);
+        }
+    }
+}
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/dag/Transformation.java 
b/flink-core/src/main/java/org/apache/flink/api/dag/Transformation.java
index b38f52c20df..804638c30fd 100644
--- a/flink-core/src/main/java/org/apache/flink/api/dag/Transformation.java
+++ b/flink-core/src/main/java/org/apache/flink/api/dag/Transformation.java
@@ -21,6 +21,7 @@ package org.apache.flink.api.dag;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.attribute.Attribute;
 import org.apache.flink.api.common.functions.InvalidTypesException;
 import org.apache.flink.api.common.operators.ResourceSpec;
 import org.apache.flink.api.common.operators.SlotSharingGroup;
@@ -188,6 +189,8 @@ public abstract class Transformation<T> {
 
     @Nullable private String coLocationGroupKey;
 
+    private Attribute attribute = new Attribute.Builder().build();
+
     /**
      * Creates a new {@code Transformation} with the given name, output type 
and parallelism.
      *
@@ -649,4 +652,12 @@ public abstract class Transformation<T> {
     public int hashCode() {
         return Objects.hash(id, name, outputType, parallelism, bufferTimeout);
     }
+
+    public void setAttribute(Attribute attribute) {
+        this.attribute = attribute;
+    }
+
+    public Attribute getAttribute() {
+        return attribute;
+    }
 }
diff --git 
a/flink-datastream-api/src/main/java/org/apache/flink/datastream/api/attribute/NoOutputUntilEndOfInput.java
 
b/flink-datastream-api/src/main/java/org/apache/flink/datastream/api/attribute/NoOutputUntilEndOfInput.java
new file mode 100644
index 00000000000..63adbfa4b6f
--- /dev/null
+++ 
b/flink-datastream-api/src/main/java/org/apache/flink/datastream/api/attribute/NoOutputUntilEndOfInput.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.datastream.api.attribute;
+
+import org.apache.flink.annotation.Experimental;
+
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+/**
+ * {@link NoOutputUntilEndOfInput} indicates that the process function will 
only output records
+ * after all inputs are ended. If this annotation is applied to a process 
function with an unbounded
+ * source, a compilation error will occur.
+ */
+@Experimental
+@Retention(RetentionPolicy.RUNTIME)
+@Target(ElementType.TYPE)
+public @interface NoOutputUntilEndOfInput {}
diff --git 
a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/attribute/AttributeParser.java
 
b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/attribute/AttributeParser.java
new file mode 100644
index 00000000000..cfc1d21f101
--- /dev/null
+++ 
b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/attribute/AttributeParser.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.datastream.impl.attribute;
+
+import org.apache.flink.api.common.attribute.Attribute;
+import org.apache.flink.datastream.api.attribute.NoOutputUntilEndOfInput;
+import org.apache.flink.datastream.api.function.ProcessFunction;
+
+/** {@link AttributeParser} is used to parse {@link Attribute} from {@link 
ProcessFunction}. */
+public class AttributeParser {
+
+    public static Attribute parseAttribute(ProcessFunction function) {
+        Class<? extends ProcessFunction> functionClass = function.getClass();
+        Attribute.Builder attributeBuilder = new Attribute.Builder();
+        if (functionClass.isAnnotationPresent(NoOutputUntilEndOfInput.class)) {
+            attributeBuilder.setNoOutputUntilEndOfInput(true);
+        }
+        return attributeBuilder.build();
+    }
+}
diff --git 
a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/stream/BroadcastStreamImpl.java
 
b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/stream/BroadcastStreamImpl.java
index defd870be68..8b02eabdaf3 100644
--- 
a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/stream/BroadcastStreamImpl.java
+++ 
b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/stream/BroadcastStreamImpl.java
@@ -30,6 +30,7 @@ import 
org.apache.flink.datastream.api.stream.KeyedPartitionStream.ProcessConfig
 import org.apache.flink.datastream.api.stream.NonKeyedPartitionStream;
 import 
org.apache.flink.datastream.api.stream.NonKeyedPartitionStream.ProcessConfigurableAndNonKeyedPartitionStream;
 import org.apache.flink.datastream.impl.ExecutionEnvironmentImpl;
+import org.apache.flink.datastream.impl.attribute.AttributeParser;
 import 
org.apache.flink.datastream.impl.operators.KeyedTwoInputBroadcastProcessOperator;
 import 
org.apache.flink.datastream.impl.operators.TwoInputBroadcastProcessOperator;
 import org.apache.flink.datastream.impl.utils.StreamUtils;
@@ -77,6 +78,7 @@ public class BroadcastStreamImpl<T> extends 
AbstractDataStream<T> implements Bro
                         this,
                         outTypeInfo,
                         processOperator);
+        
outTransformation.setAttribute(AttributeParser.parseAttribute(processFunction));
         environment.addOperator(outTransformation);
         return StreamUtils.wrapWithConfigureHandle(
                 new NonKeyedPartitionStreamImpl<>(environment, 
outTransformation));
@@ -105,6 +107,7 @@ public class BroadcastStreamImpl<T> extends 
AbstractDataStream<T> implements Bro
                         this,
                         outTypeInfo,
                         processOperator);
+        
outTransformation.setAttribute(AttributeParser.parseAttribute(processFunction));
         environment.addOperator(outTransformation);
         return StreamUtils.wrapWithConfigureHandle(
                 new NonKeyedPartitionStreamImpl<>(environment, 
outTransformation));
@@ -133,6 +136,7 @@ public class BroadcastStreamImpl<T> extends 
AbstractDataStream<T> implements Bro
 
         NonKeyedPartitionStreamImpl<OUT> outputStream =
                 new NonKeyedPartitionStreamImpl<>(environment, 
outTransformation);
+        
outTransformation.setAttribute(AttributeParser.parseAttribute(processFunction));
         environment.addOperator(outTransformation);
         // Construct a keyed stream directly without partitionTransformation 
to avoid shuffle.
         return StreamUtils.wrapWithConfigureHandle(
diff --git 
a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/stream/GlobalStreamImpl.java
 
b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/stream/GlobalStreamImpl.java
index 8dddc935318..137917bca4f 100644
--- 
a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/stream/GlobalStreamImpl.java
+++ 
b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/stream/GlobalStreamImpl.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.datastream.impl.stream;
 
+import org.apache.flink.api.common.attribute.Attribute;
 import org.apache.flink.api.common.state.StateDeclaration;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.connector.dsv2.Sink;
@@ -33,6 +34,7 @@ import 
org.apache.flink.datastream.api.stream.KeyedPartitionStream;
 import org.apache.flink.datastream.api.stream.NonKeyedPartitionStream;
 import org.apache.flink.datastream.api.stream.ProcessConfigurable;
 import org.apache.flink.datastream.impl.ExecutionEnvironmentImpl;
+import org.apache.flink.datastream.impl.attribute.AttributeParser;
 import org.apache.flink.datastream.impl.operators.ProcessOperator;
 import 
org.apache.flink.datastream.impl.operators.TwoInputNonBroadcastProcessOperator;
 import org.apache.flink.datastream.impl.operators.TwoOutputProcessOperator;
@@ -69,7 +71,12 @@ public class GlobalStreamImpl<T> extends 
AbstractDataStream<T> implements Global
         TypeInformation<OUT> outType =
                 
StreamUtils.getOutputTypeForOneInputProcessFunction(processFunction, getType());
         ProcessOperator<T, OUT> operator = new 
ProcessOperator<>(processFunction);
-        return StreamUtils.wrapWithConfigureHandle(transform("Global Process", 
outType, operator));
+        return StreamUtils.wrapWithConfigureHandle(
+                transform(
+                        "Global Process",
+                        outType,
+                        operator,
+                        AttributeParser.parseAttribute(processFunction)));
     }
 
     @Override
@@ -89,7 +96,11 @@ public class GlobalStreamImpl<T> extends 
AbstractDataStream<T> implements Global
         TwoOutputProcessOperator<T, OUT1, OUT2> operator =
                 new TwoOutputProcessOperator<>(processFunction, 
secondOutputTag);
         GlobalStreamImpl<OUT1> firstStream =
-                transform("Two-Output-Operator", firstOutputType, operator);
+                transform(
+                        "Two-Output-Operator",
+                        firstOutputType,
+                        operator,
+                        AttributeParser.parseAttribute(processFunction));
         GlobalStreamImpl<OUT2> secondStream =
                 new GlobalStreamImpl<>(
                         environment, 
firstStream.getSideOutputTransform(secondOutputTag));
@@ -122,6 +133,7 @@ public class GlobalStreamImpl<T> extends 
AbstractDataStream<T> implements Global
         // Operator parallelism should always be 1 for global stream.
         // parallelismConfigured should be true to avoid overwritten by 
AdaptiveBatchScheduler.
         outTransformation.setParallelism(1, true);
+        
outTransformation.setAttribute(AttributeParser.parseAttribute(processFunction));
         environment.addOperator(outTransformation);
         return StreamUtils.wrapWithConfigureHandle(
                 new GlobalStreamImpl<>(environment, outTransformation));
@@ -162,7 +174,8 @@ public class GlobalStreamImpl<T> extends 
AbstractDataStream<T> implements Global
     private <R> GlobalStreamImpl<R> transform(
             String operatorName,
             TypeInformation<R> outputTypeInfo,
-            OneInputStreamOperator<T, R> operator) {
+            OneInputStreamOperator<T, R> operator,
+            Attribute attribute) {
         // read the output type of the input Transform to coax out errors 
about MissingTypeInfo
         transformation.getOutputType();
 
@@ -177,7 +190,7 @@ public class GlobalStreamImpl<T> extends 
AbstractDataStream<T> implements Global
                         // parallelismConfigured should be true to avoid 
overwritten by
                         // AdaptiveBatchScheduler.
                         true);
-
+        resultTransform.setAttribute(attribute);
         GlobalStreamImpl<R> returnStream = new GlobalStreamImpl<>(environment, 
resultTransform);
 
         environment.addOperator(resultTransform);
diff --git 
a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/stream/KeyedPartitionStreamImpl.java
 
b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/stream/KeyedPartitionStreamImpl.java
index d9650020478..6fca168b426 100644
--- 
a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/stream/KeyedPartitionStreamImpl.java
+++ 
b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/stream/KeyedPartitionStreamImpl.java
@@ -36,6 +36,7 @@ import 
org.apache.flink.datastream.api.stream.NonKeyedPartitionStream;
 import 
org.apache.flink.datastream.api.stream.NonKeyedPartitionStream.ProcessConfigurableAndNonKeyedPartitionStream;
 import 
org.apache.flink.datastream.api.stream.NonKeyedPartitionStream.ProcessConfigurableAndTwoNonKeyedPartitionStream;
 import org.apache.flink.datastream.api.stream.ProcessConfigurable;
+import org.apache.flink.datastream.impl.attribute.AttributeParser;
 import org.apache.flink.datastream.impl.operators.KeyedProcessOperator;
 import 
org.apache.flink.datastream.impl.operators.KeyedTwoInputBroadcastProcessOperator;
 import 
org.apache.flink.datastream.impl.operators.KeyedTwoInputNonBroadcastProcessOperator;
@@ -118,6 +119,7 @@ public class KeyedPartitionStreamImpl<K, V> extends 
AbstractDataStream<V>
         Transformation<OUT> transform =
                 StreamUtils.getOneInputKeyedTransformation(
                         "KeyedProcess", this, outType, operator, keySelector, 
keyType);
+        
transform.setAttribute(AttributeParser.parseAttribute(processFunction));
         environment.addOperator(transform);
         return StreamUtils.wrapWithConfigureHandle(
                 new NonKeyedPartitionStreamImpl<>(environment, transform));
@@ -141,6 +143,7 @@ public class KeyedPartitionStreamImpl<K, V> extends 
AbstractDataStream<V>
                         "KeyedProcess", this, outType, operator, keySelector, 
keyType);
         NonKeyedPartitionStreamImpl<OUT> outputStream =
                 new NonKeyedPartitionStreamImpl<>(environment, transform);
+        
transform.setAttribute(AttributeParser.parseAttribute(processFunction));
         environment.addOperator(transform);
         // Construct a keyed stream directly without partitionTransformation 
to avoid shuffle.
         return StreamUtils.wrapWithConfigureHandle(
@@ -178,6 +181,7 @@ public class KeyedPartitionStreamImpl<K, V> extends 
AbstractDataStream<V>
                         operator,
                         keySelector,
                         keyType);
+        
mainOutputTransform.setAttribute(AttributeParser.parseAttribute(processFunction));
         NonKeyedPartitionStreamImpl<OUT1> nonKeyedMainOutputStream =
                 new NonKeyedPartitionStreamImpl<>(environment, 
mainOutputTransform);
         Transformation<OUT2> sideOutputTransform =
@@ -229,6 +233,7 @@ public class KeyedPartitionStreamImpl<K, V> extends 
AbstractDataStream<V>
                         operator,
                         keySelector,
                         keyType);
+        
firstTransformation.setAttribute(AttributeParser.parseAttribute(processFunction));
         NonKeyedPartitionStreamImpl<OUT1> firstStream =
                 new NonKeyedPartitionStreamImpl<>(environment, 
firstTransformation);
         NonKeyedPartitionStreamImpl<OUT2> secondStream =
@@ -267,6 +272,7 @@ public class KeyedPartitionStreamImpl<K, V> extends 
AbstractDataStream<V>
                         (KeyedPartitionStreamImpl<K, T_OTHER>) other,
                         outTypeInfo,
                         processOperator);
+        
outTransformation.setAttribute(AttributeParser.parseAttribute(processFunction));
         environment.addOperator(outTransformation);
         return StreamUtils.wrapWithConfigureHandle(
                 new NonKeyedPartitionStreamImpl<>(environment, 
outTransformation));
@@ -282,16 +288,18 @@ public class KeyedPartitionStreamImpl<K, V> extends 
AbstractDataStream<V>
                 new HashSet<>(
                         
Collections.singletonList(StateDeclaration.RedistributionMode.IDENTICAL)));
 
-        TypeInformation<OUT> outTypeInfo =
-                
StreamUtils.getOutputTypeForTwoInputNonBroadcastProcessFunction(
-                        processFunction,
-                        getType(),
-                        ((KeyedPartitionStreamImpl<K, T_OTHER>) 
other).getType());
         other =
                 other instanceof ProcessConfigurableAndKeyedPartitionStreamImpl
                         ? ((ProcessConfigurableAndKeyedPartitionStreamImpl) 
other)
                                 .getKeyedPartitionStream()
                         : other;
+
+        TypeInformation<OUT> outTypeInfo =
+                
StreamUtils.getOutputTypeForTwoInputNonBroadcastProcessFunction(
+                        processFunction,
+                        getType(),
+                        ((KeyedPartitionStreamImpl<K, T_OTHER>) 
other).getType());
+
         KeyedTwoInputNonBroadcastProcessOperator<K, V, T_OTHER, OUT> 
processOperator =
                 new 
KeyedTwoInputNonBroadcastProcessOperator<>(processFunction, newKeySelector);
         Transformation<OUT> outTransformation =
@@ -301,6 +309,7 @@ public class KeyedPartitionStreamImpl<K, V> extends 
AbstractDataStream<V>
                         (KeyedPartitionStreamImpl<K, T_OTHER>) other,
                         outTypeInfo,
                         processOperator);
+        
outTransformation.setAttribute(AttributeParser.parseAttribute(processFunction));
         NonKeyedPartitionStreamImpl<OUT> nonKeyedOutputStream =
                 new NonKeyedPartitionStreamImpl<>(environment, 
outTransformation);
         environment.addOperator(outTransformation);
@@ -336,6 +345,7 @@ public class KeyedPartitionStreamImpl<K, V> extends 
AbstractDataStream<V>
                         (BroadcastStreamImpl<T_OTHER>) other,
                         outTypeInfo,
                         processOperator);
+        
outTransformation.setAttribute(AttributeParser.parseAttribute(processFunction));
         environment.addOperator(outTransformation);
         return StreamUtils.wrapWithConfigureHandle(
                 new NonKeyedPartitionStreamImpl<>(environment, 
outTransformation));
@@ -368,6 +378,7 @@ public class KeyedPartitionStreamImpl<K, V> extends 
AbstractDataStream<V>
 
         NonKeyedPartitionStreamImpl<OUT> outputStream =
                 new NonKeyedPartitionStreamImpl<>(environment, 
outTransformation);
+        
outTransformation.setAttribute(AttributeParser.parseAttribute(processFunction));
         environment.addOperator(outTransformation);
         // Construct a keyed stream directly without partitionTransformation 
to avoid shuffle.
         return StreamUtils.wrapWithConfigureHandle(
diff --git 
a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/stream/NonKeyedPartitionStreamImpl.java
 
b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/stream/NonKeyedPartitionStreamImpl.java
index 82b81147f71..ea747f8c898 100644
--- 
a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/stream/NonKeyedPartitionStreamImpl.java
+++ 
b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/stream/NonKeyedPartitionStreamImpl.java
@@ -34,6 +34,7 @@ import 
org.apache.flink.datastream.api.stream.KeyedPartitionStream;
 import org.apache.flink.datastream.api.stream.NonKeyedPartitionStream;
 import org.apache.flink.datastream.api.stream.ProcessConfigurable;
 import org.apache.flink.datastream.impl.ExecutionEnvironmentImpl;
+import org.apache.flink.datastream.impl.attribute.AttributeParser;
 import org.apache.flink.datastream.impl.operators.ProcessOperator;
 import 
org.apache.flink.datastream.impl.operators.TwoInputBroadcastProcessOperator;
 import 
org.apache.flink.datastream.impl.operators.TwoInputNonBroadcastProcessOperator;
@@ -75,6 +76,7 @@ public class NonKeyedPartitionStreamImpl<T> extends 
AbstractDataStream<T>
         ProcessOperator<T, OUT> operator = new 
ProcessOperator<>(processFunction);
         OneInputTransformation<T, OUT> outputTransform =
                 StreamUtils.getOneInputTransformation("Process", this, 
outType, operator);
+        
outputTransform.setAttribute(AttributeParser.parseAttribute(processFunction));
         environment.addOperator(outputTransform);
         return StreamUtils.wrapWithConfigureHandle(
                 new NonKeyedPartitionStreamImpl<>(environment, 
outputTransform));
@@ -101,6 +103,7 @@ public class NonKeyedPartitionStreamImpl<T> extends 
AbstractDataStream<T>
         OneInputTransformation<T, OUT1> outTransformation =
                 StreamUtils.getOneInputTransformation(
                         "Two-Output-Operator", this, firstOutputType, 
operator);
+        
outTransformation.setAttribute(AttributeParser.parseAttribute(processFunction));
         NonKeyedPartitionStreamImpl<OUT1> firstStream =
                 new NonKeyedPartitionStreamImpl<>(environment, 
outTransformation);
         NonKeyedPartitionStreamImpl<OUT2> secondStream =
@@ -141,6 +144,7 @@ public class NonKeyedPartitionStreamImpl<T> extends 
AbstractDataStream<T>
                         (NonKeyedPartitionStreamImpl<T_OTHER>) other,
                         outTypeInfo,
                         processOperator);
+        
outTransformation.setAttribute(AttributeParser.parseAttribute(processFunction));
         environment.addOperator(outTransformation);
         return StreamUtils.wrapWithConfigureHandle(
                 new NonKeyedPartitionStreamImpl<>(environment, 
outTransformation));
@@ -170,6 +174,7 @@ public class NonKeyedPartitionStreamImpl<T> extends 
AbstractDataStream<T>
                         (BroadcastStreamImpl<T_OTHER>) other,
                         outTypeInfo,
                         processOperator);
+        
outTransformation.setAttribute(AttributeParser.parseAttribute(processFunction));
         environment.addOperator(outTransformation);
         return StreamUtils.wrapWithConfigureHandle(
                 new NonKeyedPartitionStreamImpl<>(environment, 
outTransformation));
diff --git 
a/flink-datastream/src/test/java/org/apache/flink/datastream/impl/attribute/StreamingJobGraphGeneratorWithAttributeTest.java
 
b/flink-datastream/src/test/java/org/apache/flink/datastream/impl/attribute/StreamingJobGraphGeneratorWithAttributeTest.java
new file mode 100644
index 00000000000..73a79dd3d9f
--- /dev/null
+++ 
b/flink-datastream/src/test/java/org/apache/flink/datastream/impl/attribute/StreamingJobGraphGeneratorWithAttributeTest.java
@@ -0,0 +1,244 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.datastream.impl.attribute;
+
+import org.apache.flink.api.connector.dsv2.DataStreamV2SourceUtils;
+import org.apache.flink.api.connector.dsv2.WrappedSink;
+import org.apache.flink.configuration.PipelineOptions;
+import org.apache.flink.datastream.api.ExecutionEnvironment;
+import org.apache.flink.datastream.api.attribute.NoOutputUntilEndOfInput;
+import org.apache.flink.datastream.api.common.Collector;
+import org.apache.flink.datastream.api.context.PartitionedContext;
+import org.apache.flink.datastream.api.function.OneInputStreamProcessFunction;
+import org.apache.flink.datastream.api.function.TwoOutputStreamProcessFunction;
+import org.apache.flink.datastream.api.stream.NonKeyedPartitionStream;
+import org.apache.flink.datastream.impl.ExecutionEnvironmentImpl;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.streaming.api.functions.sink.v2.DiscardingSink;
+import org.apache.flink.streaming.api.graph.StreamGraph;
+import org.apache.flink.streaming.api.graph.StreamNode;
+import org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link StreamingJobGraphGenerator} with different attributes. */
+class StreamingJobGraphGeneratorWithAttributeTest {
+
+    @Test
+    void testNoOutputUntilEndOfInputWithOperatorChainCase1() throws Exception {
+        ExecutionEnvironmentImpl env =
+                (ExecutionEnvironmentImpl) ExecutionEnvironment.getInstance();
+        NonKeyedPartitionStream<Integer> source =
+                env.fromSource(
+                        DataStreamV2SourceUtils.fromData(Arrays.asList(1, 2, 
3)), "test-source");
+        source.keyBy(x -> x)
+                .process(new NoOutputUntilEndOfInputMapTask())
+                .withParallelism(2)
+                .process(new TestMapTask())
+                .withParallelism(2)
+                .toSink(new WrappedSink<>(new DiscardingSink<>()))
+                .withParallelism(3);
+        StreamGraph streamGraph = env.getStreamGraph();
+        Map<String, StreamNode> nodeMap = new HashMap<>();
+        for (StreamNode node : streamGraph.getStreamNodes()) {
+            nodeMap.put(node.getOperatorName(), node);
+        }
+        assertThat(nodeMap).hasSize(4);
+        JobGraph jobGraph = 
StreamingJobGraphGenerator.createJobGraph(streamGraph);
+        Map<String, JobVertex> vertexMap = new HashMap<>();
+        for (JobVertex vertex : jobGraph.getVertices()) {
+            vertexMap.put(vertex.getName(), vertex);
+        }
+        assertThat(vertexMap).hasSize(3);
+        assertHasOutputPartitionType(
+                vertexMap.get("Source: Collection Source"), 
ResultPartitionType.PIPELINED_BOUNDED);
+        assertHasOutputPartitionType(
+                vertexMap.get("KeyedProcess -> Process"), 
ResultPartitionType.BLOCKING);
+        assertThat(vertexMap.get("Source: Collection 
Source").isAnyOutputBlocking()).isFalse();
+        assertThat(vertexMap.get("KeyedProcess -> 
Process").isAnyOutputBlocking()).isTrue();
+        assertThat(vertexMap.get("Sink: 
Writer").isAnyOutputBlocking()).isFalse();
+    }
+
+    @Test
+    void testPropagateAlongOperatorChain() throws Exception {
+        ExecutionEnvironmentImpl env =
+                (ExecutionEnvironmentImpl) ExecutionEnvironment.getInstance();
+        NonKeyedPartitionStream<Integer> source =
+                env.fromSource(
+                        DataStreamV2SourceUtils.fromData(Arrays.asList(1, 2, 
3)), "test-source");
+        source.keyBy(x -> x)
+                .process(new TestMapTask())
+                .withParallelism(2)
+                .process(new NoOutputUntilEndOfInputMapTask())
+                .withParallelism(2)
+                .toSink(new WrappedSink<>(new DiscardingSink<>()))
+                .withParallelism(3);
+        StreamGraph streamGraph = env.getStreamGraph();
+        Map<String, StreamNode> nodeMap = new HashMap<>();
+        for (StreamNode node : streamGraph.getStreamNodes()) {
+            nodeMap.put(node.getOperatorName(), node);
+        }
+        assertThat(nodeMap).hasSize(4);
+        JobGraph jobGraph = 
StreamingJobGraphGenerator.createJobGraph(streamGraph);
+        Map<String, JobVertex> vertexMap = new HashMap<>();
+        for (JobVertex vertex : jobGraph.getVertices()) {
+            vertexMap.put(vertex.getName(), vertex);
+        }
+        assertThat(vertexMap).hasSize(3);
+        assertHasOutputPartitionType(
+                vertexMap.get("Source: Collection Source"), 
ResultPartitionType.PIPELINED_BOUNDED);
+        assertHasOutputPartitionType(
+                vertexMap.get("KeyedProcess -> Process"), 
ResultPartitionType.BLOCKING);
+        assertThat(vertexMap.get("Source: Collection 
Source").isAnyOutputBlocking()).isFalse();
+        assertThat(vertexMap.get("KeyedProcess -> 
Process").isAnyOutputBlocking()).isTrue();
+        assertThat(vertexMap.get("Sink: 
Writer").isAnyOutputBlocking()).isFalse();
+    }
+
+    @Test
+    void testTwoOutput() throws Exception {
+        ExecutionEnvironmentImpl env =
+                (ExecutionEnvironmentImpl) ExecutionEnvironment.getInstance();
+        NonKeyedPartitionStream<Integer> source =
+                env.fromSource(
+                        DataStreamV2SourceUtils.fromData(Arrays.asList(1, 2, 
3)), "test-source");
+        
NonKeyedPartitionStream.ProcessConfigurableAndTwoNonKeyedPartitionStream<Integer,
 Integer>
+                twoOutputStream =
+                        source.process(new TestMapTask())
+                                .withParallelism(2)
+                                .process(new TestTwoOutputProcessFunction())
+                                .withParallelism(2);
+        
NonKeyedPartitionStream.ProcessConfigurableAndNonKeyedPartitionStream<Integer> 
firstStream =
+                twoOutputStream.getFirst();
+        
NonKeyedPartitionStream.ProcessConfigurableAndNonKeyedPartitionStream<Integer>
+                secondStream = twoOutputStream.getSecond();
+        firstStream
+                .process(new NoOutputUntilEndOfInputMapTask())
+                .withParallelism(2)
+                .toSink(new WrappedSink<>(new DiscardingSink<>()))
+                .withParallelism(3);
+        secondStream
+                .process(new TestMapTask())
+                .withParallelism(2)
+                .toSink(new WrappedSink<>(new DiscardingSink<>()))
+                .withParallelism(3);
+        StreamGraph streamGraph = env.getStreamGraph();
+        JobGraph jobGraph = 
StreamingJobGraphGenerator.createJobGraph(streamGraph);
+        Map<String, JobVertex> vertexMap = new HashMap<>();
+        for (JobVertex vertex : jobGraph.getVertices()) {
+            vertexMap.put(vertex.getName(), vertex);
+        }
+        assertThat(vertexMap).hasSize(3);
+        assertHasOutputPartitionType(
+                vertexMap.get("Process -> Two-Output-Operator -> (Process, 
Process)"),
+                ResultPartitionType.BLOCKING);
+        assertThat(
+                        vertexMap
+                                .get("Process -> Two-Output-Operator -> 
(Process, Process)")
+                                .isAnyOutputBlocking())
+                .isTrue();
+    }
+
+    @Test
+    void testWithoutOperatorChain() throws Exception {
+        ExecutionEnvironmentImpl env =
+                (ExecutionEnvironmentImpl) ExecutionEnvironment.getInstance();
+        env.getConfiguration().set(PipelineOptions.OPERATOR_CHAINING, false);
+        NonKeyedPartitionStream<Integer> source =
+                env.fromSource(
+                        DataStreamV2SourceUtils.fromData(Arrays.asList(1, 2, 
3)), "test-source");
+        source.keyBy(x -> x)
+                .process(new NoOutputUntilEndOfInputMapTask())
+                .withParallelism(2)
+                .process(new TestMapTask())
+                .withParallelism(2)
+                .toSink(new WrappedSink<>(new DiscardingSink<>()))
+                .withParallelism(3);
+        StreamGraph streamGraph = env.getStreamGraph();
+        Map<String, StreamNode> nodeMap = new HashMap<>();
+        for (StreamNode node : streamGraph.getStreamNodes()) {
+            nodeMap.put(node.getOperatorName(), node);
+        }
+        assertThat(nodeMap).hasSize(4);
+        JobGraph jobGraph = 
StreamingJobGraphGenerator.createJobGraph(streamGraph);
+        Map<String, JobVertex> vertexMap = new HashMap<>();
+        for (JobVertex vertex : jobGraph.getVertices()) {
+            vertexMap.put(vertex.getName(), vertex);
+        }
+        assertThat(vertexMap).hasSize(4);
+        assertHasOutputPartitionType(
+                vertexMap.get("Source: Collection Source"), 
ResultPartitionType.PIPELINED_BOUNDED);
+        assertHasOutputPartitionType(vertexMap.get("KeyedProcess"), 
ResultPartitionType.BLOCKING);
+        assertHasOutputPartitionType(
+                vertexMap.get("Process"), 
ResultPartitionType.PIPELINED_BOUNDED);
+        assertThat(vertexMap.get("Source: Collection 
Source").isAnyOutputBlocking()).isFalse();
+        
assertThat(vertexMap.get("KeyedProcess").isAnyOutputBlocking()).isTrue();
+        assertThat(vertexMap.get("Process").isAnyOutputBlocking()).isFalse();
+        assertThat(vertexMap.get("Sink: 
Writer").isAnyOutputBlocking()).isFalse();
+    }
+
+    private void assertHasOutputPartitionType(
+            JobVertex jobVertex, ResultPartitionType partitionType) {
+        
assertThat(jobVertex.getProducedDataSets().get(0).getResultType()).isEqualTo(partitionType);
+    }
+
+    @NoOutputUntilEndOfInput
+    private static class NoOutputUntilEndOfInputMapTask
+            implements OneInputStreamProcessFunction<Integer, Integer> {
+
+        @Override
+        public void processRecord(
+                Integer record, Collector<Integer> output, PartitionedContext 
ctx) {
+            output.collect(record + 1);
+        }
+    }
+
+    private static class TestMapTask implements 
OneInputStreamProcessFunction<Integer, Integer> {
+
+        @Override
+        public void processRecord(
+                Integer record, Collector<Integer> output, PartitionedContext 
ctx) {
+            if (record != 2) {
+                output.collect(record + 1);
+            }
+        }
+    }
+
+    /** The test {@link TwoOutputStreamProcessFunction}. */
+    private static class TestTwoOutputProcessFunction
+            implements TwoOutputStreamProcessFunction<Integer, Integer, 
Integer> {
+
+        @Override
+        public void processRecord(
+                Integer record,
+                Collector<Integer> output1,
+                Collector<Integer> output2,
+                PartitionedContext ctx) {
+            output1.collect(record + 1);
+            output2.collect(record - 1);
+        }
+    }
+}
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java
 
b/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java
index e56ffdbf10a..aa466e16b39 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java
@@ -19,6 +19,7 @@ package org.apache.flink.streaming.api.graph;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.attribute.Attribute;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.configuration.CheckpointingOptions;
@@ -119,6 +120,9 @@ public class StreamConfig implements Serializable {
     private static final String TIME_CHARACTERISTIC = "timechar";
 
     private static final String MANAGED_MEMORY_FRACTION_PREFIX = 
"managedMemFraction.";
+
+    private static final String ATTRIBUTE = "attribute";
+
     private static final ConfigOption<Boolean> 
STATE_BACKEND_USE_MANAGED_MEMORY =
             ConfigOptions.key("statebackend.useManagedMemory")
                     .booleanType()
@@ -800,6 +804,20 @@ public class StreamConfig implements Serializable {
         return config.getBoolean(GRAPH_CONTAINING_LOOPS, false);
     }
 
+    public void setAttribute(Attribute attribute) {
+        if (attribute != null) {
+            toBeSerializedConfigObjects.put(ATTRIBUTE, attribute);
+        }
+    }
+
+    public Attribute getAttribute(ClassLoader cl) {
+        try {
+            return InstantiationUtil.readObjectFromConfig(this.config, 
ATTRIBUTE, cl);
+        } catch (Exception e) {
+            throw new StreamTaskException("Could not instantiate checkpoint 
storage.", e);
+        }
+    }
+
     /**
      * In general, we don't clear any configuration. However, the {@link 
#SERIALIZED_UDF} may be
      * very large when operator includes some large objects, the 
SERIALIZED_UDF is used to create a
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
 
b/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
index fc794022bf8..0823ed29e32 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
@@ -21,6 +21,7 @@ import org.apache.flink.annotation.Internal;
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.attribute.Attribute;
 import org.apache.flink.api.common.cache.DistributedCache;
 import org.apache.flink.api.common.io.InputFormat;
 import org.apache.flink.api.common.io.OutputFormat;
@@ -1085,4 +1086,10 @@ public class StreamGraph implements Pipeline {
             
streamNode.setSupportsConcurrentExecutionAttempts(supportsConcurrentExecutionAttempts);
         }
     }
+
+    public void setAttribute(Integer vertexId, Attribute attribute) {
+        if (getStreamNode(vertexId) != null) {
+            getStreamNode(vertexId).setAttribute(attribute);
+        }
+    }
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/StreamNode.java
 
b/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/StreamNode.java
index d86cec1059e..f47f1814332 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/StreamNode.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/StreamNode.java
@@ -20,6 +20,7 @@ package org.apache.flink.streaming.api.graph;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.attribute.Attribute;
 import org.apache.flink.api.common.io.InputFormat;
 import org.apache.flink.api.common.io.OutputFormat;
 import org.apache.flink.api.common.operators.ResourceSpec;
@@ -98,6 +99,8 @@ public class StreamNode {
 
     private boolean parallelismConfigured = false;
 
+    private Attribute attribute = new Attribute.Builder().build();
+
     @VisibleForTesting
     public StreamNode(
             Integer id,
@@ -189,6 +192,14 @@ public class StreamNode {
         return id;
     }
 
+    public void setAttribute(Attribute attribute) {
+        this.attribute = attribute;
+    }
+
+    public Attribute getAttribute() {
+        return attribute;
+    }
+
     public int getParallelism() {
         return parallelism;
     }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
 
b/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
index 7b0a6cf90b4..358c62d8610 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
@@ -22,6 +22,7 @@ import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.RuntimeExecutionMode;
+import org.apache.flink.api.common.attribute.Attribute;
 import org.apache.flink.api.common.cache.DistributedCache;
 import org.apache.flink.api.common.functions.Function;
 import org.apache.flink.api.common.operators.ResourceSpec;
@@ -187,9 +188,6 @@ public class StreamingJobGraphGenerator {
 
     private final Map<Integer, InputOutputFormatContainer> 
chainedInputOutputFormats;
 
-    // the ids of nodes whose output result partition type should be set to 
BLOCKING
-    private final Set<Integer> outputBlockingNodesID;
-
     private final StreamGraphHasher defaultStreamGraphHasher;
     private final List<StreamGraphHasher> legacyStreamGraphHashers;
 
@@ -230,7 +228,6 @@ public class StreamingJobGraphGenerator {
         this.chainedMinResources = new HashMap<>();
         this.chainedPreferredResources = new HashMap<>();
         this.chainedInputOutputFormats = new HashMap<>();
-        this.outputBlockingNodesID = new HashSet<>();
         this.physicalEdgesInOrder = new ArrayList<>();
         this.serializationExecutor = 
Preconditions.checkNotNull(serializationExecutor);
         this.chainInfos = new HashMap<>();
@@ -682,10 +679,12 @@ public class StreamingJobGraphGenerator {
             List<StreamEdge> nonChainableOutputs = new ArrayList<StreamEdge>();
 
             StreamNode currentNode = streamGraph.getStreamNode(currentNodeId);
-
-            boolean isOutputOnlyAfterEndOfStream = 
currentNode.isOutputOnlyAfterEndOfStream();
-            if (isOutputOnlyAfterEndOfStream) {
-                outputBlockingNodesID.add(currentNode.getId());
+            Attribute currentNodeAttribute = currentNode.getAttribute();
+            boolean isNoOutputUntilEndOfInput =
+                    currentNode.isOutputOnlyAfterEndOfStream()
+                            || 
currentNodeAttribute.isNoOutputUntilEndOfInput();
+            if (isNoOutputUntilEndOfInput) {
+                currentNodeAttribute.setNoOutputUntilEndOfInput(true);
             }
 
             for (StreamEdge outEdge : currentNode.getOutEdges()) {
@@ -697,9 +696,12 @@ public class StreamingJobGraphGenerator {
             }
 
             for (StreamEdge chainable : chainableOutputs) {
-                // Mark downstream nodes in the same chain as outputBlocking
-                if (isOutputOnlyAfterEndOfStream) {
-                    outputBlockingNodesID.add(chainable.getTargetId());
+                StreamNode targetNode = 
streamGraph.getStreamNode(chainable.getTargetId());
+                Attribute targetNodeAttribute = targetNode.getAttribute();
+                if (isNoOutputUntilEndOfInput) {
+                    if (targetNodeAttribute != null) {
+                        targetNodeAttribute.setNoOutputUntilEndOfInput(true);
+                    }
                 }
                 transitiveOutEdges.addAll(
                         createChain(
@@ -708,8 +710,9 @@ public class StreamingJobGraphGenerator {
                                 chainInfo,
                                 chainEntryPoints));
                 // Mark upstream nodes in the same chain as outputBlocking
-                if (outputBlockingNodesID.contains(chainable.getTargetId())) {
-                    outputBlockingNodesID.add(currentNodeId);
+                if (targetNodeAttribute != null
+                        && targetNodeAttribute.isNoOutputUntilEndOfInput()) {
+                    currentNodeAttribute.setNoOutputUntilEndOfInput(true);
                 }
             }
 
@@ -757,7 +760,7 @@ public class StreamingJobGraphGenerator {
                             : new StreamConfig(new Configuration());
 
             tryConvertPartitionerForDynamicGraph(chainableOutputs, 
nonChainableOutputs);
-
+            config.setAttribute(currentNodeAttribute);
             setOperatorConfig(currentNodeId, config, 
chainInfo.getChainedSources());
 
             setOperatorChainedOutputsConfig(config, chainableOutputs);
@@ -1504,7 +1507,9 @@ public class StreamingJobGraphGenerator {
     }
 
     private ResultPartitionType 
determineUndefinedResultPartitionType(StreamEdge edge) {
-        if (outputBlockingNodesID.contains(edge.getSourceId())) {
+        Attribute sourceNodeAttribute =
+                streamGraph.getStreamNode(edge.getSourceId()).getAttribute();
+        if (sourceNodeAttribute.isNoOutputUntilEndOfInput()) {
             
edge.setBufferTimeout(ExecutionOptions.DISABLED_NETWORK_BUFFER_TIMEOUT);
             return ResultPartitionType.BLOCKING;
         }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/translators/AbstractOneInputTransformationTranslator.java
 
b/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/translators/AbstractOneInputTransformationTranslator.java
index 54cfda86339..3f5269c97a7 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/translators/AbstractOneInputTransformationTranslator.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/translators/AbstractOneInputTransformationTranslator.java
@@ -69,7 +69,7 @@ abstract class AbstractOneInputTransformationTranslator<IN, 
OUT, OP extends Tran
                 inputType,
                 transformation.getOutputType(),
                 transformation.getName());
-
+        streamGraph.setAttribute(transformationId, 
transformation.getAttribute());
         if (stateKeySelector != null) {
             TypeSerializer<?> keySerializer =
                     
stateKeyType.createSerializer(executionConfig.getSerializerConfig());
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/translators/AbstractTwoInputTransformationTranslator.java
 
b/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/translators/AbstractTwoInputTransformationTranslator.java
index 134c10ab806..653c1f43024 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/translators/AbstractTwoInputTransformationTranslator.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/translators/AbstractTwoInputTransformationTranslator.java
@@ -75,7 +75,7 @@ public abstract class 
AbstractTwoInputTransformationTranslator<
                 secondInputTransformation.getOutputType(),
                 transformation.getOutputType(),
                 transformation.getName());
-
+        streamGraph.setAttribute(transformationId, 
transformation.getAttribute());
         if (firstKeySelector != null || secondKeySelector != null) {
             checkState(
                     keyTypeInfo != null,
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/translators/MultiInputTransformationTranslator.java
 
b/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/translators/MultiInputTransformationTranslator.java
index be2980f3ab6..c52ade1d638 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/translators/MultiInputTransformationTranslator.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/translators/MultiInputTransformationTranslator.java
@@ -96,7 +96,7 @@ public class MultiInputTransformationTranslator<OUT>
                 transformation.getInputTypes(),
                 transformation.getOutputType(),
                 transformation.getName());
-
+        streamGraph.setAttribute(transformationId, 
transformation.getAttribute());
         final int parallelism =
                 transformation.getParallelism() != 
ExecutionConfig.PARALLELISM_DEFAULT
                         ? transformation.getParallelism()
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/translators/OneInputTransformationTranslator.java
 
b/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/translators/OneInputTransformationTranslator.java
index 6d7ae8103f1..e17941d6551 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/translators/OneInputTransformationTranslator.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/translators/OneInputTransformationTranslator.java
@@ -67,7 +67,8 @@ public final class OneInputTransformationTranslator<IN, OUT>
                         transformation.getStateKeyType(),
                         context);
 
-        if (transformation.isOutputOnlyAfterEndOfStream()) {
+        if (transformation.isOutputOnlyAfterEndOfStream()
+                || transformation.getAttribute().isNoOutputUntilEndOfInput()) {
             maybeApplyBatchExecutionSettings(transformation, context);
         }
 
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorWithOperatorAttributesTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorWithOperatorAttributesTest.java
index 94c9e2ec173..d710bbe2b13 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorWithOperatorAttributesTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorWithOperatorAttributesTest.java
@@ -49,8 +49,9 @@ import static org.assertj.core.api.Assertions.assertThat;
 
 /** Tests for {@link StreamingJobGraphGenerator} with internal sorter. */
 public class StreamingJobGraphGeneratorWithOperatorAttributesTest {
+
     @Test
-    void testOutputOnlyAfterEndOfStream() {
+    void testOutputOnlyAfterEndOfStreamEnableChain() {
         final StreamExecutionEnvironment env =
                 StreamExecutionEnvironment.getExecutionEnvironment(new 
Configuration());
 
@@ -97,10 +98,46 @@ public class 
StreamingJobGraphGeneratorWithOperatorAttributesTest {
         assertThat(vertexMap.get("Source: 
source").isAnyOutputBlocking()).isFalse();
         assertThat(vertexMap.get("transform -> 
Map").isAnyOutputBlocking()).isTrue();
         assertThat(vertexMap.get("sink: 
Writer").isAnyOutputBlocking()).isFalse();
+    }
+
+    @Test
+    void testOutputOnlyAfterEndOfStreamDisableChain() {
+        final StreamExecutionEnvironment env =
+                StreamExecutionEnvironment.getExecutionEnvironment(new 
Configuration());
+
+        final DataStream<Integer> source = env.fromData(1, 2, 
3).name("source");
+        source.keyBy(x -> x)
+                .transform(
+                        "transform",
+                        Types.INT,
+                        new StreamOperatorWithConfigurableOperatorAttributes<>(
+                                x -> x,
+                                new OperatorAttributesBuilder()
+                                        .setOutputOnlyAfterEndOfStream(true)
+                                        .build()))
+                .map(x -> x)
+                .sinkTo(new DiscardingSink<>())
+                .disableChaining()
+                .name("sink");
+
+        final StreamGraph streamGraph = env.getStreamGraph(false);
+        Map<String, StreamNode> nodeMap = new HashMap<>();
+        for (StreamNode node : streamGraph.getStreamNodes()) {
+            nodeMap.put(node.getOperatorName(), node);
+        }
+        assertThat(nodeMap).hasSize(4);
+        assertThat(nodeMap.get("Source: 
source").isOutputOnlyAfterEndOfStream()).isFalse();
+        
assertThat(nodeMap.get("transform").isOutputOnlyAfterEndOfStream()).isTrue();
+        
assertThat(nodeMap.get("Map").isOutputOnlyAfterEndOfStream()).isFalse();
+        assertThat(nodeMap.get("sink: 
Writer").isOutputOnlyAfterEndOfStream()).isFalse();
+        assertManagedMemoryWeightsSize(nodeMap.get("Source: source"), 0);
+        assertManagedMemoryWeightsSize(nodeMap.get("transform"), 1);
+        assertManagedMemoryWeightsSize(nodeMap.get("Map"), 0);
+        assertManagedMemoryWeightsSize(nodeMap.get("sink: Writer"), 0);
 
         env.disableOperatorChaining();
-        jobGraph = 
StreamingJobGraphGenerator.createJobGraph(env.getStreamGraph(false));
-        vertexMap = new HashMap<>();
+        JobGraph jobGraph = 
StreamingJobGraphGenerator.createJobGraph(env.getStreamGraph(false));
+        Map<String, JobVertex> vertexMap = new HashMap<>();
         for (JobVertex vertex : jobGraph.getVertices()) {
             vertexMap.put(vertex.getName(), vertex);
         }
@@ -115,6 +152,56 @@ public class 
StreamingJobGraphGeneratorWithOperatorAttributesTest {
         assertThat(vertexMap.get("sink: 
Writer").isAnyOutputBlocking()).isFalse();
     }
 
+    @Test
+    void testOutputOnlyAfterEndOfStreamPropagateToUpstreamWithinChain() {
+        final StreamExecutionEnvironment env =
+                StreamExecutionEnvironment.getExecutionEnvironment(new 
Configuration());
+
+        final DataStream<Integer> source = env.fromData(1, 2, 
3).name("source");
+        source.keyBy(x -> x)
+                .map(x -> x)
+                .transform(
+                        "transform",
+                        Types.INT,
+                        new StreamOperatorWithConfigurableOperatorAttributes<>(
+                                x -> x,
+                                new OperatorAttributesBuilder()
+                                        .setOutputOnlyAfterEndOfStream(true)
+                                        .build()))
+                .sinkTo(new DiscardingSink<>())
+                .disableChaining()
+                .name("sink");
+
+        final StreamGraph streamGraph = env.getStreamGraph(false);
+        Map<String, StreamNode> nodeMap = new HashMap<>();
+        for (StreamNode node : streamGraph.getStreamNodes()) {
+            nodeMap.put(node.getOperatorName(), node);
+        }
+        assertThat(nodeMap).hasSize(4);
+        assertThat(nodeMap.get("Source: 
source").isOutputOnlyAfterEndOfStream()).isFalse();
+        
assertThat(nodeMap.get("transform").isOutputOnlyAfterEndOfStream()).isTrue();
+        
assertThat(nodeMap.get("Map").isOutputOnlyAfterEndOfStream()).isFalse();
+        assertThat(nodeMap.get("sink: 
Writer").isOutputOnlyAfterEndOfStream()).isFalse();
+        assertManagedMemoryWeightsSize(nodeMap.get("Source: source"), 0);
+        assertManagedMemoryWeightsSize(nodeMap.get("Map"), 0);
+        assertManagedMemoryWeightsSize(nodeMap.get("transform"), 0);
+        assertManagedMemoryWeightsSize(nodeMap.get("sink: Writer"), 0);
+
+        JobGraph jobGraph = 
StreamingJobGraphGenerator.createJobGraph(streamGraph);
+        Map<String, JobVertex> vertexMap = new HashMap<>();
+        for (JobVertex vertex : jobGraph.getVertices()) {
+            vertexMap.put(vertex.getName(), vertex);
+        }
+        assertThat(vertexMap).hasSize(3);
+        assertHasOutputPartitionType(
+                vertexMap.get("Source: source"), 
ResultPartitionType.PIPELINED_BOUNDED);
+        assertHasOutputPartitionType(
+                vertexMap.get("Map -> transform"), 
ResultPartitionType.BLOCKING);
+        assertThat(vertexMap.get("Source: 
source").isAnyOutputBlocking()).isFalse();
+        assertThat(vertexMap.get("Map -> 
transform").isAnyOutputBlocking()).isTrue();
+        assertThat(vertexMap.get("sink: 
Writer").isAnyOutputBlocking()).isFalse();
+    }
+
     @Test
     void testApplyBatchExecutionSettingsOnTwoInputOperator() {
         final StreamExecutionEnvironment env =
@@ -146,10 +233,6 @@ public class 
StreamingJobGraphGeneratorWithOperatorAttributesTest {
         assertManagedMemoryWeightsSize(nodeMap.get("sink: Writer"), 0);
     }
 
-    private static void assertManagedMemoryWeightsSize(StreamNode node, int 
weightSize) {
-        
assertThat(node.getManagedMemoryOperatorScopeUseCaseWeights()).hasSize(weightSize);
-    }
-
     @Test
     void testOneInputOperatorWithInternalSorterSupported() {
         final StreamExecutionEnvironment env =
@@ -361,6 +444,10 @@ public class 
StreamingJobGraphGeneratorWithOperatorAttributesTest {
                 IN2 value, CoProcessFunction<IN1, IN2, OUT>.Context ctx, 
Collector<OUT> out) {}
     }
 
+    private void assertManagedMemoryWeightsSize(StreamNode node, int 
weightSize) {
+        
assertThat(node.getManagedMemoryOperatorScopeUseCaseWeights()).hasSize(weightSize);
+    }
+
     private void assertHasOutputPartitionType(
             JobVertex jobVertex, ResultPartitionType partitionType) {
         
assertThat(jobVertex.getProducedDataSets().get(0).getResultType()).isEqualTo(partitionType);

Reply via email to