Repository: carbondata
Updated Branches:
  refs/heads/master 4a090ce27 -> 0e8588744


[CARBONDATA-3009] Move the mergeIndex method to the correct place

Currently the entry point for mergeIndex lives in CommonUtil, which is
not the proper place for it. This commit moves it into the
CarbonMergeFilesRDD companion object and updates its callers.

This closes #2817


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/0e858874
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/0e858874
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/0e858874

Branch: refs/heads/master
Commit: 0e8588744fa0d4d0ee0c02d52de09231492f56d8
Parents: 4a090ce
Author: xuchuanyin <[email protected]>
Authored: Tue Oct 16 14:57:16 2018 +0800
Committer: Jacky Li <[email protected]>
Committed: Thu Oct 18 17:08:08 2018 +0800

----------------------------------------------------------------------
 .../carbondata/spark/util/CommonUtil.scala      | 68 ++------------------
 .../apache/spark/rdd/CarbonMergeFilesRDD.scala  | 63 ++++++++++++++++++
 .../sql/events/MergeIndexEventListener.scala    |  8 ++-
 3 files changed, 72 insertions(+), 67 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/0e858874/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
 
b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
index f6e2b94..82a2f9d 100644
--- 
a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
+++ 
b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
@@ -29,13 +29,11 @@ import scala.util.Random
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
 import org.apache.spark.{SparkContext, SparkEnv}
-import org.apache.spark.rdd.CarbonMergeFilesRDD
-import org.apache.spark.sql.{Row, RowFactory, SparkSession}
+import org.apache.spark.sql.{Row, RowFactory}
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.expressions.{Attribute, 
AttributeReference}
 import org.apache.spark.sql.execution.command.{ColumnProperty, Field, 
PartitionerField}
 import org.apache.spark.sql.types.{MetadataBuilder, StringType}
-import org.apache.spark.sql.util.CarbonException
 import org.apache.spark.util.FileUtils
 
 import 
org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
@@ -43,14 +41,13 @@ import 
org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.memory.{UnsafeMemoryManager, 
UnsafeSortMemoryManager}
-import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, 
CarbonMetadata}
+import org.apache.carbondata.core.metadata.CarbonMetadata
 import org.apache.carbondata.core.metadata.datatype.{DataType, DataTypes}
 import org.apache.carbondata.core.metadata.schema.PartitionInfo
 import org.apache.carbondata.core.metadata.schema.partition.PartitionType
-import org.apache.carbondata.core.metadata.schema.table.CarbonTable
 import org.apache.carbondata.core.scan.partition.PartitionUtil
-import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, 
SegmentStatusManager}
-import org.apache.carbondata.core.util.{ByteUtil, CarbonProperties, CarbonUtil}
+import org.apache.carbondata.core.statusmanager.SegmentStatusManager
+import org.apache.carbondata.core.util.{ByteUtil, CarbonProperties}
 import org.apache.carbondata.core.util.comparator.Comparator
 import org.apache.carbondata.core.util.path.CarbonTablePath
 import org.apache.carbondata.processing.loading.csvinput.CSVInputFormat
@@ -826,61 +823,4 @@ object CommonUtil {
       })
     }
   }
