http://git-wip-us.apache.org/repos/asf/carbondata/blob/c723947a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/StreamHandoffRDD.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/StreamHandoffRDD.scala
 
b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/StreamHandoffRDD.scala
new file mode 100644
index 0000000..bf5f660
--- /dev/null
+++ 
b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/StreamHandoffRDD.scala
@@ -0,0 +1,435 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.spark.rdd
+
+import java.text.SimpleDateFormat
+import java.util
+import java.util.Date
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.mapreduce.{Job, TaskAttemptID, TaskType}
+import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
+import org.apache.spark.{Partition, SerializableWritable, SparkContext, 
TaskContext}
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.expressions.GenericInternalRow
+
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.core.datamap.Segment
+import org.apache.carbondata.core.datastore.block.SegmentProperties
+import org.apache.carbondata.core.datastore.impl.FileFactory
+import org.apache.carbondata.core.locks.{CarbonLockFactory, LockUsage}
+import org.apache.carbondata.core.metadata.CarbonMetadata
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable
+import org.apache.carbondata.core.scan.result.iterator.RawResultIterator
+import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, 
SegmentStatus, SegmentStatusManager}
+import org.apache.carbondata.core.util.CarbonUtil
+import org.apache.carbondata.core.util.path.CarbonTablePath
+import org.apache.carbondata.events.{OperationContext, OperationListenerBus}
+import org.apache.carbondata.hadoop.{CarbonInputSplit, CarbonProjection}
+import org.apache.carbondata.hadoop.api.{CarbonInputFormat, 
CarbonTableInputFormat}
+import 
org.apache.carbondata.processing.loading.events.LoadEvents.{LoadTablePostStatusUpdateEvent,
 LoadTablePreStatusUpdateEvent}
+import org.apache.carbondata.processing.loading.model.CarbonLoadModel
+import org.apache.carbondata.processing.merger.{CompactionResultSortProcessor, 
CompactionType}
+import org.apache.carbondata.processing.util.CarbonLoaderUtil
+import org.apache.carbondata.spark.{HandoffResult, HandoffResultImpl}
+import org.apache.carbondata.spark.util.CommonUtil
+import org.apache.carbondata.streaming.{CarbonStreamInputFormat, 
CarbonStreamRecordReader}
+
+
/**
 * Partition of the handoff segment.
 *
 * Wraps one [[CarbonInputSplit]] of the streaming segment that is being handed
 * off to a columnar segment. The raw split is marked @transient and re-wrapped
 * in a [[SerializableWritable]] because Hadoop Writables are not
 * Java-serializable, and partitions must be shipped to executors.
 */
class HandoffPartition(
    val rddId: Int,
    val idx: Int,
    @transient val inputSplit: CarbonInputSplit
) extends Partition {

  // serializable carrier for the Hadoop split; read back on the executor side
  val split = new SerializableWritable[CarbonInputSplit](inputSplit)

  override val index: Int = idx

  // same mixing scheme Spark's own partition implementations use
  override def hashCode(): Int = 41 * (41 + rddId) + idx
}
+
/**
 * Adapts a [[CarbonStreamRecordReader]] to the [[RawResultIterator]] interface
 * so the compaction sort processor can consume rows of the streaming segment.
 */
class StreamingRawResultIterator(
    recordReader: CarbonStreamRecordReader
) extends RawResultIterator(null, null, null) {

  override def hasNext: Boolean = recordReader.nextKeyValue()

  override def next(): Array[Object] = {
    // the reader reuses its internal row buffer between calls, so hand the
    // caller a shallow copy of the current row's values
    val current = recordReader
      .getCurrentValue
      .asInstanceOf[GenericInternalRow]
      .values
      .asInstanceOf[Array[Object]]
    current.clone()
  }
}
+
/**
 * RDD that executes the handoff of one streaming segment: each partition reads
 * the raw rows of one stream file and rewrites them as columnar data files.
 */
class StreamHandoffRDD[K, V](
    sc: SparkContext,
    result: HandoffResult[K, V],
    carbonLoadModel: CarbonLoadModel,
    handOffSegmentId: String
) extends CarbonRDD[(K, V)](sc, Nil, sc.hadoopConfiguration) {

  // timestamp-based pseudo job-tracker id, used only to build Hadoop
  // TaskAttemptIDs for the record readers
  private val jobTrackerId: String = {
    val formatter = new SimpleDateFormat("yyyyMMddHHmm")
    formatter.format(new Date())
  }

  /**
   * Reads the streaming rows of this partition's split, sorts them and writes
   * columnar files, then yields a single (taskNo, status) result element.
   */
  override def internalCompute(
      split: Partition,
      context: TaskContext
  ): Iterator[(K, V)] = {
    carbonLoadModel.setTaskNo("" + split.index)
    val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
    // register the table in the executor-local metadata cache
    CarbonMetadata.getInstance().addCarbonTable(carbonTable)
    // the input iterator is using raw row
    val iteratorList = prepareInputIterator(split, carbonTable)

    CommonUtil.setTempStoreLocation(split.index, carbonLoadModel, true, false)
    // use CompactionResultSortProcessor to sort data and write to columnar files
    val processor = prepareHandoffProcessor(carbonTable)
    val status = processor.execute(iteratorList)

    // single-element iterator carrying the result of this handoff task
    new Iterator[(K, V)] {
      private var finished = false

      override def hasNext: Boolean = {
        !finished
      }

      override def next(): (K, V) = {
        finished = true
        result.getKey("" + split.index, status)
      }
    }
  }

  /**
   * prepare input iterator by basing CarbonStreamRecordReader
   *
   * Builds a raw-row record reader over this partition's stream split,
   * projecting all columns in stream storage order.
   */
  private def prepareInputIterator(
      split: Partition,
      carbonTable: CarbonTable
  ): util.ArrayList[RawResultIterator] = {
    val inputSplit = split.asInstanceOf[HandoffPartition].split.value
    val attemptId = new TaskAttemptID(jobTrackerId, id, TaskType.MAP, split.index, 0)
    val hadoopConf = new Configuration()
    CarbonInputFormat.setDatabaseName(hadoopConf, carbonTable.getDatabaseName)
    CarbonInputFormat.setTableName(hadoopConf, carbonTable.getTableName)
    CarbonInputFormat.setTablePath(hadoopConf, carbonTable.getTablePath)
    // project every column, in the storage order of the stream files
    val projection = new CarbonProjection
    val dataFields = carbonTable.getStreamStorageOrderColumn(carbonTable.getTableName)
    (0 until dataFields.size()).foreach { index =>
      projection.addColumn(dataFields.get(index).getColName)
    }
    CarbonInputFormat.setColumnProjection(hadoopConf, projection)
    CarbonInputFormat.setTableInfo(hadoopConf, carbonTable.getTableInfo)
    val attemptContext = new TaskAttemptContextImpl(hadoopConf, attemptId)
    val format = new CarbonTableInputFormat[Array[Object]]()
    val model = format.createQueryModel(inputSplit, attemptContext)
    val inputFormat = new CarbonStreamInputFormat
    val streamReader = inputFormat.createRecordReader(inputSplit, attemptContext)
      .asInstanceOf[CarbonStreamRecordReader]
    // raw (non-vectorized) rows are required by the sort processor
    streamReader.setVectorReader(false)
    streamReader.setQueryModel(model)
    streamReader.setUseRawRow(true)
    streamReader.initialize(inputSplit, attemptContext)
    val iteratorList = new util.ArrayList[RawResultIterator](1)
    iteratorList.add(new StreamingRawResultIterator(streamReader))
    iteratorList
  }

  /**
   * Creates the sort processor that writes the sorted rows as columnar files.
   * Cardinality is unknown for streaming data, so every dimension is given
   * Integer.MAX_VALUE cardinality.
   */
  private def prepareHandoffProcessor(
      carbonTable: CarbonTable
  ): CompactionResultSortProcessor = {
    val wrapperColumnSchemaList = CarbonUtil.getColumnSchemaList(
      carbonTable.getDimensionByTableName(carbonTable.getTableName),
      carbonTable.getMeasureByTableName(carbonTable.getTableName))
    val dimLensWithComplex =
      (0 until wrapperColumnSchemaList.size()).map(_ => Integer.MAX_VALUE).toArray
    val dictionaryColumnCardinality =
      CarbonUtil.getFormattedCardinality(dimLensWithComplex, wrapperColumnSchemaList)
    val segmentProperties =
      new SegmentProperties(wrapperColumnSchemaList, dictionaryColumnCardinality)

    new CompactionResultSortProcessor(
      carbonLoadModel,
      carbonTable,
      segmentProperties,
      CompactionType.STREAMING,
      carbonTable.getTableName,
      null
    )
  }

  /**
   * get the partitions of the handoff segment
   *
   * One HandoffPartition per stream split of the segment being handed off.
   */
  override protected def getPartitions: Array[Partition] = {
    val job = Job.getInstance(FileFactory.getConfiguration)
    val inputFormat = new CarbonTableInputFormat[Array[Object]]()
    val segmentList = new util.ArrayList[Segment](1)
    segmentList.add(Segment.toSegment(handOffSegmentId))
    val splits = inputFormat.getSplitsOfStreaming(
      job,
      carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable.getAbsoluteTableIdentifier,
      segmentList
    )

    (0 until splits.size()).map { index =>
      new HandoffPartition(id, index, splits.get(index).asInstanceOf[CarbonInputSplit])
    }.toArray[Partition]
  }
}
+
object StreamHandoffRDD {

