LENS-1397: Support query rewrite for separate table per update period in a storage
Project: http://git-wip-us.apache.org/repos/asf/lens/repo Commit: http://git-wip-us.apache.org/repos/asf/lens/commit/363f132d Tree: http://git-wip-us.apache.org/repos/asf/lens/tree/363f132d Diff: http://git-wip-us.apache.org/repos/asf/lens/diff/363f132d Branch: refs/heads/lens-1381 Commit: 363f132d140b5107bc2afbc3c75cb9b54bf64a65 Parents: 112af59 Author: Puneet Gupta <[email protected]> Authored: Wed Mar 29 15:21:26 2017 +0530 Committer: Rajat Khandelwal <[email protected]> Committed: Wed Mar 29 15:21:26 2017 +0530 ---------------------------------------------------------------------- .../lens/cube/metadata/CubeMetastoreClient.java | 30 +- .../apache/lens/cube/metadata/TimeRange.java | 2 - .../org/apache/lens/cube/parse/Candidate.java | 17 +- .../parse/CandidateCoveringSetsResolver.java | 3 - .../cube/parse/CandidateTablePruneCause.java | 4 +- .../lens/cube/parse/CandidateTableResolver.java | 1 - .../apache/lens/cube/parse/CandidateUtil.java | 39 +- .../lens/cube/parse/ColumnLifetimeChecker.java | 6 - .../lens/cube/parse/CubeQueryContext.java | 133 ++++-- .../lens/cube/parse/CubeSemanticAnalyzer.java | 1 - .../apache/lens/cube/parse/DefaultQueryAST.java | 2 + .../cube/parse/DenormalizationResolver.java | 3 +- .../lens/cube/parse/ExpressionResolver.java | 2 +- .../apache/lens/cube/parse/JoinCandidate.java | 6 + .../cube/parse/MaxCoveringFactResolver.java | 4 +- .../org/apache/lens/cube/parse/QueryAST.java | 4 + .../lens/cube/parse/StorageCandidate.java | 475 ++++++++++++++----- .../lens/cube/parse/StorageTableResolver.java | 52 +- .../apache/lens/cube/parse/UnionCandidate.java | 11 + .../lens/cube/parse/UnionQueryWriter.java | 9 +- .../lens/cube/parse/join/AutoJoinContext.java | 2 +- .../apache/lens/cube/metadata/DateFactory.java | 29 +- .../apache/lens/cube/parse/CubeTestSetup.java | 12 +- .../lens/cube/parse/TestBaseCubeQueries.java | 17 +- .../cube/parse/TestDenormalizationResolver.java | 16 +- .../lens/cube/parse/TestQueryMetrics.java | 7 +- 
.../lens/cube/parse/TestTimeRangeResolver.java | 6 +- .../parse/TestTimeRangeWriterWithQuery.java | 4 +- .../lens/cube/parse/TestUnionQueries.java | 44 ++ .../test/resources/schema/facts/testfact.xml | 39 ++ 30 files changed, 738 insertions(+), 242 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/lens/blob/363f132d/lens-cube/src/main/java/org/apache/lens/cube/metadata/CubeMetastoreClient.java ---------------------------------------------------------------------- diff --git a/lens-cube/src/main/java/org/apache/lens/cube/metadata/CubeMetastoreClient.java b/lens-cube/src/main/java/org/apache/lens/cube/metadata/CubeMetastoreClient.java index c8a2498..7608a43 100644 --- a/lens-cube/src/main/java/org/apache/lens/cube/metadata/CubeMetastoreClient.java +++ b/lens-cube/src/main/java/org/apache/lens/cube/metadata/CubeMetastoreClient.java @@ -2261,17 +2261,37 @@ public class CubeMetastoreClient { return dimTables; } - public boolean partColExists(String tableName, String partCol) throws LensException { - Table tbl = getTable(tableName); - for (FieldSchema f : tbl.getPartCols()) { - if (f.getName().equalsIgnoreCase(partCol)) { - return true; + public boolean partColExists(String fact, String storage, String partCol) throws LensException { + for (String storageTable : getStorageTables(fact, storage)) { + for (FieldSchema f : getTable(storageTable).getPartCols()) { + if (f.getName().equalsIgnoreCase(partCol)) { + return true; + } } } return false; } /** + * Returns storage table names for a storage. 
+ * Note: If each update period in the storage has a different storage table, this method will return N Storage Tables + * where N is the number of update periods in the storage (LENS-1386) + * + * @param fact + * @param storage + * @return + * @throws LensException + */ + public Set<String> getStorageTables(String fact, String storage) throws LensException { + Set<String> uniqueStorageTables = new HashSet<>(); + for (UpdatePeriod updatePeriod : getFactTable(fact).getUpdatePeriods().get(storage)) { + uniqueStorageTables.add(getStorageTableName(fact, storage, updatePeriod)); + } + return uniqueStorageTables; + } + + + /** * * @param table table name * @param hiveTable hive table http://git-wip-us.apache.org/repos/asf/lens/blob/363f132d/lens-cube/src/main/java/org/apache/lens/cube/metadata/TimeRange.java ---------------------------------------------------------------------- diff --git a/lens-cube/src/main/java/org/apache/lens/cube/metadata/TimeRange.java b/lens-cube/src/main/java/org/apache/lens/cube/metadata/TimeRange.java index 5bdbf74..242d3ba 100644 --- a/lens-cube/src/main/java/org/apache/lens/cube/metadata/TimeRange.java +++ b/lens-cube/src/main/java/org/apache/lens/cube/metadata/TimeRange.java @@ -23,7 +23,6 @@ import static org.apache.lens.cube.metadata.DateUtil.ABSDATE_PARSER; import java.util.Calendar; import java.util.Date; import java.util.Set; -import java.util.TreeSet; import org.apache.lens.cube.error.LensCubeErrorCode; import org.apache.lens.server.api.error.LensException; @@ -33,7 +32,6 @@ import org.apache.hadoop.hive.ql.parse.ASTNode; import org.codehaus.jackson.annotate.JsonIgnoreProperties; -import lombok.Builder; import lombok.Data; import lombok.Getter; http://git-wip-us.apache.org/repos/asf/lens/blob/363f132d/lens-cube/src/main/java/org/apache/lens/cube/parse/Candidate.java ---------------------------------------------------------------------- diff --git a/lens-cube/src/main/java/org/apache/lens/cube/parse/Candidate.java 
b/lens-cube/src/main/java/org/apache/lens/cube/parse/Candidate.java index 095a297..f241cb3 100644 --- a/lens-cube/src/main/java/org/apache/lens/cube/parse/Candidate.java +++ b/lens-cube/src/main/java/org/apache/lens/cube/parse/Candidate.java @@ -83,6 +83,22 @@ public interface Candidate { Collection<Candidate> getChildren(); /** + * Is time range coverable based on start and end times configured in schema for the composing storage candidates + * and valid update periods. + * + * Note: This method is different from {@link #evaluateCompleteness(TimeRange, TimeRange, boolean)}. + * isTimeRangeCoverable checks the possibility of covering time range from schema perspective by using valid + * storages/update periods while evaluateCompleteness checks if a time range can be covered based on + * registered partitions. So isTimeRangeCoverable = false implies evaluateCompleteness = false but vice versa is + * not true. + * + * @param timeRange + * @return + * @throws LensException + */ + boolean isTimeRangeCoverable(TimeRange timeRange) throws LensException; + + /** * Calculates if this candidate can answer the query for given time range based on actual data registered with * the underlying candidate storages. This method will also update any internal candidate data structures that are * required for writing the re-written query and to answer {@link #getParticipatingPartitions()}.
@@ -120,5 +136,4 @@ public interface Candidate { * @return */ Set<Integer> getAnswerableMeasurePhraseIndices(); - } http://git-wip-us.apache.org/repos/asf/lens/blob/363f132d/lens-cube/src/main/java/org/apache/lens/cube/parse/CandidateCoveringSetsResolver.java ---------------------------------------------------------------------- diff --git a/lens-cube/src/main/java/org/apache/lens/cube/parse/CandidateCoveringSetsResolver.java b/lens-cube/src/main/java/org/apache/lens/cube/parse/CandidateCoveringSetsResolver.java index 0b7d400..0aafda6 100644 --- a/lens-cube/src/main/java/org/apache/lens/cube/parse/CandidateCoveringSetsResolver.java +++ b/lens-cube/src/main/java/org/apache/lens/cube/parse/CandidateCoveringSetsResolver.java @@ -23,12 +23,9 @@ import static org.apache.lens.cube.parse.CandidateUtil.getColumns; import java.util.*; import org.apache.lens.cube.error.NoCandidateFactAvailableException; - import org.apache.lens.cube.metadata.TimeRange; import org.apache.lens.server.api.error.LensException; -import org.apache.hadoop.conf.Configuration; - import lombok.extern.slf4j.Slf4j; @Slf4j http://git-wip-us.apache.org/repos/asf/lens/blob/363f132d/lens-cube/src/main/java/org/apache/lens/cube/parse/CandidateTablePruneCause.java ---------------------------------------------------------------------- diff --git a/lens-cube/src/main/java/org/apache/lens/cube/parse/CandidateTablePruneCause.java b/lens-cube/src/main/java/org/apache/lens/cube/parse/CandidateTablePruneCause.java index 6cb18e6..1de491c 100644 --- a/lens-cube/src/main/java/org/apache/lens/cube/parse/CandidateTablePruneCause.java +++ b/lens-cube/src/main/java/org/apache/lens/cube/parse/CandidateTablePruneCause.java @@ -101,7 +101,7 @@ public class CandidateTablePruneCause { // cube table has more partitions MORE_PARTITIONS("Picked table has more partitions than minimum"), // invalid cube table - INVALID("Invalid cube table provided in query"), + INVALID("Invalid cube table provided in query"), //TODO move up. 
This does not make sense here. // expression is not evaluable in the candidate EXPRESSION_NOT_EVALUABLE("%s expressions not evaluable") { Object[] getFormatPlaceholders(Set<CandidateTablePruneCause> causes) { @@ -210,7 +210,7 @@ public class CandidateTablePruneCause { INVALID, //this update period is greater than the Query max interval as provided by user with lens.cube.query.max.interval UPDATE_PERIOD_BIGGER_THAN_MAX, - QUERY_INTERVAL_SMALLER_THAN_UPDATE_PERIOD + TIME_RANGE_NOT_ANSWERABLE_BY_UPDATE_PERIOD } // Used for Test cases only. http://git-wip-us.apache.org/repos/asf/lens/blob/363f132d/lens-cube/src/main/java/org/apache/lens/cube/parse/CandidateTableResolver.java ---------------------------------------------------------------------- diff --git a/lens-cube/src/main/java/org/apache/lens/cube/parse/CandidateTableResolver.java b/lens-cube/src/main/java/org/apache/lens/cube/parse/CandidateTableResolver.java index 97a73a8..6d61f1f 100644 --- a/lens-cube/src/main/java/org/apache/lens/cube/parse/CandidateTableResolver.java +++ b/lens-cube/src/main/java/org/apache/lens/cube/parse/CandidateTableResolver.java @@ -30,7 +30,6 @@ import org.apache.lens.cube.parse.ExpressionResolver.ExpressionContext; import org.apache.lens.server.api.error.LensException; import org.apache.commons.lang.StringUtils; -import org.apache.hadoop.conf.Configuration; import com.google.common.collect.Sets; import lombok.NonNull; http://git-wip-us.apache.org/repos/asf/lens/blob/363f132d/lens-cube/src/main/java/org/apache/lens/cube/parse/CandidateUtil.java ---------------------------------------------------------------------- diff --git a/lens-cube/src/main/java/org/apache/lens/cube/parse/CandidateUtil.java b/lens-cube/src/main/java/org/apache/lens/cube/parse/CandidateUtil.java index 68449f6..5db1344 100644 --- a/lens-cube/src/main/java/org/apache/lens/cube/parse/CandidateUtil.java +++ b/lens-cube/src/main/java/org/apache/lens/cube/parse/CandidateUtil.java @@ -58,10 +58,25 @@ public class 
CandidateUtil { return true; } + static boolean isCandidatePartiallyValidForTimeRange(Date candidateStartTime, Date candidateEndTime, + Date timeRangeStart, Date timeRangeEnd) { + Date start = candidateStartTime.after(timeRangeStart) ? candidateStartTime : timeRangeStart; + Date end = candidateEndTime.before(timeRangeEnd) ? candidateEndTime : timeRangeEnd; + if (end.after(start)) { + return true; + } + return false; + } + + + static boolean isPartiallyValidForTimeRange(Candidate cand, TimeRange timeRange) { + return isPartiallyValidForTimeRanges(cand, Arrays.asList(timeRange)); + } + static boolean isPartiallyValidForTimeRanges(Candidate cand, List<TimeRange> timeRanges) { return timeRanges.stream().anyMatch(timeRange -> - (cand.getStartTime().before(timeRange.getFromDate()) && cand.getEndTime().after(timeRange.getFromDate())) - || (cand.getStartTime().before(timeRange.getToDate()) && cand.getEndTime().after(timeRange.getToDate()))); + isCandidatePartiallyValidForTimeRange(cand.getStartTime(), cand.getEndTime(), + timeRange.getFromDate(), timeRange.getToDate())); } /** @@ -72,6 +87,7 @@ public class CandidateUtil { * @throws LensException */ static void copyASTs(QueryAST sourceAst, QueryAST targetAst) throws LensException { + targetAst.setSelectAST(MetastoreUtil.copyAST(sourceAst.getSelectAST())); targetAst.setWhereAST(MetastoreUtil.copyAST(sourceAst.getWhereAST())); if (sourceAst.getJoinAST() != null) { @@ -83,6 +99,13 @@ public class CandidateUtil { if (sourceAst.getHavingAST() != null) { targetAst.setHavingAST(MetastoreUtil.copyAST(sourceAst.getHavingAST())); } + if (sourceAst.getOrderByAST() != null) { + targetAst.setOrderByAST(MetastoreUtil.copyAST(sourceAst.getOrderByAST())); + } + + targetAst.setLimitValue(sourceAst.getLimitValue()); + targetAst.setFromString(sourceAst.getFromString()); + targetAst.setWhereString(sourceAst.getWhereString()); } public static Set<StorageCandidate> getStorageCandidates(final Candidate candidate) { @@ -194,6 +217,15 @@ public 
class CandidateUtil { return false; } + public static String getTimeRangeWhereClasue(TimeRangeWriter rangeWriter, StorageCandidate sc, TimeRange range) throws LensException { + String rangeWhere = rangeWriter.getTimeRangeWhereClause(sc.getCubeql(), sc.getCubeql().getAliasForTableName(sc.getCube().getName()), + sc.getRangeToPartitions().get(range)); + if(sc.getRangeToExtraWhereFallBack().containsKey(range)){ + rangeWhere = "((" + rangeWhere + ") and (" + sc.getRangeToExtraWhereFallBack().get(range) + "))"; + } + return rangeWhere; + } + public static class ChildrenSizeBasedCandidateComparator<T> implements Comparator<Candidate> { @Override public int compare(Candidate o1, Candidate o2) { @@ -274,4 +306,7 @@ public class CandidateUtil { } } + + + } http://git-wip-us.apache.org/repos/asf/lens/blob/363f132d/lens-cube/src/main/java/org/apache/lens/cube/parse/ColumnLifetimeChecker.java ---------------------------------------------------------------------- diff --git a/lens-cube/src/main/java/org/apache/lens/cube/parse/ColumnLifetimeChecker.java b/lens-cube/src/main/java/org/apache/lens/cube/parse/ColumnLifetimeChecker.java index 24eb8f0..c3d12a4 100644 --- a/lens-cube/src/main/java/org/apache/lens/cube/parse/ColumnLifetimeChecker.java +++ b/lens-cube/src/main/java/org/apache/lens/cube/parse/ColumnLifetimeChecker.java @@ -28,14 +28,8 @@ import org.apache.lens.cube.error.LensCubeErrorCode; import org.apache.lens.cube.metadata.*; import org.apache.lens.cube.metadata.join.JoinPath; import org.apache.lens.cube.parse.join.AutoJoinContext; -import org.apache.lens.server.api.LensConfConstants; import org.apache.lens.server.api.error.LensException; -import org.apache.commons.lang.StringUtils; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hive.ql.parse.ASTNode; -import org.apache.hadoop.hive.ql.plan.PlanUtils; - import lombok.extern.slf4j.Slf4j; @Slf4j 
http://git-wip-us.apache.org/repos/asf/lens/blob/363f132d/lens-cube/src/main/java/org/apache/lens/cube/parse/CubeQueryContext.java ---------------------------------------------------------------------- diff --git a/lens-cube/src/main/java/org/apache/lens/cube/parse/CubeQueryContext.java b/lens-cube/src/main/java/org/apache/lens/cube/parse/CubeQueryContext.java index 76031ec..193bf44 100644 --- a/lens-cube/src/main/java/org/apache/lens/cube/parse/CubeQueryContext.java +++ b/lens-cube/src/main/java/org/apache/lens/cube/parse/CubeQueryContext.java @@ -21,9 +21,7 @@ package org.apache.lens.cube.parse; import static com.google.common.base.Preconditions.checkArgument; import static java.util.stream.Collectors.toSet; -import static org.apache.hadoop.hive.ql.parse.HiveParser.Identifier; -import static org.apache.hadoop.hive.ql.parse.HiveParser.TOK_TABLE_OR_COL; -import static org.apache.hadoop.hive.ql.parse.HiveParser.TOK_TMP_FILE; +import static org.apache.hadoop.hive.ql.parse.HiveParser.*; import static org.apache.lens.cube.parse.CubeQueryConfUtil.DEFAULT_REPLACE_TIMEDIM_WITH_PART_COL; import static org.apache.lens.cube.parse.CubeQueryConfUtil.DEFAULT_REWRITE_DIM_FILTER_TO_FACT_FILTER; import static org.apache.lens.cube.parse.CubeQueryConfUtil.NON_EXISTING_PARTITIONS; @@ -47,16 +45,7 @@ import java.util.function.Predicate; import org.apache.lens.cube.error.LensCubeErrorCode; import org.apache.lens.cube.error.NoCandidateDimAvailableException; import org.apache.lens.cube.error.NoCandidateFactAvailableException; -import org.apache.lens.cube.metadata.AbstractCubeTable; -import org.apache.lens.cube.metadata.Cube; -import org.apache.lens.cube.metadata.CubeDimensionTable; -import org.apache.lens.cube.metadata.CubeInterface; -import org.apache.lens.cube.metadata.CubeMetastoreClient; -import org.apache.lens.cube.metadata.DerivedCube; -import org.apache.lens.cube.metadata.Dimension; -import org.apache.lens.cube.metadata.JoinChain; -import org.apache.lens.cube.metadata.Named; 
-import org.apache.lens.cube.metadata.TimeRange; +import org.apache.lens.cube.metadata.*; import org.apache.lens.cube.metadata.join.TableRelationship; import org.apache.lens.cube.parse.join.AutoJoinContext; import org.apache.lens.cube.parse.join.JoinClause; @@ -68,6 +57,7 @@ import org.apache.commons.lang.StringUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.ql.Context; +import org.apache.hadoop.hive.ql.lib.Node; import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.parse.ASTNode; import org.apache.hadoop.hive.ql.parse.HiveParser; @@ -78,6 +68,7 @@ import org.apache.hadoop.hive.ql.parse.ParseUtils; import org.apache.hadoop.hive.ql.parse.QB; import org.apache.hadoop.hive.ql.parse.QBJoinTree; import org.apache.hadoop.hive.ql.parse.QBParseInfo; +import org.apache.hadoop.util.ReflectionUtils; import org.codehaus.jackson.map.ObjectMapper; @@ -213,8 +204,11 @@ public class CubeQueryContext extends TracksQueriedColumns implements QueryAST, @Getter private Map<Dimension, PruneCauses<CubeDimensionTable>> dimPruningMsgs = new HashMap<Dimension, PruneCauses<CubeDimensionTable>>(); + @Setter @Getter private String fromString; + @Getter + private TimeRangeWriter rangeWriter = null; public CubeQueryContext(ASTNode ast, QB qb, Configuration queryConf, HiveConf metastoreConf) throws LensException { this.ast = ast; @@ -242,8 +236,10 @@ public class CubeQueryContext extends TracksQueriedColumns implements QueryAST, if (qb.getParseInfo().getSelForClause(clauseName) != null) { this.selectAST = qb.getParseInfo().getSelForClause(clauseName); } - extractMetaTables(); + + this.rangeWriter = ReflectionUtils.newInstance(conf.getClass(CubeQueryConfUtil.TIME_RANGE_WRITER_CLASS, + CubeQueryConfUtil.DEFAULT_TIME_RANGE_WRITER, TimeRangeWriter.class), conf); } boolean hasCubeInQuery() { @@ -664,6 +660,11 @@ public class CubeQueryContext extends TracksQueriedColumns implements 
QueryAST, return HQLParser.getString(selectAST); } + + public void setWhereString(String whereString) { + //NO OP + } + public String getWhereString() { if (whereAST != null) { return HQLParser.getString(whereAST); @@ -883,23 +884,65 @@ public class CubeQueryContext extends TracksQueriedColumns implements QueryAST, if (sc != null) { // resolve timerange positions and replace it by corresponding where clause for (TimeRange range : getTimeRanges()) { - String rangeWhere = sc.getRangeToWhere().get(range); + String rangeWhere = CandidateUtil.getTimeRangeWhereClasue(rangeWriter, sc, range); if (!StringUtils.isBlank(rangeWhere)) { - ASTNode rangeAST = HQLParser.parseExpr(rangeWhere, conf); - range.getParent().setChild(range.getChildIndex(), rangeAST); + ASTNode updatedRangeAST = HQLParser.parseExpr(rangeWhere, conf); + updateTimeRangeNode(sc.getQueryAst().getWhereAST(), range.getAstNode(), updatedRangeAST); } - sc.getQueryAst().setWhereAST(HQLParser.parseExpr(getWhereString(), conf)); } } } + + /** + * Find the appropriate time range node in the AST and update it with "updatedTimeRange". 
+ * Time Range node looks like this + * time_range_in(dt, '2017', '2018') -> + * TOK_FUNCTION [TOK_FUNCTION] (l5c2p37) { + * time_range_in [Identifier] (l6c1p37)$ + * TOK_TABLE_OR_COL [TOK_TABLE_OR_COL] (l6c2p51) { + * dt [Identifier] (l7c1p51)$ + * } + * '2017' [StringLiteral] (l6c3p55)$ + * '2018' [StringLiteral] (l6c4p63)$ + } + * @param root + * @param timeRangeFuncNode + * @param updatedTimeRange + */ + private void updateTimeRangeNode(ASTNode root, ASTNode timeRangeFuncNode, ASTNode updatedTimeRange) { + ASTNode childNode; + if (root.getChildCount() == 0) { + return; + } + for (Node child : root.getChildren()) { + childNode = (ASTNode) child; + if (childNode.getType() == timeRangeFuncNode.getType() + && childNode.getChildCount() == timeRangeFuncNode.getChildCount() + && childNode.getChild(0).getText().equalsIgnoreCase(timeRangeFuncNode.getChild(0).getText())) { + //Found the "time_range_in" function node. Check the details further as there can be more than one time ranges + if (HQLParser.getString(timeRangeFuncNode).equalsIgnoreCase(HQLParser.getString(childNode))) { + //This is the correct time range node . Replace it with "updatedTimeRange" + childNode.getParent().setChild(childNode.getChildIndex(), updatedTimeRange); + return; + } + } + updateTimeRangeNode(childNode, timeRangeFuncNode, updatedTimeRange); + } + } + + public String toHQL() throws LensException { Candidate cand = pickCandidateToQuery(); Map<Dimension, CandidateDim> dimsToQuery = pickCandidateDimsToQuery(dimensions); - Set<StorageCandidate> scSet = new HashSet<>(); + Collection<StorageCandidate> scSet = new HashSet<>(); if (cand != null) { scSet.addAll(CandidateUtil.getStorageCandidates(cand)); } + + //Expand and get update period specific storage candidates if required. 
+ scSet = expandStorageCandidates(scSet); + log.info("Candidate: {}, DimsToQuery: {}", cand, dimsToQuery); if (autoJoinCtx != null) { // prune join paths for picked fact and dimensions @@ -924,9 +967,9 @@ public class CubeQueryContext extends TracksQueriedColumns implements QueryAST, Set<Dimension> exprDimensions = new HashSet<>(); if (!scSet.isEmpty()) { for (StorageCandidate sc : scSet) { - Set<Dimension> factExprDimTables = exprCtx.rewriteExprCtx(this, sc, dimsToQuery, sc.getQueryAst()); - exprDimensions.addAll(factExprDimTables); - factDimMap.get(sc).addAll(factExprDimTables); + Set<Dimension> scExprDimTables = exprCtx.rewriteExprCtx(this, sc, dimsToQuery, sc.getQueryAst()); + exprDimensions.addAll(scExprDimTables); + factDimMap.get(sc).addAll(scExprDimTables); } } else { // dim only query @@ -939,9 +982,9 @@ public class CubeQueryContext extends TracksQueriedColumns implements QueryAST, Set<Dimension> denormTables = new HashSet<>(); if (!scSet.isEmpty()) { for (StorageCandidate sc : scSet) { - Set<Dimension> factDenormTables = deNormCtx.rewriteDenormctx(this, sc, dimsToQuery, !scSet.isEmpty()); - denormTables.addAll(factDenormTables); - factDimMap.get(sc).addAll(factDenormTables); + Set<Dimension> scDenormTables = deNormCtx.rewriteDenormctx(this, sc, dimsToQuery, !scSet.isEmpty()); + denormTables.addAll(scDenormTables); + factDimMap.get(sc).addAll(scDenormTables); } } else { denormTables.addAll(deNormCtx.rewriteDenormctx(this, null, dimsToQuery, false)); @@ -958,9 +1001,9 @@ public class CubeQueryContext extends TracksQueriedColumns implements QueryAST, Set<Dimension> joiningTables = new HashSet<>(); if (scSet != null && scSet.size() > 1) { for (StorageCandidate sc : scSet) { - Set<Dimension> factJoiningTables = autoJoinCtx.pickOptionalTables(sc, factDimMap.get(sc), this); - factDimMap.get(sc).addAll(factJoiningTables); - joiningTables.addAll(factJoiningTables); + Set<Dimension> scJoiningTables = autoJoinCtx.pickOptionalTables(sc, factDimMap.get(sc), this); + 
factDimMap.get(sc).addAll(scJoiningTables); + joiningTables.addAll(scJoiningTables); } } else { joiningTables.addAll(autoJoinCtx.pickOptionalTables(null, dimsToQuery.keySet(), this)); @@ -970,6 +1013,8 @@ public class CubeQueryContext extends TracksQueriedColumns implements QueryAST, log.info("Picked StorageCandidates: {} DimsToQuery: {}", scSet, dimsToQuery); pickedDimTables = dimsToQuery.values(); pickedCandidate = cand; + + //Set From string and time range clause if (!scSet.isEmpty()) { for (StorageCandidate sc : scSet) { sc.updateFromString(this, factDimMap.get(sc), dimsToQuery); @@ -977,33 +1022,41 @@ public class CubeQueryContext extends TracksQueriedColumns implements QueryAST, } else { updateFromString(null, dimsToQuery); } - //update dim filter with fact filter + + //update dim filter with fact filter, set where string in sc if (scSet.size() > 0) { for (StorageCandidate sc : scSet) { - if (!sc.getStorageName().isEmpty()) { - String qualifiedStorageTable = sc.getStorageName(); - String storageTable = qualifiedStorageTable.substring(qualifiedStorageTable.indexOf(".") + 1); - String where = getWhere(sc, autoJoinCtx, - sc.getQueryAst().getWhereAST(), getAliasForTableName(sc.getBaseTable().getName()), - shouldReplaceDimFilterWithFactFilter(), storageTable, dimsToQuery); - sc.setWhereString(where); - } + String qualifiedStorageTable = sc.getStorageName(); + String storageTable = qualifiedStorageTable.substring(qualifiedStorageTable.indexOf(".") + 1); //TODO this looks useless + String where = getWhere(sc, autoJoinCtx, + sc.getQueryAst().getWhereAST(), getAliasForTableName(sc.getBaseTable().getName()), + shouldReplaceDimFilterWithFactFilter(), storageTable, dimsToQuery); + sc.setWhereString(where); } } if (cand == null) { hqlContext = new DimOnlyHQLContext(dimsToQuery, this, this); return hqlContext.toHQL(); - } else if (cand instanceof StorageCandidate) { - StorageCandidate sc = (StorageCandidate) cand; + } else if (scSet.size() == 1) { + StorageCandidate sc = 
(StorageCandidate) scSet.iterator().next(); sc.updateAnswerableSelectColumns(this); return getInsertClause() + sc.toHQL(factDimMap.get(sc)); } else { - UnionQueryWriter uqc = new UnionQueryWriter(cand, this); + UnionQueryWriter uqc = new UnionQueryWriter(scSet, this); return getInsertClause() + uqc.toHQL(factDimMap); } } + private Collection<StorageCandidate> expandStorageCandidates(Collection<StorageCandidate> scSet) + throws LensException { + Collection<StorageCandidate> expandedList = new ArrayList<StorageCandidate>(); + for (StorageCandidate sc : scSet) { + expandedList.addAll(sc.splitAtUpdatePeriodLevelIfReq()); + } + return expandedList; + } + public ASTNode toAST(Context ctx) throws LensException { String hql = toHQL(); ParseDriver pd = new ParseDriver(); http://git-wip-us.apache.org/repos/asf/lens/blob/363f132d/lens-cube/src/main/java/org/apache/lens/cube/parse/CubeSemanticAnalyzer.java ---------------------------------------------------------------------- diff --git a/lens-cube/src/main/java/org/apache/lens/cube/parse/CubeSemanticAnalyzer.java b/lens-cube/src/main/java/org/apache/lens/cube/parse/CubeSemanticAnalyzer.java index 0e2ca82..8214f65 100644 --- a/lens-cube/src/main/java/org/apache/lens/cube/parse/CubeSemanticAnalyzer.java +++ b/lens-cube/src/main/java/org/apache/lens/cube/parse/CubeSemanticAnalyzer.java @@ -22,7 +22,6 @@ package org.apache.lens.cube.parse; import java.util.ArrayList; import java.util.List; -import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.ql.QueryState; import org.apache.hadoop.hive.ql.parse.*; http://git-wip-us.apache.org/repos/asf/lens/blob/363f132d/lens-cube/src/main/java/org/apache/lens/cube/parse/DefaultQueryAST.java ---------------------------------------------------------------------- diff --git a/lens-cube/src/main/java/org/apache/lens/cube/parse/DefaultQueryAST.java b/lens-cube/src/main/java/org/apache/lens/cube/parse/DefaultQueryAST.java index 
17e202d..29da0a2 100644 --- a/lens-cube/src/main/java/org/apache/lens/cube/parse/DefaultQueryAST.java +++ b/lens-cube/src/main/java/org/apache/lens/cube/parse/DefaultQueryAST.java @@ -24,9 +24,11 @@ import org.apache.hadoop.hive.ql.parse.ASTNode; import lombok.AllArgsConstructor; import lombok.Data; +import lombok.NoArgsConstructor; @Data @AllArgsConstructor +@NoArgsConstructor public class DefaultQueryAST implements QueryAST { private ASTNode selectAST, whereAST, groupByAST, havingAST, joinAST, orderByAST; private Integer limitValue; http://git-wip-us.apache.org/repos/asf/lens/blob/363f132d/lens-cube/src/main/java/org/apache/lens/cube/parse/DenormalizationResolver.java ---------------------------------------------------------------------- diff --git a/lens-cube/src/main/java/org/apache/lens/cube/parse/DenormalizationResolver.java b/lens-cube/src/main/java/org/apache/lens/cube/parse/DenormalizationResolver.java index bcea7ed..76e5f23 100644 --- a/lens-cube/src/main/java/org/apache/lens/cube/parse/DenormalizationResolver.java +++ b/lens-cube/src/main/java/org/apache/lens/cube/parse/DenormalizationResolver.java @@ -20,6 +20,7 @@ package org.apache.lens.cube.parse; import static org.apache.hadoop.hive.ql.parse.HiveParser.Identifier; import static org.apache.hadoop.hive.ql.parse.HiveParser.TOK_TABLE_OR_COL; +import static org.apache.hadoop.hive.ql.parse.HiveParser_SelectClauseParser.TOK_FUNCTION; import static org.apache.lens.cube.parse.CandidateTablePruneCause.denormColumnNotFound; import java.util.*; @@ -276,7 +277,7 @@ public class DenormalizationResolver implements ContextRewriter { } resolveClause(ast.getGroupByAST()); resolveClause(ast.getHavingAST()); - resolveClause(cubeql.getOrderByAST()); + resolveClause(ast.getOrderByAST()); } private void resolveClause(ASTNode node) throws LensException { http://git-wip-us.apache.org/repos/asf/lens/blob/363f132d/lens-cube/src/main/java/org/apache/lens/cube/parse/ExpressionResolver.java 
---------------------------------------------------------------------- diff --git a/lens-cube/src/main/java/org/apache/lens/cube/parse/ExpressionResolver.java b/lens-cube/src/main/java/org/apache/lens/cube/parse/ExpressionResolver.java index 1daeea5..97a9ef0 100644 --- a/lens-cube/src/main/java/org/apache/lens/cube/parse/ExpressionResolver.java +++ b/lens-cube/src/main/java/org/apache/lens/cube/parse/ExpressionResolver.java @@ -424,7 +424,7 @@ class ExpressionResolver implements ContextRewriter { // Having AST is resolved by each fact, so that all facts can expand their expressions. // Having ast is not copied now, it's maintained in cubeql, each fact processes that serially. replaceAST(cubeql, cubeql.getHavingAST()); - replaceAST(cubeql, cubeql.getOrderByAST()); + replaceAST(cubeql, queryAST.getOrderByAST()); } private void replaceAST(final CubeQueryContext cubeql, ASTNode node) throws LensException { http://git-wip-us.apache.org/repos/asf/lens/blob/363f132d/lens-cube/src/main/java/org/apache/lens/cube/parse/JoinCandidate.java ---------------------------------------------------------------------- diff --git a/lens-cube/src/main/java/org/apache/lens/cube/parse/JoinCandidate.java b/lens-cube/src/main/java/org/apache/lens/cube/parse/JoinCandidate.java index fa3ba8f..6334062 100644 --- a/lens-cube/src/main/java/org/apache/lens/cube/parse/JoinCandidate.java +++ b/lens-cube/src/main/java/org/apache/lens/cube/parse/JoinCandidate.java @@ -123,6 +123,12 @@ public class JoinCandidate implements Candidate { } @Override + public boolean isTimeRangeCoverable(TimeRange timeRange) throws LensException { + return this.childCandidate1.isTimeRangeCoverable(timeRange) + && this.childCandidate2.isTimeRangeCoverable(timeRange); + } + + @Override public String toString() { if (this.toStr == null) { this.toStr = getToString(); http://git-wip-us.apache.org/repos/asf/lens/blob/363f132d/lens-cube/src/main/java/org/apache/lens/cube/parse/MaxCoveringFactResolver.java 
---------------------------------------------------------------------- diff --git a/lens-cube/src/main/java/org/apache/lens/cube/parse/MaxCoveringFactResolver.java b/lens-cube/src/main/java/org/apache/lens/cube/parse/MaxCoveringFactResolver.java index 4cae6f8..4f4e3ab 100644 --- a/lens-cube/src/main/java/org/apache/lens/cube/parse/MaxCoveringFactResolver.java +++ b/lens-cube/src/main/java/org/apache/lens/cube/parse/MaxCoveringFactResolver.java @@ -194,8 +194,8 @@ class MaxCoveringFactResolver implements ContextRewriter { } public String toString() { - return String.valueOf(days) + " days, " + hours + " hours, " + minutes + - " minutes, " + seconds + " seconds, " + milliseconds + " milliseconds."; + return String.valueOf(days) + " days, " + hours + " hours, " + minutes + + " minutes, " + seconds + " seconds, " + milliseconds + " milliseconds."; } } } http://git-wip-us.apache.org/repos/asf/lens/blob/363f132d/lens-cube/src/main/java/org/apache/lens/cube/parse/QueryAST.java ---------------------------------------------------------------------- diff --git a/lens-cube/src/main/java/org/apache/lens/cube/parse/QueryAST.java b/lens-cube/src/main/java/org/apache/lens/cube/parse/QueryAST.java index bdd6376..b94f131 100644 --- a/lens-cube/src/main/java/org/apache/lens/cube/parse/QueryAST.java +++ b/lens-cube/src/main/java/org/apache/lens/cube/parse/QueryAST.java @@ -85,4 +85,8 @@ public interface QueryAST { void setOrderByAST(ASTNode node); void setJoinAST(ASTNode node); + + void setFromString(String fromString); + void setWhereString(String whereString); + } http://git-wip-us.apache.org/repos/asf/lens/blob/363f132d/lens-cube/src/main/java/org/apache/lens/cube/parse/StorageCandidate.java ---------------------------------------------------------------------- diff --git a/lens-cube/src/main/java/org/apache/lens/cube/parse/StorageCandidate.java b/lens-cube/src/main/java/org/apache/lens/cube/parse/StorageCandidate.java index e6e9f8f..17f3af8 100644 --- 
a/lens-cube/src/main/java/org/apache/lens/cube/parse/StorageCandidate.java +++ b/lens-cube/src/main/java/org/apache/lens/cube/parse/StorageCandidate.java @@ -28,17 +28,8 @@ import static org.apache.lens.cube.parse.StorageUtil.processExpressionsForComple import java.text.DateFormat; import java.text.SimpleDateFormat; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Date; -import java.util.HashMap; -import java.util.HashSet; -import java.util.LinkedHashMap; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.TimeZone; -import java.util.TreeSet; +import java.util.*; +import java.util.stream.Collectors; import org.apache.lens.cube.metadata.AbstractCubeTable; import org.apache.lens.cube.metadata.CubeFactTable; @@ -64,6 +55,7 @@ import org.apache.hadoop.util.ReflectionUtils; import org.antlr.runtime.CommonToken; +import com.google.common.collect.Lists; import com.google.common.collect.Sets; import lombok.Getter; import lombok.Setter; @@ -78,18 +70,36 @@ public class StorageCandidate implements Candidate, CandidateTable { // TODO union : Put comments on member variables. @Getter private final CubeQueryContext cubeql; - private final TimeRangeWriter rangeWriter; private final String processTimePartCol; private final CubeMetastoreClient client; private final String completenessPartCol; private final float completenessThreshold; + + /** + * Name of this storage candidate = storageName_factName + */ @Getter - private final String name; + @Setter + private String name; + /** - * Valid udpate periods populated by Phase 1. + * This is the storage table specific name. It is used while generating query from this candidate + */ + @Setter + private String resolvedName; + /** + * Valid update periods populated by Phase 1. */ @Getter private TreeSet<UpdatePeriod> validUpdatePeriods = new TreeSet<>(); + + /** + * These are the update periods that finally participate in partitions. 
+ * @see #getParticipatingPartitions() + */ + @Getter + private TreeSet<UpdatePeriod> participatingUpdatePeriods = new TreeSet<>(); + @Getter @Setter Map<String, SkipUpdatePeriodCode> updatePeriodRejectionCause; @@ -113,12 +123,14 @@ public class StorageCandidate implements Candidate, CandidateTable { @Setter private QueryAST queryAst; @Getter - private Map<TimeRange, String> rangeToWhere = new LinkedHashMap<>(); + private Map<TimeRange, Set<FactPartition>> rangeToPartitions = new LinkedHashMap<>(); + @Getter + private Map<TimeRange, String> rangeToExtraWhereFallBack = new LinkedHashMap<>(); @Getter @Setter private String whereString; @Getter - private final Set<Integer> answerableMeasurePhraseIndices = Sets.newHashSet(); + private Set<Integer> answerableMeasurePhraseIndices = Sets.newHashSet(); @Getter @Setter private String fromString; @@ -136,11 +148,6 @@ public class StorageCandidate implements Candidate, CandidateTable { private Collection<String> factColumns; /** - * Partition calculated by getPartition() method. 
- */ - @Getter - private Set<FactPartition> participatingPartitions = new HashSet<>(); - /** * Non existing partitions */ @Getter @@ -148,6 +155,32 @@ public class StorageCandidate implements Candidate, CandidateTable { @Getter private int numQueriedParts = 0; + /** + * This will be true if this storage candidate has multiple storage tables (one per update period) + * https://issues.apache.org/jira/browse/LENS-1386 + */ + @Getter + private boolean isStorageTblsAtUpdatePeriodLevel; + + public StorageCandidate(StorageCandidate sc) throws LensException { + this(sc.getCube(), sc.getFact(), sc.getStorageName(), sc.getCubeql()); + this.validUpdatePeriods.addAll(sc.getValidUpdatePeriods()); + this.whereString = sc.whereString; + this.fromString = sc.fromString; + this.dimsToQuery = sc.dimsToQuery; + this.factColumns = sc.factColumns; + this.answerableMeasurePhraseIndices.addAll(sc.answerableMeasurePhraseIndices); + if (sc.getQueryAst() != null) { + this.queryAst = new DefaultQueryAST(); + CandidateUtil.copyASTs(sc.getQueryAst(), new DefaultQueryAST()); + } + for (Map.Entry<TimeRange, Set<FactPartition>> entry : sc.getRangeToPartitions().entrySet()) { + rangeToPartitions.put(entry.getKey(), new LinkedHashSet<>(entry.getValue())); + } + this.rangeToExtraWhereFallBack = sc.rangeToExtraWhereFallBack; + this.answerableMeasurePhraseIndices = sc.answerableMeasurePhraseIndices; + } + public StorageCandidate(CubeInterface cube, CubeFactTable fact, String storageName, CubeQueryContext cubeql) throws LensException { if ((cube == null) || (fact == null) || (storageName == null)) { @@ -159,9 +192,6 @@ public class StorageCandidate implements Candidate, CandidateTable { this.storageName = storageName; this.conf = cubeql.getConf(); this.name = MetastoreUtil.getFactOrDimtableStorageTableName(fact.getName(), storageName); - rangeWriter = ReflectionUtils.newInstance(conf - .getClass(CubeQueryConfUtil.TIME_RANGE_WRITER_CLASS, CubeQueryConfUtil.DEFAULT_TIME_RANGE_WRITER, - 
TimeRangeWriter.class), conf); this.processTimePartCol = conf.get(CubeQueryConfUtil.PROCESS_TIME_PART_COL); String formatStr = conf.get(CubeQueryConfUtil.PART_WHERE_CLAUSE_DATE_FORMAT); if (formatStr != null) { @@ -171,14 +201,77 @@ public class StorageCandidate implements Candidate, CandidateTable { completenessThreshold = conf .getFloat(CubeQueryConfUtil.COMPLETENESS_THRESHOLD, CubeQueryConfUtil.DEFAULT_COMPLETENESS_THRESHOLD); client = cubeql.getMetastoreClient(); - startTime = client.getStorageTableStartDate(name, fact.getName()); - endTime = client.getStorageTableEndDate(name, fact.getName()); + Set<String> storageTblNames = client.getStorageTables(fact.getName(), storageName); + if (storageTblNames.size() > 1) { + isStorageTblsAtUpdatePeriodLevel = true; + } else { + //if this.name is equal to the storage table name it implies isStorageTblsAtUpdatePeriodLevel is false + isStorageTblsAtUpdatePeriodLevel = !storageTblNames.iterator().next().equalsIgnoreCase(name); + } + setStorageStartAndEndDate(); } - public StorageCandidate(StorageCandidate sc) throws LensException { - this(sc.getCube(), sc.getFact(), sc.getStorageName(), sc.getCubeql()); - // Copy update periods. - this.validUpdatePeriods.addAll(sc.getValidUpdatePeriods()); + /** + * Sets Storage candidates start and end time based on underlying storage-tables + * + * CASE 1 + * If has Storage has single storage table* + * Storage start time = max(storage start time , fact start time) + * Storage end time = min(storage end time , fact start time) + * + * CASE 2 + * If the Storage has multiple Storage Tables (one per update period)* + * update Period start Time = Max(update start time, fact start time) + * update Period end Time = Min(update end time, fact end time) + * Stoarge start and end time is derived form the underlying update period start and end times. 
+ * Storage start time = min(update1 start time ,...., updateN start time) + * Storage end time = max(update1 end time ,...., updateN end time) + * + * Note in Case 2 its assumed that the time range supported by different update periods are either + * overlapping(Example 2) or form a non overlapping but continuous chain(Example 1) as illustrated + * in examples below + * + * Example 1 + * A Storage has 2 Non Oevralpping but continuous Update Periods. + * MONTHLY with start time as now.month -13 months and end time as now.month -2months and + * DAILY with start time as now.day and end time as now.month -2months + * Then this Sorage will have an implied start time as now.month -13 month and end time as now.day + * + * Example 2 + * A Storage has 2 Overlapping Update Periods. + * MONTHLY with start time as now.month -13 months and end time as now.month -1months and + * DAILY with start time as now.day and end time as now.month -2months + * Then this Sorage will have an implied start time as now.month -13 month and end time as now.day + * + * @throws LensException + */ + public void setStorageStartAndEndDate() throws LensException { + if (this.startTime != null && !this.isStorageTblsAtUpdatePeriodLevel) { + //If the times are already set and are not dependent of update period, no point setting times again. + return; + } + List<Date> startDates = new ArrayList<>(); + List<Date> endDates = new ArrayList<>(); + for (String storageTablePrefix : getValidStorageTableNames()) { + startDates.add(client.getStorageTableStartDate(storageTablePrefix, fact.getName())); + endDates.add(client.getStorageTableEndDate(storageTablePrefix, fact.getName())); + } + this.startTime = Collections.min(startDates); + this.endTime = Collections.max(endDates); + } + + private Set<String> getValidStorageTableNames() throws LensException { + if (!validUpdatePeriods.isEmpty()) { + // In this case skip invalid update periods and get storage tables only for valid ones. 
+ Set<String> uniqueStorageTables = new HashSet<>(); + for (UpdatePeriod updatePeriod : validUpdatePeriods) { + uniqueStorageTables.add(client.getStorageTableName(fact.getName(), storageName, updatePeriod)); + } + return uniqueStorageTables; + } else { + //Get all storage tables. + return client.getStorageTables(fact.getName(), storageName); + } } private void setMissingExpressions(Set<Dimension> queriedDims) throws LensException { @@ -322,8 +415,7 @@ public class StorageCandidate implements Candidate, CandidateTable { private void updatePartitionStorage(FactPartition part) throws LensException { try { - if (client.isStorageTablePartitionACandidate(name, part.getPartSpec()) && (client - .factPartitionExists(fact, part, name))) { + if (client.factPartitionExists(fact, part, name)) { part.getStorageTables().add(name); part.setFound(true); } @@ -360,58 +452,75 @@ public class StorageCandidate implements Candidate, CandidateTable { if (fromDate.equals(toDate) || fromDate.after(toDate)) { return true; } - UpdatePeriod interval = CubeFactTable.maxIntervalInRange(fromDate, toDate, updatePeriods); - if (interval == null) { + if (updatePeriods == null | updatePeriods.isEmpty()) { + return false; + } + + UpdatePeriod maxInterval = CubeFactTable.maxIntervalInRange(fromDate, toDate, updatePeriods); + if (maxInterval == null) { log.info("No max interval for range: {} to {}", fromDate, toDate); return false; } - if (interval == UpdatePeriod.CONTINUOUS && rangeWriter.getClass().equals(BetweenTimeRangeWriter.class)) { - FactPartition part = new FactPartition(partCol, fromDate, interval, null, partWhereClauseFormat); + if (maxInterval == UpdatePeriod.CONTINUOUS + && cubeql.getRangeWriter().getClass().equals(BetweenTimeRangeWriter.class)) { + FactPartition part = new FactPartition(partCol, fromDate, maxInterval, null, partWhereClauseFormat); partitions.add(part); part.getStorageTables().add(storageName); - part = new FactPartition(partCol, toDate, interval, null, 
partWhereClauseFormat); + part = new FactPartition(partCol, toDate, maxInterval, null, partWhereClauseFormat); partitions.add(part); part.getStorageTables().add(storageName); + this.participatingUpdatePeriods.add(maxInterval); log.info("Added continuous fact partition for storage table {}", storageName); return true; } - if (!client.isStorageTableCandidateForRange(name, fromDate, toDate)) { - cubeql.addStoragePruningMsg(this, - new CandidateTablePruneCause(CandidateTablePruneCause.CandidateTablePruneCode.TIME_RANGE_NOT_ANSWERABLE)); - return false; - } else if (!client.partColExists(name, partCol)) { + if (!client.partColExists(this.getFact().getName(), storageName, partCol)) { log.info("{} does not exist in {}", partCol, name); - List<String> missingCols = new ArrayList<>(); - missingCols.add(partCol); - // cubeql.addStoragePruningMsg(this, partitionColumnsMissing(missingCols)); return false; } - Date ceilFromDate = DateUtil.getCeilDate(fromDate, interval); - Date floorToDate = DateUtil.getFloorDate(toDate, interval); + Date maxIntervalStorageTblStartDate = getStorageTableStartDate(maxInterval); + Date maxIntervalStorageTblEndDate = getStorageTableEndDate(maxInterval); - int lookAheadNumParts = conf - .getInt(CubeQueryConfUtil.getLookAheadPTPartsKey(interval), CubeQueryConfUtil.DEFAULT_LOOK_AHEAD_PT_PARTS); + TreeSet<UpdatePeriod> remainingIntervals = new TreeSet<>(updatePeriods); + remainingIntervals.remove(maxInterval); + if (!CandidateUtil.isCandidatePartiallyValidForTimeRange( + maxIntervalStorageTblStartDate, maxIntervalStorageTblEndDate,fromDate, toDate)) { + //Check the time range in remainingIntervals as maxInterval is not useful + return getPartitions(fromDate, toDate, partCol, partitions, remainingIntervals, + addNonExistingParts, failOnPartialData, missingPartitions); + } - TimeRange.Iterable.Iterator iter = TimeRange.iterable(ceilFromDate, floorToDate, interval, 1).iterator(); + Date ceilFromDate = 
DateUtil.getCeilDate(fromDate.after(maxIntervalStorageTblStartDate) + ? fromDate : maxIntervalStorageTblStartDate, maxInterval); + Date floorToDate = DateUtil.getFloorDate(toDate.before(maxIntervalStorageTblEndDate) + ? toDate : maxIntervalStorageTblEndDate, maxInterval); + if(ceilFromDate.equals(floorToDate) || floorToDate.before(ceilFromDate)) { + return getPartitions(fromDate, toDate, partCol, partitions, remainingIntervals, + addNonExistingParts, failOnPartialData, missingPartitions); + } + + int lookAheadNumParts = conf + .getInt(CubeQueryConfUtil.getLookAheadPTPartsKey(maxInterval), CubeQueryConfUtil.DEFAULT_LOOK_AHEAD_PT_PARTS); + TimeRange.Iterable.Iterator iter = TimeRange.iterable(ceilFromDate, floorToDate, maxInterval, 1).iterator(); // add partitions from ceilFrom to floorTo while (iter.hasNext()) { Date dt = iter.next(); Date nextDt = iter.peekNext(); - FactPartition part = new FactPartition(partCol, dt, interval, null, partWhereClauseFormat); + FactPartition part = new FactPartition(partCol, dt, maxInterval, null, partWhereClauseFormat); updatePartitionStorage(part); log.debug("Storage tables containing Partition {} are: {}", part, part.getStorageTables()); if (part.isFound()) { log.debug("Adding existing partition {}", part); partitions.add(part); + this.participatingUpdatePeriods.add(maxInterval); log.debug("Looking for look ahead process time partitions for {}", part); if (processTimePartCol == null) { log.debug("processTimePartCol is null"); } else if (partCol.equals(processTimePartCol)) { log.debug("part column is process time col"); - } else if (updatePeriods.first().equals(interval)) { + } else if (updatePeriods.first().equals(maxInterval)) { log.debug("Update period is the least update period"); } else if ((iter.getNumIters() - iter.getCounter()) > lookAheadNumParts) { // see if this is the part of the last-n look ahead partitions @@ -422,12 +531,12 @@ public class StorageCandidate implements Candidate, CandidateTable { // final partitions are 
required if no partitions from // look-ahead // process time are present - TimeRange.Iterable.Iterator processTimeIter = TimeRange.iterable(nextDt, lookAheadNumParts, interval, 1) + TimeRange.Iterable.Iterator processTimeIter = TimeRange.iterable(nextDt, lookAheadNumParts, maxInterval, 1) .iterator(); while (processTimeIter.hasNext()) { Date pdt = processTimeIter.next(); Date nextPdt = processTimeIter.peekNext(); - FactPartition processTimePartition = new FactPartition(processTimePartCol, pdt, interval, null, + FactPartition processTimePartition = new FactPartition(processTimePartCol, pdt, maxInterval, null, partWhereClauseFormat); updatePartitionStorage(processTimePartition); if (processTimePartition.isFound()) { @@ -436,7 +545,7 @@ public class StorageCandidate implements Candidate, CandidateTable { log.debug("Looked ahead process time partition {} is not found", processTimePartition); TreeSet<UpdatePeriod> newset = new TreeSet<UpdatePeriod>(); newset.addAll(updatePeriods); - newset.remove(interval); + newset.remove(maxInterval); log.debug("newset of update periods:{}", newset); if (!newset.isEmpty()) { // Get partitions for look ahead process time @@ -465,50 +574,35 @@ public class StorageCandidate implements Candidate, CandidateTable { } } else { log.info("Partition:{} does not exist in any storage table", part); - TreeSet<UpdatePeriod> newset = new TreeSet<>(); - newset.addAll(updatePeriods); - newset.remove(interval); - if (!getPartitions(dt, nextDt, partCol, partitions, newset, false, failOnPartialData, missingPartitions)) { + if (!getPartitions(dt, nextDt, partCol, partitions, remainingIntervals, false, failOnPartialData, + missingPartitions)) { log.debug("Adding non existing partition {}", part); if (addNonExistingParts) { // Add non existing partitions for all cases of whether we populate all non existing or not. 
+ this.participatingUpdatePeriods.add(maxInterval); missingPartitions.add(part); if (!failOnPartialData) { - if (!client.isStorageTablePartitionACandidate(name, part.getPartSpec())) { - log.info("Storage tables not eligible"); - return false; - } partitions.add(part); part.getStorageTables().add(storageName); } } else { - log.info("No finer granual partitions exist for {}", part); + log.info("No finer granualar partitions exist for {}", part); return false; } } else { - log.debug("Finer granual partitions added for {}", part); + log.debug("Finer granualar partitions added for {}", part); } } } - return - getPartitions(fromDate, ceilFromDate, partCol, partitions, updatePeriods, + + return getPartitions(fromDate, ceilFromDate, partCol, partitions, remainingIntervals, addNonExistingParts, failOnPartialData, missingPartitions) - && getPartitions(floorToDate, toDate, partCol, partitions, updatePeriods, + && getPartitions(floorToDate, toDate, partCol, partitions, remainingIntervals, addNonExistingParts, failOnPartialData, missingPartitions); } - /** - * Finds all the partitions for a storage table with a particular time range. - * - * @param timeRange : TimeRange to check completeness for. TimeRange consists of start time, end time and the - * partition column - * @param failOnPartialData : fail fast if the candidate can answer the query only partially - * @return Steps: - * 1. Get skip storage causes - * 2. getPartitions for timeRange and validUpdatePeriods - */ @Override - public boolean evaluateCompleteness(TimeRange timeRange, TimeRange parentTimeRange, boolean failOnPartialData) + public boolean evaluateCompleteness(TimeRange timeRange, TimeRange queriedTimeRange, boolean failOnPartialData) throws LensException { // Check the measure tags. if (!evaluateMeasuresCompleteness(timeRange)) { @@ -565,7 +659,7 @@ public class StorageCandidate implements Candidate, CandidateTable { } } // Add all the partitions. 
participatingPartitions contains all the partitions for previous time ranges also. - this.participatingPartitions.addAll(rangeParts); + rangeToPartitions.put(queriedTimeRange, rangeParts); numQueriedParts += rangeParts.size(); if (!unsupportedTimeDims.isEmpty()) { log.info("Not considering storage candidate:{} as it doesn't support time dimensions: {}", this, @@ -582,16 +676,20 @@ public class StorageCandidate implements Candidate, CandidateTable { } String extraWhere = extraWhereClauseFallback.toString(); if (!StringUtils.isEmpty(extraWhere)) { - rangeToWhere.put(parentTimeRange, "((" + rangeWriter - .getTimeRangeWhereClause(cubeql, cubeql.getAliasForTableName(cubeql.getCube().getName()), rangeParts) - + ") and (" + extraWhere + "))"); - } else { - rangeToWhere.put(parentTimeRange, rangeWriter - .getTimeRangeWhereClause(cubeql, cubeql.getAliasForTableName(cubeql.getCube().getName()), rangeParts)); + rangeToExtraWhereFallBack.put(queriedTimeRange, extraWhere); } return true; } + @Override + public Set<FactPartition> getParticipatingPartitions() { + Set<FactPartition> allPartitions = new HashSet<>(numQueriedParts); + for (Set<FactPartition> rangePartitions : rangeToPartitions.values()) { + allPartitions.addAll(rangePartitions); + } + return allPartitions; + } + private boolean evaluateMeasuresCompleteness(TimeRange timeRange) throws LensException { String factDataCompletenessTag = fact.getDataCompletenessTag(); if (factDataCompletenessTag == null) { @@ -649,12 +747,11 @@ public class StorageCandidate implements Candidate, CandidateTable { boolean addNonExistingParts, boolean failOnPartialData, PartitionRangesForPartitionColumns missingParts) throws LensException { Set<FactPartition> partitions = new TreeSet<>(); - if (timeRange != null && timeRange.isCoverableBy(updatePeriods) && getPartitions(timeRange.getFromDate(), - timeRange.getToDate(), timeRange.getPartitionColumn(), partitions, updatePeriods, addNonExistingParts, - failOnPartialData, missingParts)) { - 
return partitions; + if (timeRange != null && timeRange.isCoverableBy(updatePeriods)) { + getPartitions(timeRange.getFromDate(), timeRange.getToDate(), timeRange.getPartitionColumn(), + partitions, updatePeriods, addNonExistingParts, failOnPartialData, missingParts); } - return new TreeSet<>(); + return partitions; } @Override @@ -714,8 +811,8 @@ public class StorageCandidate implements Candidate, CandidateTable { StorageCandidate storageCandidateObj = (StorageCandidate) obj; //Assuming that same instance of cube and fact will be used across StorageCandidate s and hence relying directly //on == check for these. - return (this.cube == storageCandidateObj.cube && this.fact == storageCandidateObj.fact && this.storageName - .equals(storageCandidateObj.storageName)); + return (this.cube == storageCandidateObj.cube && this.fact == storageCandidateObj.fact && this.name + .equals(storageCandidateObj.name)); } @Override @@ -725,7 +822,7 @@ public class StorageCandidate implements Candidate, CandidateTable { @Override public String toString() { - return getName(); + return getResolvedName(); } void addValidUpdatePeriod(UpdatePeriod updatePeriod) { @@ -754,9 +851,9 @@ public class StorageCandidate implements Candidate, CandidateTable { String database = SessionState.get().getCurrentDatabase(); String ret; if (alias == null || alias.isEmpty()) { - ret = name; + ret = getResolvedName(); } else { - ret = name + " " + alias; + ret = getResolvedName() + " " + alias; } if (StringUtils.isNotBlank(database) && !"default".equalsIgnoreCase(database)) { ret = database + "." 
+ ret; @@ -764,56 +861,178 @@ public class StorageCandidate implements Candidate, CandidateTable { return ret; } - Set<UpdatePeriod> getAllUpdatePeriods() { - return getFact().getUpdatePeriods().get(getStorageName()); + boolean isUpdatePeriodUseful(UpdatePeriod updatePeriod) { + return cubeql.getTimeRanges().stream().anyMatch(timeRange -> isUpdatePeriodUseful(timeRange, updatePeriod)); + } + + /** + * Is the update period useful for this time range. e.g. for a time range of hours and days, monthly + * and yearly update periods are useless. DAILY and HOURLY are useful. It further checks if the update + * period answers the range at least partially based on start and end times configured at update period + * level or at storage or fact level. + * @param timeRange The time range + * @param updatePeriod Update period + * @return Whether it's useless + */ + private boolean isUpdatePeriodUseful(TimeRange timeRange, UpdatePeriod updatePeriod) { + try { + if (!CandidateUtil.isCandidatePartiallyValidForTimeRange(getStorageTableStartDate(updatePeriod), + getStorageTableEndDate(updatePeriod), timeRange.getFromDate(), timeRange.getToDate())) + { + return false; + } + Date storageTblStartDate = getStorageTableStartDate(updatePeriod); + Date storageTblEndDate = getStorageTableEndDate(updatePeriod); + TimeRange.getBuilder() //TODO date calculation to move to util method and resued + .fromDate(timeRange.getFromDate().after(storageTblStartDate) ? timeRange.getFromDate() : storageTblStartDate) + .toDate(timeRange.getToDate().before(storageTblEndDate) ? timeRange.getToDate() : storageTblEndDate) + .partitionColumn(timeRange.getPartitionColumn()) + .build() + .truncate(updatePeriod); + return true; + } catch (LensException e) { + return false; + } } - // TODO: move them to upper interfaces for complex candidates. 
Right now it's unused, so keeping it just here - public boolean isTimeRangeCoverable(TimeRange timeRange) { - return isTimeRangeCoverable(timeRange.getFromDate(), timeRange.getToDate(), getValidUpdatePeriods()); + + /** + * Is time range coverable based on valid update periods of this storage candidate + * + * @param timeRange + * @return + * @throws LensException + */ + public boolean isTimeRangeCoverable(TimeRange timeRange) throws LensException { + return isTimeRangeCoverable(timeRange.getFromDate(), timeRange.getToDate(), validUpdatePeriods); } /** * Is the time range coverable by given update periods. * Extracts the max update period, then extracts maximum amount of range from the middle that this update - * period can cover. Then recurses on the ramaining ranges on the left and right side of the extracted chunk + * period can cover. Then recurses on the remaining ranges on the left and right side of the extracted chunk * using one less update period. - * //TODO: add tests if the function is useful. Till then it's untested and unverified. - * @param fromDate From date - * @param toDate To date - * @param periods Update periods to check + * + * @param timeRangeStart + * @param timeRangeEnd + * @param intervals Update periods to check * @return Whether time range is coverable by provided update periods or not. 
*/ - private boolean isTimeRangeCoverable(Date fromDate, Date toDate, Set<UpdatePeriod> periods) { - UpdatePeriod interval = CubeFactTable.maxIntervalInRange(fromDate, toDate, periods); - if (fromDate.equals(toDate)) { + private boolean isTimeRangeCoverable(Date timeRangeStart, Date timeRangeEnd, + Set<UpdatePeriod> intervals) throws LensException { + if (timeRangeStart.equals(timeRangeEnd) || timeRangeStart.after(timeRangeEnd)) { return true; - } else if (periods.isEmpty()) { + } + if (intervals == null || intervals.isEmpty()) { return false; - } else { - Set<UpdatePeriod> remaining = Sets.difference(periods, Sets.newHashSet(interval)); - return interval != null - && isTimeRangeCoverable(fromDate, DateUtil.getCeilDate(fromDate, interval), remaining) - && isTimeRangeCoverable(DateUtil.getFloorDate(toDate, interval), toDate, remaining); } + + UpdatePeriod maxInterval = CubeFactTable.maxIntervalInRange(timeRangeStart, timeRangeEnd, intervals); + if (maxInterval == null) { + return false; + } + + if (maxInterval == UpdatePeriod.CONTINUOUS + && cubeql.getRangeWriter().getClass().equals(BetweenTimeRangeWriter.class)) { + return true; + } + + Date maxIntervalStorageTableStartDate = getStorageTableStartDate(maxInterval); + Date maxIntervalStorageTableEndDate = getStorageTableEndDate(maxInterval); + Set<UpdatePeriod> remainingIntervals = Sets.difference(intervals, Sets.newHashSet(maxInterval)); + + if (!CandidateUtil.isCandidatePartiallyValidForTimeRange( + maxIntervalStorageTableStartDate, maxIntervalStorageTableEndDate, timeRangeStart, timeRangeEnd)) { + //Check the time range in remainingIntervals as maxInterval is not useful + return isTimeRangeCoverable(timeRangeStart, timeRangeEnd, remainingIntervals); + } + + Date ceilFromDate = DateUtil.getCeilDate(timeRangeStart.after(maxIntervalStorageTableStartDate) + ? 
timeRangeStart : maxIntervalStorageTableStartDate, maxInterval); + Date floorToDate = DateUtil.getFloorDate(timeRangeEnd.before(maxIntervalStorageTableEndDate) + ? timeRangeEnd : maxIntervalStorageTableEndDate, maxInterval); + if (ceilFromDate.equals(floorToDate) || floorToDate.before(ceilFromDate)) { + return isTimeRangeCoverable(timeRangeStart, timeRangeEnd, remainingIntervals); + } + + //ceilFromDate to floorToDate time range is covered by maxInterval (though there may be holes.. but that's ok) + //Check the remaining part of time range in remainingIntervals + return isTimeRangeCoverable(timeRangeStart, ceilFromDate, remainingIntervals) + && isTimeRangeCoverable(floorToDate, timeRangeEnd, remainingIntervals); } - boolean isUpdatePeriodUseful(UpdatePeriod updatePeriod) { - return cubeql.getTimeRanges().stream().anyMatch(timeRange -> isUpdatePeriodUseful(timeRange, updatePeriod)); + private Date getStorageTableStartDate(UpdatePeriod interval) throws LensException { + if (!isStorageTblsAtUpdatePeriodLevel) { + //In this case the start time and end time is at Storage Level and will be same for all update periods. + return this.startTime; + } + return client.getStorageTableStartDate( + client.getStorageTableName(fact.getName(), storageName, interval), fact.getName()); + } + + private Date getStorageTableEndDate(UpdatePeriod interval) throws LensException { + if (!isStorageTblsAtUpdatePeriodLevel) { + //In this case the start time and end time is at Storage Level and will be same for all update periods. + return this.endTime; + } + return client.getStorageTableEndDate( + client.getStorageTableName(fact.getName(), storageName, interval), fact.getName()); + } + + + public String getResolvedName() { + if (resolvedName == null) { + return name; + } + return resolvedName; } /** - * Is the update period useful for this time range. e.g. for a time range of hours and days, monthly - * and yearly update periods are useless. 
DAILY and HOURLY are useful - * @param timeRange The time range - * @param updatePeriod Update period - * @return Whether it's useless + * Splits the Storage Candidates into multiple Storage Candidates if storage candidate has multiple + * storage tables (one per update period) + * + * @return + * @throws LensException */ - private boolean isUpdatePeriodUseful(TimeRange timeRange, UpdatePeriod updatePeriod) { - try { - timeRange.truncate(updatePeriod); - return true; - } catch (LensException e) { - return false; + public Collection<StorageCandidate> splitAtUpdatePeriodLevelIfReq() throws LensException { + if (!isStorageTblsAtUpdatePeriodLevel) { + return Lists.newArrayList(this); // No need to explode in this case } + return getPeriodSpecificStorageCandidates(); } + + private Collection<StorageCandidate> getPeriodSpecificStorageCandidates() throws LensException { + List<StorageCandidate> periodSpecificScList = new ArrayList<>(participatingUpdatePeriods.size()); + StorageCandidate updatePeriodSpecificSc; + for (UpdatePeriod period : participatingUpdatePeriods) { + updatePeriodSpecificSc = new StorageCandidate(this); + updatePeriodSpecificSc.truncatePartitions(period); + updatePeriodSpecificSc.setResolvedName(client.getStorageTableName(fact.getName(), + storageName, period)); + periodSpecificScList.add(updatePeriodSpecificSc); + } + return periodSpecificScList; + } + + /** + * Truncates partitions in {@link #rangeToPartitions} such that only partitions belonging to + * the passed undatePeriod are retained. 
+ * @param updatePeriod + */ + private void truncatePartitions(UpdatePeriod updatePeriod) { + Iterator<Map.Entry<TimeRange, Set<FactPartition>>> rangeItr = rangeToPartitions.entrySet().iterator(); + while (rangeItr.hasNext()) { + Map.Entry<TimeRange, Set<FactPartition>> rangeEntry = rangeItr.next(); + Iterator<FactPartition> partitionItr = rangeEntry.getValue().iterator(); + while (partitionItr.hasNext()) { + if (!partitionItr.next().getPeriod().equals(updatePeriod)) { + partitionItr.remove(); + } + } + if (rangeEntry.getValue().isEmpty()) { + rangeItr.remove(); + } + } + } + + } http://git-wip-us.apache.org/repos/asf/lens/blob/363f132d/lens-cube/src/main/java/org/apache/lens/cube/parse/StorageTableResolver.java ---------------------------------------------------------------------- diff --git a/lens-cube/src/main/java/org/apache/lens/cube/parse/StorageTableResolver.java b/lens-cube/src/main/java/org/apache/lens/cube/parse/StorageTableResolver.java index 22e2e09..1a2d9a9 100644 --- a/lens-cube/src/main/java/org/apache/lens/cube/parse/StorageTableResolver.java +++ b/lens-cube/src/main/java/org/apache/lens/cube/parse/StorageTableResolver.java @@ -31,6 +31,7 @@ import org.apache.lens.server.api.error.LensException; import org.apache.commons.lang.StringUtils; import org.apache.hadoop.conf.Configuration; +import com.google.common.collect.Lists; import lombok.extern.slf4j.Slf4j; /** * Resolve storages and partitions of all candidate tables and prunes candidate tables with missing storages or @@ -120,10 +121,21 @@ class StorageTableResolver implements ContextRewriter { while (candidateIterator.hasNext()) { Candidate candidate = candidateIterator.next(); boolean isComplete = true; + boolean isTimeRangeAnswerableByThisCandidate = true; for (TimeRange range : cubeql.getTimeRanges()) { + if (!candidate.isTimeRangeCoverable(range)) { + isTimeRangeAnswerableByThisCandidate = false; + log.info("Not considering candidate:{} as it can not cover time range {}", candidate, range); + 
cubeql.addCandidatePruningMsg(candidate, + CandidateTablePruneCause.storageNotAvailableInRange(Lists.newArrayList(range))); + break; + } isComplete &= candidate.evaluateCompleteness(range, range, failOnPartialData); } - if (failOnPartialData && !isComplete) { + if (!isTimeRangeAnswerableByThisCandidate) { + candidateIterator.remove(); + } + else if (failOnPartialData && !isComplete) { candidateIterator.remove(); log.info("Not considering candidate:{} as its data is not is not complete", candidate); Set<StorageCandidate> scSet = CandidateUtil.getStorageCandidates(candidate); @@ -144,7 +156,6 @@ class StorageTableResolver implements ContextRewriter { } } - private void resolveDimStorageTablesAndPartitions(CubeQueryContext cubeql) throws LensException { Set<Dimension> allDims = new HashSet<>(cubeql.getDimensions()); for (Aliased<Dimension> dim : cubeql.getOptionalDimensions()) { @@ -261,10 +272,17 @@ class StorageTableResolver implements ContextRewriter { } List<String> validUpdatePeriods = CubeQueryConfUtil .getStringList(conf, CubeQueryConfUtil.getValidUpdatePeriodsKey(sc.getFact().getName(), sc.getStorageName())); - boolean isStorageAdded = false; + boolean isUpdatePeriodForStorageAdded = false; Map<String, SkipUpdatePeriodCode> skipUpdatePeriodCauses = new HashMap<>(); - // Populate valid update periods. + if (cubeql.getTimeRanges().stream().noneMatch(range -> CandidateUtil.isPartiallyValidForTimeRange(sc, range))) { + cubeql.addStoragePruningMsg(sc, + new CandidateTablePruneCause(CandidateTablePruneCode.TIME_RANGE_NOT_ANSWERABLE)); + it.remove(); + continue; + } + + // Populate valid update periods abd check validity at update period level for (UpdatePeriod updatePeriod : sc.getFact().getUpdatePeriods().get(sc.getStorageName())) { if (maxInterval != null && updatePeriod.compareTo(maxInterval) > 0) { // if user supplied max interval, all intervals larger than that are useless. 
@@ -279,20 +297,20 @@ class StorageTableResolver implements ContextRewriter { } else if (!sc.isUpdatePeriodUseful(updatePeriod)) { // if the storage candidate finds this update useful to keep looking at the time ranges queried skipUpdatePeriodCauses.put(updatePeriod.toString(), - SkipUpdatePeriodCode.QUERY_INTERVAL_SMALLER_THAN_UPDATE_PERIOD); + SkipUpdatePeriodCode.TIME_RANGE_NOT_ANSWERABLE_BY_UPDATE_PERIOD); } else { - isStorageAdded = true; + isUpdatePeriodForStorageAdded = true; sc.addValidUpdatePeriod(updatePeriod); } } - // this is just for documentation/debugging, so we can see why some update periods are skipped. + // For DEBUG purpose only to see why some update periods are skipped. if (!skipUpdatePeriodCauses.isEmpty()) { sc.setUpdatePeriodRejectionCause(skipUpdatePeriodCauses); } // if no update periods were added in previous section, we skip this storage candidate - if (!isStorageAdded) { + if (!isUpdatePeriodForStorageAdded) { if (skipUpdatePeriodCauses.values().stream().allMatch( - SkipUpdatePeriodCode.QUERY_INTERVAL_SMALLER_THAN_UPDATE_PERIOD::equals)) { + SkipUpdatePeriodCode.TIME_RANGE_NOT_ANSWERABLE_BY_UPDATE_PERIOD::equals)) { // all update periods bigger than query range, it means time range not answerable. 
cubeql.addStoragePruningMsg(sc, new CandidateTablePruneCause(CandidateTablePruneCode.TIME_RANGE_NOT_ANSWERABLE)); @@ -301,27 +319,30 @@ class StorageTableResolver implements ContextRewriter { } it.remove(); } else { - Set<CandidateTablePruneCause> allPruningCauses = new HashSet<>(2); + //set the dates again as they can change based on ValidUpdatePeriod + sc.setStorageStartAndEndDate(); + Set<CandidateTablePruneCause> allPruningCauses = new HashSet<>(cubeql.getTimeRanges().size()); for (TimeRange range : cubeql.getTimeRanges()) { CandidateTablePruneCause pruningCauseForThisTimeRange = null; - if (!client.isStorageTableCandidateForRange(storageTable, range.getFromDate(), range.getToDate())) { + if (!CandidateUtil.isPartiallyValidForTimeRange(sc, range)) { //This is the prune cause pruningCauseForThisTimeRange = new CandidateTablePruneCause(CandidateTablePruneCode.TIME_RANGE_NOT_ANSWERABLE); } //Check partition (or fallback) column existence + //TODO Shouldn't we check at least once for the existence of part column else if (cubeql.shouldReplaceTimeDimWithPart()) { - if (!client.partColExists(storageTable, range.getPartitionColumn())) { + if (!client.partColExists(sc.getFact().getName(), sc.getStorageName(), range.getPartitionColumn())) { pruningCauseForThisTimeRange = partitionColumnsMissing(range.getPartitionColumn()); TimeRange fallBackRange = StorageUtil.getFallbackRange(range, sc.getFact().getName(), cubeql); while (fallBackRange != null) { pruningCauseForThisTimeRange = null; - if (!client.partColExists(storageTable, fallBackRange.getPartitionColumn())) { + if (!client.partColExists(sc.getFact().getName(), sc.getStorageName(), + fallBackRange.getPartitionColumn())) { pruningCauseForThisTimeRange = partitionColumnsMissing(fallBackRange.getPartitionColumn()); fallBackRange = StorageUtil.getFallbackRange(fallBackRange, sc.getFact().getName(), cubeql); } else { - if (!client.isStorageTableCandidateForRange(storageTable, fallBackRange.getFromDate(), - 
fallBackRange.getToDate())) { + if (!CandidateUtil.isPartiallyValidForTimeRange(sc, fallBackRange)) { + pruningCauseForThisTimeRange = + new CandidateTablePruneCause(CandidateTablePruneCode.TIME_RANGE_NOT_ANSWERABLE); + } @@ -336,6 +357,7 @@ class StorageTableResolver implements ContextRewriter { } } if (!allPruningCauses.isEmpty()) { + // TODO if this storage can answer at least one time range, why prune it? it.remove(); cubeql.addStoragePruningMsg(sc, allPruningCauses.toArray(new CandidateTablePruneCause[0])); } http://git-wip-us.apache.org/repos/asf/lens/blob/363f132d/lens-cube/src/main/java/org/apache/lens/cube/parse/UnionCandidate.java ---------------------------------------------------------------------- diff --git a/lens-cube/src/main/java/org/apache/lens/cube/parse/UnionCandidate.java b/lens-cube/src/main/java/org/apache/lens/cube/parse/UnionCandidate.java index d97e7b8..62ebf71 100644 --- a/lens-cube/src/main/java/org/apache/lens/cube/parse/UnionCandidate.java +++ b/lens-cube/src/main/java/org/apache/lens/cube/parse/UnionCandidate.java @@ -55,6 +55,17 @@ public class UnionCandidate implements Candidate { } @Override + public boolean isTimeRangeCoverable(TimeRange timeRange) throws LensException { + Map<Candidate, TimeRange> candidateRange = splitTimeRangeForChildren(timeRange); + for (Map.Entry<Candidate, TimeRange> entry : candidateRange.entrySet()) { + if (!entry.getKey().isTimeRangeCoverable(entry.getValue())) { + return false; + } + } + return true; + } + + @Override public Collection<String> getColumns() { // In UnionCandidate all columns are same, return the columns // of first child http://git-wip-us.apache.org/repos/asf/lens/blob/363f132d/lens-cube/src/main/java/org/apache/lens/cube/parse/UnionQueryWriter.java ---------------------------------------------------------------------- diff --git a/lens-cube/src/main/java/org/apache/lens/cube/parse/UnionQueryWriter.java b/lens-cube/src/main/java/org/apache/lens/cube/parse/UnionQueryWriter.java index 
2ca1181..f9717fa 100644 --- a/lens-cube/src/main/java/org/apache/lens/cube/parse/UnionQueryWriter.java +++ b/lens-cube/src/main/java/org/apache/lens/cube/parse/UnionQueryWriter.java @@ -51,12 +51,15 @@ public class UnionQueryWriter { private Map<String, ASTNode> storageCandidateToSelectAstMap = new HashMap<>(); private AliasDecider aliasDecider = new DefaultAliasDecider(); private CubeQueryContext cubeql; - Set<StorageCandidate> storageCandidates; + Collection<StorageCandidate> storageCandidates; public static final String DEFAULT_MEASURE = "0.0"; - public UnionQueryWriter(Candidate cand, CubeQueryContext cubeql) { + public UnionQueryWriter(Collection<StorageCandidate> storageCandidates, CubeQueryContext cubeql) { + if (storageCandidates == null || storageCandidates.size()<=1) { + throw new IllegalArgumentException("There should be atleast two storage candidates to write a union query"); + } this.cubeql = cubeql; - storageCandidates = CandidateUtil.getStorageCandidates(cand); + this.storageCandidates = storageCandidates; } public String toHQL(Map<StorageCandidate, Set<Dimension>> factDimMap) throws LensException { http://git-wip-us.apache.org/repos/asf/lens/blob/363f132d/lens-cube/src/main/java/org/apache/lens/cube/parse/join/AutoJoinContext.java ---------------------------------------------------------------------- diff --git a/lens-cube/src/main/java/org/apache/lens/cube/parse/join/AutoJoinContext.java b/lens-cube/src/main/java/org/apache/lens/cube/parse/join/AutoJoinContext.java index b5b0b30..aab671e 100644 --- a/lens-cube/src/main/java/org/apache/lens/cube/parse/join/AutoJoinContext.java +++ b/lens-cube/src/main/java/org/apache/lens/cube/parse/join/AutoJoinContext.java @@ -356,7 +356,7 @@ public class AutoJoinContext { * @param dimsToQuery * @throws LensException */ - public void pruneAllPaths(CubeInterface cube, Set<StorageCandidate> scSet, + public void pruneAllPaths(CubeInterface cube, Collection<StorageCandidate> scSet, final Map<Dimension, CandidateDim> 
dimsToQuery) throws LensException { // Remove join paths which cannot be satisfied by the resolved candidate // fact and dimension tables http://git-wip-us.apache.org/repos/asf/lens/blob/363f132d/lens-cube/src/test/java/org/apache/lens/cube/metadata/DateFactory.java ---------------------------------------------------------------------- diff --git a/lens-cube/src/test/java/org/apache/lens/cube/metadata/DateFactory.java b/lens-cube/src/test/java/org/apache/lens/cube/metadata/DateFactory.java index 3bdc047..855f54a 100644 --- a/lens-cube/src/test/java/org/apache/lens/cube/metadata/DateFactory.java +++ b/lens-cube/src/test/java/org/apache/lens/cube/metadata/DateFactory.java @@ -65,11 +65,16 @@ public class DateFactory { } public static class GeneralDateOffsetProvider extends HashMap<UpdatePeriod, DateOffsetProvider> { + boolean truncate; + public GeneralDateOffsetProvider(boolean truncate) { + this.truncate = truncate; + } + @Override public DateOffsetProvider get(Object key) { if (!containsKey(key) && key instanceof UpdatePeriod) { UpdatePeriod up = (UpdatePeriod) key; - put(up, new DateOffsetProvider(up)); + put(up, new DateOffsetProvider(up, truncate)); } return super.get(key); } @@ -79,13 +84,19 @@ public class DateFactory { } } - public static final GeneralDateOffsetProvider GENERAL_DATE_OFFSET_PROVIDER = new GeneralDateOffsetProvider(); + public static final GeneralDateOffsetProvider GENERAL_DATE_OFFSET_PROVIDER = new GeneralDateOffsetProvider(false); + public static final GeneralDateOffsetProvider GENERAL_TRUNCATED_DATE_OFFSET_PROVIDER + = new GeneralDateOffsetProvider(true); public static Date getDateWithOffset(UpdatePeriod up, int offset) { return GENERAL_DATE_OFFSET_PROVIDER.get(up, offset); } + public static Date getTruncatedDateWithOffset(UpdatePeriod up, int offset) { + return GENERAL_TRUNCATED_DATE_OFFSET_PROVIDER.get(up, offset); + } + public static String getDateStringWithOffset(UpdatePeriod up, int offset) { return getDateStringWithOffset(up, offset, 
up); } @@ -141,6 +152,10 @@ public class DateFactory { public static final Date NOW; public static final Date TWODAYS_BACK; public static final Date TWO_MONTHS_BACK; + public static final Date THIS_MONTH_TRUNCATED; + public static final Date ONE_MONTH_BACK_TRUNCATED; + public static final Date TWO_MONTHS_BACK_TRUNCATED; + public static final Date THREE_MONTHS_BACK_TRUNCATED; public static final Date BEFORE_6_DAYS; public static final Date BEFORE_4_DAYS; @@ -159,6 +174,8 @@ public class DateFactory { public static final String TWO_MONTHS_RANGE_UPTO_DAYS; public static final String TWO_MONTHS_RANGE_UPTO_HOURS; public static final String TWO_DAYS_RANGE_BEFORE_4_DAYS; + public static final String THREE_MONTHS_RANGE_UPTO_DAYS; + public static final String THREE_MONTHS_RANGE_UPTO_MONTH; private static boolean zerothHour; @@ -179,6 +196,12 @@ public class DateFactory { TWO_MONTHS_BACK = getDateWithOffset(MONTHLY, -2); System.out.println("Test TWO_MONTHS_BACK:" + TWO_MONTHS_BACK); + THIS_MONTH_TRUNCATED = getTruncatedDateWithOffset(MONTHLY, 0); + ONE_MONTH_BACK_TRUNCATED = getTruncatedDateWithOffset(MONTHLY, -1); + TWO_MONTHS_BACK_TRUNCATED = getTruncatedDateWithOffset(MONTHLY, -2); + THREE_MONTHS_BACK_TRUNCATED = getTruncatedDateWithOffset(MONTHLY, -3); + + // Before 4days BEFORE_4_DAYS = getDateWithOffset(DAILY, -4); BEFORE_6_DAYS = getDateWithOffset(DAILY, -6); @@ -196,6 +219,8 @@ public class DateFactory { TWO_MONTHS_RANGE_UPTO_MONTH = getTimeRangeString(MONTHLY, -2, 0); TWO_MONTHS_RANGE_UPTO_DAYS = getTimeRangeString(MONTHLY, -2, 0, DAILY); TWO_MONTHS_RANGE_UPTO_HOURS = getTimeRangeString(MONTHLY, -2, 0, HOURLY); + THREE_MONTHS_RANGE_UPTO_DAYS = getTimeRangeString(MONTHLY, -3, 0, DAILY); + THREE_MONTHS_RANGE_UPTO_MONTH = getTimeRangeString(MONTHLY, -3, 0, MONTHLY); // calculate LAST_HOUR_TIME_RANGE LAST_HOUR_TIME_RANGE = getTimeRangeString(HOURLY, -1, 0); 
http://git-wip-us.apache.org/repos/asf/lens/blob/363f132d/lens-cube/src/test/java/org/apache/lens/cube/parse/CubeTestSetup.java ---------------------------------------------------------------------- diff --git a/lens-cube/src/test/java/org/apache/lens/cube/parse/CubeTestSetup.java b/lens-cube/src/test/java/org/apache/lens/cube/parse/CubeTestSetup.java index 2d031f4..860db28 100644 --- a/lens-cube/src/test/java/org/apache/lens/cube/parse/CubeTestSetup.java +++ b/lens-cube/src/test/java/org/apache/lens/cube/parse/CubeTestSetup.java @@ -452,6 +452,16 @@ public class CubeTestSetup { StorageUtil.getWherePartClause("dt", TEST_CUBE_NAME, parts)); return storageTableToWhereClause; } + + public static Map<String, String> getWhereForMonthly(String monthlyTable, Date startMonth, Date endMonth) { + Map<String, String> storageTableToWhereClause = new LinkedHashMap<String, String>(); + List<String> parts = new ArrayList<String>(); + addParts(parts, MONTHLY, startMonth, endMonth); + storageTableToWhereClause.put(getDbName() + monthlyTable, + StorageUtil.getWherePartClause("dt", TEST_CUBE_NAME, parts)); + return storageTableToWhereClause; + } + public static Map<String, String> getWhereForHourly2days(String hourlyTable) { return getWhereForHourly2days(TEST_CUBE_NAME, hourlyTable); } @@ -953,4 +963,4 @@ public class CubeTestSetup { System.out.println("--query- " + query); HQLParser.printAST(HQLParser.parseHQL(query, new HiveConf())); } -} +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/lens/blob/363f132d/lens-cube/src/test/java/org/apache/lens/cube/parse/TestBaseCubeQueries.java ---------------------------------------------------------------------- diff --git a/lens-cube/src/test/java/org/apache/lens/cube/parse/TestBaseCubeQueries.java b/lens-cube/src/test/java/org/apache/lens/cube/parse/TestBaseCubeQueries.java index 2bf0554..591b24b 100644 --- a/lens-cube/src/test/java/org/apache/lens/cube/parse/TestBaseCubeQueries.java +++ 
b/lens-cube/src/test/java/org/apache/lens/cube/parse/TestBaseCubeQueries.java @@ -873,17 +873,18 @@ public class TestBaseCubeQueries extends TestQueryRewrite { assertEquals(ctx.getCandidates().size(), 1); assertEquals(CandidateUtil.getStorageCandidates(ctx.getCandidates().iterator().next()).size(), 1); StorageCandidate sc = CandidateUtil.getStorageCandidates(ctx.getCandidates().iterator().next()).iterator().next(); - assertEquals(sc.getRangeToWhere().size(), 2); - for(Map.Entry<TimeRange, String> entry: sc.getRangeToWhere().entrySet()) { - if (entry.getKey().getPartitionColumn().equals("dt")) { - ASTNode parsed = HQLParser.parseExpr(entry.getValue()); + assertEquals(sc.getRangeToPartitions().size(), 2); + for(TimeRange range: sc.getRangeToPartitions().keySet()) { + String rangeWhere = CandidateUtil.getTimeRangeWhereClasue(ctx.getRangeWriter(), sc, range); + if (range.getPartitionColumn().equals("dt")) { + ASTNode parsed = HQLParser.parseExpr(rangeWhere); assertEquals(parsed.getToken().getType(), KW_AND); - assertTrue(entry.getValue().substring(((CommonToken) parsed.getToken()).getStopIndex() + 1) + assertTrue(rangeWhere.substring(((CommonToken) parsed.getToken()).getStopIndex() + 1) .toLowerCase().contains(dTimeWhereClause)); - assertFalse(entry.getValue().substring(0, ((CommonToken) parsed.getToken()).getStartIndex()) + assertFalse(rangeWhere.substring(0, ((CommonToken) parsed.getToken()).getStartIndex()) .toLowerCase().contains("and")); - } else if (entry.getKey().getPartitionColumn().equals("ttd")) { - assertFalse(entry.getValue().toLowerCase().contains("and")); + } else if (range.getPartitionColumn().equals("ttd")) { + assertFalse(rangeWhere.toLowerCase().contains("and")); } else { throw new LensException("Unexpected"); }
