Repository: incubator-carbondata
Updated Branches:
  refs/heads/master e23222cf1 -> 8802e9ebb


If, during compaction of 4 loads, an executor is assigned tasks for only the first
3 loads, then the column cardinality it derives from the block metadata it holds
is not that of the last segment and becomes wrong.

In that case the cardinality goes wrong for that executor. The fix reads the column
cardinality and column schema of the last valid segment once on the driver and passes
them to every executor through CarbonMergerMapping.
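
For illustration, a minimal self-contained sketch of the idea, using simplified
stand-in types (BlockFooter, MergerMapping and blocksOfSegment below are assumptions
for the example, not the actual CarbonData API): the driver resolves the cardinality
from the last valid segment once and ships it in the mapping, so executors no longer
depend on which blocks they happen to be assigned.

  // Simplified sketch -- stand-in types, not the CarbonData classes.
  case class BlockFooter(columnCardinality: Array[Int], columnSchemas: List[String])

  case class MergerMapping(validSegments: Array[String],
      var maxSegmentColCardinality: Array[Int] = null,
      var maxSegmentColumnSchemaList: List[String] = null)

  object CompactionCardinality {

    // Driver side: remember the blocks of the last non-empty segment, read the
    // footer of its last block and record cardinality/schema in the mapping.
    def prepareMapping(mapping: MergerMapping,
        blocksOfSegment: String => Seq[BlockFooter]): MergerMapping = {
      var blocksOfLastSegment: Seq[BlockFooter] = Seq.empty
      for (seg <- mapping.validSegments) {
        val blocks = blocksOfSegment(seg)
        // keep overwriting so the last non-empty segment wins
        if (blocks.nonEmpty) blocksOfLastSegment = blocks
      }
      blocksOfLastSegment.lastOption.foreach { footer =>
        mapping.maxSegmentColCardinality = footer.columnCardinality
        mapping.maxSegmentColumnSchemaList = footer.columnSchemas
      }
      mapping
    }

    // Executor side: build destination segment properties from the driver-provided
    // values, never from the subset of blocks assigned to this executor.
    def destinationCardinality(mapping: MergerMapping): Array[Int] =
      mapping.maxSegmentColCardinality
  }

In the patch itself these values travel in the two new fields added to
CarbonMergerMapping (maxSegmentColCardinality, maxSegmentColumnSchemaList) in
carbonTableSchema.scala, and CarbonMergerRDD builds SegmentProperties from them.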


Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/072dfee4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/072dfee4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/072dfee4

Branch: refs/heads/master
Commit: 072dfee476293713d02bd2b29d61f6ad660d37c0
Parents: e23222c
Author: ravikiran <ravikiran.sn...@gmail.com>
Authored: Sat Sep 17 00:46:24 2016 +0530
Committer: Venkata Ramana G <ramana.gollam...@huawei.com>
Committed: Sun Sep 18 02:59:46 2016 +0530

----------------------------------------------------------------------
 .../carbondata/spark/rdd/CarbonMergerRDD.scala  | 52 ++++++++++++++------
 .../apache/carbondata/spark/rdd/Compactor.scala |  4 +-
 .../execution/command/carbonTableSchema.scala   | 19 +++++--
 3 files changed, 55 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/072dfee4/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
index 54d7539..32159c2 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
@@ -36,7 +36,7 @@ import org.apache.carbondata.core.carbon.datastore.block.{Distributable, Segment
 import org.apache.carbondata.core.carbon.metadata.blocklet.DataFileFooter
 import org.apache.carbondata.core.carbon.path.CarbonTablePath
 import org.apache.carbondata.core.constants.CarbonCommonConstants
-import org.apache.carbondata.core.util.CarbonProperties
+import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil, CarbonUtilException}
 import org.apache.carbondata.hadoop.{CarbonInputFormat, CarbonInputSplit}
 import org.apache.carbondata.integration.spark.merger.{CarbonCompactionExecutor, CarbonCompactionUtil, RowResultMerger}
 import org.apache.carbondata.processing.util.CarbonDataProcessorUtil
@@ -102,6 +102,12 @@ class CarbonMergerRDD[K, V](
         var dataloadStatus = CarbonCommonConstants.STORE_LOADSTATUS_FAILURE
         val carbonSparkPartition = theSplit.asInstanceOf[CarbonSparkPartition]
 
+        // get destination segment properties as sent from driver which is of last segment.
+
+        val segmentProperties = new SegmentProperties(carbonMergerMapping.maxSegmentColumnSchemaList
+          .asJava,
+          carbonMergerMapping.maxSegmentColCardinality)
+
         // sorting the table block info List.
         var tableBlockInfoList = carbonSparkPartition.tableBlockInfos
 
@@ -115,19 +121,6 @@ class CarbonMergerRDD[K, V](
 
         carbonLoadModel.setStorePath(hdfsStoreLocation)
 
-        // taking the last table block info for getting the segment properties.
-        val listMetadata = dataFileMetadataSegMapping.get(tableBlockInfoList.get
-        (tableBlockInfoList.size() - 1).getSegmentId()
-        )
-
-        val colCardinality: Array[Int] = listMetadata.get(listMetadata.size() - 1).getSegmentInfo
-          .getColumnCardinality
-
-        val segmentProperties = new SegmentProperties(
-          listMetadata.get(listMetadata.size() - 1).getColumnInTable,
-          colCardinality
-        )
-
         val exec = new CarbonCompactionExecutor(segmentMapping, segmentProperties, databaseName,
           factTableName, hdfsStoreLocation, carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable,
           dataFileMetadataSegMapping
@@ -170,7 +163,7 @@ class CarbonMergerRDD[K, V](
             segmentProperties,
             tempStoreLoc,
             carbonLoadModel,
-            colCardinality
+            carbonMergerMapping.maxSegmentColCardinality
           )
         mergeStatus = merger.mergerSlice()
 
@@ -238,6 +231,8 @@ class CarbonMergerRDD[K, V](
 
     val taskInfoList = new util.ArrayList[Distributable]
 
+    var blocksOfLastSegment: List[TableBlockInfo] = null
+
     // for each valid segment.
     for (eachSeg <- carbonMergerMapping.validSegments) {
 
@@ -259,6 +254,11 @@ class CarbonMergerRDD[K, V](
         )
       )
 
+      // keep on assigning till last one is reached.
+      if (null != blocksOfOneSegment && blocksOfOneSegment.size > 0) {
+        blocksOfLastSegment = blocksOfOneSegment.asJava
+      }
+
       // populate the task and its block mapping.
       blocksOfOneSegment.foreach(tableBlockInfo => {
         val taskNo = CarbonTablePath.DataFileUtil.getTaskNo(tableBlockInfo.getFilePath)
@@ -280,6 +280,28 @@ class CarbonMergerRDD[K, V](
           taskInfoList.add(new TableTaskInfo(entry._1, entry._2).asInstanceOf[Distributable])
       )
     }
+
+    // prepare the details required to extract the segment properties using last segment.
+    if (null != blocksOfLastSegment && blocksOfLastSegment.size > 0)
+    {
+      val lastBlockInfo = blocksOfLastSegment.get(blocksOfLastSegment.size - 1)
+
+      var dataFileFooter: DataFileFooter = null
+
+      try {
+        dataFileFooter = CarbonUtil.readMetadatFile(lastBlockInfo.getFilePath,
+          lastBlockInfo.getBlockOffset, lastBlockInfo.getBlockLength)
+      } catch {
+        case e: CarbonUtilException =>
+          logError("Exception in preparing the data file footer for compaction " + e.getMessage)
+          throw e
+      }
+
+      carbonMergerMapping.maxSegmentColCardinality = dataFileFooter.getSegmentInfo
+        .getColumnCardinality
+      carbonMergerMapping.maxSegmentColumnSchemaList = dataFileFooter.getColumnInTable.asScala
+        .toList
+    }
     // send complete list of blocks to the mapping util.
     nodeMapping = CarbonLoaderUtil.nodeBlockMapping(taskInfoList, -1)
 

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/072dfee4/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/Compactor.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/Compactor.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/Compactor.scala
index 5eebede..172ca16 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/Compactor.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/Compactor.scala
@@ -69,7 +69,9 @@ object Compactor {
       schemaName,
       factTableName,
       validSegments,
-      carbonTable.getAbsoluteTableIdentifier.getCarbonTableIdentifier.getTableId
+      carbonTable.getAbsoluteTableIdentifier.getCarbonTableIdentifier.getTableId,
+      maxSegmentColCardinality = null,
+      maxSegmentColumnSchemaList = null
     )
     carbonLoadModel.setStorePath(carbonMergerMapping.hdfsStoreLocation)
     val segmentStatusManager = new SegmentStatusManager(new AbsoluteTableIdentifier

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/072dfee4/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
index 5fb1b42..eb9727e 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
@@ -143,10 +143,21 @@ case class Default(key: String, value: String)
 
 case class DataLoadTableFileMapping(table: String, loadPath: String)
 
-case class CarbonMergerMapping(storeLocation: String, hdfsStoreLocation: String,
-  partitioner: Partitioner, metadataFilePath: String, mergedLoadName: String,
-  kettleHomePath: String, tableCreationTime: Long, databaseName: String,
-  factTableName: String, validSegments: Array[String], tableId: String)
+case class CarbonMergerMapping(storeLocation: String,
+    hdfsStoreLocation: String,
+    partitioner: Partitioner,
+    metadataFilePath: String,
+    mergedLoadName: String,
+    kettleHomePath: String,
+    tableCreationTime: Long,
+    databaseName: String,
+    factTableName: String,
+    validSegments: Array[String],
+    tableId: String,
+    // maxSegmentColCardinality is Cardinality of last segment of compaction
+    var maxSegmentColCardinality: Array[Int],
+    // maxSegmentColumnSchemaList is list of column schema of last segment of compaction
+    var maxSegmentColumnSchemaList: List[ColumnSchema])
 
 case class NodeInfo(TaskId: String, noOfBlocks: Int)
 
