Repository: incubator-impala Updated Branches: refs/heads/master 94ef5b19b -> faebfebdf
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/157c8005/fe/src/main/java/com/cloudera/impala/planner/KuduScanNode.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/com/cloudera/impala/planner/KuduScanNode.java b/fe/src/main/java/com/cloudera/impala/planner/KuduScanNode.java index c1330b2..4f654a9 100644 --- a/fe/src/main/java/com/cloudera/impala/planner/KuduScanNode.java +++ b/fe/src/main/java/com/cloudera/impala/planner/KuduScanNode.java @@ -17,33 +17,37 @@ package com.cloudera.impala.planner; -import java.math.BigDecimal; +import java.io.IOException; import java.util.List; import java.util.ListIterator; import java.util.Set; -import com.cloudera.impala.analysis.BinaryPredicate; -import com.cloudera.impala.analysis.BinaryPredicate.Operator; -import com.cloudera.impala.analysis.Expr; -import com.cloudera.impala.analysis.NumericLiteral; -import com.cloudera.impala.analysis.SlotRef; -import com.cloudera.impala.common.AnalysisException; -import com.cloudera.impala.common.ImpalaRuntimeException; -import com.google.common.base.Charsets; -import com.google.common.base.Preconditions; -import com.google.common.collect.ImmutableList; -import com.google.common.collect.Iterables; -import org.kududb.client.KuduClient; -import org.kududb.client.LocatedTablet; +import org.apache.kudu.ColumnSchema; +import org.apache.kudu.Schema; +import org.apache.kudu.client.KuduClient; +import org.apache.kudu.client.KuduClient.KuduClientBuilder; +import org.apache.kudu.client.KuduPredicate; +import org.apache.kudu.client.KuduPredicate.ComparisonOp; +import org.apache.kudu.client.KuduScanToken; +import org.apache.kudu.client.KuduScanToken.KuduScanTokenBuilder; +import org.apache.kudu.client.LocatedTablet; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.cloudera.impala.analysis.Analyzer; +import com.cloudera.impala.analysis.BinaryPredicate; +import com.cloudera.impala.analysis.BoolLiteral; +import 
com.cloudera.impala.analysis.Expr; +import com.cloudera.impala.analysis.LiteralExpr; +import com.cloudera.impala.analysis.NullLiteral; +import com.cloudera.impala.analysis.NumericLiteral; +import com.cloudera.impala.analysis.SlotDescriptor; +import com.cloudera.impala.analysis.SlotRef; +import com.cloudera.impala.analysis.StringLiteral; import com.cloudera.impala.analysis.TupleDescriptor; import com.cloudera.impala.catalog.KuduTable; -import com.cloudera.impala.common.InternalException; +import com.cloudera.impala.common.ImpalaRuntimeException; import com.cloudera.impala.thrift.TExplainLevel; -import com.cloudera.impala.thrift.TKuduKeyRange; import com.cloudera.impala.thrift.TKuduScanNode; import com.cloudera.impala.thrift.TNetworkAddress; import com.cloudera.impala.thrift.TPlanNode; @@ -51,20 +55,28 @@ import com.cloudera.impala.thrift.TPlanNodeType; import com.cloudera.impala.thrift.TScanRange; import com.cloudera.impala.thrift.TScanRangeLocation; import com.cloudera.impala.thrift.TScanRangeLocations; +import com.google.common.base.Charsets; +import com.google.common.collect.Iterables; import com.google.common.collect.Lists; import com.google.common.collect.Sets; -import static org.kududb.client.KuduClient.*; - /** * Scan of a single Kudu table. - * Extracts predicates that can be pushed down to Kudu and sends them to the backend - * as part of a TKuduScanNode. - * Currently only binary predicates (<=, >=, ==) that have a constant expression on one - * side and a slot ref on the other can be evaluated by Kudu. + * + * Extracts predicates that can be pushed down to Kudu. Currently only binary predicates + * that have a constant expression on one side and a slot ref on the other can be + * evaluated by Kudu. + * + * Uses the Kudu ScanToken API to generate a set of Kudu "scan tokens" which are used for + * scheduling and initializing the scanners. 
Scan tokens are opaque objects that represent + * a scan for some Kudu data on a tablet (currently one token represents one tablet), and + * each contains the tablet locations and all information needed to produce a Kudu scanner, + * including the projected columns and predicates that are pushed down. + * + * After KUDU-1065 is resolved, Kudu will also prune the tablets that don't need to be + * scanned, and only the tokens for the remaining tablets will be returned. */ public class KuduScanNode extends ScanNode { - private final static Logger LOG = LoggerFactory.getLogger(KuduScanNode.class); private final KuduTable kuduTable_; @@ -73,89 +85,123 @@ public class KuduScanNode extends ScanNode { // From analyzer.getHostIndex().getIndex(address) private final Set<Integer> hostIndexSet_ = Sets.newHashSet(); - // List of conjuncts that can be pushed down to Kudu - // TODO use the portion of this list that pertains to keys to do partition pruning. + // List of conjuncts that can be pushed down to Kudu, after they have been normalized + // by BinaryPredicate.normalizeSlotRefComparison(). Used for computing stats and + // explain strings. private final List<Expr> kuduConjuncts_ = Lists.newArrayList(); + // Exprs in kuduConjuncts_ converted to KuduPredicates. + private final List<KuduPredicate> kuduPredicates_ = Lists.newArrayList(); + public KuduScanNode(PlanNodeId id, TupleDescriptor desc) { super(id, desc, "SCAN KUDU"); kuduTable_ = (KuduTable) desc_.getTable(); } @Override - public void init(Analyzer analyzer) throws InternalException { + public void init(Analyzer analyzer) throws ImpalaRuntimeException { assignConjuncts(analyzer); analyzer.createEquivConjuncts(tupleIds_.get(0), conjuncts_); conjuncts_ = orderConjunctsByCost(conjuncts_); - // Extract predicates that can be evaluated by Kudu. - try { - kuduConjuncts_.addAll(extractKuduConjuncts(analyzer)); - // Mark these slots as materialized, otherwise the toThrift() of SlotRefs - // referencing them will fail.
These slots will never be filled with data though as - // Kudu won't return these columns. - // TODO KUDU-935 Don't require that slots be materialized in order to serialize - // SlotRefs. - analyzer.materializeSlots(kuduConjuncts_); - } catch (AnalysisException e) { - throw new InternalException("Error while extracting Kudu conjuncts.", e); + try (KuduClient client = + new KuduClientBuilder(kuduTable_.getKuduMasterAddresses()).build()) { + org.apache.kudu.client.KuduTable rpcTable = + client.openTable(kuduTable_.getKuduTableName()); + validateSchema(rpcTable); + + // Extract predicates that can be evaluated by Kudu. + extractKuduConjuncts(analyzer, client, rpcTable); + + // Materialize the slots of the remaining conjuncts (i.e. those not pushed to Kudu) + analyzer.materializeSlots(conjuncts_); + + // Creates Kudu scan tokens and sets the scan range locations. + computeScanRangeLocations(analyzer, client, rpcTable); + } catch (Exception e) { + throw new ImpalaRuntimeException("Unable to initialize the Kudu scan node", e); } - computeScanRangeLocations(analyzer); - analyzer.materializeSlots(conjuncts_); computeMemLayout(analyzer); computeStats(analyzer); } /** - * Compute the scan range locations for the given table. Does not look at predicates. - * To get the locations, we look at the table and load its tablets, for each tablet - * we get the key-range and for each tablet we get the replicated hosts as well. + * Validate the columns Impala expects are actually in the Kudu table. 
+ */ + private void validateSchema(org.apache.kudu.client.KuduTable rpcTable) + throws ImpalaRuntimeException { + Schema tableSchema = rpcTable.getSchema(); + for (SlotDescriptor desc: getTupleDesc().getSlots()) { + String colName = desc.getColumn().getName(); + try { + tableSchema.getColumn(colName); + } catch (Exception e) { + throw new ImpalaRuntimeException("Column '" + colName + "' not found in kudu " + + "table " + rpcTable.getName()); + } + } + } + + /** + * Compute the scan range locations for the given table using the scan tokens. */ - private void computeScanRangeLocations(Analyzer analyzer) { + private void computeScanRangeLocations(Analyzer analyzer, + KuduClient client, org.apache.kudu.client.KuduTable rpcTable) + throws ImpalaRuntimeException { scanRanges_ = Lists.newArrayList(); - try (KuduClient client = new KuduClientBuilder( - kuduTable_.getKuduMasterAddresses()).build()) { - org.kududb.client.KuduTable rpcTable = - client.openTable(kuduTable_.getKuduTableName()); - List<LocatedTablet> tabletLocations = - rpcTable.getTabletsLocations(KuduTable.KUDU_RPC_TIMEOUT_MS); - - for (LocatedTablet tablet : tabletLocations) { - List<TScanRangeLocation> locations = Lists.newArrayList(); - if (tablet.getReplicas().isEmpty()) { - throw new ImpalaRuntimeException(String.format( - "At least one tablet does not have any replicas. 
Tablet ID: %s", - new String(tablet.getTabletId(), Charsets.UTF_8))); - } - for (LocatedTablet.Replica replica : tablet.getReplicas()) { - TNetworkAddress address = new TNetworkAddress(replica.getRpcHost(), - replica.getRpcPort()); - // Use the network address to look up the host in the global list - Integer hostIndex = analyzer.getHostIndex().getIndex(address); - locations.add(new TScanRangeLocation(hostIndex)); - hostIndexSet_.add(hostIndex); - } - TScanRangeLocations locs = new TScanRangeLocations(); + List<KuduScanToken> scanTokens = createScanTokens(client, rpcTable); + for (KuduScanToken token: scanTokens) { + LocatedTablet tablet = token.getTablet(); + List<TScanRangeLocation> locations = Lists.newArrayList(); + if (tablet.getReplicas().isEmpty()) { + throw new ImpalaRuntimeException(String.format( + "At least one tablet does not have any replicas. Tablet ID: %s", + new String(tablet.getTabletId(), Charsets.UTF_8))); + } - // Now set the scan range of this tablet - TKuduKeyRange keyRange = new TKuduKeyRange(); - keyRange.setRange_start_key(tablet.getPartition().getPartitionKeyStart()); - keyRange.setRange_stop_key(tablet.getPartition().getPartitionKeyEnd()); - TScanRange scanRange = new TScanRange(); - scanRange.setKudu_key_range(keyRange); + for (LocatedTablet.Replica replica: tablet.getReplicas()) { + TNetworkAddress address = + new TNetworkAddress(replica.getRpcHost(), replica.getRpcPort()); + // Use the network address to look up the host in the global list + Integer hostIndex = analyzer.getHostIndex().getIndex(address); + locations.add(new TScanRangeLocation(hostIndex)); + hostIndexSet_.add(hostIndex); + } - // Set the scan range for this set of locations - locs.setScan_range(scanRange); - locs.locations = locations; - scanRanges_.add(locs); + TScanRange scanRange = new TScanRange(); + try { + scanRange.setKudu_scan_token(token.serialize()); + } catch (IOException e) { + throw new ImpalaRuntimeException("Unable to serialize Kudu scan token=" + + 
token.toString(), e); } - } catch (Exception e) { - throw new RuntimeException("Loading Kudu Table failed", e); + + TScanRangeLocations locs = new TScanRangeLocations(); + locs.setScan_range(scanRange); + locs.locations = locations; + scanRanges_.add(locs); } } + /** + * Returns KuduScanTokens for this scan given the projected columns and predicates that + * will be pushed to Kudu. + */ + private List<KuduScanToken> createScanTokens(KuduClient client, + org.apache.kudu.client.KuduTable rpcTable) { + List<String> projectedCols = Lists.newArrayList(); + for (SlotDescriptor desc: getTupleDesc().getSlots()) { + if (desc.isMaterialized()) projectedCols.add(desc.getColumn().getName()); + } + + KuduScanTokenBuilder tokenBuilder = client.newScanTokenBuilder(rpcTable); + tokenBuilder.setProjectedColumnNames(projectedCols); + for (KuduPredicate predicate: kuduPredicates_) tokenBuilder.addPredicate(predicate); + return tokenBuilder.build(); + } + @Override protected double computeSelectivity() { List<Expr> allConjuncts = Lists.newArrayList( @@ -208,121 +254,105 @@ public class KuduScanNode extends ScanNode { protected void toThrift(TPlanNode node) { node.node_type = TPlanNodeType.KUDU_SCAN_NODE; node.kudu_scan_node = new TKuduScanNode(desc_.getId().asInt()); - - // Thriftify the pushable predicates and set them on the scan node. - for (Expr predicate : kuduConjuncts_) { - node.kudu_scan_node.addToKudu_conjuncts(predicate.treeToThrift()); - } } /** * Extracts predicates from conjuncts_ that can be pushed down to Kudu. Currently only - * binary predicates (<=, >=, ==) that have a constant expression on one side and a slot - * ref on the other can be evaluated by Kudu. Only looks at comparisons of constants - * (i.e., the bounds of the result can be evaluated with Expr::GetValue(NULL)). + * binary predicates that have a constant expression on one side and a slot ref on the + * other can be evaluated by Kudu. 
Only looks at comparisons of constants (i.e., the + * bounds of the result can be evaluated with Expr::GetValue(NULL)). If a conjunct can + * be converted into this form, the normalized expr is added to kuduConjuncts_, a + * KuduPredicate is added to kuduPredicates_, and the original expr from conjuncts_ is + * removed. */ - private List<Expr> extractKuduConjuncts(Analyzer analyzer) - throws InternalException, AnalysisException { - ImmutableList.Builder<Expr> pushableConjunctsBuilder = ImmutableList.builder(); - ListIterator<Expr> i = conjuncts_.listIterator(); - while (i.hasNext()) { - Expr e = i.next(); - if (!(e instanceof BinaryPredicate)) continue; - BinaryPredicate comparisonPred = (BinaryPredicate) e; - // TODO KUDU-931 look into handling implicit/explicit casts on the SlotRef. - comparisonPred = BinaryPredicate.normalizeSlotRefComparison(comparisonPred, - analyzer); - if (comparisonPred == null) continue; - - // Needs to have a literal on the right. - if (!comparisonPred.getChild(1).isLiteral()) continue; - - comparisonPred = normalizeIntLiteralComparison(comparisonPred, analyzer); - - Operator op = comparisonPred.getOp(); - switch (comparisonPred.getOp()) { - case NE: continue; - case GT: continue; // TODO Exclusive predicates are not supported in Kudu yet. - case LT: continue; // TODO Exclusive predicates are not supported in Kudu yet. - case GE: // Fallthrough intended. - case LE: // Fallthrough intended. 
- case EQ: { - i.remove(); - pushableConjunctsBuilder.add(comparisonPred); - break; - } - default: - Preconditions.checkState(false, "Unexpected BinaryPredicate type: " - + op.getName()); - } + private void extractKuduConjuncts(Analyzer analyzer, + KuduClient client, org.apache.kudu.client.KuduTable rpcTable) { + ListIterator<Expr> it = conjuncts_.listIterator(); + while (it.hasNext()) { + if (tryConvertKuduPredicate(analyzer, rpcTable, it.next())) it.remove(); } - return pushableConjunctsBuilder.build(); } /** - * Hack to be able to push to Kudu Int GT/LT conjuncts by incrementing or decrementing - * the int literal and changing the operator to GE/LE. - * Expects the predicate to have been previously normalized. - * Returns the same BinaryPredicate if the transformation was not needed or possible or - * a new, analyzed predicate if it was. - * TODO Remove this when KUDU-1148 (inclusive predicate support in Kudu) gets done + * If 'expr' can be converted to a KuduPredicate, returns true and updates + * kuduPredicates_ and kuduConjuncts_. */ - private static BinaryPredicate normalizeIntLiteralComparison( - BinaryPredicate comparisonPred, Analyzer analyzer) { - Expr constantExpr = comparisonPred.getChild(1); - if (!(constantExpr instanceof NumericLiteral)) return comparisonPred; - NumericLiteral numLiteral = (NumericLiteral) constantExpr; - long intValue = numLiteral.getLongValue(); - if (comparisonPred.getOp() == Operator.GT) { - // Make sure we don't overflow the type, in which case the type would change and - // the slot would get an implicit cast meaning we wouldn't push it anyway. 
- switch (constantExpr.getType().getPrimitiveType()) { - case TINYINT: - if (intValue >= Byte.MAX_VALUE) return comparisonPred; - break; - case SMALLINT: - if (intValue >= Short.MAX_VALUE) return comparisonPred; - break; - case INT: - if (intValue >= Integer.MAX_VALUE) return comparisonPred; - break; - case BIGINT: - if (intValue >= Long.MAX_VALUE) return comparisonPred; - break; - default: return comparisonPred; + private boolean tryConvertKuduPredicate(Analyzer analyzer, + org.apache.kudu.client.KuduTable table, Expr expr) { + if (!(expr instanceof BinaryPredicate)) return false; + BinaryPredicate predicate = (BinaryPredicate) expr; + + // TODO KUDU-931 look into handling implicit/explicit casts on the SlotRef. + predicate = BinaryPredicate.normalizeSlotRefComparison(predicate, analyzer); + if (predicate == null) return false; + ComparisonOp op = getKuduOperator(((BinaryPredicate)predicate).getOp()); + if (op == null) return false; + + SlotRef ref = (SlotRef) predicate.getChild(0); + LiteralExpr literal = (LiteralExpr) predicate.getChild(1); + + // Cannot push predicates with null literal values (KUDU-1595).
+ if (literal instanceof NullLiteral) return false; + + String colName = ref.getDesc().getColumn().getName(); + ColumnSchema column = table.getSchema().getColumn(colName); + KuduPredicate kuduPredicate = null; + switch (literal.getType().getPrimitiveType()) { + case BOOLEAN: { + kuduPredicate = KuduPredicate.newComparisonPredicate(column, op, + ((BoolLiteral)literal).getValue()); + break; } - BigDecimal newValue = BigDecimal.valueOf(intValue + 1); - NumericLiteral newLiteral = new NumericLiteral(newValue); - comparisonPred = new BinaryPredicate(Operator.GE, comparisonPred.getChild(0), - newLiteral); - comparisonPred.analyzeNoThrow(analyzer); - return comparisonPred; - } - if (comparisonPred.getOp() == Operator.LT) { - // Make sure we don't underflow the type, in which case the type would change and - // the slot would get an implicit cast meaning we wouldn't push it anyway. - switch (constantExpr.getType().getPrimitiveType()) { - case TINYINT: - if (intValue <= Byte.MIN_VALUE) return comparisonPred; - break; - case SMALLINT: - if (intValue <= Short.MIN_VALUE) return comparisonPred; - break; - case INT: - if (intValue <= Integer.MIN_VALUE) return comparisonPred; - break; - case BIGINT: - if (intValue <= Long.MIN_VALUE) return comparisonPred; - break; - default: return comparisonPred; + case TINYINT: + case SMALLINT: + case INT: { + kuduPredicate = KuduPredicate.newComparisonPredicate(column, op, + ((NumericLiteral)literal).getIntValue()); + break; + } + case BIGINT: { + kuduPredicate = KuduPredicate.newComparisonPredicate(column, op, + ((NumericLiteral)literal).getLongValue()); + break; + } + case FLOAT: { + kuduPredicate = KuduPredicate.newComparisonPredicate(column, op, + (float)((NumericLiteral)literal).getDoubleValue()); + break; + } + case DOUBLE: { + kuduPredicate = KuduPredicate.newComparisonPredicate(column, op, + ((NumericLiteral)literal).getDoubleValue()); + break; } - BigDecimal newValue = BigDecimal.valueOf(intValue - 1); - NumericLiteral newLiteral = 
new NumericLiteral(newValue); - comparisonPred = new BinaryPredicate(Operator.LE, comparisonPred.getChild(0), - newLiteral); - comparisonPred.analyzeNoThrow(analyzer); - return comparisonPred; + case STRING: + case VARCHAR: + case CHAR: { + kuduPredicate = KuduPredicate.newComparisonPredicate(column, op, + ((StringLiteral)literal).getStringValue()); + break; + } + default: break; + } + if (kuduPredicate == null) return false; + + kuduConjuncts_.add(predicate); + kuduPredicates_.add(kuduPredicate); + return true; + } + + /** + * Returns a Kudu comparison operator for the BinaryPredicate operator, or null if + * the operation is not supported by Kudu. + */ + private static KuduPredicate.ComparisonOp getKuduOperator(BinaryPredicate.Operator op) { + switch (op) { + case GT: return ComparisonOp.GREATER; + case LT: return ComparisonOp.LESS; + case GE: return ComparisonOp.GREATER_EQUAL; + case LE: return ComparisonOp.LESS_EQUAL; + case EQ: return ComparisonOp.EQUAL; + default: return null; } - return comparisonPred; } } http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/157c8005/fe/src/main/java/com/cloudera/impala/util/KuduUtil.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/com/cloudera/impala/util/KuduUtil.java b/fe/src/main/java/com/cloudera/impala/util/KuduUtil.java index 3a97c05..4f627d8 100644 --- a/fe/src/main/java/com/cloudera/impala/util/KuduUtil.java +++ b/fe/src/main/java/com/cloudera/impala/util/KuduUtil.java @@ -37,11 +37,11 @@ import com.google.common.collect.Maps; import com.google.common.collect.Sets; import org.apache.hadoop.hive.metastore.api.FieldSchema; import org.apache.hadoop.hive.metastore.api.Table; -import org.kududb.ColumnSchema; -import org.kududb.Schema; -import org.kududb.Type; -import org.kududb.client.KuduTable; -import org.kududb.client.PartialRow; +import org.apache.kudu.ColumnSchema; +import org.apache.kudu.Schema; +import org.apache.kudu.Type; +import 
org.apache.kudu.client.KuduTable; +import org.apache.kudu.client.PartialRow; import static com.cloudera.impala.catalog.Type.parseColumnType; import static java.lang.String.format; http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/157c8005/fe/src/test/java/com/cloudera/impala/planner/KuduPlannerTest.java ---------------------------------------------------------------------- diff --git a/fe/src/test/java/com/cloudera/impala/planner/KuduPlannerTest.java b/fe/src/test/java/com/cloudera/impala/planner/KuduPlannerTest.java deleted file mode 100644 index fe15737..0000000 --- a/fe/src/test/java/com/cloudera/impala/planner/KuduPlannerTest.java +++ /dev/null @@ -1,52 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -package com.cloudera.impala.planner; - -import com.cloudera.impala.testutil.TestUtils; -import com.cloudera.impala.thrift.TExplainLevel; -import com.cloudera.impala.thrift.TQueryOptions; -import org.junit.Before; -import org.junit.Test; - -/** - * This class contains all plan generation related tests with Kudu as a storage backend. 
- */ -public class KuduPlannerTest extends PlannerTestBase { - - @Before - public void checkKuduSupport() { TestUtils.assumeKuduIsSupported(); } - - @Test - public void testKudu() { runPlannerTestFile("kudu"); } - - @Test - public void testUpdate() { runPlannerTestFile("kudu-update"); } - - @Test - public void testDelete() { runPlannerTestFile("kudu-delete"); } - - @Test - public void testSelectivity() { - TQueryOptions options = defaultQueryOptions(); - options.setExplain_level(TExplainLevel.VERBOSE); - runPlannerTestFile("kudu-selectivity", options); - } - - @Test - public void testTpch() { runPlannerTestFile("tpch-kudu"); } -} http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/157c8005/fe/src/test/java/com/cloudera/impala/planner/PlannerTest.java ---------------------------------------------------------------------- diff --git a/fe/src/test/java/com/cloudera/impala/planner/PlannerTest.java b/fe/src/test/java/com/cloudera/impala/planner/PlannerTest.java index 7472da0..a09f218 100644 --- a/fe/src/test/java/com/cloudera/impala/planner/PlannerTest.java +++ b/fe/src/test/java/com/cloudera/impala/planner/PlannerTest.java @@ -20,6 +20,7 @@ package com.cloudera.impala.planner; import org.junit.Test; import com.cloudera.impala.catalog.Db; +import com.cloudera.impala.thrift.TExplainLevel; import com.cloudera.impala.thrift.TQueryOptions; import com.cloudera.impala.thrift.TRuntimeFilterMode; @@ -244,4 +245,23 @@ public class PlannerTest extends PlannerTestBase { public void testConjunctOrdering() { runPlannerTestFile("conjunct-ordering"); } + + @Test + public void testKudu() { runPlannerTestFile("kudu"); } + + @Test + public void testKuduUpdate() { runPlannerTestFile("kudu-update"); } + + @Test + public void testKuduDelete() { runPlannerTestFile("kudu-delete"); } + + @Test + public void testKuduSelectivity() { + TQueryOptions options = defaultQueryOptions(); + options.setExplain_level(TExplainLevel.VERBOSE); + runPlannerTestFile("kudu-selectivity", options); + } + 
+ @Test + public void testKuduTpch() { runPlannerTestFile("tpch-kudu"); } } http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/157c8005/fe/src/test/java/com/cloudera/impala/planner/PlannerTestBase.java ---------------------------------------------------------------------- diff --git a/fe/src/test/java/com/cloudera/impala/planner/PlannerTestBase.java b/fe/src/test/java/com/cloudera/impala/planner/PlannerTestBase.java index 328d151..6db1293 100644 --- a/fe/src/test/java/com/cloudera/impala/planner/PlannerTestBase.java +++ b/fe/src/test/java/com/cloudera/impala/planner/PlannerTestBase.java @@ -33,6 +33,8 @@ import java.util.regex.Pattern; import org.apache.commons.lang.exception.ExceptionUtils; import org.apache.hadoop.fs.Path; +import org.apache.kudu.client.KuduClient; +import org.apache.kudu.client.KuduScanToken; import org.junit.AfterClass; import org.junit.BeforeClass; import org.slf4j.Logger; @@ -58,7 +60,6 @@ import com.cloudera.impala.thrift.THdfsPartition; import com.cloudera.impala.thrift.THdfsPartitionLocation; import com.cloudera.impala.thrift.THdfsScanNode; import com.cloudera.impala.thrift.THdfsTable; -import com.cloudera.impala.thrift.TKuduKeyRange; import com.cloudera.impala.thrift.TLineageGraph; import com.cloudera.impala.thrift.TNetworkAddress; import com.cloudera.impala.thrift.TPlanFragment; @@ -82,6 +83,7 @@ public class PlannerTestBase extends FrontendTestBase { private final static boolean GENERATE_OUTPUT_FILE = true; private final String testDir_ = "functional-planner/queries/PlannerTest"; private final String outDir_ = "/tmp/PlannerTest/"; + private static KuduClient kuduClient_; // Map from plan ID (TPlanNodeId) to the plan node with that ID. 
private final Map<Integer, TPlanNode> planMap_ = Maps.newHashMap(); @@ -102,11 +104,20 @@ public class PlannerTestBase extends FrontendTestBase { updateReq.setHostnames(Sets.newHashSet("localhost")); updateReq.setNum_nodes(3); MembershipSnapshot.update(updateReq); + + if (RuntimeEnv.INSTANCE.isKuduSupported()) { + kuduClient_ = new KuduClient.KuduClientBuilder("127.0.0.1:7051").build(); + } } @AfterClass - public static void cleanUp() { + public static void cleanUp() throws Exception { RuntimeEnv.INSTANCE.reset(); + + if (kuduClient_ != null) { + kuduClient_.close(); + kuduClient_ = null; + } } /** @@ -274,20 +285,16 @@ public class PlannerTestBase extends FrontendTestBase { } } - if (locations.scan_range.isSetKudu_key_range()) { - TKuduKeyRange kr = locations.scan_range.getKudu_key_range(); - Integer hostIdx = locations.locations.get(0).host_idx; - TNetworkAddress networkAddress = execRequest.getHost_list().get(hostIdx); - result.append("KUDU KEYRANGE "); - // TODO Enable the lines below once we have better testing for - // non-local key-ranges - //result.append("host=" + networkAddress.hostname + ":" + - // networkAddress.port + " "); - result.append(Arrays.toString(kr.getRange_start_key())); - result.append(":"); - result.append(Arrays.toString(kr.getRange_stop_key())); + if (locations.scan_range.isSetKudu_scan_token()) { + Preconditions.checkNotNull(kuduClient_, + "Test should not be invoked on platforms that do not support Kudu."); + try { + result.append(KuduScanToken.stringifySerializedToken( + locations.scan_range.kudu_scan_token.array(), kuduClient_)); + } catch (IOException e) { + throw new IllegalStateException("Unable to parse Kudu scan token", e); + } } - result.append("\n"); } } http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/157c8005/fe/src/test/java/com/cloudera/impala/util/KuduUtilTest.java ---------------------------------------------------------------------- diff --git a/fe/src/test/java/com/cloudera/impala/util/KuduUtilTest.java 
b/fe/src/test/java/com/cloudera/impala/util/KuduUtilTest.java index 9b2c884..ef5c817 100644 --- a/fe/src/test/java/com/cloudera/impala/util/KuduUtilTest.java +++ b/fe/src/test/java/com/cloudera/impala/util/KuduUtilTest.java @@ -22,11 +22,11 @@ import java.util.List; import com.cloudera.impala.common.ImpalaRuntimeException; import com.google.common.collect.ImmutableList; import org.junit.Test; -import org.kududb.ColumnSchema; -import org.kududb.ColumnSchema.ColumnSchemaBuilder; -import org.kududb.Schema; -import org.kududb.Type; -import org.kududb.client.PartialRow; +import org.apache.kudu.ColumnSchema; +import org.apache.kudu.ColumnSchema.ColumnSchemaBuilder; +import org.apache.kudu.Schema; +import org.apache.kudu.Type; +import org.apache.kudu.client.PartialRow; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/157c8005/testdata/workloads/functional-planner/queries/PlannerTest/kudu-selectivity.test ---------------------------------------------------------------------- diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/kudu-selectivity.test b/testdata/workloads/functional-planner/queries/PlannerTest/kudu-selectivity.test index ead71ef..73acf7f 100644 --- a/testdata/workloads/functional-planner/queries/PlannerTest/kudu-selectivity.test +++ b/testdata/workloads/functional-planner/queries/PlannerTest/kudu-selectivity.test @@ -45,7 +45,7 @@ select * from functional_kudu.zipcode_incomes where id > '1' and zip > '2' ---- PLAN F00:PLAN FRAGMENT [UNPARTITIONED] 00:SCAN KUDU [functional_kudu.zipcode_incomes] - predicates: zip > '2', id > '1' + kudu predicates: zip > '2', id > '1' hosts=3 per-host-mem=unavailable tuple-ids=0 row-size=124B cardinality=3317 ---- DISTRIBUTEDPLAN @@ -57,7 +57,7 @@ F01:PLAN FRAGMENT [UNPARTITIONED] F00:PLAN FRAGMENT [RANDOM] DATASTREAM SINK [FRAGMENT=F01, EXCHANGE=01, UNPARTITIONED] 00:SCAN KUDU 
[functional_kudu.zipcode_incomes] - predicates: zip > '2', id > '1' + kudu predicates: zip > '2', id > '1' hosts=3 per-host-mem=0B tuple-ids=0 row-size=124B cardinality=3317 ==== http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/157c8005/testdata/workloads/functional-planner/queries/PlannerTest/kudu-update.test ---------------------------------------------------------------------- diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/kudu-update.test b/testdata/workloads/functional-planner/queries/PlannerTest/kudu-update.test index 0c21b36..80ba800 100644 --- a/testdata/workloads/functional-planner/queries/PlannerTest/kudu-update.test +++ b/testdata/workloads/functional-planner/queries/PlannerTest/kudu-update.test @@ -4,13 +4,13 @@ UPDATE KUDU [functional_kudu.testtbl] | check keys exist: false | 00:SCAN KUDU [functional_kudu.testtbl] - kudu predicates: zip >= 94550 + kudu predicates: zip > 94549 ---- DISTRIBUTEDPLAN UPDATE KUDU [functional_kudu.testtbl] | check keys exist: false | 00:SCAN KUDU [functional_kudu.testtbl] - kudu predicates: zip >= 94550 + kudu predicates: zip > 94549 ==== # Predicate on key update functional_kudu.testtbl set name = 'peter' where zip > 94549 and id = 5 @@ -19,13 +19,13 @@ UPDATE KUDU [functional_kudu.testtbl] | check keys exist: false | 00:SCAN KUDU [functional_kudu.testtbl] - kudu predicates: zip >= 94550, id = 5 + kudu predicates: zip > 94549, id = 5 ---- DISTRIBUTEDPLAN UPDATE KUDU [functional_kudu.testtbl] | check keys exist: false | 00:SCAN KUDU [functional_kudu.testtbl] - kudu predicates: zip >= 94550, id = 5 + kudu predicates: zip > 94549, id = 5 ==== # Mixing predicate and value assignment update functional_kudu.testtbl set zip = 94546 where zip > 94549 @@ -34,13 +34,13 @@ UPDATE KUDU [functional_kudu.testtbl] | check keys exist: false | 00:SCAN KUDU [functional_kudu.testtbl] - kudu predicates: zip >= 94550 + kudu predicates: zip > 94549 ---- DISTRIBUTEDPLAN UPDATE KUDU [functional_kudu.testtbl] | 
check keys exist: false | 00:SCAN KUDU [functional_kudu.testtbl] - kudu predicates: zip >= 94550 + kudu predicates: zip > 94549 ==== update a set a.name = b.name http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/157c8005/testdata/workloads/functional-planner/queries/PlannerTest/kudu.test ---------------------------------------------------------------------- diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/kudu.test b/testdata/workloads/functional-planner/queries/PlannerTest/kudu.test index 35abe35..2c8a415 100644 --- a/testdata/workloads/functional-planner/queries/PlannerTest/kudu.test +++ b/testdata/workloads/functional-planner/queries/PlannerTest/kudu.test @@ -3,9 +3,9 @@ select * from functional_kudu.testtbl 00:SCAN KUDU [functional_kudu.testtbl] ---- SCANRANGELOCATIONS NODE 0: - KUDU KEYRANGE [0, 0, 0, 1]:[0, 0, 0, 2] - KUDU KEYRANGE [0, 0, 0, 2]:[] - KUDU KEYRANGE []:[0, 0, 0, 1] + ScanToken{table=testtbl, hash-partition-buckets: [0], range-partition: [<start>, <end>)} + ScanToken{table=testtbl, hash-partition-buckets: [1], range-partition: [<start>, <end>)} + ScanToken{table=testtbl, hash-partition-buckets: [2], range-partition: [<start>, <end>)} ---- DISTRIBUTEDPLAN 01:EXCHANGE [UNPARTITIONED] | @@ -17,15 +17,20 @@ select * from functional_kudu.testtbl where name = '10' kudu predicates: name = '10' ---- SCANRANGELOCATIONS NODE 0: - KUDU KEYRANGE [0, 0, 0, 1]:[0, 0, 0, 2] - KUDU KEYRANGE [0, 0, 0, 2]:[] - KUDU KEYRANGE []:[0, 0, 0, 1] + ScanToken{table=testtbl, hash-partition-buckets: [0], range-partition: [<start>, <end>)} + ScanToken{table=testtbl, hash-partition-buckets: [1], range-partition: [<start>, <end>)} + ScanToken{table=testtbl, hash-partition-buckets: [2], range-partition: [<start>, <end>)} ---- DISTRIBUTEDPLAN 01:EXCHANGE [UNPARTITIONED] | 00:SCAN KUDU [functional_kudu.testtbl] kudu predicates: name = '10' ==== +select * from functional_kudu.testtbl where name = NULL +---- PLAN +00:SCAN KUDU 
[functional_kudu.testtbl] + predicates: name = NULL +==== insert into functional_kudu.testtbl(id) values (10) ---- PLAN INSERT INTO KUDU [functional_kudu.testtbl] @@ -91,17 +96,17 @@ where id >= 10 and zip <= 5 and 20 >= id and 'foo' = name and zip >= 0 and 30 >= and zip > 1 and zip < 50 ---- PLAN 00:SCAN KUDU [functional_kudu.testtbl] - kudu predicates: id >= 10, zip <= 5, id <= 20, zip >= 0, zip <= 30, zip >= 2, zip <= 49, name = 'foo' + kudu predicates: id >= 10, zip <= 5, id <= 20, zip >= 0, zip <= 30, zip > 1, zip < 50, name = 'foo' ---- SCANRANGELOCATIONS NODE 0: - KUDU KEYRANGE [0, 0, 0, 1]:[0, 0, 0, 2] - KUDU KEYRANGE [0, 0, 0, 2]:[] - KUDU KEYRANGE []:[0, 0, 0, 1] + ScanToken{table=testtbl, hash-partition-buckets: [0], range-partition: [<start>, <end>)} + ScanToken{table=testtbl, hash-partition-buckets: [1], range-partition: [<start>, <end>)} + ScanToken{table=testtbl, hash-partition-buckets: [2], range-partition: [<start>, <end>)} ---- DISTRIBUTEDPLAN 01:EXCHANGE [UNPARTITIONED] | 00:SCAN KUDU [functional_kudu.testtbl] - kudu predicates: id >= 10, zip <= 5, id <= 20, zip >= 0, zip <= 30, zip >= 2, zip <= 49, name = 'foo' + kudu predicates: id >= 10, zip <= 5, id <= 20, zip >= 0, zip <= 30, zip > 1, zip < 50, name = 'foo' ==== # Test constant folding. 
select * from functional_kudu.testtbl @@ -109,18 +114,18 @@ where id < 10 + 30 and cast(sin(id) as boolean) = true and 20 * 3 >= id and 10 ---- PLAN 00:SCAN KUDU [functional_kudu.testtbl] predicates: CAST(sin(id) AS BOOLEAN) = TRUE - kudu predicates: id <= 39, id <= 60, id <= 102 + kudu predicates: id < 40, id <= 60, id < 103 ---- SCANRANGELOCATIONS NODE 0: - KUDU KEYRANGE [0, 0, 0, 1]:[0, 0, 0, 2] - KUDU KEYRANGE [0, 0, 0, 2]:[] - KUDU KEYRANGE []:[0, 0, 0, 1] + ScanToken{table=testtbl, hash-partition-buckets: [0], range-partition: [<start>, <end>)} + ScanToken{table=testtbl, hash-partition-buckets: [1], range-partition: [<start>, <end>)} + ScanToken{table=testtbl, hash-partition-buckets: [2], range-partition: [<start>, <end>)} ---- DISTRIBUTEDPLAN 01:EXCHANGE [UNPARTITIONED] | 00:SCAN KUDU [functional_kudu.testtbl] predicates: CAST(sin(id) AS BOOLEAN) = TRUE - kudu predicates: id <= 39, id <= 60, id <= 102 + kudu predicates: id < 40, id <= 60, id < 103 ==== # Some predicates can be pushed down but others can't (predicate on an non-const value). 
select * from functional_kudu.testtbl @@ -131,9 +136,9 @@ where cast(sin(id) as boolean) = true and name = 'a' kudu predicates: name = 'a' ---- SCANRANGELOCATIONS NODE 0: - KUDU KEYRANGE [0, 0, 0, 1]:[0, 0, 0, 2] - KUDU KEYRANGE [0, 0, 0, 2]:[] - KUDU KEYRANGE []:[0, 0, 0, 1] + ScanToken{table=testtbl, hash-partition-buckets: [0], range-partition: [<start>, <end>)} + ScanToken{table=testtbl, hash-partition-buckets: [1], range-partition: [<start>, <end>)} + ScanToken{table=testtbl, hash-partition-buckets: [2], range-partition: [<start>, <end>)} ---- DISTRIBUTEDPLAN 01:EXCHANGE [UNPARTITIONED] | @@ -150,9 +155,9 @@ where cast(sin(id) as boolean) = true and name is null predicates: name IS NULL, CAST(sin(id) AS BOOLEAN) = TRUE ---- SCANRANGELOCATIONS NODE 0: - KUDU KEYRANGE [0, 0, 0, 1]:[0, 0, 0, 2] - KUDU KEYRANGE [0, 0, 0, 2]:[] - KUDU KEYRANGE []:[0, 0, 0, 1] + ScanToken{table=testtbl, hash-partition-buckets: [0], range-partition: [<start>, <end>)} + ScanToken{table=testtbl, hash-partition-buckets: [1], range-partition: [<start>, <end>)} + ScanToken{table=testtbl, hash-partition-buckets: [2], range-partition: [<start>, <end>)} ---- DISTRIBUTEDPLAN 01:EXCHANGE [UNPARTITIONED] | @@ -178,5 +183,5 @@ where t.c <= cast('1995-01-01 00:00:00' as timestamp) order by c | 00:SCAN KUDU [tpch_kudu.orders] predicates: CAST(o_orderdate AS TIMESTAMP) <= CAST('1995-01-01 00:00:00' AS TIMESTAMP) - kudu predicates: o_orderkey <= 9 + kudu predicates: o_orderkey < 10 ==== http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/157c8005/testdata/workloads/functional-planner/queries/PlannerTest/tpch-kudu.test ---------------------------------------------------------------------- diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/tpch-kudu.test b/testdata/workloads/functional-planner/queries/PlannerTest/tpch-kudu.test index 2dc3619..5659b4b 100644 --- a/testdata/workloads/functional-planner/queries/PlannerTest/tpch-kudu.test +++ 
b/testdata/workloads/functional-planner/queries/PlannerTest/tpch-kudu.test @@ -172,10 +172,10 @@ limit 10 | hash predicates: l_orderkey = o_orderkey | |--01:SCAN KUDU [tpch_kudu.orders] -| predicates: o_orderdate < '1995-03-15' +| kudu predicates: o_orderdate < '1995-03-15' | 02:SCAN KUDU [tpch_kudu.lineitem] - predicates: l_shipdate > '1995-03-15' + kudu predicates: l_shipdate > '1995-03-15' ==== # Q4 - Order Priority Checking Query select @@ -211,8 +211,7 @@ order by | hash predicates: l_orderkey = o_orderkey | |--00:SCAN KUDU [tpch_kudu.orders] -| predicates: o_orderdate < '1993-10-01' -| kudu predicates: o_orderdate >= '1993-07-01' +| kudu predicates: o_orderdate < '1993-10-01', o_orderdate >= '1993-07-01' | 01:SCAN KUDU [tpch_kudu.lineitem] predicates: l_commitdate < l_receiptdate @@ -275,8 +274,7 @@ order by | hash predicates: l_orderkey = o_orderkey | |--01:SCAN KUDU [tpch_kudu.orders] -| predicates: o_orderdate < '1995-01-01' -| kudu predicates: o_orderdate >= '1994-01-01' +| kudu predicates: o_orderdate >= '1994-01-01', o_orderdate < '1995-01-01' | 02:SCAN KUDU [tpch_kudu.lineitem] ==== @@ -295,8 +293,7 @@ where | output: sum(l_extendedprice * l_discount) | 00:SCAN KUDU [tpch_kudu.lineitem] - predicates: l_quantity < 24, l_shipdate < '1995-01-01' - kudu predicates: l_discount >= 0.05, l_discount <= 0.07, l_shipdate >= '1994-01-01' + kudu predicates: l_discount >= 0.05, l_discount <= 0.07, l_quantity < 24, l_shipdate >= '1994-01-01', l_shipdate < '1995-01-01' ==== # Q7 - Volume Shipping Query select @@ -582,8 +579,7 @@ limit 20 | | hash predicates: l_orderkey = o_orderkey | | | |--01:SCAN KUDU [tpch_kudu.orders] -| | predicates: o_orderdate < '1994-01-01' -| | kudu predicates: o_orderdate >= '1993-10-01' +| | kudu predicates: o_orderdate >= '1993-10-01', o_orderdate < '1994-01-01' | | | 02:SCAN KUDU [tpch_kudu.lineitem] | kudu predicates: l_returnflag = 'R' @@ -705,8 +701,8 @@ order by | hash predicates: o_orderkey = l_orderkey | |--01:SCAN KUDU 
[tpch_kudu.lineitem] -| predicates: l_shipmode IN ('MAIL', 'SHIP'), l_commitdate < l_receiptdate, l_shipdate < l_commitdate, l_receiptdate < '1995-01-01' -| kudu predicates: l_receiptdate >= '1994-01-01' +| predicates: l_shipmode IN ('MAIL', 'SHIP'), l_commitdate < l_receiptdate, l_shipdate < l_commitdate +| kudu predicates: l_receiptdate >= '1994-01-01', l_receiptdate < '1995-01-01' | 00:SCAN KUDU [tpch_kudu.orders] ==== @@ -775,8 +771,7 @@ where |--01:SCAN KUDU [tpch_kudu.part] | 00:SCAN KUDU [tpch_kudu.lineitem] - predicates: l_shipdate < '1995-10-01' - kudu predicates: l_shipdate >= '1995-09-01' + kudu predicates: l_shipdate >= '1995-09-01', l_shipdate < '1995-10-01' ==== # Q15 - Top Supplier Query with revenue_view as ( @@ -824,8 +819,7 @@ order by | | group by: l_suppkey | | | 03:SCAN KUDU [tpch_kudu.lineitem] -| predicates: l_shipdate < '1996-04-01' -| kudu predicates: l_shipdate >= '1996-01-01' +| kudu predicates: l_shipdate >= '1996-01-01', l_shipdate < '1996-04-01' | 06:HASH JOIN [INNER JOIN] | hash predicates: s_suppkey = l_suppkey @@ -835,8 +829,7 @@ order by | | group by: l_suppkey | | | 01:SCAN KUDU [tpch_kudu.lineitem] -| predicates: l_shipdate < '1996-04-01' -| kudu predicates: l_shipdate >= '1996-01-01' +| kudu predicates: l_shipdate >= '1996-01-01', l_shipdate < '1996-04-01' | 00:SCAN KUDU [tpch_kudu.supplier] ==== @@ -1118,8 +1111,7 @@ order by | group by: l_partkey, l_suppkey | 04:SCAN KUDU [tpch_kudu.lineitem] - predicates: l_shipdate < '1995-01-01' - kudu predicates: l_shipdate >= '1994-01-01' + kudu predicates: l_shipdate >= '1994-01-01', l_shipdate < '1995-01-01' ==== # Q21 - Suppliers Who Kept Orders Waiting Query select @@ -1256,7 +1248,8 @@ order by | | | output: avg(c_acctbal) | | | | | 01:SCAN KUDU [tpch_kudu.customer] -| | predicates: c_acctbal > 0.00, substr(c_phone, 1, 2) IN ('13', '31', '23', '29', '30', '18', '17') +| | predicates: substr(c_phone, 1, 2) IN ('13', '31', '23', '29', '30', '18', '17') +| | kudu predicates: c_acctbal > 
0.00 | | | 00:SCAN KUDU [tpch_kudu.customer] | predicates: substr(c_phone, 1, 2) IN ('13', '31', '23', '29', '30', '18', '17')