  private val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)

  /**
   * Hands off finished streaming segments one by one until none are left.
   *
   * Holds the table-level HANDOFF_LOCK for the whole loop so only one handoff
   * driver runs per table; takes the table-status lock briefly each iteration
   * to read the current segment list.
   */
  def iterateStreamingHandoff(
      carbonLoadModel: CarbonLoadModel,
      sparkSession: SparkSession
  ): Unit = {
    val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
    val identifier = carbonTable.getAbsoluteTableIdentifier
    var continueHandoff = false
    // require handoff lock on table
    val lock = CarbonLockFactory.getCarbonLockObj(identifier, LockUsage.HANDOFF_LOCK)
    try {
      if (lock.lockWithRetries()) {
        LOGGER.info("Acquired the handoff lock for table" +
                    s" ${ carbonTable.getDatabaseName }.${ carbonTable.getTableName }")
        // handoff streaming segments one by one
        do {
          val segmentStatusManager = new SegmentStatusManager(identifier)
          var loadMetadataDetails: Array[LoadMetadataDetails] = null
          // lock table to read table status file
          val statusLock = segmentStatusManager.getTableStatusLock
          try {
            if (statusLock.lockWithRetries()) {
              loadMetadataDetails = SegmentStatusManager.readLoadMetadata(
                CarbonTablePath.getMetadataPath(identifier.getTablePath))
            }
          } finally {
            if (null != statusLock) {
              statusLock.unlock()
            }
          }
          if (null != loadMetadataDetails) {
            val streamSegments =
              loadMetadataDetails.filter(_.getSegmentStatus == SegmentStatus.STREAMING_FINISH)

            continueHandoff = streamSegments.length > 0
            if (continueHandoff) {
              // handoff a streaming segment
              val loadMetadataDetail = streamSegments(0)
              executeStreamingHandoff(
                carbonLoadModel,
                sparkSession,
                loadMetadataDetail.getLoadName
              )
            }
          } else {
            // status could not be read (lock not acquired); stop iterating
            continueHandoff = false
          }
        } while (continueHandoff)
      }
    } finally {
      if (null != lock) {
        lock.unlock()
      }
    }
  }

  /**
   * start new thread to execute stream segment handoff
   *
   * For DDL-triggered handoff the work runs synchronously on the caller's
   * thread; otherwise it runs on a fresh background thread.
   */
  def startStreamingHandoffThread(
      carbonLoadModel: CarbonLoadModel,
      sparkSession: SparkSession,
      isDDL: Boolean
  ): Unit = {
    if (isDDL) {
      iterateStreamingHandoff(carbonLoadModel, sparkSession)
    } else {
      // start a new thread to execute streaming segment handoff
      val handoffThread = new Thread() {
        override def run(): Unit = {
          iterateStreamingHandoff(carbonLoadModel, sparkSession)
        }
      }
      handoffThread.start()
    }
  }

  /**
   * invoke StreamHandoffRDD to handoff a streaming segment to a columnar segment
   *
   * Creates a new columnar segment entry (INSERT_IN_PROGRESS), runs the
   * handoff RDD, then either marks the new segment SUCCESS and the stream
   * segment COMPACTED, or cleans up the new segment and rethrows on failure.
   *
   * @throws Exception if the handoff or the table-status update fails
   */
  def executeStreamingHandoff(
      carbonLoadModel: CarbonLoadModel,
      sparkSession: SparkSession,
      handoffSegmenId: String
  ): Unit = {
    var loadStatus = SegmentStatus.SUCCESS
    var errorMessage: String = "Handoff failure"
    try {
      // generate new columnar segment
      val newMetaEntry = new LoadMetadataDetails
      carbonLoadModel.setFactTimeStamp(System.currentTimeMillis())
      CarbonLoaderUtil.populateNewLoadMetaEntry(
        newMetaEntry,
        SegmentStatus.INSERT_IN_PROGRESS,
        carbonLoadModel.getFactTimeStamp,
        false)
      val operationContext = new OperationContext()
      val loadTablePreStatusUpdateEvent: LoadTablePreStatusUpdateEvent =
        new LoadTablePreStatusUpdateEvent(
          carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable.getCarbonTableIdentifier,
          carbonLoadModel)
      OperationListenerBus.getInstance().fireEvent(loadTablePreStatusUpdateEvent, operationContext)

      CarbonLoaderUtil.recordNewLoadMetadata(newMetaEntry, carbonLoadModel, true, false)
      val loadTablePostStatusUpdateEvent: LoadTablePostStatusUpdateEvent =
        new LoadTablePostStatusUpdateEvent(carbonLoadModel)
      OperationListenerBus.getInstance()
        .fireEvent(loadTablePostStatusUpdateEvent, operationContext)
      // convert a streaming segment to columnar segment
      val status = new StreamHandoffRDD(
        sparkSession.sparkContext,
        new HandoffResultImpl(),
        carbonLoadModel,
        handoffSegmenId).collect()

      status.foreach { x =>
        if (!x._2) {
          loadStatus = SegmentStatus.LOAD_FAILURE
        }
      }
    } catch {
      case ex: Exception =>
        loadStatus = SegmentStatus.LOAD_FAILURE
        // BUGFIX: ex.getCause is null for exceptions thrown directly (no
        // wrapped cause); calling getMessage on it NPE'd inside this handler
        // and masked the real failure. Fall back to the exception itself.
        val root = if (null != ex.getCause) ex.getCause else ex
        errorMessage = errorMessage + ": " + root.getMessage
        LOGGER.error(errorMessage)
        LOGGER.error(ex, s"Handoff failed on streaming segment $handoffSegmenId")
    }

    if (loadStatus == SegmentStatus.LOAD_FAILURE) {
      // roll back: mark the new segment failed and physically delete it
      CarbonLoaderUtil.updateTableStatusForFailure(carbonLoadModel)
      LOGGER.info("********starting clean up**********")
      CarbonLoaderUtil.deleteSegment(carbonLoadModel, carbonLoadModel.getSegmentId.toInt)
      LOGGER.info("********clean up done**********")
      LOGGER.audit(s"Handoff is failed for " +
                   s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
      LOGGER.error("Cannot write load metadata file as handoff failed")
      throw new Exception(errorMessage)
    }

    if (loadStatus == SegmentStatus.SUCCESS) {
      val done = updateLoadMetadata(handoffSegmenId, carbonLoadModel)
      if (!done) {
        val errorMessage = "Handoff failed due to failure in table status updation."
        LOGGER.audit("Handoff is failed for " +
                     s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
        LOGGER.error("Handoff failed due to failure in table status updation.")
        throw new Exception(errorMessage)
      }
    }
  }

  /**
   * update streaming segment and new columnar segment
   *
   * Under the table-status lock: marks the new columnar segment SUCCESS and
   * the handed-off streaming segment COMPACTED (merged into the new segment),
   * then rewrites the table status file.
   *
   * @return true if the status file was updated, false if the lock could not
   *         be acquired
   */
  private def updateLoadMetadata(
      handoffSegmentId: String,
      loadModel: CarbonLoadModel
  ): Boolean = {
    var status = false
    val metaDataFilepath = loadModel.getCarbonDataLoadSchema.getCarbonTable.getMetadataPath
    val identifier = loadModel.getCarbonDataLoadSchema.getCarbonTable.getAbsoluteTableIdentifier
    val metadataPath = CarbonTablePath.getMetadataPath(identifier.getTablePath)
    val fileType = FileFactory.getFileType(metadataPath)
    if (!FileFactory.isFileExist(metadataPath, fileType)) {
      FileFactory.mkdirs(metadataPath, fileType)
    }
    val tableStatusPath = CarbonTablePath.getTableStatusFilePath(identifier.getTablePath)
    val segmentStatusManager = new SegmentStatusManager(identifier)
    val carbonLock = segmentStatusManager.getTableStatusLock
    try {
      if (carbonLock.lockWithRetries()) {
        LOGGER.info(
          "Acquired lock for table" + loadModel.getDatabaseName() + "." + loadModel.getTableName()
          + " for table status updation")
        val listOfLoadFolderDetailsArray =
          SegmentStatusManager.readLoadMetadata(metaDataFilepath)

        // update new columnar segment to success status
        val newSegment =
          listOfLoadFolderDetailsArray.find(_.getLoadName.equals(loadModel.getSegmentId))
        if (newSegment.isEmpty) {
          throw new Exception("Failed to update table status for new segment")
        } else {
          newSegment.get.setSegmentStatus(SegmentStatus.SUCCESS)
          newSegment.get.setLoadEndTime(System.currentTimeMillis())
        }

        // update streaming segment to compacted status
        val streamSegment =
          listOfLoadFolderDetailsArray.find(_.getLoadName.equals(handoffSegmentId))
        if (streamSegment.isEmpty) {
          throw new Exception("Failed to update table status for streaming segment")
        } else {
          streamSegment.get.setSegmentStatus(SegmentStatus.COMPACTED)
          streamSegment.get.setMergedLoadName(loadModel.getSegmentId)
        }

        // refresh table status file
        SegmentStatusManager.writeLoadDetailsIntoFile(tableStatusPath, listOfLoadFolderDetailsArray)
        status = true
      } else {
        LOGGER.error("Not able to acquire the lock for Table status updation for table " + loadModel
          .getDatabaseName() + "." + loadModel.getTableName())
      }
    } finally {
      if (carbonLock.unlock()) {
        LOGGER.info("Table unlocked successfully after table status updation" +
                    loadModel.getDatabaseName() + "." + loadModel.getTableName())
      } else {
        LOGGER.error("Unable to unlock Table lock for table" + loadModel.getDatabaseName() +
                     "." + loadModel.getTableName() + " during table status updation")
      }
    }
    status
  }
}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/c723947a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala
 
b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala
index 3250a53..5f55ef3 100644
--- 
a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala
+++ 
b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala
@@ -47,6 +47,7 @@ import 
org.apache.carbondata.processing.exception.DataLoadingException
 import org.apache.carbondata.processing.loading.FailureCauses
 import 
org.apache.carbondata.processing.loading.exception.CarbonDataLoadingException
 import org.apache.carbondata.processing.util.CarbonDataProcessorUtil
+import org.apache.carbondata.streaming.parser.FieldConverter
 
 object CarbonScalaUtil {
 
@@ -121,55 +122,8 @@ object CarbonScalaUtil {
       timeStampFormat: SimpleDateFormat,
       dateFormat: SimpleDateFormat,
       level: Int = 1): String = {
-    if (value == null) {
-      serializationNullFormat
-    } else {
-      value match {
-        case s: String => if (s.length > 
CarbonCommonConstants.MAX_CHARS_PER_COLUMN_DEFAULT) {
-          throw new Exception("Dataload failed, String length cannot exceed " +
-                              
CarbonCommonConstants.MAX_CHARS_PER_COLUMN_DEFAULT + " characters")
-        } else {
-          s
-        }
-        case d: java.math.BigDecimal => d.toPlainString
-        case i: java.lang.Integer => i.toString
-        case d: java.lang.Double => d.toString
-        case t: java.sql.Timestamp => timeStampFormat format t
-        case d: java.sql.Date => dateFormat format d
-        case b: java.lang.Boolean => b.toString
-        case s: java.lang.Short => s.toString
-        case f: java.lang.Float => f.toString
-        case bs: Array[Byte] => new String(bs,
-          Charset.forName(CarbonCommonConstants.DEFAULT_CHARSET))
-        case s: scala.collection.Seq[Any] =>
-          val delimiter = if (level == 1) {
-            delimiterLevel1
-          } else {
-            delimiterLevel2
-          }
-          val builder = new StringBuilder()
-          s.foreach { x =>
-            builder.append(getString(x, serializationNullFormat, 
delimiterLevel1,
-                delimiterLevel2, timeStampFormat, dateFormat, level + 
1)).append(delimiter)
-          }
-          builder.substring(0, builder.length - delimiter.length())
-        case m: scala.collection.Map[Any, Any] =>
-          throw new Exception("Unsupported data type: Map")
-        case r: org.apache.spark.sql.Row =>
-          val delimiter = if (level == 1) {
-            delimiterLevel1
-          } else {
-            delimiterLevel2
-          }
-          val builder = new StringBuilder()
-          for (i <- 0 until r.length) {
-            builder.append(getString(r(i), serializationNullFormat, 
delimiterLevel1,
-                delimiterLevel2, timeStampFormat, dateFormat, level + 
1)).append(delimiter)
-          }
-          builder.substring(0, builder.length - delimiter.length())
-        case other => other.toString
-      }
-    }
+    FieldConverter.objectToString(value, serializationNullFormat, 
delimiterLevel1,
+      delimiterLevel2, timeStampFormat, dateFormat, level)
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/carbondata/blob/c723947a/integration/spark-common/src/main/scala/org/apache/carbondata/streaming/CarbonSparkStreamingListener.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark-common/src/main/scala/org/apache/carbondata/streaming/CarbonSparkStreamingListener.scala
 
b/integration/spark-common/src/main/scala/org/apache/carbondata/streaming/CarbonSparkStreamingListener.scala
new file mode 100644
index 0000000..a99a1e8
--- /dev/null
+++ 
b/integration/spark-common/src/main/scala/org/apache/carbondata/streaming/CarbonSparkStreamingListener.scala
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.streaming
+
+import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationEnd}
+
/**
 * Spark listener that releases all stream-table locks when the Spark
 * Streaming application shuts down, so tables are not left locked.
 */
class CarbonSparkStreamingListener extends SparkListener {

  /**
   * When Spark Streaming App stops, remove all locks for stream table.
   */
  override def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd): Unit = {
    CarbonStreamSparkStreaming.cleanAllLockAfterStop()
  }
}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/c723947a/integration/spark-common/src/main/scala/org/apache/carbondata/streaming/CarbonStreamSparkStreaming.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark-common/src/main/scala/org/apache/carbondata/streaming/CarbonStreamSparkStreaming.scala
 
b/integration/spark-common/src/main/scala/org/apache/carbondata/streaming/CarbonStreamSparkStreaming.scala
new file mode 100644
index 0000000..28f04b1
--- /dev/null
+++ 
b/integration/spark-common/src/main/scala/org/apache/carbondata/streaming/CarbonStreamSparkStreaming.scala
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.streaming
+
+import java.util
+
+import scala.collection.JavaConverters._
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
+import org.apache.spark.sql.execution.streaming.{CarbonAppendableStreamSink, 
Sink}
+import org.apache.spark.streaming.Time
+
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.core.locks.{CarbonLockFactory, ICarbonLock, 
LockUsage}
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable
+
/**
 * Interface used to write stream data to a stream table
 * when integrating with Spark Streaming.
 *
 * NOTE: Current integration with Spark Streaming is an alpha feature.
 */
class CarbonStreamSparkStreamingWriter(val sparkSession: SparkSession,
    val carbonTable: CarbonTable,
    val configuration: Configuration) {

  private val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)

  // set to true once the sink is created and the streaming lock is held
  private var isInitialize: Boolean = false

  // streaming lock on the target table; held from initialize() until
  // unLockStreamTable() is called (e.g. by CarbonStreamSparkStreaming cleanup)
  private var lock: ICarbonLock = null
  private var carbonAppendableStreamSink: Sink = null

  /**
   * Acquired the lock for stream table
   *
   * @throws InterruptedException if the lock cannot be acquired after retries
   */
  def lockStreamTable(): Unit = {
    lock = CarbonLockFactory.getCarbonLockObj(carbonTable.getAbsoluteTableIdentifier,
      LockUsage.STREAMING_LOCK)
    if (lock.lockWithRetries()) {
      LOGGER.info("Acquired the lock for stream table: " +
                  carbonTable.getDatabaseName + "." +
                  carbonTable.getTableName)
    } else {
      LOGGER.error("Not able to acquire the lock for stream table:" +
                   carbonTable.getDatabaseName + "." + carbonTable.getTableName)
      throw new InterruptedException(
        "Not able to acquire the lock for stream table: " + carbonTable.getDatabaseName + "." +
        carbonTable.getTableName)
    }
  }

  /**
   * unlock for stream table
   */
  def unLockStreamTable(): Unit = {
    if (null != lock) {
      lock.unlock()
      LOGGER.info("unlock for stream table: " +
                  carbonTable.getDatabaseName + "." +
                  carbonTable.getTableName)
    }
  }

  /**
   * Lazily creates the stream sink for the table and acquires the streaming
   * lock. Called on the first writeStreamData invocation.
   */
  def initialize(): Unit = {
    carbonAppendableStreamSink = StreamSinkFactory.createStreamTableSink(
      sparkSession,
      configuration,
      carbonTable,
      extraOptions.toMap).asInstanceOf[CarbonAppendableStreamSink]

    lockStreamTable()

    isInitialize = true
  }

  /**
   * Writes one micro-batch to the stream table; the batch time is used as the
   * sink's batch id.
   */
  def writeStreamData(dataFrame: DataFrame, time: Time): Unit = {
    if (!isInitialize) {
      initialize()
    }
    carbonAppendableStreamSink.addBatch(time.milliseconds, dataFrame)
  }

  // NOTE: these initializers and the option(...) calls below run as part of
  // the primary constructor, in declaration order — do not reorder them
  private val extraOptions = new scala.collection.mutable.HashMap[String, String]
  private var mode: SaveMode = SaveMode.ErrorIfExists

  this.option("dbName", carbonTable.getDatabaseName)
  this.option("tableName", carbonTable.getTableName)

  /**
   * Specifies the behavior when data or table already exists. Options include:
   *   - `SaveMode.Overwrite`: overwrite the existing data.
   *   - `SaveMode.Append`: append the data.
   *   - `SaveMode.Ignore`: ignore the operation (i.e. no-op).
   *   - `SaveMode.ErrorIfExists`: default option, throw an exception at runtime.
   *
   * Note: the mode can only be changed while it still has its default value
   * (ErrorIfExists); subsequent calls are ignored.
   */
  def mode(saveMode: SaveMode): CarbonStreamSparkStreamingWriter = {
    if (mode == SaveMode.ErrorIfExists) {
      mode = saveMode
    }
    this
  }

  /**
   * Specifies the behavior when data or table already exists. Options include:
   *   - `overwrite`: overwrite the existing data.
   *   - `append`: append the data.
   *   - `ignore`: ignore the operation (i.e. no-op).
   *   - `error or default`: default option, throw an exception at runtime.
   *
   * @throws IllegalArgumentException for an unrecognized mode string
   */
  def mode(saveMode: String): CarbonStreamSparkStreamingWriter = {
    if (mode == SaveMode.ErrorIfExists) {
      mode = saveMode.toLowerCase(util.Locale.ROOT) match {
        case "overwrite" => SaveMode.Overwrite
        case "append" => SaveMode.Append
        case "ignore" => SaveMode.Ignore
        case "error" | "default" => SaveMode.ErrorIfExists
        case _ => throw new IllegalArgumentException(s"Unknown save mode: $saveMode. " +
          "Accepted save modes are 'overwrite', 'append', 'ignore', 'error'.")
      }
    }
    this
  }

  /**
   * Adds an output option (first writer of a key wins; later values for the
   * same key are ignored)
   */
  def option(key: String, value: String): CarbonStreamSparkStreamingWriter = {
    if (!extraOptions.contains(key)) {
      extraOptions += (key -> value)
    }
    this
  }

  /**
   * Adds an output option
   */
  def option(key: String, value: Boolean): CarbonStreamSparkStreamingWriter =
    option(key, value.toString)

  /**
   * Adds an output option
   */
  def option(key: String, value: Long): CarbonStreamSparkStreamingWriter =
    option(key, value.toString)

  /**
   * Adds an output option
   */
  def option(key: String, value: Double): CarbonStreamSparkStreamingWriter =
    option(key, value.toString)
}
+
/**
 * Registry of active Spark Streaming writers, used to release all stream-table
 * locks when the application stops.
 */
