sivabalan narayanan created HUDI-6502:
-----------------------------------------
Summary: Fix NPE when there is mismatch in num of kafka partitions
Key: HUDI-6502
URL: https://issues.apache.org/jira/browse/HUDI-6502
Project: Apache Hudi
Issue Type: Bug
Components: deltastreamer
Reporter: sivabalan narayanan
Let's say the latest checkpoint in DeltaStreamer holds offsets for 5 Kafka partitions. The user then deleted and re-created the topic, and now it has only one Kafka partition. So, when checking for new offsets, we run into an NPE.
We might need to fix the parsing logic so that it accommodates only the new partitions.
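One way to read "accommodate only new partitions" is to drop every checkpointed partition that the re-created topic no longer has before any offsets are compared. A minimal sketch, assuming the checkpoint string has the form {{topic,partition:offset,...}}; {{Partition}} is a hypothetical stand-in for Kafka's {{TopicPartition}} (so the example needs no Kafka dependency), and {{parseForCurrentPartitions}} is a hypothetical helper, not Hudi's {{CheckpointUtils}} API.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

public class CheckpointFilterSketch {

  // Hypothetical stand-in for Kafka's TopicPartition, so this sketch needs no Kafka dependency.
  record Partition(String topic, int partition) {}

  // Hypothetical helper: parses a checkpoint assumed to look like "topic,0:100,1:200"
  // and keeps only the partitions that still exist in the (re-created) topic.
  static Map<Partition, Long> parseForCurrentPartitions(String checkpoint, Set<Partition> currentPartitions) {
    String[] parts = checkpoint.split(",");
    String topic = parts[0];
    Map<Partition, Long> offsets = new HashMap<>();
    for (int i = 1; i < parts.length; i++) {
      String[] partitionAndOffset = parts[i].split(":");
      Partition tp = new Partition(topic, Integer.parseInt(partitionAndOffset[0]));
      // Skip partitions that disappeared when the topic was deleted and re-created.
      if (currentPartitions.contains(tp)) {
        offsets.put(tp, Long.parseLong(partitionAndOffset[1]));
      }
    }
    return offsets;
  }

  public static void main(String[] args) {
    // Checkpoint taken when the topic had 5 partitions; the topic now has only partition 0.
    Set<Partition> current = Set.of(new Partition("orders", 0));
    Map<Partition, Long> offsets =
        parseForCurrentPartitions("orders,0:100,1:200,2:300,3:400,4:500", current);
    System.out.println(offsets);
  }
}
```

Filtering avoids the lookup of partitions that no longer exist, but the offset kept for partition 0 still refers to the old topic and may be ahead of anything in the re-created one, so filtering alone does not make that offset valid.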
Stacktrace:
{code:java}
23/07/06 16:07:52 ERROR DeltaStreamer : Failed to run job for table: ABC
java.lang.NullPointerException
	at org.apache.hudi.utilities.sources.helpers.KafkaOffsetGen.lambda$fetchValidOffsets$1(KafkaOffsetGen.java:404)
	at java.util.stream.MatchOps$1MatchSink.accept(MatchOps.java:90)
	at java.util.HashMap$EntrySpliterator.tryAdvance(HashMap.java:1744)
	at java.util.stream.ReferencePipeline.forEachWithCancel(ReferencePipeline.java:126)
	at java.util.stream.AbstractPipeline.copyIntoWithCancel(AbstractPipeline.java:499)
	at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:486)
	at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
	at java.util.stream.MatchOps$MatchOp.evaluateSequential(MatchOps.java:230)
	at java.util.stream.MatchOps$MatchOp.evaluateSequential(MatchOps.java:196)
	at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
	at java.util.stream.ReferencePipeline.anyMatch(ReferencePipeline.java:516)
	at org.apache.hudi.utilities.sources.helpers.KafkaOffsetGen.fetchValidOffsets(KafkaOffsetGen.java:404)
	at org.apache.hudi.utilities.sources.helpers.KafkaOffsetGen.getNextOffsetRanges(KafkaOffsetGen.java:317)
	at org.apache.hudi.utilities.sources.KafkaSource.fetchNewData(KafkaSource.java:67)
	at org.apache.hudi.utilities.sources.Source.fetchNext(Source.java:105)
	at org.apache.hudi.utilities.deltastreamer.SourceFormatAdapter.fetchNewDataInRowFormat(SourceFormatAdapter.java:288)
	at org.apache.hudi.utilities.deltastreamer.DeltaSync.fetchFromSource(DeltaSync.java:477)
	at org.apache.hudi.utilities.deltastreamer.DeltaSync.readFromSource(DeltaSync.java:451)
	at org.apache.hudi.utilities.deltastreamer.DeltaSync.syncOnce(DeltaSync.java:358)
	at org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer$DeltaSyncService.ingestOnce(HoodieDeltaStreamer.java:876)
	at org.apache.hudi.common.util.Option.ifPresent(Option.java:97)
{code}
Code snippet of interest:
{code:java}
private Map<TopicPartition, Long> fetchValidOffsets(KafkaConsumer consumer,
    Option<String> lastCheckpointStr, Set<TopicPartition> topicPartitions) {
  Map<TopicPartition, Long> earliestOffsets = consumer.beginningOffsets(topicPartitions);
  Map<TopicPartition, Long> checkpointOffsets = CheckpointUtils.strToOffsets(lastCheckpointStr.get());
  boolean isCheckpointOutOfBounds = checkpointOffsets.entrySet().stream()
      .anyMatch(offset -> offset.getValue() < earliestOffsets.get(offset.getKey()));
{code}
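The last line, where we call {{earliestOffsets.get(offset.getKey())}}, runs into the NPE: the checkpoint still lists partitions that no longer exist in the re-created topic, so {{get}} returns {{null}} for them, and unboxing that {{null}} {{Long}} for the {{<}} comparison throws.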
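The failure can be reproduced without Kafka. A minimal sketch that simplifies partitions to {{Integer}} ids instead of {{TopicPartition}}; {{isOutOfBoundsUnsafe}} and {{isOutOfBoundsSafe}} are hypothetical names, and skipping vanished partitions is only one possible policy, not necessarily the fix adopted in Hudi.

```java
import java.util.HashMap;
import java.util.Map;

public class OutOfBoundsCheckSketch {

  // Mirrors the original check: Map.get returns null for a partition the broker did not report,
  // and unboxing that null Long for the '<' comparison throws NullPointerException.
  static boolean isOutOfBoundsUnsafe(Map<Integer, Long> checkpoint, Map<Integer, Long> earliest) {
    return checkpoint.entrySet().stream()
        .anyMatch(e -> e.getValue() < earliest.get(e.getKey()));
  }

  // Null-safe variant: only partitions that still exist are compared; vanished ones are ignored.
  static boolean isOutOfBoundsSafe(Map<Integer, Long> checkpoint, Map<Integer, Long> earliest) {
    return checkpoint.entrySet().stream()
        .filter(e -> earliest.containsKey(e.getKey()))
        .anyMatch(e -> e.getValue() < earliest.get(e.getKey()));
  }

  public static void main(String[] args) {
    // Checkpoint covers partitions 0..4; after re-creation the broker only reports partition 0.
    Map<Integer, Long> checkpoint = new HashMap<>();
    for (int p = 0; p < 5; p++) {
      checkpoint.put(p, 100L * (p + 1));
    }
    Map<Integer, Long> earliest = Map.of(0, 0L);

    try {
      isOutOfBoundsUnsafe(checkpoint, earliest);
    } catch (NullPointerException ex) {
      System.out.println("unsafe check threw NullPointerException");
    }
    System.out.println("safe check: " + isOutOfBoundsSafe(checkpoint, earliest));
  }
}
```

An alternative policy would be to treat a missing partition as out of bounds, so the job falls into whatever handling the out-of-bounds case already gets; which one is preferable depends on whether silently dropping the old partitions is acceptable.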
--
This message was sent by Atlassian Jira
(v8.20.10#820010)