[ https://issues.apache.org/jira/browse/BEAM-6859?focusedWorklogId=235570&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-235570 ]
ASF GitHub Bot logged work on BEAM-6859:
----------------------------------------
Author: ASF GitHub Bot
Created on: 30/Apr/19 22:23
Start Date: 30/Apr/19 22:23
Worklog Time Spent: 10m
Work Description: adude3141 commented on pull request #8443: [BEAM-6859] align teardown with setup calls also for empty streaming batches
URL: https://github.com/apache/beam/pull/8443
This PR aligns teardown calls with setup calls. Currently, when a streaming
batch is empty, a DoFn's setup is called but its teardown is not. [1]
This PR fixes this by removing the call to setup. An alternative
implementation would be to add a teardown call to the empty-partition path:
```diff
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkProcessContext.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkProcessContext.java
index 830778fe95..97a4c288d9 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkProcessContext.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkProcessContext.java
@@ -58,6 +58,7 @@ class SparkProcessContext<FnInputT, FnOutputT, OutputT> {
     // skip if partition is empty.
     if (!partition.hasNext()) {
+      DoFnInvokers.invokerFor(doFn).invokeTeardown();
       return new ArrayList<>();
     }
```
This teardown call existed in an earlier version but was dropped. A later
refactoring then moved the call to setup, which accidentally introduced this
issue.
[1] https://issues.apache.org/jira/browse/BEAM-6859
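
The two ways of keeping setup and teardown paired can be sketched without any Beam dependency. The sketch below is illustrative only: `CountingFn`, `LifecycleAlignment`, and their methods are hypothetical names, not Beam or Spark runner classes, and processing is eager here, whereas the runner invokes teardown from a lazily consumed output iterator.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

/** Hypothetical stand-in for a DoFn that counts its lifecycle calls; not a Beam class. */
class CountingFn {
  int setups = 0;
  int teardowns = 0;

  void setup() { setups++; }
  void process(String element, List<String> out) { out.add(element + "!"); }
  void teardown() { teardowns++; }
}

class LifecycleAlignment {
  /** Variant A: return before setup on an empty partition, so no teardown is owed. */
  static List<String> skipSetupOnEmpty(CountingFn fn, Iterator<String> partition) {
    if (!partition.hasNext()) {
      return new ArrayList<>();
    }
    fn.setup();
    return processAndTeardown(fn, partition);
  }

  /** Variant B: setup always runs, so the empty path must call teardown itself. */
  static List<String> teardownOnEmpty(CountingFn fn, Iterator<String> partition) {
    fn.setup();
    if (!partition.hasNext()) {
      fn.teardown();
      return new ArrayList<>();
    }
    return processAndTeardown(fn, partition);
  }

  /** Processes all elements eagerly and always calls teardown afterwards. */
  private static List<String> processAndTeardown(CountingFn fn, Iterator<String> partition) {
    List<String> out = new ArrayList<>();
    try {
      while (partition.hasNext()) {
        fn.process(partition.next(), out);
      }
    } finally {
      fn.teardown();
    }
    return out;
  }

  public static void main(String[] args) {
    CountingFn a = new CountingFn();
    skipSetupOnEmpty(a, new ArrayList<String>().iterator());
    CountingFn b = new CountingFn();
    teardownOnEmpty(b, new ArrayList<String>().iterator());
    System.out.println("A: setups=" + a.setups + " teardowns=" + a.teardowns);
    System.out.println("B: setups=" + b.setups + " teardowns=" + b.teardowns);
  }
}
```

In both variants the number of setup calls equals the number of teardown calls for empty and non-empty partitions alike. Variant A additionally avoids acquiring resources that would be released again immediately.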
Issue Time Tracking
-------------------
Worklog Id: (was: 235570)
Time Spent: 10m
Remaining Estimate: 0h
> Spark-runner: DoFn tearDown function will not be invoked if there is no data
> in the batch
> -----------------------------------------------------------------------------------------
>
> Key: BEAM-6859
> URL: https://issues.apache.org/jira/browse/BEAM-6859
> Project: Beam
> Issue Type: Bug
> Components: runner-spark
> Affects Versions: 2.8.0, 2.9.0, 2.10.0, 2.11.0
> Reporter: Ron Cai
> Assignee: Michael Luckey
> Priority: Critical
> Time Spent: 10m
> Remaining Estimate: 0h
>
> In the implementation of
> [MultiDoFnFunction|https://github.com/apache/beam/blob/master/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/MultiDoFnFunction.java],
>
> {code:java}
> @Override
> public Iterator<Tuple2<TupleTag<?>, WindowedValue<?>>> call(Iterator<WindowedValue<InputT>> iter)
>     throws Exception {
>   if (!wasSetupCalled) {
>     DoFnInvokers.invokerFor(doFn).invokeSetup();
>     wasSetupCalled = true;
>   }
>   ...
>   return new SparkProcessContext<>(
>           doFn,
>           doFnRunnerWithMetrics,
>           outputManager,
>           stateful ? new TimerDataIterator(timerInternals) : Collections.emptyIterator())
>       .processPartition(iter)
>       .iterator();
> }
> {code}
> With this code, the setup function of a DoFn is called for every batch in
> Spark streaming, while the teardown function of the DoFn is invoked by the
> [SparkProcessContext|https://github.com/apache/beam/blob/master/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkProcessContext.java]
> instance. However, SparkProcessContext.processPartition() returns an empty
> ArrayList directly if the partition is empty. So if a batch contains no
> data, the teardown function of the DoFn is never invoked, because it is
> invoked from the ProcCtxtIterator instance, which is created only when there
> is data (partition.hasNext() == true).
> {code:java}
> Iterable<OutputT> processPartition(Iterator<WindowedValue<FnInputT>> partition)
>     throws Exception {
>
>   // skip if partition is empty.
>   if (!partition.hasNext()) {
>     return new ArrayList<>();
>   }
>
>   // process the partition; finishBundle() is called from within the output iterator.
>   return this.getOutputIterable(partition, doFnRunner);
> }
> {code}
> To reproduce the issue, build a pipeline that reads with KafkaIO.read() and
> writes with KafkaIO.write() to Kafka, run it as a Spark streaming
> application, and do not send any data to the Kafka topic. The thread count
> of the Kafka producer keeps increasing until the application eventually runs
> out of memory (OOM).
>
>
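
The control flow described in the issue can be modelled in a few lines to show why resources pile up over empty batches. This is a simplified sketch, not the Spark runner code: `ResourceFn` and `EmptyBatchLeak` are hypothetical names, the counter stands in for whatever setup acquires (for example a producer thread), and teardown is called eagerly rather than from the output iterator.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical DoFn-like class that holds one resource between setup and teardown. */
class ResourceFn {
  private final AtomicInteger openResources;

  ResourceFn(AtomicInteger openResources) {
    this.openResources = openResources;
  }

  void setup() { openResources.incrementAndGet(); }    // acquire, e.g. start a thread
  void teardown() { openResources.decrementAndGet(); } // release
}

class EmptyBatchLeak {
  /** Mirrors the reported flow: setup for every batch, teardown only on the non-empty path. */
  static void runBatch(Iterator<String> partition, AtomicInteger openResources) {
    ResourceFn fn = new ResourceFn(openResources);
    fn.setup();
    if (!partition.hasNext()) {
      return; // early return: teardown is never reached
    }
    try {
      while (partition.hasNext()) {
        partition.next(); // element processing omitted
      }
    } finally {
      fn.teardown();
    }
  }

  /** Runs the given numbers of empty and non-empty batches and returns the resources left open. */
  static int leakedAfter(int emptyBatches, int nonEmptyBatches) {
    AtomicInteger open = new AtomicInteger();
    List<String> data = new ArrayList<>(Collections.singletonList("record"));
    for (int i = 0; i < emptyBatches; i++) {
      runBatch(Collections.<String>emptyIterator(), open);
    }
    for (int i = 0; i < nonEmptyBatches; i++) {
      runBatch(data.iterator(), open);
    }
    return open.get();
  }

  public static void main(String[] args) {
    System.out.println("leaked resources: " + leakedAfter(100, 5));
  }
}
```

Under these assumptions the number of resources left open equals the number of empty batches, which matches the steadily growing producer thread count reported above.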
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)