This is an automated email from the ASF dual-hosted git repository. ajantha pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/carbondata.git
The following commit(s) were added to refs/heads/master by this push: new 28bcfa3 [CARBONDATA-3579] Support merging index files when adding new partition 28bcfa3 is described below commit 28bcfa3a8c0f1ec8142fac1f81a0e050282762df Author: Jacky Li <jacky.li...@qq.com> AuthorDate: Thu Nov 14 02:20:08 2019 +0800 [CARBONDATA-3579] Support merging index files when adding new partition Normally, an application will use Carbon SDK to write files into a partition folder, then add the folder to the partitioned carbon table. If there are many threads writing to the same partition folder, there will be many carbon index files, and it is not good for query performance since all index files need to be read by the Spark driver. So, a better way is to merge the index files when adding a new partition to the carbon table. This closes #3452 --- .../StandardPartitionTableQueryTestCase.scala | 50 +++++++++++++++++++++- .../carbondata/events/AlterTableEvents.scala | 6 +-- .../spark/sql/events/MergeIndexEventListener.scala | 24 +++++++---- .../CarbonAlterTableAddHivePartitionCommand.scala | 23 ++++++++-- 4 files changed, 88 insertions(+), 15 deletions(-) diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableQueryTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableQueryTestCase.scala index fb4b511..e151547 100644 --- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableQueryTestCase.scala +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableQueryTestCase.scala @@ -20,7 +20,7 @@ import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.execution.strategy.CarbonDataSourceScan import org.apache.spark.sql.test.Spark2TestQueryExecutor import 
org.apache.spark.sql.test.util.QueryTest -import org.apache.spark.sql.{DataFrame, Row} +import org.apache.spark.sql.{CarbonEnv, DataFrame, Row} import org.apache.spark.util.SparkUtil import org.scalatest.BeforeAndAfterAll @@ -28,6 +28,8 @@ import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandExcepti import org.apache.carbondata.core.constants.CarbonCommonConstants import org.apache.carbondata.core.datastore.impl.FileFactory import org.apache.carbondata.core.util.CarbonProperties +import org.apache.carbondata.core.util.path.CarbonTablePath +import org.apache.carbondata.sdk.file.{CarbonWriter, Field, Schema} import org.apache.carbondata.spark.rdd.CarbonScanRDD class StandardPartitionTableQueryTestCase extends QueryTest with BeforeAndAfterAll { @@ -314,6 +316,52 @@ test("Creation of partition table should fail if the colname in table schema and FileFactory.deleteAllCarbonFilesOfDir(FileFactory.getCarbonFile(location)) } + test("sdk write and add partition based on location on partition table"){ + sql("drop table if exists partitionTable") + sql("create table partitionTable (id int,name String) partitioned by(email string) stored as carbondata") + sql("insert into partitionTable select 1,'blue','abc'") + val schemaFile = + CarbonTablePath.getSchemaFilePath( + CarbonEnv.getCarbonTable(None, "partitionTable")(sqlContext.sparkSession).getTablePath) + + val sdkWritePath = metaStoreDB +"/" +"def" + FileFactory.deleteAllCarbonFilesOfDir(FileFactory.getCarbonFile(sdkWritePath)) + + (1 to 3).foreach { i => + val writer = CarbonWriter.builder() + .outputPath(sdkWritePath) + .writtenBy("test") + .withSchemaFile(schemaFile) + .withCsvInput() + .build() + writer.write(Seq("2", "red", "def").toArray) + writer.write(Seq("3", "black", "def").toArray) + writer.close() + } + + sql(s"alter table partitionTable add partition (email='def') location '$sdkWritePath'") + sql("show partitions partitionTable").show(false) + checkAnswer(sql("show partitions partitionTable"), 
Seq(Row("email=abc"), Row("email=def"))) + checkAnswer(sql("select email from partitionTable"), Seq(Row("abc"), Row("def"), Row("def"), Row("def"), Row("def"), Row("def"), Row("def"))) + checkAnswer(sql("select count(*) from partitionTable"), Seq(Row(7))) + checkAnswer(sql("select id from partitionTable where email = 'def'"), Seq(Row(2), Row(3), Row(2), Row(3), Row(2), Row(3))) + // alter table add partition should merge index files + assert(FileFactory.getCarbonFile(sdkWritePath) + .listFiles() + .exists(_.getName.contains(".carbonindexmerge"))) + + // do compaction to sort data written by sdk + sql("alter table partitionTable compact 'major'") + assert(sql("show segments for table partitionTable").collectAsList().get(0).getString(1).contains("Compacted")) + checkAnswer(sql("show partitions partitionTable"), Seq(Row("email=abc"), Row("email=def"))) + checkAnswer(sql("select email from partitionTable"), Seq(Row("abc"), Row("def"), Row("def"), Row("def"), Row("def"), Row("def"), Row("def"))) + checkAnswer(sql("select count(*) from partitionTable"), Seq(Row(7))) + checkAnswer(sql("select id from partitionTable where email = 'def'"), Seq(Row(2), Row(3), Row(2), Row(3), Row(2), Row(3))) + + sql("drop table if exists partitionTable") + FileFactory.deleteAllCarbonFilesOfDir(FileFactory.getCarbonFile(sdkWritePath)) + } + test("add partition with static column partition with load command") { sql( """ diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/events/AlterTableEvents.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/events/AlterTableEvents.scala index 56f4d29..ea73ebe 100644 --- a/integration/spark-common/src/main/scala/org/apache/carbondata/events/AlterTableEvents.scala +++ b/integration/spark-common/src/main/scala/org/apache/carbondata/events/AlterTableEvents.scala @@ -205,9 +205,9 @@ case class AlterTableCompactionAbortEvent(sparkSession: SparkSession, /** * Compaction Event for handling merge index in alter DDL * - * 
@param sparkSession - * @param carbonTable - * @param alterTableModel + * @param sparkSession spark session + * @param carbonTable carbon table + * @param alterTableModel alter request */ case class AlterTableMergeIndexEvent(sparkSession: SparkSession, carbonTable: CarbonTable, diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/events/MergeIndexEventListener.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/events/MergeIndexEventListener.scala index 967f390..fcee9f1 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/events/MergeIndexEventListener.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/events/MergeIndexEventListener.scala @@ -84,12 +84,20 @@ class MergeIndexEventListener extends OperationEventListener with Logging { carbonMainTable .getTableName }") - val validSegments: mutable.Buffer[Segment] = CarbonDataMergerUtil.getValidSegmentList( - carbonMainTable.getAbsoluteTableIdentifier, carbonMainTable.isChildTable).asScala - val validSegmentIds: mutable.Buffer[String] = mutable.Buffer[String]() - validSegments.foreach { segment => - validSegmentIds += segment.getSegmentNo - } + val segmentsToMerge = + if (alterTableMergeIndexEvent.alterTableModel.customSegmentIds.isEmpty) { + val validSegments = CarbonDataMergerUtil.getValidSegmentList( + carbonMainTable.getAbsoluteTableIdentifier, + carbonMainTable.isChildTable).asScala + val validSegmentIds: mutable.Buffer[String] = mutable.Buffer[String]() + validSegments.foreach { segment => + validSegmentIds += segment.getSegmentNo + } + validSegmentIds + } else { + alterTableMergeIndexEvent.alterTableModel.customSegmentIds.get + } + val loadFolderDetailsArray = SegmentStatusManager .readLoadMetadata(carbonMainTable.getMetadataPath) val segmentFileNameMap: java.util.Map[String, String] = new util.HashMap[String, @@ -105,14 +113,14 @@ class MergeIndexEventListener extends OperationEventListener with Logging { // old store is also upgraded to new store 
CarbonMergeFilesRDD.mergeIndexFiles( sparkSession = sparkSession, - segmentIds = validSegmentIds, + segmentIds = segmentsToMerge, segmentFileNameToSegmentIdMap = segmentFileNameMap, tablePath = carbonMainTable.getTablePath, carbonTable = carbonMainTable, mergeIndexProperty = true, readFileFooterFromCarbonDataFile = true) // clear Block dataMap Cache - MergeIndexUtil.clearBlockDataMapCache(carbonMainTable, validSegmentIds) + MergeIndexUtil.clearBlockDataMapCache(carbonMainTable, segmentsToMerge) val requestMessage = "Compaction request completed for table " + s"${ carbonMainTable.getDatabaseName }.${ carbonMainTable.getTableName }" LOGGER.info(requestMessage) diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableAddHivePartitionCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableAddHivePartitionCommand.scala index a7bf5f4..f29b7b2 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableAddHivePartitionCommand.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableAddHivePartitionCommand.scala @@ -24,7 +24,7 @@ import scala.collection.JavaConverters._ import org.apache.spark.sql.{CarbonEnv, Row, SparkSession} import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec -import org.apache.spark.sql.execution.command.{AlterTableAddPartitionCommand, AlterTableDropPartitionCommand, AtomicRunnableCommand} +import org.apache.spark.sql.execution.command.{AlterTableAddPartitionCommand, AlterTableDropPartitionCommand, AlterTableModel, AtomicRunnableCommand} import org.apache.spark.sql.optimizer.CarbonFilters import org.apache.carbondata.common.logging.LogServiceFactory @@ -36,7 +36,7 @@ import org.apache.carbondata.core.metadata.schema.table.CarbonTable import 
org.apache.carbondata.core.statusmanager.SegmentStatus import org.apache.carbondata.core.util.CarbonUtil import org.apache.carbondata.core.util.path.CarbonTablePath -import org.apache.carbondata.events.{OperationContext, OperationListenerBus, PostAlterTableHivePartitionCommandEvent, PreAlterTableHivePartitionCommandEvent} +import org.apache.carbondata.events.{AlterTableMergeIndexEvent, OperationContext, OperationListenerBus, PostAlterTableHivePartitionCommandEvent, PreAlterTableHivePartitionCommandEvent} import org.apache.carbondata.processing.loading.model.{CarbonDataLoadSchema, CarbonLoadModel} import org.apache.carbondata.processing.util.CarbonLoaderUtil @@ -116,7 +116,7 @@ case class CarbonAlterTableAddHivePartitionCommand( if (segmentFile != null) { val indexToSchemas = SegmentFileStore.getSchemaFiles(segmentFile, table.getTablePath) val tableColums = table.getTableInfo.getFactTable.getListOfColumns.asScala - var isSameSchema = indexToSchemas.asScala.exists{ case(key, columnSchemas) => + val isSameSchema = indexToSchemas.asScala.exists{ case(key, columnSchemas) => columnSchemas.asScala.exists { col => tableColums.exists(p => p.getColumnUniqueId.equals(col.getColumnUniqueId)) } && columnSchemas.size() == tableColums.length @@ -153,6 +153,23 @@ case class CarbonAlterTableAddHivePartitionCommand( CarbonLoaderUtil.addDataIndexSizeIntoMetaEntry(newMetaEntry, loadModel.getSegmentId, table) // Make the load as success in table status CarbonLoaderUtil.recordNewLoadMetadata(newMetaEntry, loadModel, false, false) + + // Normally, an application will use Carbon SDK to write files into a partition folder, then + // add the folder to the partitioned carbon table. + // If there are many threads writing to the same partition folder, there will be many + // carbon index files, and it is not good for query performance since all index files + // need to be read by the Spark driver. 
+ // So, here trigger to merge the index files by sending an event + val alterTableModel = AlterTableModel( + dbName = Some(table.getDatabaseName), + tableName = table.getTableName, + segmentUpdateStatusManager = None, + compactionType = "", // to trigger index merge, this is not required + factTimeStamp = Some(System.currentTimeMillis()), + alterSql = null, + customSegmentIds = Some(Seq(loadModel.getSegmentId).toList)) + val mergeIndexEvent = AlterTableMergeIndexEvent(sparkSession, table, alterTableModel) + OperationListenerBus.getInstance.fireEvent(mergeIndexEvent, new OperationContext) } } Seq.empty[Row]