Repository: carbondata Updated Branches: refs/heads/master 269f4c378 -> 6e58418eb
[CARBONDATA-3062] Fix compatibility issue with cache_level as blocklet. In case of a hybrid store we can have block as well as blocklet schemas. Scenario: there is a hybrid store in which a few loads are from a legacy store that does not contain the blocklet information, and hence they will by default have cache_level BLOCK, and a few loads are from the latest store, which contains the blocklet information, and have cache_level BLOCKLET. For these types of scenarios we need to have separate task and footer schemas. For all loads, with or without blocklet info, there will not be any additional cost of maintaining 2 variables. This closes #2883 Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/6e58418e Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/6e58418e Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/6e58418e Branch: refs/heads/master Commit: 6e58418eb15effbf60290d2e1b8ff06f8613d714 Parents: 269f4c3 Author: Indhumathi27 <indhumathi...@gmail.com> Authored: Tue Oct 30 21:38:56 2018 +0530 Committer: manishgupta88 <tomanishgupt...@gmail.com> Committed: Fri Nov 2 10:54:49 2018 +0530 ---------------------------------------------------------------------- .../block/SegmentPropertiesAndSchemaHolder.java | 82 +++++++++++++------- .../indexstore/blockletindex/BlockDataMap.java | 2 +- .../blockletindex/BlockletDataMap.java | 2 +- ...ithColumnMetCacheAndCacheLevelProperty.scala | 2 +- .../merger/RowResultMergerProcessor.java | 6 +- 5 files changed, 57 insertions(+), 37 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/carbondata/blob/6e58418e/core/src/main/java/org/apache/carbondata/core/datastore/block/SegmentPropertiesAndSchemaHolder.java ---------------------------------------------------------------------- diff --git 
a/core/src/main/java/org/apache/carbondata/core/datastore/block/SegmentPropertiesAndSchemaHolder.java b/core/src/main/java/org/apache/carbondata/core/datastore/block/SegmentPropertiesAndSchemaHolder.java index cc6341b..1b7e1f8 100644 --- a/core/src/main/java/org/apache/carbondata/core/datastore/block/SegmentPropertiesAndSchemaHolder.java +++ b/core/src/main/java/org/apache/carbondata/core/datastore/block/SegmentPropertiesAndSchemaHolder.java @@ -284,11 +284,17 @@ public class SegmentPropertiesAndSchemaHolder { private int[] columnCardinality; private SegmentProperties segmentProperties; private List<CarbonColumn> minMaxCacheColumns; - private CarbonRowSchema[] taskSummarySchema; - // same variable can be used for block and blocklet schema because at any given cache_level - // with either block or blocklet and whenever cache_level is changed the cache and its - // corresponding segmentProperties is flushed - private CarbonRowSchema[] fileFooterEntrySchema; + // in case of hybrid store we can have block as well as blocklet schema + // Scenario: When there is a hybrid store in which few loads are from legacy store which do + // not contain the blocklet information and hence they will be, by default have cache_level as + // BLOCK and few loads with latest store which contain the BLOCKLET information and have + // cache_level BLOCKLET. For these type of scenarios we need to have separate task and footer + // schemas. 
For all loads with/without blocklet info there will not be any additional cost + // of maintaining 2 variables + private CarbonRowSchema[] taskSummarySchemaForBlock; + private CarbonRowSchema[] taskSummarySchemaForBlocklet; + private CarbonRowSchema[] fileFooterEntrySchemaForBlock; + private CarbonRowSchema[] fileFooterEntrySchemaForBlocklet; public SegmentPropertiesWrapper(CarbonTable carbonTable, List<ColumnSchema> columnsInTable, int[] columnCardinality) { @@ -314,8 +320,10 @@ public class SegmentPropertiesAndSchemaHolder { if (null != minMaxCacheColumns) { minMaxCacheColumns.clear(); } - taskSummarySchema = null; - fileFooterEntrySchema = null; + taskSummarySchemaForBlock = null; + taskSummarySchemaForBlocklet = null; + fileFooterEntrySchemaForBlock = null; + fileFooterEntrySchemaForBlocklet = null; } @Override public boolean equals(Object obj) { @@ -350,48 +358,62 @@ public class SegmentPropertiesAndSchemaHolder { return columnCardinality; } - public CarbonRowSchema[] getTaskSummarySchema(boolean storeBlockletCount, + public CarbonRowSchema[] getTaskSummarySchemaForBlock(boolean storeBlockletCount, boolean filePathToBeStored) throws MemoryException { - if (null == taskSummarySchema) { + if (null == taskSummarySchemaForBlock) { synchronized (taskSchemaLock) { - if (null == taskSummarySchema) { - taskSummarySchema = SchemaGenerator + if (null == taskSummarySchemaForBlock) { + taskSummarySchemaForBlock = SchemaGenerator .createTaskSummarySchema(segmentProperties, minMaxCacheColumns, storeBlockletCount, filePathToBeStored); } } } - return taskSummarySchema; + return taskSummarySchemaForBlock; + } + + public CarbonRowSchema[] getTaskSummarySchemaForBlocklet(boolean storeBlockletCount, + boolean filePathToBeStored) throws MemoryException { + if (null == taskSummarySchemaForBlocklet) { + synchronized (taskSchemaLock) { + if (null == taskSummarySchemaForBlocklet) { + taskSummarySchemaForBlocklet = SchemaGenerator + .createTaskSummarySchema(segmentProperties, 
minMaxCacheColumns, storeBlockletCount, + filePathToBeStored); + } + } + } + return taskSummarySchemaForBlocklet; } public CarbonRowSchema[] getBlockFileFooterEntrySchema() { - return getOrCreateFileFooterEntrySchema(true); + if (null == fileFooterEntrySchemaForBlock) { + synchronized (fileFooterSchemaLock) { + if (null == fileFooterEntrySchemaForBlock) { + fileFooterEntrySchemaForBlock = + SchemaGenerator.createBlockSchema(segmentProperties, minMaxCacheColumns); + } + } + } + return fileFooterEntrySchemaForBlock; } public CarbonRowSchema[] getBlockletFileFooterEntrySchema() { - return getOrCreateFileFooterEntrySchema(false); + if (null == fileFooterEntrySchemaForBlocklet) { + synchronized (fileFooterSchemaLock) { + if (null == fileFooterEntrySchemaForBlocklet) { + fileFooterEntrySchemaForBlocklet = + SchemaGenerator.createBlockletSchema(segmentProperties, minMaxCacheColumns); + } + } + } + return fileFooterEntrySchemaForBlocklet; } public List<CarbonColumn> getMinMaxCacheColumns() { return minMaxCacheColumns; } - private CarbonRowSchema[] getOrCreateFileFooterEntrySchema(boolean isCacheLevelBlock) { - if (null == fileFooterEntrySchema) { - synchronized (fileFooterSchemaLock) { - if (null == fileFooterEntrySchema) { - if (isCacheLevelBlock) { - fileFooterEntrySchema = - SchemaGenerator.createBlockSchema(segmentProperties, minMaxCacheColumns); - } else { - fileFooterEntrySchema = - SchemaGenerator.createBlockletSchema(segmentProperties, minMaxCacheColumns); - } - } - } - } - return fileFooterEntrySchema; - } } /** http://git-wip-us.apache.org/repos/asf/carbondata/blob/6e58418e/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockDataMap.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockDataMap.java b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockDataMap.java index 3ab5923..67405f4 100644 --- 
a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockDataMap.java +++ b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockDataMap.java @@ -1006,7 +1006,7 @@ public class BlockDataMap extends CoarseGrainDataMap SegmentPropertiesAndSchemaHolder.getInstance() .getSegmentPropertiesWrapper(segmentPropertiesIndex); try { - return segmentPropertiesWrapper.getTaskSummarySchema(true, isFilePathStored); + return segmentPropertiesWrapper.getTaskSummarySchemaForBlock(true, isFilePathStored); } catch (MemoryException e) { throw new RuntimeException(e); } http://git-wip-us.apache.org/repos/asf/carbondata/blob/6e58418e/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java index 242fc9e..390e92f 100644 --- a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java +++ b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java @@ -87,7 +87,7 @@ public class BlockletDataMap extends BlockDataMap implements Serializable { SegmentPropertiesAndSchemaHolder.getInstance() .getSegmentPropertiesWrapper(segmentPropertiesIndex); try { - return segmentPropertiesWrapper.getTaskSummarySchema(false, isFilePathStored); + return segmentPropertiesWrapper.getTaskSummarySchemaForBlocklet(false, isFilePathStored); } catch (MemoryException e) { throw new RuntimeException(e); } http://git-wip-us.apache.org/repos/asf/carbondata/blob/6e58418e/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/TestQueryWithColumnMetCacheAndCacheLevelProperty.scala ---------------------------------------------------------------------- diff --git 
a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/TestQueryWithColumnMetCacheAndCacheLevelProperty.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/TestQueryWithColumnMetCacheAndCacheLevelProperty.scala index 10a3be8..1c54c48 100644 --- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/TestQueryWithColumnMetCacheAndCacheLevelProperty.scala +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/TestQueryWithColumnMetCacheAndCacheLevelProperty.scala @@ -92,7 +92,7 @@ class TestQueryWithColumnMetCacheAndCacheLevelProperty extends QueryTest with Be expectedLength: Int, storeBlockletCount: Boolean = false): Boolean = { val index = dataMaps(0).asInstanceOf[BlockDataMap].getSegmentPropertiesIndex val summarySchema = SegmentPropertiesAndSchemaHolder.getInstance() - .getSegmentPropertiesWrapper(index).getTaskSummarySchema(storeBlockletCount, false) + .getSegmentPropertiesWrapper(index).getTaskSummarySchemaForBlock(storeBlockletCount, false) val minSchemas = summarySchema(0).asInstanceOf[CarbonRowSchema.StructCarbonRowSchema] .getChildSchemas minSchemas.length == expectedLength http://git-wip-us.apache.org/repos/asf/carbondata/blob/6e58418e/processing/src/main/java/org/apache/carbondata/processing/merger/RowResultMergerProcessor.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/merger/RowResultMergerProcessor.java b/processing/src/main/java/org/apache/carbondata/processing/merger/RowResultMergerProcessor.java index 6475ba8..83e630b 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/merger/RowResultMergerProcessor.java +++ b/processing/src/main/java/org/apache/carbondata/processing/merger/RowResultMergerProcessor.java @@ -30,9 +30,7 @@ import 
org.apache.carbondata.core.datastore.row.CarbonRow; import org.apache.carbondata.core.datastore.row.WriteStepRowUtil; import org.apache.carbondata.core.indexstore.PartitionSpec; import org.apache.carbondata.core.keygenerator.KeyGenException; -import org.apache.carbondata.core.metadata.CarbonMetadata; import org.apache.carbondata.core.metadata.SegmentFileStore; -import org.apache.carbondata.core.metadata.schema.table.CarbonTable; import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn; import org.apache.carbondata.core.scan.result.iterator.RawResultIterator; import org.apache.carbondata.core.scan.wrappers.ByteArrayWrapper; @@ -74,7 +72,6 @@ public class RowResultMergerProcessor extends AbstractResultProcessor { this.loadModel = loadModel; CarbonDataProcessorUtil.createLocations(tempStoreLocation); - CarbonTable carbonTable = CarbonMetadata.getInstance().getCarbonTable(databaseName, tableName); String carbonStoreLocation; if (partitionSpec != null) { carbonStoreLocation = @@ -86,7 +83,8 @@ public class RowResultMergerProcessor extends AbstractResultProcessor { loadModel.getSegmentId()); } CarbonFactDataHandlerModel carbonFactDataHandlerModel = CarbonFactDataHandlerModel - .getCarbonFactDataHandlerModel(loadModel, carbonTable, segProp, tableName, + .getCarbonFactDataHandlerModel(loadModel, + loadModel.getCarbonDataLoadSchema().getCarbonTable(), segProp, tableName, tempStoreLocation, carbonStoreLocation); setDataFileAttributesInModel(loadModel, compactionType, carbonFactDataHandlerModel); carbonFactDataHandlerModel.setCompactionFlow(true);