jon-wei closed pull request #5996: Fix NPE while handling CheckpointNotice in KafkaSupervisor
URL: https://github.com/apache/incubator-druid/pull/5996
This PR was merged from a forked repository. Because GitHub hides the original diff on merge, it is reproduced below for the sake of provenance:
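At a glance, the fix replaces a lookup keyed by generated sequence name, whose result was dereferenced unconditionally, with a lookup keyed by taskGroupId plus an explicit guard. The sketch below only illustrates that guard with abbreviated names and placeholder types; it is not the real Druid code, which appears in the KafkaSupervisor hunks further down.

    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;

    // Simplified illustration of the guard this PR adds (abbreviated names, not the real
    // Druid classes). Before the change, a checkpoint notice for a group that had already
    // moved on caused a NullPointerException in the supervisor.
    public class CheckpointGuardSketch
    {
      private final Map<Integer, Object> taskGroups = new ConcurrentHashMap<>();
      private final Map<Integer, Object> pendingCompletionTaskGroups = new ConcurrentHashMap<>();
      private final Map<Integer, Object> partitionGroups = new ConcurrentHashMap<>();

      boolean isValidTaskGroup(int taskGroupId)
      {
        if (taskGroups.containsKey(taskGroupId)) {
          return true;  // actively reading: safe to add a new checkpoint
        }
        if (pendingCompletionTaskGroups.containsKey(taskGroupId)) {
          return false; // already publishing: ignore the request instead of crashing
        }
        if (partitionGroups.containsKey(taskGroupId)) {
          return false; // group is currently inactive: ignore as well
        }
        throw new IllegalStateException("Unknown task group " + taskGroupId);
      }

      public static void main(String[] args)
      {
        CheckpointGuardSketch sketch = new CheckpointGuardSketch();
        sketch.pendingCompletionTaskGroups.put(0, new Object());
        System.out.println(sketch.isValidTaskGroup(0)); // prints false, no NPE
      }
    }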
diff --git a/extensions-contrib/materialized-view-maintenance/src/main/java/io/druid/indexing/materializedview/MaterializedViewSupervisor.java b/extensions-contrib/materialized-view-maintenance/src/main/java/io/druid/indexing/materializedview/MaterializedViewSupervisor.java
index eba35a4faff..fedda092c4d 100644
--- a/extensions-contrib/materialized-view-maintenance/src/main/java/io/druid/indexing/materializedview/MaterializedViewSupervisor.java
+++ b/extensions-contrib/materialized-view-maintenance/src/main/java/io/druid/indexing/materializedview/MaterializedViewSupervisor.java
@@ -53,7 +53,6 @@
import org.joda.time.Duration;
import org.joda.time.Interval;
-import javax.annotation.Nullable;
import java.io.IOException;
import java.util.List;
import java.util.Map;
@@ -240,11 +239,7 @@ public void reset(DataSourceMetadata dataSourceMetadata)
}
@Override
- public void checkpoint(
- @Nullable String sequenceName,
- @Nullable DataSourceMetadata previousCheckPoint,
- @Nullable DataSourceMetadata currentCheckPoint
- )
+  public void checkpoint(int taskGroupId, DataSourceMetadata previousCheckPoint, DataSourceMetadata currentCheckPoint)
{
// do nothing
}
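The hunk above, together with the KafkaSupervisor hunk further down, changes every Supervisor.checkpoint() override from a nullable sequence-name parameter to a primitive taskGroupId. A minimal sketch of the resulting callback shape (the real interface is not part of this diff, and DataSourceMetadata below is only a placeholder type):

    // Sketch of the checkpoint callback shape implied by the overrides in this diff.
    public class CheckpointSignatureSketch
    {
      interface DataSourceMetadata {} // placeholder for the real Druid type

      @FunctionalInterface
      interface SupervisorCheckpoint
      {
        // Callers now identify the task group by its numeric id rather than the generated
        // base sequence name, so supervisors no longer need a name-to-group map.
        void checkpoint(int taskGroupId, DataSourceMetadata previousCheckPoint, DataSourceMetadata currentCheckPoint);
      }

      public static void main(String[] args)
      {
        // MaterializedViewSupervisor-style no-op implementation.
        SupervisorCheckpoint noop = (taskGroupId, prev, cur) -> { };
        noop.checkpoint(0, null, null);
        System.out.println("checkpoint(0, ...) accepted");
      }
    }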
diff --git a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/IncrementalPublishingKafkaIndexTaskRunner.java b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/IncrementalPublishingKafkaIndexTaskRunner.java
index 3a440b14fcb..a93fde611c5 100644
--- a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/IncrementalPublishingKafkaIndexTaskRunner.java
+++ b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/IncrementalPublishingKafkaIndexTaskRunner.java
@@ -600,12 +600,13 @@ public void onFailure(Throwable t)
              sequences
          );
          requestPause();
-          if (!toolbox.getTaskActionClient().submit(new CheckPointDataSourceMetadataAction(
+          final CheckPointDataSourceMetadataAction checkpointAction = new CheckPointDataSourceMetadataAction(
              task.getDataSource(),
-              ioConfig.getBaseSequenceName(),
+              ioConfig.getTaskGroupId(),
              new KafkaDataSourceMetadata(new KafkaPartitions(topic, sequenceToCheckpoint.getStartOffsets())),
              new KafkaDataSourceMetadata(new KafkaPartitions(topic, nextOffsets))
-          ))) {
+          );
+          if (!toolbox.getTaskActionClient().submit(checkpointAction)) {
            throw new ISE("Checkpoint request with offsets [%s] failed, dying", nextOffsets);
          }
        }
diff --git a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIOConfig.java b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIOConfig.java
index 4dd3aaf3885..b6c1d765c95 100644
--- a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIOConfig.java
+++ b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIOConfig.java
@@ -26,6 +26,7 @@
import io.druid.segment.indexing.IOConfig;
import org.joda.time.DateTime;
+import javax.annotation.Nullable;
import java.util.Map;
public class KafkaIOConfig implements IOConfig
@@ -33,6 +34,8 @@
private static final boolean DEFAULT_USE_TRANSACTION = true;
private static final boolean DEFAULT_SKIP_OFFSET_GAPS = false;
+ @Nullable
+ private final Integer taskGroupId;
private final String baseSequenceName;
private final KafkaPartitions startPartitions;
private final KafkaPartitions endPartitions;
@@ -44,6 +47,7 @@
@JsonCreator
public KafkaIOConfig(
+ @JsonProperty("taskGroupId") @Nullable Integer taskGroupId, // can be
null for backward compabitility
@JsonProperty("baseSequenceName") String baseSequenceName,
@JsonProperty("startPartitions") KafkaPartitions startPartitions,
@JsonProperty("endPartitions") KafkaPartitions endPartitions,
@@ -54,6 +58,7 @@ public KafkaIOConfig(
@JsonProperty("skipOffsetGaps") Boolean skipOffsetGaps
)
{
+ this.taskGroupId = taskGroupId;
    this.baseSequenceName = Preconditions.checkNotNull(baseSequenceName, "baseSequenceName");
    this.startPartitions = Preconditions.checkNotNull(startPartitions, "startPartitions");
    this.endPartitions = Preconditions.checkNotNull(endPartitions, "endPartitions");
@@ -83,6 +88,13 @@ public KafkaIOConfig(
}
}
+ @Nullable
+ @JsonProperty
+ public Integer getTaskGroupId()
+ {
+ return taskGroupId;
+ }
+
@JsonProperty
public String getBaseSequenceName()
{
@@ -135,7 +147,8 @@ public boolean isSkipOffsetGaps()
public String toString()
{
return "KafkaIOConfig{" +
- "baseSequenceName='" + baseSequenceName + '\'' +
+ "taskGroupId=" + taskGroupId +
+ ", baseSequenceName='" + baseSequenceName + '\'' +
", startPartitions=" + startPartitions +
", endPartitions=" + endPartitions +
", consumerProperties=" + consumerProperties +
diff --git a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/KafkaSupervisor.java b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/KafkaSupervisor.java
index 8ea13efad0a..ed287fa0591 100644
--- a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/KafkaSupervisor.java
+++ b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/KafkaSupervisor.java
@@ -143,8 +143,7 @@
   * time, there should only be up to a maximum of [taskCount] actively-reading task groups (tracked in the [taskGroups]
   * map) + zero or more pending-completion task groups (tracked in [pendingCompletionTaskGroups]).
   */
- @VisibleForTesting
- static class TaskGroup
+ private static class TaskGroup
{
    // This specifies the partitions and starting offsets for this task group. It is set on group creation from the data
    // in [partitionGroups] and never changes during the lifetime of this task group, which will live until a task in
@@ -159,7 +158,7 @@
    DateTime completionTimeout; // is set after signalTasksToFinish(); if not done by timeout, take corrective action
    final TreeMap<Integer, Map<Integer, Long>> sequenceOffsets = new TreeMap<>();
- public TaskGroup(
+ TaskGroup(
ImmutableMap<Integer, Long> partitionOffsets,
Optional<DateTime> minimumMessageTime,
Optional<DateTime> maximumMessageTime
@@ -171,7 +170,7 @@ public TaskGroup(
this.sequenceOffsets.put(0, partitionOffsets);
}
- public int addNewCheckpoint(Map<Integer, Long> checkpoint)
+ int addNewCheckpoint(Map<Integer, Long> checkpoint)
{
sequenceOffsets.put(sequenceOffsets.lastKey() + 1, checkpoint);
return sequenceOffsets.lastKey();
@@ -212,9 +211,6 @@ public int addNewCheckpoint(Map<Integer, Long> checkpoint)
private final ConcurrentHashMap<Integer, ConcurrentHashMap<Integer, Long>>
partitionGroups = new ConcurrentHashMap<>();
// --------------------------------------------------------
- // BaseSequenceName -> TaskGroup
- private final ConcurrentHashMap<String, TaskGroup> sequenceTaskGroup = new
ConcurrentHashMap<>();
-
private final TaskStorage taskStorage;
private final TaskMaster taskMaster;
private final IndexerMetadataStorageCoordinator
indexerMetadataStorageCoordinator;
@@ -513,13 +509,9 @@ public void reset(DataSourceMetadata dataSourceMetadata)
}
@Override
-  public void checkpoint(
-      String sequenceName,
-      DataSourceMetadata previousCheckpoint,
-      DataSourceMetadata currentCheckpoint
-  )
+  public void checkpoint(int taskGroupId, DataSourceMetadata previousCheckpoint, DataSourceMetadata currentCheckpoint)
  {
-    Preconditions.checkNotNull(sequenceName, "Cannot checkpoint without a sequence name");
+    Preconditions.checkNotNull(previousCheckpoint, "previousCheckpoint");
    Preconditions.checkNotNull(currentCheckpoint, "current checkpoint cannot be null");
Preconditions.checkArgument(
ioConfig.getTopic()
@@ -530,12 +522,14 @@ public void checkpoint(
        ((KafkaDataSourceMetadata) currentCheckpoint).getKafkaPartitions().getTopic()
    );
-    log.info("Checkpointing [%s] for sequence [%s]", currentCheckpoint, sequenceName);
-    notices.add(new CheckpointNotice(
-        sequenceName,
-        (KafkaDataSourceMetadata) previousCheckpoint,
-        (KafkaDataSourceMetadata) currentCheckpoint
-    ));
+    log.info("Checkpointing [%s] for taskGroup [%s]", currentCheckpoint, taskGroupId);
+    notices.add(
+        new CheckpointNotice(
+            taskGroupId,
+            (KafkaDataSourceMetadata) previousCheckpoint,
+            (KafkaDataSourceMetadata) currentCheckpoint
+        )
+    );
}
public void possiblyRegisterListener()
@@ -637,17 +631,17 @@ public void handle()
private class CheckpointNotice implements Notice
{
- final String sequenceName;
+ final int taskGroupId;
final KafkaDataSourceMetadata previousCheckpoint;
final KafkaDataSourceMetadata currentCheckpoint;
CheckpointNotice(
- String sequenceName,
+ int taskGroupId,
KafkaDataSourceMetadata previousCheckpoint,
KafkaDataSourceMetadata currentCheckpoint
)
{
- this.sequenceName = sequenceName;
+ this.taskGroupId = taskGroupId;
this.previousCheckpoint = previousCheckpoint;
this.currentCheckpoint = currentCheckpoint;
}
@@ -658,17 +652,12 @@ public void handle() throws ExecutionException, InterruptedException
      // check for consistency
      // if already received request for this sequenceName and dataSourceMetadata combination then return
-      Preconditions.checkNotNull(
-          sequenceTaskGroup.get(sequenceName),
-          "WTH?! cannot find task group for this sequence [%s], sequencesTaskGroup map [%s], taskGroups [%s]",
-          sequenceName,
-          sequenceTaskGroup,
-          taskGroups
-      );
-      final TreeMap<Integer, Map<Integer, Long>> checkpoints = sequenceTaskGroup.get(sequenceName).sequenceOffsets;
+      final TaskGroup taskGroup = taskGroups.get(taskGroupId);
+
+      if (isValidTaskGroup(taskGroup)) {
+        final TreeMap<Integer, Map<Integer, Long>> checkpoints = taskGroup.sequenceOffsets;
-      // check validity of previousCheckpoint if it is not null
-      if (previousCheckpoint != null) {
+        // check validity of previousCheckpoint
        int index = checkpoints.size();
        for (int sequenceId : checkpoints.descendingKeySet()) {
          Map<Integer, Long> checkpoint = checkpoints.get(sequenceId);
@@ -685,26 +674,39 @@ public void handle() throws ExecutionException, InterruptedException
          log.info("Already checkpointed with offsets [%s]", checkpoints.lastEntry().getValue());
          return;
        }
-      } else {
-        // There cannot be more than one checkpoint when previous checkpoint is null
-        // as when the task starts they are sent existing checkpoints
-        Preconditions.checkState(
-            checkpoints.size() <= 1,
-            "Got checkpoint request with null as previous check point, however found more than one checkpoints"
+        final int taskGroupId = getTaskGroupIdForPartition(
+            currentCheckpoint.getKafkaPartitions()
+                             .getPartitionOffsetMap()
+                             .keySet()
+                             .iterator()
+                             .next()
        );
-        if (checkpoints.size() == 1) {
-          log.info("Already checkpointed with dataSourceMetadata [%s]", checkpoints.get(0));
-          return;
+        final Map<Integer, Long> newCheckpoint = checkpointTaskGroup(taskGroupId, false).get();
+        taskGroups.get(taskGroupId).addNewCheckpoint(newCheckpoint);
+        log.info("Handled checkpoint notice, new checkpoint is [%s] for taskGroup [%s]", newCheckpoint, taskGroupId);
+      }
+    }
+
+    private boolean isValidTaskGroup(@Nullable TaskGroup taskGroup)
+    {
+      if (taskGroup == null) {
+        // taskGroup might be in pendingCompletionTaskGroups or partitionGroups
+        if (pendingCompletionTaskGroups.containsKey(taskGroupId)) {
+          log.warn(
+              "Ignoring checkpoint request because taskGroup[%d] has already stopped indexing and is waiting for "
+              + "publishing segments",
+              taskGroupId
+          );
+          return false;
+        } else if (partitionGroups.containsKey(taskGroupId)) {
+          log.warn("Ignoring checkpoint request because taskGroup[%d] is inactive", taskGroupId);
+          return false;
+        } else {
+          throw new ISE("WTH?! cannot find taskGroup [%s] among all taskGroups [%s]", taskGroupId, taskGroups);
        }
      }
-      final int taskGroupId = getTaskGroupIdForPartition(currentCheckpoint.getKafkaPartitions()
-                                                                          .getPartitionOffsetMap()
-                                                                          .keySet()
-                                                                          .iterator()
-                                                                          .next());
-      final Map<Integer, Long> newCheckpoint = checkpointTaskGroup(taskGroupId, false).get();
-      sequenceTaskGroup.get(sequenceName).addNewCheckpoint(newCheckpoint);
-      log.info("Handled checkpoint notice, new checkpoint is [%s] for sequence [%s]", newCheckpoint, sequenceName);
+
+      return true;
    }
}
@@ -718,7 +720,6 @@ void resetInternal(DataSourceMetadata dataSourceMetadata)
taskGroups.values().forEach(this::killTasksInGroup);
taskGroups.clear();
partitionGroups.clear();
- sequenceTaskGroup.clear();
    } else if (!(dataSourceMetadata instanceof KafkaDataSourceMetadata)) {
      throw new IAE("Expected KafkaDataSourceMetadata but found instance of [%s]", dataSourceMetadata.getClass());
    } else {
@@ -778,8 +779,7 @@ void resetInternal(DataSourceMetadata dataSourceMetadata)
      resetKafkaMetadata.getKafkaPartitions().getPartitionOffsetMap().keySet().forEach(partition -> {
        final int groupId = getTaskGroupIdForPartition(partition);
        killTaskGroupForPartitions(ImmutableSet.of(partition));
-        final TaskGroup removedGroup = taskGroups.remove(groupId);
-        sequenceTaskGroup.remove(generateSequenceName(removedGroup));
+        taskGroups.remove(groupId);
        partitionGroups.get(groupId).replaceAll((partitionId, offset) -> NOT_SET);
});
} else {
@@ -955,9 +955,10 @@ private void updatePartitionDataFromKafka()
    for (int partition = 0; partition < numPartitions; partition++) {
      int taskGroupId = getTaskGroupIdForPartition(partition);
-      partitionGroups.putIfAbsent(taskGroupId, new ConcurrentHashMap<Integer, Long>());
-
-      ConcurrentHashMap<Integer, Long> partitionMap = partitionGroups.get(taskGroupId);
+      ConcurrentHashMap<Integer, Long> partitionMap = partitionGroups.computeIfAbsent(
+          taskGroupId,
+          k -> new ConcurrentHashMap<>()
+      );
      // The starting offset for a new partition in [partitionGroups] is initially set to NOT_SET; when a new task group
      // is created and is assigned partitions, if the offset in [partitionGroups] is NOT_SET it will take the starting
@@ -1087,23 +1088,21 @@ public Boolean apply(KafkaIndexTask.Status status)
            }
            return false;
          } else {
-            final TaskGroup taskGroup = new TaskGroup(
-                ImmutableMap.copyOf(
-                    kafkaTask.getIOConfig()
-                             .getStartPartitions()
-                             .getPartitionOffsetMap()
-                ),
-                kafkaTask.getIOConfig().getMinimumMessageTime(),
-                kafkaTask.getIOConfig().getMaximumMessageTime()
-            );
-            if (taskGroups.putIfAbsent(
+            final TaskGroup taskGroup = taskGroups.computeIfAbsent(
                taskGroupId,
-                taskGroup
-            ) == null) {
-              sequenceTaskGroup.put(generateSequenceName(taskGroup), taskGroups.get(taskGroupId));
-              log.info("Created new task group [%d]", taskGroupId);
-            }
+                k -> {
+                  log.info("Creating a new task group for taskGroupId[%d]", taskGroupId);
+                  return new TaskGroup(
+                      ImmutableMap.copyOf(
+                          kafkaTask.getIOConfig().getStartPartitions().getPartitionOffsetMap()
+                      ),
+                      kafkaTask.getIOConfig().getMinimumMessageTime(),
+                      kafkaTask.getIOConfig().getMaximumMessageTime()
+                  );
+                }
+            );
            taskGroupsToVerify.add(taskGroupId);
-            taskGroups.get(taskGroupId).tasks.putIfAbsent(taskId, new TaskData());
+            taskGroup.tasks.putIfAbsent(taskId, new TaskData());
          }
        }
        return true;
@@ -1256,7 +1255,6 @@ public void onFailure(Throwable t)
// killing all tasks or no task left in the group ?
      // clear state about the taskgroup so that get latest offset information is fetched from metadata store
      log.warn("Clearing task group [%d] information as no valid tasks left the group", groupId);
-      sequenceTaskGroup.remove(generateSequenceName(taskGroup));
      taskGroups.remove(groupId);
      partitionGroups.get(groupId).replaceAll((partition, offset) -> NOT_SET);
@@ -1281,9 +1279,10 @@ private void addDiscoveredTaskToPendingCompletionTaskGroups(
      Map<Integer, Long> startingPartitions
  )
  {
-    pendingCompletionTaskGroups.putIfAbsent(groupId, Lists.<TaskGroup>newCopyOnWriteArrayList());
-
-    CopyOnWriteArrayList<TaskGroup> taskGroupList = pendingCompletionTaskGroups.get(groupId);
+    final CopyOnWriteArrayList<TaskGroup> taskGroupList = pendingCompletionTaskGroups.computeIfAbsent(
+        groupId,
+        k -> new CopyOnWriteArrayList<>()
+    );
for (TaskGroup taskGroup : taskGroupList) {
if (taskGroup.partitionOffsets.equals(startingPartitions)) {
if (taskGroup.tasks.putIfAbsent(taskId, new TaskData()) == null) {
@@ -1411,8 +1410,7 @@ private void checkTaskDuration() throws InterruptedException, ExecutionException
      if (endOffsets != null) {
        // set a timeout and put this group in pendingCompletionTaskGroups so that it can be monitored for completion
        group.completionTimeout = DateTimes.nowUtc().plus(ioConfig.getCompletionTimeout());
-        pendingCompletionTaskGroups.putIfAbsent(groupId, Lists.<TaskGroup>newCopyOnWriteArrayList());
-        pendingCompletionTaskGroups.get(groupId).add(group);
+        pendingCompletionTaskGroups.computeIfAbsent(groupId, k -> new CopyOnWriteArrayList<>()).add(group);
// set endOffsets as the next startOffsets
for (Map.Entry<Integer, Long> entry : endOffsets.entrySet()) {
@@ -1432,7 +1430,6 @@ private void checkTaskDuration() throws InterruptedException, ExecutionException
        partitionGroups.get(groupId).replaceAll((partition, offset) -> NOT_SET);
      }
-      sequenceTaskGroup.remove(generateSequenceName(group));
      // remove this task group from the list of current task groups now that it has been handled
      taskGroups.remove(groupId);
}
@@ -1456,7 +1453,8 @@ private void checkTaskDuration() throws InterruptedException, ExecutionException
    // metadata store (which will have advanced if we succeeded in publishing and will remain the same if publishing
    // failed and we need to re-ingest)
    return Futures.transform(
-        stopTasksInGroup(taskGroup), new Function<Object, Map<Integer, Long>>()
+        stopTasksInGroup(taskGroup),
+        new Function<Object, Map<Integer, Long>>()
{
@Nullable
@Override
@@ -1625,15 +1623,15 @@ private void checkPendingCompletionTasks() throws ExecutionException, Interrupte
            log.warn("All tasks in group [%d] failed to publish, killing all tasks for these partitions", groupId);
          } else {
            log.makeAlert(
-                "No task in [%s] succeeded before the completion timeout elapsed [%s]!",
+                "No task in [%s] for taskGroup [%d] succeeded before the completion timeout elapsed [%s]!",
                group.taskIds(),
+                groupId,
                ioConfig.getCompletionTimeout()
            ).emit();
          }
          // reset partitions offsets for this task group so that they will be re-read from metadata storage
          partitionGroups.get(groupId).replaceAll((partition, offset) -> NOT_SET);
- sequenceTaskGroup.remove(generateSequenceName(group));
// kill all the tasks in this pending completion group
killTasksInGroup(group);
// set a flag so the other pending completion groups for this set of
partitions will also stop
@@ -1693,7 +1691,6 @@ private void checkCurrentTaskState() throws ExecutionException, InterruptedExcep
// be recreated with the next set of offsets
if (taskData.status.isSuccess()) {
futures.add(stopTasksInGroup(taskGroup));
- sequenceTaskGroup.remove(generateSequenceName(taskGroup));
iTaskGroups.remove();
break;
}
@@ -1735,7 +1732,6 @@ void createNewTasks() throws JsonProcessingException
groupId,
taskGroup
);
-      sequenceTaskGroup.put(generateSequenceName(taskGroup), taskGroups.get(groupId));
}
}
@@ -1778,6 +1774,7 @@ private void createKafkaTasksForGroup(int groupId, int replicas) throws JsonProc
    DateTime maximumMessageTime = taskGroups.get(groupId).maximumMessageTime.orNull();
KafkaIOConfig kafkaIOConfig = new KafkaIOConfig(
+ groupId,
sequenceName,
new KafkaPartitions(ioConfig.getTopic(), startPartitions),
new KafkaPartitions(ioConfig.getTopic(), endPartitions),
@@ -1944,7 +1941,7 @@ private boolean isTaskCurrent(int taskGroupId, String taskId)
}
}
- private ListenableFuture<?> stopTasksInGroup(TaskGroup taskGroup)
+ private ListenableFuture<?> stopTasksInGroup(@Nullable TaskGroup taskGroup)
{
if (taskGroup == null) {
return Futures.immediateFuture(null);
@@ -2289,6 +2286,28 @@ Runnable updateCurrentAndLatestOffsets()
return allStats;
}
+ @VisibleForTesting
+ @Nullable
+ TaskGroup removeTaskGroup(int taskGroupId)
+ {
+ return taskGroups.remove(taskGroupId);
+ }
+
+ @VisibleForTesting
+ void moveTaskGroupToPendingCompletion(int taskGroupId)
+ {
+ final TaskGroup taskGroup = taskGroups.remove(taskGroupId);
+ if (taskGroup != null) {
+      pendingCompletionTaskGroups.computeIfAbsent(taskGroupId, k -> new CopyOnWriteArrayList<>()).add(taskGroup);
+ }
+ }
+
+ @VisibleForTesting
+ int getNoticesQueueSize()
+ {
+ return notices.size();
+ }
+
private static class StatsFromTaskResult
{
private final String groupId;
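Besides dropping the sequenceTaskGroup map, the supervisor hunks above repeatedly replace the putIfAbsent-then-get idiom with computeIfAbsent. The standalone sketch below is illustrative only: it shows that computeIfAbsent does the lookup-or-create in one atomic step and only builds the value when the key is actually missing, while the old form takes two trips to the map and always allocates a throwaway value.

    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.CopyOnWriteArrayList;

    // Sketch of the map-initialization pattern the supervisor switches to.
    public class ComputeIfAbsentSketch
    {
      public static void main(String[] args)
      {
        ConcurrentHashMap<Integer, CopyOnWriteArrayList<String>> pending = new ConcurrentHashMap<>();

        // Old style (as removed by the diff): two map operations, unconditional allocation.
        pending.putIfAbsent(1, new CopyOnWriteArrayList<>());
        pending.get(1).add("task-a");

        // New style (as introduced by the diff): one atomic lookup-or-create.
        pending.computeIfAbsent(2, k -> new CopyOnWriteArrayList<>()).add("task-b");

        System.out.println(pending); // {1=[task-a], 2=[task-b]}
      }
    }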
diff --git a/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaIOConfigTest.java b/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaIOConfigTest.java
index 3bc55e277cb..050dba753b8 100644
--- a/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaIOConfigTest.java
+++ b/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaIOConfigTest.java
@@ -50,6 +50,7 @@ public void testSerdeWithDefaults() throws Exception
{
String jsonStr = "{\n"
+ " \"type\": \"kafka\",\n"
+ + " \"taskGroupId\": 0,\n"
+ " \"baseSequenceName\": \"my-sequence-name\",\n"
+ " \"startPartitions\": {\"topic\":\"mytopic\",
\"partitionOffsetMap\" : {\"0\":1, \"1\":10}},\n"
+ " \"endPartitions\": {\"topic\":\"mytopic\",
\"partitionOffsetMap\" : {\"0\":15, \"1\":200}},\n"
@@ -82,6 +83,7 @@ public void testSerdeWithNonDefaults() throws Exception
{
String jsonStr = "{\n"
+ " \"type\": \"kafka\",\n"
+ + " \"taskGroupId\": 0,\n"
+ " \"baseSequenceName\": \"my-sequence-name\",\n"
+ " \"startPartitions\": {\"topic\":\"mytopic\",
\"partitionOffsetMap\" : {\"0\":1, \"1\":10}},\n"
+ " \"endPartitions\": {\"topic\":\"mytopic\",
\"partitionOffsetMap\" : {\"0\":15, \"1\":200}},\n"
@@ -118,6 +120,7 @@ public void testBaseSequenceNameRequired() throws Exception
{
String jsonStr = "{\n"
+ " \"type\": \"kafka\",\n"
+ + " \"taskGroupId\": 0,\n"
+ " \"startPartitions\": {\"topic\":\"mytopic\",
\"partitionOffsetMap\" : {\"0\":1, \"1\":10}},\n"
+ " \"endPartitions\": {\"topic\":\"mytopic\",
\"partitionOffsetMap\" : {\"0\":15, \"1\":200}},\n"
+ " \"consumerProperties\":
{\"bootstrap.servers\":\"localhost:9092\"},\n"
@@ -137,6 +140,7 @@ public void testStartPartitionsRequired() throws Exception
{
String jsonStr = "{\n"
+ " \"type\": \"kafka\",\n"
+ + " \"taskGroupId\": 0,\n"
+ " \"baseSequenceName\": \"my-sequence-name\",\n"
+ " \"endPartitions\": {\"topic\":\"mytopic\",
\"partitionOffsetMap\" : {\"0\":15, \"1\":200}},\n"
+ " \"consumerProperties\":
{\"bootstrap.servers\":\"localhost:9092\"},\n"
@@ -156,6 +160,7 @@ public void testEndPartitionsRequired() throws Exception
{
String jsonStr = "{\n"
+ " \"type\": \"kafka\",\n"
+ + " \"taskGroupId\": 0,\n"
+ " \"baseSequenceName\": \"my-sequence-name\",\n"
+ " \"startPartitions\": {\"topic\":\"mytopic\",
\"partitionOffsetMap\" : {\"0\":1, \"1\":10}},\n"
+ " \"consumerProperties\":
{\"bootstrap.servers\":\"localhost:9092\"},\n"
@@ -175,6 +180,7 @@ public void testConsumerPropertiesRequired() throws
Exception
{
String jsonStr = "{\n"
+ " \"type\": \"kafka\",\n"
+ + " \"taskGroupId\": 0,\n"
+ " \"baseSequenceName\": \"my-sequence-name\",\n"
+ " \"startPartitions\": {\"topic\":\"mytopic\",
\"partitionOffsetMap\" : {\"0\":1, \"1\":10}},\n"
+ " \"endPartitions\": {\"topic\":\"mytopic\",
\"partitionOffsetMap\" : {\"0\":15, \"1\":200}},\n"
@@ -194,6 +200,7 @@ public void testStartAndEndTopicMatch() throws Exception
{
String jsonStr = "{\n"
+ " \"type\": \"kafka\",\n"
+ + " \"taskGroupId\": 0,\n"
+ " \"baseSequenceName\": \"my-sequence-name\",\n"
+ " \"startPartitions\": {\"topic\":\"mytopic\",
\"partitionOffsetMap\" : {\"0\":1, \"1\":10}},\n"
+ " \"endPartitions\": {\"topic\":\"other\",
\"partitionOffsetMap\" : {\"0\":15, \"1\":200}},\n"
@@ -214,6 +221,7 @@ public void testStartAndEndPartitionSetMatch() throws
Exception
{
String jsonStr = "{\n"
+ " \"type\": \"kafka\",\n"
+ + " \"taskGroupId\": 0,\n"
+ " \"baseSequenceName\": \"my-sequence-name\",\n"
+ " \"startPartitions\": {\"topic\":\"mytopic\",
\"partitionOffsetMap\" : {\"0\":1, \"1\":10}},\n"
+ " \"endPartitions\": {\"topic\":\"mytopic\",
\"partitionOffsetMap\" : {\"0\":15}},\n"
@@ -234,6 +242,7 @@ public void testEndOffsetGreaterThanStart() throws Exception
{
String jsonStr = "{\n"
+ " \"type\": \"kafka\",\n"
+ + " \"taskGroupId\": 0,\n"
+ " \"baseSequenceName\": \"my-sequence-name\",\n"
+ " \"startPartitions\": {\"topic\":\"mytopic\",
\"partitionOffsetMap\" : {\"0\":1, \"1\":10}},\n"
+ " \"endPartitions\": {\"topic\":\"mytopic\",
\"partitionOffsetMap\" : {\"0\":15, \"1\":2}},\n"
diff --git a/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaIndexTaskTest.java b/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaIndexTaskTest.java
index 15f4bb07775..411fff9168e 100644
--- a/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaIndexTaskTest.java
+++ b/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaIndexTaskTest.java
@@ -262,21 +262,21 @@ public KafkaIndexTaskTest(boolean
isIncrementalHandoffSupported)
private static List<ProducerRecord<byte[], byte[]>> generateRecords(String
topic)
{
return ImmutableList.of(
- new ProducerRecord<byte[], byte[]>(topic, 0, null, JB("2008", "a",
"y", "10", "20.0", "1.0")),
- new ProducerRecord<byte[], byte[]>(topic, 0, null, JB("2009", "b",
"y", "10", "20.0", "1.0")),
- new ProducerRecord<byte[], byte[]>(topic, 0, null, JB("2010", "c",
"y", "10", "20.0", "1.0")),
- new ProducerRecord<byte[], byte[]>(topic, 0, null, JB("2011", "d",
"y", "10", "20.0", "1.0")),
- new ProducerRecord<byte[], byte[]>(topic, 0, null, JB("2011", "e",
"y", "10", "20.0", "1.0")),
- new ProducerRecord<byte[], byte[]>(topic, 0, null,
JB("246140482-04-24T15:36:27.903Z", "x", "z", "10", "20.0", "1.0")),
- new ProducerRecord<byte[], byte[]>(topic, 0, null,
StringUtils.toUtf8("unparseable")),
- new ProducerRecord<byte[], byte[]>(topic, 0, null,
StringUtils.toUtf8("unparseable2")),
- new ProducerRecord<byte[], byte[]>(topic, 0, null, null),
- new ProducerRecord<byte[], byte[]>(topic, 0, null, JB("2013", "f",
"y", "10", "20.0", "1.0")),
- new ProducerRecord<byte[], byte[]>(topic, 0, null, JB("2049", "f",
"y", "notanumber", "20.0", "1.0")),
- new ProducerRecord<byte[], byte[]>(topic, 0, null, JB("2049", "f",
"y", "10", "notanumber", "1.0")),
- new ProducerRecord<byte[], byte[]>(topic, 0, null, JB("2049", "f",
"y", "10", "20.0", "notanumber")),
- new ProducerRecord<byte[], byte[]>(topic, 1, null, JB("2012", "g",
"y", "10", "20.0", "1.0")),
- new ProducerRecord<byte[], byte[]>(topic, 1, null, JB("2011", "h",
"y", "10", "20.0", "1.0"))
+ new ProducerRecord<>(topic, 0, null, JB("2008", "a", "y", "10",
"20.0", "1.0")),
+ new ProducerRecord<>(topic, 0, null, JB("2009", "b", "y", "10",
"20.0", "1.0")),
+ new ProducerRecord<>(topic, 0, null, JB("2010", "c", "y", "10",
"20.0", "1.0")),
+ new ProducerRecord<>(topic, 0, null, JB("2011", "d", "y", "10",
"20.0", "1.0")),
+ new ProducerRecord<>(topic, 0, null, JB("2011", "e", "y", "10",
"20.0", "1.0")),
+ new ProducerRecord<>(topic, 0, null,
JB("246140482-04-24T15:36:27.903Z", "x", "z", "10", "20.0", "1.0")),
+ new ProducerRecord<>(topic, 0, null,
StringUtils.toUtf8("unparseable")),
+ new ProducerRecord<>(topic, 0, null,
StringUtils.toUtf8("unparseable2")),
+ new ProducerRecord<>(topic, 0, null, null),
+ new ProducerRecord<>(topic, 0, null, JB("2013", "f", "y", "10",
"20.0", "1.0")),
+ new ProducerRecord<>(topic, 0, null, JB("2049", "f", "y",
"notanumber", "20.0", "1.0")),
+ new ProducerRecord<>(topic, 0, null, JB("2049", "f", "y", "10",
"notanumber", "1.0")),
+ new ProducerRecord<>(topic, 0, null, JB("2049", "f", "y", "10",
"20.0", "notanumber")),
+ new ProducerRecord<>(topic, 1, null, JB("2012", "g", "y", "10",
"20.0", "1.0")),
+ new ProducerRecord<>(topic, 1, null, JB("2011", "h", "y", "10",
"20.0", "1.0"))
);
}
@@ -377,6 +377,7 @@ public void testRunAfterDataInserted() throws Exception
final KafkaIndexTask task = createTask(
null,
new KafkaIOConfig(
+ 0,
"sequence0",
new KafkaPartitions(topic, ImmutableMap.of(0, 2L)),
new KafkaPartitions(topic, ImmutableMap.of(0, 5L)),
@@ -418,6 +419,7 @@ public void testRunBeforeDataInserted() throws Exception
final KafkaIndexTask task = createTask(
null,
new KafkaIOConfig(
+ 0,
"sequence0",
new KafkaPartitions(topic, ImmutableMap.of(0, 2L)),
new KafkaPartitions(topic, ImmutableMap.of(0, 5L)),
@@ -493,6 +495,7 @@ public void testIncrementalHandOff() throws Exception
final KafkaIndexTask task = createTask(
null,
new KafkaIOConfig(
+ 0,
baseSequenceName,
startPartitions,
endPartitions,
@@ -514,14 +517,16 @@ public void testIncrementalHandOff() throws Exception
Assert.assertEquals(TaskState.SUCCESS, future.get().getStatusCode());
Assert.assertEquals(1, checkpointRequestsHash.size());
- Assert.assertTrue(checkpointRequestsHash.contains(
- Objects.hash(
- DATA_SCHEMA.getDataSource(),
- baseSequenceName,
- new KafkaDataSourceMetadata(startPartitions),
- new KafkaDataSourceMetadata(new KafkaPartitions(topic,
currentOffsets))
+ Assert.assertTrue(
+ checkpointRequestsHash.contains(
+ Objects.hash(
+ DATA_SCHEMA.getDataSource(),
+ 0,
+ new KafkaDataSourceMetadata(startPartitions),
+ new KafkaDataSourceMetadata(new KafkaPartitions(topic,
currentOffsets))
+ )
)
- ));
+ );
// Check metrics
Assert.assertEquals(8,
task.getRunner().getRowIngestionMeters().getProcessed());
@@ -581,6 +586,7 @@ public void testTimeBasedIncrementalHandOff() throws
Exception
final KafkaIndexTask task = createTask(
null,
new KafkaIOConfig(
+ 0,
baseSequenceName,
startPartitions,
endPartitions,
@@ -603,14 +609,16 @@ public void testTimeBasedIncrementalHandOff() throws
Exception
Assert.assertEquals(TaskState.SUCCESS, future.get().getStatusCode());
Assert.assertEquals(1, checkpointRequestsHash.size());
- Assert.assertTrue(checkpointRequestsHash.contains(
- Objects.hash(
- DATA_SCHEMA.getDataSource(),
- baseSequenceName,
- new KafkaDataSourceMetadata(startPartitions),
- new KafkaDataSourceMetadata(new KafkaPartitions(topic,
checkpoint.getPartitionOffsetMap()))
+ Assert.assertTrue(
+ checkpointRequestsHash.contains(
+ Objects.hash(
+ DATA_SCHEMA.getDataSource(),
+ 0,
+ new KafkaDataSourceMetadata(startPartitions),
+ new KafkaDataSourceMetadata(new KafkaPartitions(topic,
checkpoint.getPartitionOffsetMap()))
+ )
)
- ));
+ );
// Check metrics
Assert.assertEquals(2,
task.getRunner().getRowIngestionMeters().getProcessed());
@@ -637,6 +645,7 @@ public void testRunWithMinimumMessageTime() throws Exception
final KafkaIndexTask task = createTask(
null,
new KafkaIOConfig(
+ 0,
"sequence0",
new KafkaPartitions(topic, ImmutableMap.of(0, 0L)),
new KafkaPartitions(topic, ImmutableMap.of(0, 5L)),
@@ -690,6 +699,7 @@ public void testRunWithMaximumMessageTime() throws Exception
final KafkaIndexTask task = createTask(
null,
new KafkaIOConfig(
+ 0,
"sequence0",
new KafkaPartitions(topic, ImmutableMap.of(0, 0L)),
new KafkaPartitions(topic, ImmutableMap.of(0, 5L)),
@@ -753,6 +763,7 @@ public void testRunWithTransformSpec() throws Exception
)
),
new KafkaIOConfig(
+ 0,
"sequence0",
new KafkaPartitions(topic, ImmutableMap.of(0, 0L)),
new KafkaPartitions(topic, ImmutableMap.of(0, 5L)),
@@ -812,6 +823,7 @@ public void testRunOnNothing() throws Exception
final KafkaIndexTask task = createTask(
null,
new KafkaIOConfig(
+ 0,
"sequence0",
new KafkaPartitions(topic, ImmutableMap.of(0, 2L)),
new KafkaPartitions(topic, ImmutableMap.of(0, 2L)),
@@ -852,6 +864,7 @@ public void testHandoffConditionTimeoutWhenHandoffOccurs()
throws Exception
final KafkaIndexTask task = createTask(
null,
new KafkaIOConfig(
+ 0,
"sequence0",
new KafkaPartitions(topic, ImmutableMap.of(0, 2L)),
new KafkaPartitions(topic, ImmutableMap.of(0, 5L)),
@@ -903,6 +916,7 @@ public void
testHandoffConditionTimeoutWhenHandoffDoesNotOccur() throws Exceptio
final KafkaIndexTask task = createTask(
null,
new KafkaIOConfig(
+ 0,
"sequence0",
new KafkaPartitions(topic, ImmutableMap.of(0, 2L)),
new KafkaPartitions(topic, ImmutableMap.of(0, 5L)),
@@ -957,6 +971,7 @@ public void testReportParseExceptions() throws Exception
final KafkaIndexTask task = createTask(
null,
new KafkaIOConfig(
+ 0,
"sequence0",
new KafkaPartitions(topic, ImmutableMap.of(0, 2L)),
new KafkaPartitions(topic, ImmutableMap.of(0, 7L)),
@@ -1000,6 +1015,7 @@ public void testMultipleParseExceptionsSuccess() throws
Exception
final KafkaIndexTask task = createTask(
null,
new KafkaIOConfig(
+ 0,
"sequence0",
new KafkaPartitions(topic, ImmutableMap.of(0, 2L)),
new KafkaPartitions(topic, ImmutableMap.of(0, 13L)),
@@ -1081,6 +1097,7 @@ public void testMultipleParseExceptionsFailure() throws
Exception
final KafkaIndexTask task = createTask(
null,
new KafkaIOConfig(
+ 0,
"sequence0",
new KafkaPartitions(topic, ImmutableMap.of(0, 2L)),
new KafkaPartitions(topic, ImmutableMap.of(0, 10L)),
@@ -1140,6 +1157,7 @@ public void testRunReplicas() throws Exception
final KafkaIndexTask task1 = createTask(
null,
new KafkaIOConfig(
+ 0,
"sequence0",
new KafkaPartitions(topic, ImmutableMap.of(0, 2L)),
new KafkaPartitions(topic, ImmutableMap.of(0, 5L)),
@@ -1153,6 +1171,7 @@ public void testRunReplicas() throws Exception
final KafkaIndexTask task2 = createTask(
null,
new KafkaIOConfig(
+ 0,
"sequence0",
new KafkaPartitions(topic, ImmutableMap.of(0, 2L)),
new KafkaPartitions(topic, ImmutableMap.of(0, 5L)),
@@ -1206,6 +1225,7 @@ public void testRunConflicting() throws Exception
final KafkaIndexTask task1 = createTask(
null,
new KafkaIOConfig(
+ 0,
"sequence0",
new KafkaPartitions(topic, ImmutableMap.of(0, 2L)),
new KafkaPartitions(topic, ImmutableMap.of(0, 5L)),
@@ -1219,6 +1239,7 @@ public void testRunConflicting() throws Exception
final KafkaIndexTask task2 = createTask(
null,
new KafkaIOConfig(
+ 1,
"sequence1",
new KafkaPartitions(topic, ImmutableMap.of(0, 3L)),
new KafkaPartitions(topic, ImmutableMap.of(0, 10L)),
@@ -1273,6 +1294,7 @@ public void testRunConflictingWithoutTransactions()
throws Exception
final KafkaIndexTask task1 = createTask(
null,
new KafkaIOConfig(
+ 0,
"sequence0",
new KafkaPartitions(topic, ImmutableMap.of(0, 2L)),
new KafkaPartitions(topic, ImmutableMap.of(0, 5L)),
@@ -1286,6 +1308,7 @@ public void testRunConflictingWithoutTransactions()
throws Exception
final KafkaIndexTask task2 = createTask(
null,
new KafkaIOConfig(
+ 1,
"sequence1",
new KafkaPartitions(topic, ImmutableMap.of(0, 3L)),
new KafkaPartitions(topic, ImmutableMap.of(0, 10L)),
@@ -1345,6 +1368,7 @@ public void testRunOneTaskTwoPartitions() throws Exception
final KafkaIndexTask task = createTask(
null,
new KafkaIOConfig(
+ 0,
"sequence0",
new KafkaPartitions(topic, ImmutableMap.of(0, 2L, 1, 0L)),
new KafkaPartitions(topic, ImmutableMap.of(0, 5L, 1, 2L)),
@@ -1409,6 +1433,7 @@ public void testRunTwoTasksTwoPartitions() throws
Exception
final KafkaIndexTask task1 = createTask(
null,
new KafkaIOConfig(
+ 0,
"sequence0",
new KafkaPartitions(topic, ImmutableMap.of(0, 2L)),
new KafkaPartitions(topic, ImmutableMap.of(0, 5L)),
@@ -1422,6 +1447,7 @@ public void testRunTwoTasksTwoPartitions() throws
Exception
final KafkaIndexTask task2 = createTask(
null,
new KafkaIOConfig(
+ 1,
"sequence1",
new KafkaPartitions(topic, ImmutableMap.of(1, 0L)),
new KafkaPartitions(topic, ImmutableMap.of(1, 1L)),
@@ -1477,6 +1503,7 @@ public void testRestore() throws Exception
final KafkaIndexTask task1 = createTask(
null,
new KafkaIOConfig(
+ 0,
"sequence0",
new KafkaPartitions(topic, ImmutableMap.of(0, 2L)),
new KafkaPartitions(topic, ImmutableMap.of(0, 5L)),
@@ -1513,6 +1540,7 @@ public void testRestore() throws Exception
final KafkaIndexTask task2 = createTask(
task1.getId(),
new KafkaIOConfig(
+ 0,
"sequence0",
new KafkaPartitions(topic, ImmutableMap.of(0, 2L)),
new KafkaPartitions(topic, ImmutableMap.of(0, 5L)),
@@ -1564,6 +1592,7 @@ public void testRunWithPauseAndResume() throws Exception
final KafkaIndexTask task = createTask(
null,
new KafkaIOConfig(
+ 0,
"sequence0",
new KafkaPartitions(topic, ImmutableMap.of(0, 2L)),
new KafkaPartitions(topic, ImmutableMap.of(0, 5L)),
@@ -1647,6 +1676,7 @@ public void
testRunWithOffsetOutOfRangeExceptionAndPause() throws Exception
final KafkaIndexTask task = createTask(
null,
new KafkaIOConfig(
+ 0,
"sequence0",
new KafkaPartitions(topic, ImmutableMap.of(0, 2L)),
new KafkaPartitions(topic, ImmutableMap.of(0, 5L)),
@@ -1685,6 +1715,7 @@ public void
testRunWithOffsetOutOfRangeExceptionAndNextOffsetGreaterThanLeastAva
final KafkaIndexTask task = createTask(
null,
new KafkaIOConfig(
+ 0,
"sequence0",
new KafkaPartitions(topic, ImmutableMap.of(0, 200L)),
new KafkaPartitions(topic, ImmutableMap.of(0, 500L)),
@@ -1737,6 +1768,7 @@ public void
testRunContextSequenceAheadOfStartingOffsets() throws Exception
final KafkaIndexTask task = createTask(
null,
new KafkaIOConfig(
+ 0,
"sequence0",
// task should ignore these and use sequence info sent in the
context
new KafkaPartitions(topic, ImmutableMap.of(0, 0L)),
@@ -2026,18 +2058,20 @@ private void makeToolboxFactory() throws IOException
@Override
public boolean checkPointDataSourceMetadata(
String supervisorId,
- @Nullable String sequenceName,
+ int taskGroupId,
@Nullable DataSourceMetadata previousDataSourceMetadata,
@Nullable DataSourceMetadata currentDataSourceMetadata
)
{
log.info("Adding checkpoint hash to the set");
- checkpointRequestsHash.add(Objects.hash(
- supervisorId,
- sequenceName,
- previousDataSourceMetadata,
- currentDataSourceMetadata
- ));
+ checkpointRequestsHash.add(
+ Objects.hash(
+ supervisorId,
+ taskGroupId,
+ previousDataSourceMetadata,
+ currentDataSourceMetadata
+ )
+ );
return true;
}
}
diff --git a/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java b/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java
index 86648fef282..f0f6033eea3 100644
--- a/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java
+++ b/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java
@@ -19,6 +19,7 @@
package io.druid.indexing.kafka.supervisor;
+import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Optional;
import com.google.common.collect.ImmutableList;
@@ -61,6 +62,7 @@
import io.druid.java.util.common.granularity.Granularities;
import io.druid.java.util.common.parsers.JSONPathFieldSpec;
import io.druid.java.util.common.parsers.JSONPathSpec;
+import io.druid.java.util.emitter.EmittingLogger;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.CountAggregatorFactory;
import io.druid.segment.TestHelper;
@@ -70,6 +72,7 @@
import io.druid.segment.realtime.FireDepartment;
import io.druid.server.metrics.DruidMonitorSchedulerConfig;
import io.druid.server.metrics.NoopServiceEmitter;
+import io.druid.server.metrics.ExceptionCapturingServiceEmitter;
import org.apache.curator.test.TestingCluster;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
@@ -99,7 +102,9 @@
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
+import java.util.concurrent.TimeoutException;
import static org.easymock.EasyMock.anyBoolean;
import static org.easymock.EasyMock.anyObject;
@@ -141,6 +146,7 @@
private TaskQueue taskQueue;
private String topic;
private RowIngestionMetersFactory rowIngestionMetersFactory;
+ private ExceptionCapturingServiceEmitter serviceEmitter;
private static String getTopic()
{
@@ -213,6 +219,8 @@ public void setupTest()
topic = getTopic();
rowIngestionMetersFactory = new TestUtils().getRowIngestionMetersFactory();
+ serviceEmitter = new ExceptionCapturingServiceEmitter();
+ EmittingLogger.registerEmitter(serviceEmitter);
}
@After
@@ -553,7 +561,7 @@ public void testKillIncompatibleTasks() throws Exception
Task id1 = createKafkaIndexTask(
"id1",
DATASOURCE,
- "index_kafka_testDS__some_other_sequenceName",
+ 1,
new KafkaPartitions("topic", ImmutableMap.of(0, 0L)),
new KafkaPartitions("topic", ImmutableMap.of(0, 10L)),
null,
@@ -564,7 +572,7 @@ public void testKillIncompatibleTasks() throws Exception
Task id2 = createKafkaIndexTask(
"id2",
DATASOURCE,
- "sequenceName-0",
+ 0,
new KafkaPartitions("topic", ImmutableMap.of(0, 0L, 1, 0L, 2, 0L)),
new KafkaPartitions("topic", ImmutableMap.of(0, 333L, 1, 333L, 2,
333L)),
null,
@@ -575,7 +583,7 @@ public void testKillIncompatibleTasks() throws Exception
Task id3 = createKafkaIndexTask(
"id3",
DATASOURCE,
- "index_kafka_testDS__some_other_sequenceName",
+ 1,
new KafkaPartitions("topic", ImmutableMap.of(0, 0L, 1, 0L, 2, 1L)),
new KafkaPartitions("topic", ImmutableMap.of(0, 333L, 1, 333L, 2,
330L)),
null,
@@ -586,7 +594,7 @@ public void testKillIncompatibleTasks() throws Exception
Task id4 = createKafkaIndexTask(
"id4",
"other-datasource",
- "index_kafka_testDS_d927edff33c4b3f",
+ 2,
new KafkaPartitions("topic", ImmutableMap.of(0, 0L)),
new KafkaPartitions("topic", ImmutableMap.of(0, 10L)),
null,
@@ -634,7 +642,9 @@ public void testKillIncompatibleTasks() throws Exception
TreeMap<Integer, Map<Integer, Long>> checkpoints = new TreeMap<>();
checkpoints.put(0, ImmutableMap.of(0, 0L, 1, 0L, 2, 0L));
- expect(taskClient.getCheckpointsAsync(EasyMock.contains("id2"),
anyBoolean())).andReturn(Futures.immediateFuture(checkpoints)).times(2);
+ expect(taskClient.getCheckpointsAsync(EasyMock.contains("id2"),
anyBoolean()))
+ .andReturn(Futures.immediateFuture(checkpoints))
+ .times(2);
replayAll();
@@ -652,7 +662,7 @@ public void testKillBadPartitionAssignment() throws
Exception
Task id1 = createKafkaIndexTask(
"id1",
DATASOURCE,
- "sequenceName-0",
+ 0,
new KafkaPartitions("topic", ImmutableMap.of(0, 0L, 2, 0L)),
new KafkaPartitions("topic", ImmutableMap.of(0, Long.MAX_VALUE, 2,
Long.MAX_VALUE)),
null,
@@ -661,7 +671,7 @@ public void testKillBadPartitionAssignment() throws
Exception
Task id2 = createKafkaIndexTask(
"id2",
DATASOURCE,
- "sequenceName-1",
+ 1,
new KafkaPartitions("topic", ImmutableMap.of(1, 0L)),
new KafkaPartitions("topic", ImmutableMap.of(1, Long.MAX_VALUE)),
null,
@@ -670,7 +680,7 @@ public void testKillBadPartitionAssignment() throws
Exception
Task id3 = createKafkaIndexTask(
"id3",
DATASOURCE,
- "sequenceName-0",
+ 0,
new KafkaPartitions("topic", ImmutableMap.of(0, 0L, 1, 0L, 2, 0L)),
new KafkaPartitions("topic", ImmutableMap.of(0, Long.MAX_VALUE, 1,
Long.MAX_VALUE, 2, Long.MAX_VALUE)),
null,
@@ -679,7 +689,7 @@ public void testKillBadPartitionAssignment() throws
Exception
Task id4 = createKafkaIndexTask(
"id4",
DATASOURCE,
- "sequenceName-0",
+ 0,
new KafkaPartitions("topic", ImmutableMap.of(0, 0L, 1, 0L)),
new KafkaPartitions("topic", ImmutableMap.of(0, Long.MAX_VALUE, 1,
Long.MAX_VALUE)),
null,
@@ -688,7 +698,7 @@ public void testKillBadPartitionAssignment() throws
Exception
Task id5 = createKafkaIndexTask(
"id5",
DATASOURCE,
- "sequenceName-0",
+ 0,
new KafkaPartitions("topic", ImmutableMap.of(1, 0L, 2, 0L)),
new KafkaPartitions("topic", ImmutableMap.of(1, Long.MAX_VALUE, 2,
Long.MAX_VALUE)),
null,
@@ -727,8 +737,12 @@ public void testKillBadPartitionAssignment() throws
Exception
checkpoints1.put(0, ImmutableMap.of(0, 0L, 2, 0L));
TreeMap<Integer, Map<Integer, Long>> checkpoints2 = new TreeMap<>();
checkpoints2.put(0, ImmutableMap.of(1, 0L));
- expect(taskClient.getCheckpointsAsync(EasyMock.contains("id1"),
anyBoolean())).andReturn(Futures.immediateFuture(checkpoints1)).times(1);
- expect(taskClient.getCheckpointsAsync(EasyMock.contains("id2"),
anyBoolean())).andReturn(Futures.immediateFuture(checkpoints2)).times(1);
+ expect(taskClient.getCheckpointsAsync(EasyMock.contains("id1"),
anyBoolean()))
+ .andReturn(Futures.immediateFuture(checkpoints1))
+ .times(1);
+ expect(taskClient.getCheckpointsAsync(EasyMock.contains("id2"),
anyBoolean()))
+ .andReturn(Futures.immediateFuture(checkpoints2))
+ .times(1);
taskRunner.registerListener(anyObject(TaskRunnerListener.class),
anyObject(Executor.class));
taskQueue.shutdown("id4");
@@ -765,10 +779,12 @@ public void testRequeueTaskWhenFailed() throws Exception
checkpoints1.put(0, ImmutableMap.of(0, 0L, 2, 0L));
TreeMap<Integer, Map<Integer, Long>> checkpoints2 = new TreeMap<>();
checkpoints2.put(0, ImmutableMap.of(1, 0L));
- expect(taskClient.getCheckpointsAsync(EasyMock.contains("sequenceName-0"),
anyBoolean())).andReturn(Futures.immediateFuture(checkpoints1))
-
.anyTimes();
- expect(taskClient.getCheckpointsAsync(EasyMock.contains("sequenceName-1"),
anyBoolean())).andReturn(Futures.immediateFuture(checkpoints2))
-
.anyTimes();
+ expect(taskClient.getCheckpointsAsync(EasyMock.contains("sequenceName-0"),
anyBoolean()))
+ .andReturn(Futures.immediateFuture(checkpoints1))
+ .anyTimes();
+ expect(taskClient.getCheckpointsAsync(EasyMock.contains("sequenceName-1"),
anyBoolean()))
+ .andReturn(Futures.immediateFuture(checkpoints2))
+ .anyTimes();
taskRunner.registerListener(anyObject(TaskRunnerListener.class),
anyObject(Executor.class));
replayAll();
@@ -830,7 +846,7 @@ public void testRequeueAdoptedTaskWhenFailed() throws
Exception
Task id1 = createKafkaIndexTask(
"id1",
DATASOURCE,
- "sequenceName-0",
+ 0,
new KafkaPartitions("topic", ImmutableMap.of(0, 0L, 2, 0L)),
new KafkaPartitions("topic", ImmutableMap.of(0, Long.MAX_VALUE, 2,
Long.MAX_VALUE)),
now,
@@ -857,7 +873,9 @@ public void testRequeueAdoptedTaskWhenFailed() throws
Exception
TreeMap<Integer, Map<Integer, Long>> checkpoints = new TreeMap<>();
checkpoints.put(0, ImmutableMap.of(0, 0L, 2, 0L));
- expect(taskClient.getCheckpointsAsync(EasyMock.contains("id1"),
anyBoolean())).andReturn(Futures.immediateFuture(checkpoints)).times(2);
+ expect(taskClient.getCheckpointsAsync(EasyMock.contains("id1"),
anyBoolean()))
+ .andReturn(Futures.immediateFuture(checkpoints))
+ .times(2);
taskRunner.registerListener(anyObject(TaskRunnerListener.class),
anyObject(Executor.class));
replayAll();
@@ -878,9 +896,12 @@ public void testRequeueAdoptedTaskWhenFailed() throws
Exception
reset(taskClient);
// for the newly created replica task
- expect(taskClient.getCheckpointsAsync(EasyMock.contains("sequenceName-0"),
anyBoolean())).andReturn(Futures.immediateFuture(checkpoints))
-
.times(2);
- expect(taskClient.getCheckpointsAsync(EasyMock.contains("id1"),
anyBoolean())).andReturn(Futures.immediateFuture(checkpoints)).times(1);
+ expect(taskClient.getCheckpointsAsync(EasyMock.contains("sequenceName-0"),
anyBoolean()))
+ .andReturn(Futures.immediateFuture(checkpoints))
+ .times(2);
+ expect(taskClient.getCheckpointsAsync(EasyMock.contains("id1"),
anyBoolean()))
+ .andReturn(Futures.immediateFuture(checkpoints))
+ .times(1);
expect(taskStorage.getActiveTasks()).andReturn(ImmutableList.of(captured.getValue())).anyTimes();
expect(taskStorage.getStatus(iHaveFailed.getId())).andReturn(Optional.of(TaskStatus.failure(iHaveFailed.getId())));
@@ -953,10 +974,12 @@ public void testQueueNextTasksOnSuccess() throws Exception
TreeMap<Integer, Map<Integer, Long>> checkpoints2 = new TreeMap<>();
checkpoints2.put(0, ImmutableMap.of(1, 0L));
// there would be 4 tasks, 2 for each task group
- expect(taskClient.getCheckpointsAsync(EasyMock.contains("sequenceName-0"),
anyBoolean())).andReturn(Futures.immediateFuture(checkpoints1))
-
.times(2);
- expect(taskClient.getCheckpointsAsync(EasyMock.contains("sequenceName-1"),
anyBoolean())).andReturn(Futures.immediateFuture(checkpoints2))
-
.times(2);
+ expect(taskClient.getCheckpointsAsync(EasyMock.contains("sequenceName-0"),
anyBoolean()))
+ .andReturn(Futures.immediateFuture(checkpoints1))
+ .times(2);
+ expect(taskClient.getCheckpointsAsync(EasyMock.contains("sequenceName-1"),
anyBoolean()))
+ .andReturn(Futures.immediateFuture(checkpoints2))
+ .times(2);
expect(taskStorage.getActiveTasks()).andReturn(tasks).anyTimes();
for (Task task : tasks) {
@@ -1063,10 +1086,12 @@ public void testBeginPublishAndQueueNextTasks() throws
Exception
checkpoints1.put(0, ImmutableMap.of(0, 0L, 2, 0L));
TreeMap<Integer, Map<Integer, Long>> checkpoints2 = new TreeMap<>();
checkpoints2.put(0, ImmutableMap.of(1, 0L));
- expect(taskClient.getCheckpointsAsync(EasyMock.contains("sequenceName-0"),
anyBoolean())).andReturn(Futures.immediateFuture(checkpoints1))
-
.times(2);
- expect(taskClient.getCheckpointsAsync(EasyMock.contains("sequenceName-1"),
anyBoolean())).andReturn(Futures.immediateFuture(checkpoints2))
-
.times(2);
+ expect(taskClient.getCheckpointsAsync(EasyMock.contains("sequenceName-0"),
anyBoolean()))
+ .andReturn(Futures.immediateFuture(checkpoints1))
+ .times(2);
+ expect(taskClient.getCheckpointsAsync(EasyMock.contains("sequenceName-1"),
anyBoolean()))
+ .andReturn(Futures.immediateFuture(checkpoints2))
+ .times(2);
replay(taskStorage, taskRunner, taskClient, taskQueue);
@@ -1100,7 +1125,7 @@ public void testDiscoverExistingPublishingTask() throws
Exception
Task task = createKafkaIndexTask(
"id1",
DATASOURCE,
- "sequenceName-0",
+ 0,
new KafkaPartitions("topic", ImmutableMap.of(0, 0L, 1, 0L, 2, 0L)),
new KafkaPartitions("topic", ImmutableMap.of(0, Long.MAX_VALUE, 1,
Long.MAX_VALUE, 2, Long.MAX_VALUE)),
null,
@@ -1192,7 +1217,7 @@ public void
testDiscoverExistingPublishingTaskWithDifferentPartitionAllocation()
Task task = createKafkaIndexTask(
"id1",
DATASOURCE,
- "sequenceName-0",
+ 0,
new KafkaPartitions("topic", ImmutableMap.of(0, 0L, 2, 0L)),
new KafkaPartitions("topic", ImmutableMap.of(0, Long.MAX_VALUE, 2,
Long.MAX_VALUE)),
null,
@@ -1282,7 +1307,7 @@ public void
testDiscoverExistingPublishingAndReadingTask() throws Exception
Task id1 = createKafkaIndexTask(
"id1",
DATASOURCE,
- "sequenceName-0",
+ 0,
new KafkaPartitions("topic", ImmutableMap.of(0, 0L, 1, 0L, 2, 0L)),
new KafkaPartitions("topic", ImmutableMap.of(0, Long.MAX_VALUE, 1,
Long.MAX_VALUE, 2, Long.MAX_VALUE)),
null,
@@ -1292,7 +1317,7 @@ public void
testDiscoverExistingPublishingAndReadingTask() throws Exception
Task id2 = createKafkaIndexTask(
"id2",
DATASOURCE,
- "sequenceName-0",
+ 0,
new KafkaPartitions("topic", ImmutableMap.of(0, 1L, 1, 2L, 2, 3L)),
new KafkaPartitions("topic", ImmutableMap.of(0, Long.MAX_VALUE, 1,
Long.MAX_VALUE, 2, Long.MAX_VALUE)),
null,
@@ -1330,7 +1355,9 @@ public void
testDiscoverExistingPublishingAndReadingTask() throws Exception
// since id1 is publishing, so getCheckpoints wouldn't be called for it
TreeMap<Integer, Map<Integer, Long>> checkpoints = new TreeMap<>();
checkpoints.put(0, ImmutableMap.of(0, 1L, 1, 2L, 2, 3L));
- expect(taskClient.getCheckpointsAsync(EasyMock.contains("id2"),
anyBoolean())).andReturn(Futures.immediateFuture(checkpoints)).times(1);
+ expect(taskClient.getCheckpointsAsync(EasyMock.contains("id2"),
anyBoolean()))
+ .andReturn(Futures.immediateFuture(checkpoints))
+ .times(1);
replayAll();
@@ -1404,10 +1431,12 @@ public void
testKillUnresponsiveTasksWhileGettingStartTime() throws Exception
checkpoints1.put(0, ImmutableMap.of(0, 0L, 2, 0L));
TreeMap<Integer, Map<Integer, Long>> checkpoints2 = new TreeMap<>();
checkpoints2.put(0, ImmutableMap.of(1, 0L));
- expect(taskClient.getCheckpointsAsync(EasyMock.contains("sequenceName-0"),
anyBoolean())).andReturn(Futures.immediateFuture(checkpoints1))
-
.times(2);
- expect(taskClient.getCheckpointsAsync(EasyMock.contains("sequenceName-1"),
anyBoolean())).andReturn(Futures.immediateFuture(checkpoints2))
-
.times(2);
+ expect(taskClient.getCheckpointsAsync(EasyMock.contains("sequenceName-0"),
anyBoolean()))
+ .andReturn(Futures.immediateFuture(checkpoints1))
+ .times(2);
+ expect(taskClient.getCheckpointsAsync(EasyMock.contains("sequenceName-1"),
anyBoolean()))
+ .andReturn(Futures.immediateFuture(checkpoints2))
+ .times(2);
expect(taskStorage.getActiveTasks()).andReturn(tasks).anyTimes();
for (Task task : tasks) {
@@ -1463,10 +1492,12 @@ public void testKillUnresponsiveTasksWhilePausing()
throws Exception
checkpoints1.put(0, ImmutableMap.of(0, 0L, 2, 0L));
TreeMap<Integer, Map<Integer, Long>> checkpoints2 = new TreeMap<>();
checkpoints2.put(0, ImmutableMap.of(1, 0L));
- expect(taskClient.getCheckpointsAsync(EasyMock.contains("sequenceName-0"),
anyBoolean())).andReturn(Futures.immediateFuture(checkpoints1))
-
.times(2);
- expect(taskClient.getCheckpointsAsync(EasyMock.contains("sequenceName-1"),
anyBoolean())).andReturn(Futures.immediateFuture(checkpoints2))
-
.times(2);
+ expect(taskClient.getCheckpointsAsync(EasyMock.contains("sequenceName-0"),
anyBoolean()))
+ .andReturn(Futures.immediateFuture(checkpoints1))
+ .times(2);
+ expect(taskClient.getCheckpointsAsync(EasyMock.contains("sequenceName-1"),
anyBoolean()))
+ .andReturn(Futures.immediateFuture(checkpoints2))
+ .times(2);
captured = Capture.newInstance(CaptureType.ALL);
expect(taskStorage.getActiveTasks()).andReturn(tasks).anyTimes();
@@ -1540,10 +1571,12 @@ public void
testKillUnresponsiveTasksWhileSettingEndOffsets() throws Exception
checkpoints1.put(0, ImmutableMap.of(0, 0L, 2, 0L));
TreeMap<Integer, Map<Integer, Long>> checkpoints2 = new TreeMap<>();
checkpoints2.put(0, ImmutableMap.of(1, 0L));
- expect(taskClient.getCheckpointsAsync(EasyMock.contains("sequenceName-0"),
anyBoolean())).andReturn(Futures.immediateFuture(checkpoints1))
-
.times(2);
- expect(taskClient.getCheckpointsAsync(EasyMock.contains("sequenceName-1"),
anyBoolean())).andReturn(Futures.immediateFuture(checkpoints2))
-
.times(2);
+ expect(taskClient.getCheckpointsAsync(EasyMock.contains("sequenceName-0"),
anyBoolean()))
+ .andReturn(Futures.immediateFuture(checkpoints1))
+ .times(2);
+ expect(taskClient.getCheckpointsAsync(EasyMock.contains("sequenceName-1"),
anyBoolean()))
+ .andReturn(Futures.immediateFuture(checkpoints2))
+ .times(2);
captured = Capture.newInstance(CaptureType.ALL);
expect(taskStorage.getActiveTasks()).andReturn(tasks).anyTimes();
@@ -1622,7 +1655,7 @@ public void testStopGracefully() throws Exception
Task id1 = createKafkaIndexTask(
"id1",
DATASOURCE,
- "sequenceName-0",
+ 0,
new KafkaPartitions("topic", ImmutableMap.of(0, 0L, 1, 0L, 2, 0L)),
new KafkaPartitions("topic", ImmutableMap.of(0, Long.MAX_VALUE, 1,
Long.MAX_VALUE, 2, Long.MAX_VALUE)),
null,
@@ -1632,7 +1665,7 @@ public void testStopGracefully() throws Exception
Task id2 = createKafkaIndexTask(
"id2",
DATASOURCE,
- "sequenceName-0",
+ 0,
new KafkaPartitions("topic", ImmutableMap.of(0, 10L, 1, 20L, 2, 30L)),
new KafkaPartitions("topic", ImmutableMap.of(0, Long.MAX_VALUE, 1,
Long.MAX_VALUE, 2, Long.MAX_VALUE)),
null,
@@ -1642,7 +1675,7 @@ public void testStopGracefully() throws Exception
Task id3 = createKafkaIndexTask(
"id3",
DATASOURCE,
- "sequenceName-0",
+ 0,
new KafkaPartitions("topic", ImmutableMap.of(0, 10L, 1, 20L, 2, 30L)),
new KafkaPartitions("topic", ImmutableMap.of(0, Long.MAX_VALUE, 1,
Long.MAX_VALUE, 2, Long.MAX_VALUE)),
null,
@@ -1678,8 +1711,12 @@ public void testStopGracefully() throws Exception
// getCheckpoints will not be called for id1 as it is in publishing state
TreeMap<Integer, Map<Integer, Long>> checkpoints = new TreeMap<>();
checkpoints.put(0, ImmutableMap.of(0, 10L, 1, 20L, 2, 30L));
- expect(taskClient.getCheckpointsAsync(EasyMock.contains("id2"),
anyBoolean())).andReturn(Futures.immediateFuture(checkpoints)).times(1);
- expect(taskClient.getCheckpointsAsync(EasyMock.contains("id3"),
anyBoolean())).andReturn(Futures.immediateFuture(checkpoints)).times(1);
+ expect(taskClient.getCheckpointsAsync(EasyMock.contains("id2"),
anyBoolean()))
+ .andReturn(Futures.immediateFuture(checkpoints))
+ .times(1);
+ expect(taskClient.getCheckpointsAsync(EasyMock.contains("id3"),
anyBoolean()))
+ .andReturn(Futures.immediateFuture(checkpoints))
+ .times(1);
taskRunner.registerListener(anyObject(TaskRunnerListener.class),
anyObject(Executor.class));
replayAll();
@@ -1824,7 +1861,7 @@ public void testResetRunningTasks() throws Exception
Task id1 = createKafkaIndexTask(
"id1",
DATASOURCE,
- "sequenceName-0",
+ 0,
new KafkaPartitions("topic", ImmutableMap.of(0, 0L, 1, 0L, 2, 0L)),
new KafkaPartitions("topic", ImmutableMap.of(0, Long.MAX_VALUE, 1,
Long.MAX_VALUE, 2, Long.MAX_VALUE)),
null,
@@ -1834,7 +1871,7 @@ public void testResetRunningTasks() throws Exception
Task id2 = createKafkaIndexTask(
"id2",
DATASOURCE,
- "sequenceName-0",
+ 0,
new KafkaPartitions("topic", ImmutableMap.of(0, 10L, 1, 20L, 2, 30L)),
new KafkaPartitions("topic", ImmutableMap.of(0, Long.MAX_VALUE, 1,
Long.MAX_VALUE, 2, Long.MAX_VALUE)),
null,
@@ -1844,7 +1881,7 @@ public void testResetRunningTasks() throws Exception
Task id3 = createKafkaIndexTask(
"id3",
DATASOURCE,
- "sequenceName-0",
+ 0,
new KafkaPartitions("topic", ImmutableMap.of(0, 10L, 1, 20L, 2, 30L)),
new KafkaPartitions("topic", ImmutableMap.of(0, Long.MAX_VALUE, 1,
Long.MAX_VALUE, 2, Long.MAX_VALUE)),
null,
@@ -1879,8 +1916,12 @@ public void testResetRunningTasks() throws Exception
TreeMap<Integer, Map<Integer, Long>> checkpoints = new TreeMap<>();
checkpoints.put(0, ImmutableMap.of(0, 10L, 1, 20L, 2, 30L));
- expect(taskClient.getCheckpointsAsync(EasyMock.contains("id2"),
anyBoolean())).andReturn(Futures.immediateFuture(checkpoints)).times(1);
- expect(taskClient.getCheckpointsAsync(EasyMock.contains("id3"),
anyBoolean())).andReturn(Futures.immediateFuture(checkpoints)).times(1);
+ expect(taskClient.getCheckpointsAsync(EasyMock.contains("id2"),
anyBoolean()))
+ .andReturn(Futures.immediateFuture(checkpoints))
+ .times(1);
+ expect(taskClient.getCheckpointsAsync(EasyMock.contains("id3"),
anyBoolean()))
+ .andReturn(Futures.immediateFuture(checkpoints))
+ .times(1);
taskRunner.registerListener(anyObject(TaskRunnerListener.class),
anyObject(Executor.class));
replayAll();
@@ -1908,7 +1949,7 @@ public void testNoDataIngestionTasks() throws Exception
Task id1 = createKafkaIndexTask(
"id1",
DATASOURCE,
- "sequenceName-0",
+ 0,
new KafkaPartitions("topic", ImmutableMap.of(0, 0L, 1, 0L, 2, 0L)),
new KafkaPartitions("topic", ImmutableMap.of(0, Long.MAX_VALUE, 1,
Long.MAX_VALUE, 2, Long.MAX_VALUE)),
null,
@@ -1918,7 +1959,7 @@ public void testNoDataIngestionTasks() throws Exception
Task id2 = createKafkaIndexTask(
"id2",
DATASOURCE,
- "sequenceName-0",
+ 0,
new KafkaPartitions("topic", ImmutableMap.of(0, 10L, 1, 20L, 2, 30L)),
new KafkaPartitions("topic", ImmutableMap.of(0, Long.MAX_VALUE, 1,
Long.MAX_VALUE, 2, Long.MAX_VALUE)),
null,
@@ -1928,7 +1969,7 @@ public void testNoDataIngestionTasks() throws Exception
Task id3 = createKafkaIndexTask(
"id3",
DATASOURCE,
- "sequenceName-0",
+ 0,
new KafkaPartitions("topic", ImmutableMap.of(0, 10L, 1, 20L, 2, 30L)),
new KafkaPartitions("topic", ImmutableMap.of(0, Long.MAX_VALUE, 1,
Long.MAX_VALUE, 2, Long.MAX_VALUE)),
null,
@@ -1958,9 +1999,15 @@ public void testNoDataIngestionTasks() throws Exception
TreeMap<Integer, Map<Integer, Long>> checkpoints = new TreeMap<>();
checkpoints.put(0, ImmutableMap.of(0, 10L, 1, 20L, 2, 30L));
- expect(taskClient.getCheckpointsAsync(EasyMock.contains("id1"),
anyBoolean())).andReturn(Futures.immediateFuture(checkpoints)).times(1);
- expect(taskClient.getCheckpointsAsync(EasyMock.contains("id2"),
anyBoolean())).andReturn(Futures.immediateFuture(checkpoints)).times(1);
- expect(taskClient.getCheckpointsAsync(EasyMock.contains("id3"),
anyBoolean())).andReturn(Futures.immediateFuture(checkpoints)).times(1);
+ expect(taskClient.getCheckpointsAsync(EasyMock.contains("id1"),
anyBoolean()))
+ .andReturn(Futures.immediateFuture(checkpoints))
+ .times(1);
+ expect(taskClient.getCheckpointsAsync(EasyMock.contains("id2"),
anyBoolean()))
+ .andReturn(Futures.immediateFuture(checkpoints))
+ .times(1);
+ expect(taskClient.getCheckpointsAsync(EasyMock.contains("id3"),
anyBoolean()))
+ .andReturn(Futures.immediateFuture(checkpoints))
+ .times(1);
taskRunner.registerListener(anyObject(TaskRunnerListener.class),
anyObject(Executor.class));
replayAll();
@@ -1980,6 +2027,172 @@ public void testNoDataIngestionTasks() throws Exception
verifyAll();
}
+ @Test(timeout = 60_000L)
+ public void testCheckpointForInactiveTaskGroup()
+ throws InterruptedException, ExecutionException, TimeoutException,
JsonProcessingException
+ {
+ supervisor = getSupervisor(2, 1, true, "PT1S", null, null, false);
+ //not adding any events
+ final Task id1 = createKafkaIndexTask(
+ "id1",
+ DATASOURCE,
+ 0,
+ new KafkaPartitions(topic, ImmutableMap.of(0, 0L, 1, 0L, 2, 0L)),
+ new KafkaPartitions(topic, ImmutableMap.of(0, Long.MAX_VALUE, 1,
Long.MAX_VALUE, 2, Long.MAX_VALUE)),
+ null,
+ null
+ );
+
+ final Task id2 = createKafkaIndexTask(
+ "id2",
+ DATASOURCE,
+ 0,
+ new KafkaPartitions(topic, ImmutableMap.of(0, 10L, 1, 20L, 2, 30L)),
+ new KafkaPartitions(topic, ImmutableMap.of(0, Long.MAX_VALUE, 1,
Long.MAX_VALUE, 2, Long.MAX_VALUE)),
+ null,
+ null
+ );
+
+ final Task id3 = createKafkaIndexTask(
+ "id3",
+ DATASOURCE,
+ 0,
+ new KafkaPartitions(topic, ImmutableMap.of(0, 10L, 1, 20L, 2, 30L)),
+ new KafkaPartitions(topic, ImmutableMap.of(0, Long.MAX_VALUE, 1,
Long.MAX_VALUE, 2, Long.MAX_VALUE)),
+ null,
+ null
+ );
+
+
expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes();
+
expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes();
+ expect(taskStorage.getActiveTasks()).andReturn(ImmutableList.of(id1, id2,
id3)).anyTimes();
+
expect(taskStorage.getStatus("id1")).andReturn(Optional.of(TaskStatus.running("id1"))).anyTimes();
+
expect(taskStorage.getStatus("id2")).andReturn(Optional.of(TaskStatus.running("id2"))).anyTimes();
+
expect(taskStorage.getStatus("id3")).andReturn(Optional.of(TaskStatus.running("id3"))).anyTimes();
+ expect(taskStorage.getTask("id1")).andReturn(Optional.of(id1)).anyTimes();
+ expect(taskStorage.getTask("id2")).andReturn(Optional.of(id2)).anyTimes();
+ expect(taskStorage.getTask("id3")).andReturn(Optional.of(id3)).anyTimes();
+ expect(
+
indexerMetadataStorageCoordinator.getDataSourceMetadata(DATASOURCE)).andReturn(new
KafkaDataSourceMetadata(null)
+ ).anyTimes();
+
expect(taskClient.getStatusAsync("id1")).andReturn(Futures.immediateFuture(KafkaIndexTask.Status.READING));
+
expect(taskClient.getStatusAsync("id2")).andReturn(Futures.immediateFuture(KafkaIndexTask.Status.READING));
+
expect(taskClient.getStatusAsync("id3")).andReturn(Futures.immediateFuture(KafkaIndexTask.Status.READING));
+
+ final DateTime startTime = DateTimes.nowUtc();
+
expect(taskClient.getStartTimeAsync("id1")).andReturn(Futures.immediateFuture(startTime));
+
expect(taskClient.getStartTimeAsync("id2")).andReturn(Futures.immediateFuture(startTime));
+
expect(taskClient.getStartTimeAsync("id3")).andReturn(Futures.immediateFuture(startTime));
+
+ final TreeMap<Integer, Map<Integer, Long>> checkpoints = new TreeMap<>();
+ checkpoints.put(0, ImmutableMap.of(0, 10L, 1, 20L, 2, 30L));
+ expect(taskClient.getCheckpointsAsync(EasyMock.contains("id1"),
anyBoolean()))
+ .andReturn(Futures.immediateFuture(checkpoints))
+ .times(1);
+ expect(taskClient.getCheckpointsAsync(EasyMock.contains("id2"),
anyBoolean()))
+ .andReturn(Futures.immediateFuture(checkpoints))
+ .times(1);
+ expect(taskClient.getCheckpointsAsync(EasyMock.contains("id3"),
anyBoolean()))
+ .andReturn(Futures.immediateFuture(checkpoints))
+ .times(1);
+
+ taskRunner.registerListener(anyObject(TaskRunnerListener.class),
anyObject(Executor.class));
+ replayAll();
+
+ supervisor.start();
+ supervisor.runInternal();
+
+ final Map<Integer, Long> fakeCheckpoints = Collections.emptyMap();
+ supervisor.moveTaskGroupToPendingCompletion(0);
+ supervisor.checkpoint(
+ 0,
+ new KafkaDataSourceMetadata(new KafkaPartitions(topic,
checkpoints.get(0))),
+ new KafkaDataSourceMetadata(new KafkaPartitions(topic,
fakeCheckpoints))
+ );
+
+ while (supervisor.getNoticesQueueSize() > 0) {
+ Thread.sleep(100);
+ }
+
+ verifyAll();
+
+ Assert.assertNull(serviceEmitter.getStackTrace());
+ Assert.assertNull(serviceEmitter.getExceptionMessage());
+ Assert.assertNull(serviceEmitter.getExceptionClass());
+ }
+
+ @Test(timeout = 60_000L)
+ public void testCheckpointForUnknownTaskGroup() throws InterruptedException
+ {
+ supervisor = getSupervisor(2, 1, true, "PT1S", null, null, false);
+ //not adding any events
+ final Task id1 = createKafkaIndexTask(
+ "id1",
+ DATASOURCE,
+ 0,
+ new KafkaPartitions(topic, ImmutableMap.of(0, 0L, 1, 0L, 2, 0L)),
+ new KafkaPartitions(topic, ImmutableMap.of(0, Long.MAX_VALUE, 1,
Long.MAX_VALUE, 2, Long.MAX_VALUE)),
+ null,
+ null
+ );
+
+ final Task id2 = createKafkaIndexTask(
+ "id2",
+ DATASOURCE,
+ 0,
+ new KafkaPartitions(topic, ImmutableMap.of(0, 10L, 1, 20L, 2, 30L)),
+ new KafkaPartitions(topic, ImmutableMap.of(0, Long.MAX_VALUE, 1,
Long.MAX_VALUE, 2, Long.MAX_VALUE)),
+ null,
+ null
+ );
+
+ final Task id3 = createKafkaIndexTask(
+ "id3",
+ DATASOURCE,
+ 0,
+ new KafkaPartitions(topic, ImmutableMap.of(0, 10L, 1, 20L, 2, 30L)),
+ new KafkaPartitions(topic, ImmutableMap.of(0, Long.MAX_VALUE, 1,
Long.MAX_VALUE, 2, Long.MAX_VALUE)),
+ null,
+ null
+ );
+
+
expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes();
+
expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes();
+ expect(taskStorage.getActiveTasks()).andReturn(ImmutableList.of(id1, id2,
id3)).anyTimes();
+
expect(taskStorage.getStatus("id1")).andReturn(Optional.of(TaskStatus.running("id1"))).anyTimes();
+
expect(taskStorage.getStatus("id2")).andReturn(Optional.of(TaskStatus.running("id2"))).anyTimes();
+
expect(taskStorage.getStatus("id3")).andReturn(Optional.of(TaskStatus.running("id3"))).anyTimes();
+ expect(taskStorage.getTask("id1")).andReturn(Optional.of(id1)).anyTimes();
+ expect(taskStorage.getTask("id2")).andReturn(Optional.of(id2)).anyTimes();
+ expect(taskStorage.getTask("id3")).andReturn(Optional.of(id3)).anyTimes();
+ expect(
+
indexerMetadataStorageCoordinator.getDataSourceMetadata(DATASOURCE)).andReturn(new
KafkaDataSourceMetadata(null)
+ ).anyTimes();
+
+ replayAll();
+
+ supervisor.start();
+
+ supervisor.checkpoint(
+ 0,
+ new KafkaDataSourceMetadata(new KafkaPartitions(topic,
Collections.emptyMap())),
+ new KafkaDataSourceMetadata(new KafkaPartitions(topic,
Collections.emptyMap()))
+ );
+
+ while (supervisor.getNoticesQueueSize() > 0) {
+ Thread.sleep(100);
+ }
+
+ verifyAll();
+
+ Assert.assertNotNull(serviceEmitter.getStackTrace());
+ Assert.assertEquals(
+ "WTH?! cannot find taskGroup [0] among all taskGroups [{}]",
+ serviceEmitter.getExceptionMessage()
+ );
+ Assert.assertEquals(ISE.class, serviceEmitter.getExceptionClass());
+ }
+
private void addSomeEvents(int numEventsPerPartition) throws Exception
{
try (final KafkaProducer<byte[], byte[]> kafkaProducer =
kafkaServer.newProducer()) {
@@ -2106,7 +2319,7 @@ private static DataSchema getDataSchema(String dataSource)
private KafkaIndexTask createKafkaIndexTask(
String id,
String dataSource,
- String sequenceName,
+ int taskGroupId,
KafkaPartitions startPartitions,
KafkaPartitions endPartitions,
DateTime minimumMessageTime,
@@ -2119,7 +2332,8 @@ private KafkaIndexTask createKafkaIndexTask(
getDataSchema(dataSource),
tuningConfig,
new KafkaIOConfig(
- sequenceName,
+ taskGroupId,
+ "sequenceName-" + taskGroupId,
startPartitions,
endPartitions,
ImmutableMap.<String, String>of(),
@@ -2128,7 +2342,7 @@ private KafkaIndexTask createKafkaIndexTask(
maximumMessageTime,
false
),
- ImmutableMap.<String, Object>of(),
+ Collections.emptyMap(),
null,
null,
rowIngestionMetersFactory
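
For context on the test changes above: the createKafkaIndexTask helper now keys tasks by an integer task group id instead of an explicit sequence name, and derives the base sequence name inside the helper. A minimal sketch of the resulting call shape, grounded in the hunks above (topic name and offsets are illustrative, not taken from the PR):

    // Sketch only, not part of the PR diff. The third argument is now the task
    // group id; the helper builds the base sequence name as
    // "sequenceName-" + taskGroupId (see the KafkaIOConfig hunk above).
    final int taskGroupId = 0;
    final Task task = createKafkaIndexTask(
        "id1",
        DATASOURCE,
        taskGroupId, // formerly the string "sequenceName-0"
        new KafkaPartitions("topic", ImmutableMap.of(0, 0L)),
        new KafkaPartitions("topic", ImmutableMap.of(0, Long.MAX_VALUE)),
        null,
        null
    );
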
diff --git
a/indexing-service/src/main/java/io/druid/indexing/common/actions/CheckPointDataSourceMetadataAction.java
b/indexing-service/src/main/java/io/druid/indexing/common/actions/CheckPointDataSourceMetadataAction.java
index 8265f87c7b9..f1d11deb4ea 100644
---
a/indexing-service/src/main/java/io/druid/indexing/common/actions/CheckPointDataSourceMetadataAction.java
+++
b/indexing-service/src/main/java/io/druid/indexing/common/actions/CheckPointDataSourceMetadataAction.java
@@ -21,27 +21,28 @@
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.core.type.TypeReference;
+import com.google.common.base.Preconditions;
import io.druid.indexing.common.task.Task;
import io.druid.indexing.overlord.DataSourceMetadata;
public class CheckPointDataSourceMetadataAction implements TaskAction<Boolean>
{
private final String supervisorId;
- private final String sequenceName;
+ private final int taskGroupId;
private final DataSourceMetadata previousCheckPoint;
private final DataSourceMetadata currentCheckPoint;
public CheckPointDataSourceMetadataAction(
@JsonProperty("supervisorId") String supervisorId,
- @JsonProperty("sequenceName") String sequenceName,
+ @JsonProperty("taskGroupId") Integer taskGroupId,
@JsonProperty("previousCheckPoint") DataSourceMetadata
previousCheckPoint,
@JsonProperty("currentCheckPoint") DataSourceMetadata currentCheckPoint
)
{
- this.supervisorId = supervisorId;
- this.sequenceName = sequenceName;
- this.previousCheckPoint = previousCheckPoint;
- this.currentCheckPoint = currentCheckPoint;
+ this.supervisorId = Preconditions.checkNotNull(supervisorId,
"supervisorId");
+ this.taskGroupId = Preconditions.checkNotNull(taskGroupId, "taskGroupId");
+ this.previousCheckPoint = Preconditions.checkNotNull(previousCheckPoint,
"previousCheckPoint");
+ this.currentCheckPoint = Preconditions.checkNotNull(currentCheckPoint,
"currentCheckPoint");
}
@JsonProperty
@@ -51,9 +52,9 @@ public String getSupervisorId()
}
@JsonProperty
- public String getSequenceName()
+ public int getTaskGroupId()
{
- return sequenceName;
+ return taskGroupId;
}
@JsonProperty
@@ -81,8 +82,12 @@ public Boolean perform(
Task task, TaskActionToolbox toolbox
)
{
- return toolbox.getSupervisorManager()
- .checkPointDataSourceMetadata(supervisorId, sequenceName,
previousCheckPoint, currentCheckPoint);
+ return toolbox.getSupervisorManager().checkPointDataSourceMetadata(
+ supervisorId,
+ taskGroupId,
+ previousCheckPoint,
+ currentCheckPoint
+ );
}
@Override
@@ -96,7 +101,7 @@ public String toString()
{
return "CheckPointDataSourceMetadataAction{" +
"supervisorId='" + supervisorId + '\'' +
- ", sequenceName='" + sequenceName + '\'' +
+ ", taskGroupId='" + taskGroupId + '\'' +
", previousCheckPoint=" + previousCheckPoint +
", currentCheckPoint=" + currentCheckPoint +
'}';
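
To make the contract change concrete: CheckPointDataSourceMetadataAction now carries an integer taskGroupId and rejects null arguments up front. A short, hedged sketch of constructing it against the constructor shown above (the supervisor id, topic, and offsets are illustrative):

    // Sketch only, not part of the PR diff. All four constructor arguments are
    // now checked with Preconditions.checkNotNull, so a malformed checkpoint
    // request fails fast on the task side instead of surfacing later as an NPE
    // in the supervisor.
    final CheckPointDataSourceMetadataAction action = new CheckPointDataSourceMetadataAction(
        "my-supervisor",  // supervisorId (illustrative)
        0,                // taskGroupId, formerly a sequenceName string
        new KafkaDataSourceMetadata(new KafkaPartitions("topic", ImmutableMap.of(0, 0L))),   // previousCheckPoint
        new KafkaDataSourceMetadata(new KafkaPartitions("topic", ImmutableMap.of(0, 10L)))   // currentCheckPoint
    );
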
diff --git
a/indexing-service/src/main/java/io/druid/indexing/overlord/supervisor/SupervisorManager.java
b/indexing-service/src/main/java/io/druid/indexing/overlord/supervisor/SupervisorManager.java
index dcdd014c95c..f9a55644432 100644
---
a/indexing-service/src/main/java/io/druid/indexing/overlord/supervisor/SupervisorManager.java
+++
b/indexing-service/src/main/java/io/druid/indexing/overlord/supervisor/SupervisorManager.java
@@ -165,9 +165,9 @@ public boolean resetSupervisor(String id, @Nullable
DataSourceMetadata dataSourc
public boolean checkPointDataSourceMetadata(
String supervisorId,
- @Nullable String sequenceName,
- @Nullable DataSourceMetadata previousDataSourceMetadata,
- @Nullable DataSourceMetadata currentDataSourceMetadata
+ int taskGroupId,
+ DataSourceMetadata previousDataSourceMetadata,
+ DataSourceMetadata currentDataSourceMetadata
)
{
try {
@@ -178,7 +178,7 @@ public boolean checkPointDataSourceMetadata(
Preconditions.checkNotNull(supervisor, "supervisor could not be found");
- supervisor.lhs.checkpoint(sequenceName, previousDataSourceMetadata,
currentDataSourceMetadata);
+ supervisor.lhs.checkpoint(taskGroupId, previousDataSourceMetadata,
currentDataSourceMetadata);
return true;
}
catch (Exception e) {
diff --git
a/java-util/src/main/java/io/druid/java/util/emitter/EmittingLogger.java
b/java-util/src/main/java/io/druid/java/util/emitter/EmittingLogger.java
index 0020cf1c79f..661ed17d3b9 100644
--- a/java-util/src/main/java/io/druid/java/util/emitter/EmittingLogger.java
+++ b/java-util/src/main/java/io/druid/java/util/emitter/EmittingLogger.java
@@ -35,6 +35,10 @@
*/
public class EmittingLogger extends Logger
{
+ public static final String EXCEPTION_TYPE_KEY = "exceptionType";
+ public static final String EXCEPTION_MESSAGE_KEY = "exceptionMessage";
+ public static final String EXCEPTION_STACK_TRACE_KEY = "exceptionStackTrace";
+
private static volatile ServiceEmitter emitter = null;
private final String className;
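
The three new constants give tests (and any other alert consumer) stable key names for pulling exception details out of an alert event's data map. A hedged sketch of reading them, assuming the event shape used by the new test emitter further down in this diff:

    // Sketch only, not part of the PR diff. Mirrors the casts done by
    // ExceptionCapturingServiceEmitter below; `event` is an emitted alert Event.
    @SuppressWarnings("unchecked")
    final Map<String, Object> data = (Map<String, Object>) event.toMap().get("data");
    final Class<?> exceptionType = (Class<?>) data.get(EmittingLogger.EXCEPTION_TYPE_KEY);
    final String exceptionMessage = (String) data.get(EmittingLogger.EXCEPTION_MESSAGE_KEY);
    final String stackTrace = (String) data.get(EmittingLogger.EXCEPTION_STACK_TRACE_KEY);
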
diff --git
a/server/src/main/java/io/druid/indexing/overlord/supervisor/NoopSupervisorSpec.java
b/server/src/main/java/io/druid/indexing/overlord/supervisor/NoopSupervisorSpec.java
index 26ed99cd543..0408104cde8 100644
---
a/server/src/main/java/io/druid/indexing/overlord/supervisor/NoopSupervisorSpec.java
+++
b/server/src/main/java/io/druid/indexing/overlord/supervisor/NoopSupervisorSpec.java
@@ -83,9 +83,9 @@ public void reset(DataSourceMetadata dataSourceMetadata) {}
@Override
public void checkpoint(
- @Nullable String sequenceName,
- @Nullable DataSourceMetadata previousCheckPoint,
- @Nullable DataSourceMetadata currentCheckPoint
+ int taskGroupId,
+ DataSourceMetadata previousCheckPoint,
+ DataSourceMetadata currentCheckPoint
)
{
diff --git
a/server/src/main/java/io/druid/indexing/overlord/supervisor/Supervisor.java
b/server/src/main/java/io/druid/indexing/overlord/supervisor/Supervisor.java
index 5afe9122991..04afac7aea6 100644
--- a/server/src/main/java/io/druid/indexing/overlord/supervisor/Supervisor.java
+++ b/server/src/main/java/io/druid/indexing/overlord/supervisor/Supervisor.java
@@ -22,7 +22,6 @@
import com.google.common.collect.ImmutableMap;
import io.druid.indexing.overlord.DataSourceMetadata;
-import javax.annotation.Nullable;
import java.util.Map;
public interface Supervisor
@@ -52,13 +51,9 @@
* for example - Kafka Supervisor uses this to merge and handoff segments
containing at least the data
* represented by {@param currentCheckpoint} DataSourceMetadata
*
- * @param sequenceName unique Identifier to figure out for which
sequence to do checkpointing
+ * @param taskGroupId unique Identifier to figure out for which
sequence to do checkpointing
* @param previousCheckPoint DataSourceMetadata checkpointed in previous call
* @param currentCheckPoint current DataSourceMetadata to be checkpointed
*/
- void checkpoint(
- @Nullable String sequenceName,
- @Nullable DataSourceMetadata previousCheckPoint,
- @Nullable DataSourceMetadata currentCheckPoint
- );
+ void checkpoint(int taskGroupId, DataSourceMetadata previousCheckPoint,
DataSourceMetadata currentCheckPoint);
}
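
This interface change is what closes the original NPE: checkpoint now receives a primitive task group id and non-null metadata. A hypothetical sketch of the defensive pattern the new tests exercise (taskGroups is an assumed internal Map<Integer, TaskGroup>, not shown in this diff; this is not the actual KafkaSupervisor code):

    // Hypothetical sketch, not part of the PR diff: an unknown taskGroupId is
    // reported via an ISE/alert rather than dereferenced into an NPE.
    public void checkpoint(int taskGroupId, DataSourceMetadata previousCheckPoint, DataSourceMetadata currentCheckPoint)
    {
      Preconditions.checkNotNull(previousCheckPoint, "previousCheckPoint");
      Preconditions.checkNotNull(currentCheckPoint, "currentCheckPoint");
      final TaskGroup taskGroup = taskGroups.get(taskGroupId); // taskGroups: assumed Map<Integer, TaskGroup>
      if (taskGroup == null) {
        throw new ISE("WTH?! cannot find taskGroup [%s] among all taskGroups [%s]", taskGroupId, taskGroups);
      }
      // ... hand the checkpoint notice to the known task group ...
    }
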
diff --git
a/server/src/test/java/io/druid/server/metrics/ExceptionCapturingServiceEmitter.java
b/server/src/test/java/io/druid/server/metrics/ExceptionCapturingServiceEmitter.java
new file mode 100644
index 00000000000..cc217c6f5b0
--- /dev/null
+++
b/server/src/test/java/io/druid/server/metrics/ExceptionCapturingServiceEmitter.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package io.druid.server.metrics;
+
+import io.druid.java.util.emitter.EmittingLogger;
+import io.druid.java.util.emitter.core.Event;
+import io.druid.java.util.emitter.service.ServiceEmitter;
+
+import javax.annotation.Nullable;
+import java.util.Map;
+
+public class ExceptionCapturingServiceEmitter extends ServiceEmitter
+{
+ private volatile Class exceptionClass;
+ private volatile String exceptionMessage;
+ private volatile String stackTrace;
+
+ public ExceptionCapturingServiceEmitter()
+ {
+ super("", "", null);
+ }
+
+ @Override
+ public void emit(Event event)
+ {
+ //noinspection unchecked
+ final Map<String, Object> dataMap = (Map<String, Object>)
event.toMap().get("data");
+ final Class exceptionClass = (Class)
dataMap.get(EmittingLogger.EXCEPTION_TYPE_KEY);
+ if (exceptionClass != null) {
+ final String exceptionMessage = (String)
dataMap.get(EmittingLogger.EXCEPTION_MESSAGE_KEY);
+ final String stackTrace = (String)
dataMap.get(EmittingLogger.EXCEPTION_STACK_TRACE_KEY);
+ this.exceptionClass = exceptionClass;
+ this.exceptionMessage = exceptionMessage;
+ this.stackTrace = stackTrace;
+ }
+ }
+
+ @Nullable
+ public Class getExceptionClass()
+ {
+ return exceptionClass;
+ }
+
+ @Nullable
+ public String getExceptionMessage()
+ {
+ return exceptionMessage;
+ }
+
+ @Nullable
+ public String getStackTrace()
+ {
+ return stackTrace;
+ }
+}
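
Usage of the new emitter in the supervisor tests boils down to registering it with EmittingLogger and asserting on what, if anything, was captured. A brief sketch, assuming the static EmittingLogger.registerEmitter hook used elsewhere in the test setup:

    // Sketch only, not part of the PR diff: asserting that no alert (and hence
    // no unexpected exception) was emitted by the code under test.
    final ExceptionCapturingServiceEmitter serviceEmitter = new ExceptionCapturingServiceEmitter();
    EmittingLogger.registerEmitter(serviceEmitter); // assumed static registration hook
    // ... exercise the supervisor ...
    Assert.assertNull(serviceEmitter.getExceptionClass());
    Assert.assertNull(serviceEmitter.getExceptionMessage());
    Assert.assertNull(serviceEmitter.getStackTrace());
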