pnowojski commented on a change in pull request #13109:
URL: https://github.com/apache/flink/pull/13109#discussion_r468389517



##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
##########
@@ -160,7 +160,7 @@ public void setup(StreamTask<?, ?> containingTask, 
StreamConfig config, Output<S
                        if (config.isChainStart()) {

Review comment:
       Can you rename the commit to:
   ```
   [FLINK-18808][runtime/metrics] Include side outputs in numRecordsOut metric.
   
   Before side outputs were ignored by the numRecordsOut metric.
   ```
   ?

##########
File path: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTest.java
##########
@@ -41,13 +41,17 @@
 import org.apache.flink.streaming.api.functions.co.CoMapFunction;

Review comment:
       If you are adding the test for both one and two inputs, I think you 
should modify the `MultipleInputStreamTaskTest#testOperatorMetricReuse` 
test as well.

##########
File path: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamConfigChainer.java
##########
@@ -121,6 +124,91 @@ private void head(OperatorID headOperatorID) {
                        TypeSerializer<IN> inputSerializer,
                        TypeSerializer<OUT> outputSerializer,
                        boolean createKeyedStateBackend) {
+               return chain(operatorID,
+                       operatorFactory,
+                       inputSerializer,
+                       outputSerializer,
+                       createKeyedStateBackend,
+                       null);
+       }
+
+       public <T> StreamConfigChainer<OWNER> chainWithNonChainedOutputs(
+               OperatorID operatorID,

Review comment:
       nit (in many places): according to our [coding 
style](https://flink.apache.org/contributing/code-style-and-quality-formatting.html#breaking-the-lines-of-too-long-statements),
 parameters should be double-indented.
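
As a rough illustration of the nit above, here is a minimal sketch of the layout as I read the linked style guide: parameters in a method declaration get two extra indentation levels, while arguments at a call site get one. The class and method names are hypothetical, and spaces stand in for the tabs used in the Flink code base.

```java
final class DoubleIndentExample {

    // Declaration: each parameter sits on its own line, indented twice
    // relative to the line with the method name, so the parameter list
    // stays visually distinct from the single-indented method body.
    static String joinWithSeparator(
            String first,
            String second,
            String separator) {
        return first + separator + second;
    }

    static String usage() {
        // Call site: each argument sits on its own line with one extra indentation.
        return joinWithSeparator(
            "numRecordsOut",
            "sideOutput",
            "+");
    }
}
```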

##########
File path: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamConfigChainer.java
##########
@@ -121,6 +124,91 @@ private void head(OperatorID headOperatorID) {
                        TypeSerializer<IN> inputSerializer,
                        TypeSerializer<OUT> outputSerializer,
                        boolean createKeyedStateBackend) {
+               return chain(operatorID,
+                       operatorFactory,
+                       inputSerializer,
+                       outputSerializer,
+                       createKeyedStateBackend,
+                       null);
+       }
+
+       public <T> StreamConfigChainer<OWNER> chainWithNonChainedOutputs(
+               OperatorID operatorID,
+               OneInputStreamOperator<T, T> operator,
+               TypeSerializer<T> typeSerializer,
+               boolean createKeyedStateBackend,
+               List<StreamEdge> nonChainedOutputs) {
+               return chainWithNonChainedOutputs(operatorID, operator, 
typeSerializer, typeSerializer, createKeyedStateBackend, nonChainedOutputs);
+       }
+
+       public <T> StreamConfigChainer<OWNER> chainWithNonChainedOutputs(
+               OneInputStreamOperator<T, T> operator,
+               TypeSerializer<T> typeSerializer,
+               List<StreamEdge> nonChainedOutputs) {
+               return chainWithNonChainedOutputs(new OperatorID(), operator, 
typeSerializer, nonChainedOutputs);
+       }
+
+       public <T> StreamConfigChainer<OWNER> chainWithNonChainedOutputs(
+               OperatorID operatorID,
+               OneInputStreamOperator<T, T> operator,
+               TypeSerializer<T> typeSerializer,
+               List<StreamEdge> nonChainedOutputs) {
+               return chainWithNonChainedOutputs(operatorID, operator, 
typeSerializer, typeSerializer, false, nonChainedOutputs);
+       }
+
+       public <T> StreamConfigChainer<OWNER> chainWithNonChainedOutputs(
+               OneInputStreamOperatorFactory<T, T> operatorFactory,
+               TypeSerializer<T> typeSerializer,
+               List<StreamEdge> nonChainedOutputs) {
+               return chainWithNonChainedOutputs(new OperatorID(), 
operatorFactory, typeSerializer, nonChainedOutputs);
+       }
+
+       public <T> StreamConfigChainer<OWNER> chainWithNonChainedOutputs(
+               OperatorID operatorID,
+               OneInputStreamOperatorFactory<T, T> operatorFactory,
+               TypeSerializer<T> typeSerializer,
+               List<StreamEdge> nonChainedOutputs) {
+               return chainWithNonChainedOutputs(operatorID, operatorFactory, 
typeSerializer, typeSerializer, false, nonChainedOutputs);
+       }
+
+       private <IN, OUT> StreamConfigChainer<OWNER> chainWithNonChainedOutputs(

Review comment:
       I think this builder class is growing too big, with too many 
overloaded methods. I think we should split it into a two-step builder pattern. 
   
   Instead of providing many overloaded variants of `chain` and 
`chainWithNonChainedOutputs` (and maybe more in the future), I think it would 
be better to have a single `chain` method with just the obligatory parameters:
   ```                  
   StreamConfigEdgeChainer chain(TypeSerializer<T> typeSerializer);
   ```
   (?)
   that returns a `StreamConfigEdgeChainer` (different name?). 
`StreamConfigEdgeChainer` would be the second-level builder, allowing the caller 
to set further optional parameters such as the operator factory, 
`nonChainedOutputs`, `createKeyedStateBackend`, etc.
   
   Calling `StreamConfigEdgeChainer.build()` would then return the 
first-level builder, `StreamConfigChainer`, again.
   
   
   ```
   testHarness
     .chain(typeSerializer1)
       .setOperatorID(...)
       .setOutputTypeSerializer(...)
       .build()
     .chain(typeSerializer2)
       .setNonChainedOutputsCount(2)
       .build()
     .finish();
   ```
   WDYT?
   
   (For the sake of backward compatibility, the existing `chain` methods could 
be left as they are, marked `@Deprecated`, so that we don't have to modify all 
of the existing tests.)
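
To make the proposal a bit more concrete, below is a minimal, self-contained sketch of the two-step builder. It is not the actual `StreamConfigChainer`: `ChainerSketch`, `EdgeChainerSketch` and `OperatorSettings` are hypothetical stand-ins, serializers and operator IDs are plain strings, and `setCreateKeyedStateBackend` is an assumed setter name. The only behaviour taken from the existing overloads is that the output serializer defaults to the input serializer.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Hypothetical first-level builder, standing in for StreamConfigChainer. */
final class ChainerSketch {

    /** Per-operator settings; serializers are plain strings to keep the sketch self-contained. */
    static final class OperatorSettings {
        final String inputSerializer;
        String outputSerializer;
        String operatorId = "auto-generated";
        int nonChainedOutputsCount = 0;
        boolean createKeyedStateBackend = false;

        OperatorSettings(String inputSerializer) {
            this.inputSerializer = inputSerializer;
            // Default mirrors the existing overloads that pass typeSerializer twice.
            this.outputSerializer = inputSerializer;
        }
    }

    /** Hypothetical second-level builder holding the optional parameters of one operator. */
    static final class EdgeChainerSketch {
        private final ChainerSketch parent;
        private final OperatorSettings settings;

        private EdgeChainerSketch(ChainerSketch parent, OperatorSettings settings) {
            this.parent = parent;
            this.settings = settings;
        }

        EdgeChainerSketch setOperatorID(String operatorId) {
            settings.operatorId = operatorId;
            return this;
        }

        EdgeChainerSketch setOutputTypeSerializer(String outputSerializer) {
            settings.outputSerializer = outputSerializer;
            return this;
        }

        EdgeChainerSketch setNonChainedOutputsCount(int count) {
            settings.nonChainedOutputsCount = count;
            return this;
        }

        EdgeChainerSketch setCreateKeyedStateBackend(boolean create) {
            settings.createKeyedStateBackend = create;
            return this;
        }

        /** Registers the operator and hands control back to the first-level builder. */
        ChainerSketch build() {
            parent.chained.add(settings);
            return parent;
        }
    }

    private final List<OperatorSettings> chained = new ArrayList<>();

    /** The single chain method: only the obligatory parameter is required here. */
    EdgeChainerSketch chain(String typeSerializer) {
        return new EdgeChainerSketch(this, new OperatorSettings(typeSerializer));
    }

    /** Ends the chain and returns an immutable snapshot of the collected settings. */
    List<OperatorSettings> finish() {
        return Collections.unmodifiableList(new ArrayList<>(chained));
    }
}
```

With this shape, a new optional parameter only needs one more setter on the second-level builder instead of another family of overloads, and the deprecated `chain` variants could delegate to it.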

##########
File path: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java
##########
@@ -648,8 +649,16 @@ public void close() throws Exception {
        public void testOperatorMetricReuse() throws Exception {
                final OneInputStreamTaskTestHarness<String, String> testHarness 
= new OneInputStreamTaskTestHarness<>(OneInputStreamTask::new, 
BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);
 
+               StreamEdge edge = new StreamEdge(
+                       new StreamNode(2, null, null, (StreamOperator<?>) null, 
null, null, null),
+                       new StreamNode(4 , null, null, (StreamOperator<?>) 
null, null, null, null),
+                       0,
+                       Collections.<String>emptyList(),
+                       new BroadcastPartitioner<Object>(),
+                       null);
+

Review comment:
       The whole idea of `StreamConfigChainer` is to avoid those scary manual 
creations of `StreamEdge`s: this edge should be created automatically by the 
`StreamConfigChainer`.
   
   For example, a user of the `StreamConfigChainer` would just specify how many 
non-chained output edges there should be, and the `StreamConfigChainer` would 
create those edges automatically.
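
A rough sketch of what "create those edges automatically" could look like, under heavy assumptions: `EdgeStub` is a hypothetical stand-in for `StreamEdge` that only carries source and target vertex IDs, and the target IDs are arbitrary placeholders. A real helper inside `StreamConfigChainer` would still have to construct the actual `StreamNode` and `StreamEdge` objects, but the test would only pass in a count.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper: derives placeholder output edges from a count. */
final class NonChainedEdgeFactorySketch {

    /** Stand-in for StreamEdge; only records the two vertex IDs. */
    static final class EdgeStub {
        final int sourceId;
        final int targetId;

        EdgeStub(int sourceId, int targetId) {
            this.sourceId = sourceId;
            this.targetId = targetId;
        }
    }

    /** Creates {@code count} edges leaving {@code sourceId}, each with a distinct placeholder target. */
    static List<EdgeStub> createNonChainedOutputs(int sourceId, int count) {
        if (count < 0) {
            throw new IllegalArgumentException("count must be non-negative: " + count);
        }
        List<EdgeStub> edges = new ArrayList<>(count);
        for (int i = 0; i < count; i++) {
            // Target IDs are arbitrary; they only need to differ from each other and from the source.
            edges.add(new EdgeStub(sourceId, sourceId + 1 + i));
        }
        return edges;
    }
}
```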



