[ 
https://issues.apache.org/jira/browse/BEAM-6859?focusedWorklogId=235570&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-235570
 ]

ASF GitHub Bot logged work on BEAM-6859:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 30/Apr/19 22:23
            Start Date: 30/Apr/19 22:23
    Worklog Time Spent: 10m 
      Work Description: adude3141 commented on pull request #8443: [BEAM-6859] 
align teardown with setup calls also for empty streaming …
URL: https://github.com/apache/beam/pull/8443
 
 
   …batches
   
   This PR aligns teardown with setup calls. Currently, for empty batches in streaming, a DoFn's setup is called but its teardown is not. [1]
   
   This PR removes the call to setup. An alternative implementation would be to add
   ```diff
   diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkProcessContext.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkProcessContext.java
   index 830778fe95..97a4c288d9 100644
   --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkProcessContext.java
   +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkProcessContext.java
   @@ -58,6 +58,7 @@ class SparkProcessContext<FnInputT, FnOutputT, OutputT> {
    
        // skip if partition is empty.
        if (!partition.hasNext()) {
   +      DoFnInvokers.invokerFor(doFn).invokeTeardown();
          return new ArrayList<>();
        }
   ```
   This was implemented before, but the call to teardown was dropped, and the call to setup was later moved during a refactoring, which accidentally introduced this issue.
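
The two options described above can be compared in a small, self-contained simulation. This is only a sketch under stated assumptions: `CountingFn`, `EmptyBatchLifecycle`, and the `Strategy` names are hypothetical stand-ins that mimic the shape of the runner code, not the real Beam classes, and `NO_SETUP_ON_EMPTY` is a simplified model of "removing the call to setup".

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;

/** Hypothetical stand-in for a DoFn that only counts lifecycle calls. */
class CountingFn {
  int setups = 0;
  int teardowns = 0;

  void setup() { setups++; }
  void teardown() { teardowns++; }
}

/** Illustrative only: not the real SparkProcessContext or MultiDoFnFunction. */
class EmptyBatchLifecycle {

  enum Strategy { CURRENT, TEARDOWN_ON_EMPTY, NO_SETUP_ON_EMPTY }

  /** Processes one partition and returns its fully materialized output. */
  static List<String> processPartition(
      CountingFn fn, Iterator<String> partition, Strategy strategy) {
    boolean empty = !partition.hasNext();
    if (empty && strategy == Strategy.NO_SETUP_ON_EMPTY) {
      return new ArrayList<>(); // nothing was set up, so nothing to tear down
    }
    fn.setup();
    if (empty) {
      if (strategy == Strategy.TEARDOWN_ON_EMPTY) {
        fn.teardown(); // models the alternative shown in the diff
      }
      return new ArrayList<>(); // CURRENT returns here without teardown
    }
    List<String> out = new ArrayList<>();
    try {
      while (partition.hasNext()) {
        out.add(partition.next().toUpperCase());
      }
    } finally {
      fn.teardown(); // non-empty partitions are always torn down
    }
    return out;
  }

  /** Runs the given number of empty batches; returns setups minus teardowns. */
  static int unbalancedCalls(Strategy strategy, int emptyBatches) {
    CountingFn fn = new CountingFn();
    for (int i = 0; i < emptyBatches; i++) {
      processPartition(fn, Collections.emptyIterator(), strategy);
    }
    return fn.setups - fn.teardowns;
  }

  public static void main(String[] args) {
    for (Strategy strategy : Strategy.values()) {
      System.out.println(strategy + ": " + unbalancedCalls(strategy, 5));
    }
  }
}
```

In this model, five empty batches leave five unmatched setup calls under `CURRENT` and none under either of the other two strategies.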
   
   
   [1] https://issues.apache.org/jira/browse/BEAM-6859
   
 
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


Issue Time Tracking
-------------------

            Worklog Id:     (was: 235570)
            Time Spent: 10m
    Remaining Estimate: 0h

> Spark-runner: DoFn tearDown function will not be invoked if there is no data 
> in the batch
> -----------------------------------------------------------------------------------------
>
>                 Key: BEAM-6859
>                 URL: https://issues.apache.org/jira/browse/BEAM-6859
>             Project: Beam
>          Issue Type: Bug
>          Components: runner-spark
>    Affects Versions: 2.8.0, 2.9.0, 2.10.0, 2.11.0
>            Reporter: Ron Cai
>            Assignee: Michael Luckey
>            Priority: Critical
>          Time Spent: 10m
>  Remaining Estimate: 0h
>
> In the implementation of 
> [MultiDoFnFunction|https://github.com/apache/beam/blob/master/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/MultiDoFnFunction.java],
>  
> {code:java}
> @Override
> public Iterator<Tuple2<TupleTag<?>, WindowedValue<?>>> call(
>     Iterator<WindowedValue<InputT>> iter) throws Exception {
>   if (!wasSetupCalled) {
>     DoFnInvokers.invokerFor(doFn).invokeSetup();
>     wasSetupCalled = true;
>   }
>   ...
>   return new SparkProcessContext<>(
>           doFn,
>           doFnRunnerWithMetrics,
>           outputManager,
>           stateful ? new TimerDataIterator(timerInternals) : Collections.emptyIterator())
>       .processPartition(iter)
>       .iterator();
> }
> {code}
> This calls the setup function of the DoFn for every batch in Spark streaming. 
> The tearDown function of the DoFn is invoked by the 
> [SparkProcessContext|https://github.com/apache/beam/blob/master/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkProcessContext.java]
>  instance. However, in SparkProcessContext.processPartition(), if the 
> partition is empty, an empty ArrayList is returned directly. So if there is 
> no data in the batch, the tearDown function of the DoFn is never invoked, 
> because it is invoked from the ProcCtxtIterator instance, which is created 
> only when there is data (partition.hasNext() == true).
> {code:java}
> Iterable<OutputT> processPartition(Iterator<WindowedValue<FnInputT>> partition)
>     throws Exception {
>   // skip if partition is empty.
>   if (!partition.hasNext()) {
>     return new ArrayList<>();
>   }
>
>   // process the partition; finishBundle() is called from within the output iterator.
>   return this.getOutputIterable(partition, doFnRunner);
> }
> {code}
> To reproduce the issue, build a pipeline that reads with KafkaIO.read() and 
> writes with KafkaIO.write() to Kafka, run it as a Spark streaming 
> application, and do not send any data to the Kafka topic. The thread count 
> of the Kafka producer keeps increasing until the application runs out of 
> memory (OOM).
>  
>  
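
The issue description above notes that tearDown is triggered from the output iterator, so it cannot run when that iterator is never created. A minimal sketch of this pattern follows, assuming a hypothetical `TeardownOnExhaustion` wrapper and `LazyTeardownDemo` driver; neither is the actual ProcCtxtIterator, and the cleanup action is just a counter.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.NoSuchElementException;

/** Hypothetical iterator that runs a cleanup action once its input is exhausted. */
class TeardownOnExhaustion<T> implements Iterator<T> {
  private final Iterator<T> input;
  private final Runnable teardown;
  private boolean tornDown = false;

  TeardownOnExhaustion(Iterator<T> input, Runnable teardown) {
    this.input = input;
    this.teardown = teardown;
  }

  @Override
  public boolean hasNext() {
    if (input.hasNext()) {
      return true;
    }
    if (!tornDown) {
      tornDown = true; // run the cleanup at most once
      teardown.run();
    }
    return false;
  }

  @Override
  public T next() {
    if (!hasNext()) {
      throw new NoSuchElementException();
    }
    return input.next();
  }
}

class LazyTeardownDemo {

  /** Drains one partition and returns how many times the cleanup ran. */
  static int teardownsFor(Iterator<String> partition, boolean shortCircuitEmpty) {
    int[] teardowns = {0};
    Iterator<String> out;
    if (shortCircuitEmpty && !partition.hasNext()) {
      // Mirrors the early return for empty partitions: the wrapper is never built.
      out = Collections.emptyIterator();
    } else {
      out = new TeardownOnExhaustion<>(partition, () -> teardowns[0]++);
    }
    while (out.hasNext()) {
      out.next();
    }
    return teardowns[0];
  }

  public static void main(String[] args) {
    System.out.println(teardownsFor(Collections.emptyIterator(), true));
    System.out.println(teardownsFor(Collections.emptyIterator(), false));
    System.out.println(teardownsFor(Arrays.asList("a", "b").iterator(), true));
  }
}
```

With the short circuit enabled, an empty partition never reaches the cleanup, whereas a non-empty partition, or an empty one that still goes through the wrapper, triggers it exactly once.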



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
