[CARBONDATA-1967][PARTITION] Fix autocompaction and auto merge index in partition tables
Problem: Auto compaction does not work for partition tables, and carbon index files are always merged even when merge index is configured as false. Solution: Auto compaction is now triggered after partition loading finishes, and the merge index configuration is checked before index files are merged. This closes #1748 Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/d7852abe Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/d7852abe Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/d7852abe Branch: refs/heads/branch-1.3 Commit: d7852abeaa8915055bbad92e250662320b090bbc Parents: 45787fb Author: ravipesala <[email protected]> Authored: Tue Jan 2 18:26:15 2018 +0530 Committer: Jacky Li <[email protected]> Committed: Thu Jan 4 16:06:40 2018 +0800 ---------------------------------------------------------------------- .../core/util/path/CarbonTablePath.java | 11 ++++++ .../hadoop/api/CarbonOutputCommitter.java | 22 +++++++++++- ...andardPartitionTableCompactionTestCase.scala | 33 ++++++++++++++++++ .../StandardPartitionTableLoadingTestCase.scala | 29 ++++++++++++++++ .../spark/rdd/CarbonDataRDDFactory.scala | 2 +- .../management/CarbonLoadDataCommand.scala | 35 +++++++++++++++----- .../processing/util/DeleteLoadFolders.java | 3 +- 7 files changed, 124 insertions(+), 11 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/carbondata/blob/d7852abe/core/src/main/java/org/apache/carbondata/core/util/path/CarbonTablePath.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/util/path/CarbonTablePath.java b/core/src/main/java/org/apache/carbondata/core/util/path/CarbonTablePath.java index c33c0a0..9e66657 100644 --- a/core/src/main/java/org/apache/carbondata/core/util/path/CarbonTablePath.java +++ 
b/core/src/main/java/org/apache/carbondata/core/util/path/CarbonTablePath.java @@ -111,6 +111,17 @@ public class CarbonTablePath extends Path { } /** + * Return true if the fileNameWithPath ends with partition map file extension name + */ + public static boolean isPartitionMapFile(String fileNameWithPath) { + int pos = fileNameWithPath.lastIndexOf('.'); + if (pos != -1) { + return fileNameWithPath.substring(pos).startsWith(PARTITION_MAP_EXT); + } + return false; + } + + /** * check if it is carbon index file matching extension * * @param fileNameWithPath http://git-wip-us.apache.org/repos/asf/carbondata/blob/d7852abe/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonOutputCommitter.java ---------------------------------------------------------------------- diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonOutputCommitter.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonOutputCommitter.java index 6f5d0e4..97d5a7f 100644 --- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonOutputCommitter.java +++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonOutputCommitter.java @@ -24,12 +24,14 @@ import java.util.Set; import org.apache.carbondata.common.logging.LogService; import org.apache.carbondata.common.logging.LogServiceFactory; +import org.apache.carbondata.core.constants.CarbonCommonConstants; import org.apache.carbondata.core.metadata.PartitionMapFileStore; import org.apache.carbondata.core.metadata.schema.table.CarbonTable; import org.apache.carbondata.core.mutate.CarbonUpdateUtil; import org.apache.carbondata.core.statusmanager.LoadMetadataDetails; import org.apache.carbondata.core.statusmanager.SegmentStatus; import org.apache.carbondata.core.statusmanager.SegmentStatusManager; +import org.apache.carbondata.core.util.CarbonProperties; import org.apache.carbondata.core.util.path.CarbonTablePath; import org.apache.carbondata.core.writer.CarbonIndexFileMergeWriter; import 
org.apache.carbondata.processing.loading.model.CarbonLoadModel; @@ -95,7 +97,7 @@ public class CarbonOutputCommitter extends FileOutputCommitter { .addDataIndexSizeIntoMetaEntry(newMetaEntry, loadModel.getSegmentId(), carbonTable); if (segmentSize > 0) { CarbonLoaderUtil.recordNewLoadMetadata(newMetaEntry, loadModel, false, overwriteSet); - new CarbonIndexFileMergeWriter().mergeCarbonIndexFilesOfSegment(segmentPath); + mergeCarbonIndexFiles(segmentPath); String updateTime = context.getConfiguration().get(CarbonTableOutputFormat.UPADTE_TIMESTAMP, null); if (updateTime != null) { @@ -111,6 +113,24 @@ public class CarbonOutputCommitter extends FileOutputCommitter { } /** + * Merge index files to a new single file. + */ + private void mergeCarbonIndexFiles(String segmentPath) throws IOException { + boolean mergeIndex = false; + try { + mergeIndex = Boolean.parseBoolean(CarbonProperties.getInstance().getProperty( + CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT, + CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT_DEFAULT)); + } catch (Exception e) { + mergeIndex = Boolean.parseBoolean( + CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT_DEFAULT); + } + if (mergeIndex) { + new CarbonIndexFileMergeWriter().mergeCarbonIndexFilesOfSegment(segmentPath); + } + } + + /** * Update the tablestatus as fail if any fail happens. 
* * @param context http://git-wip-us.apache.org/repos/asf/carbondata/blob/d7852abe/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableCompactionTestCase.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableCompactionTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableCompactionTestCase.scala index 9056fea..3e6cd26 100644 --- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableCompactionTestCase.scala +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableCompactionTestCase.scala @@ -161,6 +161,38 @@ class StandardPartitionTableCompactionTestCase extends QueryTest with BeforeAndA checkAnswer(sql(s"""select count(*) from staticpartition where deptname='finance'"""), p2) } + test("enable auto compaction for partition table"){ + CarbonProperties.getInstance() + .addProperty(CarbonCommonConstants.ENABLE_AUTO_LOAD_MERGE, "true") + CarbonProperties.getInstance() + .addProperty(CarbonCommonConstants.COMPACTION_SEGMENT_LEVEL_THRESHOLD, "4,2") + CarbonProperties.getInstance() + .addProperty(CarbonCommonConstants.PRESERVE_LATEST_SEGMENTS_NUMBER, "0") + CarbonProperties.getInstance() + .addProperty(CarbonCommonConstants.DAYS_ALLOWED_TO_COMPACT, "0") + + sql( + """ + | CREATE TABLE staticpartitioncompaction (empno int, doj Timestamp, + | workgroupcategoryname String, deptno int, + | projectcode int, projectjoindate Timestamp, projectenddate Timestamp,attendance int, + | utilization int,salary int,workgroupcategory int, empname String, designation String) + | PARTITIONED BY (deptname String) + | STORED BY 
'org.apache.carbondata.format' + """.stripMargin) + + for (i <- 0 until 4) { + sql(s"""insert into staticpartitioncompaction PARTITION(deptname='software') select empno,doj,workgroupcategoryname,deptno,projectcode,projectjoindate,projectenddate,attendance,utilization,salary,workgroupcategory,empname,designation from originTable""") + } + sql("CLEAN FILES FOR TABLE staticpartitioncompaction").show() + var segments = sql("SHOW SEGMENTS FOR TABLE staticpartitioncompaction") + var segmentSequenceIds = segments.collect().map { each => (each.toSeq) (0) } + assert(segmentSequenceIds.size==1) + assert(segmentSequenceIds.contains("0.1")) + CarbonProperties.getInstance() + .addProperty(CarbonCommonConstants.ENABLE_AUTO_LOAD_MERGE, "false") + } + override def afterAll = { dropTable } @@ -173,6 +205,7 @@ class StandardPartitionTableCompactionTestCase extends QueryTest with BeforeAndA sql("drop table if exists partitionthree") sql("drop table if exists partitionmajor") sql("drop table if exists staticpartition") + sql("drop table if exists staticpartitioncompaction") } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/d7852abe/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableLoadingTestCase.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableLoadingTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableLoadingTestCase.scala index bd4252f..b0afb0f 100644 --- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableLoadingTestCase.scala +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableLoadingTestCase.scala @@ 
-272,6 +272,34 @@ class StandardPartitionTableLoadingTestCase extends QueryTest with BeforeAndAfte } } + test("merge carbon index disable data loading for partition table for three partition column") { + CarbonProperties.getInstance.addProperty( + CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT, "false") + sql( + """ + | CREATE TABLE mergeindexpartitionthree (empno int, doj Timestamp, + | workgroupcategoryname String, deptno int, deptname String, + | projectcode int, projectjoindate Timestamp, projectenddate Timestamp,attendance int, + | utilization int,salary int) + | PARTITIONED BY (workgroupcategory int, empname String, designation String) + | STORED BY 'org.apache.carbondata.format' + """.stripMargin) + sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE mergeindexpartitionthree OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""") + + val carbonTable = CarbonMetadata.getInstance().getCarbonTable("default_mergeindexpartitionthree") + val tablePath = new CarbonTablePath(carbonTable.getCarbonTableIdentifier, + carbonTable.getTablePath) + val segmentDir = tablePath.getCarbonDataDirectoryPath("0", "0") + val carbonFile = FileFactory.getCarbonFile(segmentDir, FileFactory.getFileType(segmentDir)) + val files = carbonFile.listFiles(new CarbonFileFilter { + override def accept(file: CarbonFile): Boolean = CarbonTablePath.isCarbonIndexFile(file.getName) + }) + CarbonProperties.getInstance.addProperty( + CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT, + CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT_DEFAULT) + assert(files.length == 10) + } + test("load static partition table for one static partition column with load syntax issue") { sql( """ @@ -307,6 +335,7 @@ class StandardPartitionTableLoadingTestCase extends QueryTest with BeforeAndAfte sql("drop table if exists loadstaticpartitionone") sql("drop table if exists loadstaticpartitiononeoverwrite") sql("drop table if exists streamingpartitionedtable") + sql("drop table if exists 
mergeindexpartitionthree") sql("drop table if exists loadstaticpartitiononeissue") sql("drop table if exists loadpartitionwithspecialchar") } http://git-wip-us.apache.org/repos/asf/carbondata/blob/d7852abe/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala index 8a8338e..18e9181 100644 --- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala +++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala @@ -712,7 +712,7 @@ object CarbonDataRDDFactory { /** * Trigger compaction after data load */ - private def handleSegmentMerging( + def handleSegmentMerging( sqlContext: SQLContext, carbonLoadModel: CarbonLoadModel, carbonTable: CarbonTable, http://git-wip-us.apache.org/repos/asf/carbondata/blob/d7852abe/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala index 55c8769..383f272 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala @@ -373,7 +373,12 @@ case class CarbonLoadDataCommand( if (carbonTable.isHivePartitionTable) { try { - loadDataWithPartition(sparkSession, carbonLoadModel, hadoopConf, loadDataFrame) + 
loadDataWithPartition( + sparkSession, + carbonLoadModel, + hadoopConf, + loadDataFrame, + operationContext) } finally { server match { case Some(dictServer) => @@ -428,7 +433,12 @@ case class CarbonLoadDataCommand( dictionaryDataFrame) } if (table.isHivePartitionTable) { - loadDataWithPartition(sparkSession, carbonLoadModel, hadoopConf, loadDataFrame) + loadDataWithPartition( + sparkSession, + carbonLoadModel, + hadoopConf, + loadDataFrame, + operationContext) } else { CarbonDataRDDFactory.loadCarbonData( sparkSession.sqlContext, @@ -448,16 +458,12 @@ case class CarbonLoadDataCommand( * Loads the data in a hive partition way. This method uses InsertIntoTable command to load data * into partitoned data. The table relation would be converted to HadoopFSRelation to let spark * handling the partitioning. - * @param sparkSession - * @param carbonLoadModel - * @param hadoopConf - * @param dataFrame - * @return */ private def loadDataWithPartition(sparkSession: SparkSession, carbonLoadModel: CarbonLoadModel, hadoopConf: Configuration, - dataFrame: Option[DataFrame]) = { + dataFrame: Option[DataFrame], + operationContext: OperationContext) = { val table = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable val identifier = TableIdentifier(table.getTableName, Some(table.getDatabaseName)) val logicalPlan = @@ -640,6 +646,19 @@ case class CarbonLoadDataCommand( } else { Dataset.ofRows(sparkSession, convertedPlan) } + try { + // Trigger auto compaction + CarbonDataRDDFactory.handleSegmentMerging( + sparkSession.sqlContext, + carbonLoadModel, + table, + operationContext) + } catch { + case e: Exception => + throw new Exception( + "Dataload is success. Auto-Compaction has failed. 
Please check logs.", + e) + } } private def convertToLogicalRelation( http://git-wip-us.apache.org/repos/asf/carbondata/blob/d7852abe/processing/src/main/java/org/apache/carbondata/processing/util/DeleteLoadFolders.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/util/DeleteLoadFolders.java b/processing/src/main/java/org/apache/carbondata/processing/util/DeleteLoadFolders.java index a6bbe48..845f629 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/util/DeleteLoadFolders.java +++ b/processing/src/main/java/org/apache/carbondata/processing/util/DeleteLoadFolders.java @@ -65,7 +65,8 @@ public final class DeleteLoadFolders { @Override public boolean accept(CarbonFile file) { return (CarbonTablePath.isCarbonDataFile(file.getName()) - || CarbonTablePath.isCarbonIndexFile(file.getName())); + || CarbonTablePath.isCarbonIndexFile(file.getName()) + || CarbonTablePath.isPartitionMapFile(file.getName())); } });
