ilooner closed pull request #1470: DRILL-6746: Query can hang when 
PartitionSender task thread sees a co…
URL: https://github.com/apache/drill/pull/1470
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/BaseRawBatchBuffer.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/BaseRawBatchBuffer.java
index 43abd8e705d..6d77d639f8f 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/BaseRawBatchBuffer.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/BaseRawBatchBuffer.java
@@ -18,6 +18,7 @@
 package org.apache.drill.exec.work.batch;
 
 import java.io.IOException;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.drill.common.exceptions.DrillRuntimeException;
@@ -36,8 +37,9 @@
 
   protected interface BufferQueue<T> {
     void addOomBatch(RawFragmentBatch batch);
-    RawFragmentBatch poll() throws IOException;
+    RawFragmentBatch poll() throws IOException, InterruptedException;
     RawFragmentBatch take() throws IOException, InterruptedException;
+    RawFragmentBatch poll(long timeout, TimeUnit timeUnit) throws 
InterruptedException, IOException;
     boolean checkForOutOfMemory();
     int size();
     boolean isEmpty();
@@ -127,17 +129,24 @@ public synchronized void kill(final FragmentContext 
context) {
    * responses pending
    */
   private void clearBufferWithBody() {
+    RawFragmentBatch batch;
     while (!bufferQueue.isEmpty()) {
-      final RawFragmentBatch batch;
+      batch = null;
       try {
         batch = bufferQueue.poll();
         assertAckSent(batch);
       } catch (IOException e) {
         context.getExecutorState().fail(e);
         continue;
-      }
-      if (batch.getBody() != null) {
-        batch.getBody().release();
+      } catch (InterruptedException e) {
+        context.getExecutorState().fail(e);
+        // keep the state that the thread is interrupted
+        Thread.currentThread().interrupt();
+        continue;
+      } finally {
+        if (batch != null && batch.getBody() != null) {
+          batch.getBody().release();
+        }
       }
     }
   }
@@ -167,7 +176,25 @@ public RawFragmentBatch getNext() throws IOException {
 
       // if we didn't get a batch, block on waiting for queue.
       if (b == null && (!isTerminated() || !bufferQueue.isEmpty())) {
-        b = bufferQueue.take();
+        // We shouldn't block indefinitely here. There can be a condition such 
that, due to a failure, FragmentExecutor
+        // state is changed to FAILED and queue is empty. Because of this the 
minor fragment main thread will block
+        // here waiting for the next batch to arrive. Meanwhile, when the next batch 
arrives to be enqueued, it sees
+        // FragmentExecutor failure state and doesn't enqueue the batch and 
cleans up the buffer queue. Hence this
+        // thread will be stuck forever. So we poll for 5 seconds at a time until we get a 
batch or FragmentExecutor state is in
+        // error condition.
+        while (b == null) {
+          b = bufferQueue.poll(5, TimeUnit.SECONDS);
+          if (!context.getExecutorState().shouldContinue()) {
+            kill(context);
+            if (b != null) {
+              assertAckSent(b);
+              if (b.getBody() != null) {
+                b.getBody().release();
+              }
+              b = null;
+            }
+          } // else b will be assigned a valid batch
+        }
       }
     } catch (final InterruptedException e) {
 
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/SpoolingRawBatchBuffer.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/SpoolingRawBatchBuffer.java
index 5d4b3a1b8ad..50f582dfa38 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/SpoolingRawBatchBuffer.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/SpoolingRawBatchBuffer.java
@@ -102,14 +102,10 @@ public void addOomBatch(RawFragmentBatch batch) {
     }
 
     @Override
-    public RawFragmentBatch poll() throws IOException {
+    public RawFragmentBatch poll() throws IOException, InterruptedException {
       RawFragmentBatchWrapper batchWrapper = buffer.poll();
       if (batchWrapper != null) {
-        try {
-          return batchWrapper.get();
-        } catch (InterruptedException e) {
-          return null;
-        }
+        return batchWrapper.get();
       }
       return null;
     }
@@ -119,6 +115,15 @@ public RawFragmentBatch take() throws IOException, 
InterruptedException {
       return buffer.take().get();
     }
 
+    @Override
+    public RawFragmentBatch poll(long timeout, TimeUnit timeUnit) throws 
InterruptedException, IOException {
+      RawFragmentBatchWrapper batchWrapper = buffer.poll(timeout, timeUnit);
+      if (batchWrapper != null) {
+        return batchWrapper.get();
+      }
+      return null;
+    }
+
     @Override
     public boolean checkForOutOfMemory() {
       return buffer.peek().isOutOfMemory();
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/UnlimitedRawBatchBuffer.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/UnlimitedRawBatchBuffer.java
index bf14a74b8cf..0d36d5d4083 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/UnlimitedRawBatchBuffer.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/UnlimitedRawBatchBuffer.java
@@ -19,6 +19,7 @@
 
 import java.io.IOException;
 import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.record.RawFragmentBatch;
@@ -63,6 +64,15 @@ public RawFragmentBatch take() throws IOException, 
InterruptedException {
       return batch;
     }
 
+    @Override
+    public RawFragmentBatch poll(long timeout, TimeUnit timeUnit) throws 
InterruptedException, IOException {
+      RawFragmentBatch batch = buffer.poll(timeout, timeUnit);
+      if (batch != null) {
+        batch.sendOk();
+      }
+      return batch;
+    }
+
     @Override
     public boolean checkForOutOfMemory() {
       return context.getAllocator().isOverLimit();


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

Reply via email to