[ 
https://issues.apache.org/jira/browse/DRILL-6348?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16481762#comment-16481762
 ] 

ASF GitHub Bot commented on DRILL-6348:
---------------------------------------

asfgit closed pull request #1237: DRILL-6348: Fixed code so that Unordered 
Receiver reports its memory usage
URL: https://github.com/apache/drill/pull/1237
 
 
   

This pull request was merged from a forked repository. Because GitHub
hides the original diff of a fork's pull request once it has been merged
(it would not otherwise be shown), the diff is reproduced below for the
sake of provenance:

diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContextImpl.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContextImpl.java
index c9b20705ff..65792827d2 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContextImpl.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContextImpl.java
@@ -428,13 +428,16 @@ public boolean isImpersonationEnabled() {
   public void close() {
     waitForSendComplete();
 
+    // Close the buffers before closing the operators; this is needed as 
buffer ownership
+    // is attached to the receive operators.
+    suppressingClose(buffers);
+
     // close operator context
     for (OperatorContextImpl opContext : contexts) {
       suppressingClose(opContext);
     }
 
     suppressingClose(bufferManager);
-    suppressingClose(buffers);
     suppressingClose(allocator);
   }
 
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/MergingReceiverCreator.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/MergingReceiverCreator.java
index 0ef84b960f..66a0cc2c25 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/MergingReceiverCreator.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/MergingReceiverCreator.java
@@ -42,7 +42,7 @@ public MergingRecordBatch getBatch(ExecutorFragmentContext 
context,
     IncomingBuffers bufHolder = context.getBuffers();
 
     assert bufHolder != null : "IncomingBuffers must be defined for any place 
a receiver is declared.";
-    RawBatchBuffer[] buffers = 
bufHolder.getBuffers(receiver.getOppositeMajorFragmentId());
+    RawBatchBuffer[] buffers = 
bufHolder.getCollector(receiver.getOppositeMajorFragmentId()).getBuffers();
 
     return new MergingRecordBatch(context, receiver, buffers);
   }
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java
index 7e5ff2126f..9087757b0a 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java
@@ -136,6 +136,9 @@ public MergingRecordBatch(final ExchangeFragmentContext 
context,
     this.config = config;
     this.inputCounts = new long[config.getNumSenders()];
     this.outputCounts = new long[config.getNumSenders()];
+
+    // Register this operator's buffer allocator so that incoming buffers are 
owned by this allocator
+    
context.getBuffers().getCollector(config.getOppositeMajorFragmentId()).setAllocator(oContext.getAllocator());
   }
 
   @SuppressWarnings("resource")
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverBatch.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverBatch.java
index 9da8a4b6f1..fcf258e377 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverBatch.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverBatch.java
@@ -86,6 +86,9 @@ public UnorderedReceiverBatch(final ExchangeFragmentContext 
context, final RawFr
     this.stats = oContext.getStats();
     this.stats.setLongStat(Metric.NUM_SENDERS, config.getNumSenders());
     this.config = config;
+
+    // Register this operator's buffer allocator so that incoming buffers are 
owned by this allocator
+    
context.getBuffers().getCollector(config.getOppositeMajorFragmentId()).setAllocator(oContext.getAllocator());
   }
 
   @Override
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverCreator.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverCreator.java
index 01a458890a..3dcdfc4ef7 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverCreator.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverCreator.java
@@ -37,7 +37,7 @@ public UnorderedReceiverBatch 
getBatch(ExecutorFragmentContext context, Unordere
     IncomingBuffers bufHolder = context.getBuffers();
     assert bufHolder != null : "IncomingBuffers must be defined for any place 
a receiver is declared.";
 
-    RawBatchBuffer[] buffers = 
bufHolder.getBuffers(receiver.getOppositeMajorFragmentId());
+    RawBatchBuffer[] buffers = 
bufHolder.getCollector(receiver.getOppositeMajorFragmentId()).getBuffers();
     assert buffers.length == 1;
     RawBatchBuffer buffer = buffers[0];
     return new UnorderedReceiverBatch(context, buffer, receiver);
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/AbstractDataCollector.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/AbstractDataCollector.java
index b6b4183e77..bb3a5a266d 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/AbstractDataCollector.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/AbstractDataCollector.java
@@ -22,6 +22,7 @@
 import java.util.concurrent.atomic.AtomicIntegerArray;
 
 import org.apache.drill.common.AutoCloseables;
+import org.apache.drill.exec.memory.BufferAllocator;
 import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.proto.BitControl.Collector;
 import org.apache.drill.exec.record.RawFragmentBatch;
@@ -37,6 +38,8 @@
   private final int incomingStreams;
   protected final RawBatchBuffer[] buffers;
   protected final ArrayWrappedIntIntMap fragmentMap;
+  /** Allocator which owns incoming batches */
+  protected BufferAllocator ownerAllocator;
 
   /**
    * @param parentAccounter
@@ -53,6 +56,7 @@ public AbstractDataCollector(AtomicInteger parentAccounter,
     this.parentAccounter = parentAccounter;
     this.remainders = new AtomicIntegerArray(incomingStreams);
     this.oppositeMajorFragmentId = collector.getOppositeMajorFragmentId();
+    this.ownerAllocator = context.getAllocator();
     // Create fragmentId to index that is within the range [0, 
incoming.size()-1]
     // We use this mapping to find objects belonging to the fragment in 
buffers and remainders arrays.
     fragmentMap = new ArrayWrappedIntIntMap();
@@ -116,4 +120,17 @@ public void close() throws Exception {
     AutoCloseables.close(buffers);
   }
 
+  /** {@inheritDoc} */
+  @Override
+  public BufferAllocator getAllocator() {
+    return this.ownerAllocator;
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  public void setAllocator(BufferAllocator allocator) {
+    Preconditions.checkArgument(allocator != null, "buffer allocator cannot be 
null");
+    this.ownerAllocator = allocator;
+  }
+
 }
\ No newline at end of file
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/DataCollector.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/DataCollector.java
index 026fc81e1e..fa746770b1 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/DataCollector.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/DataCollector.java
@@ -19,13 +19,25 @@
 
 import java.io.IOException;
 
+import org.apache.drill.exec.memory.BufferAllocator;
 import org.apache.drill.exec.record.RawFragmentBatch;
 
-interface DataCollector extends AutoCloseable {
+public interface DataCollector extends AutoCloseable {
   static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(DataCollector.class);
   public boolean batchArrived(int minorFragmentId, RawFragmentBatch batch) 
throws IOException ;
   public int getOppositeMajorFragmentId();
   public RawBatchBuffer[] getBuffers();
   public int getTotalIncomingFragments();
   public void close() throws Exception;
+  /**
+   * Enables caller (e.g., receiver) to attach its buffer allocator to this 
Data Collector in order
+   * to claim ownership of incoming batches; by default, the fragment 
allocator owns these batches.
+   *
+   * @param allocator operator buffer allocator
+   */
+  void setAllocator(BufferAllocator allocator);
+  /**
+   * @return allocator
+   */
+  BufferAllocator getAllocator();
 }
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/IncomingBuffers.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/IncomingBuffers.java
index 876c8b5b55..2d1b4f2f0c 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/IncomingBuffers.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/IncomingBuffers.java
@@ -27,6 +27,7 @@
 import org.apache.drill.common.AutoCloseables;
 import org.apache.drill.common.concurrent.AutoCloseableLock;
 import org.apache.drill.exec.exception.FragmentSetupException;
+import org.apache.drill.exec.memory.BufferAllocator;
 import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.proto.BitControl.Collector;
 import org.apache.drill.exec.proto.BitControl.PlanFragment;
@@ -103,8 +104,11 @@ public boolean batchArrived(final IncomingDataBatch 
incomingBatch) throws Fragme
             Arrays.toString(collectorMap.values().toArray())));
       }
 
+      // Use the Data Collector's buffer allocator if set, otherwise the 
fragment's one
+      BufferAllocator ownerAllocator = collector.getAllocator();
+
       synchronized (collector) {
-        final RawFragmentBatch newRawFragmentBatch = 
incomingBatch.newRawFragmentBatch(context.getAllocator());
+        final RawFragmentBatch newRawFragmentBatch = 
incomingBatch.newRawFragmentBatch(ownerAllocator);
         boolean decrementedToZero = collector
             
.batchArrived(incomingBatch.getHeader().getSendingMinorFragmentId(), 
newRawFragmentBatch);
         newRawFragmentBatch.release();
@@ -125,8 +129,8 @@ public int getRemainingRequired() {
     return rem;
   }
 
-  public RawBatchBuffer[] getBuffers(int senderMajorFragmentId) {
-    return collectorMap.get(senderMajorFragmentId).getBuffers();
+  public DataCollector getCollector(int senderMajorFragmentId) {
+    return collectorMap.get(senderMajorFragmentId);
   }
 
   public boolean isDone() {


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


> Unordered Receiver does not report its memory usage
> ---------------------------------------------------
>
>                 Key: DRILL-6348
>                 URL: https://issues.apache.org/jira/browse/DRILL-6348
>             Project: Apache Drill
>          Issue Type: Improvement
>          Components: Execution - Flow
>            Reporter: salim achouche
>            Assignee: salim achouche
>            Priority: Major
>              Labels: ready-to-commit
>             Fix For: 1.14.0
>
>
> The Drill Profile functionality doesn't show any memory usage for the 
> Unordered Receiver operator. This is problematic when analyzing OOM 
> conditions, since we cannot account for all of a query's memory usage. This 
> Jira is to fix memory reporting for the Unordered Receiver operator.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to