Repository: incubator-carbondata
Updated Branches:
  refs/heads/branch-1.1 8e50b878f -> dbf76485f (forced update)


lazy rdd iterator: defer deserializing the parent RDD until a partition iterator is first consumed


Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/e52e6413
Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/e52e6413
Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/e52e6413

Branch: refs/heads/branch-1.1
Commit: e52e641372e298511ba0135054c97855153356dc
Parents: d51387b
Author: QiangCai <qiang...@qq.com>
Authored: Fri Apr 7 19:38:19 2017 +0530
Committer: Venkata Ramana G <ramana.gollam...@huawei.com>
Committed: Tue Apr 11 14:08:53 2017 +0530

----------------------------------------------------------------------
 .../spark/rdd/NewCarbonDataLoadRDD.scala        | 100 +++++++++++++++----
 1 file changed, 81 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/e52e6413/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
index 72ee90f..a6d231d 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
@@ -32,6 +32,7 @@ import org.apache.hadoop.mapreduce.{TaskAttemptID, TaskType}
 import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
 import org.apache.spark.{Partition, SerializableWritable, SparkContext, SparkEnv, TaskContext}
 import org.apache.spark.rdd.{DataLoadCoalescedRDD, DataLoadPartitionWrap, RDD}
+import org.apache.spark.serializer.SerializerInstance
 import org.apache.spark.sql.Row
 import org.apache.spark.util.SparkUtil
 
@@ -408,19 +409,14 @@ class NewDataFrameLoaderRDD[K, V](
         val recordReaders = mutable.Buffer[CarbonIterator[Array[AnyRef]]]()
         val partitionIterator = firstParent[DataLoadPartitionWrap[Row]].iterator(theSplit, context)
         val serializer = SparkEnv.get.closureSerializer.newInstance()
-        var serializeBuffer: ByteBuffer = null
+        var serializeBytes: Array[Byte] = null
         while(partitionIterator.hasNext) {
           val value = partitionIterator.next()
-          val newInstance = {
-            if (serializeBuffer == null) {
-              serializeBuffer = serializer.serialize[RDD[Row]](value.rdd)
-            }
-            serializeBuffer.rewind()
-            serializer.deserialize[RDD[Row]](serializeBuffer)
+          if (serializeBytes == null) {
+            serializeBytes = serializer.serialize[RDD[Row]](value.rdd).array()
           }
-          recordReaders += new NewRddIterator(newInstance.iterator(value.partition, context),
-              carbonLoadModel,
-              context)
+          recordReaders += new LazyRddIterator(serializer, serializeBytes, value.partition,
+              carbonLoadModel, context)
         }
 
         val loader = new SparkPartitionLoader(model,
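
The change above serializes the parent RDD once per load and shares the
resulting byte array across all partitions; each task then rebuilds its own
private copy on demand instead of rewinding a single shared ByteBuffer. A
minimal sketch of that serialize-once / deserialize-per-use pattern, using
only the SparkEnv closure-serializer calls that appear in the diff (the
helper names here are hypothetical):

import java.nio.ByteBuffer

import org.apache.spark.SparkEnv
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.Row

object RddSerDeSketch {
  // Serialize the RDD lineage once on the driver; the bytes are immutable
  // and can be handed to many iterators without coordination.
  def serializeOnce(rdd: RDD[Row]): Array[Byte] =
    SparkEnv.get.closureSerializer.newInstance()
      .serialize[RDD[Row]](rdd)
      .array()

  // Each consumer rebuilds its own RDD copy from the shared bytes, which is
  // the step LazyRddIterator postpones until the first hasNext call.
  def deserializeCopy(bytes: Array[Byte]): RDD[Row] =
    SparkEnv.get.closureSerializer.newInstance()
      .deserialize[RDD[Row]](ByteBuffer.wrap(bytes))
}
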
@@ -477,15 +473,16 @@ class NewRddIterator(rddIter: Iterator[Row],
     carbonLoadModel: CarbonLoadModel,
     context: TaskContext) extends CarbonIterator[Array[AnyRef]] {
 
-  val timeStampformatString = CarbonProperties.getInstance().getProperty(CarbonCommonConstants
-    .CARBON_TIMESTAMP_FORMAT, CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT)
-  val timeStampFormat = new SimpleDateFormat(timeStampformatString)
-  val dateFormatString = CarbonProperties.getInstance().getProperty(CarbonCommonConstants
+  private val timeStampformatString = CarbonProperties.getInstance()
+    .getProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT,
+      CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT)
+  private val timeStampFormat = new SimpleDateFormat(timeStampformatString)
+  private val dateFormatString = CarbonProperties.getInstance().getProperty(CarbonCommonConstants
     .CARBON_DATE_FORMAT, CarbonCommonConstants.CARBON_DATE_DEFAULT_FORMAT)
-  val dateFormat = new SimpleDateFormat(dateFormatString)
-  val delimiterLevel1 = carbonLoadModel.getComplexDelimiterLevel1
-  val delimiterLevel2 = carbonLoadModel.getComplexDelimiterLevel2
-  val serializationNullFormat =
+  private val dateFormat = new SimpleDateFormat(dateFormatString)
+  private val delimiterLevel1 = carbonLoadModel.getComplexDelimiterLevel1
+  private val delimiterLevel2 = carbonLoadModel.getComplexDelimiterLevel2
+  private val serializationNullFormat =
     carbonLoadModel.getSerializationNullFormat.split(CarbonCommonConstants.COMMA, 2)(1)
   def hasNext: Boolean = rddIter.hasNext
 
@@ -499,8 +496,73 @@ class NewRddIterator(rddIter: Iterator[Row],
     columns
   }
 
-  override def initialize: Unit = {
+  override def initialize(): Unit = {
     SparkUtil.setTaskContext(context)
   }
 
 }
+
+/**
+ * LazyRddIterator defers the rdd.iterator call until hasNext is invoked for
+ * the first time, deserializing the parent RDD from its serialized bytes at
+ * that point.
+ * @param serializer serializer instance used to deserialize the parent RDD
+ * @param serializeBytes serialized form of the parent RDD
+ * @param partition the partition this iterator reads
+ * @param carbonLoadModel load model carrying the format and delimiter settings
+ * @param context the Spark task context
+ */
+class LazyRddIterator(serializer: SerializerInstance,
+    serializeBytes: Array[Byte],
+    partition: Partition,
+    carbonLoadModel: CarbonLoadModel,
+    context: TaskContext) extends CarbonIterator[Array[AnyRef]] {
+
+  private val timeStampformatString = CarbonProperties.getInstance()
+    .getProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT,
+      CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT)
+  private val timeStampFormat = new SimpleDateFormat(timeStampformatString)
+  private val dateFormatString = CarbonProperties.getInstance()
+    .getProperty(CarbonCommonConstants.CARBON_DATE_FORMAT,
+      CarbonCommonConstants.CARBON_DATE_DEFAULT_FORMAT)
+  private val dateFormat = new SimpleDateFormat(dateFormatString)
+  private val delimiterLevel1 = carbonLoadModel.getComplexDelimiterLevel1
+  private val delimiterLevel2 = carbonLoadModel.getComplexDelimiterLevel2
+  private val serializationNullFormat =
+    carbonLoadModel.getSerializationNullFormat.split(CarbonCommonConstants.COMMA, 2)(1)
+
+  private var rddIter: Iterator[Row] = null
+  private var uninitialized = true
+  private var closed = false
+
+  def hasNext: Boolean = {
+    if (uninitialized) {
+      uninitialized = false
+      rddIter = serializer.deserialize[RDD[Row]](ByteBuffer.wrap(serializeBytes))
+        .iterator(partition, context)
+    }
+    if (closed) {
+      false
+    } else {
+      rddIter.hasNext
+    }
+  }
+
+  def next: Array[AnyRef] = {
+    val row = rddIter.next()
+    val columns = new Array[AnyRef](row.length)
+    for (i <- 0 until columns.length) {
+      columns(i) = CarbonScalaUtil.getString(row.get(i), serializationNullFormat,
+        delimiterLevel1, delimiterLevel2, timeStampFormat, dateFormat)
+    }
+    columns
+  }
+
+  override def initialize(): Unit = {
+    SparkUtil.setTaskContext(context)
+  }
+
+  override def close(): Unit = {
+    closed = true
+    rddIter = null
+  }
+
+}
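
The deferred-initialization pattern itself is independent of Spark. Below is
a self-contained sketch in plain Scala (the LazyIterator name and example are
hypothetical, not part of the commit) of an iterator that builds its
underlying iterator only on first use and supports an explicit close:

// Spark-free illustration of the LazyRddIterator idea: the expensive step
// (here the `build` thunk) runs only when the iterator is first pulled.
class LazyIterator[T](build: () => Iterator[T]) extends Iterator[T] {
  private var underlying: Iterator[T] = null
  private var uninitialized = true
  private var closed = false

  override def hasNext: Boolean = {
    if (closed) {
      false
    } else {
      if (uninitialized) { // defer the expensive build to first use
        uninitialized = false
        underlying = build()
      }
      underlying.hasNext
    }
  }

  override def next(): T = underlying.next()

  def close(): Unit = {
    closed = true
    underlying = null // drop the reference so the source can be collected
  }
}

// Usage: nothing is built until the first hasNext call.
object LazyIteratorExample extends App {
  val it = new LazyIterator(() => { println("building"); (1 to 3).iterator })
  println("created") // prints before "building"
  while (it.hasNext) println(it.next())
  it.close()
}

Checking closed before initializing means a close before first use never pays
the build cost; the committed hasNext performs the initialization check first,
which is equivalent over the normal initialize/hasNext/next/close lifecycle.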
