/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.carbondata.spark.rdd

import org.apache.spark.{Partition, SparkContext, TaskContext}

import org.apache.carbondata.core.util.path.CarbonTablePath
import org.apache.carbondata.core.writer.CarbonIndexFileMergeWriter

/**
 * Partition covering a single segment whose carbonindex files are to be merged.
 *
 * @param rddId       id of the owning RDD; mixed into hashCode so that partitions
 *                    of different RDDs do not collide in hash-based collections
 * @param idx         partition index within the RDD
 * @param segmentPath absolute path of the segment directory whose index files
 *                    will be merged
 */
case class CarbonMergeFilePartition(rddId: Int, idx: Int, segmentPath: String)
  extends Partition {

  override val index: Int = idx

  override def hashCode(): Int = 41 * (41 + rddId) + idx
}

/**
 * RDD that merges all carbonindex files of each segment into a single
 * carbonindexmerge file inside the same segment directory.
 *
 * One partition (hence one task) is created per segment. The merge itself is
 * purely a side effect performed on the executor; the RDD yields no rows, so
 * callers typically just invoke `collect()` to trigger the job.
 *
 * @param sc        spark context used to launch the merge tasks
 * @param tablePath absolute table path under which the segment directories live
 * @param segments  segment numbers whose index files should be merged
 */