-
-  /**
-   * Merge the carbonindex files with in the segment to carbonindexmerge file 
inside same segment
-   *
-   * @param sparkContext
-   * @param segmentIds
-   * @param tablePath
-   * @param carbonTable
-   * @param mergeIndexProperty
-   * @param readFileFooterFromCarbonDataFile flag to read file footer 
information from carbondata
-   *                                         file. This will used in case of 
upgrade from version
-   *                                         which do not store the blocklet 
info to current
-   *                                         version
-   */
-  def mergeIndexFiles(sparkSession: SparkSession,
-    segmentIds: Seq[String],
-    segmentFileNameToSegmentIdMap: java.util.Map[String, String],
-    tablePath: String,
-    carbonTable: CarbonTable,
-    mergeIndexProperty: Boolean,
-    readFileFooterFromCarbonDataFile: Boolean = false): Unit = {
-    if (mergeIndexProperty) {
-      new CarbonMergeFilesRDD(
-        sparkSession,
-        carbonTable,
-        segmentIds,
-        segmentFileNameToSegmentIdMap,
-        carbonTable.isHivePartitionTable,
-        readFileFooterFromCarbonDataFile).collect()
-    } else {
-      try {
-        if (CarbonProperties.getInstance().getProperty(
-          CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT,
-          
CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT_DEFAULT).toBoolean) {
-          new CarbonMergeFilesRDD(
-            sparkSession,
-            carbonTable,
-            segmentIds,
-            segmentFileNameToSegmentIdMap,
-            carbonTable.isHivePartitionTable,
-            readFileFooterFromCarbonDataFile).collect()
-        }
-      } catch {
-        case _: Exception =>
-          if 
(CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT_DEFAULT.toBoolean) {
-            new CarbonMergeFilesRDD(
-              sparkSession,
-              carbonTable,
-              segmentIds,
-              segmentFileNameToSegmentIdMap,
-              carbonTable.isHivePartitionTable,
-              readFileFooterFromCarbonDataFile).collect()
-          }
-      }
-    }
-  }
-
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/0e858874/integration/spark-common/src/main/scala/org/apache/spark/rdd/CarbonMergeFilesRDD.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark-common/src/main/scala/org/apache/spark/rdd/CarbonMergeFilesRDD.scala
 
b/integration/spark-common/src/main/scala/org/apache/spark/rdd/CarbonMergeFilesRDD.scala
index e29a658..3605dde 100644
--- 
a/integration/spark-common/src/main/scala/org/apache/spark/rdd/CarbonMergeFilesRDD.scala
+++ 
b/integration/spark-common/src/main/scala/org/apache/spark/rdd/CarbonMergeFilesRDD.scala
@@ -20,7 +20,9 @@ package org.apache.spark.rdd
 import org.apache.spark.{Partition, TaskContext}
 import org.apache.spark.sql.SparkSession
 
+import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
+import org.apache.carbondata.core.util.CarbonProperties
 import org.apache.carbondata.core.util.path.CarbonTablePath
 import org.apache.carbondata.core.writer.CarbonIndexFileMergeWriter
 import org.apache.carbondata.processing.util.CarbonLoaderUtil
@@ -34,6 +36,67 @@ case class CarbonMergeFilePartition(rddId: Int, idx: Int, 
segmentId: String)
   override def hashCode(): Int = 41 * (41 + rddId) + idx
 }
 
