This is an automated email from the ASF dual-hosted git repository.

volodymyr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git

commit 1887cce249e79058c7328c2bbf094b3d979e6ab2
Author: weijie.tong <[email protected]>
AuthorDate: Sun Jan 6 15:48:55 2019 +0800

    DRILL-6947: Fix RuntimeFilter memory leak
---
 .../impl/filter/RuntimeFilterRecordBatch.java      | 44 ++++++++++++++++------
 .../exec/physical/impl/join/HashJoinBatch.java     |  8 +++-
 .../exec/work/filter/RuntimeFilterWritable.java    | 21 +++++++++++
 3 files changed, 60 insertions(+), 13 deletions(-)

diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/RuntimeFilterRecordBatch.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/RuntimeFilterRecordBatch.java
index bf7ed79..ac6718c 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/RuntimeFilterRecordBatch.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/RuntimeFilterRecordBatch.java
@@ -224,21 +224,41 @@ public class RuntimeFilterRecordBatch extends 
AbstractSingleRecordBatch<RuntimeF
     setupHashHelper();
     //To make each independent bloom filter work together to construct a final 
filter result: BitSet.
     BitSet bitSet = new BitSet(originalRecordCount);
-    for (int i = 0; i < toFilterFields.size(); i++) {
-      BloomFilter bloomFilter = bloomFilters.get(i);
-      String fieldName = toFilterFields.get(i);
-      computeBitSet(field2id.get(fieldName), bloomFilter, bitSet);
-    }
+
+    int filterSize = toFilterFields.size();
     int svIndex = 0;
-    for (int i = 0; i < originalRecordCount; i++) {
-      boolean contain = bitSet.get(i);
-      if (contain) {
-        sv2.setIndex(svIndex, i);
-        svIndex++;
-      } else {
-        filteredRows++;
+    if (filterSize == 1) {
+      BloomFilter bloomFilter = bloomFilters.get(0);
+      String fieldName = toFilterFields.get(0);
+      int fieldId = field2id.get(fieldName);
+      for (int rowIndex = 0; rowIndex < originalRecordCount; rowIndex++) {
+        long hash = hash64.hash64Code(rowIndex, 0, fieldId);
+        boolean contain = bloomFilter.find(hash);
+        if (contain) {
+          sv2.setIndex(svIndex, rowIndex);
+          svIndex++;
+        } else {
+          filteredRows++;
+        }
+      }
+    } else {
+      for (int i = 0; i < toFilterFields.size(); i++) {
+        BloomFilter bloomFilter = bloomFilters.get(i);
+        String fieldName = toFilterFields.get(i);
+        computeBitSet(field2id.get(fieldName), bloomFilter, bitSet);
+      }
+      for (int i = 0; i < originalRecordCount; i++) {
+        boolean contain = bitSet.get(i);
+        if (contain) {
+          sv2.setIndex(svIndex, i);
+          svIndex++;
+        } else {
+          filteredRows++;
+        }
       }
     }
+
+
     appliedTimes++;
     sv2.setRecordCount(svIndex);
   }
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
index 0ac0809..30e8af7 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
@@ -213,6 +213,7 @@ public class HashJoinBatch extends 
AbstractBinaryRecordBatch<HashJoinPOP> implem
   private Map<BloomFilter, Integer> bloomFilter2buildId = new HashMap<>();
   private Map<BloomFilterDef, Integer> bloomFilterDef2buildId = new 
HashMap<>();
   private List<BloomFilter> bloomFilters = new ArrayList<>();
+  private boolean bloomFiltersGenerated = false;
 
   /**
    * This holds information about the spilled partitions for the build and 
probe side.
@@ -818,8 +819,12 @@ public class HashJoinBatch extends 
AbstractBinaryRecordBatch<HashJoinPOP> implem
 
   }
 
+  /**
+   * Note:
+   * This method can not be called again as part of recursive call of 
executeBuildPhase() to handle spilled build partitions.
+   */
   private void initializeRuntimeFilter() {
-    if (!enableRuntimeFilter) {
+    if (!enableRuntimeFilter || bloomFiltersGenerated) {
       return;
     }
     runtimeFilterReporter = new 
RuntimeFilterReporter((ExecutorFragmentContext) context);
@@ -838,6 +843,7 @@ public class HashJoinBatch extends 
AbstractBinaryRecordBatch<HashJoinPOP> implem
         bloomFilter2buildId.put(bloomFilter, buildFieldId);
       }
     }
+    bloomFiltersGenerated = true;
   }
 
   /**
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterWritable.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterWritable.java
index f8c2701..aebd010 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterWritable.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterWritable.java
@@ -22,6 +22,7 @@ import io.netty.buffer.DrillBuf;
 import org.apache.drill.common.AutoCloseables;
 import org.apache.drill.exec.memory.BufferAllocator;
 import org.apache.drill.exec.proto.BitData;
+import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
 
 import java.util.ArrayList;
 import java.util.List;
@@ -39,6 +40,9 @@ public class RuntimeFilterWritable implements 
AutoCloseables.Closeable{
   private String identifier;
 
   public RuntimeFilterWritable(BitData.RuntimeFilterBDef runtimeFilterBDef, 
DrillBuf... data) {
+    List<Integer> bfSizeInBytes = 
runtimeFilterBDef.getBloomFilterSizeInBytesList();
+    int bufArrLen = data.length;
+    Preconditions.checkArgument(bfSizeInBytes.size() == bufArrLen, "the input 
DrillBuf number does not match the metadata definition!");
     this.runtimeFilterBDef = runtimeFilterBDef;
     this.data = data;
     this.identifier = "majorFragmentId:" + 
runtimeFilterBDef.getMajorFragmentId()
@@ -46,6 +50,23 @@ public class RuntimeFilterWritable implements 
AutoCloseables.Closeable{
       + ", srcOperatorId:" + runtimeFilterBDef.getHjOpId();
   }
 
+  public RuntimeFilterWritable(BitData.RuntimeFilterBDef runtimeFilterBDef, 
DrillBuf data) {
+    this.runtimeFilterBDef = runtimeFilterBDef;
+    List<Integer> bfSizeInBytes = 
runtimeFilterBDef.getBloomFilterSizeInBytesList();
+    int boomFilterNum = bfSizeInBytes.size();
+    this.data = new DrillBuf[boomFilterNum];
+    int index = 0;
+    for (int i = 0; i < boomFilterNum; i++) {
+      int length = bfSizeInBytes.get(i);
+      this.data[i] = data.slice(index, length);
+      index = index + length;
+    }
+
+    this.identifier = "majorFragmentId:" + 
runtimeFilterBDef.getMajorFragmentId()
+                      + ",minorFragmentId:" + 
runtimeFilterBDef.getMinorFragmentId()
+                      + ", srcOperatorId:" + runtimeFilterBDef.getHjOpId();
+  }
+
 
   public BitData.RuntimeFilterBDef getRuntimeFilterBDef() {
     return runtimeFilterBDef;

Reply via email to