[ 
https://issues.apache.org/jira/browse/BEAM-6859?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ron Cai updated BEAM-6859:
--------------------------
    Description: 
In the implementation of 
[MultiDoFnFunction|https://github.com/apache/beam/blob/master/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/MultiDoFnFunction.java],
 
{code:java}
@Override
public Iterator<Tuple2<TupleTag<?>, WindowedValue<?>>> call(
    Iterator<WindowedValue<InputT>> iter) throws Exception {
  if (!wasSetupCalled) {
    DoFnInvokers.invokerFor(doFn).invokeSetup();
    wasSetupCalled = true;
  }
  ...

  return new SparkProcessContext<>(
          doFn,
          doFnRunnerWithMetrics,
          outputManager,
          stateful ? new TimerDataIterator(timerInternals) : Collections.emptyIterator())
      .processPartition(iter)
      .iterator();
}{code}
the setup function of a DoFn is called for every batch in Spark Streaming, and 
the tearDown function of the DoFn is invoked by the 
[SparkProcessContext|https://github.com/apache/beam/blob/master/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkProcessContext.java]
 instance. However, SparkProcessContext.processPartition() returns an empty 
ArrayList directly when the partition is empty. As a result, when a batch 
contains no data, the tearDown function of the DoFn is never invoked, because 
it is only called from the ProcCtxtIterator instance, which is created only 
when there is data (partition.hasNext() == true).
{code:java}
Iterable<OutputT> processPartition(Iterator<WindowedValue<FnInputT>> partition)
    throws Exception {

  // skip if partition is empty.
  if (!partition.hasNext()) {
    return new ArrayList<>();
  }

  // process the partition; finishBundle() is called from within the output iterator.
  return this.getOutputIterable(partition, doFnRunner);
}

{code}
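The leak described above can be modelled without Spark or Beam. The following is only a minimal sketch: CountingFn, processPartitionAsReported, and processPartitionWithTeardown are hypothetical names invented for illustration, and the "fresh instance per batch" loop is an assumption standing in for setup being called on every batch. The second variant shows one possible remedy (calling teardown on the empty path), not necessarily the fix the project would choose.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;

public class TeardownSketch {

  /** Hypothetical stand-in for a DoFn that holds a resource, e.g. a Kafka producer. */
  static class CountingFn {
    static int openResources = 0;

    void setup() { openResources++; }      // acquire the resource
    String process(String element) { return element.toUpperCase(); }
    void teardown() { openResources--; }   // release the resource
  }

  /** Mirrors the reported behavior: the empty path returns before teardown is reached. */
  static List<String> processPartitionAsReported(CountingFn fn, Iterator<String> partition) {
    fn.setup();
    if (!partition.hasNext()) {
      return new ArrayList<>(); // teardown() is never called on this path
    }
    return drain(fn, partition);
  }

  /** One possible remedy (an assumption): release the fn on the empty path as well. */
  static List<String> processPartitionWithTeardown(CountingFn fn, Iterator<String> partition) {
    fn.setup();
    if (!partition.hasNext()) {
      fn.teardown();
      return new ArrayList<>();
    }
    return drain(fn, partition);
  }

  /** Processes every element, then tears the fn down even if processing throws. */
  private static List<String> drain(CountingFn fn, Iterator<String> partition) {
    List<String> out = new ArrayList<>();
    try {
      while (partition.hasNext()) {
        out.add(fn.process(partition.next()));
      }
    } finally {
      fn.teardown();
    }
    return out;
  }

  /** Runs the given number of empty batches and returns how many resources stay open. */
  static int leakedAfterEmptyBatches(int batches, boolean withTeardown) {
    CountingFn.openResources = 0;
    for (int i = 0; i < batches; i++) {
      CountingFn fn = new CountingFn(); // assumed: a fresh instance is set up per batch
      Iterator<String> empty = Collections.emptyIterator();
      if (withTeardown) {
        processPartitionWithTeardown(fn, empty);
      } else {
        processPartitionAsReported(fn, empty);
      }
    }
    return CountingFn.openResources;
  }

  public static void main(String[] args) {
    System.out.println("as reported:   " + leakedAfterEmptyBatches(5, false)); // 5
    System.out.println("with teardown: " + leakedAfterEmptyBatches(5, true));  // 0
  }
}
```

In this model every empty batch leaves one resource open, which matches the steadily growing thread count described in this report.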
To reproduce the issue, build a pipeline that reads with KafkaIO.read() and 
writes with KafkaIO.write() to Kafka, run it as a Spark Streaming application, 
and do not send any data to the Kafka topic. The thread count of the Kafka 
producer keeps increasing until the application runs out of memory (OOM).
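One way to watch the thread growth is to count live threads by name prefix inside the affected JVM. This is a rough sketch, not part of Beam or Kafka: ProducerThreadCount and countThreadsWithPrefix are hypothetical names, and the prefix "kafka-producer-network-thread" is an assumption about how the Kafka client names its sender threads, so verify it against a thread dump first.

```java
public class ProducerThreadCount {

  /** Counts live threads in this JVM whose name starts with the given prefix. */
  static long countThreadsWithPrefix(String prefix) {
    return Thread.getAllStackTraces().keySet().stream()
        .filter(Thread::isAlive)
        .filter(t -> t.getName().startsWith(prefix))
        .count();
  }

  public static void main(String[] args) {
    // Assumed prefix; check a thread dump of the executor to confirm the real name.
    String prefix = "kafka-producer-network-thread";
    System.out.println(prefix + " threads: " + countThreadsWithPrefix(prefix));
  }
}
```

The count is only meaningful when taken in the process that actually runs the DoFn (typically a Spark executor), for example by logging it once per batch.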

 

 


> Spark-runner: DoFn tearDown function will not be invoked if there is no data 
> in the batch
> -----------------------------------------------------------------------------------------
>
>                 Key: BEAM-6859
>                 URL: https://issues.apache.org/jira/browse/BEAM-6859
>             Project: Beam
>          Issue Type: Bug
>          Components: runner-spark
>    Affects Versions: 2.11.0
>            Reporter: Ron Cai
>            Priority: Critical



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
