This is an automated email from the ASF dual-hosted git repository. volodymyr pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/drill.git
commit 4e03d54cd854c08a5ed96a67e7c27f02fa5ff435 Author: weijie.tong <[email protected]> AuthorDate: Thu Jan 24 22:06:40 2019 +0800 DRILL-6999: Fix the case that there's more than one join conditions closes #1600 --- .../drill/exec/rpc/data/DataServerRequestHandler.java | 13 ++++++++++++- .../drill/exec/work/filter/RuntimeFilterWritable.java | 18 ------------------ 2 files changed, 12 insertions(+), 19 deletions(-) diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataServerRequestHandler.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataServerRequestHandler.java index e60fcae..5ad7ba4 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataServerRequestHandler.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataServerRequestHandler.java @@ -37,6 +37,7 @@ import org.apache.drill.exec.work.filter.RuntimeFilterWritable; import org.apache.drill.exec.work.fragment.FragmentManager; import java.io.IOException; +import java.util.List; import java.util.concurrent.ThreadLocalRandom; // package private @@ -106,7 +107,17 @@ class DataServerRequestHandler implements RequestHandler<DataServerConnection> { if (dBody == null) { return; } - RuntimeFilterWritable runtimeFilterWritable = new RuntimeFilterWritable(runtimeFilterBDef, (DrillBuf) dBody); + List<Integer> bfSizeInBytes = runtimeFilterBDef.getBloomFilterSizeInBytesList(); + int boomFilterNum = bfSizeInBytes.size(); + DrillBuf data = (DrillBuf) dBody; + DrillBuf[] bufs = new DrillBuf[boomFilterNum]; + int index = 0; + for (int i = 0; i < boomFilterNum; i++) { + int length = bfSizeInBytes.get(i); + bufs[i] = data.slice(index, length); + index = index + length; + } + RuntimeFilterWritable runtimeFilterWritable = new RuntimeFilterWritable(runtimeFilterBDef, bufs); AckSender ackSender = new AckSender(sender); ackSender.increment(); try { diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterWritable.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterWritable.java index aebd010..566781b 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterWritable.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterWritable.java @@ -50,24 +50,6 @@ public class RuntimeFilterWritable implements AutoCloseables.Closeable{ + ", srcOperatorId:" + runtimeFilterBDef.getHjOpId(); } - public RuntimeFilterWritable(BitData.RuntimeFilterBDef runtimeFilterBDef, DrillBuf data) { - this.runtimeFilterBDef = runtimeFilterBDef; - List<Integer> bfSizeInBytes = runtimeFilterBDef.getBloomFilterSizeInBytesList(); - int boomFilterNum = bfSizeInBytes.size(); - this.data = new DrillBuf[boomFilterNum]; - int index = 0; - for (int i = 0; i < boomFilterNum; i++) { - int length = bfSizeInBytes.get(i); - this.data[i] = data.slice(index, length); - index = index + length; - } - - this.identifier = "majorFragmentId:" + runtimeFilterBDef.getMajorFragmentId() - + ",minorFragmentId:" + runtimeFilterBDef.getMinorFragmentId() - + ", srcOperatorId:" + runtimeFilterBDef.getHjOpId(); - } - - public BitData.RuntimeFilterBDef getRuntimeFilterBDef() { return runtimeFilterBDef; }
