Timo Walther created FLINK-8282: ----------------------------------- Summary: Transformation with TwoInputStreamOperator fails Key: FLINK-8282 URL: https://issues.apache.org/jira/browse/FLINK-8282 Project: Flink Issue Type: Bug Components: DataStream API Reporter: Timo Walther
The following program fails for multiple reasons (see exceptions below). The {{TwoInputStreamOperator}} used in the transformation does not extend {{AbstractStreamOperator}}. I think this is the main reason it fails. Either we fix the exceptions or we check for {{AbstractStreamOperator}} first. {code} final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); DataStream<Integer> ds1 = env.addSource(new SourceFunction<Integer>() { @Override public void run(SourceContext<Integer> ctx) throws Exception { ctx.emitWatermark(new Watermark(100L)); ctx.collect(12); while (true) Thread.yield(); } @Override public void cancel() { } }); DataStream<Integer> ds2 = env.addSource(new SourceFunction<Integer>() { @Override public void run(SourceContext<Integer> ctx) throws Exception { ctx.emitWatermark(new Watermark(200L)); ctx.collect(12); while (true) Thread.yield(); } @Override public void cancel() { } }); ds1.connect(ds2.broadcast()).transform("test", Types.INT, new TwoInputStreamOperator<Integer, Integer, Integer>() { @Override public void processElement1(StreamRecord<Integer> element) throws Exception { System.out.println(); } @Override public void processElement2(StreamRecord<Integer> element) throws Exception { System.out.println(); } @Override public void processWatermark1(Watermark mark) throws Exception { System.out.println(); } @Override public void processWatermark2(Watermark mark) throws Exception { System.out.println(); } @Override public void processLatencyMarker1(LatencyMarker latencyMarker) throws Exception { } @Override public void processLatencyMarker2(LatencyMarker latencyMarker) throws Exception { } @Override public void setup(StreamTask<?, ?> containingTask, StreamConfig config, Output<StreamRecord<Integer>> output) { } @Override public void open() throws Exception { } @Override public void close() throws Exception { } @Override public void dispose() throws 
Exception { } @Override public OperatorSnapshotResult snapshotState(long checkpointId, long timestamp, CheckpointOptions checkpointOptions) throws Exception { return null; } @Override public void initializeState(OperatorSubtaskState stateHandles) throws Exception { } @Override public void notifyOfCompletedCheckpoint(long checkpointId) throws Exception { } @Override public void setKeyContextElement1(StreamRecord<?> record) throws Exception { } @Override public void setKeyContextElement2(StreamRecord<?> record) throws Exception { } @Override public ChainingStrategy getChainingStrategy() { return null; } @Override public void setChainingStrategy(ChainingStrategy strategy) { } @Override public MetricGroup getMetricGroup() { return null; } @Override public OperatorID getOperatorID() { return null; } }).print(); // execute program env.execute("Streaming WordCount"); {code} Exceptions are either: {code} 16:27:51,177 WARN org.apache.flink.streaming.api.operators.AbstractStreamOperator - Error while emitting latency marker. java.lang.RuntimeException: Buffer pool is destroyed. 
at org.apache.flink.streaming.runtime.io.RecordWriterOutput.emitLatencyMarker(RecordWriterOutput.java:141) at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.emitLatencyMarker(AbstractStreamOperator.java:825) at org.apache.flink.streaming.api.operators.StreamSource$LatencyMarksEmitter$1.onProcessingTime(StreamSource.java:150) at org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$RepeatedTriggerTask.run(SystemProcessingTimeService.java:294) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308) at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180) at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745) Caused by: java.lang.IllegalStateException: Buffer pool is destroyed. at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBuffer(LocalBufferPool.java:203) at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBlocking(LocalBufferPool.java:191) at org.apache.flink.runtime.io.network.api.writer.RecordWriter.sendToTarget(RecordWriter.java:132) at org.apache.flink.runtime.io.network.api.writer.RecordWriter.randomEmit(RecordWriter.java:107) at org.apache.flink.streaming.runtime.io.StreamRecordWriter.randomEmit(StreamRecordWriter.java:102) at org.apache.flink.streaming.runtime.io.RecordWriterOutput.emitLatencyMarker(RecordWriterOutput.java:138) ... 10 more {code} or {code} 16:33:53,826 WARN org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor - An exception occurred during the metrics setup. 
java.lang.NullPointerException at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processInput(StreamTwoInputProcessor.java:211) at org.apache.flink.streaming.runtime.tasks.TwoInputStreamTask.run(TwoInputStreamTask.java:91) at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:278) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:724) at java.lang.Thread.run(Thread.java:745) {code} -- This message was sent by Atlassian JIRA (v6.4.14#64029)