[ https://issues.apache.org/jira/browse/BEAM-6859?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Work on BEAM-6859 started by Michael Luckey.
--------------------------------------------

> Spark runner: DoFn tearDown function will not be invoked if there is no data
> in the batch
> -----------------------------------------------------------------------------
>
>                 Key: BEAM-6859
>                 URL: https://issues.apache.org/jira/browse/BEAM-6859
>             Project: Beam
>          Issue Type: Bug
>          Components: runner-spark
>    Affects Versions: 2.8.0, 2.9.0, 2.10.0, 2.11.0
>            Reporter: Ron Cai
>            Assignee: Michael Luckey
>            Priority: Critical
>
> In the implementation of
> [MultiDoFnFunction|https://github.com/apache/beam/blob/master/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/MultiDoFnFunction.java]:
>
> {code:java}
> @Override
> public Iterator<Tuple2<TupleTag<?>, WindowedValue<?>>> call(Iterator<WindowedValue<InputT>> iter)
>     throws Exception {
>   if (!wasSetupCalled) {
>     DoFnInvokers.invokerFor(doFn).invokeSetup();
>     wasSetupCalled = true;
>   }
>   ...
>   return new SparkProcessContext<>(
>           doFn,
>           doFnRunnerWithMetrics,
>           outputManager,
>           stateful ? new TimerDataIterator(timerInternals) : Collections.emptyIterator())
>       .processPartition(iter)
>       .iterator();
> }
> {code}
>
> This calls the setup function of a DoFn on every batch in Spark Streaming, and
> the tearDown function of the DoFn is expected to be invoked by the
> [SparkProcessContext|https://github.com/apache/beam/blob/master/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkProcessContext.java]
> instance. However, in the implementation of SparkProcessContext.processPartition(),
> if the partition is empty it returns an empty ArrayList instance directly. So if
> there is no data in the batch, the tearDown function of the DoFn will not be
> invoked, because it is invoked from the ProcCtxtIterator instance, which is created
> only when there is data (partition.hasNext() == true).
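The lifecycle mismatch described above can be illustrated with a minimal, self-contained sketch (TeardownLeakSketch, the counters, and this simplified processPartition are illustrative stand-ins, not Beam code): setup runs for every batch, but teardown only runs on the non-empty path.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Minimal sketch (hypothetical names) of the reported lifecycle bug:
// setup() is invoked for every partition, but teardown() is only reached
// when the partition actually contains data.
public class TeardownLeakSketch {
    static final AtomicInteger setups = new AtomicInteger();
    static final AtomicInteger teardowns = new AtomicInteger();

    // Mirrors the shape of SparkProcessContext.processPartition(): an empty
    // partition short-circuits before the teardown-invoking iterator exists.
    static List<String> processPartition(Iterator<String> partition) {
        setups.incrementAndGet();        // setup() happens on every batch
        if (!partition.hasNext()) {
            return new ArrayList<>();    // teardown() is skipped on this path
        }
        List<String> out = new ArrayList<>();
        partition.forEachRemaining(out::add);
        teardowns.incrementAndGet();     // teardown() runs only when data flowed
        return out;
    }

    public static void main(String[] args) {
        // Three "batches" with no data, then one batch with a record.
        for (int i = 0; i < 3; i++) {
            processPartition(Collections.emptyIterator());
        }
        processPartition(List.of("record").iterator());
        // setups=4, teardowns=1: three DoFn instances never release resources.
        System.out.println("setups=" + setups + " teardowns=" + teardowns);
    }
}
```

With an idle Kafka topic, every micro-batch takes the empty path, which is why producer resources accumulate.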
> {code:java}
> Iterable<OutputT> processPartition(Iterator<WindowedValue<FnInputT>> partition)
>     throws Exception {
>
>   // skip if partition is empty.
>   if (!partition.hasNext()) {
>     return new ArrayList<>();
>   }
>
>   // process the partition; finishBundle() is called from within the output iterator.
>   return this.getOutputIterable(partition, doFnRunner);
> }
> {code}
>
> To reproduce the issue, build a pipeline that reads from Kafka with KafkaIO.read()
> and writes back to Kafka with KafkaIO.write(), run it as a Spark Streaming
> application, and do not send any data to the Kafka topic. The thread count of the
> Kafka producer will keep increasing, ending in an OutOfMemoryError.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
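One possible shape for a fix, sketched below with a hypothetical Lifecycle interface standing in for the teardown invocation (this is an assumption about a fix, not the actual Beam patch): make sure the teardown hook runs on every exit path of processPartition, including the empty-partition short circuit.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Sketch of a possible fix. "Lifecycle" is a hypothetical stand-in for
// invoking the DoFn's teardown; the real code path goes through the
// runner's DoFn invoker machinery.
public class ProcessPartitionFix {
    interface Lifecycle {
        void teardown();
    }

    static <T> List<T> processPartition(Iterator<T> partition, Lifecycle doFnLifecycle) {
        if (!partition.hasNext()) {
            doFnLifecycle.teardown();   // fix: don't skip teardown for empty partitions
            return new ArrayList<>();
        }
        List<T> out = new ArrayList<>();
        try {
            partition.forEachRemaining(out::add);
        } finally {
            doFnLifecycle.teardown();   // normally driven by the output iterator
        }
        return out;
    }
}
```

The key point is that resource release must not depend on whether the batch contained data.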