object CarbonStreamSparkStreaming {

  // active writers, keyed by table
  @transient private val tableMap =
    new util.HashMap[String, CarbonStreamSparkStreamingWriter]()

  def getTableMap: util.Map[String, CarbonStreamSparkStreamingWriter] = tableMap

  /**
   * remove all stream lock.
   */
  def cleanAllLockAfterStop(): Unit = {
    tableMap.values().asScala.foreach(_.unLockStreamTable())
    tableMap.clear()
  }
}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/c723947a/integration/spark-common/src/main/scala/org/apache/carbondata/streaming/CarbonStreamingQueryListener.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark-common/src/main/scala/org/apache/carbondata/streaming/CarbonStreamingQueryListener.scala
 
b/integration/spark-common/src/main/scala/org/apache/carbondata/streaming/CarbonStreamingQueryListener.scala
new file mode 100644
index 0000000..6d83fad
--- /dev/null
+++ 
b/integration/spark-common/src/main/scala/org/apache/carbondata/streaming/CarbonStreamingQueryListener.scala
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.streaming
+
+import java.util
+import java.util.UUID
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.execution.streaming.{CarbonAppendableStreamSink, 
StreamExecution}
+import org.apache.spark.sql.streaming.StreamingQueryListener
+
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.core.locks.{CarbonLockFactory, ICarbonLock, 
LockUsage}
+
/**
 * Listens to structured-streaming query lifecycle events and holds the
 * table-level streaming lock for the lifetime of each query that writes
 * through a [[CarbonAppendableStreamSink]].
 */
