This is an automated email from the ASF dual-hosted git repository.

lindong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-ml.git

commit 0c852da7a1b87bc632e21a19148dbb8d19a8bd30
Author: Zhangzp <[email protected]>
AuthorDate: Thu Apr 20 13:30:20 2023 +0800

    [FLINK-31173] TailOperator should have only one input
    
    This closes #216.
---
 .../org/apache/flink/iteration/Iterations.java     | 27 ++++++++++++++++------
 1 file changed, 20 insertions(+), 7 deletions(-)

diff --git a/flink-ml-iteration/src/main/java/org/apache/flink/iteration/Iterations.java b/flink-ml-iteration/src/main/java/org/apache/flink/iteration/Iterations.java
index 8f716770..8f8fc3e2 100644
--- a/flink-ml-iteration/src/main/java/org/apache/flink/iteration/Iterations.java
+++ b/flink-ml-iteration/src/main/java/org/apache/flink/iteration/Iterations.java
@@ -21,6 +21,7 @@ package org.apache.flink.iteration;
 import org.apache.flink.annotation.Experimental;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.dag.Transformation;
 import org.apache.flink.iteration.compile.DraftExecutionEnvironment;
 import org.apache.flink.iteration.operator.HeadOperator;
 import org.apache.flink.iteration.operator.HeadOperatorFactory;
@@ -38,6 +39,7 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.co.CoProcessFunction;
 import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
 import org.apache.flink.streaming.api.transformations.OneInputTransformation;
+import org.apache.flink.streaming.api.transformations.PhysicalTransformation;
 import org.apache.flink.util.Collector;
 
 import java.util.ArrayList;
@@ -476,13 +478,24 @@ public class Iterations {
         return new DataStreamList(
                 map(
                         dataStreams,
-                        (index, dataStream) ->
-                                ((DataStream<IterationRecord<?>>) dataStream)
-                                        .transform(
-                                                "tail-" + dataStream.getTransformation().getName(),
-                                                new IterationRecordTypeInfo(dataStream.getType()),
-                                                new TailOperator(iterationId, startIndex + index))
-                                        .setParallelism(dataStream.getParallelism())));
+                        (index, dataStream) -> {
+                            Transformation<?> inputTransformation = dataStream.getTransformation();
+                            if (!(inputTransformation instanceof PhysicalTransformation)
+                                    && inputTransformation.getInputs().size() > 1) {
+                                // TODO: Support epoch watermark alignment for TailOperator.
+                                throw new UnsupportedOperationException(
+                                        "Tail operator should have only one input. Please check whether operator \""
+                                                + inputTransformation.getName()
+                                                + "\" contains multiple inputs.");
+                            }
+
+                            return ((DataStream<IterationRecord<?>>) dataStream)
+                                    .transform(
+                                            "tail-" + dataStream.getTransformation().getName(),
+                                            new IterationRecordTypeInfo(dataStream.getType()),
+                                            new TailOperator(iterationId, startIndex + index))
+                                    .setParallelism(dataStream.getParallelism());
+                        }));
     }
 
     @SuppressWarnings({"unchecked", "rawtypes"})

Reply via email to