http://git-wip-us.apache.org/repos/asf/carbondata/blob/c723947a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/StreamHandoffRDD.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/StreamHandoffRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/StreamHandoffRDD.scala new file mode 100644 index 0000000..bf5f660 --- /dev/null +++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/StreamHandoffRDD.scala @@ -0,0 +1,435 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.carbondata.spark.rdd + +import java.text.SimpleDateFormat +import java.util +import java.util.Date + +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.mapreduce.{Job, TaskAttemptID, TaskType} +import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl +import org.apache.spark.{Partition, SerializableWritable, SparkContext, TaskContext} +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.catalyst.expressions.GenericInternalRow + +import org.apache.carbondata.common.logging.LogServiceFactory +import org.apache.carbondata.core.datamap.Segment +import org.apache.carbondata.core.datastore.block.SegmentProperties +import org.apache.carbondata.core.datastore.impl.FileFactory +import org.apache.carbondata.core.locks.{CarbonLockFactory, LockUsage} +import org.apache.carbondata.core.metadata.CarbonMetadata +import org.apache.carbondata.core.metadata.schema.table.CarbonTable +import org.apache.carbondata.core.scan.result.iterator.RawResultIterator +import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatus, SegmentStatusManager} +import org.apache.carbondata.core.util.CarbonUtil +import org.apache.carbondata.core.util.path.CarbonTablePath +import org.apache.carbondata.events.{OperationContext, OperationListenerBus} +import org.apache.carbondata.hadoop.{CarbonInputSplit, CarbonProjection} +import org.apache.carbondata.hadoop.api.{CarbonInputFormat, CarbonTableInputFormat} +import org.apache.carbondata.processing.loading.events.LoadEvents.{LoadTablePostStatusUpdateEvent, LoadTablePreStatusUpdateEvent} +import org.apache.carbondata.processing.loading.model.CarbonLoadModel +import org.apache.carbondata.processing.merger.{CompactionResultSortProcessor, CompactionType} +import org.apache.carbondata.processing.util.CarbonLoaderUtil +import org.apache.carbondata.spark.{HandoffResult, HandoffResultImpl} +import org.apache.carbondata.spark.util.CommonUtil +import 
org.apache.carbondata.streaming.{CarbonStreamInputFormat, CarbonStreamRecordReader}


/**
 * Partition of the handoff segment: one split of the streaming segment.
 *
 * @param rddId      id of the owning RDD, mixed into the hash code
 * @param idx        index of this partition inside the RDD
 * @param inputSplit split of the streaming segment; it is transient and is shipped to
 *                   executors through the `split` wrapper instead
 */
class HandoffPartition(
    val rddId: Int,
    val idx: Int,
    @transient val inputSplit: CarbonInputSplit
) extends Partition {

  // wrap the Hadoop Writable split so that it survives task serialization
  val split = new SerializableWritable[CarbonInputSplit](inputSplit)

  override val index: Int = idx

  override def hashCode(): Int = 41 * (41 + rddId) + idx
}

/**
 * Package the record reader of the handoff segment as a RawResultIterator, so that
 * its raw rows can be consumed by CompactionResultSortProcessor.
 *
 * `hasNext` advances the underlying reader at most once per row, so it may be called
 * any number of times before `next()` without skipping rows.
 */
class StreamingRawResultIterator(
    recordReader: CarbonStreamRecordReader
) extends RawResultIterator(null, null, null) {

  // true when the reader has been advanced for a row that next() has not returned yet
  private var rowFetched = false
  // result of the last nextKeyValue() call; meaningful only while rowFetched is true
  private var rowAvailable = false

  override def hasNext: Boolean = {
    if (!rowFetched) {
      rowAvailable = recordReader.nextKeyValue()
      rowFetched = true
    }
    rowAvailable
  }

  override def next(): Array[Object] = {
    if (!hasNext) {
      throw new NoSuchElementException("no more rows in the streaming segment")
    }
    rowFetched = false
    val rowTmp = recordReader
      .getCurrentValue
      .asInstanceOf[GenericInternalRow]
      .values
      .asInstanceOf[Array[Object]]
    // hand out a copy rather than the reader's own values array
    val row = new Array[Object](rowTmp.length)
    System.arraycopy(rowTmp, 0, row, 0, rowTmp.length)
    row
  }
}

/**
 * Execute streaming segment handoff. Every partition reads one split of the
 * streaming segment as raw rows, then sorts and writes them as columnar files
 * through CompactionResultSortProcessor.
 *
 * @param sc               spark context
 * @param result           builds the RDD element from (task number, success flag)
 * @param carbonLoadModel  load model used by the handoff processor
 * @param handOffSegmentId id of the streaming segment to hand off
 */
class StreamHandoffRDD[K, V](
    sc: SparkContext,
    result: HandoffResult[K, V],
    carbonLoadModel: CarbonLoadModel,
    handOffSegmentId: String
) extends CarbonRDD[(K, V)](sc, Nil, sc.hadoopConfiguration) {

  // timestamp-based job tracker id used to build the Hadoop TaskAttemptID
  private val jobTrackerId: String = {
    val formatter = new SimpleDateFormat("yyyyMMddHHmm")
    formatter.format(new Date())
  }

  /**
   * Hand off one split and return exactly one element built from
   * (task number, processor status).
   */
  override def internalCompute(
      split: Partition,
      context: TaskContext
  ): Iterator[(K, V)] = {
    carbonLoadModel.setTaskNo(split.index.toString)
    val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
    CarbonMetadata.getInstance().addCarbonTable(carbonTable)
    // the input iterator is using raw row
    val iteratorList = prepareInputIterator(split, carbonTable)

    CommonUtil.setTempStoreLocation(split.index, carbonLoadModel, true, false)
    // use CompactionResultSortProcessor to sort data and write to columnar files
    val processor = prepareHandoffProcessor(carbonTable)
    val status = processor.execute(iteratorList)

    Iterator.single(result.getKey(split.index.toString, status))
  }

  /**
   * Prepare the input iterator of a split, based on CarbonStreamRecordReader
   * configured to return raw rows in stream storage column order.
   */
  private def prepareInputIterator(
      split: Partition,
      carbonTable: CarbonTable
  ): util.ArrayList[RawResultIterator] = {
    val inputSplit = split.asInstanceOf[HandoffPartition].split.value
    val attemptId = new TaskAttemptID(jobTrackerId, id, TaskType.MAP, split.index, 0)
    val hadoopConf = new Configuration()
    CarbonInputFormat.setDatabaseName(hadoopConf, carbonTable.getDatabaseName)
    CarbonInputFormat.setTableName(hadoopConf, carbonTable.getTableName)
    CarbonInputFormat.setTablePath(hadoopConf, carbonTable.getTablePath)
    val projection = new CarbonProjection
    val dataFields = carbonTable.getStreamStorageOrderColumn(carbonTable.getTableName)
    (0 until dataFields.size()).foreach { index =>
      projection.addColumn(dataFields.get(index).getColName)
    }
    CarbonInputFormat.setColumnProjection(hadoopConf, projection)
    CarbonInputFormat.setTableInfo(hadoopConf, carbonTable.getTableInfo)
    val attemptContext = new TaskAttemptContextImpl(hadoopConf, attemptId)
    val format = new CarbonTableInputFormat[Array[Object]]()
    val model = format.createQueryModel(inputSplit, attemptContext)
    val inputFormat = new CarbonStreamInputFormat
    val streamReader = inputFormat.createRecordReader(inputSplit, attemptContext)
      .asInstanceOf[CarbonStreamRecordReader]
    streamReader.setVectorReader(false)
    streamReader.setQueryModel(model)
    streamReader.setUseRawRow(true)
    streamReader.initialize(inputSplit, attemptContext)
    val iteratorList = new util.ArrayList[RawResultIterator](1)
    iteratorList.add(new StreamingRawResultIterator(streamReader))
    iteratorList
  }

  /**
   * Build the sort processor that writes the handoff rows as a columnar segment.
   */
  private def prepareHandoffProcessor(
      carbonTable: CarbonTable
  ): CompactionResultSortProcessor = {
    val wrapperColumnSchemaList = CarbonUtil.getColumnSchemaList(
      carbonTable.getDimensionByTableName(carbonTable.getTableName),
      carbonTable.getMeasureByTableName(carbonTable.getTableName))
    // every column is given Integer.MAX_VALUE as its cardinality
    val dimLensWithComplex = Array.fill(wrapperColumnSchemaList.size())(Integer.MAX_VALUE)
    val dictionaryColumnCardinality =
      CarbonUtil.getFormattedCardinality(dimLensWithComplex, wrapperColumnSchemaList)
    val segmentProperties =
      new SegmentProperties(wrapperColumnSchemaList, dictionaryColumnCardinality)

    new CompactionResultSortProcessor(
      carbonLoadModel,
      carbonTable,
      segmentProperties,
      CompactionType.STREAMING,
      carbonTable.getTableName,
      null
    )
  }

  /**
   * Get the partitions of the handoff segment: one per streaming split.
   */
  override protected def getPartitions: Array[Partition] = {
    val job = Job.getInstance(FileFactory.getConfiguration)
    val inputFormat = new CarbonTableInputFormat[Array[Object]]()
    val segmentList = new util.ArrayList[Segment](1)
    segmentList.add(Segment.toSegment(handOffSegmentId))
    val splits = inputFormat.getSplitsOfStreaming(
      job,
      carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable.getAbsoluteTableIdentifier,
      segmentList
    )

    (0 until splits.size()).map { index =>
      new HandoffPartition(id, index, splits.get(index).asInstanceOf[CarbonInputSplit])
    }.toArray[Partition]
  }
}

