LENS-1389: Back Merge with master and fix lens-cube tests
Project: http://git-wip-us.apache.org/repos/asf/lens/repo Commit: http://git-wip-us.apache.org/repos/asf/lens/commit/2aaf6e0a Tree: http://git-wip-us.apache.org/repos/asf/lens/tree/2aaf6e0a Diff: http://git-wip-us.apache.org/repos/asf/lens/diff/2aaf6e0a Branch: refs/heads/lens-1381 Commit: 2aaf6e0a0345f3328bac0bf8ccfaeff99ca05c0f Parents: 4d49359 Author: Rajat Khandelwal <[email protected]> Authored: Tue Feb 28 18:17:15 2017 +0530 Committer: Rajat Khandelwal <[email protected]> Committed: Tue Feb 28 18:17:15 2017 +0530 ---------------------------------------------------------------------- lens-api/src/main/resources/cube-0.1.xsd | 30 +- .../NoCandidateFactAvailableException.java | 1 + .../lens/cube/metadata/CubeFactTable.java | 68 ++- .../lens/cube/metadata/CubeMetastoreClient.java | 339 ++++++----- .../org/apache/lens/cube/metadata/DateUtil.java | 28 +- .../lens/cube/metadata/FactPartition.java | 5 +- .../lens/cube/metadata/MetastoreUtil.java | 6 + .../org/apache/lens/cube/metadata/Storage.java | 30 +- .../apache/lens/cube/metadata/TimeRange.java | 18 +- .../cube/parse/AbridgedTimeRangeWriter.java | 45 +- .../lens/cube/parse/BetweenTimeRangeWriter.java | 2 +- .../parse/CandidateCoveringSetsResolver.java | 35 +- .../apache/lens/cube/parse/CandidateTable.java | 1 - .../cube/parse/CandidateTablePruneCause.java | 151 ++--- .../lens/cube/parse/CandidateTableResolver.java | 36 +- .../apache/lens/cube/parse/CandidateUtil.java | 85 +-- .../apache/lens/cube/parse/CheckTableNames.java | 1 - .../lens/cube/parse/ColumnLifetimeChecker.java | 131 ++++ .../lens/cube/parse/CubeQueryContext.java | 337 +++++------ .../lens/cube/parse/CubeQueryRewriter.java | 12 +- .../lens/cube/parse/CubeSemanticAnalyzer.java | 14 +- .../cube/parse/DenormalizationResolver.java | 227 ++++--- .../lens/cube/parse/ExpressionResolver.java | 287 ++++----- .../apache/lens/cube/parse/FieldValidator.java | 1 - .../cube/parse/MaxCoveringFactResolver.java | 7 +- .../org/apache/lens/cube/parse/PruneCauses.java 
| 29 +- .../lens/cube/parse/QueriedPhraseContext.java | 2 +- .../lens/cube/parse/StorageCandidate.java | 148 +++-- .../lens/cube/parse/StorageTableResolver.java | 147 +++-- .../org/apache/lens/cube/parse/StorageUtil.java | 1 - .../lens/cube/parse/TimeRangeChecker.java | 238 -------- .../lens/cube/parse/TrackDenormContext.java | 37 ++ .../lens/cube/parse/UnionQueryWriter.java | 2 - .../FactPartitionBasedQueryCostCalculator.java | 3 + .../cube/metadata/TestCubeMetastoreClient.java | 151 ++++- .../apache/lens/cube/parse/CubeTestSetup.java | 14 +- .../FieldsCannotBeQueriedTogetherTest.java | 2 +- .../lens/cube/parse/TestBaseCubeQueries.java | 3 +- .../cube/parse/TestBetweenTimeRangeWriter.java | 51 +- .../lens/cube/parse/TestCubeRewriter.java | 51 +- .../cube/parse/TestDenormalizationResolver.java | 73 +-- .../lens/cube/parse/TestExpressionResolver.java | 16 + .../lens/cube/parse/TestJoinResolver.java | 2 +- .../lens/cube/parse/TestQueryMetrics.java | 50 +- .../lens/cube/parse/TestTimeRangeResolver.java | 21 +- .../parse/TestTimeRangeWriterWithQuery.java | 1 - .../lens/cube/parse/TestUnionQueries.java | 15 +- ...stFactPartitionBasedQueryCostCalculator.java | 21 +- .../lens/driver/jdbc/ColumnarSQLRewriter.java | 2 +- .../lens/driver/jdbc/DruidSQLRewriter.java | 2 +- .../src/test/resources/yaml/city_table.yaml | 3 +- .../src/test/resources/yaml/customer_table.yaml | 3 +- .../src/test/resources/yaml/dim_table.yaml | 3 +- .../src/test/resources/yaml/dim_table2.yaml | 3 +- .../src/test/resources/yaml/dim_table4.yaml | 3 +- .../src/test/resources/yaml/fact1.yaml | 3 +- .../src/test/resources/yaml/fact2.yaml | 3 +- .../src/test/resources/yaml/product_table.yaml | 3 +- .../src/test/resources/yaml/rawfact.yaml | 3 +- .../yaml/sales-aggr-continuous-fact.yaml | 3 +- .../test/resources/yaml/sales-aggr-fact1.yaml | 6 +- .../test/resources/yaml/sales-aggr-fact2.yaml | 6 +- .../src/test/resources/yaml/sales-raw-fact.yaml | 3 +- .../regression/core/constants/DriverConfig.java | 2 - 
.../core/helpers/ScheduleResourceHelper.java | 62 +- .../apache/lens/regression/util/AssertUtil.java | 1 - .../src/main/resources/template.lens.properties | 9 +- .../apache/lens/regression/ITSessionTests.java | 163 +++++ .../apache/lens/regression/SessionTests.java | 163 ----- .../client/ITDuplicateQueryTests.java | 188 ++++++ .../regression/client/ITKillQueryTests.java | 361 +++++++++++ .../lens/regression/client/ITListQueryTest.java | 7 +- .../regression/client/ITPreparedQueryTests.java | 13 +- .../lens/regression/client/ITQueryApiTests.java | 182 ++---- .../regression/client/ITScheduleQueryTests.java | 284 --------- .../client/ITSessionResourceTests.java | 401 ++++++++++++ .../lens/regression/client/KillQueryTests.java | 362 ----------- .../regression/client/SessionResourceTests.java | 403 ------------ .../regression/config/ITServerConfigTests.java | 197 +++--- .../regression/config/ITSessionConfigTests.java | 4 +- .../scheduler/ITMaxScheduledQueryTests.java | 160 +++++ .../scheduler/ITScheduleQueryTests.java | 337 +++++++++++ .../lens/regression/throttling/ITCostTests.java | 176 +----- .../throttling/ITQueueNumberTests.java | 232 +++++++ .../throttling/ITThrottlingTests.java | 605 +++++++++++++++++++ .../lens/regression/throttling/Throttling.java | 604 ------------------ .../metastore/CubeMetastoreServiceImpl.java | 182 ++++-- .../apache/lens/server/metastore/JAXBUtils.java | 68 ++- .../apache/lens/server/query/LensServerDAO.java | 6 +- .../server/query/QueryExecutionServiceImpl.java | 3 +- .../lens/server/session/HiveSessionService.java | 58 +- .../lens/server/session/LensSessionImpl.java | 20 +- .../src/main/resources/lensserver-default.xml | 11 + .../server/metastore/TestMetastoreService.java | 186 +++++- .../TestQueryIndependenceFromSessionClose.java | 71 ++- pom.xml | 4 +- src/site/apt/admin/config.apt | 128 ++-- tools/conf/server/logback.xml | 4 +- 98 files changed, 4826 insertions(+), 3855 deletions(-) 
---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/lens/blob/2aaf6e0a/lens-api/src/main/resources/cube-0.1.xsd ---------------------------------------------------------------------- diff --git a/lens-api/src/main/resources/cube-0.1.xsd b/lens-api/src/main/resources/cube-0.1.xsd index f438f48..060eb43 100644 --- a/lens-api/src/main/resources/cube-0.1.xsd +++ b/lens-api/src/main/resources/cube-0.1.xsd @@ -681,8 +681,27 @@ </xs:complexType> <xs:complexType name="x_update_periods"> - <xs:sequence> + <xs:annotation> + <xs:documentation> + A list of update_period which contains either update period table descriptor or list of update_period enum. + </xs:documentation> + </xs:annotation> + <xs:choice maxOccurs="1" minOccurs="0"> + <xs:element name="update_period_table_descriptor" type="x_update_period_table_descriptor" maxOccurs="unbounded" + minOccurs="0"/> <xs:element name="update_period" type="x_update_period" maxOccurs="unbounded" minOccurs="0"/> + </xs:choice> + </xs:complexType> + + <xs:complexType name="x_update_period_table_descriptor"> + <xs:annotation> + <xs:documentation> + An update period descriptor keeps an enum of update period and a storage table descriptor. + </xs:documentation> + </xs:annotation> + <xs:sequence> + <xs:element name="update_period" type="x_update_period" maxOccurs="1" minOccurs="1"/> + <xs:element name="table_desc" type="x_storage_table_desc" maxOccurs="1" minOccurs="1"/> </xs:sequence> </xs:complexType> @@ -1001,14 +1020,15 @@ <xs:complexType name="x_storage_table_element"> <xs:annotation> <xs:documentation> - Storage and storage table description and update periods + Storage and storage table description and update periods. table_desc is invalid when update_periods has a list + of update_period_table_descriptor instead of a list of enums. 
</xs:documentation> </xs:annotation> - <xs:sequence> + <xs:all> <xs:element name="update_periods" type="x_update_periods" maxOccurs="1" minOccurs="0"/> <xs:element name="storage_name" type="xs:string"/> - <xs:element type="x_storage_table_desc" name="table_desc"/> - </xs:sequence> + <xs:element type="x_storage_table_desc" name="table_desc" maxOccurs="1" minOccurs="0"/> + </xs:all> </xs:complexType> <xs:complexType name="x_storage_tables"> http://git-wip-us.apache.org/repos/asf/lens/blob/2aaf6e0a/lens-cube/src/main/java/org/apache/lens/cube/error/NoCandidateFactAvailableException.java ---------------------------------------------------------------------- diff --git a/lens-cube/src/main/java/org/apache/lens/cube/error/NoCandidateFactAvailableException.java b/lens-cube/src/main/java/org/apache/lens/cube/error/NoCandidateFactAvailableException.java index bdfa3a0..6f08d0f 100644 --- a/lens-cube/src/main/java/org/apache/lens/cube/error/NoCandidateFactAvailableException.java +++ b/lens-cube/src/main/java/org/apache/lens/cube/error/NoCandidateFactAvailableException.java @@ -34,6 +34,7 @@ public class NoCandidateFactAvailableException extends LensException { @Getter private final CubeQueryContext cubeQueryContext; + @Getter private final PruneCauses<StorageCandidate> briefAndDetailedError; public NoCandidateFactAvailableException(CubeQueryContext cubeql) { http://git-wip-us.apache.org/repos/asf/lens/blob/2aaf6e0a/lens-cube/src/main/java/org/apache/lens/cube/metadata/CubeFactTable.java ---------------------------------------------------------------------- diff --git a/lens-cube/src/main/java/org/apache/lens/cube/metadata/CubeFactTable.java b/lens-cube/src/main/java/org/apache/lens/cube/metadata/CubeFactTable.java index adb6c92..896a7a1 100644 --- a/lens-cube/src/main/java/org/apache/lens/cube/metadata/CubeFactTable.java +++ b/lens-cube/src/main/java/org/apache/lens/cube/metadata/CubeFactTable.java @@ -29,10 +29,14 @@ import org.apache.hadoop.hive.metastore.api.FieldSchema; 
import org.apache.hadoop.hive.ql.metadata.Table; import com.google.common.collect.Lists; +import lombok.Getter; import lombok.extern.slf4j.Slf4j; @Slf4j public class CubeFactTable extends AbstractCubeTable { + @Getter + // Map<StorageName, Map<update_period, storage_table_prefix>> + private final Map<String, Map<UpdatePeriod, String>> storagePrefixUpdatePeriodMap; private String cubeName; private final Map<String, Set<UpdatePeriod>> storageUpdatePeriods; @@ -40,8 +44,10 @@ public class CubeFactTable extends AbstractCubeTable { super(hiveTable); this.storageUpdatePeriods = getUpdatePeriods(getName(), getProperties()); this.cubeName = getCubeName(getName(), getProperties()); + this.storagePrefixUpdatePeriodMap = getUpdatePeriodMap(getName(), getProperties()); } + public CubeFactTable(String cubeName, String factName, List<FieldSchema> columns, Map<String, Set<UpdatePeriod>> storageUpdatePeriods) { this(cubeName, factName, columns, storageUpdatePeriods, 0L, new HashMap<String, String>()); @@ -54,9 +60,18 @@ public class CubeFactTable extends AbstractCubeTable { public CubeFactTable(String cubeName, String factName, List<FieldSchema> columns, Map<String, Set<UpdatePeriod>> storageUpdatePeriods, double weight, Map<String, String> properties) { + this(cubeName, factName, columns, storageUpdatePeriods, weight, properties, + new HashMap<String, Map<UpdatePeriod, String>>()); + + } + + public CubeFactTable(String cubeName, String factName, List<FieldSchema> columns, + Map<String, Set<UpdatePeriod>> storageUpdatePeriods, double weight, Map<String, String> properties, + Map<String, Map<UpdatePeriod, String>> storagePrefixUpdatePeriodMap) { super(factName, columns, properties, weight); this.cubeName = cubeName; this.storageUpdatePeriods = storageUpdatePeriods; + this.storagePrefixUpdatePeriodMap = storagePrefixUpdatePeriodMap; addProperties(); } @@ -65,6 +80,18 @@ public class CubeFactTable extends AbstractCubeTable { super.addProperties(); addCubeNames(getName(), 
getProperties(), cubeName); addUpdatePeriodProperies(getName(), getProperties(), storageUpdatePeriods); + addStorageTableProperties(getName(), getProperties(), storagePrefixUpdatePeriodMap); + } + + private void addStorageTableProperties(String name, Map<String, String> properties, + Map<String, Map<UpdatePeriod, String>> storageUpdatePeriodMap) { + for (String storageName : storageUpdatePeriodMap.keySet()) { + String prefix = MetastoreUtil.getFactKeyPrefix(name) + "." + storageName; + for (Map.Entry updatePeriodEntry : storageUpdatePeriodMap.get(storageName).entrySet()) { + String updatePeriod = ((UpdatePeriod) updatePeriodEntry.getKey()).getName(); + properties.put(prefix + "." + updatePeriod, (String) updatePeriodEntry.getValue()); + } + } } private static void addUpdatePeriodProperies(String name, Map<String, String> props, @@ -82,7 +109,29 @@ public class CubeFactTable extends AbstractCubeTable { props.put(MetastoreUtil.getFactCubeNameKey(factName), cubeName); } - private static Map<String, Set<UpdatePeriod>> getUpdatePeriods(String name, Map<String, String> props) { + private Map<String, Map<UpdatePeriod, String>> getUpdatePeriodMap(String factName, Map<String, String> props) { + Map<String, Map<UpdatePeriod, String>> ret = new HashMap<>(); + for (Map.Entry entry : storageUpdatePeriods.entrySet()) { + String storage = (String) entry.getKey(); + for (UpdatePeriod period : (Set<UpdatePeriod>) entry.getValue()) { + String storagePrefixKey = MetastoreUtil + .getUpdatePeriodStoragePrefixKey(factName.trim(), storage, period.getName()); + String storageTableNamePrefix = props.get(storagePrefixKey); + if (storageTableNamePrefix == null) { + storageTableNamePrefix = storage; + } + Map<UpdatePeriod, String> mapOfUpdatePeriods = ret.get(storage); + if (mapOfUpdatePeriods == null) { + mapOfUpdatePeriods = new HashMap<>(); + ret.put(storage, mapOfUpdatePeriods); + } + mapOfUpdatePeriods.put(period, storageTableNamePrefix); + } + } + return ret; + } + + private Map<String, 
Set<UpdatePeriod>> getUpdatePeriods(String name, Map<String, String> props) { Map<String, Set<UpdatePeriod>> storageUpdatePeriods = new HashMap<>(); String storagesStr = props.get(MetastoreUtil.getFactStorageListKey(name)); if (!StringUtils.isBlank(storagesStr)) { @@ -273,13 +322,16 @@ public class CubeFactTable extends AbstractCubeTable { /** * Add a storage with specified update periods - * * @param storage * @param updatePeriods + * @param updatePeriodStoragePrefix */ - void addStorage(String storage, Set<UpdatePeriod> updatePeriods) { + void addStorage(String storage, Set<UpdatePeriod> updatePeriods, + Map<UpdatePeriod, String> updatePeriodStoragePrefix) { storageUpdatePeriods.put(storage, updatePeriods); + storagePrefixUpdatePeriodMap.put(storage, updatePeriodStoragePrefix); addUpdatePeriodProperies(getName(), getProperties(), storageUpdatePeriods); + addStorageTableProperties(getName(), getProperties(), storagePrefixUpdatePeriodMap); } /** @@ -289,6 +341,12 @@ public class CubeFactTable extends AbstractCubeTable { */ void dropStorage(String storage) { storageUpdatePeriods.remove(storage); + String prefix = MetastoreUtil.getFactKeyPrefix(getName()) + "." + storage; + for (Map.Entry updatePeriodEntry : storagePrefixUpdatePeriodMap.get(storage).entrySet()) { + String updatePeriod = ((UpdatePeriod)updatePeriodEntry.getKey()).getName(); + getProperties().remove(prefix + "." 
+ updatePeriod); + } + storagePrefixUpdatePeriodMap.remove(storage); getProperties().remove(MetastoreUtil.getFactUpdatePeriodKey(getName(), storage)); String newStorages = StringUtils.join(storageUpdatePeriods.keySet(), ","); getProperties().put(MetastoreUtil.getFactStorageListKey(getName()), newStorages); @@ -351,5 +409,7 @@ public class CubeFactTable extends AbstractCubeTable { return Collections.min(Lists.newArrayList(getRelativeEndTime(), getAbsoluteEndTime())); } - + public String getTablePrefix(String storage, UpdatePeriod updatePeriod) { + return storagePrefixUpdatePeriodMap.get(storage).get(updatePeriod); + } } http://git-wip-us.apache.org/repos/asf/lens/blob/2aaf6e0a/lens-cube/src/main/java/org/apache/lens/cube/metadata/CubeMetastoreClient.java ---------------------------------------------------------------------- diff --git a/lens-cube/src/main/java/org/apache/lens/cube/metadata/CubeMetastoreClient.java b/lens-cube/src/main/java/org/apache/lens/cube/metadata/CubeMetastoreClient.java index aa2e9d1..78fb6d3 100644 --- a/lens-cube/src/main/java/org/apache/lens/cube/metadata/CubeMetastoreClient.java +++ b/lens-cube/src/main/java/org/apache/lens/cube/metadata/CubeMetastoreClient.java @@ -31,7 +31,7 @@ import org.apache.lens.cube.metadata.Storage.LatestInfo; import org.apache.lens.cube.metadata.Storage.LatestPartColumnInfo; import org.apache.lens.cube.metadata.timeline.PartitionTimeline; import org.apache.lens.cube.metadata.timeline.PartitionTimelineFactory; -import org.apache.lens.server.api.*; +import org.apache.lens.server.api.LensConfConstants; import org.apache.lens.server.api.error.LensException; import org.apache.lens.server.api.metastore.DataCompletenessChecker; import org.apache.lens.server.api.util.LensUtil; @@ -121,7 +121,13 @@ public class CubeMetastoreClient { if (ind <= 0) { throw new LensException("storageTable: " + storageTableName + ", does not belong to fact: " + fact.getName()); } - return storageTableName.substring(0, ind - 
StorageConstants.STORGAE_SEPARATOR.length()); + String name = storageTableName.substring(0, ind - StorageConstants.STORGAE_SEPARATOR.length()); + for (String storageName : fact.getStorages()) { + if (name.equalsIgnoreCase(storageName)) { + return storageName; + } + } + throw new LensException("storageTable: " + storageTableName + ", does not belong to fact: " + fact.getName()); } /** @@ -169,11 +175,11 @@ public class CubeMetastoreClient { UpdatePeriod updatePeriod = updatePeriodStr == null ? null : UpdatePeriod.valueOf(updatePeriodStr.toUpperCase()); List<PartitionTimeline> ret = Lists.newArrayList(); CubeFactTable fact = getCubeFact(factName); - List<String> keys = Lists.newArrayList(); + List<String> storageList = Lists.newArrayList(); if (storage != null) { - keys.add(storage); + storageList.add(storage); } else { - keys.addAll(fact.getStorages()); + storageList.addAll(fact.getStorages()); } String partCol = null; if (timeDimension != null) { @@ -186,9 +192,9 @@ public class CubeMetastoreClient { } partCol = baseCube.getPartitionColumnOfTimeDim(timeDimension); } - for (String key : keys) { + for (String storageName : storageList) { for (Map.Entry<UpdatePeriod, CaseInsensitiveStringHashMap<PartitionTimeline>> entry : partitionTimelineCache - .get(factName, key).entrySet()) { + .get(factName, storageName).entrySet()) { if (updatePeriod == null || entry.getKey().equals(updatePeriod)) { for (Map.Entry<String, PartitionTimeline> entry1 : entry.getValue().entrySet()) { if (partCol == null || partCol.equals(entry1.getKey())) { @@ -201,25 +207,30 @@ public class CubeMetastoreClient { return ret; } - public void updatePartition(String fact, String storageName, Partition partition) + public void updatePartition(String fact, String storageName, Partition partition, UpdatePeriod updatePeriod) throws HiveException, InvalidOperationException, LensException { - updatePartitions(fact, storageName, Collections.singletonList(partition)); + Map<UpdatePeriod, List<Partition>> 
updatePeriodListMap = new HashMap<>(); + updatePeriodListMap.put(updatePeriod, Collections.singletonList(partition)); + updatePartitions(fact, storageName, updatePeriodListMap); } - public void updatePartitions(String factOrDimtableName, String storageName, List<Partition> partitions) - throws HiveException, InvalidOperationException, LensException { - List<Partition> partitionsToAlter = Lists.newArrayList(); - partitionsToAlter.addAll(partitions); - partitionsToAlter.addAll(getAllLatestPartsEquivalentTo(factOrDimtableName, storageName, partitions)); - getStorage(storageName).updatePartitions(getClient(), factOrDimtableName, partitionsToAlter); + public void updatePartitions(String factOrDimtableName, String storageName, + Map<UpdatePeriod, List<Partition>> partitions) throws HiveException, InvalidOperationException, LensException { + for (Map.Entry entry : partitions.entrySet()) { + List<Partition> partitionsToAlter = Lists.newArrayList(); + partitionsToAlter.addAll((List<Partition>) entry.getValue()); + String storageTableName = getStorageTableName(factOrDimtableName, storageName, (UpdatePeriod) entry.getKey()); + partitionsToAlter.addAll( + getAllLatestPartsEquivalentTo(factOrDimtableName, storageTableName, (List<Partition>) entry.getValue())); + getStorage(storageName).updatePartitions(storageTableName, getClient(), factOrDimtableName, partitionsToAlter); + } } - private List<Partition> getAllLatestPartsEquivalentTo(String factOrDimtableName, String storageName, + private List<Partition> getAllLatestPartsEquivalentTo(String factOrDimtableName, String storageTableName, List<Partition> partitions) throws HiveException, LensException { if (isFactTable(factOrDimtableName)) { return Lists.newArrayList(); } - String storageTableName = getFactOrDimtableStorageTableName(factOrDimtableName, storageName); Table storageTable = getTable(storageTableName); List<String> timePartCols = getTimePartColNamesOfTable(storageTable); List<Partition> latestParts = 
Lists.newArrayList(); @@ -279,6 +290,17 @@ public class CubeMetastoreClient { } } + public void createCubeFactTable(String cubeName, String factName, List<FieldSchema> columns, + Map<String, Set<UpdatePeriod>> storageAggregatePeriods, double weight, Map<String, String> properties, + Map<String, StorageTableDesc> storageTableDescs, Map<String, Map<UpdatePeriod, String>> storageUpdatePeriodMap) + throws LensException { + CubeFactTable factTable = new CubeFactTable(cubeName, factName, columns, storageAggregatePeriods, weight, + properties, storageUpdatePeriodMap); + createCubeTable(factTable, storageTableDescs); + // do a get to update cache + getCubeFact(factName); + + } /** * In-memory storage of {@link PartitionTimeline} objects for each valid @@ -327,48 +349,75 @@ public class CubeMetastoreClient { public TreeMap<UpdatePeriod, CaseInsensitiveStringHashMap<PartitionTimeline>> get(String fact, String storage) throws HiveException, LensException { // SUSPEND CHECKSTYLE CHECK DoubleCheckedLockingCheck - String storageTableName = getStorageTableName(fact, Storage.getPrefix(storage)); - if (get(storageTableName) == null) { - synchronized (this) { - if (get(storageTableName) == null) { - Table storageTable = getTable(storageTableName); - if ("true".equalsIgnoreCase(storageTable.getParameters().get(getPartitionTimelineCachePresenceKey()))) { - try { - loadTimelinesFromTableProperties(fact, storage); - } catch (Exception e) { - // Ideally this should never come. But since we have another source, - // let's piggyback on that for loading timeline - log.error("Error while loading timelines from table properties.", e); - loadTimelinesFromAllPartitions(fact, storage); - } - } else { - loadTimelinesFromAllPartitions(fact, storage); + // Unique key for the timeline cache, based on storageName and fact. 
+ String timeLineKey = (Storage.getPrefix(storage)+ fact).toLowerCase(); + synchronized (this) { + if (get(timeLineKey) == null) { + loadTimeLines(fact, storage, timeLineKey); + } + log.info("timeline for {} is: {}", storage, get(timeLineKey)); + // return the final value from memory + return get(timeLineKey); + // RESUME CHECKSTYLE CHECK DoubleCheckedLockingCheck + } + } + + /** + * @param fact + * @param storage + */ + private void loadTimeLines(String fact, String storage, String timeLineKey) throws LensException, HiveException { + Set<String> uniqueStorageTables = new HashSet<>(); + Map<UpdatePeriod, String> updatePeriodTableName = new HashMap<>(); + for (UpdatePeriod updatePeriod : getCubeFact(fact).getUpdatePeriods().get(storage)) { + String storageTableName = getStorageTableName(fact, storage, updatePeriod); + updatePeriodTableName.put(updatePeriod, storageTableName); + Table storageTable = getTable(storageTableName); + if ("true".equalsIgnoreCase(storageTable.getParameters().get(getPartitionTimelineCachePresenceKey()))) { + try { + loadTimelinesFromTableProperties(updatePeriod, storageTableName, timeLineKey); + } catch (Exception e) { + // Ideally this should never come. 
But since we have another source, + // let's piggyback on that for loading timeline + log.error("Error while loading timelines from table properties.", e); + ensureEntryForTimeLineKey(fact, storage, updatePeriod, storageTableName, timeLineKey); + if (!uniqueStorageTables.contains(storageTableName)) { + uniqueStorageTables.add(storageTableName); + loadTimelinesFromAllPartitions(storageTableName, timeLineKey); } } + } else { + ensureEntryForTimeLineKey(fact, storage, updatePeriod, storageTableName, timeLineKey); + if (!uniqueStorageTables.contains(storageTableName)) { + uniqueStorageTables.add(storageTableName); + loadTimelinesFromAllPartitions(storageTableName, timeLineKey); + } } - log.info("timeline for {} is: {}", storageTableName, get(storageTableName)); } - // return the final value from memory - return get(storageTableName); - // RESUME CHECKSTYLE CHECK DoubleCheckedLockingCheck + for (Map.Entry entry : updatePeriodTableName.entrySet()) { + alterTablePartitionCache(timeLineKey, (UpdatePeriod) entry.getKey(), (String) entry.getValue()); + } } - private void loadTimelinesFromAllPartitions(String fact, String storage) throws HiveException, LensException { + private void ensureEntryForTimeLineKey(String fact, String storage, UpdatePeriod updatePeriod, + String storageTableName, String timeLineKey) throws LensException { // Not found in table properties either, compute from all partitions of the fact-storage table. 
// First make sure all combinations of update period and partition column have an entry even // if no partitions exist - String storageTableName = getStorageTableName(fact, Storage.getPrefix(storage)); - log.info("loading from all partitions: {}", storageTableName); - Table storageTable = getTable(storageTableName); - if (getCubeFact(fact).getUpdatePeriods() != null && getCubeFact(fact).getUpdatePeriods().get( - storage) != null) { - for (UpdatePeriod updatePeriod : getCubeFact(fact).getUpdatePeriods().get(storage)) { - for (String partCol : getTimePartColNamesOfTable(storageTable)) { - ensureEntry(storageTableName, updatePeriod, partCol); - } + if (getCubeFact(fact).getUpdatePeriods() != null && getCubeFact(fact).getUpdatePeriods().get(storage) != null) { + log.info("loading from all partitions: {}", storageTableName); + Table storageTable = getTable(storageTableName); + for (String partCol : getTimePartColNamesOfTable(storageTable)) { + ensureEntry(timeLineKey, storageTableName, updatePeriod, partCol); } } + + } + + private void loadTimelinesFromAllPartitions(String storageTableName, String timeLineKey) + throws HiveException, LensException { // Then add all existing partitions for batch addition in respective timelines. + Table storageTable = getTable(storageTableName); List<String> timeParts = getTimePartColNamesOfTable(storageTable); List<FieldSchema> partCols = storageTable.getPartCols(); for (Partition partition : getPartitionsByFilter(storageTableName, null)) { @@ -382,23 +431,17 @@ public class CubeMetastoreClient { } for (int i = 0; i < partCols.size(); i++) { if (timeParts.contains(partCols.get(i).getName())) { - addForBatchAddition(storageTableName, period, partCols.get(i).getName(), values.get(i)); + addForBatchAddition(timeLineKey, storageTableName, period, partCols.get(i).getName(), values.get(i)); } } } - // commit all batch addition for the storage table, - // which will in-turn commit all batch additions in all it's timelines. 
- commitAllBatchAdditions(storageTableName); } - private void loadTimelinesFromTableProperties(String fact, String storage) throws HiveException, LensException { - // found in table properties, load from there. - String storageTableName = getStorageTableName(fact, Storage.getPrefix(storage)); + private void loadTimelinesFromTableProperties(UpdatePeriod updatePeriod, + String storageTableName, String timeLineKey) throws HiveException, LensException { log.info("loading from table properties: {}", storageTableName); - for (UpdatePeriod updatePeriod : getCubeFact(fact).getUpdatePeriods().get(storage)) { - for (String partCol : getTimePartColNamesOfTable(storageTableName)) { - ensureEntry(storageTableName, updatePeriod, partCol).init(getTable(storageTableName)); - } + for (String partCol : getTimePartColNamesOfTable(storageTableName)) { + ensureEntry(timeLineKey, storageTableName, updatePeriod, partCol).init(getTable(storageTableName)); } } @@ -406,16 +449,17 @@ public class CubeMetastoreClient { * Adds given partition(for storageTable, updatePeriod, partitionColum=partition) for batch addition in an * appropriate timeline object. Ignore if partition is not valid. 
* - * @param storageTable storage table + * @param timeLineKey key for the timeLine map + * @param storageTableName hive table name * @param updatePeriod update period * @param partitionColumn partition column * @param partition partition */ - public void addForBatchAddition(String storageTable, UpdatePeriod updatePeriod, String partitionColumn, - String partition) { + public void addForBatchAddition(String timeLineKey, String storageTableName, UpdatePeriod updatePeriod, + String partitionColumn, String partition) { try { - ensureEntry(storageTable, updatePeriod, partitionColumn).addForBatchAddition(TimePartition.of(updatePeriod, - partition)); + ensureEntry(timeLineKey, storageTableName, updatePeriod, partitionColumn) + .addForBatchAddition(TimePartition.of(updatePeriod, partition)); } catch (LensException e) { // to take care of the case where partition name is something like `latest` log.error("Couldn't parse partition: {} with update period: {}, skipping.", partition, updatePeriod, e); @@ -427,42 +471,18 @@ * <p></p> * kind of like mkdir -p * - * @param storageTable storage table + * @param timeLineKey key for the timeline cache map * @param updatePeriod update period * @param partitionColumn partition column * @return timeline if already exists, or puts a new timeline and returns. 
*/ - public PartitionTimeline ensureEntry(String storageTable, UpdatePeriod updatePeriod, String partitionColumn) { - if (get(storageTable) == null) { - put(storageTable, new TreeMap<UpdatePeriod, CaseInsensitiveStringHashMap<PartitionTimeline>>()); - } - if (get(storageTable).get(updatePeriod) == null) { - get(storageTable).put(updatePeriod, new CaseInsensitiveStringHashMap<PartitionTimeline>()); - } - if (get(storageTable).get(updatePeriod).get(partitionColumn) == null) { - get(storageTable).get(updatePeriod).put(partitionColumn, PartitionTimelineFactory.get( - CubeMetastoreClient.this, storageTable, updatePeriod, partitionColumn)); - } - return get(storageTable).get(updatePeriod).get(partitionColumn); - } - - /** - * commit all batch addition for all its timelines. - * - * @param storageTable storage table - * @throws HiveException - * @throws LensException - */ - public void commitAllBatchAdditions(String storageTable) throws HiveException, LensException { - if (get(storageTable) != null) { - for (UpdatePeriod updatePeriod : get(storageTable).keySet()) { - for (String partCol : get(storageTable).get(updatePeriod).keySet()) { - PartitionTimeline timeline = get(storageTable).get(updatePeriod).get(partCol); - timeline.commitBatchAdditions(); - } - } - alterTablePartitionCache(storageTable); - } + public PartitionTimeline ensureEntry(String timeLineKey, String storagTableName, UpdatePeriod updatePeriod, + String partitionColumn) { + return this + .computeIfAbsent(timeLineKey, s -> new TreeMap<>()) + .computeIfAbsent(updatePeriod, k -> new CaseInsensitiveStringHashMap<>()) + .computeIfAbsent(partitionColumn, c -> PartitionTimelineFactory.get( + CubeMetastoreClient.this, storagTableName, updatePeriod, c)); } /** check partition existence in the appropriate timeline if it exists */ @@ -478,9 +498,11 @@ public class CubeMetastoreClient { */ public PartitionTimeline get(String fact, String storage, UpdatePeriod updatePeriod, String partCol) throws HiveException, 
LensException { - return get(fact, storage) != null && get(fact, storage).get(updatePeriod) != null && get(fact, storage).get( - updatePeriod).get(partCol) != null ? get(fact, storage).get(updatePeriod).get(partCol) : null; + return get(fact, storage) != null && get(fact, storage).get(updatePeriod) != null + && get(fact, storage).get(updatePeriod).get(partCol) != null ? get(fact, storage).get(updatePeriod) + .get(partCol) : null; } + /** * returns the timeline corresponding to fact-storage table, updatePeriod, partCol. throws exception if not * exists, which would most probably mean the combination is incorrect. @@ -489,8 +511,8 @@ public class CubeMetastoreClient { throws HiveException, LensException { PartitionTimeline timeline = get(fact, storage, updatePeriod, partCol); if (timeline == null) { - throw new LensException(LensCubeErrorCode.TIMELINE_ABSENT.getLensErrorInfo(), - fact, storage, updatePeriod, partCol); + throw new LensException(LensCubeErrorCode.TIMELINE_ABSENT.getLensErrorInfo(), fact, storage, updatePeriod, + partCol); } return timeline; } @@ -519,8 +541,8 @@ public class CubeMetastoreClient { boolean updated = false; for (Map.Entry<String, Date> entry : timePartSpec.entrySet()) { TimePartition part = TimePartition.of(updatePeriod, entry.getValue()); - if (!partitionExistsByFilter(cubeTableName, storageName, StorageConstants.getPartFilter(entry.getKey(), - part.getDateString()))) { + if (!partitionExistsByFilter(cubeTableName, storageName, updatePeriod, + StorageConstants.getPartFilter(entry.getKey(), part.getDateString()))) { get(cubeTableName, storageName, updatePeriod, entry.getKey()).drop(part); updated = true; } @@ -565,10 +587,10 @@ public class CubeMetastoreClient { Hive.closeCurrent(); } - private void createOrAlterStorageHiveTable(Table parent, String storage, StorageTableDesc crtTblDesc) + private void createOrAlterStorageHiveTable(Table parent, String storageTableNamePrefix, StorageTableDesc crtTblDesc) throws LensException { try { - 
Table tbl = getStorage(storage).getStorageTable(getClient(), parent, crtTblDesc); + Table tbl = Storage.getStorageTable(storageTableNamePrefix, getClient(), parent, crtTblDesc); if (tableExists(tbl.getTableName())) { // alter table alterHiveTable(tbl.getTableName(), tbl); @@ -730,7 +752,7 @@ public class CubeMetastoreClient { * @param storageAggregatePeriods Aggregate periods for the storages * @param weight Weight of the cube * @param properties Properties of fact table - * @param storageTableDescs Map of storage to its storage table description + * @param storageTableDescs Map of storage table prefix to its storage table description * @throws LensException */ public void createCubeFactTable(String cubeName, String factName, List<FieldSchema> columns, @@ -808,7 +830,7 @@ public class CubeMetastoreClient { * Create cube table defined and create all the corresponding storage tables * * @param cubeTable Can be fact or dimension table - * @param storageTableDescs Map of storage to its storage table description + * @param storageTableDescs Map of storage tableName prefix to its storage table description * @throws LensException */ public void createCubeTable(AbstractCubeTable cubeTable, Map<String, StorageTableDesc> storageTableDescs) @@ -836,14 +858,17 @@ public class CubeMetastoreClient { * @param fact The CubeFactTable * @param storage The storage * @param updatePeriods Update periods of the fact on the storage - * @param storageTableDesc The storage table description + * @param storageTableDescs The storage table description * @throws LensException */ public void addStorage(CubeFactTable fact, String storage, Set<UpdatePeriod> updatePeriods, - StorageTableDesc storageTableDesc) throws LensException { - fact.addStorage(storage, updatePeriods); - createOrAlterStorageHiveTable(getTableWithTypeFailFast(fact.getName(), CubeTableType.FACT), - storage, storageTableDesc); + Map<String, StorageTableDesc> storageTableDescs, Map<UpdatePeriod, String> updatePeriodStoragePrefix) 
+ throws LensException { + fact.addStorage(storage, updatePeriods, updatePeriodStoragePrefix); + for (Map.Entry entry : storageTableDescs.entrySet()) { + createOrAlterStorageHiveTable(getTableWithTypeFailFast(fact.getName(), CubeTableType.FACT), + (String) entry.getKey(), (StorageTableDesc) entry.getValue()); + } alterCubeTable(fact.getName(), getTableWithTypeFailFast(fact.getName(), CubeTableType.FACT), fact); updateFactCache(fact.getName()); } @@ -860,8 +885,8 @@ public class CubeMetastoreClient { public void addStorage(CubeDimensionTable dim, String storage, UpdatePeriod dumpPeriod, StorageTableDesc storageTableDesc) throws LensException { dim.alterSnapshotDumpPeriod(storage, dumpPeriod); - createOrAlterStorageHiveTable(getTableWithTypeFailFast(dim.getName(), CubeTableType.DIM_TABLE), - storage, storageTableDesc); + createOrAlterStorageHiveTable(getTableWithTypeFailFast(dim.getName(), CubeTableType.DIM_TABLE), storage, + storageTableDesc); alterCubeTable(dim.getName(), getTableWithTypeFailFast(dim.getName(), CubeTableType.DIM_TABLE), dim); updateDimCache(dim.getName()); } @@ -896,10 +921,19 @@ public class CubeMetastoreClient { return partsAdded; } + /** + * @param factOrDimTable + * @param storageName + * @param updatePeriod + * @param storagePartitionDescs + * @param type + * @return + * @throws HiveException + * @throws LensException + */ private List<Partition> addPartitions(String factOrDimTable, String storageName, UpdatePeriod updatePeriod, List<StoragePartitionDesc> storagePartitionDescs, CubeTableType type) throws HiveException, LensException { - String storageTableName = getStorageTableName(factOrDimTable.trim(), - Storage.getPrefix(storageName.trim())).toLowerCase(); + String storageTableName = getStorageTableName(factOrDimTable, storageName, updatePeriod); if (type == CubeTableType.DIM_TABLE) { // Adding partition in dimension table. 
Map<Map<String, String>, LatestInfo> latestInfos = Maps.newHashMap(); @@ -910,7 +944,7 @@ public class CubeMetastoreClient { } List<Partition> partsAdded = getStorage(storageName).addPartitions(getClient(), factOrDimTable, updatePeriod, storagePartitionDescs, - latestInfos); + latestInfos, storageTableName); ListIterator<Partition> iter = partsAdded.listIterator(); while (iter.hasNext()) { if (iter.next().getSpec().values().contains(StorageConstants.LATEST_PARTITION_VALUE)) { @@ -928,10 +962,11 @@ public class CubeMetastoreClient { // Adding partition in fact table. if (storagePartitionDescs.size() > 0) { partsAdded = getStorage(storageName).addPartitions(getClient(), factOrDimTable, updatePeriod, - storagePartitionDescs, null); + storagePartitionDescs, null, storageTableName); } // update hive table - alterTablePartitionCache(getStorageTableName(factOrDimTable, Storage.getPrefix(storageName))); + alterTablePartitionCache((Storage.getPrefix(storageName) + factOrDimTable).toLowerCase(), updatePeriod, + storageTableName); return partsAdded; } else { throw new LensException("Can't add partitions to anything other than fact or dimtable"); @@ -1018,20 +1053,20 @@ public class CubeMetastoreClient { } /** - * store back all timelines of given storage table to table properties + * store back all timelines of given storage to table properties * - * @param storageTableName storage table name + * @param timeLineKey key for the time line + * @param storageTableName Storage table name * @throws HiveException */ - private void alterTablePartitionCache(String storageTableName) throws HiveException, LensException { + private void alterTablePartitionCache(String timeLineKey, UpdatePeriod updatePeriod, String storageTableName) + throws HiveException, LensException { Table table = getTable(storageTableName); Map<String, String> params = table.getParameters(); - if (partitionTimelineCache.get(storageTableName) != null) { - for (UpdatePeriod updatePeriod : 
partitionTimelineCache.get(storageTableName).keySet()) { - for (Map.Entry<String, PartitionTimeline> entry : partitionTimelineCache.get(storageTableName) - .get(updatePeriod).entrySet()) { - entry.getValue().updateTableParams(table); - } + if (partitionTimelineCache.get(timeLineKey) != null) { + for (Map.Entry<String, PartitionTimeline> entry : partitionTimelineCache.get(timeLineKey).get(updatePeriod) + .entrySet()) { + entry.getValue().updateTableParams(table); } params.put(getPartitionTimelineCachePresenceKey(), "true"); alterHiveTable(storageTableName, table); @@ -1173,8 +1208,7 @@ public class CubeMetastoreClient { */ public void dropPartition(String cubeTableName, String storageName, Map<String, Date> timePartSpec, Map<String, String> nonTimePartSpec, UpdatePeriod updatePeriod) throws HiveException, LensException { - String storageTableName = getStorageTableName(cubeTableName.trim(), - Storage.getPrefix(storageName.trim())).toLowerCase(); + String storageTableName = getStorageTableName(cubeTableName.trim(), storageName, updatePeriod); Table hiveTable = getHiveTable(storageTableName); List<FieldSchema> partCols = hiveTable.getPartCols(); List<String> partColNames = new ArrayList<>(partCols.size()); @@ -1244,7 +1278,8 @@ public class CubeMetastoreClient { // dropping fact partition getStorage(storageName).dropPartition(getClient(), storageTableName, partVals, null, null); if (partitionTimelineCache.updateForDeletion(cubeTableName, storageName, updatePeriod, timePartSpec)) { - this.alterTablePartitionCache(storageTableName); + this.alterTablePartitionCache((Storage.getPrefix(storageName) + cubeTableName).toLowerCase(), updatePeriod, + storageTableName); } } } @@ -1277,7 +1312,7 @@ public class CubeMetastoreClient { public boolean factPartitionExists(String factName, String storageName, UpdatePeriod updatePeriod, Map<String, Date> partitionTimestamp, Map<String, String> partSpec) throws HiveException, LensException { - String storageTableName = 
getFactOrDimtableStorageTableName(factName, storageName); + String storageTableName = getStorageTableName(factName, storageName, updatePeriod); return partitionExists(storageTableName, updatePeriod, partitionTimestamp, partSpec); } @@ -1286,9 +1321,9 @@ public class CubeMetastoreClient { return partitionExists(storageTableName, getPartitionSpec(updatePeriod, partitionTimestamps)); } - public boolean partitionExistsByFilter(String cubeTableName, String storageName, String filter) throws LensException { - return partitionExistsByFilter(getStorageTableName(cubeTableName, Storage.getPrefix(storageName)), - filter); + public boolean partitionExistsByFilter(String cubeTableName, String storageName, UpdatePeriod updatePeriod, + String filter) throws LensException { + return partitionExistsByFilter(getStorageTableName(cubeTableName, storageName, updatePeriod), filter); } public boolean partitionExistsByFilter(String storageTableName, String filter) throws LensException { @@ -1354,7 +1389,7 @@ public class CubeMetastoreClient { boolean latestPartitionExists(String factOrDimTblName, String storageName, String latestPartCol) throws HiveException, LensException { - String storageTableName = getStorageTableName(factOrDimTblName, Storage.getPrefix(storageName)); + String storageTableName = MetastoreUtil.getStorageTableName(factOrDimTblName, Storage.getPrefix(storageName)); if (isDimensionTable(factOrDimTblName)) { return dimTableLatestPartitionExists(storageTableName); } else { @@ -2225,18 +2260,30 @@ public class CubeMetastoreClient { */ public void dropStorageFromFact(String factName, String storage) throws LensException { CubeFactTable cft = getFactTable(factName); + dropHiveTablesForStorage(factName, storage); cft.dropStorage(storage); - dropHiveTable(getFactOrDimtableStorageTableName(factName, storage)); alterCubeTable(factName, getTableWithTypeFailFast(factName, CubeTableType.FACT), cft); updateFactCache(factName); } + private void dropHiveTablesForStorage(String factName, 
String storage) throws LensException{ + CubeFactTable cft = getFactTable(factName); + Set<String> droppedTables = new HashSet<>(); + for (Map.Entry updatePeriodEntry : cft.getStoragePrefixUpdatePeriodMap().get(storage).entrySet()) { + UpdatePeriod updatePeriod = (UpdatePeriod) updatePeriodEntry.getKey(); + String storageTableName = getStorageTableName(factName, storage, updatePeriod); + if (!droppedTables.contains(storageTableName)) { + dropHiveTable(storageTableName); + } + droppedTables.add(storageTableName); + } + } // updateFact will be false when fact is fully dropped private void dropStorageFromFact(String factName, String storage, boolean updateFact) throws LensException { - CubeFactTable cft = getFactTable(factName); - dropHiveTable(getFactOrDimtableStorageTableName(factName, storage)); + dropHiveTablesForStorage(factName, storage); if (updateFact) { + CubeFactTable cft = getFactTable(factName); cft.dropStorage(storage); alterCubeTable(factName, getTableWithTypeFailFast(factName, CubeTableType.FACT), cft); updateFactCache(factName); @@ -2432,4 +2479,22 @@ public class CubeMetastoreClient { Date now = new Date(); return isStorageTableCandidateForRange(storageTableName, resolveDate(fromDate, now), resolveDate(toDate, now)); } + + private String getStorageTablePrefixFromStorage(String factOrDimTableName, String storage, UpdatePeriod updatePeriod) + throws LensException { + if (updatePeriod == null) { + return storage; + } + if (isFactTable(factOrDimTableName)) { + return getFactTable(factOrDimTableName).getTablePrefix(storage, updatePeriod); + } else { + return storage; + } + } + + public String getStorageTableName(String factOrDimTableName, String storage, UpdatePeriod updatePeriod) + throws LensException { + return MetastoreUtil.getFactOrDimtableStorageTableName(factOrDimTableName, + getStorageTablePrefixFromStorage(factOrDimTableName, storage, updatePeriod)); + } } 
http://git-wip-us.apache.org/repos/asf/lens/blob/2aaf6e0a/lens-cube/src/main/java/org/apache/lens/cube/metadata/DateUtil.java ---------------------------------------------------------------------- diff --git a/lens-cube/src/main/java/org/apache/lens/cube/metadata/DateUtil.java b/lens-cube/src/main/java/org/apache/lens/cube/metadata/DateUtil.java index 7717081..d10d72e 100644 --- a/lens-cube/src/main/java/org/apache/lens/cube/metadata/DateUtil.java +++ b/lens-cube/src/main/java/org/apache/lens/cube/metadata/DateUtil.java @@ -30,6 +30,7 @@ import java.util.concurrent.Callable; import java.util.concurrent.TimeUnit; import java.util.regex.Matcher; import java.util.regex.Pattern; +import java.util.stream.Stream; import org.apache.lens.cube.error.LensCubeErrorCode; import org.apache.lens.server.api.error.LensException; @@ -305,11 +306,11 @@ public final class DateUtil { switch (interval) { case SECONDLY: case CONTINUOUS: - return getMilliSecondCoveringInfo(from, to, 1000); + return getMilliSecondCoveringInfo(from, to, 1000, interval); case MINUTELY: case HOURLY: case DAILY: - return getMilliSecondCoveringInfo(from, to, interval.weight()); + return getMilliSecondCoveringInfo(from, to, interval.weight(), interval); case WEEKLY: return getWeeklyCoveringInfo(from, to); case MONTHLY: @@ -323,18 +324,25 @@ public final class DateUtil { } } - private static CoveringInfo getMilliSecondCoveringInfo(Date from, Date to, long millisInInterval) { + private static CoveringInfo getMilliSecondCoveringInfo(Date from, Date to, long millisInInterval, UpdatePeriod interval) { long diff = to.getTime() - from.getTime(); - return new CoveringInfo((int) (diff / millisInInterval), diff % millisInInterval == 0); + return new CoveringInfo((int) (diff / millisInInterval), + Stream.of(from, to).allMatch(a->interval.truncate(a).equals(a))); + // start date and end date should lie on boundaries. 
} + /** + * Whether the range [from,to) is coverable by intervals + * @param from from time + * @param to to time + * @param intervals intervals to check + * @return true if any of the intervals can completely cover the range + */ static boolean isCoverableBy(Date from, Date to, Set<UpdatePeriod> intervals) { - for (UpdatePeriod period : intervals) { - if (getCoveringInfo(from, to, period).isCoverable()) { - return true; - } - } - return false; + return intervals.stream().anyMatch(period->isCoverableBy(from, to, period)); + } + private static boolean isCoverableBy(Date from, Date to, UpdatePeriod period) { + return getCoveringInfo(from, to, period).isCoverable(); } public static int getTimeDiff(Date fromDate, Date toDate, UpdatePeriod updatePeriod) { http://git-wip-us.apache.org/repos/asf/lens/blob/2aaf6e0a/lens-cube/src/main/java/org/apache/lens/cube/metadata/FactPartition.java ---------------------------------------------------------------------- diff --git a/lens-cube/src/main/java/org/apache/lens/cube/metadata/FactPartition.java b/lens-cube/src/main/java/org/apache/lens/cube/metadata/FactPartition.java index 1694b80..b90b569 100644 --- a/lens-cube/src/main/java/org/apache/lens/cube/metadata/FactPartition.java +++ b/lens-cube/src/main/java/org/apache/lens/cube/metadata/FactPartition.java @@ -64,7 +64,10 @@ public class FactPartition implements Comparable<FactPartition> { this.storageTables.addAll(storageTables); } } - + public FactPartition withoutContaining() { + return new FactPartition(this.getPartCol(), this.getPartSpec(), this.getPeriod(), null, this + .getPartFormat(), this.getStorageTables()); + } public FactPartition(String partCol, TimePartition timePartition) { this(partCol, timePartition, null, null); } http://git-wip-us.apache.org/repos/asf/lens/blob/2aaf6e0a/lens-cube/src/main/java/org/apache/lens/cube/metadata/MetastoreUtil.java ---------------------------------------------------------------------- diff --git 
a/lens-cube/src/main/java/org/apache/lens/cube/metadata/MetastoreUtil.java b/lens-cube/src/main/java/org/apache/lens/cube/metadata/MetastoreUtil.java index 53cf8af..57d4502 100644 --- a/lens-cube/src/main/java/org/apache/lens/cube/metadata/MetastoreUtil.java +++ b/lens-cube/src/main/java/org/apache/lens/cube/metadata/MetastoreUtil.java @@ -590,4 +590,10 @@ public class MetastoreUtil { } return copy; } + + public static String getUpdatePeriodStoragePrefixKey(String factTableName, String storageName, String updatePeriod) { + return MetastoreUtil.getFactKeyPrefix(factTableName) + "." + storageName + "." + updatePeriod; + } + + } http://git-wip-us.apache.org/repos/asf/lens/blob/2aaf6e0a/lens-cube/src/main/java/org/apache/lens/cube/metadata/Storage.java ---------------------------------------------------------------------- diff --git a/lens-cube/src/main/java/org/apache/lens/cube/metadata/Storage.java b/lens-cube/src/main/java/org/apache/lens/cube/metadata/Storage.java index cd9f705..936add4 100644 --- a/lens-cube/src/main/java/org/apache/lens/cube/metadata/Storage.java +++ b/lens-cube/src/main/java/org/apache/lens/cube/metadata/Storage.java @@ -124,14 +124,18 @@ public abstract class Storage extends AbstractCubeTable implements PartitionMeta /** * Get the storage table descriptor for the given parent table. 
* + * @param storageTableNamePrefix Storage table prefix based on update period * @param client The metastore client * @param parent Is either Fact or Dimension table * @param crtTbl Create table info * @return Table describing the storage table * @throws HiveException */ - public Table getStorageTable(Hive client, Table parent, StorageTableDesc crtTbl) throws HiveException { - String storageTableName = MetastoreUtil.getStorageTableName(parent.getTableName(), this.getPrefix()); + public static Table getStorageTable(String storageTableNamePrefix, Hive client, Table parent, StorageTableDesc crtTbl) + throws HiveException { + // Change it to the appropriate storage table name. + String storageTableName = MetastoreUtil + .getStorageTableName(parent.getTableName(), Storage.getPrefix(storageTableNamePrefix)); Table tbl = client.getTable(storageTableName, false); if (tbl == null) { tbl = client.newTable(storageTableName); @@ -235,21 +239,6 @@ public abstract class Storage extends AbstractCubeTable implements PartitionMeta } /** - * Add single partition to storage. Just calls #addPartitions. 
- * @param client - * @param addPartitionDesc - * @param latestInfo - * @throws HiveException - */ - public List<Partition> addPartition(Hive client, StoragePartitionDesc addPartitionDesc, LatestInfo latestInfo) - throws HiveException { - Map<Map<String, String>, LatestInfo> latestInfos = Maps.newHashMap(); - latestInfos.put(addPartitionDesc.getNonTimePartSpec(), latestInfo); - return addPartitions(client, addPartitionDesc.getCubeTableName(), addPartitionDesc.getUpdatePeriod(), - Collections.singletonList(addPartitionDesc), latestInfos); - } - - /** * Add given partitions in the underlying hive table and update latest partition links * * @param client hive client instance @@ -262,12 +251,11 @@ public abstract class Storage extends AbstractCubeTable implements PartitionMeta */ public List<Partition> addPartitions(Hive client, String factOrDimTable, UpdatePeriod updatePeriod, List<StoragePartitionDesc> storagePartitionDescs, - Map<Map<String, String>, LatestInfo> latestInfos) throws HiveException { + Map<Map<String, String>, LatestInfo> latestInfos, String tableName) throws HiveException { preAddPartitions(storagePartitionDescs); Map<Map<String, String>, Map<String, Integer>> latestPartIndexForPartCols = Maps.newHashMap(); boolean success = false; try { - String tableName = MetastoreUtil.getStorageTableName(factOrDimTable, this.getPrefix()); String dbName = SessionState.get().getCurrentDatabase(); AddPartitionDesc addParts = new AddPartitionDesc(dbName, tableName, true); Table storageTbl = client.getTable(dbName, tableName); @@ -383,11 +371,11 @@ public abstract class Storage extends AbstractCubeTable implements PartitionMeta * @throws InvalidOperationException * @throws HiveException */ - public void updatePartitions(Hive client, String fact, List<Partition> partitions) + public void updatePartitions(String storageTable, Hive client, String fact, List<Partition> partitions) throws InvalidOperationException, HiveException { boolean success = false; try { - 
client.alterPartitions(MetastoreUtil.getFactOrDimtableStorageTableName(fact, getName()), partitions, null); + client.alterPartitions(storageTable, partitions, null); success = true; } finally { if (success) { http://git-wip-us.apache.org/repos/asf/lens/blob/2aaf6e0a/lens-cube/src/main/java/org/apache/lens/cube/metadata/TimeRange.java ---------------------------------------------------------------------- diff --git a/lens-cube/src/main/java/org/apache/lens/cube/metadata/TimeRange.java b/lens-cube/src/main/java/org/apache/lens/cube/metadata/TimeRange.java index bf6cc5c..5bdbf74 100644 --- a/lens-cube/src/main/java/org/apache/lens/cube/metadata/TimeRange.java +++ b/lens-cube/src/main/java/org/apache/lens/cube/metadata/TimeRange.java @@ -22,6 +22,7 @@ import static org.apache.lens.cube.metadata.DateUtil.ABSDATE_PARSER; import java.util.Calendar; import java.util.Date; +import java.util.Set; import java.util.TreeSet; import org.apache.lens.cube.error.LensCubeErrorCode; @@ -32,6 +33,7 @@ import org.apache.hadoop.hive.ql.parse.ASTNode; import org.codehaus.jackson.annotate.JsonIgnoreProperties; +import lombok.Builder; import lombok.Data; import lombok.Getter; @@ -48,10 +50,24 @@ public class TimeRange { private ASTNode parent; private int childIndex; - public boolean isCoverableBy(TreeSet<UpdatePeriod> updatePeriods) { + public boolean isCoverableBy(Set<UpdatePeriod> updatePeriods) { return DateUtil.isCoverableBy(fromDate, toDate, updatePeriods); } + /** + * Truncate time range using the update period. + * The lower value of the truncated time range is the smallest date value equal to or larger than original + * time range's lower value which lies at the update period's boundary. Similarly for higher value. + * @param updatePeriod Update period to truncate time range with + * @return truncated time range + * @throws LensException If the truncated time range is invalid. 
+ */ + public TimeRange truncate(UpdatePeriod updatePeriod) throws LensException { + TimeRange timeRange = new TimeRangeBuilder().partitionColumn(partitionColumn) + .fromDate(updatePeriod.getCeilDate(fromDate)).toDate(updatePeriod.getFloorDate(toDate)).build(); + timeRange.validate(); + return timeRange; + } public static class TimeRangeBuilder { private final TimeRange range; http://git-wip-us.apache.org/repos/asf/lens/blob/2aaf6e0a/lens-cube/src/main/java/org/apache/lens/cube/parse/AbridgedTimeRangeWriter.java ---------------------------------------------------------------------- diff --git a/lens-cube/src/main/java/org/apache/lens/cube/parse/AbridgedTimeRangeWriter.java b/lens-cube/src/main/java/org/apache/lens/cube/parse/AbridgedTimeRangeWriter.java index 8681e90..3916a48 100644 --- a/lens-cube/src/main/java/org/apache/lens/cube/parse/AbridgedTimeRangeWriter.java +++ b/lens-cube/src/main/java/org/apache/lens/cube/parse/AbridgedTimeRangeWriter.java @@ -19,7 +19,12 @@ package org.apache.lens.cube.parse; +import static com.google.common.collect.Sets.newHashSet; +import static java.util.Optional.ofNullable; +import static java.util.stream.Collectors.toMap; + import java.util.*; +import java.util.stream.Collectors; import org.apache.lens.cube.metadata.FactPartition; import org.apache.lens.server.api.error.LensException; @@ -33,14 +38,13 @@ import com.google.common.collect.Sets; * Collapses the time range filters using IN operators */ public class AbridgedTimeRangeWriter implements TimeRangeWriter { - //TODO: minimize use of String, use StringBuilders /** * Return IN clause for the partitions selected in the cube query * - * @param cubeQueryContext - * @param tableName - * @param parts + * @param cubeQueryContext cube query context + * @param tableName table name + * @param parts partitions * @return * @throws LensException */ @@ -80,7 +84,7 @@ public class AbridgedTimeRangeWriter implements TimeRangeWriter { for (FactPartition factPartition : parts) { String filter 
= TimeRangeUtils.getTimeRangePartitionFilter(factPartition, cubeQueryContext, tableName); if (filter.contains("AND")) { - allTimeRangeFilters.add(new StringBuilder("(").append(filter).append(")").toString()); + allTimeRangeFilters.add("(" + filter + ")"); } else { extractColumnAndCondition(filter, partFilterMap); } @@ -89,7 +93,7 @@ public class AbridgedTimeRangeWriter implements TimeRangeWriter { List<String> inClauses = new ArrayList<String>(partFilterMap.size()); for (String column : partFilterMap.keySet()) { String clause = - new StringBuilder("(").append(StringUtils.join(partFilterMap.get(column), ",")).append(")").toString(); + "(" + StringUtils.join(partFilterMap.get(column), ",") + ")"; inClauses.add(column + " IN " + clause); } @@ -120,29 +124,17 @@ public class AbridgedTimeRangeWriter implements TimeRangeWriter { private Map<Set<FactPartition>, Set<FactPartition>> groupPartitions(Collection<FactPartition> parts) { Map<FactPartition, Set<FactPartition>> partitionSetMap = new HashMap<FactPartition, Set<FactPartition>>(); for (FactPartition part : parts) { - FactPartition key = part.getContainingPart(); - FactPartition part2 = new FactPartition(part.getPartCol(), part.getPartSpec(), part.getPeriod(), null, part - .getPartFormat(), part.getStorageTables()); - if (partitionSetMap.get(key) == null) { - partitionSetMap.put(key, Sets.<FactPartition>newTreeSet()); - } - partitionSetMap.get(key).add(part2); + partitionSetMap.computeIfAbsent(part.getContainingPart(), k -> Sets.newTreeSet()).add(part.withoutContaining()); } Map<Set<FactPartition>, Set<FactPartition>> setSetOppositeMap = Maps.newHashMap(); for (Map.Entry<FactPartition, Set<FactPartition>> entry : partitionSetMap.entrySet()) { - if (setSetOppositeMap.get(entry.getValue()) == null) { - setSetOppositeMap.put(entry.getValue(), Sets.<FactPartition>newTreeSet()); - } + setSetOppositeMap.computeIfAbsent(entry.getValue(), k -> Sets.newTreeSet()); if (entry.getKey() != null) { 
setSetOppositeMap.get(entry.getValue()).add(entry.getKey()); } } - - Map<Set<FactPartition>, Set<FactPartition>> setSetMap = Maps.newHashMap(); - for (Map.Entry<Set<FactPartition>, Set<FactPartition>> entry : setSetOppositeMap.entrySet()) { - setSetMap.put(entry.getValue(), entry.getKey()); - } - return setSetMap; + // inverse again + return setSetOppositeMap.entrySet().stream().collect(toMap(Map.Entry::getValue, Map.Entry::getKey)); } // This takes the output of filter generated by TimeRangeUtils.getTimeRangePartitionFilter @@ -156,13 +148,6 @@ public class AbridgedTimeRangeWriter implements TimeRangeWriter { String column = subTokens[0].trim(); String filterValue = subTokens[1].trim(); - List<String> filterValues = partFilterMap.get(column); - - if (filterValues == null) { - filterValues = new ArrayList<String>(); - partFilterMap.put(column, filterValues); - } - - filterValues.add(filterValue); + partFilterMap.computeIfAbsent(column, k -> new ArrayList<>()).add(filterValue); } } http://git-wip-us.apache.org/repos/asf/lens/blob/2aaf6e0a/lens-cube/src/main/java/org/apache/lens/cube/parse/BetweenTimeRangeWriter.java ---------------------------------------------------------------------- diff --git a/lens-cube/src/main/java/org/apache/lens/cube/parse/BetweenTimeRangeWriter.java b/lens-cube/src/main/java/org/apache/lens/cube/parse/BetweenTimeRangeWriter.java index c8b8129..bd77498 100644 --- a/lens-cube/src/main/java/org/apache/lens/cube/parse/BetweenTimeRangeWriter.java +++ b/lens-cube/src/main/java/org/apache/lens/cube/parse/BetweenTimeRangeWriter.java @@ -92,7 +92,7 @@ public class BetweenTimeRangeWriter implements TimeRangeWriter { } String partCol = start.getPartCol(); - if (cubeQueryContext != null && !cubeQueryContext.shouldReplaceTimeDimWithPart()) { + if (!cubeQueryContext.shouldReplaceTimeDimWithPart()) { partCol = cubeQueryContext.getTimeDimOfPartitionColumn(partCol); } 
http://git-wip-us.apache.org/repos/asf/lens/blob/2aaf6e0a/lens-cube/src/main/java/org/apache/lens/cube/parse/CandidateCoveringSetsResolver.java ---------------------------------------------------------------------- diff --git a/lens-cube/src/main/java/org/apache/lens/cube/parse/CandidateCoveringSetsResolver.java b/lens-cube/src/main/java/org/apache/lens/cube/parse/CandidateCoveringSetsResolver.java index c36ce70..0b7d400 100644 --- a/lens-cube/src/main/java/org/apache/lens/cube/parse/CandidateCoveringSetsResolver.java +++ b/lens-cube/src/main/java/org/apache/lens/cube/parse/CandidateCoveringSetsResolver.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -34,11 +34,6 @@ import lombok.extern.slf4j.Slf4j; @Slf4j public class CandidateCoveringSetsResolver implements ContextRewriter { - private final Configuration conf; - public CandidateCoveringSetsResolver(Configuration conf) { - this.conf = conf; - } - @Override public void rewriteContext(CubeQueryContext cubeql) throws LensException { @@ -99,8 +94,7 @@ public class CandidateCoveringSetsResolver implements ContextRewriter { private void updateFinalCandidates(List<List<Candidate>> joinCandidates, CubeQueryContext cubeql) { List<Candidate> finalCandidates = new ArrayList<>(); - for (Iterator<List<Candidate>> itr = joinCandidates.iterator(); itr.hasNext();) { - List<Candidate> joinCandidate = itr.next(); + for (List<Candidate> joinCandidate : joinCandidates) { if (joinCandidate.size() == 1) { finalCandidates.add(joinCandidate.iterator().next()); } else { @@ -112,8 +106,7 @@ public class CandidateCoveringSetsResolver implements ContextRewriter { } private boolean isCandidateCoveringTimeRanges(UnionCandidate uc, List<TimeRange> ranges) { - for (Iterator<TimeRange> itr = ranges.iterator(); itr.hasNext();) { - TimeRange range = itr.next(); + for (TimeRange range : 
ranges) { if (!CandidateUtil.isTimeRangeCovered(uc.getChildren(), range.getFromDate(), range.getToDate())) { return false; } @@ -134,7 +127,7 @@ public class CandidateCoveringSetsResolver implements ContextRewriter { private List<Candidate> resolveTimeRangeCoveringFactSet(CubeQueryContext cubeql, Set<QueriedPhraseContext> queriedMsrs, List<QueriedPhraseContext> qpcList) throws LensException { // All Candidates - List<Candidate> allCandidates = new ArrayList<Candidate>(cubeql.getCandidates()); + List<Candidate> allCandidates = new ArrayList<>(cubeql.getCandidates()); // Partially valid candidates List<Candidate> allCandidatesPartiallyValid = new ArrayList<>(); List<Candidate> candidateSet = new ArrayList<>(); @@ -144,7 +137,6 @@ public class CandidateCoveringSetsResolver implements ContextRewriter { StorageCandidate sc = (StorageCandidate) cand; if (CandidateUtil.isValidForTimeRanges(sc, cubeql.getTimeRanges())) { candidateSet.add(CandidateUtil.cloneStorageCandidate(sc)); - continue; } else if (CandidateUtil.isPartiallyValidForTimeRanges(sc, cubeql.getTimeRanges())) { allCandidatesPartiallyValid.add(CandidateUtil.cloneStorageCandidate(sc)); } else { @@ -157,9 +149,9 @@ public class CandidateCoveringSetsResolver implements ContextRewriter { } // Get all covering fact sets List<UnionCandidate> unionCoveringSet = - getCombinations(new ArrayList<Candidate>(allCandidatesPartiallyValid), cubeql); + getCombinations(new ArrayList<>(allCandidatesPartiallyValid), cubeql); // Sort the Collection based on no of elements - Collections.sort(unionCoveringSet, new CandidateUtil.ChildrenSizeBasedCandidateComparator<UnionCandidate>()); + unionCoveringSet.sort(new CandidateUtil.ChildrenSizeBasedCandidateComparator<UnionCandidate>()); // prune non covering sets pruneUnionCandidatesNotCoveringAllRanges(unionCoveringSet, cubeql); // prune candidate set which doesn't contain any common measure i @@ -218,14 +210,13 @@ public class CandidateCoveringSetsResolver implements ContextRewriter { 
} } - public List<UnionCandidate> getCombinations(final List<Candidate> candidates, CubeQueryContext cubeql) { - int aliasCounter = 0; - List<UnionCandidate> combinations = new LinkedList<UnionCandidate>(); + private List<UnionCandidate> getCombinations(final List<Candidate> candidates, CubeQueryContext cubeql) { + List<UnionCandidate> combinations = new LinkedList<>(); int size = candidates.size(); int threshold = Double.valueOf(Math.pow(2, size)).intValue() - 1; for (int i = 1; i <= threshold; ++i) { - LinkedList<Candidate> individualCombinationList = new LinkedList<Candidate>(); + LinkedList<Candidate> individualCombinationList = new LinkedList<>(); int count = size - 1; int clonedI = i; while (count >= 0) { @@ -249,7 +240,7 @@ public class CandidateCoveringSetsResolver implements ContextRewriter { boolean evaluable = false; Candidate uc = i.next(); for (QueriedPhraseContext msr : msrs) { - evaluable = isMeasureAnswerablebyUnionCandidate(msr, uc, cubeql) ? true : false; + evaluable = isMeasureAnswerablebyUnionCandidate(msr, uc, cubeql); if (!evaluable) { break; } @@ -265,18 +256,18 @@ public class CandidateCoveringSetsResolver implements ContextRewriter { // Sets that contain all measures or no measures are removed from iteration. 
// find other facts for (Iterator<Candidate> i = ucSet.iterator(); i.hasNext();) { - Candidate uc = i.next(); + Candidate candidate = i.next(); i.remove(); // find the remaining measures in other facts if (i.hasNext()) { Set<QueriedPhraseContext> remainingMsrs = new HashSet<>(msrs); - Set<QueriedPhraseContext> coveredMsrs = CandidateUtil.coveredMeasures(uc, msrs, cubeql); + Set<QueriedPhraseContext> coveredMsrs = CandidateUtil.coveredMeasures(candidate, msrs, cubeql); remainingMsrs.removeAll(coveredMsrs); List<List<Candidate>> coveringSets = resolveJoinCandidates(ucSet, remainingMsrs, cubeql); if (!coveringSets.isEmpty()) { for (List<Candidate> candSet : coveringSets) { - candSet.add(uc); + candSet.add(candidate); msrCoveringSets.add(candSet); } } else { http://git-wip-us.apache.org/repos/asf/lens/blob/2aaf6e0a/lens-cube/src/main/java/org/apache/lens/cube/parse/CandidateTable.java ---------------------------------------------------------------------- diff --git a/lens-cube/src/main/java/org/apache/lens/cube/parse/CandidateTable.java b/lens-cube/src/main/java/org/apache/lens/cube/parse/CandidateTable.java index 5863c1c..168dcc6 100644 --- a/lens-cube/src/main/java/org/apache/lens/cube/parse/CandidateTable.java +++ b/lens-cube/src/main/java/org/apache/lens/cube/parse/CandidateTable.java @@ -73,6 +73,5 @@ public interface CandidateTable { /** * Get partitions queried */ - //TODO union: Name changed Set<?> getParticipatingPartitions(); } http://git-wip-us.apache.org/repos/asf/lens/blob/2aaf6e0a/lens-cube/src/main/java/org/apache/lens/cube/parse/CandidateTablePruneCause.java ---------------------------------------------------------------------- diff --git a/lens-cube/src/main/java/org/apache/lens/cube/parse/CandidateTablePruneCause.java b/lens-cube/src/main/java/org/apache/lens/cube/parse/CandidateTablePruneCause.java index c7f2047..6cb18e6 100644 --- a/lens-cube/src/main/java/org/apache/lens/cube/parse/CandidateTablePruneCause.java +++ 
b/lens-cube/src/main/java/org/apache/lens/cube/parse/CandidateTablePruneCause.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -18,9 +18,13 @@ */ package org.apache.lens.cube.parse; +import static com.google.common.collect.Lists.newArrayList; +import static com.google.common.collect.Lists.partition; +import static java.util.stream.Collectors.toSet; import static org.apache.lens.cube.parse.CandidateTablePruneCause.CandidateTablePruneCode.*; import java.util.*; +import java.util.stream.Stream; import org.apache.lens.cube.metadata.TimeRange; @@ -52,12 +56,8 @@ public class CandidateTablePruneCause { "present in any table", }; } else { - List<List<String>> columnSets = new ArrayList<List<String>>(); - for (CandidateTablePruneCause cause : causes) { - columnSets.add(cause.getMissingColumns()); - } return new String[]{ - "Column Sets: " + columnSets, + "Column Sets: " + causes.stream().map(CandidateTablePruneCause::getMissingColumns).collect(toSet()), "queriable together", }; } @@ -87,12 +87,9 @@ public class CandidateTablePruneCause { STORAGE_NOT_AVAILABLE_IN_RANGE("No storages available for all of these time ranges: %s") { @Override Object[] getFormatPlaceholders(Set<CandidateTablePruneCause> causes) { - Set<TimeRange> allRanges = Sets.newHashSet(); - for (CandidateTablePruneCause cause : causes) { - allRanges.addAll(cause.getInvalidRanges()); - } return new Object[]{ - allRanges.toString(), + causes.stream().map(CandidateTablePruneCause::getInvalidRanges).flatMap(Collection::stream) + .collect(toSet()).toString(), }; } }, @@ -108,11 +105,10 @@ public class CandidateTablePruneCause { // expression is not evaluable in the candidate EXPRESSION_NOT_EVALUABLE("%s expressions not evaluable") { Object[] getFormatPlaceholders(Set<CandidateTablePruneCause> causes) { - List<String> columns = new ArrayList<String>(); - 
for (CandidateTablePruneCause cause : causes) { - columns.addAll(cause.getMissingExpressions()); - } - return new String[]{columns.toString()}; + return new String[]{ + causes.stream().map(CandidateTablePruneCause::getMissingExpressions).flatMap(Collection::stream) + .collect(toSet()).toString() + }; } }, // column not valid in cube table. Commented the below line as it's not being used in master. @@ -126,12 +122,8 @@ public class CandidateTablePruneCause { "present in any table", }; } else { - List<List<String>> columnSets = new ArrayList<List<String>>(); - for (CandidateTablePruneCause cause : causes) { - columnSets.add(cause.getMissingColumns()); - } return new String[]{ - "Column Sets: " + columnSets, + "Column Sets: " + causes.stream().map(CandidateTablePruneCause::getMissingColumns).collect(toSet()), "queriable together", }; } @@ -146,17 +138,13 @@ public class CandidateTablePruneCause { TIMEDIM_NOT_SUPPORTED("Queried data not available for time dimensions: %s") { @Override Object[] getFormatPlaceholders(Set<CandidateTablePruneCause> causes) { - Set<String> dims = Sets.newHashSet(); - for(CandidateTablePruneCause cause: causes){ - dims.addAll(cause.getUnsupportedTimeDims()); - } return new Object[]{ - dims.toString(), + causes.stream().map(CandidateTablePruneCause::getUnsupportedTimeDims).flatMap(Collection::stream) + .collect(toSet()).toString(), }; } }, - //Commented as its not used anymore. - //NO_FACT_UPDATE_PERIODS_FOR_GIVEN_RANGE("No fact update periods for given range"), + NO_FACT_UPDATE_PERIODS_FOR_GIVEN_RANGE("No fact update periods for given range"), // no candidate update periods, update period cause will have why each // update period is not a candidate @@ -164,44 +152,37 @@ public class CandidateTablePruneCause { NO_COLUMN_PART_OF_A_JOIN_PATH("No column part of a join path. 
Join columns: [%s]") { Object[] getFormatPlaceholders(Set<CandidateTablePruneCause> causes) { - List<String> columns = new ArrayList<String>(); - for (CandidateTablePruneCause cause : causes) { - columns.addAll(cause.getJoinColumns()); - } - return new String[]{columns.toString()}; + return new String[]{ + causes.stream().map(CandidateTablePruneCause::getJoinColumns).flatMap(Collection::stream) + .collect(toSet()).toString() + }; } }, // cube table is an aggregated fact and queried column is not under default // aggregate MISSING_DEFAULT_AGGREGATE("Columns: [%s] are missing default aggregate") { Object[] getFormatPlaceholders(Set<CandidateTablePruneCause> causes) { - List<String> columns = new ArrayList<String>(); - for (CandidateTablePruneCause cause : causes) { - columns.addAll(cause.getColumnsMissingDefaultAggregate()); - } - return new String[]{columns.toString()}; + return new String[]{ + causes.stream().map(CandidateTablePruneCause::getColumnsMissingDefaultAggregate).flatMap(Collection::stream) + .collect(toSet()).toString() + }; } }, // missing partitions for cube table MISSING_PARTITIONS("Missing partitions for the cube table: %s") { Object[] getFormatPlaceholders(Set<CandidateTablePruneCause> causes) { - Set<Set<String>> missingPartitions = Sets.newHashSet(); - for (CandidateTablePruneCause cause : causes) { - missingPartitions.add(cause.getMissingPartitions()); - } - return new String[]{missingPartitions.toString()}; + return new String[]{ + causes.stream().map(CandidateTablePruneCause::getMissingPartitions).collect(toSet()).toString() + }; } }, // incomplete data in the fact - INCOMPLETE_PARTITION("Data is incomplete. Details : %s") { + INCOMPLETE_PARTITION("Data for the requested metrics is only partially complete. Partially complete metrics are:" + + " %s. 
Please try again later or rerun after removing incomplete metrics") { Object[] getFormatPlaceholders(Set<CandidateTablePruneCause> causes) { - Set<Map<String, Map<String, Float>>> incompletePartitions = Sets.newHashSet(); - for (CandidateTablePruneCause cause : causes) { - if (cause.getIncompletePartitions() != null) { - incompletePartitions.add(cause.getIncompletePartitions()); - } - } - return new String[]{incompletePartitions.toString()}; + return new String[]{ + causes.stream().map(CandidateTablePruneCause::getIncompletePartitions).collect(toSet()).toString() + }; } }; @@ -227,8 +208,9 @@ public class CandidateTablePruneCause { public enum SkipUpdatePeriodCode { // invalid update period INVALID, - // Query max interval is more than update period - QUERY_INTERVAL_BIGGER + //this update period is greater than the Query max interval as provided by user with lens.cube.query.max.interval + UPDATE_PERIOD_BIGGER_THAN_MAX, + QUERY_INTERVAL_SMALLER_THAN_UPDATE_PERIOD } // Used for Test cases only. @@ -244,11 +226,11 @@ public class CandidateTablePruneCause { // populated only incase of missing update periods cause private List<String> missingUpdatePeriods; // populated in case of missing columns - private List<String> missingColumns; + private Set<String> missingColumns; // populated in case of expressions not evaluable private List<String> missingExpressions; // populated in case of no column part of a join path - private List<String> joinColumns; + private Collection<String> joinColumns; // the columns that are missing default aggregate. only set in case of MISSING_DEFAULT_AGGREGATE private List<String> columnsMissingDefaultAggregate; // if a time dim is not supported by the fact. Would be set if and only if @@ -268,54 +250,46 @@ public class CandidateTablePruneCause { } // Different static constructors for different causes. 
- public static CandidateTablePruneCause storageNotAvailableInRange(List<TimeRange> ranges) { + static CandidateTablePruneCause storageNotAvailableInRange(List<TimeRange> ranges) { CandidateTablePruneCause cause = new CandidateTablePruneCause(STORAGE_NOT_AVAILABLE_IN_RANGE); cause.invalidRanges = ranges; return cause; } - public static CandidateTablePruneCause timeDimNotSupported(Set<String> unsupportedTimeDims) { + static CandidateTablePruneCause timeDimNotSupported(Set<String> unsupportedTimeDims) { CandidateTablePruneCause cause = new CandidateTablePruneCause(TIMEDIM_NOT_SUPPORTED); cause.unsupportedTimeDims = unsupportedTimeDims; return cause; } - public static CandidateTablePruneCause columnNotFound(CandidateTablePruneCode pruneCode, - Collection<String>... missingColumns) { - List<String> colList = new ArrayList<String>(); - for (Collection<String> missing : missingColumns) { - colList.addAll(missing); - } - CandidateTablePruneCause cause = new CandidateTablePruneCause(pruneCode); - cause.setMissingColumns(colList); + static CandidateTablePruneCause columnNotFound(Collection<String> missingColumns) { + CandidateTablePruneCause cause = new CandidateTablePruneCause(COLUMN_NOT_FOUND); + cause.setMissingColumns(Sets.newHashSet(missingColumns)); + return cause; + } + static CandidateTablePruneCause denormColumnNotFound(Collection<String> missingColumns) { + CandidateTablePruneCause cause = new CandidateTablePruneCause(DENORM_COLUMN_NOT_FOUND); + cause.setMissingColumns(Sets.newHashSet(missingColumns)); return cause; } - public static CandidateTablePruneCause columnNotFound(CandidateTablePruneCode pruneCode, String... columns) { - List<String> colList = new ArrayList<String>(); - for (String column : columns) { - colList.add(column); - } - return columnNotFound(pruneCode, colList); + static CandidateTablePruneCause columnNotFound(String... 
columns) { + return columnNotFound(newArrayList(columns)); } - public static CandidateTablePruneCause expressionNotEvaluable(String... exprs) { - List<String> colList = new ArrayList<String>(); - for (String column : exprs) { - colList.add(column); - } + static CandidateTablePruneCause expressionNotEvaluable(String... exprs) { CandidateTablePruneCause cause = new CandidateTablePruneCause(EXPRESSION_NOT_EVALUABLE); - cause.setMissingExpressions(colList); + cause.setMissingExpressions(newArrayList(exprs)); return cause; } - public static CandidateTablePruneCause missingPartitions(Set<String> nonExistingParts) { + static CandidateTablePruneCause missingPartitions(Set<String> nonExistingParts) { CandidateTablePruneCause cause = new CandidateTablePruneCause(MISSING_PARTITIONS); cause.setMissingPartitions(nonExistingParts); return cause; } - public static CandidateTablePruneCause incompletePartitions(Map<String, Map<String, Float>> incompleteParts) { + static CandidateTablePruneCause incompletePartitions(Map<String, Map<String, Float>> incompleteParts) { CandidateTablePruneCause cause = new CandidateTablePruneCause(INCOMPLETE_PARTITION); //incompleteParts may be null when partial data is allowed. cause.setIncompletePartitions(incompleteParts); @@ -325,17 +299,13 @@ public class CandidateTablePruneCause { public static CandidateTablePruneCause noColumnPartOfAJoinPath(final Collection<String> colSet) { CandidateTablePruneCause cause = new CandidateTablePruneCause(NO_COLUMN_PART_OF_A_JOIN_PATH); - cause.setJoinColumns(new ArrayList<String>() { - { - addAll(colSet); - } - }); + cause.setJoinColumns(colSet); return cause; } - public static CandidateTablePruneCause missingDefaultAggregate(String... names) { + static CandidateTablePruneCause missingDefaultAggregate(String... 
names) { CandidateTablePruneCause cause = new CandidateTablePruneCause(MISSING_DEFAULT_AGGREGATE); - cause.setColumnsMissingDefaultAggregate(Lists.newArrayList(names)); + cause.setColumnsMissingDefaultAggregate(newArrayList(names)); return cause; } @@ -344,8 +314,8 @@ public class CandidateTablePruneCause { * @param dimStoragePruningCauses * @return */ - public static CandidateTablePruneCause noCandidateStoragesForDimtable( - Map<String, CandidateTablePruneCode> dimStoragePruningCauses) { + static CandidateTablePruneCause noCandidateStoragesForDimtable( + Map<String, CandidateTablePruneCode> dimStoragePruningCauses) { CandidateTablePruneCause cause = new CandidateTablePruneCause(NO_CANDIDATE_STORAGES); cause.setDimStoragePruningCauses(new HashMap<String, CandidateTablePruneCode>()); for (Map.Entry<String, CandidateTablePruneCode> entry : dimStoragePruningCauses.entrySet()) { @@ -361,6 +331,9 @@ public class CandidateTablePruneCause { * @param missingPartitionColumns * @return */ + public static CandidateTablePruneCause partitionColumnsMissing(final String... missingPartitionColumns) { + return partitionColumnsMissing(Lists.newArrayList(missingPartitionColumns)); + } public static CandidateTablePruneCause partitionColumnsMissing(final List<String> missingPartitionColumns) { CandidateTablePruneCause cause = new CandidateTablePruneCause(PART_COL_DOES_NOT_EXIST); cause.nonExistantPartCols = missingPartitionColumns; @@ -372,7 +345,7 @@ public class CandidateTablePruneCause { * @param updatePeriodRejectionCause * @return */ - public static CandidateTablePruneCause updatePeriodsRejected( + static CandidateTablePruneCause updatePeriodsRejected( final Map<String, SkipUpdatePeriodCode> updatePeriodRejectionCause) { CandidateTablePruneCause cause = new CandidateTablePruneCause(NO_CANDIDATE_UPDATE_PERIODS); cause.updatePeriodRejectionCause = updatePeriodRejectionCause;