class CarbonStreamingQueryListener(spark: SparkSession) extends StreamingQueryListener {

  private val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)

  // running query id -> streaming lock acquired for its target table
  private val cache = new util.HashMap[UUID, ICarbonLock]()

  override def onQueryStarted(event: StreamingQueryListener.QueryStartedEvent): Unit = {
    val streamQuery = spark.streams.get(event.id)
    // unwrap the StreamExecution: the handle itself on spark 2.1, a
    // StreamingQueryWrapper (accessed reflectively to avoid a hard
    // compile-time dependency) on spark 2.2 and later
    val qry = streamQuery match {
      case execution: StreamExecution =>
        // adapt spark 2.1
        execution
      case other =>
        // adapt spark 2.2 and later version
        val clazz = Class.forName("org.apache.spark.sql.execution.streaming.StreamingQueryWrapper")
        val method = clazz.getMethod("streamingQuery")
        method.invoke(other).asInstanceOf[StreamExecution]
    }
    qry.sink match {
      case sink: CarbonAppendableStreamSink =>
        LOGGER.info("Carbon streaming query started: " + event.id)
        val carbonTable = sink.carbonTable
        val lock = CarbonLockFactory.getCarbonLockObj(carbonTable.getAbsoluteTableIdentifier,
          LockUsage.STREAMING_LOCK)
        if (lock.lockWithRetries()) {
          LOGGER.info("Acquired the lock for stream table: " + carbonTable.getDatabaseName + "." +
                      carbonTable.getTableName)
          cache.put(event.id, lock)
        } else {
          LOGGER.error("Not able to acquire the lock for stream table:" +
                       carbonTable.getDatabaseName + "." + carbonTable.getTableName)
          throw new InterruptedException(
            "Not able to acquire the lock for stream table: " + carbonTable.getDatabaseName + "." +
            carbonTable.getTableName)
        }
      case _ => // not a carbon sink: nothing to lock
    }
  }

  override def onQueryProgress(event: StreamingQueryListener.QueryProgressEvent): Unit = {
  }

  override def onQueryTerminated(event: StreamingQueryListener.QueryTerminatedEvent): Unit = {
    // release the lock taken in onQueryStarted, if this query held one
    val lock = cache.remove(event.id)
    if (null != lock) {
      // fix: previous message omitted "terminated", making the log entry ambiguous
      LOGGER.info("Carbon streaming query terminated: " + event.id)
      lock.unlock()
    }
  }
}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/c723947a/integration/spark-common/src/main/scala/org/apache/carbondata/streaming/StreamSinkFactory.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark-common/src/main/scala/org/apache/carbondata/streaming/StreamSinkFactory.scala
 
b/integration/spark-common/src/main/scala/org/apache/carbondata/streaming/StreamSinkFactory.scala
new file mode 100644
index 0000000..bc7b042
--- /dev/null
+++ 
b/integration/spark-common/src/main/scala/org/apache/carbondata/streaming/StreamSinkFactory.scala
@@ -0,0 +1,236 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.streaming
+
+import scala.collection.JavaConverters._
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationEnd}
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.execution.streaming.{CarbonAppendableStreamSink, 
Sink}
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.datastore.impl.FileFactory
+import org.apache.carbondata.core.dictionary.server.{DictionaryServer, 
NonSecureDictionaryServer}
+import 
org.apache.carbondata.core.dictionary.service.NonSecureDictionaryServiceProvider
+import org.apache.carbondata.core.metadata.encoder.Encoding
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable
+import org.apache.carbondata.core.util.CarbonProperties
+import org.apache.carbondata.core.util.path.CarbonTablePath
+import org.apache.carbondata.events.{OperationContext, OperationListenerBus}
+import 
org.apache.carbondata.processing.loading.events.LoadEvents.{LoadTablePostExecutionEvent,
 LoadTablePreExecutionEvent}
+import org.apache.carbondata.processing.loading.model.{CarbonLoadModel, 
CarbonLoadModelBuilder, LoadOption}
+import 
org.apache.carbondata.spark.dictionary.provider.SecureDictionaryServiceProvider
+import org.apache.carbondata.spark.dictionary.server.SecureDictionaryServer
+import org.apache.carbondata.streaming.segment.StreamSegment
+
/**
 * Factory that creates the carbon stream sink together with the driver-side
 * services a streaming query needs (stream segment, dictionary server).
 */
object StreamSinkFactory {

  /**
   * Creates a [[CarbonAppendableStreamSink]] for the given table.
   *
   * Validates the streaming options, builds the load model, fires the load
   * pre/post events, opens (or recovers) the current stream segment and
   * starts a dictionary server when one-pass dictionary generation is needed.
   *
   * @throws CarbonStreamException if the configured handoff size is illegal
   */
  def createStreamTableSink(
      sparkSession: SparkSession,
      hadoopConf: Configuration,
      carbonTable: CarbonTable,
      parameters: Map[String, String]): Sink = {
    validateParameters(parameters)

    // build load model
    val carbonLoadModel = buildCarbonLoadModelForStream(
      sparkSession,
      hadoopConf,
      carbonTable,
      parameters,
      "")
    // fire pre event before streaming is started
    // in case of streaming options and optionsFinal can be same
    val operationContext = new OperationContext
    val loadTablePreExecutionEvent = new LoadTablePreExecutionEvent(
      carbonTable.getCarbonTableIdentifier,
      carbonLoadModel,
      carbonLoadModel.getFactFilePath,
      false,
      parameters.asJava,
      parameters.asJava,
      false
    )
    OperationListenerBus.getInstance().fireEvent(loadTablePreExecutionEvent, operationContext)
    // prepare the stream segment
    val segmentId = getStreamSegmentId(carbonTable)
    carbonLoadModel.setSegmentId(segmentId)

    // one-pass load is used exactly when a dictionary server is running
    val server = startDictionaryServer(
      sparkSession,
      carbonTable,
      carbonLoadModel)
    carbonLoadModel.setUseOnePass(server.isDefined)
    // default is carbon appended stream sink
    val carbonAppendableStreamSink = new CarbonAppendableStreamSink(
      sparkSession,
      carbonTable,
      segmentId,
      parameters,
      carbonLoadModel,
      server)

    // fire post event before streaming is started
    val loadTablePostExecutionEvent = new LoadTablePostExecutionEvent(
      carbonTable.getCarbonTableIdentifier,
      carbonLoadModel
    )
    OperationListenerBus.getInstance().fireEvent(loadTablePostExecutionEvent, operationContext)
    carbonAppendableStreamSink
  }

