This is an automated email from the ASF dual-hosted git repository. cwylie pushed a commit to branch 0.14.1-incubating in repository https://gitbox.apache.org/repos/asf/incubator-druid.git
commit 01aa1666dd7615edf458ffeee6d365072349c292
Author: Gian Merlino <[email protected]>
AuthorDate: Thu Apr 4 13:25:18 2019 -0400

    Fix two issues with Coordinator -> Overlord communication. (#7412)

    * Fix two issues with Coordinator -> Overlord communication.

    1) ClientCompactQuery needs to recognize the potential for 'intervals' to be set instead of 'segments'. The lack of this led to a NullPointerException on DruidCoordinatorSegmentCompactor.java:102.

    2) In two locations (DruidCoordinatorSegmentCompactor, DruidCoordinatorCleanupPendingSegments) tasks were being retrieved using waiting/pending/running tasks in the wrong order: by checking 'running' first and then 'pending', tasks could be missed if they moved from 'pending' to 'running' in between the two calls. Replaced these methods with calls to 'getActiveTasks', a new method that does the calls in the right order.

    * Remove unused import.
---
 .../druid/client/indexing/ClientCompactQuery.java | 47 +++++++++++++++++++++-
 .../client/indexing/HttpIndexingServiceClient.java | 43 +++++++++++++-------
 .../client/indexing/IndexingServiceClient.java | 9 ++---
 .../DruidCoordinatorCleanupPendingSegments.java | 20 +--------
 .../helper/DruidCoordinatorSegmentCompactor.java | 38 ++++++++---------
 .../client/indexing/NoopIndexingServiceClient.java | 17 ++------
 .../DruidCoordinatorSegmentCompactorTest.java | 14 +------
 7 files changed, 103 insertions(+), 85 deletions(-)

