Repository: carbondata Updated Branches: refs/heads/master 4a090ce27 -> 0e8588744
[CARBONDATA-3009] Move method for mergeIndex to correct the place Currently the entry point of function for mergeIndex is in CommonUtil which is not proper. Here in this commit, we will move this to the right place. This closes #2817 Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/0e858874 Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/0e858874 Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/0e858874 Branch: refs/heads/master Commit: 0e8588744fa0d4d0ee0c02d52de09231492f56d8 Parents: 4a090ce Author: xuchuanyin <[email protected]> Authored: Tue Oct 16 14:57:16 2018 +0800 Committer: Jacky Li <[email protected]> Committed: Thu Oct 18 17:08:08 2018 +0800 ---------------------------------------------------------------------- .../carbondata/spark/util/CommonUtil.scala | 68 ++------------------ .../apache/spark/rdd/CarbonMergeFilesRDD.scala | 63 ++++++++++++++++++ .../sql/events/MergeIndexEventListener.scala | 8 ++- 3 files changed, 72 insertions(+), 67 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/carbondata/blob/0e858874/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala index f6e2b94..82a2f9d 100644 --- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala +++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala @@ -29,13 +29,11 @@ import scala.util.Random import org.apache.hadoop.conf.Configuration import org.apache.hadoop.mapreduce.lib.input.FileInputFormat import org.apache.spark.{SparkContext, SparkEnv} 
-import org.apache.spark.rdd.CarbonMergeFilesRDD -import org.apache.spark.sql.{Row, RowFactory, SparkSession} +import org.apache.spark.sql.{Row, RowFactory} import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference} import org.apache.spark.sql.execution.command.{ColumnProperty, Field, PartitionerField} import org.apache.spark.sql.types.{MetadataBuilder, StringType} -import org.apache.spark.sql.util.CarbonException import org.apache.spark.util.FileUtils import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException @@ -43,14 +41,13 @@ import org.apache.carbondata.common.logging.LogServiceFactory import org.apache.carbondata.core.constants.CarbonCommonConstants import org.apache.carbondata.core.datastore.impl.FileFactory import org.apache.carbondata.core.memory.{UnsafeMemoryManager, UnsafeSortMemoryManager} -import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonMetadata} +import org.apache.carbondata.core.metadata.CarbonMetadata import org.apache.carbondata.core.metadata.datatype.{DataType, DataTypes} import org.apache.carbondata.core.metadata.schema.PartitionInfo import org.apache.carbondata.core.metadata.schema.partition.PartitionType -import org.apache.carbondata.core.metadata.schema.table.CarbonTable import org.apache.carbondata.core.scan.partition.PartitionUtil -import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatusManager} -import org.apache.carbondata.core.util.{ByteUtil, CarbonProperties, CarbonUtil} +import org.apache.carbondata.core.statusmanager.SegmentStatusManager +import org.apache.carbondata.core.util.{ByteUtil, CarbonProperties} import org.apache.carbondata.core.util.comparator.Comparator import org.apache.carbondata.core.util.path.CarbonTablePath import org.apache.carbondata.processing.loading.csvinput.CSVInputFormat @@ -826,61 +823,4 @@ object CommonUtil { }) } } - - /** - * Merge the carbonindex files with 
in the segment to carbonindexmerge file inside same segment - * - * @param sparkContext - * @param segmentIds - * @param tablePath - * @param carbonTable - * @param mergeIndexProperty - * @param readFileFooterFromCarbonDataFile flag to read file footer information from carbondata - * file. This will used in case of upgrade from version - * which do not store the blocklet info to current - * version - */ - def mergeIndexFiles(sparkSession: SparkSession, - segmentIds: Seq[String], - segmentFileNameToSegmentIdMap: java.util.Map[String, String], - tablePath: String, - carbonTable: CarbonTable, - mergeIndexProperty: Boolean, - readFileFooterFromCarbonDataFile: Boolean = false): Unit = { - if (mergeIndexProperty) { - new CarbonMergeFilesRDD( - sparkSession, - carbonTable, - segmentIds, - segmentFileNameToSegmentIdMap, - carbonTable.isHivePartitionTable, - readFileFooterFromCarbonDataFile).collect() - } else { - try { - if (CarbonProperties.getInstance().getProperty( - CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT, - CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT_DEFAULT).toBoolean) { - new CarbonMergeFilesRDD( - sparkSession, - carbonTable, - segmentIds, - segmentFileNameToSegmentIdMap, - carbonTable.isHivePartitionTable, - readFileFooterFromCarbonDataFile).collect() - } - } catch { - case _: Exception => - if (CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT_DEFAULT.toBoolean) { - new CarbonMergeFilesRDD( - sparkSession, - carbonTable, - segmentIds, - segmentFileNameToSegmentIdMap, - carbonTable.isHivePartitionTable, - readFileFooterFromCarbonDataFile).collect() - } - } - } - } - } http://git-wip-us.apache.org/repos/asf/carbondata/blob/0e858874/integration/spark-common/src/main/scala/org/apache/spark/rdd/CarbonMergeFilesRDD.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/spark/rdd/CarbonMergeFilesRDD.scala 
b/integration/spark-common/src/main/scala/org/apache/spark/rdd/CarbonMergeFilesRDD.scala index e29a658..3605dde 100644 --- a/integration/spark-common/src/main/scala/org/apache/spark/rdd/CarbonMergeFilesRDD.scala +++ b/integration/spark-common/src/main/scala/org/apache/spark/rdd/CarbonMergeFilesRDD.scala @@ -20,7 +20,9 @@ package org.apache.spark.rdd import org.apache.spark.{Partition, TaskContext} import org.apache.spark.sql.SparkSession +import org.apache.carbondata.core.constants.CarbonCommonConstants import org.apache.carbondata.core.metadata.schema.table.CarbonTable +import org.apache.carbondata.core.util.CarbonProperties import org.apache.carbondata.core.util.path.CarbonTablePath import org.apache.carbondata.core.writer.CarbonIndexFileMergeWriter import org.apache.carbondata.processing.util.CarbonLoaderUtil @@ -34,6 +36,67 @@ case class CarbonMergeFilePartition(rddId: Int, idx: Int, segmentId: String) override def hashCode(): Int = 41 * (41 + rddId) + idx } +object CarbonMergeFilesRDD { + + /** + * Merge the carbonindex files within a segment into one carbonindexmerge file in that segment + * + * @param sparkSession carbon session + * @param segmentIds the segments to process + * @param segmentFileNameToSegmentIdMap a map between segment file names and segment ids + * @param tablePath table path (not referenced by this method) + * @param carbonTable carbon table + * @param mergeIndexProperty if true, merge regardless of CARBON_MERGE_INDEX_IN_SEGMENT; + * same usage scenario as `readFileFooterFromCarbonDataFile` + * @param readFileFooterFromCarbonDataFile flag to read file footer information from carbondata + * file.
This will be used when upgrading from a version + * which does not store the blocklet info to the current + * version + */ + def mergeIndexFiles(sparkSession: SparkSession, + segmentIds: Seq[String], + segmentFileNameToSegmentIdMap: java.util.Map[String, String], + tablePath: String, + carbonTable: CarbonTable, + mergeIndexProperty: Boolean, + readFileFooterFromCarbonDataFile: Boolean = false): Unit = { + if (mergeIndexProperty) { + new CarbonMergeFilesRDD( + sparkSession, + carbonTable, + segmentIds, + segmentFileNameToSegmentIdMap, + carbonTable.isHivePartitionTable, + readFileFooterFromCarbonDataFile).collect() + } else { + try { // NOTE(review): also wraps the merge job, not only property parsing + if (CarbonProperties.getInstance().getProperty( + CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT, + CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT_DEFAULT).toBoolean) { + new CarbonMergeFilesRDD( + sparkSession, + carbonTable, + segmentIds, + segmentFileNameToSegmentIdMap, + carbonTable.isHivePartitionTable, + readFileFooterFromCarbonDataFile).collect() + } + } catch { + case _: Exception => // e.g. an invalid property value: fall back to the default + if (CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT_DEFAULT.toBoolean) { + new CarbonMergeFilesRDD( + sparkSession, + carbonTable, + segmentIds, + segmentFileNameToSegmentIdMap, + carbonTable.isHivePartitionTable, + readFileFooterFromCarbonDataFile).collect() + } + } + } + } +} + /** * RDD to merge all carbonindex files of each segment to carbonindex file into the same segment.
* @param ss http://git-wip-us.apache.org/repos/asf/carbondata/blob/0e858874/integration/spark2/src/main/scala/org/apache/spark/sql/events/MergeIndexEventListener.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/events/MergeIndexEventListener.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/events/MergeIndexEventListener.scala index 639a0e3..c8c9a47 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/events/MergeIndexEventListener.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/events/MergeIndexEventListener.scala @@ -23,6 +23,8 @@ import scala.collection.JavaConverters._ import scala.collection.mutable import org.apache.spark.internal.Logging +import org.apache.spark.SparkContext +import org.apache.spark.rdd.CarbonMergeFilesRDD import org.apache.spark.sql.SparkSession import org.apache.spark.sql.util.CarbonException @@ -61,7 +63,7 @@ class MergeIndexEventListener extends OperationEventListener with Logging { segmentFileNameMap .put(loadModel.getSegmentId, String.valueOf(loadModel.getFactTimeStamp)) - CommonUtil.mergeIndexFiles(sparkSession, + CarbonMergeFilesRDD.mergeIndexFiles(sparkSession, Seq(loadModel.getSegmentId), segmentFileNameMap, carbonTable.getTablePath, @@ -116,7 +118,7 @@ class MergeIndexEventListener extends OperationEventListener with Logging { // readFileFooterFromCarbonDataFile flag should be true. 
This flag is check for legacy // store (store <= 1.1 version) and create merge Index file as per new store so that // old store is also upgraded to new store - CommonUtil.mergeIndexFiles( + CarbonMergeFilesRDD.mergeIndexFiles( sparkSession = sparkSession, segmentIds = validSegmentIds, segmentFileNameToSegmentIdMap = segmentFileNameMap, @@ -176,7 +178,7 @@ class MergeIndexEventListener extends OperationEventListener with Logging { val validMergedSegIds = validSegments .filter { seg => mergedSegmentIds.contains(seg.getSegmentNo) }.map(_.getSegmentNo) if (null != validMergedSegIds && validMergedSegIds.nonEmpty) { - CommonUtil.mergeIndexFiles(sparkSession, + CarbonMergeFilesRDD.mergeIndexFiles(sparkSession, validMergedSegIds, segmentFileNameMap, carbonTable.getTablePath,
