This is an automated email from the ASF dual-hosted git repository.

abhishek pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 846345669d Error handling improvements for frame channels. (#12895)
846345669d is described below

commit 846345669d5fa625666eb3682260b7a80d9769c1
Author: Gian Merlino <[email protected]>
AuthorDate: Sun Aug 14 23:01:55 2022 -0700

    Error handling improvements for frame channels. (#12895)
    
    * Error handling improvements for frame channels.
    
    Two changes:
    
    1) Send errors down in-memory channels (BlockingQueueFrameChannel) on
       failure. This ensures that in situations where a chain of processors
       has been set up on a single machine, all processors see the root
       cause error. In particular, this means the final processor in the
       chain reports the root cause error, which ensures that someone with
       a handle to the final processor will get the proper error.
    
    2) Update FrameFileHttpResponseHandler to expect that the final fetch,
       rather than being simply empty, is empty and also carries a special header.
       This ensures that the handler is able to tell the difference between
       an empty fetch due to being at EOF, and an empty fetch due to a
       truncated HTTP response (after the 200 OK and headers are sent down,
       but before any content appears).
    
    * Fix tests, imports.
    
    * Checkstyle!
---
 .../org/apache/druid/java/util/common/Either.java  |  4 +--
 .../java/org/apache/druid/common/EitherTest.java   |  5 +--
 .../frame/channel/BlockingQueueFrameChannel.java   |  5 +--
 .../druid/frame/channel/WritableFrameChannel.java  | 13 ++++---
 .../frame/channel/WritableFrameFileChannel.java    |  4 ++-
 .../frame/file/FrameFileHttpResponseHandler.java   | 13 +++++--
 .../druid/frame/file/FrameFilePartialFetch.java    |  8 +++--
 .../frame/processor/FrameProcessorExecutor.java    |  5 +--
 .../file/FrameFileHttpResponseHandlerTest.java     | 42 ++++++++++++++++++----
 .../processor/FrameProcessorExecutorTest.java      | 11 ++++--
 .../frame/processor/RunAllFullyWidgetTest.java     |  6 +++-
 11 files changed, 87 insertions(+), 29 deletions(-)

diff --git a/core/src/main/java/org/apache/druid/java/util/common/Either.java 
b/core/src/main/java/org/apache/druid/java/util/common/Either.java
index 85fc625297..71412ba406 100644
--- a/core/src/main/java/org/apache/druid/java/util/common/Either.java
+++ b/core/src/main/java/org/apache/druid/java/util/common/Either.java
@@ -20,7 +20,6 @@
 package org.apache.druid.java.util.common;
 
 import com.google.common.base.Preconditions;
-import com.google.common.base.Throwables;
 
 import javax.annotation.Nullable;
 import java.util.Objects;
@@ -87,7 +86,8 @@ public class Either<L, R>
     if (isValue()) {
       return value;
     } else if (error instanceof Throwable) {
-      Throwables.propagateIfPossible((Throwable) error);
+      // Always wrap Throwable, even if we could throw it directly, to provide 
additional context
+      // about where the exception happened (we want the current stack frame 
in the trace).
       throw new RuntimeException((Throwable) error);
     } else {
       throw new RuntimeException(error.toString());
diff --git a/core/src/test/java/org/apache/druid/common/EitherTest.java 
b/core/src/test/java/org/apache/druid/common/EitherTest.java
index a91908e5f9..bb081a2424 100644
--- a/core/src/test/java/org/apache/druid/common/EitherTest.java
+++ b/core/src/test/java/org/apache/druid/common/EitherTest.java
@@ -96,8 +96,9 @@ public class EitherTest
     MatcherAssert.assertThat(either.error(), 
CoreMatchers.instanceOf(AssertionError.class));
     MatcherAssert.assertThat(either.error().getMessage(), 
CoreMatchers.equalTo("oh no"));
 
-    final AssertionError e = Assert.assertThrows(AssertionError.class, 
either::valueOrThrow);
-    MatcherAssert.assertThat(e.getMessage(), CoreMatchers.equalTo("oh no"));
+    final RuntimeException e = Assert.assertThrows(RuntimeException.class, 
either::valueOrThrow);
+    MatcherAssert.assertThat(e.getCause(), 
CoreMatchers.instanceOf(AssertionError.class));
+    MatcherAssert.assertThat(e.getCause().getMessage(), 
CoreMatchers.equalTo("oh no"));
 
     // Test toString.
     Assert.assertEquals("Error[java.lang.AssertionError: oh no]", 
either.toString());
diff --git 
a/processing/src/main/java/org/apache/druid/frame/channel/BlockingQueueFrameChannel.java
 
b/processing/src/main/java/org/apache/druid/frame/channel/BlockingQueueFrameChannel.java
index 0d8988b44e..1b0aa009a3 100644
--- 
a/processing/src/main/java/org/apache/druid/frame/channel/BlockingQueueFrameChannel.java
+++ 
b/processing/src/main/java/org/apache/druid/frame/channel/BlockingQueueFrameChannel.java
@@ -28,6 +28,7 @@ import org.apache.druid.java.util.common.Either;
 import org.apache.druid.java.util.common.IAE;
 import org.apache.druid.java.util.common.ISE;
 
+import javax.annotation.Nullable;
 import java.util.ArrayDeque;
 import java.util.NoSuchElementException;
 import java.util.Optional;
@@ -162,12 +163,12 @@ public class BlockingQueueFrameChannel
     }
 
     @Override
