This is an automated email from the ASF dual-hosted git repository. volodymyr pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/drill.git
commit 1887cce249e79058c7328c2bbf094b3d979e6ab2 Author: weijie.tong <[email protected]> AuthorDate: Sun Jan 6 15:48:55 2019 +0800 DRILL-6947: Fix RuntimeFilter memory leak --- .../impl/filter/RuntimeFilterRecordBatch.java | 44 ++++++++++++++++------ .../exec/physical/impl/join/HashJoinBatch.java | 8 +++- .../exec/work/filter/RuntimeFilterWritable.java | 21 +++++++++++ 3 files changed, 60 insertions(+), 13 deletions(-) diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/RuntimeFilterRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/RuntimeFilterRecordBatch.java index bf7ed79..ac6718c 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/RuntimeFilterRecordBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/RuntimeFilterRecordBatch.java @@ -224,21 +224,41 @@ public class RuntimeFilterRecordBatch extends AbstractSingleRecordBatch<RuntimeF setupHashHelper(); //To make each independent bloom filter work together to construct a final filter result: BitSet. BitSet bitSet = new BitSet(originalRecordCount); - for (int i = 0; i < toFilterFields.size(); i++) { - BloomFilter bloomFilter = bloomFilters.get(i); - String fieldName = toFilterFields.get(i); - computeBitSet(field2id.get(fieldName), bloomFilter, bitSet); - } + + int filterSize = toFilterFields.size(); int svIndex = 0; - for (int i = 0; i < originalRecordCount; i++) { - boolean contain = bitSet.get(i); - if (contain) { - sv2.setIndex(svIndex, i); - svIndex++; - } else { - filteredRows++; + if (filterSize == 1) { + BloomFilter bloomFilter = bloomFilters.get(0); + String fieldName = toFilterFields.get(0); + int fieldId = field2id.get(fieldName); + for (int rowIndex = 0; rowIndex < originalRecordCount; rowIndex++) { + long hash = hash64.hash64Code(rowIndex, 0, fieldId); + boolean contain = bloomFilter.find(hash); + if (contain) { + sv2.setIndex(svIndex, rowIndex); + svIndex++; + } else { + filteredRows++; + } + } + } else { + for (int i = 0; i < toFilterFields.size(); i++) { + BloomFilter bloomFilter = bloomFilters.get(i); + String fieldName = toFilterFields.get(i); + computeBitSet(field2id.get(fieldName), bloomFilter, bitSet); + } + for (int i = 0; i < originalRecordCount; i++) { + boolean contain = bitSet.get(i); + if (contain) { + sv2.setIndex(svIndex, i); + svIndex++; + } else { + filteredRows++; + } } } + + appliedTimes++; sv2.setRecordCount(svIndex); } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java index 0ac0809..30e8af7 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java @@ -213,6 +213,7 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> implem private Map<BloomFilter, Integer> bloomFilter2buildId = new HashMap<>(); private Map<BloomFilterDef, Integer> bloomFilterDef2buildId = new HashMap<>(); private List<BloomFilter> bloomFilters = new ArrayList<>(); + private boolean bloomFiltersGenerated = false; /** * This holds information about the spilled partitions for the build and probe side. @@ -818,8 +819,12 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> implem } + /** + * Note: + * This method can not be called again as part of recursive call of executeBuildPhase() to handle spilled build partitions. + */ private void initializeRuntimeFilter() { - if (!enableRuntimeFilter) { + if (!enableRuntimeFilter || bloomFiltersGenerated) { return; } runtimeFilterReporter = new RuntimeFilterReporter((ExecutorFragmentContext) context); @@ -838,6 +843,7 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> implem bloomFilter2buildId.put(bloomFilter, buildFieldId); } } + bloomFiltersGenerated = true; } /** diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterWritable.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterWritable.java index f8c2701..aebd010 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterWritable.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterWritable.java @@ -22,6 +22,7 @@ import io.netty.buffer.DrillBuf; import org.apache.drill.common.AutoCloseables; import org.apache.drill.exec.memory.BufferAllocator; import org.apache.drill.exec.proto.BitData; +import org.apache.drill.shaded.guava.com.google.common.base.Preconditions; import java.util.ArrayList; import java.util.List; @@ -39,6 +40,9 @@ public class RuntimeFilterWritable implements AutoCloseables.Closeable{ private String identifier; public RuntimeFilterWritable(BitData.RuntimeFilterBDef runtimeFilterBDef, DrillBuf... data) { + List<Integer> bfSizeInBytes = runtimeFilterBDef.getBloomFilterSizeInBytesList(); + int bufArrLen = data.length; + Preconditions.checkArgument(bfSizeInBytes.size() == bufArrLen, "the input DrillBuf number does not match the metadata definition!"); this.runtimeFilterBDef = runtimeFilterBDef; this.data = data; this.identifier = "majorFragmentId:" + runtimeFilterBDef.getMajorFragmentId() @@ -46,6 +50,23 @@ public class RuntimeFilterWritable implements AutoCloseables.Closeable{ + ", srcOperatorId:" + runtimeFilterBDef.getHjOpId(); } + public RuntimeFilterWritable(BitData.RuntimeFilterBDef runtimeFilterBDef, DrillBuf data) { + this.runtimeFilterBDef = runtimeFilterBDef; + List<Integer> bfSizeInBytes = runtimeFilterBDef.getBloomFilterSizeInBytesList(); + int boomFilterNum = bfSizeInBytes.size(); + this.data = new DrillBuf[boomFilterNum]; + int index = 0; + for (int i = 0; i < boomFilterNum; i++) { + int length = bfSizeInBytes.get(i); + this.data[i] = data.slice(index, length); + index = index + length; + } + + this.identifier = "majorFragmentId:" + runtimeFilterBDef.getMajorFragmentId() + + ",minorFragmentId:" + runtimeFilterBDef.getMinorFragmentId() + + ", srcOperatorId:" + runtimeFilterBDef.getHjOpId(); + } + public BitData.RuntimeFilterBDef getRuntimeFilterBDef() { return runtimeFilterBDef;
