This is an automated email from the ASF dual-hosted git repository.
gian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-druid.git
The following commit(s) were added to refs/heads/master by this push:
new 8732329 Fix log level and throw NPE on null currOffset in
SeekableStreamIndexTaskRunner (#7253)
8732329 is described below
commit 873232954fdba3490bd8c723a6bc157d0e256c1c
Author: Jihoon Son <[email protected]>
AuthorDate: Wed Mar 13 07:20:43 2019 -0700
Fix log level and throw NPE on null currOffset in
SeekableStreamIndexTaskRunner (#7253)
---
.../SeekableStreamIndexTaskRunner.java | 35 ++++++++++++----------
1 file changed, 19 insertions(+), 16 deletions(-)
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java
index 7eee9dc..367b201 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java
@@ -1887,22 +1887,25 @@ public abstract class
SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
{
// Check only for the first record among the record batch.
if (initialOffsetsSnapshot.contains(record.getPartitionId())) {
- final SequenceOffsetType currOffset =
currOffsets.get(record.getPartitionId());
- if (currOffset != null) {
- final OrderedSequenceNumber<SequenceOffsetType> recordSequenceNumber =
createSequenceNumber(
- record.getSequenceNumber()
- );
- final OrderedSequenceNumber<SequenceOffsetType> currentSequenceNumber
= createSequenceNumber(
- currOffset
+ final SequenceOffsetType currOffset = Preconditions.checkNotNull(
+ currOffsets.get(record.getPartitionId()),
+ "Current offset is null for sequenceNumber[%s] and partitionId[%s]",
+ record.getSequenceNumber(),
+ record.getPartitionId()
+ );
+ final OrderedSequenceNumber<SequenceOffsetType> recordSequenceNumber =
createSequenceNumber(
+ record.getSequenceNumber()
+ );
+ final OrderedSequenceNumber<SequenceOffsetType> currentSequenceNumber =
createSequenceNumber(
+ currOffset
+ );
+ if (recordSequenceNumber.compareTo(currentSequenceNumber) < 0) {
+ throw new ISE(
+ "sequenceNumber of the start record[%s] is smaller than current
sequenceNumber[%s] for partition[%s]",
+ record.getSequenceNumber(),
+ currOffset,
+ record.getPartitionId()
);
- if (recordSequenceNumber.compareTo(currentSequenceNumber) < 0) {
- throw new ISE(
- "sequenceNumber of the start record[%s] is smaller than current
sequenceNumber[%s] for partition [%s]",
- record.getSequenceNumber(),
- currOffset,
- record.getPartitionId()
- );
- }
}
// Remove the mark to notify that this partition has been read.
@@ -1910,7 +1913,7 @@ public abstract class
SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
// check exclusive starting sequence
if (isStartingSequenceOffsetsExclusive() &&
exclusiveStartingPartitions.contains(record.getPartitionId())) {
- log.warn("Skipping starting sequenceNumber for partition [%s] marked
exclusive", record.getPartitionId());
+ log.info("Skipping starting sequenceNumber for partition[%s] marked
exclusive", record.getPartitionId());
return false;
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]