This is an automated email from the ASF dual-hosted git repository. ajantha pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/carbondata.git
The following commit(s) were added to refs/heads/master by this push: new d3204f7 [CARBONDATA-3718] Support SegmentLevel MinMax for better Pruning and less driver memory usage for cache d3204f7 is described below commit d3204f7b47ae2a3e1c77748774591a58c3e00757 Author: Indhumathi27 <indhumathi...@gmail.com> AuthorDate: Thu Jan 9 15:41:44 2020 +0530 [CARBONDATA-3718] Support SegmentLevel MinMax for better Pruning and less driver memory usage for cache Why is this PR needed? In Cloud scenarios, the index is too big to store in SparkDriver, since VM may not have so much memory. Currently in Carbon, we will load all indexes to cache for the first-time query. Since Carbon LRU Cache does not support time-based expiration, indexes will be removed from cache based on LeastRecentlyUsed mechanism, when the carbon lru cache is full. In some scenarios, where user's table has more segments and if user queries only very few segments often, we do not need to load all indexes to cache. For filter queries, if we prune and load only matched segments to cache, then driver's memory will be saved. What changes were proposed in this PR? Added all block minmax with column-id and sort_column info to segment metadata file and prune segment based on segment files and load index only for matched segment. Added a configurable carbon property 'carbon.load.all.segment.indexes.to.cache' to allow user to load all indexes to cache if needed. By default, value will be true, which loads all indexes to cache. Does this PR introduce any user interface change? Yes. Is any new testcase added? 
Yes This closes #3584 --- .../core/constants/CarbonCommonConstants.java | 10 ++ .../apache/carbondata/core/datamap/Segment.java | 14 ++ .../carbondata/core/datamap/TableDataMap.java | 13 +- .../core/datamap/dev/DataMapFactory.java | 7 +- .../core/indexstore/SegmentBlockIndexInfo.java | 54 ++++++ .../blockletindex/BlockletDataMapFactory.java | 196 +++++++++++++++++++-- .../carbondata/core/metadata/SegmentFileStore.java | 87 ++++++++- .../metadata/schema/table/column/ColumnSchema.java | 19 +- .../TableStatusReadCommittedScope.java | 1 + .../core/segmentmeta/BlockColumnMetaDataInfo.java | 64 +++++++ .../segmentmeta/SegmentColumnMetaDataInfo.java | 79 +++++++++ .../core/segmentmeta/SegmentMetaDataInfo.java | 37 ++++ .../core/segmentmeta/SegmentMetaDataInfoStats.java | 167 ++++++++++++++++++ docs/configuration-parameters.md | 1 + .../spark/load/DataLoadProcessBuilderOnSpark.scala | 39 ++-- .../spark/rdd/CarbonDataRDDFactory.scala | 88 ++++++--- .../carbondata/spark/rdd/CarbonIUDMergerRDD.scala | 8 +- .../carbondata/spark/rdd/CarbonMergerRDD.scala | 46 ++--- .../spark/rdd/CarbonTableCompactor.scala | 42 ++++- .../rdd/CompactionTaskCompletionListener.scala | 77 ++++++++ .../spark/rdd/InsertTaskCompletionListener.scala | 19 +- .../spark/rdd/NewCarbonDataLoadRDD.scala | 24 ++- .../carbondata/spark/rdd/UpdateDataLoad.scala | 11 +- .../CarbonTaskCompletionListener.scala | 5 + .../management/CarbonInsertFromStageCommand.scala | 15 +- .../command/management/CommonLoadUtils.scala | 79 ++++++++- .../secondaryindex/util/SecondaryIndexUtil.scala | 1 + .../allqueries/TestPruneUsingSegmentMinMax.scala | 121 +++++++++++++ .../store/writer/AbstractFactDataWriter.java | 17 ++ 29 files changed, 1215 insertions(+), 126 deletions(-) diff --git a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java index baf5a37..fbfacec 100644 --- 
a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java +++ b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java @@ -2421,5 +2421,15 @@ public final class CarbonCommonConstants { */ public static final int INDEX_CACHE_EXPIRATION_TIME_IN_SECONDS_DEFAULT = Integer.MAX_VALUE; + /** + * Load all indexes to carbon LRU cache + */ + public static final String CARBON_LOAD_ALL_SEGMENT_INDEXES_TO_CACHE = + "carbon.load.all.segment.indexes.to.cache"; + /** + * Default value for loading cache is true + * Make this false, to load index for the matched segments from filter expression + */ + public static final String CARBON_LOAD_ALL_SEGMENT_INDEXES_TO_CACHE_DEFAULT = "true"; } diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/Segment.java b/core/src/main/java/org/apache/carbondata/core/datamap/Segment.java index 708dc1d..9849df2 100644 --- a/core/src/main/java/org/apache/carbondata/core/datamap/Segment.java +++ b/core/src/main/java/org/apache/carbondata/core/datamap/Segment.java @@ -31,6 +31,7 @@ import java.util.Set; import org.apache.carbondata.core.metadata.schema.table.Writable; import org.apache.carbondata.core.mutate.UpdateVO; import org.apache.carbondata.core.readcommitter.ReadCommittedScope; +import org.apache.carbondata.core.segmentmeta.SegmentMetaDataInfo; import org.apache.carbondata.core.statusmanager.LoadMetadataDetails; import org.apache.carbondata.core.statusmanager.SegmentRefreshInfo; import org.apache.carbondata.core.statusmanager.SegmentStatusManager; @@ -85,6 +86,11 @@ public class Segment implements Serializable, Writable { */ private transient Map<String, String> options; + /** + * Segment metadata info + */ + private SegmentMetaDataInfo segmentMetaDataInfo; + public Segment() { } @@ -376,4 +382,12 @@ public class Segment implements Serializable, Writable { } this.indexSize = in.readLong(); } + + public SegmentMetaDataInfo getSegmentMetaDataInfo() { + return segmentMetaDataInfo; + 
} + + public void setSegmentMetaDataInfo(SegmentMetaDataInfo segmentMetaDataInfo) { + this.segmentMetaDataInfo = segmentMetaDataInfo; + } } diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/TableDataMap.java b/core/src/main/java/org/apache/carbondata/core/datamap/TableDataMap.java index 62424fe..8d680b9 100644 --- a/core/src/main/java/org/apache/carbondata/core/datamap/TableDataMap.java +++ b/core/src/main/java/org/apache/carbondata/core/datamap/TableDataMap.java @@ -120,10 +120,11 @@ public final class TableDataMap extends OperationEventListener { final List<ExtendedBlocklet> blocklets = new ArrayList<>(); List<Segment> segments = getCarbonSegments(allsegments); final Map<Segment, List<DataMap>> dataMaps; - if (table.isHivePartitionTable() && filter != null && !filter.isEmpty() && partitions != null) { - dataMaps = dataMapFactory.getDataMaps(segments, partitions); + boolean isFilterPresent = filter != null && !filter.isEmpty(); + if (table.isHivePartitionTable() && isFilterPresent && partitions != null) { + dataMaps = dataMapFactory.getDataMaps(segments, partitions, filter); } else { - dataMaps = dataMapFactory.getDataMaps(segments); + dataMaps = dataMapFactory.getDataMaps(segments, filter); } if (dataMaps.isEmpty()) { @@ -133,9 +134,9 @@ public final class TableDataMap extends OperationEventListener { // for filter queries int totalFiles = 0; int datamapsCount = 0; - // In case if filter has matched partitions, then update the segments with datamap's - // segment list, as getDataMaps will return segments that matches the partition. 
- if (null != partitions && !partitions.isEmpty()) { + // In case if filter is present, then update the segments with datamap's segment list + // based on segment or partition pruning + if (isFilterPresent) { segments = new ArrayList<>(dataMaps.keySet()); } for (Segment segment : segments) { diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapFactory.java b/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapFactory.java index 6296bf8..711c495 100644 --- a/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapFactory.java +++ b/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapFactory.java @@ -26,6 +26,7 @@ import java.util.Set; import org.apache.carbondata.common.exceptions.sql.MalformedDataMapCommandException; import org.apache.carbondata.core.datamap.DataMapDistributable; +import org.apache.carbondata.core.datamap.DataMapFilter; import org.apache.carbondata.core.datamap.DataMapLevel; import org.apache.carbondata.core.datamap.DataMapMeta; import org.apache.carbondata.core.datamap.Segment; @@ -89,8 +90,8 @@ public abstract class DataMapFactory<T extends DataMap> { /** * Get the datamap for all segments */ - public Map<Segment, List<CoarseGrainDataMap>> getDataMaps(List<Segment> segments) - throws IOException { + public Map<Segment, List<CoarseGrainDataMap>> getDataMaps(List<Segment> segments, + DataMapFilter filter) throws IOException { Map<Segment, List<CoarseGrainDataMap>> dataMaps = new HashMap<>(); for (Segment segment : segments) { dataMaps.put(segment, (List<CoarseGrainDataMap>) this.getDataMaps(segment)); @@ -103,7 +104,7 @@ public abstract class DataMapFactory<T extends DataMap> { * matches the partition. 
*/ public Map<Segment, List<CoarseGrainDataMap>> getDataMaps(List<Segment> segments, - List<PartitionSpec> partitionSpecs) throws IOException { + List<PartitionSpec> partitionSpecs, DataMapFilter dataMapFilter) throws IOException { Map<Segment, List<CoarseGrainDataMap>> dataMaps = new HashMap<>(); for (Segment segment : segments) { dataMaps.put(segment, (List<CoarseGrainDataMap>) this.getDataMaps(segment, partitionSpecs)); diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/SegmentBlockIndexInfo.java b/core/src/main/java/org/apache/carbondata/core/indexstore/SegmentBlockIndexInfo.java new file mode 100644 index 0000000..95bcf1f --- /dev/null +++ b/core/src/main/java/org/apache/carbondata/core/indexstore/SegmentBlockIndexInfo.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.carbondata.core.indexstore; + +import java.util.Set; + +import org.apache.carbondata.core.segmentmeta.SegmentMetaDataInfo; + +/** + * Holds tableBlockUniqueIdentifiers and block level minMax values for the segment + */ +public class SegmentBlockIndexInfo { + + /** + * IndexFile's information for the segment + */ + private Set<TableBlockIndexUniqueIdentifier> tableBlockIndexUniqueIdentifiers; + + /** + * segment metadata info + */ + private SegmentMetaDataInfo segmentMetaDataInfo; + + public SegmentBlockIndexInfo( + Set<TableBlockIndexUniqueIdentifier> tableBlockIndexUniqueIdentifiers, + SegmentMetaDataInfo segmentMetaDataInfo) { + this.tableBlockIndexUniqueIdentifiers = tableBlockIndexUniqueIdentifiers; + this.segmentMetaDataInfo = segmentMetaDataInfo; + } + + public Set<TableBlockIndexUniqueIdentifier> getTableBlockIndexUniqueIdentifiers() { + return tableBlockIndexUniqueIdentifiers; + } + + public SegmentMetaDataInfo getSegmentMetaDataInfo() { + return segmentMetaDataInfo; + } + +} diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java index b0f0214..44cc6da 100644 --- a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java +++ b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java @@ -19,6 +19,7 @@ package org.apache.carbondata.core.indexstore.blockletindex; import java.io.IOException; import java.util.ArrayList; +import java.util.BitSet; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -26,11 +27,14 @@ import java.util.Map; import java.util.Set; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; +import java.util.stream.Collectors; import org.apache.carbondata.core.cache.Cache; import org.apache.carbondata.core.cache.CacheProvider; import 
org.apache.carbondata.core.cache.CacheType; +import org.apache.carbondata.core.constants.CarbonCommonConstants; import org.apache.carbondata.core.datamap.DataMapDistributable; +import org.apache.carbondata.core.datamap.DataMapFilter; import org.apache.carbondata.core.datamap.DataMapMeta; import org.apache.carbondata.core.datamap.Segment; import org.apache.carbondata.core.datamap.dev.CacheableDataMap; @@ -42,6 +46,7 @@ import org.apache.carbondata.core.datamap.dev.cgdatamap.CoarseGrainDataMapFactor import org.apache.carbondata.core.datamap.dev.expr.DataMapDistributableWrapper; import org.apache.carbondata.core.datamap.dev.expr.DataMapExprWrapper; import org.apache.carbondata.core.datastore.block.SegmentProperties; +import org.apache.carbondata.core.datastore.block.SegmentPropertiesAndSchemaHolder; import org.apache.carbondata.core.datastore.filesystem.CarbonFile; import org.apache.carbondata.core.datastore.impl.FileFactory; import org.apache.carbondata.core.features.TableOperation; @@ -50,14 +55,21 @@ import org.apache.carbondata.core.indexstore.BlockletDataMapIndexWrapper; import org.apache.carbondata.core.indexstore.BlockletDetailsFetcher; import org.apache.carbondata.core.indexstore.ExtendedBlocklet; import org.apache.carbondata.core.indexstore.PartitionSpec; +import org.apache.carbondata.core.indexstore.SegmentBlockIndexInfo; import org.apache.carbondata.core.indexstore.SegmentPropertiesFetcher; import org.apache.carbondata.core.indexstore.TableBlockIndexUniqueIdentifier; import org.apache.carbondata.core.indexstore.TableBlockIndexUniqueIdentifierWrapper; import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier; import org.apache.carbondata.core.metadata.schema.table.CarbonTable; import org.apache.carbondata.core.metadata.schema.table.DataMapSchema; +import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema; +import org.apache.carbondata.core.scan.filter.FilterUtil; +import 
org.apache.carbondata.core.scan.filter.executer.FilterExecuter; import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf; +import org.apache.carbondata.core.segmentmeta.SegmentColumnMetaDataInfo; +import org.apache.carbondata.core.segmentmeta.SegmentMetaDataInfo; import org.apache.carbondata.core.util.BlockletDataMapUtil; +import org.apache.carbondata.core.util.CarbonProperties; import org.apache.carbondata.core.util.path.CarbonTablePath; import org.apache.carbondata.events.Event; @@ -79,7 +91,7 @@ public class BlockletDataMapFactory extends CoarseGrainDataMapFactory private AbsoluteTableIdentifier identifier; // segmentId -> list of index file - private Map<String, Set<TableBlockIndexUniqueIdentifier>> segmentMap = new ConcurrentHashMap<>(); + private Map<String, SegmentBlockIndexInfo> segmentMap = new ConcurrentHashMap<>(); private Cache<TableBlockIndexUniqueIdentifierWrapper, BlockletDataMapIndexWrapper> cache; @@ -122,16 +134,16 @@ public class BlockletDataMapFactory extends CoarseGrainDataMapFactory /** * Get the datamap for all segments */ - public Map<Segment, List<CoarseGrainDataMap>> getDataMaps(List<Segment> segments) - throws IOException { - return getDataMaps(segments, null); + public Map<Segment, List<CoarseGrainDataMap>> getDataMaps(List<Segment> segments, + DataMapFilter filter) throws IOException { + return getDataMaps(segments, null, filter); } /** * Get the datamap for all segments */ public Map<Segment, List<CoarseGrainDataMap>> getDataMaps(List<Segment> segments, - List<PartitionSpec> partitionsToPrune) throws IOException { + List<PartitionSpec> partitionsToPrune, DataMapFilter filter) throws IOException { List<TableBlockIndexUniqueIdentifierWrapper> tableBlockIndexUniqueIdentifierWrappers = new ArrayList<>(); Map<Segment, List<CoarseGrainDataMap>> dataMaps = new HashMap<>(); @@ -140,9 +152,28 @@ public class BlockletDataMapFactory extends CoarseGrainDataMapFactory segmentMap.put(segment.getSegmentNo(), segment); 
Set<TableBlockIndexUniqueIdentifier> identifiers = getTableBlockIndexUniqueIdentifiers(segment); - // get tableBlockIndexUniqueIdentifierWrappers from segment file info - getTableBlockUniqueIdentifierWrappers(partitionsToPrune, - tableBlockIndexUniqueIdentifierWrappers, identifiers); + if (null != partitionsToPrune) { + // get tableBlockIndexUniqueIdentifierWrappers from segment file info + getTableBlockUniqueIdentifierWrappers(partitionsToPrune, + tableBlockIndexUniqueIdentifierWrappers, identifiers); + } else { + SegmentMetaDataInfo segmentMetaDataInfo = segment.getSegmentMetaDataInfo(); + boolean isLoadAllIndex = Boolean.parseBoolean(CarbonProperties.getInstance() + .getProperty(CarbonCommonConstants.CARBON_LOAD_ALL_SEGMENT_INDEXES_TO_CACHE, + CarbonCommonConstants.CARBON_LOAD_ALL_SEGMENT_INDEXES_TO_CACHE_DEFAULT)); + if (!isLoadAllIndex && null != segmentMetaDataInfo && null != filter && !filter.isEmpty() + && null != filter.getExpression() && null == FilterUtil + .getImplicitFilterExpression(filter.getExpression())) { + getTableBlockIndexUniqueIdentifierUsingSegmentMinMax(segment, segmentMetaDataInfo, filter, + identifiers, tableBlockIndexUniqueIdentifierWrappers); + } else { + for (TableBlockIndexUniqueIdentifier tableBlockIndexUniqueIdentifier : identifiers) { + tableBlockIndexUniqueIdentifierWrappers.add( + new TableBlockIndexUniqueIdentifierWrapper(tableBlockIndexUniqueIdentifier, + this.getCarbonTable())); + } + } + } } List<BlockletDataMapIndexWrapper> blockletDataMapIndexWrappers = cache.getAll(tableBlockIndexUniqueIdentifierWrappers); @@ -186,6 +217,115 @@ public class BlockletDataMapFactory extends CoarseGrainDataMapFactory } } + /** + * Using blockLevel minmax values, identify if segment has to be added for further pruning and to + * load segment index info to cache + * @param segment to be identified if needed for loading block datamaps + * @param segmentMetaDataInfo list of block level min max values + * @param filter filter expression + * @param 
identifiers tableBlockIndexUniqueIdentifiers + * @param tableBlockIndexUniqueIdentifierWrappers to add tableBlockIndexUniqueIdentifiers + */ + private void getTableBlockIndexUniqueIdentifierUsingSegmentMinMax(Segment segment, + SegmentMetaDataInfo segmentMetaDataInfo, DataMapFilter filter, + Set<TableBlockIndexUniqueIdentifier> identifiers, + List<TableBlockIndexUniqueIdentifierWrapper> tableBlockIndexUniqueIdentifierWrappers) { + boolean isScanRequired = false; + Map<String, SegmentColumnMetaDataInfo> segmentColumnMetaDataInfoMap = + segmentMetaDataInfo.getSegmentColumnMetaDataInfoMap(); + int length = segmentColumnMetaDataInfoMap.size(); + // Add columnSchemas based on the columns present in segment + List<ColumnSchema> columnSchemas = new ArrayList<>(); + byte[][] min = new byte[length][]; + byte[][] max = new byte[length][]; + boolean[] minMaxFlag = new boolean[length]; + int i = 0; + + // get current columnSchema list for the table + Map<String, ColumnSchema> tableColumnSchemas = + this.getCarbonTable().getTableInfo().getFactTable().getListOfColumns().stream() + .collect(Collectors.toMap(ColumnSchema::getColumnUniqueId, ColumnSchema::clone)); + + // fill min,max and columnSchema values + for (Map.Entry<String, SegmentColumnMetaDataInfo> columnMetaData : + segmentColumnMetaDataInfoMap.entrySet()) { + ColumnSchema columnSchema = tableColumnSchemas.get(columnMetaData.getKey()); + if (null != columnSchema) { + // get segment sort column and column drift info + boolean isSortColumnInSegment = columnMetaData.getValue().isSortColumn(); + boolean isColumnDriftInSegment = columnMetaData.getValue().isColumnDrift(); + if (null != columnSchema.getColumnProperties()) { + // get current sort column and column drift info from current columnSchema + String isSortColumn = + columnSchema.getColumnProperties().get(CarbonCommonConstants.SORT_COLUMNS); + String isColumnDrift = + columnSchema.getColumnProperties().get(CarbonCommonConstants.COLUMN_DRIFT); + if (null != isSortColumn) 
{ + if (isSortColumn.equalsIgnoreCase("true") && !isSortColumnInSegment) { + // Unset current column schema column properties + modifyColumnSchemaForSortColumn(columnSchema, isColumnDriftInSegment, isColumnDrift, + false); + } else if (isSortColumn.equalsIgnoreCase("false") && isSortColumnInSegment) { + // set sort column to true in current column schema column properties + modifyColumnSchemaForSortColumn(columnSchema, isColumnDriftInSegment, isColumnDrift, + true); + } + } else { + modifyColumnSchemaForSortColumn(columnSchema, isColumnDriftInSegment, isColumnDrift, + false); + } + } + columnSchemas.add(columnSchema); + min[i] = columnMetaData.getValue().getColumnMinValue(); + max[i] = columnMetaData.getValue().getColumnMaxValue(); + minMaxFlag[i] = min[i].length != 0 && max[i].length != 0; + i++; + } + } + // get segmentProperties using created columnSchemas list + SegmentProperties segmentProperties = SegmentPropertiesAndSchemaHolder.getInstance() + .addSegmentProperties(this.getCarbonTable(), columnSchemas, segment.getSegmentNo()) + .getSegmentProperties(); + + FilterResolverIntf resolver = + new DataMapFilter(segmentProperties, this.getCarbonTable(), filter.getExpression()) + .getResolver(); + // prepare filter executer using datmapFilter resolver + FilterExecuter filterExecuter = + FilterUtil.getFilterExecuterTree(resolver, segmentProperties, null, null, false); + // check if block has to be pruned based on segment minmax + BitSet scanRequired = filterExecuter.isScanRequired(max, min, minMaxFlag); + if (!scanRequired.isEmpty()) { + isScanRequired = true; + } + if (isScanRequired) { + for (TableBlockIndexUniqueIdentifier tableBlockIndexUniqueIdentifier : identifiers) { + tableBlockIndexUniqueIdentifierWrappers.add( + new TableBlockIndexUniqueIdentifierWrapper(tableBlockIndexUniqueIdentifier, + this.getCarbonTable())); + } + } + } + + private void modifyColumnSchemaForSortColumn(ColumnSchema columnSchema, boolean columnDrift, + String isColumnDrift, boolean 
isSortColumnInSegment) { + if (!isSortColumnInSegment) { + if (null != isColumnDrift && isColumnDrift.equalsIgnoreCase("true") && !columnDrift) { + columnSchema.setDimensionColumn(false); + } + columnSchema.setSortColumn(false); + columnSchema.getColumnProperties().clear(); + } else { + // modify column schema, if current columnSchema is changed + columnSchema.setSortColumn(true); + if (!columnSchema.isDimensionColumn()) { + columnSchema.setDimensionColumn(true); + columnSchema.getColumnProperties().put(CarbonCommonConstants.COLUMN_DRIFT, "true"); + } + columnSchema.getColumnProperties().put(CarbonCommonConstants.SORT_COLUMNS, "true"); + } + } + @Override public List<CoarseGrainDataMap> getDataMaps(Segment segment) throws IOException { return getDataMaps(segment, null); @@ -211,13 +351,19 @@ public class BlockletDataMapFactory extends CoarseGrainDataMapFactory public Set<TableBlockIndexUniqueIdentifier> getTableBlockIndexUniqueIdentifiers(Segment segment) throws IOException { - Set<TableBlockIndexUniqueIdentifier> tableBlockIndexUniqueIdentifiers = - segmentMap.get(segment.getSegmentNo()); - if (tableBlockIndexUniqueIdentifiers == null) { + SegmentBlockIndexInfo segmentBlockIndexInfo = segmentMap.get(segment.getSegmentNo()); + Set<TableBlockIndexUniqueIdentifier> tableBlockIndexUniqueIdentifiers = null; + if (null != segmentBlockIndexInfo) { + segment.setSegmentMetaDataInfo( + segmentMap.get(segment.getSegmentNo()).getSegmentMetaDataInfo()); + return segmentBlockIndexInfo.getTableBlockIndexUniqueIdentifiers(); + } else { tableBlockIndexUniqueIdentifiers = BlockletDataMapUtil.getTableBlockUniqueIdentifiers(segment); if (tableBlockIndexUniqueIdentifiers.size() > 0) { - segmentMap.put(segment.getSegmentNo(), tableBlockIndexUniqueIdentifiers); + segmentMap.put(segment.getSegmentNo(), + new SegmentBlockIndexInfo(tableBlockIndexUniqueIdentifiers, + segment.getSegmentMetaDataInfo())); } } return tableBlockIndexUniqueIdentifiers; @@ -319,7 +465,11 @@ public class 
BlockletDataMapFactory extends CoarseGrainDataMapFactory @Override public void clear(String segment) { - Set<TableBlockIndexUniqueIdentifier> blockIndexes = segmentMap.remove(segment); + SegmentBlockIndexInfo segmentBlockIndexInfo = segmentMap.remove(segment); + Set<TableBlockIndexUniqueIdentifier> blockIndexes = null; + if (null != segmentBlockIndexInfo) { + blockIndexes = segmentBlockIndexInfo.getTableBlockIndexUniqueIdentifiers(); + } if (blockIndexes != null) { for (TableBlockIndexUniqueIdentifier blockIndex : blockIndexes) { TableBlockIndexUniqueIdentifierWrapper blockIndexWrapper = @@ -351,8 +501,9 @@ public class BlockletDataMapFactory extends CoarseGrainDataMapFactory public String getCacheSize() { long sum = 0L; int numOfIndexFiles = 0; - for (Map.Entry<String, Set<TableBlockIndexUniqueIdentifier>> entry : segmentMap.entrySet()) { - for (TableBlockIndexUniqueIdentifier tableBlockIndexUniqueIdentifier : entry.getValue()) { + for (Map.Entry<String, SegmentBlockIndexInfo> entry : segmentMap.entrySet()) { + for (TableBlockIndexUniqueIdentifier tableBlockIndexUniqueIdentifier : entry.getValue() + .getTableBlockIndexUniqueIdentifiers()) { BlockletDataMapIndexWrapper blockletDataMapIndexWrapper = cache.getIfPresent( new TableBlockIndexUniqueIdentifierWrapper(tableBlockIndexUniqueIdentifier, getCarbonTable())); @@ -392,8 +543,13 @@ public class BlockletDataMapFactory extends CoarseGrainDataMapFactory private List<TableBlockIndexUniqueIdentifierWrapper> getTableBlockIndexUniqueIdentifier( DataMapDistributable distributable) throws IOException { List<TableBlockIndexUniqueIdentifierWrapper> identifiersWrapper = new ArrayList<>(); - Set<TableBlockIndexUniqueIdentifier> tableBlockIndexUniqueIdentifiers = + SegmentBlockIndexInfo segmentBlockIndexInfo = segmentMap.get(distributable.getSegment().getSegmentNo()); + Set<TableBlockIndexUniqueIdentifier> tableBlockIndexUniqueIdentifiers = null; + if (null != segmentBlockIndexInfo) { + tableBlockIndexUniqueIdentifiers = + 
segmentBlockIndexInfo.getTableBlockIndexUniqueIdentifiers(); + } if (tableBlockIndexUniqueIdentifiers == null) { tableBlockIndexUniqueIdentifiers = new HashSet<>(); Set<String> indexFiles = distributable.getSegment().getCommittedIndexFile().keySet(); @@ -417,7 +573,9 @@ public class BlockletDataMapFactory extends CoarseGrainDataMapFactory this.getCarbonTable())); tableBlockIndexUniqueIdentifiers.add(tableBlockIndexUniqueIdentifier); } - segmentMap.put(distributable.getSegment().getSegmentNo(), tableBlockIndexUniqueIdentifiers); + segmentMap.put(distributable.getSegment().getSegmentNo(), + new SegmentBlockIndexInfo(tableBlockIndexUniqueIdentifiers, + distributable.getSegment().getSegmentMetaDataInfo())); } else { for (TableBlockIndexUniqueIdentifier tableBlockIndexUniqueIdentifier : tableBlockIndexUniqueIdentifiers) { @@ -539,7 +697,7 @@ public class BlockletDataMapFactory extends CoarseGrainDataMapFactory private Set<TableBlockIndexUniqueIdentifier> getTableSegmentUniqueIdentifiers(Segment segment) throws IOException { Set<TableBlockIndexUniqueIdentifier> tableBlockIndexUniqueIdentifiers = - segmentMap.get(segment.getSegmentNo()); + segmentMap.get(segment.getSegmentNo()).getTableBlockIndexUniqueIdentifiers(); if (tableBlockIndexUniqueIdentifiers == null) { tableBlockIndexUniqueIdentifiers = BlockletDataMapUtil.getSegmentUniqueIdentifiers(segment); } @@ -550,7 +708,7 @@ public class BlockletDataMapFactory extends CoarseGrainDataMapFactory Map<String, Set<TableBlockIndexUniqueIdentifier>> indexUniqueIdentifiers) { for (Map.Entry<String, Set<TableBlockIndexUniqueIdentifier>> identifier : indexUniqueIdentifiers .entrySet()) { - segmentMap.put(identifier.getKey(), identifier.getValue()); + segmentMap.put(identifier.getKey(), new SegmentBlockIndexInfo(identifier.getValue(), null)); } } diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java b/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java index 
c42fa7f..af5724b 100644 --- a/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java +++ b/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java @@ -54,12 +54,16 @@ import org.apache.carbondata.core.metadata.blocklet.DataFileFooter; import org.apache.carbondata.core.metadata.schema.table.CarbonTable; import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema; import org.apache.carbondata.core.mutate.CarbonUpdateUtil; +import org.apache.carbondata.core.segmentmeta.SegmentColumnMetaDataInfo; +import org.apache.carbondata.core.segmentmeta.SegmentMetaDataInfo; +import org.apache.carbondata.core.segmentmeta.SegmentMetaDataInfoStats; import org.apache.carbondata.core.statusmanager.LoadMetadataDetails; import org.apache.carbondata.core.statusmanager.SegmentStatus; import org.apache.carbondata.core.statusmanager.SegmentStatusManager; import org.apache.carbondata.core.statusmanager.SegmentUpdateStatusManager; import org.apache.carbondata.core.util.CarbonUtil; import org.apache.carbondata.core.util.DataFileFooterConverter; +import org.apache.carbondata.core.util.ObjectSerializationUtil; import org.apache.carbondata.core.util.path.CarbonTablePath; import com.google.gson.Gson; @@ -180,11 +184,17 @@ public class SegmentFileStore { * @param carbonTable CarbonTable * @param segmentId segment id * @param UUID a UUID string used to construct the segment file name + * @param segmentMetaDataInfo list of block level min and max values for segment * @return segment file name */ + public static String writeSegmentFile(CarbonTable carbonTable, String segmentId, String UUID, + SegmentMetaDataInfo segmentMetaDataInfo) throws IOException { + return writeSegmentFile(carbonTable, segmentId, UUID, null, segmentMetaDataInfo); + } + public static String writeSegmentFile(CarbonTable carbonTable, String segmentId, String UUID) throws IOException { - return writeSegmentFile(carbonTable, segmentId, UUID, null); + return 
writeSegmentFile(carbonTable, segmentId, UUID, null, null); } /** @@ -193,12 +203,18 @@ public class SegmentFileStore { * @param carbonTable CarbonTable * @param segmentId segment id * @param UUID a UUID string used to construct the segment file name + * @param segPath segment path + * @param segmentMetaDataInfo segment metadata info * @return segment file name */ public static String writeSegmentFile(CarbonTable carbonTable, String segmentId, String UUID, - String segPath) - throws IOException { - return writeSegmentFile(carbonTable, segmentId, UUID, null, segPath); + String segPath, SegmentMetaDataInfo segmentMetaDataInfo) throws IOException { + return writeSegmentFile(carbonTable, segmentId, UUID, null, segPath, segmentMetaDataInfo); + } + + public static String writeSegmentFile(CarbonTable carbonTable, String segmentId, String UUID, + String segPath) throws IOException { + return writeSegmentFile(carbonTable, segmentId, UUID, null, segPath, null); } /** @@ -315,7 +331,8 @@ public class SegmentFileStore { * @throws IOException */ public static String writeSegmentFile(CarbonTable carbonTable, String segmentId, String UUID, - final String currentLoadTimeStamp, String absSegPath) throws IOException { + final String currentLoadTimeStamp, String absSegPath, SegmentMetaDataInfo segmentMetaDataInfo) + throws IOException { String tablePath = carbonTable.getTablePath(); boolean supportFlatFolder = carbonTable.isSupportFlatFolder(); String segmentPath = absSegPath; @@ -356,6 +373,10 @@ public class SegmentFileStore { } } segmentFile.addPath(segmentRelativePath, folderDetails); + // set segmentMinMax to segmentFile + if (null != segmentMetaDataInfo) { + segmentFile.setSegmentMetaDataInfo(segmentMetaDataInfo); + } String segmentFileFolder = CarbonTablePath.getSegmentFilesLocation(tablePath); CarbonFile carbonFile = FileFactory.getCarbonFile(segmentFileFolder); if (!carbonFile.exists()) { @@ -582,7 +603,7 @@ public class SegmentFileStore { * @param partitionSpecs */ public 
static SegmentFile getSegmentFileForPhysicalDataPartitions(String tablePath, - List<PartitionSpec> partitionSpecs) { + List<PartitionSpec> partitionSpecs) throws IOException { SegmentFile segmentFile = null; for (PartitionSpec spec : partitionSpecs) { String location = spec.getLocation().toString(); @@ -1223,11 +1244,16 @@ public class SegmentFileStore { */ private Map<String, String> options; + /** + * Segment metadata information such as column_id, min-max, alter properties + */ + private String segmentMetaDataInfo; + SegmentFile() { locationMap = new HashMap<>(); } - public SegmentFile merge(SegmentFile segmentFile) { + public SegmentFile merge(SegmentFile segmentFile) throws IOException { if (this == segmentFile) { return this; } @@ -1240,6 +1266,37 @@ public class SegmentFileStore { locationMap.put(entry.getKey(), entry.getValue()); } } + if (segmentMetaDataInfo != null) { + SegmentMetaDataInfo currentSegmentMetaDataInfo = + (SegmentMetaDataInfo) ObjectSerializationUtil + .convertStringToObject(segmentMetaDataInfo); + if (null != segmentFile.getSegmentMetaDataInfo()) { + // get updated segmentColumnMetaDataInfo based on comparing block min-max values + Map<String, SegmentColumnMetaDataInfo> previousBlockColumnMetaDataInfo = + segmentFile.getSegmentMetaDataInfo().getSegmentColumnMetaDataInfoMap(); + for (Map.Entry<String, SegmentColumnMetaDataInfo> entry : + previousBlockColumnMetaDataInfo.entrySet()) { + if (currentSegmentMetaDataInfo.getSegmentColumnMetaDataInfoMap() + .containsKey(entry.getKey())) { + SegmentColumnMetaDataInfo currentBlockMinMaxInfo = + currentSegmentMetaDataInfo.getSegmentColumnMetaDataInfoMap() + .get(entry.getKey()); + byte[] blockMaxValue = SegmentMetaDataInfoStats.getInstance() + .compareAndUpdateMinMax(currentBlockMinMaxInfo.getColumnMaxValue(), + entry.getValue().getColumnMaxValue(), false); + byte[] blockMinValue = SegmentMetaDataInfoStats.getInstance() + .compareAndUpdateMinMax(currentBlockMinMaxInfo.getColumnMinValue(), + 
entry.getValue().getColumnMinValue(), true); + currentSegmentMetaDataInfo.getSegmentColumnMetaDataInfoMap().get(entry.getKey()) + .setColumnMaxValue(blockMaxValue); + currentSegmentMetaDataInfo.getSegmentColumnMetaDataInfoMap().get(entry.getKey()) + .setColumnMinValue(blockMinValue); + } + } + } + segmentMetaDataInfo = + ObjectSerializationUtil.convertObjectToString(currentSegmentMetaDataInfo); + } } if (locationMap == null) { locationMap = segmentFile.locationMap; @@ -1265,6 +1322,22 @@ public class SegmentFileStore { public void setOptions(Map<String, String> options) { this.options = options; } + + public SegmentMetaDataInfo getSegmentMetaDataInfo() { + SegmentMetaDataInfo newSegmentMetaDataInfo = null; + try { + newSegmentMetaDataInfo = (SegmentMetaDataInfo) ObjectSerializationUtil + .convertStringToObject(segmentMetaDataInfo); + } catch (IOException e) { + LOGGER.error("Error while getting segment metadata info"); + } + return newSegmentMetaDataInfo; + } + + public void setSegmentMetaDataInfo(SegmentMetaDataInfo segmentMetaDataInfo) throws IOException { + this.segmentMetaDataInfo = ObjectSerializationUtil.convertObjectToString( + segmentMetaDataInfo); + } } public static SegmentFile createSegmentFile(String partitionPath, FolderDetails folderDetails) { diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/column/ColumnSchema.java b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/column/ColumnSchema.java index 933898c..b21add4 100644 --- a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/column/ColumnSchema.java +++ b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/column/ColumnSchema.java @@ -17,8 +17,12 @@ package org.apache.carbondata.core.metadata.schema.table.column; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; import java.io.DataInput; +import java.io.DataInputStream; import java.io.DataOutput; +import java.io.DataOutputStream; import 
java.io.IOException; import java.io.Serializable; import java.util.ArrayList; @@ -37,7 +41,7 @@ import org.apache.carbondata.core.preagg.TimeSeriesUDF; /** * Store the information about the column meta data present the table */ -public class ColumnSchema implements Serializable, Writable { +public class ColumnSchema implements Serializable, Writable, Cloneable { /** * serialization version @@ -600,4 +604,17 @@ public class ColumnSchema implements Serializable, Writable { public void setIndexColumn(boolean indexColumn) { this.indexColumn = indexColumn; } + + public ColumnSchema clone() { + try (ByteArrayOutputStream bos = new ByteArrayOutputStream(); + DataOutputStream dos = new DataOutputStream(bos)) { + this.write(dos); + DataInputStream dis = new DataInputStream(new ByteArrayInputStream(bos.toByteArray())); + ColumnSchema columnSchema = (ColumnSchema) super.clone(); + columnSchema.readFields(dis); + return columnSchema; + } catch (IOException | CloneNotSupportedException e) { + throw new RuntimeException("Error occur while cloning ColumnSchema", e); + } + } } diff --git a/core/src/main/java/org/apache/carbondata/core/readcommitter/TableStatusReadCommittedScope.java b/core/src/main/java/org/apache/carbondata/core/readcommitter/TableStatusReadCommittedScope.java index 7e59156..5d4ed4e 100644 --- a/core/src/main/java/org/apache/carbondata/core/readcommitter/TableStatusReadCommittedScope.java +++ b/core/src/main/java/org/apache/carbondata/core/readcommitter/TableStatusReadCommittedScope.java @@ -86,6 +86,7 @@ public class TableStatusReadCommittedScope implements ReadCommittedScope { SegmentFileStore fileStore = new SegmentFileStore(identifier.getTablePath(), segment.getSegmentFileName()); indexFiles = fileStore.getIndexOrMergeFiles(); + segment.setSegmentMetaDataInfo(fileStore.getSegmentFile().getSegmentMetaDataInfo()); } return indexFiles; } diff --git a/core/src/main/java/org/apache/carbondata/core/segmentmeta/BlockColumnMetaDataInfo.java 
b/core/src/main/java/org/apache/carbondata/core/segmentmeta/BlockColumnMetaDataInfo.java new file mode 100644 index 0000000..c138243 --- /dev/null +++ b/core/src/main/java/org/apache/carbondata/core/segmentmeta/BlockColumnMetaDataInfo.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.carbondata.core.segmentmeta; + +import java.io.Serializable; +import java.util.List; + +import org.apache.carbondata.format.ColumnSchema; + +/** + * Holds columnSchemas and updated min-max values for all columns in a segment + */ +public class BlockColumnMetaDataInfo implements Serializable { + + private List<org.apache.carbondata.format.ColumnSchema> columnSchemas; + + private byte[][] min; + + private byte[][] max; + + public BlockColumnMetaDataInfo(List<org.apache.carbondata.format.ColumnSchema> columnSchemas, + byte[][] min, byte[][] max) { + this.columnSchemas = columnSchemas; + this.min = min; + this.max = max; + } + + public byte[][] getMin() { + return min; + } + + public void setMinMax(byte[][] min, byte[][] max) { + this.min = min; + this.max = max; + } + + public byte[][] getMax() { + return max; + } + + public List<ColumnSchema> getColumnSchemas() { + return columnSchemas; + } + + public void setColumnSchemas(List<ColumnSchema> columnSchemas) { + this.columnSchemas = columnSchemas; + } +} + diff --git a/core/src/main/java/org/apache/carbondata/core/segmentmeta/SegmentColumnMetaDataInfo.java b/core/src/main/java/org/apache/carbondata/core/segmentmeta/SegmentColumnMetaDataInfo.java new file mode 100644 index 0000000..3536411 --- /dev/null +++ b/core/src/main/java/org/apache/carbondata/core/segmentmeta/SegmentColumnMetaDataInfo.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.core.segmentmeta; + +import java.io.Serializable; + +/** + * Represent segment level column metadata information + */ +public class SegmentColumnMetaDataInfo implements Serializable { + + /** + * true if column is a sort column + */ + private boolean isSortColumn; + + /** + * segment level min value for a column + */ + private byte[] columnMinValue; + + /** + * segment level max value for a column + */ + private byte[] columnMaxValue; + + /** + * true if column measure is changed to dimension + */ + private boolean isColumnDrift; + + public SegmentColumnMetaDataInfo(boolean isSortColumn, byte[] columnMinValue, + byte[] columnMaxValue, boolean isColumnDrift) { + this.isSortColumn = isSortColumn; + this.columnMinValue = columnMinValue; + this.columnMaxValue = columnMaxValue; + this.isColumnDrift = isColumnDrift; + } + + public boolean isSortColumn() { + return isSortColumn; + } + + public byte[] getColumnMinValue() { + return columnMinValue; + } + + public byte[] getColumnMaxValue() { + return columnMaxValue; + } + + public boolean isColumnDrift() { + return isColumnDrift; + } + + public void setColumnMinValue(byte[] columnMinValue) { + this.columnMinValue = columnMinValue; + } + + public void setColumnMaxValue(byte[] columnMaxValue) { + this.columnMaxValue = columnMaxValue; + } +} + diff --git a/core/src/main/java/org/apache/carbondata/core/segmentmeta/SegmentMetaDataInfo.java b/core/src/main/java/org/apache/carbondata/core/segmentmeta/SegmentMetaDataInfo.java new file mode 100644 index 0000000..721522f ---
/dev/null +++ b/core/src/main/java/org/apache/carbondata/core/segmentmeta/SegmentMetaDataInfo.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.core.segmentmeta; + +import java.io.Serializable; +import java.util.Map; + +/** + * Represent segment metadata information + */ +public class SegmentMetaDataInfo implements Serializable { + + private Map<String, SegmentColumnMetaDataInfo> segmentColumnMetaDataInfoMap; + + SegmentMetaDataInfo(Map<String, SegmentColumnMetaDataInfo> segmentColumnMetaDataInfoMap) { + this.segmentColumnMetaDataInfoMap = segmentColumnMetaDataInfoMap; + } + + public Map<String, SegmentColumnMetaDataInfo> getSegmentColumnMetaDataInfoMap() { + return segmentColumnMetaDataInfoMap; + } +} diff --git a/core/src/main/java/org/apache/carbondata/core/segmentmeta/SegmentMetaDataInfoStats.java b/core/src/main/java/org/apache/carbondata/core/segmentmeta/SegmentMetaDataInfoStats.java new file mode 100644 index 0000000..704df72 --- /dev/null +++ b/core/src/main/java/org/apache/carbondata/core/segmentmeta/SegmentMetaDataInfoStats.java @@ -0,0 +1,167 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license 
agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.core.segmentmeta; + +import java.util.HashMap; +import java.util.LinkedHashMap; +import java.util.Map; + +import org.apache.carbondata.core.constants.CarbonCommonConstants; +import org.apache.carbondata.core.util.ByteUtil; + +/** + * Holds segment level meta data information such as min,max, sortColumn info for the + * corresponding table + */ +public class SegmentMetaDataInfoStats { + + private SegmentMetaDataInfoStats() { + tableSegmentMetaDataInfoMap = new LinkedHashMap<>(); + } + + public static synchronized SegmentMetaDataInfoStats getInstance() { + if (null == segmentMetaDataInfoStats) { + segmentMetaDataInfoStats = new SegmentMetaDataInfoStats(); + return segmentMetaDataInfoStats; + } else { + return segmentMetaDataInfoStats; + } + } + + private Map<String, Map<String, BlockColumnMetaDataInfo>> tableSegmentMetaDataInfoMap; + + private static SegmentMetaDataInfoStats segmentMetaDataInfoStats; + + /** + * Prepare of map with key as column-id and value as SegmentColumnMetaDataInfo using the + * tableSegmentMetaDataInfoMap + * + * @param tableName get corresponding tableName from map + * @param segmentId get corresponding segment Id from map + * @return segmentMetaDataInfo for the corresponding segment + */ + public synchronized 
SegmentMetaDataInfo getTableSegmentMetaDataInfo(String tableName, + String segmentId) { + Map<String, SegmentColumnMetaDataInfo> segmentColumnMetaDataInfoMap = new LinkedHashMap<>(); + Map<String, BlockColumnMetaDataInfo> segmentMetaDataInfoMap = + this.tableSegmentMetaDataInfoMap.get(tableName); + if (null != segmentMetaDataInfoMap && !segmentMetaDataInfoMap.isEmpty() + && null != segmentMetaDataInfoMap.get(segmentId)) { + BlockColumnMetaDataInfo blockColumnMetaDataInfo = segmentMetaDataInfoMap.get(segmentId); + for (int i = 0; i < blockColumnMetaDataInfo.getColumnSchemas().size(); i++) { + org.apache.carbondata.format.ColumnSchema columnSchema = + blockColumnMetaDataInfo.getColumnSchemas().get(i); + boolean isSortColumn = false; + boolean isColumnDrift = false; + if (null != columnSchema.columnProperties && !columnSchema.columnProperties.isEmpty()) { + if (null != columnSchema.columnProperties.get(CarbonCommonConstants.SORT_COLUMNS)) { + isSortColumn = true; + } + if (null != columnSchema.columnProperties.get(CarbonCommonConstants.COLUMN_DRIFT)) { + isColumnDrift = true; + } + } + segmentColumnMetaDataInfoMap.put(columnSchema.column_id, + new SegmentColumnMetaDataInfo(isSortColumn, blockColumnMetaDataInfo.getMin()[i], + blockColumnMetaDataInfo.getMax()[i], isColumnDrift)); + } + } + return new SegmentMetaDataInfo(segmentColumnMetaDataInfoMap); + } + + public synchronized void setBlockMetaDataInfo(String tableName, String segmentId, + BlockColumnMetaDataInfo currentBlockColumnMetaInfo) { + // check if tableName is present in tableSegmentMetaDataInfoMap + if (!this.tableSegmentMetaDataInfoMap.isEmpty() && null != this.tableSegmentMetaDataInfoMap + .get(tableName) && null != this.tableSegmentMetaDataInfoMap.get(tableName).get(segmentId)) { + // get previous blockColumn metadata information + BlockColumnMetaDataInfo previousBlockColumnMetaInfo = + this.tableSegmentMetaDataInfoMap.get(tableName).get(segmentId); + // compare and get updated min and max values + 
byte[][] updatedMin = compareAndUpdateMinMax(previousBlockColumnMetaInfo.getMin(), + currentBlockColumnMetaInfo.getMin(), true); + byte[][] updatedMax = compareAndUpdateMinMax(previousBlockColumnMetaInfo.getMax(), + currentBlockColumnMetaInfo.getMax(), false); + // update the segment + this.tableSegmentMetaDataInfoMap.get(tableName).get(segmentId) + .setMinMax(updatedMin, updatedMax); + } else { + Map<String, BlockColumnMetaDataInfo> segmentMinMaxMap = new HashMap<>(); + if (null != this.tableSegmentMetaDataInfoMap.get(tableName) + && !this.tableSegmentMetaDataInfoMap.get(tableName).isEmpty()) { + segmentMinMaxMap = this.tableSegmentMetaDataInfoMap.get(tableName); + } + segmentMinMaxMap.put(segmentId, currentBlockColumnMetaInfo); + this.tableSegmentMetaDataInfoMap.put(tableName, segmentMinMaxMap); + } + } + + /** + * Clear the corresponding segmentId and tableName from the segmentMinMaxMap + */ + public synchronized void clear(String tableName, String segmentId) { + if (null != tableSegmentMetaDataInfoMap.get(tableName)) { + if (null != tableSegmentMetaDataInfoMap.get(tableName).get(segmentId)) { + tableSegmentMetaDataInfoMap.get(tableName).remove(segmentId); + } + if (tableSegmentMetaDataInfoMap.get(tableName).isEmpty()) { + tableSegmentMetaDataInfoMap.remove(tableName); + } + } + } + + /** + * This method will do min/max comparison of values and update if required + */ + public synchronized byte[] compareAndUpdateMinMax(byte[] minMaxValueCompare1, + byte[] minMaxValueCompare2, boolean isMinValueComparison) { + // Compare and update min max values + byte[] updatedMinMaxValues = new byte[minMaxValueCompare1.length]; + System.arraycopy(minMaxValueCompare1, 0, updatedMinMaxValues, 0, minMaxValueCompare1.length); + int compare = + ByteUtil.UnsafeComparer.INSTANCE.compareTo(minMaxValueCompare2, minMaxValueCompare1); + if (isMinValueComparison) { + if (compare < 0) { + updatedMinMaxValues = minMaxValueCompare2; + } + } else if (compare > 0) { + updatedMinMaxValues = 
minMaxValueCompare2; + } + return updatedMinMaxValues; + } + + private synchronized byte[][] compareAndUpdateMinMax(byte[][] minMaxValueCompare1, + byte[][] minMaxValueCompare2, boolean isMinValueComparison) { + // Compare and update min max values + byte[][] updatedMinMaxValues = new byte[minMaxValueCompare1.length][]; + System.arraycopy(minMaxValueCompare1, 0, updatedMinMaxValues, 0, minMaxValueCompare1.length); + for (int i = 0; i < minMaxValueCompare1.length; i++) { + int compare = ByteUtil.UnsafeComparer.INSTANCE + .compareTo(minMaxValueCompare2[i], minMaxValueCompare1[i]); + if (isMinValueComparison) { + if (compare < 0) { + updatedMinMaxValues[i] = minMaxValueCompare2[i]; + } + } else if (compare > 0) { + updatedMinMaxValues[i] = minMaxValueCompare2[i]; + } + } + return updatedMinMaxValues; + } + +} \ No newline at end of file diff --git a/docs/configuration-parameters.md b/docs/configuration-parameters.md index 1f521bdc..a570d4c 100644 --- a/docs/configuration-parameters.md +++ b/docs/configuration-parameters.md @@ -146,6 +146,7 @@ This section provides the details of all the configurations required for the Car | carbon.query.prefetch.enable | true | By default this property is true, so prefetch is used in query to read next blocklet asynchronously in other thread while processing current blocklet in main thread. This can help to reduce CPU idle time. Setting this property false will disable this prefetch feature in query. | | carbon.query.stage.input.enable | false | Stage input files are data files written by external applications (such as Flink), but have not been loaded into carbon table. Enabling this configuration makes query to include these files, thus makes query on latest data. However, since these files are not indexed, query maybe slower as full scan is required for these files. 
| | carbon.driver.pruning.multi.thread.enable.files.count | 100000 | To prune in multi-thread when total number of segment files for a query increases beyond the configured value. | +| carbon.load.all.segment.indexes.to.cache | true | Setting this configuration to false will prune and load only matched segment indexes to cache using segment metadata information such as column id and its min-max values, which decreases the usage of driver memory. | ## Data Mutation Configuration | Parameter | Default Value | Description | diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala index b332849..8fb3002 100644 --- a/integration/spark/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala +++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala @@ -35,7 +35,7 @@ import org.apache.spark.sql.execution.command.ExecutionErrors import org.apache.spark.sql.util.{SparkSQLUtil, SparkTypeConverter} import org.apache.spark.storage.StorageLevel import org.apache.spark.unsafe.types.UTF8String -import org.apache.spark.util.LongAccumulator +import org.apache.spark.util.{CollectionAccumulator, LongAccumulator} import org.apache.carbondata.common.logging.LogServiceFactory import org.apache.carbondata.converter.SparkDataTypeConverterImpl @@ -44,6 +44,7 @@ import org.apache.carbondata.core.datastore.row.CarbonRow import org.apache.carbondata.core.metadata.datatype.{DataType, DataTypes, StructField, StructType} import org.apache.carbondata.core.metadata.schema.table.CarbonTable import org.apache.carbondata.core.metadata.schema.table.column.{CarbonColumn, CarbonDimension} +import org.apache.carbondata.core.segmentmeta.SegmentMetaDataInfo import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatus} import
org.apache.carbondata.core.util._ import org.apache.carbondata.core.util.ByteUtil.UnsafeComparer @@ -56,7 +57,7 @@ import org.apache.carbondata.processing.loading.csvinput.CSVInputFormat import org.apache.carbondata.processing.loading.model.CarbonLoadModel import org.apache.carbondata.processing.sort.sortdata.{NewRowComparator, NewRowComparatorForNormalDims, SortParameters} import org.apache.carbondata.processing.util.{CarbonDataProcessorUtil, TableOptionConstant} -import org.apache.carbondata.spark.rdd.{CarbonScanRDD, StringArrayRow} +import org.apache.carbondata.spark.rdd.{CarbonScanRDD, InsertTaskCompletionListener, StringArrayRow} import org.apache.carbondata.spark.util.{CommonUtil, Util} import org.apache.carbondata.store.CarbonRowReadSupport @@ -70,7 +71,9 @@ object DataLoadProcessBuilderOnSpark { sparkSession: SparkSession, dataFrame: Option[DataFrame], model: CarbonLoadModel, - hadoopConf: Configuration): Array[(String, (LoadMetadataDetails, ExecutionErrors))] = { + hadoopConf: Configuration, + segmentMetaDataAccumulator: CollectionAccumulator[Map[String, SegmentMetaDataInfo]]) + : Array[(String, (LoadMetadataDetails, ExecutionErrors))] = { var isLoadFromCSV = false val originRDD = if (dataFrame.isDefined) { dataFrame.get.rdd @@ -160,10 +163,10 @@ object DataLoadProcessBuilderOnSpark { // 4. 
Write sc.runJob(sortRDD, (context: TaskContext, rows: Iterator[CarbonRow]) => { - setTaskListener() - val model = modelBroadcast.value.getCopyWithTaskNo(context.partitionId.toString) + setTaskListener(model.getTableName, model.getSegmentId, segmentMetaDataAccumulator) + val loadModel = modelBroadcast.value.getCopyWithTaskNo(context.partitionId.toString) DataLoadProcessorStepOnSpark.writeFunc( - rows, context.partitionId, model, writeStepRowCounter, conf.value.value) + rows, context.partitionId, loadModel, writeStepRowCounter, conf.value.value) }) // clean cache only if persisted and keeping unpersist non-blocking as non-blocking call will @@ -186,7 +189,9 @@ object DataLoadProcessBuilderOnSpark { sparkSession: SparkSession, scanResultRDD : RDD[InternalRow], model: CarbonLoadModel, - hadoopConf: Configuration): Array[(String, (LoadMetadataDetails, ExecutionErrors))] = { + hadoopConf: Configuration, + segmentMetaDataAccumulator: CollectionAccumulator[Map[String, SegmentMetaDataInfo]]) + : Array[(String, (LoadMetadataDetails, ExecutionErrors))] = { val originRDD = scanResultRDD val sc = sparkSession.sparkContext @@ -245,9 +250,9 @@ object DataLoadProcessBuilderOnSpark { // 3. 
Write sc.runJob(newRDD, (context: TaskContext, rows: Iterator[CarbonRow]) => { - setTaskListener() - val model = modelBroadcast.value.getCopyWithTaskNo(context.partitionId.toString) - DataLoadProcessorStepOnSpark.writeFunc(rows, context.partitionId, model, + setTaskListener(model.getTableName, model.getSegmentId, segmentMetaDataAccumulator) + val loadModel = modelBroadcast.value.getCopyWithTaskNo(context.partitionId.toString) + DataLoadProcessorStepOnSpark.writeFunc(rows, context.partitionId, loadModel, writeStepRowCounter, conf.value.value) }) // clean cache only if persisted and keeping unpersist non-blocking as non-blocking call will @@ -298,7 +303,9 @@ object DataLoadProcessBuilderOnSpark { def loadDataUsingRangeSort( sparkSession: SparkSession, model: CarbonLoadModel, - hadoopConf: Configuration): Array[(String, (LoadMetadataDetails, ExecutionErrors))] = { + hadoopConf: Configuration, + segmentMetaDataAccumulator: CollectionAccumulator[Map[String, SegmentMetaDataInfo]]) + : Array[(String, (LoadMetadataDetails, ExecutionErrors))] = { // initialize and prepare row counter val sc = sparkSession.sparkContext val modelBroadcast = sc.broadcast(model) @@ -349,7 +356,7 @@ object DataLoadProcessBuilderOnSpark { // 4. 
Sort and Write data sc.runJob(rangeRDD, (context: TaskContext, rows: Iterator[CarbonRow]) => { - setTaskListener() + setTaskListener(model.getTableName, model.getSegmentId, segmentMetaDataAccumulator) DataLoadProcessorStepOnSpark.sortAndWriteFunc(rows, context.partitionId, modelBroadcast, writeStepRowCounter, conf.value.value) }) @@ -479,9 +486,11 @@ object DataLoadProcessBuilderOnSpark { } } - def setTaskListener(): Unit = { - TaskContext.get.addTaskCompletionListener { _ => - CommonUtil.clearUnsafeMemory(ThreadLocalTaskInfo.getCarbonTaskInfo.getTaskId) + def setTaskListener(tableName: String, + segmentId: String, + segmentMetaDataAccumulator: CollectionAccumulator[Map[String, SegmentMetaDataInfo]]): Unit = { + TaskContext.get.addTaskCompletionListener { + new InsertTaskCompletionListener(null, null, segmentMetaDataAccumulator, tableName, segmentId) } TaskMetricsMap.initializeThreadLocal() val carbonTaskInfo = new CarbonTaskInfo diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala index a7603e2..4777ab8 100644 --- a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala +++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala @@ -41,6 +41,7 @@ import org.apache.spark.sql.execution.command.management.CommonLoadUtils import org.apache.spark.sql.hive.DistributionUtil import org.apache.spark.sql.optimizer.CarbonFilters import org.apache.spark.sql.util.{CarbonException, SparkSQLUtil} +import org.apache.spark.util.CollectionAccumulator import org.apache.carbondata.common.constants.LoggerAction import org.apache.carbondata.common.logging.LogServiceFactory @@ -56,6 +57,7 @@ import org.apache.carbondata.core.locks.{CarbonLockFactory, ICarbonLock, LockUsa import org.apache.carbondata.core.metadata.{CarbonTableIdentifier, ColumnarFormatVersion, 
SegmentFileStore} import org.apache.carbondata.core.metadata.schema.table.CarbonTable import org.apache.carbondata.core.mutate.CarbonUpdateUtil +import org.apache.carbondata.core.segmentmeta.{SegmentMetaDataInfo, SegmentMetaDataInfoStats} import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatus, SegmentStatusManager} import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil, ThreadLocalSessionInfo} import org.apache.carbondata.core.util.path.CarbonTablePath @@ -316,7 +318,10 @@ object CarbonDataRDDFactory { val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable var status: Array[(String, (LoadMetadataDetails, ExecutionErrors))] = null var res: Array[List[(String, (LoadMetadataDetails, ExecutionErrors))]] = null - + // accumulator to collect segment metadata + val segmentMetaDataAccumulator = sqlContext + .sparkContext + .collectionAccumulator[Map[String, SegmentMetaDataInfo]] // create new segment folder in carbon store if (updateModel.isEmpty && carbonLoadModel.isCarbonTransactionalTable) { CarbonLoaderUtil.checkAndCreateCarbonDataLocation(carbonLoadModel.getSegmentId, carbonTable) @@ -339,7 +344,8 @@ object CarbonDataRDDFactory { carbonLoadModel, updateModel, carbonTable, - hadoopConf) + hadoopConf, + segmentMetaDataAccumulator) res.foreach { resultOfSeg => resultOfSeg.foreach { resultOfBlock => if (resultOfBlock._2._1.getSegmentStatus == SegmentStatus.LOAD_FAILURE) { @@ -373,9 +379,14 @@ object CarbonDataRDDFactory { .sparkSession, convertedRdd, carbonLoadModel, - hadoopConf) + hadoopConf, + segmentMetaDataAccumulator) } else { - loadDataFrame(sqlContext, None, Some(convertedRdd), carbonLoadModel) + loadDataFrame(sqlContext, + None, + Some(convertedRdd), + carbonLoadModel, + segmentMetaDataAccumulator) } } else { if (dataFrame.isEmpty && isSortTable && @@ -383,16 +394,24 @@ object CarbonDataRDDFactory { (sortScope.equals(SortScopeOptions.SortScope.GLOBAL_SORT) || 
sortScope.equals(SortScopeOptions.SortScope.LOCAL_SORT))) { DataLoadProcessBuilderOnSpark - .loadDataUsingRangeSort(sqlContext.sparkSession, carbonLoadModel, hadoopConf) + .loadDataUsingRangeSort(sqlContext.sparkSession, + carbonLoadModel, + hadoopConf, + segmentMetaDataAccumulator) } else if (isSortTable && sortScope.equals(SortScopeOptions.SortScope.GLOBAL_SORT)) { DataLoadProcessBuilderOnSpark.loadDataUsingGlobalSort(sqlContext.sparkSession, dataFrame, carbonLoadModel, - hadoopConf) + hadoopConf, + segmentMetaDataAccumulator) } else if (dataFrame.isDefined) { - loadDataFrame(sqlContext, dataFrame, None, carbonLoadModel) + loadDataFrame(sqlContext, + dataFrame, + None, + carbonLoadModel, + segmentMetaDataAccumulator) } else { - loadDataFile(sqlContext, carbonLoadModel, hadoopConf) + loadDataFile(sqlContext, carbonLoadModel, hadoopConf, segmentMetaDataAccumulator) } } val newStatusMap = scala.collection.mutable.Map.empty[String, SegmentStatus] @@ -479,7 +498,17 @@ object CarbonDataRDDFactory { segmentDetails.add(new Segment(resultOfBlock._2._1.getLoadName)) } } - val segmentFiles = updateSegmentFiles(carbonTable, segmentDetails, updateModel.get) + var segmentMetaDataInfoMap = scala.collection.mutable.Map.empty[String, SegmentMetaDataInfo] + if (!segmentMetaDataAccumulator.isZero) { + segmentMetaDataAccumulator.value.asScala.foreach(map => if (map.nonEmpty) { + segmentMetaDataInfoMap = segmentMetaDataInfoMap ++ map + }) + } + val segmentFiles = updateSegmentFiles(carbonTable, + segmentDetails, + updateModel.get, + segmentMetaDataInfoMap.asJava) + // this means that the update doesnt have any records to update so no need to do table // status file updation. 
if (resultSize == 0) { @@ -550,9 +579,14 @@ object CarbonDataRDDFactory { loadStatus } + val segmentMetaDataInfo = CommonLoadUtils.getSegmentMetaDataInfoFromAccumulator( + carbonLoadModel.getSegmentId, + segmentMetaDataAccumulator) val segmentFileName = SegmentFileStore.writeSegmentFile(carbonTable, carbonLoadModel.getSegmentId, - String.valueOf(carbonLoadModel.getFactTimeStamp)) + String.valueOf(carbonLoadModel.getFactTimeStamp), segmentMetaDataInfo) + // clear segmentMetaDataAccumulator + segmentMetaDataAccumulator.reset() SegmentFileStore.updateTableStatusFile( carbonTable, @@ -667,7 +701,8 @@ object CarbonDataRDDFactory { private def updateSegmentFiles( carbonTable: CarbonTable, segmentDetails: util.HashSet[Segment], - updateModel: UpdateTableModel) = { + updateModel: UpdateTableModel, + segmentMetaDataInfoMap: util.Map[String, SegmentMetaDataInfo]) = { val metadataDetails = SegmentStatusManager.readTableStatusFile( CarbonTablePath.getTableStatusFilePath(carbonTable.getTablePath)) @@ -677,11 +712,13 @@ object CarbonDataRDDFactory { val segmentFile = load.getSegmentFile var segmentFiles: Seq[CarbonFile] = Seq.empty[CarbonFile] + val segmentMetaDataInfo = segmentMetaDataInfoMap.get(seg.getSegmentNo) val file = SegmentFileStore.writeSegmentFile( carbonTable, seg.getSegmentNo, String.valueOf(System.currentTimeMillis()), - load.getPath) + load.getPath, + segmentMetaDataInfo) if (segmentFile != null) { segmentFiles ++= FileFactory.getCarbonFile( @@ -720,7 +757,9 @@ object CarbonDataRDDFactory { carbonLoadModel: CarbonLoadModel, updateModel: Option[UpdateTableModel], carbonTable: CarbonTable, - hadoopConf: Configuration): Array[List[(String, (LoadMetadataDetails, ExecutionErrors))]] = { + hadoopConf: Configuration, + segmentMetaDataAccumulator: CollectionAccumulator[Map[String, SegmentMetaDataInfo]] + ): Array[List[(String, (LoadMetadataDetails, ExecutionErrors))]] = { val segmentUpdateParallelism = CarbonProperties.getInstance().getParallelismForSegmentUpdate val 
updateRdd = dataFrame.get.rdd @@ -773,7 +812,8 @@ object CarbonDataRDDFactory { updateModel, segId.getSegmentNo, newTaskNo, - partition).toList).toIterator + partition, + segmentMetaDataAccumulator).toList).toIterator }.collect() } } @@ -786,7 +826,9 @@ object CarbonDataRDDFactory { updateModel: Option[UpdateTableModel], key: String, taskNo: Long, - iter: Iterator[Row]): Iterator[(String, (LoadMetadataDetails, ExecutionErrors))] = { + iter: Iterator[Row], + segmentMetaDataAccumulator: CollectionAccumulator[Map[String, SegmentMetaDataInfo]] + ): Iterator[(String, (LoadMetadataDetails, ExecutionErrors))] = { val rddResult = new updateResultImpl() val LOGGER = LogServiceFactory.getLogService(this.getClass.getName) val resultIter = new Iterator[(String, (LoadMetadataDetails, ExecutionErrors))] { @@ -811,7 +853,8 @@ object CarbonDataRDDFactory { index, iter, carbonLoadModel, - loadMetadataDetails) + loadMetadataDetails, + segmentMetaDataAccumulator) } catch { case e: NoRetryException => loadMetadataDetails @@ -994,7 +1037,8 @@ object CarbonDataRDDFactory { sqlContext: SQLContext, dataFrame: Option[DataFrame], scanResultRDD: Option[RDD[InternalRow]], - carbonLoadModel: CarbonLoadModel + carbonLoadModel: CarbonLoadModel, + segmentMetaDataAccumulator: CollectionAccumulator[Map[String, SegmentMetaDataInfo]] ): Array[(String, (LoadMetadataDetails, ExecutionErrors))] = { try { val rdd = if (dataFrame.isDefined) { @@ -1024,7 +1068,8 @@ object CarbonDataRDDFactory { sqlContext.sparkSession, new DataLoadResultImpl(), carbonLoadModel, - newRdd + newRdd, + segmentMetaDataAccumulator ).collect() } catch { case ex: Exception => @@ -1039,7 +1084,8 @@ object CarbonDataRDDFactory { private def loadDataFile( sqlContext: SQLContext, carbonLoadModel: CarbonLoadModel, - hadoopConf: Configuration + hadoopConf: Configuration, + segmentMetaDataAccumulator: CollectionAccumulator[Map[String, SegmentMetaDataInfo]] ): Array[(String, (LoadMetadataDetails, ExecutionErrors))] = { /* * when data load 
handle by node partition @@ -1134,7 +1180,9 @@ object CarbonDataRDDFactory { sqlContext.sparkSession, new DataLoadResultImpl(), carbonLoadModel, - blocksGroupBy + blocksGroupBy, + segmentMetaDataAccumulator ).collect() } + } diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonIUDMergerRDD.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonIUDMergerRDD.scala index a0baad0..5bd05a9 100644 --- a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonIUDMergerRDD.scala +++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonIUDMergerRDD.scala @@ -24,9 +24,11 @@ import org.apache.spark.Partition import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.sql.SparkSession import org.apache.spark.sql.execution.command.CarbonMergerMapping +import org.apache.spark.util.CollectionAccumulator import org.apache.carbondata.core.datastore.impl.FileFactory import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonTableIdentifier} +import org.apache.carbondata.core.segmentmeta.SegmentMetaDataInfo import org.apache.carbondata.hadoop.{CarbonInputSplit, CarbonMultiBlockSplit} import org.apache.carbondata.hadoop.api.CarbonInputFormat import org.apache.carbondata.hadoop.util.CarbonInputFormatUtil @@ -41,11 +43,13 @@ class CarbonIUDMergerRDD[K, V]( @transient private val ss: SparkSession, result: MergeResult[K, V], carbonLoadModel: CarbonLoadModel, - carbonMergerMapping: CarbonMergerMapping) + carbonMergerMapping: CarbonMergerMapping, + segmentMetaDataAccumulator: CollectionAccumulator[Map[String, SegmentMetaDataInfo]]) extends CarbonMergerRDD[K, V](ss, result, carbonLoadModel, - carbonMergerMapping) { + carbonMergerMapping, + segmentMetaDataAccumulator) { override def internalGetPartitions: Array[Partition] = { val startTime = System.currentTimeMillis() diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala 
b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala index 4278ade..5c2cc1b 100644 --- a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala +++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala @@ -37,6 +37,7 @@ import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.execution.command.{CarbonMergerMapping, NodeInfo} import org.apache.spark.sql.hive.DistributionUtil import org.apache.spark.sql.util.CarbonException +import org.apache.spark.util.CollectionAccumulator import org.apache.carbondata.common.logging.LogServiceFactory import org.apache.carbondata.converter.SparkDataTypeConverterImpl @@ -55,13 +56,13 @@ import org.apache.carbondata.core.mutate.UpdateVO import org.apache.carbondata.core.scan.expression import org.apache.carbondata.core.scan.expression.Expression import org.apache.carbondata.core.scan.result.iterator.RawResultIterator +import org.apache.carbondata.core.segmentmeta.SegmentMetaDataInfo import org.apache.carbondata.core.statusmanager.{FileFormat, LoadMetadataDetails, SegmentStatusManager, SegmentUpdateStatusManager} import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil, DataTypeUtil} import org.apache.carbondata.core.util.path.CarbonTablePath import org.apache.carbondata.hadoop.{CarbonInputSplit, CarbonInputSplitWrapper, CarbonMultiBlockSplit, CarbonProjection} import org.apache.carbondata.hadoop.api.{CarbonInputFormat, CarbonTableInputFormat} import org.apache.carbondata.hadoop.util.{CarbonInputFormatUtil, CarbonInputSplitTaskInfo} -import org.apache.carbondata.processing.loading.TableProcessingOperations import org.apache.carbondata.processing.loading.model.CarbonLoadModel import org.apache.carbondata.processing.merger._ import org.apache.carbondata.processing.util.{CarbonDataProcessorUtil, CarbonLoaderUtil} @@ -73,7 +74,8 @@ class CarbonMergerRDD[K, V]( @transient private val ss: SparkSession, 
result: MergeResult[K, V], carbonLoadModel: CarbonLoadModel, - carbonMergerMapping: CarbonMergerMapping) + carbonMergerMapping: CarbonMergerMapping, + segmentMetaDataAccumulator: CollectionAccumulator[Map[String, SegmentMetaDataInfo]]) extends CarbonRDD[(K, V)](ss, Nil) { ss.sparkContext.setLocalProperty("spark.scheduler.pool", "DDL") @@ -121,7 +123,8 @@ class CarbonMergerRDD[K, V]( var mergeNumber = "" var exec: CarbonCompactionExecutor = null var processor: AbstractResultProcessor = null - var rawResultIteratorMap: util.Map[String, util.List[RawResultIterator]] = _ + var rawResultIteratorMap: util.Map[String, util.List[RawResultIterator]] = new util + .HashMap[String, util.List[RawResultIterator]]() try { // sorting the table block info List. val splitList = if (null == rangeColumn || singleRange) { @@ -202,8 +205,14 @@ class CarbonMergerRDD[K, V]( new SparkDataTypeConverterImpl) // add task completion listener to clean up the resources - context.addTaskCompletionListener { _ => - close() + context.addTaskCompletionListener { + new CompactionTaskCompletionListener(carbonLoadModel, + exec, + processor, + rawResultIteratorMap, + segmentMetaDataAccumulator, + queryStartTime + ) } try { // fire a query and get the results. 
@@ -265,33 +274,6 @@ class CarbonMergerRDD[K, V]( throw e } - private def close(): Unit = { - deleteLocalDataFolders() - // close all the query executor service and clean up memory acquired during query processing - if (null != exec) { - LOGGER.info("Cleaning up query resources acquired during compaction") - exec.close(rawResultIteratorMap.get(CarbonCompactionUtil.UNSORTED_IDX), queryStartTime) - exec.close(rawResultIteratorMap.get(CarbonCompactionUtil.SORTED_IDX), queryStartTime) - } - // clean up the resources for processor - if (null != processor) { - LOGGER.info("Closing compaction processor instance to clean up loading resources") - processor.close() - } - } - - private def deleteLocalDataFolders(): Unit = { - try { - LOGGER.info("Deleting local folder store location") - val isCompactionFlow = true - TableProcessingOperations - .deleteLocalDataLoadFolderLocation(carbonLoadModel, isCompactionFlow, false) - } catch { - case e: Exception => - LOGGER.error(e) - } - } - var finished = false override def hasNext: Boolean = { diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala index 4dd472b..d1d8743 100644 --- a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala +++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala @@ -29,8 +29,9 @@ import org.apache.hadoop.mapreduce.{InputSplit, Job} import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.sql.{CarbonUtils, SparkSession, SQLContext} import org.apache.spark.sql.execution.command.{CarbonMergerMapping, CompactionCallableModel, CompactionModel} +import org.apache.spark.sql.execution.command.management.CommonLoadUtils import org.apache.spark.sql.util.SparkSQLUtil -import org.apache.spark.util.MergeIndexUtil +import org.apache.spark.util.{CollectionAccumulator, MergeIndexUtil} 
import org.apache.carbondata.core.constants.CarbonCommonConstants import org.apache.carbondata.core.constants.SortScopeOptions.SortScope @@ -38,6 +39,7 @@ import org.apache.carbondata.core.datamap.{DataMapStoreManager, Segment} import org.apache.carbondata.core.datastore.impl.FileFactory import org.apache.carbondata.core.metadata.schema.table.CarbonTable import org.apache.carbondata.core.metadata.SegmentFileStore +import org.apache.carbondata.core.segmentmeta.SegmentMetaDataInfo import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatusManager} import org.apache.carbondata.core.util.path.CarbonTablePath import org.apache.carbondata.core.util.CarbonUtil @@ -194,6 +196,10 @@ class CarbonTableCompactor(carbonLoadModel: CarbonLoadModel, OperationListenerBus.getInstance().fireEvent(dataMapPreExecutionEvent, dataMapOperationContext) } + // accumulator to collect segment metadata + val segmentMetaDataAccumulator = sqlContext + .sparkContext + .collectionAccumulator[Map[String, SegmentMetaDataInfo]] val mergeStatus = if (CompactionType.IUD_UPDDEL_DELTA == compactionType) { @@ -201,19 +207,24 @@ class CarbonTableCompactor(carbonLoadModel: CarbonLoadModel, sc.sparkSession, new MergeResultImpl(), carbonLoadModel, - carbonMergerMapping + carbonMergerMapping, + segmentMetaDataAccumulator ).collect } else if (SortScope.GLOBAL_SORT == carbonTable.getSortScope && !carbonTable.getSortColumns.isEmpty && carbonTable.getRangeColumn == null && CarbonUtil.isStandardCarbonTable(carbonTable)) { - compactSegmentsByGlobalSort(sc.sparkSession, carbonLoadModel, carbonMergerMapping) + compactSegmentsByGlobalSort(sc.sparkSession, + carbonLoadModel, + carbonMergerMapping, + segmentMetaDataAccumulator) } else { new CarbonMergerRDD( sc.sparkSession, new MergeResultImpl(), carbonLoadModel, - carbonMergerMapping + carbonMergerMapping, + segmentMetaDataAccumulator ).collect } @@ -248,21 +259,31 @@ class CarbonTableCompactor(carbonLoadModel: CarbonLoadModel, } else { // Get 
the segment files each updated segment in case of IUD compaction if (compactionType == CompactionType.IUD_UPDDEL_DELTA) { - val segmentFilesList = loadsToMerge.asScala.map{seg => + val segmentFilesList = loadsToMerge.asScala.map { seg => + val segmentMetaDataInfo = new SegmentFileStore(carbonLoadModel.getTablePath, + seg.getSegmentFile).getSegmentFile.getSegmentMetaDataInfo val file = SegmentFileStore.writeSegmentFile( carbonTable, seg.getLoadName, - carbonLoadModel.getFactTimeStamp.toString) + carbonLoadModel.getFactTimeStamp.toString, + segmentMetaDataInfo) new Segment(seg.getLoadName, file) }.filter(_.getSegmentFileName != null).asJava segmentFilesForIUDCompact = new util.ArrayList[Segment](segmentFilesList) } else { + // get segmentMetadata info from accumulator + val segmentMetaDataInfo = CommonLoadUtils.getSegmentMetaDataInfoFromAccumulator( + mergedLoadNumber, + segmentMetaDataAccumulator) segmentFileName = SegmentFileStore.writeSegmentFile( carbonTable, mergedLoadNumber, - carbonLoadModel.getFactTimeStamp.toString) + carbonLoadModel.getFactTimeStamp.toString, + segmentMetaDataInfo) } } + // clear segmentMetaDataAccumulator + segmentMetaDataAccumulator.reset() // Used to inform the commit listener that the commit is fired from compaction flow. 
operationContext.setProperty("isCompaction", "true") // trigger event for compaction @@ -362,7 +383,9 @@ class CarbonTableCompactor(carbonLoadModel: CarbonLoadModel, def compactSegmentsByGlobalSort( sparkSession: SparkSession, carbonLoadModel: CarbonLoadModel, - carbonMergerMapping: CarbonMergerMapping): Array[(String, Boolean)] = { + carbonMergerMapping: CarbonMergerMapping, + segmentMetaDataAccumulator: CollectionAccumulator[Map[String, SegmentMetaDataInfo]]) + : Array[(String, Boolean)] = { val table = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable val splits = splitsOfSegments( sparkSession, @@ -386,7 +409,8 @@ class CarbonTableCompactor(carbonLoadModel: CarbonLoadModel, sparkSession, Option(dataFrame), outputModel, - SparkSQLUtil.sessionState(sparkSession).newHadoopConf()) + SparkSQLUtil.sessionState(sparkSession).newHadoopConf(), + segmentMetaDataAccumulator) .map { row => (row._1, FailureCauses.NONE == row._2._2.failureCauses) } diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CompactionTaskCompletionListener.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CompactionTaskCompletionListener.scala new file mode 100644 index 0000000..c494b7f --- /dev/null +++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CompactionTaskCompletionListener.scala @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.spark.rdd + +import java.util + +import org.apache.log4j.Logger +import org.apache.spark.TaskContext +import org.apache.spark.sql.carbondata.execution.datasources.tasklisteners.CarbonCompactionTaskCompletionListener +import org.apache.spark.sql.execution.command.management.CommonLoadUtils +import org.apache.spark.util.CollectionAccumulator + +import org.apache.carbondata.common.logging.LogServiceFactory +import org.apache.carbondata.core.scan.result.iterator.RawResultIterator +import org.apache.carbondata.core.segmentmeta.SegmentMetaDataInfo +import org.apache.carbondata.processing.loading.TableProcessingOperations +import org.apache.carbondata.processing.loading.model.CarbonLoadModel +import org.apache.carbondata.processing.merger.{AbstractResultProcessor, CarbonCompactionExecutor, CarbonCompactionUtil} + +class CompactionTaskCompletionListener( + carbonLoadModel: CarbonLoadModel, + exec: CarbonCompactionExecutor, + processor: AbstractResultProcessor, + rawResultIteratorMap: util.Map[String, util.List[RawResultIterator]], + segmentMetaDataAccumulator: CollectionAccumulator[Map[String, SegmentMetaDataInfo]], + queryStartTime: Long) + extends CarbonCompactionTaskCompletionListener { + + val LOGGER: Logger = LogServiceFactory.getLogService(this.getClass.getName) + + override def onTaskCompletion(context: TaskContext): Unit = { + deleteLocalDataFolders() + // close all the query executor service and clean up memory acquired during query processing + if (null != exec) { + LOGGER.info("Cleaning up query resources 
acquired during compaction") + exec.close(rawResultIteratorMap.get(CarbonCompactionUtil.UNSORTED_IDX), queryStartTime) + exec.close(rawResultIteratorMap.get(CarbonCompactionUtil.SORTED_IDX), queryStartTime) + } + // clean up the resources for processor + if (null != processor) { + LOGGER.info("Closing compaction processor instance to clean up loading resources") + processor.close() + } + // fill segment metadata to accumulator + CommonLoadUtils.fillSegmentMetaDataInfoToAccumulator(carbonLoadModel.getTableName, + carbonLoadModel.getSegmentId, + segmentMetaDataAccumulator) + } + + private def deleteLocalDataFolders(): Unit = { + try { + LOGGER.info("Deleting local folder store location") + val isCompactionFlow = true + TableProcessingOperations + .deleteLocalDataLoadFolderLocation(carbonLoadModel, isCompactionFlow, false) + } catch { + case e: Exception => + LOGGER.error(e) + } + } + +} diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/InsertTaskCompletionListener.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/InsertTaskCompletionListener.scala index b6a2fa7..0daf4a3 100644 --- a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/InsertTaskCompletionListener.scala +++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/InsertTaskCompletionListener.scala @@ -20,21 +20,34 @@ package org.apache.carbondata.spark.rdd import org.apache.spark.TaskContext import org.apache.spark.sql.carbondata.execution.datasources.tasklisteners.CarbonLoadTaskCompletionListener import org.apache.spark.sql.execution.command.ExecutionErrors +import org.apache.spark.sql.execution.command.management.CommonLoadUtils +import org.apache.spark.util.CollectionAccumulator +import org.apache.carbondata.core.segmentmeta.SegmentMetaDataInfo import org.apache.carbondata.core.util.{DataTypeUtil, ThreadLocalTaskInfo} import org.apache.carbondata.processing.loading.{DataLoadExecutor, FailureCauses} import 
org.apache.carbondata.spark.util.CommonUtil class InsertTaskCompletionListener(dataLoadExecutor: DataLoadExecutor, - executorErrors: ExecutionErrors) + executorErrors: ExecutionErrors, + segmentMetaDataAccumulator: CollectionAccumulator[Map[String, SegmentMetaDataInfo]], + tableName: String, + segmentId: String) extends CarbonLoadTaskCompletionListener { override def onTaskCompletion(context: TaskContext): Unit = { try { - dataLoadExecutor.close() + // fill segment metadata to accumulator + CommonLoadUtils.fillSegmentMetaDataInfoToAccumulator( + tableName, + segmentId, + segmentMetaDataAccumulator) + if (null != dataLoadExecutor) { + dataLoadExecutor.close() + } } catch { case e: Exception => - if (executorErrors.failureCauses == FailureCauses.NONE) { + if (null != executorErrors && executorErrors.failureCauses == FailureCauses.NONE) { // If already error happened before task completion, // that error need to be thrown. Not the new error. Hence skip this. throw e diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala index c1f18b4..23a2683 100644 --- a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala +++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala @@ -32,13 +32,14 @@ import org.apache.spark.serializer.SerializerInstance import org.apache.spark.sql.{Row, SparkSession} import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.execution.command.ExecutionErrors -import org.apache.spark.util.SparkUtil +import org.apache.spark.util.{CollectionAccumulator, SparkUtil} import org.apache.carbondata.common.CarbonIterator import org.apache.carbondata.common.logging.LogServiceFactory import org.apache.carbondata.core.constants.CarbonCommonConstants import org.apache.carbondata.core.datastore.impl.FileFactory import 
org.apache.carbondata.core.metadata.datatype.DataTypes +import org.apache.carbondata.core.segmentmeta.SegmentMetaDataInfo import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatus} import org.apache.carbondata.core.util.{CarbonProperties, CarbonTimeStatisticsFactory, DataTypeUtil} import org.apache.carbondata.core.util.path.CarbonTablePath @@ -100,7 +101,8 @@ class NewCarbonDataLoadRDD[K, V]( @transient private val ss: SparkSession, result: DataLoadResult[K, V], carbonLoadModel: CarbonLoadModel, - blocksGroupBy: Array[(String, Array[BlockDetails])]) + blocksGroupBy: Array[(String, Array[BlockDetails])], + segmentMetaDataAccumulator: CollectionAccumulator[Map[String, SegmentMetaDataInfo]]) extends CarbonRDD[(K, V)](ss, Nil) { ss.sparkContext.setLocalProperty("spark.scheduler.pool", "DDL") @@ -146,7 +148,13 @@ class NewCarbonDataLoadRDD[K, V]( val executor = new DataLoadExecutor() // in case of success, failure or cancelation clear memory and stop execution context - .addTaskCompletionListener { new InsertTaskCompletionListener(executor, executionErrors) } + .addTaskCompletionListener { + new InsertTaskCompletionListener(executor, + executionErrors, + segmentMetaDataAccumulator, + carbonLoadModel.getTableName, + carbonLoadModel.getSegment.getSegmentNo) + } executor.execute(model, loader.storeLocation, recordReaders) @@ -247,7 +255,9 @@ class NewDataFrameLoaderRDD[K, V]( @transient private val ss: SparkSession, result: DataLoadResult[K, V], carbonLoadModel: CarbonLoadModel, - prev: DataLoadCoalescedRDD[_]) extends CarbonRDD[(K, V)](ss, prev) { + prev: DataLoadCoalescedRDD[_], + segmentMetaDataAccumulator: CollectionAccumulator[Map[String, SegmentMetaDataInfo]] + ) extends CarbonRDD[(K, V)](ss, prev) { override def internalCompute(theSplit: Partition, context: TaskContext): Iterator[(K, V)] = { val LOGGER = LogServiceFactory.getLogService(this.getClass.getName) @@ -298,7 +308,11 @@ class NewDataFrameLoaderRDD[K, V]( val executor = new 
DataLoadExecutor // in case of success, failure or cancelation clear memory and stop execution context - .addTaskCompletionListener(new InsertTaskCompletionListener(executor, executionErrors)) + .addTaskCompletionListener(new InsertTaskCompletionListener(executor, + executionErrors, + segmentMetaDataAccumulator, + carbonLoadModel.getTableName, + carbonLoadModel.getSegment.getSegmentNo)) executor.execute(model, loader.storeLocation, recordReaders.toArray) } catch { case e: NoRetryException => diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/UpdateDataLoad.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/UpdateDataLoad.scala index f4fdbc1..3060d4a 100644 --- a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/UpdateDataLoad.scala +++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/UpdateDataLoad.scala @@ -21,9 +21,12 @@ import scala.collection.mutable import org.apache.spark.TaskContext import org.apache.spark.sql.Row +import org.apache.spark.sql.execution.command.management.CommonLoadUtils +import org.apache.spark.util.CollectionAccumulator import org.apache.carbondata.common.CarbonIterator import org.apache.carbondata.common.logging.LogServiceFactory +import org.apache.carbondata.core.segmentmeta.SegmentMetaDataInfo import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatus} import org.apache.carbondata.core.util.ThreadLocalTaskInfo import org.apache.carbondata.processing.loading.{DataLoadExecutor, TableProcessingOperations} @@ -40,7 +43,8 @@ object UpdateDataLoad { index: Long, iter: Iterator[Row], carbonLoadModel: CarbonLoadModel, - loadMetadataDetails: LoadMetadataDetails): Unit = { + loadMetadataDetails: LoadMetadataDetails, + segmentMetaDataAccumulator: CollectionAccumulator[Map[String, SegmentMetaDataInfo]]): Unit = { val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName) try { val recordReaders = 
mutable.Buffer[CarbonIterator[Array[AnyRef]]]() @@ -58,6 +62,11 @@ object UpdateDataLoad { loadMetadataDetails.setSegmentStatus(SegmentStatus.SUCCESS) val executor = new DataLoadExecutor TaskContext.get().addTaskCompletionListener { context => + // fill segment metadata to accumulator + CommonLoadUtils.fillSegmentMetaDataInfoToAccumulator( + carbonLoadModel.getTableName, + segId, + segmentMetaDataAccumulator) executor.close() CommonUtil.clearUnsafeMemory(ThreadLocalTaskInfo.getCarbonTaskInfo.getTaskId) } diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/tasklisteners/CarbonTaskCompletionListener.scala b/integration/spark/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/tasklisteners/CarbonTaskCompletionListener.scala index 5547228..f3a63fb 100644 --- a/integration/spark/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/tasklisteners/CarbonTaskCompletionListener.scala +++ b/integration/spark/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/tasklisteners/CarbonTaskCompletionListener.scala @@ -39,6 +39,11 @@ trait CarbonQueryTaskCompletionListener extends TaskCompletionListener */ trait CarbonLoadTaskCompletionListener extends TaskCompletionListener +/** + * Load completion listener + */ +trait CarbonCompactionTaskCompletionListener extends TaskCompletionListener + case class CarbonQueryTaskCompletionListenerImpl(iter: RecordReaderIterator[InternalRow], freeMemory: Boolean = false) extends CarbonQueryTaskCompletionListener { override def onTaskCompletion(context: TaskContext): Unit = { diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertFromStageCommand.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertFromStageCommand.scala index b971dcd..c1888cb 100644 --- 
a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertFromStageCommand.scala +++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertFromStageCommand.scala @@ -39,6 +39,7 @@ import org.apache.carbondata.core.datastore.impl.FileFactory import org.apache.carbondata.core.locks.{CarbonLockFactory, CarbonLockUtil, ICarbonLock, LockUsage} import org.apache.carbondata.core.metadata.{ColumnarFormatVersion, SegmentFileStore} import org.apache.carbondata.core.metadata.schema.table.CarbonTable +import org.apache.carbondata.core.segmentmeta.SegmentMetaDataInfo import org.apache.carbondata.core.statusmanager.{SegmentStatus, SegmentStatusManager, StageInput} import org.apache.carbondata.core.util.path.CarbonTablePath import org.apache.carbondata.hadoop.CarbonInputSplit @@ -282,17 +283,27 @@ case class CarbonInsertFromStageCommand( s"${table.getDatabaseName}.${table.getTableName}") val start = System.currentTimeMillis() val dataFrame = DataLoadProcessBuilderOnSpark.createInputDataFrame(spark, table, splits) + // accumulator to collect segment metadata info such as columnId and it's minMax values + val segmentMetaDataAccumulator = spark.sqlContext + .sparkContext + .collectionAccumulator[Map[String, SegmentMetaDataInfo]] if (table.getBucketingInfo == null) { DataLoadProcessBuilderOnSpark.loadDataUsingGlobalSort( spark, Option(dataFrame), loadModel, - SparkSQLUtil.sessionState(spark).newHadoopConf() + SparkSQLUtil.sessionState(spark).newHadoopConf(), + segmentMetaDataAccumulator ).map { row => (row._1, FailureCauses.NONE == row._2._2.failureCauses) } } else { - CarbonDataRDDFactory.loadDataFrame(spark.sqlContext, Option(dataFrame), None, loadModel) + CarbonDataRDDFactory.loadDataFrame( + spark.sqlContext, + Option(dataFrame), + None, + loadModel, + segmentMetaDataAccumulator) } LOGGER.info(s"finish data loading, time taken ${System.currentTimeMillis() - start}ms") diff --git 
a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CommonLoadUtils.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CommonLoadUtils.scala
index 40a732a..f355795 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CommonLoadUtils.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CommonLoadUtils.scala
@@ -41,7 +41,7 @@ import org.apache.spark.sql.types._
 import org.apache.spark.sql.util.SparkSQLUtil
 import org.apache.spark.storage.StorageLevel
 import org.apache.spark.unsafe.types.UTF8String
-import org.apache.spark.util.{CarbonReflectionUtils, SparkUtil}
+import org.apache.spark.util.{CarbonReflectionUtils, CollectionAccumulator, SparkUtil}
 
 import org.apache.carbondata.common.Strings
 import org.apache.carbondata.common.logging.LogServiceFactory
@@ -56,6 +56,7 @@ import org.apache.carbondata.core.metadata.encoder.Encoding
 import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, TableInfo}
 import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema
 import org.apache.carbondata.core.mutate.{CarbonUpdateUtil, TupleIdEnum}
+import org.apache.carbondata.core.segmentmeta.{SegmentMetaDataInfo, SegmentMetaDataInfoStats}
 import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatus, SegmentStatusManager}
 import org.apache.carbondata.core.util._
 import org.apache.carbondata.core.util.path.CarbonTablePath
@@ -1089,4 +1090,80 @@ object CommonLoadUtils {
     }
   }
 
+  /**
+   * Fill segment level metadata to accumulator based on tableName and segmentId.
+   *
+   * Looks up the SegmentMetaDataInfo that SegmentMetaDataInfoStats returns for
+   * (tableName, segmentId). When it is non-null, it is added to the accumulator as a
+   * single-entry map keyed by segmentId and SegmentMetaDataInfoStats.clear(tableName, segmentId)
+   * is called; when it is null, the accumulator is left untouched. The lookup, add and clear
+   * all run inside one synchronized block.
+   *
+   * @param tableName table name used for the lookup and for the clear call
+   * @param segmentId segment id used for the lookup and as the key of the added map
+   * @param segmentMetaDataAccumulator accumulator that receives the segmentId -> info map
+   * @return the same accumulator instance that was passed in
+   */
+  def fillSegmentMetaDataInfoToAccumulator(
+      tableName: String,
+      segmentId: String,
+      segmentMetaDataAccumulator: CollectionAccumulator[Map[String, SegmentMetaDataInfo]])
+  : CollectionAccumulator[Map[String, SegmentMetaDataInfo]] = {
+    synchronized {
+      val segmentMetaDataInfo = SegmentMetaDataInfoStats.getInstance()
+        .getTableSegmentMetaDataInfo(tableName, segmentId)
+      if (null != segmentMetaDataInfo) {
+        segmentMetaDataAccumulator.add(scala.Predef
+          .Map(segmentId -> segmentMetaDataInfo))
+        SegmentMetaDataInfoStats.getInstance().clear(tableName, segmentId)
+      }
+      segmentMetaDataAccumulator
+    }
+  }
+
+  /**
+   * Collect segmentMetaDataInfo from all tasks and compare min-max values and prepare final
+   * segmentMetaDataInfo.
+   *
+   * Every map held by the accumulator is checked for segmentId. The first SegmentMetaDataInfo
+   * found becomes the result; it is replaced by a later match only while its column map is
+   * empty. Each further match is folded into the result column by column through
+   * SegmentMetaDataInfoStats.compareAndUpdateMinMax. Columns that the result does not already
+   * contain are skipped. The result is an object taken from the accumulator and updated in
+   * place through setColumnMaxValue / setColumnMinValue, not a copy.
+   *
+   * @param segmentId collect the segmentMetaDataInfo for the corresponding segmentId
+   * @param metaDataAccumulator segmentMetaDataAccumulator
+   * @return segmentMetaDataInfo, or null when the accumulator is zero or no accumulated map
+   *         has an entry for segmentId
+   */
+  def getSegmentMetaDataInfoFromAccumulator(
+      segmentId: String,
+      metaDataAccumulator: CollectionAccumulator[Map[String, SegmentMetaDataInfo]])
+  : SegmentMetaDataInfo = {
+    var segmentMetaDataInfo: SegmentMetaDataInfo = null
+    if (!metaDataAccumulator.isZero) {
+      val segmentMetaData = metaDataAccumulator.value.asScala
+      segmentMetaData.foreach { segmentColumnMetaDataInfo =>
+        val currentValue = segmentColumnMetaDataInfo.get(segmentId)
+        if (currentValue.isDefined) {
+          // no base yet, or the base has no columns: adopt this entry as the base
+          if (null == segmentMetaDataInfo) {
+            segmentMetaDataInfo = currentValue.get
+          } else if (segmentMetaDataInfo.getSegmentColumnMetaDataInfoMap.isEmpty) {
+            segmentMetaDataInfo = currentValue.get
+          } else {
+            val iterator = currentValue.get.getSegmentColumnMetaDataInfoMap
+              .entrySet()
+              .iterator()
+            while (iterator.hasNext) {
+              val currentSegmentColumnMetaDataMap = iterator.next()
+              // only columns already present in the merged result are updated
+              if (segmentMetaDataInfo.getSegmentColumnMetaDataInfoMap
+                .containsKey(currentSegmentColumnMetaDataMap.getKey)) {
+                val currentMax = SegmentMetaDataInfoStats.getInstance()
+                  .compareAndUpdateMinMax(segmentMetaDataInfo
+                    .getSegmentColumnMetaDataInfoMap
+                    .get(currentSegmentColumnMetaDataMap.getKey)
+                    .getColumnMaxValue,
+                    currentSegmentColumnMetaDataMap.getValue.getColumnMaxValue,
+                    false) // NOTE(review): flag is false for max and true for min, presumably "isMin"; confirm
+                val currentMin = SegmentMetaDataInfoStats.getInstance()
+                  .compareAndUpdateMinMax(segmentMetaDataInfo.getSegmentColumnMetaDataInfoMap
+                    .get(currentSegmentColumnMetaDataMap.getKey).getColumnMinValue,
+                    currentSegmentColumnMetaDataMap.getValue.getColumnMinValue,
+                    true)
+                segmentMetaDataInfo.getSegmentColumnMetaDataInfoMap
+                  .get(currentSegmentColumnMetaDataMap.getKey)
+                  .setColumnMaxValue(currentMax)
+                segmentMetaDataInfo.getSegmentColumnMetaDataInfoMap
+                  .get(currentSegmentColumnMetaDataMap.getKey)
+                  .setColumnMinValue(currentMin)
+              }
+            }
+          }
+        }
+      }
+    }
+    segmentMetaDataInfo
+  }
 }
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/util/SecondaryIndexUtil.scala b/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/util/SecondaryIndexUtil.scala
index 91b5038..ef19585 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/util/SecondaryIndexUtil.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/util/SecondaryIndexUtil.scala
@@ -203,6 +203,7 @@ object SecondaryIndexUtil {
             seg.getLoadName,
             segmentIdToLoadStartTimeMapping(seg.getLoadName).toString,
             carbonLoadModel.getFactTimeStamp.toString,
+            null,
             null)
           val segment = new Segment(seg.getLoadName, file)
           SegmentFileStore.updateTableStatusFile(indexCarbonTable,
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/TestPruneUsingSegmentMinMax.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/TestPruneUsingSegmentMinMax.scala
new file mode 100644
index 0000000..ab8dd62
--- /dev/null
+++ b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/TestPruneUsingSegmentMinMax.scala
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.spark.testsuite.allqueries
+
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.test.util.QueryTest
+import org.scalatest.BeforeAndAfterAll
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.util.CarbonProperties
+
+/**
+ * Checks, through the text reported by "show metacache on table carbon", how many index files
+ * are cached after filter queries on a table built from several separate inserts, with
+ * CARBON_LOAD_ALL_SEGMENT_INDEXES_TO_CACHE set to "false". A parquet table that initially holds
+ * the same rows supplies the expected answers for most queries.
+ */
+class TestPruneUsingSegmentMinMax extends QueryTest with BeforeAndAfterAll {
+
+  override def beforeAll(): Unit = {
+    drop
+    // switched to "false" for the whole suite; afterAll puts the default value back
+    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.CARBON_LOAD_ALL_SEGMENT_INDEXES_TO_CACHE, "false")
+  }
+
+  // drops both tables used by the suite, if they exist
+  private def drop = {
+    sql("drop table if exists carbon")
+    sql("drop table if exists parquet")
+  }
+
+  test("test if matched segment is only loaded to cache") {
+    createTablesAndLoadData
+    checkAnswer(sql("select * from carbon where a=1"),sql("select * from parquet where a=1"))
+    val showCache = sql("show metacache on table carbon").collect()
+    // first row, third column of the result carries the "<cached>/<total> index files cached" text
+    assert(showCache(0).get(2).toString.equalsIgnoreCase("1/3 index files cached"))
+    drop
+  }
+
+  // recreates both tables and runs three separate two-row inserts into each, with values of
+  // column a going 1-2, 3-4 and 5-6
+  private def createTablesAndLoadData = {
+    drop
+    sql("create table carbon(a int, b string, c double,d int,e timestamp) stored as carbondata")
+    sql("insert into carbon values(1,'ab',23.4,5,'2017-09-01 00:00:00'),(2,'aa',23.6,8,'2017-09-02 00:00:00')")
+    sql("insert into carbon values(3,'ab',23.4,5,'2017-09-01 00:00:00'),(4,'aa',23.6,8,'2017-09-02 00:00:00')")
+    sql("insert into carbon values(5,'ab',23.4,5,'2017-09-01 00:00:00'),(6,'aa',23.6,8,'2017-09-02 00:00:00')")
+    sql("create table parquet(a int, b string, c double,d int,e timestamp) using parquet")
+    sql("insert into parquet values(1,'ab',23.4,5,'2017-09-01 00:00:00'),(2,'aa',23.6,8,'2017-09-02 00:00:00')")
+    sql("insert into parquet values(3,'ab',23.4,5,'2017-09-01 00:00:00'),(4,'aa',23.6,8,'2017-09-02 00:00:00')")
+    sql("insert into parquet values(5,'ab',23.4,5,'2017-09-01 00:00:00'),(6,'aa',23.6,8,'2017-09-02 00:00:00')")
+  }
+
+  test("test if matched segment is only loaded to cache after drop column") {
+    createTablesAndLoadData
+    checkAnswer(sql("select * from carbon where a=1"),sql("select * from parquet where a=1"))
+    var showCache = sql("show metacache on table carbon").collect()
+    assert(showCache(0).get(2).toString.equalsIgnoreCase("1/3 index files cached"))
+    checkAnswer(sql("select * from carbon where a=5"),sql("select * from parquet where a=5"))
+    showCache = sql("show metacache on table carbon").collect()
+    assert(showCache(0).get(2).toString.equalsIgnoreCase("2/3 index files cached"))
+    sql("alter table carbon drop columns(d,e)")
+    sql("insert into carbon values(7,'gg',45.6),(8,'eg',45.6)")
+    checkAnswer(sql("select a from carbon where a=1"),sql("select a from parquet where a=1"))
+    showCache = sql("show metacache on table carbon").collect()
+    // the cached count is expected to start again from 1 here, now out of 4 index files
+    assert(showCache(0).get(2).toString.equalsIgnoreCase("1/4 index files cached"))
+    checkAnswer(sql("select * from carbon where a=7"), Seq(Row(7, "gg", 45.6)))
+    showCache = sql("show metacache on table carbon").collect()
+    assert(showCache(0).get(2).toString.equalsIgnoreCase("2/4 index files cached"))
+    drop
+  }
+
+  test("test if matched segment is only loaded to cache after add column") {
+    createTablesAndLoadData
+    sql("alter table carbon add columns(g decimal(3,2))")
+    sql("insert into carbon values(7,'gg',45.6,3,'2017-09-01 00:00:00',23.5),(8,'eg',45.6,6,'2017-09-01 00:00:00', 4.34)")
+    checkAnswer(sql("select a from carbon where a=1"),sql("select a from parquet where a=1"))
+    var showCache = sql("show metacache on table carbon").collect()
+    assert(showCache(0).get(2).toString.equalsIgnoreCase("1/4 index files cached"))
+    checkAnswer(sql("select a from carbon where a=7"), Seq(Row(7)))
+    showCache = sql("show metacache on table carbon").collect()
+    assert(showCache(0).get(2).toString.equalsIgnoreCase("2/4 index files cached"))
+    drop
+  }
+
+  test("test segment pruning after update operation") {
+    createTablesAndLoadData
+    checkAnswer(sql("select a from carbon where a=1"), Seq(Row(1)))
+    var showCache = sql("show metacache on table carbon").collect()
+    assert(showCache(0).get(2).toString.equalsIgnoreCase("1/3 index files cached"))
+    // two more inserts that repeat a=1, so three rows match the update below
+    sql("insert into carbon values(1,'ab',23.4,5,'2017-09-01 00:00:00'),(2,'aa',23.6,8,'2017-09-02 00:00:00')")
+    sql("insert into carbon values(1,'ab',23.4,5,'2017-09-01 00:00:00'),(2,'aa',23.6,8,'2017-09-02 00:00:00')")
+    sql("update carbon set(a)=(10) where a=1").show(false)
+    checkAnswer(sql("select count(*) from carbon where a=10"), Seq(Row(3)))
+    showCache = sql("show metacache on table carbon").collect()
+    assert(showCache(0).get(2).toString.equalsIgnoreCase("6/8 index files cached"))
+    drop
+  }
+
+  test("alter set/unset sort column properties") {
+    createTablesAndLoadData
+    sql(s"alter table carbon set tblproperties('sort_scope'='local_sort', 'sort_columns'='a')")
+    sql("insert into carbon values(3,'ab',23.4,5,'2017-09-01 00:00:00'),(4,'aa',23.6,8,'2017-09-02 00:00:00')")
+    sql("insert into carbon values(3,'ab',23.4,5,'2017-09-01 00:00:00'),(6,'aa',23.6,8,'2017-09-02 00:00:00')")
+    assert(sql("select a from carbon where a=3").count() == 3)
+    val showCache = sql("show metacache on table carbon").collect()
+    assert(showCache(0).get(2).toString.equalsIgnoreCase("3/5 index files cached"))
+  }
+
+  override def afterAll(): Unit = {
+    drop
+    // put the property back to its default value
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.CARBON_LOAD_ALL_SEGMENT_INDEXES_TO_CACHE,
+        CarbonCommonConstants.CARBON_LOAD_ALL_SEGMENT_INDEXES_TO_CACHE_DEFAULT)
+  }
+
+}
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java b/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java
index 17c47bd..fadaef0 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java
@@ -40,7 +40,10 @@ import org.apache.carbondata.core.datastore.impl.FileFactory;
 import org.apache.carbondata.core.metadata.converter.SchemaConverter;
 import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl;
 import org.apache.carbondata.core.metadata.index.BlockIndexInfo;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
 import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
+import org.apache.carbondata.core.segmentmeta.BlockColumnMetaDataInfo;
+import org.apache.carbondata.core.segmentmeta.SegmentMetaDataInfoStats;
 import org.apache.carbondata.core.util.CarbonMetadataUtil;
 import org.apache.carbondata.core.util.CarbonProperties;
 import org.apache.carbondata.core.util.CarbonThreadFactory;
@@ -389,6 +392,20 @@ public abstract class AbstractFactDataWriter implements CarbonFactDataWriter {
     indexHeader.setIs_sort(model.getSortScope() != null && model.getSortScope() != NO_SORT);
     // get the block index info thrift
     List<BlockIndex> blockIndexThrift = CarbonMetadataUtil.getBlockIndexInfo(blockIndexInfoList);
+    // get all block minmax and add to segmentMinMaxMap
+    CarbonTable carbonTable = model.getTableSpec().getCarbonTable();
+    if (null != model.getSegmentId() && !carbonTable.isHivePartitionTable() && !carbonTable
+        .isIndexTable()) {
+      for (BlockIndexInfo blockIndex : blockIndexInfoList) {
+        byte[][] min = blockIndex.getBlockletIndex().getMinMaxIndex().getMinValues();
byte[][] max = blockIndex.getBlockletIndex().getMinMaxIndex().getMaxValues(); + BlockColumnMetaDataInfo blockColumnMetaDataInfo = + new BlockColumnMetaDataInfo(thriftColumnSchemaList, min, max); + SegmentMetaDataInfoStats.getInstance() + .setBlockMetaDataInfo(model.getTableName(), model.getSegmentId(), + blockColumnMetaDataInfo); + } + } String indexFileName; if (enableDirectlyWriteDataToStorePath) { String rawFileName = model.getCarbonDataDirectoryPath() + CarbonCommonConstants.FILE_SEPARATOR