gaoyunhaii commented on a change in pull request #17938:
URL: https://github.com/apache/flink/pull/17938#discussion_r769772550



##########
File path: 
flink-core/src/main/java/org/apache/flink/configuration/PipelineOptions.java
##########
@@ -259,4 +259,19 @@
                                             TextElement.code(
                                                     
"name:file1,path:`file:///tmp/file1`;name:file2,path:`hdfs:///tmp/file2`"))
                                     .build());
+
+    public static final ConfigOption<VertexDescriptionMode> 
VERTEX_DESCRIPTION_MODE =
+            key("pipeline.vertex-description-mode")
+                    .enumType(VertexDescriptionMode.class)
+                    .defaultValue(VertexDescriptionMode.TREE)
+                    .withDescription("Mode how we organize description of a 
job vertex.");
+
+    /** the mode how we organize description of a vertex. */
+    @PublicEvolving
+    public enum VertexDescriptionMode {
+        /** Organize the description in a multi line tree mode. */

Review comment:
       `Organize` -> `Organizes` ?

##########
File path: 
flink-core/src/main/java/org/apache/flink/configuration/PipelineOptions.java
##########
@@ -259,4 +259,19 @@
                                             TextElement.code(
                                                     
"name:file1,path:`file:///tmp/file1`;name:file2,path:`hdfs:///tmp/file2`"))
                                     .build());
+
+    public static final ConfigOption<VertexDescriptionMode> 
VERTEX_DESCRIPTION_MODE =
+            key("pipeline.vertex-description-mode")
+                    .enumType(VertexDescriptionMode.class)
+                    .defaultValue(VertexDescriptionMode.TREE)
+                    .withDescription("Mode how we organize description of a 
job vertex.");
+
+    /** the mode how we organize description of a vertex. */

Review comment:
       `the mode` -> `The mode` ? 

##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
##########
@@ -212,9 +213,125 @@ private JobGraph createJobGraph() {
                             + "This indicates that non-serializable types 
(like custom serializers) were registered");
         }
 
+        setVertexDescription();
+
         return jobGraph;
     }
 
