[
https://issues.apache.org/jira/browse/STORM-4055?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Anthony Castrati updated STORM-4055:
------------------------------------
Description:
After a recent upgrade to storm-kafka-client on Storm server 2.6.1, we are
seeing a ConcurrentModificationException in our topology at runtime. I believe
this is due to the re-use of a single KafkaConsumer instance between the
KafkaSpout and the KafkaOffsetPartitionMetrics, which were added some time
between 2.4.0 and 2.6.1.
h3. Steps to Reproduce:
Configure a topology with a basic KafkaSpout. Configure the topology with one
of the metrics loggers. We used our own custom one, but reproduced it with
ConsoleStormReporter as well. The JMXReporter did not reproduce the issue for
us, but we did not dig into why.
*reporter config:*
{{topology.metrics.reporters: [}}
{{ {}}
{{ "filter": {}}
{{ "expression": ".*",}}
{{ "class": "org.apache.storm.metrics2.filters.RegexFilter"}}
{{ },}}
{{ "report.period": 15,}}
{{ "report.period.units": "SECONDS",}}
{{ "class": "org.apache.storm.metrics2.reporters.ConsoleStormReporter"}}
{{ }}}
{{]}}
h3. Stacktrace:
{quote}[ERROR] Exception thrown from NewRelicReporter#report. Exception was suppressed.
java.util.ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access. currentThread(name: metrics-newRelicReporter-1-thread-1, id: 24) otherThread(id: 40)
at org.apache.kafka.clients.consumer.KafkaConsumer.acquire(KafkaConsumer.java:2484) ~[stormjar.jar:?]
at org.apache.kafka.clients.consumer.KafkaConsumer.acquireAndEnsureOpen(KafkaConsumer.java:2465) ~[stormjar.jar:?]
at org.apache.kafka.clients.consumer.KafkaConsumer.beginningOffsets(KafkaConsumer.java:2144) ~[stormjar.jar:?]
at org.apache.kafka.clients.consumer.KafkaConsumer.beginningOffsets(KafkaConsumer.java:2123) ~[stormjar.jar:?]
at org.apache.storm.kafka.spout.metrics2.KafkaOffsetPartitionMetrics.getBeginningOffsets(KafkaOffsetPartitionMetrics.java:181) ~[stormjar.jar:?]
at org.apache.storm.kafka.spout.metrics2.KafkaOffsetPartitionMetrics$2.getValue(KafkaOffsetPartitionMetrics.java:93) ~[stormjar.jar:?]
at org.apache.storm.kafka.spout.metrics2.KafkaOffsetPartitionMetrics$2.getValue(KafkaOffsetPartitionMetrics.java:90) ~[stormjar.jar:?]
at com.codahale.metrics.newrelic.transformer.GaugeTransformer.transform(GaugeTransformer.java:60) ~[stormjar.jar:?]
at com.codahale.metrics.newrelic.NewRelicReporter.lambda$transform$0(NewRelicReporter.java:154) ~[stormjar.jar:?]
at java.base/java.util.stream.ReferencePipeline$7$1.accept(Unknown Source) ~[?:?]
at java.base/java.util.Collections$UnmodifiableMap$UnmodifiableEntrySet.lambda$entryConsumer$0(Unknown Source) ~[?:?]
at java.base/java.util.TreeMap$EntrySpliterator.forEachRemaining(Unknown Source) ~[?:?]
at java.base/java.util.Collections$UnmodifiableMap$UnmodifiableEntrySet$UnmodifiableEntrySetSpliterator.forEachRemaining(Unknown Source) ~[?:?]
at java.base/java.util.stream.AbstractPipeline.copyInto(Unknown Source) ~[?:?]
at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(Unknown Source) ~[?:?]
at java.base/java.util.stream.ForEachOps$ForEachOp.evaluateSequential(Unknown Source) ~[?:?]
at java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(Unknown Source) ~[?:?]
at java.base/java.util.stream.AbstractPipeline.evaluate(Unknown Source) ~[?:?]
at java.base/java.util.stream.ReferencePipeline.forEach(Unknown Source) ~[?:?]
at java.base/java.util.stream.ReferencePipeline$7$1.accept(Unknown Source) ~[?:?]
at java.base/java.util.Spliterators$ArraySpliterator.forEachRemaining(Unknown Source) ~[?:?]
at java.base/java.util.stream.AbstractPipeline.copyInto(Unknown Source) ~[?:?]
at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(Unknown Source) ~[?:?]
at java.base/java.util.stream.ReduceOps$ReduceOp.evaluateSequential(Unknown Source) ~[?:?]
at java.base/java.util.stream.AbstractPipeline.evaluate(Unknown Source) ~[?:?]
at java.base/java.util.stream.ReferencePipeline.collect(Unknown Source) ~[?:?]
at com.codahale.metrics.newrelic.NewRelicReporter.report(NewRelicReporter.java:138) ~[stormjar.jar:?]
at com.codahale.metrics.ScheduledReporter.report(ScheduledReporter.java:243) ~[metrics-core-3.2.6.jar:3.2.6]
at com.codahale.metrics.ScheduledReporter$1.run(ScheduledReporter.java:182) [metrics-core-3.2.6.jar:3.2.6]
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source) [?:?]
at java.base/java.util.concurrent.FutureTask.runAndReset(Unknown Source) [?:?]
at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(Unknown Source) [?:?]
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) [?:?]
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source) [?:?]
at java.base/java.lang.Thread.run(Unknown Source) [?:?]
{quote}
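For context on the top frame of the trace (KafkaConsumer.acquire): the message comes from a single-thread ownership check, not from a corrupted collection. The following is a simplified, stand-alone model of that kind of guard, written only to illustrate the failure mode. It is not the Kafka source, and the class name SingleThreadGuard and its members are hypothetical.

```java
import java.util.ConcurrentModificationException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;

/** Hypothetical, simplified model of a "one thread at a time" ownership guard. */
public class SingleThreadGuard {
    private static final long NO_OWNER = -1L;
    private final AtomicLong owner = new AtomicLong(NO_OWNER);
    private final AtomicInteger depth = new AtomicInteger(0);

    /** Claims the guard for the calling thread, or fails fast if another thread holds it. */
    public void acquire() {
        long me = Thread.currentThread().getId();
        if (me != owner.get() && !owner.compareAndSet(NO_OWNER, me)) {
            throw new ConcurrentModificationException(
                "guarded object is not safe for multi-threaded access");
        }
        depth.incrementAndGet(); // re-entrant for the owning thread
    }

    /** Releases one level; the guard becomes free when the depth returns to zero. */
    public void release() {
        if (depth.decrementAndGet() == 0) {
            owner.set(NO_OWNER);
        }
    }

    /** Returns true if a second thread is rejected while the first still holds the guard. */
    public static boolean secondThreadIsRejected() {
        SingleThreadGuard guard = new SingleThreadGuard();
        guard.acquire(); // stands in for the spout thread being inside a consumer call
        AtomicReference<Throwable> failure = new AtomicReference<>();
        Thread reporter = new Thread(() -> {
            try {
                guard.acquire(); // stands in for the metrics thread calling in
                guard.release();
            } catch (ConcurrentModificationException e) {
                failure.set(e);
            }
        }, "metrics-reporter");
        reporter.start();
        try {
            reporter.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        guard.release();
        return failure.get() instanceof ConcurrentModificationException;
    }

    public static void main(String[] args) {
        System.out.println("second thread rejected: " + secondThreadIsRejected());
    }
}
```

In this model the second caller fails immediately rather than blocking, which matches the fail-fast behaviour seen in the trace. Whether the error appears on a given report cycle depends on whether the two threads happen to overlap.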
h3. Workaround
Configure the reporter with a RegexFilter (or a similar filter) that excludes
the KafkaOffsetPartitionMetrics.
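A rough sketch of what such an exclusion expression could look like, checked with plain java.util.regex. Two assumptions are made here and should be verified against your own reporter output: that the filter applies the expression as a full match against the metric name, and that the per-partition offset metrics contain a "/partition_<n>/" fragment in their names. The sample metric names and the class ExcludeByRegex are illustrative only.

```java
import java.util.List;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

/** Illustrative check of a negative-lookahead expression that drops matching names. */
public class ExcludeByRegex {
    // Assumption: offset metrics carry a "/partition_<n>/" fragment in their name.
    // The negative lookahead rejects any name containing that fragment.
    static final Pattern KEEP = Pattern.compile("^(?!.*/partition_\\d+/).*$");

    /** Returns only the names that the expression fully matches (the ones kept). */
    static List<String> kept(List<String> names) {
        return names.stream()
            .filter(n -> KEEP.matcher(n).matches())
            .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> names = List.of(
            "my-topic/partition_0/earliestTimeOffset", // hypothetical offset metric
            "my-topic/partition_0/latestTimeOffset",   // hypothetical offset metric
            "__emit-count.default");                   // hypothetical unrelated metric
        System.out.println(kept(names));
    }
}
```

If both assumptions hold, the same expression would replace ".*" in the "expression" field of the reporter config shown under Steps to Reproduce.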
h3. Impact
I am concerned that, depending on the timing of the access to the spout, the
offending metric could fast-forward or rewind the spout. I did not do any
further testing to see whether the lock could be mismanaged in such a way that
the spout is directly impacted, but it seems feasible. The impact may need to
be adjusted if it is confirmed that a simple metrics reporter could result in
events being skipped or re-processed.
h3. Potential Code Issues:
*KafkaSpout.java*
{{private transient Consumer<K, V> consumer;}}
{{...}}
{{public void open(Map<String, Object> conf, TopologyContext context, SpoutOutputCollector collector) {}}
{{ ...}}
{{ //this consumer will be used by the spout everywhere}}
{{ consumer = kafkaConsumerFactory.createConsumer(kafkaSpoutConfig.getKafkaProps());}}
{{ tupleListener.open(conf, context);}}
{{ this.kafkaOffsetMetricManager}}
{{ = new KafkaOffsetMetricManager<>(() -> Collections.unmodifiableMap(offsetManagers), () -> consumer, context);}}
{{() -> consumer}} does not appear to be a safe provider. It hands the metrics
code the same KafkaConsumer instance that the KafkaSpout uses, so that instance
is called from another thread, and KafkaConsumer is not thread safe.
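To make the thread-confinement point concrete, here is one possible direction, sketched under stated assumptions: every consumer call stays on the spout thread, which publishes an immutable snapshot of offsets, and the metrics gauge reads only that snapshot. This is not the Storm implementation and not a proposed patch. OffsetSnapshotHolder, publish, gaugeFor and the string partition keys are all hypothetical, and a plain Map stands in for the result of the consumer's offset query.

```java
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

/** Hypothetical holder that decouples metric reads from the thread-confined consumer. */
public class OffsetSnapshotHolder {
    // Latest immutable snapshot; swapped atomically by the owning thread.
    private final AtomicReference<Map<String, Long>> latest =
        new AtomicReference<>(Map.of());

    /** Intended to be called only from the thread that owns the consumer. */
    public void publish(Map<String, Long> offsetsByPartition) {
        latest.set(Map.copyOf(offsetsByPartition)); // defensive immutable copy
    }

    /** Safe to hand to a metrics thread: it reads the snapshot, never the consumer. */
    public Supplier<Long> gaugeFor(String partition) {
        return () -> latest.get().getOrDefault(partition, -1L); // -1 means "not yet known"
    }

    public static void main(String[] args) throws InterruptedException {
        OffsetSnapshotHolder holder = new OffsetSnapshotHolder();
        Supplier<Long> gauge = holder.gaugeFor("my-topic-0");

        // Owning thread publishes a value it obtained from its own consumer call.
        holder.publish(Map.of("my-topic-0", 42L));

        // A separate reporter thread reads the gauge without touching the consumer.
        Thread reporter = new Thread(
            () -> System.out.println("beginning offset: " + gauge.get()),
            "metrics-reporter");
        reporter.start();
        reporter.join();
    }
}
```

The trade-off in this sketch is staleness: a gauge reports the value from the last publish, so its accuracy depends on how often the owning thread refreshes the snapshot.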
*KafkaOffsetPartitionMetrics.java: getBeginningOffsets, getEndOffsets*
{{private Map<TopicPartition, Long> getBeginningOffsets(Set<TopicPartition> topicPartitions) {}}
{{ Consumer<K, V> consumer = consumerSupplier.get();}}
{{ ...}}
{{ try {}}
{{ // This will actually try to modify the KafkaSpout instance of the consumer which could negatively impact the spout}}
{{ beginningOffsets = consumer.beginningOffsets(topicPartitions);}}
{{ }}}
{{ ...}}
{{}}}
{{private Map<TopicPartition, Long> getEndOffsets(Set<TopicPartition> topicPartitions) {}}
{{ Consumer<K, V> consumer = consumerSupplier.get();}}
{{ ...}}
{{ try {}}
{{ // This will actually try to modify the KafkaSpout instance of the consumer which could negatively impact the spout}}
{{ endOffsets = consumer.endOffsets(topicPartitions);}}
{{ }}}
{{ ...}}
{{}}}
> ConcurrentModificationException on KafkaConsumer when running topology with
> Metrics Reporters
> ---------------------------------------------------------------------------------------------
>
> Key: STORM-4055
> URL: https://issues.apache.org/jira/browse/STORM-4055
> Project: Apache Storm
> Issue Type: Bug
> Components: storm-kafka-client
> Affects Versions: 2.6.1
> Reporter: Anthony Castrati
> Priority: Major
>
--
This message was sent by Atlassian Jira
(v8.20.10#820010)