This is an automated email from the ASF dual-hosted git repository.
joemcdonnell pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git
The following commit(s) were added to refs/heads/master by this push:
new d81c4db5e IMPALA-13185: Include runtime filter source in key
d81c4db5e is described below
commit d81c4db5e1e99865cf3a9a96cbc48af6436dc0c0
Author: Michael Smith <[email protected]>
AuthorDate: Tue Aug 27 11:32:40 2024 -0700
IMPALA-13185: Include runtime filter source in key
Incorporates the build-side PlanNode of a runtime filter in the tuple
cache key to avoid re-using intermediate results that were generated
using a runtime filter on the same target but different selection
criteria (build-side conjuncts).
We currently don't support caching ExchangeNode, but a common scenario
is a runtime filter produced by a HashJoin, with an Exchange on the
build side. Looks through the first ExchangeNode when considering the
cache key and eligibility for the build side source for a runtime
filter.
Testing shows all tests now passing for test_tuple_cache_tpc_queries
except those that hit "TupleCacheNode does not enforce limits itself and
cannot have a limit set."
Adds planner tests covering some scenarios where runtime filters are
expected to match or differ, and custom cluster tests for multi-node
testing.
Change-Id: I0077964be5acdb588d76251a6a39e57a0f42bb5a
Reviewed-on: http://gerrit.cloudera.org:8080/21729
Tested-by: Impala Public Jenkins <[email protected]>
Reviewed-by: Joe McDonnell <[email protected]>
---
.../org/apache/impala/planner/JoinBuildSink.java | 4 +-
.../java/org/apache/impala/planner/JoinNode.java | 5 +
.../java/org/apache/impala/planner/PlanNode.java | 42 ++++-
.../impala/planner/RuntimeFilterGenerator.java | 60 +++++--
.../org/apache/impala/planner/TupleCacheInfo.java | 76 ++++++++-
.../org/apache/impala/planner/TupleCacheTest.java | 64 ++++++++
tests/custom_cluster/test_tuple_cache.py | 177 ++++++++++++++++++++-
7 files changed, 395 insertions(+), 33 deletions(-)
diff --git a/fe/src/main/java/org/apache/impala/planner/JoinBuildSink.java
b/fe/src/main/java/org/apache/impala/planner/JoinBuildSink.java
index 8f6698119..2b13c1879 100644
--- a/fe/src/main/java/org/apache/impala/planner/JoinBuildSink.java
+++ b/fe/src/main/java/org/apache/impala/planner/JoinBuildSink.java
@@ -22,6 +22,7 @@ import java.util.List;
import org.apache.impala.analysis.BinaryPredicate;
import org.apache.impala.analysis.Expr;
+import org.apache.impala.common.ThriftSerializationCtx;
import org.apache.impala.planner.RuntimeFilterGenerator.RuntimeFilter;
import org.apache.impala.thrift.TDataSink;
import org.apache.impala.thrift.TDataSinkType;
@@ -90,7 +91,8 @@ public class JoinBuildSink extends DataSink {
tBuildSink.setHash_seed(joinNode_.getFragment().getHashSeed());
}
for (RuntimeFilter filter : runtimeFilters_) {
- tBuildSink.addToRuntime_filters(filter.toThrift());
+ tBuildSink.addToRuntime_filters(
+ filter.toThrift(new ThriftSerializationCtx(), null));
}
tBuildSink.setShare_build(joinNode_.canShareBuild());
tsink.setJoin_build_sink(tBuildSink);
diff --git a/fe/src/main/java/org/apache/impala/planner/JoinNode.java
b/fe/src/main/java/org/apache/impala/planner/JoinNode.java
index 43dc651f2..38831a42c 100644
--- a/fe/src/main/java/org/apache/impala/planner/JoinNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/JoinNode.java
@@ -1016,6 +1016,11 @@ public abstract class JoinNode extends PlanNode {
}
}
+ public PlanNode getBuildNode() {
+ Preconditions.checkState(getChildCount() == 2);
+ return getChild(1);
+ }
+
/**
* Helper method to compute the resource requirements for the join that can
be
* called from the builder or the join node. Returns a pair of the probe
diff --git a/fe/src/main/java/org/apache/impala/planner/PlanNode.java
b/fe/src/main/java/org/apache/impala/planner/PlanNode.java
index 16c253374..b28b9d085 100644
--- a/fe/src/main/java/org/apache/impala/planner/PlanNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/PlanNode.java
@@ -546,7 +546,7 @@ abstract public class PlanNode extends TreeNode<PlanNode> {
}
// Serialize any runtime filters
for (RuntimeFilter filter : runtimeFilters_) {
- msg.addToRuntime_filters(filter.toThrift());
+ msg.addToRuntime_filters(filter.toThrift(serialCtx, this));
}
msg.setDisable_codegen(disableCodegen_);
msg.pipelines = new ArrayList<>();
@@ -1324,15 +1324,25 @@ abstract public class PlanNode extends
TreeNode<PlanNode> {
* the key if the node is eligible to avoid overhead.
*/
public void computeTupleCacheInfo(DescriptorTable descTbl) {
+ if (tupleCacheInfo_ != null) {
+ // Already computed.
+ LOG.trace("Tuple cache found for {}", this);
+ return;
+ }
+
+ LOG.trace("Computing tuple cache info for {}", this);
tupleCacheInfo_ = new TupleCacheInfo(descTbl);
// computing the tuple cache information is a bottom-up tree traversal,
// so visit and merge the children before processing this node's contents
- for (int i = 0; i < getChildCount(); i++) {
- getChild(i).computeTupleCacheInfo(descTbl);
- tupleCacheInfo_.mergeChild(getChild(i).getTupleCacheInfo());
+ for (PlanNode child : getChildren()) {
+ child.computeTupleCacheInfo(descTbl);
+ if (!tupleCacheInfo_.mergeChild(child.getTupleCacheInfo())) {
+ LOG.trace("{} ineligible for caching due to {}", this, child);
+ }
}
if (!isTupleCachingImplemented()) {
+ LOG.trace("{} is ineligible for caching", this);
tupleCacheInfo_.setIneligible(TupleCacheInfo.IneligibilityReason.NOT_IMPLEMENTED);
}
@@ -1342,6 +1352,29 @@ abstract public class PlanNode extends
TreeNode<PlanNode> {
return;
}
+ // Include the build-side of a RuntimeFilter; look past the 1st
ExchangeNode.
+ // If the build-side is hashable, merge the hash. Otherwise mark this node
as
+ // ineligible because the RuntimeFilter is too complex to reason about.
+ for (RuntimeFilter filter : runtimeFilters_) {
+ // We want the build side of the join.
+ PlanNode build = filter.getSrc().getBuildNode();
+ Preconditions.checkState(!build.contains(this),
+ "Build-side contains current node, so cache info cannot be
initialized");
+
+ if (build instanceof ExchangeNode && build.getChildCount() == 1) {
+ // We only look past ExchangeNodes with 1 child. IcebergDeleteNode has
2.
+ build = build.getChild(0);
+ }
+
+ // Build may not have been visited yet.
+ build.computeTupleCacheInfo(descTbl);
+ if (!tupleCacheInfo_.mergeChildWithScans(build.getTupleCacheInfo())) {
+ LOG.trace("{} on {} ineligible for caching due to {}", filter, this,
build);
+ tupleCacheInfo_.finalizeHash();
+ return;
+ }
+ }
+
// Incorporate this node's information
// TODO: This will also calculate eligibility via initThrift/toThrift.
// TODO: This will adjust the output of initThrift/toThrift to mask out
items.
@@ -1351,6 +1384,7 @@ abstract public class PlanNode extends TreeNode<PlanNode>
{
toThrift(msg, serialCtx);
tupleCacheInfo_.hashThrift(msg);
tupleCacheInfo_.finalizeHash();
+ LOG.trace("Hash for {}: {}", this, tupleCacheInfo_.getHashTrace());
}
/**
diff --git
a/fe/src/main/java/org/apache/impala/planner/RuntimeFilterGenerator.java
b/fe/src/main/java/org/apache/impala/planner/RuntimeFilterGenerator.java
index d05fe9399..02a70dea4 100644
--- a/fe/src/main/java/org/apache/impala/planner/RuntimeFilterGenerator.java
+++ b/fe/src/main/java/org/apache/impala/planner/RuntimeFilterGenerator.java
@@ -58,6 +58,7 @@ import org.apache.impala.catalog.Type;
import org.apache.impala.common.AnalysisException;
import org.apache.impala.common.IdGenerator;
import org.apache.impala.common.InternalException;
+import org.apache.impala.common.ThriftSerializationCtx;
import org.apache.impala.planner.JoinNode.DistributionMode;
import org.apache.impala.planner.JoinNode.EqJoinConjunctScanSlots;
import org.apache.impala.service.BackendConfig;
@@ -285,14 +286,19 @@ public final class RuntimeFilterGenerator {
this.highValue = highValue;
}
- public TRuntimeFilterTargetDesc toThrift() {
+ public TRuntimeFilterTargetDesc toThrift(ThriftSerializationCtx
serialCtx) {
TRuntimeFilterTargetDesc tFilterTarget = new
TRuntimeFilterTargetDesc();
- tFilterTarget.setNode_id(node.getId().asInt());
- tFilterTarget.setTarget_expr(expr.treeToThrift());
+ if (serialCtx.isTupleCache()) {
+ // Target PlanNodeId is global and not useful for tuple caching.
+ tFilterTarget.setNode_id(0);
+ } else {
+ tFilterTarget.setNode_id(node.getId().asInt());
+ }
+ tFilterTarget.setTarget_expr(expr.treeToThrift(serialCtx));
List<SlotId> sids = new ArrayList<>();
expr.getIds(null, sids);
List<Integer> tSlotIds = Lists.newArrayListWithCapacity(sids.size());
- for (SlotId sid: sids) tSlotIds.add(sid.asInt());
+ for (SlotId sid: sids)
tSlotIds.add(serialCtx.translateSlotId(sid).asInt());
tFilterTarget.setTarget_expr_slotids(tSlotIds);
tFilterTarget.setIs_bound_by_partition_columns(isBoundByPartitionColumns);
tFilterTarget.setIs_local_target(isLocalTarget);
@@ -373,23 +379,45 @@ public final class RuntimeFilterGenerator {
/**
* Serializes a runtime filter to Thrift.
*/
- public TRuntimeFilterDesc toThrift() {
+ public TRuntimeFilterDesc toThrift(ThriftSerializationCtx serialCtx,
+ PlanNode cacheTarget) {
TRuntimeFilterDesc tFilter = new TRuntimeFilterDesc();
- tFilter.setFilter_id(id_.asInt());
- tFilter.setSrc_expr(srcExpr_.treeToThrift());
- tFilter.setSrc_node_id(src_.getId().asInt());
+ // Omit properties that don't affect tuple caching.
+ if (serialCtx.isTupleCache()) {
+ // Required property; RuntimeFilterId is irrelevant to tuple cache
results.
+ tFilter.setFilter_id(0);
+ // The target plan is already serialized as part of the calling
context. targets_
+ // may reference multiple target nodes; the others are irrelevant for
tuple
+ // caching.
+ tFilter.setTargets(new ArrayList<>());
+ // Target PlanNodeId is global and not useful for tuple caching.
+ tFilter.setPlanid_to_target_ndx(new HashMap<>());
+ // Incorporate the source plan TRuntimeFilterTargetDesc. targets_ may
include
+ // other targets that are not relevant to the PlanNode we're caching.
+ for (RuntimeFilterTarget target: targets_) {
+ if (target.node == cacheTarget) {
+ tFilter.addToTargets(target.toThrift(serialCtx));
+ break;
+ }
+ }
+ } else {
+ tFilter.setFilter_id(id_.asInt());
+ tFilter.setSrc_node_id(src_.getId().asInt());
+ tFilter.setNdv_estimate(ndvEstimate_);
+ for (int i = 0; i < targets_.size(); ++i) {
+ RuntimeFilterTarget target = targets_.get(i);
+ tFilter.addToTargets(target.toThrift(serialCtx));
+ tFilter.putToPlanid_to_target_ndx(target.node.getId().asInt(), i);
+ }
+ }
+ tFilter.setSrc_expr(srcExpr_.treeToThrift(serialCtx));
tFilter.setIs_broadcast_join(isBroadcastJoin_);
- tFilter.setNdv_estimate(ndvEstimate_);
tFilter.setHas_local_targets(hasLocalTargets_);
tFilter.setHas_remote_targets(hasRemoteTargets_);
tFilter.setCompareOp(exprCmpOp_.getThriftOp());
boolean appliedOnPartitionColumns = true;
- for (int i = 0; i < targets_.size(); ++i) {
- RuntimeFilterTarget target = targets_.get(i);
- tFilter.addToTargets(target.toThrift());
- tFilter.putToPlanid_to_target_ndx(target.node.getId().asInt(), i);
- appliedOnPartitionColumns =
- appliedOnPartitionColumns && target.isBoundByPartitionColumns;
+ for (RuntimeFilterTarget target: targets_) {
+ appliedOnPartitionColumns &= target.isBoundByPartitionColumns;
}
tFilter.setApplied_on_partition_columns(appliedOnPartitionColumns);
tFilter.setType(type_);
@@ -664,7 +692,7 @@ public final class RuntimeFilterGenerator {
public long getFilterSize() { return filterSizeBytes_; }
public boolean isTimestampTruncation() { return isTimestampTruncation_; }
public boolean isBroadcast() { return isBroadcastJoin_; }
- public PlanNode getSrc() { return src_; }
+ public JoinNode getSrc() { return src_; }
private long getBuildKeyNumRowStats() {
long minNumRows = src_.getChild(1).getCardinality();
diff --git a/fe/src/main/java/org/apache/impala/planner/TupleCacheInfo.java
b/fe/src/main/java/org/apache/impala/planner/TupleCacheInfo.java
index fa065fa29..5c8af2b1d 100644
--- a/fe/src/main/java/org/apache/impala/planner/TupleCacheInfo.java
+++ b/fe/src/main/java/org/apache/impala/planner/TupleCacheInfo.java
@@ -33,6 +33,10 @@ import org.apache.impala.catalog.FeTable;
import org.apache.impala.catalog.FeView;
import org.apache.impala.common.IdGenerator;
import org.apache.impala.common.ThriftSerializationCtx;
+import org.apache.impala.thrift.TFileSplitGeneratorSpec;
+import org.apache.impala.thrift.TScanRange;
+import org.apache.impala.thrift.TScanRangeLocationList;
+import org.apache.impala.thrift.TScanRangeSpec;
import org.apache.impala.thrift.TSlotDescriptor;
import org.apache.impala.thrift.TTableName;
import org.apache.impala.thrift.TTupleDescriptor;
@@ -176,13 +180,77 @@ public class TupleCacheInfo {
* Pull in a child's TupleCacheInfo into this TupleCacheInfo. If the child is
* ineligible, then this is marked ineligible and there is no need to
calculate
* a hash. If the child is eligible, it incorporates the child's hash into
this
- * hash.
+ * hash. Returns true if the child was merged, false if it was ineligible.
*/
- public void mergeChild(TupleCacheInfo child) {
+ public boolean mergeChild(TupleCacheInfo child) {
+ if (!mergeChildImpl(child)) {
+ return false;
+ }
+
+ // Merge the child's inputScanNodes_
+ inputScanNodes_.addAll(child.inputScanNodes_);
+ return true;
+ }
+
+ /**
+ * Pull in a child's TupleCacheInfo into this TupleCacheInfo. If the child is
+ * ineligible, then this is marked ineligible and there is no need to
calculate
+ * a hash. If the child is eligible, it incorporates the child's hash into
this
+ * hash. Returns true if the child was merged, false if it was ineligible.
+ *
+ * Resolves scan ranges statically for cases where results depend on all
scan ranges.
+ */
+ public boolean mergeChildWithScans(TupleCacheInfo child) {
+ if (!mergeChildImpl(child)) {
+ return false;
+ }
+
+ // Add all scan range specs to the hash. Copy only the relevant fields,
primarily:
+ // filename, mtime, size, and offset. Others like partition_id may change
after
+ // reloading metadata.
+ for (HdfsScanNode scanNode: child.inputScanNodes_) {
+ TScanRangeSpec orig = scanNode.getScanRangeSpecs();
+ TScanRangeSpec spec = new TScanRangeSpec();
+ if (orig.isSetConcrete_ranges()) {
+ for (TScanRangeLocationList origLocList: orig.concrete_ranges) {
+ // We only need the TScanRange, which provides the file segment info.
+ TScanRangeLocationList locList = new TScanRangeLocationList();
+ TScanRange scanRange = origLocList.scan_range.deepCopy();
+ if (scanRange.isSetHdfs_file_split()) {
+ // Zero out partition_id, it's not stable.
+ scanRange.hdfs_file_split.partition_id = 0;
+ }
+ locList.setScan_range(scanRange);
+ spec.addToConcrete_ranges(locList);
+ }
+ // Reloaded partitions may have a different order. Sort for stability.
+ spec.concrete_ranges.sort(null);
+ }
+ if (orig.isSetSplit_specs()) {
+ for (TFileSplitGeneratorSpec origSplitSpec: orig.split_specs) {
+ TFileSplitGeneratorSpec splitSpec = origSplitSpec.deepCopy();
+ // Zero out partition_id, it's not stable.
+ splitSpec.partition_id = 0;
+ spec.addToSplit_specs(splitSpec);
+ }
+ // Reloaded partitions may have a different order. Sort for stability.
+ spec.split_specs.sort(null);
+ }
+ hashThrift(spec);
+ }
+ return true;
+ }
+
+ /**
+ * Pull in a child's TupleCacheInfo that can be exhaustively determined
during planning.
+ * Public interfaces may add additional info that is more dynamic, such as
scan ranges.
+ */
+ private boolean mergeChildImpl(TupleCacheInfo child) {
Preconditions.checkState(!finalized_,
"TupleCacheInfo is finalized and can't be modified");
if (!child.isEligible()) {
ineligibilityReasons_.add(IneligibilityReason.CHILDREN_INELIGIBLE);
+ return false;
} else {
// The child is eligible, so incorporate its hash into our hasher.
hasher_.putBytes(child.getHashString().getBytes());
@@ -192,9 +260,6 @@ public class TupleCacheInfo {
// and each contribution would be clear.
hashTraceBuilder_.append(child.getHashTrace());
- // Merge the child's inputScanNodes_
- inputScanNodes_.addAll(child.inputScanNodes_);
-
// Incorporate the child's tuple references. This is creating a new
translation
// of TupleIds, because it will be incorporating multiple children.
for (TupleId id : child.tupleTranslationMap_.keySet()) {
@@ -203,6 +268,7 @@ public class TupleCacheInfo {
// id translation maps.
registerTupleHelper(id, false);
}
+ return true;
}
}
diff --git a/fe/src/test/java/org/apache/impala/planner/TupleCacheTest.java
b/fe/src/test/java/org/apache/impala/planner/TupleCacheTest.java
index 1bce808b6..c43bb18a9 100644
--- a/fe/src/test/java/org/apache/impala/planner/TupleCacheTest.java
+++ b/fe/src/test/java/org/apache/impala/planner/TupleCacheTest.java
@@ -65,6 +65,47 @@ public class TupleCacheTest extends PlannerTestBase {
"select id from functional.alltypes where id = 2");
}
+ @Test
+ public void testRuntimeFilterCacheKeys() {
+ String basicJoinTmpl = "select straight_join probe.id from
functional.alltypes " +
+ "probe, functional.alltypestiny build where %s";
+ verifyIdenticalCacheKeys(
+ String.format(basicJoinTmpl, "probe.id = build.id"),
+ String.format(basicJoinTmpl, "probe.id = build.id"));
+ // Trivial rewrite produces same plan
+ verifyIdenticalCacheKeys(
+ String.format(basicJoinTmpl, "probe.id = build.id"),
+ String.format(basicJoinTmpl, "build.id = probe.id"));
+ // Larger join with same subquery. Cache keys match because cache is
disabled when
+ // the build side is too complex.
+ verifyIdenticalCacheKeys(
+ String.format(basicJoinTmpl, "probe.id = build.id"),
+ "select straight_join p.id from functional.alltypes p, (" +
+ String.format(basicJoinTmpl, "probe.id = build.id") + ") b where p.id
= b.id");
+ // Different filter slot
+ verifyOverlappingCacheKeys(
+ String.format(basicJoinTmpl, "probe.id = build.id"),
+ String.format(basicJoinTmpl, "probe.id + 1 = build.id"));
+ // Different target expression
+ verifyOverlappingCacheKeys(
+ String.format(basicJoinTmpl, "probe.id + 1 = build.id"),
+ String.format(basicJoinTmpl, "probe.id + 2 = build.id"));
+ // Larger join with similar subquery and simpler plan tree.
+ verifyOverlappingCacheKeys(
+ String.format(basicJoinTmpl, "probe.id = build.id"),
+ "select straight_join a.id from functional.alltypes a,
functional.alltypes b, " +
+ "functional.alltypestiny c where a.id = b.id and b.id = c.id");
+ // Different build-side source table
+ verifyDifferentCacheKeys(
+ String.format(basicJoinTmpl, "probe.id = build.id"),
+ "select straight_join a.id from functional.alltypes a,
functional.alltypes b " +
+ "where a.id = b.id");
+ // Different build-side predicates
+ verifyDifferentCacheKeys(
+ String.format(basicJoinTmpl, "probe.id = build.id"),
+ String.format(basicJoinTmpl, "probe.id = build.id and build.id <
100"));
+ }
+
/**
* Test cases that rely on masking out unnecessary data to have cache hits.
*/
@@ -87,6 +128,10 @@ public class TupleCacheTest extends PlannerTestBase {
verifyCacheIneligible("select id from functional_kudu.alltypes");
verifyCacheIneligible("select id from functional_hbase.alltypes");
+ // Runtime filter produced by Kudu table is not implemented
+ verifyCacheIneligible("select a.id from functional.alltypes a, " +
+ "functional_kudu.alltypes b where a.id = b.id");
+
// TODO: Random functions should make a location ineligible
// rand()/random()/uuid()
// verifyCacheIneligible(
@@ -149,6 +194,17 @@ public class TupleCacheTest extends PlannerTestBase {
"where probe1.id = probe2.id and probe2.id = build.id",
"select straight_join probe1.id from functional.alltypes probe1, " +
"functional.alltypes build where probe1.id = build.id");
+
+ // This query has a tuple cache location for the build-side scan that
feeds into the
+ // join on the build side. If we add a bunch of additional filters on the
probe side
+ // table, that changes the slot ids, but we should still get identical
keys for the
+ // build-side caching location.
+ verifyOverlappingCacheKeys(
+ "select straight_join probe.id from functional.alltypes probe, (select
" +
+ "build1.id from functional.alltypes build1) build where probe.id =
build.id",
+ "select straight_join probe.id from functional.alltypes probe, (select
" +
+ "build1.id from functional.alltypes build1) build where probe.id =
build.id " +
+ "and probe.bool_col = true and probe.int_col = 100");
}
@Test
@@ -282,6 +338,14 @@ public class TupleCacheTest extends PlannerTestBase {
printQueryCacheEligibleNodes(query2, cacheEligibleNodes2, errorLog);
fail(errorLog.toString());
}
+
+ if (cacheKeys1.equals(cacheKeys2) ||
cacheHashTraces1.equals(cacheHashTraces2)) {
+ StringBuilder errorLog = new StringBuilder();
+ errorLog.append("Expected some cache keys to differ. Instead found:\n");
+ printQueryCacheEligibleNodes(query1, cacheEligibleNodes1, errorLog);
+ printQueryCacheEligibleNodes(query2, cacheEligibleNodes2, errorLog);
+ fail(errorLog.toString());
+ }
}
protected void verifyDifferentCacheKeys(String query1, String query2) {
diff --git a/tests/custom_cluster/test_tuple_cache.py
b/tests/custom_cluster/test_tuple_cache.py
index af30201f7..6a40331ba 100644
--- a/tests/custom_cluster/test_tuple_cache.py
+++ b/tests/custom_cluster/test_tuple_cache.py
@@ -19,6 +19,7 @@ from __future__ import absolute_import, division,
print_function
import pytest
import random
+import re
import string
from tests.common.custom_cluster_test_suite import CustomClusterTestSuite
@@ -27,6 +28,11 @@ from tests.common.test_dimensions import (
TABLE_LAYOUT = 'name STRING, age INT, address STRING'
CACHE_START_ARGS = "--tuple_cache_dir=/tmp --log_level=2"
+NUM_HITS = 'NumTupleCacheHits'
+NUM_HALTED = 'NumTupleCacheHalted'
+NUM_SKIPPED = 'NumTupleCacheSkipped'
+# Indentation used for TUPLE_CACHE_NODE in specific fragments (not averaged
fragment).
+NODE_INDENT = ' - '
# Generates a random table entry of at least 15 bytes.
@@ -39,19 +45,40 @@ def table_value(seed):
return '"{0}", {1}, "{2}"'.format(name, age, address)
-def assertCounters(profile, num_hits, num_halted, num_skipped):
- assert "NumTupleCacheHits: {0} ".format(num_hits) in profile
- assert "NumTupleCacheHalted: {0} ".format(num_halted) in profile
- assert "NumTupleCacheSkipped: {0} ".format(num_skipped) in profile
+def assertCounter(profile, key, val, num_matches):
+ if not isinstance(num_matches, list):
+ num_matches = [num_matches]
+ assert profile.count("{0}{1}: {2} ".format(NODE_INDENT, key, val)) in
num_matches, \
+ re.findall(r"{0}{1}: .*".format(NODE_INDENT, key), profile)
+
+
+def assertCounters(profile, num_hits, num_halted, num_skipped, num_matches=1):
+ assertCounter(profile, NUM_HITS, num_hits, num_matches)
+ assertCounter(profile, NUM_HALTED, num_halted, num_matches)
+ assertCounter(profile, NUM_SKIPPED, num_skipped, num_matches)
def get_cache_keys(profile):
- cache_keys = []
+ cache_keys = {}
+ last_node_id = -1
+ matcher = re.compile(r'TUPLE_CACHE_NODE \(id=([0-9]*)\)')
for line in profile.splitlines():
if "Combined Key:" in line:
key = line.split(":")[1].strip()
- cache_keys.append(key)
- return cache_keys
+ cache_keys[last_node_id].append(key)
+ continue
+
+ match = matcher.search(line)
+ if match:
+ last_node_id = int(match.group(1))
+ if last_node_id not in cache_keys:
+ cache_keys[last_node_id] = []
+
+ # Sort cache keys: with multiple nodes, order in the profile may change.
+ for _, val in cache_keys.items():
+ val.sort()
+
+ return next(iter(cache_keys.values())) if len(cache_keys) == 1 else
cache_keys
def assert_deterministic_scan(vector, profile):
@@ -205,6 +232,142 @@ class TestTupleCache(TestTupleCacheBase):
assert hit_error
+ @CustomClusterTestSuite.with_args(start_args=CACHE_START_ARGS)
+ @pytest.mark.execute_serially
+ def test_runtime_filters(self, vector, unique_database):
+ """
+ This tests that adding files to a table results in different runtime
filter keys.
+ """
+ self.client.set_configuration(vector.get_value('exec_option'))
+ fq_table = "{0}.runtime_filters".format(unique_database)
+ # A query containing multiple runtime filters
+ # - scan of A receives runtime filters from B and C, so it depends on
contents of B/C
+ # - scan of B receives runtime filter from C, so it depends on contents of
C
+ query = "select straight_join a.id from functional.alltypes a,
functional.alltypes" \
+ " b, {0} c where a.id = b.id and a.id = c.age order by
a.id".format(fq_table)
+ query_a_id = 10
+ query_b_id = 11
+ query_c_id = 12
+
+ # Create an empty table
+ self.create_table(fq_table, scale=0)
+
+ # Establish a baseline
+ empty_result = self.execute_query(query)
+ empty_cache_keys = get_cache_keys(empty_result.runtime_profile)
+ # Tables a and b have multiple files, so they are distributed across all 3
nodes.
+ # Table c has one file, so it has a single entry.
+ assert len(empty_cache_keys) == 3
+ assert len(empty_cache_keys[query_c_id]) == 1
+ empty_c_compile_key, empty_c_finst_key =
empty_cache_keys[query_c_id][0].split("_")
+ assert empty_c_finst_key == "0"
+ assert len(empty_result.data) == 0
+
+ # Insert a row, which creates a file / scan range
+ self.execute_query("INSERT INTO {0} VALUES ({1})".format(fq_table,
table_value(0)))
+
+ # Now, there is a scan range, so the fragment instance key should be
non-zero.
+ one_file_result = self.execute_query(query)
+ one_cache_keys = get_cache_keys(one_file_result.runtime_profile)
+ assert len(one_cache_keys) == 3
+ assert len(empty_cache_keys[query_c_id]) == 1
+ one_c_compile_key, one_c_finst_key =
one_cache_keys[query_c_id][0].split("_")
+ assert one_c_finst_key != "0"
+ # This should be a cache miss
+ assertCounters(one_file_result.runtime_profile, 0, 0, 0, 7)
+ assert len(one_file_result.data) == 1
+
+ # The new scan range did not change the compile-time key, but did change
the runtime
+ # filter keys.
+ for id in [query_a_id, query_b_id]:
+ assert len(empty_cache_keys[id]) == len(one_cache_keys[id])
+ for empty, one in zip(empty_cache_keys[id], one_cache_keys[id]):
+ assert empty != one
+ assert empty_c_compile_key == one_c_compile_key
+
+ # Insert another row, which creates a file / scan range
+ self.execute_query("INSERT INTO {0} VALUES ({1})".format(fq_table,
table_value(1)))
+
+ # There is a second scan range, so the fragment instance key should change
again
+ two_files_result = self.execute_query(query)
+ two_cache_keys = get_cache_keys(two_files_result.runtime_profile)
+ assert len(two_cache_keys) == 3
+ assert len(two_cache_keys[query_c_id]) == 2
+ two_c1_compile_key, two_c1_finst_key =
two_cache_keys[query_c_id][0].split("_")
+ two_c2_compile_key, two_c2_finst_key =
two_cache_keys[query_c_id][1].split("_")
+ assert two_c1_finst_key != "0"
+ assert two_c2_finst_key != "0"
+ # There may be a cache hit for the prior "c" scan range (if scheduled to
the same
+ # instance), and the rest cache misses.
+ assertCounter(two_files_result.runtime_profile, NUM_HITS, 0,
num_matches=[7, 8])
+ assertCounter(two_files_result.runtime_profile, NUM_HITS, 1,
num_matches=[0, 1])
+ assertCounter(two_files_result.runtime_profile, NUM_HALTED, 0,
num_matches=8)
+ assertCounter(two_files_result.runtime_profile, NUM_SKIPPED, 0,
num_matches=8)
+ assert len(two_files_result.data) == 2
+ # Ordering can vary by environment. Ensure one matches and one differs.
+ assert one_c_finst_key == two_c1_finst_key or one_c_finst_key ==
two_c2_finst_key
+ assert one_c_finst_key != two_c1_finst_key or one_c_finst_key !=
two_c2_finst_key
+ overlapping_rows =
set(one_file_result.data).intersection(set(two_files_result.data))
+ assert len(overlapping_rows) == 1
+
+ # The new scan range did not change the compile-time key, but did change
the runtime
+ # filter keys.
+ for id in [query_a_id, query_b_id]:
+ assert len(empty_cache_keys[id]) == len(one_cache_keys[id])
+ for empty, one in zip(empty_cache_keys[id], one_cache_keys[id]):
+ assert empty != one
+ assert one_c_compile_key == two_c1_compile_key
+ assert one_c_compile_key == two_c2_compile_key
+
+ # Invalidate metadata and rerun the last query. The keys should stay the
same.
+ self.execute_query("invalidate metadata")
+ rerun_two_files_result = self.execute_query(query)
+ # Verify that this is a cache hit
+ assertCounters(rerun_two_files_result.runtime_profile, 1, 0, 0,
num_matches=8)
+ rerun_cache_keys = get_cache_keys(rerun_two_files_result.runtime_profile)
+ assert rerun_cache_keys == two_cache_keys
+ assert rerun_two_files_result.data == two_files_result.data
+
+ @CustomClusterTestSuite.with_args(start_args=CACHE_START_ARGS)
+ @pytest.mark.execute_serially
+ def test_runtime_filter_reload(self, vector, unique_database):
+ """
+ This tests that reloading files to a table results in matching runtime
filter keys.
+ """
+ self.client.set_configuration(vector.get_value('exec_option'))
+ fq_table = "{0}.runtime_filter_genspec".format(unique_database)
+ # Query where fq_table generates a runtime filter.
+ query = "select straight_join a.id from functional.alltypes a, {0} b " \
+ "where a.id = b.age order by a.id".format(fq_table)
+
+ # Create a partitioned table with 3 partitions
+ self.execute_query("CREATE EXTERNAL TABLE {0} (name STRING) "
+ "PARTITIONED BY (age INT)".format(fq_table))
+ self.execute_query(
+ "INSERT INTO {0} PARTITION(age=4) VALUES
(\"Vanessa\")".format(fq_table))
+ self.execute_query(
+ "INSERT INTO {0} PARTITION(age=5) VALUES (\"Carl\")".format(fq_table))
+ self.execute_query(
+ "INSERT INTO {0} PARTITION(age=6) VALUES
(\"Cleopatra\")".format(fq_table))
+
+ # Prime the cache
+ base_result = self.execute_query(query)
+ base_cache_keys = get_cache_keys(base_result.runtime_profile)
+ assert len(base_cache_keys) == 2
+
+ # Drop and reload the table
+ self.execute_query("DROP TABLE {0}".format(fq_table))
+ self.execute_query("CREATE EXTERNAL TABLE {0} (name STRING, address
STRING) "
+ "PARTITIONED BY (age INT)".format(fq_table))
+ self.execute_query("ALTER TABLE {0} RECOVER PARTITIONS".format(fq_table))
+
+ # Verify we reuse the cache
+ reload_result = self.execute_query(query)
+ reload_cache_keys = get_cache_keys(reload_result.runtime_profile)
+ assert base_result.data == reload_result.data
+ assert base_cache_keys == reload_cache_keys
+ # Skips verifying cache hits as fragments may not be assigned to the same
nodes.
+
class TestTupleCacheRuntimeKeysBasic(TestTupleCacheBase):
"""Simpler tests that run on a single node with mt_dop=0 or mt_dop=1."""