class CarbonMergeFilesRDD(
    sc: SparkContext,
    tablePath: String,
    segments: Seq[String])
  extends CarbonRDD[String](sc, Nil) {

  /** One partition per requested segment, resolving each segment number to its path. */
  override def getPartitions: Array[Partition] = {
    segments.zipWithIndex.map { case (segmentId, index) =>
      CarbonMergeFilePartition(id, index, CarbonTablePath.getSegmentPath(tablePath, segmentId))
    }.toArray
  }

  /**
   * Merges the carbonindex files of this partition's segment and returns an
   * empty iterator.
   *
   * Note: the previous implementation used a `finished`/`havePair` state
   * machine that unconditionally set `finished = true` on the first `hasNext`
   * call, so it could never yield an element. An explicit `Iterator.empty` is
   * behaviorally identical and makes the side-effect-only contract obvious.
   */
  override def internalCompute(theSplit: Partition, context: TaskContext): Iterator[String] = {
    val split = theSplit.asInstanceOf[CarbonMergeFilePartition]
    logInfo("Merging carbon index files of segment : " + split.segmentPath)
    // The merge is the entire purpose of this task; there is no result to return.
    new CarbonIndexFileMergeWriter().mergeCarbonIndexFilesOfSegment(split.segmentPath)
    Iterator.empty
  }
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/0586146a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/Compactor.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/Compactor.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/Compactor.scala index fb610c1..cb25756 100644 --- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/Compactor.scala +++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/Compactor.scala @@ -22,10 +22,12 @@ import scala.collection.JavaConverters._ import org.apache.spark.sql.execution.command.{CarbonMergerMapping, CompactionCallableModel} import org.apache.carbondata.common.logging.LogServiceFactory +import org.apache.carbondata.core.constants.CarbonCommonConstants import org.apache.carbondata.core.mutate.CarbonUpdateUtil import org.apache.carbondata.core.statusmanager.SegmentStatusManager import org.apache.carbondata.processing.merger.{CarbonDataMergerUtil, CompactionType} import org.apache.carbondata.spark.MergeResultImpl +import org.apache.carbondata.spark.util.CommonUtil /** * Compactor class which handled the compaction cases. 
@@ -106,6 +108,8 @@ object Compactor { } if (finalMergeStatus) { + val mergedLoadNumber = CarbonDataMergerUtil.getLoadNumberFromLoadName(mergedLoadName) + CommonUtil.mergeIndexFiles(sc.sparkContext, Seq(mergedLoadNumber), storePath, carbonTable) val endTime = System.nanoTime() logger.info(s"time taken to merge $mergedLoadName is ${ endTime - startTime }") val statusFileUpdation = @@ -116,7 +120,7 @@ object Compactor { carbonLoadModel))) || (CarbonDataMergerUtil .updateLoadMetadataWithMergeStatus(loadsToMerge, carbonTable.getMetaDataFilepath(), - mergedLoadName, carbonLoadModel, mergeLoadStartTime, compactionType)) + mergedLoadNumber, carbonLoadModel, mergeLoadStartTime, compactionType)) ) if (!statusFileUpdation) { http://git-wip-us.apache.org/repos/asf/carbondata/blob/0586146a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala index bc24c12..27ebf42 100644 --- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala +++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala @@ -28,7 +28,7 @@ import org.apache.commons.lang3.StringUtils import org.apache.hadoop.conf.Configuration import org.apache.hadoop.mapreduce.lib.input.FileInputFormat import org.apache.spark.SparkContext -import org.apache.spark.sql.{Row, RowFactory} +import org.apache.spark.sql.{Row, RowFactory, SQLContext} import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference} import org.apache.spark.sql.execution.command.{ColumnProperty, Field, PartitionerField} import org.apache.spark.sql.types.{MetadataBuilder, StringType} @@ -44,6 +44,7 @@ import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier 
import org.apache.carbondata.core.metadata.datatype.{DataType, DataTypes} import org.apache.carbondata.core.metadata.schema.PartitionInfo import org.apache.carbondata.core.metadata.schema.partition.PartitionType +import org.apache.carbondata.core.metadata.schema.table.CarbonTable import org.apache.carbondata.core.mutate.CarbonUpdateUtil import org.apache.carbondata.core.scan.partition.PartitionUtil import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatusManager} @@ -55,6 +56,7 @@ import org.apache.carbondata.processing.loading.exception.CarbonDataLoadingExcep import org.apache.carbondata.processing.loading.model.CarbonLoadModel import org.apache.carbondata.processing.util.{CarbonDataProcessorUtil, CarbonLoaderUtil} import org.apache.carbondata.spark.exception.MalformedCarbonCommandException +import org.apache.carbondata.spark.rdd.CarbonMergeFilesRDD object CommonUtil { private val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName) @@ -831,4 +833,20 @@ object CommonUtil { LOGGER.error(s) } } + + /** + * Merge the carbonindex files with in the segment to carbonindexmerge file inside same segment + */ + def mergeIndexFiles(sparkContext: SparkContext, + segmentIds: Seq[String], + storePath: String, + carbonTable: CarbonTable): Unit = { + if (CarbonProperties.getInstance().getProperty( + CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT, + CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT_DEFAULT).toBoolean) { + new CarbonMergeFilesRDD(sparkContext, AbsoluteTableIdentifier.from(storePath, + carbonTable.getDatabaseName, carbonTable.getFactTableName).getTablePath, + segmentIds).collect() + } + } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/0586146a/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala ---------------------------------------------------------------------- diff --git 
a/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala b/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala index 4649082..1163b3f 100644 --- a/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala +++ b/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala @@ -35,12 +35,11 @@ import org.apache.spark.util.PartitionUtils import org.apache.carbondata.common.constants.LoggerAction import org.apache.carbondata.common.logging.LogServiceFactory import org.apache.carbondata.core.constants.CarbonCommonConstants -import org.apache.carbondata.core.metadata.datatype.{DataType, DataTypes} +import org.apache.carbondata.core.metadata.datatype.DataTypes import org.apache.carbondata.core.metadata.schema.PartitionInfo import org.apache.carbondata.core.metadata.schema.partition.PartitionType import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil, DataTypeUtil} -import org.apache.carbondata.processing.loading.sort.SortScopeOptions import org.apache.carbondata.spark.exception.MalformedCarbonCommandException import org.apache.carbondata.spark.util.{CommonUtil, DataTypeConverterUtil} http://git-wip-us.apache.org/repos/asf/carbondata/blob/0586146a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala index 87de8ae..628d444 100644 --- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala +++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala @@ 
-98,8 +98,10 @@ object CarbonDataRDDFactory { .setLoadMetadataDetails(alterTableModel.segmentUpdateStatusManager.get .getLoadMetadataDetails.toList.asJava) } - } - else { + } else if (alterTableModel.compactionType. + equalsIgnoreCase(CompactionType.SEGMENT_INDEX_COMPACTION.toString)) { + compactionType = CompactionType.SEGMENT_INDEX_COMPACTION + } else { compactionType = CompactionType.MINOR_COMPACTION } @@ -110,6 +112,14 @@ object CarbonDataRDDFactory { if (null == carbonLoadModel.getLoadMetadataDetails) { CommonUtil.readLoadMetadataDetails(carbonLoadModel) } + if (compactionType == CompactionType.SEGMENT_INDEX_COMPACTION) { + // Just launch job to merge index and return + CommonUtil.mergeIndexFiles(sqlContext.sparkContext, + carbonLoadModel.getLoadMetadataDetails.asScala.map(_.getLoadName), + carbonLoadModel.getStorePath, + carbonTable) + return + } // reading the start time of data load. val loadStartTime : Long = if (alterTableModel.factTimeStamp.isEmpty) { @@ -959,9 +969,10 @@ object CarbonDataRDDFactory { } )) - } - else { - val newStatusMap = scala.collection.mutable.Map.empty[String, String] + } else { + CommonUtil.mergeIndexFiles(sqlContext.sparkContext, + Seq(carbonLoadModel.getSegmentId), storePath, carbonTable) + val newStatusMap = scala.collection.mutable.Map.empty[String, String] if (status.nonEmpty) { status.foreach { eachLoadStatus => val state = newStatusMap.get(eachLoadStatus._1) @@ -1142,8 +1153,10 @@ object CarbonDataRDDFactory { } + /** * repartition the input data for partition table. 
+ * * @param sqlContext * @param dataFrame * @param carbonLoadModel http://git-wip-us.apache.org/repos/asf/carbondata/blob/0586146a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala index 715af1d..bdfaa5a 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala @@ -87,7 +87,8 @@ class DDLStrategy(sparkSession: SparkSession) extends SparkStrategy { altertablemodel.dbName))(sparkSession) if (isCarbonTable) { if (altertablemodel.compactionType.equalsIgnoreCase("minor") || - altertablemodel.compactionType.equalsIgnoreCase("major")) { + altertablemodel.compactionType.equalsIgnoreCase("major") || + altertablemodel.compactionType.equalsIgnoreCase("SEGMENT_INDEX_COMPACTION")) { ExecutedCommandExec(alterTable) :: Nil } else { throw new MalformedCarbonCommandException( http://git-wip-us.apache.org/repos/asf/carbondata/blob/0586146a/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java b/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java index 13972c8..53add22 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java +++ b/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java @@ -131,24 +131,6 @@ public final class CarbonDataMergerUtil { /** - * Form the Name of the New Merge Folder - * - * @param 
segmentToBeMerged - * @return - */ - public static String getMergedLoadName(final String segmentToBeMerged) { - if (segmentToBeMerged.contains(".")) { - String beforeDecimal = segmentToBeMerged.substring(0, segmentToBeMerged.indexOf(".")); - String afterDecimal = segmentToBeMerged.substring(segmentToBeMerged.indexOf(".") + 1); - int fraction = Integer.parseInt(afterDecimal) + 1; - return beforeDecimal + "." + fraction; - } else { - return segmentToBeMerged + "." + 1; - } - - } - - /** * Update Both Segment Update Status and Table Status for the case of IUD Delete * delta compaction. * @@ -294,13 +276,13 @@ public final class CarbonDataMergerUtil { * method to update table status in case of IUD Update Delta Compaction. * @param loadsToMerge * @param metaDataFilepath - * @param MergedLoadName + * @param mergedLoadNumber * @param carbonLoadModel * @param compactionType * @return */ public static boolean updateLoadMetadataWithMergeStatus(List<LoadMetadataDetails> loadsToMerge, - String metaDataFilepath, String MergedLoadName, CarbonLoadModel carbonLoadModel, + String metaDataFilepath, String mergedLoadNumber, CarbonLoadModel carbonLoadModel, long mergeLoadStartTime, CompactionType compactionType) { boolean tableStatusUpdationStatus = false; @@ -323,10 +305,6 @@ public final class CarbonDataMergerUtil { LoadMetadataDetails[] loadDetails = SegmentStatusManager.readLoadMetadata(metaDataFilepath); - String mergedLoadNumber = MergedLoadName.substring( - MergedLoadName.lastIndexOf(CarbonCommonConstants.LOAD_FOLDER) - + CarbonCommonConstants.LOAD_FOLDER.length(), MergedLoadName.length()); - long modificationOrDeletionTimeStamp = CarbonUpdateUtil.readCurrentTime(); for (LoadMetadataDetails loadDetail : loadDetails) { // check if this segment is merged. @@ -391,6 +369,17 @@ public final class CarbonDataMergerUtil { } /** + * Get the load number from load name. 
+ * @param loadName + * @return + */ + public static String getLoadNumberFromLoadName(String loadName) { + return loadName.substring( + loadName.lastIndexOf(CarbonCommonConstants.LOAD_FOLDER) + CarbonCommonConstants.LOAD_FOLDER + .length(), loadName.length()); + } + + /** * To identify which all segments can be merged. * * @param carbonLoadModel http://git-wip-us.apache.org/repos/asf/carbondata/blob/0586146a/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionType.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionType.java b/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionType.java index 6b9c80a..863257c 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionType.java +++ b/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionType.java @@ -27,5 +27,6 @@ public enum CompactionType { MAJOR_COMPACTION, IUD_UPDDEL_DELTA_COMPACTION, IUD_DELETE_DELTA_COMPACTION, + SEGMENT_INDEX_COMPACTION, NONE }
