Repository: carbondata Updated Branches: refs/heads/master 0c0f90ca7 -> f1c6dddec
[CARBONDATA-1698]Adding support for table level compaction configuration Adding support for table level compaction configuration This closes #1575 Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/f1c6ddde Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/f1c6ddde Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/f1c6ddde Branch: refs/heads/master Commit: f1c6dddec8d7057dfc1a1f70555faec4c6184a52 Parents: 0c0f90c Author: Jin Zhou <[email protected]> Authored: Fri Dec 15 19:53:41 2017 +0800 Committer: chenliang613 <[email protected]> Committed: Thu Dec 21 17:07:44 2017 +0800 ---------------------------------------------------------------------- .../core/constants/CarbonCommonConstants.java | 14 +- .../carbondata/core/util/CarbonProperties.java | 10 +- docs/data-management-on-carbondata.md | 27 +- .../TableLevelCompactionOptionExample.scala | 106 +++++++ .../TestCreateTableWithCompactionOptions.scala | 198 ++++++++++++++ ...CompactionSupportGlobalSortBigFileTest.scala | 4 +- .../TableLevelCompactionOptionTest.scala | 274 +++++++++++++++++++ .../carbondata/spark/util/CommonUtil.scala | 119 ++++++++ .../spark/sql/catalyst/CarbonDDLSqlParser.scala | 2 + .../spark/rdd/CarbonDataRDDFactory.scala | 7 +- .../CarbonAlterTableCompactionCommand.scala | 3 +- .../table/CarbonDescribeFormattedCommand.scala | 32 +++ .../processing/merger/CarbonDataMergerUtil.java | 107 +++++--- 13 files changed, 854 insertions(+), 49 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/carbondata/blob/f1c6ddde/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java 
b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java index 9534099..71ab668 100644 --- a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java +++ b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java @@ -789,12 +789,12 @@ public final class CarbonCommonConstants { * Size of Major Compaction in MBs */ @CarbonProperty - public static final String MAJOR_COMPACTION_SIZE = "carbon.major.compaction.size"; + public static final String CARBON_MAJOR_COMPACTION_SIZE = "carbon.major.compaction.size"; /** * By default size of major compaction in MBs. */ - public static final String DEFAULT_MAJOR_COMPACTION_SIZE = "1024"; + public static final String DEFAULT_CARBON_MAJOR_COMPACTION_SIZE = "1024"; /** * This property is used to tell how many segments to be preserved from merging. @@ -873,6 +873,16 @@ public final class CarbonCommonConstants { public static final String TABLE_BLOCKSIZE = "table_blocksize"; // set in column level to disable inverted index public static final String NO_INVERTED_INDEX = "no_inverted_index"; + // table property name of major compaction size + public static final String TABLE_MAJOR_COMPACTION_SIZE = "major_compaction_size"; + // table property name of auto load merge + public static final String TABLE_AUTO_LOAD_MERGE = "auto_load_merge"; + // table property name of compaction level threshold + public static final String TABLE_COMPACTION_LEVEL_THRESHOLD = "compaction_level_threshold"; + // table property name of preserve segments numbers while compaction + public static final String TABLE_COMPACTION_PRESERVE_SEGMENTS = "compaction_preserve_segments"; + // table property name of allowed compaction days while compaction + public static final String TABLE_ALLOWED_COMPACTION_DAYS = "allowed_compaction_days"; /** * 16 mb size http://git-wip-us.apache.org/repos/asf/carbondata/blob/f1c6ddde/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java 
---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java index 11aea99..7b80a8b 100644 --- a/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java +++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java @@ -620,10 +620,12 @@ public final class CarbonProperties { public long getMajorCompactionSize() { long compactionSize; try { - compactionSize = Long.parseLong(getProperty(CarbonCommonConstants.MAJOR_COMPACTION_SIZE, - CarbonCommonConstants.DEFAULT_MAJOR_COMPACTION_SIZE)); + compactionSize = Long.parseLong(getProperty( + CarbonCommonConstants.CARBON_MAJOR_COMPACTION_SIZE, + CarbonCommonConstants.DEFAULT_CARBON_MAJOR_COMPACTION_SIZE)); } catch (NumberFormatException e) { - compactionSize = Long.parseLong(CarbonCommonConstants.DEFAULT_MAJOR_COMPACTION_SIZE); + compactionSize = Long.parseLong( + CarbonCommonConstants.DEFAULT_CARBON_MAJOR_COMPACTION_SIZE); } return compactionSize; } @@ -684,7 +686,7 @@ public final class CarbonProperties { * @param commaSeparatedLevels the string format value before separating * @return the int array format value after separating by comma */ - private int[] getIntArray(String commaSeparatedLevels) { + public int[] getIntArray(String commaSeparatedLevels) { String[] levels = commaSeparatedLevels.split(","); int[] compactionSize = new int[levels.length]; int i = 0; http://git-wip-us.apache.org/repos/asf/carbondata/blob/f1c6ddde/docs/data-management-on-carbondata.md ---------------------------------------------------------------------- diff --git a/docs/data-management-on-carbondata.md b/docs/data-management-on-carbondata.md index 7f6e3b9..94820ad 100644 --- a/docs/data-management-on-carbondata.md +++ b/docs/data-management-on-carbondata.md @@ -92,6 +92,26 @@ This tutorial is going to introduce all commands and data operations 
on CarbonDa ``` NOTE: 512 or 512M both are accepted. + - **Table Compaction Configuration** + + These properties are table-level compaction configurations; if they are not specified, the system-level configurations in carbon.properties are used. + The following 5 configurations are supported: + + * MAJOR_COMPACTION_SIZE: same meaning as carbon.major.compaction.size, size in MB. + * AUTO_LOAD_MERGE: same meaning as carbon.enable.auto.load.merge. + * COMPACTION_LEVEL_THRESHOLD: same meaning as carbon.compaction.level.threshold. + * COMPACTION_PRESERVE_SEGMENTS: same meaning as carbon.numberof.preserve.segments. + * ALLOWED_COMPACTION_DAYS: same meaning as carbon.allowed.compaction.days. + + ``` + TBLPROPERTIES ('MAJOR_COMPACTION_SIZE'='2048', + 'AUTO_LOAD_MERGE'='true', + 'COMPACTION_LEVEL_THRESHOLD'='5,6', + 'COMPACTION_PRESERVE_SEGMENTS'='10', + 'ALLOWED_COMPACTION_DAYS'='5') + ``` + + ### Example: ``` @@ -109,7 +129,12 @@ This tutorial is going to introduce all commands and data operations on CarbonDa 'NO_INVERTED_INDEX'='productBatch', 'SORT_COLUMNS'='productName,storeCity', 'SORT_SCOPE'='NO_SORT', - 'TABLE_BLOCKSIZE'='512') + 'TABLE_BLOCKSIZE'='512', + 'MAJOR_COMPACTION_SIZE'='2048', + 'AUTO_LOAD_MERGE'='true', + 'COMPACTION_LEVEL_THRESHOLD'='5,6', + 'COMPACTION_PRESERVE_SEGMENTS'='10', + 'ALLOWED_COMPACTION_DAYS'='5') ``` ## TABLE MANAGEMENT http://git-wip-us.apache.org/repos/asf/carbondata/blob/f1c6ddde/examples/spark2/src/main/scala/org/apache/carbondata/examples/TableLevelCompactionOptionExample.scala ---------------------------------------------------------------------- diff --git a/examples/spark2/src/main/scala/org/apache/carbondata/examples/TableLevelCompactionOptionExample.scala b/examples/spark2/src/main/scala/org/apache/carbondata/examples/TableLevelCompactionOptionExample.scala new file mode 100644 index 0000000..8b34a8b --- /dev/null +++ b/examples/spark2/src/main/scala/org/apache/carbondata/examples/TableLevelCompactionOptionExample.scala @@ -0,0 +1,106 @@ +/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.examples + +import java.io.File + +object TableLevelCompactionOptionExample { + + def main(args: Array[String]) { + val spark = ExampleUtils.createCarbonSession("DataManagementExample") + spark.sparkContext.setLogLevel("WARN") + + spark.sql("DROP TABLE IF EXISTS carbon_table") + + // Create table with table level compaction options + // while loading and compacting, table level compaction options will be used instead of + // options specified in carbon.properties + spark.sql( + s""" + | CREATE TABLE IF NOT EXISTS carbon_table( + | ID Int, + | date Date, + | country String, + | name String, + | phonetype String, + | serialname String, + | salary Int, + | floatField float + | ) STORED BY 'carbondata' + | TBLPROPERTIES ( + | 'MAJOR_COMPACTION_SIZE'='1024', + | 'AUTO_LOAD_MERGE'='true', + | 'COMPACTION_LEVEL_THRESHOLD'='3,2', + | 'COMPACTION_PRESERVE_SEGMENTS'='2', + | 'ALLOWED_COMPACTION_DAYS'='1') + """.stripMargin) + + val rootPath = new File(this.getClass.getResource("/").getPath + + "../../../..").getCanonicalPath + val path = s"$rootPath/examples/spark2/src/main/resources/dataSample.csv" + + // load 6 segments + // scalastyle:off + (1 to 
6).foreach(_ => spark.sql( + s""" + | LOAD DATA LOCAL INPATH '$path' + | INTO TABLE carbon_table + | OPTIONS('HEADER'='true') + """.stripMargin)) + // scalastyle:on + + // show all segments, existing segments are 0.1,3,4,5, compacted segments are 0,1,2 + // because of 2 segments are preserved, only one level-1 minor compaction is triggered + spark.sql("SHOW SEGMENTS FOR TABLE carbon_table").show() + + // load another 2 segments + // scalastyle:off + (1 to 2).foreach(_ => spark.sql( + s""" + | LOAD DATA LOCAL INPATH '$path' + | INTO TABLE carbon_table + | OPTIONS('HEADER'='true') + """.stripMargin)) + // scalastyle:on + + // show all segments, existing segments will be 0.2,6,7, + // compacted segments are 0,1,2,3,4,5,0.1,3.1 + spark.sql("SHOW SEGMENTS FOR TABLE carbon_table").show() + + // load another 2 segments + // scalastyle:off + (1 to 2).foreach(_ => spark.sql( + s""" + | LOAD DATA LOCAL INPATH '$path' + | INTO TABLE carbon_table + | OPTIONS('HEADER'='true') + """.stripMargin)) + // scalastyle:on + + // do major compaction, there will be 3 segment left(2 preserved segments) + spark.sql("ALTER TABLE carbon_table COMPACT 'MAJOR'") + spark.sql("CLEAN FILES FOR TABLE carbon_table") + spark.sql("SHOW SEGMENTS FOR TABLE carbon_table").show() + + // Drop table + spark.sql("DROP TABLE IF EXISTS carbon_table") + + spark.stop() + } + +} http://git-wip-us.apache.org/repos/asf/carbondata/blob/f1c6ddde/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCreateTableWithCompactionOptions.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCreateTableWithCompactionOptions.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCreateTableWithCompactionOptions.scala new file mode 100644 index 0000000..d7c05ce --- /dev/null +++ 
b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCreateTableWithCompactionOptions.scala @@ -0,0 +1,198 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the"License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an"AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.spark.testsuite.createTable + +import org.apache.spark.sql.test.util.QueryTest +import org.scalatest.BeforeAndAfterAll + +class TestCreateTableWithCompactionOptions extends QueryTest with BeforeAndAfterAll { + + val tableWithCompactionOptions = "tableWithCompactionOptions" + val tableWithInvalidMajorCompactionSize = "tableWithInvalidMajorCompactionSize" + val tableWithInvalidAutoLoadMerge = "tableWithInvalidAutoLoadMerge" + val tableWithInvalidLevelThreshold = "tableWithInvalidLevelThreshold" + val tableWithInvalidPreserveSegments = "tableWithInvalidPreserveSegments" + val tableWithInvalidAllowedDays = "tableWithInvalidAllowedDays" + val tableWithoutCompactionOptions = "tableWithoutCompactionOptions" + + override def beforeAll: Unit = { + cleanTables() + } + + override def afterAll: Unit = { + cleanTables() + } + + private def cleanTables(): Unit = { + sql("use default") + sql(s"DROP TABLE IF EXISTS $tableWithCompactionOptions") + sql(s"DROP TABLE IF EXISTS 
$tableWithInvalidMajorCompactionSize") + sql(s"DROP TABLE IF EXISTS $tableWithInvalidAutoLoadMerge") + sql(s"DROP TABLE IF EXISTS $tableWithInvalidLevelThreshold") + sql(s"DROP TABLE IF EXISTS $tableWithInvalidPreserveSegments") + sql(s"DROP TABLE IF EXISTS $tableWithInvalidAllowedDays") + sql(s"DROP TABLE IF EXISTS $tableWithoutCompactionOptions") + } + + test("test create table with compaction options") { + sql( + s""" + | CREATE TABLE $tableWithCompactionOptions( + | intField INT, + | stringField STRING + | ) + | STORED BY 'carbondata' + | TBLPROPERTIES('MAJOR_COMPACTION_SIZE'='10240', + | 'AUTO_LOAD_MERGE'='true', + | 'COMPACTION_LEVEL_THRESHOLD'='5,6', + | 'COMPACTION_PRESERVE_SEGMENTS'='10', + | 'ALLOWED_COMPACTION_DAYS'='5') + """.stripMargin) + + val tableOptions = sql(s"DESCRIBE FORMATTED $tableWithCompactionOptions") + .collect().map(r => (r.getString(0).trim, r.getString(1).trim)).toMap + + assert(tableOptions.contains("MAJOR_COMPACTION_SIZE")) + assert(tableOptions.getOrElse("MAJOR_COMPACTION_SIZE","").equals("10240")) + assert(tableOptions.contains("AUTO_LOAD_MERGE")) + assert(tableOptions.getOrElse("AUTO_LOAD_MERGE","").equals("true")) + assert(tableOptions.contains("COMPACTION_LEVEL_THRESHOLD")) + assert(tableOptions.getOrElse("COMPACTION_LEVEL_THRESHOLD","").equals("5,6")) + assert(tableOptions.contains("COMPACTION_PRESERVE_SEGMENTS")) + assert(tableOptions.getOrElse("COMPACTION_PRESERVE_SEGMENTS","").equals("10")) + assert(tableOptions.contains("ALLOWED_COMPACTION_DAYS")) + assert(tableOptions.getOrElse("ALLOWED_COMPACTION_DAYS","").equals("5")) + } + + test("test create table with invalid major compaction size") { + val exception: Exception = intercept[Exception] { + sql( + s""" + |CREATE TABLE $tableWithInvalidMajorCompactionSize + |( + |intField INT, + |stringField STRING + |) + |STORED BY 'carbondata' + |TBLPROPERTIES('MAJOR_COMPACTION_SIZE'='abc') + """.stripMargin) + } + assert(exception.getMessage.contains( + "Invalid major_compaction_size 
value found: abc, " + + "only int value greater than 0 is supported.")) + } + + test("test create table with invalid auto load merge") { + val exception: Exception = intercept[Exception] { + sql( + s""" + |CREATE TABLE $tableWithInvalidAutoLoadMerge + |( + |intField INT, + |stringField STRING + |) + |STORED BY 'carbondata' + |TBLPROPERTIES('AUTO_LOAD_MERGE'='123') + """.stripMargin) + } + assert(exception.getMessage.contains( + "Invalid auto_load_merge value found: 123, only true|false is supported.")) + } + + test("test create table with invalid level threshold") { + val exception: Exception = intercept[Exception] { + sql( + s""" + |CREATE TABLE $tableWithInvalidLevelThreshold + |( + |intField INT, + |stringField STRING + |) + |STORED BY 'carbondata' + |TBLPROPERTIES( + |'AUTO_LOAD_MERGE'='true', + |'COMPACTION_LEVEL_THRESHOLD'='x,6') + """.stripMargin) + } + assert(exception.getMessage.contains( + "Invalid compaction_level_threshold value found: x,6, " + + "only int values separated by comma and between 0 and 100 are supported.")) + } + + test("test create table with invalid preserve segments number") { + val exception: Exception = intercept[Exception] { + sql( + s""" + |CREATE TABLE $tableWithInvalidPreserveSegments + |( + |intField INT, + |stringField STRING + |) + |STORED BY 'carbondata' + |TBLPROPERTIES( + |'AUTO_LOAD_MERGE'='true', + |'COMPACTION_LEVEL_THRESHOLD'='4,6', + |'COMPACTION_PRESERVE_SEGMENTS'='abc') + """.stripMargin) + } + assert(exception.getMessage.contains( + "Invalid compaction_preserve_segments value found: abc, " + + "only int value between 0 and 100 is supported.")) + } + + test("test create table with invalid allowed days") { + val exception: Exception = intercept[Exception] { + sql( + s""" + |CREATE TABLE $tableWithInvalidAllowedDays + |( + |intField INT, + |stringField STRING + |) + |STORED BY 'carbondata' + |TBLPROPERTIES( + |'AUTO_LOAD_MERGE'='true', + |'ALLOWED_COMPACTION_DAYS'='abc') + """.stripMargin) + } + 
assert(exception.getMessage.contains( + "Invalid allowed_compaction_days value found: abc, " + + "only int value between 0 and 100 is supported.")) + } + + test("test create table without compaction options") { + sql( + s""" + | CREATE TABLE $tableWithoutCompactionOptions( + | intField INT, + | stringField STRING + | ) + | STORED BY 'carbondata' + """.stripMargin) + + val tableOptions = sql(s"DESCRIBE FORMATTED $tableWithoutCompactionOptions") + .collect().map(r => (r.getString(0).trim, r.getString(1).trim)).toMap + + assert(!tableOptions.contains("MAJOR_COMPACTION_SIZE")) + assert(!tableOptions.contains("AUTO_LOAD_MERGE")) + assert(!tableOptions.contains("COMPACTION_LEVEL_THRESHOLD")) + assert(!tableOptions.contains("COMPACTION_PRESERVE_SEGMENTS")) + assert(!tableOptions.contains("ALLOWED_COMPACTION_DAYS")) + } + +} http://git-wip-us.apache.org/repos/asf/carbondata/blob/f1c6ddde/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CompactionSupportGlobalSortBigFileTest.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CompactionSupportGlobalSortBigFileTest.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CompactionSupportGlobalSortBigFileTest.scala index 6d79f6c..c522c1e 100644 --- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CompactionSupportGlobalSortBigFileTest.scala +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CompactionSupportGlobalSortBigFileTest.scala @@ -51,7 +51,7 @@ class CompactionSupportGlobalSortBigFileTest extends QueryTest with BeforeAndAft CompactionSupportGlobalSortBigFileTest.deleteFile(file3) CompactionSupportGlobalSortBigFileTest.deleteFile(file4) 
CompactionSupportGlobalSortBigFileTest.deleteFile(file5) - resetConf(CarbonCommonConstants.DEFAULT_MAJOR_COMPACTION_SIZE) + resetConf(CarbonCommonConstants.DEFAULT_CARBON_MAJOR_COMPACTION_SIZE) } override def beforeEach { @@ -104,7 +104,7 @@ class CompactionSupportGlobalSortBigFileTest extends QueryTest with BeforeAndAft private def resetConf(size:String) { CarbonProperties.getInstance() - .addProperty(CarbonCommonConstants.MAJOR_COMPACTION_SIZE, size) + .addProperty(CarbonCommonConstants.CARBON_MAJOR_COMPACTION_SIZE, size) } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/f1c6ddde/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/TableLevelCompactionOptionTest.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/TableLevelCompactionOptionTest.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/TableLevelCompactionOptionTest.scala new file mode 100644 index 0000000..5445779 --- /dev/null +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/TableLevelCompactionOptionTest.scala @@ -0,0 +1,274 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the"License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an"AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.spark.testsuite.datacompaction + +import java.io.{File, PrintWriter} + +import org.apache.carbondata.core.constants.CarbonCommonConstants +import org.apache.carbondata.core.util.CarbonProperties +import org.apache.spark.sql.test.util.QueryTest +import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach} + +class TableLevelCompactionOptionTest extends QueryTest + with BeforeAndAfterEach with BeforeAndAfterAll { + + val tempFilePath: String = s"$resourcesPath/temp/tableLevelCompactionParaTest.csv" + val sampleFilePath: String = resourcesPath + "/sample.csv" + + override def beforeEach { + cleanTable() + } + + override def afterEach { + resetConf() + cleanTable() + } + + private def resetConf() ={ + CarbonProperties.getInstance().addProperty( + CarbonCommonConstants.CARBON_MAJOR_COMPACTION_SIZE, + CarbonCommonConstants.DEFAULT_CARBON_MAJOR_COMPACTION_SIZE) + CarbonProperties.getInstance() + .addProperty(CarbonCommonConstants.ENABLE_AUTO_LOAD_MERGE, + CarbonCommonConstants.DEFAULT_ENABLE_AUTO_LOAD_MERGE) + CarbonProperties.getInstance() + .addProperty(CarbonCommonConstants.COMPACTION_SEGMENT_LEVEL_THRESHOLD, + CarbonCommonConstants.DEFAULT_SEGMENT_LEVEL_THRESHOLD) + CarbonProperties.getInstance() + .addProperty(CarbonCommonConstants.PRESERVE_LATEST_SEGMENTS_NUMBER, + CarbonCommonConstants.DEFAULT_PRESERVE_LATEST_SEGMENTS_NUMBER) + CarbonProperties.getInstance() + .addProperty(CarbonCommonConstants.DAYS_ALLOWED_TO_COMPACT, + CarbonCommonConstants.DEFAULT_DAYS_ALLOWED_TO_COMPACT) + } + + private def cleanTable() = { + 
deleteTempFile() + sql("DROP TABLE IF EXISTS carbon_table") + } + + private def generateTempFile() = { + val writer = new PrintWriter(new File(tempFilePath)) + try { + writer.println("id,name,city,age") + val lines = + s"""|1,david,shenzhen,31 + |2,eason,shenzhen,27 + |3,jarry,wuhan,35 + |3,jarry,Bangalore,35 + |4,kunal,Delhi,26 + |4,vishal,Bangalore,29""".stripMargin + for (i <- 0 until 250000) { + writer.println(lines) + } + writer.flush() + } finally { + if (writer != null) writer.close() + } + } + + private def deleteTempFile() = { + val file = new File(tempFilePath) + if (file.exists()) { + file.delete() + } + } + + test("MAJOR_COMPACTION_SIZE, use system level configuration"){ + CarbonProperties.getInstance() + .addProperty(CarbonCommonConstants.CARBON_MAJOR_COMPACTION_SIZE, "10") + + // generate a temp file which is larger than 1M but smaller than 5M + generateTempFile() + + sql( + """ + |CREATE TABLE carbon_table + |(id INT, name STRING, city STRING, age INT) + |STORED BY 'org.apache.carbondata.format' + |TBLPROPERTIES('SORT_COLUMNS'='city,name') + """.stripMargin) + + for (i <- 0 until 2) { + sql(s"LOAD DATA LOCAL INPATH '$tempFilePath' INTO TABLE carbon_table") + } + + sql("ALTER TABLE carbon_table COMPACT 'MAJOR'") + sql("CLEAN FILES FOR TABLE carbon_table") + + val segments = sql("SHOW SEGMENTS FOR TABLE carbon_table") + val SegmentSequenceIds = segments.collect().map { each => (each.toSeq) (0) } + assert(!SegmentSequenceIds.contains("0")) + assert(!SegmentSequenceIds.contains("1")) + assert(SegmentSequenceIds.contains("0.1")) + + } + + test("MAJOR_COMPACTION_SIZE, use table level configuration") { + CarbonProperties.getInstance() + .addProperty(CarbonCommonConstants.CARBON_MAJOR_COMPACTION_SIZE, "10") + + // generate a temp file which is larger than 1M but smaller than 5M + generateTempFile() + + sql( + """ + |CREATE TABLE carbon_table + |(id INT, name STRING, city STRING, age INT) + |STORED BY 'org.apache.carbondata.format' + 
|TBLPROPERTIES('SORT_COLUMNS'='city,name', + |'MAJOR_COMPACTION_SIZE'='1') + """.stripMargin) + + for (i <- 0 until 2) { + sql(s"LOAD DATA LOCAL INPATH '$tempFilePath' INTO TABLE carbon_table") + } + + // each segment is larger than 1M, so no segments will be compacted + sql("ALTER TABLE carbon_table COMPACT 'MAJOR'") + sql("CLEAN FILES FOR TABLE carbon_table") + + val segments = sql("SHOW SEGMENTS FOR TABLE carbon_table") + val SegmentSequenceIds = segments.collect().map { each => (each.toSeq) (0) } + assert(SegmentSequenceIds.contains("0")) + assert(SegmentSequenceIds.contains("1")) + assert(!SegmentSequenceIds.contains("0.1")) + + CarbonProperties.getInstance().addProperty( + CarbonCommonConstants.CARBON_MAJOR_COMPACTION_SIZE, + CarbonCommonConstants.DEFAULT_CARBON_MAJOR_COMPACTION_SIZE) + } + + test("ENABLE_AUTO_LOAD_MERGE: true, use system level configuration"){ + CarbonProperties.getInstance() + .addProperty(CarbonCommonConstants.ENABLE_AUTO_LOAD_MERGE, "true") + CarbonProperties.getInstance() + .addProperty(CarbonCommonConstants.COMPACTION_SEGMENT_LEVEL_THRESHOLD, "4,2") + CarbonProperties.getInstance() + .addProperty(CarbonCommonConstants.PRESERVE_LATEST_SEGMENTS_NUMBER, "0") + CarbonProperties.getInstance() + .addProperty(CarbonCommonConstants.DAYS_ALLOWED_TO_COMPACT, "0") + + sql( + """ + | CREATE TABLE carbon_table(id INT, name STRING, city STRING, age INT) + | STORED BY 'org.apache.carbondata.format' + | TBLPROPERTIES('SORT_COLUMNS'='city,name') + """.stripMargin) + + for (i <- 0 until 8) { + sql(s"LOAD DATA LOCAL INPATH '$sampleFilePath' INTO TABLE carbon_table") + } + sql("CLEAN FILES FOR TABLE carbon_table") + var segments = sql("SHOW SEGMENTS FOR TABLE carbon_table") + var segmentSequenceIds = segments.collect().map { each => (each.toSeq) (0) } + assert(segmentSequenceIds.size==1) + assert(segmentSequenceIds.contains("0.2")) + } + + test("ENABLE_AUTO_LOAD_MERGE: false, use table level configuration"){ + CarbonProperties.getInstance() + 
.addProperty(CarbonCommonConstants.ENABLE_AUTO_LOAD_MERGE, "true") + CarbonProperties.getInstance() + .addProperty(CarbonCommonConstants.COMPACTION_SEGMENT_LEVEL_THRESHOLD, "4,2") + CarbonProperties.getInstance() + .addProperty(CarbonCommonConstants.PRESERVE_LATEST_SEGMENTS_NUMBER, "0") + CarbonProperties.getInstance() + .addProperty(CarbonCommonConstants.DAYS_ALLOWED_TO_COMPACT, "0") + + sql( + """ + | CREATE TABLE carbon_table(id INT, name STRING, city STRING, age INT) + | STORED BY 'org.apache.carbondata.format' + | TBLPROPERTIES('SORT_COLUMNS'='city,name', + | 'AUTO_LOAD_MERGE'='false') + """.stripMargin) + + for (i <- 0 until 8) { + sql(s"LOAD DATA LOCAL INPATH '$sampleFilePath' INTO TABLE carbon_table") + } + // table level configuration: 'AUTO_LOAD_MERGE'='false', so no segments will be compacted + checkExistence(sql("SHOW SEGMENTS FOR TABLE carbon_table"), false, "Compacted") + var segments = sql("SHOW SEGMENTS FOR TABLE carbon_table") + var segmentSequenceIds = segments.collect().map { each => (each.toSeq) (0) } + assert(segmentSequenceIds.size==8) + assert(!segmentSequenceIds.contains("0.1")) + assert(!segmentSequenceIds.contains("4.1")) + assert(!segmentSequenceIds.contains("0.2")) + } + + test("ENABLE_AUTO_LOAD_MERGE: true, use table level configuration") { + CarbonProperties.getInstance() + .addProperty(CarbonCommonConstants.ENABLE_AUTO_LOAD_MERGE, "true") + CarbonProperties.getInstance() + .addProperty(CarbonCommonConstants.COMPACTION_SEGMENT_LEVEL_THRESHOLD, "4,2") + CarbonProperties.getInstance() + .addProperty(CarbonCommonConstants.PRESERVE_LATEST_SEGMENTS_NUMBER, "0") + CarbonProperties.getInstance() + .addProperty(CarbonCommonConstants.DAYS_ALLOWED_TO_COMPACT, "0") + + sql( + """ + | CREATE TABLE carbon_table(id INT, name STRING, city STRING, age INT) + | STORED BY 'org.apache.carbondata.format' + | TBLPROPERTIES('SORT_COLUMNS'='city,name', + | 'AUTO_LOAD_MERGE'='true', + | 'COMPACTION_LEVEL_THRESHOLD'='3,2', + | 
'COMPACTION_PRESERVE_SEGMENTS'='2', + | 'TABLE_ALLOWED_COMPACTION_DAYS'='1') + """.stripMargin) + + // load 6 segments, the latest 2 segments are preserved + // only one level-1 minor compaction is triggered which compacts segment 0,1,2 to segment 0.1 + // seg0 \ + // seg1 -- compacted to seg0.1 + // seg2 / + // seg3 + // seg4 (preserved) + // seg5 (preserved) + for (i <- 0 until 6) { + sql(s"LOAD DATA LOCAL INPATH '$sampleFilePath' INTO TABLE carbon_table") + } + sql("CLEAN FILES FOR TABLE carbon_table") + var segments = sql("SHOW SEGMENTS FOR TABLE carbon_table") + var segmentSequenceIds = segments.collect().map { each => (each.toSeq) (0) } + assert(segmentSequenceIds.contains("0.1")) + assert(!segmentSequenceIds.contains("3.1")) + assert(!segmentSequenceIds.contains("0.2")) + + // load another two segments, the latest 2 segments are preserved + // level-2 minor compaction is triggered which compacts segment 0,1,2,3,4,5 -> 0.2 + // seg0 \ + // seg1 -- compacted to seg0.1 \ + // seg2 / -- compacted to seg0.2 + // seg3 \ / + // seg4 -- compacted to seg3.1 + // seg5 / + // seg6 (preserved) + // seg7 (preserved) + for (i <- 0 until 2) { + sql(s"LOAD DATA LOCAL INPATH '$sampleFilePath' INTO TABLE carbon_table") + } + sql("CLEAN FILES FOR TABLE carbon_table") + segments = sql("SHOW SEGMENTS FOR TABLE carbon_table") + segmentSequenceIds = segments.collect().map { each => (each.toSeq) (0) } + assert(segmentSequenceIds.contains("0.2")) + assert(!segmentSequenceIds.contains("0.1")) + assert(!segmentSequenceIds.contains("3.1")) + } +} http://git-wip-us.apache.org/repos/asf/carbondata/blob/f1c6ddde/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala index c902537..ae30300 
100644 --- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala +++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala @@ -471,6 +471,125 @@ object CommonUtil { } /** + * validate table level properties for compaction + * + * @param tableProperties table properties from the DDL; validated values are written back + */ + def validateTableLevelCompactionProperties(tableProperties: Map[String, String]): Unit = { + validateMajorCompactionSize(tableProperties) + validateAutoLoadMerge(tableProperties) + validateCompactionLevelThreshold(tableProperties) + validateCompactionPreserveSegmentsOrAllowedDays(tableProperties, + CarbonCommonConstants.TABLE_COMPACTION_PRESERVE_SEGMENTS) + validateCompactionPreserveSegmentsOrAllowedDays(tableProperties, + CarbonCommonConstants.TABLE_ALLOWED_COMPACTION_DAYS) + } + + /** + * This method will validate the major compaction size specified by the user + * the property is used while doing major compaction + * + * @param tableProperties table properties; the parsed MB value is stored back on success + */ + def validateMajorCompactionSize(tableProperties: Map[String, String]): Unit = { + var majorCompactionSize: Integer = 0 + val tblPropName = CarbonCommonConstants.TABLE_MAJOR_COMPACTION_SIZE + if (tableProperties.get(tblPropName).isDefined) { + val majorCompactionSizeStr: String = + parsePropertyValueStringInMB(tableProperties(tblPropName)) + try { + majorCompactionSize = Integer.parseInt(majorCompactionSizeStr) + } catch { + case e: NumberFormatException => + throw new MalformedCarbonCommandException(s"Invalid $tblPropName value found: " + + s"$majorCompactionSizeStr, only int value greater than 0 is supported.") + } + if (majorCompactionSize <= 0) { + throw new MalformedCarbonCommandException(s"Invalid $tblPropName value found: " + + s"$majorCompactionSizeStr, only int value greater than 0 is supported.") + } + tableProperties.put(tblPropName, majorCompactionSizeStr) + } + } + + /** + * This method will validate the auto merge load property specified by the user + * the property is used
while doing minor compaction + * + * @param tableProperties + */ + def validateAutoLoadMerge(tableProperties: Map[String, String]): Unit = { + val tblPropName = CarbonCommonConstants.TABLE_AUTO_LOAD_MERGE + if (tableProperties.get(tblPropName).isDefined) { + val trimStr = tableProperties(tblPropName).trim + if (!trimStr.equalsIgnoreCase("true") && !trimStr.equalsIgnoreCase("false")) { + throw new MalformedCarbonCommandException(s"Invalid $tblPropName value found: " + + s"$trimStr, only true|false is supported.") + } + tableProperties.put(tblPropName, trimStr) + } + } + + /** + * This method will validate the compaction level threshold property specified by the user + * the property is used while doing minor compaction + * + * @param tableProperties + */ + def validateCompactionLevelThreshold(tableProperties: Map[String, String]): Unit = { + val tblPropName = CarbonCommonConstants.TABLE_COMPACTION_LEVEL_THRESHOLD + if (tableProperties.get(tblPropName).isDefined) { + val regularedStr = tableProperties(tblPropName).replace(" ", "") + try { + val levels: Array[String] = regularedStr.split(",") + val thresholds = levels.map(levelThresholdStr => levelThresholdStr.toInt) + if (!thresholds.forall(t => t < 100 && t > 0)) { + throw new MalformedCarbonCommandException(s"Invalid $tblPropName value found: " + + s"$regularedStr, only int values separated by comma and between 0 " + + s"and 100 are supported.") + } + } + catch { + case e: NumberFormatException => + throw new MalformedCarbonCommandException(s"Invalid $tblPropName value found: " + + s"$regularedStr, only int values separated by comma and between 0 " + + s"and 100 are supported.") + } + tableProperties.put(tblPropName, regularedStr) + } + } + + /** + * This method will validate the compaction preserve segments property + * or compaction allowed days property + * + * @param tableProperties + * @param tblPropName + */ + def validateCompactionPreserveSegmentsOrAllowedDays(tableProperties: Map[String,
String], + tblPropName: String): Unit = { + if (tblPropName.equals(CarbonCommonConstants.TABLE_COMPACTION_PRESERVE_SEGMENTS) || + tblPropName.equals(CarbonCommonConstants.TABLE_ALLOWED_COMPACTION_DAYS)) { + var propValue: Integer = 0 + if (tableProperties.get(tblPropName).isDefined) { + val propValueStr = tableProperties(tblPropName).trim + try { + propValue = Integer.parseInt(propValueStr) + } catch { + case e: NumberFormatException => + throw new MalformedCarbonCommandException(s"Invalid $tblPropName value found: " + + s"$propValueStr, only int value between 0 and 100 is supported.") + } + if (propValue < 0 || propValue > 100) { + throw new MalformedCarbonCommandException(s"Invalid $tblPropName value found: " + + s"$propValueStr, only int value between 0 and 100 is supported.") + } + tableProperties.put(tblPropName, propValueStr) + } + } + } + + /** * This method will validate the table block size specified by the user * * @param tableProperties http://git-wip-us.apache.org/repos/asf/carbondata/blob/f1c6ddde/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala b/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala index a39465f..6e9b36c 100644 --- a/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala +++ b/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala @@ -275,6 +275,8 @@ abstract class CarbonDDLSqlParser extends AbstractCarbonSparkSQLParser { // validate the tableBlockSize from table properties CommonUtil.validateTableBlockSize(tableProperties) + // validate table level properties for compaction + CommonUtil.validateTableLevelCompactionProperties(tableProperties) TableModel( ifNotExistPresent,
http://git-wip-us.apache.org/repos/asf/carbondata/blob/f1c6ddde/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala index 8c9700c..d288122 100644 --- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala +++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala @@ -208,7 +208,8 @@ object CarbonDataRDDFactory { val newCarbonLoadModel = prepareCarbonLoadModel(table) - val compactionSize = CarbonDataMergerUtil.getCompactionSize(CompactionType.MAJOR) + val compactionSize = CarbonDataMergerUtil + .getCompactionSize(CompactionType.MAJOR, newCarbonLoadModel) val newcompactionModel = CompactionModel( compactionSize, @@ -720,8 +721,8 @@ object CarbonDataRDDFactory { operationContext: OperationContext ): Unit = { LOGGER.info(s"compaction need status is" + - s" ${ CarbonDataMergerUtil.checkIfAutoLoadMergingRequired() }") - if (CarbonDataMergerUtil.checkIfAutoLoadMergingRequired()) { + s" ${ CarbonDataMergerUtil.checkIfAutoLoadMergingRequired(carbonTable) }") + if (CarbonDataMergerUtil.checkIfAutoLoadMergingRequired(carbonTable)) { LOGGER.audit(s"Compaction request received for table " + s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }") val compactionSize = 0 http://git-wip-us.apache.org/repos/asf/carbondata/blob/f1c6ddde/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala index cb0ff2d..517fbda 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala @@ -121,7 +121,8 @@ case class CarbonAlterTableCompactionCommand( operationContext: OperationContext): Unit = { val LOGGER: LogService = LogServiceFactory.getLogService(this.getClass.getName) val compactionType = CompactionType.valueOf(alterTableModel.compactionType.toUpperCase) - val compactionSize: Long = CarbonDataMergerUtil.getCompactionSize(compactionType) + val compactionSize: Long = CarbonDataMergerUtil + .getCompactionSize(compactionType, carbonLoadModel) if (CompactionType.IUD_UPDDEL_DELTA == compactionType) { if (alterTableModel.segmentUpdateStatusManager.isDefined) { carbonLoadModel.setSegmentUpdateStatusManager( http://git-wip-us.apache.org/repos/asf/carbondata/blob/f1c6ddde/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDescribeFormattedCommand.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDescribeFormattedCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDescribeFormattedCommand.scala index 807c8e5..ccdbd6e 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDescribeFormattedCommand.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDescribeFormattedCommand.scala @@ -107,6 +107,38 @@ private[sql] case class CarbonDescribeFormattedCommand( val isStreaming = carbonTable.getTableInfo.getFactTable.getTableProperties.asScala
.getOrElse("streaming", "false") results ++= Seq(("Streaming", isStreaming, "")) + + val tblProps = carbonTable.getTableInfo.getFactTable.getTableProperties + results ++= Seq(("SORT_SCOPE", tblProps.getOrDefault("sort_scope", CarbonCommonConstants + .LOAD_SORT_SCOPE_DEFAULT), CarbonCommonConstants.LOAD_SORT_SCOPE_DEFAULT)) + + // show table level compaction options, only those explicitly set on the table + if (tblProps.containsKey(CarbonCommonConstants.TABLE_MAJOR_COMPACTION_SIZE)) { + results ++= Seq((CarbonCommonConstants.TABLE_MAJOR_COMPACTION_SIZE.toUpperCase + , tblProps.get(CarbonCommonConstants.TABLE_MAJOR_COMPACTION_SIZE), + CarbonCommonConstants.DEFAULT_CARBON_MAJOR_COMPACTION_SIZE)) + } + if (tblProps.containsKey(CarbonCommonConstants.TABLE_AUTO_LOAD_MERGE)) { + results ++= Seq((CarbonCommonConstants.TABLE_AUTO_LOAD_MERGE.toUpperCase, + tblProps.get(CarbonCommonConstants.TABLE_AUTO_LOAD_MERGE), + CarbonCommonConstants.DEFAULT_ENABLE_AUTO_LOAD_MERGE)) + } + if (tblProps.containsKey(CarbonCommonConstants.TABLE_COMPACTION_LEVEL_THRESHOLD)) { + results ++= Seq((CarbonCommonConstants.TABLE_COMPACTION_LEVEL_THRESHOLD.toUpperCase, + tblProps.get(CarbonCommonConstants.TABLE_COMPACTION_LEVEL_THRESHOLD), + CarbonCommonConstants.DEFAULT_SEGMENT_LEVEL_THRESHOLD)) + } + if (tblProps.containsKey(CarbonCommonConstants.TABLE_COMPACTION_PRESERVE_SEGMENTS)) { + results ++= Seq((CarbonCommonConstants.TABLE_COMPACTION_PRESERVE_SEGMENTS.toUpperCase, + tblProps.get(CarbonCommonConstants.TABLE_COMPACTION_PRESERVE_SEGMENTS), + CarbonCommonConstants.DEFAULT_PRESERVE_LATEST_SEGMENTS_NUMBER)) + } + if (tblProps.containsKey(CarbonCommonConstants.TABLE_ALLOWED_COMPACTION_DAYS)) { + results ++= Seq((CarbonCommonConstants.TABLE_ALLOWED_COMPACTION_DAYS.toUpperCase, + tblProps.get(CarbonCommonConstants.TABLE_ALLOWED_COMPACTION_DAYS), + CarbonCommonConstants.DEFAULT_DAYS_ALLOWED_TO_COMPACT)) + } + results ++= Seq(("", "", ""), ("##Detailed Column property", "", "")) if (colPropStr.length() > 0) { results ++= Seq((colPropStr,
"", "")) http://git-wip-us.apache.org/repos/asf/carbondata/blob/f1c6ddde/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java b/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java index 3729b1d..e245927 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java +++ b/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java @@ -21,15 +21,7 @@ import java.io.File; import java.io.IOException; import java.text.ParseException; import java.text.SimpleDateFormat; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Calendar; -import java.util.Collections; -import java.util.Comparator; -import java.util.Date; -import java.util.HashSet; -import java.util.List; -import java.util.Set; +import java.util.*; import org.apache.carbondata.common.logging.LogService; import org.apache.carbondata.common.logging.LogServiceFactory; @@ -78,7 +70,8 @@ public final class CarbonDataMergerUtil { // carbon data file case. CarbonFile[] factFile = carbonFile.listFiles(new CarbonFileFilter() { - @Override public boolean accept(CarbonFile file) { + @Override + public boolean accept(CarbonFile file) { return CarbonTablePath.isCarbonDataFile(file.getName()); } }); @@ -93,17 +86,22 @@ public final class CarbonDataMergerUtil { /** * To check whether the merge property is enabled or not. 
* + * @Params carbonTable * @return */ - - public static boolean checkIfAutoLoadMergingRequired() { + public static boolean checkIfAutoLoadMergingRequired(CarbonTable carbonTable) { // load merge is not supported as per new store format // moving the load merge check in early to avoid unnecessary load listing causing IOException // check whether carbons segment merging operation is enabled or not. // default will be false. + Map<String, String> tblProps = carbonTable.getTableInfo().getFactTable().getTableProperties(); + String isLoadMergeEnabled = CarbonProperties.getInstance() - .getProperty(CarbonCommonConstants.ENABLE_AUTO_LOAD_MERGE, - CarbonCommonConstants.DEFAULT_ENABLE_AUTO_LOAD_MERGE); + .getProperty(CarbonCommonConstants.ENABLE_AUTO_LOAD_MERGE, + CarbonCommonConstants.DEFAULT_ENABLE_AUTO_LOAD_MERGE); + if (tblProps.containsKey(CarbonCommonConstants.TABLE_AUTO_LOAD_MERGE)) { + isLoadMergeEnabled = tblProps.get(CarbonCommonConstants.TABLE_AUTO_LOAD_MERGE); + } if (isLoadMergeEnabled.equalsIgnoreCase("false")) { return false; } @@ -386,9 +384,11 @@ public final class CarbonDataMergerUtil { * @return */ public static List<LoadMetadataDetails> identifySegmentsToBeMerged( - CarbonLoadModel carbonLoadModel, long compactionSize, - List<LoadMetadataDetails> segments, CompactionType compactionType) { + CarbonLoadModel carbonLoadModel, long compactionSize, + List<LoadMetadataDetails> segments, CompactionType compactionType) { String tablePath = carbonLoadModel.getTablePath(); + Map<String, String> tableLevelProperties = carbonLoadModel.getCarbonDataLoadSchema() + .getCarbonTable().getTableInfo().getFactTable().getTableProperties(); List<LoadMetadataDetails> sortedSegments = new ArrayList<LoadMetadataDetails>(segments); sortSegments(sortedSegments); @@ -402,22 +402,24 @@ public final class CarbonDataMergerUtil { // check preserve property and preserve the configured number of latest loads. 
List<LoadMetadataDetails> listOfSegmentsAfterPreserve = - checkPreserveSegmentsPropertyReturnRemaining(sortedSegments); + checkPreserveSegmentsPropertyReturnRemaining(sortedSegments, tableLevelProperties); // filter the segments if the compaction based on days is configured. List<LoadMetadataDetails> listOfSegmentsLoadedInSameDateInterval = - identifySegmentsToBeMergedBasedOnLoadedDate(listOfSegmentsAfterPreserve); + identifySegmentsToBeMergedBasedOnLoadedDate(listOfSegmentsAfterPreserve, + tableLevelProperties); List<LoadMetadataDetails> listOfSegmentsToBeMerged; // identify the segments to merge based on the Size of the segments across partition. if (CompactionType.MAJOR == compactionType) { listOfSegmentsToBeMerged = identifySegmentsToBeMergedBasedOnSize(compactionSize, - listOfSegmentsLoadedInSameDateInterval, carbonLoadModel, tablePath); + listOfSegmentsLoadedInSameDateInterval, carbonLoadModel, tablePath); } else { listOfSegmentsToBeMerged = - identifySegmentsToBeMergedBasedOnSegCount(listOfSegmentsLoadedInSameDateInterval); + identifySegmentsToBeMergedBasedOnSegCount(listOfSegmentsLoadedInSameDateInterval, + tableLevelProperties); } return listOfSegmentsToBeMerged; @@ -430,7 +432,8 @@ public final class CarbonDataMergerUtil { public static void sortSegments(List segments) { // sort the segment details. Collections.sort(segments, new Comparator<LoadMetadataDetails>() { - @Override public int compare(LoadMetadataDetails seg1, LoadMetadataDetails seg2) { + @Override + public int compare(LoadMetadataDetails seg1, LoadMetadataDetails seg2) { double seg1Id = Double.parseDouble(seg1.getLoadName()); double seg2Id = Double.parseDouble(seg2.getLoadName()); return Double.compare(seg1Id, seg2Id); @@ -443,19 +446,26 @@ public final class CarbonDataMergerUtil { * This property is configurable. 
* * @param listOfSegmentsBelowThresholdSize + * @param tblProps * @return */ private static List<LoadMetadataDetails> identifySegmentsToBeMergedBasedOnLoadedDate( - List<LoadMetadataDetails> listOfSegmentsBelowThresholdSize) { + List<LoadMetadataDetails> listOfSegmentsBelowThresholdSize, Map<String, String> tblProps) { List<LoadMetadataDetails> loadsOfSameDate = new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE); long numberOfDaysAllowedToMerge = 0; try { + // overwrite system level option by table level option if exists numberOfDaysAllowedToMerge = Long.parseLong(CarbonProperties.getInstance() - .getProperty(CarbonCommonConstants.DAYS_ALLOWED_TO_COMPACT, - CarbonCommonConstants.DEFAULT_DAYS_ALLOWED_TO_COMPACT)); + .getProperty(CarbonCommonConstants.DAYS_ALLOWED_TO_COMPACT, + CarbonCommonConstants.DEFAULT_DAYS_ALLOWED_TO_COMPACT)); + if (tblProps.containsKey(CarbonCommonConstants.TABLE_ALLOWED_COMPACTION_DAYS)) { + numberOfDaysAllowedToMerge = Long.parseLong( + tblProps.get(CarbonCommonConstants.TABLE_ALLOWED_COMPACTION_DAYS)); + } + if (numberOfDaysAllowedToMerge < 0 || numberOfDaysAllowedToMerge > 100) { LOGGER.error( "The specified value for property " + CarbonCommonConstants.DAYS_ALLOWED_TO_COMPACT @@ -675,18 +685,27 @@ public final class CarbonDataMergerUtil { * * @param listOfSegmentsAfterPreserve the list of segments after * preserve and before filtering by minor compaction level + * @param tblProps * @return the list of segments to be merged after filtering by minor compaction level */ private static List<LoadMetadataDetails> identifySegmentsToBeMergedBasedOnSegCount( - List<LoadMetadataDetails> listOfSegmentsAfterPreserve) { + List<LoadMetadataDetails> listOfSegmentsAfterPreserve, Map<String, String> tblProps) { List<LoadMetadataDetails> mergedSegments = - new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE); + new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE); List<LoadMetadataDetails> unMergedSegments = - new 
ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE); - - int[] noOfSegmentLevelsCount = - CarbonProperties.getInstance().getCompactionSegmentLevelCount(); + new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE); + + int[] noOfSegmentLevelsCount = CarbonProperties.getInstance() + .getCompactionSegmentLevelCount(); + // overwrite system level option by table level option if exists + if (tblProps.containsKey(CarbonCommonConstants.TABLE_COMPACTION_LEVEL_THRESHOLD)) { + noOfSegmentLevelsCount = CarbonProperties.getInstance() + .getIntArray(tblProps.get(CarbonCommonConstants.TABLE_COMPACTION_LEVEL_THRESHOLD)); + if (0 == noOfSegmentLevelsCount.length) { + noOfSegmentLevelsCount = CarbonProperties.getInstance().getCompactionSegmentLevelCount(); + } + } int level1Size = 0; int level2Size = 0; @@ -756,14 +775,21 @@ public final class CarbonDataMergerUtil { * checks number of loads to be preserved and returns remaining valid segments * * @param segments + * @param tblProps * @return */ private static List<LoadMetadataDetails> checkPreserveSegmentsPropertyReturnRemaining( - List<LoadMetadataDetails> segments) { + List<LoadMetadataDetails> segments, Map<String, String> tblProps) { // check whether the preserving of the segments from merging is enabled or not. - // get the number of loads to be preserved. - int numberOfSegmentsToBePreserved = - CarbonProperties.getInstance().getNumberOfSegmentsToBePreserved(); + // get the number of loads to be preserved. default value is system level option + // overwrite system level option by table level option if exists + int numberOfSegmentsToBePreserved = CarbonProperties.getInstance() + .getNumberOfSegmentsToBePreserved(); + if (tblProps.containsKey(CarbonCommonConstants.TABLE_COMPACTION_PRESERVE_SEGMENTS)) { + numberOfSegmentsToBePreserved = Integer.parseInt( + tblProps.get(CarbonCommonConstants.TABLE_COMPACTION_PRESERVE_SEGMENTS)); + } + // get the number of valid segments and retain the latest loads from merging. 
return CarbonDataMergerUtil .getValidLoadDetailsWithRetaining(segments, numberOfSegmentsToBePreserved); @@ -808,14 +834,23 @@ public final class CarbonDataMergerUtil { * This will give the compaction sizes configured based on compaction type. * * @param compactionType + * @param carbonLoadModel * @return */ - public static long getCompactionSize(CompactionType compactionType) { - + public static long getCompactionSize(CompactionType compactionType, + CarbonLoadModel carbonLoadModel) { long compactionSize = 0; switch (compactionType) { case MAJOR: + // default value is system level option compactionSize = CarbonProperties.getInstance().getMajorCompactionSize(); + // if table level option is identified, use it to overwrite system level option + Map<String, String> tblProps = carbonLoadModel.getCarbonDataLoadSchema() + .getCarbonTable().getTableInfo().getFactTable().getTableProperties(); + if (tblProps.containsKey(CarbonCommonConstants.TABLE_MAJOR_COMPACTION_SIZE)) { + compactionSize = Long.parseLong( + tblProps.get(CarbonCommonConstants.TABLE_MAJOR_COMPACTION_SIZE)); + } break; default: // this case can not come. }