+    private void setVertexDescription() {
+        for (Map.Entry<Integer, JobVertex> headOpAndJobVertex : 
jobVertices.entrySet()) {
+            Integer headOpId = headOpAndJobVertex.getKey();
+            JobVertex vertex = headOpAndJobVertex.getValue();
+            StringBuilder builder = new StringBuilder();
+            switch (streamGraph.getVertexDescriptionMode()) {
+                case CASCADING:
+                    getCascadingDescription(builder, headOpId, headOpId);
+                    break;
+                case TREE:
+                    getTreeDescription(builder, headOpId, headOpId, "", true);
+                    break;
+                default:
+                    throw new IllegalArgumentException(
+                            String.format(
+                                    "Description mode %s not supported",
+                                    streamGraph.getVertexDescriptionMode()));
+            }
+            vertex.setOperatorPrettyName(builder.toString());
+        }
+    }
+
+    private void getCascadingDescription(StringBuilder builder, int headOpId, 
int currentOpId) {
+        StreamNode node = streamGraph.getStreamNode(currentOpId);
+        builder.append(descWithChainedSourcesInfo(node));
+
+        LinkedList<Integer> chainedOutput = getChainedOutputNodes(headOpId, 
node);
+        if (chainedOutput.isEmpty()) {
+            return;
+        }
+        builder.append(" -> ");
+
+        boolean multiOutput = chainedOutput.size() > 1;
+        if (multiOutput) {
+            builder.append("(");
+        }
+
+        while (!chainedOutput.isEmpty()) {

Review comment:
       Perhaps we could simplify it a bit to the following ? 
   ```
   while (true) {
       Integer outputId = chainedOutput.pollFirst();
       getCascadingDescription(builder, headOpId, outputId);
       if (chainedOutput.isEmpty()) {
           break;
       }
       builder.append(" , ");
   }
   
   if (multiOutput) {
        builder.append(")");
   }
   ```

##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
##########
@@ -212,9 +213,125 @@ private JobGraph createJobGraph() {
                             + "This indicates that non-serializable types 
(like custom serializers) were registered");
         }
 
+        setVertexDescription();
+
         return jobGraph;
     }
 
+    private void setVertexDescription() {
+        for (Map.Entry<Integer, JobVertex> headOpAndJobVertex : 
jobVertices.entrySet()) {
+            Integer headOpId = headOpAndJobVertex.getKey();
+            JobVertex vertex = headOpAndJobVertex.getValue();
+            StringBuilder builder = new StringBuilder();
+            switch (streamGraph.getVertexDescriptionMode()) {
+                case CASCADING:
+                    getCascadingDescription(builder, headOpId, headOpId);
+                    break;
+                case TREE:
+                    getTreeDescription(builder, headOpId, headOpId, "", true);
+                    break;
+                default:
+                    throw new IllegalArgumentException(
+                            String.format(
+                                    "Description mode %s not supported",
+                                    streamGraph.getVertexDescriptionMode()));
+            }
+            vertex.setOperatorPrettyName(builder.toString());
+        }
+    }
+
+    private void getCascadingDescription(StringBuilder builder, int headOpId, 
int currentOpId) {
+        StreamNode node = streamGraph.getStreamNode(currentOpId);
+        builder.append(descWithChainedSourcesInfo(node));
+
+        LinkedList<Integer> chainedOutput = getChainedOutputNodes(headOpId, 
node);
+        if (chainedOutput.isEmpty()) {
+            return;
+        }
+        builder.append(" -> ");
+
+        boolean multiOutput = chainedOutput.size() > 1;
+        if (multiOutput) {
+            builder.append("(");
+        }
+
+        while (!chainedOutput.isEmpty()) {
+            Integer outputId = chainedOutput.pollFirst();
+            getCascadingDescription(builder, headOpId, outputId);
+            if (chainedOutput.isEmpty() && multiOutput) {
+                builder.append(")");
+            } else if (!chainedOutput.isEmpty()) {
+                builder.append(" , ");
+            }
+        }
+    }
+
+    private LinkedList<Integer> getChainedOutputNodes(int headOpId, StreamNode 
node) {
+        LinkedList<Integer> chainedOutput = new LinkedList<>();
+        for (StreamEdge edge : node.getOutEdges()) {
+            int targetId = edge.getTargetId();
+            if (chainedConfigs.containsKey(headOpId)
+                    && chainedConfigs.get(headOpId).containsKey(targetId)) {
+                chainedOutput.add(targetId);
+            }
+        }
+        return chainedOutput;
+    }
+
+    private void getTreeDescription(
+            StringBuilder builder,
+            int headOpId,
+            int currentOpId,
+            String prefix,
+            boolean parentIsLast) {
+        StreamNode node = streamGraph.getStreamNode(currentOpId);
+        builder.append(prefix);
+        builder.append(descWithChainedSourcesInfo(node));
+        builder.append("\n");
+
+        StringBuilder childPrefixBuilder = new StringBuilder();
+        if (parentIsLast) {
+            childPrefixBuilder.append(prefix.replace('-', ' ').replace('+', ' 
'));
+        } else {
+            childPrefixBuilder.append(prefix.replace('-', ' '));
+        }
+        String childPrefix = childPrefixBuilder.toString();
+
+        LinkedList<Integer> chainedOutput = getChainedOutputNodes(headOpId, 
node);
+        while (!chainedOutput.isEmpty()) {
+            Integer outputId = chainedOutput.pollFirst();
+            String currentPrefix;
+            if (chainedOutput.isEmpty()) {
+                // last
+                currentPrefix = childPrefix + "+- ";
+            } else {
+                currentPrefix = childPrefix + ":- ";
+            }
+            getTreeDescription(builder, headOpId, outputId, currentPrefix, 
chainedOutput.isEmpty());
+        }
+    }
+
+    private String descWithChainedSourcesInfo(StreamNode node) {
+        List<StreamNode> chainedSource =

Review comment:
       `chainedSource` -> `chainedSources` ?

##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
##########
@@ -212,9 +213,125 @@ private JobGraph createJobGraph() {
                             + "This indicates that non-serializable types 
(like custom serializers) were registered");
         }
 
+        setVertexDescription();
+
         return jobGraph;
     }
 
+    private void setVertexDescription() {
+        for (Map.Entry<Integer, JobVertex> headOpAndJobVertex : 
jobVertices.entrySet()) {
+            Integer headOpId = headOpAndJobVertex.getKey();
+            JobVertex vertex = headOpAndJobVertex.getValue();
+            StringBuilder builder = new StringBuilder();
+            switch (streamGraph.getVertexDescriptionMode()) {
+                case CASCADING:
+                    getCascadingDescription(builder, headOpId, headOpId);
+                    break;
+                case TREE:
+                    getTreeDescription(builder, headOpId, headOpId, "", true);
+                    break;
+                default:
+                    throw new IllegalArgumentException(
+                            String.format(
+                                    "Description mode %s not supported",
+                                    streamGraph.getVertexDescriptionMode()));
+            }
+            vertex.setOperatorPrettyName(builder.toString());
+        }
+    }
+
+    private void getCascadingDescription(StringBuilder builder, int headOpId, 
int currentOpId) {
+        StreamNode node = streamGraph.getStreamNode(currentOpId);
+        builder.append(descWithChainedSourcesInfo(node));
+
+        LinkedList<Integer> chainedOutput = getChainedOutputNodes(headOpId, 
node);
+        if (chainedOutput.isEmpty()) {
+            return;
+        }
+        builder.append(" -> ");
+
+        boolean multiOutput = chainedOutput.size() > 1;
+        if (multiOutput) {
+            builder.append("(");
+        }
+
+        while (!chainedOutput.isEmpty()) {
+            Integer outputId = chainedOutput.pollFirst();
+            getCascadingDescription(builder, headOpId, outputId);
+            if (chainedOutput.isEmpty() && multiOutput) {
+                builder.append(")");
+            } else if (!chainedOutput.isEmpty()) {
+                builder.append(" , ");
+            }
+        }
+    }
+
+    private LinkedList<Integer> getChainedOutputNodes(int headOpId, StreamNode 
node) {
+        LinkedList<Integer> chainedOutput = new LinkedList<>();
+        for (StreamEdge edge : node.getOutEdges()) {
+            int targetId = edge.getTargetId();
+            if (chainedConfigs.containsKey(headOpId)
+                    && chainedConfigs.get(headOpId).containsKey(targetId)) {
+                chainedOutput.add(targetId);
+            }
+        }
+        return chainedOutput;
+    }
+
+    private void getTreeDescription(
+            StringBuilder builder,
+            int headOpId,
+            int currentOpId,
+            String prefix,
+            boolean parentIsLast) {
+        StreamNode node = streamGraph.getStreamNode(currentOpId);
+        builder.append(prefix);
+        builder.append(descWithChainedSourcesInfo(node));
+        builder.append("\n");
+
+        StringBuilder childPrefixBuilder = new StringBuilder();

Review comment:
       Do we need to use `StringBuilder` here? It seems it is only appended to once.

##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
##########
@@ -212,9 +213,125 @@ private JobGraph createJobGraph() {
                             + "This indicates that non-serializable types 
(like custom serializers) were registered");
         }
 
+        setVertexDescription();
+
         return jobGraph;
     }
 
+    private void setVertexDescription() {
+        for (Map.Entry<Integer, JobVertex> headOpAndJobVertex : 
jobVertices.entrySet()) {
+            Integer headOpId = headOpAndJobVertex.getKey();
+            JobVertex vertex = headOpAndJobVertex.getValue();
+            StringBuilder builder = new StringBuilder();
+            switch (streamGraph.getVertexDescriptionMode()) {
+                case CASCADING:
+                    getCascadingDescription(builder, headOpId, headOpId);
+                    break;
+                case TREE:
+                    getTreeDescription(builder, headOpId, headOpId, "", true);
+                    break;
+                default:
+                    throw new IllegalArgumentException(
+                            String.format(
+                                    "Description mode %s not supported",
+                                    streamGraph.getVertexDescriptionMode()));
+            }
+            vertex.setOperatorPrettyName(builder.toString());
+        }
+    }
+
+    private void getCascadingDescription(StringBuilder builder, int headOpId, 
int currentOpId) {
+        StreamNode node = streamGraph.getStreamNode(currentOpId);
+        builder.append(descWithChainedSourcesInfo(node));
+
+        LinkedList<Integer> chainedOutput = getChainedOutputNodes(headOpId, 
node);
+        if (chainedOutput.isEmpty()) {
+            return;
+        }
+        builder.append(" -> ");
+
+        boolean multiOutput = chainedOutput.size() > 1;
+        if (multiOutput) {
+            builder.append("(");
+        }
+
+        while (!chainedOutput.isEmpty()) {
+            Integer outputId = chainedOutput.pollFirst();
+            getCascadingDescription(builder, headOpId, outputId);
+            if (chainedOutput.isEmpty() && multiOutput) {
+                builder.append(")");
+            } else if (!chainedOutput.isEmpty()) {
+                builder.append(" , ");
+            }
+        }
+    }
+
+    private LinkedList<Integer> getChainedOutputNodes(int headOpId, StreamNode 
node) {
+        LinkedList<Integer> chainedOutput = new LinkedList<>();
+        for (StreamEdge edge : node.getOutEdges()) {
+            int targetId = edge.getTargetId();
+            if (chainedConfigs.containsKey(headOpId)
+                    && chainedConfigs.get(headOpId).containsKey(targetId)) {
+                chainedOutput.add(targetId);
+            }
+        }
+        return chainedOutput;
+    }
+
+    private void getTreeDescription(

Review comment:
       Perhaps we could simplify the method a bit ? 
   
   For the recursion, we pass the parameters 
   
   - precedingSpaces: the spaces before prefix and the description
   - isLast: if the current op is the last of its parent
   
   Then in the method, we could first append
   ```
   precedingSpaces + (isLast ? "+-" : ":-") + description 
   ```
   
   Then for each output, we could make the recursive call with
   ```
   precedingSpaces = precedingSpaces + "   "
   isLast = chainedOutput.isEmpty()
   ```

##########
File path: 
flink-core/src/main/java/org/apache/flink/configuration/PipelineOptions.java
##########
@@ -259,4 +259,19 @@
                                             TextElement.code(
                                                     
"name:file1,path:`file:///tmp/file1`;name:file2,path:`hdfs:///tmp/file2`"))
                                     .build());
+
+    public static final ConfigOption<VertexDescriptionMode> 
VERTEX_DESCRIPTION_MODE =
+            key("pipeline.vertex-description-mode")
+                    .enumType(VertexDescriptionMode.class)
+                    .defaultValue(VertexDescriptionMode.TREE)
+                    .withDescription("Mode how we organize description of a 
job vertex.");

Review comment:
       `Mode` -> `The mode`?

##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
##########
@@ -212,9 +213,125 @@ private JobGraph createJobGraph() {
                             + "This indicates that non-serializable types 
(like custom serializers) were registered");
         }
 
+        setVertexDescription();
+
         return jobGraph;
     }
 
+    private void setVertexDescription() {
+        for (Map.Entry<Integer, JobVertex> headOpAndJobVertex : 
jobVertices.entrySet()) {
+            Integer headOpId = headOpAndJobVertex.getKey();
+            JobVertex vertex = headOpAndJobVertex.getValue();
+            StringBuilder builder = new StringBuilder();
+            switch (streamGraph.getVertexDescriptionMode()) {
+                case CASCADING:
+                    getCascadingDescription(builder, headOpId, headOpId);
+                    break;
+                case TREE:
+                    getTreeDescription(builder, headOpId, headOpId, "", true);
+                    break;
+                default:
+                    throw new IllegalArgumentException(
+                            String.format(
+                                    "Description mode %s not supported",
+                                    streamGraph.getVertexDescriptionMode()));
+            }
+            vertex.setOperatorPrettyName(builder.toString());
+        }
+    }
+
+    private void getCascadingDescription(StringBuilder builder, int headOpId, 
int currentOpId) {
+        StreamNode node = streamGraph.getStreamNode(currentOpId);
+        builder.append(descWithChainedSourcesInfo(node));
+
+        LinkedList<Integer> chainedOutput = getChainedOutputNodes(headOpId, 
node);
+        if (chainedOutput.isEmpty()) {
+            return;
+        }
+        builder.append(" -> ");
+
+        boolean multiOutput = chainedOutput.size() > 1;
+        if (multiOutput) {
+            builder.append("(");
+        }
+
+        while (!chainedOutput.isEmpty()) {
+            Integer outputId = chainedOutput.pollFirst();
+            getCascadingDescription(builder, headOpId, outputId);
+            if (chainedOutput.isEmpty() && multiOutput) {
+                builder.append(")");
+            } else if (!chainedOutput.isEmpty()) {
+                builder.append(" , ");
+            }
+        }
+    }
+
+    private LinkedList<Integer> getChainedOutputNodes(int headOpId, StreamNode 
node) {
+        LinkedList<Integer> chainedOutput = new LinkedList<>();
+        for (StreamEdge edge : node.getOutEdges()) {
+            int targetId = edge.getTargetId();
+            if (chainedConfigs.containsKey(headOpId)
+                    && chainedConfigs.get(headOpId).containsKey(targetId)) {
+                chainedOutput.add(targetId);
+            }
+        }
+        return chainedOutput;
+    }
+
+    private void getTreeDescription(
+            StringBuilder builder,
+            int headOpId,
+            int currentOpId,
+            String prefix,
+            boolean parentIsLast) {
+        StreamNode node = streamGraph.getStreamNode(currentOpId);
+        builder.append(prefix);
+        builder.append(descWithChainedSourcesInfo(node));
+        builder.append("\n");
+
+        StringBuilder childPrefixBuilder = new StringBuilder();
+        if (parentIsLast) {
+            childPrefixBuilder.append(prefix.replace('-', ' ').replace('+', ' 
'));
+        } else {
+            childPrefixBuilder.append(prefix.replace('-', ' '));
+        }
+        String childPrefix = childPrefixBuilder.toString();
+
+        LinkedList<Integer> chainedOutput = getChainedOutputNodes(headOpId, 
node);
+        while (!chainedOutput.isEmpty()) {
+            Integer outputId = chainedOutput.pollFirst();
+            String currentPrefix;
+            if (chainedOutput.isEmpty()) {
+                // last
+                currentPrefix = childPrefix + "+- ";
+            } else {
+                currentPrefix = childPrefix + ":- ";
+            }
+            getTreeDescription(builder, headOpId, outputId, currentPrefix, 
chainedOutput.isEmpty());
+        }
+    }
+
+    private String descWithChainedSourcesInfo(StreamNode node) {
+        List<StreamNode> chainedSource =
+                node.getInEdges().stream()
+                        .map(StreamEdge::getSourceId)
+                        .filter(
+                                id ->
+                                        
chainedConfigs.containsKey(node.getId())

Review comment:
       Could we move this condition out of the iteration ? 

##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
##########
@@ -212,9 +213,125 @@ private JobGraph createJobGraph() {
                             + "This indicates that non-serializable types 
(like custom serializers) were registered");
         }
 
+        setVertexDescription();
+
         return jobGraph;
     }
 
+    private void setVertexDescription() {
+        for (Map.Entry<Integer, JobVertex> headOpAndJobVertex : 
jobVertices.entrySet()) {
+            Integer headOpId = headOpAndJobVertex.getKey();
+            JobVertex vertex = headOpAndJobVertex.getValue();
+            StringBuilder builder = new StringBuilder();
+            switch (streamGraph.getVertexDescriptionMode()) {
+                case CASCADING:
+                    getCascadingDescription(builder, headOpId, headOpId);
+                    break;
+                case TREE:
+                    getTreeDescription(builder, headOpId, headOpId, "", true);
+                    break;
+                default:
+                    throw new IllegalArgumentException(
+                            String.format(
+                                    "Description mode %s not supported",
+                                    streamGraph.getVertexDescriptionMode()));
+            }
+            vertex.setOperatorPrettyName(builder.toString());
+        }
+    }
+
+    private void getCascadingDescription(StringBuilder builder, int headOpId, 
int currentOpId) {
+        StreamNode node = streamGraph.getStreamNode(currentOpId);
+        builder.append(descWithChainedSourcesInfo(node));
+
+        LinkedList<Integer> chainedOutput = getChainedOutputNodes(headOpId, 
node);
+        if (chainedOutput.isEmpty()) {
+            return;
+        }
+        builder.append(" -> ");
+
+        boolean multiOutput = chainedOutput.size() > 1;
+        if (multiOutput) {
+            builder.append("(");
+        }
+
+        while (!chainedOutput.isEmpty()) {
+            Integer outputId = chainedOutput.pollFirst();
+            getCascadingDescription(builder, headOpId, outputId);
+            if (chainedOutput.isEmpty() && multiOutput) {
+                builder.append(")");
+            } else if (!chainedOutput.isEmpty()) {
+                builder.append(" , ");
+            }
+        }
+    }
+
+    private LinkedList<Integer> getChainedOutputNodes(int headOpId, StreamNode 
node) {
+        LinkedList<Integer> chainedOutput = new LinkedList<>();
+        for (StreamEdge edge : node.getOutEdges()) {
+            int targetId = edge.getTargetId();
+            if (chainedConfigs.containsKey(headOpId)
+                    && chainedConfigs.get(headOpId).containsKey(targetId)) {
+                chainedOutput.add(targetId);
+            }
+        }
+        return chainedOutput;
+    }
+
+    private void getTreeDescription(
+            StringBuilder builder,
+            int headOpId,
+            int currentOpId,
+            String prefix,
+            boolean parentIsLast) {
+        StreamNode node = streamGraph.getStreamNode(currentOpId);
+        builder.append(prefix);
+        builder.append(descWithChainedSourcesInfo(node));
+        builder.append("\n");
+
+        StringBuilder childPrefixBuilder = new StringBuilder();
+        if (parentIsLast) {
+            childPrefixBuilder.append(prefix.replace('-', ' ').replace('+', ' 
'));
+        } else {
+            childPrefixBuilder.append(prefix.replace('-', ' '));
+        }
+        String childPrefix = childPrefixBuilder.toString();
+
+        LinkedList<Integer> chainedOutput = getChainedOutputNodes(headOpId, 
node);
+        while (!chainedOutput.isEmpty()) {
+            Integer outputId = chainedOutput.pollFirst();
+            String currentPrefix;
+            if (chainedOutput.isEmpty()) {
+                // last
+                currentPrefix = childPrefix + "+- ";
+            } else {
+                currentPrefix = childPrefix + ":- ";
+            }
+            getTreeDescription(builder, headOpId, outputId, currentPrefix, 
chainedOutput.isEmpty());
+        }
+    }
+
+    private String descWithChainedSourcesInfo(StreamNode node) {

Review comment:
       `descWithChainedSourcesInfo` -> `getDescriptionWithChainedSources` ?

##########
File path: 
flink-runtime-web/web-dashboard/src/app/pages/job/overview/detail/job-overview-drawer-detail.component.ts
##########
@@ -39,6 +40,12 @@ export class JobOverviewDrawerDetailComponent implements 
OnInit, OnDestroy {
   public ngOnInit(): void {
     
this.jobService.selectedVertex$.pipe(takeUntil(this.destroy$)).subscribe(node 
=> {
       this.node = node;
+      if (this.node != null && this.node.description != null) {
+        if (this.node.description.indexOf('<br/>') > 0) {
+          this.multilineNameCSS = 'name-multi-line';
+          this.node.description = this.node.description.replace(/<br\/>/g, 
'\n');

Review comment:
       Could you elaborate a bit why we need to replace `<br/>` back to `\n`?

##########
File path: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
##########
@@ -1368,6 +1373,51 @@ public JobGraph createGraphWithMultipleInputs(boolean 
chain, String... inputName
         return StreamingJobGraphGenerator.createJobGraph(env.getStreamGraph());
     }
 
+    @Test
+    public void testTreeDescription() {

Review comment:
       Could we also add two tests for the vertex with chained sources ? 

##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
##########
@@ -212,9 +213,125 @@ private JobGraph createJobGraph() {
                             + "This indicates that non-serializable types 
(like custom serializers) were registered");
         }
 
+        setVertexDescription();
+
         return jobGraph;
     }
 
+    private void setVertexDescription() {
+        for (Map.Entry<Integer, JobVertex> headOpAndJobVertex : 
jobVertices.entrySet()) {
+            Integer headOpId = headOpAndJobVertex.getKey();
+            JobVertex vertex = headOpAndJobVertex.getValue();
+            StringBuilder builder = new StringBuilder();
+            switch (streamGraph.getVertexDescriptionMode()) {
+                case CASCADING:
+                    getCascadingDescription(builder, headOpId, headOpId);

Review comment:
       `getCascadingDescription` -> `buildCascadingDescription`? The same applies to 
`getTreeDescription`.

##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
##########
@@ -212,9 +213,125 @@ private JobGraph createJobGraph() {
                             + "This indicates that non-serializable types 
(like custom serializers) were registered");
         }
 
+        setVertexDescription();
+
         return jobGraph;
     }
 
+    private void setVertexDescription() {
+        for (Map.Entry<Integer, JobVertex> headOpAndJobVertex : 
jobVertices.entrySet()) {
+            Integer headOpId = headOpAndJobVertex.getKey();
+            JobVertex vertex = headOpAndJobVertex.getValue();
+            StringBuilder builder = new StringBuilder();
+            switch (streamGraph.getVertexDescriptionMode()) {
+                case CASCADING:
+                    getCascadingDescription(builder, headOpId, headOpId);
+                    break;
+                case TREE:
+                    getTreeDescription(builder, headOpId, headOpId, "", true);
+                    break;
+                default:
+                    throw new IllegalArgumentException(
+                            String.format(
+                                    "Description mode %s not supported",
+                                    streamGraph.getVertexDescriptionMode()));
+            }
+            vertex.setOperatorPrettyName(builder.toString());
+        }
+    }
+
+    private void getCascadingDescription(StringBuilder builder, int headOpId, 
int currentOpId) {
+        StreamNode node = streamGraph.getStreamNode(currentOpId);
+        builder.append(descWithChainedSourcesInfo(node));
+
+        LinkedList<Integer> chainedOutput = getChainedOutputNodes(headOpId, 
node);
+        if (chainedOutput.isEmpty()) {
+            return;
+        }
+        builder.append(" -> ");
+
+        boolean multiOutput = chainedOutput.size() > 1;
+        if (multiOutput) {
+            builder.append("(");
+        }
+
+        while (!chainedOutput.isEmpty()) {
+            Integer outputId = chainedOutput.pollFirst();
+            getCascadingDescription(builder, headOpId, outputId);
+            if (chainedOutput.isEmpty() && multiOutput) {
+                builder.append(")");
+            } else if (!chainedOutput.isEmpty()) {
+                builder.append(" , ");
+            }
+        }
+    }
+
+    private LinkedList<Integer> getChainedOutputNodes(int headOpId, StreamNode 
node) {
+        LinkedList<Integer> chainedOutput = new LinkedList<>();
+        for (StreamEdge edge : node.getOutEdges()) {
+            int targetId = edge.getTargetId();
+            if (chainedConfigs.containsKey(headOpId)

Review comment:
       Could we move this condition out of the iteration ? 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to