This is an automated email from the ASF dual-hosted git repository.

lindong pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink-ml.git
commit 0c852da7a1b87bc632e21a19148dbb8d19a8bd30 Author: Zhangzp <[email protected]> AuthorDate: Thu Apr 20 13:30:20 2023 +0800 [FLINK-31173] TailOperator should have only one input This closes #216. --- .../org/apache/flink/iteration/Iterations.java | 27 ++++++++++++++++------ 1 file changed, 20 insertions(+), 7 deletions(-) diff --git a/flink-ml-iteration/src/main/java/org/apache/flink/iteration/Iterations.java b/flink-ml-iteration/src/main/java/org/apache/flink/iteration/Iterations.java index 8f716770..8f8fc3e2 100644 --- a/flink-ml-iteration/src/main/java/org/apache/flink/iteration/Iterations.java +++ b/flink-ml-iteration/src/main/java/org/apache/flink/iteration/Iterations.java @@ -21,6 +21,7 @@ package org.apache.flink.iteration; import org.apache.flink.annotation.Experimental; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.common.typeinfo.Types; +import org.apache.flink.api.dag.Transformation; import org.apache.flink.iteration.compile.DraftExecutionEnvironment; import org.apache.flink.iteration.operator.HeadOperator; import org.apache.flink.iteration.operator.HeadOperatorFactory; @@ -38,6 +39,7 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.co.CoProcessFunction; import org.apache.flink.streaming.api.operators.TwoInputStreamOperator; import org.apache.flink.streaming.api.transformations.OneInputTransformation; +import org.apache.flink.streaming.api.transformations.PhysicalTransformation; import org.apache.flink.util.Collector; import java.util.ArrayList; @@ -476,13 +478,24 @@ public class Iterations { return new DataStreamList( map( dataStreams, - (index, dataStream) -> - ((DataStream<IterationRecord<?>>) dataStream) - .transform( - "tail-" + dataStream.getTransformation().getName(), - new IterationRecordTypeInfo(dataStream.getType()), - new TailOperator(iterationId, startIndex + index)) - .setParallelism(dataStream.getParallelism()))); + 
(index, dataStream) -> { + Transformation<?> inputTransformation = dataStream.getTransformation(); + if (!(inputTransformation instanceof PhysicalTransformation) + && inputTransformation.getInputs().size() > 1) { + // TODO: Support epoch watermark alignment for TailOperator. + throw new UnsupportedOperationException( + "Tail operator should have only one input. Please check whether operator \"" + + inputTransformation.getName() + + "\" contains multiple inputs."); + } + + return ((DataStream<IterationRecord<?>>) dataStream) + .transform( + "tail-" + dataStream.getTransformation().getName(), + new IterationRecordTypeInfo(dataStream.getType()), + new TailOperator(iterationId, startIndex + index)) + .setParallelism(dataStream.getParallelism()); + })); } @SuppressWarnings({"unchecked", "rawtypes"})
