This is an automated email from the ASF dual-hosted git repository.
karan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 7cb21cb968 Use worker number instead of task id in MSQ for
communication to/from workers. (#13062)
7cb21cb968 is described below
commit 7cb21cb9685b6641c52e2a51e9707fcbcb7f6903
Author: Laksh Singla <[email protected]>
AuthorDate: Thu Nov 3 10:25:45 2022 +0530
Use worker number instead of task id in MSQ for communication to/from
workers. (#13062)
* Conversion from taskId to workerNumber in the workerClient
* storage connector changes, suffix the file once writing to it has finished
* Fix tests
* Trigger Build
* convert IntFunction to a dedicated interface
* first review round
* use a dummy file to indicate success
* fetch the first filename from the list in case of multiple files
* tests working, fix semantic issue with ls
* change how the success flag works
* comments, checkstyle, method rename
* fix test
* forbiddenapis fix
* Trigger Build
* change the writer
* dead store fix
* Review comments
* revert changes
* review
* review comments
* Update
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/shuffle/DurableStorageInputChannelFactory.java
Co-authored-by: Karan Kumar <[email protected]>
* Update
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/shuffle/DurableStorageInputChannelFactory.java
Co-authored-by: Karan Kumar <[email protected]>
* update error messages
* better error messages
* fix checkstyle
Co-authored-by: Karan Kumar <[email protected]>
---
.../storage/local/LocalFileStorageConnector.java | 6 +-
.../apache/druid/msq/exec/ControllerClient.java | 1 -
.../apache/druid/msq/exec/ControllerContext.java | 1 -
.../org/apache/druid/msq/exec/ControllerImpl.java | 5 +-
.../org/apache/druid/msq/exec/WorkerClient.java | 8 +-
.../java/org/apache/druid/msq/exec/WorkerImpl.java | 54 +++++++++--
.../druid/msq/indexing/ControllerChatHandler.java | 3 +-
.../msq/indexing/IndexerControllerClient.java | 7 +-
.../error/MSQWarningReportSimplePublisher.java | 2 +-
.../shuffle/DurableStorageInputChannelFactory.java | 92 ++++++++++++++----
.../DurableStorageOutputChannelFactory.java | 79 ++++++++++------
.../druid/msq/shuffle/DurableStorageUtils.java | 105 +++++++++++++++++++++
.../org/apache/druid/msq/exec/MSQSelectTest.java | 51 ++++++++++
.../msq/indexing/report/MSQTaskReportTest.java | 6 +-
.../org/apache/druid/msq/test/MSQTestBase.java | 8 +-
.../druid/msq/test/MSQTestControllerClient.java | 2 +-
16 files changed, 349 insertions(+), 81 deletions(-)
diff --git
a/core/src/main/java/org/apache/druid/storage/local/LocalFileStorageConnector.java
b/core/src/main/java/org/apache/druid/storage/local/LocalFileStorageConnector.java
index 8699568b6d..de70f9a941 100644
---
a/core/src/main/java/org/apache/druid/storage/local/LocalFileStorageConnector.java
+++
b/core/src/main/java/org/apache/druid/storage/local/LocalFileStorageConnector.java
@@ -31,7 +31,7 @@ import java.io.OutputStream;
import java.nio.file.Files;
/**
- * Implementation that uses local filesystem. All paths are appended with the
base path, in such a way that its not visible
+ * Implementation that uses local filesystem. All paths are appended with the
base path, in such a way that it is not visible
* to the users of this class.
*/
public class LocalFileStorageConnector implements StorageConnector
@@ -54,10 +54,6 @@ public class LocalFileStorageConnector implements
StorageConnector
/**
* Reads the file present as basePath + path. Will throw an IO exception in
case the file is not present.
* Closing of the stream is the responsibility of the caller.
- *
- * @param path
- * @return
- * @throws IOException
*/
@Override
public InputStream read(String path) throws IOException
diff --git
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerClient.java
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerClient.java
index f621133586..faf1c3ff5e 100644
---
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerClient.java
+++
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerClient.java
@@ -71,7 +71,6 @@ public interface ControllerClient extends AutoCloseable
* Client side method to inform the controller about the warnings generated
by the given worker.
*/
void postWorkerWarning(
- String workerId,
List<MSQErrorReport> MSQErrorReports
) throws IOException;
List<String> getTaskList() throws IOException;
diff --git
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerContext.java
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerContext.java
index f77290c092..e09ac9ebd6 100644
---
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerContext.java
+++
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerContext.java
@@ -74,7 +74,6 @@ public interface ControllerContext
* Client for communicating with workers.
*/
WorkerClient taskClientFor(Controller controller);
-
/**
* Writes controller task report.
*/
diff --git
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java
index a2f9328000..8eb348bf50 100644
---
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java
+++
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java
@@ -146,7 +146,7 @@ import org.apache.druid.msq.querykit.ShuffleSpecFactory;
import org.apache.druid.msq.querykit.groupby.GroupByQueryKit;
import org.apache.druid.msq.querykit.scan.ScanQueryKit;
import org.apache.druid.msq.shuffle.DurableStorageInputChannelFactory;
-import org.apache.druid.msq.shuffle.DurableStorageOutputChannelFactory;
+import org.apache.druid.msq.shuffle.DurableStorageUtils;
import org.apache.druid.msq.shuffle.WorkerInputChannelFactory;
import org.apache.druid.msq.statistics.ClusterByStatisticsSnapshot;
import org.apache.druid.msq.util.DimensionSchemaUtils;
@@ -1202,7 +1202,6 @@ public class ControllerImpl implements Controller
if
(MultiStageQueryContext.isDurableStorageEnabled(task.getQuerySpec().getQuery().context()))
{
inputChannelFactory =
DurableStorageInputChannelFactory.createStandardImplementation(
id(),
- () -> taskIds,
MSQTasks.makeStorageConnector(context.injector()),
closer
);
@@ -1303,7 +1302,7 @@ public class ControllerImpl implements Controller
private void cleanUpDurableStorageIfNeeded()
{
if
(MultiStageQueryContext.isDurableStorageEnabled(task.getQuerySpec().getQuery().context()))
{
- final String controllerDirName =
DurableStorageOutputChannelFactory.getControllerDirectory(task.getId());
+ final String controllerDirName =
DurableStorageUtils.getControllerDirectory(task.getId());
try {
// Delete all temporary files as a failsafe
MSQTasks.makeStorageConnector(context.injector()).deleteRecursively(controllerDirName);
diff --git
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerClient.java
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerClient.java
index 72a0a81604..8f4c7bac23 100644
---
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerClient.java
+++
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerClient.java
@@ -36,7 +36,7 @@ public interface WorkerClient extends AutoCloseable
/**
* Worker's client method to add a {@link WorkOrder} to the worker to work on
*/
- ListenableFuture<Void> postWorkOrder(String workerId, WorkOrder workOrder);
+ ListenableFuture<Void> postWorkOrder(String workerTaskId, WorkOrder
workOrder);
/**
* Worker's client method to inform it of the partition boundaries for the
given stage. This is usually invoked by the
@@ -50,13 +50,15 @@ public interface WorkerClient extends AutoCloseable
/**
* Worker's client method to inform that the work has been done, and it can
initiate cleanup and shutdown
+ * @param workerTaskId
*/
- ListenableFuture<Void> postFinish(String workerId);
+ ListenableFuture<Void> postFinish(String workerTaskId);
/**
* Fetches all the counters gathered by that worker
+ * @param workerTaskId
*/
- ListenableFuture<CounterSnapshotsTree> getCounters(String workerId);
+ ListenableFuture<CounterSnapshotsTree> getCounters(String workerTaskId);
/**
* Worker's client method that informs it that the results and resources for
the given stage are no longer required
diff --git
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerImpl.java
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerImpl.java
index d930d825f9..ad1cdfa04c 100644
---
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerImpl.java
+++
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerImpl.java
@@ -102,6 +102,7 @@ import org.apache.druid.msq.kernel.worker.WorkerStagePhase;
import org.apache.druid.msq.querykit.DataSegmentProvider;
import org.apache.druid.msq.shuffle.DurableStorageInputChannelFactory;
import org.apache.druid.msq.shuffle.DurableStorageOutputChannelFactory;
+import org.apache.druid.msq.shuffle.DurableStorageUtils;
import org.apache.druid.msq.shuffle.WorkerInputChannelFactory;
import org.apache.druid.msq.statistics.ClusterByStatisticsCollector;
import org.apache.druid.msq.statistics.ClusterByStatisticsSnapshot;
@@ -210,7 +211,12 @@ public class WorkerImpl implements Worker
}
catch (Throwable e) {
maybeErrorReport = Optional.of(
- MSQErrorReport.fromException(id(),
MSQTasks.getHostFromSelfNode(selfDruidNode), null, e)
+ MSQErrorReport.fromException(
+ id(),
+ MSQTasks.getHostFromSelfNode(selfDruidNode),
+ null,
+ e
+ )
);
}
@@ -241,6 +247,7 @@ public class WorkerImpl implements Worker
this.controllerClient =
context.makeControllerClient(task.getControllerTaskId());
closer.register(controllerClient::close);
context.registerWorker(this, closer); // Uses controllerClient, so must be
called after that is initialized
+
this.workerClient = new
ExceptionWrappingWorkerClient(context.makeWorkerClient());
closer.register(workerClient::close);
@@ -583,7 +590,6 @@ public class WorkerImpl implements Worker
if (durableStageStorageEnabled) {
return DurableStorageInputChannelFactory.createStandardImplementation(
task.getControllerTaskId(),
- workerTaskList,
MSQTasks.makeStorageConnector(context.injector()),
closer
);
@@ -601,8 +607,9 @@ public class WorkerImpl implements Worker
if (durableStageStorageEnabled) {
return DurableStorageOutputChannelFactory.createStandardImplementation(
task.getControllerTaskId(),
- id(),
+ task().getWorkerNumber(),
stageNumber,
+ task().getId(),
frameSize,
MSQTasks.makeStorageConnector(context.injector())
);
@@ -709,18 +716,18 @@ public class WorkerImpl implements Worker
// Therefore, the logic for cleaning the stage output in case of a
worker/machine crash has to be external.
// We currently take care of this in the controller.
if (durableStageStorageEnabled) {
- final String fileName =
DurableStorageOutputChannelFactory.getPartitionFileName(
+ final String folderName =
DurableStorageUtils.getTaskIdOutputsFolderName(
task.getControllerTaskId(),
- task.getId(),
stageId.getStageNumber(),
- partition
+ task.getWorkerNumber(),
+ task.getId()
);
try {
-
MSQTasks.makeStorageConnector(context.injector()).deleteFile(fileName);
+
MSQTasks.makeStorageConnector(context.injector()).deleteRecursively(folderName);
}
catch (Exception e) {
// If an error is thrown while cleaning up a file, log it and try to
continue with the cleanup
- log.warn(e, "Error while cleaning up temporary files at path " +
fileName);
+ log.warn(e, "Error while cleaning up folder at path " + folderName);
}
}
}
@@ -888,7 +895,38 @@ public class WorkerImpl implements Worker
for (OutputChannel channel : outputChannels.getAllChannels()) {
stageOutputs.computeIfAbsent(stageDef.getId(), ignored1 -> new
ConcurrentHashMap<>())
.computeIfAbsent(channel.getPartitionNumber(),
ignored2 -> channel.getReadableChannel());
+
+ }
+
+ if (durableStageStorageEnabled) {
+ // Once the outputs channels have been resolved and are ready
for reading, the worker appends the filename
+ // with a special marker flag and adds it to the
+ DurableStorageOutputChannelFactory
durableStorageOutputChannelFactory =
+
DurableStorageOutputChannelFactory.createStandardImplementation(
+ task.getControllerTaskId(),
+ task().getWorkerNumber(),
+ stageDef.getStageNumber(),
+ task().getId(),
+ frameContext.memoryParameters().getStandardFrameSize(),
+ MSQTasks.makeStorageConnector(context.injector())
+ );
+ try {
+
durableStorageOutputChannelFactory.createSuccessFile(task.getId());
+ }
+ catch (IOException e) {
+ throw new ISE(
+ e,
+ "Unable to create the success file [%s] at the location
[%s]",
+ DurableStorageUtils.SUCCESS_MARKER_FILENAME,
+ DurableStorageUtils.getSuccessFilePath(
+ task.getControllerTaskId(),
+ stageDef.getStageNumber(),
+ task().getWorkerNumber()
+ )
+ );
+ }
}
+
kernelManipulationQueue.add(holder -> holder.getStageKernelMap()
.get(stageDef.getId())
.setResultsComplete(resultObject));
diff --git
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/ControllerChatHandler.java
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/ControllerChatHandler.java
index 5561a98a07..1857d83708 100644
---
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/ControllerChatHandler.java
+++
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/ControllerChatHandler.java
@@ -106,12 +106,11 @@ public class ControllerChatHandler implements ChatHandler
* See {@link ControllerClient#postWorkerWarning} for the client-side code
that calls this API.
*/
@POST
- @Path("/workerWarning/{taskId}")
+ @Path("/workerWarning")
@Produces(MediaType.APPLICATION_JSON)
@Consumes(MediaType.APPLICATION_JSON)
public Response httpPostWorkerWarning(
final List<MSQErrorReport> errorReport,
- @PathParam("taskId") final String taskId,
@Context final HttpServletRequest req
)
{
diff --git
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerControllerClient.java
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerControllerClient.java
index 2c249e2639..3a6c9e7879 100644
---
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerControllerClient.java
+++
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerControllerClient.java
@@ -122,12 +122,9 @@ public class IndexerControllerClient implements
ControllerClient
}
@Override
- public void postWorkerWarning(String workerId, List<MSQErrorReport>
MSQErrorReports) throws IOException
+ public void postWorkerWarning(List<MSQErrorReport> MSQErrorReports) throws
IOException
{
- final String path = StringUtils.format(
- "/workerWarning/%s",
- StringUtils.urlEncode(workerId)
- );
+ final String path = "/workerWarning";
doRequest(
new RequestBuilder(HttpMethod.POST, path)
diff --git
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/error/MSQWarningReportSimplePublisher.java
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/error/MSQWarningReportSimplePublisher.java
index 3c07a163aa..1353f40404 100644
---
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/error/MSQWarningReportSimplePublisher.java
+++
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/error/MSQWarningReportSimplePublisher.java
@@ -57,7 +57,7 @@ public class MSQWarningReportSimplePublisher implements
MSQWarningReportPublishe
final MSQErrorReport warningReport = MSQErrorReport.fromException(taskId,
host, stageNumber, e);
try {
- controllerClient.postWorkerWarning(workerId,
ImmutableList.of(warningReport));
+ controllerClient.postWorkerWarning(ImmutableList.of(warningReport));
}
catch (IOException e2) {
throw new RuntimeException(e2);
diff --git
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/shuffle/DurableStorageInputChannelFactory.java
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/shuffle/DurableStorageInputChannelFactory.java
index f7beba6c25..0d32a2d6bb 100644
---
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/shuffle/DurableStorageInputChannelFactory.java
+++
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/shuffle/DurableStorageInputChannelFactory.java
@@ -20,6 +20,7 @@
package org.apache.druid.msq.shuffle;
import com.google.common.base.Preconditions;
+import org.apache.commons.io.IOUtils;
import org.apache.druid.frame.channel.ReadableFrameChannel;
import org.apache.druid.frame.channel.ReadableInputStreamFrameChannel;
import org.apache.druid.java.util.common.IOE;
@@ -27,36 +28,36 @@ import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.RetryUtils;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.io.Closer;
+import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.msq.indexing.InputChannelFactory;
import org.apache.druid.msq.kernel.StageId;
import org.apache.druid.storage.StorageConnector;
import java.io.IOException;
import java.io.InputStream;
-import java.util.List;
+import java.nio.charset.StandardCharsets;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
-import java.util.function.Supplier;
/**
* Provides input channels connected to durable storage.
*/
public class DurableStorageInputChannelFactory implements InputChannelFactory
{
+
+ private static final Logger LOG = new
Logger(DurableStorageInputChannelFactory.class);
+
private final StorageConnector storageConnector;
private final ExecutorService remoteInputStreamPool;
private final String controllerTaskId;
- private final Supplier<List<String>> taskList;
public DurableStorageInputChannelFactory(
final String controllerTaskId,
- final Supplier<List<String>> taskList,
final StorageConnector storageConnector,
final ExecutorService remoteInputStreamPool
)
{
this.controllerTaskId = Preconditions.checkNotNull(controllerTaskId,
"controllerTaskId");
- this.taskList = Preconditions.checkNotNull(taskList, "taskList");
this.storageConnector = Preconditions.checkNotNull(storageConnector,
"storageConnector");
this.remoteInputStreamPool =
Preconditions.checkNotNull(remoteInputStreamPool, "remoteInputStreamPool");
}
@@ -67,7 +68,6 @@ public class DurableStorageInputChannelFactory implements
InputChannelFactory
*/
public static DurableStorageInputChannelFactory createStandardImplementation(
final String controllerTaskId,
- final Supplier<List<String>> taskList,
final StorageConnector storageConnector,
final Closer closer
)
@@ -75,28 +75,35 @@ public class DurableStorageInputChannelFactory implements
InputChannelFactory
final ExecutorService remoteInputStreamPool =
Executors.newCachedThreadPool(Execs.makeThreadFactory(controllerTaskId
+ "-remote-fetcher-%d"));
closer.register(remoteInputStreamPool::shutdownNow);
- return new DurableStorageInputChannelFactory(controllerTaskId, taskList,
storageConnector, remoteInputStreamPool);
+ return new DurableStorageInputChannelFactory(controllerTaskId,
storageConnector, remoteInputStreamPool);
}
@Override
public ReadableFrameChannel openChannel(StageId stageId, int workerNumber,
int partitionNumber) throws IOException
{
- final String workerTaskId = taskList.get().get(workerNumber);
try {
- final String remotePartitionPath =
DurableStorageOutputChannelFactory.getPartitionFileName(
+ final String remotePartitionPath = findSuccessfulPartitionOutput(
controllerTaskId,
- workerTaskId,
+ workerNumber,
stageId.getStageNumber(),
partitionNumber
);
+ LOG.debug(
+ "Reading output of stage [%d], partition [%d] for worker [%d] from
the file at path [%s]",
+ stageId.getStageNumber(),
+ partitionNumber,
+ workerNumber,
+ remotePartitionPath
+ );
RetryUtils.retry(() -> {
if (!storageConnector.pathExists(remotePartitionPath)) {
throw new ISE(
- "Could not find remote output of worker task[%s] stage[%d]
partition[%d]",
- workerTaskId,
+ "Could not find remote outputs of stage [%d] partition [%d] for
worker [%d] at the path [%s]",
stageId.getStageNumber(),
- partitionNumber
+ partitionNumber,
+ workerNumber,
+ remotePartitionPath
);
}
return Boolean.TRUE;
@@ -112,11 +119,64 @@ public class DurableStorageInputChannelFactory implements
InputChannelFactory
catch (Exception e) {
throw new IOE(
e,
- "Could not find remote output of worker task[%s] stage[%d]
partition[%d]",
- workerTaskId,
+ "Encountered error while reading the output of stage [%d], partition
[%d] for worker [%d]",
stageId.getStageNumber(),
- partitionNumber
+ partitionNumber,
+ workerNumber
);
}
}
+
+  /**
+   * Given a worker number, stage number and partition number, this method figures out the exact location
+   * where the outputs would be present in the durable storage and returns the complete path. It throws an
+   * exception if no success file exists for that worker and stage in the durable storage.
+   * <p>
+   * The success file is written by {@link DurableStorageOutputChannelFactory#createSuccessFile(String)} and
+   * contains the id of the task whose outputs should be read.
+   *
+   * @param controllerTaskId id of the controller task that owns the outputs
+   * @param workerNo         number of the worker that wrote the outputs
+   * @param stageNumber      stage whose outputs are being read
+   * @param partitionNumber  partition whose output file path is returned
+   *
+   * @return path of the partition's output file in durable storage
+   *
+   * @throws IOException if the success file cannot be read
+   * @throws ISE         if the success file is missing or does not contain a task id
+   */
+  public String findSuccessfulPartitionOutput(
+      final String controllerTaskId,
+      final int workerNo,
+      final int stageNumber,
+      final int partitionNumber
+  ) throws IOException
+  {
+    final String successfulFilePath = DurableStorageUtils.getSuccessFilePath(
+        controllerTaskId,
+        stageNumber,
+        workerNo
+    );
+
+    if (!storageConnector.pathExists(successfulFilePath)) {
+      throw new ISE(
+          "No file present at the location [%s]. Unable to read the outputs of stage [%d], partition [%d] for the worker [%d]",
+          successfulFilePath,
+          stageNumber,
+          partitionNumber,
+          workerNo
+      );
+    }
+
+    final String successfulTaskId;
+    try (InputStream is = storageConnector.read(successfulFilePath)) {
+      successfulTaskId = IOUtils.toString(is, StandardCharsets.UTF_8);
+    }
+
+    // IOUtils.toString never returns null, so the old null check alone could not fire. A success file without
+    // content would otherwise produce a bogus "taskId_" path, and openChannel would only fail after its retries.
+    if (successfulTaskId == null || successfulTaskId.isEmpty()) {
+      throw new ISE("Unable to read the task id from the file: [%s]", successfulFilePath);
+    }
+
+    LOG.debug(
+        "Reading output of stage [%d], partition [%d] from task id [%s]",
+        stageNumber,
+        partitionNumber,
+        successfulTaskId
+    );
+
+    return DurableStorageUtils.getPartitionOutputsFileNameForPartition(
+        controllerTaskId,
+        stageNumber,
+        workerNo,
+        successfulTaskId,
+        partitionNumber
+    );
+  }
}
diff --git
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/shuffle/DurableStorageOutputChannelFactory.java
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/shuffle/DurableStorageOutputChannelFactory.java
index 1f4361a939..3bc0f72c89 100644
---
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/shuffle/DurableStorageOutputChannelFactory.java
+++
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/shuffle/DurableStorageOutputChannelFactory.java
@@ -20,7 +20,6 @@
package org.apache.druid.msq.shuffle;
import com.google.common.base.Preconditions;
-import org.apache.druid.common.utils.IdUtils;
import org.apache.druid.frame.allocation.ArenaMemoryAllocator;
import org.apache.druid.frame.channel.ReadableNilFrameChannel;
import org.apache.druid.frame.channel.WritableFrameFileChannel;
@@ -28,31 +27,39 @@ import org.apache.druid.frame.file.FrameFileWriter;
import org.apache.druid.frame.processor.OutputChannel;
import org.apache.druid.frame.processor.OutputChannelFactory;
import org.apache.druid.java.util.common.ISE;
-import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.storage.StorageConnector;
import java.io.IOException;
+import java.io.OutputStreamWriter;
import java.nio.channels.Channels;
+import java.nio.charset.StandardCharsets;
public class DurableStorageOutputChannelFactory implements OutputChannelFactory
{
+
+ private static final Logger LOG = new
Logger(DurableStorageOutputChannelFactory.class);
+
private final String controllerTaskId;
- private final String workerTaskId;
+ private final int workerNumber;
private final int stageNumber;
+ private final String taskId;
private final int frameSize;
private final StorageConnector storageConnector;
public DurableStorageOutputChannelFactory(
final String controllerTaskId,
- final String workerTaskId,
+ final int workerNumber,
final int stageNumber,
+ final String taskId,
final int frameSize,
final StorageConnector storageConnector
)
{
this.controllerTaskId = Preconditions.checkNotNull(controllerTaskId,
"controllerTaskId");
- this.workerTaskId = Preconditions.checkNotNull(workerTaskId,
"workerTaskId");
+ this.workerNumber = workerNumber;
this.stageNumber = stageNumber;
+ this.taskId = taskId;
this.frameSize = frameSize;
this.storageConnector = Preconditions.checkNotNull(storageConnector,
"storageConnector");
}
@@ -63,16 +70,18 @@ public class DurableStorageOutputChannelFactory implements
OutputChannelFactory
*/
public static DurableStorageOutputChannelFactory
createStandardImplementation(
final String controllerTaskId,
- final String workerTaskId,
+ final int workerNumber,
final int stageNumber,
+ final String taskId,
final int frameSize,
final StorageConnector storageConnector
)
{
return new DurableStorageOutputChannelFactory(
controllerTaskId,
- workerTaskId,
+ workerNumber,
stageNumber,
+ taskId,
frameSize,
storageConnector
);
@@ -81,7 +90,13 @@ public class DurableStorageOutputChannelFactory implements
OutputChannelFactory
@Override
public OutputChannel openChannel(int partitionNumber) throws IOException
{
- final String fileName = getPartitionFileName(controllerTaskId,
workerTaskId, stageNumber, partitionNumber);
+ final String fileName =
DurableStorageUtils.getPartitionOutputsFileNameForPartition(
+ controllerTaskId,
+ stageNumber,
+ workerNumber,
+ taskId,
+ partitionNumber
+ );
final WritableFrameFileChannel writableChannel =
new WritableFrameFileChannel(
FrameFileWriter.open(
@@ -101,7 +116,13 @@ public class DurableStorageOutputChannelFactory implements
OutputChannelFactory
@Override
public OutputChannel openNilChannel(int partitionNumber)
{
- final String fileName = getPartitionFileName(controllerTaskId,
workerTaskId, stageNumber, partitionNumber);
+ final String fileName =
DurableStorageUtils.getPartitionOutputsFileNameForPartition(
+ controllerTaskId,
+ stageNumber,
+ workerNumber,
+ taskId,
+ partitionNumber
+ );
// As tasks dependent on output of this partition will forever block if no
file is present in RemoteStorage. Hence, writing a dummy frame.
try {
@@ -111,32 +132,30 @@ public class DurableStorageOutputChannelFactory
implements OutputChannelFactory
catch (IOException e) {
throw new ISE(
e,
- "Unable to create empty remote output of workerTask[%s] stage[%d]
partition[%d]",
- workerTaskId,
+ "Unable to create empty remote output of stage [%d], partition [%d]
for worker [%d]",
stageNumber,
- partitionNumber
+ partitionNumber,
+ workerNumber
);
}
}
- public static String getControllerDirectory(final String controllerTaskId)
- {
- return StringUtils.format("controller_%s", IdUtils.validateId("controller
task ID", controllerTaskId));
- }
-
- public static String getPartitionFileName(
- final String controllerTaskId,
- final String workerTaskId,
- final int stageNumber,
- final int partitionNumber
- )
+  /**
+   * Creates a file named {@link DurableStorageUtils#SUCCESS_MARKER_FILENAME} in this worker's output folder for
+   * this stage and writes the given task id into it.
+   * <p>
+   * Readers use this file to find out which task has written its outputs for this worker number completely.
+   * A rename-on-completion scheme was not used because rename operations are not quick in cloud storage such
+   * as S3.
+   * <p>
+   * If the success file is already present at the location, this method is a no-op.
+   * NOTE(review): the existence check and the write are two separate storage calls, so they are not atomic.
+   *
+   * @param taskId id of the task whose outputs are complete; written verbatim (UTF-8) as the file's content
+   *
+   * @throws IOException if the success file cannot be written
+   */
+  public void createSuccessFile(String taskId) throws IOException
   {
- return StringUtils.format(
- "%s/worker_%s/stage_%d/part_%d",
- getControllerDirectory(controllerTaskId),
- IdUtils.validateId("worker task ID", workerTaskId),
- stageNumber,
- partitionNumber
- );
+    // Fail before opening the storage stream, so that a null id cannot leave behind an empty success file.
+    Preconditions.checkNotNull(taskId, "taskId");
+    final String fileName = DurableStorageUtils.getSuccessFilePath(controllerTaskId, stageNumber, workerNumber);
+    if (storageConnector.pathExists(fileName)) {
+      LOG.warn("Path [%s] already exists. Won't attempt to rewrite on top of it.", fileName);
+      return;
+    }
+    // try-with-resources so the underlying storage stream is closed even if the write fails.
+    try (OutputStreamWriter writer = new OutputStreamWriter(storageConnector.write(fileName), StandardCharsets.UTF_8)) {
+      // The content is the task id; readers use it to locate that task's partition files.
+      writer.write(taskId);
+    }
   }
}
diff --git
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/shuffle/DurableStorageUtils.java
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/shuffle/DurableStorageUtils.java
new file mode 100644
index 0000000000..df3b86a5c7
--- /dev/null
+++
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/shuffle/DurableStorageUtils.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.msq.shuffle;
+
+import org.apache.druid.common.utils.IdUtils;
+import org.apache.druid.java.util.common.StringUtils;
+
+/**
+ * Helper class that builds the directory and file names used to lay out stage outputs in durable storage.
+ * <p>
+ * Layout produced by the methods of this class:
+ * <pre>
+ * controller_{controllerTaskId}/stage_{stageNumber}/worker_{workerNumber}/__success
+ * controller_{controllerTaskId}/stage_{stageNumber}/worker_{workerNumber}/taskId_{taskId}/part_{partitionNumber}
+ * </pre>
+ */
+public class DurableStorageUtils
+{
+  public static final String SUCCESS_MARKER_FILENAME = "__success";
+
+  /**
+   * Returns the root directory for all durable storage files that belong to the given controller task.
+   * The id is checked with {@link IdUtils#validateId} before it is used as a path component.
+   */
+  public static String getControllerDirectory(final String controllerTaskId)
+  {
+    return StringUtils.format("controller_%s", IdUtils.validateId("controller task ID", controllerTaskId));
+  }
+
+  /**
+   * Returns the path of the success marker file for the given worker number within the given stage.
+   */
+  public static String getSuccessFilePath(
+      final String controllerTaskId,
+      final int stageNumber,
+      final int workerNumber
+  )
+  {
+    final String folderName = getWorkerOutputFolderName(controllerTaskId, stageNumber, workerNumber);
+    return StringUtils.format("%s/%s", folderName, SUCCESS_MARKER_FILENAME);
+  }
+
+  /**
+   * Fetches the directory location where workers will store the partition files corresponding to the stage number.
+   */
+  public static String getWorkerOutputFolderName(
+      final String controllerTaskId,
+      final int stageNumber,
+      final int workerNumber
+  )
+  {
+    return StringUtils.format(
+        "%s/stage_%d/worker_%d",
+        getControllerDirectory(controllerTaskId),
+        stageNumber,
+        workerNumber
+    );
+  }
+
+  /**
+   * Fetches the directory location where a particular task, running as the given worker number, stores the
+   * partition files corresponding to the stage number.
+   * <p>
+   * The task id becomes a path component, and it may have been read back from a file in storage. For that
+   * reason, it is validated with {@link IdUtils#validateId}, just like the controller task id.
+   */
+  public static String getTaskIdOutputsFolderName(
+      final String controllerTaskId,
+      final int stageNumber,
+      final int workerNumber,
+      final String taskId
+  )
+  {
+    return StringUtils.format(
+        "%s/taskId_%s",
+        getWorkerOutputFolderName(controllerTaskId, stageNumber, workerNumber),
+        IdUtils.validateId("worker task ID", taskId)
+    );
+  }
+
+  /**
+   * Fetches the file location where a particular task writes the data corresponding to a particular stage
+   * and partition.
+   */
+  public static String getPartitionOutputsFileNameForPartition(
+      final String controllerTaskId,
+      final int stageNumber,
+      final int workerNumber,
+      final String taskId,
+      final int partitionNumber
+  )
+  {
+    return StringUtils.format(
+        "%s/part_%d",
+        getTaskIdOutputsFolderName(controllerTaskId, stageNumber, workerNumber, taskId),
+        partitionNumber
+    );
+  }
+}
diff --git
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQSelectTest.java
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQSelectTest.java
index de7038700b..c83481757d 100644
---
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQSelectTest.java
+++
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQSelectTest.java
@@ -31,6 +31,7 @@ import org.apache.druid.msq.indexing.ColumnMapping;
import org.apache.druid.msq.indexing.ColumnMappings;
import org.apache.druid.msq.indexing.MSQSpec;
import org.apache.druid.msq.indexing.MSQTuningConfig;
+import org.apache.druid.msq.shuffle.DurableStorageUtils;
import org.apache.druid.msq.test.MSQTestBase;
import org.apache.druid.query.InlineDataSource;
import org.apache.druid.query.QueryDataSource;
@@ -62,6 +63,8 @@ import org.apache.druid.sql.calcite.util.CalciteTests;
import org.hamcrest.CoreMatchers;
import org.junit.Test;
import org.junit.internal.matchers.ThrowableMessageMatcher;
+import org.mockito.ArgumentMatchers;
+import org.mockito.Mockito;
import javax.annotation.Nonnull;
import java.io.File;
@@ -1203,6 +1206,54 @@ public class MSQSelectTest extends MSQTestBase
.verifyResults();
}
+ @Test
+ public void testGroupByOnFooWithDurableStoragePathAssertions() throws IOException
+ {
+ RowSignature rowSignature = RowSignature.builder()
+ .add("cnt", ColumnType.LONG)
+ .add("cnt1", ColumnType.LONG)
+ .build();
+
+ testSelectQuery()
+ .setSql("select cnt,count(*) as cnt1 from foo group by cnt")
+ .setExpectedMSQSpec(MSQSpec.builder()
+ .query(GroupByQuery.builder()
+ .setDataSource(CalciteTests.DATASOURCE1)
+ .setInterval(querySegmentSpec(Filtration
+ .eternity()))
+ .setGranularity(Granularities.ALL)
+ .setDimensions(dimensions(
+ new DefaultDimensionSpec(
+ "cnt",
+ "d0",
+ ColumnType.LONG
+ )
+ ))
+ .setAggregatorSpecs(aggregators(new CountAggregatorFactory(
+ "a0")))
+ .setContext(DEFAULT_MSQ_CONTEXT)
+ .build())
+ .columnMappings(
+ new ColumnMappings(ImmutableList.of(
+ new ColumnMapping("d0", "cnt"),
+ new ColumnMapping("a0", "cnt1")
+ )
+ ))
+ .tuningConfig(MSQTuningConfig.defaultConfig())
+ .build())
+ .setExpectedRowSignature(rowSignature)
+ .setExpectedResultRows(ImmutableList.of(new Object[]{1L, 6L}))
+ .verifyResults();
+ File successFile = new File( // NOTE(review): successFile is never read; assert on it or drop it (and its import)
+ localFileStorageDir,
+ DurableStorageUtils.getSuccessFilePath("query-test-query", 0, 0)
+ );
+
+ Mockito.verify(localFileStorageConnector, Mockito.times(2))
+ .write(ArgumentMatchers.endsWith("__success"));
+ }
+
+
@Nonnull
private List<Object[]> expectedMultiValueFooRowsGroup()
{
diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/report/MSQTaskReportTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/report/MSQTaskReportTest.java
index 4902675509..a0eeec58fb 100644
--- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/report/MSQTaskReportTest.java
+++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/report/MSQTaskReportTest.java
@@ -43,7 +43,6 @@ import org.apache.druid.segment.TestHelper;
import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.segment.column.RowSignature;
import org.junit.Assert;
-import org.junit.Ignore;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
@@ -175,7 +174,6 @@ public class MSQTaskReportTest
}
@Test
- @Ignore("requires https://github.com/apache/druid/pull/12938")
public void testWriteTaskReport() throws Exception
{
final MSQTaskReport report = new MSQTaskReport(
@@ -204,7 +202,9 @@ public class MSQTaskReportTest
final Map<String, TaskReport> reportMap = mapper.readValue(
reportFile,
- new TypeReference<Map<String, TaskReport>>() {}
+ new TypeReference<Map<String, TaskReport>>()
+ {
+ }
);
final MSQTaskReport report2 = (MSQTaskReport)
reportMap.get(MSQTaskReport.REPORT_KEY);
diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java
index 7aebaa476d..b33df1e3f2 100644
--- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java
+++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java
@@ -140,7 +140,7 @@ import org.apache.druid.sql.calcite.util.SqlTestFramework;
import org.apache.druid.sql.calcite.view.InProcessViewManager;
import org.apache.druid.storage.StorageConnector;
import org.apache.druid.storage.StorageConnectorProvider;
-import org.apache.druid.storage.local.LocalFileStorageConnectorProvider;
+import org.apache.druid.storage.local.LocalFileStorageConnector;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.PruneLoadSpec;
import org.apache.druid.timeline.SegmentId;
@@ -218,6 +218,7 @@ public class MSQTestBase extends BaseCalciteQueryTest
public final boolean useDefault = NullHandling.replaceWithDefault();
protected File localFileStorageDir;
+ protected LocalFileStorageConnector localFileStorageConnector;
private static final Logger log = new Logger(MSQTestBase.class);
private ObjectMapper objectMapper;
private MSQTestOverlordServiceClient indexingServiceClient;
@@ -327,8 +328,11 @@ public class MSQTestBase extends BaseCalciteQueryTest
MultiStageQuery.class
);
localFileStorageDir = tmpFolder.newFolder("fault");
+ localFileStorageConnector = Mockito.spy(
+ new LocalFileStorageConnector(localFileStorageDir)
+ );
binder.bind(Key.get(StorageConnector.class, MultiStageQuery.class))
- .toProvider(new
LocalFileStorageConnectorProvider(localFileStorageDir));
+ .toProvider(() -> localFileStorageConnector);
}
catch (IOException e) {
throw new ISE(e, "Unable to create setup storage connector");
diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestControllerClient.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestControllerClient.java
index 0e00a8e3ed..5b088b71d5 100644
--- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestControllerClient.java
+++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestControllerClient.java
@@ -75,7 +75,7 @@ public class MSQTestControllerClient implements ControllerClient
}
@Override
- public void postWorkerWarning(String workerId, List<MSQErrorReport> MSQErrorReports)
+ public void postWorkerWarning(List<MSQErrorReport> errorReports)
{
- controller.workerWarning(MSQErrorReports);
+ controller.workerWarning(errorReports);
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]