[ 
https://issues.apache.org/jira/browse/FLINK-8282?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16308293#comment-16308293
 ] 

Aljoscha Krettek commented on FLINK-8282:
-----------------------------------------

In my opinion, there is no good separation of concerns between the 
{{StreamTask}} and the {{StreamOperator}}. {{AbstractStreamOperator}} does a 
lot of things that a stream operator shouldn't do and does things in a very 
specific way that {{StreamTask}} and other components expect to be done in this 
way and things go haywire if the operator doesn't behave that way.

Off the top of my head this includes everything that happens in {{setup()}}, 
i.e. metrics setup and configuration stuff, everything that happens in the 
various state initialisation methods and snapshot/restore methods, and the 
latency marker stuff.

There is actually this (somewhat old) issue: FLINK-4859.

> Transformation with TwoInputStreamOperator fails
> ------------------------------------------------
>
>                 Key: FLINK-8282
>                 URL: https://issues.apache.org/jira/browse/FLINK-8282
>             Project: Flink
>          Issue Type: Bug
>          Components: DataStream API
>    Affects Versions: 1.4.0
>            Reporter: Timo Walther
>
> The following program fails because of multiple reasons (see exceptions 
> below). The transformation with a {{TwoInputStreamOperator}} does not extend 
> {{AbstractStreamOperator}}. I think this is the main cause why it fails. 
> Either we fix the exceptions or we check for {{AbstractStreamOperator}} first.
> {code}
>               final StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.getExecutionEnvironment();
>               env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>               DataStream<Integer> ds1 = env.addSource(new 
> SourceFunction<Integer>() {
>                       @Override
>                       public void run(SourceContext<Integer> ctx) throws 
> Exception {
>                               ctx.emitWatermark(new Watermark(100L));
>                               ctx.collect(12);
>                               while (true) Thread.yield();
>                       }
>                       @Override
>                       public void cancel() {
>                       }
>               });
>               DataStream<Integer> ds2 = env.addSource(new 
> SourceFunction<Integer>() {
>                       @Override
>                       public void run(SourceContext<Integer> ctx) throws 
> Exception {
>                               ctx.emitWatermark(new Watermark(200L));
>                               ctx.collect(12);
>                               while (true) Thread.yield();
>                       }
>                       @Override
>                       public void cancel() {
>                       }
>               });
>               ds1.connect(ds2.broadcast()).transform("test", Types.INT, new 
> TwoInputStreamOperator<Integer, Integer, Integer>() {
>                       @Override
>                       public void processElement1(StreamRecord<Integer> 
> element) throws Exception {
>                               System.out.println();
>                       }
>                       @Override
>                       public void processElement2(StreamRecord<Integer> 
> element) throws Exception {
>                               System.out.println();
>                       }
>                       @Override
>                       public void processWatermark1(Watermark mark) throws 
> Exception {
>                               System.out.println();
>                       }
>                       @Override
>                       public void processWatermark2(Watermark mark) throws 
> Exception {
>                               System.out.println();
>                       }
>                       @Override
>                       public void processLatencyMarker1(LatencyMarker 
> latencyMarker) throws Exception {
>                       }
>                       @Override
>                       public void processLatencyMarker2(LatencyMarker 
> latencyMarker) throws Exception {
>                       }
>                       @Override
>                       public void setup(StreamTask<?, ?> containingTask, 
> StreamConfig config, Output<StreamRecord<Integer>> output) {
>                       }
>                       @Override
>                       public void open() throws Exception {
>                       }
>                       @Override
>                       public void close() throws Exception {
>                       }
>                       @Override
>                       public void dispose() throws Exception {
>                       }
>                       @Override
>                       public OperatorSnapshotResult snapshotState(long 
> checkpointId, long timestamp, CheckpointOptions checkpointOptions) throws 
> Exception {
>                               return null;
>                       }
>                       @Override
>                       public void initializeState(OperatorSubtaskState 
> stateHandles) throws Exception {
>                       }
>                       @Override
>                       public void notifyOfCompletedCheckpoint(long 
> checkpointId) throws Exception {
>                       }
>                       @Override
>                       public void setKeyContextElement1(StreamRecord<?> 
> record) throws Exception {
>                       }
>                       @Override
>                       public void setKeyContextElement2(StreamRecord<?> 
> record) throws Exception {
>                       }
>                       @Override
>                       public ChainingStrategy getChainingStrategy() {
>                               return null;
>                       }
>                       @Override
>                       public void setChainingStrategy(ChainingStrategy 
> strategy) {
>                       }
>                       @Override
>                       public MetricGroup getMetricGroup() {
>                               return null;
>                       }
>                       @Override
>                       public OperatorID getOperatorID() {
>                               return null;
>                       }
>               }).print();
>               // execute program
>               env.execute("Streaming WordCount");
> {code}
> Exceptions are either:
> {code}
> 16:27:51,177 WARN  
> org.apache.flink.streaming.api.operators.AbstractStreamOperator  - Error 
> while emitting latency marker.
> java.lang.RuntimeException: Buffer pool is destroyed.
>       at 
> org.apache.flink.streaming.runtime.io.RecordWriterOutput.emitLatencyMarker(RecordWriterOutput.java:141)
>       at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.emitLatencyMarker(AbstractStreamOperator.java:825)
>       at 
> org.apache.flink.streaming.api.operators.StreamSource$LatencyMarksEmitter$1.onProcessingTime(StreamSource.java:150)
>       at 
> org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$RepeatedTriggerTask.run(SystemProcessingTimeService.java:294)
>       at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>       at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
>       at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
>       at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
>       at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>       at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>       at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.IllegalStateException: Buffer pool is destroyed.
>       at 
> org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBuffer(LocalBufferPool.java:203)
>       at 
> org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBlocking(LocalBufferPool.java:191)
>       at 
> org.apache.flink.runtime.io.network.api.writer.RecordWriter.sendToTarget(RecordWriter.java:132)
>       at 
> org.apache.flink.runtime.io.network.api.writer.RecordWriter.randomEmit(RecordWriter.java:107)
>       at 
> org.apache.flink.streaming.runtime.io.StreamRecordWriter.randomEmit(StreamRecordWriter.java:102)
>       at 
> org.apache.flink.streaming.runtime.io.RecordWriterOutput.emitLatencyMarker(RecordWriterOutput.java:138)
>       ... 10 more
> {code}
> or
> {code}
> 16:33:53,826 WARN  
> org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor  - An exception 
> occurred during the metrics setup.
> java.lang.NullPointerException
>       at 
> org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processInput(StreamTwoInputProcessor.java:211)
>       at 
> org.apache.flink.streaming.runtime.tasks.TwoInputStreamTask.run(TwoInputStreamTask.java:91)
>       at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:278)
>       at org.apache.flink.runtime.taskmanager.Task.run(Task.java:724)
>       at java.lang.Thread.run(Thread.java:745)
> {code}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

Reply via email to