Repository: incubator-carbondata
Updated Branches:
  refs/heads/branch-1.1 8e50b878f -> dbf76485f (forced update)
lazy rdd iterator

Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/e52e6413
Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/e52e6413
Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/e52e6413

Branch: refs/heads/branch-1.1
Commit: e52e641372e298511ba0135054c97855153356dc
Parents: d51387b
Author: QiangCai <qiang...@qq.com>
Authored: Fri Apr 7 19:38:19 2017 +0530
Committer: Venkata Ramana G <ramana.gollam...@huawei.com>
Committed: Tue Apr 11 14:08:53 2017 +0530

----------------------------------------------------------------------
 .../spark/rdd/NewCarbonDataLoadRDD.scala        | 100 +++++++++++++++----
 1 file changed, 81 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/e52e6413/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
index 72ee90f..a6d231d 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
@@ -32,6 +32,7 @@ import org.apache.hadoop.mapreduce.{TaskAttemptID, TaskType}
 import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
 import org.apache.spark.{Partition, SerializableWritable, SparkContext, SparkEnv, TaskContext}
 import org.apache.spark.rdd.{DataLoadCoalescedRDD, DataLoadPartitionWrap, RDD}
+import org.apache.spark.serializer.SerializerInstance
 import org.apache.spark.sql.Row
 import org.apache.spark.util.SparkUtil

@@ -408,19 +409,14 @@ class NewDataFrameLoaderRDD[K, V](
       val recordReaders = mutable.Buffer[CarbonIterator[Array[AnyRef]]]()
       val partitionIterator = firstParent[DataLoadPartitionWrap[Row]].iterator(theSplit, context)
       val serializer = SparkEnv.get.closureSerializer.newInstance()
-      var serializeBuffer: ByteBuffer = null
+      var serializeBytes: Array[Byte] = null
       while(partitionIterator.hasNext) {
         val value = partitionIterator.next()
-        val newInstance = {
-          if (serializeBuffer == null) {
-            serializeBuffer = serializer.serialize[RDD[Row]](value.rdd)
-          }
-          serializeBuffer.rewind()
-          serializer.deserialize[RDD[Row]](serializeBuffer)
+        if (serializeBytes == null) {
+          serializeBytes = serializer.serialize[RDD[Row]](value.rdd).array()
         }
-        recordReaders += new NewRddIterator(newInstance.iterator(value.partition, context),
-          carbonLoadModel,
-          context)
+        recordReaders += new LazyRddIterator(serializer, serializeBytes, value.partition,
+          carbonLoadModel, context)
       }
       val loader = new SparkPartitionLoader(model,
@@ -477,15 +473,16 @@ class NewRddIterator(rddIter: Iterator[Row],
     carbonLoadModel: CarbonLoadModel,
     context: TaskContext) extends CarbonIterator[Array[AnyRef]] {

-  val timeStampformatString = CarbonProperties.getInstance().getProperty(CarbonCommonConstants
-    .CARBON_TIMESTAMP_FORMAT, CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT)
-  val timeStampFormat = new SimpleDateFormat(timeStampformatString)
-  val dateFormatString = CarbonProperties.getInstance().getProperty(CarbonCommonConstants
+  private val timeStampformatString = CarbonProperties.getInstance()
+    .getProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT,
+      CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT)
+  private val timeStampFormat = new SimpleDateFormat(timeStampformatString)
+  private val dateFormatString = CarbonProperties.getInstance().getProperty(CarbonCommonConstants
     .CARBON_DATE_FORMAT, CarbonCommonConstants.CARBON_DATE_DEFAULT_FORMAT)
-  val dateFormat = new SimpleDateFormat(dateFormatString)
-  val delimiterLevel1 = carbonLoadModel.getComplexDelimiterLevel1
-  val delimiterLevel2 = carbonLoadModel.getComplexDelimiterLevel2
-  val serializationNullFormat =
+  private val dateFormat = new SimpleDateFormat(dateFormatString)
+  private val delimiterLevel1 = carbonLoadModel.getComplexDelimiterLevel1
+  private val delimiterLevel2 = carbonLoadModel.getComplexDelimiterLevel2
+  private val serializationNullFormat =
     carbonLoadModel.getSerializationNullFormat.split(CarbonCommonConstants.COMMA, 2)(1)

   def hasNext: Boolean = rddIter.hasNext
@@ -499,8 +496,73 @@ class NewRddIterator(rddIter: Iterator[Row],
     columns
   }

-  override def initialize: Unit = {
+  override def initialize(): Unit = {
     SparkUtil.setTaskContext(context)
   }

 }
+
+/**
+ * LazyRddIterator invokes the rdd.iterator method only when hasNext is invoked.
+ * @param serializer
+ * @param serializeBytes
+ * @param partition
+ * @param carbonLoadModel
+ * @param context
+ */
+class LazyRddIterator(serializer: SerializerInstance,
+    serializeBytes: Array[Byte],
+    partition: Partition,
+    carbonLoadModel: CarbonLoadModel,
+    context: TaskContext) extends CarbonIterator[Array[AnyRef]] {
+
+  private val timeStampformatString = CarbonProperties.getInstance()
+    .getProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT,
+      CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT)
+  private val timeStampFormat = new SimpleDateFormat(timeStampformatString)
+  private val dateFormatString = CarbonProperties.getInstance()
+    .getProperty(CarbonCommonConstants.CARBON_DATE_FORMAT,
+      CarbonCommonConstants.CARBON_DATE_DEFAULT_FORMAT)
+  private val dateFormat = new SimpleDateFormat(dateFormatString)
+  private val delimiterLevel1 = carbonLoadModel.getComplexDelimiterLevel1
+  private val delimiterLevel2 = carbonLoadModel.getComplexDelimiterLevel2
+  private val serializationNullFormat =
+    carbonLoadModel.getSerializationNullFormat.split(CarbonCommonConstants.COMMA, 2)(1)
+
+  private var rddIter: Iterator[Row] = null
+  private var uninitialized = true
+  private var closed = false
+
+  def hasNext: Boolean = {
+    if (uninitialized) {
+      uninitialized = false
+      rddIter = serializer.deserialize[RDD[Row]](ByteBuffer.wrap(serializeBytes))
+        .iterator(partition, context)
+    }
+    if (closed) {
+      false
+    } else {
+      rddIter.hasNext
+    }
+  }
+
+  def next: Array[AnyRef] = {
+    val row = rddIter.next()
+    val columns = new Array[AnyRef](row.length)
+    for (i <- 0 until columns.length) {
+      columns(i) = CarbonScalaUtil.getString(row.get(i), serializationNullFormat,
+        delimiterLevel1, delimiterLevel2, timeStampFormat, dateFormat)
+    }
+    columns
+  }
+
+  override def initialize(): Unit = {
+    SparkUtil.setTaskContext(context)
+  }
+
+  override def close(): Unit = {
+    closed = true
+    rddIter = null
+  }
+
+}
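
The pattern behind this change: serialize the parent RDD once per task, keep only the raw bytes, and rebuild the partition iterator the first time hasNext is called. Below is a minimal, stand-alone Scala sketch of that lazy-deserialization idea. It uses plain Java serialization rather than Spark's SerializerInstance, and the names (LazyIteratorSketch, LazyDeserializingIterator, toBytes) are illustrative only, not part of the CarbonData code.

import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

object LazyIteratorSketch {

  // Serialize a value once, up front, and keep only the bytes.
  def toBytes(value: AnyRef): Array[Byte] = {
    val bos = new ByteArrayOutputStream()
    val oos = new ObjectOutputStream(bos)
    oos.writeObject(value)
    oos.close()
    bos.toByteArray
  }

  // Holds only the serialized bytes plus a function that opens an iterator
  // over the deserialized value. Nothing is deserialized until the first
  // hasNext/next call, mirroring what LazyRddIterator does with the
  // serialized RDD and rdd.iterator(partition, context).
  class LazyDeserializingIterator[S, A](bytes: Array[Byte], open: S => Iterator[A])
    extends Iterator[A] {

    private var underlying: Iterator[A] = null

    private def ensureInitialized(): Unit = {
      if (underlying == null) {
        val ois = new ObjectInputStream(new ByteArrayInputStream(bytes))
        val value = ois.readObject().asInstanceOf[S]
        ois.close()
        underlying = open(value)
      }
    }

    override def hasNext: Boolean = {
      ensureInitialized()
      underlying.hasNext
    }

    override def next(): A = {
      ensureInitialized()
      underlying.next()
    }
  }

  def main(args: Array[String]): Unit = {
    // One serialization shared by many lazy iterators; each deserializes
    // at most once, and only if it is actually consumed.
    val bytes = toBytes(Vector(1, 2, 3))
    val it = new LazyDeserializingIterator[Vector[Int], Int](bytes, _.iterator)
    println(it.toList) // List(1, 2, 3)
  }
}

In the commit itself, the single serialized copy (serializeBytes) is shared by every LazyRddIterator created in the while loop, so the RDD is serialized once per task and deserialized at most once per partition, and only for partitions that are actually consumed; the replaced code eagerly deserialized a fresh RDD instance for every partition while the reader list was still being built.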