http://git-wip-us.apache.org/repos/asf/lens/blob/ae83caae/lens-cube/src/main/java/org/apache/lens/cube/parse/SingleFactSingleStorageHQLContext.java ---------------------------------------------------------------------- diff --git a/lens-cube/src/main/java/org/apache/lens/cube/parse/SingleFactSingleStorageHQLContext.java b/lens-cube/src/main/java/org/apache/lens/cube/parse/SingleFactSingleStorageHQLContext.java deleted file mode 100644 index dbc84ed..0000000 --- a/lens-cube/src/main/java/org/apache/lens/cube/parse/SingleFactSingleStorageHQLContext.java +++ /dev/null @@ -1,73 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.lens.cube.parse; - -import java.util.Map; -import java.util.Set; - -import org.apache.lens.cube.metadata.Dimension; -import org.apache.lens.server.api.error.LensException; - -/** - * HQL context class which passes down all query strings to come from DimOnlyHQLContext and works with fact being - * queried. - * <p/> - * Updates from string with join clause expanded - */ -class SingleFactSingleStorageHQLContext extends DimOnlyHQLContext { - - private final CandidateFact fact; - private String storageAlias; - - SingleFactSingleStorageHQLContext(CandidateFact fact, Map<Dimension, CandidateDim> dimsToQuery, - CubeQueryContext query, QueryAST ast) - throws LensException { - this(fact, dimsToQuery, dimsToQuery.keySet(), query, ast); - } - - SingleFactSingleStorageHQLContext(CandidateFact fact, Map<Dimension, CandidateDim> dimsToQuery, - Set<Dimension> dimsQueried, CubeQueryContext query, QueryAST ast) - throws LensException { - super(dimsToQuery, dimsQueried, query, ast); - this.fact = fact; - } - - SingleFactSingleStorageHQLContext(CandidateFact fact, String storageAlias, Map<Dimension, CandidateDim> dimsToQuery, - CubeQueryContext query, QueryAST ast) throws LensException { - this(fact, dimsToQuery, query, ast); - this.storageAlias = storageAlias; - } - - @Override - protected String getFromTable() throws LensException { - if (getQuery().isAutoJoinResolved()) { - if (storageAlias != null) { - return storageAlias; - } else { - return fact.getStorageString(query.getAliasForTableName(query.getCube().getName())); - } - } else { - if (fact.getStorageTables().size() == 1) { - return getQuery().getQBFromString(fact, getDimsToQuery()); - } else { - return storageAlias; - } - } - } -}
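Note on where the deleted class's responsibility moves: the from-clause decision that SingleFactSingleStorageHQLContext.getFromTable() made above is taken over by the new StorageCandidate class added below. A simplified sketch of the new shape (not a verbatim copy of the patch; field and method names follow the new class):

    // Simplified from StorageCandidate in this patch: the storage alias / join chain is now
    // prepared in updateFromString(), and getFromTable() simply picks between the resolved
    // auto-join from string and the query-block from string.
    private String getFromTable() throws LensException {
      if (cubeql.isAutoJoinResolved()) {
        return fromString;                                // alias plus join chain built in updateFromString()
      }
      return cubeql.getQBFromString(this, getDimsToQuery());
    }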
http://git-wip-us.apache.org/repos/asf/lens/blob/ae83caae/lens-cube/src/main/java/org/apache/lens/cube/parse/StorageCandidate.java ---------------------------------------------------------------------- diff --git a/lens-cube/src/main/java/org/apache/lens/cube/parse/StorageCandidate.java b/lens-cube/src/main/java/org/apache/lens/cube/parse/StorageCandidate.java new file mode 100644 index 0000000..d95cf27 --- /dev/null +++ b/lens-cube/src/main/java/org/apache/lens/cube/parse/StorageCandidate.java @@ -0,0 +1,1040 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.lens.cube.parse; + +import static org.apache.lens.cube.parse.CandidateTablePruneCause.CandidateTablePruneCode; +import static org.apache.lens.cube.parse.CandidateTablePruneCause.SkipUpdatePeriodCode; +import static org.apache.lens.cube.parse.CandidateTablePruneCause.timeDimNotSupported; +import static org.apache.lens.cube.parse.StorageUtil.getFallbackRange; +import static org.apache.lens.cube.parse.StorageUtil.joinWithAnd; +import static org.apache.lens.cube.parse.StorageUtil.processCubeColForDataCompleteness; +import static org.apache.lens.cube.parse.StorageUtil.processExpressionsForCompleteness; + +import java.text.DateFormat; +import java.text.SimpleDateFormat; +import java.util.*; + +import org.apache.lens.cube.metadata.AbstractCubeTable; +import org.apache.lens.cube.metadata.CubeFactTable; +import org.apache.lens.cube.metadata.CubeInterface; +import org.apache.lens.cube.metadata.CubeMetastoreClient; +import org.apache.lens.cube.metadata.DateUtil; +import org.apache.lens.cube.metadata.Dimension; +import org.apache.lens.cube.metadata.FactPartition; +import org.apache.lens.cube.metadata.MetastoreUtil; +import org.apache.lens.cube.metadata.TimeRange; +import org.apache.lens.cube.metadata.UpdatePeriod; +import org.apache.lens.server.api.error.LensException; +import org.apache.lens.server.api.metastore.DataCompletenessChecker; + +import org.apache.commons.lang.StringUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.ql.lib.Node; +import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hadoop.hive.ql.parse.ASTNode; +import org.apache.hadoop.hive.ql.parse.HiveParser; +import org.apache.hadoop.hive.ql.session.SessionState; + +import org.antlr.runtime.CommonToken; + +import com.google.common.collect.Lists; +import com.google.common.collect.Sets; +import lombok.Getter; +import lombok.Setter; +import lombok.extern.slf4j.Slf4j; + +/** + * Represents a fact on a storage table and the dimensions it needs to be joined with to answer the query + */ +@Slf4j +public class StorageCandidate implements Candidate, CandidateTable { + + // TODO union : Put comments on member variables. 
+ @Getter + private final CubeQueryContext cubeql; + private final String processTimePartCol; + private final CubeMetastoreClient client; + private final String completenessPartCol; + private final float completenessThreshold; + + /** + * Name of this storage candidate = storageName_factName + */ + @Getter + @Setter + private String name; + + /** + * This is the storage table specific name. It is used while generating query from this candidate + */ + @Setter + private String resolvedName; + /** + * Valid update periods populated by Phase 1. + */ + @Getter + private TreeSet<UpdatePeriod> validUpdatePeriods = new TreeSet<>(); + + /** + * These are the update periods that finally participate in partitions. + * @see #getParticipatingPartitions() + */ + @Getter + private TreeSet<UpdatePeriod> participatingUpdatePeriods = new TreeSet<>(); + + @Getter + @Setter + Map<String, SkipUpdatePeriodCode> updatePeriodRejectionCause; + private Configuration conf = null; + + /** + * This map holds Tags (A tag refers to one or more measures) that have incomplete (below configured threshold) data. + * Value is a map of date string and %completeness. + */ + @Getter + private Map<String, Map<String, Float>> dataCompletenessMap = new HashMap<>(); + private SimpleDateFormat partWhereClauseFormat = null; + /** + * Participating fact, storage and dimensions for this StorageCandidate + */ + @Getter + private CubeFactTable fact; + @Getter + private String storageName; + @Getter + private String storageTable; + @Getter + @Setter + private QueryAST queryAst; + @Getter + private Map<TimeRange, Set<FactPartition>> rangeToPartitions = new LinkedHashMap<>(); + @Getter + private Map<TimeRange, String> rangeToExtraWhereFallBack = new LinkedHashMap<>(); + @Getter + @Setter + private String whereString; + @Getter + private Set<Integer> answerableMeasurePhraseIndices = Sets.newHashSet(); + @Getter + @Setter + private String fromString; + @Getter + private CubeInterface cube; + @Getter + private Map<Dimension, CandidateDim> dimsToQuery; + @Getter + private Date startTime; + @Getter + private Date endTime; + /** + * Cached fact columns + */ + private Collection<String> factColumns; + + /** + * Non existing partitions + */ + @Getter + private Set<String> nonExistingPartitions = new HashSet<>(); + @Getter + private int numQueriedParts = 0; + + /** + * This will be true if this storage candidate has multiple storage tables (one per update period) + * https://issues.apache.org/jira/browse/LENS-1386 + */ + @Getter + private boolean isStorageTblsAtUpdatePeriodLevel; + + public StorageCandidate(StorageCandidate sc) throws LensException { + this(sc.getCube(), sc.getFact(), sc.getStorageName(), sc.getCubeql()); + this.validUpdatePeriods.addAll(sc.getValidUpdatePeriods()); + this.whereString = sc.whereString; + this.fromString = sc.fromString; + this.dimsToQuery = sc.dimsToQuery; + this.factColumns = sc.factColumns; + this.answerableMeasurePhraseIndices.addAll(sc.answerableMeasurePhraseIndices); + if (sc.getQueryAst() != null) { + this.queryAst = new DefaultQueryAST(); + CandidateUtil.copyASTs(sc.getQueryAst(), new DefaultQueryAST()); + } + for (Map.Entry<TimeRange, Set<FactPartition>> entry : sc.getRangeToPartitions().entrySet()) { + rangeToPartitions.put(entry.getKey(), new LinkedHashSet<>(entry.getValue())); + } + this.rangeToExtraWhereFallBack = sc.rangeToExtraWhereFallBack; + this.answerableMeasurePhraseIndices = sc.answerableMeasurePhraseIndices; + } + + public StorageCandidate(CubeInterface cube, CubeFactTable fact, String 
storageName, CubeQueryContext cubeql) + throws LensException { + if ((cube == null) || (fact == null) || (storageName == null)) { + throw new IllegalArgumentException("Cube,fact and storageName should be non null"); + } + this.cube = cube; + this.fact = fact; + this.cubeql = cubeql; + this.storageName = storageName; + this.storageTable = MetastoreUtil.getFactOrDimtableStorageTableName(fact.getName(), storageName); + this.conf = cubeql.getConf(); + this.name = fact.getName(); + this.processTimePartCol = conf.get(CubeQueryConfUtil.PROCESS_TIME_PART_COL); + String formatStr = conf.get(CubeQueryConfUtil.PART_WHERE_CLAUSE_DATE_FORMAT); + if (formatStr != null) { + this.partWhereClauseFormat = new SimpleDateFormat(formatStr); + } + completenessPartCol = conf.get(CubeQueryConfUtil.COMPLETENESS_CHECK_PART_COL); + completenessThreshold = conf + .getFloat(CubeQueryConfUtil.COMPLETENESS_THRESHOLD, CubeQueryConfUtil.DEFAULT_COMPLETENESS_THRESHOLD); + client = cubeql.getMetastoreClient(); + Set<String> storageTblNames = client.getStorageTables(fact.getName(), storageName); + if (storageTblNames.size() > 1) { + isStorageTblsAtUpdatePeriodLevel = true; + } else { + //if this.storageTable is equal to the storage table name it implies isStorageTblsAtUpdatePeriodLevel is false + isStorageTblsAtUpdatePeriodLevel = !storageTblNames.iterator().next().equalsIgnoreCase(storageTable); + } + setStorageStartAndEndDate(); + } + + /** + * Sets this storage candidate's start and end time based on the underlying storage tables. + * + * CASE 1 + * If the Storage has a single storage table + * Storage start time = max(storage start time, fact start time) + * Storage end time = min(storage end time, fact end time) + * + * CASE 2 + * If the Storage has multiple Storage Tables (one per update period) + * update Period start Time = Max(update start time, fact start time) + * update Period end Time = Min(update end time, fact end time) + * Storage start and end times are derived from the underlying update period start and end times. + * Storage start time = min(update1 start time ,...., updateN start time) + * Storage end time = max(update1 end time ,...., updateN end time) + * + * Note: in Case 2 it is assumed that the time ranges supported by the different update periods are either + * overlapping (Example 2) or form a non-overlapping but continuous chain (Example 1), as illustrated + * in the examples below + * + * Example 1 + * A Storage has 2 non-overlapping but continuous update periods: + * MONTHLY with start time as now.month -13 months and end time as now.month -2 months and + * DAILY with start time as now.month -2 months and end time as now.day + * Then this Storage will have an implied start time of now.month -13 months and an end time of now.day + * + * Example 2 + * A Storage has 2 overlapping update periods: + * MONTHLY with start time as now.month -13 months and end time as now.month -1 month and + * DAILY with start time as now.month -2 months and end time as now.day + * Then this Storage will have an implied start time of now.month -13 months and an end time of now.day + * + * @throws LensException + */ + public void setStorageStartAndEndDate() throws LensException { + if (this.startTime != null && !this.isStorageTblsAtUpdatePeriodLevel) { + //If the times are already set and are not dependent on update period, no point setting times again.
+ return; + } + List<Date> startDates = new ArrayList<>(); + List<Date> endDates = new ArrayList<>(); + for (String storageTablePrefix : getValidStorageTableNames()) { + startDates.add(client.getStorageTableStartDate(storageTablePrefix, fact.getName())); + endDates.add(client.getStorageTableEndDate(storageTablePrefix, fact.getName())); + } + this.startTime = Collections.min(startDates); + this.endTime = Collections.max(endDates); + } + + private Set<String> getValidStorageTableNames() throws LensException { + if (!validUpdatePeriods.isEmpty()) { + // In this case skip invalid update periods and get storage tables only for valid ones. + Set<String> uniqueStorageTables = new HashSet<>(); + for (UpdatePeriod updatePeriod : validUpdatePeriods) { + uniqueStorageTables.add(client.getStorageTableName(fact.getName(), storageName, updatePeriod)); + } + return uniqueStorageTables; + } else { + //Get all storage tables. + return client.getStorageTables(fact.getName(), storageName); + } + } + + private void setMissingExpressions(Set<Dimension> queriedDims) throws LensException { + setFromString(String.format("%s", getFromTable())); + setWhereString(joinWithAnd( + genWhereClauseWithDimPartitions(whereString, queriedDims), cubeql.getConf().getBoolean( + CubeQueryConfUtil.REPLACE_TIMEDIM_WITH_PART_COL, CubeQueryConfUtil.DEFAULT_REPLACE_TIMEDIM_WITH_PART_COL) + ? getPostSelectionWhereClause() : null)); + if (cubeql.getHavingAST() != null) { + queryAst.setHavingAST(MetastoreUtil.copyAST(cubeql.getHavingAST())); + } + } + + private String genWhereClauseWithDimPartitions(String originalWhere, Set<Dimension> queriedDims) { + StringBuilder whereBuf; + if (originalWhere != null) { + whereBuf = new StringBuilder(originalWhere); + } else { + whereBuf = new StringBuilder(); + } + + // add where clause for all dimensions + if (cubeql != null) { + boolean added = (originalWhere != null); + for (Dimension dim : queriedDims) { + CandidateDim cdim = dimsToQuery.get(dim); + String alias = cubeql.getAliasForTableName(dim.getName()); + if (!cdim.isWhereClauseAdded() && !StringUtils.isBlank(cdim.getWhereClause())) { + appendWhereClause(whereBuf, StorageUtil.getWhereClause(cdim, alias), added); + added = true; + } + } + } + if (whereBuf.length() == 0) { + return null; + } + return whereBuf.toString(); + } + + private static void appendWhereClause(StringBuilder filterCondition, String whereClause, boolean hasMore) { + // Make sure we add AND only when there are already some conditions in where + // clause + if (hasMore && !filterCondition.toString().isEmpty() && !StringUtils.isBlank(whereClause)) { + filterCondition.append(" AND "); + } + + if (!StringUtils.isBlank(whereClause)) { + filterCondition.append("("); + filterCondition.append(whereClause); + filterCondition.append(")"); + } + } + + private String getPostSelectionWhereClause() throws LensException { + return null; + } + + void setAnswerableMeasurePhraseIndices(int index) { + answerableMeasurePhraseIndices.add(index); + } + + public String toHQL(Set<Dimension> queriedDims) throws LensException { + setMissingExpressions(queriedDims); + // Check if the picked candidate is a StorageCandidate and in that case + // update the selectAST with final alias. 
+ if (this == cubeql.getPickedCandidate()) { + CandidateUtil.updateFinalAlias(queryAst.getSelectAST(), cubeql); + updateOrderByWithFinalAlias(queryAst.getOrderByAST(), queryAst.getSelectAST()); + } else { + queryAst.setHavingAST(null); + } + return CandidateUtil + .buildHQLString(queryAst.getSelectString(), fromString, whereString, queryAst.getGroupByString(), + queryAst.getOrderByString(), queryAst.getHavingString(), queryAst.getLimitValue()); + } + + /** + * Updates order by children with the final alias used in select + * + * @param orderby Order by AST + * @param select Select AST + */ + private void updateOrderByWithFinalAlias(ASTNode orderby, ASTNode select) { + if (orderby == null) { + return; + } + for (Node orderbyNode : orderby.getChildren()) { + ASTNode orderBychild = (ASTNode) orderbyNode; + for (Node selectNode : select.getChildren()) { + ASTNode selectChild = (ASTNode) selectNode; + if (selectChild.getChildCount() == 2) { + if (HQLParser.getString((ASTNode) selectChild.getChild(0)) + .equals(HQLParser.getString((ASTNode) orderBychild.getChild(0)))) { + ASTNode alias = new ASTNode((ASTNode) selectChild.getChild(1)); + orderBychild.replaceChildren(0, 0, alias); + break; + } + } + } + } + } + + @Override + public String getStorageString(String alias) { + return storageName + " " + alias; + } + + @Override + public AbstractCubeTable getTable() { + return fact; + } + + @Override + public AbstractCubeTable getBaseTable() { + return (AbstractCubeTable) cube; + } + + @Override + public Collection<String> getColumns() { + if (factColumns == null) { + factColumns = fact.getValidColumns(); + if (factColumns == null) { + factColumns = fact.getAllFieldNames(); + } + } + return factColumns; + } + + @Override + public double getCost() { + return fact.weight(); + } + + @Override + public boolean contains(Candidate candidate) { + return this.equals(candidate); + } + + @Override + public Collection<Candidate> getChildren() { + return null; + } + + private void updatePartitionStorage(FactPartition part) throws LensException { + try { + if (client.factPartitionExists(fact, part, storageTable)) { + part.getStorageTables().add(storageTable); + part.setFound(true); + } + } catch (HiveException e) { + log.warn("Hive exception while getting storage table partition", e); + } + } + + /** + * Gets FactPartitions for the given fact using the following logic: + * + * 1. Find the max update interval that will be used for the query. Let's assume the time + * range is 15 Sep to 15 Dec and the fact has a storage with update periods MONTHLY, DAILY and HOURLY. + * In this case the data for [15 Sep - 1 Oct) U [1 Dec - 15 Dec) will be answered by DAILY partitions + * and [1 Oct - 1 Dec) will be answered by MONTHLY partitions. The max interval for this query will be MONTHLY. + * + * 2. Prune storages that do not fall in the query's time range. + * {@link CubeMetastoreClient#isStorageTableCandidateForRange(String, Date, Date)} + * + * 3. Iterate over the max interval. In our case it will give two months, Oct and Nov. Find partitions for + * these two months. Check validity of FactPartitions for Oct and Nov + * via {@link #updatePartitionStorage(FactPartition)}. + * If a partition is missing, try getting partitions for its time range from the other update periods (DAILY, HOURLY).
+ * This is achieved by calling getPartitions() recursively but passing only the 2 remaining update periods (DAILY, HOURLY) + * + * 4. If the monthly partitions are found, check for look-ahead partitions and call getPartitions recursively for the + * remaining time intervals i.e. [15 Sep - 1 Oct) and [1 Dec - 15 Dec) + * + * TODO union : Move this into util. + */ + private boolean getPartitions(Date fromDate, Date toDate, String partCol, Set<FactPartition> partitions, + TreeSet<UpdatePeriod> updatePeriods, boolean addNonExistingParts, boolean failOnPartialData, + PartitionRangesForPartitionColumns missingPartitions) throws LensException { + if (fromDate.equals(toDate) || fromDate.after(toDate)) { + return true; + } + if (updatePeriods == null || updatePeriods.isEmpty()) { + return false; + } + + UpdatePeriod maxInterval = CubeFactTable.maxIntervalInRange(fromDate, toDate, updatePeriods); + if (maxInterval == null) { + log.info("No max interval for range: {} to {}", fromDate, toDate); + return false; + } + + if (maxInterval == UpdatePeriod.CONTINUOUS + && cubeql.getRangeWriter().getClass().equals(BetweenTimeRangeWriter.class)) { + FactPartition part = new FactPartition(partCol, fromDate, maxInterval, null, partWhereClauseFormat); + partitions.add(part); + part.getStorageTables().add(storageName); + part = new FactPartition(partCol, toDate, maxInterval, null, partWhereClauseFormat); + partitions.add(part); + part.getStorageTables().add(storageName); + this.participatingUpdatePeriods.add(maxInterval); + log.info("Added continuous fact partition for storage table {}", storageName); + return true; + } + + if (!client.partColExists(this.getFact().getName(), storageName, partCol)) { + log.info("{} does not exist in {}", partCol, storageTable); + return false; + } + + Date maxIntervalStorageTblStartDate = getStorageTableStartDate(maxInterval); + Date maxIntervalStorageTblEndDate = getStorageTableEndDate(maxInterval); + + TreeSet<UpdatePeriod> remainingIntervals = new TreeSet<>(updatePeriods); + remainingIntervals.remove(maxInterval); + if (!CandidateUtil.isCandidatePartiallyValidForTimeRange( + maxIntervalStorageTblStartDate, maxIntervalStorageTblEndDate, fromDate, toDate)) { + //Check the time range in remainingIntervals as maxInterval is not useful + return getPartitions(fromDate, toDate, partCol, partitions, remainingIntervals, + addNonExistingParts, failOnPartialData, missingPartitions); + } + + Date ceilFromDate = DateUtil.getCeilDate(fromDate.after(maxIntervalStorageTblStartDate) + ? fromDate : maxIntervalStorageTblStartDate, maxInterval); + Date floorToDate = DateUtil.getFloorDate(toDate.before(maxIntervalStorageTblEndDate) + ?
toDate : maxIntervalStorageTblEndDate, maxInterval); + if (ceilFromDate.equals(floorToDate) || floorToDate.before(ceilFromDate)) { + return getPartitions(fromDate, toDate, partCol, partitions, remainingIntervals, + addNonExistingParts, failOnPartialData, missingPartitions); + } + + int lookAheadNumParts = conf + .getInt(CubeQueryConfUtil.getLookAheadPTPartsKey(maxInterval), CubeQueryConfUtil.DEFAULT_LOOK_AHEAD_PT_PARTS); + TimeRange.Iterable.Iterator iter = TimeRange.iterable(ceilFromDate, floorToDate, maxInterval, 1).iterator(); + // add partitions from ceilFrom to floorTo + while (iter.hasNext()) { + Date dt = iter.next(); + Date nextDt = iter.peekNext(); + FactPartition part = new FactPartition(partCol, dt, maxInterval, null, partWhereClauseFormat); + updatePartitionStorage(part); + log.debug("Storage tables containing Partition {} are: {}", part, part.getStorageTables()); + if (part.isFound()) { + log.debug("Adding existing partition {}", part); + partitions.add(part); + this.participatingUpdatePeriods.add(maxInterval); + log.debug("Looking for look ahead process time partitions for {}", part); + if (processTimePartCol == null) { + log.debug("processTimePartCol is null"); + } else if (partCol.equals(processTimePartCol)) { + log.debug("part column is process time col"); + } else if (updatePeriods.first().equals(maxInterval)) { + log.debug("Update period is the least update period"); + } else if ((iter.getNumIters() - iter.getCounter()) > lookAheadNumParts) { + // see if this is the part of the last-n look ahead partitions + log.debug("Not a look ahead partition"); + } else { + log.debug("Looking for look ahead process time partitions for {}", part); + // check if finer partitions are required + // final partitions are required if no partitions from + // look-ahead + // process time are present + TimeRange.Iterable.Iterator processTimeIter = TimeRange.iterable(nextDt, lookAheadNumParts, maxInterval, 1) + .iterator(); + while (processTimeIter.hasNext()) { + Date pdt = processTimeIter.next(); + Date nextPdt = processTimeIter.peekNext(); + FactPartition processTimePartition = new FactPartition(processTimePartCol, pdt, maxInterval, null, + partWhereClauseFormat); + updatePartitionStorage(processTimePartition); + if (processTimePartition.isFound()) { + log.debug("Finer parts not required for look-ahead partition :{}", part); + } else { + log.debug("Looked ahead process time partition {} is not found", processTimePartition); + TreeSet<UpdatePeriod> newset = new TreeSet<UpdatePeriod>(); + newset.addAll(updatePeriods); + newset.remove(maxInterval); + log.debug("newset of update periods:{}", newset); + if (!newset.isEmpty()) { + // Get partitions for look ahead process time + log.debug("Looking for process time partitions between {} and {}", pdt, nextPdt); + Set<FactPartition> processTimeParts = getPartitions( + TimeRange.getBuilder().fromDate(pdt).toDate(nextPdt).partitionColumn(processTimePartCol).build(), + newset, true, failOnPartialData, missingPartitions); + log.debug("Look ahead partitions: {}", processTimeParts); + TimeRange timeRange = TimeRange.getBuilder().fromDate(dt).toDate(nextDt).build(); + for (FactPartition pPart : processTimeParts) { + log.debug("Looking for finer partitions in pPart: {}", pPart); + for (Date date : timeRange.iterable(pPart.getPeriod(), 1)) { + FactPartition innerPart = new FactPartition(partCol, date, pPart.getPeriod(), pPart, + partWhereClauseFormat); + updatePartitionStorage(innerPart); + innerPart.setFound(pPart.isFound()); + if (innerPart.isFound()) { + 
partitions.add(innerPart); + } + } + log.debug("added all sub partitions blindly in pPart: {}", pPart); + } + } + } + } + } + } else { + log.info("Partition:{} does not exist in any storage table", part); + if (!getPartitions(dt, nextDt, partCol, partitions, remainingIntervals, false, failOnPartialData, + missingPartitions)) { + log.debug("Adding non existing partition {}", part); + if (addNonExistingParts) { + // Add non existing partitions for all cases of whether we populate all non existing or not. + this.participatingUpdatePeriods.add(maxInterval); + missingPartitions.add(part); + if (!failOnPartialData) { + partitions.add(part); + part.getStorageTables().add(storageName); + } + } else { + log.info("No finer granualar partitions exist for {}", part); + return false; + } + } else { + log.debug("Finer granualar partitions added for {}", part); + } + } + } + + return getPartitions(fromDate, ceilFromDate, partCol, partitions, remainingIntervals, + addNonExistingParts, failOnPartialData, missingPartitions) + && getPartitions(floorToDate, toDate, partCol, partitions, remainingIntervals, + addNonExistingParts, failOnPartialData, missingPartitions); + } + + @Override + public boolean evaluateCompleteness(TimeRange timeRange, TimeRange queriedTimeRange, boolean failOnPartialData) + throws LensException { + // Check the measure tags. + if (!evaluateMeasuresCompleteness(timeRange)) { + log.info("Storage candidate:{} has partitions with incomplete data: {} for given ranges: {}", this, + dataCompletenessMap, cubeql.getTimeRanges()); + if (failOnPartialData) { + return false; + } + } + PartitionRangesForPartitionColumns missingParts = new PartitionRangesForPartitionColumns(); + PruneCauses<StorageCandidate> storagePruningMsgs = cubeql.getStoragePruningMsgs(); + Set<String> unsupportedTimeDims = Sets.newHashSet(); + Set<String> partColsQueried = Sets.newHashSet(); + partColsQueried.add(timeRange.getPartitionColumn()); + StringBuilder extraWhereClauseFallback = new StringBuilder(); + Set<FactPartition> rangeParts = getPartitions(timeRange, validUpdatePeriods, true, failOnPartialData, missingParts); + String partCol = timeRange.getPartitionColumn(); + boolean partColNotSupported = rangeParts.isEmpty(); + String storageTableName = getStorageTable(); + + if (storagePruningMsgs.containsKey(this)) { + List<CandidateTablePruneCause> causes = storagePruningMsgs.get(this); + // Find the PART_COL_DOES_NOT_EXISTS + for (CandidateTablePruneCause cause : causes) { + if (cause.getCause().equals(CandidateTablePruneCode.PART_COL_DOES_NOT_EXIST)) { + partColNotSupported &= cause.getNonExistantPartCols().contains(partCol); + } + } + } else { + partColNotSupported = false; + } + TimeRange prevRange = timeRange; + String sep = ""; + while (rangeParts.isEmpty()) { + String timeDim = cubeql.getBaseCube().getTimeDimOfPartitionColumn(partCol); + if (partColNotSupported && !CandidateUtil.factHasColumn(getFact(), timeDim)) { + unsupportedTimeDims.add(cubeql.getBaseCube().getTimeDimOfPartitionColumn(timeRange.getPartitionColumn())); + break; + } + TimeRange fallBackRange = getFallbackRange(prevRange, this.getFact().getName(), cubeql); + log.info("No partitions for range:{}. 
fallback range: {}", timeRange, fallBackRange); + if (fallBackRange == null) { + break; + } + partColsQueried.add(fallBackRange.getPartitionColumn()); + rangeParts = getPartitions(fallBackRange, validUpdatePeriods, true, failOnPartialData, missingParts); + extraWhereClauseFallback.append(sep) + .append(prevRange.toTimeDimWhereClause(cubeql.getAliasForTableName(cubeql.getCube()), timeDim)); + sep = " AND "; + prevRange = fallBackRange; + partCol = prevRange.getPartitionColumn(); + if (!rangeParts.isEmpty()) { + break; + } + } + // Add all the partitions. participatingPartitions contains all the partitions for previous time ranges also. + rangeToPartitions.put(queriedTimeRange, rangeParts); + numQueriedParts += rangeParts.size(); + if (!unsupportedTimeDims.isEmpty()) { + log.info("Not considering storage candidate:{} as it doesn't support time dimensions: {}", this, + unsupportedTimeDims); + cubeql.addStoragePruningMsg(this, timeDimNotSupported(unsupportedTimeDims)); + return false; + } + Set<String> nonExistingParts = missingParts.toSet(partColsQueried); + // TODO union : Relook at this. + nonExistingPartitions.addAll(nonExistingParts); + if (rangeParts.size() == 0 || (failOnPartialData && !nonExistingParts.isEmpty())) { + log.info("Not considering storage candidate:{} as no partitions for fallback range:{}", this, timeRange); + return false; + } + String extraWhere = extraWhereClauseFallback.toString(); + if (!StringUtils.isEmpty(extraWhere)) { + rangeToExtraWhereFallBack.put(queriedTimeRange, extraWhere); + } + return true; + } + + @Override + public Set<FactPartition> getParticipatingPartitions() { + Set<FactPartition> allPartitions = new HashSet<>(numQueriedParts); + for (Set<FactPartition> rangePartitions : rangeToPartitions.values()) { + allPartitions.addAll(rangePartitions); + } + return allPartitions; + } + + private boolean evaluateMeasuresCompleteness(TimeRange timeRange) throws LensException { + String factDataCompletenessTag = fact.getDataCompletenessTag(); + if (factDataCompletenessTag == null) { + log.info("Not checking completeness for the fact table:{} as the dataCompletenessTag is not set", fact); + return true; + } + Set<String> measureTag = new HashSet<>(); + Map<String, String> tagToMeasureOrExprMap = new HashMap<>(); + + processExpressionsForCompleteness(cubeql, measureTag, tagToMeasureOrExprMap); + + Set<String> measures = cubeql.getQueriedMsrs(); + if (measures == null) { + measures = new HashSet<>(); + } + for (String measure : measures) { + processCubeColForDataCompleteness(cubeql, measure, measure, measureTag, tagToMeasureOrExprMap); + } + //Checking if dataCompletenessTag is set for the fact + if (measureTag.isEmpty()) { + log.info("No Queried measures with the dataCompletenessTag, hence skipping the availability check"); + return true; + } + boolean isDataComplete = false; + DataCompletenessChecker completenessChecker = client.getCompletenessChecker(); + DateFormat formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); + formatter.setTimeZone(TimeZone.getTimeZone("UTC")); + if (!timeRange.getPartitionColumn().equals(completenessPartCol)) { + log.info("Completeness check not available for partCol:{}", timeRange.getPartitionColumn()); + return true; + } + Date from = timeRange.getFromDate(); + Date to = timeRange.getToDate(); + Map<String, Map<Date, Float>> completenessMap = completenessChecker + .getCompleteness(factDataCompletenessTag, from, to, measureTag); + if (completenessMap != null && !completenessMap.isEmpty()) { + for (Map.Entry<String, Map<Date, 
Float>> measureCompleteness : completenessMap.entrySet()) { + String tag = measureCompleteness.getKey(); + for (Map.Entry<Date, Float> completenessResult : measureCompleteness.getValue().entrySet()) { + if (completenessResult.getValue() < completenessThreshold) { + log.info("Completeness for the measure_tag {} is {}, threshold: {}, for the hour {}", tag, + completenessResult.getValue(), completenessThreshold, formatter.format(completenessResult.getKey())); + String measureorExprFromTag = tagToMeasureOrExprMap.get(tag); + dataCompletenessMap.computeIfAbsent(measureorExprFromTag, k -> new HashMap<>()) + .put(formatter.format(completenessResult.getKey()), completenessResult.getValue()); + isDataComplete = false; + } + } + } + } + return isDataComplete; + } + + private Set<FactPartition> getPartitions(TimeRange timeRange, TreeSet<UpdatePeriod> updatePeriods, + boolean addNonExistingParts, boolean failOnPartialData, PartitionRangesForPartitionColumns missingParts) + throws LensException { + Set<FactPartition> partitions = new TreeSet<>(); + if (timeRange != null && timeRange.isCoverableBy(updatePeriods)) { + getPartitions(timeRange.getFromDate(), timeRange.getToDate(), timeRange.getPartitionColumn(), + partitions, updatePeriods, addNonExistingParts, failOnPartialData, missingParts); + } + return partitions; + } + + @Override + public boolean isExpressionEvaluable(ExpressionResolver.ExpressionContext expr) { + return expr.isEvaluable(this); + } + + /** + * Update selectAST for StorageCandidate + * 1. Delete projected select expression if it's not answerable by StorageCandidate. + * 2. Replace the queried alias with select alias if both are different in a select expr. + * + * @param cubeql + * @throws LensException + */ + + public void updateAnswerableSelectColumns(CubeQueryContext cubeql) throws LensException { + // update select AST with selected fields + int currentChild = 0; + for (int i = 0; i < cubeql.getSelectAST().getChildCount(); i++) { + ASTNode selectExpr = (ASTNode) queryAst.getSelectAST().getChild(currentChild); + Set<String> exprCols = HQLParser.getColsInExpr(cubeql.getAliasForTableName(cubeql.getCube()), selectExpr); + if (getColumns().containsAll(exprCols)) { + ASTNode aliasNode = HQLParser.findNodeByPath(selectExpr, HiveParser.Identifier); + String alias = cubeql.getSelectPhrases().get(i).getSelectAlias(); + if (aliasNode != null) { + String queryAlias = aliasNode.getText(); + if (!queryAlias.equals(alias)) { + // replace the alias node + ASTNode newAliasNode = new ASTNode(new CommonToken(HiveParser.Identifier, alias)); + queryAst.getSelectAST().getChild(currentChild) + .replaceChildren(selectExpr.getChildCount() - 1, selectExpr.getChildCount() - 1, newAliasNode); + } + } else { + // add column alias + ASTNode newAliasNode = new ASTNode(new CommonToken(HiveParser.Identifier, alias)); + queryAst.getSelectAST().getChild(currentChild).addChild(newAliasNode); + } + } else { + queryAst.getSelectAST().deleteChild(currentChild); + currentChild--; + } + currentChild++; + } + } + + @Override + public boolean equals(Object obj) { + if (super.equals(obj)) { + return true; + } + + if (obj == null || !(obj instanceof StorageCandidate)) { + return false; + } + + StorageCandidate storageCandidateObj = (StorageCandidate) obj; + //Assuming that same instance of cube and fact will be used across StorageCandidate s and hence relying directly + //on == check for these. 
+ return (this.cube == storageCandidateObj.cube && this.fact == storageCandidateObj.fact && this.storageTable + .equals(storageCandidateObj.storageTable)); + } + + @Override + public int hashCode() { + return this.storageTable.hashCode(); + } + + @Override + public String toString() { + return getResolvedName(); + } + + void addValidUpdatePeriod(UpdatePeriod updatePeriod) { + this.validUpdatePeriods.add(updatePeriod); + } + + void updateFromString(CubeQueryContext query, Set<Dimension> queryDims, + Map<Dimension, CandidateDim> dimsToQuery) throws LensException { + this.dimsToQuery = dimsToQuery; + String alias = cubeql.getAliasForTableName(cubeql.getCube().getName()); + fromString = getAliasForTable(alias); + if (query.isAutoJoinResolved()) { + fromString = query.getAutoJoinCtx().getFromString(fromString, this, queryDims, dimsToQuery, query, cubeql); + } + } + + private String getFromTable() throws LensException { + if (cubeql.isAutoJoinResolved()) { + return fromString; + } else { + return cubeql.getQBFromString(this, getDimsToQuery()); + } + } + + public String getAliasForTable(String alias) { + String database = SessionState.get().getCurrentDatabase(); + String ret; + if (alias == null || alias.isEmpty()) { + ret = getResolvedName(); + } else { + ret = getResolvedName() + " " + alias; + } + if (StringUtils.isNotBlank(database) && !"default".equalsIgnoreCase(database)) { + ret = database + "." + ret; + } + return ret; + } + + boolean isUpdatePeriodUseful(UpdatePeriod updatePeriod) { + return cubeql.getTimeRanges().stream().anyMatch(timeRange -> isUpdatePeriodUseful(timeRange, updatePeriod)); + } + + /** + * Is the update period useful for this time range. e.g. for a time range of hours and days, monthly + * and yearly update periods are useless. DAILY and HOURLY are useful. It further checks if the update + * period answers the range at least partially based on start and end times configured at update period + * level or at storage or fact level. + * @param timeRange The time range + * @param updatePeriod Update period + * @return Whether it's useful + */ + private boolean isUpdatePeriodUseful(TimeRange timeRange, UpdatePeriod updatePeriod) { + try { + if (!CandidateUtil.isCandidatePartiallyValidForTimeRange(getStorageTableStartDate(updatePeriod), + getStorageTableEndDate(updatePeriod), timeRange.getFromDate(), timeRange.getToDate())) { + return false; + } + Date storageTblStartDate = getStorageTableStartDate(updatePeriod); + Date storageTblEndDate = getStorageTableEndDate(updatePeriod); + TimeRange.getBuilder() //TODO date calculation to be moved to a util method and reused + .fromDate(timeRange.getFromDate().after(storageTblStartDate) ? timeRange.getFromDate() : storageTblStartDate) + .toDate(timeRange.getToDate().before(storageTblEndDate) ? timeRange.getToDate() : storageTblEndDate) + .partitionColumn(timeRange.getPartitionColumn()) + .build() + .truncate(updatePeriod); + return true; + } catch (LensException e) { + return false; + } + } + + /** + * Is the time range coverable based on the valid update periods of this storage candidate + * + * @param timeRange + * @return + * @throws LensException + */ + public boolean isTimeRangeCoverable(TimeRange timeRange) throws LensException { + return isTimeRangeCoverable(timeRange.getFromDate(), timeRange.getToDate(), validUpdatePeriods); + } + + /** + * Is the time range coverable by the given update periods. + * Extracts the max update period, then extracts the maximum amount of range from the middle that this update + * period can cover.
Then recurses on the remaining ranges on the left and right side of the extracted chunk + * using one less update period. + * + * @param timeRangeStart + * @param timeRangeEnd + * @param intervals Update periods to check + * @return Whether time range is coverable by provided update periods or not. + */ + private boolean isTimeRangeCoverable(Date timeRangeStart, Date timeRangeEnd, + Set<UpdatePeriod> intervals) throws LensException { + if (timeRangeStart.equals(timeRangeEnd) || timeRangeStart.after(timeRangeEnd)) { + return true; + } + if (intervals == null || intervals.isEmpty()) { + return false; + } + + UpdatePeriod maxInterval = CubeFactTable.maxIntervalInRange(timeRangeStart, timeRangeEnd, intervals); + if (maxInterval == null) { + return false; + } + + if (maxInterval == UpdatePeriod.CONTINUOUS + && cubeql.getRangeWriter().getClass().equals(BetweenTimeRangeWriter.class)) { + return true; + } + + Date maxIntervalStorageTableStartDate = getStorageTableStartDate(maxInterval); + Date maxIntervalStorageTableEndDate = getStorageTableEndDate(maxInterval); + Set<UpdatePeriod> remainingIntervals = Sets.difference(intervals, Sets.newHashSet(maxInterval)); + + if (!CandidateUtil.isCandidatePartiallyValidForTimeRange( + maxIntervalStorageTableStartDate, maxIntervalStorageTableEndDate, timeRangeStart, timeRangeEnd)) { + //Check the time range in remainingIntervals as maxInterval is not useful + return isTimeRangeCoverable(timeRangeStart, timeRangeEnd, remainingIntervals); + } + + Date ceilFromDate = DateUtil.getCeilDate(timeRangeStart.after(maxIntervalStorageTableStartDate) + ? timeRangeStart : maxIntervalStorageTableStartDate, maxInterval); + Date floorToDate = DateUtil.getFloorDate(timeRangeEnd.before(maxIntervalStorageTableEndDate) + ? timeRangeEnd : maxIntervalStorageTableEndDate, maxInterval); + if (ceilFromDate.equals(floorToDate) || floorToDate.before(ceilFromDate)) { + return isTimeRangeCoverable(timeRangeStart, timeRangeEnd, remainingIntervals); + } + + //ceilFromDate to floorToDate time range is covered by maxInterval (though there may be holes.. but that's ok) + //Check the remaining part of time range in remainingIntervals + return isTimeRangeCoverable(timeRangeStart, ceilFromDate, remainingIntervals) + && isTimeRangeCoverable(floorToDate, timeRangeEnd, remainingIntervals); + } + + private Date getStorageTableStartDate(UpdatePeriod interval) throws LensException { + if (!isStorageTblsAtUpdatePeriodLevel) { + //In this case the start time and end time is at Storage Level and will be same for all update periods. + return this.startTime; + } + return client.getStorageTableStartDate( + client.getStorageTableName(fact.getName(), storageName, interval), fact.getName()); + } + + private Date getStorageTableEndDate(UpdatePeriod interval) throws LensException { + if (!isStorageTblsAtUpdatePeriodLevel) { + //In this case the start time and end time is at Storage Level and will be same for all update periods. 
+ return this.endTime; + } + return client.getStorageTableEndDate( + client.getStorageTableName(fact.getName(), storageName, interval), fact.getName()); + } + + + public String getResolvedName() { + if (resolvedName == null) { + return storageTable; + } + return resolvedName; + } + + /** + * Splits this Storage Candidate into multiple Storage Candidates if the storage candidate has multiple + * storage tables (one per update period) + * + * @return + * @throws LensException + */ + public Collection<StorageCandidate> splitAtUpdatePeriodLevelIfReq() throws LensException { + if (!isStorageTblsAtUpdatePeriodLevel) { + return Lists.newArrayList(this); // No need to explode in this case + } + return getPeriodSpecificStorageCandidates(); + } + + private Collection<StorageCandidate> getPeriodSpecificStorageCandidates() throws LensException { + List<StorageCandidate> periodSpecificScList = new ArrayList<>(participatingUpdatePeriods.size()); + StorageCandidate updatePeriodSpecificSc; + for (UpdatePeriod period : participatingUpdatePeriods) { + updatePeriodSpecificSc = new StorageCandidate(this); + updatePeriodSpecificSc.truncatePartitions(period); + updatePeriodSpecificSc.setResolvedName(client.getStorageTableName(fact.getName(), + storageName, period)); + periodSpecificScList.add(updatePeriodSpecificSc); + } + return periodSpecificScList; + } + + /** + * Truncates partitions in {@link #rangeToPartitions} such that only partitions belonging to + * the passed updatePeriod are retained. + * @param updatePeriod + */ + private void truncatePartitions(UpdatePeriod updatePeriod) { + Iterator<Map.Entry<TimeRange, Set<FactPartition>>> rangeItr = rangeToPartitions.entrySet().iterator(); + while (rangeItr.hasNext()) { + Map.Entry<TimeRange, Set<FactPartition>> rangeEntry = rangeItr.next(); + Iterator<FactPartition> partitionItr = rangeEntry.getValue().iterator(); + while (partitionItr.hasNext()) { + if (!partitionItr.next().getPeriod().equals(updatePeriod)) { + partitionItr.remove(); + } + } + if (rangeEntry.getValue().isEmpty()) { + rangeItr.remove(); + } + } + } + + +}
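As a rough illustration of how the pieces of the new class fit together, here is a minimal usage sketch. It is not part of the patch: the storage name "daily_store", the choice of UpdatePeriod.DAILY and the surrounding variables (a resolved CubeQueryContext cubeql with its cube, a CubeFactTable fact, a logger log) are assumed to be supplied by the resolver, and exception handling is omitted.

    // Hypothetical sketch, assumed to run inside the org.apache.lens.cube.parse package
    // (addValidUpdatePeriod is package-private) against a storage named "daily_store".
    StorageCandidate sc = new StorageCandidate(cubeql.getCube(), fact, "daily_store", cubeql);
    sc.addValidUpdatePeriod(UpdatePeriod.DAILY);   // phase 1: register the update periods that survived pruning
    sc.setStorageStartAndEndDate();                // recompute start/end time from the valid periods only

    for (TimeRange range : cubeql.getTimeRanges()) {
      if (sc.isTimeRangeCoverable(range)
        && sc.evaluateCompleteness(range, range, false /* failOnPartialData */)) {
        // partitions answering `range` are now recorded in rangeToPartitions
      }
    }

    // If the storage keeps one table per update period (LENS-1386), split before HQL generation
    // so that each resulting candidate resolves to a single physical table:
    for (StorageCandidate perPeriod : sc.splitAtUpdatePeriodLevelIfReq()) {
      log.info("{} -> {}", perPeriod.getResolvedName(), perPeriod.getParticipatingPartitions());
    }

The split step is what lets a union query pick a distinct resolved table name per participating update period, as getPeriodSpecificStorageCandidates() does above.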