Zhipeng Zhang created FLINK-31255:
-------------------------------------

             Summary: OperatorUtils#createWrappedOperatorConfig fails to wrap operator config
                 Key: FLINK-31255
                 URL: https://issues.apache.org/jira/browse/FLINK-31255
             Project: Flink
          Issue Type: Bug
          Components: Library / Machine Learning
    Affects Versions: ml-2.1.0, ml-2.0.0, ml-2.2.0
            Reporter: Zhipeng Zhang


Currently we use operator wrappers to enable using normal operators in
iterations. However, the operatorConfig is not correctly unwrapped. For
example, the following code fails because the wrong type serializer is used.

 
{code:java}
@Test
public void testIterationWithMapPartition() throws Exception {
    StreamExecutionEnvironment env =
        StreamExecutionEnvironment.getExecutionEnvironment();
    DataStream<Long> input =
        env.fromParallelCollection(new NumberSequenceIterator(0L, 5L), Types.LONG);
    DataStreamList result =
        Iterations.iterateBoundedStreamsUntilTermination(
            DataStreamList.of(input),
            ReplayableDataStreamList.notReplay(input),
            IterationConfig.newBuilder()
                .setOperatorLifeCycle(OperatorLifeCycle.PER_ROUND)
                .build(),
            new IterationBodyWithMapPartition());

    // The output streams carry Long values, so collect them as Long.
    List<Long> counts =
        IteratorUtils.toList(result.get(0).executeAndCollect());
    System.out.println(counts.size());
}

private static class IterationBodyWithMapPartition implements IterationBody {

    @Override
    public IterationBodyResult process(
        DataStreamList variableStreams, DataStreamList dataStreams) {
        DataStream<Long> input = variableStreams.get(0);

        DataStream<Long> mapPartitionResult =
            DataStreamUtils.mapPartition(
                input,
                new MapPartitionFunction<Long, Long>() {
                    @Override
                    public void mapPartition(
                        Iterable<Long> iterable, Collector<Long> collector)
                        throws Exception {
                        for (Long iter : iterable) {
                            collector.collect(iter);
                        }
                    }
                });

        DataStream<Integer> terminationCriteria =
            mapPartitionResult.<Long>flatMap(new TerminateOnMaxIter(2)).returns(Types.INT);

        return new IterationBodyResult(
            DataStreamList.of(mapPartitionResult), variableStreams, terminationCriteria);
    }
}
{code}
The error stack is:

Caused by: java.lang.ClassCastException: java.lang.Long cannot be cast to org.apache.flink.iteration.IterationRecord
    at org.apache.flink.iteration.typeinfo.IterationRecordSerializer.serialize(IterationRecordSerializer.java:34)
    at org.apache.flink.iteration.datacache.nonkeyed.FileSegmentWriter.addRecord(FileSegmentWriter.java:79)
    at org.apache.flink.iteration.datacache.nonkeyed.DataCacheWriter.addRecord(DataCacheWriter.java:107)
    at org.apache.flink.iteration.datacache.nonkeyed.ListStateWithCache.add(ListStateWithCache.java:148)
    at org.apache.flink.ml.common.datastream.DataStreamUtils$MapPartitionOperator.processElement(DataStreamUtils.java:445)
    at org.apache.flink.iteration.operator.perround.OneInputPerRoundWrapperOperator.processElement(OneInputPerRoundWrapperOperator.java:69)
    at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:233)
    at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:134)
    at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:105)
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:519)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:203)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:804)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:753)
    at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948)
    at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563)
    at java.lang.Thread.run(Thread.java:748)
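
The failure mode in the trace above can be reproduced in isolation. The following is only a minimal sketch, under the assumption that the inner operator is handed the serializer for the wrapped record type instead of the serializer for the user type. It does not use Flink: WrappedRecord, WrappedRecordSerializer, LongSerializer and Cache are hypothetical stand-ins for IterationRecord, IterationRecordSerializer, the Long serializer and the data cache, and the Serializer interface is not Flink's TypeSerializer.

```java
import java.util.ArrayList;
import java.util.List;

public class WrappedSerializerSketch {

    /** Hypothetical stand-in for a record that wraps a user value with a round number. */
    static final class WrappedRecord<T> {
        final T value;
        final int round;

        WrappedRecord(T value, int round) {
            this.value = value;
            this.round = round;
        }
    }

    /** Minimal serializer interface for this sketch (not Flink's TypeSerializer). */
    interface Serializer<T> {
        String serialize(T record);
    }

    /** Serializer for the user type. */
    static final class LongSerializer implements Serializer<Long> {
        @Override
        public String serialize(Long record) {
            return Long.toString(record);
        }
    }

    /** Serializer for the wrapper type; delegates the payload to the inner serializer. */
    static final class WrappedRecordSerializer<T> implements Serializer<WrappedRecord<T>> {
        private final Serializer<T> inner;

        WrappedRecordSerializer(Serializer<T> inner) {
            this.inner = inner;
        }

        @Override
        public String serialize(WrappedRecord<T> record) {
            return record.round + ":" + inner.serialize(record.value);
        }
    }

    /** Hypothetical cache used by the inner operator; it only ever sees raw user values. */
    static final class Cache {
        private final Serializer<Object> serializer;
        final List<String> segments = new ArrayList<>();

        @SuppressWarnings("unchecked")
        Cache(Serializer<?> configured) {
            // The element type is erased here, as it is when a serializer is read from a config.
            this.serializer = (Serializer<Object>) configured;
        }

        void add(Object value) {
            segments.add(serializer.serialize(value));
        }
    }

    /** Returns true if adding a raw Long to a cache using the given serializer fails. */
    static boolean failsWithClassCast(Serializer<?> configured) {
        try {
            new Cache(configured).add(5L);
            return false;
        } catch (ClassCastException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        Serializer<Long> userSerializer = new LongSerializer();
        Serializer<WrappedRecord<Long>> wrapperSerializer =
            new WrappedRecordSerializer<>(userSerializer);

        // Inner operator configured with the wrapper serializer: the cast fails.
        System.out.println(failsWithClassCast(wrapperSerializer)); // prints true
        // Inner operator configured with the unwrapped user serializer: works.
        System.out.println(failsWithClassCast(userSerializer)); // prints false
    }
}
```

In this sketch the fix corresponds to giving the inner operator's cache the unwrapped user serializer. Whether the actual fix in OperatorUtils#createWrappedOperatorConfig takes that form is not established here.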



--
This message was sent by Atlassian Jira
(v8.20.10#820010)