zhuzhurk commented on code in PR #24475:
URL: https://github.com/apache/flink/pull/24475#discussion_r1524746682


##########
flink-core/src/test/java/org/apache/flink/api/dag/TransformationTest.java:
##########
@@ -125,19 +145,34 @@ public void 
testDeclareManagedMemorySlotScopeUseCaseFailWrongScope() {
 
     /** A test implementation of {@link Transformation}. */
     private static class TestTransformation<T> extends Transformation<T> {
-
-        public TestTransformation(String name, TypeInformation<T> outputType, 
int parallelism) {

Review Comment:
   It's better to introduce a `TestMultipleInputTransformation` which inherits 
`AbstractMultipleInputTransformation`.
   Otherwise we are testing some testing logic instead of production logic.



##########
flink-core/src/test/java/org/apache/flink/api/dag/TransformationTest.java:
##########
@@ -45,7 +47,25 @@ public class TransformationTest extends TestLogger {
 
     @Before
     public void setUp() {
-        transformation = new TestTransformation<>("t", null, 1);
+        transformation = new TestTransformation<>("t", null, 1, null);
+    }
+
+    @Test
+    public void testPredecessorCache() throws Exception {
+        // diamond-like transformation:
+        //                   --> midNode-\
+        //              ---/             --\
+        //           --/                    ->
+        //      commonNode  -----------------> topNode
+        Transformation<Void> commonNode = new 
TestTransformation<>("commonNode", null, 1, null);
+        Transformation<Void> midNode =
+                new TestTransformation<>("midNode", null, 1, 
Collections.singletonList(commonNode));
+        Transformation<Void> topNode =
+                new TestTransformation<>("topNode", null, 1, 
Arrays.asList(midNode, commonNode));
+
+        topNode.getTransitivePredecessors();
+        assertEquals(topNode.getPredecessorsCache().size(), 1);

Review Comment:
   Flink is turning to JUnit5. According to the community guide[1], new tests 
should be written using JUnit5. If we have to modify an existing testing class, 
we should migrate it to JUnit5 first in a hotfix commit, for example, 
[#20350](https://github.com/apache/flink/pull/20350/commits).
   
   [1] 
https://flink.apache.org/how-to-contribute/code-style-and-quality-common/#tooling



##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/AbstractBroadcastStateTransformation.java:
##########
@@ -103,7 +103,7 @@ public void setChainingStrategy(ChainingStrategy 
chainingStrategy) {
     }
 
     @Override
-    public List<Transformation<?>> getTransitivePredecessors() {
+    protected List<Transformation<?>> getTransitivePredecessorsInternal() {

Review Comment:
   This method, and a few other methods like in TwoInputTransformation and 
UnionTransformation, need to be fixed too. 
   And it's better to introduce a test for each of them. Maybe we can introduce 
a separate test class `GetTransitivePredecessorsTest` to host them all.



##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/AbstractMultipleInputTransformation.java:
##########
@@ -76,10 +78,10 @@ public StreamOperatorFactory<OUT> getOperatorFactory() {
     }
 
     @Override
-    public List<Transformation<?>> getTransitivePredecessors() {

Review Comment:
   A simpler way can be
   
   ```
           return inputs.stream()
                   .flatMap(input -> input.getTransitivePredecessors().stream())
                   .distinct();
                   .collect(Collectors.toList());
   ```



##########
flink-core/src/test/java/org/apache/flink/api/dag/TransformationTest.java:
##########
@@ -42,10 +43,21 @@
 public class TransformationTest extends TestLogger {
 
     private Transformation<Void> transformation;
+    private Transformation<Void> transformationWithInput;
 
     @Before
     public void setUp() {
         transformation = new TestTransformation<>("t", null, 1);
+        transformationWithInput =
+                new TestTransformationWithInput<>(
+                        "t", null, 1, Arrays.asList(transformation, 
transformation));
+    }
+
+    @Test
+    public void testPredecessorCache() throws Exception {
+        transformationWithInput.getTransitivePredecessors();
+        assertEquals(transformationWithInput.getPredecessorsCache().size(), 1);
+        assertEquals(((TestTransformation<?>) 
transformation).getNumGetTransitivePredecessor(), 1);
     }

Review Comment:
   Can we verify the result of `topNode.getTransitivePredecessors()`?
   Ideally it should be 2. But without the fix it will be 3.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to