[
https://issues.apache.org/jira/browse/BEAM-4803?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16551079#comment-16551079
]
Vivek Agarwal commented on BEAM-4803:
-------------------------------------
[~jbonofre] yes, Spark runners for streaming need to create microbatches over an
unbounded data source, but shouldn't the readers (Kafka consumers, in this case)
be cached? Looking at SourceDStream.java (in the Beam spark-runner code), I
found
{code:java}
// Reader cache expiration interval. 50% of batch interval is added to accommodate latency.
this.readerCacheInterval = 1.5 * sparkOptions.getBatchIntervalMillis();{code}
which specifies how long reader objects are cached when creating microbatches.
Since it is derived solely from the batch interval, we can't really increase it
without effectively turning streaming into batch processing. Shouldn't there be
a way to specify readerCacheInterval separately? Repeated Kerberos
authentication is clearly a no-go.
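To make the point concrete, here is a minimal sketch of what decoupling the cache expiry from the batch interval could look like. The {{readerCacheDurationMillis}} override is hypothetical (no such option exists in the Beam Spark runner today); the 1.5x fallback mirrors the current behaviour quoted above.

```java
// Hypothetical sketch only: "readerCacheDurationMillis" is an imagined,
// separately configurable option, not an existing Beam pipeline option.
public class ReaderCacheIntervalSketch {

  static long readerCacheInterval(long batchIntervalMillis, Long readerCacheDurationMillis) {
    if (readerCacheDurationMillis != null) {
      // User override: can be set long enough to avoid repeated Kerberos logins.
      return readerCacheDurationMillis;
    }
    // Current behaviour: 50% of the batch interval added to accommodate latency.
    return (long) (1.5 * batchIntervalMillis);
  }

  public static void main(String[] args) {
    // With a 1 s batch interval, the default expiry is 1500 ms.
    System.out.println(readerCacheInterval(1000L, null));
    // With an explicit override, the expiry is independent of the batch interval.
    System.out.println(readerCacheInterval(1000L, 600000L));
  }
}
```

With such an option, users with expensive reader setup (Kerberos login, TLS handshakes) could keep readers cached across many microbatches without touching the batch interval itself.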
The Spark Streaming Kafka integration
[documentation|https://spark.apache.org/docs/2.2.0/streaming-kafka-0-10-integration.html#locationstrategies]
mentions that the default behaviour is to cache consumers rather than create a
new one for each batch. Please comment, thanks.
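For comparison, the native spark-streaming-kafka-0-10 integration exposes its consumer cache through Spark configuration. If I read the docs correctly, these are the relevant properties (names taken from the Spark 2.x documentation; defaults shown are my understanding, please double-check):

```properties
# Consumer caching in spark-streaming-kafka-0-10 (on by default)
spark.streaming.kafka.consumer.cache.enabled=true
# Sizing of the per-executor consumer cache
spark.streaming.kafka.consumer.cache.initialCapacity=16
spark.streaming.kafka.consumer.cache.maxCapacity=64
```

Something analogous on the Beam Spark runner side, i.e. a tunable reader cache rather than one tied to the batch interval, would address the Kerberos re-login problem.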
> Beam spark runner not working properly with kafka
> -------------------------------------------------
>
> Key: BEAM-4803
> URL: https://issues.apache.org/jira/browse/BEAM-4803
> Project: Beam
> Issue Type: Bug
> Components: io-java-kafka, runner-spark
> Affects Versions: 2.4.0, 2.5.0
> Reporter: Vivek Agarwal
> Assignee: Jean-Baptiste Onofré
> Priority: Major
>
> We are running a Beam stream processing job on a Spark runner, which reads
> from a Kafka topic using Kerberos authentication. We are using java-io-kafka
> v2.4.0 to read from the Kafka topic in the pipeline. The issue is that the
> KafkaIO client continuously creates a new Kafka consumer with the specified
> config, performing a Kerberos login each time. Also, Spark streaming jobs are
> spawned for the unbounded source every second or so, even when there is no
> data in the Kafka topic. The log shows these jobs:
> INFO SparkContext: Starting job: [email protected]:172
> We can see in the logs
> INFO MicrobatchSource: No cached reader found for split:
> [org.apache.beam.sdk.io.kafka.KafkaUnboundedSource@2919a728]. Creating new
> reader at checkpoint mark...
> It then creates a new consumer with a fresh Kerberos login, which is causing
> issues.
> We are unsure what the correct behavior should be here and why so many Spark
> streaming jobs are being created. We tried the Beam code with the Flink runner
> and did not see this issue there. Can someone point to the correct settings
> for using an unbounded Kafka source with the Spark runner in Beam?
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)