[ https://issues.apache.org/jira/browse/DRILL-6731?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16645517#comment-16645517 ]
ASF GitHub Bot commented on DRILL-6731: --------------------------------------- sohami closed pull request #1459: DRILL-6731: Move the BFs aggregating work from the Foreman to the RuntimeFi… URL: https://github.com/apache/drill/pull/1459 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java index 608f05c567a..88c21d9e957 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java @@ -21,6 +21,7 @@ import java.util.List; import java.util.concurrent.ExecutorService; +import org.apache.drill.exec.work.filter.RuntimeFilterSink; import org.apache.drill.shaded.guava.com.google.common.annotations.VisibleForTesting; import org.apache.calcite.schema.SchemaPlus; import org.apache.drill.common.config.DrillConfig; @@ -160,16 +161,15 @@ BufferAllocator getNewChildAllocator(final String operatorName, void close(); /** - * Return null ,if setRuntimeFilter not being called * @return */ - RuntimeFilterWritable getRuntimeFilter(); + RuntimeFilterSink getRuntimeFilterSink(); /** - * Set a RuntimeFilter when the RuntimeFilter receiver belongs to the same MinorFragment + * add a RuntimeFilter when the RuntimeFilter receiver belongs to the same MinorFragment * @param runtimeFilter */ - public void setRuntimeFilter(RuntimeFilterWritable runtimeFilter); + public void addRuntimeFilter(RuntimeFilterWritable runtimeFilter); interface ExecutorState { /** diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContextImpl.java 
b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContextImpl.java index a8980785a32..fcfdc8c6b54 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContextImpl.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContextImpl.java @@ -61,6 +61,7 @@ import org.apache.drill.exec.util.ImpersonationUtil; import org.apache.drill.exec.work.batch.IncomingBuffers; +import org.apache.drill.exec.work.filter.RuntimeFilterSink; import org.apache.drill.shaded.guava.com.google.common.base.Function; import org.apache.drill.shaded.guava.com.google.common.base.Preconditions; import org.apache.drill.shaded.guava.com.google.common.collect.Lists; @@ -136,7 +137,7 @@ public void interrupt(final InterruptedException e) { /** Stores constants and their holders by type */ private final Map<String, Map<MinorType, ValueHolder>> constantValueHolderCache; - private RuntimeFilterWritable runtimeFilterWritable; + private RuntimeFilterSink runtimeFilterSink; /** * Create a FragmentContext instance for non-root fragment. 
@@ -208,6 +209,11 @@ public FragmentContextImpl(final DrillbitContext dbContext, final PlanFragment f stats = new FragmentStats(allocator, fragment.getAssignment()); bufferManager = new BufferManagerImpl(this.allocator); constantValueHolderCache = Maps.newHashMap(); + boolean enableRF = context.getOptionManager().getOption(ExecConstants.HASHJOIN_ENABLE_RUNTIME_FILTER); + if (enableRF) { + ExecutorService executorService = context.getExecutor(); + this.runtimeFilterSink = new RuntimeFilterSink(this.allocator, executorService); + } } /** @@ -348,13 +354,13 @@ public boolean isUserAuthenticationEnabled() { } @Override - public void setRuntimeFilter(RuntimeFilterWritable runtimeFilter) { - this.runtimeFilterWritable = runtimeFilter; + public void addRuntimeFilter(RuntimeFilterWritable runtimeFilter) { + this.runtimeFilterSink.aggregate(runtimeFilter); } @Override - public RuntimeFilterWritable getRuntimeFilter() { - return runtimeFilterWritable; + public RuntimeFilterSink getRuntimeFilterSink() { + return runtimeFilterSink; } /** @@ -470,9 +476,7 @@ public void close() { for (OperatorContextImpl opContext : contexts) { suppressingClose(opContext); } - if (runtimeFilterWritable != null) { - suppressingClose(runtimeFilterWritable); - } + suppressingClose(runtimeFilterSink); suppressingClose(bufferManager); suppressingClose(allocator); } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/RuntimeFilterRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/RuntimeFilterRecordBatch.java index bc21580d369..9248bbc698e 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/RuntimeFilterRecordBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/RuntimeFilterRecordBatch.java @@ -36,7 +36,9 @@ import org.apache.drill.exec.record.selection.SelectionVector2; import org.apache.drill.exec.record.selection.SelectionVector4; import 
org.apache.drill.exec.work.filter.BloomFilter; +import org.apache.drill.exec.work.filter.RuntimeFilterSink; import org.apache.drill.exec.work.filter.RuntimeFilterWritable; + import java.util.ArrayList; import java.util.BitSet; import java.util.HashMap; @@ -56,6 +58,8 @@ private Map<String, Integer> field2id = new HashMap<>(); private List<String> toFilterFields; private List<BloomFilter> bloomFilters; + private RuntimeFilterWritable current; + private RuntimeFilterWritable previous; private int originalRecordCount; private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RuntimeFilterRecordBatch.class); @@ -102,6 +106,9 @@ public void close() { sv2.clear(); } super.close(); + if (current != null) { + current.close(); + } } @Override @@ -148,30 +155,36 @@ protected boolean setupNewSchema() throws SchemaChangeException { * schema change hash64 should be reset and this method needs to be called again. */ private void setupHashHelper() { - final RuntimeFilterWritable runtimeFilterWritable = context.getRuntimeFilter(); - + final RuntimeFilterSink runtimeFilterSink = context.getRuntimeFilterSink(); // Check if RuntimeFilterWritable was received by the minor fragment or not - if (runtimeFilterWritable == null) { + if (!runtimeFilterSink.containOne()) { return; } - - // Check if bloomFilters is initialized or not - if (bloomFilters == null) { - bloomFilters = runtimeFilterWritable.unwrap(); + if (runtimeFilterSink.hasFreshOne()) { + RuntimeFilterWritable freshRuntimeFilterWritable = runtimeFilterSink.fetchLatestDuplicatedAggregatedOne(); + if (current == null) { + current = freshRuntimeFilterWritable; + previous = freshRuntimeFilterWritable; + } else { + previous = current; + current = freshRuntimeFilterWritable; + previous.close(); + } + bloomFilters = current.unwrap(); } - // Check if HashHelper is initialized or not if (hash64 == null) { ValueVectorHashHelper hashHelper = new ValueVectorHashHelper(incoming, context); try { //generate hash helper 
- this.toFilterFields = runtimeFilterWritable.getRuntimeFilterBDef().getProbeFieldsList(); + this.toFilterFields = current.getRuntimeFilterBDef().getProbeFieldsList(); List<LogicalExpression> hashFieldExps = new ArrayList<>(); List<TypedFieldId> typedFieldIds = new ArrayList<>(); for (String toFilterField : toFilterFields) { SchemaPath schemaPath = new SchemaPath(new PathSegment.NameSegment(toFilterField), ExpressionPosition.UNKNOWN); TypedFieldId typedFieldId = container.getValueVectorId(schemaPath); - this.field2id.put(toFilterField, typedFieldId.getFieldIds()[0]); + int[] fieldIds = typedFieldId.getFieldIds(); + this.field2id.put(toFilterField, fieldIds[0]); typedFieldIds.add(typedFieldId); ValueVectorReadExpression toHashFieldExp = new ValueVectorReadExpression(typedFieldId); hashFieldExps.add(toHashFieldExp); @@ -195,11 +208,9 @@ private void applyRuntimeFilter() throws SchemaChangeException { sv2.setRecordCount(0); return; } - - final RuntimeFilterWritable runtimeFilterWritable = context.getRuntimeFilter(); + final RuntimeFilterSink runtimeFilterSink = context.getRuntimeFilterSink(); sv2.allocateNew(originalRecordCount); - - if (runtimeFilterWritable == null) { + if (!runtimeFilterSink.containOne()) { // means none of the rows are filtered out hence set all the indexes for (int i = 0; i < originalRecordCount; ++i) { sv2.setIndex(i, i); @@ -207,10 +218,8 @@ private void applyRuntimeFilter() throws SchemaChangeException { sv2.setRecordCount(originalRecordCount); return; } - - // Setup a hash helper if need be + // Setup a hash helper if needed setupHashHelper(); - //To make each independent bloom filter work together to construct a final filter result: BitSet. 
BitSet bitSet = new BitSet(originalRecordCount); for (int i = 0; i < toFilterFields.size(); i++) { diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java index 658f03a338c..3d456967f08 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java @@ -724,7 +724,7 @@ private void initializeRuntimeFilter() { runtimeFilterReporter = new RuntimeFilterReporter((ExecutorFragmentContext) context); RuntimeFilterDef runtimeFilterDef = popConfig.getRuntimeFilterDef(); //RuntimeFilter is not a necessary part of a HashJoin operator, only the query which satisfy the - //RuntimeFilterManager's judgement will have the RuntimeFilterDef. + //RuntimeFilterRouter's judgement will have the RuntimeFilterDef. if (runtimeFilterDef != null) { List<BloomFilterDef> bloomFilterDefs = runtimeFilterDef.getBloomFilterDefs(); for (BloomFilterDef bloomFilterDef : bloomFilterDefs) { @@ -944,7 +944,8 @@ public IterOutcome executeBuildPhase() throws SchemaChangeException { if (cycleNum == 0 && enableRuntimeFilter) { if (bloomFilters.size() > 0) { - runtimeFilterReporter.sendOut(bloomFilters, probeFields, this.popConfig.getRuntimeFilterDef().isSendToForeman()); + int hashJoinOpId = this.popConfig.getOperatorId(); + runtimeFilterReporter.sendOut(bloomFilters, probeFields, this.popConfig.getRuntimeFilterDef().isSendToForeman(), hashJoinOpId); } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/RuntimeFilterVisitor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/RuntimeFilterVisitor.java index c31e491a388..fcfa2bca1fc 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/RuntimeFilterVisitor.java +++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/RuntimeFilterVisitor.java @@ -197,11 +197,8 @@ public Void visitPrel(Prel prel, RFHelperHolder holder) throws RuntimeException @Override public Void visitExchange(ExchangePrel exchange, RFHelperHolder holder) throws RuntimeException { if (holder != null) { - boolean broadcastExchange = exchange instanceof BroadcastExchangePrel; if (holder.isFromBuildSide()) { - //To the build side ,we need to identify whether the HashJoin's direct children have a Broadcast node to mark - //this HashJoin as BroadcastHashJoin - holder.setEncounteredBroadcastExchange(broadcastExchange); + holder.setBuildSideExchange(exchange); } } return visitPrel(exchange, holder); @@ -224,19 +221,11 @@ public Void visitJoin(JoinPrel prel, RFHelperHolder holder) throws RuntimeExcept Prel right = (Prel) hashJoinPrel.getRight(); holder.setFromBuildSide(true); right.accept(this, holder); - boolean buildSideEncountererdBroadcastExchange = holder.isEncounteredBroadcastExchange(); - if (buildSideEncountererdBroadcastExchange) { - runtimeFilterDef.setSendToForeman(false); - } else { - runtimeFilterDef.setSendToForeman(true); - } + boolean routeToForeman = holder.needToRouteToForeman(); + runtimeFilterDef.setSendToForeman(routeToForeman); List<BloomFilterDef> bloomFilterDefs = runtimeFilterDef.getBloomFilterDefs(); for (BloomFilterDef bloomFilterDef : bloomFilterDefs) { - if (buildSideEncountererdBroadcastExchange) { - bloomFilterDef.setLocal(true); - } else { - bloomFilterDef.setLocal(false); - } + bloomFilterDef.setLocal(!routeToForeman); } } } @@ -338,18 +327,17 @@ public boolean isEncounteredBlockNode() { * RuntimeFilter helper util holder */ private static class RFHelperHolder { - //whether this join operator is a partitioned HashJoin or broadcast HashJoin, - //also single node HashJoin is not expected to do JPPD. 
- private boolean encounteredBroadcastExchange; private boolean fromBuildSide; - public boolean isEncounteredBroadcastExchange() { - return encounteredBroadcastExchange; + private ExchangePrel exchangePrel; + + public void setBuildSideExchange(ExchangePrel exchange){ + this.exchangePrel = exchange; } - public void setEncounteredBroadcastExchange(boolean encounteredBroadcastExchange) { - this.encounteredBroadcastExchange = encounteredBroadcastExchange; + public boolean needToRouteToForeman() { + return exchangePrel != null && !(exchangePrel instanceof BroadcastExchangePrel); } public boolean isFromBuildSide() { diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java index bf91ed3cdfa..0d97e0ac308 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java @@ -395,7 +395,7 @@ public void run() { final String originalName = currentThread.getName(); currentThread.setName(queryIdStr + ":foreman:registerRuntimeFilter"); try { - foreman.getRuntimeFilterManager().registerRuntimeFilter(runtimeFilter); + foreman.getRuntimeFilterRouter().registerRuntimeFilter(runtimeFilter); } catch (Exception e) { logger.warn("Exception while registering the RuntimeFilter", e); } finally { @@ -413,7 +413,7 @@ public void run() { .setQueryId(queryId).build(); FragmentExecutor fragmentExecutor = runningFragments.get(fragmentHandle); if (fragmentExecutor != null) { - fragmentExecutor.getContext().setRuntimeFilter(runtimeFilter); + fragmentExecutor.getContext().addRuntimeFilter(runtimeFilter); } } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterReporter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterReporter.java index e6ede7a4393..6e4a9a8e511 100644 --- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterReporter.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterReporter.java @@ -39,7 +39,7 @@ public RuntimeFilterReporter(ExecutorFragmentContext context) { this.context = context; } - public void sendOut(List<BloomFilter> bloomFilters, List<String> probeFields, boolean sendToForeman) { + public void sendOut(List<BloomFilter> bloomFilters, List<String> probeFields, boolean sendToForeman, int hashJoinOpId) { ExecProtos.FragmentHandle fragmentHandle = context.getHandle(); DrillBuf[] data = new DrillBuf[bloomFilters.size()]; List<Integer> bloomFilterSizeInBytes = new ArrayList<>(); @@ -63,6 +63,7 @@ public void sendOut(List<BloomFilter> bloomFilters, List<String> probeFields, bo .setMajorFragmentId(majorFragmentId) .setMinorFragmentId(minorFragmentId) .setToForeman(sendToForeman) + .setHjOpId(hashJoinOpId) .addAllBloomFilterSizeInBytes(bloomFilterSizeInBytes) .build(); RuntimeFilterWritable runtimeFilterWritable = new RuntimeFilterWritable(runtimeFilterB, data); @@ -72,7 +73,7 @@ public void sendOut(List<BloomFilter> bloomFilters, List<String> probeFields, bo AccountingDataTunnel dataTunnel = context.getDataTunnel(foremanEndpoint); dataTunnel.sendRuntimeFilter(runtimeFilterWritable); } else { - context.setRuntimeFilter(runtimeFilterWritable); + context.addRuntimeFilter(runtimeFilterWritable); } } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterRouter.java similarity index 87% rename from exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterManager.java rename to exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterRouter.java index e3f89a6e727..5a8c6fc9e1f 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterManager.java +++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterRouter.java @@ -18,7 +18,7 @@ package org.apache.drill.exec.work.filter; import com.fasterxml.jackson.databind.ObjectMapper; -import org.apache.drill.shaded.guava.com.google.common.base.Preconditions; +import io.netty.buffer.DrillBuf; import org.apache.commons.collections.CollectionUtils; import org.apache.drill.exec.ops.AccountingDataTunnel; import org.apache.drill.exec.ops.Consumer; @@ -35,7 +35,6 @@ import org.apache.drill.exec.proto.CoordinationProtos; import org.apache.drill.exec.proto.GeneralRPCProtos; import org.apache.drill.exec.proto.UserBitShared; -import org.apache.drill.exec.proto.helper.QueryIdHelper; import org.apache.drill.exec.rpc.RpcException; import org.apache.drill.exec.rpc.RpcOutcomeListener; import org.apache.drill.exec.rpc.data.DataTunnel; @@ -60,17 +59,14 @@ * The HashJoinRecordBatch is responsible to generate the RuntimeFilter. * To Partitioned case: * The generated RuntimeFilter will be sent to the Foreman node. The Foreman node receives the RuntimeFilter - * async, aggregates them, broadcasts them the Scan nodes's MinorFragment. The RuntimeFilterRecordBatch which - * steps over the Scan node will leverage the received RuntimeFilter to filter out the scanned rows to generate - * the SV2. + * async, broadcasts them to the Scan nodes's MinorFragment. The RuntimeFilterRecordBatch which is downstream + * to the Scan node will aggregate all the received RuntimeFilter and will leverage it to filter out the + * scanned rows to generate the SV2. * To Broadcast case: * The generated RuntimeFilter will be sent to Scan node's RuntimeFilterRecordBatch directly. The working of the * RuntimeFilterRecordBath is the same as the Partitioned one. 
- * - * - * */ -public class RuntimeFilterManager { +public class RuntimeFilterRouter { private Wrapper rootWrapper; //HashJoin node's major fragment id to its corresponding probe side nodes's endpoints @@ -79,14 +75,12 @@ private Map<Integer, Integer> joinMjId2scanSize = new ConcurrentHashMap<>(); //HashJoin node's major fragment id to its corresponding probe side scan node's belonging major fragment id private Map<Integer, Integer> joinMjId2ScanMjId = new HashMap<>(); - //HashJoin node's major fragment id to its aggregated RuntimeFilterWritable - private Map<Integer, RuntimeFilterWritable> joinMjId2AggregatedRF = new ConcurrentHashMap<>(); private DrillbitContext drillbitContext; private SendingAccountor sendingAccountor = new SendingAccountor(); - private static final Logger logger = LoggerFactory.getLogger(RuntimeFilterManager.class); + private static final Logger logger = LoggerFactory.getLogger(RuntimeFilterRouter.class); /** * This class maintains context for the runtime join push down's filter management. 
It @@ -95,7 +89,7 @@ * @param workUnit * @param drillbitContext */ - public RuntimeFilterManager(QueryWorkUnit workUnit, DrillbitContext drillbitContext) { + public RuntimeFilterRouter(QueryWorkUnit workUnit, DrillbitContext drillbitContext) { this.rootWrapper = workUnit.getRootWrapper(); this.drillbitContext = drillbitContext; } @@ -134,32 +128,16 @@ public void waitForComplete() { * @param runtimeFilterWritable */ public void registerRuntimeFilter(RuntimeFilterWritable runtimeFilterWritable) { - BitData.RuntimeFilterBDef runtimeFilterB = runtimeFilterWritable.getRuntimeFilterBDef(); - int majorId = runtimeFilterB.getMajorFragmentId(); - UserBitShared.QueryId queryId = runtimeFilterB.getQueryId(); - List<String> probeFields = runtimeFilterB.getProbeFieldsList(); - logger.info("RuntimeFilterManager receives a runtime filter , majorId:{}, queryId:{}", majorId, QueryIdHelper.getQueryId(queryId)); - int size; - synchronized (this) { - size = joinMjId2scanSize.get(majorId); - Preconditions.checkState(size > 0); - RuntimeFilterWritable aggregatedRuntimeFilter = joinMjId2AggregatedRF.get(majorId); - if (aggregatedRuntimeFilter == null) { - aggregatedRuntimeFilter = runtimeFilterWritable; - } else { - aggregatedRuntimeFilter.aggregate(runtimeFilterWritable); - } - joinMjId2AggregatedRF.put(majorId, aggregatedRuntimeFilter); - size--; - joinMjId2scanSize.put(majorId, size); - } - if (size == 0) { - broadcastAggregatedRuntimeFilter(majorId, queryId, probeFields); - } + broadcastAggregatedRuntimeFilter(runtimeFilterWritable); } - private void broadcastAggregatedRuntimeFilter(int joinMajorId, UserBitShared.QueryId queryId, List<String> probeFields) { + private void broadcastAggregatedRuntimeFilter(RuntimeFilterWritable srcRuntimeFilterWritable) { + BitData.RuntimeFilterBDef runtimeFilterB = srcRuntimeFilterWritable.getRuntimeFilterBDef(); + int joinMajorId = runtimeFilterB.getMajorFragmentId(); + UserBitShared.QueryId queryId = runtimeFilterB.getQueryId(); + List<String> 
probeFields = runtimeFilterB.getProbeFieldsList(); + DrillBuf[] data = srcRuntimeFilterWritable.getData(); List<CoordinationProtos.DrillbitEndpoint> scanNodeEps = joinMjId2probdeScanEps.get(joinMajorId); int scanNodeMjId = joinMjId2ScanMjId.get(joinMajorId); for (int minorId = 0; minorId < scanNodeEps.size(); minorId++) { @@ -172,10 +150,8 @@ private void broadcastAggregatedRuntimeFilter(int joinMajorId, UserBitShared.Que .setMajorFragmentId(scanNodeMjId) .setMinorFragmentId(minorId) .build(); - RuntimeFilterWritable aggregatedRuntimeFilter = joinMjId2AggregatedRF.get(joinMajorId); - RuntimeFilterWritable runtimeFilterWritable = new RuntimeFilterWritable(runtimeFilterBDef, aggregatedRuntimeFilter.getData()); + RuntimeFilterWritable runtimeFilterWritable = new RuntimeFilterWritable(runtimeFilterBDef, data); CoordinationProtos.DrillbitEndpoint drillbitEndpoint = scanNodeEps.get(minorId); - DataTunnel dataTunnel = drillbitContext.getDataConnectionsPool().getTunnel(drillbitEndpoint); Consumer<RpcException> exceptionConsumer = new Consumer<RpcException>() { @Override @@ -235,8 +211,6 @@ public Void visitOp(PhysicalOperator op, RFHelperHolder holder) throws RuntimeEx private class WrapperOperatorsVisitor extends AbstractPhysicalVisitor<Void, Void, RuntimeException> { - private PhysicalOperator targetOp; - private Fragment fragment; private boolean contain = false; @@ -251,7 +225,6 @@ public Void visitOp(PhysicalOperator op, RFHelperHolder holder) throws RuntimeEx public WrapperOperatorsVisitor(PhysicalOperator targetOp, Fragment fragment) { - this.targetOp = targetOp; this.fragment = fragment; this.targetIsGroupScan = targetOp instanceof GroupScan; this.targetIsHashJoin = targetOp instanceof HashJoinPOP; @@ -343,13 +316,10 @@ private Wrapper findPhysicalOpContainer(Wrapper wrapper, PhysicalOperator op) { private int probeSideScanMajorId; - - private List<CoordinationProtos.DrillbitEndpoint> probeSideScanEndpoints; private RuntimeFilterDef runtimeFilterDef; - public 
List<CoordinationProtos.DrillbitEndpoint> getProbeSideScanEndpoints() { return probeSideScanEndpoints; } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterSink.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterSink.java new file mode 100644 index 00000000000..14686254f93 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterSink.java @@ -0,0 +1,222 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.work.filter; + +import org.apache.drill.exec.memory.BufferAllocator; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.ReentrantLock; + +/** + * This sink receives the RuntimeFilters from the netty thread, + * aggregates them in an async thread, supplies the aggregated + * one to the fragment running thread. 
+ */ +public class RuntimeFilterSink implements AutoCloseable { + + private AtomicInteger currentBookId = new AtomicInteger(0); + + private int staleBookId = 0; + + /** + * RuntimeFilterWritable holding the aggregated version of all the received filter + */ + private RuntimeFilterWritable aggregated = null; + + private BlockingQueue<RuntimeFilterWritable> rfQueue = new LinkedBlockingQueue<>(); + + /** + * Flag used by Minor Fragment thread to indicate it has encountered error + */ + private AtomicBoolean running = new AtomicBoolean(true); + + /** + * Lock used to synchronize between producer (Netty Thread) and consumer (AsyncAggregateThread) of elements of this + * queue. This is needed because in error condition running flag can be consumed by producer and consumer thread at + * different times. Whoever sees it first will take this lock and clear all elements and set the queue to null to + * indicate producer not to put any new elements in it. + */ + private ReentrantLock queueLock = new ReentrantLock(); + + private Condition notEmpty = queueLock.newCondition(); + + private ReentrantLock aggregatedRFLock = new ReentrantLock(); + + private BufferAllocator bufferAllocator; + + private Future future; + + private static final Logger logger = LoggerFactory.getLogger(RuntimeFilterSink.class); + + + public RuntimeFilterSink(BufferAllocator bufferAllocator, ExecutorService executorService) { + this.bufferAllocator = bufferAllocator; + AsyncAggregateWorker asyncAggregateWorker = new AsyncAggregateWorker(); + future = executorService.submit(asyncAggregateWorker); + } + + public void aggregate(RuntimeFilterWritable runtimeFilterWritable) { + if (running.get()) { + try { + aggregatedRFLock.lock(); + if (containOne()) { + boolean same = aggregated.equals(runtimeFilterWritable); + if (!same) { + // This is to solve the only one fragment case that two RuntimeFilterRecordBatchs + // share the same FragmentContext. 
+ aggregated.close(); + currentBookId.set(0); + staleBookId = 0; + clearQueued(false); + } + } + } finally { + aggregatedRFLock.unlock(); + } + + try { + queueLock.lock(); + if (rfQueue != null) { + rfQueue.add(runtimeFilterWritable); + notEmpty.signal(); + } else { + runtimeFilterWritable.close(); + } + } finally { + queueLock.unlock(); + } + } else { + runtimeFilterWritable.close(); + } + } + + public RuntimeFilterWritable fetchLatestDuplicatedAggregatedOne() { + try { + aggregatedRFLock.lock(); + return aggregated.duplicate(bufferAllocator); + } finally { + aggregatedRFLock.unlock(); + } + } + + /** + * whether there's a fresh aggregated RuntimeFilter + * + * @return + */ + public boolean hasFreshOne() { + if (currentBookId.get() > staleBookId) { + staleBookId = currentBookId.get(); + return true; + } + return false; + } + + /** + * whether there's a usable RuntimeFilter. + * + * @return + */ + public boolean containOne() { + return aggregated != null; + } + + private void doCleanup() { + running.compareAndSet(true, false); + try { + aggregatedRFLock.lock(); + if (containOne()) { + aggregated.close(); + aggregated = null; + } + } finally { + aggregatedRFLock.unlock(); + } + } + + @Override + public void close() throws Exception { + future.cancel(true); + doCleanup(); + } + + private void clearQueued(boolean setToNull) { + RuntimeFilterWritable toClear; + try { + queueLock.lock(); + while (rfQueue != null && (toClear = rfQueue.poll()) != null) { + toClear.close(); + } + rfQueue = (setToNull) ? null : rfQueue; + } finally { + queueLock.unlock(); + } + } + + private class AsyncAggregateWorker implements Runnable { + + @Override + public void run() { + try { + RuntimeFilterWritable toAggregate = null; + while (running.get()) { + try { + queueLock.lock(); + toAggregate = (rfQueue != null) ? 
rfQueue.poll() : null; + if (toAggregate == null) { + notEmpty.await(); + continue; + } + } finally { + queueLock.unlock(); + } + + try { + aggregatedRFLock.lock(); + if (containOne()) { + aggregated.aggregate(toAggregate); + + // Release the byteBuf referenced by toAggregate since aggregate will not do it + toAggregate.close(); + } else { + aggregated = toAggregate; + } + } finally { + aggregatedRFLock.unlock(); + } + currentBookId.incrementAndGet(); + } + } catch (InterruptedException e) { + logger.info("RFAggregating Thread : {} was interrupted.", Thread.currentThread().getName()); + Thread.currentThread().interrupt(); + } finally { + doCleanup(); + clearQueued(true); + } + } + } +} + + diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterWritable.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterWritable.java index 8649e15af1e..9a971e94cbc 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterWritable.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterWritable.java @@ -20,6 +20,7 @@ import io.netty.buffer.DrillBuf; import org.apache.drill.common.AutoCloseables; +import org.apache.drill.exec.memory.BufferAllocator; import org.apache.drill.exec.proto.BitData; import java.util.ArrayList; @@ -29,15 +30,20 @@ * A binary wire transferable representation of the RuntimeFilter which contains * the runtime filter definition and its corresponding data. */ -public class RuntimeFilterWritable implements AutoCloseables.Closeable { +public class RuntimeFilterWritable implements AutoCloseables.Closeable{ private BitData.RuntimeFilterBDef runtimeFilterBDef; private DrillBuf[] data; + private String identifier; + public RuntimeFilterWritable(BitData.RuntimeFilterBDef runtimeFilterBDef, DrillBuf... 
data) { this.runtimeFilterBDef = runtimeFilterBDef; this.data = data; + this.identifier = "majorFragmentId:" + runtimeFilterBDef.getMajorFragmentId() + + ",minorFragmentId:" + runtimeFilterBDef.getMinorFragmentId() + + ", srcOperatorId:" + runtimeFilterBDef.getHjOpId(); } @@ -81,11 +87,47 @@ public void aggregate(RuntimeFilterWritable runtimeFilterWritable) { } } + public RuntimeFilterWritable duplicate(BufferAllocator bufferAllocator) { + int len = data.length; + DrillBuf[] cloned = new DrillBuf[len]; + int i = 0; + for (DrillBuf src : data) { + int capacity = src.readableBytes(); + DrillBuf duplicateOne = bufferAllocator.buffer(capacity); + int readerIndex = src.readerIndex(); + duplicateOne.writeBytes(src); + src.readerIndex(readerIndex); + cloned[i] = duplicateOne; + i++; + } + return new RuntimeFilterWritable(runtimeFilterBDef, cloned); + } + + public String toString() { + return identifier; + } + + @Override + public boolean equals(Object other) { + if (other == null) { + return false; + } + if (other instanceof RuntimeFilterWritable) { + RuntimeFilterWritable otherRFW = (RuntimeFilterWritable) other; + return this.identifier.equals(otherRFW.identifier); + } + return false; + } + + @Override + public int hashCode() { + return identifier.hashCode(); + } + @Override public void close() { for (DrillBuf buf : data) { buf.release(); } } - } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java index 634e8328cf5..42b76f278c7 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java @@ -17,6 +17,7 @@ */ package org.apache.drill.exec.work.foreman; +import org.apache.drill.exec.work.filter.RuntimeFilterRouter; import org.apache.drill.shaded.guava.com.google.common.base.Preconditions; import 
org.apache.drill.shaded.guava.com.google.common.collect.Lists; import com.google.protobuf.InvalidProtocolBufferException; @@ -61,7 +62,6 @@ import org.apache.drill.exec.util.Pointer; import org.apache.drill.exec.work.QueryWorkUnit; import org.apache.drill.exec.work.WorkManager.WorkerBee; -import org.apache.drill.exec.work.filter.RuntimeFilterManager; import org.apache.drill.exec.work.foreman.rm.QueryQueue.QueueTimeoutException; import org.apache.drill.exec.work.foreman.rm.QueryQueue.QueryQueueException; import org.apache.drill.exec.work.foreman.rm.QueryResourceManager; @@ -122,7 +122,7 @@ private String queryText; - private RuntimeFilterManager runtimeFilterManager; + private RuntimeFilterRouter runtimeFilterRouter; private boolean enableRuntimeFilter; /** @@ -410,8 +410,8 @@ private void runPhysicalPlan(final PhysicalPlan plan, Pointer<String> textPlan) queryRM.visitAbstractPlan(plan); final QueryWorkUnit work = getQueryWorkUnit(plan); if (enableRuntimeFilter) { - runtimeFilterManager = new RuntimeFilterManager(work, drillbitContext); - runtimeFilterManager.collectRuntimeFilterParallelAndControlInfo(); + runtimeFilterRouter = new RuntimeFilterRouter(work, drillbitContext); + runtimeFilterRouter.collectRuntimeFilterParallelAndControlInfo(); } if (textPlan != null) { queryManager.setPlanText(textPlan.value); @@ -734,8 +734,8 @@ public void close() { logger.debug(queryIdString + ": cleaning up."); injector.injectPause(queryContext.getExecutionControls(), "foreman-cleanup", logger); - if (enableRuntimeFilter && runtimeFilterManager != null) { - runtimeFilterManager.waitForComplete(); + if (enableRuntimeFilter && runtimeFilterRouter != null) { + runtimeFilterRouter.waitForComplete(); } // remove the channel disconnected listener (doesn't throw) closeFuture.removeListener(closeListener); @@ -866,8 +866,8 @@ public void interrupted(final InterruptedException e) { } - public RuntimeFilterManager getRuntimeFilterManager() { - return runtimeFilterManager; + public 
RuntimeFilterRouter getRuntimeFilterRouter() { + return runtimeFilterRouter; } } diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/OperatorFixture.java b/exec/java-exec/src/test/java/org/apache/drill/test/OperatorFixture.java index f867015725c..a1e7d0d2a41 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/test/OperatorFixture.java +++ b/exec/java-exec/src/test/java/org/apache/drill/test/OperatorFixture.java @@ -17,6 +17,7 @@ */ package org.apache.drill.test; +import org.apache.drill.exec.work.filter.RuntimeFilterSink; import org.apache.drill.shaded.guava.com.google.common.base.Function; import org.apache.drill.shaded.guava.com.google.common.base.Preconditions; import org.apache.drill.shaded.guava.com.google.common.collect.Lists; @@ -80,6 +81,7 @@ import java.util.List; import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; /** * Test fixture for operator and (especially) "sub-operator" tests. @@ -180,6 +182,7 @@ public OperatorFixture build() { private ExecutorState executorState = new OperatorFixture.MockExecutorState(); private ExecutionControls controls; + private RuntimeFilterSink runtimeFilterSink; public MockFragmentContext(final DrillConfig config, final OptionManager options, @@ -195,6 +198,7 @@ public MockFragmentContext(final DrillConfig config, this.controls = new ExecutionControls(options); compiler = new CodeCompiler(config, options); bufferManager = new BufferManagerImpl(allocator); + this.runtimeFilterSink = new RuntimeFilterSink(allocator, Executors.newCachedThreadPool()); } private static FunctionImplementationRegistry newFunctionRegistry( @@ -315,13 +319,13 @@ public void close() { } @Override - public RuntimeFilterWritable getRuntimeFilter() { - return null; + public RuntimeFilterSink getRuntimeFilterSink() { + return runtimeFilterSink; } @Override - public void setRuntimeFilter(RuntimeFilterWritable runtimeFilter) { - + public void 
addRuntimeFilter(RuntimeFilterWritable runtimeFilter) { + runtimeFilterSink.aggregate(runtimeFilter); } @Override diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/PhysicalOpUnitTestBase.java b/exec/java-exec/src/test/java/org/apache/drill/test/PhysicalOpUnitTestBase.java index 1c4779c3e7f..300e88bf243 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/test/PhysicalOpUnitTestBase.java +++ b/exec/java-exec/src/test/java/org/apache/drill/test/PhysicalOpUnitTestBase.java @@ -38,6 +38,7 @@ import org.apache.drill.exec.store.dfs.DrillFileSystem; import org.apache.drill.exec.store.easy.json.JSONRecordReader; import org.apache.drill.exec.work.batch.IncomingBuffers; +import org.apache.drill.exec.work.filter.RuntimeFilterSink; import org.apache.drill.exec.work.filter.RuntimeFilterWritable; import org.apache.drill.common.config.DrillConfig; import org.apache.drill.common.exceptions.ExecutionSetupException; @@ -203,11 +204,12 @@ protected OperatorTestBuilder opTestBuilder() { * </p> */ protected static class MockExecutorFragmentContext extends OperatorFixture.MockFragmentContext implements ExecutorFragmentContext { - private RuntimeFilterWritable runtimeFilterWritable; + private RuntimeFilterSink runtimeFilterSink; public MockExecutorFragmentContext(final FragmentContext fragmentContext) { super(fragmentContext.getConfig(), fragmentContext.getOptions(), fragmentContext.getAllocator(), fragmentContext.getScanExecutor(), fragmentContext.getScanDecodeExecutor()); + this.runtimeFilterSink = new RuntimeFilterSink(fragmentContext.getAllocator(), Executors.newCachedThreadPool()); } @Override @@ -304,13 +306,13 @@ public boolean isUserAuthenticationEnabled() { } @Override - public void setRuntimeFilter(RuntimeFilterWritable runtimeFilter) { - this.runtimeFilterWritable = runtimeFilter; + public void addRuntimeFilter(RuntimeFilterWritable runtimeFilter) { + this.runtimeFilterSink.aggregate(runtimeFilter); } @Override - public RuntimeFilterWritable 
getRuntimeFilter() { - return runtimeFilterWritable; + public RuntimeFilterSink getRuntimeFilterSink() { + return runtimeFilterSink; } } diff --git a/protocol/src/main/java/org/apache/drill/exec/proto/BitData.java b/protocol/src/main/java/org/apache/drill/exec/proto/BitData.java index 20abb3b62cb..d7921fc0f9f 100644 --- a/protocol/src/main/java/org/apache/drill/exec/proto/BitData.java +++ b/protocol/src/main/java/org/apache/drill/exec/proto/BitData.java @@ -2518,6 +2518,24 @@ public Builder clearIsLastBatch() { */ com.google.protobuf.ByteString getProbeFieldsBytes(int index); + + // optional int32 hj_op_id = 7; + /** + * <code>optional int32 hj_op_id = 7;</code> + * + * <pre> + * the operator id of the HashJoin which generates this RuntimeFilter + * </pre> + */ + boolean hasHjOpId(); + /** + * <code>optional int32 hj_op_id = 7;</code> + * + * <pre> + * the operator id of the HashJoin which generates this RuntimeFilter + * </pre> + */ + int getHjOpId(); } /** * Protobuf type {@code exec.bit.data.RuntimeFilterBDef} @@ -2627,6 +2645,11 @@ private RuntimeFilterBDef( probeFields_.add(input.readBytes()); break; } + case 56: { + bitField0_ |= 0x00000010; + hjOpId_ = input.readInt32(); + break; + } } } } catch (com.google.protobuf.InvalidProtocolBufferException e) { @@ -2820,6 +2843,30 @@ public int getProbeFieldsCount() { return probeFields_.getByteString(index); } + // optional int32 hj_op_id = 7; + public static final int HJ_OP_ID_FIELD_NUMBER = 7; + private int hjOpId_; + /** + * <code>optional int32 hj_op_id = 7;</code> + * + * <pre> + * the operator id of the HashJoin which generates this RuntimeFilter + * </pre> + */ + public boolean hasHjOpId() { + return ((bitField0_ & 0x00000010) == 0x00000010); + } + /** + * <code>optional int32 hj_op_id = 7;</code> + * + * <pre> + * the operator id of the HashJoin which generates this RuntimeFilter + * </pre> + */ + public int getHjOpId() { + return hjOpId_; + } + private void initFields() { queryId_ = 
org.apache.drill.exec.proto.UserBitShared.QueryId.getDefaultInstance(); majorFragmentId_ = 0; @@ -2827,6 +2874,7 @@ private void initFields() { toForeman_ = false; bloomFilterSizeInBytes_ = java.util.Collections.emptyList(); probeFields_ = com.google.protobuf.LazyStringArrayList.EMPTY; + hjOpId_ = 0; } private byte memoizedIsInitialized = -1; public final boolean isInitialized() { @@ -2858,6 +2906,9 @@ public void writeTo(com.google.protobuf.CodedOutputStream output) for (int i = 0; i < probeFields_.size(); i++) { output.writeBytes(6, probeFields_.getByteString(i)); } + if (((bitField0_ & 0x00000010) == 0x00000010)) { + output.writeInt32(7, hjOpId_); + } getUnknownFields().writeTo(output); } @@ -2901,6 +2952,10 @@ public int getSerializedSize() { size += dataSize; size += 1 * getProbeFieldsList().size(); } + if (((bitField0_ & 0x00000010) == 0x00000010)) { + size += com.google.protobuf.CodedOutputStream + .computeInt32Size(7, hjOpId_); + } size += getUnknownFields().getSerializedSize(); memoizedSerializedSize = size; return size; @@ -3034,6 +3089,8 @@ public Builder clear() { bitField0_ = (bitField0_ & ~0x00000010); probeFields_ = com.google.protobuf.LazyStringArrayList.EMPTY; bitField0_ = (bitField0_ & ~0x00000020); + hjOpId_ = 0; + bitField0_ = (bitField0_ & ~0x00000040); return this; } @@ -3093,6 +3150,10 @@ public Builder clone() { bitField0_ = (bitField0_ & ~0x00000020); } result.probeFields_ = probeFields_; + if (((from_bitField0_ & 0x00000040) == 0x00000040)) { + to_bitField0_ |= 0x00000010; + } + result.hjOpId_ = hjOpId_; result.bitField0_ = to_bitField0_; onBuilt(); return result; @@ -3141,6 +3202,9 @@ public Builder mergeFrom(org.apache.drill.exec.proto.BitData.RuntimeFilterBDef o } onChanged(); } + if (other.hasHjOpId()) { + setHjOpId(other.getHjOpId()); + } this.mergeUnknownFields(other.getUnknownFields()); return this; } @@ -3595,6 +3659,55 @@ public Builder addProbeFieldsBytes( return this; } + // optional int32 hj_op_id = 7; + private int hjOpId_ ; + 
/** + * <code>optional int32 hj_op_id = 7;</code> + * + * <pre> + * the operator id of the HashJoin which generates this RuntimeFilter + * </pre> + */ + public boolean hasHjOpId() { + return ((bitField0_ & 0x00000040) == 0x00000040); + } + /** + * <code>optional int32 hj_op_id = 7;</code> + * + * <pre> + * the operator id of the HashJoin which generates this RuntimeFilter + * </pre> + */ + public int getHjOpId() { + return hjOpId_; + } + /** + * <code>optional int32 hj_op_id = 7;</code> + * + * <pre> + * the operator id of the HashJoin which generates this RuntimeFilter + * </pre> + */ + public Builder setHjOpId(int value) { + bitField0_ |= 0x00000040; + hjOpId_ = value; + onChanged(); + return this; + } + /** + * <code>optional int32 hj_op_id = 7;</code> + * + * <pre> + * the operator id of the HashJoin which generates this RuntimeFilter + * </pre> + */ + public Builder clearHjOpId() { + bitField0_ = (bitField0_ & ~0x00000040); + hjOpId_ = 0; + onChanged(); + return this; + } + // @@protoc_insertion_point(builder_scope:exec.bit.data.RuntimeFilterBDef) } @@ -3648,16 +3761,16 @@ public Builder addProbeFieldsBytes( " \003(\005\022!\n\031sending_major_fragment_id\030\004 \001(\005\022" + "!\n\031sending_minor_fragment_id\030\005 \001(\005\022(\n\003de" + "f\030\006 \001(\0132\033.exec.shared.RecordBatchDef\022\023\n\013" + - "isLastBatch\030\007 \001(\010\"\277\001\n\021RuntimeFilterBDef\022" + + "isLastBatch\030\007 \001(\010\"\321\001\n\021RuntimeFilterBDef\022" + "&\n\010query_id\030\001 \001(\0132\024.exec.shared.QueryId\022" + "\031\n\021major_fragment_id\030\002 \001(\005\022\031\n\021minor_frag" + "ment_id\030\003 \001(\005\022\022\n\nto_foreman\030\004 \001(\010\022\"\n\032blo" + "om_filter_size_in_bytes\030\005 \003(\005\022\024\n\014probe_f" + - "ields\030\006 \003(\t*n\n\007RpcType\022\r\n\tHANDSHAKE\020\000\022\007\n" + - "\003ACK\020\001\022\013\n\007GOODBYE\020\002\022\024\n\020REQ_RECORD_BATCH\020", - 
"\003\022\020\n\014SASL_MESSAGE\020\004\022\026\n\022REQ_RUNTIME_FILTE" + - "R\020\005B(\n\033org.apache.drill.exec.protoB\007BitD" + - "ataH\001" + "ields\030\006 \003(\t\022\020\n\010hj_op_id\030\007 \001(\005*n\n\007RpcType" + + "\022\r\n\tHANDSHAKE\020\000\022\007\n\003ACK\020\001\022\013\n\007GOODBYE\020\002\022\024\n", + "\020REQ_RECORD_BATCH\020\003\022\020\n\014SASL_MESSAGE\020\004\022\026\n" + + "\022REQ_RUNTIME_FILTER\020\005B(\n\033org.apache.dril" + + "l.exec.protoB\007BitDataH\001" }; com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner = new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() { @@ -3687,7 +3800,7 @@ public Builder addProbeFieldsBytes( internal_static_exec_bit_data_RuntimeFilterBDef_fieldAccessorTable = new com.google.protobuf.GeneratedMessage.FieldAccessorTable( internal_static_exec_bit_data_RuntimeFilterBDef_descriptor, - new java.lang.String[] { "QueryId", "MajorFragmentId", "MinorFragmentId", "ToForeman", "BloomFilterSizeInBytes", "ProbeFields", }); + new java.lang.String[] { "QueryId", "MajorFragmentId", "MinorFragmentId", "ToForeman", "BloomFilterSizeInBytes", "ProbeFields", "HjOpId", }); return null; } }; diff --git a/protocol/src/main/java/org/apache/drill/exec/proto/SchemaBitData.java b/protocol/src/main/java/org/apache/drill/exec/proto/SchemaBitData.java index 6aa54dd9551..3c88ffcedbd 100644 --- a/protocol/src/main/java/org/apache/drill/exec/proto/SchemaBitData.java +++ b/protocol/src/main/java/org/apache/drill/exec/proto/SchemaBitData.java @@ -441,6 +441,8 @@ public void writeTo(com.dyuproject.protostuff.Output output, org.apache.drill.ex output.writeInt32(5, bloomFilterSizeInBytes, true); for(String probeFields : message.getProbeFieldsList()) output.writeString(6, probeFields, true); + if(message.hasHjOpId()) + output.writeInt32(7, message.getHjOpId(), false); } public boolean isInitialized(org.apache.drill.exec.proto.BitData.RuntimeFilterBDef message) { @@ -499,6 +501,9 
@@ public void mergeFrom(com.dyuproject.protostuff.Input input, org.apache.drill.ex case 6: builder.addProbeFields(input.readString()); break; + case 7: + builder.setHjOpId(input.readInt32()); + break; default: input.handleUnknownField(number, this); } @@ -545,6 +550,7 @@ public void writeTo(com.dyuproject.protostuff.Output output, org.apache.drill.ex case 4: return "toForeman"; case 5: return "bloomFilterSizeInBytes"; case 6: return "probeFields"; + case 7: return "hjOpId"; default: return null; } } @@ -562,6 +568,7 @@ public static int getFieldNumber(java.lang.String name) fieldMap.put("toForeman", 4); fieldMap.put("bloomFilterSizeInBytes", 5); fieldMap.put("probeFields", 6); + fieldMap.put("hjOpId", 7); } } diff --git a/protocol/src/main/java/org/apache/drill/exec/proto/beans/RuntimeFilterBDef.java b/protocol/src/main/java/org/apache/drill/exec/proto/beans/RuntimeFilterBDef.java index 99f3c78db76..2d1c2a70253 100644 --- a/protocol/src/main/java/org/apache/drill/exec/proto/beans/RuntimeFilterBDef.java +++ b/protocol/src/main/java/org/apache/drill/exec/proto/beans/RuntimeFilterBDef.java @@ -55,6 +55,7 @@ public static RuntimeFilterBDef getDefaultInstance() private Boolean toForeman; private List<Integer> bloomFilterSizeInBytes; private List<String> probeFields; + private int hjOpId; public RuntimeFilterBDef() { @@ -141,6 +142,19 @@ public RuntimeFilterBDef setProbeFieldsList(List<String> probeFields) return this; } + // hjOpId + + public int getHjOpId() + { + return hjOpId; + } + + public RuntimeFilterBDef setHjOpId(int hjOpId) + { + this.hjOpId = hjOpId; + return this; + } + // java serialization public void readExternal(ObjectInput in) throws IOException @@ -218,6 +232,9 @@ public void mergeFrom(Input input, RuntimeFilterBDef message) throws IOException message.probeFields = new ArrayList<String>(); message.probeFields.add(input.readString()); break; + case 7: + message.hjOpId = input.readInt32(); + break; default: input.handleUnknownField(number, this); } @@ 
-257,6 +274,9 @@ public void writeTo(Output output, RuntimeFilterBDef message) throws IOException output.writeString(6, probeFields, true); } } + + if(message.hjOpId != 0) + output.writeInt32(7, message.hjOpId, false); } public String getFieldName(int number) @@ -269,6 +289,7 @@ public String getFieldName(int number) case 4: return "toForeman"; case 5: return "bloomFilterSizeInBytes"; case 6: return "probeFields"; + case 7: return "hjOpId"; default: return null; } } @@ -288,6 +309,7 @@ public int getFieldNumber(String name) __fieldMap.put("toForeman", 4); __fieldMap.put("bloomFilterSizeInBytes", 5); __fieldMap.put("probeFields", 6); + __fieldMap.put("hjOpId", 7); } } diff --git a/protocol/src/main/protobuf/BitData.proto b/protocol/src/main/protobuf/BitData.proto index 063efe45851..15c72308ee2 100644 --- a/protocol/src/main/protobuf/BitData.proto +++ b/protocol/src/main/protobuf/BitData.proto @@ -47,4 +47,5 @@ message RuntimeFilterBDef{ optional bool to_foreman = 4; // true means sending to foreman,false means sending to scan nodes repeated int32 bloom_filter_size_in_bytes = 5; repeated string probe_fields = 6; // probe fields with corresponding BloomFilters + optional int32 hj_op_id = 7; // the operator id of the HashJoin which generates this RuntimeFilter } ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org > JPPD:Move aggregating the BF from the Foreman to the RuntimeFilter > ------------------------------------------------------------------ > > Key: DRILL-6731 > URL: https://issues.apache.org/jira/browse/DRILL-6731 > Project: Apache Drill > Issue Type: Improvement > Components: Server > Affects Versions: 1.15.0 > Reporter: weijie.tong > Assignee: weijie.tong > Priority: Major > Labels: ready-to-commit > Fix For: 1.15.0 > > > This PR is to move the BloomFilter aggregating work from the foreman to the > RuntimeFilter. Through this change, the RuntimeFilter can apply the incoming > BF as soon as possible. -- This message was sent by Atlassian JIRA (v7.6.3#76005)