object StreamHandoffRDD {

  private val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)

  /**
   * Hand off, one by one, every segment of the table whose status is
   * STREAMING_FINISH, while holding the table's handoff lock.
   *
   * Nothing is handed off when the handoff lock cannot be acquired. A failed
   * segment handoff throws and ends the iteration.
   */
  def iterateStreamingHandoff(
      carbonLoadModel: CarbonLoadModel,
      sparkSession: SparkSession
  ): Unit = {
    val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
    val identifier = carbonTable.getAbsoluteTableIdentifier
    val tableName = s"${ carbonTable.getDatabaseName }.${ carbonTable.getTableName }"
    // require handoff lock on table
    val lock = CarbonLockFactory.getCarbonLockObj(identifier, LockUsage.HANDOFF_LOCK)
    var locked = false
    try {
      locked = lock.lockWithRetries()
      if (locked) {
        LOGGER.info(s"Acquired the handoff lock for table $tableName")
        // handoff streaming segment one by one, re-reading the table status each time
        var continueHandoff = true
        while (continueHandoff) {
          findFinishedStreamingSegment(carbonTable) match {
            case Some(segmentId) =>
              executeStreamingHandoff(carbonLoadModel, sparkSession, segmentId)
            case None =>
              continueHandoff = false
          }
        }
      } else {
        LOGGER.error(s"Not able to acquire the handoff lock for table $tableName")
      }
    } finally {
      // only release a lock this call actually holds
      if (locked) {
        lock.unlock()
      }
    }
  }

  /**
   * Read the table status under the table status lock and return the load name of
   * the first STREAMING_FINISH segment, or None when there is no such segment or
   * the table status lock could not be acquired.
   */
  private def findFinishedStreamingSegment(carbonTable: CarbonTable): Option[String] = {
    val identifier = carbonTable.getAbsoluteTableIdentifier
    val statusLock = new SegmentStatusManager(identifier).getTableStatusLock
    // lock table to read table status file
    val loadMetadataDetails: Option[Array[LoadMetadataDetails]] =
      try {
        if (statusLock.lockWithRetries()) {
          Option(SegmentStatusManager.readLoadMetadata(
            CarbonTablePath.getMetadataPath(identifier.getTablePath)))
        } else {
          LOGGER.error("Not able to acquire the table status lock for table " +
            s"${ carbonTable.getDatabaseName }.${ carbonTable.getTableName }")
          None
        }
      } finally {
        if (null != statusLock) {
          statusLock.unlock()
        }
      }
    loadMetadataDetails
      .flatMap(_.find(_.getSegmentStatus == SegmentStatus.STREAMING_FINISH))
      .map(_.getLoadName)
  }

  /**
   * Execute stream segment handoff, either in the calling thread (DDL) or in a new
   * background thread. Failures in the background thread are logged, because no
   * caller can observe them.
   */
  def startStreamingHandoffThread(
      carbonLoadModel: CarbonLoadModel,
      sparkSession: SparkSession,
      isDDL: Boolean
  ): Unit = {
    if (isDDL) {
      iterateStreamingHandoff(carbonLoadModel, sparkSession)
    } else {
      // start a new thread to execute streaming segment handoff
      val handoffThread = new Thread() {
        override def run(): Unit = {
          try {
            iterateStreamingHandoff(carbonLoadModel, sparkSession)
          } catch {
            case ex: Exception =>
              LOGGER.error(ex, "Streaming segment handoff failed in background thread")
          }
        }
      }
      handoffThread.start()
    }
  }

  /**
   * Invoke StreamHandoffRDD to hand off a streaming segment to a new columnar
   * segment, then update the table status.
   *
   * @param carbonLoadModel load model of the table
   * @param sparkSession    session used to run the handoff job
   * @param handoffSegmenId id of the streaming segment to hand off
   * @throws Exception when the handoff job fails (after cleaning up the new segment)
   *                   or when the table status cannot be updated
   */
  def executeStreamingHandoff(
      carbonLoadModel: CarbonLoadModel,
      sparkSession: SparkSession,
      handoffSegmenId: String
  ): Unit = {
    var loadStatus = SegmentStatus.SUCCESS
    var errorMessage: String = "Handoff failure"
    try {
      // generate new columnar segment
      val newMetaEntry = new LoadMetadataDetails
      carbonLoadModel.setFactTimeStamp(System.currentTimeMillis())
      CarbonLoaderUtil.populateNewLoadMetaEntry(
        newMetaEntry,
        SegmentStatus.INSERT_IN_PROGRESS,
        carbonLoadModel.getFactTimeStamp,
        false)
      val operationContext = new OperationContext()
      val loadTablePreStatusUpdateEvent: LoadTablePreStatusUpdateEvent =
        new LoadTablePreStatusUpdateEvent(
          carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable.getCarbonTableIdentifier,
          carbonLoadModel)
      OperationListenerBus.getInstance().fireEvent(loadTablePreStatusUpdateEvent, operationContext)

      CarbonLoaderUtil.recordNewLoadMetadata(newMetaEntry, carbonLoadModel, true, false)
      val loadTablePostStatusUpdateEvent: LoadTablePostStatusUpdateEvent =
        new LoadTablePostStatusUpdateEvent(carbonLoadModel)
      OperationListenerBus.getInstance()
        .fireEvent(loadTablePostStatusUpdateEvent, operationContext)
      // convert a streaming segment to columnar segment
      val status = new StreamHandoffRDD(
        sparkSession.sparkContext,
        new HandoffResultImpl(),
        carbonLoadModel,
        handoffSegmenId).collect()

      // each task reports a success flag; a single failed task fails the handoff
      if (status.exists(!_._2)) {
        loadStatus = SegmentStatus.LOAD_FAILURE
      }
    } catch {
      case ex: Exception =>
        loadStatus = SegmentStatus.LOAD_FAILURE
        // getCause may be null; fall back to the exception itself so building the
        // message cannot throw and skip the failure cleanup below
        val rootCause = Option(ex.getCause).getOrElse(ex)
        errorMessage = errorMessage + ": " + rootCause.getMessage
        LOGGER.error(errorMessage)
        LOGGER.error(ex, s"Handoff failed on streaming segment $handoffSegmenId")
    }

    if (loadStatus == SegmentStatus.LOAD_FAILURE) {
      CarbonLoaderUtil.updateTableStatusForFailure(carbonLoadModel)
      LOGGER.info("********starting clean up**********")
      CarbonLoaderUtil.deleteSegment(carbonLoadModel, carbonLoadModel.getSegmentId.toInt)
      LOGGER.info("********clean up done**********")
      LOGGER.audit(s"Handoff is failed for " +
        s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
      LOGGER.error("Cannot write load metadata file as handoff failed")
      throw new Exception(errorMessage)
    }

    // the failure branch above always throws, so the handoff job succeeded here
    if (!updateLoadMetadata(handoffSegmenId, carbonLoadModel)) {
      val updateErrorMessage = "Handoff failed due to failure in table status updation."
      LOGGER.audit("Handoff is failed for " +
        s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
      LOGGER.error(updateErrorMessage)
      throw new Exception(updateErrorMessage)
    }
  }

  /**
   * Update the table status under the table status lock. The new columnar segment
   * (loadModel.getSegmentId) becomes SUCCESS. The streaming segment becomes
   * COMPACTED, merged into the new segment.
   *
   * @return true if the table status file was rewritten, false if the lock could
   *         not be acquired
   * @throws Exception if either segment is missing from the table status
   */
  private def updateLoadMetadata(
      handoffSegmentId: String,
      loadModel: CarbonLoadModel
  ): Boolean = {
    var status = false
    val carbonTable = loadModel.getCarbonDataLoadSchema.getCarbonTable
    val metaDataFilepath = carbonTable.getMetadataPath
    val identifier = carbonTable.getAbsoluteTableIdentifier
    val metadataPath = CarbonTablePath.getMetadataPath(identifier.getTablePath)
    val fileType = FileFactory.getFileType(metadataPath)
    if (!FileFactory.isFileExist(metadataPath, fileType)) {
      FileFactory.mkdirs(metadataPath, fileType)
    }
    val tableStatusPath = CarbonTablePath.getTableStatusFilePath(identifier.getTablePath)
    val segmentStatusManager = new SegmentStatusManager(identifier)
    val carbonLock = segmentStatusManager.getTableStatusLock
    val tableName = loadModel.getDatabaseName + "." + loadModel.getTableName
    try {
      if (carbonLock.lockWithRetries()) {
        LOGGER.info(s"Acquired lock for table $tableName for table status updation")
        val listOfLoadFolderDetailsArray =
          SegmentStatusManager.readLoadMetadata(metaDataFilepath)

        // update new columnar segment to success status
        val newSegment =
          listOfLoadFolderDetailsArray.find(_.getLoadName.equals(loadModel.getSegmentId))
        if (newSegment.isEmpty) {
          throw new Exception("Failed to update table status for new segment")
        } else {
          newSegment.get.setSegmentStatus(SegmentStatus.SUCCESS)
          newSegment.get.setLoadEndTime(System.currentTimeMillis())
        }

        // update streaming segment to compacted status
        val streamSegment =
          listOfLoadFolderDetailsArray.find(_.getLoadName.equals(handoffSegmentId))
        if (streamSegment.isEmpty) {
          throw new Exception("Failed to update table status for streaming segment")
        } else {
          streamSegment.get.setSegmentStatus(SegmentStatus.COMPACTED)
          streamSegment.get.setMergedLoadName(loadModel.getSegmentId)
        }

        // refresh table status file
        SegmentStatusManager.writeLoadDetailsIntoFile(tableStatusPath, listOfLoadFolderDetailsArray)
        status = true
      } else {
        LOGGER.error(
          "Not able to acquire the lock for Table status updation for table " + tableName)
      }
    } finally {
      if (carbonLock.unlock()) {
        LOGGER.info(s"Table unlocked successfully after table status updation $tableName")
      } else {
        LOGGER.error(
          s"Unable to unlock Table lock for table $tableName during table status updation")
      }
    }
    status
  }
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/c723947a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala index 3250a53..5f55ef3 100644 --- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala +++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala @@ -47,6 +47,7 @@ import org.apache.carbondata.processing.exception.DataLoadingException import org.apache.carbondata.processing.loading.FailureCauses import org.apache.carbondata.processing.loading.exception.CarbonDataLoadingException import org.apache.carbondata.processing.util.CarbonDataProcessorUtil +import org.apache.carbondata.streaming.parser.FieldConverter object CarbonScalaUtil { @@ -121,55 +122,8 @@ object CarbonScalaUtil { timeStampFormat: SimpleDateFormat, dateFormat: SimpleDateFormat, level: Int = 1): String = { - if (value == null) { - serializationNullFormat - } else { - value match { - case s: String => if (s.length > CarbonCommonConstants.MAX_CHARS_PER_COLUMN_DEFAULT) { - throw new Exception("Dataload failed, String length cannot exceed " + - CarbonCommonConstants.MAX_CHARS_PER_COLUMN_DEFAULT + " characters") - } else { - s - } - case d: java.math.BigDecimal => d.toPlainString - case i: java.lang.Integer => i.toString - case d: java.lang.Double => d.toString - case t: java.sql.Timestamp => timeStampFormat format t - case d: java.sql.Date => dateFormat format d - case b: java.lang.Boolean => b.toString - case s: java.lang.Short => s.toString - case f: java.lang.Float => f.toString - case bs: Array[Byte] => new String(bs, - Charset.forName(CarbonCommonConstants.DEFAULT_CHARSET)) - case s: 
scala.collection.Seq[Any] => - val delimiter = if (level == 1) { - delimiterLevel1 - } else { - delimiterLevel2 - } - val builder = new StringBuilder() - s.foreach { x => - builder.append(getString(x, serializationNullFormat, delimiterLevel1, - delimiterLevel2, timeStampFormat, dateFormat, level + 1)).append(delimiter) - } - builder.substring(0, builder.length - delimiter.length()) - case m: scala.collection.Map[Any, Any] => - throw new Exception("Unsupported data type: Map") - case r: org.apache.spark.sql.Row => - val delimiter = if (level == 1) { - delimiterLevel1 - } else { - delimiterLevel2 - } - val builder = new StringBuilder() - for (i <- 0 until r.length) { - builder.append(getString(r(i), serializationNullFormat, delimiterLevel1, - delimiterLevel2, timeStampFormat, dateFormat, level + 1)).append(delimiter) - } - builder.substring(0, builder.length - delimiter.length()) - case other => other.toString - } - } + FieldConverter.objectToString(value, serializationNullFormat, delimiterLevel1, + delimiterLevel2, timeStampFormat, dateFormat, level) } /** http://git-wip-us.apache.org/repos/asf/carbondata/blob/c723947a/integration/spark-common/src/main/scala/org/apache/carbondata/streaming/CarbonSparkStreamingListener.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/streaming/CarbonSparkStreamingListener.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/streaming/CarbonSparkStreamingListener.scala new file mode 100644 index 0000000..a99a1e8 --- /dev/null +++ b/integration/spark-common/src/main/scala/org/apache/carbondata/streaming/CarbonSparkStreamingListener.scala @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.streaming + +import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationEnd} + +class CarbonSparkStreamingListener extends SparkListener { + + /** + * When Spark Streaming App stops, remove all locks for stream table. + */ + override def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd): Unit = { + CarbonStreamSparkStreaming.cleanAllLockAfterStop() + } +} http://git-wip-us.apache.org/repos/asf/carbondata/blob/c723947a/integration/spark-common/src/main/scala/org/apache/carbondata/streaming/CarbonStreamSparkStreaming.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/streaming/CarbonStreamSparkStreaming.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/streaming/CarbonStreamSparkStreaming.scala new file mode 100644 index 0000000..28f04b1 --- /dev/null +++ b/integration/spark-common/src/main/scala/org/apache/carbondata/streaming/CarbonStreamSparkStreaming.scala @@ -0,0 +1,184 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.streaming + +import java.util + +import scala.collection.JavaConverters._ + +import org.apache.hadoop.conf.Configuration +import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession} +import org.apache.spark.sql.execution.streaming.{CarbonAppendableStreamSink, Sink} +import org.apache.spark.streaming.Time + +import org.apache.carbondata.common.logging.LogServiceFactory +import org.apache.carbondata.core.locks.{CarbonLockFactory, ICarbonLock, LockUsage} +import org.apache.carbondata.core.metadata.schema.table.CarbonTable + +/** + * Interface used to write stream data to stream table + * when integrate with Spark Streaming. + * + * NOTE: Current integration with Spark Streaming is an alpha feature. 
+ */ +class CarbonStreamSparkStreamingWriter(val sparkSession: SparkSession, + val carbonTable: CarbonTable, + val configuration: Configuration) { + + private val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName) + + private var isInitialize: Boolean = false + + private var lock: ICarbonLock = null + private var carbonAppendableStreamSink: Sink = null + + /** + * Acquired the lock for stream table + */ + def lockStreamTable(): Unit = { + lock = CarbonLockFactory.getCarbonLockObj(carbonTable.getAbsoluteTableIdentifier, + LockUsage.STREAMING_LOCK) + if (lock.lockWithRetries()) { + LOGGER.info("Acquired the lock for stream table: " + + carbonTable.getDatabaseName + "." + + carbonTable.getTableName) + } else { + LOGGER.error("Not able to acquire the lock for stream table:" + + carbonTable.getDatabaseName + "." + carbonTable.getTableName) + throw new InterruptedException( + "Not able to acquire the lock for stream table: " + carbonTable.getDatabaseName + "." + + carbonTable.getTableName) + } + } + + /** + * unlock for stream table + */ + def unLockStreamTable(): Unit = { + if (null != lock) { + lock.unlock() + LOGGER.info("unlock for stream table: " + + carbonTable.getDatabaseName + "." 
+ + carbonTable.getTableName) + } + } + + def initialize(): Unit = { + carbonAppendableStreamSink = StreamSinkFactory.createStreamTableSink( + sparkSession, + configuration, + carbonTable, + extraOptions.toMap).asInstanceOf[CarbonAppendableStreamSink] + + lockStreamTable() + + isInitialize = true + } + + def writeStreamData(dataFrame: DataFrame, time: Time): Unit = { + if (!isInitialize) { + initialize() + } + carbonAppendableStreamSink.addBatch(time.milliseconds, dataFrame) + } + + private val extraOptions = new scala.collection.mutable.HashMap[String, String] + private var mode: SaveMode = SaveMode.ErrorIfExists + + this.option("dbName", carbonTable.getDatabaseName) + this.option("tableName", carbonTable.getTableName) + + /** + * Specifies the behavior when data or table already exists. Options include: + * - `SaveMode.Overwrite`: overwrite the existing data. + * - `SaveMode.Append`: append the data. + * - `SaveMode.Ignore`: ignore the operation (i.e. no-op). + * - `SaveMode.ErrorIfExists`: default option, throw an exception at runtime. + */ + def mode(saveMode: SaveMode): CarbonStreamSparkStreamingWriter = { + if (mode == SaveMode.ErrorIfExists) { + mode = saveMode + } + this + } + + /** + * Specifies the behavior when data or table already exists. Options include: + * - `overwrite`: overwrite the existing data. + * - `append`: append the data. + * - `ignore`: ignore the operation (i.e. no-op). + * - `error or default`: default option, throw an exception at runtime. + */ + def mode(saveMode: String): CarbonStreamSparkStreamingWriter = { + if (mode == SaveMode.ErrorIfExists) { + mode = saveMode.toLowerCase(util.Locale.ROOT) match { + case "overwrite" => SaveMode.Overwrite + case "append" => SaveMode.Append + case "ignore" => SaveMode.Ignore + case "error" | "default" => SaveMode.ErrorIfExists + case _ => throw new IllegalArgumentException(s"Unknown save mode: $saveMode. 
" + + "Accepted save modes are 'overwrite', 'append', 'ignore', 'error'.") + } + } + this + } + + /** + * Adds an output option + */ + def option(key: String, value: String): CarbonStreamSparkStreamingWriter = { + if (!extraOptions.contains(key)) { + extraOptions += (key -> value) + } + this + } + + /** + * Adds an output option + */ + def option(key: String, value: Boolean): CarbonStreamSparkStreamingWriter = + option(key, value.toString) + + /** + * Adds an output option + */ + def option(key: String, value: Long): CarbonStreamSparkStreamingWriter = + option(key, value.toString) + + /** + * Adds an output option + */ + def option(key: String, value: Double): CarbonStreamSparkStreamingWriter = + option(key, value.toString) +} + +object CarbonStreamSparkStreaming { + + @transient private val tableMap = + new util.HashMap[String, CarbonStreamSparkStreamingWriter]() + + def getTableMap: util.Map[String, CarbonStreamSparkStreamingWriter] = tableMap + + /** + * remove all stream lock. + */ + def cleanAllLockAfterStop(): Unit = { + tableMap.asScala.values.foreach { writer => writer.unLockStreamTable() } + tableMap.clear() + } +} http://git-wip-us.apache.org/repos/asf/carbondata/blob/c723947a/integration/spark-common/src/main/scala/org/apache/carbondata/streaming/CarbonStreamingQueryListener.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/streaming/CarbonStreamingQueryListener.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/streaming/CarbonStreamingQueryListener.scala new file mode 100644 index 0000000..6d83fad --- /dev/null +++ b/integration/spark-common/src/main/scala/org/apache/carbondata/streaming/CarbonStreamingQueryListener.scala @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.carbondata.streaming

import java.util
import java.util.UUID

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.streaming.{CarbonAppendableStreamSink, StreamExecution}
import org.apache.spark.sql.streaming.StreamingQueryListener

import org.apache.carbondata.common.logging.LogServiceFactory
import org.apache.carbondata.core.locks.{CarbonLockFactory, ICarbonLock, LockUsage}

/**
 * Streaming query listener that holds the STREAMING_LOCK of a Carbon stream table
 * while a structured streaming query writes to it through CarbonAppendableStreamSink.
 */
class CarbonStreamingQueryListener(spark: SparkSession) extends StreamingQueryListener {

  private val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)

  // lock held by each running Carbon streaming query, keyed by query id. Spark delivers
  // the start event and the termination event on different threads, so the map must
  // be safe for concurrent access.
  private val cache = new util.concurrent.ConcurrentHashMap[UUID, ICarbonLock]()

  /**
   * Acquire the stream table lock when a query writing to a Carbon sink starts.
   *
   * @throws InterruptedException if the lock cannot be acquired after retries
   */
  override def onQueryStarted(event: StreamingQueryListener.QueryStartedEvent): Unit = {
    val qry = spark.streams.get(event.id) match {
      // adapt spark 2.1
      case execution: StreamExecution => execution
      // adapt spark 2.2 and later version: unwrap StreamingQueryWrapper reflectively
      case wrapper =>
        val clazz =
          Class.forName("org.apache.spark.sql.execution.streaming.StreamingQueryWrapper")
        val method = clazz.getMethod("streamingQuery")
        method.invoke(wrapper).asInstanceOf[StreamExecution]
    }
    qry.sink match {
      case sink: CarbonAppendableStreamSink =>
        LOGGER.info("Carbon streaming query started: " + event.id)
        val carbonTable = sink.carbonTable
        val tableName = carbonTable.getDatabaseName + "." + carbonTable.getTableName
        val lock = CarbonLockFactory.getCarbonLockObj(carbonTable.getAbsoluteTableIdentifier,
          LockUsage.STREAMING_LOCK)
        if (lock.lockWithRetries()) {
          LOGGER.info("Acquired the lock for stream table: " + tableName)
          cache.put(event.id, lock)
        } else {
          LOGGER.error("Not able to acquire the lock for stream table: " + tableName)
          throw new InterruptedException(
            "Not able to acquire the lock for stream table: " + tableName)
        }
      case _ =>
        // not a Carbon sink: nothing to lock
    }
  }

  override def onQueryProgress(event: StreamingQueryListener.QueryProgressEvent): Unit = {
  }

  /**
   * Release the stream table lock taken for the terminated query, if any.
   */
  override def onQueryTerminated(event: StreamingQueryListener.QueryTerminatedEvent): Unit = {
    val lock = cache.remove(event.id)
    if (null != lock) {
      LOGGER.info("Carbon streaming query terminated: " + event.id)
      lock.unlock()
    }
  }
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/c723947a/integration/spark-common/src/main/scala/org/apache/carbondata/streaming/StreamSinkFactory.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/streaming/StreamSinkFactory.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/streaming/StreamSinkFactory.scala new file mode 100644 index 0000000..bc7b042 --- /dev/null +++ b/integration/spark-common/src/main/scala/org/apache/carbondata/streaming/StreamSinkFactory.scala @@ -0,0 +1,236 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.streaming + +import scala.collection.JavaConverters._ + +import org.apache.hadoop.conf.Configuration +import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationEnd} +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.execution.streaming.{CarbonAppendableStreamSink, Sink} + +import org.apache.carbondata.core.constants.CarbonCommonConstants +import org.apache.carbondata.core.datastore.impl.FileFactory +import org.apache.carbondata.core.dictionary.server.{DictionaryServer, NonSecureDictionaryServer} +import org.apache.carbondata.core.dictionary.service.NonSecureDictionaryServiceProvider +import org.apache.carbondata.core.metadata.encoder.Encoding +import org.apache.carbondata.core.metadata.schema.table.CarbonTable +import org.apache.carbondata.core.util.CarbonProperties +import org.apache.carbondata.core.util.path.CarbonTablePath +import org.apache.carbondata.events.{OperationContext, OperationListenerBus} +import org.apache.carbondata.processing.loading.events.LoadEvents.{LoadTablePostExecutionEvent, LoadTablePreExecutionEvent} +import org.apache.carbondata.processing.loading.model.{CarbonLoadModel, CarbonLoadModelBuilder, LoadOption} +import org.apache.carbondata.spark.dictionary.provider.SecureDictionaryServiceProvider +import org.apache.carbondata.spark.dictionary.server.SecureDictionaryServer +import 
org.apache.carbondata.streaming.segment.StreamSegment
+
+/**
+ * Stream sink factory
+ */
+object StreamSinkFactory {
+
+  /**
+   * Create the carbon stream sink for a table.
+   * Validates the options, builds the load model, fires the pre/post execution events,
+   * opens the current stream segment and starts a dictionary server when needed.
+   *
+   * @throws CarbonStreamException if the handoff size option is invalid
+   */
+  def createStreamTableSink(
+      sparkSession: SparkSession,
+      hadoopConf: Configuration,
+      carbonTable: CarbonTable,
+      parameters: Map[String, String]): Sink = {
+    validateParameters(parameters)
+
+    // build load model
+    val carbonLoadModel = buildCarbonLoadModelForStream(
+      sparkSession,
+      hadoopConf,
+      carbonTable,
+      parameters,
+      "")
+    // fire pre event before streaming is started
+    // in case of streaming options and optionsFinal can be same
+    val operationContext = new OperationContext
+    val loadTablePreExecutionEvent = new LoadTablePreExecutionEvent(
+      carbonTable.getCarbonTableIdentifier,
+      carbonLoadModel,
+      carbonLoadModel.getFactFilePath,
+      false,
+      parameters.asJava,
+      parameters.asJava,
+      false
+    )
+    OperationListenerBus.getInstance().fireEvent(loadTablePreExecutionEvent, operationContext)
+    // prepare the stream segment
+    val segmentId = getStreamSegmentId(carbonTable)
+    carbonLoadModel.setSegmentId(segmentId)
+
+    // start server if necessary; one pass load is used only when a server was started
+    val server = startDictionaryServer(
+      sparkSession,
+      carbonTable,
+      carbonLoadModel)
+    carbonLoadModel.setUseOnePass(server.isDefined)
+
+    // default is carbon appended stream sink
+    val carbonAppendableStreamSink = new CarbonAppendableStreamSink(
+      sparkSession,
+      carbonTable,
+      segmentId,
+      parameters,
+      carbonLoadModel,
+      server)
+
+    // fire post event once the sink is created, before streaming is started
+    val loadTablePostExecutionEvent = new LoadTablePostExecutionEvent(
+      carbonTable.getCarbonTableIdentifier,
+      carbonLoadModel
+    )
+    OperationListenerBus.getInstance().fireEvent(loadTablePostExecutionEvent, operationContext)
+    carbonAppendableStreamSink
+  }
+
+  /**
+   * Validate user supplied stream options.
+   * Previously the exceptions below were only constructed and never thrown, so an invalid
+   * handoff size was silently accepted.
+   *
+   * @throws CarbonStreamException if HANDOFF_SIZE is not a number or is below HANDOFF_SIZE_MIN
+   */
+  private def validateParameters(parameters: Map[String, String]): Unit = {
+    parameters.get(CarbonCommonConstants.HANDOFF_SIZE).foreach { segmentSize =>
+      val value = try {
+        java.lang.Long.parseLong(segmentSize)
+      } catch {
+        case _: NumberFormatException =>
+          throw new CarbonStreamException(CarbonCommonConstants.HANDOFF_SIZE +
+                                          s" $segmentSize is an illegal number")
+      }
+      if (value < CarbonCommonConstants.HANDOFF_SIZE_MIN) {
+        throw new CarbonStreamException(CarbonCommonConstants.HANDOFF_SIZE +
+                                        " should be bigger than or equal " +
+                                        CarbonCommonConstants.HANDOFF_SIZE_MIN)
+      }
+    }
+  }
+
+  /**
+   * Get the current stream segment id from StreamSegment.open, make sure the segment
+   * directory exists and recover the segment if required.
+   *
+   * @return id of the current stream segment
+   */
+  private def getStreamSegmentId(carbonTable: CarbonTable): String = {
+    val segmentId = StreamSegment.open(carbonTable)
+    val segmentDir = CarbonTablePath.getSegmentPath(carbonTable.getTablePath, segmentId)
+    val fileType = FileFactory.getFileType(segmentDir)
+    if (!FileFactory.isFileExist(segmentDir, fileType)) {
+      // Create table directory path, in case of enabling hive metastore first load may not have
+      // table folder created.
+      FileFactory.mkdirs(segmentDir, fileType)
+    }
+    if (FileFactory.isFileExist(segmentDir, fileType)) {
+      // recover fault
+      StreamSegment.recoverSegmentIfRequired(segmentDir)
+    } else {
+      FileFactory.mkdirs(segmentDir, fileType)
+    }
+    segmentId
+  }
+
+  /**
+   * Start a dictionary server when the table has a dimension with DICTIONARY (but not
+   * DIRECT_DICTIONARY) encoding. The secure server is used when spark.authenticate is true
+   * and the carbon secure dictionary server property is enabled; otherwise the non-secure one.
+   * The server host/port/service provider are stored into the load model, and the server is
+   * shut down when the Spark application ends.
+   *
+   * @return the started server, or None when no dictionary column needs one
+   */
+  def startDictionaryServer(
+      sparkSession: SparkSession,
+      carbonTable: CarbonTable,
+      carbonLoadModel: CarbonLoadModel): Option[DictionaryServer] = {
+    // start dictionary server when use one pass load and dimension with DICTIONARY
+    // encoding is present.
+    val allDimensions = carbonTable.getAllDimensions.asScala.toList
+    val createDictionary = allDimensions.exists {
+      carbonDimension => carbonDimension.hasEncoding(Encoding.DICTIONARY) &&
+                         !carbonDimension.hasEncoding(Encoding.DIRECT_DICTIONARY)
+    }
+    val carbonSecureModeDictServer = CarbonProperties.getInstance.
+      getProperty(CarbonCommonConstants.CARBON_SECURE_DICTIONARY_SERVER,
+        CarbonCommonConstants.CARBON_SECURE_DICTIONARY_SERVER_DEFAULT)
+
+    val sparkConf = sparkSession.sqlContext.sparkContext.getConf
+    val sparkDriverHost = sparkConf.get("spark.driver.host")
+
+    val server: Option[DictionaryServer] = if (createDictionary) {
+      if (sparkConf.get("spark.authenticate", "false").equalsIgnoreCase("true") &&
+          carbonSecureModeDictServer.toBoolean) {
+        val dictionaryServer = SecureDictionaryServer.getInstance(sparkConf,
+          sparkDriverHost, carbonLoadModel.getDictionaryServerPort, carbonTable)
+        carbonLoadModel.setDictionaryServerPort(dictionaryServer.getPort)
+        carbonLoadModel.setDictionaryServerHost(dictionaryServer.getHost)
+        carbonLoadModel.setDictionaryServerSecretKey(dictionaryServer.getSecretKey)
+        carbonLoadModel.setDictionaryEncryptServerSecure(dictionaryServer.isEncryptSecureServer)
+        carbonLoadModel.setDictionaryServiceProvider(new SecureDictionaryServiceProvider())
+        runOnApplicationEnd(sparkSession)(dictionaryServer.shutdown())
+        Some(dictionaryServer)
+      } else {
+        val dictionaryServer = NonSecureDictionaryServer
+          .getInstance(carbonLoadModel.getDictionaryServerPort, carbonTable)
+        carbonLoadModel.setDictionaryServerPort(dictionaryServer.getPort)
+        carbonLoadModel.setDictionaryServerHost(dictionaryServer.getHost)
+        carbonLoadModel.setDictionaryEncryptServerSecure(false)
+        carbonLoadModel.setDictionaryServiceProvider(
+          new NonSecureDictionaryServiceProvider(dictionaryServer.getPort))
+        runOnApplicationEnd(sparkSession)(dictionaryServer.shutdown())
+        Some(dictionaryServer)
+      }
+    } else {
+      None
+    }
+    server
+  }
+
+  /** Register a Spark listener that runs `action` when the application ends. */
+  private def runOnApplicationEnd(sparkSession: SparkSession)(action: => Unit): Unit = {
+    sparkSession.sparkContext.addSparkListener(new SparkListener() {
+      override def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd): Unit = action
+    })
+  }
+
+  /**
+   * Build the load model shared by all batches of the stream: sort scope is forced to
+   * no_sort, the file header defaults to the table's create-order columns, and the
+   * dictionary server host/port come from spark.driver.host and the DICTIONARY_SERVER_PORT
+   * option (falling back to the carbon property, then its default).
+   */
+  private def buildCarbonLoadModelForStream(
+      sparkSession: SparkSession,
+      hadoopConf: Configuration,
+      carbonTable: CarbonTable,
+      parameters: Map[String, String],
+      segmentId: String): CarbonLoadModel = {
+    val carbonProperty: CarbonProperties = CarbonProperties.getInstance()
+    // NOTE(review): CarbonProperties.getInstance() looks like a shared instance, so this
+    // presumably disables zookeeper locking beyond this stream too — confirm intent.
+    carbonProperty.addProperty("zookeeper.enable.lock", "false")
+    val optionsFinal = LoadOption.fillOptionWithDefaultValue(parameters.asJava)
+    optionsFinal.put("sort_scope", "no_sort")
+    if (!parameters.contains("fileheader")) {
+      optionsFinal.put("fileheader", carbonTable.getCreateOrderColumn(carbonTable.getTableName)
+        .asScala.map(_.getColName).mkString(","))
+    }
+    val carbonLoadModel = new CarbonLoadModel()
+    new CarbonLoadModelBuilder(carbonTable).build(
+      parameters.asJava,
+      optionsFinal,
+      carbonLoadModel,
+      hadoopConf)
+    carbonLoadModel.setSegmentId(segmentId)
+    // stream should use one pass
+    val dictionaryServerPort = parameters.getOrElse(
+      CarbonCommonConstants.DICTIONARY_SERVER_PORT,
+      carbonProperty.getProperty(
+        CarbonCommonConstants.DICTIONARY_SERVER_PORT,
+        CarbonCommonConstants.DICTIONARY_SERVER_PORT_DEFAULT))
+    val sparkDriverHost = sparkSession.sqlContext.sparkContext.
+      getConf.get("spark.driver.host")
+    carbonLoadModel.setDictionaryServerHost(sparkDriverHost)
+    carbonLoadModel.setDictionaryServerPort(dictionaryServerPort.toInt)
+    carbonLoadModel
+  }
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/c723947a/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/streaming/CarbonAppendableStreamSink.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/streaming/CarbonAppendableStreamSink.scala b/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/streaming/CarbonAppendableStreamSink.scala
new file mode 100644
index 0000000..402bc4b
--- /dev/null
+++ b/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/streaming/CarbonAppendableStreamSink.scala
@@ -0,0 +1,362 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.streaming + +import java.util.Date + +import scala.collection.JavaConverters._ + +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.mapreduce._ +import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl +import org.apache.spark.TaskContext +import org.apache.spark.internal.io.FileCommitProtocol +import org.apache.spark.internal.io.FileCommitProtocol.TaskCommitMessage +import org.apache.spark.sql.{DataFrame, SparkSession} +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.execution.{QueryExecution, SQLExecution} +import org.apache.spark.sql.types.StructType +import org.apache.spark.util.{SerializableConfiguration, Utils} + +import org.apache.carbondata.common.CarbonIterator +import org.apache.carbondata.common.logging.LogServiceFactory +import org.apache.carbondata.core.constants.CarbonCommonConstants +import org.apache.carbondata.core.datastore.impl.FileFactory +import org.apache.carbondata.core.dictionary.server.DictionaryServer +import org.apache.carbondata.core.metadata.schema.table.CarbonTable +import org.apache.carbondata.core.stats.QueryStatistic +import org.apache.carbondata.core.util.CarbonProperties +import org.apache.carbondata.core.util.path.CarbonTablePath +import 
org.apache.carbondata.events.{OperationContext, OperationListenerBus}
+import org.apache.carbondata.hadoop.util.CarbonInputFormatUtil
+import org.apache.carbondata.processing.loading.constants.DataLoadProcessorConstants
+import org.apache.carbondata.processing.loading.events.LoadEvents.{LoadTablePostExecutionEvent, LoadTablePreExecutionEvent}
+import org.apache.carbondata.processing.loading.model.CarbonLoadModel
+import org.apache.carbondata.spark.rdd.StreamHandoffRDD
+import org.apache.carbondata.streaming.{CarbonStreamException, CarbonStreamOutputFormat}
+import org.apache.carbondata.streaming.parser.CarbonStreamParser
+import org.apache.carbondata.streaming.segment.StreamSegment
+
+/**
+ * An implementation of a stream sink that persists each batch to disk by appending the
+ * batch data to the data files of the current stream segment.
+ */
+class CarbonAppendableStreamSink(
+    sparkSession: SparkSession,
+    val carbonTable: CarbonTable,
+    var currentSegmentId: String,
+    parameters: Map[String, String],
+    carbonLoadModel: CarbonLoadModel,
+    server: Option[DictionaryServer]) extends Sink {
+
+  private val fileLogPath = CarbonTablePath.getStreamingLogDir(carbonTable.getTablePath)
+  private val fileLog = new FileStreamSinkLog(FileStreamSinkLog.VERSION, sparkSession, fileLogPath)
+  // prepare configuration
+  private val hadoopConf = {
+    val conf = sparkSession.sessionState.newHadoopConf()
+    // put all parameters into hadoopConf
+    parameters.foreach { case (key, value) =>
+      conf.set(key, value)
+    }
+    // properties below will be used for default CarbonStreamParser
+    conf.set("carbon_complex_delimiter_level_1",
+      carbonLoadModel.getComplexDelimiterLevel1)
+    conf.set("carbon_complex_delimiter_level_2",
+      carbonLoadModel.getComplexDelimiterLevel2)
+    conf.set(
+      DataLoadProcessorConstants.SERIALIZATION_NULL_FORMAT,
+      carbonLoadModel.getSerializationNullFormat().split(",")(1))
+    conf.set(
+      CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT,
+      carbonLoadModel.getTimestampformat())
+    conf.set(
+      CarbonCommonConstants.CARBON_DATE_FORMAT,
+      carbonLoadModel.getDateFormat())
+    conf
+  }
+  // segment max size(byte)
+  private val segmentMaxSize = hadoopConf.getLong(
+    CarbonCommonConstants.HANDOFF_SIZE,
+    CarbonProperties.getInstance().getHandoffSize
+  )
+
+  // auto handoff
+  private val enableAutoHandoff = hadoopConf.getBoolean(
+    CarbonCommonConstants.ENABLE_AUTO_HANDOFF,
+    CarbonProperties.getInstance().isEnableAutoHandoff
+  )
+
+  /**
+   * Append one micro batch to the current stream segment.
+   * Batches whose id is not greater than the latest id in the file sink log are skipped.
+   */
+  override def addBatch(batchId: Long, data: DataFrame): Unit = {
+    if (batchId <= fileLog.getLatest().map(_._1).getOrElse(-1L)) {
+      CarbonAppendableStreamSink.LOGGER.info(s"Skipping already committed batch $batchId")
+    } else {
+
+      val statistic = new QueryStatistic()
+
+      // fire pre event on every batch add
+      // in case of streaming options and optionsFinal can be same
+      val operationContext = new OperationContext
+      val loadTablePreExecutionEvent = new LoadTablePreExecutionEvent(
+        carbonTable.getCarbonTableIdentifier,
+        carbonLoadModel,
+        carbonLoadModel.getFactFilePath,
+        false,
+        parameters.asJava,
+        parameters.asJava,
+        false
+      )
+      OperationListenerBus.getInstance().fireEvent(loadTablePreExecutionEvent, operationContext)
+      checkOrHandOffSegment()
+
+      // committer will record how this spark job commits its output
+      val committer = FileCommitProtocol.instantiate(
+        className = sparkSession.sessionState.conf.streamingFileCommitProtocolClass,
+        jobId = batchId.toString,
+        outputPath = fileLogPath,
+        isAppend = false)
+
+      committer match {
+        case manifestCommitter: ManifestFileCommitProtocol =>
+          manifestCommitter.setupManifestOptions(fileLog, batchId)
+        case _ => // Do nothing
+      }
+
+      CarbonAppendableStreamSink.writeDataFileJob(
+        sparkSession,
+        carbonTable,
+        parameters,
+        batchId,
+        currentSegmentId,
+        data.queryExecution,
+        committer,
+        hadoopConf,
+        carbonLoadModel,
+        server)
+      // fire post event on every batch add
+      val loadTablePostExecutionEvent = new LoadTablePostExecutionEvent(
+        carbonTable.getCarbonTableIdentifier,
+        carbonLoadModel
+      )
+      OperationListenerBus.getInstance().fireEvent(loadTablePostExecutionEvent, operationContext)
+
+      statistic.addStatistics(s"add batch: $batchId", System.currentTimeMillis())
+      CarbonAppendableStreamSink.LOGGER.info(
+        s"${statistic.getMessage}, taken time(ms): ${statistic.getTimeTaken}")
+    }
+  }
+
+  /**
+   * If the directory size of the current segment reaches the threshold, close it, continue
+   * in a new segment and, when auto handoff is enabled, start the streaming handoff.
+   */
+  private def checkOrHandOffSegment(): Unit = {
+    val segmentDir = CarbonTablePath.getSegmentPath(carbonTable.getTablePath, currentSegmentId)
+    if (segmentMaxSize <= StreamSegment.size(segmentDir)) {
+      currentSegmentId = StreamSegment.close(carbonTable, currentSegmentId)
+      val newSegmentDir = CarbonTablePath.getSegmentPath(carbonTable.getTablePath, currentSegmentId)
+      FileFactory.mkdirs(newSegmentDir, FileFactory.getFileType(newSegmentDir))
+
+      // trigger hand off of the closed segment(s)
+      if (enableAutoHandoff) {
+        StreamHandoffRDD.startStreamingHandoffThread(
+          carbonLoadModel,
+          sparkSession,
+          false)
+      }
+    }
+  }
+}
+
+object CarbonAppendableStreamSink {
+
+  private val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
+
+  /**
+   * package the hadoop configuration and it will be passed to executor side from driver side
+   */
+  case class WriteDataFileJobDescription(
+      serializableHadoopConf: SerializableConfiguration,
+      batchId: Long,
+      segmentId: String)
+
+  /**
+   * Run a spark job to append the newly arrived data to the existing row format
+   * file directly.
+   * If a task fails, spark will re-try the task and carbon will do recovery by HDFS
+   * file truncate (see StreamSegment.tryRecoverFromTaskFault).
+   * If the job fails, every file in the stream segment will be truncated
+   * if necessary (see StreamSegment.tryRecoverFromJobFault).
+   *
+   * @throws CarbonStreamException wrapping any failure before the job is committed
+   */
+  def writeDataFileJob(
+      sparkSession: SparkSession,
+      carbonTable: CarbonTable,
+      parameters: Map[String, String],
+      batchId: Long,
+      segmentId: String,
+      queryExecution: QueryExecution,
+      committer: FileCommitProtocol,
+      hadoopConf: Configuration,
+      carbonLoadModel: CarbonLoadModel,
+      server: Option[DictionaryServer]): Unit = {
+
+    // create job
+    val job = Job.getInstance(hadoopConf)
+    job.setOutputKeyClass(classOf[Void])
+    job.setOutputValueClass(classOf[InternalRow])
+    val jobId = CarbonInputFormatUtil.getJobId(new Date, batchId.toInt)
+    job.setJobID(jobId)
+
+    val description = WriteDataFileJobDescription(
+      serializableHadoopConf = new SerializableConfiguration(job.getConfiguration),
+      batchId,
+      segmentId
+    )
+
+    // run write data file job
+    SQLExecution.withNewExecutionId(sparkSession, queryExecution) {
+      val result: Array[TaskCommitMessage] = try {
+        committer.setupJob(job)
+        // initialize dictionary server
+        server.foreach(_.initializeDictionaryGenerator(carbonTable))
+
+        val rowSchema = queryExecution.analyzed.schema
+        // write data file
+        val taskResults = sparkSession.sparkContext.runJob(queryExecution.toRdd,
+          (taskContext: TaskContext, iterator: Iterator[InternalRow]) => {
+            writeDataFileTask(
+              description,
+              carbonLoadModel,
+              sparkStageId = taskContext.stageId(),
+              sparkPartitionId = taskContext.partitionId(),
+              sparkAttemptNumber = taskContext.attemptNumber(),
+              committer,
+              iterator,
+              rowSchema
+            )
+          })
+
+        // write dictionary; keep the original failure as the cause so it is not lost
+        server.foreach { dictionaryServer =>
+          try {
+            dictionaryServer.writeTableDictionary(carbonTable.getCarbonTableIdentifier.getTableId)
+          } catch {
+            case ex: Exception =>
+              LOGGER.error(ex,
+                s"Error while writing dictionary file for ${carbonTable.getTableUniqueName}")
+              throw new Exception(
+                "Streaming ingest failed due to error while writing dictionary file", ex)
+          }
+        }
+
+        // update data file info in index file
+        StreamSegment.updateIndexFile(
+          CarbonTablePath.getSegmentPath(carbonTable.getTablePath, segmentId))
+
+        taskResults
+      } catch {
+        // catch fault of executor side
+        case t: Throwable =>
+          val segmentDir = CarbonTablePath.getSegmentPath(carbonTable.getTablePath, segmentId)
+          StreamSegment.recoverSegmentIfRequired(segmentDir)
+          LOGGER.error(t, s"Aborting job ${ job.getJobID }.")
+          committer.abortJob(job)
+          throw new CarbonStreamException("Job failed to write data file", t)
+      }
+      committer.commitJob(job, result)
+      LOGGER.info(s"Job ${ job.getJobID } committed.")
+    }
+  }
+
+  /**
+   * Execute a task for one partition: parse its rows with the configured CarbonStreamParser
+   * and append them to the stream segment, committing or aborting the task attempt.
+   *
+   * @throws CarbonStreamException wrapping any failure while writing rows
+   */
+  def writeDataFileTask(
+      description: WriteDataFileJobDescription,
+      carbonLoadModel: CarbonLoadModel,
+      sparkStageId: Int,
+      sparkPartitionId: Int,
+      sparkAttemptNumber: Int,
+      committer: FileCommitProtocol,
+      iterator: Iterator[InternalRow],
+      rowSchema: StructType
+  ): TaskCommitMessage = {
+
+    val jobId = CarbonInputFormatUtil.getJobId(new Date, sparkStageId)
+    val taskId = new TaskID(jobId, TaskType.MAP, sparkPartitionId)
+    val taskAttemptId = new TaskAttemptID(taskId, sparkAttemptNumber)
+
+    // Set up the attempt context required to use in the output committer.
+    val taskAttemptContext: TaskAttemptContext = {
+      // Set up the configuration object
+      val hadoopConf = description.serializableHadoopConf.value
+      CarbonStreamOutputFormat.setSegmentId(hadoopConf, description.segmentId)
+      hadoopConf.set("mapred.job.id", jobId.toString)
+      hadoopConf.set("mapred.tip.id", taskAttemptId.getTaskID.toString)
+      hadoopConf.set("mapred.task.id", taskAttemptId.toString)
+      hadoopConf.setBoolean("mapred.task.is.map", true)
+      hadoopConf.setInt("mapred.task.partition", 0)
+      new TaskAttemptContextImpl(hadoopConf, taskAttemptId)
+    }
+
+    committer.setupTask(taskAttemptContext)
+
+    try {
+      Utils.tryWithSafeFinallyAndFailureCallbacks(block = {
+
+        val parserName = taskAttemptContext.getConfiguration.get(
+          CarbonStreamParser.CARBON_STREAM_PARSER,
+          CarbonStreamParser.CARBON_STREAM_PARSER_DEFAULT)
+
+        val streamParser =
+          Class.forName(parserName).newInstance.asInstanceOf[CarbonStreamParser]
+        streamParser.initialize(taskAttemptContext.getConfiguration, rowSchema)
+
+        StreamSegment.appendBatchData(new InputIterator(iterator, streamParser),
+          taskAttemptContext, carbonLoadModel)
+      })(catchBlock = {
+        committer.abortTask(taskAttemptContext)
+        LOGGER.error(s"Task $taskAttemptId of job $jobId aborted.")
+      })
+      committer.commitTask(taskAttemptContext)
+    } catch {
+      case t: Throwable =>
+        throw new CarbonStreamException("Task failed while writing rows", t)
+    }
+  }
+
+  /**
+   * Convert a spark iterator to a carbon iterator, so that the java module can use it.
+   * Each row is converted by the given stream parser; close() closes the parser.
+   */
+  class InputIterator(rddIter: Iterator[InternalRow], streamParser: CarbonStreamParser)
+    extends CarbonIterator[Array[Object]] {
+
+    override def hasNext: Boolean = rddIter.hasNext
+
+    override def next: Array[Object] = {
+      streamParser.parserRow(rddIter.next())
+    }
+
+    override def close(): Unit = {
+      streamParser.close()
+    }
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/c723947a/integration/spark-common/src/main/scala/org/apache/spark/sql/test/TestQueryExecutor.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/sql/test/TestQueryExecutor.scala b/integration/spark-common/src/main/scala/org/apache/spark/sql/test/TestQueryExecutor.scala
index f582145..34f901f 100644
--- a/integration/spark-common/src/main/scala/org/apache/spark/sql/test/TestQueryExecutor.scala
+++ b/integration/spark-common/src/main/scala/org/apache/spark/sql/test/TestQueryExecutor.scala
@@ -124,7 +124,9 @@ object TestQueryExecutor {
     TestQueryExecutor.projectPath + "/processing/target",
     TestQueryExecutor.projectPath + "/integration/spark-common/target",
     TestQueryExecutor.projectPath + "/integration/spark2/target",
-    TestQueryExecutor.projectPath + "/integration/spark-common/target/jars")
+    TestQueryExecutor.projectPath + "/integration/spark-common/target/jars",
+    TestQueryExecutor.projectPath + "/streaming/target")
+
   lazy val jars = {
     val jarsLocal = new ArrayBuffer[String]()
     modules.foreach { path =>
http://git-wip-us.apache.org/repos/asf/carbondata/blob/c723947a/integration/spark2/pom.xml
----------------------------------------------------------------------
diff --git a/integration/spark2/pom.xml b/integration/spark2/pom.xml
index 13b3d8d..e4593be 100644
--- a/integration/spark2/pom.xml
+++ b/integration/spark2/pom.xml
@@ -36,7 +36,7 @@
   <dependencies>
     <dependency>
       <groupId>org.apache.carbondata</groupId>
-      <artifactId>carbondata-streaming</artifactId>
+
<artifactId>carbondata-spark-common</artifactId> <version>${project.version}</version> </dependency> <dependency> http://git-wip-us.apache.org/repos/asf/carbondata/blob/c723947a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala index f6bdff6..1038fcf 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala @@ -27,7 +27,6 @@ import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationEnd} import org.apache.spark.sql.SparkSession.Builder import org.apache.spark.sql.catalyst.encoders.RowEncoder import org.apache.spark.sql.catalyst.plans.logical.{Command, LocalRelation, Union} -import org.apache.spark.sql.execution.streaming.CarbonStreamingQueryListener import org.apache.spark.sql.hive.execution.command.CarbonSetCommand import org.apache.spark.sql.internal.{SessionState, SharedState} import org.apache.spark.sql.profiler.{Profiler, SQLStart} @@ -36,6 +35,7 @@ import org.apache.spark.util.{CarbonReflectionUtils, Utils} import org.apache.carbondata.core.constants.CarbonCommonConstants import org.apache.carbondata.core.util.{CarbonProperties, CarbonSessionInfo, ThreadLocalSessionInfo} import org.apache.carbondata.hadoop.util.CarbonInputFormatUtil +import org.apache.carbondata.streaming.CarbonStreamingQueryListener /** * Session implementation for {org.apache.spark.sql.SparkSession} http://git-wip-us.apache.org/repos/asf/carbondata/blob/c723947a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala ---------------------------------------------------------------------- diff --git 
a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala index 5183b02..e60a583 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala @@ -46,8 +46,7 @@ import org.apache.carbondata.events._ import org.apache.carbondata.processing.loading.events.LoadEvents.LoadMetadataEvent import org.apache.carbondata.processing.loading.model.{CarbonDataLoadSchema, CarbonLoadModel} import org.apache.carbondata.processing.merger.{CarbonDataMergerUtil, CompactionType} -import org.apache.carbondata.spark.rdd.CarbonDataRDDFactory -import org.apache.carbondata.streaming.StreamHandoffRDD +import org.apache.carbondata.spark.rdd.{CarbonDataRDDFactory, StreamHandoffRDD} import org.apache.carbondata.streaming.segment.StreamSegment /** http://git-wip-us.apache.org/repos/asf/carbondata/blob/c723947a/streaming/pom.xml ---------------------------------------------------------------------- diff --git a/streaming/pom.xml b/streaming/pom.xml index 01affec..b8c447d 100644 --- a/streaming/pom.xml +++ b/streaming/pom.xml @@ -22,11 +22,16 @@ <dependencies> <dependency> <groupId>org.apache.carbondata</groupId> - <artifactId>carbondata-spark-common</artifactId> + <artifactId>carbondata-hadoop</artifactId> <version>${project.version}</version> </dependency> <dependency> <groupId>org.apache.spark</groupId> + <artifactId>spark-sql_${scala.binary.version}</artifactId> + <version>${spark.version}</version> + </dependency> + <dependency> + <groupId>org.apache.spark</groupId> <artifactId>spark-streaming_${scala.binary.version}</artifactId> <version>${spark.version}</version> <scope>${spark.deps.scope}</scope> 
@@ -44,7 +49,7 @@ </dependencies> <build> - <testSourceDirectory>src/test/scala</testSourceDirectory> + <testSourceDirectory>src/test/java</testSourceDirectory> <resources> <resource> <directory>src/resources</directory> http://git-wip-us.apache.org/repos/asf/carbondata/blob/c723947a/streaming/src/main/java/org/apache/carbondata/streaming/CarbonStreamInputFormat.java ---------------------------------------------------------------------- diff --git a/streaming/src/main/java/org/apache/carbondata/streaming/CarbonStreamInputFormat.java b/streaming/src/main/java/org/apache/carbondata/streaming/CarbonStreamInputFormat.java new file mode 100644 index 0000000..66d89c8 --- /dev/null +++ b/streaming/src/main/java/org/apache/carbondata/streaming/CarbonStreamInputFormat.java @@ -0,0 +1,115 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.carbondata.streaming;
+
+import java.io.IOException;
+
+import org.apache.carbondata.core.cache.Cache;
+import org.apache.carbondata.core.cache.dictionary.Dictionary;
+import org.apache.carbondata.core.cache.dictionary.DictionaryColumnUniqueIdentifier;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.metadata.datatype.DataType;
+import org.apache.carbondata.core.metadata.datatype.DataTypes;
+import org.apache.carbondata.core.metadata.encoder.Encoding;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
+import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
+import org.apache.carbondata.core.scan.complextypes.ArrayQueryType;
+import org.apache.carbondata.core.scan.complextypes.PrimitiveQueryType;
+import org.apache.carbondata.core.scan.complextypes.StructQueryType;
+import org.apache.carbondata.core.scan.filter.GenericQueryType;
+import org.apache.carbondata.core.util.CarbonUtil;
+
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+
+/**
+ * Stream input format.
+ *
+ * <p>Hands out a {@link CarbonStreamRecordReader} for each split and provides
+ * {@link #getComplexDimensions} to build the query-type tree for complex (array/struct)
+ * columns.
+ */
+public class CarbonStreamInputFormat extends FileInputFormat<Void, Object> {
+
+  /** Configuration key for the stream read buffer size. */
+  public static final String READ_BUFFER_SIZE = "carbon.stream.read.buffer.size";
+  /** Default for {@link #READ_BUFFER_SIZE}, stored as a string. */
+  public static final String READ_BUFFER_SIZE_DEFAULT = "65536";
+
+  /**
+   * Returns a new {@link CarbonStreamRecordReader}. The split and context are not used here;
+   * per the Hadoop contract the framework passes them to {@link RecordReader#initialize}.
+   */
+  @Override public RecordReader<Void, Object> createRecordReader(InputSplit split,
+      TaskAttemptContext context) throws IOException, InterruptedException {
+    return new CarbonStreamRecordReader();
+  }
+
+  /**
+   * Builds the query-type tree for every complex column among {@code carbonColumns}.
+   *
+   * @param carbontable table the columns belong to; its table properties supply the
+   *     dictionary path used for primitive children
+   * @param carbonColumns columns to inspect
+   * @param cache dictionary cache queried for each primitive child
+   * @return an array parallel to {@code carbonColumns}: each array/struct column maps to an
+   *     {@link ArrayQueryType}/{@link StructQueryType} with its children attached; the slot of
+   *     every non-complex column is left {@code null}
+   * @throws UnsupportedOperationException if a complex column is neither an array nor a struct
+   * @throws IOException propagated from the dictionary cache lookup
+   */
+  public static GenericQueryType[] getComplexDimensions(CarbonTable carbontable,
+      CarbonColumn[] carbonColumns, Cache<DictionaryColumnUniqueIdentifier, Dictionary> cache)
+      throws IOException {
+    GenericQueryType[] queryTypes = new GenericQueryType[carbonColumns.length];
+    for (int i = 0; i < carbonColumns.length; i++) {
+      CarbonColumn column = carbonColumns[i];
+      if (!column.isComplex()) {
+        continue;
+      }
+      // The second constructor argument is the parent's name (see fillChildren); a top-level
+      // column passes its own name there.
+      if (DataTypes.isArrayType(column.getDataType())) {
+        queryTypes[i] = new ArrayQueryType(column.getColName(), column.getColName(), i);
+      } else if (DataTypes.isStructType(column.getDataType())) {
+        queryTypes[i] = new StructQueryType(column.getColName(), column.getColName(), i);
+      } else {
+        throw new UnsupportedOperationException(
+            column.getDataType().getName() + " is not supported");
+      }
+      // Assumes every complex column is a CarbonDimension (unchecked cast).
+      fillChildren(carbontable, queryTypes[i], (CarbonDimension) column, i, cache);
+    }
+    return queryTypes;
+  }
+
+  /**
+   * Recursively creates a query type for each child of {@code dimension} and attaches it,
+   * exactly once, to {@code parentQueryType}. Array and struct children become
+   * {@link ArrayQueryType}/{@link StructQueryType} and are filled recursively; any other child
+   * becomes a {@link PrimitiveQueryType} backed by its dictionary from {@code cache}.
+   *
+   * @param parentBlockIndex index of the parent; children at this level take consecutive
+   *     indices after it. NOTE(review): indices consumed inside a recursive call are not
+   *     returned, so a sibling that follows a nested complex child can reuse one -- confirm
+   *     whether readers depend on these indices being unique.
+   * @throws IOException propagated from the dictionary cache lookup
+   */
+  private static void fillChildren(CarbonTable carbontable, GenericQueryType parentQueryType,
+      CarbonDimension dimension, int parentBlockIndex,
+      Cache<DictionaryColumnUniqueIdentifier, Dictionary> cache) throws IOException {
+    for (int i = 0; i < dimension.getNumberOfChild(); i++) {
+      CarbonDimension child = dimension.getListOfChildDimensions().get(i);
+      DataType dataType = child.getDataType();
+      GenericQueryType queryType;
+      if (DataTypes.isArrayType(dataType)) {
+        queryType =
+            new ArrayQueryType(child.getColName(), dimension.getColName(), ++parentBlockIndex);
+      } else if (DataTypes.isStructType(dataType)) {
+        // Attached below like every other child; also attaching it here registered each
+        // struct child twice on its parent.
+        queryType =
+            new StructQueryType(child.getColName(), dimension.getColName(), ++parentBlockIndex);
+      } else {
+        boolean isDirectDictionary =
+            CarbonUtil.hasEncoding(child.getEncoder(), Encoding.DIRECT_DICTIONARY);
+        String dictionaryPath = carbontable.getTableInfo().getFactTable().getTableProperties()
+            .get(CarbonCommonConstants.DICTIONARY_PATH);
+        DictionaryColumnUniqueIdentifier dictionaryIdentifier =
+            new DictionaryColumnUniqueIdentifier(carbontable.getAbsoluteTableIdentifier(),
+                child.getColumnIdentifier(), dataType, dictionaryPath);
+        // NOTE(review): the meaning of the literal 4 is not visible here (presumably a key
+        // size in bytes) -- confirm against PrimitiveQueryType.
+        queryType = new PrimitiveQueryType(child.getColName(), dimension.getColName(),
+            ++parentBlockIndex, dataType, 4, cache.get(dictionaryIdentifier),
+            isDirectDictionary);
+      }
+      parentQueryType.addChildren(queryType);
+      if (child.getNumberOfChild() > 0) {
+        fillChildren(carbontable, queryType, child, parentBlockIndex, cache);
+      }
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/c723947a/streaming/src/main/java/org/apache/carbondata/streaming/CarbonStreamOutputFormat.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/carbondata/streaming/CarbonStreamOutputFormat.java b/streaming/src/main/java/org/apache/carbondata/streaming/CarbonStreamOutputFormat.java
new file mode 100644
index 0000000..f9f0d76
--- /dev/null
+++ b/streaming/src/main/java/org/apache/carbondata/streaming/CarbonStreamOutputFormat.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.streaming;
+
+import java.io.IOException;
+import java.nio.charset.Charset;
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.hadoop.util.ObjectSerializationUtil;
+import org.apache.carbondata.processing.loading.model.CarbonLoadModel;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+
+/**
+ * Stream output format.
+ *
+ * <p>Creates a {@link CarbonStreamRecordWriter} per task attempt and offers static helpers
+ * that store and read back the load model and the segment id in a Hadoop
+ * {@link Configuration}.
+ */
+public class CarbonStreamOutputFormat extends FileOutputFormat<Void, Object> {
+
+  /** Sync-marker bytes, encoded with {@link CarbonCommonConstants#DEFAULT_CHARSET}. */
+  static final byte[] CARBON_SYNC_MARKER =
+      "@carbondata_sync".getBytes(Charset.forName(CarbonCommonConstants.DEFAULT_CHARSET));
+
+  /** Key for the encoder row buffer size, and its default. */
+  public static final String CARBON_ENCODER_ROW_BUFFER_SIZE = "carbon.stream.row.buffer.size";
+  public static final int CARBON_ENCODER_ROW_BUFFER_SIZE_DEFAULT = 1024;
+
+  /** Key for the number of rows per stream blocklet, and its default. */
+  public static final String CARBON_STREAM_BLOCKLET_ROW_NUMS = "carbon.stream.blocklet.row.nums";
+  public static final int CARBON_STREAM_BLOCKLET_ROW_NUMS_DEFAULT = 32000;
+
+  /** Key for the stream cache size, and its default (presumably bytes, i.e. 32 MiB). */
+  public static final String CARBON_STREAM_CACHE_SIZE = "carbon.stream.cache.size";
+  public static final int CARBON_STREAM_CACHE_SIZE_DEFAULT = 32 * 1024 * 1024;
+
+  /** Configuration key under which the serialized load model is stored. */
+  private static final String LOAD_MODEL_KEY = "mapreduce.output.carbon.load.model";
+
+  /** Configuration key under which the segment id is stored. */
+  private static final String SEGMENT_ID_KEY = "carbon.segment.id";
+
+  /** Creates the stream record writer for the given task attempt. */
+  @Override public RecordWriter<Void, Object> getRecordWriter(TaskAttemptContext job)
+      throws IOException, InterruptedException {
+    return new CarbonStreamRecordWriter(job);
+  }
+
+  /**
+   * Serializes {@code carbonLoadModel} (via {@link ObjectSerializationUtil}) into
+   * {@code hadoopConf}. A {@code null} model is ignored and leaves the configuration untouched.
+   */
+  public static void setCarbonLoadModel(Configuration hadoopConf, CarbonLoadModel carbonLoadModel)
+      throws IOException {
+    if (carbonLoadModel == null) {
+      return;
+    }
+    String serialized = ObjectSerializationUtil.convertObjectToString(carbonLoadModel);
+    hadoopConf.set(LOAD_MODEL_KEY, serialized);
+  }
+
+  /**
+   * Reads back the load model stored by {@link #setCarbonLoadModel}.
+   *
+   * @return the deserialized model, or {@code null} when none has been stored
+   */
+  public static CarbonLoadModel getCarbonLoadModel(Configuration hadoopConf) throws IOException {
+    String serialized = hadoopConf.get(LOAD_MODEL_KEY);
+    return serialized == null
+        ? null
+        : (CarbonLoadModel) ObjectSerializationUtil.convertStringToObject(serialized);
+  }
+
+  /**
+   * Stores {@code segmentId} in {@code hadoopConf}; a {@code null} id is ignored.
+   * NOTE(review): nothing here throws IOException; the clause is kept for caller compatibility.
+   */
+  public static void setSegmentId(Configuration hadoopConf, String segmentId) throws IOException {
+    if (segmentId == null) {
+      return;
+    }
+    hadoopConf.set(SEGMENT_ID_KEY, segmentId);
+  }
+
+  /**
+   * Returns the segment id stored by {@link #setSegmentId}, or {@code null} when it is unset.
+   * NOTE(review): nothing here throws IOException; the clause is kept for caller compatibility.
+   */
+  public static String getSegmentId(Configuration hadoopConf) throws IOException {
+    return hadoopConf.get(SEGMENT_ID_KEY);
+  }
+
+}
