jihoonson opened a new issue #7600: An empty partition can block data ingestion from Kafka/Kinesis
URL: https://github.com/apache/incubator-druid/issues/7600
 
 
   ### Affected Version
   
   All versions since 0.10
   
   ### Description
   
   When the supervisor is restarted or reset, it first looks up the valid offsets to resume reading from in metadata storage. If no offsets are stored there, it fetches the latest or earliest offset from Kafka/Kinesis for each partition. This is implemented here:
   
   ```java
     private ImmutableMap<PartitionIdType, OrderedSequenceNumber<SequenceOffsetType>> generateStartingSequencesForPartitionGroup(
         int groupId
     )
     {
       ImmutableMap.Builder<PartitionIdType, OrderedSequenceNumber<SequenceOffsetType>> builder = ImmutableMap.builder();
       for (Entry<PartitionIdType, SequenceOffsetType> entry : partitionGroups.get(groupId).entrySet()) {
         PartitionIdType partition = entry.getKey();
         SequenceOffsetType sequence = entry.getValue();

         if (!getNotSetMarker().equals(sequence)) {
           // if we are given a startingOffset (set by a previous task group which is pending completion) then use it
           if (!isEndOfShard(sequence)) {
             builder.put(partition, makeSequenceNumber(sequence, useExclusiveStartSequenceNumberForNonFirstSequence()));
           }
         } else {
           // if we don't have a startingOffset (first run or we had some previous failures and reset the sequences) then
           // get the sequence from metadata storage (if available) or Kafka/Kinesis (otherwise)
           OrderedSequenceNumber<SequenceOffsetType> offsetFromStorage = getOffsetFromStorageForPartition(partition);

           if (offsetFromStorage != null) {
             builder.put(partition, offsetFromStorage);
           }
         }
       }
       return builder.build();
     }
   ```
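The lookup order described above (metadata storage first, then the stream) can be modeled in isolation. This is a simplified, hypothetical sketch. `OffsetFallbackSketch`, `StreamClient`, `fetchOffset` and the `useEarliestOffset` parameter are illustrative names, and metadata storage is modeled as a plain `Map`. The real supervisor also uses generic partition and sequence types rather than `int`/`long`.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of the offset lookup order; these names are illustrative, not Druid APIs.
class OffsetFallbackSketch {

  /** Hypothetical stand-in for a Kafka/Kinesis client; may throw, e.g. on a timeout. */
  interface StreamClient {
    long fetchOffset(int partition, boolean earliest);
  }

  static long startingOffset(
      int partition,
      Map<Integer, Long> metadataOffsets,
      StreamClient stream,
      boolean useEarliestOffset
  )
  {
    Long stored = metadataOffsets.get(partition);
    if (stored != null) {
      // An offset committed by earlier tasks exists: resume from it.
      return stored;
    }
    // Nothing in metadata storage: ask the stream for the earliest or latest offset.
    // This remote call is where an exception (e.g. a timeout) can surface.
    return stream.fetchOffset(partition, useEarliestOffset);
  }

  public static void main(String[] args)
  {
    StreamClient stream = (partition, earliest) -> earliest ? 0L : 100L;
    Map<Integer, Long> stored = new HashMap<>();
    stored.put(0, 42L);
    System.out.println(startingOffset(0, stored, stream, false)); // 42, from metadata storage
    System.out.println(startingOffset(1, stored, stream, false)); // 100, latest from the stream
    System.out.println(startingOffset(1, stored, stream, true));  // 0, earliest from the stream
  }
}
```

The key point is that only the stream branch performs a remote call. A partition that already has a stored offset never touches Kafka/Kinesis during this lookup.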
   
   If the supervisor hits an exception while fetching the offset from Kafka/Kinesis (e.g., a timeout caused by an empty partition), this method exits immediately by propagating the exception, and handling the runNotice fails. Because all partitions of the group are processed in the same loop, a failure on a single partition means no starting offsets are returned for any of them. This may block the entire data ingestion until data has been ingested into every partition.
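To illustrate why one bad partition blocks the others, here is a minimal, hypothetical model of the same loop shape. Offsets are fetched one partition at a time with no per-partition error handling, so an exception from any single fetch escapes the loop and discards the offsets already fetched for healthy partitions. `FailFastSketch` and its `IntToLongFunction` offset fetcher are illustrative assumptions, not Druid code.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.IntToLongFunction;

// Hypothetical model of the current behavior; not Druid code.
class FailFastSketch {

  // Simplified version of the loop in generateStartingSequencesForPartitionGroup: there is
  // no per-partition error handling, so one failed fetch aborts the whole method.
  static Map<Integer, Long> startingOffsets(List<Integer> partitions, IntToLongFunction fetchOffset)
  {
    Map<Integer, Long> result = new LinkedHashMap<>();
    for (int partition : partitions) {
      result.put(partition, fetchOffset.applyAsLong(partition));
    }
    return result;
  }

  public static void main(String[] args)
  {
    // Partition 1 stands in for an empty partition whose offset lookup times out.
    IntToLongFunction fetch = partition -> {
      if (partition == 1) {
        throw new RuntimeException("timed out fetching offset for partition " + partition);
      }
      return 100L * partition;
    };
    try {
      startingOffsets(Arrays.asList(0, 1, 2), fetch);
    } catch (RuntimeException e) {
      // Partitions 0 and 2 were fine, but their offsets are lost along with partition 1's.
      System.out.println("run failed: " + e.getMessage());
    }
  }
}
```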
   
   I think the supervisor should be able to skip any partition whose offset it fails to fetch and continue ingestion for the other partitions.
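One possible shape for this proposal, sketched under stated assumptions. The per-partition fetch is wrapped in a try/catch, a failure is logged, and the partition is left out of the result. The existing code already omits partitions for which `getOffsetFromStorageForPartition` returns `null` in the same way. `SkipFailedPartitionsSketch` and the `IntToLongFunction` fetcher are hypothetical. The sketch also assumes that a skipped partition is retried on a later supervisor run, which a real implementation would need to guarantee.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.IntToLongFunction;
import java.util.logging.Logger;

// Hypothetical sketch of the proposed behavior; not Druid code.
class SkipFailedPartitionsSketch {
  private static final Logger LOG = Logger.getLogger(SkipFailedPartitionsSketch.class.getName());

  // A partition whose lookup fails is logged and left out of the result, so the
  // remaining partitions still get starting offsets. (Assumes a later run retries it.)
  static Map<Integer, Long> startingOffsets(List<Integer> partitions, IntToLongFunction fetchOffset)
  {
    Map<Integer, Long> result = new LinkedHashMap<>();
    for (int partition : partitions) {
      try {
        result.put(partition, fetchOffset.applyAsLong(partition));
      } catch (RuntimeException e) {
        LOG.warning("Skipping partition " + partition + ": " + e.getMessage());
      }
    }
    return result;
  }

  public static void main(String[] args)
  {
    // Partition 1 stands in for an empty partition whose offset lookup times out.
    IntToLongFunction fetch = partition -> {
      if (partition == 1) {
        throw new RuntimeException("timed out fetching offset");
      }
      return 100L * partition;
    };
    System.out.println(startingOffsets(Arrays.asList(0, 1, 2), fetch)); // {0=0, 2=200}
  }
}
```

Catching `RuntimeException` broadly keeps the sketch short. A real fix might catch only timeout-type exceptions so that genuine misconfiguration still fails loudly.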

----------------------------------------------------------------
This is an automated message from the Apache Git Service.