LakshSingla commented on code in PR #16790:
URL: https://github.com/apache/druid/pull/16790#discussion_r1690868113


##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerClient.java:
##########
@@ -77,20 +81,21 @@ void postResultsComplete(
 
   /**
    * Client side method to inform the controller that the error has occured in 
the given worker.
+   *
+   * @param queryId      query ID, if this error is associated with a specific 
query
+   * @param errorWrapper error details
    */
   void postWorkerError(
-      String workerId,
+      @Nullable String queryId,

Review Comment:
   Looking at the places where this is called, we pass it the worker's task id. 
Should we keep the parameter name as is? Also, I was taking a look at the 
controller chat handler and we don't really use the worker's task id afaict. 
Maybe we can remove the param altogether, though that would introduce backward 
compatibility issues. 
   Also, can it be nullable given that we pass this as a URL param? 



##########
extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/shuffle/output/ByteChunksInputStreamTest.java:
##########
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.msq.shuffle.output;
+
+import com.google.common.collect.ImmutableList;
+import org.hamcrest.MatcherAssert;
+import org.hamcrest.Matchers;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.List;
+
+public class ByteChunksInputStreamTest
+{
+  private final List<byte[]> chunks = ImmutableList.of(

Review Comment:
   Should we add some empty arrays in the middle and end as well? 



##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerClient.java:
##########
@@ -77,20 +81,21 @@ void postResultsComplete(
 
   /**
    * Client side method to inform the controller that the error has occured in 
the given worker.
+   *
+   * @param queryId      query ID, if this error is associated with a specific 
query
+   * @param errorWrapper error details
    */
   void postWorkerError(
-      String workerId,
+      @Nullable String queryId,
       MSQErrorReport errorWrapper
   ) throws IOException;
 
   /**
    * Client side method to inform the controller about the warnings generated 
by the given worker.
    */
-  void postWorkerWarning(
-      List<MSQErrorReport> MSQErrorReports
-  ) throws IOException;
+  void postWorkerWarning(List<MSQErrorReport> MSQErrorReports) throws 
IOException;
 
-  List<String> getTaskList() throws IOException;
+  List<String> getWorkerIds() throws IOException;

Review Comment:
   nit: lets add javadoc here, even though it wasn't present in the original 
code. 



##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/shuffle/output/ChannelStageOutputReader.java:
##########
@@ -0,0 +1,238 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.msq.shuffle.output;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.primitives.Ints;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import it.unimi.dsi.fastutil.bytes.ByteArrays;
+import org.apache.druid.common.guava.FutureUtils;
+import org.apache.druid.error.DruidException;
+import org.apache.druid.frame.channel.ByteTracker;
+import org.apache.druid.frame.channel.ReadableFrameChannel;
+import org.apache.druid.frame.file.FrameFileWriter;
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.msq.exec.OutputChannelMode;
+import org.apache.druid.msq.kernel.controller.ControllerQueryKernelUtils;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.nio.channels.WritableByteChannel;
+import java.util.ArrayDeque;
+import java.util.Deque;
+
+/**
+ * Reader for {@link ReadableFrameChannel}.
+ *
+ * Because this reader returns an underlying channel directly, it must only be 
used when it is certain that

Review Comment:
   If this reader can be used by a single consumer, why is there a requirement 
for adding synchronization to the methods? Also, we can probably guard the 
shared states with `GuardedBy`. 



##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/shuffle/output/ChannelStageOutputReader.java:
##########


Review Comment:
   Curious if we should separate out the state transitions from the actual 
code. The file itself is fairly easily understandable so I am fine either way. 



##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/shuffle/output/FutureReadableFrameChannel.java:
##########
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.msq.shuffle.output;
+
+import com.google.common.util.concurrent.ListenableFuture;
+import org.apache.druid.common.guava.FutureUtils;
+import org.apache.druid.frame.Frame;
+import org.apache.druid.frame.channel.ReadableFrameChannel;
+import org.apache.druid.java.util.common.concurrent.Execs;
+import org.apache.druid.java.util.common.logger.Logger;
+
+import java.util.NoSuchElementException;
+
+public class FutureReadableFrameChannel implements ReadableFrameChannel
+{
+  private static final Logger log = new 
Logger(FutureReadableFrameChannel.class);
+
+  private final ListenableFuture<ReadableFrameChannel> channelFuture;
+  private ReadableFrameChannel channel;
+
+  public FutureReadableFrameChannel(final 
ListenableFuture<ReadableFrameChannel> channelFuture)
+  {
+    this.channelFuture = channelFuture;
+  }
+
+  @Override
+  public boolean isFinished()
+  {
+    if (populateChannel()) {
+      return channel.isFinished();
+    } else {
+      return false;
+    }
+  }
+
+  @Override
+  public boolean canRead()
+  {
+    if (populateChannel()) {
+      return channel.canRead();
+    } else {
+      return false;
+    }
+  }
+
+  @Override
+  public Frame read()
+  {
+    if (populateChannel()) {
+      return channel.read();
+    } else {
+      throw new NoSuchElementException();
+    }
+  }
+
+  @Override
+  public ListenableFuture<?> readabilityFuture()
+  {
+    if (populateChannel()) {
+      return channel.readabilityFuture();
+    } else {
+      return FutureUtils.transformAsync(channelFuture, ignored -> 
readabilityFuture());
+    }
+  }
+
+  @Override
+  public void close()
+  {
+    if (populateChannel()) {
+      channel.close();
+    } else {
+      channelFuture.cancel(true);
+      channelFuture.addListener(

Review Comment:
   Quick question - Why are we attaching a listener after canceling the future? 
Is it a defensive check for closing the channel if the `cancel` didn't register 
"in time"? Also given that this is happening on a single thread would we ever 
encounter such case when the addListener could get triggered?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to