-    public void fail()
+    public void fail(@Nullable Throwable cause)
     {
       synchronized (lock) {
         queue.clear();
 
-        if (!queue.offer(Optional.of(Either.error(new ISE("Aborted"))))) {
+        if (!queue.offer(Optional.of(Either.error(cause != null ? cause : new 
ISE("Failed"))))) {
           // If this happens, it's a bug, potentially due to incorrectly using 
this class with multiple writers.
           throw new ISE("Could not write error to channel");
         }
diff --git 
a/processing/src/main/java/org/apache/druid/frame/channel/WritableFrameChannel.java
 
b/processing/src/main/java/org/apache/druid/frame/channel/WritableFrameChannel.java
index 6ec86c445b..ccb0166acc 100644
--- 
a/processing/src/main/java/org/apache/druid/frame/channel/WritableFrameChannel.java
+++ 
b/processing/src/main/java/org/apache/druid/frame/channel/WritableFrameChannel.java
@@ -22,6 +22,7 @@ package org.apache.druid.frame.channel;
 import com.google.common.util.concurrent.ListenableFuture;
 import org.apache.druid.frame.Frame;
 
+import javax.annotation.Nullable;
 import java.io.Closeable;
 import java.io.IOException;
 
@@ -54,16 +55,20 @@ public interface WritableFrameChannel extends Closeable
 
   /**
    * Called prior to {@link #close()} if the writer has failed. Must be 
followed by a call to {@link #close()}.
+   *
+   * @param cause optional cause of failure. Used by the in-memory channel 
{@link BlockingQueueFrameChannel.Writable}
+   *              to propagate exceptions to downstream processors. Most other 
channels ignore the provided cause.
    */
-  void fail() throws IOException;
+  void fail(@Nullable Throwable cause) throws IOException;
 
   /**
    * Finish writing to this channel.
    *
-   * When this method is called without {@link #fail()} having previously been 
called, the writer is understood to have
-   * completed successfully.
+   * When this method is called without {@link #fail(Throwable)} having 
previously been called, the writer is
+   * understood to have completed successfully.
    *
-   * After calling this method, no additional calls to {@link #write}, {@link 
#fail()}, or this method are permitted.
+   * After calling this method, no additional calls to {@link #write}, {@link 
#fail(Throwable)}, or this method
+   * are permitted.
    */
   @Override
   void close() throws IOException;
diff --git 
a/processing/src/main/java/org/apache/druid/frame/channel/WritableFrameFileChannel.java
 
b/processing/src/main/java/org/apache/druid/frame/channel/WritableFrameFileChannel.java
index 231b8b8955..8ed0a5b721 100644
--- 
a/processing/src/main/java/org/apache/druid/frame/channel/WritableFrameFileChannel.java
+++ 
b/processing/src/main/java/org/apache/druid/frame/channel/WritableFrameFileChannel.java
@@ -23,6 +23,7 @@ import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
 import org.apache.druid.frame.file.FrameFileWriter;
 
+import javax.annotation.Nullable;
 import java.io.IOException;
 
 /**
@@ -44,8 +45,9 @@ public class WritableFrameFileChannel implements 
WritableFrameChannel
   }
 
   @Override
-  public void fail() throws IOException
+  public void fail(@Nullable Throwable cause) throws IOException
   {
+    // Cause is ignored when writing to frame files. Readers can tell the file 
is truncated, but they won't know why.
     writer.abort();
   }
 
diff --git 
a/processing/src/main/java/org/apache/druid/frame/file/FrameFileHttpResponseHandler.java
 
b/processing/src/main/java/org/apache/druid/frame/file/FrameFileHttpResponseHandler.java
index 08645f562f..0f70fb3d69 100644
--- 
a/processing/src/main/java/org/apache/druid/frame/file/FrameFileHttpResponseHandler.java
+++ 
b/processing/src/main/java/org/apache/druid/frame/file/FrameFileHttpResponseHandler.java
@@ -40,9 +40,15 @@ import org.jboss.netty.handler.codec.http.HttpResponseStatus;
  * to back off from issuing the next request, if appropriate. However: the 
handler does not implement backpressure
  * through the {@link HttpResponseHandler.TrafficCop} mechanism. Therefore, it 
is important that each request retrieve
  * a modest amount of data.
+ *
+ * The last fetch must be empty (zero content bytes) and must have the header 
{@link #HEADER_LAST_FETCH_NAME} set to
+ * {@link #HEADER_LAST_FETCH_VALUE}. Under these conditions, {@link 
FrameFilePartialFetch#isLastFetch()} returns true.
  */
 public class FrameFileHttpResponseHandler implements 
HttpResponseHandler<FrameFilePartialFetch, FrameFilePartialFetch>
 {
+  public static final String HEADER_LAST_FETCH_NAME = 
"X-Druid-Frame-Last-Fetch";
+  public static final String HEADER_LAST_FETCH_VALUE = "yes";
+
   private final ReadableByteChunksFrameChannel channel;
 
   public FrameFileHttpResponseHandler(final ReadableByteChunksFrameChannel 
channel)
@@ -53,14 +59,17 @@ public class FrameFileHttpResponseHandler implements 
HttpResponseHandler<FrameFi
   @Override
   public ClientResponse<FrameFilePartialFetch> handleResponse(final 
HttpResponse response, final TrafficCop trafficCop)
   {
-    final ClientResponse<FrameFilePartialFetch> clientResponse = 
ClientResponse.unfinished(new FrameFilePartialFetch());
-
     if (response.getStatus().getCode() != HttpResponseStatus.OK.getCode()) {
       // Note: if the error body is chunked, we will discard all future chunks 
due to setting exceptionCaught here.
       // This is OK because we don't need the body; just the HTTP status code.
+      final ClientResponse<FrameFilePartialFetch> clientResponse =
+          ClientResponse.unfinished(new FrameFilePartialFetch(false));
       exceptionCaught(clientResponse, new ISE("Server for [%s] returned [%s]", 
channel.getId(), response.getStatus()));
       return clientResponse;
     } else {
+      final boolean lastFetchHeaderSet = 
HEADER_LAST_FETCH_VALUE.equals(response.headers().get(HEADER_LAST_FETCH_NAME));
+      final ClientResponse<FrameFilePartialFetch> clientResponse =
+          ClientResponse.unfinished(new 
FrameFilePartialFetch(lastFetchHeaderSet));
       return response(clientResponse, response.getContent());
     }
   }
diff --git 
a/processing/src/main/java/org/apache/druid/frame/file/FrameFilePartialFetch.java
 
b/processing/src/main/java/org/apache/druid/frame/file/FrameFilePartialFetch.java
index 2aae404b04..8c2056dcbe 100644
--- 
a/processing/src/main/java/org/apache/druid/frame/file/FrameFilePartialFetch.java
+++ 
b/processing/src/main/java/org/apache/druid/frame/file/FrameFilePartialFetch.java
@@ -33,6 +33,7 @@ import javax.annotation.Nullable;
  */
 public class FrameFilePartialFetch
 {
+  private final boolean lastFetchHeaderSet;
   private long bytesRead;
 
   @Nullable
@@ -41,13 +42,14 @@ public class FrameFilePartialFetch
   @Nullable
   private ListenableFuture<?> backpressureFuture;
 
-  FrameFilePartialFetch()
+  FrameFilePartialFetch(boolean lastFetchHeaderSet)
   {
+    this.lastFetchHeaderSet = lastFetchHeaderSet;
   }
 
-  public boolean isEmptyFetch()
+  public boolean isLastFetch()
   {
-    return exceptionCaught == null && bytesRead == 0L;
+    return exceptionCaught == null && lastFetchHeaderSet && bytesRead == 0L;
   }
 
   /**
diff --git 
a/processing/src/main/java/org/apache/druid/frame/processor/FrameProcessorExecutor.java
 
b/processing/src/main/java/org/apache/druid/frame/processor/FrameProcessorExecutor.java
index f83c7003fc..43221de36a 100644
--- 
a/processing/src/main/java/org/apache/druid/frame/processor/FrameProcessorExecutor.java
+++ 
b/processing/src/main/java/org/apache/druid/frame/processor/FrameProcessorExecutor.java
@@ -50,6 +50,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
+import java.util.concurrent.CancellationException;
 import java.util.concurrent.ExecutorService;
 import java.util.function.BiFunction;
 import java.util.stream.Collectors;
@@ -317,7 +318,7 @@ public class FrameProcessorExecutor
       {
         for (final WritableFrameChannel outputChannel : outputChannels) {
           try {
-            outputChannel.fail();
+            outputChannel.fail(e);
           }
           catch (Throwable e1) {
             e.addSuppressed(e1);
@@ -535,7 +536,7 @@ public class FrameProcessorExecutor
       // Fail all output channels prior to calling cleanup.
       for (final WritableFrameChannel outputChannel : 
processor.outputChannels()) {
         try {
-          outputChannel.fail();
+          outputChannel.fail(new CancellationException("Canceled"));
         }
         catch (Throwable e) {
           log.debug(e, "Exception encountered while marking output channel 
failed for processor [%s]", processor);
diff --git 
a/processing/src/test/java/org/apache/druid/frame/file/FrameFileHttpResponseHandlerTest.java
 
b/processing/src/test/java/org/apache/druid/frame/file/FrameFileHttpResponseHandlerTest.java
index d5d472211f..c6c2dc198d 100644
--- 
a/processing/src/test/java/org/apache/druid/frame/file/FrameFileHttpResponseHandlerTest.java
+++ 
b/processing/src/test/java/org/apache/druid/frame/file/FrameFileHttpResponseHandlerTest.java
@@ -119,14 +119,14 @@ public class FrameFileHttpResponseHandlerTest extends 
InitializedNullHandlingTes
     Assert.assertFalse(response1.isFinished());
     Assert.assertTrue(response1.isContinueReading());
     Assert.assertFalse(response1.getObj().isExceptionCaught());
-    Assert.assertFalse(response1.getObj().isEmptyFetch());
+    Assert.assertFalse(response1.getObj().isLastFetch());
 
     final ClientResponse<FrameFilePartialFetch> response2 = 
handler.done(response1);
 
     Assert.assertTrue(response2.isFinished());
     Assert.assertTrue(response2.isContinueReading());
     Assert.assertFalse(response2.getObj().isExceptionCaught());
-    Assert.assertFalse(response2.getObj().isEmptyFetch());
+    Assert.assertFalse(response2.getObj().isLastFetch());
 
     final ListenableFuture<?> backpressureFuture = 
response2.getObj().backpressureFuture();
     Assert.assertFalse(backpressureFuture.isDone());
@@ -143,7 +143,7 @@ public class FrameFileHttpResponseHandlerTest extends 
InitializedNullHandlingTes
   }
 
   @Test
-  public void testEmptyResponse()
+  public void testEmptyResponseWithoutLastFetchHeader()
   {
     final ClientResponse<FrameFilePartialFetch> response1 = 
handler.handleResponse(
         makeResponse(HttpResponseStatus.OK, ByteArrays.EMPTY_ARRAY),
@@ -153,14 +153,42 @@ public class FrameFileHttpResponseHandlerTest extends 
InitializedNullHandlingTes
     Assert.assertFalse(response1.isFinished());
     Assert.assertTrue(response1.isContinueReading());
     Assert.assertFalse(response1.getObj().isExceptionCaught());
-    Assert.assertTrue(response1.getObj().isEmptyFetch());
+    Assert.assertFalse(response1.getObj().isLastFetch());
 
     final ClientResponse<FrameFilePartialFetch> response2 = 
handler.done(response1);
 
     Assert.assertTrue(response2.isFinished());
     Assert.assertTrue(response2.isContinueReading());
     Assert.assertFalse(response2.getObj().isExceptionCaught());
-    Assert.assertTrue(response2.getObj().isEmptyFetch());
+    Assert.assertFalse(response2.getObj().isLastFetch());
+    Assert.assertTrue(response2.getObj().backpressureFuture().isDone());
+  }
+
+  @Test
+  public void testEmptyResponseWithLastFetchHeader()
+  {
+    final HttpResponse serverResponse = makeResponse(HttpResponseStatus.OK, 
ByteArrays.EMPTY_ARRAY);
+    serverResponse.headers().set(
+        FrameFileHttpResponseHandler.HEADER_LAST_FETCH_NAME,
+        FrameFileHttpResponseHandler.HEADER_LAST_FETCH_VALUE
+    );
+
+    final ClientResponse<FrameFilePartialFetch> response1 = 
handler.handleResponse(
+        serverResponse,
+        null
+    );
+
+    Assert.assertFalse(response1.isFinished());
+    Assert.assertTrue(response1.isContinueReading());
+    Assert.assertFalse(response1.getObj().isExceptionCaught());
+    Assert.assertTrue(response1.getObj().isLastFetch());
+
+    final ClientResponse<FrameFilePartialFetch> response2 = 
handler.done(response1);
+
+    Assert.assertTrue(response2.isFinished());
+    Assert.assertTrue(response2.isContinueReading());
+    Assert.assertFalse(response2.getObj().isExceptionCaught());
+    Assert.assertTrue(response2.getObj().isLastFetch());
     Assert.assertTrue(response2.getObj().backpressureFuture().isDone());
   }
 
@@ -186,7 +214,7 @@ public class FrameFileHttpResponseHandlerTest extends 
InitializedNullHandlingTes
 
       Assert.assertFalse(response.isFinished());
       Assert.assertFalse(response.getObj().isExceptionCaught());
-      Assert.assertFalse(response.getObj().isEmptyFetch());
+      Assert.assertFalse(response.getObj().isLastFetch());
     }
 
     final ClientResponse<FrameFilePartialFetch> finalResponse = 
handler.done(response);
@@ -194,7 +222,7 @@ public class FrameFileHttpResponseHandlerTest extends 
InitializedNullHandlingTes
     Assert.assertTrue(finalResponse.isFinished());
     Assert.assertTrue(finalResponse.isContinueReading());
     Assert.assertFalse(response.getObj().isExceptionCaught());
-    Assert.assertFalse(response.getObj().isEmptyFetch());
+    Assert.assertFalse(response.getObj().isLastFetch());
 
     final ListenableFuture<?> backpressureFuture = 
response.getObj().backpressureFuture();
     Assert.assertFalse(backpressureFuture.isDone());
diff --git 
a/processing/src/test/java/org/apache/druid/frame/processor/FrameProcessorExecutorTest.java
 
b/processing/src/test/java/org/apache/druid/frame/processor/FrameProcessorExecutorTest.java
index 792436ece1..349fdb2efe 100644
--- 
a/processing/src/test/java/org/apache/druid/frame/processor/FrameProcessorExecutorTest.java
+++ 
b/processing/src/test/java/org/apache/druid/frame/processor/FrameProcessorExecutorTest.java
@@ -195,18 +195,23 @@ public class FrameProcessorExecutorTest
       );
 
       MatcherAssert.assertThat(
-          e.getCause(),
+          e.getCause().getCause(),
           
ThrowableMessageMatcher.hasMessage(CoreMatchers.containsString("failure!"))
       );
 
       final ReadableFrameChannel outReadableChannel = outChannel.readable();
       Assert.assertTrue(outReadableChannel.canRead());
 
-      Assert.assertThrows(
-          IllegalStateException.class,
+      final RuntimeException readException = Assert.assertThrows(
+          RuntimeException.class,
           outReadableChannel::read
       );
 
+      MatcherAssert.assertThat(
+          readException.getCause(),
+          
ThrowableMessageMatcher.hasMessage(CoreMatchers.containsString("failure!"))
+      );
+
       Assert.assertTrue(outReadableChannel.isFinished()); // Finished now that 
we read the error
     }
 
diff --git 
a/processing/src/test/java/org/apache/druid/frame/processor/RunAllFullyWidgetTest.java
 
b/processing/src/test/java/org/apache/druid/frame/processor/RunAllFullyWidgetTest.java
index caec7984b2..cd13bcd00d 100644
--- 
a/processing/src/test/java/org/apache/druid/frame/processor/RunAllFullyWidgetTest.java
+++ 
b/processing/src/test/java/org/apache/druid/frame/processor/RunAllFullyWidgetTest.java
@@ -216,7 +216,11 @@ public class RunAllFullyWidgetTest extends 
FrameProcessorExecutorTest.BaseFrameP
 
     final ExecutionException e = Assert.assertThrows(ExecutionException.class, 
future::get);
     MatcherAssert.assertThat(e.getCause(), 
CoreMatchers.instanceOf(RuntimeException.class));
-    MatcherAssert.assertThat(e.getCause(), 
ThrowableMessageMatcher.hasMessage(CoreMatchers.equalTo("failure!")));
+    MatcherAssert.assertThat(e.getCause().getCause(), 
CoreMatchers.instanceOf(RuntimeException.class));
+    MatcherAssert.assertThat(
+        e.getCause().getCause(),
+        ThrowableMessageMatcher.hasMessage(CoreMatchers.equalTo("failure!"))
+    );
   }
 
   @Test


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to