  /**
   * Validates the streaming options: the handoff size, when supplied, must be
   * a parseable long not smaller than HANDOFF_SIZE_MIN.
   *
   * @throws CarbonStreamException if the handoff size is not a number or is
   *                               below the allowed minimum
   */
  private def validateParameters(parameters: Map[String, String]): Unit = {
    val segmentSize = parameters.get(CarbonCommonConstants.HANDOFF_SIZE)
    if (segmentSize.isDefined) {
      try {
        val value = java.lang.Long.parseLong(segmentSize.get)
        if (value < CarbonCommonConstants.HANDOFF_SIZE_MIN) {
          // BUG FIX: the exception was previously constructed but never
          // thrown, so an under-sized handoff value was silently accepted
          throw new CarbonStreamException(CarbonCommonConstants.HANDOFF_SIZE +
                                          " should be bigger than or equal to " +
                                          CarbonCommonConstants.HANDOFF_SIZE_MIN)
        }
      } catch {
        case _: NumberFormatException =>
          // BUG FIX: also previously created-but-not-thrown; and use the raw
          // value rather than interpolating the Option wrapper ("Some(x)")
          throw new CarbonStreamException(CarbonCommonConstants.HANDOFF_SIZE +
                                          s" ${segmentSize.get} is an illegal number")
      }
    }
  }

  /**
   * Opens the current stream segment of the table, making sure its directory
   * exists and recovering a partially written segment if required.
   *
   * @return current stream segment id
   */
  private def getStreamSegmentId(carbonTable: CarbonTable): String = {
    val segmentId = StreamSegment.open(carbonTable)
    val segmentDir = CarbonTablePath.getSegmentPath(carbonTable.getTablePath, segmentId)
    val fileType = FileFactory.getFileType(segmentDir)
    if (!FileFactory.isFileExist(segmentDir, fileType)) {
      // Create table directory path, in case of enabling hive metastore first load may not have
      // table folder created.
      FileFactory.mkdirs(segmentDir, fileType)
    }
    if (FileFactory.isFileExist(segmentDir, fileType)) {
      // recover fault
      StreamSegment.recoverSegmentIfRequired(segmentDir)
    } else {
      FileFactory.mkdirs(segmentDir, fileType)
    }
    segmentId
  }

  /**
   * Starts a dictionary server when the table has at least one dictionary
   * (non direct-dictionary) dimension. Chooses the secure implementation when
   * spark authentication and the carbon secure-dictionary-server property are
   * both enabled; either way the server is shut down on application end and
   * its coordinates are recorded on the load model.
   *
   * @return the running server, or None when no dictionary column exists
   */
  def startDictionaryServer(
      sparkSession: SparkSession,
      carbonTable: CarbonTable,
      carbonLoadModel: CarbonLoadModel): Option[DictionaryServer] = {
    // start dictionary server when use one pass load and dimension with DICTIONARY
    // encoding is present.
    val allDimensions = carbonTable.getAllDimensions.asScala.toList
    val createDictionary = allDimensions.exists {
      carbonDimension => carbonDimension.hasEncoding(Encoding.DICTIONARY) &&
                         !carbonDimension.hasEncoding(Encoding.DIRECT_DICTIONARY)
    }
    val carbonSecureModeDictServer = CarbonProperties.getInstance.
      getProperty(CarbonCommonConstants.CARBON_SECURE_DICTIONARY_SERVER,
        CarbonCommonConstants.CARBON_SECURE_DICTIONARY_SERVER_DEFAULT)

    val sparkConf = sparkSession.sqlContext.sparkContext.getConf
    val sparkDriverHost = sparkSession.sqlContext.sparkContext.
      getConf.get("spark.driver.host")

    val server: Option[DictionaryServer] = if (createDictionary) {
      if (sparkConf.get("spark.authenticate", "false").equalsIgnoreCase("true") &&
          carbonSecureModeDictServer.toBoolean) {
        // secure mode: encrypted dictionary server
        val dictionaryServer = SecureDictionaryServer.getInstance(sparkConf,
          sparkDriverHost.toString, carbonLoadModel.getDictionaryServerPort, carbonTable)
        carbonLoadModel.setDictionaryServerPort(dictionaryServer.getPort)
        carbonLoadModel.setDictionaryServerHost(dictionaryServer.getHost)
        carbonLoadModel.setDictionaryServerSecretKey(dictionaryServer.getSecretKey)
        carbonLoadModel.setDictionaryEncryptServerSecure(dictionaryServer.isEncryptSecureServer)
        carbonLoadModel.setDictionaryServiceProvider(new SecureDictionaryServiceProvider())
        sparkSession.sparkContext.addSparkListener(new SparkListener() {
          override def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd) {
            dictionaryServer.shutdown()
          }
        })
        Some(dictionaryServer)
      } else {
        // non-secure mode
        val dictionaryServer = NonSecureDictionaryServer
          .getInstance(carbonLoadModel.getDictionaryServerPort, carbonTable)
        carbonLoadModel.setDictionaryServerPort(dictionaryServer.getPort)
        carbonLoadModel.setDictionaryServerHost(dictionaryServer.getHost)
        carbonLoadModel.setDictionaryEncryptServerSecure(false)
        carbonLoadModel
          .setDictionaryServiceProvider(new NonSecureDictionaryServiceProvider(dictionaryServer
            .getPort))
        sparkSession.sparkContext.addSparkListener(new SparkListener() {
          override def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd) {
            dictionaryServer.shutdown()
          }
        })
        Some(dictionaryServer)
      }
    } else {
      None
    }
    server
  }

  /**
   * Builds the [[CarbonLoadModel]] used by the streaming sink: fills option
   * defaults, forces no_sort scope, defaults the file header to the table's
   * create-order columns, and records the dictionary server host/port.
   */
  private def buildCarbonLoadModelForStream(
      sparkSession: SparkSession,
      hadoopConf: Configuration,
      carbonTable: CarbonTable,
      parameters: Map[String, String],
      segmentId: String): CarbonLoadModel = {
    val carbonProperty: CarbonProperties = CarbonProperties.getInstance()
    carbonProperty.addProperty("zookeeper.enable.lock", "false")
    val optionsFinal = LoadOption.fillOptionWithDefaultValue(parameters.asJava)
    optionsFinal.put("sort_scope", "no_sort")
    if (parameters.get("fileheader").isEmpty) {
      optionsFinal.put("fileheader", carbonTable.getCreateOrderColumn(carbonTable.getTableName)
        .asScala.map(_.getColName).mkString(","))
    }
    val carbonLoadModel = new CarbonLoadModel()
    new CarbonLoadModelBuilder(carbonTable).build(
      parameters.asJava,
      optionsFinal,
      carbonLoadModel,
      hadoopConf)
    carbonLoadModel.setSegmentId(segmentId)
    // stream should use one pass
    val dictionaryServerPort = parameters.getOrElse(
      CarbonCommonConstants.DICTIONARY_SERVER_PORT,
      carbonProperty.getProperty(
        CarbonCommonConstants.DICTIONARY_SERVER_PORT,
        CarbonCommonConstants.DICTIONARY_SERVER_PORT_DEFAULT))
    val sparkDriverHost = sparkSession.sqlContext.sparkContext.
      getConf.get("spark.driver.host")
    carbonLoadModel.setDictionaryServerHost(sparkDriverHost)
    carbonLoadModel.setDictionaryServerPort(dictionaryServerPort.toInt)
    carbonLoadModel
  }
}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/c723947a/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/streaming/CarbonAppendableStreamSink.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/streaming/CarbonAppendableStreamSink.scala
 
