Repository: carbondata
Updated Branches:
  refs/heads/master 4eb37240f -> 34cb55194
[CARBONDATA-1851] Refactor to use only SegmentsToAccess for Aggregatetable, move tableFolderDeletion to TableProcessingOperations

This closes #1616

Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/34cb5519
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/34cb5519
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/34cb5519

Branch: refs/heads/master
Commit: 34cb55194c2061efe54acaf4cea67d5b6179034c
Parents: 4eb3724
Author: rahulforallp <[email protected]>
Authored: Tue Dec 5 19:48:38 2017 +0530
Committer: Venkata Ramana G <[email protected]>
Committed: Tue Dec 12 14:45:41 2017 +0530

----------------------------------------------------------------------
 .../filesystem/AbstractDFSCarbonFile.java       |   4 +-
 .../core/metadata/schema/table/CarbonTable.java |  10 ++
 .../carbondata/core/scan/model/QueryModel.java  |  46 +++---
 .../carbondata/core/util/CarbonProperties.java  |   8 +
 .../carbondata/core/util/SessionParams.java     |   7 +-
 .../hadoop/api/CarbonTableInputFormat.java      |  17 +-
 .../hadoop/util/CarbonInputFormatUtil.java      |   8 +-
 .../org/apache/carbondata/events/Events.scala   |   2 +-
 .../load/DataLoadProcessorStepOnSpark.scala     |   4 +-
 .../spark/rdd/AlterTableLoadPartitionRDD.scala  |   3 +-
 .../carbondata/spark/rdd/CarbonMergerRDD.scala  |   3 +-
 .../carbondata/spark/rdd/CarbonScanRDD.scala    |  20 +--
 .../spark/rdd/NewCarbonDataLoadRDD.scala        |  10 +-
 .../carbondata/spark/rdd/UpdateDataLoad.scala   |   6 +-
 .../apache/carbondata/spark/rdd/Compactor.scala |   5 +-
 .../management/CarbonCleanFilesCommand.scala    |   9 +-
 .../CarbonAlterTableDropPartitionCommand.scala  |   7 +-
 .../CarbonAlterTableSplitPartitionCommand.scala |   7 +-
 .../loading/TableProcessingOperations.java      | 164 +++++++++++++++++++
 .../store/CarbonFactDataHandlerModel.java       |   2 +-
 .../processing/util/CarbonLoaderUtil.java       | 100 -----------
 21 files changed, 249 insertions(+), 193 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/carbondata/blob/34cb5519/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/AbstractDFSCarbonFile.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/AbstractDFSCarbonFile.java b/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/AbstractDFSCarbonFile.java
index 3eb97bc..fcd230a 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/AbstractDFSCarbonFile.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/AbstractDFSCarbonFile.java
@@ -63,7 +63,7 @@ public abstract class AbstractDFSCarbonFile implements CarbonFile {
       fs = path.getFileSystem(this.hadoopConf);
       fileStatus = fs.getFileStatus(path);
     } catch (IOException e) {
-      LOGGER.error("Exception occurred:" + e.getMessage());
+      LOGGER.debug("Exception occurred:" + e.getMessage());
     }
   }

@@ -78,7 +78,7 @@ public abstract class AbstractDFSCarbonFile implements CarbonFile {
       fs = path.getFileSystem(this.hadoopConf);
       fileStatus = fs.getFileStatus(path);
     } catch (IOException e) {
-      LOGGER.error("Exception occurred:" + e.getMessage());
+      LOGGER.debug("Exception occurred:" + e.getMessage());
     }
   }
http://git-wip-us.apache.org/repos/asf/carbondata/blob/34cb5519/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
index e5d8839..4ebc02d 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
@@ -273,6 +273,16 @@ public class CarbonTable implements Serializable {
   }

   /**
+   * to get all implicit dimensions of a table
+   *
+   * @param tableName
+   * @return list of implicit dimensions
+   */
+  public List<CarbonDimension> getImplicitDimensionByTableName(String tableName) {
+    return tableImplicitDimensionsMap.get(tableName);
+  }
+
+  /**
    * Read all primitive/complex children and set it as list of child carbon dimension to parent
    * dimension
    *
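
Note on the new accessor: implicit dimensions (the positionId/tupleId style columns used by update and delete flows) are kept in tableImplicitDimensionsMap, separate from the schema dimensions, so filter processing can resolve columns that appear in neither the dimension nor the measure list. A minimal sketch of the lookup; the table name "default_t1" and column "positionId" are illustrative assumptions, not part of this commit:

    // Assumes a table "t1" in database "default" is already registered in CarbonMetadata.
    CarbonTable table = CarbonMetadata.getInstance().getCarbonTable("default_t1");
    List<CarbonDimension> implicitDims =
        table.getImplicitDimensionByTableName(table.getTableName());
    // Resolve an implicit column the same way QueryModel does below.
    CarbonDimension positionId = CarbonUtil.findDimension(implicitDims, "positionId");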
http://git-wip-us.apache.org/repos/asf/carbondata/blob/34cb5519/core/src/main/java/org/apache/carbondata/core/scan/model/QueryModel.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/model/QueryModel.java b/core/src/main/java/org/apache/carbondata/core/scan/model/QueryModel.java
index 67b8681..5e4872b 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/model/QueryModel.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/model/QueryModel.java
@@ -122,10 +122,9 @@ public class QueryModel implements Serializable {
   public static QueryModel createModel(AbsoluteTableIdentifier absoluteTableIdentifier,
       CarbonQueryPlan queryPlan, CarbonTable carbonTable, DataTypeConverter converter) {
     QueryModel queryModel = new QueryModel();
-    String factTableName = carbonTable.getTableName();
     queryModel.setAbsoluteTableIdentifier(absoluteTableIdentifier);

-    fillQueryModel(queryPlan, carbonTable, queryModel, factTableName);
+    fillQueryModel(queryPlan, carbonTable, queryModel);

     queryModel.setForcedDetailRawQuery(queryPlan.isRawDetailQuery());
     queryModel.setQueryId(queryPlan.getQueryId());
@@ -134,7 +133,7 @@
   }

   private static void fillQueryModel(CarbonQueryPlan queryPlan, CarbonTable carbonTable,
-      QueryModel queryModel, String factTableName) {
+      QueryModel queryModel) {
     queryModel.setAbsoluteTableIdentifier(carbonTable.getAbsoluteTableIdentifier());
     queryModel.setQueryDimension(queryPlan.getDimensions());
     queryModel.setQueryMeasures(queryPlan.getMeasures());
@@ -142,9 +141,8 @@
     boolean[] isFilterDimensions = new boolean[carbonTable.getDimensionOrdinalMax()];
     boolean[] isFilterMeasures =
         new boolean[carbonTable.getNumberOfMeasures(carbonTable.getTableName())];
-    processFilterExpression(queryPlan.getFilterExpression(),
-        carbonTable.getDimensionByTableName(factTableName),
-        carbonTable.getMeasureByTableName(factTableName), isFilterDimensions, isFilterMeasures);
+    processFilterExpression(carbonTable, queryPlan.getFilterExpression(), isFilterDimensions,
+        isFilterMeasures);
     queryModel.setIsFilterDimensions(isFilterDimensions);
     queryModel.setIsFilterMeasures(isFilterMeasures);
   }
@@ -153,8 +151,7 @@
     queryModel.setTable(carbonTable);
   }

-  public static void processFilterExpression(Expression filterExpression,
-      List<CarbonDimension> dimensions, List<CarbonMeasure> measures,
+  public static void processFilterExpression(CarbonTable carbonTable, Expression filterExpression,
       final boolean[] isFilterDimensions, final boolean[] isFilterMeasures) {
     if (null != filterExpression) {
       if (null != filterExpression.getChildren() && filterExpression.getChildren().size() == 0) {
@@ -162,24 +159,22 @@
           List<ColumnExpression> listOfCol =
               ((ConditionalExpression) filterExpression).getColumnList();
           for (ColumnExpression expression : listOfCol) {
-            setDimAndMsrColumnNode(dimensions, measures, expression, isFilterDimensions,
-                isFilterMeasures);
+            setDimAndMsrColumnNode(carbonTable, expression, isFilterDimensions, isFilterMeasures);
           }
         }
       }
       for (Expression expression : filterExpression.getChildren()) {
         if (expression instanceof ColumnExpression) {
-          setDimAndMsrColumnNode(dimensions, measures, (ColumnExpression) expression,
-              isFilterDimensions, isFilterMeasures);
+          setDimAndMsrColumnNode(carbonTable, (ColumnExpression) expression, isFilterDimensions,
+              isFilterMeasures);
         } else if (expression instanceof UnknownExpression) {
           UnknownExpression exp = ((UnknownExpression) expression);
           List<ColumnExpression> listOfColExpression = exp.getAllColumnList();
           for (ColumnExpression col : listOfColExpression) {
-            setDimAndMsrColumnNode(dimensions, measures, col, isFilterDimensions, isFilterMeasures);
+            setDimAndMsrColumnNode(carbonTable, col, isFilterDimensions, isFilterMeasures);
           }
         } else {
-          processFilterExpression(expression, dimensions, measures, isFilterDimensions,
-              isFilterMeasures);
+          processFilterExpression(carbonTable, expression, isFilterDimensions, isFilterMeasures);
         }
       }
     }
@@ -195,15 +190,16 @@
     return null;
   }

-  private static void setDimAndMsrColumnNode(List<CarbonDimension> dimensions,
-      List<CarbonMeasure> measures, ColumnExpression col, boolean[] isFilterDimensions,
-      boolean[] isFilterMeasures) {
+  private static void setDimAndMsrColumnNode(CarbonTable carbonTable, ColumnExpression col,
+      boolean[] isFilterDimensions, boolean[] isFilterMeasures) {
     CarbonDimension dim;
     CarbonMeasure msr;
     String columnName;
     columnName = col.getColumnName();
-    dim = CarbonUtil.findDimension(dimensions, columnName);
-    msr = getCarbonMetadataMeasure(columnName, measures);
+    dim = CarbonUtil
+        .findDimension(carbonTable.getDimensionByTableName(carbonTable.getTableName()), columnName);
+    msr = getCarbonMetadataMeasure(columnName,
+        carbonTable.getMeasureByTableName(carbonTable.getTableName()));
     col.setDimension(false);
     col.setMeasure(false);
@@ -215,13 +211,21 @@
       if (null != isFilterDimensions) {
         isFilterDimensions[dim.getOrdinal()] = true;
       }
-    } else {
+    } else if (msr != null) {
       col.setCarbonColumn(msr);
       col.setMeasure(msr);
       col.setMeasure(true);
       if (null != isFilterMeasures) {
         isFilterMeasures[msr.getOrdinal()] = true;
       }
+    } else {
+      // check if this is an implicit dimension
+      dim = CarbonUtil
+          .findDimension(carbonTable.getImplicitDimensionByTableName(carbonTable.getTableName()),
+              columnName);
+      col.setCarbonColumn(dim);
+      col.setDimension(dim);
+      col.setDimension(true);
     }
   }
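
The caller-visible effect of the QueryModel change, side by side (a sketch distilled from the hunks above: the dimension/measure lists are no longer passed in, and a filter column that matches neither list falls back to the implicit dimensions):

    // before: callers extracted the lists themselves
    QueryModel.processFilterExpression(filterExpression,
        carbonTable.getDimensionByTableName(tableName),
        carbonTable.getMeasureByTableName(tableName), isFilterDimensions, isFilterMeasures);

    // after: pass the table; QueryModel resolves dimensions, measures and,
    // failing both, implicit dimensions
    QueryModel.processFilterExpression(carbonTable, filterExpression,
        isFilterDimensions, isFilterMeasures);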
http://git-wip-us.apache.org/repos/asf/carbondata/blob/34cb5519/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
index 281ee15..fe396cb 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
@@ -990,4 +990,12 @@ public final class CarbonProperties {
     return addedProperty;
   }

+  /**
+   * to add external property
+   *
+   * @param externalPropertySet
+   */
+  public void addPropertyToPropertySet(Set<String> externalPropertySet) {
+    propertySet.addAll(externalPropertySet);
+  }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/34cb5519/core/src/main/java/org/apache/carbondata/core/util/SessionParams.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/SessionParams.java b/core/src/main/java/org/apache/carbondata/core/util/SessionParams.java
index 5dda9e4..0540ed6 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/SessionParams.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/SessionParams.java
@@ -80,12 +80,7 @@ public class SessionParams implements Serializable {
    * @return properties value
    */
  public SessionParams addProperty(String key, String value) throws InvalidConfigurationException {
-    boolean isValidConf = validateKeyValue(key, value);
-    if (isValidConf) {
-      LOGGER.audit("The key " + key + " with value " + value + " added in the session param");
-      sProps.put(key, value);
-    }
-    return this;
+    return addProperty(key, value, true);
   }

   /**
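
addPropertyToPropertySet registers extra keys as valid session properties, which matters for dynamically composed per-table keys that cannot be enumerated up front. A hedged sketch of one plausible use; the composed key follows the carbon.input.segments.<db>.<table> pattern used elsewhere in this commit:

    // Hypothetical registration of a per-table key as a known property.
    Set<String> externalKeys = new HashSet<>();
    externalKeys.add(CarbonCommonConstants.CARBON_INPUT_SEGMENTS + "default" + "." + "t1");
    CarbonProperties.getInstance().addPropertyToPropertySet(externalKeys);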
http://git-wip-us.apache.org/repos/asf/carbondata/blob/34cb5519/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
index 15d1304..c16b0aa 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
@@ -280,14 +280,6 @@ public class CarbonTableInputFormat<T> extends FileInputFormat<Void, T> {
     configuration.set(CarbonTableInputFormat.VALIDATE_INPUT_SEGMENT_IDs, validate.toString());
   }

-  public static void setAggeragateTableSegments(Configuration configuration, String segments) {
-    configuration.set(CarbonCommonConstants.CARBON_INPUT_SEGMENTS, segments);
-  }
-
-  private static String getAggeragateTableSegments(Configuration configuration) {
-    return configuration.get(CarbonCommonConstants.CARBON_INPUT_SEGMENTS);
-  }
-
   /**
    * get list of segment to access
    */
@@ -330,14 +322,13 @@ public class CarbonTableInputFormat<T> extends FileInputFormat<Void, T> {
     if (null == carbonTable) {
       throw new IOException("Missing/Corrupt schema file for table.");
     }
-    String aggregateTableSegments = getAggeragateTableSegments(job.getConfiguration());
     TableDataMap blockletMap = DataMapStoreManager.getInstance()
         .getDataMap(identifier, BlockletDataMap.NAME, BlockletDataMapFactory.class.getName());
     List<String> invalidSegments = new ArrayList<>();
     List<UpdateVO> invalidTimestampsList = new ArrayList<>();
     List<String> streamSegments = null;
-    List<String> filteredSegmentToAccess = null;
+
     if (getValidateSegmentsToAccess(job.getConfiguration())) {
       // get all valid segments and set them into the configuration
       SegmentStatusManager segmentStatusManager = new SegmentStatusManager(identifier);
@@ -349,7 +340,7 @@
         return getSplitsOfStreaming(job, identifier, streamSegments);
       }
-      filteredSegmentToAccess = getFilteredSegment(job, validSegments);
+      List<String> filteredSegmentToAccess = getFilteredSegment(job, validSegments);
       if (filteredSegmentToAccess.size() == 0) {
         return new ArrayList<>(0);
       } else {
@@ -363,10 +354,10 @@
       if (invalidSegments.size() > 0) {
         blockletMap.clear(invalidSegments);
       }
-    } else {
-      filteredSegmentToAccess = Arrays.asList(aggregateTableSegments.split(","));
     }
+    // get updated filtered list
+    List<String> filteredSegmentToAccess = Arrays.asList(getSegmentsToAccess(job));
     // Clean the updated segments from memory if the update happens on segments
     List<String> toBeCleanedSegments = new ArrayList<>();
     for (SegmentUpdateDetails segmentUpdateDetail : updateStatusManager

http://git-wip-us.apache.org/repos/asf/carbondata/blob/34cb5519/hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonInputFormatUtil.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonInputFormatUtil.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonInputFormatUtil.java
index bf1b188..514428b 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonInputFormatUtil.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonInputFormatUtil.java
@@ -121,12 +121,8 @@ public class CarbonInputFormatUtil {

   public static void processFilterExpression(Expression filterExpression, CarbonTable carbonTable,
       boolean[] isFilterDimensions, boolean[] isFilterMeasures) {
-    List<CarbonDimension> dimensions =
-        carbonTable.getDimensionByTableName(carbonTable.getTableName());
-    List<CarbonMeasure> measures =
-        carbonTable.getMeasureByTableName(carbonTable.getTableName());
-    QueryModel.processFilterExpression(filterExpression, dimensions, measures,
-        isFilterDimensions, isFilterMeasures);
+    QueryModel.processFilterExpression(carbonTable, filterExpression, isFilterDimensions,
+        isFilterMeasures);

     if (null != filterExpression) {
       // Optimize Filter Expression and fit RANGE filters is conditions apply.
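
With the aggregate-table-specific setter and getter removed, aggregate-table reads use the same carbon.input.segments path as any other restricted read. A minimal sketch of pinning a read to fixed segments, assuming the class's existing setSegmentsToAccess and setValidateSegmentsToAccess setters:

    Configuration conf = job.getConfiguration();
    // read only segments 0, 1 and 2 ...
    CarbonTableInputFormat.setSegmentsToAccess(conf, Arrays.asList("0", "1", "2"));
    // ... and skip re-validating them against the table status
    CarbonTableInputFormat.setValidateSegmentsToAccess(conf, false);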
http://git-wip-us.apache.org/repos/asf/carbondata/blob/34cb5519/integration/spark-common/src/main/scala/org/apache/carbondata/events/Events.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/events/Events.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/events/Events.scala
index 8e69855..799d8c4 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/events/Events.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/events/Events.scala
@@ -20,7 +20,7 @@ package org.apache.carbondata.events
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.execution.command.{AlterTableAddColumnsModel, AlterTableDataTypeChangeModel, AlterTableDropColumnModel, AlterTableRenameModel, CarbonMergerMapping}

-import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonTableIdentifier}
+import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
 import org.apache.carbondata.processing.loading.model.CarbonLoadModel

http://git-wip-us.apache.org/repos/asf/carbondata/blob/34cb5519/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessorStepOnSpark.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessorStepOnSpark.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessorStepOnSpark.scala
index c28426d..6759b20 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessorStepOnSpark.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessorStepOnSpark.scala
@@ -27,7 +27,7 @@ import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.datastore.exception.CarbonDataWriterException
 import org.apache.carbondata.core.datastore.row.CarbonRow
 import org.apache.carbondata.core.util.CarbonProperties
-import org.apache.carbondata.processing.loading.DataLoadProcessBuilder
+import org.apache.carbondata.processing.loading.{DataLoadProcessBuilder, TableProcessingOperations}
 import org.apache.carbondata.processing.loading.converter.impl.RowConverterImpl
 import org.apache.carbondata.processing.loading.csvinput.StringArrayWritable
 import org.apache.carbondata.processing.loading.exception.CarbonDataLoadingException
@@ -206,7 +206,7 @@ object DataLoadProcessorStepOnSpark {
       dataWriter.close()
     }
     // clean up the folders and files created locally for data load operation
-    CarbonLoaderUtil.deleteLocalDataLoadFolderLocation(model, false, false)
+    TableProcessingOperations.deleteLocalDataLoadFolderLocation(model, false, false)
   }
 }
http://git-wip-us.apache.org/repos/asf/carbondata/blob/34cb5519/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableLoadPartitionRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableLoadPartitionRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableLoadPartitionRDD.scala
index 37ab8c3..1ecab9f 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableLoadPartitionRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableLoadPartitionRDD.scala
@@ -28,6 +28,7 @@ import org.apache.spark.util.PartitionUtils
 import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
 import org.apache.carbondata.core.util.CarbonProperties
+import org.apache.carbondata.processing.loading.TableProcessingOperations
 import org.apache.carbondata.processing.partition.spliter.RowResultProcessor
 import org.apache.carbondata.processing.util.{CarbonDataProcessorUtil, CarbonLoaderUtil}
 import org.apache.carbondata.spark.AlterPartitionResult
@@ -119,7 +120,7 @@ class AlterTableLoadPartitionRDD[K, V](alterPartitionModel: AlterPartitionModel,
                 case e: Exception =>
                     sys.error(s"Exception when executing Row result processor ${e.getMessage}")
             } finally {
-                CarbonLoaderUtil
+                TableProcessingOperations
                     .deleteLocalDataLoadFolderLocation(carbonLoadModel, false, true)
             }
         }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/34cb5519/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
index 82b2a57..fb4634e 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
@@ -46,6 +46,7 @@ import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil, DataTypeUt
 import org.apache.carbondata.hadoop.{CarbonInputSplit, CarbonMultiBlockSplit}
 import org.apache.carbondata.hadoop.api.CarbonTableInputFormat
 import org.apache.carbondata.hadoop.util.{CarbonInputFormatUtil, CarbonInputSplitTaskInfo}
+import org.apache.carbondata.processing.loading.TableProcessingOperations
 import org.apache.carbondata.processing.loading.model.CarbonLoadModel
 import org.apache.carbondata.processing.merger._
 import org.apache.carbondata.processing.splits.TableSplit
@@ -233,7 +234,7 @@ class CarbonMergerRDD[K, V](
         // delete temp location data
         try {
           val isCompactionFlow = true
-          CarbonLoaderUtil
+          TableProcessingOperations
             .deleteLocalDataLoadFolderLocation(carbonLoadModel, isCompactionFlow, false)
         } catch {
           case e: Exception =>
http://git-wip-us.apache.org/repos/asf/carbondata/blob/34cb5519/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
index d599c22..cc7f757 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
@@ -357,22 +357,14 @@ class CarbonScanRDD(
         CarbonCommonConstants.USE_DISTRIBUTED_DATAMAP_DEFAULT).toBoolean) {
       CarbonTableInputFormat.setDataMapJob(conf, new SparkDataMapJob)
     }
+
+    // when validate segments is disabled in thread local update it to CarbonTableInputFormat
     val carbonSessionInfo = ThreadLocalSessionInfo.getCarbonSessionInfo
     if (carbonSessionInfo != null) {
-      val segmentsToScan = carbonSessionInfo.getSessionParams.getProperty(
-        CarbonCommonConstants.CARBON_INPUT_SEGMENTS +
-        identifier.getCarbonTableIdentifier.getDatabaseName + "." +
-        identifier.getCarbonTableIdentifier.getTableName)
-      if (segmentsToScan != null) {
-        CarbonTableInputFormat.setAggeragateTableSegments(conf, segmentsToScan)
-      }
-      val validateSegments = carbonSessionInfo.getSessionParams.getProperty(
-        CarbonCommonConstants.VALIDATE_CARBON_INPUT_SEGMENTS +
-        identifier.getCarbonTableIdentifier.getDatabaseName + "." +
-        identifier.getCarbonTableIdentifier.getTableName)
-      if (validateSegments != null) {
-        CarbonTableInputFormat.setValidateSegmentsToAccess(conf, validateSegments.toBoolean)
-      }
+      CarbonTableInputFormat.setValidateSegmentsToAccess(conf, carbonSessionInfo.getSessionParams
+        .getProperty(CarbonCommonConstants.VALIDATE_CARBON_INPUT_SEGMENTS +
+          identifier.getCarbonTableIdentifier.getDatabaseName + "." +
+          identifier.getCarbonTableIdentifier.getTableName, "true").toBoolean)
     }
     format
   }
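
The same lookup written out in Java for clarity (a sketch mirroring the Scala above: the per-table key is VALIDATE_CARBON_INPUT_SEGMENTS plus <db>.<table>, and it now defaults to "true" when the session never set it):

    String key = CarbonCommonConstants.VALIDATE_CARBON_INPUT_SEGMENTS
        + identifier.getCarbonTableIdentifier().getDatabaseName() + "."
        + identifier.getCarbonTableIdentifier().getTableName();
    boolean validate = Boolean.parseBoolean(
        carbonSessionInfo.getSessionParams().getProperty(key, "true"));
    CarbonTableInputFormat.setValidateSegmentsToAccess(conf, validate);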
http://git-wip-us.apache.org/repos/asf/carbondata/blob/34cb5519/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
index f948ac8..b27521a 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
@@ -45,12 +45,12 @@ import org.apache.carbondata.core.datastore.compression.CompressorFactory
 import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatus}
 import org.apache.carbondata.core.statusmanager.LoadMetadataDetails
 import org.apache.carbondata.core.util.{CarbonProperties, CarbonTimeStatisticsFactory, ThreadLocalTaskInfo}
-import org.apache.carbondata.processing.loading.{DataLoadExecutor, FailureCauses}
+import org.apache.carbondata.processing.loading.{DataLoadExecutor, FailureCauses, TableProcessingOperations}
 import org.apache.carbondata.processing.loading.csvinput.{BlockDetails, CSVInputFormat, CSVRecordReaderIterator}
 import org.apache.carbondata.processing.loading.exception.NoRetryException
 import org.apache.carbondata.processing.loading.model.CarbonLoadModel
 import org.apache.carbondata.processing.splits.TableSplit
-import org.apache.carbondata.processing.util.{CarbonLoaderUtil, CarbonQueryUtil}
+import org.apache.carbondata.processing.util.CarbonQueryUtil
 import org.apache.carbondata.spark.DataLoadResult
 import org.apache.carbondata.spark.util.{CarbonScalaUtil, CommonUtil, Util}
@@ -261,7 +261,7 @@ class NewCarbonDataLoadRDD[K, V](
         throw e
       } finally {
         // clean up the folders and files created locally for data load operation
-        CarbonLoaderUtil.deleteLocalDataLoadFolderLocation(model, false, false)
+        TableProcessingOperations.deleteLocalDataLoadFolderLocation(model, false, false)
         // in case of failure the same operation will be re-tried several times.
         // So print the data load statistics only in case of non failure case
         if (SegmentStatus.LOAD_FAILURE != loadMetadataDetails.getSegmentStatus) {
@@ -403,7 +403,7 @@ class NewDataFrameLoaderRDD[K, V](
         throw e
       } finally {
         // clean up the folders and files created locally for data load operation
-        CarbonLoaderUtil.deleteLocalDataLoadFolderLocation(model, false, false)
+        TableProcessingOperations.deleteLocalDataLoadFolderLocation(model, false, false)
         // in case of failure the same operation will be re-tried several times.
         // So print the data load statistics only in case of non failure case
         if (SegmentStatus.LOAD_FAILURE != loadMetadataDetails.getSegmentStatus) {
@@ -587,7 +587,7 @@ class PartitionTableDataLoaderRDD[K, V](
         throw e
       } finally {
         // clean up the folders and files created locally for data load operation
-        CarbonLoaderUtil.deleteLocalDataLoadFolderLocation(model, false, false)
+        TableProcessingOperations.deleteLocalDataLoadFolderLocation(model, false, false)
         // in case of failure the same operation will be re-tried several times.
         // So print the data load statistics only in case of non failure case
         if (SegmentStatus.LOAD_FAILURE != loadMetadataDetails.getSegmentStatus) {
http://git-wip-us.apache.org/repos/asf/carbondata/blob/34cb5519/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/UpdateDataLoad.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/UpdateDataLoad.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/UpdateDataLoad.scala
index b1dfc01..4934cbc 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/UpdateDataLoad.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/UpdateDataLoad.scala
@@ -24,11 +24,9 @@ import org.apache.spark.sql.Row

 import org.apache.carbondata.common.CarbonIterator
 import org.apache.carbondata.common.logging.LogServiceFactory
-import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatus}
+import org.apache.carbondata.processing.loading.{DataLoadExecutor, TableProcessingOperations}
 import org.apache.carbondata.processing.loading.model.CarbonLoadModel
-import org.apache.carbondata.processing.loading.DataLoadExecutor
-import org.apache.carbondata.processing.util.CarbonLoaderUtil

 /**
  * Data load in case of update command .
@@ -65,7 +63,7 @@ object UpdateDataLoad {
         LOGGER.error(e)
         throw e
     } finally {
-      CarbonLoaderUtil.deleteLocalDataLoadFolderLocation(carbonLoadModel, false, false)
+      TableProcessingOperations.deleteLocalDataLoadFolderLocation(carbonLoadModel, false, false)
     }
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/34cb5519/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/Compactor.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/Compactor.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/Compactor.scala
index 6fafc95..6da8bd6 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/Compactor.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/Compactor.scala
@@ -24,9 +24,9 @@ import org.apache.spark.sql.execution.command.CompactionModel

 import org.apache.carbondata.common.logging.{LogService, LogServiceFactory}
 import org.apache.carbondata.core.statusmanager.LoadMetadataDetails
+import org.apache.carbondata.processing.loading.TableProcessingOperations
 import org.apache.carbondata.processing.loading.model.CarbonLoadModel
 import org.apache.carbondata.processing.merger.CarbonDataMergerUtil
-import org.apache.carbondata.processing.util.CarbonLoaderUtil

 abstract class Compactor(carbonLoadModel: CarbonLoadModel,
     compactionModel: CompactionModel,
@@ -52,7 +52,8 @@ abstract class Compactor(carbonLoadModel: CarbonLoadModel,
     // status.
     // so deleting those folders.
     try {
-      CarbonLoaderUtil.deletePartialLoadDataIfExist(carbonLoadModel, true)
+      TableProcessingOperations
+        .deletePartialLoadDataIfExist(carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable, true)
     } catch {
       case e: Exception =>
         LOGGER.error(s"Exception in compaction thread while clean up of stale segments" +
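
Every caller of deletePartialLoadDataIfExist migrates the same way, since the method now takes the CarbonTable directly instead of resolving it from the load model internally (sketch of the migrated call, in Java, as in the Scala above):

    // old: CarbonLoaderUtil.deletePartialLoadDataIfExist(carbonLoadModel, true);
    TableProcessingOperations.deletePartialLoadDataIfExist(
        carbonLoadModel.getCarbonDataLoadSchema().getCarbonTable(), true);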
http://git-wip-us.apache.org/repos/asf/carbondata/blob/34cb5519/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonCleanFilesCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonCleanFilesCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonCleanFilesCommand.scala
index eacfded..e0530f6 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonCleanFilesCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonCleanFilesCommand.scala
@@ -57,7 +57,7 @@ case class CarbonCleanFilesCommand(
     }
     val cleanFilesPostEvent: CleanFilesPostEvent =
       CleanFilesPostEvent(carbonTable, sparkSession)
-    OperationListenerBus.getInstance.fireEvent(cleanFilesPostEvent)
+    OperationListenerBus.getInstance.fireEvent(cleanFilesPostEvent, operationContext)
     Seq.empty
   }

@@ -76,10 +76,6 @@ case class CarbonCleanFilesCommand(
   private def cleanGarbageData(sparkSession: SparkSession,
       databaseNameOp: Option[String], tableName: String): Unit = {
     val carbonTable = CarbonEnv.getCarbonTable(databaseNameOp, tableName)(sparkSession)
-    val cleanFilesPreEvent: CleanFilesPreEvent =
-      CleanFilesPreEvent(carbonTable,
-        sparkSession)
-    OperationListenerBus.getInstance.fireEvent(cleanFilesPreEvent)

     CarbonStore.cleanFiles(
       CarbonEnv.getDatabaseName(databaseNameOp)(sparkSession),
@@ -87,9 +83,6 @@ case class CarbonCleanFilesCommand(
       CarbonProperties.getStorePath,
       carbonTable,
       forceTableClean)
-
-    val cleanFilesPostEvent: CleanFilesPostEvent = CleanFilesPostEvent(carbonTable, sparkSession)
-    OperationListenerBus.getInstance.fireEvent(cleanFilesPostEvent)
   }

   private def cleanGarbageDataInAllTables(sparkSession: SparkSession): Unit = {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/34cb5519/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropPartitionCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropPartitionCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropPartitionCommand.scala
index 69aa91a..fb515fa 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropPartitionCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropPartitionCommand.scala
@@ -35,13 +35,12 @@ import org.apache.carbondata.core.locks.{ICarbonLock, LockUsage}
 import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonMetadata}
 import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl
 import org.apache.carbondata.core.metadata.schema.partition.PartitionType
-import org.apache.carbondata.core.metadata.schema.table.CarbonTable
 import org.apache.carbondata.core.mutate.CarbonUpdateUtil
 import org.apache.carbondata.core.statusmanager.SegmentStatusManager
 import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil}
 import org.apache.carbondata.core.util.path.CarbonStorePath
+import org.apache.carbondata.processing.loading.TableProcessingOperations
 import org.apache.carbondata.processing.loading.model.{CarbonDataLoadSchema, CarbonLoadModel}
-import org.apache.carbondata.processing.util.CarbonLoaderUtil
 import org.apache.carbondata.spark.partition.DropPartitionCallable

 case class CarbonAlterTableDropPartitionCommand(
@@ -224,7 +223,9 @@ case class CarbonAlterTableDropPartitionCommand(
     } finally {
       executor.shutdown()
       try {
-        CarbonLoaderUtil.deletePartialLoadDataIfExist(carbonLoadModel, false)
+        TableProcessingOperations
+          .deletePartialLoadDataIfExist(carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable,
+            false)
       } catch {
         case e: Exception =>
           LOGGER.error(s"Exception in dropping partition thread while deleting partial load file" +
http://git-wip-us.apache.org/repos/asf/carbondata/blob/34cb5519/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableSplitPartitionCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableSplitPartitionCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableSplitPartitionCommand.scala
index 338ec5a..1a535fd 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableSplitPartitionCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableSplitPartitionCommand.scala
@@ -38,13 +38,12 @@ import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonMetad
 import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl
 import org.apache.carbondata.core.metadata.schema.PartitionInfo
 import org.apache.carbondata.core.metadata.schema.partition.PartitionType
-import org.apache.carbondata.core.metadata.schema.table.CarbonTable
 import org.apache.carbondata.core.mutate.CarbonUpdateUtil
 import org.apache.carbondata.core.statusmanager.SegmentStatusManager
 import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil}
 import org.apache.carbondata.core.util.path.CarbonStorePath
+import org.apache.carbondata.processing.loading.TableProcessingOperations
 import org.apache.carbondata.processing.loading.model.{CarbonDataLoadSchema, CarbonLoadModel}
-import org.apache.carbondata.processing.util.CarbonLoaderUtil
 import org.apache.carbondata.spark.partition.SplitPartitionCallable

 /**
@@ -233,7 +232,9 @@ case class CarbonAlterTableSplitPartitionCommand(
     } finally {
       executor.shutdown()
       try {
-        CarbonLoaderUtil.deletePartialLoadDataIfExist(carbonLoadModel, false)
+        TableProcessingOperations
+          .deletePartialLoadDataIfExist(carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable,
+            false)
       } catch {
         case e: Exception =>
           LOGGER.error(s"Exception in add/split partition thread while deleting partial load file" +
http://git-wip-us.apache.org/repos/asf/carbondata/blob/34cb5519/processing/src/main/java/org/apache/carbondata/processing/loading/TableProcessingOperations.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/TableProcessingOperations.java b/processing/src/main/java/org/apache/carbondata/processing/loading/TableProcessingOperations.java
new file mode 100644
index 0000000..cb53d6e
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/TableProcessingOperations.java
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.processing.loading;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
+import org.apache.carbondata.core.datastore.filesystem.CarbonFileFilter;
+import org.apache.carbondata.core.datastore.impl.FileFactory;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.statusmanager.LoadMetadataDetails;
+import org.apache.carbondata.core.statusmanager.SegmentStatusManager;
+import org.apache.carbondata.core.util.CarbonProperties;
+import org.apache.carbondata.core.util.CarbonThreadFactory;
+import org.apache.carbondata.core.util.CarbonUtil;
+import org.apache.carbondata.core.util.path.CarbonStorePath;
+import org.apache.carbondata.core.util.path.CarbonTablePath;
+import org.apache.carbondata.processing.loading.model.CarbonLoadModel;
+import org.apache.carbondata.processing.util.CarbonDataProcessorUtil;
+import org.apache.carbondata.processing.util.CarbonLoaderUtil;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.spark.annotation.DeveloperApi;
+
+@DeveloperApi
+public class TableProcessingOperations {
+  private static final LogService LOGGER =
+      LogServiceFactory.getLogService(CarbonLoaderUtil.class.getName());
+
+  /**
+   *
+   * @param carbonTable
+   * @param isCompactionFlow
+   * @throws IOException
+   */
+  public static void deletePartialLoadDataIfExist(CarbonTable carbonTable,
+      final boolean isCompactionFlow) throws IOException {
+    String metaDataLocation = carbonTable.getMetaDataFilepath();
+    final LoadMetadataDetails[] details = SegmentStatusManager.readLoadMetadata(metaDataLocation);
+    CarbonTablePath carbonTablePath = CarbonStorePath
+        .getCarbonTablePath(carbonTable.getTablePath(), carbonTable.getCarbonTableIdentifier());
+
+    // delete segment folders whose metadata does not exist in tablestatus
+    for (int i = 0; i < carbonTable.getPartitionCount(); i++) {
+      final String partitionCount = i + "";
+      String partitionPath = carbonTablePath.getPartitionDir(partitionCount);
+      FileFactory.FileType fileType = FileFactory.getFileType(partitionPath);
+      if (FileFactory.isFileExist(partitionPath, fileType)) {
+        CarbonFile carbonFile = FileFactory.getCarbonFile(partitionPath, fileType);
+        CarbonFile[] listFiles = carbonFile.listFiles(new CarbonFileFilter() {
+          @Override public boolean accept(CarbonFile path) {
+            String segmentId =
+                CarbonTablePath.DataPathUtil.getSegmentId(path.getAbsolutePath() + "/dummy");
+            boolean found = false;
+            for (int j = 0; j < details.length; j++) {
+              if (details[j].getLoadName().equals(segmentId) && details[j].getPartitionCount()
+                  .equals(partitionCount)) {
+                found = true;
+                break;
+              }
+            }
+            return !found;
+          }
+        });
+        for (int k = 0; k < listFiles.length; k++) {
+          String segmentId =
+              CarbonTablePath.DataPathUtil.getSegmentId(listFiles[k].getAbsolutePath() + "/dummy");
+          if (isCompactionFlow) {
+            if (segmentId.contains(".")) {
+              CarbonLoaderUtil.deleteStorePath(listFiles[k].getAbsolutePath());
+            }
+          } else {
+            if (!segmentId.contains(".")) {
+              CarbonLoaderUtil.deleteStorePath(listFiles[k].getAbsolutePath());
+            }
+          }
+        }
+      }
+    }
+  }
+
+  /**
+   *
+   * This method will delete the local data load folder location after data load is complete
+   *
+   * @param loadModel
+   * @param isCompactionFlow COMPACTION keyword will be added to path to make path unique if true
+   * @param isAltPartitionFlow Alter_Partition keyword will be added to path to make path unique if
+   * true
+   */
+  public static void deleteLocalDataLoadFolderLocation(CarbonLoadModel loadModel,
+      boolean isCompactionFlow, boolean isAltPartitionFlow) {
+    String tableName = loadModel.getTableName();
+    String databaseName = loadModel.getDatabaseName();
+    String tempLocationKey = CarbonDataProcessorUtil
+        .getTempStoreLocationKey(databaseName, tableName, loadModel.getSegmentId(),
+            loadModel.getTaskNo(), isCompactionFlow, isAltPartitionFlow);
+    deleteLocalDataLoadFolderLocation(tempLocationKey, tableName);
+  }
+
+  /**
+   *
+   * This method will delete the local data load folder location after data load is complete
+   *
+   * @param tempLocationKey temporary location set in carbon properties
+   * @param tableName
+   */
+  public static void deleteLocalDataLoadFolderLocation(String tempLocationKey, String tableName) {
+
+    // form local store location
+    final String localStoreLocations = CarbonProperties.getInstance().getProperty(tempLocationKey);
+    if (localStoreLocations == null) {
+      throw new RuntimeException("Store location not set for the key " + tempLocationKey);
+    }
+    // submit local folder clean up in another thread so that main thread execution is not blocked
+    ExecutorService localFolderDeletionService = Executors
+        .newFixedThreadPool(1, new CarbonThreadFactory("LocalFolderDeletionPool:" + tableName));
+    try {
+      localFolderDeletionService.submit(new Callable<Void>() {
+        @Override public Void call() throws Exception {
+          long startTime = System.currentTimeMillis();
+          String[] locArray = StringUtils.split(localStoreLocations, File.pathSeparator);
+          for (String loc : locArray) {
+            try {
+              CarbonUtil.deleteFoldersAndFiles(new File(loc));
+            } catch (IOException | InterruptedException e) {
+              LOGGER.error(e, "Failed to delete local data load folder location: " + loc);
+            }
+          }
+          LOGGER.info(
+              "Deleted the local store location: " + localStoreLocations + " : Time taken: " + (
+                  System.currentTimeMillis() - startTime));
+          return null;
+        }
+      });
+    } finally {
+      if (null != localFolderDeletionService) {
+        localFolderDeletionService.shutdown();
+      }
+    }
+
+  }
+}
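
The caller pattern for the relocated cleanup, matching the RDD changes in this commit: run the load, then delete the local temp folders in finally so cleanup happens on success and on failure alike. A sketch assuming a prepared CarbonLoadModel plus the usual store locations and record readers:

    try {
      new DataLoadExecutor().execute(model, storeLocations, recordReaders);
    } finally {
      // isCompactionFlow = false, isAltPartitionFlow = false for a plain load
      TableProcessingOperations.deleteLocalDataLoadFolderLocation(model, false, false);
    }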
http://git-wip-us.apache.org/repos/asf/carbondata/blob/34cb5519/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java
index 37b585d..a8ae513 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java
@@ -305,7 +305,7 @@ public class CarbonFactDataHandlerModel {
     }
     carbonFactDataHandlerModel.setMeasureDataType(measureDataTypes);
     String carbonDataDirectoryPath = CarbonDataProcessorUtil
-        .checkAndCreateCarbonStoreLocation(loadModel.getTablePath(), loadModel.getDatabaseName(),
+        .checkAndCreateCarbonStoreLocation(carbonTable.getTablePath(), loadModel.getDatabaseName(),
             tableName, loadModel.getPartitionId(), loadModel.getSegmentId());
     carbonFactDataHandlerModel.setCarbonDataDirectoryPath(carbonDataDirectoryPath);
     List<CarbonDimension> dimensionByTableName = carbonTable.getDimensionByTableName(tableName);
http://git-wip-us.apache.org/repos/asf/carbondata/blob/34cb5519/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
index 4275603..9e6a73e 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
@@ -18,7 +18,6 @@ package org.apache.carbondata.processing.util;

 import java.io.BufferedWriter;
 import java.io.DataOutputStream;
-import java.io.File;
 import java.io.IOException;
 import java.io.OutputStreamWriter;
 import java.net.InetAddress;
@@ -26,9 +25,6 @@ import java.net.UnknownHostException;
 import java.nio.charset.Charset;
 import java.text.SimpleDateFormat;
 import java.util.*;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;

 import org.apache.carbondata.common.logging.LogService;
 import org.apache.carbondata.common.logging.LogServiceFactory;
@@ -56,8 +52,6 @@ import org.apache.carbondata.core.mutate.CarbonUpdateUtil;
 import org.apache.carbondata.core.statusmanager.LoadMetadataDetails;
 import org.apache.carbondata.core.statusmanager.SegmentStatus;
 import org.apache.carbondata.core.statusmanager.SegmentStatusManager;
-import org.apache.carbondata.core.util.CarbonProperties;
-import org.apache.carbondata.core.util.CarbonThreadFactory;
 import org.apache.carbondata.core.util.CarbonUtil;
 import org.apache.carbondata.core.util.path.CarbonStorePath;
 import org.apache.carbondata.core.util.path.CarbonTablePath;
@@ -66,7 +60,6 @@ import org.apache.carbondata.processing.merger.NodeBlockRelation;
 import org.apache.carbondata.processing.merger.NodeMultiBlockRelation;

 import com.google.gson.Gson;
-import org.apache.commons.lang3.StringUtils;

 public final class CarbonLoaderUtil {

@@ -129,52 +122,6 @@ public final class CarbonLoaderUtil {
     }
     return true;
   }
-  public static void deletePartialLoadDataIfExist(CarbonLoadModel loadModel,
-      final boolean isCompactionFlow) throws IOException {
-    CarbonTable carbonTable = loadModel.getCarbonDataLoadSchema().getCarbonTable();
-    String metaDataLocation = carbonTable.getMetaDataFilepath();
-    final LoadMetadataDetails[] details = SegmentStatusManager.readLoadMetadata(metaDataLocation);
-    CarbonTablePath carbonTablePath = CarbonStorePath
-        .getCarbonTablePath(loadModel.getTablePath(), carbonTable.getCarbonTableIdentifier());
-
-    //delete folder which metadata no exist in tablestatus
-    for (int i = 0; i < carbonTable.getPartitionCount(); i++) {
-      final String partitionCount = i + "";
-      String partitionPath = carbonTablePath.getPartitionDir(partitionCount);
-      FileType fileType = FileFactory.getFileType(partitionPath);
-      if (FileFactory.isFileExist(partitionPath, fileType)) {
-        CarbonFile carbonFile = FileFactory.getCarbonFile(partitionPath, fileType);
-        CarbonFile[] listFiles = carbonFile.listFiles(new CarbonFileFilter() {
-          @Override public boolean accept(CarbonFile path) {
-            String segmentId =
-                CarbonTablePath.DataPathUtil.getSegmentId(path.getAbsolutePath() + "/dummy");
-            boolean found = false;
-            for (int j = 0; j < details.length; j++) {
-              if (details[j].getLoadName().equals(segmentId) && details[j].getPartitionCount()
-                  .equals(partitionCount)) {
-                found = true;
-                break;
-              }
-            }
-            return !found;
-          }
-        });
-        for (int k = 0; k < listFiles.length; k++) {
-          String segmentId =
-              CarbonTablePath.DataPathUtil.getSegmentId(listFiles[k].getAbsolutePath() + "/dummy");
-          if (isCompactionFlow) {
-            if (segmentId.contains(".")) {
-              deleteStorePath(listFiles[k].getAbsolutePath());
-            }
-          } else {
-            if (!segmentId.contains(".")) {
-              deleteStorePath(listFiles[k].getAbsolutePath());
-            }
-          }
-        }
-      }
-    }
-  }

   public static void deleteStorePath(String path) {
     try {
@@ -188,53 +135,6 @@ public final class CarbonLoaderUtil {
     }
   }

-
-  /**
-   * This method will delete the local data load folder location after data load is complete
-   *
-   * @param loadModel
-   */
-  public static void deleteLocalDataLoadFolderLocation(CarbonLoadModel loadModel,
-      boolean isCompactionFlow, boolean isAltPartitionFlow) {
-    String databaseName = loadModel.getDatabaseName();
-    String tableName = loadModel.getTableName();
-    String tempLocationKey = CarbonDataProcessorUtil
-        .getTempStoreLocationKey(databaseName, tableName, loadModel.getSegmentId(),
-            loadModel.getTaskNo(), isCompactionFlow, isAltPartitionFlow);
-    // form local store location
-    final String localStoreLocations = CarbonProperties.getInstance().getProperty(tempLocationKey);
-    if (localStoreLocations == null) {
-      throw new RuntimeException("Store location not set for the key " + tempLocationKey);
-    }
-    // submit local folder clean up in another thread so that main thread execution is not blocked
-    ExecutorService localFolderDeletionService = Executors
-        .newFixedThreadPool(1, new CarbonThreadFactory("LocalFolderDeletionPool:" + tableName));
-    try {
-      localFolderDeletionService.submit(new Callable<Void>() {
-        @Override public Void call() throws Exception {
-          long startTime = System.currentTimeMillis();
-          String[] locArray = StringUtils.split(localStoreLocations, File.pathSeparator);
-          for (String loc : locArray) {
-            try {
-              CarbonUtil.deleteFoldersAndFiles(new File(loc));
-            } catch (IOException | InterruptedException e) {
-              LOGGER.error(e,
-                  "Failed to delete local data load folder location: " + loc);
-            }
-          }
-          LOGGER.info("Deleted the local store location: " + localStoreLocations
-              + " : Time taken: " + (System.currentTimeMillis() - startTime));
-          return null;
-        }
-      });
-    } finally {
-      if (null != localFolderDeletionService) {
-        localFolderDeletionService.shutdown();
-      }
-    }
-
-  }
-
   /**
    * This API will write the load level metadata for the loadmanagement module inorder to
    * manage the load and query execution management smoothly.
