This is an automated email from the ASF dual-hosted git repository.

zhuzh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit fa12ce2f88ebb96ffaec78f2f6432a286d84faff
Author: noorall <[email protected]>
AuthorDate: Fri Oct 25 16:31:16 2024 +0800

    [FLINK-36066][runtime] Introduce ImmutableStreamGraph to provide a read-only view of StreamGraph
---
 .../flink/streaming/api/graph/StreamEdge.java      |  4 ++
 .../api/graph/util/ImmutableStreamEdge.java        | 48 +++++++++++++++
 .../api/graph/util/ImmutableStreamGraph.java       | 46 ++++++++++++++
 .../api/graph/util/ImmutableStreamNode.java        | 63 +++++++++++++++++++
 .../api/graph/util/ImmutableStreamGraphTest.java   | 72 ++++++++++++++++++++++
 5 files changed, 233 insertions(+)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/StreamEdge.java
 
b/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/StreamEdge.java
index 704586badec..be8421f2ae1 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/StreamEdge.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/StreamEdge.java
@@ -238,4 +238,8 @@ public class StreamEdge implements Serializable {
     public IntermediateDataSetID getIntermediateDatasetIdToProduce() {
         return intermediateDatasetIdToProduce;
     }
+
+    /** Returns the value of this edge's {@code edgeId} field. */
+    public String getEdgeId() {
+        return this.edgeId;
+    }
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/util/ImmutableStreamEdge.java
 
b/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/util/ImmutableStreamEdge.java
new file mode 100644
index 00000000000..f0ab4d42e12
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/util/ImmutableStreamEdge.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.graph.util;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.streaming.api.graph.StreamEdge;
+
+/**
+ * Read-only wrapper around a {@link StreamEdge}.
+ *
+ * <p>Only getters are exposed. Nothing is copied: every getter forwards to the wrapped edge at
+ * call time.
+ */
+@Internal
+public class ImmutableStreamEdge {
+
+    /** The wrapped edge that every getter delegates to. */
+    private final StreamEdge streamEdge;
+
+    /**
+     * Creates a read-only view of the given edge.
+     *
+     * @param streamEdge the edge to wrap
+     */
+    public ImmutableStreamEdge(StreamEdge streamEdge) {
+        this.streamEdge = streamEdge;
+    }
+
+    /** Returns the edge id reported by the wrapped edge. */
+    public String getEdgeId() {
+        return this.streamEdge.getEdgeId();
+    }
+
+    /** Returns the source id reported by the wrapped edge. */
+    public int getSourceId() {
+        return this.streamEdge.getSourceId();
+    }
+
+    /** Returns the target id reported by the wrapped edge. */
+    public int getTargetId() {
+        return this.streamEdge.getTargetId();
+    }
+
+    /** Returns the type number reported by the wrapped edge. */
+    public int getTypeNumber() {
+        return this.streamEdge.getTypeNumber();
+    }
+}
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/util/ImmutableStreamGraph.java
 
b/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/util/ImmutableStreamGraph.java
new file mode 100644
index 00000000000..eef1f001c31
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/util/ImmutableStreamGraph.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.graph.util;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.streaming.api.graph.StreamGraph;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Read-only view of a {@link StreamGraph}.
+ *
+ * <p>Nodes are handed out as {@link ImmutableStreamNode} wrappers. A wrapper is created the first
+ * time its id is requested and the same instance is returned on later requests.
+ */
+@Internal
+public class ImmutableStreamGraph {
+
+    /** The graph this view reads from. */
+    private final StreamGraph streamGraph;
+
+    /** Wrappers created so far, keyed by the vertex id they were requested with. */
+    private final Map<Integer, ImmutableStreamNode> immutableStreamNodes = new HashMap<>();
+
+    /**
+     * Creates a read-only view of the given graph.
+     *
+     * @param streamGraph the graph to wrap
+     */
+    public ImmutableStreamGraph(StreamGraph streamGraph) {
+        this.streamGraph = streamGraph;
+    }
+
+    /**
+     * Looks up the read-only wrapper for a node of the wrapped graph.
+     *
+     * @param vertexId id of the node to look up
+     * @return the wrapper for that node, or {@code null} if the wrapped graph returns {@code
+     *     null} for this id
+     */
+    public ImmutableStreamNode getStreamNode(Integer vertexId) {
+        // Always consult the graph first so that an id it does not know yields null,
+        // regardless of what has been cached.
+        if (streamGraph.getStreamNode(vertexId) == null) {
+            return null;
+        }
+        ImmutableStreamNode wrapper = immutableStreamNodes.get(vertexId);
+        if (wrapper == null) {
+            wrapper = new ImmutableStreamNode(streamGraph.getStreamNode(vertexId));
+            immutableStreamNodes.put(vertexId, wrapper);
+        }
+        return wrapper;
+    }
+}
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/util/ImmutableStreamNode.java
 
b/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/util/ImmutableStreamNode.java
new file mode 100644
index 00000000000..781afe1c4f2
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/util/ImmutableStreamNode.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.graph.util;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.streaming.api.graph.StreamEdge;
+import org.apache.flink.streaming.api.graph.StreamNode;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * Read-only wrapper around a {@link StreamNode}.
+ *
+ * <p>Edges are exposed as {@link ImmutableStreamEdge} wrappers. Each edge list is wrapped once, on
+ * the first call to its getter, and the cached wrappers are reused afterwards; edges added to or
+ * removed from the wrapped node after that first call are not reflected.
+ */
+@Internal
+public class ImmutableStreamNode {
+
+    /** The wrapped node that this view reads from. */
+    private final StreamNode streamNode;
+
+    /** Lazily built wrappers for the node's out edges; {@code null} until first requested. */
+    private List<ImmutableStreamEdge> immutableOutEdges;
+
+    /** Lazily built wrappers for the node's in edges; {@code null} until first requested. */
+    private List<ImmutableStreamEdge> immutableInEdges;
+
+    /**
+     * Creates a read-only view of the given node.
+     *
+     * @param streamNode the node to wrap
+     */
+    public ImmutableStreamNode(StreamNode streamNode) {
+        this.streamNode = streamNode;
+    }
+
+    /** Returns the id reported by the wrapped node. */
+    public int getId() {
+        return streamNode.getId();
+    }
+
+    /**
+     * Returns the wrapped node's in edges as read-only wrappers.
+     *
+     * @return an unmodifiable list, in the iteration order of the wrapped node's in edges
+     */
+    public List<ImmutableStreamEdge> getInEdges() {
+        if (immutableInEdges == null) {
+            immutableInEdges = new ArrayList<>();
+            wrapInto(streamNode.getInEdges(), immutableInEdges);
+        }
+        return Collections.unmodifiableList(immutableInEdges);
+    }
+
+    /**
+     * Returns the wrapped node's out edges as read-only wrappers.
+     *
+     * @return an unmodifiable list, in the iteration order of the wrapped node's out edges
+     */
+    public List<ImmutableStreamEdge> getOutEdges() {
+        if (immutableOutEdges == null) {
+            immutableOutEdges = new ArrayList<>();
+            wrapInto(streamNode.getOutEdges(), immutableOutEdges);
+        }
+        return Collections.unmodifiableList(immutableOutEdges);
+    }
+
+    /** Appends one {@link ImmutableStreamEdge} wrapper per given edge to {@code target}. */
+    private static void wrapInto(Iterable<StreamEdge> edges, List<ImmutableStreamEdge> target) {
+        for (StreamEdge edge : edges) {
+            target.add(new ImmutableStreamEdge(edge));
+        }
+    }
+}
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/util/ImmutableStreamGraphTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/util/ImmutableStreamGraphTest.java
new file mode 100644
index 00000000000..53777f2efdd
--- /dev/null
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/util/ImmutableStreamGraphTest.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.graph.util;
+
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.graph.StreamEdge;
+import org.apache.flink.streaming.api.graph.StreamGraph;
+import org.apache.flink.streaming.api.graph.StreamNode;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.List;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Unit tests for {@link ImmutableStreamGraph}. */
+class ImmutableStreamGraphTest {
+
+    /**
+     * Builds a small sequence-source / map / print job and checks that every node of its stream
+     * graph is mirrored by the read-only view.
+     */
+    @Test
+    void testImmutableStreamGraphGraphContent() {
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        env.fromSequence(1L, 3L).map(value -> value).print().setParallelism(env.getParallelism());
+        StreamGraph streamGraph = env.getStreamGraph();
+        ImmutableStreamGraph immutableStreamGraph = new ImmutableStreamGraph(streamGraph);
+
+        for (StreamNode streamNode : streamGraph.getStreamNodes()) {
+            isStreamNodeEquals(streamNode, immutableStreamGraph.getStreamNode(streamNode.getId()));
+        }
+    }
+
+    /**
+     * Asserts that the read-only node exposes the same id and the same in/out edges, in the same
+     * order, as the original node.
+     *
+     * @param streamNode the original node, used as the expected value
+     * @param immutableStreamNode the read-only view under test
+     */
+    void isStreamNodeEquals(StreamNode streamNode, ImmutableStreamNode immutableStreamNode) {
+        // The view is the value under test, so it goes into assertThat(...); this keeps
+        // AssertJ's "expected ... but was ..." failure messages the right way round.
+        assertThat(immutableStreamNode).isNotNull();
+        assertThat(immutableStreamNode.getId()).isEqualTo(streamNode.getId());
+
+        List<StreamEdge> inEdges = streamNode.getInEdges();
+        List<ImmutableStreamEdge> immutableInEdges = immutableStreamNode.getInEdges();
+        assertThat(immutableInEdges).hasSameSizeAs(inEdges);
+        for (int i = 0; i < inEdges.size(); i++) {
+            isStreamEdgeEquals(inEdges.get(i), immutableInEdges.get(i));
+        }
+
+        List<StreamEdge> outEdges = streamNode.getOutEdges();
+        List<ImmutableStreamEdge> immutableOutEdges = immutableStreamNode.getOutEdges();
+        assertThat(immutableOutEdges).hasSameSizeAs(outEdges);
+        for (int i = 0; i < outEdges.size(); i++) {
+            isStreamEdgeEquals(outEdges.get(i), immutableOutEdges.get(i));
+        }
+    }
+
+    /**
+     * Asserts that the read-only edge reports the same edge id, source id, target id and type
+     * number as the original edge.
+     *
+     * @param streamEdge the original edge, used as the expected value
+     * @param immutableStreamEdge the read-only view under test
+     */
+    void isStreamEdgeEquals(StreamEdge streamEdge, ImmutableStreamEdge immutableStreamEdge) {
+        assertThat(immutableStreamEdge).isNotNull();
+        assertThat(immutableStreamEdge.getEdgeId()).isEqualTo(streamEdge.getEdgeId());
+        assertThat(immutableStreamEdge.getSourceId()).isEqualTo(streamEdge.getSourceId());
+        assertThat(immutableStreamEdge.getTargetId()).isEqualTo(streamEdge.getTargetId());
+        assertThat(immutableStreamEdge.getTypeNumber()).isEqualTo(streamEdge.getTypeNumber());
+    }
+}

Reply via email to