http://git-wip-us.apache.org/repos/asf/lens/blob/b6f0cc3d/lens-cube/src/main/java/org/apache/lens/cube/parse/QueriedPhraseContext.java ---------------------------------------------------------------------- diff --git a/lens-cube/src/main/java/org/apache/lens/cube/parse/QueriedPhraseContext.java b/lens-cube/src/main/java/org/apache/lens/cube/parse/QueriedPhraseContext.java index 11eb8f7..64a9626 100644 --- a/lens-cube/src/main/java/org/apache/lens/cube/parse/QueriedPhraseContext.java +++ b/lens-cube/src/main/java/org/apache/lens/cube/parse/QueriedPhraseContext.java @@ -98,89 +98,93 @@ class QueriedPhraseContext extends TracksQueriedColumns implements TrackQueriedC return false; } - boolean isEvaluable(CubeQueryContext cubeQl, CandidateFact cfact) throws LensException { + /** + * TODO union: change CandidateFact to StorageCandidate. Let the callers typecast and send for now. + * @param cubeQl cube query context + * @param sc storage candidate to evaluate + * @return true if all queried measures, expression columns and dim-attributes of this phrase are evaluable on sc + * @throws LensException + */ + public boolean isEvaluable(CubeQueryContext cubeQl, StorageCandidate sc) throws LensException { // all measures of the queried phrase should be present for (String msr : queriedMsrs) { - if (!checkForColumnExistsAndValidForRange(cfact, msr, cubeQl)) { + if (!checkForColumnExistsAndValidForRange(sc, msr, cubeQl)) { return false; } } // all expression columns should be evaluable for (String exprCol : queriedExprColumns) { - if (!cubeQl.getExprCtx().isEvaluable(exprCol, cfact)) { - log.info("expression {} is not evaluable in fact table:{}", expr, cfact); + if (!cubeQl.getExprCtx().isEvaluable(exprCol, sc)) { + log.info("expression {} is not evaluable in fact table:{}", expr, sc); return false; } } // all dim-attributes should be present. for (String col : queriedDimAttrs) { - if (!cfact.getColumns().contains(col.toLowerCase())) { + if (!sc.getColumns().contains(col.toLowerCase())) { // check if it is available as a reference - if (!cubeQl.getDeNormCtx().addRefUsage(cfact, col, cubeQl.getCube().getName())) { - log.info("column {} is not available in fact table:{} ", col, cfact); + if (!cubeQl.getDeNormCtx().addRefUsage(sc, col, cubeQl.getCube().getName())) { + log.info("column {} is not available in fact table:{} ", col, sc); return false; } - } else if (!isFactColumnValidForRange(cubeQl, cfact, col)) { - log.info("column {} is not available in range queried in fact {}", col, cfact); + } else if (!isFactColumnValidForRange(cubeQl, sc, col)) { + log.info("column {} is not available in the range queried in fact {}", col, sc); return false; } } return true; } - public static boolean isColumnAvailableInRange(final TimeRange range, Date startTime, Date endTime) { + private static boolean isColumnAvailableInRange(final TimeRange range, Date startTime, Date endTime) { return (isColumnAvailableFrom(range.getFromDate(), startTime) && isColumnAvailableTill(range.getToDate(), endTime)); } - public static boolean isColumnAvailableFrom(@NonNull final Date date, Date startTime) { + private static boolean isColumnAvailableFrom(@NonNull final Date date, Date startTime) { return (startTime == null) || date.equals(startTime) || date.after(startTime); } - public static boolean isColumnAvailableTill(@NonNull final Date date, Date endTime) { + private static boolean isColumnAvailableTill(@NonNull final Date date, Date endTime) { return (endTime == null) || date.equals(endTime) || date.before(endTime); } - public static boolean isFactColumnValidForRange(CubeQueryContext cubeql, CandidateTable cfact, String col) { + public static boolean isFactColumnValidForRange(CubeQueryContext
cubeql, StorageCandidate sc, String col) { for(TimeRange range : cubeql.getTimeRanges()) { - if (!isColumnAvailableInRange(range, getFactColumnStartTime(cfact, col), getFactColumnEndTime(cfact, col))) { + if (!isColumnAvailableInRange(range, getFactColumnStartTime(sc, col), getFactColumnEndTime(sc, col))) { return false; } } return true; } - public static Date getFactColumnStartTime(CandidateTable table, String factCol) { + public static Date getFactColumnStartTime(StorageCandidate sc, String factCol) { Date startTime = null; - if (table instanceof CandidateFact) { - for (String key : ((CandidateFact) table).fact.getProperties().keySet()) { + for (String key : sc.getTable().getProperties().keySet()) { if (key.contains(MetastoreConstants.FACT_COL_START_TIME_PFX)) { String propCol = StringUtils.substringAfter(key, MetastoreConstants.FACT_COL_START_TIME_PFX); if (factCol.equals(propCol)) { - startTime = ((CandidateFact) table).fact.getDateFromProperty(key, false, true); + startTime = sc.getTable().getDateFromProperty(key, false, true); } } } - } return startTime; } - public static Date getFactColumnEndTime(CandidateTable table, String factCol) { + public static Date getFactColumnEndTime(StorageCandidate sc, String factCol) { Date endTime = null; - if (table instanceof CandidateFact) { - for (String key : ((CandidateFact) table).fact.getProperties().keySet()) { + for (String key : sc.getTable().getProperties().keySet()) { if (key.contains(MetastoreConstants.FACT_COL_END_TIME_PFX)) { String propCol = StringUtils.substringAfter(key, MetastoreConstants.FACT_COL_END_TIME_PFX); if (factCol.equals(propCol)) { - endTime = ((CandidateFact) table).fact.getDateFromProperty(key, false, true); + endTime = sc.getTable().getDateFromProperty(key, false, true); } } } - } - return endTime; + return endTime; } - static boolean checkForColumnExistsAndValidForRange(CandidateTable table, String column, CubeQueryContext cubeql) { - return (table.getColumns().contains(column) && isFactColumnValidForRange(cubeql, table, column)); + static boolean checkForColumnExistsAndValidForRange(StorageCandidate sc, String column, CubeQueryContext cubeql) { + return (sc.getColumns().contains(column) && isFactColumnValidForRange(cubeql, sc, column)); } + }
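For reference, the availability checks above reduce to a null-tolerant window test: a fact column can serve a time range only when the range's from-date is on or after the column's declared start time and the range's to-date is on or before its declared end time, where a missing bound means the column is unbounded on that side. A minimal stand-alone sketch of that test follows; the demo class name and hard-coded dates are illustrative only, not part of this patch:

import java.util.Date;

public class ColumnAvailabilityDemo {

  // A null bound means the column has no declared start/end time on that side.
  static boolean availableFrom(Date from, Date startTime) {
    return startTime == null || !from.before(startTime); // from >= startTime
  }

  static boolean availableTill(Date to, Date endTime) {
    return endTime == null || !to.after(endTime); // to <= endTime
  }

  static boolean availableInRange(Date from, Date to, Date startTime, Date endTime) {
    return availableFrom(from, startTime) && availableTill(to, endTime);
  }

  public static void main(String[] args) {
    Date colStart = new Date(1000L);
    Date colEnd = new Date(5000L);
    // Queried range [2000, 4000] lies inside the column's validity window.
    System.out.println(availableInRange(new Date(2000L), new Date(4000L), colStart, colEnd)); // true
    // Queried range starts before the column's declared start time -> not available.
    System.out.println(availableInRange(new Date(500L), new Date(4000L), colStart, colEnd)); // false
    // A column with no start/end properties is available for any range.
    System.out.println(availableInRange(new Date(0L), new Date(9000L), null, null)); // true
  }
}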
http://git-wip-us.apache.org/repos/asf/lens/blob/b6f0cc3d/lens-cube/src/main/java/org/apache/lens/cube/parse/QueryAST.java ---------------------------------------------------------------------- diff --git a/lens-cube/src/main/java/org/apache/lens/cube/parse/QueryAST.java b/lens-cube/src/main/java/org/apache/lens/cube/parse/QueryAST.java index 7298604..bdd6376 100644 --- a/lens-cube/src/main/java/org/apache/lens/cube/parse/QueryAST.java +++ b/lens-cube/src/main/java/org/apache/lens/cube/parse/QueryAST.java @@ -83,4 +83,6 @@ public interface QueryAST { ASTNode getOrderByAST(); void setOrderByAST(ASTNode node); + + void setJoinAST(ASTNode node); } http://git-wip-us.apache.org/repos/asf/lens/blob/b6f0cc3d/lens-cube/src/main/java/org/apache/lens/cube/parse/StorageCandidate.java ---------------------------------------------------------------------- diff --git a/lens-cube/src/main/java/org/apache/lens/cube/parse/StorageCandidate.java b/lens-cube/src/main/java/org/apache/lens/cube/parse/StorageCandidate.java new file mode 100644 index 0000000..22038f3 --- /dev/null +++ b/lens-cube/src/main/java/org/apache/lens/cube/parse/StorageCandidate.java @@ -0,0 +1,560 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.lens.cube.parse; + +import static org.apache.lens.cube.parse.CandidateTablePruneCause.*; +import static org.apache.lens.cube.parse.StorageUtil.*; + +import java.text.DateFormat; +import java.text.SimpleDateFormat; +import java.util.*; + +import org.apache.lens.cube.metadata.*; +import org.apache.lens.server.api.error.LensException; +import org.apache.lens.server.api.metastore.DataCompletenessChecker; + +import org.apache.commons.lang.StringUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.util.ReflectionUtils; +import org.apache.hadoop.hive.ql.metadata.HiveException; + +import com.google.common.collect.Sets; + +import lombok.Getter; +import lombok.Setter; +import lombok.extern.slf4j.Slf4j; + +/** + * Represents a fact on a storage table and the dimensions it needs to be joined with to answer the query + */ +@Slf4j +public class StorageCandidate implements Candidate, CandidateTable { + + @Getter + private final CubeQueryContext cubeql; + private final TimeRangeWriter rangeWriter; + private final String processTimePartCol; + private final CubeMetastoreClient client; + private final String completenessPartCol; + private final float completenessThreshold; + @Getter + private final String name; + /** + * Valid update periods populated by Phase 1.
+ */ + private TreeSet<UpdatePeriod> validUpdatePeriods = new TreeSet<>(); + private Configuration conf = null; + private Map<String, Map<String, Float>> incompleteMeasureData = new HashMap<>(); + private SimpleDateFormat partWhereClauseFormat = null; + /** + * Participating fact, storage and dimensions for this StorageCandidate + */ + @Getter + private CubeFactTable fact; + @Getter + private String storageName; + private Map<Dimension, CandidateDim> dimensions; + private Map<TimeRange, String> rangeToWhere = new LinkedHashMap<>(); + @Getter + private CubeInterface cube; + /** + * Cached fact columns + */ + private Collection<String> factColumns; + /** + * This map holds Tags (A tag refers to one or more measures) that have incomplete (below configured threshold) data. + * Value is a map of date string and %completeness. + */ + @Getter + @Setter + private Map<String, Map<String, Float>> incompleteDataDetails; + /** + * Partitions calculated by the getPartitions() method. + */ + private Set<FactPartition> storagePartitions = new HashSet<>(); + /** + * Non existing partitions + */ + private Set<String> nonExistingPartitions = new HashSet<>(); + @Getter + private String alias = null; + + public StorageCandidate(CubeInterface cube, CubeFactTable fact, String storageName, String alias, + CubeQueryContext cubeql) { + if ((cube == null) || (fact == null) || (storageName == null) || (alias == null)) { + throw new IllegalArgumentException("Cube, fact, storageName and alias should be non null"); + } + this.cube = cube; + this.fact = fact; + this.cubeql = cubeql; + this.storageName = storageName; + this.conf = cubeql.getConf(); + this.alias = alias; + this.name = MetastoreUtil.getFactOrDimtableStorageTableName(fact.getName(), storageName); + rangeWriter = ReflectionUtils.newInstance(conf + .getClass(CubeQueryConfUtil.TIME_RANGE_WRITER_CLASS, CubeQueryConfUtil.DEFAULT_TIME_RANGE_WRITER, + TimeRangeWriter.class), conf); + this.processTimePartCol = conf.get(CubeQueryConfUtil.PROCESS_TIME_PART_COL); + String formatStr = conf.get(CubeQueryConfUtil.PART_WHERE_CLAUSE_DATE_FORMAT); + if (formatStr != null) { + this.partWhereClauseFormat = new SimpleDateFormat(formatStr); + } + completenessPartCol = conf.get(CubeQueryConfUtil.COMPLETENESS_CHECK_PART_COL); + client = cubeql.getMetastoreClient(); + completenessThreshold = conf + .getFloat(CubeQueryConfUtil.COMPLETENESS_THRESHOLD, CubeQueryConfUtil.DEFAULT_COMPLETENESS_THRESHOLD); + } + + @Override + public String toHQL() { + return null; + } + + @Override + public QueryAST getQueryAst() { + return null; + } + + @Override + public String getStorageString(String alias) { + return null; + } + + @Override + public AbstractCubeTable getTable() { + return fact; + } + + @Override + public AbstractCubeTable getBaseTable() { + return (AbstractCubeTable) cube; + } + + @Override + public Collection<String> getColumns() { + if (factColumns == null) { + factColumns = fact.getValidColumns(); + if (factColumns == null) { + factColumns = fact.getAllFieldNames(); + } + } + return factColumns; + } + + @Override + public Date getStartTime() { + return fact.getStartTime(); + } + + @Override + public Date getEndTime() { + return fact.getEndTime(); + } + + @Override + public double getCost() { + return fact.weight(); + } + + @Override + public boolean contains(Candidate candidate) { + return this.equals(candidate); + } + + @Override + public Collection<Candidate> getChildren() { + return null; + } + + private void updatePartitionStorage(FactPartition part) throws LensException { + try { + if
(client.isStorageTablePartitionACandidate(name, part.getPartSpec()) && (client + .factPartitionExists(fact, part, name))) { + part.getStorageTables().add(name); + part.setFound(true); + } + } catch (HiveException e) { + log.warn("Hive exception while getting storage table partition", e); + } + } + + /** + * Gets FactPartitions for the given fact using the following logic + * + * 1. Find the max update interval that will be used for the query. Let's assume the time range is 15 Sep to 15 Dec and the + * fact has two storages with update periods as MONTHLY, DAILY, HOURLY. In this case the data for + * [15 sep - 1 oct)U[1 Dec - 15 Dec) will be answered by DAILY partitions and [1 oct - 1Dec) will be answered by + * MONTHLY partitions. The max interval for this query will be MONTHLY. + * + * 2. Prune storages that do not fall in the query's time range. + * {@link CubeMetastoreClient#isStorageTableCandidateForRange(String, Date, Date)} + * + * 3. Iterate over the max interval. In our case it will give two months, Oct and Nov. Find partitions for these two months. + * Check validity of FactPartitions for Oct and Nov via {@link #updatePartitionStorage(FactPartition)}. + * If the partition is missing, try getting partitions for the time range from other update periods (DAILY, HOURLY). This + * is achieved by calling getPartitions() recursively but passing only 2 update periods (DAILY, HOURLY) + * + * 4. If the monthly partitions are found, check for lookahead partitions and call getPartitions recursively for the + * remaining time intervals, i.e., [15 sep - 1 oct) and [1 Dec - 15 Dec) + */ + private boolean getPartitions(Date fromDate, Date toDate, String partCol, Set<FactPartition> partitions, + TreeSet<UpdatePeriod> updatePeriods, boolean addNonExistingParts, boolean failOnPartialData, + PartitionRangesForPartitionColumns missingPartitions) throws LensException { + if (fromDate.equals(toDate) || fromDate.after(toDate)) { + return true; + } + UpdatePeriod interval = CubeFactTable.maxIntervalInRange(fromDate, toDate, updatePeriods); + if (interval == null) { + log.info("No max interval for range: {} to {}", fromDate, toDate); + return false; + } + + if (interval == UpdatePeriod.CONTINUOUS && rangeWriter.getClass().equals(BetweenTimeRangeWriter.class)) { + FactPartition part = new FactPartition(partCol, fromDate, interval, null, partWhereClauseFormat); + partitions.add(part); + part.getStorageTables().add(name); + part = new FactPartition(partCol, toDate, interval, null, partWhereClauseFormat); + partitions.add(part); + part.getStorageTables().add(name); + log.info("Added continuous fact partition for storage table {}", name); + return true; + } + + if (!client.isStorageTableCandidateForRange(name, fromDate, toDate)) { + cubeql.addStoragePruningMsg(this, + new CandidateTablePruneCause(CandidateTablePruneCause.CandidateTablePruneCode.TIME_RANGE_NOT_ANSWERABLE)); + // skipStorageCauses.put(name, new CandidateTablePruneCause.SkipStorageCause(RANGE_NOT_ANSWERABLE)); + return false; + } else if (!client.partColExists(name, partCol)) { + log.info("{} does not exist in {}", partCol, name); + // skipStorageCauses.put(name, CandidateTablePruneCause.SkipStorageCause.partColDoesNotExist(partCol)); + List<String> missingCols = new ArrayList<>(); + missingCols.add(partCol); + cubeql.addStoragePruningMsg(this, partitionColumnsMissing(missingCols)); + return false; + } + + Date ceilFromDate = DateUtil.getCeilDate(fromDate, interval); + Date floorToDate = DateUtil.getFloorDate(toDate, interval); + + int lookAheadNumParts = conf +
.getInt(CubeQueryConfUtil.getLookAheadPTPartsKey(interval), CubeQueryConfUtil.DEFAULT_LOOK_AHEAD_PT_PARTS); + + TimeRange.Iterable.Iterator iter = TimeRange.iterable(ceilFromDate, floorToDate, interval, 1).iterator(); + // add partitions from ceilFrom to floorTo + while (iter.hasNext()) { + Date dt = iter.next(); + Date nextDt = iter.peekNext(); + FactPartition part = new FactPartition(partCol, dt, interval, null, partWhereClauseFormat); + updatePartitionStorage(part); + log.debug("Storage tables containing Partition {} are: {}", part, part.getStorageTables()); + if (part.isFound()) { + log.debug("Adding existing partition {}", part); + partitions.add(part); + log.debug("Looking for look ahead process time partitions for {}", part); + if (processTimePartCol == null) { + log.debug("processTimePartCol is null"); + } else if (partCol.equals(processTimePartCol)) { + log.debug("part column is process time col"); + } else if (updatePeriods.first().equals(interval)) { + log.debug("Update period is the least update period"); + } else if ((iter.getNumIters() - iter.getCounter()) > lookAheadNumParts) { + // see if this is part of the last-n look ahead partitions + log.debug("Not a look ahead partition"); + } else { + log.debug("Looking for look ahead process time partitions for {}", part); + // check if finer partitions are required + // finer partitions are required if no partitions from + // look-ahead + // process time are present + TimeRange.Iterable.Iterator processTimeIter = TimeRange.iterable(nextDt, lookAheadNumParts, interval, 1) + .iterator(); + while (processTimeIter.hasNext()) { + Date pdt = processTimeIter.next(); + Date nextPdt = processTimeIter.peekNext(); + FactPartition processTimePartition = new FactPartition(processTimePartCol, pdt, interval, null, + partWhereClauseFormat); + updatePartitionStorage(processTimePartition); + if (processTimePartition.isFound()) { + log.debug("Finer parts not required for look-ahead partition :{}", part); + } else { + log.debug("Looked ahead process time partition {} is not found", processTimePartition); + TreeSet<UpdatePeriod> newset = new TreeSet<UpdatePeriod>(); + newset.addAll(updatePeriods); + newset.remove(interval); + log.debug("newset of update periods:{}", newset); + if (!newset.isEmpty()) { + // Get partitions for look ahead process time + log.debug("Looking for process time partitions between {} and {}", pdt, nextPdt); + Set<FactPartition> processTimeParts = getPartitions( + TimeRange.getBuilder().fromDate(pdt).toDate(nextPdt).partitionColumn(processTimePartCol).build(), + newset, true, false, missingPartitions); + log.debug("Look ahead partitions: {}", processTimeParts); + TimeRange timeRange = TimeRange.getBuilder().fromDate(dt).toDate(nextDt).build(); + for (FactPartition pPart : processTimeParts) { + log.debug("Looking for finer partitions in pPart: {}", pPart); + for (Date date : timeRange.iterable(pPart.getPeriod(), 1)) { + FactPartition innerPart = new FactPartition(partCol, date, pPart.getPeriod(), pPart, + partWhereClauseFormat); + updatePartitionStorage(innerPart); + innerPart.setFound(pPart.isFound()); + if (innerPart.isFound()) { + partitions.add(innerPart); + } + } + log.debug("added all sub partitions blindly in pPart: {}", pPart); + } + } + } + } + } + } else { + log.info("Partition:{} does not exist in any storage table", part); + TreeSet<UpdatePeriod> newset = new TreeSet<>(); + newset.addAll(updatePeriods); + newset.remove(interval); + if (!getPartitions(dt, nextDt, partCol, partitions, newset, false,
failOnPartialData, missingPartitions)) { + log.debug("Adding non existing partition {}", part); + if (addNonExistingParts) { + // Add non existing partitions for all cases of whether we populate all non existing or not. + missingPartitions.add(part); + if (!failOnPartialData) { + if (!client.isStorageTablePartitionACandidate(name, part.getPartSpec())) { + log.info("Storage tables not eligible"); + return false; + } + partitions.add(part); + part.getStorageTables().add(name); + } + } else { + log.info("No finer granular partitions exist for {}", part); + return false; + } + } else { + log.debug("Finer granular partitions added for {}", part); + } + } + } + return + getPartitions(fromDate, ceilFromDate, partCol, partitions, updatePeriods, addNonExistingParts, failOnPartialData, + missingPartitions) && getPartitions(floorToDate, toDate, partCol, partitions, updatePeriods, + addNonExistingParts, failOnPartialData, missingPartitions); + } + + /** + * Finds all the partitions for a storage table with a particular time range. + * + * @param timeRange : TimeRange to check completeness for. TimeRange consists of start time, end time and the + * partition column + * @param failOnPartialData : fail fast if the candidate can answer the query only partially + * @return true if partitions are found for the given time range. Steps: + * 1. Get skip storage causes + * 2. getPartitions for timeRange and validUpdatePeriods + */ + @Override + public boolean evaluateCompleteness(TimeRange timeRange, boolean failOnPartialData) throws LensException { + // Check the measure tags. + if (!evaluateMeasuresCompleteness(timeRange)) { + log + .info("Fact table:{} has partitions with incomplete data: {} for given ranges: {}", fact, incompleteMeasureData, + cubeql.getTimeRanges()); + cubeql.addStoragePruningMsg(this, incompletePartitions(incompleteMeasureData)); + if (failOnPartialData) { + return false; + } + } + PartitionRangesForPartitionColumns missingParts = new PartitionRangesForPartitionColumns(); + PruneCauses<StorageCandidate> storagePruningMsgs = cubeql.getStoragePruningMsgs(); + Set<String> unsupportedTimeDims = Sets.newHashSet(); + Set<String> partColsQueried = Sets.newHashSet(); + partColsQueried.add(timeRange.getPartitionColumn()); + StringBuilder extraWhereClauseFallback = new StringBuilder(); + Set<FactPartition> rangeParts = getPartitions(timeRange, validUpdatePeriods, true, failOnPartialData, missingParts); + String partCol = timeRange.getPartitionColumn(); + boolean partColNotSupported = rangeParts.isEmpty(); + String storageTableName = getStorageName(); + if (storagePruningMsgs.containsKey(storageTableName)) { + List<CandidateTablePruneCause> causes = storagePruningMsgs.get(storageTableName); + // Find the PART_COL_DOES_NOT_EXISTS + for (CandidateTablePruneCause cause : causes) { + if (cause.getCause().equals(CandidateTablePruneCode.PART_COL_DOES_NOT_EXIST)) { + partColNotSupported = cause.getNonExistantPartCols().contains(partCol); + } + } + } + TimeRange prevRange = timeRange; + String sep = ""; + while (rangeParts.isEmpty()) { + String timeDim = cubeql.getBaseCube().getTimeDimOfPartitionColumn(partCol); + if (partColNotSupported && !getFact().getColumns().contains(timeDim)) { + unsupportedTimeDims.add(cubeql.getBaseCube().getTimeDimOfPartitionColumn(timeRange.getPartitionColumn())); + break; + } + TimeRange fallBackRange = getFallbackRange(prevRange, this.getFact().getName(), cubeql); + log.info("No partitions for range:{}.
fallback range: {}", timeRange, fallBackRange); + if (fallBackRange == null) { + break; + } + partColsQueried.add(fallBackRange.getPartitionColumn()); + rangeParts = getPartitions(fallBackRange, validUpdatePeriods, true, failOnPartialData, missingParts); + extraWhereClauseFallback.append(sep) + .append(prevRange.toTimeDimWhereClause(cubeql.getAliasForTableName(cubeql.getCube()), timeDim)); + sep = " AND "; + prevRange = fallBackRange; + partCol = prevRange.getPartitionColumn(); + if (!rangeParts.isEmpty()) { + break; + } + } + if (!unsupportedTimeDims.isEmpty()) { + log.info("Not considering fact table:{} as it doesn't support time dimensions: {}", this.getFact(), + unsupportedTimeDims); + cubeql.addStoragePruningMsg(this, timeDimNotSupported(unsupportedTimeDims)); + return false; + } + Set<String> nonExistingParts = missingParts.toSet(partColsQueried); + // TODO union : Relook at this. + nonExistingPartitions.addAll(nonExistingParts); + if (rangeParts.size() == 0 || (failOnPartialData && !nonExistingParts.isEmpty())) { + log.info("No partitions for fallback range:{}", timeRange); + return false; + } + String extraWhere = extraWhereClauseFallback.toString(); + if (!StringUtils.isEmpty(extraWhere)) { + rangeToWhere.put(timeRange, "((" + rangeWriter + .getTimeRangeWhereClause(cubeql, cubeql.getAliasForTableName(cubeql.getCube().getName()), rangeParts) + + ") and (" + extraWhere + "))"); + } else { + rangeToWhere.put(timeRange, rangeWriter + .getTimeRangeWhereClause(cubeql, cubeql.getAliasForTableName(cubeql.getCube().getName()), rangeParts)); + } + // Add all the partitions. storagePartitions contains all the partitions for previous time ranges also. + this.storagePartitions.addAll(rangeParts); + return true; + } + + private boolean evaluateMeasuresCompleteness(TimeRange timeRange) throws LensException { + String factDataCompletenessTag = fact.getDataCompletenessTag(); + if (factDataCompletenessTag == null) { + log.info("Not checking completeness for the fact table:{} as the dataCompletenessTag is not set", fact); + return true; + } + Set<String> measureTag = new HashSet<>(); + Map<String, String> tagToMeasureOrExprMap = new HashMap<>(); + + processMeasuresFromExprMeasures(cubeql, measureTag, tagToMeasureOrExprMap); + + Set<String> measures = cubeql.getQueriedMsrs(); + if (measures == null) { + measures = new HashSet<>(); + } + for (String measure : measures) { + processCubeColForDataCompleteness(cubeql, measure, measure, measureTag, tagToMeasureOrExprMap); + } + //Checking if dataCompletenessTag is set for the fact + if (measureTag.isEmpty()) { + log.info("No Queried measures with the dataCompletenessTag, hence skipping the availability check"); + return true; + } + boolean isDataComplete = false; + DataCompletenessChecker completenessChecker = client.getCompletenessChecker(); + DateFormat formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); + formatter.setTimeZone(TimeZone.getTimeZone("UTC")); + if (!timeRange.getPartitionColumn().equals(completenessPartCol)) { + log.info("Completeness check not available for partCol:{}", timeRange.getPartitionColumn()); + return true; + } + Date from = timeRange.getFromDate(); + Date to = timeRange.getToDate(); + Map<String, Map<Date, Float>> completenessMap = completenessChecker + .getCompleteness(factDataCompletenessTag, from, to, measureTag); + if (completenessMap != null && !completenessMap.isEmpty()) { + for (Map.Entry<String, Map<Date, Float>> measureCompleteness : completenessMap.entrySet()) { + String tag = measureCompleteness.getKey(); + for 
(Map.Entry<Date, Float> completenessResult : measureCompleteness.getValue().entrySet()) { + if (completenessResult.getValue() < completenessThreshold) { + log.info("Completeness for the measure_tag {} is {}, threshold: {}, for the hour {}", tag, + completenessResult.getValue(), completenessThreshold, formatter.format(completenessResult.getKey())); + String measureorExprFromTag = tagToMeasureOrExprMap.get(tag); + Map<String, Float> incompletePartition = incompleteMeasureData.get(measureorExprFromTag); + if (incompletePartition == null) { + incompletePartition = new HashMap<>(); + incompleteMeasureData.put(measureorExprFromTag, incompletePartition); + } + incompletePartition.put(formatter.format(completenessResult.getKey()), completenessResult.getValue()); + isDataComplete = true; + } + } + } + } + return isDataComplete; + } + + private Set<FactPartition> getPartitions(TimeRange timeRange, TreeSet<UpdatePeriod> updatePeriods, + boolean addNonExistingParts, boolean failOnPartialData, PartitionRangesForPartitionColumns missingParts) + throws LensException { + Set<FactPartition> partitions = new TreeSet<>(); + if (timeRange != null && timeRange.isCoverableBy(updatePeriods) && getPartitions(timeRange.getFromDate(), + timeRange.getToDate(), timeRange.getPartitionColumn(), partitions, updatePeriods, addNonExistingParts, + failOnPartialData, missingParts)) { + return partitions; + } + return new TreeSet<>(); + } + + @Override + public Set<FactPartition> getParticipatingPartitions() { + return null; + } + + @Override + public boolean isExpressionEvaluable(ExpressionResolver.ExpressionContext expr) { + return expr.isEvaluable(this); + } + + @Override + public boolean equals(Object obj) { + if (super.equals(obj)) { + return true; + } + + if (obj == null || !(obj instanceof StorageCandidate)) { + return false; + } + + StorageCandidate storageCandidateObj = (StorageCandidate) obj; + //Assuming that same instance of cube and fact will be used across StorageCandidate(s) and hence relying directly + //on == check for these. + return (this.cube == storageCandidateObj.cube && this.fact == storageCandidateObj.fact && this.storageName + .equals(storageCandidateObj.storageName)); + } + + @Override + public int hashCode() { + return this.name.hashCode(); + } + + @Override + public String toString() { + return getName(); + } + + public void addValidUpdatePeriod(UpdatePeriod updatePeriod) { + this.validUpdatePeriods.add(updatePeriod); + } +} http://git-wip-us.apache.org/repos/asf/lens/blob/b6f0cc3d/lens-cube/src/main/java/org/apache/lens/cube/parse/StorageTableResolver.java ---------------------------------------------------------------------- diff --git a/lens-cube/src/main/java/org/apache/lens/cube/parse/StorageTableResolver.java b/lens-cube/src/main/java/org/apache/lens/cube/parse/StorageTableResolver.java index cdf6812..daab851 100644 --- a/lens-cube/src/main/java/org/apache/lens/cube/parse/StorageTableResolver.java +++ b/lens-cube/src/main/java/org/apache/lens/cube/parse/StorageTableResolver.java @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License.
You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an @@ -18,33 +18,28 @@ */ package org.apache.lens.cube.parse; +import static org.apache.lens.cube.metadata.MetastoreUtil.getFactOrDimtableStorageTableName; +import static org.apache.lens.cube.parse.CandidateTablePruneCause.CandidateTablePruneCode.TIMEDIM_NOT_SUPPORTED; +import static org.apache.lens.cube.parse.CandidateTablePruneCause.CandidateTablePruneCode.TIME_RANGE_NOT_ANSWERABLE; +import static org.apache.lens.cube.parse.CandidateTablePruneCause.noCandidateStorages; +import static org.apache.lens.cube.parse.StorageUtil.getFallbackRange; + import java.text.DateFormat; -import java.text.ParseException; import java.text.SimpleDateFormat; - import java.util.*; -import java.util.regex.Matcher; -import java.util.regex.Pattern; - -import static org.apache.lens.cube.metadata.DateUtil.WSPACE; -import static org.apache.lens.cube.metadata.MetastoreUtil.*; -import static org.apache.lens.cube.parse.CandidateTablePruneCause.*; -import static org.apache.lens.cube.parse.CandidateTablePruneCause.CandidateTablePruneCode.*; -import static org.apache.lens.cube.parse.CandidateTablePruneCause.SkipStorageCode.*; import org.apache.lens.cube.metadata.*; -import org.apache.lens.cube.parse.CandidateTablePruneCause.*; +import org.apache.lens.cube.parse.CandidateTablePruneCause.CandidateTablePruneCode; +import org.apache.lens.cube.parse.CandidateTablePruneCause.SkipStorageCause; +import org.apache.lens.cube.parse.CandidateTablePruneCause.SkipStorageCode; +import org.apache.lens.cube.parse.CandidateTablePruneCause.SkipUpdatePeriodCode; import org.apache.lens.server.api.error.LensException; -import org.apache.lens.server.api.metastore.*; import org.apache.commons.lang.StringUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.util.ReflectionUtils; -import com.google.common.collect.Lists; -import com.google.common.collect.Sets; - import lombok.extern.slf4j.Slf4j; /** @@ -57,36 +52,23 @@ class StorageTableResolver implements ContextRewriter { private final Configuration conf; private final List<String> supportedStorages; private final boolean allStoragesSupported; - CubeMetastoreClient client; private final boolean failOnPartialData; private final List<String> validDimTables; private final Map<CubeFactTable, Map<UpdatePeriod, Set<String>>> validStorageMap = new HashMap<>(); - private String processTimePartCol = null; private final UpdatePeriod maxInterval; + // TODO union : Remove this. All partitions are stored in the StorageCandidate. private final Map<String, Set<String>> nonExistingPartitions = new HashMap<>(); + CubeMetastoreClient client; + Map<String, List<String>> storagePartMap = new HashMap<String, List<String>>(); + private String processTimePartCol = null; private TimeRangeWriter rangeWriter; private DateFormat partWhereClauseFormat = null; private PHASE phase; + // TODO union : we do not need this. 
Remove the storage candidate private HashMap<CubeFactTable, Map<String, SkipStorageCause>> skipStorageCausesPerFact; private float completenessThreshold; private String completenessPartCol; - enum PHASE { - FACT_TABLES, FACT_PARTITIONS, DIM_TABLE_AND_PARTITIONS; - - static PHASE first() { - return values()[0]; - } - - static PHASE last() { - return values()[values().length - 1]; - } - - PHASE next() { - return values()[(this.ordinal() + 1) % values().length]; - } - } - public StorageTableResolver(Configuration conf) { this.conf = conf; this.supportedStorages = getSupportedStorages(conf); @@ -101,16 +83,16 @@ } else { this.maxInterval = null; } - rangeWriter = - ReflectionUtils.newInstance(conf.getClass(CubeQueryConfUtil.TIME_RANGE_WRITER_CLASS, - CubeQueryConfUtil.DEFAULT_TIME_RANGE_WRITER, TimeRangeWriter.class), this.conf); + rangeWriter = ReflectionUtils.newInstance(conf + .getClass(CubeQueryConfUtil.TIME_RANGE_WRITER_CLASS, CubeQueryConfUtil.DEFAULT_TIME_RANGE_WRITER, + TimeRangeWriter.class), this.conf); String formatStr = conf.get(CubeQueryConfUtil.PART_WHERE_CLAUSE_DATE_FORMAT); if (formatStr != null) { partWhereClauseFormat = new SimpleDateFormat(formatStr); } this.phase = PHASE.first(); - completenessThreshold = conf.getFloat(CubeQueryConfUtil.COMPLETENESS_THRESHOLD, - CubeQueryConfUtil.DEFAULT_COMPLETENESS_THRESHOLD); + completenessThreshold = conf + .getFloat(CubeQueryConfUtil.COMPLETENESS_THRESHOLD, CubeQueryConfUtil.DEFAULT_COMPLETENESS_THRESHOLD); completenessPartCol = conf.get(CubeQueryConfUtil.COMPLETENESS_CHECK_PART_COL); } @@ -122,36 +104,23 @@ return null; } - public boolean isStorageSupported(String storage) { + public boolean isStorageSupportedOnDriver(String storage) { return allStoragesSupported || supportedStorages.contains(storage); } - Map<String, List<String>> storagePartMap = new HashMap<String, List<String>>(); - @Override public void rewriteContext(CubeQueryContext cubeql) throws LensException { client = cubeql.getMetastoreClient(); switch (phase) { - case FACT_TABLES: - if (!cubeql.getCandidateFacts().isEmpty()) { - // resolve storage table names - resolveFactStorageTableNames(cubeql); + case STORAGE_TABLES: + if (!cubeql.getCandidates().isEmpty()) { + resolveStorageTable(cubeql); } - cubeql.pruneCandidateFactSet(CandidateTablePruneCode.NO_CANDIDATE_STORAGES); break; - case FACT_PARTITIONS: - if (!cubeql.getCandidateFacts().isEmpty()) { - // resolve storage partitions - resolveFactStoragePartitions(cubeql); - } - cubeql.pruneCandidateFactSet(CandidateTablePruneCode.NO_CANDIDATE_STORAGES); - if (client != null && client.isDataCompletenessCheckEnabled()) { - if (!cubeql.getCandidateFacts().isEmpty()) { - // resolve incomplete fact partition - resolveFactCompleteness(cubeql); - } - cubeql.pruneCandidateFactSet(CandidateTablePruneCode.INCOMPLETE_PARTITION); + case STORAGE_PARTITIONS: + if (!cubeql.getCandidates().isEmpty()) { + resolveStoragePartitions(cubeql); } break; case DIM_TABLE_AND_PARTITIONS: @@ -162,13 +131,32 @@ cubeql.getAutoJoinCtx().pruneAllPathsForCandidateDims(cubeql.getCandidateDimTables()); cubeql.getAutoJoinCtx().refreshJoinPathColumns(); } + // TODO union : What is this? We may not need this as non-existing partitions are stored in the StorageCandidate + cubeql.setNonexistingParts(nonExistingPartitions); break; } - //Doing this on all three phases.
Keep updating cubeql with the current identified missing partitions. - cubeql.setNonexistingParts(nonExistingPartitions); phase = phase.next(); } + /** + * Each candidate in the set is a complex candidate. We will evaluate each one to get + * all the partitions needed to answer the query. + * + * @param cubeql cube query context + */ + private void resolveStoragePartitions(CubeQueryContext cubeql) throws LensException { + Set<Candidate> candidateList = cubeql.getCandidates(); + for (Candidate candidate : candidateList) { + boolean isComplete = true; + for (TimeRange range : cubeql.getTimeRanges()) { + isComplete &= candidate.evaluateCompleteness(range, failOnPartialData); + } + if (!isComplete) { + // TODO union : Prune this candidate? + } + } + } + private void resolveDimStorageTablesAndPartitions(CubeQueryContext cubeql) throws LensException { Set<Dimension> allDims = new HashSet<Dimension>(cubeql.getDimensions()); for (Aliased<Dimension> dim : cubeql.getOptionalDimensions()) { allDims.add(dim.getObject()); } @@ -184,8 +172,8 @@ CandidateDim candidate = i.next(); CubeDimensionTable dimtable = candidate.dimtable; if (dimtable.getStorages().isEmpty()) { - cubeql.addDimPruningMsgs(dim, dimtable, new CandidateTablePruneCause( - CandidateTablePruneCode.MISSING_STORAGES)); + cubeql + .addDimPruningMsgs(dim, dimtable, new CandidateTablePruneCause(CandidateTablePruneCode.MISSING_STORAGES)); i.remove(); continue; } @@ -194,7 +182,7 @@ boolean foundPart = false; Map<String, SkipStorageCause> skipStorageCauses = new HashMap<>(); for (String storage : dimtable.getStorages()) { - if (isStorageSupported(storage)) { + if (isStorageSupportedOnDriver(storage)) { String tableName = getFactOrDimtableStorageTableName(dimtable.getName(), storage).toLowerCase(); if (validDimTables != null && !validDimTables.contains(tableName)) { log.info("Not considering dim storage table:{} as it is not a valid dim storage", tableName); @@ -212,9 +200,8 @@ } if (!failOnPartialData || foundPart) { storageTables.add(tableName); - String whereClause = - StorageUtil.getWherePartClause(dim.getTimedDimension(), null, - StorageConstants.getPartitionsForLatest()); + String whereClause = StorageUtil + .getWherePartClause(dim.getTimedDimension(), null, StorageConstants.getPartitionsForLatest()); whereClauses.put(tableName, whereClause); } else { log.info("Not considering dim storage table:{} as no dim partitions exist", tableName); @@ -239,78 +226,115 @@ continue; } // pick the first storage table - candidate.setStorageTable(storageTables.iterator().next()); - candidate.setWhereClause(whereClauses.get(candidate.getStorageTable())); + candidate.setStorageName(storageTables.iterator().next()); + candidate.setWhereClause(whereClauses.get(candidate.getStorageName())); } } } - // Resolves all the storage table names, which are valid for each updatePeriod - private void resolveFactStorageTableNames(CubeQueryContext cubeql) throws LensException { - Iterator<CandidateFact> i = cubeql.getCandidateFacts().iterator(); - skipStorageCausesPerFact = new HashMap<>(); - while (i.hasNext()) { - CubeFactTable fact = i.next().fact; - if (fact.getUpdatePeriods().isEmpty()) { - cubeql.addFactPruningMsgs(fact, new CandidateTablePruneCause(CandidateTablePruneCode.MISSING_STORAGES)); - i.remove(); + /** + * Following storages are removed: + * 1. The storage is not supported by the driver.
+ * 2. The storage is not in the valid storage list. + * 3. The storage is not in any time range in the query. + * 4. The storage has no valid update period. + * + * This method also creates a list of valid update periods and stores them into {@link StorageCandidate}. + * + * TODO union : Do fourth point before 3. + */ + private void resolveStorageTable(CubeQueryContext cubeql) throws LensException { + Iterator<Candidate> it = cubeql.getCandidates().iterator(); + while (it.hasNext()) { + Candidate c = it.next(); + assert (c instanceof StorageCandidate); + StorageCandidate sc = (StorageCandidate) c; + String storageTable = sc.getStorageName(); + if (!isStorageSupportedOnDriver(storageTable)) { + log.info("Skipping storage: {} as it is not supported", storageTable); + cubeql.addStoragePruningMsg(sc, new CandidateTablePruneCause(CandidateTablePruneCode.UNSUPPORTED_STORAGE)); + it.remove(); + continue; + } + String str = conf.get(CubeQueryConfUtil.getValidStorageTablesKey(sc.getFact().getName())); + List<String> validFactStorageTables = StringUtils.isBlank(str) + ? null + : Arrays.asList(StringUtils.split(str.toLowerCase(), ",")); + // Check if the storage table is in the list of valid storages. + if (validFactStorageTables != null && !validFactStorageTables.contains(storageTable)) { + log.info("Skipping storage table {} as it is not valid", storageTable); + cubeql.addStoragePruningMsg(sc, new CandidateTablePruneCause(CandidateTablePruneCode.INVALID_STORAGE)); + it.remove(); continue; } - Map<UpdatePeriod, Set<String>> storageTableMap = new TreeMap<UpdatePeriod, Set<String>>(); - validStorageMap.put(fact, storageTableMap); - String str = conf.get(CubeQueryConfUtil.getValidStorageTablesKey(fact.getName())); - List<String> validFactStorageTables = - StringUtils.isBlank(str) ?
null : Arrays.asList(StringUtils.split(str.toLowerCase(), ",")); - Map<String, SkipStorageCause> skipStorageCauses = new HashMap<>(); - for (Map.Entry<String, Set<UpdatePeriod>> entry : fact.getUpdatePeriods().entrySet()) { - String storage = entry.getKey(); - // skip storages that are not supported - if (!isStorageSupported(storage)) { - log.info("Skipping storage: {} as it is not supported", storage); - skipStorageCauses.put(storage, new SkipStorageCause(SkipStorageCode.UNSUPPORTED)); - continue; + boolean valid = false; + Set<CandidateTablePruneCause.CandidateTablePruneCode> codes = new HashSet<>(); + for (TimeRange range : cubeql.getTimeRanges()) { + boolean columnInRange = client + .isStorageTableCandidateForRange(storageTable, range.getFromDate(), range.getToDate()); + boolean partitionColumnExists = client.partColExists(storageTable, range.getPartitionColumn()); + valid = columnInRange && partitionColumnExists; + if (valid) { + break; } - String table = getStorageTableName(fact, storage, validFactStorageTables); - // skip the update period if the storage is not valid - if (table == null) { - skipStorageCauses.put(storage, new SkipStorageCause(SkipStorageCode.INVALID)); + if (!columnInRange) { + codes.add(TIME_RANGE_NOT_ANSWERABLE); continue; } - List<String> validUpdatePeriods = - CubeQueryConfUtil.getStringList(conf, CubeQueryConfUtil.getValidUpdatePeriodsKey(fact.getName(), storage)); - - boolean isStorageAdded = false; - Map<String, SkipUpdatePeriodCode> skipUpdatePeriodCauses = new HashMap<String, SkipUpdatePeriodCode>(); - for (UpdatePeriod updatePeriod : entry.getValue()) { - if (maxInterval != null && updatePeriod.compareTo(maxInterval) > 0) { - log.info("Skipping update period {} for fact {}", updatePeriod, fact); - skipUpdatePeriodCauses.put(updatePeriod.toString(), SkipUpdatePeriodCode.QUERY_INTERVAL_BIGGER); + // This means fallback is required. + if (!partitionColumnExists) { + String timeDim = cubeql.getBaseCube().getTimeDimOfPartitionColumn(range.getPartitionColumn()); + if (!sc.getFact().getColumns().contains(timeDim)) { + // Not a time dimension so no fallback required. + codes.add(TIMEDIM_NOT_SUPPORTED); continue; } - if (validUpdatePeriods != null && !validUpdatePeriods.contains(updatePeriod.name().toLowerCase())) { - log.info("Skipping update period {} for fact {} for storage {}", updatePeriod, fact, storage); - skipUpdatePeriodCauses.put(updatePeriod.toString(), SkipUpdatePeriodCode.INVALID); + TimeRange fallBackRange = getFallbackRange(range, sc.getFact().getCubeName(), cubeql); + if (fallBackRange == null) { + log.info("No partitions for range:{}. 
fallback range: {}", range, fallBackRange); continue; } - Set<String> storageTables = storageTableMap.get(updatePeriod); - if (storageTables == null) { - storageTables = new LinkedHashSet<>(); - storageTableMap.put(updatePeriod, storageTables); + valid = client + .isStorageTableCandidateForRange(storageTable, fallBackRange.getFromDate(), fallBackRange.getToDate()); + if (valid) { + break; + } else { + codes.add(TIME_RANGE_NOT_ANSWERABLE); } - isStorageAdded = true; - log.debug("Adding storage table:{} for fact:{} for update period {}", table, fact, updatePeriod); - storageTables.add(table); } - if (!isStorageAdded) { - skipStorageCauses.put(storage, SkipStorageCause.noCandidateUpdatePeriod(skipUpdatePeriodCauses)); + } + if (!valid) { + it.remove(); + for (CandidateTablePruneCode code : codes) { + cubeql.addStoragePruningMsg(sc, new CandidateTablePruneCause(code)); + } + continue; + } + + List<String> validUpdatePeriods = CubeQueryConfUtil + .getStringList(conf, CubeQueryConfUtil.getValidUpdatePeriodsKey(sc.getFact().getName(), storageTable)); + boolean isStorageAdded = false; + Map<String, SkipUpdatePeriodCode> skipUpdatePeriodCauses = new HashMap<>(); + + // Check for update period. + for (UpdatePeriod updatePeriod : sc.getFact().getUpdatePeriods().get(storageTable)) { + if (maxInterval != null && updatePeriod.compareTo(maxInterval) > 0) { + log.info("Skipping update period {} for fact {}", updatePeriod, sc.getFact()); + skipUpdatePeriodCauses.put(updatePeriod.toString(), SkipUpdatePeriodCode.QUERY_INTERVAL_BIGGER); + continue; + } + if (validUpdatePeriods != null && !validUpdatePeriods.contains(updatePeriod.name().toLowerCase())) { + log.info("Skipping update period {} for fact {} for storage {}", updatePeriod, sc.getFact(), storageTable); + skipUpdatePeriodCauses.put(updatePeriod.toString(), SkipUpdatePeriodCode.INVALID); + continue; } + isStorageAdded = true; + sc.addValidUpdatePeriod(updatePeriod); } - skipStorageCausesPerFact.put(fact, skipStorageCauses); - if (storageTableMap.isEmpty()) { - log.info("Not considering fact table:{} as it does not have any storage tables", fact); - cubeql.addFactPruningMsgs(fact, noCandidateStorages(skipStorageCauses)); - i.remove(); + if (!isStorageAdded) { + cubeql.addStoragePruningMsg(sc, CandidateTablePruneCause.updatePeriodsRejected(skipUpdatePeriodCauses)); + it.remove(); } } } @@ -321,7 +345,7 @@ class StorageTableResolver implements ContextRewriter { return set; } - String getStorageTableName(CubeFactTable fact, String storage, List<String> validFactStorageTables) { + private String getStorageTableName(CubeFactTable fact, String storage, List<String> validFactStorageTables) { String tableName = getFactOrDimtableStorageTableName(fact.getName(), storage).toLowerCase(); if (validFactStorageTables != null && !validFactStorageTables.contains(tableName)) { log.info("Skipping storage table {} as it is not valid", tableName); @@ -330,507 +354,12 @@ class StorageTableResolver implements ContextRewriter { return tableName; } - private TimeRange getFallbackRange(TimeRange range, CandidateFact cfact, CubeQueryContext cubeql) - throws LensException { - Cube baseCube = cubeql.getBaseCube(); - ArrayList<String> tableNames = Lists.newArrayList(cfact.fact.getName(), cubeql.getCube().getName()); - if (!cubeql.getCube().getName().equals(baseCube.getName())) { - tableNames.add(baseCube.getName()); - } - String fallBackString = null; - String timedim = baseCube.getTimeDimOfPartitionColumn(range.getPartitionColumn()); - for (String tableName : tableNames) { - 
fallBackString = cubeql.getMetastoreClient().getTable(tableName).getParameters() - .get(MetastoreConstants.TIMEDIM_RELATION + timedim); - if (StringUtils.isNotBlank(fallBackString)) { - break; - } - } - if (StringUtils.isBlank(fallBackString)) { - return null; - } - Matcher matcher = Pattern.compile("(.*?)\\+\\[(.*?),(.*?)\\]").matcher(fallBackString.replaceAll(WSPACE, "")); - if (!matcher.matches()) { - return null; - } - DateUtil.TimeDiff diff1 = DateUtil.TimeDiff.parseFrom(matcher.group(2).trim()); - DateUtil.TimeDiff diff2 = DateUtil.TimeDiff.parseFrom(matcher.group(3).trim()); - String relatedTimeDim = matcher.group(1).trim(); - String fallbackPartCol = baseCube.getPartitionColumnOfTimeDim(relatedTimeDim); - return TimeRange.getBuilder() - .fromDate(diff2.negativeOffsetFrom(range.getFromDate())) - .toDate(diff1.negativeOffsetFrom(range.getToDate())) - .partitionColumn(fallbackPartCol).build(); - } - - private void resolveFactStoragePartitions(CubeQueryContext cubeql) throws LensException { - // Find candidate tables wrt supported storages - Iterator<CandidateFact> i = cubeql.getCandidateFacts().iterator(); - while (i.hasNext()) { - CandidateFact cfact = i.next(); - Map<TimeRange, String> whereClauseForFallback = new LinkedHashMap<TimeRange, String>(); - List<FactPartition> answeringParts = new ArrayList<>(); - Map<String, SkipStorageCause> skipStorageCauses = skipStorageCausesPerFact.get(cfact.fact); - if (skipStorageCauses == null) { - skipStorageCauses = new HashMap<>(); - } - PartitionRangesForPartitionColumns missingParts = new PartitionRangesForPartitionColumns(); - boolean noPartsForRange = false; - Set<String> unsupportedTimeDims = Sets.newHashSet(); - Set<String> partColsQueried = Sets.newHashSet(); - for (TimeRange range : cubeql.getTimeRanges()) { - partColsQueried.add(range.getPartitionColumn()); - StringBuilder extraWhereClause = new StringBuilder(); - Set<FactPartition> rangeParts = getPartitions(cfact.fact, range, skipStorageCauses, missingParts); - // If no partitions were found, then we'll fallback. - String partCol = range.getPartitionColumn(); - boolean partColNotSupported = rangeParts.isEmpty(); - for (String storage : cfact.fact.getStorages()) { - String storageTableName = getFactOrDimtableStorageTableName(cfact.fact.getName(), storage).toLowerCase(); - partColNotSupported &= skipStorageCauses.containsKey(storageTableName) - && skipStorageCauses.get(storageTableName).getCause().equals(PART_COL_DOES_NOT_EXIST) - && skipStorageCauses.get(storageTableName).getNonExistantPartCols().contains(partCol); - } - TimeRange prevRange = range; - String sep = ""; - while (rangeParts.isEmpty()) { - // TODO: should we add a condition whether on range's partcol any missing partitions are not there - String timeDim = cubeql.getBaseCube().getTimeDimOfPartitionColumn(partCol); - if (partColNotSupported && !cfact.getColumns().contains(timeDim)) { - unsupportedTimeDims.add(cubeql.getBaseCube().getTimeDimOfPartitionColumn(range.getPartitionColumn())); - break; - } - TimeRange fallBackRange = getFallbackRange(prevRange, cfact, cubeql); - log.info("No partitions for range:{}. 
fallback range: {}", range, fallBackRange);
-          if (fallBackRange == null) {
-            break;
-          }
-          partColsQueried.add(fallBackRange.getPartitionColumn());
-          rangeParts = getPartitions(cfact.fact, fallBackRange, skipStorageCauses, missingParts);
-          extraWhereClause.append(sep)
-            .append(prevRange.toTimeDimWhereClause(cubeql.getAliasForTableName(cubeql.getCube()), timeDim));
-          sep = " AND ";
-          prevRange = fallBackRange;
-          partCol = prevRange.getPartitionColumn();
-          if (!rangeParts.isEmpty()) {
-            break;
-          }
-        }
-        whereClauseForFallback.put(range, extraWhereClause.toString());
-        if (rangeParts.isEmpty()) {
-          log.info("No partitions for fallback range:{}", range);
-          noPartsForRange = true;
-          continue;
-        }
-        // If multiple storage tables are part of the same fact,
-        // capture range->storage->partitions
-        Map<String, LinkedHashSet<FactPartition>> tablePartMap = new HashMap<String, LinkedHashSet<FactPartition>>();
-        for (FactPartition factPart : rangeParts) {
-          for (String table : factPart.getStorageTables()) {
-            if (!tablePartMap.containsKey(table)) {
-              tablePartMap.put(table, new LinkedHashSet<>(Collections.singletonList(factPart)));
-            } else {
-              LinkedHashSet<FactPartition> storagePart = tablePartMap.get(table);
-              storagePart.add(factPart);
-            }
-          }
-        }
-        cfact.getRangeToStoragePartMap().put(range, tablePartMap);
-        cfact.incrementPartsQueried(rangeParts.size());
-        answeringParts.addAll(rangeParts);
-        cfact.getPartsQueried().addAll(rangeParts);
-      }
-      if (!unsupportedTimeDims.isEmpty()) {
-        log.info("Not considering fact table:{} as it doesn't support time dimensions: {}", cfact.fact,
-          unsupportedTimeDims);
-        cubeql.addFactPruningMsgs(cfact.fact, timeDimNotSupported(unsupportedTimeDims));
-        i.remove();
-        continue;
-      }
-      Set<String> nonExistingParts = missingParts.toSet(partColsQueried);
-      if (!nonExistingParts.isEmpty()) {
-        addNonExistingParts(cfact.fact.getName(), nonExistingParts);
-      }
-      if (cfact.getNumQueriedParts() == 0 || (failOnPartialData && (noPartsForRange || !nonExistingParts.isEmpty()))) {
-        log.info("Not considering fact table:{} as it could not find partition for given ranges: {}", cfact.fact,
-          cubeql.getTimeRanges());
-        /*
-         * This fact is getting discarded because of any of following reasons:
-         * 1. Has missing partitions
-         * 2. All Storage tables were skipped for some reasons.
-         * 3. Storage tables do not have the update period for the timerange queried.
-         */
-        if (failOnPartialData && !nonExistingParts.isEmpty()) {
-          cubeql.addFactPruningMsgs(cfact.fact, missingPartitions(nonExistingParts));
-        } else if (!skipStorageCauses.isEmpty()) {
-          CandidateTablePruneCause cause = noCandidateStorages(skipStorageCauses);
-          cubeql.addFactPruningMsgs(cfact.fact, cause);
-        } else {
-          CandidateTablePruneCause cause =
-            new CandidateTablePruneCause(NO_FACT_UPDATE_PERIODS_FOR_GIVEN_RANGE);
-          cubeql.addFactPruningMsgs(cfact.fact, cause);
-        }
-        i.remove();
-        continue;
-      }
-      // Map from storage to covering parts
-      Map<String, Set<FactPartition>> minimalStorageTables = new LinkedHashMap<String, Set<FactPartition>>();
-      StorageUtil.getMinimalAnsweringTables(answeringParts, minimalStorageTables);
-      if (minimalStorageTables.isEmpty()) {
-        log.info("Not considering fact table:{} as it does not have any storage tables", cfact);
-        cubeql.addFactPruningMsgs(cfact.fact, noCandidateStorages(skipStorageCauses));
-        i.remove();
-        continue;
-      }
-      Set<String> storageTables = new LinkedHashSet<>();
-      storageTables.addAll(minimalStorageTables.keySet());
-      cfact.setStorageTables(storageTables);
-      // Update range->storage->partitions with time range where clause
-      for (TimeRange trange : cfact.getRangeToStoragePartMap().keySet()) {
-        Map<String, String> rangeToWhere = new HashMap<>();
-        for (Map.Entry<String, Set<FactPartition>> entry : minimalStorageTables.entrySet()) {
-          String table = entry.getKey();
-          Set<FactPartition> minimalParts = entry.getValue();
-
-          LinkedHashSet<FactPartition> rangeParts = cfact.getRangeToStoragePartMap().get(trange).get(table);
-          LinkedHashSet<FactPartition> minimalPartsCopy = Sets.newLinkedHashSet();
-
-          if (rangeParts != null) {
-            minimalPartsCopy.addAll(minimalParts);
-            minimalPartsCopy.retainAll(rangeParts);
-          }
-          if (!StringUtils.isEmpty(whereClauseForFallback.get(trange))) {
-            rangeToWhere.put(table, "(("
-              + rangeWriter.getTimeRangeWhereClause(cubeql, cubeql.getAliasForTableName(cubeql.getCube().getName()),
-                minimalPartsCopy) + ") and (" + whereClauseForFallback.get(trange) + "))");
-          } else {
-            rangeToWhere.put(table, rangeWriter.getTimeRangeWhereClause(cubeql,
-              cubeql.getAliasForTableName(cubeql.getCube().getName()), minimalPartsCopy));
-          }
-        }
-        cfact.getRangeToStorageWhereMap().put(trange, rangeToWhere);
-      }
-      log.info("Resolved partitions for fact {}: {} storageTables:{}", cfact, answeringParts, storageTables);
-    }
-  }
-
-  private static boolean processCubeColForDataCompleteness(CubeQueryContext cubeql, String cubeCol, String alias,
-                                                           Set<String> measureTag,
-                                                           Map<String, String> tagToMeasureOrExprMap) {
-    CubeMeasure column = cubeql.getCube().getMeasureByName(cubeCol);
-    if (column != null && column.getTags() != null) {
-      String dataCompletenessTag = column.getTags().get(MetastoreConstants.MEASURE_DATACOMPLETENESS_TAG);
-      //Checking if dataCompletenessTag is set for queried measure
-      if (dataCompletenessTag != null) {
-        measureTag.add(dataCompletenessTag);
-        String value = tagToMeasureOrExprMap.get(dataCompletenessTag);
-        if (value == null) {
-          tagToMeasureOrExprMap.put(dataCompletenessTag, alias);
-        } else {
-          value = value.concat(",").concat(alias);
-          tagToMeasureOrExprMap.put(dataCompletenessTag, value);
-        }
-        return true;
-      }
-    }
-    return false;
-  }
-
-  private static void processMeasuresFromExprMeasures(CubeQueryContext cubeql, Set<String> measureTag,
-                                                      Map<String, String> tagToMeasureOrExprMap) {
-    boolean isExprProcessed;
-    String cubeAlias = cubeql.getAliasForTableName(cubeql.getCube().getName());
-    for (String expr : cubeql.getQueriedExprsWithMeasures()) {
-      isExprProcessed = false;
-      for (ExpressionResolver.ExprSpecContext esc : cubeql.getExprCtx().getExpressionContext(expr, cubeAlias)
-        .getAllExprs()) {
-        if (esc.getTblAliasToColumns().get(cubeAlias) != null) {
-          for (String cubeCol : esc.getTblAliasToColumns().get(cubeAlias)) {
-            if (processCubeColForDataCompleteness(cubeql, cubeCol, expr, measureTag, tagToMeasureOrExprMap)) {
-              /* This is done to associate the expression with one of the dataCompletenessTag for the measures.
-              So, even if the expression is composed of measures with different dataCompletenessTags, we will be
-              determining the dataCompleteness from one of the measure and this expression is grouped with the
-              other queried measures that have the same dataCompletenessTag. */
-              isExprProcessed = true;
-              break;
-            }
-          }
-        }
-        if (isExprProcessed) {
-          break;
-        }
-      }
-    }
-  }
-
-  private void resolveFactCompleteness(CubeQueryContext cubeql) throws LensException {
-    if (client == null || client.getCompletenessChecker() == null || completenessPartCol == null) {
-      return;
-    }
-    DataCompletenessChecker completenessChecker = client.getCompletenessChecker();
-    Set<String> measureTag = new HashSet<>();
-    Map<String, String> tagToMeasureOrExprMap = new HashMap<>();
-
-    processMeasuresFromExprMeasures(cubeql, measureTag, tagToMeasureOrExprMap);
-
-    Set<String> measures = cubeql.getQueriedMsrs();
-    if (measures == null) {
-      measures = new HashSet<>();
-    }
-    for (String measure : measures) {
-      processCubeColForDataCompleteness(cubeql, measure, measure, measureTag, tagToMeasureOrExprMap);
-    }
-    //Checking if dataCompletenessTag is set for the fact
-    if (measureTag.isEmpty()) {
-      log.info("No Queried measures with the dataCompletenessTag, hence skipping the availability check");
-      return;
-    }
-    Iterator<CandidateFact> i = cubeql.getCandidateFacts().iterator();
-    DateFormat formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
-    formatter.setTimeZone(TimeZone.getTimeZone("UTC"));
-    while (i.hasNext()) {
-      CandidateFact cFact = i.next();
-      // Map from measure to the map from partition to %completeness
-      Map<String, Map<String, Float>> incompleteMeasureData = new HashMap<>();
-
-      String factDataCompletenessTag = cFact.fact.getDataCompletenessTag();
-      if (factDataCompletenessTag == null) {
-        log.info("Not checking completeness for the fact table:{} as the dataCompletenessTag is not set", cFact.fact);
-        continue;
-      }
-      boolean isFactDataIncomplete = false;
-      for (TimeRange range : cubeql.getTimeRanges()) {
-        if (!range.getPartitionColumn().equals(completenessPartCol)) {
-          log.info("Completeness check not available for partCol:{}", range.getPartitionColumn());
-          continue;
-        }
-        Date from = range.getFromDate();
-        Date to = range.getToDate();
-        Map<String, Map<Date, Float>> completenessMap = completenessChecker.getCompleteness(factDataCompletenessTag,
-          from, to, measureTag);
-        if (completenessMap != null && !completenessMap.isEmpty()) {
-          for (Map.Entry<String, Map<Date, Float>> measureCompleteness : completenessMap.entrySet()) {
-            String tag = measureCompleteness.getKey();
-            for (Map.Entry<Date, Float> completenessResult : measureCompleteness.getValue().entrySet()) {
-              if (completenessResult.getValue() < completenessThreshold) {
-                log.info("Completeness for the measure_tag {} is {}, threshold: {}, for the hour {}", tag,
-                  completenessResult.getValue(), completenessThreshold,
-                  formatter.format(completenessResult.getKey()));
-                String measureorExprFromTag = tagToMeasureOrExprMap.get(tag);
-                Map<String, Float> incompletePartition = incompleteMeasureData.get(measureorExprFromTag);
-                if (incompletePartition == null) {
-                  incompletePartition = new HashMap<>();
-                  incompleteMeasureData.put(measureorExprFromTag, incompletePartition);
-                }
-                incompletePartition.put(formatter.format(completenessResult.getKey()), completenessResult.getValue());
-                isFactDataIncomplete = true;
-              }
-            }
-          }
-        }
-      }
-      if (isFactDataIncomplete) {
-        log.info("Fact table:{} has partitions with incomplete data: {} for given ranges: {}", cFact.fact,
-          incompleteMeasureData, cubeql.getTimeRanges());
-        if (failOnPartialData) {
-          i.remove();
-          cubeql.addFactPruningMsgs(cFact.fact, incompletePartitions(incompleteMeasureData));
-        } else {
-          cFact.setDataCompletenessMap(incompleteMeasureData);
-        }
-      }
-    }
-  }
-
   void addNonExistingParts(String name, Set<String> nonExistingParts) {
     nonExistingPartitions.put(name, nonExistingParts);
   }
-
-  private Set<FactPartition> getPartitions(CubeFactTable fact, TimeRange range,
-    Map<String, SkipStorageCause> skipStorageCauses,
-    PartitionRangesForPartitionColumns missingPartitions) throws LensException {
-    try {
-      return getPartitions(fact, range, getValidUpdatePeriods(fact), true, failOnPartialData, skipStorageCauses,
-        missingPartitions);
-    } catch (Exception e) {
-      throw new LensException(e);
-    }
-  }
-
-  private Set<FactPartition> getPartitions(CubeFactTable fact, TimeRange range, TreeSet<UpdatePeriod> updatePeriods,
-    boolean addNonExistingParts, boolean failOnPartialData, Map<String, SkipStorageCause> skipStorageCauses,
-    PartitionRangesForPartitionColumns missingPartitions)
-    throws Exception {
-    Set<FactPartition> partitions = new TreeSet<>();
-    if (range != null && range.isCoverableBy(updatePeriods)
-      && getPartitions(fact, range.getFromDate(), range.getToDate(), range.getPartitionColumn(), partitions,
-        updatePeriods, addNonExistingParts, failOnPartialData, skipStorageCauses, missingPartitions)) {
-      return partitions;
-    } else {
-      return new TreeSet<>();
-    }
-  }
-
-  private boolean getPartitions(CubeFactTable fact, Date fromDate, Date toDate, String partCol,
-    Set<FactPartition> partitions, TreeSet<UpdatePeriod> updatePeriods,
-    boolean addNonExistingParts, boolean failOnPartialData, Map<String, SkipStorageCause> skipStorageCauses,
-    PartitionRangesForPartitionColumns missingPartitions)
-    throws Exception {
-    log.info("getPartitions for {} from fromDate:{} toDate:{}", fact, fromDate, toDate);
-    if (fromDate.equals(toDate) || fromDate.after(toDate)) {
-      return true;
-    }
-    UpdatePeriod interval = CubeFactTable.maxIntervalInRange(fromDate, toDate, updatePeriods);
-    if (interval == null) {
-      log.info("No max interval for range: {} to {}", fromDate, toDate);
-      return false;
-    }
-    log.debug("Max interval for {} is: {}", fact, interval);
-    Set<String> storageTbls = new LinkedHashSet<String>();
-    storageTbls.addAll(validStorageMap.get(fact).get(interval));
-
-    if (interval == UpdatePeriod.CONTINUOUS && rangeWriter.getClass().equals(BetweenTimeRangeWriter.class)) {
-      for (String storageTbl : storageTbls) {
-        FactPartition part = new FactPartition(partCol, fromDate, interval, null, partWhereClauseFormat);
-        partitions.add(part);
-        part.getStorageTables().add(storageTbl);
-        part = new FactPartition(partCol, toDate, interval, null, partWhereClauseFormat);
-        partitions.add(part);
-        part.getStorageTables().add(storageTbl);
-        log.info("Added continuous fact partition for storage table {}", storageTbl);
-      }
-      return true;
-    }
-
-    Iterator<String> it = storageTbls.iterator();
-    while (it.hasNext()) {
-      String storageTableName = it.next();
-      if (!client.isStorageTableCandidateForRange(storageTableName, fromDate, toDate)) {
-        skipStorageCauses.put(storageTableName, new SkipStorageCause(RANGE_NOT_ANSWERABLE));
-        it.remove();
-      } else if (!client.partColExists(storageTableName, partCol)) {
-        log.info("{} does not exist in {}", partCol, storageTableName);
-        skipStorageCauses.put(storageTableName, SkipStorageCause.partColDoesNotExist(partCol));
-        it.remove();
-      }
-    }
-
-    if (storageTbls.isEmpty()) {
-      return false;
-    }
-    Date ceilFromDate = DateUtil.getCeilDate(fromDate, interval);
-    Date floorToDate = DateUtil.getFloorDate(toDate, interval);
-
-    int lookAheadNumParts =
-      conf.getInt(CubeQueryConfUtil.getLookAheadPTPartsKey(interval), CubeQueryConfUtil.DEFAULT_LOOK_AHEAD_PT_PARTS);
-
-    TimeRange.Iterable.Iterator iter = TimeRange.iterable(ceilFromDate, floorToDate, interval, 1).iterator();
-    // add partitions from ceilFrom to floorTo
-    while (iter.hasNext()) {
-      Date dt = iter.next();
-      Date nextDt = iter.peekNext();
-      FactPartition part = new FactPartition(partCol, dt, interval, null, partWhereClauseFormat);
-      log.debug("candidate storage tables for searching partitions: {}", storageTbls);
-      updateFactPartitionStorageTablesFrom(fact, part, storageTbls);
-      log.debug("Storage tables containing Partition {} are: {}", part, part.getStorageTables());
-      if (part.isFound()) {
-        log.debug("Adding existing partition {}", part);
-        partitions.add(part);
-        log.debug("Looking for look ahead process time partitions for {}", part);
-        if (processTimePartCol == null) {
-          log.debug("processTimePartCol is null");
-        } else if (partCol.equals(processTimePartCol)) {
-          log.debug("part column is process time col");
-        } else if (updatePeriods.first().equals(interval)) {
-          log.debug("Update period is the least update period");
-        } else if ((iter.getNumIters() - iter.getCounter()) > lookAheadNumParts) {
-          // see if this is the part of the last-n look ahead partitions
-          log.debug("Not a look ahead partition");
-        } else {
-          log.debug("Looking for look ahead process time partitions for {}", part);
-          // check if finer partitions are required
-          // final partitions are required if no partitions from
-          // look-ahead
-          // process time are present
-          TimeRange.Iterable.Iterator processTimeIter = TimeRange.iterable(nextDt, lookAheadNumParts,
-            interval, 1).iterator();
-          while (processTimeIter.hasNext()) {
-            Date pdt = processTimeIter.next();
-            Date nextPdt = processTimeIter.peekNext();
-            FactPartition processTimePartition = new FactPartition(processTimePartCol, pdt, interval, null,
-              partWhereClauseFormat);
-            updateFactPartitionStorageTablesFrom(fact, processTimePartition,
-              part.getStorageTables());
-            if (processTimePartition.isFound()) {
-              log.debug("Finer parts not required for look-ahead partition :{}", part);
-            } else {
-              log.debug("Looked ahead process time partition {} is not found", processTimePartition);
-              TreeSet<UpdatePeriod> newset = new TreeSet<UpdatePeriod>();
-              newset.addAll(updatePeriods);
-              newset.remove(interval);
-              log.debug("newset of update periods:{}", newset);
-              if (!newset.isEmpty()) {
-                // Get partitions for look ahead process time
-                log.debug("Looking for process time partitions between {} and {}", pdt, nextPdt);
-                Set<FactPartition> processTimeParts =
-                  getPartitions(fact, TimeRange.getBuilder().fromDate(pdt).toDate(nextPdt).partitionColumn(
-                    processTimePartCol).build(), newset, true, false, skipStorageCauses, missingPartitions);
-                log.debug("Look ahead partitions: {}", processTimeParts);
-                TimeRange timeRange = TimeRange.getBuilder().fromDate(dt).toDate(nextDt).build();
-                for (FactPartition pPart : processTimeParts) {
-                  log.debug("Looking for finer partitions in pPart: {}", pPart);
-                  for (Date date : timeRange.iterable(pPart.getPeriod(), 1)) {
-                    FactPartition innerPart = new FactPartition(partCol, date, pPart.getPeriod(), pPart,
-                      partWhereClauseFormat);
-                    updateFactPartitionStorageTablesFrom(fact, innerPart, pPart);
-                    if (innerPart.isFound()) {
-                      partitions.add(innerPart);
-                    }
-                  }
-                  log.debug("added all sub partitions blindly in pPart: {}", pPart);
-                }
-              }
-            }
-          }
-        }
-      } else {
-        log.info("Partition:{} does not exist in any storage table", part);
-        TreeSet<UpdatePeriod> newset = new TreeSet<UpdatePeriod>();
-        newset.addAll(updatePeriods);
-        newset.remove(interval);
-        if (!getPartitions(fact, dt, nextDt, partCol, partitions, newset, false, failOnPartialData, skipStorageCauses,
-          missingPartitions)) {
-
-          log.debug("Adding non existing partition {}", part);
-          if (addNonExistingParts) {
-            // Add non existing partitions for all cases of whether we populate all non existing or not.
-            missingPartitions.add(part);
-            if (!failOnPartialData) {
-              Set<String> st = getStorageTablesWithoutPartCheck(part, storageTbls);
-              if (st.isEmpty()) {
-                log.info("No eligible storage tables");
-                return false;
-              }
-              partitions.add(part);
-              part.getStorageTables().addAll(st);
-            }
-          } else {
-            log.info("No finer granual partitions exist for {}", part);
-            return false;
-          }
-        } else {
-          log.debug("Finer granual partitions added for {}", part);
-        }
-      }
-    }
-    return getPartitions(fact, fromDate, ceilFromDate, partCol, partitions,
-      updatePeriods, addNonExistingParts, failOnPartialData, skipStorageCauses, missingPartitions)
-      && getPartitions(fact, floorToDate, toDate, partCol, partitions,
-        updatePeriods, addNonExistingParts, failOnPartialData, skipStorageCauses, missingPartitions);
-  }
-
-  private Set<String> getStorageTablesWithoutPartCheck(FactPartition part,
-    Set<String> storageTableNames) throws LensException, HiveException {
+  private Set<String> getStorageTablesWithoutPartCheck(FactPartition part, Set<String> storageTableNames)
+    throws LensException, HiveException {
     Set<String> validStorageTbls = new HashSet<>();
     for (String storageTableName : storageTableNames) {
       // skip all storage tables for which are not eligible for this partition
@@ -843,21 +372,19 @@ class StorageTableResolver implements ContextRewriter {
     return validStorageTbls;
   }
 
-  private void updateFactPartitionStorageTablesFrom(CubeFactTable fact,
-    FactPartition part, Set<String> storageTableNames) throws LensException, HiveException, ParseException {
-    for (String storageTableName : storageTableNames) {
-      // skip all storage tables for which are not eligible for this partition
-      if (client.isStorageTablePartitionACandidate(storageTableName, part.getPartSpec())
-        && (client.factPartitionExists(fact, part, storageTableName))) {
-        part.getStorageTables().add(storageTableName);
-        part.setFound(true);
-      }
+  enum PHASE {
+    STORAGE_TABLES, STORAGE_PARTITIONS, DIM_TABLE_AND_PARTITIONS;
+
+    static PHASE first() {
+      return values()[0];
+    }
+
+    static PHASE last() {
+      return values()[values().length - 1];
     }
-  }
 
-  private void updateFactPartitionStorageTablesFrom(CubeFactTable fact,
-    FactPartition part, FactPartition pPart) throws LensException, HiveException, ParseException {
-    updateFactPartitionStorageTablesFrom(fact, part, pPart.getStorageTables());
-    part.setFound(part.isFound() && pPart.isFound());
+    PHASE next() {
+      return values()[(this.ordinal() + 1) % values().length];
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/lens/blob/b6f0cc3d/lens-cube/src/main/java/org/apache/lens/cube/parse/StorageUtil.java
----------------------------------------------------------------------
diff --git a/lens-cube/src/main/java/org/apache/lens/cube/parse/StorageUtil.java b/lens-cube/src/main/java/org/apache/lens/cube/parse/StorageUtil.java
index f9636d1..4f5d405 100644
--- a/lens-cube/src/main/java/org/apache/lens/cube/parse/StorageUtil.java
+++ b/lens-cube/src/main/java/org/apache/lens/cube/parse/StorageUtil.java
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License. You may obtain a copy of the License at
  *
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
@@ -18,13 +18,19 @@
  */
 package org.apache.lens.cube.parse;
 
+import static org.apache.lens.cube.metadata.DateUtil.WSPACE;
+
 import java.util.*;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
 
-import org.apache.lens.cube.metadata.FactPartition;
-import org.apache.lens.cube.metadata.StorageConstants;
+import org.apache.lens.cube.metadata.*;
+import org.apache.lens.server.api.error.LensException;
 
 import org.apache.commons.lang.StringUtils;
 
+import com.google.common.collect.Lists;
+
 public final class StorageUtil {
 
   private StorageUtil() {
@@ -69,8 +75,8 @@ public final class StorageUtil {
     String sep = "";
     for (String timePartCol : timedDimensions) {
       if (!timePartCol.equals(partCol)) {
-        sb.append(sep).append(alias).append(".").append(timePartCol)
-          .append(" != '").append(StorageConstants.LATEST_PARTITION_VALUE).append("'");
+        sb.append(sep).append(alias).append(".").append(timePartCol).append(" != '")
+          .append(StorageConstants.LATEST_PARTITION_VALUE).append("'");
         sep = " AND ";
       }
     }
@@ -82,15 +88,11 @@ public final class StorageUtil {
     String sep = "((";
     for (String clause : clauses) {
       if (clause != null && !clause.isEmpty()) {
-        sb
-          .append(sep)
-          .append(clause);
+        sb.append(sep).append(clause);
        sep = ") AND (";
       }
     }
-    return sb
-      .append(sep.equals("((") ? "" : "))")
-      .toString();
+    return sb.append(sep.equals("((") ? "" : "))").toString();
"" : "))").toString(); } /** @@ -161,4 +163,108 @@ public final class StorageUtil { return null; } } + + /** + * Get fallback range + * @param range + * @param factName + * @param cubeql + * @return + * @throws LensException + */ + public static TimeRange getFallbackRange(TimeRange range, String factName, CubeQueryContext cubeql) + throws LensException { + Cube baseCube = cubeql.getBaseCube(); + ArrayList<String> tableNames = Lists.newArrayList(factName, cubeql.getCube().getName()); + if (!cubeql.getCube().getName().equals(baseCube.getName())) { + tableNames.add(baseCube.getName()); + } + String fallBackString = null; + String timedim = baseCube.getTimeDimOfPartitionColumn(range.getPartitionColumn()); + for (String tableName : tableNames) { + fallBackString = cubeql.getMetastoreClient().getTable(tableName).getParameters() + .get(MetastoreConstants.TIMEDIM_RELATION + timedim); + if (StringUtils.isNotBlank(fallBackString)) { + break; + } + } + if (StringUtils.isBlank(fallBackString)) { + return null; + } + Matcher matcher = Pattern.compile("(.*?)\\+\\[(.*?),(.*?)\\]").matcher(fallBackString.replaceAll(WSPACE, "")); + if (!matcher.matches()) { + return null; + } + DateUtil.TimeDiff diff1 = DateUtil.TimeDiff.parseFrom(matcher.group(2).trim()); + DateUtil.TimeDiff diff2 = DateUtil.TimeDiff.parseFrom(matcher.group(3).trim()); + String relatedTimeDim = matcher.group(1).trim(); + String fallbackPartCol = baseCube.getPartitionColumnOfTimeDim(relatedTimeDim); + return TimeRange.getBuilder().fromDate(diff2.negativeOffsetFrom(range.getFromDate())) + .toDate(diff1.negativeOffsetFrom(range.getToDate())).partitionColumn(fallbackPartCol).build(); + } + + /** + * Checks how much data is completed for a column. + * See this: {@link org.apache.lens.server.api.metastore.DataCompletenessChecker} + * @param cubeql + * @param cubeCol + * @param alias + * @param measureTag + * @param tagToMeasureOrExprMap + * @return + */ + public static boolean processCubeColForDataCompleteness(CubeQueryContext cubeql, String cubeCol, String alias, + Set<String> measureTag, Map<String, String> tagToMeasureOrExprMap) { + CubeMeasure column = cubeql.getCube().getMeasureByName(cubeCol); + if (column != null && column.getTags() != null) { + String dataCompletenessTag = column.getTags().get(MetastoreConstants.MEASURE_DATACOMPLETENESS_TAG); + //Checking if dataCompletenessTag is set for queried measure + if (dataCompletenessTag != null) { + measureTag.add(dataCompletenessTag); + String value = tagToMeasureOrExprMap.get(dataCompletenessTag); + if (value == null) { + tagToMeasureOrExprMap.put(dataCompletenessTag, alias); + } else { + value = value.concat(",").concat(alias); + tagToMeasureOrExprMap.put(dataCompletenessTag, value); + } + return true; + } + } + return false; + } + + /** + * Extract the expression for the measure. 
+ * @param cubeql + * @param measureTag + * @param tagToMeasureOrExprMap + */ + public static void processMeasuresFromExprMeasures(CubeQueryContext cubeql, Set<String> measureTag, + Map<String, String> tagToMeasureOrExprMap) { + boolean isExprProcessed; + String cubeAlias = cubeql.getAliasForTableName(cubeql.getCube().getName()); + for (String expr : cubeql.getQueriedExprsWithMeasures()) { + isExprProcessed = false; + for (ExpressionResolver.ExprSpecContext esc : cubeql.getExprCtx().getExpressionContext(expr, cubeAlias) + .getAllExprs()) { + if (esc.getTblAliasToColumns().get(cubeAlias) != null) { + for (String cubeCol : esc.getTblAliasToColumns().get(cubeAlias)) { + if (processCubeColForDataCompleteness(cubeql, cubeCol, expr, measureTag, tagToMeasureOrExprMap)) { + /* This is done to associate the expression with one of the dataCompletenessTag for the measures. + So, even if the expression is composed of measures with different dataCompletenessTags, we will be + determining the dataCompleteness from one of the measure and this expression is grouped with the + other queried measures that have the same dataCompletenessTag. */ + isExprProcessed = true; + break; + } + } + } + if (isExprProcessed) { + break; + } + } + } + } } +
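----------------------------------------------------------------------
For reference, the TIMEDIM_RELATION table parameter parsed by getFallbackRange above
is expected to look like "timedim+[diff1,diff2]". The following is a minimal,
self-contained sketch of just the parsing step; the sample value
"processing_time + [-20 days, -10 days]" and the "\\s+" stand-in for DateUtil.WSPACE
are assumptions for illustration, not Lens code.

import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class FallbackRelationParseDemo {
  // Same pattern that getFallbackRange compiles above.
  private static final Pattern RELATION = Pattern.compile("(.*?)\\+\\[(.*?),(.*?)\\]");

  public static void main(String[] args) {
    // Hypothetical table parameter value: the queried partition column falls back
    // to the "processing_time" dimension, offset by 10 to 20 days into the past.
    String fallBackString = "processing_time + [-20 days, -10 days]";
    // getFallbackRange strips all whitespace before matching; "\\s+" is used here
    // as a stand-in for DateUtil.WSPACE.
    Matcher matcher = RELATION.matcher(fallBackString.replaceAll("\\s+", ""));
    if (matcher.matches()) {
      System.out.println("related time dim: " + matcher.group(1)); // processing_time
      System.out.println("from-date diff:   " + matcher.group(2)); // -20days
      System.out.println("to-date diff:     " + matcher.group(3)); // -10days
    }
  }
}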
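----------------------------------------------------------------------
The PHASE enum added to StorageTableResolver above models resolution as an ordered
sequence of phases. Below is a minimal sketch of the iteration pattern it supports;
the enum mirrors the added code, while the driver loop and its print line are
hypothetical.

public class PhaseCycleDemo {
  // Mirrors the PHASE enum added to StorageTableResolver.
  enum PHASE {
    STORAGE_TABLES, STORAGE_PARTITIONS, DIM_TABLE_AND_PARTITIONS;

    static PHASE first() {
      return values()[0];
    }

    static PHASE last() {
      return values()[values().length - 1];
    }

    PHASE next() {
      return values()[(this.ordinal() + 1) % values().length];
    }
  }

  public static void main(String[] args) {
    // Walk the phases in declaration order and stop after the last one.
    PHASE phase = PHASE.first();
    while (true) {
      System.out.println("resolving phase: " + phase);
      if (phase == PHASE.last()) {
        break;
      }
      phase = phase.next();
    }
  }
}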