sivabalan narayanan created HUDI-6502:
-----------------------------------------

             Summary: Fix NPE when there is mismatch in num of kafka partitions 
                 Key: HUDI-6502
                 URL: https://issues.apache.org/jira/browse/HUDI-6502
             Project: Apache Hudi
          Issue Type: Bug
          Components: deltastreamer
            Reporter: sivabalan narayanan


Let's say the latest checkpoint in DeltaStreamer has offsets for 5 Kafka partitions. The user then 
deleted and re-created the topic, and now it has only one partition. So, when 
checking for new offsets, we run into an NPE. 

We might need to fix the parsing logic so that it only accounts for the partitions that currently exist in the topic. 
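A minimal sketch of that idea, assuming the checkpoint string has the form topic,partition:offset,partition:offset,... and modeling partitions as plain ints instead of Kafka TopicPartition objects so it runs without kafka-clients. The class and method names (CheckpointFilterSketch, parseCheckpoint, retainCurrentPartitions, isCheckpointOutOfBounds) are hypothetical and are not Hudi's API: checkpointed partitions that no longer exist in the topic are dropped before the out-of-bounds comparison, so the map lookup can never return null.

{code:java}
import java.util.HashMap;
import java.util.Map;
import java.util.stream.Collectors;

/**
 * Hypothetical sketch (not Hudi's actual fix): drop checkpointed partitions that
 * no longer exist before comparing them against the earliest available offsets.
 * Partitions are modeled as plain ints instead of Kafka TopicPartition objects.
 */
class CheckpointFilterSketch {

  /** Parses an ASSUMED checkpoint format "topic,partition:offset,partition:offset,...". */
  static Map<Integer, Long> parseCheckpoint(String checkpoint) {
    String[] parts = checkpoint.split(",");
    Map<Integer, Long> offsets = new HashMap<>();
    // parts[0] is the topic name; the remaining parts are partition:offset pairs
    for (int i = 1; i < parts.length; i++) {
      String[] kv = parts[i].split(":");
      offsets.put(Integer.parseInt(kv[0]), Long.parseLong(kv[1]));
    }
    return offsets;
  }

  /** Keeps only the checkpoint entries whose partition still exists in the topic. */
  static Map<Integer, Long> retainCurrentPartitions(Map<Integer, Long> checkpointOffsets,
                                                    Map<Integer, Long> earliestOffsets) {
    return checkpointOffsets.entrySet().stream()
        .filter(e -> earliestOffsets.containsKey(e.getKey()))
        .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
  }

  /** Same comparison as the original check, but on the filtered map, so get() is never null. */
  static boolean isCheckpointOutOfBounds(Map<Integer, Long> checkpointOffsets,
                                         Map<Integer, Long> earliestOffsets) {
    return retainCurrentPartitions(checkpointOffsets, earliestOffsets).entrySet().stream()
        .anyMatch(e -> e.getValue() < earliestOffsets.get(e.getKey()));
  }

  public static void main(String[] args) {
    // Checkpoint written while the topic still had 5 partitions
    Map<Integer, Long> checkpoint = parseCheckpoint("events,0:120,1:80,2:95,3:40,4:60");
    // After re-creation the topic has a single partition starting at offset 0
    Map<Integer, Long> earliest = new HashMap<>();
    earliest.put(0, 0L);
    System.out.println(retainCurrentPartitions(checkpoint, earliest)); // {0=120}
    System.out.println(isCheckpointOutOfBounds(checkpoint, earliest)); // false
  }
}
{code}

This only removes the NPE. Whether a re-created partition 0 should resume from the old checkpoint offset (120 here, which may be past the end of the new topic) or restart from its earliest offset is a separate policy question the real fix would still need to settle.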

stacktrace:
{code:java}
23/07/06 16:07:52 ERROR DeltaStreamer  : Failed to run job for table: ABC
java.lang.NullPointerException
        at org.apache.hudi.utilities.sources.helpers.KafkaOffsetGen.lambda$fetchValidOffsets$1(KafkaOffsetGen.java:404)
        at java.util.stream.MatchOps$1MatchSink.accept(MatchOps.java:90)
        at java.util.HashMap$EntrySpliterator.tryAdvance(HashMap.java:1744)
        at java.util.stream.ReferencePipeline.forEachWithCancel(ReferencePipeline.java:126)
        at java.util.stream.AbstractPipeline.copyIntoWithCancel(AbstractPipeline.java:499)
        at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:486)
        at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
        at java.util.stream.MatchOps$MatchOp.evaluateSequential(MatchOps.java:230)
        at java.util.stream.MatchOps$MatchOp.evaluateSequential(MatchOps.java:196)
        at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
        at java.util.stream.ReferencePipeline.anyMatch(ReferencePipeline.java:516)
        at org.apache.hudi.utilities.sources.helpers.KafkaOffsetGen.fetchValidOffsets(KafkaOffsetGen.java:404)
        at org.apache.hudi.utilities.sources.helpers.KafkaOffsetGen.getNextOffsetRanges(KafkaOffsetGen.java:317)
        at org.apache.hudi.utilities.sources.KafkaSource.fetchNewData(KafkaSource.java:67)
        at org.apache.hudi.utilities.sources.Source.fetchNext(Source.java:105)
        at org.apache.hudi.utilities.deltastreamer.SourceFormatAdapter.fetchNewDataInRowFormat(SourceFormatAdapter.java:288)
        at org.apache.hudi.utilities.deltastreamer.DeltaSync.fetchFromSource(DeltaSync.java:477)
        at org.apache.hudi.utilities.deltastreamer.DeltaSync.readFromSource(DeltaSync.java:451)
        at org.apache.hudi.utilities.deltastreamer.DeltaSync.syncOnce(DeltaSync.java:358)
        at org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer$DeltaSyncService.ingestOnce(HoodieDeltaStreamer.java:876)
        at org.apache.hudi.common.util.Option.ifPresent(Option.java:97)
        at
{code}
Code snippet of interest:
{code:java}
private Map<TopicPartition, Long> fetchValidOffsets(KafkaConsumer consumer,
                                                    Option<String> lastCheckpointStr, Set<TopicPartition> topicPartitions) {
  Map<TopicPartition, Long> earliestOffsets = consumer.beginningOffsets(topicPartitions);
  Map<TopicPartition, Long> checkpointOffsets = CheckpointUtils.strToOffsets(lastCheckpointStr.get());
  boolean isCheckpointOutOfBounds = checkpointOffsets.entrySet().stream()
      .anyMatch(offset -> offset.getValue() < earliestOffsets.get(offset.getKey()));
{code}
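The last line, where we call earliestOffsets.get(offset.getKey()), runs into the NPE. earliestOffsets only has entries for the partitions that exist in the topic now, so the lookup returns null for the extra partitions still present in the checkpoint, and unboxing that null for the < comparison throws. 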
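The failure can be reproduced without a Kafka cluster. The class below (OffsetNpeRepro, a hypothetical name) is a standalone sketch that applies the same anyMatch comparison to plain maps, with partitions modeled as ints: the checkpoint holds 5 partitions, while the earliest-offsets map, standing in for what consumer.beginningOffsets(topicPartitions) returns for the re-created topic, holds only partition 0.

{code:java}
import java.util.HashMap;
import java.util.Map;

/**
 * Hypothetical standalone reproduction of the NPE: same comparison as in
 * fetchValidOffsets, with Kafka partitions modeled as ints and no Kafka client.
 */
class OffsetNpeRepro {

  static boolean isCheckpointOutOfBounds(Map<Integer, Long> checkpointOffsets,
                                         Map<Integer, Long> earliestOffsets) {
    return checkpointOffsets.entrySet().stream()
        // get() returns null for a partition missing from earliestOffsets;
        // unboxing that null for the '<' comparison throws NullPointerException
        .anyMatch(offset -> offset.getValue() < earliestOffsets.get(offset.getKey()));
  }

  public static void main(String[] args) {
    Map<Integer, Long> checkpointOffsets = new HashMap<>();
    for (int p = 0; p < 5; p++) {
      checkpointOffsets.put(p, 100L); // checkpoint from the old 5-partition topic
    }
    Map<Integer, Long> earliestOffsets = new HashMap<>();
    earliestOffsets.put(0, 0L); // the re-created topic only has partition 0

    try {
      isCheckpointOutOfBounds(checkpointOffsets, earliestOffsets);
      System.out.println("no exception");
    } catch (NullPointerException e) {
      System.out.println("NullPointerException, as in the reported stack trace");
    }
  }
}
{code}

Because the one partition that is present does not match (100 is not below 0), anyMatch keeps scanning and is guaranteed to reach a missing partition regardless of HashMap iteration order, so the exception is deterministic here.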

 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