b/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/streaming/CarbonAppendableStreamSink.scala
new file mode 100644
index 0000000..402bc4b
--- /dev/null
+++ 
b/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/streaming/CarbonAppendableStreamSink.scala
@@ -0,0 +1,362 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming
+
+import java.util.Date
+
+import scala.collection.JavaConverters._
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.mapreduce._
+import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
+import org.apache.spark.TaskContext
+import org.apache.spark.internal.io.FileCommitProtocol
+import org.apache.spark.internal.io.FileCommitProtocol.TaskCommitMessage
+import org.apache.spark.sql.{DataFrame, SparkSession}
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.execution.{QueryExecution, SQLExecution}
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.util.{SerializableConfiguration, Utils}
+
+import org.apache.carbondata.common.CarbonIterator
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.datastore.impl.FileFactory
+import org.apache.carbondata.core.dictionary.server.DictionaryServer
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable
+import org.apache.carbondata.core.stats.QueryStatistic
+import org.apache.carbondata.core.util.CarbonProperties
+import org.apache.carbondata.core.util.path.CarbonTablePath
+import org.apache.carbondata.events.{OperationContext, OperationListenerBus}
+import org.apache.carbondata.hadoop.util.CarbonInputFormatUtil
+import 
org.apache.carbondata.processing.loading.constants.DataLoadProcessorConstants
+import 
org.apache.carbondata.processing.loading.events.LoadEvents.{LoadTablePostExecutionEvent,
 LoadTablePreExecutionEvent}
+import org.apache.carbondata.processing.loading.model.CarbonLoadModel
+import org.apache.carbondata.spark.rdd.StreamHandoffRDD
+import org.apache.carbondata.streaming.{CarbonStreamException, 
CarbonStreamOutputFormat}
+import org.apache.carbondata.streaming.parser.CarbonStreamParser
+import org.apache.carbondata.streaming.segment.StreamSegment
+
/**
 * An implementation of stream sink: persists each micro-batch to disk by
 * appending the batch data to row-format data files of the current stream
 * segment, handing off to a new segment when the size threshold is reached.
 */
class CarbonAppendableStreamSink(
    sparkSession: SparkSession,
    val carbonTable: CarbonTable,
    var currentSegmentId: String,
    parameters: Map[String, String],
    carbonLoadModel: CarbonLoadModel,
    server: Option[DictionaryServer]) extends Sink {

  // sink log directory; FileStreamSinkLog records which batch ids committed,
  // giving exactly-once semantics on restart (see addBatch)
  private val fileLogPath = CarbonTablePath.getStreamingLogDir(carbonTable.getTablePath)
  private val fileLog = new FileStreamSinkLog(FileStreamSinkLog.VERSION, sparkSession, fileLogPath)
  // prepare configuration: session hadoop conf + every user option + the
  // properties the default CarbonStreamParser reads on the executor side
  private val hadoopConf = {
    val conf = sparkSession.sessionState.newHadoopConf()
    // put all parameters into hadoopConf
    parameters.foreach { entry =>
      conf.set(entry._1, entry._2)
    }
    // properties below will be used for default CarbonStreamParser
    conf.set("carbon_complex_delimiter_level_1",
      carbonLoadModel.getComplexDelimiterLevel1)
    conf.set("carbon_complex_delimiter_level_2",
      carbonLoadModel.getComplexDelimiterLevel2)
    conf.set(
      DataLoadProcessorConstants.SERIALIZATION_NULL_FORMAT,
      // format is "<key>,<value>"; only the value part is needed here
      carbonLoadModel.getSerializationNullFormat().split(",")(1))
    conf.set(
      CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT,
      carbonLoadModel.getTimestampformat())
    conf.set(
      CarbonCommonConstants.CARBON_DATE_FORMAT,
      carbonLoadModel.getDateFormat())
    conf
  }
  // segment max size in bytes; batch option overrides the carbon property default
  private val segmentMaxSize = hadoopConf.getLong(
    CarbonCommonConstants.HANDOFF_SIZE,
    CarbonProperties.getInstance().getHandoffSize
  )

  // whether to trigger the columnar handoff automatically after segment close
  private val enableAutoHandoff = hadoopConf.getBoolean(
    CarbonCommonConstants.ENABLE_AUTO_HANDOFF,
    CarbonProperties.getInstance().isEnableAutoHandoff
  )

  /**
   * Appends one micro-batch. Batches already recorded in the sink log are
   * skipped, so a replayed batch after failure is not written twice.
   */
  override def addBatch(batchId: Long, data: DataFrame): Unit = {
    if (batchId <= fileLog.getLatest().map(_._1).getOrElse(-1L)) {
      CarbonAppendableStreamSink.LOGGER.info(s"Skipping already committed batch $batchId")
    } else {

      val statistic = new QueryStatistic()

      // fire pre event on every batch add
      // in case of streaming options and optionsFinal can be same
      val operationContext = new OperationContext
      val loadTablePreExecutionEvent = new LoadTablePreExecutionEvent(
        carbonTable.getCarbonTableIdentifier,
        carbonLoadModel,
        carbonLoadModel.getFactFilePath,
        false,
        parameters.asJava,
        parameters.asJava,
        false
      )
      OperationListenerBus.getInstance().fireEvent(loadTablePreExecutionEvent, operationContext)
      // roll over to a fresh segment first if the current one is full
      checkOrHandOffSegment()

      // committer will record how this spark job commit its output
      val committer = FileCommitProtocol.instantiate(
        className = sparkSession.sessionState.conf.streamingFileCommitProtocolClass,
        jobId = batchId.toString,
        outputPath = fileLogPath,
        isAppend = false)

      committer match {
        case manifestCommitter: ManifestFileCommitProtocol =>
          manifestCommitter.setupManifestOptions(fileLog, batchId)
        case _ => // Do nothing
      }

      // run the distributed write of this batch's rows into the segment
      CarbonAppendableStreamSink.writeDataFileJob(
        sparkSession,
        carbonTable,
        parameters,
        batchId,
        currentSegmentId,
        data.queryExecution,
        committer,
        hadoopConf,
        carbonLoadModel,
        server)
      // fire post event on every batch add
      val loadTablePostExecutionEvent = new LoadTablePostExecutionEvent(
        carbonTable.getCarbonTableIdentifier,
        carbonLoadModel
      )
      OperationListenerBus.getInstance().fireEvent(loadTablePostExecutionEvent, operationContext)

      statistic.addStatistics(s"add batch: $batchId", System.currentTimeMillis())
      CarbonAppendableStreamSink.LOGGER.info(
        s"${statistic.getMessage}, taken time(ms): ${statistic.getTimeTaken}")
    }
  }

  /**
   * If the directory size of the current segment is beyond the threshold,
   * closes it, switches to a newly created segment, and (when auto handoff is
   * enabled) starts the background handoff to columnar format.
   */
  private def checkOrHandOffSegment(): Unit = {
    val segmentDir = CarbonTablePath.getSegmentPath(carbonTable.getTablePath, currentSegmentId)
    val fileType = FileFactory.getFileType(segmentDir)
    if (segmentMaxSize <= StreamSegment.size(segmentDir)) {
      val newSegmentId = StreamSegment.close(carbonTable, currentSegmentId)
      currentSegmentId = newSegmentId
      val newSegmentDir = CarbonTablePath.getSegmentPath(carbonTable.getTablePath, currentSegmentId)
      FileFactory.mkdirs(newSegmentDir, fileType)

      // TODO trigger hand off operation
      if (enableAutoHandoff) {
        StreamHandoffRDD.startStreamingHandoffThread(
          carbonLoadModel,
          sparkSession,
          false)
      }
    }
  }
}
+
object CarbonAppendableStreamSink {

