[ https://issues.apache.org/jira/browse/BEAM-6859?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Work on BEAM-6859 started by Michael Luckey.
--------------------------------------------
> Spark-runner: DoFn tearDown function will not be invoked if there is no data in the batch
> -----------------------------------------------------------------------------------------
>
>                 Key: BEAM-6859
>                 URL: https://issues.apache.org/jira/browse/BEAM-6859
>             Project: Beam
>          Issue Type: Bug
>          Components: runner-spark
>    Affects Versions: 2.8.0, 2.9.0, 2.10.0, 2.11.0
>            Reporter: Ron Cai
>            Assignee: Michael Luckey
>            Priority: Critical
>
> In the implementation of
> [MultiDoFnFunction|https://github.com/apache/beam/blob/master/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/MultiDoFnFunction.java]:
> {code:java}
> @Override
> public Iterator<Tuple2<TupleTag<?>, WindowedValue<?>>> call(Iterator<WindowedValue<InputT>> iter)
>     throws Exception {
>   // wasSetupCalled guards setup(), so it runs at most once per function instance
>   if (!wasSetupCalled) {
>     DoFnInvokers.invokerFor(doFn).invokeSetup();
>     wasSetupCalled = true;
>   }
>   ...
>   // tearDown() is left to SparkProcessContext and its output iterator
>   return new SparkProcessContext<>(
>           doFn,
>           doFnRunnerWithMetrics,
>           outputManager,
>           stateful ? new TimerDataIterator(timerInternals) : Collections.emptyIterator())
>       .processPartition(iter)
>       .iterator();
> }
> {code}
> In Spark Streaming, this calls the DoFn's setup() method on every batch.
> The matching tearDown() is supposed to be invoked by the
> [SparkProcessContext|https://github.com/apache/beam/blob/master/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkProcessContext.java]
> instance. However, when the partition is empty,
> SparkProcessContext.processPartition() returns an empty ArrayList
> immediately. Because tearDown() is invoked only from the ProcCtxtIterator
> instance, which is created only when the partition contains data
> (partition.hasNext() == true), tearDown() is never called for a batch with
> no data.
> {code:java}
> Iterable<OutputT> processPartition(Iterator<WindowedValue<FnInputT>> partition) throws Exception {
>
>   // skip if partition is empty.
>   if (!partition.hasNext()) {
>     return new ArrayList<>(); // returns without ever reaching tearDown()
>   }
>
>   // process the partition; finishBundle() is called from within the output iterator.
>   return this.getOutputIterable(partition, doFnRunner);
> }
> {code}
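The lifecycle problem above can be illustrated with a small, self-contained Java model. This is a sketch under stated assumptions, not the Beam code: LifecycleFn, drain, processPartitionBuggy and processPartitionFixed are hypothetical names, setup() is folded into processing for brevity, and output is produced eagerly instead of through a lazy iterator. The buggy variant returns early on an empty partition without calling teardown(); the fixed variant shows one possible remedy, calling teardown() on that path as well.

{code:java}
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;

// Hypothetical model: LifecycleFn and the processPartition* methods are illustrative,
// not Beam classes. setup() is folded into processPartition* for brevity.
class TeardownOnEmptyPartition {

  /** Stands in for a DoFn and counts lifecycle calls. */
  static class LifecycleFn {
    int setupCalls = 0;
    int teardownCalls = 0;

    void setup() { setupCalls++; }

    String process(String element) { return element.toUpperCase(); }

    void teardown() { teardownCalls++; }
  }

  /** Processes every element eagerly; the runner does this lazily in its output iterator. */
  private static List<String> drain(LifecycleFn fn, Iterator<String> partition) {
    List<String> out = new ArrayList<>();
    while (partition.hasNext()) {
      out.add(fn.process(partition.next()));
    }
    fn.teardown(); // reached only after the partition has been consumed
    return out;
  }

  /** Mirrors the reported code path: an empty partition returns before teardown(). */
  static List<String> processPartitionBuggy(LifecycleFn fn, Iterator<String> partition) {
    fn.setup();
    if (!partition.hasNext()) {
      return new ArrayList<>(); // teardown() is skipped here
    }
    return drain(fn, partition);
  }

  /** One possible fix: also call teardown() on the empty-partition path. */
  static List<String> processPartitionFixed(LifecycleFn fn, Iterator<String> partition) {
    fn.setup();
    if (!partition.hasNext()) {
      fn.teardown(); // balance setup() even when there is nothing to process
      return new ArrayList<>();
    }
    return drain(fn, partition);
  }

  public static void main(String[] args) {
    LifecycleFn buggy = new LifecycleFn();
    processPartitionBuggy(buggy, Collections.<String>emptyIterator());
    // prints "buggy: setup=1, teardown=0"
    System.out.println("buggy: setup=" + buggy.setupCalls + ", teardown=" + buggy.teardownCalls);

    LifecycleFn fixed = new LifecycleFn();
    processPartitionFixed(fixed, Collections.<String>emptyIterator());
    // prints "fixed: setup=1, teardown=1"
    System.out.println("fixed: setup=" + fixed.setupCalls + ", teardown=" + fixed.teardownCalls);
  }
}
{code}

The non-empty path is identical in both variants, so the only behavioral difference is whether an empty batch leaves setup() without a matching teardown().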
> To reproduce the issue, build a pipeline that reads from Kafka with
> KafkaIO.read() and writes back to Kafka with KafkaIO.write(), then run it
> as a Spark Streaming application without sending any data to the Kafka
> topic. The thread count of the Kafka producer keeps increasing, and the
> application eventually runs out of memory (OOM).
>  
>  
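To see how an unbalanced setup()/tearDown() pair leads to the growing thread count described in the reproduction, the following hypothetical simulation gives each batch a fresh writer (an assumption modeling "setup is called on every batch") whose setup() starts a background thread, standing in for a Kafka producer's network thread, and whose teardown() stops it. ProducerLikeWriter, runBatch and leakedAfterEmptyBatches are illustrative names, not Beam or Kafka APIs, and no data is actually written.

{code:java}
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CountDownLatch;

// Hypothetical simulation: ProducerLikeWriter, runBatch and leakedAfterEmptyBatches
// are illustrative names, not Beam or Kafka APIs.
class EmptyBatchThreadLeak {

  /** Stands in for a writer DoFn that owns a producer with a background thread. */
  static class ProducerLikeWriter {
    private final CountDownLatch closed = new CountDownLatch(1);
    private Thread ioThread;

    void setup() {
      // Start a thread that idles until teardown() releases the latch.
      ioThread = new Thread(() -> {
        try {
          closed.await();
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
        }
      }, "producer-io");
      ioThread.setDaemon(true); // lets the JVM exit even if threads are leaked
      ioThread.start();
    }

    void teardown() {
      closed.countDown(); // wake the idle thread so it can finish
      try {
        ioThread.join();
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    }

    boolean threadAlive() {
      return ioThread != null && ioThread.isAlive();
    }
  }

  /** One batch; with tearDownWhenEmpty == false an empty batch skips teardown, as reported. */
  static void runBatch(ProducerLikeWriter writer, Iterator<String> batch, boolean tearDownWhenEmpty) {
    writer.setup(); // setup runs on every batch
    if (!batch.hasNext()) {
      if (tearDownWhenEmpty) {
        writer.teardown();
      }
      return;
    }
    while (batch.hasNext()) {
      batch.next(); // actual writing is omitted in this sketch
    }
    writer.teardown();
  }

  /** Runs n empty batches, each with a fresh writer, and counts threads still alive. */
  static int leakedAfterEmptyBatches(int n, boolean tearDownWhenEmpty) {
    List<ProducerLikeWriter> writers = new ArrayList<>();
    for (int i = 0; i < n; i++) {
      ProducerLikeWriter writer = new ProducerLikeWriter();
      writers.add(writer);
      runBatch(writer, Collections.<String>emptyIterator(), tearDownWhenEmpty);
    }
    int leaked = 0;
    for (ProducerLikeWriter writer : writers) {
      if (writer.threadAlive()) {
        leaked++;
      }
    }
    // Release any leaked threads so the simulation leaves nothing running.
    for (ProducerLikeWriter writer : writers) {
      writer.teardown();
    }
    return leaked;
  }

  public static void main(String[] args) {
    // prints 10 for the reported behavior and 0 when empty batches are torn down
    System.out.println("leaked threads, reported behavior: " + leakedAfterEmptyBatches(10, false));
    System.out.println("leaked threads, teardown on empty: " + leakedAfterEmptyBatches(10, true));
  }
}
{code}

In a long-running streaming job the first count simply grows with the number of empty batches, which is consistent with the ever-increasing producer thread count in the report.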



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
