This is an automated email from the ASF dual-hosted git repository.
jonwei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-druid.git
The following commit(s) were added to refs/heads/master by this push:
new c48aa74  Fix NPE while handling CheckpointNotice in KafkaSupervisor (#5996)
c48aa74 is described below
commit c48aa74a301a11f49b0d6ba6bde4283bdab7f699
Author: Jihoon Son <[email protected]>
AuthorDate: Fri Jul 13 17:14:57 2018 -0700
Fix NPE while handling CheckpointNotice in KafkaSupervisor (#5996)
* Fix NPE while handling CheckpointNotice
* fix code style
* Fix test
* fix test
* add a log for creating a new taskGroup
* fix backward compatibility in KafkaIOConfig
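For readers skimming the diff below: the NPE came from the supervisor's CheckpointNotice handler, which previously located the task group through a sequence-name map and dereferenced the result unconditionally; once a group had moved on to publishing, that lookup returned null. A minimal sketch of the before/after shape (hypothetical, simplified names; the real change is in KafkaSupervisor.java below):

    // Sketch only; field and method names are simplified stand-ins, not the Druid code.
    import java.util.Map;
    import java.util.TreeMap;
    import java.util.concurrent.ConcurrentHashMap;

    class CheckpointHandlingSketch
    {
      static class TaskGroup
      {
        final TreeMap<Integer, Map<Integer, Long>> sequenceOffsets = new TreeMap<>();
      }

      private final ConcurrentHashMap<Integer, TaskGroup> taskGroups = new ConcurrentHashMap<>();

      void handle(int taskGroupId)
      {
        // Before: the group was fetched by sequence name and dereferenced directly,
        // throwing an NPE when it had already left the active map.
        // After: fetch by taskGroupId and check validity before touching it.
        final TaskGroup taskGroup = taskGroups.get(taskGroupId); // may be null
        if (taskGroup == null) {
          return; // the real handler logs why (pending completion vs. inactive) or throws
        }
        final TreeMap<Integer, Map<Integer, Long>> checkpoints = taskGroup.sequenceOffsets;
        // ... compare against the reported checkpoint, then record the new one
      }
    }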
---
.../MaterializedViewSupervisor.java | 7 +-
.../IncrementalPublishingKafkaIndexTaskRunner.java | 7 +-
.../io/druid/indexing/kafka/KafkaIOConfig.java | 15 +-
.../indexing/kafka/supervisor/KafkaSupervisor.java | 185 ++++++-----
.../io/druid/indexing/kafka/KafkaIOConfigTest.java | 9 +
.../druid/indexing/kafka/KafkaIndexTaskTest.java | 106 ++++---
.../kafka/supervisor/KafkaSupervisorTest.java | 344 +++++++++++++++++----
.../CheckPointDataSourceMetadataAction.java | 27 +-
.../overlord/supervisor/SupervisorManager.java | 8 +-
.../io/druid/java/util/emitter/EmittingLogger.java | 4 +
.../overlord/supervisor/NoopSupervisorSpec.java | 6 +-
.../indexing/overlord/supervisor/Supervisor.java | 9 +-
.../metrics/ExceptionCapturingServiceEmitter.java | 71 +++++
13 files changed, 579 insertions(+), 219 deletions(-)
diff --git a/extensions-contrib/materialized-view-maintenance/src/main/java/io/druid/indexing/materializedview/MaterializedViewSupervisor.java b/extensions-contrib/materialized-view-maintenance/src/main/java/io/druid/indexing/materializedview/MaterializedViewSupervisor.java
index eba35a4..fedda09 100644
--- a/extensions-contrib/materialized-view-maintenance/src/main/java/io/druid/indexing/materializedview/MaterializedViewSupervisor.java
+++ b/extensions-contrib/materialized-view-maintenance/src/main/java/io/druid/indexing/materializedview/MaterializedViewSupervisor.java
@@ -53,7 +53,6 @@ import io.druid.timeline.DataSegment;
import org.joda.time.Duration;
import org.joda.time.Interval;
-import javax.annotation.Nullable;
import java.io.IOException;
import java.util.List;
import java.util.Map;
@@ -240,11 +239,7 @@ public class MaterializedViewSupervisor implements Supervisor
}
@Override
- public void checkpoint(
- @Nullable String sequenceName,
- @Nullable DataSourceMetadata previousCheckPoint,
- @Nullable DataSourceMetadata currentCheckPoint
- )
+  public void checkpoint(int taskGroupId, DataSourceMetadata previousCheckPoint, DataSourceMetadata currentCheckPoint)
{
// do nothing
}
diff --git a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/IncrementalPublishingKafkaIndexTaskRunner.java b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/IncrementalPublishingKafkaIndexTaskRunner.java
index 3a440b1..a93fde6 100644
--- a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/IncrementalPublishingKafkaIndexTaskRunner.java
+++ b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/IncrementalPublishingKafkaIndexTaskRunner.java
@@ -600,12 +600,13 @@ public class IncrementalPublishingKafkaIndexTaskRunner implements KafkaIndexTask
sequences
);
requestPause();
-      if (!toolbox.getTaskActionClient().submit(new CheckPointDataSourceMetadataAction(
+      final CheckPointDataSourceMetadataAction checkpointAction = new CheckPointDataSourceMetadataAction(
           task.getDataSource(),
-          ioConfig.getBaseSequenceName(),
+          ioConfig.getTaskGroupId(),
           new KafkaDataSourceMetadata(new KafkaPartitions(topic, sequenceToCheckpoint.getStartOffsets())),
           new KafkaDataSourceMetadata(new KafkaPartitions(topic, nextOffsets))
-      ))) {
+      );
+      if (!toolbox.getTaskActionClient().submit(checkpointAction)) {
         throw new ISE("Checkpoint request with offsets [%s] failed, dying", nextOffsets);
       }
}
diff --git a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIOConfig.java b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIOConfig.java
index 4dd3aaf..b6c1d76 100644
--- a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIOConfig.java
+++ b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIOConfig.java
@@ -26,6 +26,7 @@ import com.google.common.base.Preconditions;
import io.druid.segment.indexing.IOConfig;
import org.joda.time.DateTime;
+import javax.annotation.Nullable;
import java.util.Map;
public class KafkaIOConfig implements IOConfig
@@ -33,6 +34,8 @@ public class KafkaIOConfig implements IOConfig
private static final boolean DEFAULT_USE_TRANSACTION = true;
private static final boolean DEFAULT_SKIP_OFFSET_GAPS = false;
+ @Nullable
+ private final Integer taskGroupId;
private final String baseSequenceName;
private final KafkaPartitions startPartitions;
private final KafkaPartitions endPartitions;
@@ -44,6 +47,7 @@ public class KafkaIOConfig implements IOConfig
@JsonCreator
public KafkaIOConfig(
+ @JsonProperty("taskGroupId") @Nullable Integer taskGroupId, // can be
null for backward compabitility
@JsonProperty("baseSequenceName") String baseSequenceName,
@JsonProperty("startPartitions") KafkaPartitions startPartitions,
@JsonProperty("endPartitions") KafkaPartitions endPartitions,
@@ -54,6 +58,7 @@ public class KafkaIOConfig implements IOConfig
@JsonProperty("skipOffsetGaps") Boolean skipOffsetGaps
)
{
+ this.taskGroupId = taskGroupId;
     this.baseSequenceName = Preconditions.checkNotNull(baseSequenceName, "baseSequenceName");
     this.startPartitions = Preconditions.checkNotNull(startPartitions, "startPartitions");
     this.endPartitions = Preconditions.checkNotNull(endPartitions, "endPartitions");
@@ -83,6 +88,13 @@ public class KafkaIOConfig implements IOConfig
}
}
+ @Nullable
+ @JsonProperty
+ public Integer getTaskGroupId()
+ {
+ return taskGroupId;
+ }
+
@JsonProperty
public String getBaseSequenceName()
{
@@ -135,7 +147,8 @@ public class KafkaIOConfig implements IOConfig
public String toString()
{
return "KafkaIOConfig{" +
- "baseSequenceName='" + baseSequenceName + '\'' +
+ "taskGroupId=" + taskGroupId +
+ ", baseSequenceName='" + baseSequenceName + '\'' +
", startPartitions=" + startPartitions +
", endPartitions=" + endPartitions +
", consumerProperties=" + consumerProperties +
diff --git a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/KafkaSupervisor.java b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/KafkaSupervisor.java
index 8ea13ef..ed287fa 100644
--- a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/KafkaSupervisor.java
+++ b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/KafkaSupervisor.java
@@ -143,8 +143,7 @@ public class KafkaSupervisor implements Supervisor
 * time, there should only be up to a maximum of [taskCount] actively-reading task groups (tracked in the [taskGroups]
 * map) + zero or more pending-completion task groups (tracked in [pendingCompletionTaskGroups]).
*/
- @VisibleForTesting
- static class TaskGroup
+ private static class TaskGroup
{
     // This specifies the partitions and starting offsets for this task group. It is set on group creation from the data
     // in [partitionGroups] and never changes during the lifetime of this task group, which will live until a task in
@@ -159,7 +158,7 @@ public class KafkaSupervisor implements Supervisor
     DateTime completionTimeout; // is set after signalTasksToFinish(); if not done by timeout, take corrective action
     final TreeMap<Integer, Map<Integer, Long>> sequenceOffsets = new TreeMap<>();
- public TaskGroup(
+ TaskGroup(
ImmutableMap<Integer, Long> partitionOffsets,
Optional<DateTime> minimumMessageTime,
Optional<DateTime> maximumMessageTime
@@ -171,7 +170,7 @@ public class KafkaSupervisor implements Supervisor
this.sequenceOffsets.put(0, partitionOffsets);
}
- public int addNewCheckpoint(Map<Integer, Long> checkpoint)
+ int addNewCheckpoint(Map<Integer, Long> checkpoint)
{
sequenceOffsets.put(sequenceOffsets.lastKey() + 1, checkpoint);
return sequenceOffsets.lastKey();
@@ -212,9 +211,6 @@ public class KafkaSupervisor implements Supervisor
private final ConcurrentHashMap<Integer, ConcurrentHashMap<Integer, Long>>
partitionGroups = new ConcurrentHashMap<>();
// --------------------------------------------------------
- // BaseSequenceName -> TaskGroup
- private final ConcurrentHashMap<String, TaskGroup> sequenceTaskGroup = new
ConcurrentHashMap<>();
-
private final TaskStorage taskStorage;
private final TaskMaster taskMaster;
   private final IndexerMetadataStorageCoordinator indexerMetadataStorageCoordinator;
@@ -513,13 +509,9 @@ public class KafkaSupervisor implements Supervisor
}
@Override
- public void checkpoint(
- String sequenceName,
- DataSourceMetadata previousCheckpoint,
- DataSourceMetadata currentCheckpoint
- )
+  public void checkpoint(int taskGroupId, DataSourceMetadata previousCheckpoint, DataSourceMetadata currentCheckpoint)
   {
-    Preconditions.checkNotNull(sequenceName, "Cannot checkpoint without a sequence name");
+    Preconditions.checkNotNull(previousCheckpoint, "previousCheckpoint");
     Preconditions.checkNotNull(currentCheckpoint, "current checkpoint cannot be null");
Preconditions.checkArgument(
ioConfig.getTopic()
@@ -530,12 +522,14 @@ public class KafkaSupervisor implements Supervisor
         ((KafkaDataSourceMetadata) currentCheckpoint).getKafkaPartitions().getTopic()
);
- log.info("Checkpointing [%s] for sequence [%s]", currentCheckpoint,
sequenceName);
- notices.add(new CheckpointNotice(
- sequenceName,
- (KafkaDataSourceMetadata) previousCheckpoint,
- (KafkaDataSourceMetadata) currentCheckpoint
- ));
+ log.info("Checkpointing [%s] for taskGroup [%s]", currentCheckpoint,
taskGroupId);
+ notices.add(
+ new CheckpointNotice(
+ taskGroupId,
+ (KafkaDataSourceMetadata) previousCheckpoint,
+ (KafkaDataSourceMetadata) currentCheckpoint
+ )
+ );
}
public void possiblyRegisterListener()
@@ -637,17 +631,17 @@ public class KafkaSupervisor implements Supervisor
private class CheckpointNotice implements Notice
{
- final String sequenceName;
+ final int taskGroupId;
final KafkaDataSourceMetadata previousCheckpoint;
final KafkaDataSourceMetadata currentCheckpoint;
CheckpointNotice(
- String sequenceName,
+ int taskGroupId,
KafkaDataSourceMetadata previousCheckpoint,
KafkaDataSourceMetadata currentCheckpoint
)
{
- this.sequenceName = sequenceName;
+ this.taskGroupId = taskGroupId;
this.previousCheckpoint = previousCheckpoint;
this.currentCheckpoint = currentCheckpoint;
}
@@ -658,17 +652,12 @@ public class KafkaSupervisor implements Supervisor
// check for consistency
       // if already received request for this sequenceName and dataSourceMetadata combination then return
-      Preconditions.checkNotNull(
-          sequenceTaskGroup.get(sequenceName),
-          "WTH?! cannot find task group for this sequence [%s], sequencesTaskGroup map [%s], taskGroups [%s]",
-          sequenceName,
-          sequenceTaskGroup,
-          taskGroups
-      );
-      final TreeMap<Integer, Map<Integer, Long>> checkpoints = sequenceTaskGroup.get(sequenceName).sequenceOffsets;
+      final TaskGroup taskGroup = taskGroups.get(taskGroupId);
+
+      if (isValidTaskGroup(taskGroup)) {
+        final TreeMap<Integer, Map<Integer, Long>> checkpoints = taskGroup.sequenceOffsets;
- // check validity of previousCheckpoint if it is not null
- if (previousCheckpoint != null) {
+ // check validity of previousCheckpoint
int index = checkpoints.size();
for (int sequenceId : checkpoints.descendingKeySet()) {
Map<Integer, Long> checkpoint = checkpoints.get(sequenceId);
@@ -685,26 +674,39 @@ public class KafkaSupervisor implements Supervisor
log.info("Already checkpointed with offsets [%s]",
checkpoints.lastEntry().getValue());
return;
}
-      } else {
-        // There cannot be more than one checkpoint when previous checkpoint is null
-        // as when the task starts they are sent existing checkpoints
-        Preconditions.checkState(
-            checkpoints.size() <= 1,
-            "Got checkpoint request with null as previous check point, however found more than one checkpoints"
+        final int taskGroupId = getTaskGroupIdForPartition(
+            currentCheckpoint.getKafkaPartitions()
+                             .getPartitionOffsetMap()
+                             .keySet()
+                             .iterator()
+                             .next()
         );
-        if (checkpoints.size() == 1) {
-          log.info("Already checkpointed with dataSourceMetadata [%s]", checkpoints.get(0));
-          return;
+        final Map<Integer, Long> newCheckpoint = checkpointTaskGroup(taskGroupId, false).get();
+        taskGroups.get(taskGroupId).addNewCheckpoint(newCheckpoint);
+        log.info("Handled checkpoint notice, new checkpoint is [%s] for taskGroup [%s]", newCheckpoint, taskGroupId);
+      }
+    }
+
+ private boolean isValidTaskGroup(@Nullable TaskGroup taskGroup)
+ {
+ if (taskGroup == null) {
+ // taskGroup might be in pendingCompletionTaskGroups or partitionGroups
+ if (pendingCompletionTaskGroups.containsKey(taskGroupId)) {
+        log.warn(
+            "Ignoring checkpoint request because taskGroup[%d] has already stopped indexing and is waiting for "
+            + "publishing segments",
+            taskGroupId
+        );
+        return false;
+      } else if (partitionGroups.containsKey(taskGroupId)) {
+        log.warn("Ignoring checkpoint request because taskGroup[%d] is inactive", taskGroupId);
+        return false;
+      } else {
+        throw new ISE("WTH?! cannot find taskGroup [%s] among all taskGroups [%s]", taskGroupId, taskGroups);
}
}
-      final int taskGroupId = getTaskGroupIdForPartition(currentCheckpoint.getKafkaPartitions()
-                                                                          .getPartitionOffsetMap()
-                                                                          .keySet()
-                                                                          .iterator()
-                                                                          .next());
-      final Map<Integer, Long> newCheckpoint = checkpointTaskGroup(taskGroupId, false).get();
-      sequenceTaskGroup.get(sequenceName).addNewCheckpoint(newCheckpoint);
-      log.info("Handled checkpoint notice, new checkpoint is [%s] for sequence [%s]", newCheckpoint, sequenceName);
+
+      return true;
}
}
@@ -718,7 +720,6 @@ public class KafkaSupervisor implements Supervisor
taskGroups.values().forEach(this::killTasksInGroup);
taskGroups.clear();
partitionGroups.clear();
- sequenceTaskGroup.clear();
} else if (!(dataSourceMetadata instanceof KafkaDataSourceMetadata)) {
throw new IAE("Expected KafkaDataSourceMetadata but found instance of
[%s]", dataSourceMetadata.getClass());
} else {
@@ -778,8 +779,7 @@ public class KafkaSupervisor implements Supervisor
       resetKafkaMetadata.getKafkaPartitions().getPartitionOffsetMap().keySet().forEach(partition -> {
         final int groupId = getTaskGroupIdForPartition(partition);
         killTaskGroupForPartitions(ImmutableSet.of(partition));
-        final TaskGroup removedGroup = taskGroups.remove(groupId);
-        sequenceTaskGroup.remove(generateSequenceName(removedGroup));
+        taskGroups.remove(groupId);
         partitionGroups.get(groupId).replaceAll((partitionId, offset) -> NOT_SET);
       });
} else {
@@ -955,9 +955,10 @@ public class KafkaSupervisor implements Supervisor
for (int partition = 0; partition < numPartitions; partition++) {
int taskGroupId = getTaskGroupIdForPartition(partition);
-      partitionGroups.putIfAbsent(taskGroupId, new ConcurrentHashMap<Integer, Long>());
-
-      ConcurrentHashMap<Integer, Long> partitionMap = partitionGroups.get(taskGroupId);
+      ConcurrentHashMap<Integer, Long> partitionMap = partitionGroups.computeIfAbsent(
+          taskGroupId,
+          k -> new ConcurrentHashMap<>()
+      );

       // The starting offset for a new partition in [partitionGroups] is initially set to NOT_SET; when a new task group
       // is created and is assigned partitions, if the offset in [partitionGroups] is NOT_SET it will take the starting
@@ -1087,23 +1088,21 @@ public class KafkaSupervisor implements Supervisor
}
return false;
} else {
-            final TaskGroup taskGroup = new TaskGroup(
-                ImmutableMap.copyOf(
-                    kafkaTask.getIOConfig()
-                             .getStartPartitions()
-                             .getPartitionOffsetMap()
-                ),
-                kafkaTask.getIOConfig().getMinimumMessageTime(),
-                kafkaTask.getIOConfig().getMaximumMessageTime()
-            );
-            if (taskGroups.putIfAbsent(
+            final TaskGroup taskGroup = taskGroups.computeIfAbsent(
                 taskGroupId,
-                taskGroup
-            ) == null) {
-              sequenceTaskGroup.put(generateSequenceName(taskGroup), taskGroups.get(taskGroupId));
-              log.info("Created new task group [%d]", taskGroupId);
-            }
+                k -> {
+                  log.info("Creating a new task group for taskGroupId[%d]", taskGroupId);
+                  return new TaskGroup(
+                      ImmutableMap.copyOf(
+                          kafkaTask.getIOConfig().getStartPartitions().getPartitionOffsetMap()
+                      ),
+                      kafkaTask.getIOConfig().getMinimumMessageTime(),
+                      kafkaTask.getIOConfig().getMaximumMessageTime()
+                  );
+                }
+            );
             taskGroupsToVerify.add(taskGroupId);
-            taskGroups.get(taskGroupId).tasks.putIfAbsent(taskId, new TaskData());
+            taskGroup.tasks.putIfAbsent(taskId, new TaskData());
}
}
return true;
@@ -1256,7 +1255,6 @@ public class KafkaSupervisor implements Supervisor
// killing all tasks or no task left in the group ?
     // clear state about the taskgroup so that the latest offset information is fetched from the metadata store
     log.warn("Clearing task group [%d] information as no valid tasks left the group", groupId);
- sequenceTaskGroup.remove(generateSequenceName(taskGroup));
taskGroups.remove(groupId);
partitionGroups.get(groupId).replaceAll((partition, offset) -> NOT_SET);
}
@@ -1281,9 +1279,10 @@ public class KafkaSupervisor implements Supervisor
Map<Integer, Long> startingPartitions
)
{
-    pendingCompletionTaskGroups.putIfAbsent(groupId, Lists.<TaskGroup>newCopyOnWriteArrayList());
-
-    CopyOnWriteArrayList<TaskGroup> taskGroupList = pendingCompletionTaskGroups.get(groupId);
+    final CopyOnWriteArrayList<TaskGroup> taskGroupList = pendingCompletionTaskGroups.computeIfAbsent(
+        groupId,
+        k -> new CopyOnWriteArrayList<>()
+    );
for (TaskGroup taskGroup : taskGroupList) {
if (taskGroup.partitionOffsets.equals(startingPartitions)) {
if (taskGroup.tasks.putIfAbsent(taskId, new TaskData()) == null) {
@@ -1411,8 +1410,7 @@ public class KafkaSupervisor implements Supervisor
if (endOffsets != null) {
       // set a timeout and put this group in pendingCompletionTaskGroups so that it can be monitored for completion
       group.completionTimeout = DateTimes.nowUtc().plus(ioConfig.getCompletionTimeout());
-      pendingCompletionTaskGroups.putIfAbsent(groupId, Lists.<TaskGroup>newCopyOnWriteArrayList());
-      pendingCompletionTaskGroups.get(groupId).add(group);
+      pendingCompletionTaskGroups.computeIfAbsent(groupId, k -> new CopyOnWriteArrayList<>()).add(group);
// set endOffsets as the next startOffsets
for (Map.Entry<Integer, Long> entry : endOffsets.entrySet()) {
@@ -1432,7 +1430,6 @@ public class KafkaSupervisor implements Supervisor
       partitionGroups.get(groupId).replaceAll((partition, offset) -> NOT_SET);
}
- sequenceTaskGroup.remove(generateSequenceName(group));
     // remove this task group from the list of current task groups now that it has been handled
taskGroups.remove(groupId);
}
@@ -1456,7 +1453,8 @@ public class KafkaSupervisor implements Supervisor
     // metadata store (which will have advanced if we succeeded in publishing and will remain the same if publishing
     // failed and we need to re-ingest)
     return Futures.transform(
-        stopTasksInGroup(taskGroup), new Function<Object, Map<Integer, Long>>()
+        stopTasksInGroup(taskGroup),
+        new Function<Object, Map<Integer, Long>>()
{
@Nullable
@Override
@@ -1625,15 +1623,15 @@ public class KafkaSupervisor implements Supervisor
log.warn("All tasks in group [%d] failed to publish, killing all
tasks for these partitions", groupId);
} else {
log.makeAlert(
- "No task in [%s] succeeded before the completion timeout
elapsed [%s]!",
+ "No task in [%s] for taskGroup [%d] succeeded before the
completion timeout elapsed [%s]!",
group.taskIds(),
+ groupId,
ioConfig.getCompletionTimeout()
).emit();
}
// reset partitions offsets for this task group so that they will be
re-read from metadata storage
partitionGroups.get(groupId).replaceAll((partition, offset) ->
NOT_SET);
- sequenceTaskGroup.remove(generateSequenceName(group));
// kill all the tasks in this pending completion group
killTasksInGroup(group);
// set a flag so the other pending completion groups for this set of
partitions will also stop
@@ -1693,7 +1691,6 @@ public class KafkaSupervisor implements Supervisor
// be recreated with the next set of offsets
if (taskData.status.isSuccess()) {
futures.add(stopTasksInGroup(taskGroup));
- sequenceTaskGroup.remove(generateSequenceName(taskGroup));
iTaskGroups.remove();
break;
}
@@ -1735,7 +1732,6 @@ public class KafkaSupervisor implements Supervisor
groupId,
taskGroup
);
-      sequenceTaskGroup.put(generateSequenceName(taskGroup), taskGroups.get(groupId));
}
}
@@ -1778,6 +1774,7 @@ public class KafkaSupervisor implements Supervisor
     DateTime maximumMessageTime = taskGroups.get(groupId).maximumMessageTime.orNull();
KafkaIOConfig kafkaIOConfig = new KafkaIOConfig(
+ groupId,
sequenceName,
new KafkaPartitions(ioConfig.getTopic(), startPartitions),
new KafkaPartitions(ioConfig.getTopic(), endPartitions),
@@ -1944,7 +1941,7 @@ public class KafkaSupervisor implements Supervisor
}
}
- private ListenableFuture<?> stopTasksInGroup(TaskGroup taskGroup)
+ private ListenableFuture<?> stopTasksInGroup(@Nullable TaskGroup taskGroup)
{
if (taskGroup == null) {
return Futures.immediateFuture(null);
@@ -2289,6 +2286,28 @@ public class KafkaSupervisor implements Supervisor
return allStats;
}
+ @VisibleForTesting
+ @Nullable
+ TaskGroup removeTaskGroup(int taskGroupId)
+ {
+ return taskGroups.remove(taskGroupId);
+ }
+
+ @VisibleForTesting
+ void moveTaskGroupToPendingCompletion(int taskGroupId)
+ {
+ final TaskGroup taskGroup = taskGroups.remove(taskGroupId);
+ if (taskGroup != null) {
+      pendingCompletionTaskGroups.computeIfAbsent(taskGroupId, k -> new CopyOnWriteArrayList<>()).add(taskGroup);
+ }
+ }
+
+ @VisibleForTesting
+ int getNoticesQueueSize()
+ {
+ return notices.size();
+ }
+
private static class StatsFromTaskResult
{
private final String groupId;
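A recurring pattern in the KafkaSupervisor hunks above is the replacement of the putIfAbsent-then-get idiom with computeIfAbsent. The newer idiom does the lookup and the conditional insert in a single atomic map operation, returns the mapped value directly, and only allocates the new container when the key is actually absent. A brief sketch of the two idioms, with hypothetical names rather than the Druid code:

    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.CopyOnWriteArrayList;

    public class ComputeIfAbsentSketch
    {
      private final ConcurrentHashMap<Integer, CopyOnWriteArrayList<String>> pending = new ConcurrentHashMap<>();

      CopyOnWriteArrayList<String> oldIdiom(int groupId)
      {
        // Allocates a fresh list on every call, even when the key already exists,
        // and needs a second map access to read the value back.
        pending.putIfAbsent(groupId, new CopyOnWriteArrayList<>());
        return pending.get(groupId);
      }

      CopyOnWriteArrayList<String> newIdiom(int groupId)
      {
        // Runs the factory only when the key is absent and returns the value in one call.
        return pending.computeIfAbsent(groupId, k -> new CopyOnWriteArrayList<>());
      }
    }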
diff --git a/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaIOConfigTest.java b/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaIOConfigTest.java
index 3bc55e2..050dba7 100644
--- a/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaIOConfigTest.java
+++ b/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaIOConfigTest.java
@@ -50,6 +50,7 @@ public class KafkaIOConfigTest
{
String jsonStr = "{\n"
+ " \"type\": \"kafka\",\n"
+ + " \"taskGroupId\": 0,\n"
+ " \"baseSequenceName\": \"my-sequence-name\",\n"
+ " \"startPartitions\": {\"topic\":\"mytopic\",
\"partitionOffsetMap\" : {\"0\":1, \"1\":10}},\n"
+ " \"endPartitions\": {\"topic\":\"mytopic\",
\"partitionOffsetMap\" : {\"0\":15, \"1\":200}},\n"
@@ -82,6 +83,7 @@ public class KafkaIOConfigTest
{
String jsonStr = "{\n"
+ " \"type\": \"kafka\",\n"
+ + " \"taskGroupId\": 0,\n"
+ " \"baseSequenceName\": \"my-sequence-name\",\n"
+ " \"startPartitions\": {\"topic\":\"mytopic\",
\"partitionOffsetMap\" : {\"0\":1, \"1\":10}},\n"
+ " \"endPartitions\": {\"topic\":\"mytopic\",
\"partitionOffsetMap\" : {\"0\":15, \"1\":200}},\n"
@@ -118,6 +120,7 @@ public class KafkaIOConfigTest
{
String jsonStr = "{\n"
+ " \"type\": \"kafka\",\n"
+ + " \"taskGroupId\": 0,\n"
+ " \"startPartitions\": {\"topic\":\"mytopic\",
\"partitionOffsetMap\" : {\"0\":1, \"1\":10}},\n"
+ " \"endPartitions\": {\"topic\":\"mytopic\",
\"partitionOffsetMap\" : {\"0\":15, \"1\":200}},\n"
+ " \"consumerProperties\":
{\"bootstrap.servers\":\"localhost:9092\"},\n"
@@ -137,6 +140,7 @@ public class KafkaIOConfigTest
{
String jsonStr = "{\n"
+ " \"type\": \"kafka\",\n"
+ + " \"taskGroupId\": 0,\n"
+ " \"baseSequenceName\": \"my-sequence-name\",\n"
+ " \"endPartitions\": {\"topic\":\"mytopic\",
\"partitionOffsetMap\" : {\"0\":15, \"1\":200}},\n"
+ " \"consumerProperties\":
{\"bootstrap.servers\":\"localhost:9092\"},\n"
@@ -156,6 +160,7 @@ public class KafkaIOConfigTest
{
String jsonStr = "{\n"
+ " \"type\": \"kafka\",\n"
+ + " \"taskGroupId\": 0,\n"
+ " \"baseSequenceName\": \"my-sequence-name\",\n"
+ " \"startPartitions\": {\"topic\":\"mytopic\",
\"partitionOffsetMap\" : {\"0\":1, \"1\":10}},\n"
+ " \"consumerProperties\":
{\"bootstrap.servers\":\"localhost:9092\"},\n"
@@ -175,6 +180,7 @@ public class KafkaIOConfigTest
{
String jsonStr = "{\n"
+ " \"type\": \"kafka\",\n"
+ + " \"taskGroupId\": 0,\n"
+ " \"baseSequenceName\": \"my-sequence-name\",\n"
+ " \"startPartitions\": {\"topic\":\"mytopic\",
\"partitionOffsetMap\" : {\"0\":1, \"1\":10}},\n"
+ " \"endPartitions\": {\"topic\":\"mytopic\",
\"partitionOffsetMap\" : {\"0\":15, \"1\":200}},\n"
@@ -194,6 +200,7 @@ public class KafkaIOConfigTest
{
String jsonStr = "{\n"
+ " \"type\": \"kafka\",\n"
+ + " \"taskGroupId\": 0,\n"
+ " \"baseSequenceName\": \"my-sequence-name\",\n"
+ " \"startPartitions\": {\"topic\":\"mytopic\",
\"partitionOffsetMap\" : {\"0\":1, \"1\":10}},\n"
+ " \"endPartitions\": {\"topic\":\"other\",
\"partitionOffsetMap\" : {\"0\":15, \"1\":200}},\n"
@@ -214,6 +221,7 @@ public class KafkaIOConfigTest
{
String jsonStr = "{\n"
+ " \"type\": \"kafka\",\n"
+ + " \"taskGroupId\": 0,\n"
+ " \"baseSequenceName\": \"my-sequence-name\",\n"
+ " \"startPartitions\": {\"topic\":\"mytopic\",
\"partitionOffsetMap\" : {\"0\":1, \"1\":10}},\n"
+ " \"endPartitions\": {\"topic\":\"mytopic\",
\"partitionOffsetMap\" : {\"0\":15}},\n"
@@ -234,6 +242,7 @@ public class KafkaIOConfigTest
{
String jsonStr = "{\n"
+ " \"type\": \"kafka\",\n"
+ + " \"taskGroupId\": 0,\n"
+ " \"baseSequenceName\": \"my-sequence-name\",\n"
+ " \"startPartitions\": {\"topic\":\"mytopic\",
\"partitionOffsetMap\" : {\"0\":1, \"1\":10}},\n"
+ " \"endPartitions\": {\"topic\":\"mytopic\",
\"partitionOffsetMap\" : {\"0\":15, \"1\":2}},\n"
diff --git a/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaIndexTaskTest.java b/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaIndexTaskTest.java
index 15f4bb0..411fff9 100644
--- a/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaIndexTaskTest.java
+++ b/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaIndexTaskTest.java
@@ -262,21 +262,21 @@ public class KafkaIndexTaskTest
private static List<ProducerRecord<byte[], byte[]>> generateRecords(String
topic)
{
return ImmutableList.of(
- new ProducerRecord<byte[], byte[]>(topic, 0, null, JB("2008", "a",
"y", "10", "20.0", "1.0")),
- new ProducerRecord<byte[], byte[]>(topic, 0, null, JB("2009", "b",
"y", "10", "20.0", "1.0")),
- new ProducerRecord<byte[], byte[]>(topic, 0, null, JB("2010", "c",
"y", "10", "20.0", "1.0")),
- new ProducerRecord<byte[], byte[]>(topic, 0, null, JB("2011", "d",
"y", "10", "20.0", "1.0")),
- new ProducerRecord<byte[], byte[]>(topic, 0, null, JB("2011", "e",
"y", "10", "20.0", "1.0")),
- new ProducerRecord<byte[], byte[]>(topic, 0, null,
JB("246140482-04-24T15:36:27.903Z", "x", "z", "10", "20.0", "1.0")),
- new ProducerRecord<byte[], byte[]>(topic, 0, null,
StringUtils.toUtf8("unparseable")),
- new ProducerRecord<byte[], byte[]>(topic, 0, null,
StringUtils.toUtf8("unparseable2")),
- new ProducerRecord<byte[], byte[]>(topic, 0, null, null),
- new ProducerRecord<byte[], byte[]>(topic, 0, null, JB("2013", "f",
"y", "10", "20.0", "1.0")),
- new ProducerRecord<byte[], byte[]>(topic, 0, null, JB("2049", "f",
"y", "notanumber", "20.0", "1.0")),
- new ProducerRecord<byte[], byte[]>(topic, 0, null, JB("2049", "f",
"y", "10", "notanumber", "1.0")),
- new ProducerRecord<byte[], byte[]>(topic, 0, null, JB("2049", "f",
"y", "10", "20.0", "notanumber")),
- new ProducerRecord<byte[], byte[]>(topic, 1, null, JB("2012", "g",
"y", "10", "20.0", "1.0")),
- new ProducerRecord<byte[], byte[]>(topic, 1, null, JB("2011", "h",
"y", "10", "20.0", "1.0"))
+ new ProducerRecord<>(topic, 0, null, JB("2008", "a", "y", "10",
"20.0", "1.0")),
+ new ProducerRecord<>(topic, 0, null, JB("2009", "b", "y", "10",
"20.0", "1.0")),
+ new ProducerRecord<>(topic, 0, null, JB("2010", "c", "y", "10",
"20.0", "1.0")),
+ new ProducerRecord<>(topic, 0, null, JB("2011", "d", "y", "10",
"20.0", "1.0")),
+ new ProducerRecord<>(topic, 0, null, JB("2011", "e", "y", "10",
"20.0", "1.0")),
+ new ProducerRecord<>(topic, 0, null,
JB("246140482-04-24T15:36:27.903Z", "x", "z", "10", "20.0", "1.0")),
+ new ProducerRecord<>(topic, 0, null,
StringUtils.toUtf8("unparseable")),
+ new ProducerRecord<>(topic, 0, null,
StringUtils.toUtf8("unparseable2")),
+ new ProducerRecord<>(topic, 0, null, null),
+ new ProducerRecord<>(topic, 0, null, JB("2013", "f", "y", "10",
"20.0", "1.0")),
+ new ProducerRecord<>(topic, 0, null, JB("2049", "f", "y",
"notanumber", "20.0", "1.0")),
+ new ProducerRecord<>(topic, 0, null, JB("2049", "f", "y", "10",
"notanumber", "1.0")),
+ new ProducerRecord<>(topic, 0, null, JB("2049", "f", "y", "10",
"20.0", "notanumber")),
+ new ProducerRecord<>(topic, 1, null, JB("2012", "g", "y", "10",
"20.0", "1.0")),
+ new ProducerRecord<>(topic, 1, null, JB("2011", "h", "y", "10",
"20.0", "1.0"))
);
}
@@ -377,6 +377,7 @@ public class KafkaIndexTaskTest
final KafkaIndexTask task = createTask(
null,
new KafkaIOConfig(
+ 0,
"sequence0",
new KafkaPartitions(topic, ImmutableMap.of(0, 2L)),
new KafkaPartitions(topic, ImmutableMap.of(0, 5L)),
@@ -418,6 +419,7 @@ public class KafkaIndexTaskTest
final KafkaIndexTask task = createTask(
null,
new KafkaIOConfig(
+ 0,
"sequence0",
new KafkaPartitions(topic, ImmutableMap.of(0, 2L)),
new KafkaPartitions(topic, ImmutableMap.of(0, 5L)),
@@ -493,6 +495,7 @@ public class KafkaIndexTaskTest
final KafkaIndexTask task = createTask(
null,
new KafkaIOConfig(
+ 0,
baseSequenceName,
startPartitions,
endPartitions,
@@ -514,14 +517,16 @@ public class KafkaIndexTaskTest
Assert.assertEquals(TaskState.SUCCESS, future.get().getStatusCode());
Assert.assertEquals(1, checkpointRequestsHash.size());
-    Assert.assertTrue(checkpointRequestsHash.contains(
-        Objects.hash(
-            DATA_SCHEMA.getDataSource(),
-            baseSequenceName,
-            new KafkaDataSourceMetadata(startPartitions),
-            new KafkaDataSourceMetadata(new KafkaPartitions(topic, currentOffsets))
+    Assert.assertTrue(
+        checkpointRequestsHash.contains(
+            Objects.hash(
+                DATA_SCHEMA.getDataSource(),
+                0,
+                new KafkaDataSourceMetadata(startPartitions),
+                new KafkaDataSourceMetadata(new KafkaPartitions(topic, currentOffsets))
+            )
         )
-    ));
+    );

     // Check metrics
     Assert.assertEquals(8, task.getRunner().getRowIngestionMeters().getProcessed());
@@ -581,6 +586,7 @@ public class KafkaIndexTaskTest
final KafkaIndexTask task = createTask(
null,
new KafkaIOConfig(
+ 0,
baseSequenceName,
startPartitions,
endPartitions,
@@ -603,14 +609,16 @@ public class KafkaIndexTaskTest
Assert.assertEquals(TaskState.SUCCESS, future.get().getStatusCode());
Assert.assertEquals(1, checkpointRequestsHash.size());
-    Assert.assertTrue(checkpointRequestsHash.contains(
-        Objects.hash(
-            DATA_SCHEMA.getDataSource(),
-            baseSequenceName,
-            new KafkaDataSourceMetadata(startPartitions),
-            new KafkaDataSourceMetadata(new KafkaPartitions(topic, checkpoint.getPartitionOffsetMap()))
+    Assert.assertTrue(
+        checkpointRequestsHash.contains(
+            Objects.hash(
+                DATA_SCHEMA.getDataSource(),
+                0,
+                new KafkaDataSourceMetadata(startPartitions),
+                new KafkaDataSourceMetadata(new KafkaPartitions(topic, checkpoint.getPartitionOffsetMap()))
+            )
         )
-    ));
+    );

     // Check metrics
     Assert.assertEquals(2, task.getRunner().getRowIngestionMeters().getProcessed());
@@ -637,6 +645,7 @@ public class KafkaIndexTaskTest
final KafkaIndexTask task = createTask(
null,
new KafkaIOConfig(
+ 0,
"sequence0",
new KafkaPartitions(topic, ImmutableMap.of(0, 0L)),
new KafkaPartitions(topic, ImmutableMap.of(0, 5L)),
@@ -690,6 +699,7 @@ public class KafkaIndexTaskTest
final KafkaIndexTask task = createTask(
null,
new KafkaIOConfig(
+ 0,
"sequence0",
new KafkaPartitions(topic, ImmutableMap.of(0, 0L)),
new KafkaPartitions(topic, ImmutableMap.of(0, 5L)),
@@ -753,6 +763,7 @@ public class KafkaIndexTaskTest
)
),
new KafkaIOConfig(
+ 0,
"sequence0",
new KafkaPartitions(topic, ImmutableMap.of(0, 0L)),
new KafkaPartitions(topic, ImmutableMap.of(0, 5L)),
@@ -812,6 +823,7 @@ public class KafkaIndexTaskTest
final KafkaIndexTask task = createTask(
null,
new KafkaIOConfig(
+ 0,
"sequence0",
new KafkaPartitions(topic, ImmutableMap.of(0, 2L)),
new KafkaPartitions(topic, ImmutableMap.of(0, 2L)),
@@ -852,6 +864,7 @@ public class KafkaIndexTaskTest
final KafkaIndexTask task = createTask(
null,
new KafkaIOConfig(
+ 0,
"sequence0",
new KafkaPartitions(topic, ImmutableMap.of(0, 2L)),
new KafkaPartitions(topic, ImmutableMap.of(0, 5L)),
@@ -903,6 +916,7 @@ public class KafkaIndexTaskTest
final KafkaIndexTask task = createTask(
null,
new KafkaIOConfig(
+ 0,
"sequence0",
new KafkaPartitions(topic, ImmutableMap.of(0, 2L)),
new KafkaPartitions(topic, ImmutableMap.of(0, 5L)),
@@ -957,6 +971,7 @@ public class KafkaIndexTaskTest
final KafkaIndexTask task = createTask(
null,
new KafkaIOConfig(
+ 0,
"sequence0",
new KafkaPartitions(topic, ImmutableMap.of(0, 2L)),
new KafkaPartitions(topic, ImmutableMap.of(0, 7L)),
@@ -1000,6 +1015,7 @@ public class KafkaIndexTaskTest
final KafkaIndexTask task = createTask(
null,
new KafkaIOConfig(
+ 0,
"sequence0",
new KafkaPartitions(topic, ImmutableMap.of(0, 2L)),
new KafkaPartitions(topic, ImmutableMap.of(0, 13L)),
@@ -1081,6 +1097,7 @@ public class KafkaIndexTaskTest
final KafkaIndexTask task = createTask(
null,
new KafkaIOConfig(
+ 0,
"sequence0",
new KafkaPartitions(topic, ImmutableMap.of(0, 2L)),
new KafkaPartitions(topic, ImmutableMap.of(0, 10L)),
@@ -1140,6 +1157,7 @@ public class KafkaIndexTaskTest
final KafkaIndexTask task1 = createTask(
null,
new KafkaIOConfig(
+ 0,
"sequence0",
new KafkaPartitions(topic, ImmutableMap.of(0, 2L)),
new KafkaPartitions(topic, ImmutableMap.of(0, 5L)),
@@ -1153,6 +1171,7 @@ public class KafkaIndexTaskTest
final KafkaIndexTask task2 = createTask(
null,
new KafkaIOConfig(
+ 0,
"sequence0",
new KafkaPartitions(topic, ImmutableMap.of(0, 2L)),
new KafkaPartitions(topic, ImmutableMap.of(0, 5L)),
@@ -1206,6 +1225,7 @@ public class KafkaIndexTaskTest
final KafkaIndexTask task1 = createTask(
null,
new KafkaIOConfig(
+ 0,
"sequence0",
new KafkaPartitions(topic, ImmutableMap.of(0, 2L)),
new KafkaPartitions(topic, ImmutableMap.of(0, 5L)),
@@ -1219,6 +1239,7 @@ public class KafkaIndexTaskTest
final KafkaIndexTask task2 = createTask(
null,
new KafkaIOConfig(
+ 1,
"sequence1",
new KafkaPartitions(topic, ImmutableMap.of(0, 3L)),
new KafkaPartitions(topic, ImmutableMap.of(0, 10L)),
@@ -1273,6 +1294,7 @@ public class KafkaIndexTaskTest
final KafkaIndexTask task1 = createTask(
null,
new KafkaIOConfig(
+ 0,
"sequence0",
new KafkaPartitions(topic, ImmutableMap.of(0, 2L)),
new KafkaPartitions(topic, ImmutableMap.of(0, 5L)),
@@ -1286,6 +1308,7 @@ public class KafkaIndexTaskTest
final KafkaIndexTask task2 = createTask(
null,
new KafkaIOConfig(
+ 1,
"sequence1",
new KafkaPartitions(topic, ImmutableMap.of(0, 3L)),
new KafkaPartitions(topic, ImmutableMap.of(0, 10L)),
@@ -1345,6 +1368,7 @@ public class KafkaIndexTaskTest
final KafkaIndexTask task = createTask(
null,
new KafkaIOConfig(
+ 0,
"sequence0",
new KafkaPartitions(topic, ImmutableMap.of(0, 2L, 1, 0L)),
new KafkaPartitions(topic, ImmutableMap.of(0, 5L, 1, 2L)),
@@ -1409,6 +1433,7 @@ public class KafkaIndexTaskTest
final KafkaIndexTask task1 = createTask(
null,
new KafkaIOConfig(
+ 0,
"sequence0",
new KafkaPartitions(topic, ImmutableMap.of(0, 2L)),
new KafkaPartitions(topic, ImmutableMap.of(0, 5L)),
@@ -1422,6 +1447,7 @@ public class KafkaIndexTaskTest
final KafkaIndexTask task2 = createTask(
null,
new KafkaIOConfig(
+ 1,
"sequence1",
new KafkaPartitions(topic, ImmutableMap.of(1, 0L)),
new KafkaPartitions(topic, ImmutableMap.of(1, 1L)),
@@ -1477,6 +1503,7 @@ public class KafkaIndexTaskTest
final KafkaIndexTask task1 = createTask(
null,
new KafkaIOConfig(
+ 0,
"sequence0",
new KafkaPartitions(topic, ImmutableMap.of(0, 2L)),
new KafkaPartitions(topic, ImmutableMap.of(0, 5L)),
@@ -1513,6 +1540,7 @@ public class KafkaIndexTaskTest
final KafkaIndexTask task2 = createTask(
task1.getId(),
new KafkaIOConfig(
+ 0,
"sequence0",
new KafkaPartitions(topic, ImmutableMap.of(0, 2L)),
new KafkaPartitions(topic, ImmutableMap.of(0, 5L)),
@@ -1564,6 +1592,7 @@ public class KafkaIndexTaskTest
final KafkaIndexTask task = createTask(
null,
new KafkaIOConfig(
+ 0,
"sequence0",
new KafkaPartitions(topic, ImmutableMap.of(0, 2L)),
new KafkaPartitions(topic, ImmutableMap.of(0, 5L)),
@@ -1647,6 +1676,7 @@ public class KafkaIndexTaskTest
final KafkaIndexTask task = createTask(
null,
new KafkaIOConfig(
+ 0,
"sequence0",
new KafkaPartitions(topic, ImmutableMap.of(0, 2L)),
new KafkaPartitions(topic, ImmutableMap.of(0, 5L)),
@@ -1685,6 +1715,7 @@ public class KafkaIndexTaskTest
final KafkaIndexTask task = createTask(
null,
new KafkaIOConfig(
+ 0,
"sequence0",
new KafkaPartitions(topic, ImmutableMap.of(0, 200L)),
new KafkaPartitions(topic, ImmutableMap.of(0, 500L)),
@@ -1737,6 +1768,7 @@ public class KafkaIndexTaskTest
final KafkaIndexTask task = createTask(
null,
new KafkaIOConfig(
+ 0,
"sequence0",
           // task should ignore these and use sequence info sent in the context
new KafkaPartitions(topic, ImmutableMap.of(0, 0L)),
@@ -2026,18 +2058,20 @@ public class KafkaIndexTaskTest
@Override
public boolean checkPointDataSourceMetadata(
String supervisorId,
- @Nullable String sequenceName,
+ int taskGroupId,
@Nullable DataSourceMetadata previousDataSourceMetadata,
@Nullable DataSourceMetadata currentDataSourceMetadata
)
{
log.info("Adding checkpoint hash to the set");
- checkpointRequestsHash.add(Objects.hash(
- supervisorId,
- sequenceName,
- previousDataSourceMetadata,
- currentDataSourceMetadata
- ));
+ checkpointRequestsHash.add(
+ Objects.hash(
+ supervisorId,
+ taskGroupId,
+ previousDataSourceMetadata,
+ currentDataSourceMetadata
+ )
+ );
return true;
}
}
diff --git a/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java b/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java
index 86648fe..f0f6033 100644
--- a/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java
+++ b/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java
@@ -19,6 +19,7 @@
package io.druid.indexing.kafka.supervisor;
+import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Optional;
import com.google.common.collect.ImmutableList;
@@ -61,6 +62,7 @@ import io.druid.java.util.common.StringUtils;
import io.druid.java.util.common.granularity.Granularities;
import io.druid.java.util.common.parsers.JSONPathFieldSpec;
import io.druid.java.util.common.parsers.JSONPathSpec;
+import io.druid.java.util.emitter.EmittingLogger;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.CountAggregatorFactory;
import io.druid.segment.TestHelper;
@@ -70,6 +72,7 @@ import io.druid.segment.indexing.granularity.UniformGranularitySpec;
import io.druid.segment.realtime.FireDepartment;
import io.druid.server.metrics.DruidMonitorSchedulerConfig;
import io.druid.server.metrics.NoopServiceEmitter;
+import io.druid.server.metrics.ExceptionCapturingServiceEmitter;
import org.apache.curator.test.TestingCluster;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
@@ -99,7 +102,9 @@ import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
+import java.util.concurrent.TimeoutException;
import static org.easymock.EasyMock.anyBoolean;
import static org.easymock.EasyMock.anyObject;
@@ -141,6 +146,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
private TaskQueue taskQueue;
private String topic;
private RowIngestionMetersFactory rowIngestionMetersFactory;
+ private ExceptionCapturingServiceEmitter serviceEmitter;
private static String getTopic()
{
@@ -213,6 +219,8 @@ public class KafkaSupervisorTest extends EasyMockSupport
topic = getTopic();
rowIngestionMetersFactory = new TestUtils().getRowIngestionMetersFactory();
+ serviceEmitter = new ExceptionCapturingServiceEmitter();
+ EmittingLogger.registerEmitter(serviceEmitter);
}
@After
@@ -553,7 +561,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
Task id1 = createKafkaIndexTask(
"id1",
DATASOURCE,
- "index_kafka_testDS__some_other_sequenceName",
+ 1,
new KafkaPartitions("topic", ImmutableMap.of(0, 0L)),
new KafkaPartitions("topic", ImmutableMap.of(0, 10L)),
null,
@@ -564,7 +572,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
Task id2 = createKafkaIndexTask(
"id2",
DATASOURCE,
- "sequenceName-0",
+ 0,
new KafkaPartitions("topic", ImmutableMap.of(0, 0L, 1, 0L, 2, 0L)),
new KafkaPartitions("topic", ImmutableMap.of(0, 333L, 1, 333L, 2,
333L)),
null,
@@ -575,7 +583,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
Task id3 = createKafkaIndexTask(
"id3",
DATASOURCE,
- "index_kafka_testDS__some_other_sequenceName",
+ 1,
new KafkaPartitions("topic", ImmutableMap.of(0, 0L, 1, 0L, 2, 1L)),
new KafkaPartitions("topic", ImmutableMap.of(0, 333L, 1, 333L, 2,
330L)),
null,
@@ -586,7 +594,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
Task id4 = createKafkaIndexTask(
"id4",
"other-datasource",
- "index_kafka_testDS_d927edff33c4b3f",
+ 2,
new KafkaPartitions("topic", ImmutableMap.of(0, 0L)),
new KafkaPartitions("topic", ImmutableMap.of(0, 10L)),
null,
@@ -634,7 +642,9 @@ public class KafkaSupervisorTest extends EasyMockSupport
TreeMap<Integer, Map<Integer, Long>> checkpoints = new TreeMap<>();
checkpoints.put(0, ImmutableMap.of(0, 0L, 1, 0L, 2, 0L));
-    expect(taskClient.getCheckpointsAsync(EasyMock.contains("id2"), anyBoolean())).andReturn(Futures.immediateFuture(checkpoints)).times(2);
+    expect(taskClient.getCheckpointsAsync(EasyMock.contains("id2"), anyBoolean()))
+        .andReturn(Futures.immediateFuture(checkpoints))
+        .times(2);
replayAll();
@@ -652,7 +662,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
Task id1 = createKafkaIndexTask(
"id1",
DATASOURCE,
- "sequenceName-0",
+ 0,
new KafkaPartitions("topic", ImmutableMap.of(0, 0L, 2, 0L)),
new KafkaPartitions("topic", ImmutableMap.of(0, Long.MAX_VALUE, 2,
Long.MAX_VALUE)),
null,
@@ -661,7 +671,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
Task id2 = createKafkaIndexTask(
"id2",
DATASOURCE,
- "sequenceName-1",
+ 1,
new KafkaPartitions("topic", ImmutableMap.of(1, 0L)),
new KafkaPartitions("topic", ImmutableMap.of(1, Long.MAX_VALUE)),
null,
@@ -670,7 +680,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
Task id3 = createKafkaIndexTask(
"id3",
DATASOURCE,
- "sequenceName-0",
+ 0,
new KafkaPartitions("topic", ImmutableMap.of(0, 0L, 1, 0L, 2, 0L)),
new KafkaPartitions("topic", ImmutableMap.of(0, Long.MAX_VALUE, 1,
Long.MAX_VALUE, 2, Long.MAX_VALUE)),
null,
@@ -679,7 +689,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
Task id4 = createKafkaIndexTask(
"id4",
DATASOURCE,
- "sequenceName-0",
+ 0,
new KafkaPartitions("topic", ImmutableMap.of(0, 0L, 1, 0L)),
new KafkaPartitions("topic", ImmutableMap.of(0, Long.MAX_VALUE, 1,
Long.MAX_VALUE)),
null,
@@ -688,7 +698,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
Task id5 = createKafkaIndexTask(
"id5",
DATASOURCE,
- "sequenceName-0",
+ 0,
new KafkaPartitions("topic", ImmutableMap.of(1, 0L, 2, 0L)),
new KafkaPartitions("topic", ImmutableMap.of(1, Long.MAX_VALUE, 2,
Long.MAX_VALUE)),
null,
@@ -727,8 +737,12 @@ public class KafkaSupervisorTest extends EasyMockSupport
checkpoints1.put(0, ImmutableMap.of(0, 0L, 2, 0L));
TreeMap<Integer, Map<Integer, Long>> checkpoints2 = new TreeMap<>();
checkpoints2.put(0, ImmutableMap.of(1, 0L));
-    expect(taskClient.getCheckpointsAsync(EasyMock.contains("id1"), anyBoolean())).andReturn(Futures.immediateFuture(checkpoints1)).times(1);
-    expect(taskClient.getCheckpointsAsync(EasyMock.contains("id2"), anyBoolean())).andReturn(Futures.immediateFuture(checkpoints2)).times(1);
+    expect(taskClient.getCheckpointsAsync(EasyMock.contains("id1"), anyBoolean()))
+        .andReturn(Futures.immediateFuture(checkpoints1))
+        .times(1);
+    expect(taskClient.getCheckpointsAsync(EasyMock.contains("id2"), anyBoolean()))
+        .andReturn(Futures.immediateFuture(checkpoints2))
+        .times(1);
     taskRunner.registerListener(anyObject(TaskRunnerListener.class), anyObject(Executor.class));
taskQueue.shutdown("id4");
@@ -765,10 +779,12 @@ public class KafkaSupervisorTest extends EasyMockSupport
checkpoints1.put(0, ImmutableMap.of(0, 0L, 2, 0L));
TreeMap<Integer, Map<Integer, Long>> checkpoints2 = new TreeMap<>();
checkpoints2.put(0, ImmutableMap.of(1, 0L));
-    expect(taskClient.getCheckpointsAsync(EasyMock.contains("sequenceName-0"), anyBoolean())).andReturn(Futures.immediateFuture(checkpoints1))
-        .anyTimes();
-    expect(taskClient.getCheckpointsAsync(EasyMock.contains("sequenceName-1"), anyBoolean())).andReturn(Futures.immediateFuture(checkpoints2))
-        .anyTimes();
+    expect(taskClient.getCheckpointsAsync(EasyMock.contains("sequenceName-0"), anyBoolean()))
+        .andReturn(Futures.immediateFuture(checkpoints1))
+        .anyTimes();
+    expect(taskClient.getCheckpointsAsync(EasyMock.contains("sequenceName-1"), anyBoolean()))
+        .andReturn(Futures.immediateFuture(checkpoints2))
+        .anyTimes();

     taskRunner.registerListener(anyObject(TaskRunnerListener.class), anyObject(Executor.class));
replayAll();
@@ -830,7 +846,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
Task id1 = createKafkaIndexTask(
"id1",
DATASOURCE,
- "sequenceName-0",
+ 0,
new KafkaPartitions("topic", ImmutableMap.of(0, 0L, 2, 0L)),
new KafkaPartitions("topic", ImmutableMap.of(0, Long.MAX_VALUE, 2,
Long.MAX_VALUE)),
now,
@@ -857,7 +873,9 @@ public class KafkaSupervisorTest extends EasyMockSupport
TreeMap<Integer, Map<Integer, Long>> checkpoints = new TreeMap<>();
checkpoints.put(0, ImmutableMap.of(0, 0L, 2, 0L));
-    expect(taskClient.getCheckpointsAsync(EasyMock.contains("id1"), anyBoolean())).andReturn(Futures.immediateFuture(checkpoints)).times(2);
+    expect(taskClient.getCheckpointsAsync(EasyMock.contains("id1"), anyBoolean()))
+        .andReturn(Futures.immediateFuture(checkpoints))
+        .times(2);

     taskRunner.registerListener(anyObject(TaskRunnerListener.class), anyObject(Executor.class));
replayAll();
@@ -878,9 +896,12 @@ public class KafkaSupervisorTest extends EasyMockSupport
reset(taskClient);
// for the newly created replica task
-    expect(taskClient.getCheckpointsAsync(EasyMock.contains("sequenceName-0"), anyBoolean())).andReturn(Futures.immediateFuture(checkpoints))
-        .times(2);
-    expect(taskClient.getCheckpointsAsync(EasyMock.contains("id1"), anyBoolean())).andReturn(Futures.immediateFuture(checkpoints)).times(1);
+    expect(taskClient.getCheckpointsAsync(EasyMock.contains("sequenceName-0"), anyBoolean()))
+        .andReturn(Futures.immediateFuture(checkpoints))
+        .times(2);
+    expect(taskClient.getCheckpointsAsync(EasyMock.contains("id1"), anyBoolean()))
+        .andReturn(Futures.immediateFuture(checkpoints))
+        .times(1);
expect(taskStorage.getActiveTasks()).andReturn(ImmutableList.of(captured.getValue())).anyTimes();
expect(taskStorage.getStatus(iHaveFailed.getId())).andReturn(Optional.of(TaskStatus.failure(iHaveFailed.getId())));
@@ -953,10 +974,12 @@ public class KafkaSupervisorTest extends EasyMockSupport
TreeMap<Integer, Map<Integer, Long>> checkpoints2 = new TreeMap<>();
checkpoints2.put(0, ImmutableMap.of(1, 0L));
// there would be 4 tasks, 2 for each task group
-    expect(taskClient.getCheckpointsAsync(EasyMock.contains("sequenceName-0"), anyBoolean())).andReturn(Futures.immediateFuture(checkpoints1))
-        .times(2);
-    expect(taskClient.getCheckpointsAsync(EasyMock.contains("sequenceName-1"), anyBoolean())).andReturn(Futures.immediateFuture(checkpoints2))
-        .times(2);
+    expect(taskClient.getCheckpointsAsync(EasyMock.contains("sequenceName-0"), anyBoolean()))
+        .andReturn(Futures.immediateFuture(checkpoints1))
+        .times(2);
+    expect(taskClient.getCheckpointsAsync(EasyMock.contains("sequenceName-1"), anyBoolean()))
+        .andReturn(Futures.immediateFuture(checkpoints2))
+        .times(2);
expect(taskStorage.getActiveTasks()).andReturn(tasks).anyTimes();
for (Task task : tasks) {
@@ -1063,10 +1086,12 @@ public class KafkaSupervisorTest extends EasyMockSupport
checkpoints1.put(0, ImmutableMap.of(0, 0L, 2, 0L));
TreeMap<Integer, Map<Integer, Long>> checkpoints2 = new TreeMap<>();
checkpoints2.put(0, ImmutableMap.of(1, 0L));
-    expect(taskClient.getCheckpointsAsync(EasyMock.contains("sequenceName-0"), anyBoolean())).andReturn(Futures.immediateFuture(checkpoints1))
-        .times(2);
-    expect(taskClient.getCheckpointsAsync(EasyMock.contains("sequenceName-1"), anyBoolean())).andReturn(Futures.immediateFuture(checkpoints2))
-        .times(2);
+    expect(taskClient.getCheckpointsAsync(EasyMock.contains("sequenceName-0"), anyBoolean()))
+        .andReturn(Futures.immediateFuture(checkpoints1))
+        .times(2);
+    expect(taskClient.getCheckpointsAsync(EasyMock.contains("sequenceName-1"), anyBoolean()))
+        .andReturn(Futures.immediateFuture(checkpoints2))
+        .times(2);
replay(taskStorage, taskRunner, taskClient, taskQueue);
@@ -1100,7 +1125,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
Task task = createKafkaIndexTask(
"id1",
DATASOURCE,
- "sequenceName-0",
+ 0,
new KafkaPartitions("topic", ImmutableMap.of(0, 0L, 1, 0L, 2, 0L)),
new KafkaPartitions("topic", ImmutableMap.of(0, Long.MAX_VALUE, 1,
Long.MAX_VALUE, 2, Long.MAX_VALUE)),
null,
@@ -1192,7 +1217,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
Task task = createKafkaIndexTask(
"id1",
DATASOURCE,
- "sequenceName-0",
+ 0,
new KafkaPartitions("topic", ImmutableMap.of(0, 0L, 2, 0L)),
new KafkaPartitions("topic", ImmutableMap.of(0, Long.MAX_VALUE, 2,
Long.MAX_VALUE)),
null,
@@ -1282,7 +1307,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
Task id1 = createKafkaIndexTask(
"id1",
DATASOURCE,
- "sequenceName-0",
+ 0,
new KafkaPartitions("topic", ImmutableMap.of(0, 0L, 1, 0L, 2, 0L)),
new KafkaPartitions("topic", ImmutableMap.of(0, Long.MAX_VALUE, 1,
Long.MAX_VALUE, 2, Long.MAX_VALUE)),
null,
@@ -1292,7 +1317,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
Task id2 = createKafkaIndexTask(
"id2",
DATASOURCE,
- "sequenceName-0",
+ 0,
new KafkaPartitions("topic", ImmutableMap.of(0, 1L, 1, 2L, 2, 3L)),
new KafkaPartitions("topic", ImmutableMap.of(0, Long.MAX_VALUE, 1,
Long.MAX_VALUE, 2, Long.MAX_VALUE)),
null,
@@ -1330,7 +1355,9 @@ public class KafkaSupervisorTest extends EasyMockSupport
// since id1 is publishing, so getCheckpoints wouldn't be called for it
TreeMap<Integer, Map<Integer, Long>> checkpoints = new TreeMap<>();
checkpoints.put(0, ImmutableMap.of(0, 1L, 1, 2L, 2, 3L));
-    expect(taskClient.getCheckpointsAsync(EasyMock.contains("id2"), anyBoolean())).andReturn(Futures.immediateFuture(checkpoints)).times(1);
+    expect(taskClient.getCheckpointsAsync(EasyMock.contains("id2"), anyBoolean()))
+        .andReturn(Futures.immediateFuture(checkpoints))
+        .times(1);
replayAll();
@@ -1404,10 +1431,12 @@ public class KafkaSupervisorTest extends EasyMockSupport
checkpoints1.put(0, ImmutableMap.of(0, 0L, 2, 0L));
TreeMap<Integer, Map<Integer, Long>> checkpoints2 = new TreeMap<>();
checkpoints2.put(0, ImmutableMap.of(1, 0L));
-    expect(taskClient.getCheckpointsAsync(EasyMock.contains("sequenceName-0"), anyBoolean())).andReturn(Futures.immediateFuture(checkpoints1))
-        .times(2);
-    expect(taskClient.getCheckpointsAsync(EasyMock.contains("sequenceName-1"), anyBoolean())).andReturn(Futures.immediateFuture(checkpoints2))
-        .times(2);
+    expect(taskClient.getCheckpointsAsync(EasyMock.contains("sequenceName-0"), anyBoolean()))
+        .andReturn(Futures.immediateFuture(checkpoints1))
+        .times(2);
+    expect(taskClient.getCheckpointsAsync(EasyMock.contains("sequenceName-1"), anyBoolean()))
+        .andReturn(Futures.immediateFuture(checkpoints2))
+        .times(2);
expect(taskStorage.getActiveTasks()).andReturn(tasks).anyTimes();
for (Task task : tasks) {
@@ -1463,10 +1492,12 @@ public class KafkaSupervisorTest extends EasyMockSupport
checkpoints1.put(0, ImmutableMap.of(0, 0L, 2, 0L));
TreeMap<Integer, Map<Integer, Long>> checkpoints2 = new TreeMap<>();
checkpoints2.put(0, ImmutableMap.of(1, 0L));
-    expect(taskClient.getCheckpointsAsync(EasyMock.contains("sequenceName-0"), anyBoolean())).andReturn(Futures.immediateFuture(checkpoints1))
-        .times(2);
-    expect(taskClient.getCheckpointsAsync(EasyMock.contains("sequenceName-1"), anyBoolean())).andReturn(Futures.immediateFuture(checkpoints2))
-        .times(2);
+    expect(taskClient.getCheckpointsAsync(EasyMock.contains("sequenceName-0"), anyBoolean()))
+        .andReturn(Futures.immediateFuture(checkpoints1))
+        .times(2);
+    expect(taskClient.getCheckpointsAsync(EasyMock.contains("sequenceName-1"), anyBoolean()))
+        .andReturn(Futures.immediateFuture(checkpoints2))
+        .times(2);
captured = Capture.newInstance(CaptureType.ALL);
expect(taskStorage.getActiveTasks()).andReturn(tasks).anyTimes();
@@ -1540,10 +1571,12 @@ public class KafkaSupervisorTest extends EasyMockSupport
checkpoints1.put(0, ImmutableMap.of(0, 0L, 2, 0L));
TreeMap<Integer, Map<Integer, Long>> checkpoints2 = new TreeMap<>();
checkpoints2.put(0, ImmutableMap.of(1, 0L));
-    expect(taskClient.getCheckpointsAsync(EasyMock.contains("sequenceName-0"), anyBoolean())).andReturn(Futures.immediateFuture(checkpoints1))
-        .times(2);
-    expect(taskClient.getCheckpointsAsync(EasyMock.contains("sequenceName-1"), anyBoolean())).andReturn(Futures.immediateFuture(checkpoints2))
-        .times(2);
+    expect(taskClient.getCheckpointsAsync(EasyMock.contains("sequenceName-0"), anyBoolean()))
+        .andReturn(Futures.immediateFuture(checkpoints1))
+        .times(2);
+    expect(taskClient.getCheckpointsAsync(EasyMock.contains("sequenceName-1"), anyBoolean()))
+        .andReturn(Futures.immediateFuture(checkpoints2))
+        .times(2);
captured = Capture.newInstance(CaptureType.ALL);
expect(taskStorage.getActiveTasks()).andReturn(tasks).anyTimes();
@@ -1622,7 +1655,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
Task id1 = createKafkaIndexTask(
"id1",
DATASOURCE,
- "sequenceName-0",
+ 0,
new KafkaPartitions("topic", ImmutableMap.of(0, 0L, 1, 0L, 2, 0L)),
new KafkaPartitions("topic", ImmutableMap.of(0, Long.MAX_VALUE, 1,
Long.MAX_VALUE, 2, Long.MAX_VALUE)),
null,
@@ -1632,7 +1665,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
Task id2 = createKafkaIndexTask(
"id2",
DATASOURCE,
- "sequenceName-0",
+ 0,
new KafkaPartitions("topic", ImmutableMap.of(0, 10L, 1, 20L, 2, 30L)),
new KafkaPartitions("topic", ImmutableMap.of(0, Long.MAX_VALUE, 1,
Long.MAX_VALUE, 2, Long.MAX_VALUE)),
null,
@@ -1642,7 +1675,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
Task id3 = createKafkaIndexTask(
"id3",
DATASOURCE,
- "sequenceName-0",
+ 0,
new KafkaPartitions("topic", ImmutableMap.of(0, 10L, 1, 20L, 2, 30L)),
new KafkaPartitions("topic", ImmutableMap.of(0, Long.MAX_VALUE, 1,
Long.MAX_VALUE, 2, Long.MAX_VALUE)),
null,
@@ -1678,8 +1711,12 @@ public class KafkaSupervisorTest extends EasyMockSupport
// getCheckpoints will not be called for id1 as it is in publishing state
TreeMap<Integer, Map<Integer, Long>> checkpoints = new TreeMap<>();
checkpoints.put(0, ImmutableMap.of(0, 10L, 1, 20L, 2, 30L));
-    expect(taskClient.getCheckpointsAsync(EasyMock.contains("id2"), anyBoolean())).andReturn(Futures.immediateFuture(checkpoints)).times(1);
-    expect(taskClient.getCheckpointsAsync(EasyMock.contains("id3"), anyBoolean())).andReturn(Futures.immediateFuture(checkpoints)).times(1);
+    expect(taskClient.getCheckpointsAsync(EasyMock.contains("id2"), anyBoolean()))
+        .andReturn(Futures.immediateFuture(checkpoints))
+        .times(1);
+    expect(taskClient.getCheckpointsAsync(EasyMock.contains("id3"), anyBoolean()))
+        .andReturn(Futures.immediateFuture(checkpoints))
+        .times(1);

     taskRunner.registerListener(anyObject(TaskRunnerListener.class), anyObject(Executor.class));
replayAll();
@@ -1824,7 +1861,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
Task id1 = createKafkaIndexTask(
"id1",
DATASOURCE,
- "sequenceName-0",
+ 0,
new KafkaPartitions("topic", ImmutableMap.of(0, 0L, 1, 0L, 2, 0L)),
new KafkaPartitions("topic", ImmutableMap.of(0, Long.MAX_VALUE, 1,
Long.MAX_VALUE, 2, Long.MAX_VALUE)),
null,
@@ -1834,7 +1871,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
Task id2 = createKafkaIndexTask(
"id2",
DATASOURCE,
- "sequenceName-0",
+ 0,
new KafkaPartitions("topic", ImmutableMap.of(0, 10L, 1, 20L, 2, 30L)),
new KafkaPartitions("topic", ImmutableMap.of(0, Long.MAX_VALUE, 1,
Long.MAX_VALUE, 2, Long.MAX_VALUE)),
null,
@@ -1844,7 +1881,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
Task id3 = createKafkaIndexTask(
"id3",
DATASOURCE,
- "sequenceName-0",
+ 0,
new KafkaPartitions("topic", ImmutableMap.of(0, 10L, 1, 20L, 2, 30L)),
new KafkaPartitions("topic", ImmutableMap.of(0, Long.MAX_VALUE, 1,
Long.MAX_VALUE, 2, Long.MAX_VALUE)),
null,
@@ -1879,8 +1916,12 @@ public class KafkaSupervisorTest extends EasyMockSupport
TreeMap<Integer, Map<Integer, Long>> checkpoints = new TreeMap<>();
checkpoints.put(0, ImmutableMap.of(0, 10L, 1, 20L, 2, 30L));
- expect(taskClient.getCheckpointsAsync(EasyMock.contains("id2"),
anyBoolean())).andReturn(Futures.immediateFuture(checkpoints)).times(1);
- expect(taskClient.getCheckpointsAsync(EasyMock.contains("id3"),
anyBoolean())).andReturn(Futures.immediateFuture(checkpoints)).times(1);
+ expect(taskClient.getCheckpointsAsync(EasyMock.contains("id2"),
anyBoolean()))
+ .andReturn(Futures.immediateFuture(checkpoints))
+ .times(1);
+ expect(taskClient.getCheckpointsAsync(EasyMock.contains("id3"),
anyBoolean()))
+ .andReturn(Futures.immediateFuture(checkpoints))
+ .times(1);
taskRunner.registerListener(anyObject(TaskRunnerListener.class), anyObject(Executor.class));
replayAll();
@@ -1908,7 +1949,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
Task id1 = createKafkaIndexTask(
"id1",
DATASOURCE,
- "sequenceName-0",
+ 0,
new KafkaPartitions("topic", ImmutableMap.of(0, 0L, 1, 0L, 2, 0L)),
new KafkaPartitions("topic", ImmutableMap.of(0, Long.MAX_VALUE, 1,
Long.MAX_VALUE, 2, Long.MAX_VALUE)),
null,
@@ -1918,7 +1959,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
Task id2 = createKafkaIndexTask(
"id2",
DATASOURCE,
- "sequenceName-0",
+ 0,
new KafkaPartitions("topic", ImmutableMap.of(0, 10L, 1, 20L, 2, 30L)),
new KafkaPartitions("topic", ImmutableMap.of(0, Long.MAX_VALUE, 1,
Long.MAX_VALUE, 2, Long.MAX_VALUE)),
null,
@@ -1928,7 +1969,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
Task id3 = createKafkaIndexTask(
"id3",
DATASOURCE,
- "sequenceName-0",
+ 0,
new KafkaPartitions("topic", ImmutableMap.of(0, 10L, 1, 20L, 2, 30L)),
new KafkaPartitions("topic", ImmutableMap.of(0, Long.MAX_VALUE, 1,
Long.MAX_VALUE, 2, Long.MAX_VALUE)),
null,
@@ -1958,9 +1999,15 @@ public class KafkaSupervisorTest extends EasyMockSupport
TreeMap<Integer, Map<Integer, Long>> checkpoints = new TreeMap<>();
checkpoints.put(0, ImmutableMap.of(0, 10L, 1, 20L, 2, 30L));
- expect(taskClient.getCheckpointsAsync(EasyMock.contains("id1"),
anyBoolean())).andReturn(Futures.immediateFuture(checkpoints)).times(1);
- expect(taskClient.getCheckpointsAsync(EasyMock.contains("id2"),
anyBoolean())).andReturn(Futures.immediateFuture(checkpoints)).times(1);
- expect(taskClient.getCheckpointsAsync(EasyMock.contains("id3"),
anyBoolean())).andReturn(Futures.immediateFuture(checkpoints)).times(1);
+ expect(taskClient.getCheckpointsAsync(EasyMock.contains("id1"),
anyBoolean()))
+ .andReturn(Futures.immediateFuture(checkpoints))
+ .times(1);
+ expect(taskClient.getCheckpointsAsync(EasyMock.contains("id2"),
anyBoolean()))
+ .andReturn(Futures.immediateFuture(checkpoints))
+ .times(1);
+ expect(taskClient.getCheckpointsAsync(EasyMock.contains("id3"),
anyBoolean()))
+ .andReturn(Futures.immediateFuture(checkpoints))
+ .times(1);
taskRunner.registerListener(anyObject(TaskRunnerListener.class), anyObject(Executor.class));
replayAll();
@@ -1980,6 +2027,172 @@ public class KafkaSupervisorTest extends EasyMockSupport
verifyAll();
}
+ @Test(timeout = 60_000L)
+ public void testCheckpointForInactiveTaskGroup()
+ throws InterruptedException, ExecutionException, TimeoutException, JsonProcessingException
+ {
+ supervisor = getSupervisor(2, 1, true, "PT1S", null, null, false);
+ //not adding any events
+ final Task id1 = createKafkaIndexTask(
+ "id1",
+ DATASOURCE,
+ 0,
+ new KafkaPartitions(topic, ImmutableMap.of(0, 0L, 1, 0L, 2, 0L)),
+ new KafkaPartitions(topic, ImmutableMap.of(0, Long.MAX_VALUE, 1, Long.MAX_VALUE, 2, Long.MAX_VALUE)),
+ null,
+ null
+ );
+
+ final Task id2 = createKafkaIndexTask(
+ "id2",
+ DATASOURCE,
+ 0,
+ new KafkaPartitions(topic, ImmutableMap.of(0, 10L, 1, 20L, 2, 30L)),
+ new KafkaPartitions(topic, ImmutableMap.of(0, Long.MAX_VALUE, 1, Long.MAX_VALUE, 2, Long.MAX_VALUE)),
+ null,
+ null
+ );
+
+ final Task id3 = createKafkaIndexTask(
+ "id3",
+ DATASOURCE,
+ 0,
+ new KafkaPartitions(topic, ImmutableMap.of(0, 10L, 1, 20L, 2, 30L)),
+ new KafkaPartitions(topic, ImmutableMap.of(0, Long.MAX_VALUE, 1, Long.MAX_VALUE, 2, Long.MAX_VALUE)),
+ null,
+ null
+ );
+
+ expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes();
+ expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes();
+ expect(taskStorage.getActiveTasks()).andReturn(ImmutableList.of(id1, id2, id3)).anyTimes();
+ expect(taskStorage.getStatus("id1")).andReturn(Optional.of(TaskStatus.running("id1"))).anyTimes();
+ expect(taskStorage.getStatus("id2")).andReturn(Optional.of(TaskStatus.running("id2"))).anyTimes();
+ expect(taskStorage.getStatus("id3")).andReturn(Optional.of(TaskStatus.running("id3"))).anyTimes();
+ expect(taskStorage.getTask("id1")).andReturn(Optional.of(id1)).anyTimes();
+ expect(taskStorage.getTask("id2")).andReturn(Optional.of(id2)).anyTimes();
+ expect(taskStorage.getTask("id3")).andReturn(Optional.of(id3)).anyTimes();
+ expect(
+ indexerMetadataStorageCoordinator.getDataSourceMetadata(DATASOURCE)).andReturn(new KafkaDataSourceMetadata(null)
+ ).anyTimes();
+ expect(taskClient.getStatusAsync("id1")).andReturn(Futures.immediateFuture(KafkaIndexTask.Status.READING));
+ expect(taskClient.getStatusAsync("id2")).andReturn(Futures.immediateFuture(KafkaIndexTask.Status.READING));
+ expect(taskClient.getStatusAsync("id3")).andReturn(Futures.immediateFuture(KafkaIndexTask.Status.READING));
+
+ final DateTime startTime = DateTimes.nowUtc();
+ expect(taskClient.getStartTimeAsync("id1")).andReturn(Futures.immediateFuture(startTime));
+ expect(taskClient.getStartTimeAsync("id2")).andReturn(Futures.immediateFuture(startTime));
+ expect(taskClient.getStartTimeAsync("id3")).andReturn(Futures.immediateFuture(startTime));
+
+ final TreeMap<Integer, Map<Integer, Long>> checkpoints = new TreeMap<>();
+ checkpoints.put(0, ImmutableMap.of(0, 10L, 1, 20L, 2, 30L));
+ expect(taskClient.getCheckpointsAsync(EasyMock.contains("id1"),
anyBoolean()))
+ .andReturn(Futures.immediateFuture(checkpoints))
+ .times(1);
+ expect(taskClient.getCheckpointsAsync(EasyMock.contains("id2"),
anyBoolean()))
+ .andReturn(Futures.immediateFuture(checkpoints))
+ .times(1);
+ expect(taskClient.getCheckpointsAsync(EasyMock.contains("id3"),
anyBoolean()))
+ .andReturn(Futures.immediateFuture(checkpoints))
+ .times(1);
+
+ taskRunner.registerListener(anyObject(TaskRunnerListener.class), anyObject(Executor.class));
+ replayAll();
+
+ supervisor.start();
+ supervisor.runInternal();
+
+ final Map<Integer, Long> fakeCheckpoints = Collections.emptyMap();
+ supervisor.moveTaskGroupToPendingCompletion(0);
+ supervisor.checkpoint(
+ 0,
+ new KafkaDataSourceMetadata(new KafkaPartitions(topic, checkpoints.get(0))),
+ new KafkaDataSourceMetadata(new KafkaPartitions(topic, fakeCheckpoints))
+ );
+
+ while (supervisor.getNoticesQueueSize() > 0) {
+ Thread.sleep(100);
+ }
+
+ verifyAll();
+
+ Assert.assertNull(serviceEmitter.getStackTrace());
+ Assert.assertNull(serviceEmitter.getExceptionMessage());
+ Assert.assertNull(serviceEmitter.getExceptionClass());
+ }
+
+ @Test(timeout = 60_000L)
+ public void testCheckpointForUnknownTaskGroup() throws InterruptedException
+ {
+ supervisor = getSupervisor(2, 1, true, "PT1S", null, null, false);
+ //not adding any events
+ final Task id1 = createKafkaIndexTask(
+ "id1",
+ DATASOURCE,
+ 0,
+ new KafkaPartitions(topic, ImmutableMap.of(0, 0L, 1, 0L, 2, 0L)),
+ new KafkaPartitions(topic, ImmutableMap.of(0, Long.MAX_VALUE, 1, Long.MAX_VALUE, 2, Long.MAX_VALUE)),
+ null,
+ null
+ );
+
+ final Task id2 = createKafkaIndexTask(
+ "id2",
+ DATASOURCE,
+ 0,
+ new KafkaPartitions(topic, ImmutableMap.of(0, 10L, 1, 20L, 2, 30L)),
+ new KafkaPartitions(topic, ImmutableMap.of(0, Long.MAX_VALUE, 1, Long.MAX_VALUE, 2, Long.MAX_VALUE)),
+ null,
+ null
+ );
+
+ final Task id3 = createKafkaIndexTask(
+ "id3",
+ DATASOURCE,
+ 0,
+ new KafkaPartitions(topic, ImmutableMap.of(0, 10L, 1, 20L, 2, 30L)),
+ new KafkaPartitions(topic, ImmutableMap.of(0, Long.MAX_VALUE, 1, Long.MAX_VALUE, 2, Long.MAX_VALUE)),
+ null,
+ null
+ );
+
+ expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes();
+ expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes();
+ expect(taskStorage.getActiveTasks()).andReturn(ImmutableList.of(id1, id2, id3)).anyTimes();
+ expect(taskStorage.getStatus("id1")).andReturn(Optional.of(TaskStatus.running("id1"))).anyTimes();
+ expect(taskStorage.getStatus("id2")).andReturn(Optional.of(TaskStatus.running("id2"))).anyTimes();
+ expect(taskStorage.getStatus("id3")).andReturn(Optional.of(TaskStatus.running("id3"))).anyTimes();
+ expect(taskStorage.getTask("id1")).andReturn(Optional.of(id1)).anyTimes();
+ expect(taskStorage.getTask("id2")).andReturn(Optional.of(id2)).anyTimes();
+ expect(taskStorage.getTask("id3")).andReturn(Optional.of(id3)).anyTimes();
+ expect(
+ indexerMetadataStorageCoordinator.getDataSourceMetadata(DATASOURCE)).andReturn(new KafkaDataSourceMetadata(null)
+ ).anyTimes();
+
+ replayAll();
+
+ supervisor.start();
+
+ supervisor.checkpoint(
+ 0,
+ new KafkaDataSourceMetadata(new KafkaPartitions(topic, Collections.emptyMap())),
+ new KafkaDataSourceMetadata(new KafkaPartitions(topic, Collections.emptyMap()))
+ );
+
+ while (supervisor.getNoticesQueueSize() > 0) {
+ Thread.sleep(100);
+ }
+
+ verifyAll();
+
+ Assert.assertNotNull(serviceEmitter.getStackTrace());
+ Assert.assertEquals(
+ "WTH?! cannot find taskGroup [0] among all taskGroups [{}]",
+ serviceEmitter.getExceptionMessage()
+ );
+ Assert.assertEquals(ISE.class, serviceEmitter.getExceptionClass());
+ }
+
private void addSomeEvents(int numEventsPerPartition) throws Exception
{
try (final KafkaProducer<byte[], byte[]> kafkaProducer =
kafkaServer.newProducer()) {
@@ -2106,7 +2319,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
private KafkaIndexTask createKafkaIndexTask(
String id,
String dataSource,
- String sequenceName,
+ int taskGroupId,
KafkaPartitions startPartitions,
KafkaPartitions endPartitions,
DateTime minimumMessageTime,
@@ -2119,7 +2332,8 @@ public class KafkaSupervisorTest extends EasyMockSupport
getDataSchema(dataSource),
tuningConfig,
new KafkaIOConfig(
- sequenceName,
+ taskGroupId,
+ "sequenceName-" + taskGroupId,
startPartitions,
endPartitions,
ImmutableMap.<String, String>of(),
@@ -2128,7 +2342,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
maximumMessageTime,
false
),
- ImmutableMap.<String, Object>of(),
+ Collections.emptyMap(),
null,
null,
rowIngestionMetersFactory
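
The two new tests above pin down the behavior that replaces the NPE: a checkpoint notice for a task group that has already moved to publishing is quietly ignored, while a notice for a task group the supervisor has never seen fails with a descriptive ISE. Below is a minimal, self-contained sketch of that guard; CheckpointGuard, activeGroups, and publishingGroups are illustrative stand-ins, not the actual KafkaSupervisor internals.

    import io.druid.java.util.common.ISE;

    import java.util.List;
    import java.util.Map;

    // Illustrative sketch only: models the two cases exercised by
    // testCheckpointForInactiveTaskGroup and testCheckpointForUnknownTaskGroup.
    final class CheckpointGuard
    {
      // Stand-in for the supervisor's per-group bookkeeping.
      static final class TaskGroup {}

      // Returns the group to checkpoint, or null if the group has already
      // moved to publishing (a late checkpoint notice is stale and harmless);
      // throws ISE for a group the supervisor has never seen, which is the
      // case that used to dereference null.
      static TaskGroup resolve(
          int taskGroupId,
          Map<Integer, TaskGroup> activeGroups,
          Map<Integer, List<TaskGroup>> publishingGroups
      )
      {
        final TaskGroup group = activeGroups.get(taskGroupId);
        if (group != null) {
          return group;
        }
        if (publishingGroups.containsKey(taskGroupId)) {
          // Group already handed off; ignore the stale notice.
          return null;
        }
        throw new ISE("WTH?! cannot find taskGroup [%s] among all taskGroups [%s]", taskGroupId, activeGroups);
      }
    }
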
diff --git a/indexing-service/src/main/java/io/druid/indexing/common/actions/CheckPointDataSourceMetadataAction.java b/indexing-service/src/main/java/io/druid/indexing/common/actions/CheckPointDataSourceMetadataAction.java
index 8265f87..f1d11de 100644
--- a/indexing-service/src/main/java/io/druid/indexing/common/actions/CheckPointDataSourceMetadataAction.java
+++ b/indexing-service/src/main/java/io/druid/indexing/common/actions/CheckPointDataSourceMetadataAction.java
@@ -21,27 +21,28 @@ package io.druid.indexing.common.actions;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.core.type.TypeReference;
+import com.google.common.base.Preconditions;
import io.druid.indexing.common.task.Task;
import io.druid.indexing.overlord.DataSourceMetadata;
public class CheckPointDataSourceMetadataAction implements TaskAction<Boolean>
{
private final String supervisorId;
- private final String sequenceName;
+ private final int taskGroupId;
private final DataSourceMetadata previousCheckPoint;
private final DataSourceMetadata currentCheckPoint;
public CheckPointDataSourceMetadataAction(
@JsonProperty("supervisorId") String supervisorId,
- @JsonProperty("sequenceName") String sequenceName,
+ @JsonProperty("taskGroupId") Integer taskGroupId,
@JsonProperty("previousCheckPoint") DataSourceMetadata
previousCheckPoint,
@JsonProperty("currentCheckPoint") DataSourceMetadata currentCheckPoint
)
{
- this.supervisorId = supervisorId;
- this.sequenceName = sequenceName;
- this.previousCheckPoint = previousCheckPoint;
- this.currentCheckPoint = currentCheckPoint;
+ this.supervisorId = Preconditions.checkNotNull(supervisorId, "supervisorId");
+ this.taskGroupId = Preconditions.checkNotNull(taskGroupId, "taskGroupId");
+ this.previousCheckPoint = Preconditions.checkNotNull(previousCheckPoint, "previousCheckPoint");
+ this.currentCheckPoint = Preconditions.checkNotNull(currentCheckPoint, "currentCheckPoint");
}
@JsonProperty
@@ -51,9 +52,9 @@ public class CheckPointDataSourceMetadataAction implements TaskAction<Boolean>
}
@JsonProperty
- public String getSequenceName()
+ public int getTaskGroupId()
{
- return sequenceName;
+ return taskGroupId;
}
@JsonProperty
@@ -81,8 +82,12 @@ public class CheckPointDataSourceMetadataAction implements TaskAction<Boolean>
Task task, TaskActionToolbox toolbox
)
{
- return toolbox.getSupervisorManager()
- .checkPointDataSourceMetadata(supervisorId, sequenceName, previousCheckPoint, currentCheckPoint);
+ return toolbox.getSupervisorManager().checkPointDataSourceMetadata(
+ supervisorId,
+ taskGroupId,
+ previousCheckPoint,
+ currentCheckPoint
+ );
}
@Override
@@ -96,7 +101,7 @@ public class CheckPointDataSourceMetadataAction implements TaskAction<Boolean>
{
return "CheckPointDataSourceMetadataAction{" +
"supervisorId='" + supervisorId + '\'' +
- ", sequenceName='" + sequenceName + '\'' +
+ ", taskGroupId='" + taskGroupId + '\'' +
", previousCheckPoint=" + previousCheckPoint +
", currentCheckPoint=" + currentCheckPoint +
'}';
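
Since the action's wire format changes from "sequenceName" to "taskGroupId", it is worth seeing what a serialized request now looks like. A hedged sketch follows; the plain ObjectMapper and the supervisor id are illustrative only, as a real overlord uses its injected, module-aware mapper.

    import com.fasterxml.jackson.databind.ObjectMapper;
    import com.google.common.collect.ImmutableMap;
    import io.druid.indexing.common.actions.CheckPointDataSourceMetadataAction;
    import io.druid.indexing.kafka.KafkaDataSourceMetadata;
    import io.druid.indexing.kafka.KafkaPartitions;

    public class CheckpointActionWireFormat
    {
      public static void main(String[] args) throws Exception
      {
        final CheckPointDataSourceMetadataAction action = new CheckPointDataSourceMetadataAction(
            "kafka-supervisor-1", // hypothetical supervisor id
            0,                    // taskGroupId now identifies the group instead of sequenceName
            new KafkaDataSourceMetadata(new KafkaPartitions("topic", ImmutableMap.of(0, 10L))),
            new KafkaDataSourceMetadata(new KafkaPartitions("topic", ImmutableMap.of(0, 20L)))
        );
        // The serialized payload carries "taskGroupId": 0 where it previously
        // carried "sequenceName": "sequenceName-0".
        System.out.println(new ObjectMapper().writeValueAsString(action));
      }
    }
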
diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/supervisor/SupervisorManager.java b/indexing-service/src/main/java/io/druid/indexing/overlord/supervisor/SupervisorManager.java
index dcdd014..f9a5564 100644
--- a/indexing-service/src/main/java/io/druid/indexing/overlord/supervisor/SupervisorManager.java
+++ b/indexing-service/src/main/java/io/druid/indexing/overlord/supervisor/SupervisorManager.java
@@ -165,9 +165,9 @@ public class SupervisorManager
public boolean checkPointDataSourceMetadata(
String supervisorId,
- @Nullable String sequenceName,
- @Nullable DataSourceMetadata previousDataSourceMetadata,
- @Nullable DataSourceMetadata currentDataSourceMetadata
+ int taskGroupId,
+ DataSourceMetadata previousDataSourceMetadata,
+ DataSourceMetadata currentDataSourceMetadata
)
{
try {
@@ -178,7 +178,7 @@ public class SupervisorManager
Preconditions.checkNotNull(supervisor, "supervisor could not be found");
- supervisor.lhs.checkpoint(sequenceName, previousDataSourceMetadata, currentDataSourceMetadata);
+ supervisor.lhs.checkpoint(taskGroupId, previousDataSourceMetadata, currentDataSourceMetadata);
return true;
}
catch (Exception e) {
diff --git a/java-util/src/main/java/io/druid/java/util/emitter/EmittingLogger.java b/java-util/src/main/java/io/druid/java/util/emitter/EmittingLogger.java
index 0020cf1..661ed17 100644
--- a/java-util/src/main/java/io/druid/java/util/emitter/EmittingLogger.java
+++ b/java-util/src/main/java/io/druid/java/util/emitter/EmittingLogger.java
@@ -35,6 +35,10 @@ import java.io.StringWriter;
*/
public class EmittingLogger extends Logger
{
+ public static final String EXCEPTION_TYPE_KEY = "exceptionType";
+ public static final String EXCEPTION_MESSAGE_KEY = "exceptionMessage";
+ public static final String EXCEPTION_STACK_TRACE_KEY = "exceptionStackTrace";
+
private static volatile ServiceEmitter emitter = null;
private final String className;
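
These constants name the keys that makeAlert(...) already places in an alert event's data map when handed a throwable; extracting them lets tests reference the same keys. A rough producing-side sketch, assuming an emitter has been registered via EmittingLogger.registerEmitter and the class/message here are illustrative:

    import io.druid.java.util.emitter.EmittingLogger;

    public class AlertKeysExample
    {
      private static final EmittingLogger log = new EmittingLogger(AlertKeysExample.class);

      public static void report(Exception e)
      {
        // The emitted alert's "data" map carries the exception class, message,
        // and stack trace under EXCEPTION_TYPE_KEY, EXCEPTION_MESSAGE_KEY, and
        // EXCEPTION_STACK_TRACE_KEY respectively.
        log.makeAlert(e, "notice handler failed").emit();
      }
    }
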
diff --git a/server/src/main/java/io/druid/indexing/overlord/supervisor/NoopSupervisorSpec.java b/server/src/main/java/io/druid/indexing/overlord/supervisor/NoopSupervisorSpec.java
index 26ed99c..0408104 100644
--- a/server/src/main/java/io/druid/indexing/overlord/supervisor/NoopSupervisorSpec.java
+++ b/server/src/main/java/io/druid/indexing/overlord/supervisor/NoopSupervisorSpec.java
@@ -83,9 +83,9 @@ public class NoopSupervisorSpec implements SupervisorSpec
@Override
public void checkpoint(
- @Nullable String sequenceName,
- @Nullable DataSourceMetadata previousCheckPoint,
- @Nullable DataSourceMetadata currentCheckPoint
+ int taskGroupId,
+ DataSourceMetadata previousCheckPoint,
+ DataSourceMetadata currentCheckPoint
)
{
diff --git a/server/src/main/java/io/druid/indexing/overlord/supervisor/Supervisor.java b/server/src/main/java/io/druid/indexing/overlord/supervisor/Supervisor.java
index 5afe912..04afac7 100644
--- a/server/src/main/java/io/druid/indexing/overlord/supervisor/Supervisor.java
+++ b/server/src/main/java/io/druid/indexing/overlord/supervisor/Supervisor.java
@@ -22,7 +22,6 @@ package io.druid.indexing.overlord.supervisor;
import com.google.common.collect.ImmutableMap;
import io.druid.indexing.overlord.DataSourceMetadata;
-import javax.annotation.Nullable;
import java.util.Map;
public interface Supervisor
@@ -52,13 +51,9 @@ public interface Supervisor
* for example - Kafka Supervisor uses this to merge and handoff segments containing at least the data
* represented by {@param currentCheckpoint} DataSourceMetadata
*
- * @param sequenceName       unique Identifier to figure out for which sequence to do checkpointing
+ * @param taskGroupId        unique identifier of the task group for which to do checkpointing
* @param previousCheckPoint DataSourceMetadata checkpointed in previous call
* @param currentCheckPoint current DataSourceMetadata to be checkpointed
*/
- void checkpoint(
- @Nullable String sequenceName,
- @Nullable DataSourceMetadata previousCheckPoint,
- @Nullable DataSourceMetadata currentCheckPoint
- );
+ void checkpoint(int taskGroupId, DataSourceMetadata previousCheckPoint, DataSourceMetadata currentCheckPoint);
}
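
With the @Nullable annotations gone, implementations can rely on non-null arguments and only need to validate the metadata's concrete type. A hypothetical sketch of such a check; ExpectedMetadata stands in for a concrete type such as KafkaDataSourceMetadata:

    import io.druid.indexing.overlord.DataSourceMetadata;
    import io.druid.java.util.common.IAE;

    final class CheckpointValidation
    {
      // Hypothetical concrete metadata type.
      interface ExpectedMetadata extends DataSourceMetadata {}

      static ExpectedMetadata validate(int taskGroupId, DataSourceMetadata checkPoint)
      {
        // Non-null is guaranteed by the tightened interface, so only the
        // runtime type needs checking here.
        if (!(checkPoint instanceof ExpectedMetadata)) {
          throw new IAE("unexpected checkpoint metadata [%s] for taskGroup [%s]", checkPoint, taskGroupId);
        }
        return (ExpectedMetadata) checkPoint;
      }
    }
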
diff --git a/server/src/test/java/io/druid/server/metrics/ExceptionCapturingServiceEmitter.java b/server/src/test/java/io/druid/server/metrics/ExceptionCapturingServiceEmitter.java
new file mode 100644
index 0000000..cc217c6
--- /dev/null
+++ b/server/src/test/java/io/druid/server/metrics/ExceptionCapturingServiceEmitter.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package io.druid.server.metrics;
+
+import io.druid.java.util.emitter.EmittingLogger;
+import io.druid.java.util.emitter.core.Event;
+import io.druid.java.util.emitter.service.ServiceEmitter;
+
+import javax.annotation.Nullable;
+import java.util.Map;
+
+public class ExceptionCapturingServiceEmitter extends ServiceEmitter
+{
+ private volatile Class exceptionClass;
+ private volatile String exceptionMessage;
+ private volatile String stackTrace;
+
+ public ExceptionCapturingServiceEmitter()
+ {
+ super("", "", null);
+ }
+
+ @Override
+ public void emit(Event event)
+ {
+ //noinspection unchecked
+ final Map<String, Object> dataMap = (Map<String, Object>)
event.toMap().get("data");
+ final Class exceptionClass = (Class)
dataMap.get(EmittingLogger.EXCEPTION_TYPE_KEY);
+ if (exceptionClass != null) {
+ final String exceptionMessage = (String)
dataMap.get(EmittingLogger.EXCEPTION_MESSAGE_KEY);
+ final String stackTrace = (String)
dataMap.get(EmittingLogger.EXCEPTION_STACK_TRACE_KEY);
+ this.exceptionClass = exceptionClass;
+ this.exceptionMessage = exceptionMessage;
+ this.stackTrace = stackTrace;
+ }
+ }
+
+ @Nullable
+ public Class getExceptionClass()
+ {
+ return exceptionClass;
+ }
+
+ @Nullable
+ public String getExceptionMessage()
+ {
+ return exceptionMessage;
+ }
+
+ @Nullable
+ public String getStackTrace()
+ {
+ return stackTrace;
+ }
+}
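
A sketch of how a test would install this emitter; registerEmitter is the real EmittingLogger hook, while the wrapper class here is illustrative:

    import io.druid.java.util.emitter.EmittingLogger;
    import io.druid.server.metrics.ExceptionCapturingServiceEmitter;

    public class EmitterSetupExample
    {
      public static ExceptionCapturingServiceEmitter install()
      {
        final ExceptionCapturingServiceEmitter emitter = new ExceptionCapturingServiceEmitter();
        // Route all EmittingLogger alerts to the capturing emitter; tests then
        // assert on getExceptionClass()/getExceptionMessage()/getStackTrace().
        EmittingLogger.registerEmitter(emitter);
        return emitter;
      }
    }
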
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]