  private val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)

  /**
   * Package the hadoop configuration and batch/segment ids so they can be
   * passed from the driver to the executor side.
   */
  case class WriteDataFileJobDescription(
      serializableHadoopConf: SerializableConfiguration,
      batchId: Long,
      segmentId: String)

  /**
   * Run a spark job to append the newly arrived data to the existing row format
   * file directly.
   * If there are failure in the task, spark will re-try the task and
   * carbon will do recovery by HDFS file truncate. (see StreamSegment.tryRecoverFromTaskFault)
   * If there are job level failure, every files in the stream segment will do truncate
   * if necessary. (see StreamSegment.tryRecoverFromJobFault)
   */
  def writeDataFileJob(
      sparkSession: SparkSession,
      carbonTable: CarbonTable,
      parameters: Map[String, String],
      batchId: Long,
      segmentId: String,
      queryExecution: QueryExecution,
      committer: FileCommitProtocol,
      hadoopConf: Configuration,
      carbonLoadModel: CarbonLoadModel,
      server: Option[DictionaryServer]): Unit = {

    // create job; the batch id doubles as the hadoop job id so retries of
    // the same batch map to the same job
    val job = Job.getInstance(hadoopConf)
    job.setOutputKeyClass(classOf[Void])
    job.setOutputValueClass(classOf[InternalRow])
    val jobId = CarbonInputFormatUtil.getJobId(new Date, batchId.toInt)
    job.setJobID(jobId)

    val description = WriteDataFileJobDescription(
      serializableHadoopConf = new SerializableConfiguration(job.getConfiguration),
      batchId,
      segmentId
    )

    // run write data file job under a tracked SQL execution id
    SQLExecution.withNewExecutionId(sparkSession, queryExecution) {
      var result: Array[TaskCommitMessage] = null
      try {
        committer.setupJob(job)
        // initialize dictionary server
        if (server.isDefined) {
          server.get.initializeDictionaryGenerator(carbonTable)
        }

        val rowSchema = queryExecution.analyzed.schema
        // write data file: one task per partition appends its rows
        result = sparkSession.sparkContext.runJob(queryExecution.toRdd,
          (taskContext: TaskContext, iterator: Iterator[InternalRow]) => {
            writeDataFileTask(
              description,
              carbonLoadModel,
              sparkStageId = taskContext.stageId(),
              sparkPartitionId = taskContext.partitionId(),
              sparkAttemptNumber = taskContext.attemptNumber(),
              committer,
              iterator,
              rowSchema
            )
          })

        // write dictionary generated during this batch, if one-pass is on
        if (server.isDefined) {
          try {
            server.get.writeTableDictionary(carbonTable.getCarbonTableIdentifier.getTableId)
          } catch {
            case _: Exception =>
              LOGGER.error(
                s"Error while writing dictionary file for ${carbonTable.getTableUniqueName}")
              throw new Exception(
                "Streaming ingest failed due to error while writing dictionary file")
          }
        }

        // update data file info in index file
        StreamSegment.updateIndexFile(
          CarbonTablePath.getSegmentPath(carbonTable.getTablePath, segmentId))

      } catch {
        // catch fault of executor side: truncate partially written files back
        // to the last committed length, then abort the job
        case t: Throwable =>
          val segmentDir = CarbonTablePath.getSegmentPath(carbonTable.getTablePath, segmentId)
          StreamSegment.recoverSegmentIfRequired(segmentDir)
          LOGGER.error(t, s"Aborting job ${ job.getJobID }.")
          committer.abortJob(job)
          throw new CarbonStreamException("Job failed to write data file", t)
      }
      committer.commitJob(job, result)
      LOGGER.info(s"Job ${ job.getJobID } committed.")
    }
  }

  /**
   * Execute a task for each partition to append its rows to the segment's
   * data file. Returns the committer's task commit message on success; on
   * failure the task is aborted and a CarbonStreamException is thrown so
   * spark can retry the task.
   */
  def writeDataFileTask(
      description: WriteDataFileJobDescription,
      carbonLoadModel: CarbonLoadModel,
      sparkStageId: Int,
      sparkPartitionId: Int,
      sparkAttemptNumber: Int,
      committer: FileCommitProtocol,
      iterator: Iterator[InternalRow],
      rowSchema: StructType
  ): TaskCommitMessage = {

    val jobId = CarbonInputFormatUtil.getJobId(new Date, sparkStageId)
    val taskId = new TaskID(jobId, TaskType.MAP, sparkPartitionId)
    val taskAttemptId = new TaskAttemptID(taskId, sparkAttemptNumber)

    // Set up the attempt context required to use in the output committer.
    val taskAttemptContext: TaskAttemptContext = {
      // Set up the configuration object with mapred-compatible ids so the
      // committer and output format see a consistent task identity
      val hadoopConf = description.serializableHadoopConf.value
      CarbonStreamOutputFormat.setSegmentId(hadoopConf, description.segmentId)
      hadoopConf.set("mapred.job.id", jobId.toString)
      hadoopConf.set("mapred.tip.id", taskAttemptId.getTaskID.toString)
      hadoopConf.set("mapred.task.id", taskAttemptId.toString)
      hadoopConf.setBoolean("mapred.task.is.map", true)
      hadoopConf.setInt("mapred.task.partition", 0)
      new TaskAttemptContextImpl(hadoopConf, taskAttemptId)
    }

    committer.setupTask(taskAttemptContext)

    try {
      Utils.tryWithSafeFinallyAndFailureCallbacks(block = {

        // parser converts spark InternalRow to the raw row carbon appends;
        // configurable via CARBON_STREAM_PARSER
        val parserName = taskAttemptContext.getConfiguration.get(
          CarbonStreamParser.CARBON_STREAM_PARSER,
          CarbonStreamParser.CARBON_STREAM_PARSER_DEFAULT)

        val streamParser =
          Class.forName(parserName).newInstance.asInstanceOf[CarbonStreamParser]
        streamParser.initialize(taskAttemptContext.getConfiguration, rowSchema)

        StreamSegment.appendBatchData(new InputIterator(iterator, streamParser),
          taskAttemptContext, carbonLoadModel)
      })(catchBlock = {
        // abort on failure; tryWithSafeFinallyAndFailureCallbacks rethrows,
        // so commitTask below is skipped
        committer.abortTask(taskAttemptContext)
        LOGGER.error(s"Job $jobId aborted.")
      })
      committer.commitTask(taskAttemptContext)
    } catch {
      case t: Throwable =>
        throw new CarbonStreamException("Task failed while writing rows", t)
    }
  }

  /**
   * Convert spark iterator to carbon iterator, so that java module can use it.
   * Each row is parsed into an Array[Object] by the stream parser.
   */
  class InputIterator(rddIter: Iterator[InternalRow], streamParser: CarbonStreamParser)
    extends CarbonIterator[Array[Object]] {

    override def hasNext: Boolean = rddIter.hasNext

    override def next: Array[Object] = {
      streamParser.parserRow(rddIter.next())
    }

    override def close(): Unit = {
      streamParser.close()
    }
  }

}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/c723947a/integration/spark-common/src/main/scala/org/apache/spark/sql/test/TestQueryExecutor.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark-common/src/main/scala/org/apache/spark/sql/test/TestQueryExecutor.scala
 