diff --git a/server/src/main/java/org/apache/druid/client/indexing/ClientCompactQuery.java b/server/src/main/java/org/apache/druid/client/indexing/ClientCompactQuery.java index f32b11d..b4bbd1a 100644 --- a/server/src/main/java/org/apache/druid/client/indexing/ClientCompactQuery.java +++ b/server/src/main/java/org/apache/druid/client/indexing/ClientCompactQuery.java @@ -22,15 +22,18 @@ package org.apache.druid.client.indexing; import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; import 
org.apache.druid.timeline.DataSegment; +import org.joda.time.Interval; import javax.annotation.Nullable; import java.util.List; import java.util.Map; +import java.util.Objects; public class ClientCompactQuery implements ClientQuery { private final String dataSource; private final List<DataSegment> segments; + private final Interval interval; private final boolean keepSegmentGranularity; @Nullable private final Long targetCompactionSizeBytes; @@ -40,7 +43,8 @@ public class ClientCompactQuery implements ClientQuery @JsonCreator public ClientCompactQuery( @JsonProperty("dataSource") String dataSource, - @JsonProperty("segments") List<DataSegment> segments, + @Nullable @JsonProperty("interval") final Interval interval, + @Nullable @JsonProperty("segments") final List<DataSegment> segments, @JsonProperty("keepSegmentGranularity") boolean keepSegmentGranularity, @JsonProperty("targetCompactionSizeBytes") @Nullable Long targetCompactionSizeBytes, @JsonProperty("tuningConfig") ClientCompactQueryTuningConfig tuningConfig, @@ -49,6 +53,7 @@ public class ClientCompactQuery implements ClientQuery { this.dataSource = dataSource; this.segments = segments; + this.interval = interval; this.keepSegmentGranularity = keepSegmentGranularity; this.targetCompactionSizeBytes = targetCompactionSizeBytes; this.tuningConfig = tuningConfig; @@ -76,6 +81,12 @@ public class ClientCompactQuery implements ClientQuery } @JsonProperty + public Interval getInterval() + { + return interval; + } + + @JsonProperty public boolean isKeepSegmentGranularity() { return keepSegmentGranularity; @@ -101,11 +112,45 @@ public class ClientCompactQuery implements ClientQuery } @Override + public boolean equals(Object o) + { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + ClientCompactQuery that = (ClientCompactQuery) o; + return keepSegmentGranularity == that.keepSegmentGranularity && + Objects.equals(dataSource, that.dataSource) && + 
Objects.equals(segments, that.segments) && + Objects.equals(interval, that.interval) && + Objects.equals(targetCompactionSizeBytes, that.targetCompactionSizeBytes) && + Objects.equals(tuningConfig, that.tuningConfig) && + Objects.equals(context, that.context); + } + + @Override + public int hashCode() + { + return Objects.hash( + dataSource, + segments, + interval, + keepSegmentGranularity, + targetCompactionSizeBytes, + tuningConfig, + context + ); + } + + @Override public String toString() { return "ClientCompactQuery{" + "dataSource='" + dataSource + '\'' + ", segments=" + segments + + ", interval=" + interval + ", keepSegmentGranularity=" + keepSegmentGranularity + ", targetCompactionSizeBytes=" + targetCompactionSizeBytes + ", tuningConfig=" + tuningConfig + diff --git a/server/src/main/java/org/apache/druid/client/indexing/HttpIndexingServiceClient.java b/server/src/main/java/org/apache/druid/client/indexing/HttpIndexingServiceClient.java index 2d8c4dc..8cda53c 100644 --- a/server/src/main/java/org/apache/druid/client/indexing/HttpIndexingServiceClient.java +++ b/server/src/main/java/org/apache/druid/client/indexing/HttpIndexingServiceClient.java @@ -23,7 +23,7 @@ import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Preconditions; import com.google.common.base.Strings; -import com.google.common.base.Throwables; +import com.google.common.collect.Iterables; import com.google.inject.Inject; import org.apache.druid.discovery.DruidLeaderClient; import org.apache.druid.indexer.TaskStatusPlus; @@ -42,11 +42,14 @@ import org.joda.time.Interval; import javax.annotation.Nullable; import javax.ws.rs.core.MediaType; import java.io.IOException; +import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; +import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.Set; public class HttpIndexingServiceClient 
implements IndexingServiceClient { @@ -112,6 +115,7 @@ public class HttpIndexingServiceClient implements IndexingServiceClient return runTask( new ClientCompactQuery( dataSource, + null, segments, keepSegmentGranularity, targetCompactionSizeBytes, @@ -150,7 +154,7 @@ public class HttpIndexingServiceClient implements IndexingServiceClient return Preconditions.checkNotNull(taskId, "Null task id for task[%s]", taskObject); } catch (Exception e) { - throw Throwables.propagate(e); + throw new RuntimeException(e); } } @@ -184,7 +188,7 @@ public class HttpIndexingServiceClient implements IndexingServiceClient return killedTaskId; } catch (Exception e) { - throw Throwables.propagate(e); + throw new RuntimeException(e); } } @@ -217,21 +221,30 @@ public class HttpIndexingServiceClient implements IndexingServiceClient } @Override - public List<TaskStatusPlus> getRunningTasks() + public List<TaskStatusPlus> getActiveTasks() { - return getTasks("runningTasks"); - } + // Must retrieve waiting, then pending, then running, so if tasks move from one state to the next between + // calls then we still catch them. (Tasks always go waiting -> pending -> running.) + // + // Consider switching to new-style /druid/indexer/v1/tasks API in the future. + final List<TaskStatusPlus> tasks = new ArrayList<>(); + final Set<String> taskIdsSeen = new HashSet<>(); + + final Iterable<TaskStatusPlus> activeTasks = Iterables.concat( + getTasks("waitingTasks"), + getTasks("pendingTasks"), + getTasks("runningTasks") + ); - @Override - public List<TaskStatusPlus> getPendingTasks() - { - return getTasks("pendingTasks"); - } + for (TaskStatusPlus task : activeTasks) { + // Use taskIdsSeen to prevent returning the same task ID more than once (if it hops from 'pending' to 'running', + // for example, and we see it twice.) 
+ if (taskIdsSeen.add(task.getId())) { + tasks.add(task); + } + } - @Override - public List<TaskStatusPlus> getWaitingTasks() - { - return getTasks("waitingTasks"); + return tasks; } private List<TaskStatusPlus> getTasks(String endpointSuffix) diff --git a/server/src/main/java/org/apache/druid/client/indexing/IndexingServiceClient.java b/server/src/main/java/org/apache/druid/client/indexing/IndexingServiceClient.java index 905b810..4a2c0af 100644 --- a/server/src/main/java/org/apache/druid/client/indexing/IndexingServiceClient.java +++ b/server/src/main/java/org/apache/druid/client/indexing/IndexingServiceClient.java @@ -51,11 +51,10 @@ public interface IndexingServiceClient String killTask(String taskId); - List<TaskStatusPlus> getRunningTasks(); - - List<TaskStatusPlus> getPendingTasks(); - - List<TaskStatusPlus> getWaitingTasks(); + /** + * Gets all tasks that are waiting, pending, or running. + */ + List<TaskStatusPlus> getActiveTasks(); TaskStatusResponse getTaskStatus(String taskId); diff --git a/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinatorCleanupPendingSegments.java b/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinatorCleanupPendingSegments.java index a25ea07..bf4ffde 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinatorCleanupPendingSegments.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinatorCleanupPendingSegments.java @@ -52,27 +52,11 @@ public class DruidCoordinatorCleanupPendingSegments implements DruidCoordinatorH final List<DateTime> createdTimes = new ArrayList<>(); createdTimes.add( indexingServiceClient - .getRunningTasks() + .getActiveTasks() .stream() .map(TaskStatusPlus::getCreatedTime) .min(Comparators.naturalNullsFirst()) - .orElse(DateTimes.nowUtc()) // If there is no running tasks, this returns the current time. 
- ); - createdTimes.add( - indexingServiceClient - .getPendingTasks() - .stream() - .map(TaskStatusPlus::getCreatedTime) - .min(Comparators.naturalNullsFirst()) - .orElse(DateTimes.nowUtc()) // If there is no pending tasks, this returns the current time. - ); - createdTimes.add( - indexingServiceClient - .getWaitingTasks() - .stream() - .map(TaskStatusPlus::getCreatedTime) - .min(Comparators.naturalNullsFirst()) - .orElse(DateTimes.nowUtc()) // If there is no waiting tasks, this returns the current time. + .orElse(DateTimes.nowUtc()) // If there are no active tasks, this returns the current time. ); final TaskStatusPlus completeTaskStatus = indexingServiceClient.getLastCompleteTask(); diff --git a/server/src/main/java/org/apache/druid/server/coordinator/helper/DruidCoordinatorSegmentCompactor.java b/server/src/main/java/org/apache/druid/server/coordinator/helper/DruidCoordinatorSegmentCompactor.java index 0258cd0..d22f92a 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/helper/DruidCoordinatorSegmentCompactor.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/helper/DruidCoordinatorSegmentCompactor.java @@ -41,7 +41,6 @@ import org.joda.time.Interval; import javax.annotation.Nullable; import java.util.ArrayList; -import java.util.Arrays; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -84,11 +83,7 @@ public class DruidCoordinatorSegmentCompactor implements DruidCoordinatorHelper Map<String, DataSourceCompactionConfig> compactionConfigs = compactionConfigList .stream() .collect(Collectors.toMap(DataSourceCompactionConfig::getDataSource, Function.identity())); - final List<TaskStatusPlus> compactTasks = filterNonCompactTasks( - indexingServiceClient.getRunningTasks(), - indexingServiceClient.getPendingTasks(), - indexingServiceClient.getWaitingTasks() - ); + final List<TaskStatusPlus> compactTasks = filterNonCompactTasks(indexingServiceClient.getActiveTasks()); // dataSource -> list of intervals of 
compact tasks final Map<String, List<Interval>> compactTaskIntervals = new HashMap<>(compactionConfigList.size()); for (TaskStatusPlus status : compactTasks) { @@ -98,13 +93,22 @@ public class DruidCoordinatorSegmentCompactor implements DruidCoordinatorHelper } if (COMPACT_TASK_TYPE.equals(response.getPayload().getType())) { final ClientCompactQuery compactQuery = (ClientCompactQuery) response.getPayload(); - final Interval interval = JodaUtils.umbrellaInterval( - compactQuery.getSegments() - .stream() - .map(DataSegment::getInterval) - .sorted(Comparators.intervalsByStartThenEnd()) - .collect(Collectors.toList()) - ); + final Interval interval; + + if (compactQuery.getSegments() != null) { + interval = JodaUtils.umbrellaInterval( + compactQuery.getSegments() + .stream() + .map(DataSegment::getInterval) + .sorted(Comparators.intervalsByStartThenEnd()) + .collect(Collectors.toList()) + ); + } else if (compactQuery.getInterval() != null) { + interval = compactQuery.getInterval(); + } else { + throw new ISE("task[%s] has neither 'segments' nor 'interval'", status.getId()); + } + compactTaskIntervals.computeIfAbsent(status.getDataSource(), k -> new ArrayList<>()).add(interval); } else { throw new ISE("WTH? 
task[%s] is not a compactTask?", status.getId()); @@ -146,13 +150,9 @@ public class DruidCoordinatorSegmentCompactor implements DruidCoordinatorHelper .build(); } - @SafeVarargs - private static List<TaskStatusPlus> filterNonCompactTasks(List<TaskStatusPlus>...taskStatusStreams) + private static List<TaskStatusPlus> filterNonCompactTasks(List<TaskStatusPlus> taskStatuses) { - final List<TaskStatusPlus> allTaskStatusPlus = new ArrayList<>(); - Arrays.stream(taskStatusStreams).forEach(allTaskStatusPlus::addAll); - - return allTaskStatusPlus + return taskStatuses .stream() .filter(status -> { final String taskType = status.getType(); diff --git a/server/src/test/java/org/apache/druid/client/indexing/NoopIndexingServiceClient.java b/server/src/test/java/org/apache/druid/client/indexing/NoopIndexingServiceClient.java index 91e1a8a..8ef5667 100644 --- a/server/src/test/java/org/apache/druid/client/indexing/NoopIndexingServiceClient.java +++ b/server/src/test/java/org/apache/druid/client/indexing/NoopIndexingServiceClient.java @@ -25,6 +25,7 @@ import org.joda.time.DateTime; import org.joda.time.Interval; import javax.annotation.Nullable; +import java.util.Collections; import java.util.List; import java.util.Map; @@ -80,21 +81,9 @@ public class NoopIndexingServiceClient implements IndexingServiceClient } @Override - public List<TaskStatusPlus> getRunningTasks() + public List<TaskStatusPlus> getActiveTasks() { - return null; - } - - @Override - public List<TaskStatusPlus> getPendingTasks() - { - return null; - } - - @Override - public List<TaskStatusPlus> getWaitingTasks() - { - return null; + return Collections.emptyList(); } @Override diff --git a/server/src/test/java/org/apache/druid/server/coordinator/helper/DruidCoordinatorSegmentCompactorTest.java b/server/src/test/java/org/apache/druid/server/coordinator/helper/DruidCoordinatorSegmentCompactorTest.java index 52b78e2..66b73a7 100644 --- 
a/server/src/test/java/org/apache/druid/server/coordinator/helper/DruidCoordinatorSegmentCompactorTest.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/helper/DruidCoordinatorSegmentCompactorTest.java @@ -107,19 +107,7 @@ public class DruidCoordinatorSegmentCompactorTest } @Override - public List<TaskStatusPlus> getRunningTasks() - { - return Collections.emptyList(); - } - - @Override - public List<TaskStatusPlus> getPendingTasks() - { - return Collections.emptyList(); - } - - @Override - public List<TaskStatusPlus> getWaitingTasks() + public List<TaskStatusPlus> getActiveTasks() { return Collections.emptyList(); } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