+object CarbonMergeFilesRDD {
+
+  /**
+   * Merge the carbonindex files within the segment into a carbonindexmerge file 
inside the same segment
+   *
+   * @param sparkSession carbon session
+   * @param segmentIds the segments to process
+   * @param segmentFileNameToSegmentIdMap a map that map the segmentFileName 
to segmentId
+   * @param tablePath table path
+   * @param carbonTable carbon table
+   * @param mergeIndexProperty whether to merge the property of the carbon 
index, the usage
+   *                           scenario is the same as that of 
`readFileFooterFromCarbonDataFile`
+   * @param readFileFooterFromCarbonDataFile flag to read file footer 
information from carbondata
+   *                                         file. This will be used in case of 
upgrade from a version
+   *                                         which does not store the blocklet 
info to the current
+   *                                         version
+   */
+  def mergeIndexFiles(sparkSession: SparkSession,
+      segmentIds: Seq[String],
+      segmentFileNameToSegmentIdMap: java.util.Map[String, String],
+      tablePath: String,
+      carbonTable: CarbonTable,
+      mergeIndexProperty: Boolean,
+      readFileFooterFromCarbonDataFile: Boolean = false): Unit = {
+    if (mergeIndexProperty) {
+      new CarbonMergeFilesRDD(
+        sparkSession,
+        carbonTable,
+        segmentIds,
+        segmentFileNameToSegmentIdMap,
+        carbonTable.isHivePartitionTable,
+        readFileFooterFromCarbonDataFile).collect()
+    } else {
+      try {
+        if (CarbonProperties.getInstance().getProperty(
+          CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT,
+          
CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT_DEFAULT).toBoolean) {
+          new CarbonMergeFilesRDD(
+            sparkSession,
+            carbonTable,
+            segmentIds,
+            segmentFileNameToSegmentIdMap,
+            carbonTable.isHivePartitionTable,
+            readFileFooterFromCarbonDataFile).collect()
+        }
+      } catch {
+        case _: Exception =>
+          if 
(CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT_DEFAULT.toBoolean) {
+            new CarbonMergeFilesRDD(
+              sparkSession,
+              carbonTable,
+              segmentIds,
+              segmentFileNameToSegmentIdMap,
+              carbonTable.isHivePartitionTable,
+              readFileFooterFromCarbonDataFile).collect()
+          }
+      }
+    }
+  }
+}
+
 /**
  * RDD to merge all carbonindex files of each segment to carbonindex file into 
the same segment.
  * @param ss

http://git-wip-us.apache.org/repos/asf/carbondata/blob/0e858874/integration/spark2/src/main/scala/org/apache/spark/sql/events/MergeIndexEventListener.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark2/src/main/scala/org/apache/spark/sql/events/MergeIndexEventListener.scala
 
b/integration/spark2/src/main/scala/org/apache/spark/sql/events/MergeIndexEventListener.scala
index 639a0e3..c8c9a47 100644
--- 
a/integration/spark2/src/main/scala/org/apache/spark/sql/events/MergeIndexEventListener.scala
+++ 
b/integration/spark2/src/main/scala/org/apache/spark/sql/events/MergeIndexEventListener.scala
@@ -23,6 +23,8 @@ import scala.collection.JavaConverters._
 import scala.collection.mutable
 
 import org.apache.spark.internal.Logging
+import org.apache.spark.SparkContext
+import org.apache.spark.rdd.CarbonMergeFilesRDD
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.util.CarbonException
 
@@ -61,7 +63,7 @@ class MergeIndexEventListener extends OperationEventListener 
with Logging {
 
             segmentFileNameMap
               .put(loadModel.getSegmentId, 
String.valueOf(loadModel.getFactTimeStamp))
-            CommonUtil.mergeIndexFiles(sparkSession,
+            CarbonMergeFilesRDD.mergeIndexFiles(sparkSession,
               Seq(loadModel.getSegmentId),
               segmentFileNameMap,
               carbonTable.getTablePath,
@@ -116,7 +118,7 @@ class MergeIndexEventListener extends 
OperationEventListener with Logging {
               // readFileFooterFromCarbonDataFile flag should be true. This 
flag is check for legacy
               // store (store <= 1.1 version) and create merge Index file as 
per new store so that
               // old store is also upgraded to new store
-              CommonUtil.mergeIndexFiles(
+              CarbonMergeFilesRDD.mergeIndexFiles(
                 sparkSession = sparkSession,
                 segmentIds = validSegmentIds,
                 segmentFileNameToSegmentIdMap = segmentFileNameMap,
@@ -176,7 +178,7 @@ class MergeIndexEventListener extends 
OperationEventListener with Logging {
     val validMergedSegIds = validSegments
       .filter { seg => mergedSegmentIds.contains(seg.getSegmentNo) 
}.map(_.getSegmentNo)
     if (null != validMergedSegIds && validMergedSegIds.nonEmpty) {
-      CommonUtil.mergeIndexFiles(sparkSession,
+      CarbonMergeFilesRDD.mergeIndexFiles(sparkSession,
           validMergedSegIds,
           segmentFileNameMap,
           carbonTable.getTablePath,

Reply via email to