b/integration/spark-common/src/main/scala/org/apache/spark/sql/test/TestQueryExecutor.scala
index f582145..34f901f 100644
--- 
a/integration/spark-common/src/main/scala/org/apache/spark/sql/test/TestQueryExecutor.scala
+++ 
b/integration/spark-common/src/main/scala/org/apache/spark/sql/test/TestQueryExecutor.scala
@@ -124,7 +124,9 @@ object TestQueryExecutor {
     TestQueryExecutor.projectPath + "/processing/target",
     TestQueryExecutor.projectPath + "/integration/spark-common/target",
     TestQueryExecutor.projectPath + "/integration/spark2/target",
-    TestQueryExecutor.projectPath + "/integration/spark-common/target/jars")
+    TestQueryExecutor.projectPath + "/integration/spark-common/target/jars",
+    TestQueryExecutor.projectPath + "/streaming/target")
+
   lazy val jars = {
     val jarsLocal = new ArrayBuffer[String]()
     modules.foreach { path =>

http://git-wip-us.apache.org/repos/asf/carbondata/blob/c723947a/integration/spark2/pom.xml
----------------------------------------------------------------------
diff --git a/integration/spark2/pom.xml b/integration/spark2/pom.xml
index 13b3d8d..e4593be 100644
--- a/integration/spark2/pom.xml
+++ b/integration/spark2/pom.xml
@@ -36,7 +36,7 @@
   <dependencies>
     <dependency>
       <groupId>org.apache.carbondata</groupId>
-      <artifactId>carbondata-streaming</artifactId>
+      <artifactId>carbondata-spark-common</artifactId>
       <version>${project.version}</version>
     </dependency>
     <dependency>

http://git-wip-us.apache.org/repos/asf/carbondata/blob/c723947a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala 
b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala
index f6bdff6..1038fcf 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala
@@ -27,7 +27,6 @@ import org.apache.spark.scheduler.{SparkListener, 
SparkListenerApplicationEnd}
 import org.apache.spark.sql.SparkSession.Builder
 import org.apache.spark.sql.catalyst.encoders.RowEncoder
 import org.apache.spark.sql.catalyst.plans.logical.{Command, LocalRelation, 
Union}
-import org.apache.spark.sql.execution.streaming.CarbonStreamingQueryListener
 import org.apache.spark.sql.hive.execution.command.CarbonSetCommand
 import org.apache.spark.sql.internal.{SessionState, SharedState}
 import org.apache.spark.sql.profiler.{Profiler, SQLStart}
@@ -36,6 +35,7 @@ import org.apache.spark.util.{CarbonReflectionUtils, Utils}
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.util.{CarbonProperties, CarbonSessionInfo, 
ThreadLocalSessionInfo}
 import org.apache.carbondata.hadoop.util.CarbonInputFormatUtil
+import org.apache.carbondata.streaming.CarbonStreamingQueryListener
 
 /**
  * Session implementation for {org.apache.spark.sql.SparkSession}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/c723947a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala
 
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala
index 5183b02..e60a583 100644
--- 
a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala
+++ 
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala
@@ -46,8 +46,7 @@ import org.apache.carbondata.events._
 import 
org.apache.carbondata.processing.loading.events.LoadEvents.LoadMetadataEvent
 import org.apache.carbondata.processing.loading.model.{CarbonDataLoadSchema, 
CarbonLoadModel}
 import org.apache.carbondata.processing.merger.{CarbonDataMergerUtil, 
CompactionType}
-import org.apache.carbondata.spark.rdd.CarbonDataRDDFactory
-import org.apache.carbondata.streaming.StreamHandoffRDD
+import org.apache.carbondata.spark.rdd.{CarbonDataRDDFactory, StreamHandoffRDD}
 import org.apache.carbondata.streaming.segment.StreamSegment
 
 /**

http://git-wip-us.apache.org/repos/asf/carbondata/blob/c723947a/streaming/pom.xml
----------------------------------------------------------------------
diff --git a/streaming/pom.xml b/streaming/pom.xml
index 01affec..b8c447d 100644
--- a/streaming/pom.xml
+++ b/streaming/pom.xml
@@ -22,11 +22,16 @@
   <dependencies>
     <dependency>
       <groupId>org.apache.carbondata</groupId>
-      <artifactId>carbondata-spark-common</artifactId>
+      <artifactId>carbondata-hadoop</artifactId>
       <version>${project.version}</version>
     </dependency>
     <dependency>
       <groupId>org.apache.spark</groupId>
+      <artifactId>spark-sql_${scala.binary.version}</artifactId>
+      <version>${spark.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.spark</groupId>
       <artifactId>spark-streaming_${scala.binary.version}</artifactId>
       <version>${spark.version}</version>
       <scope>${spark.deps.scope}</scope>
@@ -44,7 +49,7 @@
   </dependencies>
 
   <build>
-    <testSourceDirectory>src/test/scala</testSourceDirectory>
+    <testSourceDirectory>src/test/java</testSourceDirectory>
     <resources>
       <resource>
         <directory>src/resources</directory>

http://git-wip-us.apache.org/repos/asf/carbondata/blob/c723947a/streaming/src/main/java/org/apache/carbondata/streaming/CarbonStreamInputFormat.java
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/java/org/apache/carbondata/streaming/CarbonStreamInputFormat.java
 
b/streaming/src/main/java/org/apache/carbondata/streaming/CarbonStreamInputFormat.java
new file mode 100644
index 0000000..66d89c8
--- /dev/null
+++ 
b/streaming/src/main/java/org/apache/carbondata/streaming/CarbonStreamInputFormat.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.streaming;
+
+import java.io.IOException;
+
+import org.apache.carbondata.core.cache.Cache;
+import org.apache.carbondata.core.cache.dictionary.Dictionary;
+import 
org.apache.carbondata.core.cache.dictionary.DictionaryColumnUniqueIdentifier;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.metadata.datatype.DataType;
+import org.apache.carbondata.core.metadata.datatype.DataTypes;
+import org.apache.carbondata.core.metadata.encoder.Encoding;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
+import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
+import org.apache.carbondata.core.scan.complextypes.ArrayQueryType;
+import org.apache.carbondata.core.scan.complextypes.PrimitiveQueryType;
+import org.apache.carbondata.core.scan.complextypes.StructQueryType;
+import org.apache.carbondata.core.scan.filter.GenericQueryType;
+import org.apache.carbondata.core.util.CarbonUtil;
+
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+
+/**
+ * Stream input format
+ */
+public class CarbonStreamInputFormat extends FileInputFormat<Void, Object> {
+
+  public static final String READ_BUFFER_SIZE = 
"carbon.stream.read.buffer.size";
+  public static final String READ_BUFFER_SIZE_DEFAULT = "65536";
+
+  @Override public RecordReader<Void, Object> createRecordReader(InputSplit 
split,
+      TaskAttemptContext context) throws IOException, InterruptedException {
+    return new CarbonStreamRecordReader();
+  }
+
+  public static GenericQueryType[] getComplexDimensions(CarbonTable 
carbontable,
+      CarbonColumn[] carbonColumns, Cache<DictionaryColumnUniqueIdentifier, 
Dictionary> cache)
+      throws IOException {
+    GenericQueryType[] queryTypes = new GenericQueryType[carbonColumns.length];
+    for (int i = 0; i < carbonColumns.length; i++) {
+      if (carbonColumns[i].isComplex()) {
+        if (DataTypes.isArrayType(carbonColumns[i].getDataType())) {
+          queryTypes[i] = new ArrayQueryType(carbonColumns[i].getColName(),
+              carbonColumns[i].getColName(), i);
+        } else if (DataTypes.isStructType(carbonColumns[i].getDataType())) {
+          queryTypes[i] = new StructQueryType(carbonColumns[i].getColName(),
+              carbonColumns[i].getColName(), i);
+        } else {
+          throw new UnsupportedOperationException(
+              carbonColumns[i].getDataType().getName() + " is not supported");
+        }
+
+        fillChildren(carbontable, queryTypes[i], (CarbonDimension) 
carbonColumns[i], i, cache);
+      }
+    }
+
+    return queryTypes;
+  }
+
+  private static void fillChildren(CarbonTable carbontable, GenericQueryType 
parentQueryType,
+      CarbonDimension dimension, int parentBlockIndex,
+      Cache<DictionaryColumnUniqueIdentifier, Dictionary> cache) throws 
IOException {
+    for (int i = 0; i < dimension.getNumberOfChild(); i++) {
+      CarbonDimension child = dimension.getListOfChildDimensions().get(i);
+      DataType dataType = child.getDataType();
+      GenericQueryType queryType = null;
+      if (DataTypes.isArrayType(dataType)) {
+        queryType =
+            new ArrayQueryType(child.getColName(), dimension.getColName(), 
++parentBlockIndex);
+
+      } else if (DataTypes.isStructType(dataType)) {
+        queryType =
+            new StructQueryType(child.getColName(), dimension.getColName(), 
++parentBlockIndex);
+        parentQueryType.addChildren(queryType);
+      } else {
+        boolean isDirectDictionary =
+            CarbonUtil.hasEncoding(child.getEncoder(), 
Encoding.DIRECT_DICTIONARY);
+        String dictionaryPath = 
carbontable.getTableInfo().getFactTable().getTableProperties()
+            .get(CarbonCommonConstants.DICTIONARY_PATH);
+        DictionaryColumnUniqueIdentifier dictionarIdentifier =
+            new 
DictionaryColumnUniqueIdentifier(carbontable.getAbsoluteTableIdentifier(),
+                child.getColumnIdentifier(), child.getDataType(), 
dictionaryPath);
+
+        queryType =
+            new PrimitiveQueryType(child.getColName(), dimension.getColName(), 
++parentBlockIndex,
+                child.getDataType(), 4, cache.get(dictionarIdentifier),
+                isDirectDictionary);
+      }
+      parentQueryType.addChildren(queryType);
+      if (child.getNumberOfChild() > 0) {
+        fillChildren(carbontable, queryType, child, parentBlockIndex, cache);
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/c723947a/streaming/src/main/java/org/apache/carbondata/streaming/CarbonStreamOutputFormat.java
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/java/org/apache/carbondata/streaming/CarbonStreamOutputFormat.java
 
b/streaming/src/main/java/org/apache/carbondata/streaming/CarbonStreamOutputFormat.java
new file mode 100644
index 0000000..f9f0d76
--- /dev/null
+++ 
b/streaming/src/main/java/org/apache/carbondata/streaming/CarbonStreamOutputFormat.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.streaming;
+
+import java.io.IOException;
+import java.nio.charset.Charset;
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.hadoop.util.ObjectSerializationUtil;
+import org.apache.carbondata.processing.loading.model.CarbonLoadModel;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+
+/**
+ * Stream output format
+ */
+public class CarbonStreamOutputFormat extends FileOutputFormat<Void, Object> {
+
+  static final byte[] CARBON_SYNC_MARKER =
+      
"@carbondata_sync".getBytes(Charset.forName(CarbonCommonConstants.DEFAULT_CHARSET));
+
+  public static final String CARBON_ENCODER_ROW_BUFFER_SIZE = 
"carbon.stream.row.buffer.size";
+
+  public static final int CARBON_ENCODER_ROW_BUFFER_SIZE_DEFAULT = 1024;
+
+  public static final String CARBON_STREAM_BLOCKLET_ROW_NUMS = 
"carbon.stream.blocklet.row.nums";
+
+  public static final int CARBON_STREAM_BLOCKLET_ROW_NUMS_DEFAULT = 32000;
+
+  public static final String CARBON_STREAM_CACHE_SIZE = 
"carbon.stream.cache.size";
+
+  public static final int CARBON_STREAM_CACHE_SIZE_DEFAULT = 32 * 1024 * 1024;
+
+  private static final String LOAD_Model = 
"mapreduce.output.carbon.load.model";
+
+  private static final String SEGMENT_ID = "carbon.segment.id";
+
+  @Override public RecordWriter<Void, Object> 
getRecordWriter(TaskAttemptContext job)
+      throws IOException, InterruptedException {
+    return new CarbonStreamRecordWriter(job);
+  }
+
+  public static void setCarbonLoadModel(Configuration hadoopConf, 
CarbonLoadModel carbonLoadModel)
+      throws IOException {
+    if (carbonLoadModel != null) {
+      hadoopConf.set(LOAD_Model, 
ObjectSerializationUtil.convertObjectToString(carbonLoadModel));
+    }
+  }
+
+  public static CarbonLoadModel getCarbonLoadModel(Configuration hadoopConf) 
throws IOException {
+    String value = hadoopConf.get(LOAD_Model);
+    if (value == null) {
+      return null;
+    } else {
+      return (CarbonLoadModel) 
ObjectSerializationUtil.convertStringToObject(value);
+    }
+  }
+
+  public static void setSegmentId(Configuration hadoopConf, String segmentId) 
throws IOException {
+    if (segmentId != null) {
+      hadoopConf.set(SEGMENT_ID, segmentId);
+    }
+  }
+
+  public static String getSegmentId(Configuration hadoopConf) throws 
IOException {
+    return hadoopConf.get(SEGMENT_ID);
+  }
+
+}

Reply via email to