[
https://issues.apache.org/jira/browse/BEAM-6859?focusedWorklogId=236344&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-236344
]
ASF GitHub Bot logged work on BEAM-6859:
----------------------------------------
Author: ASF GitHub Bot
Created on: 02/May/19 15:01
Start Date: 02/May/19 15:01
Worklog Time Spent: 10m
Work Description: echauchot commented on issue #8443: [BEAM-6859] align
teardown with setup calls also for empty streaming …
URL: https://github.com/apache/beam/pull/8443#issuecomment-488708334
Run Spark Validates Runner
----------------------------------------------------------------
Issue Time Tracking
-------------------
Worklog Id: (was: 236344)
Time Spent: 40m (was: 0.5h)
> Spark-runner: DoFn tearDown function will not be invoked if there is no data
> in the batch
> -----------------------------------------------------------------------------------------
>
> Key: BEAM-6859
> URL: https://issues.apache.org/jira/browse/BEAM-6859
> Project: Beam
> Issue Type: Bug
> Components: runner-spark
> Affects Versions: 2.8.0, 2.9.0, 2.10.0, 2.11.0
> Reporter: Ron Cai
> Assignee: Michael Luckey
> Priority: Critical
> Time Spent: 40m
> Remaining Estimate: 0h
>
> In the implementation of
> [MultiDoFnFunction|https://github.com/apache/beam/blob/master/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/MultiDoFnFunction.java],
>
> {code:java}
> @Override
> public Iterator<Tuple2<TupleTag<?>, WindowedValue<?>>> call(Iterator<WindowedValue<InputT>> iter)
>     throws Exception {
>   if (!wasSetupCalled) {
>     DoFnInvokers.invokerFor(doFn).invokeSetup();
>     wasSetupCalled = true;
>   }
>   ...
>   return new SparkProcessContext<>(
>           doFn,
>           doFnRunnerWithMetrics,
>           outputManager,
>           stateful ? new TimerDataIterator(timerInternals) : Collections.emptyIterator())
>       .processPartition(iter)
>       .iterator();
> }
> {code}
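> For context, a DoFn typically pairs @Setup with @Teardown to manage a
> per-instance resource. The class below is only an illustration of that
> contract (a hypothetical DoFn, not taken from the Beam code base): if a
> runner invokes setup for every batch but never invokes tearDown, each batch
> leaks one such resource.
> {code:java}
> import java.util.concurrent.ExecutorService;
> import java.util.concurrent.Executors;
> import org.apache.beam.sdk.transforms.DoFn;
>
> // Illustration only: a DoFn that owns a resource for its whole lifetime.
> class ResourcefulDoFn extends DoFn<String, String> {
>   private transient ExecutorService executor; // stands in for e.g. a Kafka producer
>
>   @Setup
>   public void setup() {
>     // Allocated once per DoFn instance; the runner is expected to pair this
>     // call with a later tearDown().
>     executor = Executors.newSingleThreadExecutor();
>   }
>
>   @ProcessElement
>   public void processElement(ProcessContext c) {
>     c.output(c.element());
>   }
>
>   @Teardown
>   public void teardown() {
>     // Never reached if the runner skips tearDown, so the executor's thread leaks.
>     executor.shutdown();
>   }
> }
> {code}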
> In Spark streaming, the call() method above invokes the setup function of the
> DoFn for every batch, while the tearDown function of the DoFn is invoked by the
> [SparkProcessContext|https://github.com/apache/beam/blob/master/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkProcessContext.java]
> instance. However, in the implementation of
> SparkProcessContext.processPartition(), if the partition is empty, it returns
> an empty ArrayList instance directly. So when there is no data in the batch,
> the tearDown function of the DoFn will not be invoked, because it is only
> invoked from the ProcCtxtIterator instance, which is created only when the
> partition has data (partition.hasNext() == true).
> {code:java}
> Iterable<OutputT> processPartition(Iterator<WindowedValue<FnInputT>> partition)
>     throws Exception {
>
>   // skip if partition is empty.
>   if (!partition.hasNext()) {
>     return new ArrayList<>();
>   }
>
>   // process the partition; finishBundle() is called from within the output iterator.
>   return this.getOutputIterable(partition, doFnRunner);
> }
> {code}
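> A minimal sketch of how the empty-partition path could keep setup and
> tearDown balanced (an illustration of the idea only, not necessarily the
> change made in PR #8443; the doFn and doFnRunner fields mirror the snippets
> above):
> {code:java}
> Iterable<OutputT> processPartition(Iterator<WindowedValue<FnInputT>> partition)
>     throws Exception {
>
>   // Even when the partition is empty, close the DoFn lifecycle so that every
>   // invokeSetup() performed in MultiDoFnFunction.call() is matched by an
>   // invokeTeardown().
>   if (!partition.hasNext()) {
>     DoFnInvokers.invokerFor(doFn).invokeTeardown();
>     return new ArrayList<>();
>   }
>
>   // process the partition; finishBundle() is called from within the output iterator.
>   return this.getOutputIterable(partition, doFnRunner);
> }
> {code}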
> To reproduce the issue, build a pipeline that reads from Kafka with
> KafkaIO.read() and writes back to Kafka with KafkaIO.write(), run it as a
> Spark streaming application, and do not send any data to the Kafka topic. The
> Kafka producer thread count will keep increasing and the application will
> eventually run out of memory (OOM).
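> A minimal reproduction sketch under those assumptions (the broker address and
> topic names are placeholders; run with --runner=SparkRunner and send no data
> to the input topic):
> {code:java}
> import org.apache.beam.sdk.Pipeline;
> import org.apache.beam.sdk.io.kafka.KafkaIO;
> import org.apache.beam.sdk.options.PipelineOptions;
> import org.apache.beam.sdk.options.PipelineOptionsFactory;
> import org.apache.kafka.common.serialization.StringDeserializer;
> import org.apache.kafka.common.serialization.StringSerializer;
>
> public class EmptyKafkaPipeline {
>   public static void main(String[] args) {
>     PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();
>     Pipeline p = Pipeline.create(options);
>
>     // Kafka-to-Kafka pass-through with no transformation in between.
>     p.apply(KafkaIO.<String, String>read()
>             .withBootstrapServers("localhost:9092")
>             .withTopic("empty-input-topic")
>             .withKeyDeserializer(StringDeserializer.class)
>             .withValueDeserializer(StringDeserializer.class)
>             .withoutMetadata()) // PCollection<KV<String, String>>
>         .apply(KafkaIO.<String, String>write()
>             .withBootstrapServers("localhost:9092")
>             .withTopic("output-topic")
>             .withKeySerializer(StringSerializer.class)
>             .withValueSerializer(StringSerializer.class));
>
>     p.run().waitUntilFinish();
>   }
> }
> {code}
> With no traffic on the input topic, every micro-batch still triggers a setup
> of the KafkaIO writer's DoFn as described above, but tearDown is skipped for
> the empty partitions, which matches the producer-thread growth reported here.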
>
>