fjy closed pull request #6015: Check the kafka topic when comparing checkpoints from tasks with the one stored in metastore
URL: https://github.com/apache/incubator-druid/pull/6015
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
diff --git a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/KafkaSupervisor.java b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/KafkaSupervisor.java
index ed287fa0591..046331e946f 100644
--- a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/KafkaSupervisor.java
+++ b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/KafkaSupervisor.java
@@ -514,9 +514,7 @@ public void checkpoint(int taskGroupId, DataSourceMetadata previousCheckpoint, D
     Preconditions.checkNotNull(previousCheckpoint, "previousCheckpoint");
     Preconditions.checkNotNull(currentCheckpoint, "current checkpoint cannot be null");
     Preconditions.checkArgument(
-        ioConfig.getTopic()
-                .equals(((KafkaDataSourceMetadata) currentCheckpoint).getKafkaPartitions()
-                                                                     .getTopic()),
+        ioConfig.getTopic().equals(((KafkaDataSourceMetadata) currentCheckpoint).getKafkaPartitions().getTopic()),
         "Supervisor topic [%s] and topic in checkpoint [%s] does not match",
         ioConfig.getTopic(),
         ((KafkaDataSourceMetadata) currentCheckpoint).getKafkaPartitions().getTopic()
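For context on the reworked guard above: Guava's Preconditions.checkArgument throws an IllegalArgumentException built from the %s template whenever its condition is false, so a checkpoint whose topic differs from the supervisor's configured topic is rejected before any offsets are compared. A minimal sketch of the same pattern in isolation (the class and method names below are illustrative, not part of the PR):

    import com.google.common.base.Preconditions;

    public class TopicGuardSketch
    {
      // Rejects a checkpoint whose topic differs from the supervisor's topic.
      static void checkTopic(String supervisorTopic, String checkpointTopic)
      {
        Preconditions.checkArgument(
            supervisorTopic.equals(checkpointTopic),
            "Supervisor topic [%s] and topic in checkpoint [%s] do not match",
            supervisorTopic,
            checkpointTopic
        );
      }

      public static void main(String[] args)
      {
        checkTopic("metrics", "metrics"); // passes
        checkTopic("metrics", "events");  // throws IllegalArgumentException
      }
    }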
@@ -661,6 +659,8 @@ public void handle() throws ExecutionException, InterruptedException
       int index = checkpoints.size();
       for (int sequenceId : checkpoints.descendingKeySet()) {
         Map<Integer, Long> checkpoint = checkpoints.get(sequenceId);
+        // We have already verified the topic of the current checkpoint is same with that in ioConfig.
+        // See checkpoint().
         if (checkpoint.equals(previousCheckpoint.getKafkaPartitions().getPartitionOffsetMap())) {
           break;
         }
@@ -1183,16 +1183,22 @@ public void onFailure(Throwable t)
         Futures.allAsList(futures).get(futureTimeoutInSeconds, TimeUnit.SECONDS);
       }
       catch (Exception e) {
-        Throwables.propagate(e);
+        throw new RuntimeException(e);
       }

       final KafkaDataSourceMetadata latestDataSourceMetadata = (KafkaDataSourceMetadata) indexerMetadataStorageCoordinator
           .getDataSourceMetadata(dataSource);
-      final Map<Integer, Long> latestOffsetsFromDb = (latestDataSourceMetadata == null
-                                                      || latestDataSourceMetadata.getKafkaPartitions() == null) ? null
-                                                     : latestDataSourceMetadata
-                                                         .getKafkaPartitions()
-                                                         .getPartitionOffsetMap();
+      final boolean hasValidOffsetsFromDb = latestDataSourceMetadata != null &&
+                                            latestDataSourceMetadata.getKafkaPartitions() != null &&
+                                            ioConfig.getTopic().equals(
+                                                latestDataSourceMetadata.getKafkaPartitions().getTopic()
+                                            );
+      final Map<Integer, Long> latestOffsetsFromDb;
+      if (hasValidOffsetsFromDb) {
+        latestOffsetsFromDb = latestDataSourceMetadata.getKafkaPartitions().getPartitionOffsetMap();
+      } else {
+        latestOffsetsFromDb = null;
+      }

       // order tasks of this taskGroup by the latest sequenceId
       taskSequences.sort((o1, o2) -> o2.rhs.firstKey().compareTo(o1.rhs.firstKey()));
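Two things change in the hunk above. Throwables.propagate(e), which is deprecated in recent Guava releases, becomes a plain throw new RuntimeException(e), the replacement Guava's own documentation suggests. More substantively, offsets read back from the metadata store now count as usable only when the stored topic matches the supervisor's configured topic; otherwise latestOffsetsFromDb stays null and the consistency check further down falls back to each task's own offsets. A standalone sketch of that guard, with the stored topic and offset map passed in directly rather than unwrapped from Druid's KafkaDataSourceMetadata:

    import java.util.Map;

    public class OffsetGuardSketch
    {
      // Returns the stored offsets only when they belong to the supervisor's
      // topic; null signals "nothing usable in the metadata store".
      static Map<Integer, Long> usableOffsets(
          String supervisorTopic,
          String storedTopic,
          Map<Integer, Long> storedOffsets
      )
      {
        final boolean valid = storedTopic != null
                              && storedOffsets != null
                              && supervisorTopic.equals(storedTopic);
        return valid ? storedOffsets : null;
      }
    }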
@@ -1203,22 +1209,21 @@ public void onFailure(Throwable t)
       while (taskIndex < taskSequences.size()) {
         if (earliestConsistentSequenceId.get() == -1) {
-          // find the first replica task with earliest sequenceId consistent with datasource metadata in the metadata store
+          // find the first replica task with earliest sequenceId consistent with datasource metadata in the metadata
+          // store
           if (taskSequences.get(taskIndex).rhs.entrySet().stream().anyMatch(
               sequenceCheckpoint -> sequenceCheckpoint.getValue().entrySet().stream().allMatch(
                   partitionOffset -> Longs.compare(
                       partitionOffset.getValue(),
-                      latestOffsetsFromDb == null
-                      ?
-                      partitionOffset.getValue()
-                      : latestOffsetsFromDb.getOrDefault(partitionOffset.getKey(), partitionOffset.getValue())
+                      latestOffsetsFromDb == null ?
+                      partitionOffset.getValue() :
+                      latestOffsetsFromDb.getOrDefault(partitionOffset.getKey(), partitionOffset.getValue())
                   ) == 0) && earliestConsistentSequenceId.compareAndSet(-1, sequenceCheckpoint.getKey())) || (
               pendingCompletionTaskGroups.getOrDefault(groupId, EMPTY_LIST).size() > 0
               && earliestConsistentSequenceId.compareAndSet(-1, taskSequences.get(taskIndex).rhs.firstKey()))) {
-            final SortedMap<Integer, Map<Integer, Long>> latestCheckpoints = new TreeMap<>(taskSequences.get(taskIndex).rhs
-                .tailMap(
-                    earliestConsistentSequenceId
-                        .get()));
+            final SortedMap<Integer, Map<Integer, Long>> latestCheckpoints = new TreeMap<>(
+                taskSequences.get(taskIndex).rhs.tailMap(earliestConsistentSequenceId.get())
+            );
             log.info("Setting taskGroup sequences to [%s] for group [%d]", latestCheckpoints, groupId);
             taskGroup.sequenceOffsets.clear();
             taskGroup.sequenceOffsets.putAll(latestCheckpoints);
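The stream expression above is dense, so the core predicate is worth restating in isolation: a task's sequence is consistent with the metadata store when every partition offset in it equals the stored offset for that partition, and a null latestOffsetsFromDb matches trivially because each lookup defaults to the checkpoint's own value. A sketch of just that predicate (equals on boxed Longs stands in for the Longs.compare(...) == 0 used in the PR; the result is the same for non-null values):

    import java.util.Map;

    public class ConsistencySketch
    {
      // True when every partition offset in the checkpoint equals the stored
      // offset; a null store matches everything, since each lookup falls back
      // to the checkpoint's own value.
      static boolean consistentWithStore(
          Map<Integer, Long> checkpoint,
          Map<Integer, Long> latestOffsetsFromDb
      )
      {
        return checkpoint.entrySet().stream().allMatch(
            partitionOffset -> partitionOffset.getValue().equals(
                latestOffsetsFromDb == null
                ? partitionOffset.getValue()
                : latestOffsetsFromDb.getOrDefault(partitionOffset.getKey(), partitionOffset.getValue())
            )
        );
      }
    }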
@@ -1262,7 +1267,8 @@ public void onFailure(Throwable t)
       taskSequences.stream().filter(taskIdSequences -> tasksToKill.contains(taskIdSequences.lhs)).forEach(
           sequenceCheckpoint -> {
             log.warn(
-                "Killing task [%s], as its checkpoints [%s] are not consistent with group checkpoints[%s] or latest persisted offsets in metadata store [%s]",
+                "Killing task [%s], as its checkpoints [%s] are not consistent with group checkpoints[%s] or latest "
+                + "persisted offsets in metadata store [%s]",
                 sequenceCheckpoint.lhs,
                 sequenceCheckpoint.rhs,
                 taskGroup.sequenceOffsets,
@@ -1270,7 +1276,8 @@ public void onFailure(Throwable t)
             );
             killTask(sequenceCheckpoint.lhs);
             taskGroup.tasks.remove(sequenceCheckpoint.lhs);
-          });
+          }
+      );
   }

   private void addDiscoveredTaskToPendingCompletionTaskGroups(
@@ -1849,7 +1856,7 @@ private void createKafkaTasksForGroup(int groupId, int replicas) throws JsonProc
   private long getOffsetFromStorageForPartition(int partition)
   {
     long offset;
-    Map<Integer, Long> metadataOffsets = getOffsetsFromMetadataStorage();
+    final Map<Integer, Long> metadataOffsets = getOffsetsFromMetadataStorage();
     if (metadataOffsets.get(partition) != null) {
       offset = metadataOffsets.get(partition);
       log.debug("Getting offset [%,d] from metadata storage for partition [%d]", offset, partition);
@@ -1877,8 +1884,8 @@ private long getOffsetFromStorageForPartition(int partition)

   private Map<Integer, Long> getOffsetsFromMetadataStorage()
   {
-    DataSourceMetadata dataSourceMetadata = indexerMetadataStorageCoordinator.getDataSourceMetadata(dataSource);
-    if (dataSourceMetadata != null && dataSourceMetadata instanceof KafkaDataSourceMetadata) {
+    final DataSourceMetadata dataSourceMetadata = indexerMetadataStorageCoordinator.getDataSourceMetadata(dataSource);
+    if (dataSourceMetadata instanceof KafkaDataSourceMetadata) {
       KafkaPartitions partitions = ((KafkaDataSourceMetadata) dataSourceMetadata).getKafkaPartitions();
       if (partitions != null) {
         if (!ioConfig.getTopic().equals(partitions.getTopic())) {
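Dropping the explicit null check in the hunk above is safe because instanceof in Java is already null-safe: null instanceof KafkaDataSourceMetadata evaluates to false rather than throwing. A minimal demonstration:

    public class InstanceofNullSketch
    {
      public static void main(String[] args)
      {
        Object metadata = null;
        // Prints "false": instanceof rejects null without a separate check.
        System.out.println(metadata instanceof String);
      }
    }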
@@ -1887,14 +1894,14 @@ private long getOffsetFromStorageForPartition(int partition)
               partitions.getTopic(),
               ioConfig.getTopic()
           );
-          return ImmutableMap.of();
+          return Collections.emptyMap();
         } else if (partitions.getPartitionOffsetMap() != null) {
           return partitions.getPartitionOffsetMap();
         }
       }
     }
-    return ImmutableMap.of();
+    return Collections.emptyMap();
   }

   private long getOffsetFromKafkaForPartition(int partition, boolean useEarliestOffset)
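Collections.emptyMap() and Guava's ImmutableMap.of() both return an immutable empty map, so the swap above preserves behavior while dropping a Guava dependency from this method; callers must still treat the result as read-only. For illustration:

    import java.util.Collections;
    import java.util.Map;

    public class EmptyMapSketch
    {
      public static void main(String[] args)
      {
        final Map<Integer, Long> offsets = Collections.emptyMap();
        System.out.println(offsets.isEmpty()); // true
        // offsets.put(0, 1L);                 // would throw UnsupportedOperationException
      }
    }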