This is an automated email from the ASF dual-hosted git repository.

karan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 7cb21cb968 Use worker number instead of task id in MSQ for 
communication to/from workers. (#13062)
7cb21cb968 is described below

commit 7cb21cb9685b6641c52e2a51e9707fcbcb7f6903
Author: Laksh Singla <[email protected]>
AuthorDate: Thu Nov 3 10:25:45 2022 +0530

    Use worker number instead of task id in MSQ for communication to/from 
workers. (#13062)
    
    * Conversion from taskId to workerNumber in the workerClient
    
    * storage connector changes, suffix file when finish writing to it
    
    * Fix tests
    
    * Trigger Build
    
    * convert IntFunction to a dedicated interface
    
    * first review round
    
    * use a dummy file to indicate success
    
    * fetch the first filename from the list in case of multiple files
    
    * tests working, fix semantic issue with ls
    
    * change how the success flag works
    
    * comments, checkstyle, method rename
    
    * fix test
    
    * forbiddenapis fix
    
    * Trigger Build
    
    * change the writer
    
    * dead store fix
    
    * Review comments
    
    * revert changes
    
    * review
    
    * review comments
    
    * Update 
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/shuffle/DurableStorageInputChannelFactory.java
    
    Co-authored-by: Karan Kumar <[email protected]>
    
    * Update 
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/shuffle/DurableStorageInputChannelFactory.java
    
    Co-authored-by: Karan Kumar <[email protected]>
    
    * update error messages
    
    * better error messages
    
    * fix checkstyle
    
    Co-authored-by: Karan Kumar <[email protected]>
---
 .../storage/local/LocalFileStorageConnector.java   |   6 +-
 .../apache/druid/msq/exec/ControllerClient.java    |   1 -
 .../apache/druid/msq/exec/ControllerContext.java   |   1 -
 .../org/apache/druid/msq/exec/ControllerImpl.java  |   5 +-
 .../org/apache/druid/msq/exec/WorkerClient.java    |   8 +-
 .../java/org/apache/druid/msq/exec/WorkerImpl.java |  54 +++++++++--
 .../druid/msq/indexing/ControllerChatHandler.java  |   3 +-
 .../msq/indexing/IndexerControllerClient.java      |   7 +-
 .../error/MSQWarningReportSimplePublisher.java     |   2 +-
 .../shuffle/DurableStorageInputChannelFactory.java |  92 ++++++++++++++----
 .../DurableStorageOutputChannelFactory.java        |  79 ++++++++++------
 .../druid/msq/shuffle/DurableStorageUtils.java     | 105 +++++++++++++++++++++
 .../org/apache/druid/msq/exec/MSQSelectTest.java   |  51 ++++++++++
 .../msq/indexing/report/MSQTaskReportTest.java     |   6 +-
 .../org/apache/druid/msq/test/MSQTestBase.java     |   8 +-
 .../druid/msq/test/MSQTestControllerClient.java    |   2 +-
 16 files changed, 349 insertions(+), 81 deletions(-)

diff --git 
a/core/src/main/java/org/apache/druid/storage/local/LocalFileStorageConnector.java
 
b/core/src/main/java/org/apache/druid/storage/local/LocalFileStorageConnector.java
index 8699568b6d..de70f9a941 100644
--- 
a/core/src/main/java/org/apache/druid/storage/local/LocalFileStorageConnector.java
+++ 
b/core/src/main/java/org/apache/druid/storage/local/LocalFileStorageConnector.java
@@ -31,7 +31,7 @@ import java.io.OutputStream;
 import java.nio.file.Files;
 
 /**
- * Implementation that uses local filesystem. All paths are appended with the 
base path, in such a way that its not visible
+ * Implementation that uses local filesystem. All paths are appended with the 
base path, in such a way that it is not visible
  * to the users of this class.
  */
 public class LocalFileStorageConnector implements StorageConnector
@@ -54,10 +54,6 @@ public class LocalFileStorageConnector implements 
StorageConnector
   /**
    * Reads the file present as basePath + path. Will throw an IO exception in 
case the file is not present.
    * Closing of the stream is the responsibility of the caller.
-   *
-   * @param path
-   * @return
-   * @throws IOException
    */
   @Override
   public InputStream read(String path) throws IOException
diff --git 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerClient.java
 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerClient.java
index f621133586..faf1c3ff5e 100644
--- 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerClient.java
+++ 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerClient.java
@@ -71,7 +71,6 @@ public interface ControllerClient extends AutoCloseable
    * Client side method to inform the controller about the warnings generated 
by the given worker.
    */
   void postWorkerWarning(
-      String workerId,
       List<MSQErrorReport> MSQErrorReports
   ) throws IOException;
   List<String> getTaskList() throws IOException;
diff --git 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerContext.java
 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerContext.java
index f77290c092..e09ac9ebd6 100644
--- 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerContext.java
+++ 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerContext.java
@@ -74,7 +74,6 @@ public interface ControllerContext
    * Client for communicating with workers.
    */
   WorkerClient taskClientFor(Controller controller);
-
   /**
    * Writes controller task report.
    */
diff --git 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java
 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java
index a2f9328000..8eb348bf50 100644
--- 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java
+++ 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java
@@ -146,7 +146,7 @@ import org.apache.druid.msq.querykit.ShuffleSpecFactory;
 import org.apache.druid.msq.querykit.groupby.GroupByQueryKit;
 import org.apache.druid.msq.querykit.scan.ScanQueryKit;
 import org.apache.druid.msq.shuffle.DurableStorageInputChannelFactory;
-import org.apache.druid.msq.shuffle.DurableStorageOutputChannelFactory;
+import org.apache.druid.msq.shuffle.DurableStorageUtils;
 import org.apache.druid.msq.shuffle.WorkerInputChannelFactory;
 import org.apache.druid.msq.statistics.ClusterByStatisticsSnapshot;
 import org.apache.druid.msq.util.DimensionSchemaUtils;
@@ -1202,7 +1202,6 @@ public class ControllerImpl implements Controller
       if 
(MultiStageQueryContext.isDurableStorageEnabled(task.getQuerySpec().getQuery().context()))
 {
         inputChannelFactory = 
DurableStorageInputChannelFactory.createStandardImplementation(
             id(),
-            () -> taskIds,
             MSQTasks.makeStorageConnector(context.injector()),
             closer
         );
@@ -1303,7 +1302,7 @@ public class ControllerImpl implements Controller
   private void cleanUpDurableStorageIfNeeded()
   {
     if 
(MultiStageQueryContext.isDurableStorageEnabled(task.getQuerySpec().getQuery().context()))
 {
-      final String controllerDirName = 
DurableStorageOutputChannelFactory.getControllerDirectory(task.getId());
+      final String controllerDirName = 
DurableStorageUtils.getControllerDirectory(task.getId());
       try {
         // Delete all temporary files as a failsafe
         
MSQTasks.makeStorageConnector(context.injector()).deleteRecursively(controllerDirName);
diff --git 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerClient.java
 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerClient.java
index 72a0a81604..8f4c7bac23 100644
--- 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerClient.java
+++ 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerClient.java
@@ -36,7 +36,7 @@ public interface WorkerClient extends AutoCloseable
   /**
    * Worker's client method to add a {@link WorkOrder} to the worker to work on
    */
-  ListenableFuture<Void> postWorkOrder(String workerId, WorkOrder workOrder);
+  ListenableFuture<Void> postWorkOrder(String workerTaskId, WorkOrder 
workOrder);
 
   /**
    * Worker's client method to inform it of the partition boundaries for the 
given stage. This is usually invoked by the
@@ -50,13 +50,15 @@ public interface WorkerClient extends AutoCloseable
 
   /**
    * Worker's client method to inform that the work has been done, and it can 
initiate cleanup and shutdown
+   * @param workerTaskId
    */
-  ListenableFuture<Void> postFinish(String workerId);
+  ListenableFuture<Void> postFinish(String workerTaskId);
 
   /**
    * Fetches all the counters gathered by that worker
+   * @param workerTaskId
    */
-  ListenableFuture<CounterSnapshotsTree> getCounters(String workerId);
+  ListenableFuture<CounterSnapshotsTree> getCounters(String workerTaskId);
 
   /**
    * Worker's client method that informs it that the results and resources for 
the given stage are no longer required
diff --git 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerImpl.java
 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerImpl.java
index d930d825f9..ad1cdfa04c 100644
--- 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerImpl.java
+++ 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerImpl.java
@@ -102,6 +102,7 @@ import org.apache.druid.msq.kernel.worker.WorkerStagePhase;
 import org.apache.druid.msq.querykit.DataSegmentProvider;
 import org.apache.druid.msq.shuffle.DurableStorageInputChannelFactory;
 import org.apache.druid.msq.shuffle.DurableStorageOutputChannelFactory;
+import org.apache.druid.msq.shuffle.DurableStorageUtils;
 import org.apache.druid.msq.shuffle.WorkerInputChannelFactory;
 import org.apache.druid.msq.statistics.ClusterByStatisticsCollector;
 import org.apache.druid.msq.statistics.ClusterByStatisticsSnapshot;
@@ -210,7 +211,12 @@ public class WorkerImpl implements Worker
       }
       catch (Throwable e) {
         maybeErrorReport = Optional.of(
-            MSQErrorReport.fromException(id(), 
MSQTasks.getHostFromSelfNode(selfDruidNode), null, e)
+            MSQErrorReport.fromException(
+                id(),
+                MSQTasks.getHostFromSelfNode(selfDruidNode),
+                null,
+                e
+            )
         );
       }
 
@@ -241,6 +247,7 @@ public class WorkerImpl implements Worker
     this.controllerClient = 
context.makeControllerClient(task.getControllerTaskId());
     closer.register(controllerClient::close);
     context.registerWorker(this, closer); // Uses controllerClient, so must be 
called after that is initialized
+
     this.workerClient = new 
ExceptionWrappingWorkerClient(context.makeWorkerClient());
     closer.register(workerClient::close);
 
@@ -583,7 +590,6 @@ public class WorkerImpl implements Worker
     if (durableStageStorageEnabled) {
       return DurableStorageInputChannelFactory.createStandardImplementation(
           task.getControllerTaskId(),
-          workerTaskList,
           MSQTasks.makeStorageConnector(context.injector()),
           closer
       );
@@ -601,8 +607,9 @@ public class WorkerImpl implements Worker
     if (durableStageStorageEnabled) {
       return DurableStorageOutputChannelFactory.createStandardImplementation(
           task.getControllerTaskId(),
-          id(),
+          task().getWorkerNumber(),
           stageNumber,
+          task().getId(),
           frameSize,
           MSQTasks.makeStorageConnector(context.injector())
       );
@@ -709,18 +716,18 @@ public class WorkerImpl implements Worker
       // Therefore, the logic for cleaning the stage output in case of a 
worker/machine crash has to be external.
       // We currently take care of this in the controller.
       if (durableStageStorageEnabled) {
-        final String fileName = 
DurableStorageOutputChannelFactory.getPartitionFileName(
+        final String folderName = 
DurableStorageUtils.getTaskIdOutputsFolderName(
             task.getControllerTaskId(),
-            task.getId(),
             stageId.getStageNumber(),
-            partition
+            task.getWorkerNumber(),
+            task.getId()
         );
         try {
-          
MSQTasks.makeStorageConnector(context.injector()).deleteFile(fileName);
+          
MSQTasks.makeStorageConnector(context.injector()).deleteRecursively(folderName);
         }
         catch (Exception e) {
           // If an error is thrown while cleaning up a file, log it and try to 
continue with the cleanup
-          log.warn(e, "Error while cleaning up temporary files at path " + 
fileName);
+          log.warn(e, "Error while cleaning up folder at path " + folderName);
         }
       }
     }
@@ -888,7 +895,38 @@ public class WorkerImpl implements Worker
             for (OutputChannel channel : outputChannels.getAllChannels()) {
               stageOutputs.computeIfAbsent(stageDef.getId(), ignored1 -> new 
ConcurrentHashMap<>())
                           .computeIfAbsent(channel.getPartitionNumber(), 
ignored2 -> channel.getReadableChannel());
+
+            }
+
+            if (durableStageStorageEnabled) {
+              // Once the outputs channels have been resolved and are ready 
for reading, the worker appends the filename
+              // with a special marker flag and adds it to the
+              DurableStorageOutputChannelFactory 
durableStorageOutputChannelFactory =
+                  
DurableStorageOutputChannelFactory.createStandardImplementation(
+                      task.getControllerTaskId(),
+                      task().getWorkerNumber(),
+                      stageDef.getStageNumber(),
+                      task().getId(),
+                      frameContext.memoryParameters().getStandardFrameSize(),
+                      MSQTasks.makeStorageConnector(context.injector())
+                  );
+              try {
+                
durableStorageOutputChannelFactory.createSuccessFile(task.getId());
+              }
+              catch (IOException e) {
+                throw new ISE(
+                    e,
+                    "Unable to create the success file [%s] at the location 
[%s]",
+                    DurableStorageUtils.SUCCESS_MARKER_FILENAME,
+                    DurableStorageUtils.getSuccessFilePath(
+                        task.getControllerTaskId(),
+                        stageDef.getStageNumber(),
+                        task().getWorkerNumber()
+                    )
+                );
+              }
             }
+
             kernelManipulationQueue.add(holder -> holder.getStageKernelMap()
                                                         .get(stageDef.getId())
                                                         
.setResultsComplete(resultObject));
diff --git 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/ControllerChatHandler.java
 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/ControllerChatHandler.java
index 5561a98a07..1857d83708 100644
--- 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/ControllerChatHandler.java
+++ 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/ControllerChatHandler.java
@@ -106,12 +106,11 @@ public class ControllerChatHandler implements ChatHandler
    * See {@link ControllerClient#postWorkerWarning} for the client-side code 
that calls this API.
    */
   @POST
-  @Path("/workerWarning/{taskId}")
+  @Path("/workerWarning")
   @Produces(MediaType.APPLICATION_JSON)
   @Consumes(MediaType.APPLICATION_JSON)
   public Response httpPostWorkerWarning(
       final List<MSQErrorReport> errorReport,
-      @PathParam("taskId") final String taskId,
       @Context final HttpServletRequest req
   )
   {
diff --git 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerControllerClient.java
 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerControllerClient.java
index 2c249e2639..3a6c9e7879 100644
--- 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerControllerClient.java
+++ 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerControllerClient.java
@@ -122,12 +122,9 @@ public class IndexerControllerClient implements 
ControllerClient
   }
 
   @Override
-  public void postWorkerWarning(String workerId, List<MSQErrorReport> 
MSQErrorReports) throws IOException
+  public void postWorkerWarning(List<MSQErrorReport> MSQErrorReports) throws 
IOException
   {
-    final String path = StringUtils.format(
-        "/workerWarning/%s",
-        StringUtils.urlEncode(workerId)
-    );
+    final String path = "/workerWarning";
 
     doRequest(
         new RequestBuilder(HttpMethod.POST, path)
diff --git 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/error/MSQWarningReportSimplePublisher.java
 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/error/MSQWarningReportSimplePublisher.java
index 3c07a163aa..1353f40404 100644
--- 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/error/MSQWarningReportSimplePublisher.java
+++ 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/error/MSQWarningReportSimplePublisher.java
@@ -57,7 +57,7 @@ public class MSQWarningReportSimplePublisher implements 
MSQWarningReportPublishe
     final MSQErrorReport warningReport = MSQErrorReport.fromException(taskId, 
host, stageNumber, e);
 
     try {
-      controllerClient.postWorkerWarning(workerId, 
ImmutableList.of(warningReport));
+      controllerClient.postWorkerWarning(ImmutableList.of(warningReport));
     }
     catch (IOException e2) {
       throw new RuntimeException(e2);
diff --git 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/shuffle/DurableStorageInputChannelFactory.java
 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/shuffle/DurableStorageInputChannelFactory.java
index f7beba6c25..0d32a2d6bb 100644
--- 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/shuffle/DurableStorageInputChannelFactory.java
+++ 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/shuffle/DurableStorageInputChannelFactory.java
@@ -20,6 +20,7 @@
 package org.apache.druid.msq.shuffle;
 
 import com.google.common.base.Preconditions;
+import org.apache.commons.io.IOUtils;
 import org.apache.druid.frame.channel.ReadableFrameChannel;
 import org.apache.druid.frame.channel.ReadableInputStreamFrameChannel;
 import org.apache.druid.java.util.common.IOE;
@@ -27,36 +28,36 @@ import org.apache.druid.java.util.common.ISE;
 import org.apache.druid.java.util.common.RetryUtils;
 import org.apache.druid.java.util.common.concurrent.Execs;
 import org.apache.druid.java.util.common.io.Closer;
+import org.apache.druid.java.util.common.logger.Logger;
 import org.apache.druid.msq.indexing.InputChannelFactory;
 import org.apache.druid.msq.kernel.StageId;
 import org.apache.druid.storage.StorageConnector;
 
 import java.io.IOException;
 import java.io.InputStream;
-import java.util.List;
+import java.nio.charset.StandardCharsets;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
-import java.util.function.Supplier;
 
 /**
  * Provides input channels connected to durable storage.
  */
 public class DurableStorageInputChannelFactory implements InputChannelFactory
 {
+
+  private static final Logger LOG = new 
Logger(DurableStorageInputChannelFactory.class);
+
   private final StorageConnector storageConnector;
   private final ExecutorService remoteInputStreamPool;
   private final String controllerTaskId;
-  private final Supplier<List<String>> taskList;
 
   public DurableStorageInputChannelFactory(
       final String controllerTaskId,
-      final Supplier<List<String>> taskList,
       final StorageConnector storageConnector,
       final ExecutorService remoteInputStreamPool
   )
   {
     this.controllerTaskId = Preconditions.checkNotNull(controllerTaskId, 
"controllerTaskId");
-    this.taskList = Preconditions.checkNotNull(taskList, "taskList");
     this.storageConnector = Preconditions.checkNotNull(storageConnector, 
"storageConnector");
     this.remoteInputStreamPool = 
Preconditions.checkNotNull(remoteInputStreamPool, "remoteInputStreamPool");
   }
@@ -67,7 +68,6 @@ public class DurableStorageInputChannelFactory implements 
InputChannelFactory
    */
   public static DurableStorageInputChannelFactory createStandardImplementation(
       final String controllerTaskId,
-      final Supplier<List<String>> taskList,
       final StorageConnector storageConnector,
       final Closer closer
   )
@@ -75,28 +75,35 @@ public class DurableStorageInputChannelFactory implements 
InputChannelFactory
     final ExecutorService remoteInputStreamPool =
         Executors.newCachedThreadPool(Execs.makeThreadFactory(controllerTaskId 
+ "-remote-fetcher-%d"));
     closer.register(remoteInputStreamPool::shutdownNow);
-    return new DurableStorageInputChannelFactory(controllerTaskId, taskList, 
storageConnector, remoteInputStreamPool);
+    return new DurableStorageInputChannelFactory(controllerTaskId, 
storageConnector, remoteInputStreamPool);
   }
 
   @Override
   public ReadableFrameChannel openChannel(StageId stageId, int workerNumber, 
int partitionNumber) throws IOException
   {
-    final String workerTaskId = taskList.get().get(workerNumber);
 
     try {
-      final String remotePartitionPath = 
DurableStorageOutputChannelFactory.getPartitionFileName(
+      final String remotePartitionPath = findSuccessfulPartitionOutput(
           controllerTaskId,
-          workerTaskId,
+          workerNumber,
           stageId.getStageNumber(),
           partitionNumber
       );
+      LOG.debug(
+          "Reading output of stage [%d], partition [%d] for worker [%d] from 
the file at path [%s]",
+          stageId.getStageNumber(),
+          partitionNumber,
+          workerNumber,
+          remotePartitionPath
+      );
       RetryUtils.retry(() -> {
         if (!storageConnector.pathExists(remotePartitionPath)) {
           throw new ISE(
-              "Could not find remote output of worker task[%s] stage[%d] 
partition[%d]",
-              workerTaskId,
+              "Could not find remote outputs of stage [%d] partition [%d] for 
worker [%d] at the path [%s]",
               stageId.getStageNumber(),
-              partitionNumber
+              partitionNumber,
+              workerNumber,
+              remotePartitionPath
           );
         }
         return Boolean.TRUE;
@@ -112,11 +119,64 @@ public class DurableStorageInputChannelFactory implements 
InputChannelFactory
     catch (Exception e) {
       throw new IOE(
           e,
-          "Could not find remote output of worker task[%s] stage[%d] 
partition[%d]",
-          workerTaskId,
+          "Encountered error while reading the output of stage [%d], partition 
[%d] for worker [%d]",
           stageId.getStageNumber(),
-          partitionNumber
+          partitionNumber,
+          workerNumber
       );
     }
   }
+
+  /**
+   * Given an input worker number, stage number and the partition number, this 
method figures out the exact location
+   * where the outputs would be present in the durable storage and returns the 
complete path or throws an exception
+   * if no such file exists in the durable storage
+   * More information at {@link 
DurableStorageOutputChannelFactory#createSuccessFile(String)}
+   */
+  public String findSuccessfulPartitionOutput(
+      final String controllerTaskId,
+      final int workerNo,
+      final int stageNumber,
+      final int partitionNumber
+  ) throws IOException
+  {
+    String successfulFilePath = DurableStorageUtils.getSuccessFilePath(
+        controllerTaskId,
+        stageNumber,
+        workerNo
+    );
+
+    if (!storageConnector.pathExists(successfulFilePath)) {
+      throw new ISE(
+          "No file present at the location [%s]. Unable to read the outputs of 
stage [%d], partition [%d] for the worker [%d]",
+          successfulFilePath,
+          stageNumber,
+          partitionNumber,
+          workerNo
+      );
+    }
+
+    String successfulTaskId;
+
+    try (InputStream is = storageConnector.read(successfulFilePath)) {
+      successfulTaskId = IOUtils.toString(is, StandardCharsets.UTF_8);
+    }
+    if (successfulTaskId == null) {
+      throw new ISE("Unable to read the task id from the file: [%s]", 
successfulFilePath);
+    }
+    LOG.debug(
+        "Reading output of stage [%d], partition [%d] from task id [%s]",
+        stageNumber,
+        partitionNumber,
+        successfulTaskId
+    );
+
+    return DurableStorageUtils.getPartitionOutputsFileNameForPartition(
+        controllerTaskId,
+        stageNumber,
+        workerNo,
+        successfulTaskId,
+        partitionNumber
+    );
+  }
 }
diff --git 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/shuffle/DurableStorageOutputChannelFactory.java
 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/shuffle/DurableStorageOutputChannelFactory.java
index 1f4361a939..3bc0f72c89 100644
--- 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/shuffle/DurableStorageOutputChannelFactory.java
+++ 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/shuffle/DurableStorageOutputChannelFactory.java
@@ -20,7 +20,6 @@
 package org.apache.druid.msq.shuffle;
 
 import com.google.common.base.Preconditions;
-import org.apache.druid.common.utils.IdUtils;
 import org.apache.druid.frame.allocation.ArenaMemoryAllocator;
 import org.apache.druid.frame.channel.ReadableNilFrameChannel;
 import org.apache.druid.frame.channel.WritableFrameFileChannel;
@@ -28,31 +27,39 @@ import org.apache.druid.frame.file.FrameFileWriter;
 import org.apache.druid.frame.processor.OutputChannel;
 import org.apache.druid.frame.processor.OutputChannelFactory;
 import org.apache.druid.java.util.common.ISE;
-import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.logger.Logger;
 import org.apache.druid.storage.StorageConnector;
 
 import java.io.IOException;
+import java.io.OutputStreamWriter;
 import java.nio.channels.Channels;
+import java.nio.charset.StandardCharsets;
 
 public class DurableStorageOutputChannelFactory implements OutputChannelFactory
 {
+
+  private static final Logger LOG = new 
Logger(DurableStorageOutputChannelFactory.class);
+
   private final String controllerTaskId;
-  private final String workerTaskId;
+  private final int workerNumber;
   private final int stageNumber;
+  private final String taskId;
   private final int frameSize;
   private final StorageConnector storageConnector;
 
   public DurableStorageOutputChannelFactory(
       final String controllerTaskId,
-      final String workerTaskId,
+      final int workerNumber,
       final int stageNumber,
+      final String taskId,
       final int frameSize,
       final StorageConnector storageConnector
   )
   {
     this.controllerTaskId = Preconditions.checkNotNull(controllerTaskId, 
"controllerTaskId");
-    this.workerTaskId = Preconditions.checkNotNull(workerTaskId, 
"workerTaskId");
+    this.workerNumber = workerNumber;
     this.stageNumber = stageNumber;
+    this.taskId = taskId;
     this.frameSize = frameSize;
     this.storageConnector = Preconditions.checkNotNull(storageConnector, 
"storageConnector");
   }
@@ -63,16 +70,18 @@ public class DurableStorageOutputChannelFactory implements 
OutputChannelFactory
    */
   public static DurableStorageOutputChannelFactory 
createStandardImplementation(
       final String controllerTaskId,
-      final String workerTaskId,
+      final int workerNumber,
       final int stageNumber,
+      final String taskId,
       final int frameSize,
       final StorageConnector storageConnector
   )
   {
     return new DurableStorageOutputChannelFactory(
         controllerTaskId,
-        workerTaskId,
+        workerNumber,
         stageNumber,
+        taskId,
         frameSize,
         storageConnector
     );
@@ -81,7 +90,13 @@ public class DurableStorageOutputChannelFactory implements 
OutputChannelFactory
   @Override
   public OutputChannel openChannel(int partitionNumber) throws IOException
   {
-    final String fileName = getPartitionFileName(controllerTaskId, 
workerTaskId, stageNumber, partitionNumber);
+    final String fileName = 
DurableStorageUtils.getPartitionOutputsFileNameForPartition(
+        controllerTaskId,
+        stageNumber,
+        workerNumber,
+        taskId,
+        partitionNumber
+    );
     final WritableFrameFileChannel writableChannel =
         new WritableFrameFileChannel(
             FrameFileWriter.open(
@@ -101,7 +116,13 @@ public class DurableStorageOutputChannelFactory implements 
OutputChannelFactory
   @Override
   public OutputChannel openNilChannel(int partitionNumber)
   {
-    final String fileName = getPartitionFileName(controllerTaskId, 
workerTaskId, stageNumber, partitionNumber);
+    final String fileName = 
DurableStorageUtils.getPartitionOutputsFileNameForPartition(
+        controllerTaskId,
+        stageNumber,
+        workerNumber,
+        taskId,
+        partitionNumber
+    );
     // As tasks dependent on output of this partition will forever block if no 
file is present in RemoteStorage. Hence, writing a dummy frame.
     try {
 
@@ -111,32 +132,30 @@ public class DurableStorageOutputChannelFactory 
implements OutputChannelFactory
     catch (IOException e) {
       throw new ISE(
           e,
-          "Unable to create empty remote output of workerTask[%s] stage[%d] 
partition[%d]",
-          workerTaskId,
+          "Unable to create empty remote output of stage [%d], partition [%d] 
for worker [%d]",
           stageNumber,
-          partitionNumber
+          partitionNumber,
+          workerNumber
       );
     }
   }
 
-  public static String getControllerDirectory(final String controllerTaskId)
-  {
-    return StringUtils.format("controller_%s", IdUtils.validateId("controller 
task ID", controllerTaskId));
-  }
-
-  public static String getPartitionFileName(
-      final String controllerTaskId,
-      final String workerTaskId,
-      final int stageNumber,
-      final int partitionNumber
-  )
+  /**
+   * Creates a file with name __success and adds the worker's id which has 
successfully written its outputs. While reading
+   * this file can be used to find out the worker which has written its 
outputs completely.
+   * Rename operation is not very quick in cloud storage like S3 due to which 
this alternative
+   * route has been taken.
+   * If the success file is already present in the location, then this method 
is a noop
+   */
+  public void createSuccessFile(String taskId) throws IOException
   {
-    return StringUtils.format(
-        "%s/worker_%s/stage_%d/part_%d",
-        getControllerDirectory(controllerTaskId),
-        IdUtils.validateId("worker task ID", workerTaskId),
-        stageNumber,
-        partitionNumber
-    );
+    String fileName = DurableStorageUtils.getSuccessFilePath(controllerTaskId, 
stageNumber, workerNumber);
+    if (storageConnector.pathExists(fileName)) {
+      LOG.warn("Path [%s] already exists. Won't attempt to rewrite on top of 
it.", fileName);
+      return;
+    }
+    OutputStreamWriter os = new 
OutputStreamWriter(storageConnector.write(fileName), StandardCharsets.UTF_8);
+    os.write(taskId); // Add some dummy content in the file
+    os.close();
   }
 }
diff --git 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/shuffle/DurableStorageUtils.java
 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/shuffle/DurableStorageUtils.java
new file mode 100644
index 0000000000..df3b86a5c7
--- /dev/null
+++ 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/shuffle/DurableStorageUtils.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.msq.shuffle;
+
+import org.apache.druid.common.utils.IdUtils;
+import org.apache.druid.java.util.common.StringUtils;
+
+/**
+ * Helper class that constructs the directory and file names that make up 
durable storage file locations
+ */
+public class DurableStorageUtils
+{
+  public static final String SUCCESS_MARKER_FILENAME = "__success";
+
+  public static String getControllerDirectory(final String controllerTaskId)
+  {
+    return StringUtils.format("controller_%s", IdUtils.validateId("controller 
task ID", controllerTaskId));
+  }
+
+  public static String getSuccessFilePath(
+      final String controllerTaskId,
+      final int stageNumber,
+      final int workerNumber
+  )
+  {
+    String folderName = getWorkerOutputFolderName(
+        controllerTaskId,
+        stageNumber,
+        workerNumber
+    );
+    String fileName = StringUtils.format("%s/%s", folderName, 
SUCCESS_MARKER_FILENAME);
+    return fileName;
+  }
+
+  /**
+   * Fetches the directory location where workers will store the partition 
files corresponding to the stage number
+   */
+  public static String getWorkerOutputFolderName(
+      final String controllerTaskId,
+      final int stageNumber,
+      final int workerNumber
+  )
+  {
+    return StringUtils.format(
+        "%s/stage_%d/worker_%d",
+        getControllerDirectory(controllerTaskId),
+        stageNumber,
+        workerNumber
+    );
+  }
+
+  /**
+   * Fetches the directory location where a particular worker will store the 
partition files corresponding to the
+   * stage number and its task id
+   */
+  public static String getTaskIdOutputsFolderName(
+      final String controllerTaskId,
+      final int stageNumber,
+      final int workerNumber,
+      final String taskId
+  )
+  {
+    return StringUtils.format(
+        "%s/taskId_%s",
+        getWorkerOutputFolderName(controllerTaskId, stageNumber, workerNumber),
+        taskId
+    );
+  }
+
+  /**
+   * Fetches the file location where a particular worker writes the data 
corresponding to a particular stage
+   * and partition
+   */
+  public static String getPartitionOutputsFileNameForPartition(
+      final String controllerTaskId,
+      final int stageNumber,
+      final int workerNumber,
+      final String taskId,
+      final int partitionNumber
+  )
+  {
+    return StringUtils.format(
+        "%s/part_%d",
+        getTaskIdOutputsFolderName(controllerTaskId, stageNumber, 
workerNumber, taskId),
+        partitionNumber
+    );
+  }
+}
diff --git 
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQSelectTest.java
 
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQSelectTest.java
index de7038700b..c83481757d 100644
--- 
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQSelectTest.java
+++ 
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQSelectTest.java
@@ -31,6 +31,7 @@ import org.apache.druid.msq.indexing.ColumnMapping;
 import org.apache.druid.msq.indexing.ColumnMappings;
 import org.apache.druid.msq.indexing.MSQSpec;
 import org.apache.druid.msq.indexing.MSQTuningConfig;
+import org.apache.druid.msq.shuffle.DurableStorageUtils;
 import org.apache.druid.msq.test.MSQTestBase;
 import org.apache.druid.query.InlineDataSource;
 import org.apache.druid.query.QueryDataSource;
@@ -62,6 +63,8 @@ import org.apache.druid.sql.calcite.util.CalciteTests;
 import org.hamcrest.CoreMatchers;
 import org.junit.Test;
 import org.junit.internal.matchers.ThrowableMessageMatcher;
+import org.mockito.ArgumentMatchers;
+import org.mockito.Mockito;
 
 import javax.annotation.Nonnull;
 import java.io.File;
@@ -1203,6 +1206,54 @@ public class MSQSelectTest extends MSQTestBase
         .verifyResults();
   }
 
+  @Test
+  public void testGroupByOnFooWithDurableStoragePathAssertions() throws 
IOException
+  {
+    RowSignature rowSignature = RowSignature.builder()
+                                            .add("cnt", ColumnType.LONG)
+                                            .add("cnt1", ColumnType.LONG)
+                                            .build();
+
+    testSelectQuery()
+        .setSql("select cnt,count(*) as cnt1 from foo group by cnt")
+        .setExpectedMSQSpec(MSQSpec.builder()
+                                   .query(GroupByQuery.builder()
+                                                      
.setDataSource(CalciteTests.DATASOURCE1)
+                                                      
.setInterval(querySegmentSpec(Filtration
+                                                                               
         .eternity()))
+                                                      
.setGranularity(Granularities.ALL)
+                                                      
.setDimensions(dimensions(
+                                                          new 
DefaultDimensionSpec(
+                                                              "cnt",
+                                                              "d0",
+                                                              ColumnType.LONG
+                                                          )
+                                                      ))
+                                                      
.setAggregatorSpecs(aggregators(new CountAggregatorFactory(
+                                                          "a0")))
+                                                      
.setContext(DEFAULT_MSQ_CONTEXT)
+                                                      .build())
+                                   .columnMappings(
+                                       new ColumnMappings(ImmutableList.of(
+                                           new ColumnMapping("d0", "cnt"),
+                                           new ColumnMapping("a0", "cnt1")
+                                       )
+                                       ))
+                                   
.tuningConfig(MSQTuningConfig.defaultConfig())
+                                   .build())
+        .setExpectedRowSignature(rowSignature)
+        .setExpectedResultRows(ImmutableList.of(new Object[]{1L, 6L}))
+        .verifyResults();
+    File successFile = new File(
+        localFileStorageDir,
+        DurableStorageUtils.getSuccessFilePath("query-test-query", 0, 0)
+    );
+
+    Mockito.verify(localFileStorageConnector, Mockito.times(2))
+               .write(ArgumentMatchers.endsWith("__success"));
+  }
+
+
   @Nonnull
   private List<Object[]> expectedMultiValueFooRowsGroup()
   {
diff --git 
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/report/MSQTaskReportTest.java
 
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/report/MSQTaskReportTest.java
index 4902675509..a0eeec58fb 100644
--- 
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/report/MSQTaskReportTest.java
+++ 
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/report/MSQTaskReportTest.java
@@ -43,7 +43,6 @@ import org.apache.druid.segment.TestHelper;
 import org.apache.druid.segment.column.ColumnType;
 import org.apache.druid.segment.column.RowSignature;
 import org.junit.Assert;
-import org.junit.Ignore;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
@@ -175,7 +174,6 @@ public class MSQTaskReportTest
   }
 
   @Test
-  @Ignore("requires https://github.com/apache/druid/pull/12938";)
   public void testWriteTaskReport() throws Exception
   {
     final MSQTaskReport report = new MSQTaskReport(
@@ -204,7 +202,9 @@ public class MSQTaskReportTest
 
     final Map<String, TaskReport> reportMap = mapper.readValue(
         reportFile,
-        new TypeReference<Map<String, TaskReport>>() {}
+        new TypeReference<Map<String, TaskReport>>()
+        {
+        }
     );
 
     final MSQTaskReport report2 = (MSQTaskReport) 
reportMap.get(MSQTaskReport.REPORT_KEY);
diff --git 
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java
 
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java
index 7aebaa476d..b33df1e3f2 100644
--- 
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java
+++ 
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java
@@ -140,7 +140,7 @@ import org.apache.druid.sql.calcite.util.SqlTestFramework;
 import org.apache.druid.sql.calcite.view.InProcessViewManager;
 import org.apache.druid.storage.StorageConnector;
 import org.apache.druid.storage.StorageConnectorProvider;
-import org.apache.druid.storage.local.LocalFileStorageConnectorProvider;
+import org.apache.druid.storage.local.LocalFileStorageConnector;
 import org.apache.druid.timeline.DataSegment;
 import org.apache.druid.timeline.PruneLoadSpec;
 import org.apache.druid.timeline.SegmentId;
@@ -218,6 +218,7 @@ public class MSQTestBase extends BaseCalciteQueryTest
   public final boolean useDefault = NullHandling.replaceWithDefault();
 
   protected File localFileStorageDir;
+  protected LocalFileStorageConnector localFileStorageConnector;
   private static final Logger log = new Logger(MSQTestBase.class);
   private ObjectMapper objectMapper;
   private MSQTestOverlordServiceClient indexingServiceClient;
@@ -327,8 +328,11 @@ public class MSQTestBase extends BaseCalciteQueryTest
                 MultiStageQuery.class
             );
             localFileStorageDir = tmpFolder.newFolder("fault");
+            localFileStorageConnector = Mockito.spy(
+                new LocalFileStorageConnector(localFileStorageDir)
+            );
             binder.bind(Key.get(StorageConnector.class, MultiStageQuery.class))
-                  .toProvider(new 
LocalFileStorageConnectorProvider(localFileStorageDir));
+                  .toProvider(() -> localFileStorageConnector);
           }
           catch (IOException e) {
             throw new ISE(e, "Unable to create setup storage connector");
diff --git 
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestControllerClient.java
 
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestControllerClient.java
index 0e00a8e3ed..5b088b71d5 100644
--- 
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestControllerClient.java
+++ 
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestControllerClient.java
@@ -75,7 +75,7 @@ public class MSQTestControllerClient implements 
ControllerClient
   }
 
   @Override
-  public void postWorkerWarning(String workerId, List<MSQErrorReport> 
MSQErrorReports)
+  public void postWorkerWarning(List<MSQErrorReport> MSQErrorReports)
   {
     controller.workerWarning(MSQErrorReports);
   }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to