This is an automated email from the ASF dual-hosted git repository.
kfaraz pushed a commit to branch 31.0.0
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/31.0.0 by this push:
new ccb7c2edd95 [Backport] Dart and security backports (#17249) (#17278)
(#17281) (#17282) (#17283) (#17277) (#17285)
ccb7c2edd95 is described below
commit ccb7c2edd959a26d7f96528a2779b80051815ac1
Author: Karan Kumar <[email protected]>
AuthorDate: Tue Oct 8 19:46:40 2024 +0530
[Backport] Dart and security backports (#17249) (#17278) (#17281) (#17282)
(#17283) (#17277) (#17285)
* MSQ: Allow for worker gaps. (#17277)
* DartSqlResource: Sort queries by start time. (#17282)
* DartSqlResource: Add controllerHost to GetQueriesResponse. (#17283)
* DartWorkerModule: Replace en dash with regular dash. (#17281)
* DartSqlResource: Return HTTP 202 on cancellation even if no such query.
(#17278)
* Upgraded Protobuf to 3.25.5 (#17249)
---------
Signed-off-by: dependabot[bot] <[email protected]>
Co-authored-by: dependabot[bot]
<49699333+dependabot[bot]@users.noreply.github.com>
(cherry picked from commit 7d9e6d36fddd7893825d1fa2f5da2e20f67c5de8)
---------
Co-authored-by: Gian Merlino <[email protected]>
Co-authored-by: Shivam Garg <[email protected]>
---
.../msq/dart/controller/ControllerHolder.java | 8 ++
.../msq/dart/controller/http/DartQueryInfo.java | 23 +++-
.../msq/dart/controller/http/DartSqlResource.java | 15 ++-
.../msq/dart/controller/sql/DartQueryMaker.java | 1 +
.../druid/msq/dart/guice/DartWorkerModule.java | 2 +-
.../druid/msq/input/stage/ReadablePartition.java | 12 ++
.../druid/msq/input/stage/ReadablePartitions.java | 33 ++++-
.../stage/SparseStripedReadablePartitions.java | 142 +++++++++++++++++++++
.../kernel/controller/ControllerStageTracker.java | 26 ++--
.../druid/msq/kernel/controller/WorkerInputs.java | 41 +++---
.../dart/controller/http/DartSqlResourceTest.java | 18 ++-
.../controller/http/GetQueriesResponseTest.java | 1 +
.../dart/controller/sql/DartSqlClientImplTest.java | 2 +
.../stage/CollectedReadablePartitionsTest.java | 12 +-
.../stage/CombinedReadablePartitionsTest.java | 2 +-
...va => SparseStripedReadablePartitionsTest.java} | 29 +++--
.../input/stage/StripedReadablePartitionsTest.java | 34 ++++-
.../msq/kernel/controller/WorkerInputsTest.java | 98 +++++++++++---
licenses.yaml | 4 +-
pom.xml | 2 +-
20 files changed, 419 insertions(+), 86 deletions(-)
diff --git
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/ControllerHolder.java
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/ControllerHolder.java
index 9644444dad2..ca60ee3cbc1 100644
---
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/ControllerHolder.java
+++
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/ControllerHolder.java
@@ -59,6 +59,7 @@ public class ControllerHolder
private final ControllerContext controllerContext;
private final String sqlQueryId;
private final String sql;
+ private final String controllerHost;
private final AuthenticationResult authenticationResult;
private final DateTime startTime;
private final AtomicReference<State> state = new
AtomicReference<>(State.ACCEPTED);
@@ -68,6 +69,7 @@ public class ControllerHolder
final ControllerContext controllerContext,
final String sqlQueryId,
final String sql,
+ final String controllerHost,
final AuthenticationResult authenticationResult,
final DateTime startTime
)
@@ -76,6 +78,7 @@ public class ControllerHolder
this.controllerContext = controllerContext;
this.sqlQueryId = Preconditions.checkNotNull(sqlQueryId, "sqlQueryId");
this.sql = sql;
+ this.controllerHost = controllerHost;
this.authenticationResult = authenticationResult;
this.startTime = Preconditions.checkNotNull(startTime, "startTime");
}
@@ -95,6 +98,11 @@ public class ControllerHolder
return sql;
}
+ public String getControllerHost()
+ {
+ return controllerHost;
+ }
+
public AuthenticationResult getAuthenticationResult()
{
return authenticationResult;
diff --git
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/http/DartQueryInfo.java
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/http/DartQueryInfo.java
index e5f3abb894e..2bc5d08704d 100644
---
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/http/DartQueryInfo.java
+++
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/http/DartQueryInfo.java
@@ -26,6 +26,7 @@ import com.google.common.base.Preconditions;
import org.apache.druid.msq.dart.controller.ControllerHolder;
import org.apache.druid.msq.util.MSQTaskQueryMakerUtils;
import org.apache.druid.query.QueryContexts;
+import org.apache.druid.server.DruidNode;
import org.joda.time.DateTime;
import java.util.Objects;
@@ -38,6 +39,7 @@ public class DartQueryInfo
private final String sqlQueryId;
private final String dartQueryId;
private final String sql;
+ private final String controllerHost;
private final String authenticator;
private final String identity;
private final DateTime startTime;
@@ -48,6 +50,7 @@ public class DartQueryInfo
@JsonProperty("sqlQueryId") final String sqlQueryId,
@JsonProperty("dartQueryId") final String dartQueryId,
@JsonProperty("sql") final String sql,
+ @JsonProperty("controllerHost") final String controllerHost,
@JsonProperty("authenticator") final String authenticator,
@JsonProperty("identity") final String identity,
@JsonProperty("startTime") final DateTime startTime,
@@ -57,6 +60,7 @@ public class DartQueryInfo
this.sqlQueryId = Preconditions.checkNotNull(sqlQueryId, "sqlQueryId");
this.dartQueryId = Preconditions.checkNotNull(dartQueryId, "dartQueryId");
this.sql = sql;
+ this.controllerHost = controllerHost;
this.authenticator = authenticator;
this.identity = identity;
this.startTime = startTime;
@@ -69,6 +73,7 @@ public class DartQueryInfo
holder.getSqlQueryId(),
holder.getController().queryId(),
MSQTaskQueryMakerUtils.maskSensitiveJsonKeys(holder.getSql()),
+ holder.getControllerHost(),
holder.getAuthenticationResult().getAuthenticatedBy(),
holder.getAuthenticationResult().getIdentity(),
holder.getStartTime(),
@@ -104,6 +109,16 @@ public class DartQueryInfo
return sql;
}
+ /**
+ * Controller host:port, from {@link DruidNode#getHostAndPortToUse()}, that
is executing this query.
+ */
+ @JsonProperty
+ @JsonInclude(JsonInclude.Include.NON_NULL)
+ public String getControllerHost()
+ {
+ return controllerHost;
+ }
+
/**
* Authenticator that authenticated the identity from {@link #getIdentity()}.
*/
@@ -145,7 +160,7 @@ public class DartQueryInfo
*/
public DartQueryInfo withoutAuthenticationResult()
{
- return new DartQueryInfo(sqlQueryId, dartQueryId, sql, null, null,
startTime, state);
+ return new DartQueryInfo(sqlQueryId, dartQueryId, sql, controllerHost,
null, null, startTime, state);
}
@Override
@@ -161,6 +176,7 @@ public class DartQueryInfo
return Objects.equals(sqlQueryId, that.sqlQueryId)
&& Objects.equals(dartQueryId, that.dartQueryId)
&& Objects.equals(sql, that.sql)
+ && Objects.equals(controllerHost, that.controllerHost)
&& Objects.equals(authenticator, that.authenticator)
&& Objects.equals(identity, that.identity)
&& Objects.equals(startTime, that.startTime)
@@ -170,7 +186,7 @@ public class DartQueryInfo
@Override
public int hashCode()
{
- return Objects.hash(sqlQueryId, dartQueryId, sql, authenticator, identity,
startTime, state);
+ return Objects.hash(sqlQueryId, dartQueryId, sql, controllerHost,
authenticator, identity, startTime, state);
}
@Override
@@ -180,10 +196,11 @@ public class DartQueryInfo
"sqlQueryId='" + sqlQueryId + '\'' +
", dartQueryId='" + dartQueryId + '\'' +
", sql='" + sql + '\'' +
+ ", controllerHost='" + controllerHost + '\'' +
", authenticator='" + authenticator + '\'' +
", identity='" + identity + '\'' +
", startTime=" + startTime +
- ", state=" + state +
+ ", state='" + state + '\'' +
'}';
}
}
diff --git
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/http/DartSqlResource.java
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/http/DartSqlResource.java
index 37e9f105131..65d770a29c5 100644
---
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/http/DartSqlResource.java
+++
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/http/DartSqlResource.java
@@ -154,7 +154,6 @@ public class DartSqlResource extends SqlResource
controllerRegistry.getAllHolders()
.stream()
.map(DartQueryInfo::fromControllerHolder)
-
.sorted(Comparator.comparing(DartQueryInfo::getStartTime))
.collect(Collectors.toList());
// Add queries from all other servers, if "selfOnly" is not set.
@@ -172,6 +171,9 @@ public class DartSqlResource extends SqlResource
}
}
+ // Sort queries by start time, breaking ties by query ID, so the list
comes back in a consistent and nice order.
+
queries.sort(Comparator.comparing(DartQueryInfo::getStartTime).thenComparing(DartQueryInfo::getDartQueryId));
+
final GetQueriesResponse response;
if (stateReadAccess.isAllowed()) {
// User can READ STATE, so they can see all running queries, as well as
authentication details.
@@ -237,7 +239,10 @@ public class DartSqlResource extends SqlResource
List<SqlLifecycleManager.Cancelable> cancelables =
sqlLifecycleManager.getAll(sqlQueryId);
if (cancelables.isEmpty()) {
- return Response.status(Response.Status.NOT_FOUND).build();
+ // Return ACCEPTED even if the query wasn't found. When the Router
broadcasts cancellation requests to all
+ // Brokers, this ensures the user sees a successful request.
+ AuthorizationUtils.setRequestAuthorizationAttributeIfNeeded(req);
+ return Response.status(Response.Status.ACCEPTED).build();
}
final Access access = authorizeCancellation(req, cancelables);
@@ -247,14 +252,12 @@ public class DartSqlResource extends SqlResource
// Don't call cancel() on the cancelables. That just cancels native
queries, which is useless here. Instead,
// get the controller and stop it.
- boolean found = false;
for (SqlLifecycleManager.Cancelable cancelable : cancelables) {
final HttpStatement stmt = (HttpStatement) cancelable;
final Object dartQueryId =
stmt.context().get(DartSqlEngine.CTX_DART_QUERY_ID);
if (dartQueryId instanceof String) {
final ControllerHolder holder = controllerRegistry.get((String)
dartQueryId);
if (holder != null) {
- found = true;
holder.cancel();
}
} else {
@@ -267,7 +270,9 @@ public class DartSqlResource extends SqlResource
}
}
- return Response.status(found ? Response.Status.ACCEPTED :
Response.Status.NOT_FOUND).build();
+ // Return ACCEPTED even if the query wasn't found. When the Router
broadcasts cancellation requests to all
+ // Brokers, this ensures the user sees a successful request.
+ return Response.status(Response.Status.ACCEPTED).build();
} else {
return Response.status(Response.Status.FORBIDDEN).build();
}
diff --git
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/sql/DartQueryMaker.java
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/sql/DartQueryMaker.java
index 37ed936a117..66686f7640f 100644
---
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/sql/DartQueryMaker.java
+++
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/sql/DartQueryMaker.java
@@ -154,6 +154,7 @@ public class DartQueryMaker implements QueryMaker
controllerContext,
plannerContext.getSqlQueryId(),
plannerContext.getSql(),
+ controllerContext.selfNode().getHostAndPortToUse(),
plannerContext.getAuthenticationResult(),
DateTimes.nowUtc()
);
diff --git
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/dart/guice/DartWorkerModule.java
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/dart/guice/DartWorkerModule.java
index 15bc0e65299..e9bd59f53d8 100644
---
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/dart/guice/DartWorkerModule.java
+++
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/dart/guice/DartWorkerModule.java
@@ -113,7 +113,7 @@ public class DartWorkerModule implements DruidModule
final AuthorizerMapper authorizerMapper
)
{
- final ExecutorService exec =
Execs.multiThreaded(memoryIntrospector.numTasksInJvm(), "dart–worker-%s");
+ final ExecutorService exec =
Execs.multiThreaded(memoryIntrospector.numTasksInJvm(), "dart-worker-%s");
final File baseTempDir =
new File(processingConfig.getTmpDir(), StringUtils.format("dart_%s",
selfNode.getPortToUse()));
return new DartWorkerRunner(
diff --git
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/input/stage/ReadablePartition.java
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/input/stage/ReadablePartition.java
index 99098d1d4cb..5f366c60009 100644
---
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/input/stage/ReadablePartition.java
+++
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/input/stage/ReadablePartition.java
@@ -59,6 +59,18 @@ public class ReadablePartition
return new ReadablePartition(stageNumber, workerNumbers, partitionNumber);
}
+ /**
+ * Returns an output partition that is striped across a set of {@code
workerNumbers}.
+ */
+ public static ReadablePartition striped(
+ final int stageNumber,
+ final IntSortedSet workerNumbers,
+ final int partitionNumber
+ )
+ {
+ return new ReadablePartition(stageNumber, workerNumbers, partitionNumber);
+ }
+
/**
* Returns an output partition that has been collected onto a single worker.
*/
diff --git
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/input/stage/ReadablePartitions.java
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/input/stage/ReadablePartitions.java
index a71535fbcfc..dcf0042f68b 100644
---
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/input/stage/ReadablePartitions.java
+++
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/input/stage/ReadablePartitions.java
@@ -24,6 +24,7 @@ import com.fasterxml.jackson.annotation.JsonTypeInfo;
import it.unimi.dsi.fastutil.ints.Int2IntAVLTreeMap;
import it.unimi.dsi.fastutil.ints.Int2IntSortedMap;
import it.unimi.dsi.fastutil.ints.IntAVLTreeSet;
+import it.unimi.dsi.fastutil.ints.IntSortedSet;
import java.util.Collections;
import java.util.List;
@@ -39,6 +40,7 @@ import java.util.Map;
@JsonSubTypes(value = {
@JsonSubTypes.Type(name = "collected", value =
CollectedReadablePartitions.class),
@JsonSubTypes.Type(name = "striped", value =
StripedReadablePartitions.class),
+ @JsonSubTypes.Type(name = "sparseStriped", value =
SparseStripedReadablePartitions.class),
@JsonSubTypes.Type(name = "combined", value =
CombinedReadablePartitions.class)
})
public interface ReadablePartitions extends Iterable<ReadablePartition>
@@ -59,7 +61,7 @@ public interface ReadablePartitions extends
Iterable<ReadablePartition>
/**
* Combines various sets of partitions into a single set.
*/
- static CombinedReadablePartitions combine(List<ReadablePartitions>
readablePartitions)
+ static ReadablePartitions combine(List<ReadablePartitions>
readablePartitions)
{
return new CombinedReadablePartitions(readablePartitions);
}
@@ -68,7 +70,7 @@ public interface ReadablePartitions extends
Iterable<ReadablePartition>
* Returns a set of {@code numPartitions} partitions striped across {@code
numWorkers} workers: each worker contains
* a "stripe" of each partition.
*/
- static StripedReadablePartitions striped(
+ static ReadablePartitions striped(
final int stageNumber,
final int numWorkers,
final int numPartitions
@@ -82,11 +84,36 @@ public interface ReadablePartitions extends
Iterable<ReadablePartition>
return new StripedReadablePartitions(stageNumber, numWorkers,
partitionNumbers);
}
+ /**
+ * Returns a set of {@code numPartitions} partitions striped across {@code
workers}: each worker contains
+ * a "stripe" of each partition.
+ */
+ static ReadablePartitions striped(
+ final int stageNumber,
+ final IntSortedSet workers,
+ final int numPartitions
+ )
+ {
+ final IntAVLTreeSet partitionNumbers = new IntAVLTreeSet();
+ for (int i = 0; i < numPartitions; i++) {
+ partitionNumbers.add(i);
+ }
+
+ if (workers.lastInt() == workers.size() - 1) {
+ // Dense worker set. Use StripedReadablePartitions for compactness (send
a single number rather than the
+ // entire worker set) and for backwards compatibility (older workers
cannot understand
+ // SparseStripedReadablePartitions).
+ return new StripedReadablePartitions(stageNumber, workers.size(),
partitionNumbers);
+ } else {
+ return new SparseStripedReadablePartitions(stageNumber, workers,
partitionNumbers);
+ }
+ }
+
/**
* Returns a set of partitions that have been collected onto specific
workers: each partition is on exactly
* one worker.
*/
- static CollectedReadablePartitions collected(
+ static ReadablePartitions collected(
final int stageNumber,
final Map<Integer, Integer> partitionToWorkerMap
)
diff --git
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/input/stage/SparseStripedReadablePartitions.java
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/input/stage/SparseStripedReadablePartitions.java
new file mode 100644
index 00000000000..e9a02a7d488
--- /dev/null
+++
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/input/stage/SparseStripedReadablePartitions.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.msq.input.stage;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.collect.Iterators;
+import it.unimi.dsi.fastutil.ints.IntAVLTreeSet;
+import it.unimi.dsi.fastutil.ints.IntSortedSet;
+import org.apache.druid.msq.input.SlicerUtils;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Objects;
+import java.util.Set;
+
+/**
+ * Set of partitions striped across a sparse set of {@code workers}. Each
worker contains a "stripe" of each partition.
+ *
+ * @see StripedReadablePartitions dense version, where workers from [0..N) are
all used.
+ */
+public class SparseStripedReadablePartitions implements ReadablePartitions
+{
+ private final int stageNumber;
+ private final IntSortedSet workers;
+ private final IntSortedSet partitionNumbers;
+
+ /**
+ * Constructor. Most callers should use {@link
ReadablePartitions#striped(int, int, int)} instead, which takes
+ * a partition count rather than a set of partition numbers.
+ */
+ public SparseStripedReadablePartitions(
+ final int stageNumber,
+ final IntSortedSet workers,
+ final IntSortedSet partitionNumbers
+ )
+ {
+ this.stageNumber = stageNumber;
+ this.workers = workers;
+ this.partitionNumbers = partitionNumbers;
+ }
+
+ @JsonCreator
+ private SparseStripedReadablePartitions(
+ @JsonProperty("stageNumber") final int stageNumber,
+ @JsonProperty("workers") final Set<Integer> workers,
+ @JsonProperty("partitionNumbers") final Set<Integer> partitionNumbers
+ )
+ {
+ this(stageNumber, new IntAVLTreeSet(workers), new
IntAVLTreeSet(partitionNumbers));
+ }
+
+ @Override
+ public Iterator<ReadablePartition> iterator()
+ {
+ return Iterators.transform(
+ partitionNumbers.iterator(),
+ partitionNumber -> ReadablePartition.striped(stageNumber, workers,
partitionNumber)
+ );
+ }
+
+ @Override
+ public List<ReadablePartitions> split(final int maxNumSplits)
+ {
+ final List<ReadablePartitions> retVal = new ArrayList<>();
+
+ for (List<Integer> entries :
SlicerUtils.makeSlicesStatic(partitionNumbers.iterator(), maxNumSplits)) {
+ if (!entries.isEmpty()) {
+ retVal.add(new SparseStripedReadablePartitions(stageNumber, workers,
new IntAVLTreeSet(entries)));
+ }
+ }
+
+ return retVal;
+ }
+
+ @JsonProperty
+ int getStageNumber()
+ {
+ return stageNumber;
+ }
+
+ @JsonProperty
+ IntSortedSet getWorkers()
+ {
+ return workers;
+ }
+
+ @JsonProperty
+ IntSortedSet getPartitionNumbers()
+ {
+ return partitionNumbers;
+ }
+
+ @Override
+ public boolean equals(Object o)
+ {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ SparseStripedReadablePartitions that = (SparseStripedReadablePartitions) o;
+ return stageNumber == that.stageNumber
+ && Objects.equals(workers, that.workers)
+ && Objects.equals(partitionNumbers, that.partitionNumbers);
+ }
+
+ @Override
+ public int hashCode()
+ {
+ return Objects.hash(stageNumber, workers, partitionNumbers);
+ }
+
+ @Override
+ public String toString()
+ {
+ return "StripedReadablePartitions{" +
+ "stageNumber=" + stageNumber +
+ ", workers=" + workers +
+ ", partitionNumbers=" + partitionNumbers +
+ '}';
+ }
+}
diff --git
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/controller/ControllerStageTracker.java
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/controller/ControllerStageTracker.java
index 338a35e0d24..533cb57b97f 100644
---
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/controller/ControllerStageTracker.java
+++
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/controller/ControllerStageTracker.java
@@ -403,7 +403,7 @@ class ControllerStageTracker
throw new ISE("Stage does not gather result key statistics");
}
- if (workerNumber < 0 || workerNumber >= workerCount) {
+ if (!workerInputs.workers().contains(workerNumber)) {
throw new IAE("Invalid workerNumber [%s]", workerNumber);
}
@@ -522,7 +522,7 @@ class ControllerStageTracker
throw new ISE("Stage does not gather result key statistics");
}
- if (workerNumber < 0 || workerNumber >= workerCount) {
+ if (!workerInputs.workers().contains(workerNumber)) {
throw new IAE("Invalid workerNumber [%s]", workerNumber);
}
@@ -656,7 +656,7 @@ class ControllerStageTracker
throw new ISE("Stage does not gather result key statistics");
}
- if (workerNumber < 0 || workerNumber >= workerCount) {
+ if (!workerInputs.workers().contains(workerNumber)) {
throw new IAE("Invalid workerNumber [%s]", workerNumber);
}
@@ -763,7 +763,7 @@ class ControllerStageTracker
this.resultPartitionBoundaries = clusterByPartitions;
this.resultPartitions = ReadablePartitions.striped(
stageDef.getStageNumber(),
- workerCount,
+ workerInputs.workers(),
clusterByPartitions.size()
);
@@ -788,7 +788,7 @@ class ControllerStageTracker
throw DruidException.defensive("Cannot setDoneReadingInput for
stage[%s], it is not sorting", stageDef.getId());
}
- if (workerNumber < 0 || workerNumber >= workerCount) {
+ if (!workerInputs.workers().contains(workerNumber)) {
throw new IAE("Invalid workerNumber[%s] for stage[%s]", workerNumber,
stageDef.getId());
}
@@ -830,7 +830,7 @@ class ControllerStageTracker
@SuppressWarnings("unchecked")
boolean setResultsCompleteForWorker(final int workerNumber, final Object
resultObject)
{
- if (workerNumber < 0 || workerNumber >= workerCount) {
+ if (!workerInputs.workers().contains(workerNumber)) {
throw new IAE("Invalid workerNumber [%s]", workerNumber);
}
@@ -947,14 +947,18 @@ class ControllerStageTracker
resultPartitionBoundaries =
maybeResultPartitionBoundaries.valueOrThrow();
resultPartitions = ReadablePartitions.striped(
stageNumber,
- workerCount,
+ workerInputs.workers(),
resultPartitionBoundaries.size()
);
- } else if (shuffleSpec.kind() == ShuffleKind.MIX) {
- resultPartitionBoundaries =
ClusterByPartitions.oneUniversalPartition();
- resultPartitions = ReadablePartitions.striped(stageNumber,
workerCount, shuffleSpec.partitionCount());
} else {
- resultPartitions = ReadablePartitions.striped(stageNumber,
workerCount, shuffleSpec.partitionCount());
+ if (shuffleSpec.kind() == ShuffleKind.MIX) {
+ resultPartitionBoundaries =
ClusterByPartitions.oneUniversalPartition();
+ }
+ resultPartitions = ReadablePartitions.striped(
+ stageNumber,
+ workerInputs.workers(),
+ shuffleSpec.partitionCount()
+ );
}
} else {
// No reshuffling: retain partitioning from nonbroadcast inputs.
diff --git
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/controller/WorkerInputs.java
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/controller/WorkerInputs.java
index 83d7a602bc1..8dcaee9c213 100644
---
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/controller/WorkerInputs.java
+++
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/controller/WorkerInputs.java
@@ -24,7 +24,9 @@ import com.google.common.collect.Iterables;
import it.unimi.dsi.fastutil.ints.Int2IntMap;
import it.unimi.dsi.fastutil.ints.Int2ObjectAVLTreeMap;
import it.unimi.dsi.fastutil.ints.Int2ObjectMap;
-import it.unimi.dsi.fastutil.ints.IntSet;
+import it.unimi.dsi.fastutil.ints.Int2ObjectSortedMap;
+import it.unimi.dsi.fastutil.ints.IntAVLTreeSet;
+import it.unimi.dsi.fastutil.ints.IntSortedSet;
import it.unimi.dsi.fastutil.objects.ObjectIterator;
import org.apache.druid.msq.input.InputSlice;
import org.apache.druid.msq.input.InputSpec;
@@ -45,9 +47,9 @@ import java.util.stream.IntStream;
public class WorkerInputs
{
// Worker number -> input number -> input slice.
- private final Int2ObjectMap<List<InputSlice>> assignmentsMap;
+ private final Int2ObjectSortedMap<List<InputSlice>> assignmentsMap;
- private WorkerInputs(final Int2ObjectMap<List<InputSlice>> assignmentsMap)
+ private WorkerInputs(final Int2ObjectSortedMap<List<InputSlice>>
assignmentsMap)
{
this.assignmentsMap = assignmentsMap;
}
@@ -64,7 +66,7 @@ public class WorkerInputs
)
{
// Split each inputSpec and assign to workers. This list maps worker
number -> input number -> input slice.
- final Int2ObjectMap<List<InputSlice>> assignmentsMap = new
Int2ObjectAVLTreeMap<>();
+ final Int2ObjectSortedMap<List<InputSlice>> assignmentsMap = new
Int2ObjectAVLTreeMap<>();
final int numInputs = stageDef.getInputSpecs().size();
if (numInputs == 0) {
@@ -117,8 +119,8 @@ public class WorkerInputs
final ObjectIterator<Int2ObjectMap.Entry<List<InputSlice>>>
assignmentsIterator =
assignmentsMap.int2ObjectEntrySet().iterator();
+ final IntSortedSet nilWorkers = new IntAVLTreeSet();
- boolean first = true;
while (assignmentsIterator.hasNext()) {
final Int2ObjectMap.Entry<List<InputSlice>> entry =
assignmentsIterator.next();
final List<InputSlice> slices = entry.getValue();
@@ -130,20 +132,29 @@ public class WorkerInputs
}
}
- // Eliminate workers that have no non-nil, non-broadcast inputs. (Except
the first one, because if all input
- // is nil, *some* worker has to do *something*.)
- final boolean hasNonNilNonBroadcastInput =
+ // Identify nil workers (workers with no non-broadcast inputs).
+ final boolean isNilWorker =
IntStream.range(0, numInputs)
- .anyMatch(i ->
- !slices.get(i).equals(NilInputSlice.INSTANCE)
// Non-nil
- &&
!stageDef.getBroadcastInputNumbers().contains(i) // Non-broadcast
+ .allMatch(i ->
+ slices.get(i).equals(NilInputSlice.INSTANCE)
// Nil regular input
+ ||
stageDef.getBroadcastInputNumbers().contains(i) // Broadcast
);
- if (!first && !hasNonNilNonBroadcastInput) {
- assignmentsIterator.remove();
+ if (isNilWorker) {
+ nilWorkers.add(entry.getIntKey());
}
+ }
- first = false;
+ if (nilWorkers.size() == assignmentsMap.size()) {
+ // All workers have nil regular inputs. Remove all workers exept the
first (*some* worker has to do *something*).
+ final List<InputSlice> firstSlices =
assignmentsMap.get(nilWorkers.firstInt());
+ assignmentsMap.clear();
+ assignmentsMap.put(nilWorkers.firstInt(), firstSlices);
+ } else {
+ // Remove all nil workers.
+ for (final int nilWorker : nilWorkers) {
+ assignmentsMap.remove(nilWorker);
+ }
}
return new WorkerInputs(assignmentsMap);
@@ -154,7 +165,7 @@ public class WorkerInputs
return Preconditions.checkNotNull(assignmentsMap.get(workerNumber),
"worker [%s]", workerNumber);
}
- public IntSet workers()
+ public IntSortedSet workers()
{
return assignmentsMap.keySet();
}
diff --git
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/dart/controller/http/DartSqlResourceTest.java
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/dart/controller/http/DartSqlResourceTest.java
index db347917872..51e17235203 100644
---
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/dart/controller/http/DartSqlResourceTest.java
+++
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/dart/controller/http/DartSqlResourceTest.java
@@ -331,9 +331,10 @@ public class DartSqlResourceTest extends MSQTestBase
"sid",
"did2",
"SELECT 2",
+ "localhost:1002",
AUTHENTICATOR_NAME,
DIFFERENT_REGULAR_USER_NAME,
- DateTimes.of("2000"),
+ DateTimes.of("2001"),
ControllerHolder.State.RUNNING.toString()
);
Mockito.when(dartSqlClient.getRunningQueries(true))
@@ -398,6 +399,7 @@ public class DartSqlResourceTest extends MSQTestBase
"sid",
"did2",
"SELECT 2",
+ "localhost:1002",
AUTHENTICATOR_NAME,
DIFFERENT_REGULAR_USER_NAME,
DateTimes.of("2000"),
@@ -434,6 +436,7 @@ public class DartSqlResourceTest extends MSQTestBase
"sid",
"did2",
"SELECT 2",
+ "localhost:1002",
AUTHENTICATOR_NAME,
DIFFERENT_REGULAR_USER_NAME,
DateTimes.of("2000"),
@@ -724,7 +727,7 @@ public class DartSqlResourceTest extends MSQTestBase
.thenReturn(makeAuthenticationResult(REGULAR_USER_NAME));
final Response cancellationResponse =
sqlResource.cancelQuery("nonexistent", httpServletRequest);
- Assertions.assertEquals(Response.Status.NOT_FOUND.getStatusCode(),
cancellationResponse.getStatus());
+ Assertions.assertEquals(Response.Status.ACCEPTED.getStatusCode(),
cancellationResponse.getStatus());
}
/**
@@ -739,8 +742,15 @@ public class DartSqlResourceTest extends MSQTestBase
Mockito.when(controller.queryId()).thenReturn("did_" + identity);
final AuthenticationResult authenticationResult =
makeAuthenticationResult(identity);
- final ControllerHolder holder =
- new ControllerHolder(controller, null, "sid", "SELECT 1",
authenticationResult, DateTimes.of("2000"));
+ final ControllerHolder holder = new ControllerHolder(
+ controller,
+ null,
+ "sid",
+ "SELECT 1",
+ "localhost:1001",
+ authenticationResult,
+ DateTimes.of("2000")
+ );
controllerRegistry.register(holder);
return holder;
diff --git
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/dart/controller/http/GetQueriesResponseTest.java
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/dart/controller/http/GetQueriesResponseTest.java
index 7b43c863c9d..bffaace5745 100644
---
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/dart/controller/http/GetQueriesResponseTest.java
+++
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/dart/controller/http/GetQueriesResponseTest.java
@@ -41,6 +41,7 @@ public class GetQueriesResponseTest
"xyz",
"abc",
"SELECT 1",
+ "localhost:1001",
"auth",
"anon",
DateTimes.of("2000"),
diff --git
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/dart/controller/sql/DartSqlClientImplTest.java
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/dart/controller/sql/DartSqlClientImplTest.java
index 19a4eaf0b15..114ea9c7207 100644
---
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/dart/controller/sql/DartSqlClientImplTest.java
+++
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/dart/controller/sql/DartSqlClientImplTest.java
@@ -69,6 +69,7 @@ public class DartSqlClientImplTest
"sid",
"did",
"SELECT 1",
+ "localhost:1001",
"",
"",
DateTimes.of("2000"),
@@ -97,6 +98,7 @@ public class DartSqlClientImplTest
"sid",
"did",
"SELECT 1",
+ "localhost:1001",
"",
"",
DateTimes.of("2000"),
diff --git
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/input/stage/CollectedReadablePartitionsTest.java
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/input/stage/CollectedReadablePartitionsTest.java
index 6ed7d2d43d4..d4db7a0a7c5 100644
---
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/input/stage/CollectedReadablePartitionsTest.java
+++
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/input/stage/CollectedReadablePartitionsTest.java
@@ -33,21 +33,24 @@ public class CollectedReadablePartitionsTest
@Test
public void testPartitionToWorkerMap()
{
- final CollectedReadablePartitions partitions =
ReadablePartitions.collected(1, ImmutableMap.of(0, 1, 1, 2, 2, 1));
+ final CollectedReadablePartitions partitions =
+ (CollectedReadablePartitions) ReadablePartitions.collected(1,
ImmutableMap.of(0, 1, 1, 2, 2, 1));
Assert.assertEquals(ImmutableMap.of(0, 1, 1, 2, 2, 1),
partitions.getPartitionToWorkerMap());
}
@Test
public void testStageNumber()
{
- final CollectedReadablePartitions partitions =
ReadablePartitions.collected(1, ImmutableMap.of(0, 1, 1, 2, 2, 1));
+ final CollectedReadablePartitions partitions =
+ (CollectedReadablePartitions) ReadablePartitions.collected(1,
ImmutableMap.of(0, 1, 1, 2, 2, 1));
Assert.assertEquals(1, partitions.getStageNumber());
}
@Test
public void testSplit()
{
- final CollectedReadablePartitions partitions =
ReadablePartitions.collected(1, ImmutableMap.of(0, 1, 1, 2, 2, 1));
+ final CollectedReadablePartitions partitions =
+ (CollectedReadablePartitions) ReadablePartitions.collected(1,
ImmutableMap.of(0, 1, 1, 2, 2, 1));
Assert.assertEquals(
ImmutableList.of(
@@ -64,7 +67,8 @@ public class CollectedReadablePartitionsTest
final ObjectMapper mapper = TestHelper.makeJsonMapper()
.registerModules(new
MSQIndexingModule().getJacksonModules());
- final CollectedReadablePartitions partitions =
ReadablePartitions.collected(1, ImmutableMap.of(0, 1, 1, 2, 2, 1));
+ final CollectedReadablePartitions partitions =
+ (CollectedReadablePartitions) ReadablePartitions.collected(1,
ImmutableMap.of(0, 1, 1, 2, 2, 1));
Assert.assertEquals(
partitions,
diff --git
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/input/stage/CombinedReadablePartitionsTest.java
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/input/stage/CombinedReadablePartitionsTest.java
index 685f4ff7a8a..16bd047b624 100644
---
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/input/stage/CombinedReadablePartitionsTest.java
+++
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/input/stage/CombinedReadablePartitionsTest.java
@@ -31,7 +31,7 @@ import org.junit.Test;
public class CombinedReadablePartitionsTest
{
- private static final CombinedReadablePartitions PARTITIONS =
ReadablePartitions.combine(
+ private static final ReadablePartitions PARTITIONS =
ReadablePartitions.combine(
ImmutableList.of(
ReadablePartitions.striped(0, 2, 2),
ReadablePartitions.striped(1, 2, 4)
diff --git
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/input/stage/StripedReadablePartitionsTest.java
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/input/stage/SparseStripedReadablePartitionsTest.java
similarity index 61%
copy from
extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/input/stage/StripedReadablePartitionsTest.java
copy to
extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/input/stage/SparseStripedReadablePartitionsTest.java
index 38e0707f5d0..5268fd60180 100644
---
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/input/stage/StripedReadablePartitionsTest.java
+++
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/input/stage/SparseStripedReadablePartitionsTest.java
@@ -23,44 +23,50 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import it.unimi.dsi.fastutil.ints.IntAVLTreeSet;
+import it.unimi.dsi.fastutil.ints.IntSet;
import nl.jqno.equalsverifier.EqualsVerifier;
import org.apache.druid.msq.guice.MSQIndexingModule;
import org.apache.druid.segment.TestHelper;
import org.junit.Assert;
import org.junit.Test;
-public class StripedReadablePartitionsTest
+public class SparseStripedReadablePartitionsTest
{
@Test
public void testPartitionNumbers()
{
- final StripedReadablePartitions partitions = ReadablePartitions.striped(1,
2, 3);
+ final SparseStripedReadablePartitions partitions =
+ (SparseStripedReadablePartitions) ReadablePartitions.striped(1, new
IntAVLTreeSet(new int[]{1, 3}), 3);
Assert.assertEquals(ImmutableSet.of(0, 1, 2),
partitions.getPartitionNumbers());
}
@Test
- public void testNumWorkers()
+ public void testWorkers()
{
- final StripedReadablePartitions partitions = ReadablePartitions.striped(1,
2, 3);
- Assert.assertEquals(2, partitions.getNumWorkers());
+ final SparseStripedReadablePartitions partitions =
+ (SparseStripedReadablePartitions) ReadablePartitions.striped(1, new
IntAVLTreeSet(new int[]{1, 3}), 3);
+ Assert.assertEquals(IntSet.of(1, 3), partitions.getWorkers());
}
@Test
public void testStageNumber()
{
- final StripedReadablePartitions partitions = ReadablePartitions.striped(1,
2, 3);
+ final SparseStripedReadablePartitions partitions =
+ (SparseStripedReadablePartitions) ReadablePartitions.striped(1, new
IntAVLTreeSet(new int[]{1, 3}), 3);
Assert.assertEquals(1, partitions.getStageNumber());
}
@Test
public void testSplit()
{
- final StripedReadablePartitions partitions = ReadablePartitions.striped(1,
2, 3);
+ final IntAVLTreeSet workers = new IntAVLTreeSet(new int[]{1, 3});
+ final SparseStripedReadablePartitions partitions =
+ (SparseStripedReadablePartitions) ReadablePartitions.striped(1,
workers, 3);
Assert.assertEquals(
ImmutableList.of(
- new StripedReadablePartitions(1, 2, new IntAVLTreeSet(new int[]{0,
2})),
- new StripedReadablePartitions(1, 2, new IntAVLTreeSet(new
int[]{1}))
+ new SparseStripedReadablePartitions(1, workers, new
IntAVLTreeSet(new int[]{0, 2})),
+ new SparseStripedReadablePartitions(1, workers, new
IntAVLTreeSet(new int[]{1}))
),
partitions.split(2)
);
@@ -72,7 +78,8 @@ public class StripedReadablePartitionsTest
final ObjectMapper mapper = TestHelper.makeJsonMapper()
.registerModules(new
MSQIndexingModule().getJacksonModules());
- final StripedReadablePartitions partitions = ReadablePartitions.striped(1,
2, 3);
+ final IntAVLTreeSet workers = new IntAVLTreeSet(new int[]{1, 3});
+ final ReadablePartitions partitions = ReadablePartitions.striped(1,
workers, 3);
Assert.assertEquals(
partitions,
@@ -86,6 +93,6 @@ public class StripedReadablePartitionsTest
@Test
public void testEquals()
{
-
EqualsVerifier.forClass(StripedReadablePartitions.class).usingGetClass().verify();
+
EqualsVerifier.forClass(SparseStripedReadablePartitions.class).usingGetClass().verify();
}
}
diff --git
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/input/stage/StripedReadablePartitionsTest.java
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/input/stage/StripedReadablePartitionsTest.java
index 38e0707f5d0..05b42b33250 100644
---
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/input/stage/StripedReadablePartitionsTest.java
+++
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/input/stage/StripedReadablePartitionsTest.java
@@ -26,36 +26,60 @@ import it.unimi.dsi.fastutil.ints.IntAVLTreeSet;
import nl.jqno.equalsverifier.EqualsVerifier;
import org.apache.druid.msq.guice.MSQIndexingModule;
import org.apache.druid.segment.TestHelper;
+import org.hamcrest.CoreMatchers;
+import org.hamcrest.MatcherAssert;
import org.junit.Assert;
import org.junit.Test;
public class StripedReadablePartitionsTest
{
+ @Test
+ public void testFromDenseSet()
+ {
+ // Tests that when ReadablePartitions.striped is called with a dense set,
we get StripedReadablePartitions.
+
+ final IntAVLTreeSet workers = new IntAVLTreeSet();
+ workers.add(0);
+ workers.add(1);
+
+ final ReadablePartitions readablePartitionsFromSet =
ReadablePartitions.striped(1, workers, 3);
+
+ MatcherAssert.assertThat(
+ readablePartitionsFromSet,
+ CoreMatchers.instanceOf(StripedReadablePartitions.class)
+ );
+
+ Assert.assertEquals(
+ ReadablePartitions.striped(1, 2, 3),
+ readablePartitionsFromSet
+ );
+ }
+
@Test
public void testPartitionNumbers()
{
- final StripedReadablePartitions partitions = ReadablePartitions.striped(1,
2, 3);
+ final StripedReadablePartitions partitions = (StripedReadablePartitions)
ReadablePartitions.striped(1, 2, 3);
Assert.assertEquals(ImmutableSet.of(0, 1, 2),
partitions.getPartitionNumbers());
}
@Test
public void testNumWorkers()
{
- final StripedReadablePartitions partitions = ReadablePartitions.striped(1,
2, 3);
+ final StripedReadablePartitions partitions = (StripedReadablePartitions)
ReadablePartitions.striped(1, 2, 3);
Assert.assertEquals(2, partitions.getNumWorkers());
}
@Test
public void testStageNumber()
{
- final StripedReadablePartitions partitions = ReadablePartitions.striped(1,
2, 3);
+ final StripedReadablePartitions partitions = (StripedReadablePartitions)
ReadablePartitions.striped(1, 2, 3);
Assert.assertEquals(1, partitions.getStageNumber());
}
@Test
public void testSplit()
{
- final StripedReadablePartitions partitions = ReadablePartitions.striped(1,
2, 3);
+ final ReadablePartitions partitions = ReadablePartitions.striped(1, 2, 3);
Assert.assertEquals(
ImmutableList.of(
@@ -72,7 +96,7 @@ public class StripedReadablePartitionsTest
final ObjectMapper mapper = TestHelper.makeJsonMapper()
.registerModules(new
MSQIndexingModule().getJacksonModules());
- final StripedReadablePartitions partitions = ReadablePartitions.striped(1,
2, 3);
+ final ReadablePartitions partitions = ReadablePartitions.striped(1, 2, 3);
Assert.assertEquals(
partitions,
diff --git
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/kernel/controller/WorkerInputsTest.java
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/kernel/controller/WorkerInputsTest.java
index 605e0bf2de7..e74125b0830 100644
---
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/kernel/controller/WorkerInputsTest.java
+++
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/kernel/controller/WorkerInputsTest.java
@@ -25,9 +25,11 @@ import it.unimi.dsi.fastutil.ints.Int2IntMaps;
import it.unimi.dsi.fastutil.ints.Int2ObjectAVLTreeMap;
import it.unimi.dsi.fastutil.ints.IntAVLTreeSet;
import it.unimi.dsi.fastutil.ints.IntSet;
+import it.unimi.dsi.fastutil.ints.IntSortedSet;
import it.unimi.dsi.fastutil.longs.LongArrayList;
import it.unimi.dsi.fastutil.longs.LongList;
import nl.jqno.equalsverifier.EqualsVerifier;
+import org.apache.druid.error.DruidException;
import org.apache.druid.msq.exec.Limits;
import org.apache.druid.msq.exec.OutputChannelMode;
import org.apache.druid.msq.input.InputSlice;
@@ -75,7 +77,7 @@ public class WorkerInputsTest
final WorkerInputs inputs = WorkerInputs.create(
stageDef,
Int2IntMaps.EMPTY_MAP,
- new TestInputSpecSlicer(true),
+ new TestInputSpecSlicer(denseWorkers(4), true),
WorkerAssignmentStrategy.MAX,
Limits.DEFAULT_MAX_INPUT_BYTES_PER_WORKER
);
@@ -91,6 +93,35 @@ public class WorkerInputsTest
);
}
+ @Test
+ public void test_max_threeInputs_fourWorkers_withGaps()
+ {
+ final StageDefinition stageDef =
+ StageDefinition.builder(0)
+ .inputs(new TestInputSpec(1, 2, 3))
+ .maxWorkerCount(4)
+ .processorFactory(new
OffsetLimitFrameProcessorFactory(0, 0L))
+ .build(QUERY_ID);
+
+ final WorkerInputs inputs = WorkerInputs.create(
+ stageDef,
+ Int2IntMaps.EMPTY_MAP,
+ new TestInputSpecSlicer(new IntAVLTreeSet(new int[]{1, 3, 4, 5}),
true),
+ WorkerAssignmentStrategy.MAX,
+ Limits.DEFAULT_MAX_INPUT_BYTES_PER_WORKER
+ );
+
+ Assert.assertEquals(
+ ImmutableMap.<Integer, List<InputSlice>>builder()
+ .put(1, Collections.singletonList(new TestInputSlice(1)))
+ .put(3, Collections.singletonList(new TestInputSlice(2)))
+ .put(4, Collections.singletonList(new TestInputSlice(3)))
+ .put(5, Collections.singletonList(new TestInputSlice()))
+ .build(),
+ inputs.assignmentsMap()
+ );
+ }
+
@Test
public void test_max_zeroInputs_fourWorkers()
{
@@ -104,7 +135,7 @@ public class WorkerInputsTest
final WorkerInputs inputs = WorkerInputs.create(
stageDef,
Int2IntMaps.EMPTY_MAP,
- new TestInputSpecSlicer(true),
+ new TestInputSpecSlicer(denseWorkers(4), true),
WorkerAssignmentStrategy.MAX,
Limits.DEFAULT_MAX_INPUT_BYTES_PER_WORKER
);
@@ -133,7 +164,7 @@ public class WorkerInputsTest
final WorkerInputs inputs = WorkerInputs.create(
stageDef,
Int2IntMaps.EMPTY_MAP,
- new TestInputSpecSlicer(true),
+ new TestInputSpecSlicer(denseWorkers(4), true),
WorkerAssignmentStrategy.AUTO,
Limits.DEFAULT_MAX_INPUT_BYTES_PER_WORKER
);
@@ -159,7 +190,7 @@ public class WorkerInputsTest
final WorkerInputs inputs = WorkerInputs.create(
stageDef,
Int2IntMaps.EMPTY_MAP,
- new TestInputSpecSlicer(true),
+ new TestInputSpecSlicer(denseWorkers(4), true),
WorkerAssignmentStrategy.AUTO,
Limits.DEFAULT_MAX_INPUT_BYTES_PER_WORKER
);
@@ -186,7 +217,7 @@ public class WorkerInputsTest
final WorkerInputs inputs = WorkerInputs.create(
stageDef,
Int2IntMaps.EMPTY_MAP,
- new TestInputSpecSlicer(true),
+ new TestInputSpecSlicer(denseWorkers(4), true),
WorkerAssignmentStrategy.AUTO,
Limits.DEFAULT_MAX_INPUT_BYTES_PER_WORKER
);
@@ -212,7 +243,7 @@ public class WorkerInputsTest
final WorkerInputs inputs = WorkerInputs.create(
stageDef,
Int2IntMaps.EMPTY_MAP,
- new TestInputSpecSlicer(true),
+ new TestInputSpecSlicer(denseWorkers(4), true),
WorkerAssignmentStrategy.AUTO,
Limits.DEFAULT_MAX_INPUT_BYTES_PER_WORKER
);
@@ -324,7 +355,7 @@ public class WorkerInputsTest
final WorkerInputs inputs = WorkerInputs.create(
stageDef,
Int2IntMaps.EMPTY_MAP,
- new TestInputSpecSlicer(true),
+ new TestInputSpecSlicer(denseWorkers(4), true),
WorkerAssignmentStrategy.AUTO,
Limits.DEFAULT_MAX_INPUT_BYTES_PER_WORKER
);
@@ -351,7 +382,7 @@ public class WorkerInputsTest
final WorkerInputs inputs = WorkerInputs.create(
stageDef,
Int2IntMaps.EMPTY_MAP,
- new TestInputSpecSlicer(true),
+ new TestInputSpecSlicer(denseWorkers(2), true),
WorkerAssignmentStrategy.AUTO,
Limits.DEFAULT_MAX_INPUT_BYTES_PER_WORKER
);
@@ -384,7 +415,7 @@ public class WorkerInputsTest
final WorkerInputs inputs = WorkerInputs.create(
stageDef,
Int2IntMaps.EMPTY_MAP,
- new TestInputSpecSlicer(true),
+ new TestInputSpecSlicer(denseWorkers(1), true),
WorkerAssignmentStrategy.AUTO,
Limits.DEFAULT_MAX_INPUT_BYTES_PER_WORKER
);
@@ -411,7 +442,7 @@ public class WorkerInputsTest
.processorFactory(new
OffsetLimitFrameProcessorFactory(0, 0L))
.build(QUERY_ID);
- TestInputSpecSlicer testInputSpecSlicer = spy(new
TestInputSpecSlicer(true));
+ TestInputSpecSlicer testInputSpecSlicer = spy(new
TestInputSpecSlicer(denseWorkers(3), true));
final WorkerInputs inputs = WorkerInputs.create(
stageDef,
@@ -455,7 +486,7 @@ public class WorkerInputsTest
.processorFactory(new
OffsetLimitFrameProcessorFactory(0, 0L))
.build(QUERY_ID);
- TestInputSpecSlicer testInputSpecSlicer = spy(new
TestInputSpecSlicer(true));
+ TestInputSpecSlicer testInputSpecSlicer = spy(new
TestInputSpecSlicer(denseWorkers(3), true));
final WorkerInputs inputs = WorkerInputs.create(
stageDef,
@@ -498,7 +529,7 @@ public class WorkerInputsTest
.processorFactory(new
OffsetLimitFrameProcessorFactory(0, 0L))
.build(QUERY_ID);
- TestInputSpecSlicer testInputSpecSlicer = spy(new
TestInputSpecSlicer(true));
+ TestInputSpecSlicer testInputSpecSlicer = spy(new
TestInputSpecSlicer(denseWorkers(3), true));
final WorkerInputs inputs = WorkerInputs.create(
stageDef,
@@ -585,11 +616,23 @@ public class WorkerInputsTest
private static class TestInputSpecSlicer implements InputSpecSlicer
{
+ private final IntSortedSet workers;
private final boolean canSliceDynamic;
- public TestInputSpecSlicer(boolean canSliceDynamic)
+ /**
+ * Create a test slicer.
+ *
+ * @param workers Set of workers to consider assigning work to.
+ * @param canSliceDynamic Whether this slicer can slice dynamically.
+ */
+ public TestInputSpecSlicer(final IntSortedSet workers, final boolean
canSliceDynamic)
{
+ this.workers = workers;
this.canSliceDynamic = canSliceDynamic;
+
+ if (workers.isEmpty()) {
+ throw DruidException.defensive("Need more than one worker in
workers[%s]", workers);
+ }
}
@Override
@@ -606,9 +649,9 @@ public class WorkerInputsTest
SlicerUtils.makeSlicesStatic(
testInputSpec.values.iterator(),
i -> i,
- maxNumSlices
+ Math.min(maxNumSlices, workers.size())
);
- return makeSlices(assignments);
+ return makeSlices(workers, assignments);
}
@Override
@@ -624,24 +667,39 @@ public class WorkerInputsTest
SlicerUtils.makeSlicesDynamic(
testInputSpec.values.iterator(),
i -> i,
- maxNumSlices,
+ Math.min(maxNumSlices, workers.size()),
maxFilesPerSlice,
maxBytesPerSlice
);
- return makeSlices(assignments);
+ return makeSlices(workers, assignments);
}
private static List<InputSlice> makeSlices(
+ final IntSortedSet workers,
final List<List<Long>> assignments
)
{
final List<InputSlice> retVal = new ArrayList<>(assignments.size());
-
- for (final List<Long> assignment : assignments) {
- retVal.add(new TestInputSlice(new LongArrayList(assignment)));
+ for (int assignment = 0, workerNumber = 0;
+ workerNumber <= workers.lastInt() && assignment <
assignments.size();
+ workerNumber++) {
+ if (workers.contains(workerNumber)) {
+ retVal.add(new TestInputSlice(new
LongArrayList(assignments.get(assignment++))));
+ } else {
+ retVal.add(NilInputSlice.INSTANCE);
+ }
}
return retVal;
}
}
+
+ private static IntSortedSet denseWorkers(final int numWorkers)
+ {
+ final IntAVLTreeSet workers = new IntAVLTreeSet();
+ for (int i = 0; i < numWorkers; i++) {
+ workers.add(i);
+ }
+ return workers;
+ }
}
diff --git a/licenses.yaml b/licenses.yaml
index a04391ce902..12c2d031c37 100644
--- a/licenses.yaml
+++ b/licenses.yaml
@@ -3327,7 +3327,7 @@ name: Protocol Buffers
license_category: binary
module: java-core
license_name: BSD-3-Clause License
-version: 3.24.0
+version: 3.25.5
copyright: Google, Inc.
license_file_path:
- licenses/bin/protobuf-java.BSD3
@@ -3493,7 +3493,7 @@ name: Protocol Buffers
license_category: binary
module: extensions/druid-protobuf-extensions
license_name: BSD-3-Clause License
-version: 3.24.0
+version: 3.25.5
copyright: Google, Inc.
license_file_path: licenses/bin/protobuf-java.BSD3
libraries:
diff --git a/pom.xml b/pom.xml
index b2a2dbf22de..3568d1a045b 100644
--- a/pom.xml
+++ b/pom.xml
@@ -108,7 +108,7 @@
<netty3.version>3.10.6.Final</netty3.version>
<netty4.version>4.1.108.Final</netty4.version>
<postgresql.version>42.7.2</postgresql.version>
- <protobuf.version>3.24.0</protobuf.version>
+ <protobuf.version>3.25.5</protobuf.version>
<resilience4j.version>1.3.1</resilience4j.version>
<slf4j.version>1.7.36</slf4j.version>
<jna.version>5.13.0</jna.version>
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]