Repository: carbondata
Updated Branches:
  refs/heads/master 2e1ddb542 -> c723947a7
http://git-wip-us.apache.org/repos/asf/carbondata/blob/c723947a/streaming/src/main/scala/org/apache/carbondata/streaming/StreamSinkFactory.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/carbondata/streaming/StreamSinkFactory.scala b/streaming/src/main/scala/org/apache/carbondata/streaming/StreamSinkFactory.scala deleted file mode 100644 index bc7b042..0000000 --- a/streaming/src/main/scala/org/apache/carbondata/streaming/StreamSinkFactory.scala +++ /dev/null @@ -1,236 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.carbondata.streaming - -import scala.collection.JavaConverters._ - -import org.apache.hadoop.conf.Configuration -import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationEnd} -import org.apache.spark.sql.SparkSession -import org.apache.spark.sql.execution.streaming.{CarbonAppendableStreamSink, Sink} - -import org.apache.carbondata.core.constants.CarbonCommonConstants -import org.apache.carbondata.core.datastore.impl.FileFactory -import org.apache.carbondata.core.dictionary.server.{DictionaryServer, NonSecureDictionaryServer} -import org.apache.carbondata.core.dictionary.service.NonSecureDictionaryServiceProvider -import org.apache.carbondata.core.metadata.encoder.Encoding -import org.apache.carbondata.core.metadata.schema.table.CarbonTable -import org.apache.carbondata.core.util.CarbonProperties -import org.apache.carbondata.core.util.path.CarbonTablePath -import org.apache.carbondata.events.{OperationContext, OperationListenerBus} -import org.apache.carbondata.processing.loading.events.LoadEvents.{LoadTablePostExecutionEvent, LoadTablePreExecutionEvent} -import org.apache.carbondata.processing.loading.model.{CarbonLoadModel, CarbonLoadModelBuilder, LoadOption} -import org.apache.carbondata.spark.dictionary.provider.SecureDictionaryServiceProvider -import org.apache.carbondata.spark.dictionary.server.SecureDictionaryServer -import org.apache.carbondata.streaming.segment.StreamSegment - -/** - * Stream sink factory - */ -object StreamSinkFactory { - - def createStreamTableSink( - sparkSession: SparkSession, - hadoopConf: Configuration, - carbonTable: CarbonTable, - parameters: Map[String, String]): Sink = { - validateParameters(parameters) - - // build load model - val carbonLoadModel = buildCarbonLoadModelForStream( - sparkSession, - hadoopConf, - carbonTable, - parameters, - "") - // fire pre event before streamin is started - // in case of streaming options and optionsFinal can be same - val operationContext = new OperationContext - val loadTablePreExecutionEvent = new LoadTablePreExecutionEvent( - carbonTable.getCarbonTableIdentifier, - carbonLoadModel, - carbonLoadModel.getFactFilePath, - false, - 
parameters.asJava, - parameters.asJava, - false - ) - OperationListenerBus.getInstance().fireEvent(loadTablePreExecutionEvent, operationContext) - // prepare the stream segment - val segmentId = getStreamSegmentId(carbonTable) - carbonLoadModel.setSegmentId(segmentId) - - // start server if necessary - val server = startDictionaryServer( - sparkSession, - carbonTable, - carbonLoadModel) - if (server.isDefined) { - carbonLoadModel.setUseOnePass(true) - } else { - carbonLoadModel.setUseOnePass(false) - } - // default is carbon appended stream sink - val carbonAppendableStreamSink = new CarbonAppendableStreamSink( - sparkSession, - carbonTable, - segmentId, - parameters, - carbonLoadModel, - server) - - // fire post event before streamin is started - val loadTablePostExecutionEvent = new LoadTablePostExecutionEvent( - carbonTable.getCarbonTableIdentifier, - carbonLoadModel - ) - OperationListenerBus.getInstance().fireEvent(loadTablePostExecutionEvent, operationContext) - carbonAppendableStreamSink - } - - private def validateParameters(parameters: Map[String, String]): Unit = { - val segmentSize = parameters.get(CarbonCommonConstants.HANDOFF_SIZE) - if (segmentSize.isDefined) { - try { - val value = java.lang.Long.parseLong(segmentSize.get) - if (value < CarbonCommonConstants.HANDOFF_SIZE_MIN) { - new CarbonStreamException(CarbonCommonConstants.HANDOFF_SIZE + - "should be bigger than or equal " + - CarbonCommonConstants.HANDOFF_SIZE_MIN) - } - } catch { - case _: NumberFormatException => - new CarbonStreamException(CarbonCommonConstants.HANDOFF_SIZE + - s" $segmentSize is an illegal number") - } - } - } - - /** - * get current stream segment id - * @return - */ - private def getStreamSegmentId(carbonTable: CarbonTable): String = { - val segmentId = StreamSegment.open(carbonTable) - val segmentDir = CarbonTablePath.getSegmentPath(carbonTable.getTablePath, segmentId) - val fileType = FileFactory.getFileType(segmentDir) - if (!FileFactory.isFileExist(segmentDir, fileType)) { - // Create table directory path, in case of enabling hive metastore first load may not have - // table folder created. - FileFactory.mkdirs(segmentDir, fileType) - } - if (FileFactory.isFileExist(segmentDir, fileType)) { - // recover fault - StreamSegment.recoverSegmentIfRequired(segmentDir) - } else { - FileFactory.mkdirs(segmentDir, fileType) - } - segmentId - } - - def startDictionaryServer( - sparkSession: SparkSession, - carbonTable: CarbonTable, - carbonLoadModel: CarbonLoadModel): Option[DictionaryServer] = { - // start dictionary server when use one pass load and dimension with DICTIONARY - // encoding is present. - val allDimensions = carbonTable.getAllDimensions.asScala.toList - val createDictionary = allDimensions.exists { - carbonDimension => carbonDimension.hasEncoding(Encoding.DICTIONARY) && - !carbonDimension.hasEncoding(Encoding.DIRECT_DICTIONARY) - } - val carbonSecureModeDictServer = CarbonProperties.getInstance. - getProperty(CarbonCommonConstants.CARBON_SECURE_DICTIONARY_SERVER, - CarbonCommonConstants.CARBON_SECURE_DICTIONARY_SERVER_DEFAULT) - - val sparkConf = sparkSession.sqlContext.sparkContext.getConf - val sparkDriverHost = sparkSession.sqlContext.sparkContext. 
- getConf.get("spark.driver.host") - - val server: Option[DictionaryServer] = if (createDictionary) { - if (sparkConf.get("spark.authenticate", "false").equalsIgnoreCase("true") && - carbonSecureModeDictServer.toBoolean) { - val dictionaryServer = SecureDictionaryServer.getInstance(sparkConf, - sparkDriverHost.toString, carbonLoadModel.getDictionaryServerPort, carbonTable) - carbonLoadModel.setDictionaryServerPort(dictionaryServer.getPort) - carbonLoadModel.setDictionaryServerHost(dictionaryServer.getHost) - carbonLoadModel.setDictionaryServerSecretKey(dictionaryServer.getSecretKey) - carbonLoadModel.setDictionaryEncryptServerSecure(dictionaryServer.isEncryptSecureServer) - carbonLoadModel.setDictionaryServiceProvider(new SecureDictionaryServiceProvider()) - sparkSession.sparkContext.addSparkListener(new SparkListener() { - override def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd) { - dictionaryServer.shutdown() - } - }) - Some(dictionaryServer) - } else { - val dictionaryServer = NonSecureDictionaryServer - .getInstance(carbonLoadModel.getDictionaryServerPort, carbonTable) - carbonLoadModel.setDictionaryServerPort(dictionaryServer.getPort) - carbonLoadModel.setDictionaryServerHost(dictionaryServer.getHost) - carbonLoadModel.setDictionaryEncryptServerSecure(false) - carbonLoadModel - .setDictionaryServiceProvider(new NonSecureDictionaryServiceProvider(dictionaryServer - .getPort)) - sparkSession.sparkContext.addSparkListener(new SparkListener() { - override def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd) { - dictionaryServer.shutdown() - } - }) - Some(dictionaryServer) - } - } else { - None - } - server - } - - private def buildCarbonLoadModelForStream( - sparkSession: SparkSession, - hadoopConf: Configuration, - carbonTable: CarbonTable, - parameters: Map[String, String], - segmentId: String): CarbonLoadModel = { - val carbonProperty: CarbonProperties = CarbonProperties.getInstance() - carbonProperty.addProperty("zookeeper.enable.lock", "false") - val optionsFinal = LoadOption.fillOptionWithDefaultValue(parameters.asJava) - optionsFinal.put("sort_scope", "no_sort") - if (parameters.get("fileheader").isEmpty) { - optionsFinal.put("fileheader", carbonTable.getCreateOrderColumn(carbonTable.getTableName) - .asScala.map(_.getColName).mkString(",")) - } - val carbonLoadModel = new CarbonLoadModel() - new CarbonLoadModelBuilder(carbonTable).build( - parameters.asJava, - optionsFinal, - carbonLoadModel, - hadoopConf) - carbonLoadModel.setSegmentId(segmentId) - // stream should use one pass - val dictionaryServerPort = parameters.getOrElse( - CarbonCommonConstants.DICTIONARY_SERVER_PORT, - carbonProperty.getProperty( - CarbonCommonConstants.DICTIONARY_SERVER_PORT, - CarbonCommonConstants.DICTIONARY_SERVER_PORT_DEFAULT)) - val sparkDriverHost = sparkSession.sqlContext.sparkContext. 
- getConf.get("spark.driver.host") - carbonLoadModel.setDictionaryServerHost(sparkDriverHost) - carbonLoadModel.setDictionaryServerPort(dictionaryServerPort.toInt) - carbonLoadModel - } -} http://git-wip-us.apache.org/repos/asf/carbondata/blob/c723947a/streaming/src/main/scala/org/apache/carbondata/streaming/parser/FieldConverter.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/carbondata/streaming/parser/FieldConverter.scala b/streaming/src/main/scala/org/apache/carbondata/streaming/parser/FieldConverter.scala new file mode 100644 index 0000000..8661417 --- /dev/null +++ b/streaming/src/main/scala/org/apache/carbondata/streaming/parser/FieldConverter.scala @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.streaming.parser + +import java.nio.charset.Charset +import java.text.SimpleDateFormat + +import org.apache.carbondata.core.constants.CarbonCommonConstants + +object FieldConverter { + + /** + * Return a String representation of the input value + * @param value input value + * @param serializationNullFormat string for null value + * @param delimiterLevel1 level 1 delimiter for complex type + * @param delimiterLevel2 level 2 delimiter for complex type + * @param timeStampFormat timestamp format + * @param dateFormat date format + * @param level level for recursive call + */ + def objectToString( + value: Any, + serializationNullFormat: String, + delimiterLevel1: String, + delimiterLevel2: String, + timeStampFormat: SimpleDateFormat, + dateFormat: SimpleDateFormat, + level: Int = 1): String = { + if (value == null) { + serializationNullFormat + } else { + value match { + case s: String => if (s.length > CarbonCommonConstants.MAX_CHARS_PER_COLUMN_DEFAULT) { + throw new Exception("Dataload failed, String length cannot exceed " + + CarbonCommonConstants.MAX_CHARS_PER_COLUMN_DEFAULT + " characters") + } else { + s + } + case d: java.math.BigDecimal => d.toPlainString + case i: java.lang.Integer => i.toString + case d: java.lang.Double => d.toString + case t: java.sql.Timestamp => timeStampFormat format t + case d: java.sql.Date => dateFormat format d + case b: java.lang.Boolean => b.toString + case s: java.lang.Short => s.toString + case f: java.lang.Float => f.toString + case bs: Array[Byte] => new String(bs, + Charset.forName(CarbonCommonConstants.DEFAULT_CHARSET)) + case s: scala.collection.Seq[Any] => + val delimiter = if (level == 1) { + delimiterLevel1 + } else { + delimiterLevel2 + } + val builder = new StringBuilder() + s.foreach { x => + builder.append(objectToString(x, serializationNullFormat, delimiterLevel1, + delimiterLevel2, timeStampFormat, dateFormat, level + 1)).append(delimiter) + } + 
builder.substring(0, builder.length - delimiter.length()) + case m: scala.collection.Map[Any, Any] => + throw new Exception("Unsupported data type: Map") + case r: org.apache.spark.sql.Row => + val delimiter = if (level == 1) { + delimiterLevel1 + } else { + delimiterLevel2 + } + val builder = new StringBuilder() + for (i <- 0 until r.length) { + builder.append(objectToString(r(i), serializationNullFormat, delimiterLevel1, + delimiterLevel2, timeStampFormat, dateFormat, level + 1)).append(delimiter) + } + builder.substring(0, builder.length - delimiter.length()) + case other => other.toString + } + } + } +} http://git-wip-us.apache.org/repos/asf/carbondata/blob/c723947a/streaming/src/main/scala/org/apache/carbondata/streaming/parser/RowStreamParserImp.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/carbondata/streaming/parser/RowStreamParserImp.scala b/streaming/src/main/scala/org/apache/carbondata/streaming/parser/RowStreamParserImp.scala index 5a227cf..1696fdc 100644 --- a/streaming/src/main/scala/org/apache/carbondata/streaming/parser/RowStreamParserImp.scala +++ b/streaming/src/main/scala/org/apache/carbondata/streaming/parser/RowStreamParserImp.scala @@ -17,6 +17,7 @@ package org.apache.carbondata.streaming.parser +import java.nio.charset.Charset import java.text.SimpleDateFormat import org.apache.hadoop.conf.Configuration @@ -27,7 +28,6 @@ import org.apache.spark.sql.types.StructType import org.apache.carbondata.core.constants.CarbonCommonConstants import org.apache.carbondata.processing.loading.constants.DataLoadProcessorConstants -import org.apache.carbondata.spark.util.CarbonScalaUtil /** * SparkSQL Row Stream Parser. @@ -61,12 +61,13 @@ class RowStreamParserImp extends CarbonStreamParser { override def parserRow(value: InternalRow): Array[Object] = { this.encoder.fromRow(value).toSeq.map { x => { - CarbonScalaUtil.getString(x, - serializationNullFormat, complexDelimiterLevel1, complexDelimiterLevel2, + FieldConverter.objectToString( + x, serializationNullFormat, complexDelimiterLevel1, complexDelimiterLevel2, timeStampFormat, dateFormat) } }.toArray } override def close(): Unit = { } + } http://git-wip-us.apache.org/repos/asf/carbondata/blob/c723947a/streaming/src/main/scala/org/apache/spark/sql/execution/streaming/CarbonAppendableStreamSink.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/spark/sql/execution/streaming/CarbonAppendableStreamSink.scala b/streaming/src/main/scala/org/apache/spark/sql/execution/streaming/CarbonAppendableStreamSink.scala deleted file mode 100644 index 6e6d092..0000000 --- a/streaming/src/main/scala/org/apache/spark/sql/execution/streaming/CarbonAppendableStreamSink.scala +++ /dev/null @@ -1,362 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql.execution.streaming - -import java.util.Date - -import scala.collection.JavaConverters._ - -import org.apache.hadoop.conf.Configuration -import org.apache.hadoop.mapreduce._ -import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl -import org.apache.spark.TaskContext -import org.apache.spark.internal.io.FileCommitProtocol -import org.apache.spark.internal.io.FileCommitProtocol.TaskCommitMessage -import org.apache.spark.sql.{DataFrame, SparkSession} -import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.execution.{QueryExecution, SQLExecution} -import org.apache.spark.sql.types.StructType -import org.apache.spark.util.{SerializableConfiguration, Utils} - -import org.apache.carbondata.common.CarbonIterator -import org.apache.carbondata.common.logging.LogServiceFactory -import org.apache.carbondata.core.constants.CarbonCommonConstants -import org.apache.carbondata.core.datastore.impl.FileFactory -import org.apache.carbondata.core.dictionary.server.DictionaryServer -import org.apache.carbondata.core.metadata.schema.table.CarbonTable -import org.apache.carbondata.core.stats.QueryStatistic -import org.apache.carbondata.core.util.CarbonProperties -import org.apache.carbondata.core.util.path.CarbonTablePath -import org.apache.carbondata.events.{OperationContext, OperationListenerBus} -import org.apache.carbondata.hadoop.streaming.CarbonStreamOutputFormat -import org.apache.carbondata.hadoop.util.CarbonInputFormatUtil -import org.apache.carbondata.processing.loading.constants.DataLoadProcessorConstants -import org.apache.carbondata.processing.loading.events.LoadEvents.{LoadTablePostExecutionEvent, LoadTablePreExecutionEvent} -import org.apache.carbondata.processing.loading.model.CarbonLoadModel -import org.apache.carbondata.streaming.{CarbonStreamException, StreamHandoffRDD} -import org.apache.carbondata.streaming.parser.CarbonStreamParser -import org.apache.carbondata.streaming.segment.StreamSegment - -/** - * an implement of stream sink, it persist each batch to disk by appending the batch data to - * data files. 
- */ -class CarbonAppendableStreamSink( - sparkSession: SparkSession, - val carbonTable: CarbonTable, - var currentSegmentId: String, - parameters: Map[String, String], - carbonLoadModel: CarbonLoadModel, - server: Option[DictionaryServer]) extends Sink { - - private val fileLogPath = CarbonTablePath.getStreamingLogDir(carbonTable.getTablePath) - private val fileLog = new FileStreamSinkLog(FileStreamSinkLog.VERSION, sparkSession, fileLogPath) - // prepare configuration - private val hadoopConf = { - val conf = sparkSession.sessionState.newHadoopConf() - // put all parameters into hadoopConf - parameters.foreach { entry => - conf.set(entry._1, entry._2) - } - // properties below will be used for default CarbonStreamParser - conf.set("carbon_complex_delimiter_level_1", - carbonLoadModel.getComplexDelimiterLevel1) - conf.set("carbon_complex_delimiter_level_2", - carbonLoadModel.getComplexDelimiterLevel2) - conf.set( - DataLoadProcessorConstants.SERIALIZATION_NULL_FORMAT, - carbonLoadModel.getSerializationNullFormat().split(",")(1)) - conf.set( - CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, - carbonLoadModel.getTimestampformat()) - conf.set( - CarbonCommonConstants.CARBON_DATE_FORMAT, - carbonLoadModel.getDateFormat()) - conf - } - // segment max size(byte) - private val segmentMaxSize = hadoopConf.getLong( - CarbonCommonConstants.HANDOFF_SIZE, - CarbonProperties.getInstance().getHandoffSize - ) - - // auto handoff - private val enableAutoHandoff = hadoopConf.getBoolean( - CarbonCommonConstants.ENABLE_AUTO_HANDOFF, - CarbonProperties.getInstance().isEnableAutoHandoff - ) - - override def addBatch(batchId: Long, data: DataFrame): Unit = { - if (batchId <= fileLog.getLatest().map(_._1).getOrElse(-1L)) { - CarbonAppendableStreamSink.LOGGER.info(s"Skipping already committed batch $batchId") - } else { - - val statistic = new QueryStatistic() - - // fire pre event on every batch add - // in case of streaming options and optionsFinal can be same - val operationContext = new OperationContext - val loadTablePreExecutionEvent = new LoadTablePreExecutionEvent( - carbonTable.getCarbonTableIdentifier, - carbonLoadModel, - carbonLoadModel.getFactFilePath, - false, - parameters.asJava, - parameters.asJava, - false - ) - OperationListenerBus.getInstance().fireEvent(loadTablePreExecutionEvent, operationContext) - checkOrHandOffSegment() - - // committer will record how this spark job commit its output - val committer = FileCommitProtocol.instantiate( - className = sparkSession.sessionState.conf.streamingFileCommitProtocolClass, - jobId = batchId.toString, - outputPath = fileLogPath, - isAppend = false) - - committer match { - case manifestCommitter: ManifestFileCommitProtocol => - manifestCommitter.setupManifestOptions(fileLog, batchId) - case _ => // Do nothing - } - - CarbonAppendableStreamSink.writeDataFileJob( - sparkSession, - carbonTable, - parameters, - batchId, - currentSegmentId, - data.queryExecution, - committer, - hadoopConf, - carbonLoadModel, - server) - // fire post event on every batch add - val loadTablePostExecutionEvent = new LoadTablePostExecutionEvent( - carbonTable.getCarbonTableIdentifier, - carbonLoadModel - ) - OperationListenerBus.getInstance().fireEvent(loadTablePostExecutionEvent, operationContext) - - statistic.addStatistics(s"add batch: $batchId", System.currentTimeMillis()) - CarbonAppendableStreamSink.LOGGER.info( - s"${statistic.getMessage}, taken time(ms): ${statistic.getTimeTaken}") - } - } - - /** - * if the directory size of current segment beyond the threshold, hand off 
new segment - */ - private def checkOrHandOffSegment(): Unit = { - val segmentDir = CarbonTablePath.getSegmentPath(carbonTable.getTablePath, currentSegmentId) - val fileType = FileFactory.getFileType(segmentDir) - if (segmentMaxSize <= StreamSegment.size(segmentDir)) { - val newSegmentId = StreamSegment.close(carbonTable, currentSegmentId) - currentSegmentId = newSegmentId - val newSegmentDir = CarbonTablePath.getSegmentPath(carbonTable.getTablePath, currentSegmentId) - FileFactory.mkdirs(newSegmentDir, fileType) - - // TODO trigger hand off operation - if (enableAutoHandoff) { - StreamHandoffRDD.startStreamingHandoffThread( - carbonLoadModel, - sparkSession, - false) - } - } - } -} - -object CarbonAppendableStreamSink { - - private val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName) - - /** - * package the hadoop configuration and it will be passed to executor side from driver side - */ - case class WriteDataFileJobDescription( - serializableHadoopConf: SerializableConfiguration, - batchId: Long, - segmentId: String) - - /** - * Run a spark job to append the newly arrived data to the existing row format - * file directly. - * If there are failure in the task, spark will re-try the task and - * carbon will do recovery by HDFS file truncate. (see StreamSegment.tryRecoverFromTaskFault) - * If there are job level failure, every files in the stream segment will do truncate - * if necessary. (see StreamSegment.tryRecoverFromJobFault) - */ - def writeDataFileJob( - sparkSession: SparkSession, - carbonTable: CarbonTable, - parameters: Map[String, String], - batchId: Long, - segmentId: String, - queryExecution: QueryExecution, - committer: FileCommitProtocol, - hadoopConf: Configuration, - carbonLoadModel: CarbonLoadModel, - server: Option[DictionaryServer]): Unit = { - - // create job - val job = Job.getInstance(hadoopConf) - job.setOutputKeyClass(classOf[Void]) - job.setOutputValueClass(classOf[InternalRow]) - val jobId = CarbonInputFormatUtil.getJobId(new Date, batchId.toInt) - job.setJobID(jobId) - - val description = WriteDataFileJobDescription( - serializableHadoopConf = new SerializableConfiguration(job.getConfiguration), - batchId, - segmentId - ) - - // run write data file job - SQLExecution.withNewExecutionId(sparkSession, queryExecution) { - var result: Array[TaskCommitMessage] = null - try { - committer.setupJob(job) - // initialize dictionary server - if (server.isDefined) { - server.get.initializeDictionaryGenerator(carbonTable) - } - - val rowSchema = queryExecution.analyzed.schema - // write data file - result = sparkSession.sparkContext.runJob(queryExecution.toRdd, - (taskContext: TaskContext, iterator: Iterator[InternalRow]) => { - writeDataFileTask( - description, - carbonLoadModel, - sparkStageId = taskContext.stageId(), - sparkPartitionId = taskContext.partitionId(), - sparkAttemptNumber = taskContext.attemptNumber(), - committer, - iterator, - rowSchema - ) - }) - - // write dictionary - if (server.isDefined) { - try { - server.get.writeTableDictionary(carbonTable.getCarbonTableIdentifier.getTableId) - } catch { - case _: Exception => - LOGGER.error( - s"Error while writing dictionary file for ${carbonTable.getTableUniqueName}") - throw new Exception( - "Streaming ingest failed due to error while writing dictionary file") - } - } - - // update data file info in index file - StreamSegment.updateIndexFile( - CarbonTablePath.getSegmentPath(carbonTable.getTablePath, segmentId)) - - } catch { - // catch fault of executor side - case t: Throwable => - val 
segmentDir = CarbonTablePath.getSegmentPath(carbonTable.getTablePath, segmentId) - StreamSegment.recoverSegmentIfRequired(segmentDir) - LOGGER.error(t, s"Aborting job ${ job.getJobID }.") - committer.abortJob(job) - throw new CarbonStreamException("Job failed to write data file", t) - } - committer.commitJob(job, result) - LOGGER.info(s"Job ${ job.getJobID } committed.") - } - } - - /** - * execute a task for each partition to write a data file - */ - def writeDataFileTask( - description: WriteDataFileJobDescription, - carbonLoadModel: CarbonLoadModel, - sparkStageId: Int, - sparkPartitionId: Int, - sparkAttemptNumber: Int, - committer: FileCommitProtocol, - iterator: Iterator[InternalRow], - rowSchema: StructType - ): TaskCommitMessage = { - - val jobId = CarbonInputFormatUtil.getJobId(new Date, sparkStageId) - val taskId = new TaskID(jobId, TaskType.MAP, sparkPartitionId) - val taskAttemptId = new TaskAttemptID(taskId, sparkAttemptNumber) - - // Set up the attempt context required to use in the output committer. - val taskAttemptContext: TaskAttemptContext = { - // Set up the configuration object - val hadoopConf = description.serializableHadoopConf.value - CarbonStreamOutputFormat.setSegmentId(hadoopConf, description.segmentId) - hadoopConf.set("mapred.job.id", jobId.toString) - hadoopConf.set("mapred.tip.id", taskAttemptId.getTaskID.toString) - hadoopConf.set("mapred.task.id", taskAttemptId.toString) - hadoopConf.setBoolean("mapred.task.is.map", true) - hadoopConf.setInt("mapred.task.partition", 0) - new TaskAttemptContextImpl(hadoopConf, taskAttemptId) - } - - committer.setupTask(taskAttemptContext) - - try { - Utils.tryWithSafeFinallyAndFailureCallbacks(block = { - - val parserName = taskAttemptContext.getConfiguration.get( - CarbonStreamParser.CARBON_STREAM_PARSER, - CarbonStreamParser.CARBON_STREAM_PARSER_DEFAULT) - - val streamParser = - Class.forName(parserName).newInstance.asInstanceOf[CarbonStreamParser] - streamParser.initialize(taskAttemptContext.getConfiguration, rowSchema) - - StreamSegment.appendBatchData(new InputIterator(iterator, streamParser), - taskAttemptContext, carbonLoadModel) - })(catchBlock = { - committer.abortTask(taskAttemptContext) - LOGGER.error(s"Job $jobId aborted.") - }) - committer.commitTask(taskAttemptContext) - } catch { - case t: Throwable => - throw new CarbonStreamException("Task failed while writing rows", t) - } - } - - /** - * convert spark iterator to carbon iterator, so that java module can use it. 
- */ - class InputIterator(rddIter: Iterator[InternalRow], streamParser: CarbonStreamParser) - extends CarbonIterator[Array[Object]] { - - override def hasNext: Boolean = rddIter.hasNext - - override def next: Array[Object] = { - streamParser.parserRow(rddIter.next()) - } - - override def close(): Unit = { - streamParser.close() - } - } - -} http://git-wip-us.apache.org/repos/asf/carbondata/blob/c723947a/streaming/src/main/scala/org/apache/spark/sql/execution/streaming/CarbonStreamingQueryListener.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/spark/sql/execution/streaming/CarbonStreamingQueryListener.scala b/streaming/src/main/scala/org/apache/spark/sql/execution/streaming/CarbonStreamingQueryListener.scala deleted file mode 100644 index 2f911c5..0000000 --- a/streaming/src/main/scala/org/apache/spark/sql/execution/streaming/CarbonStreamingQueryListener.scala +++ /dev/null @@ -1,77 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql.execution.streaming - -import java.util -import java.util.UUID - -import org.apache.spark.SPARK_VERSION -import org.apache.spark.sql.SparkSession -import org.apache.spark.sql.streaming.StreamingQueryListener - -import org.apache.carbondata.common.logging.LogServiceFactory -import org.apache.carbondata.core.locks.{CarbonLockFactory, ICarbonLock, LockUsage} - -class CarbonStreamingQueryListener(spark: SparkSession) extends StreamingQueryListener { - - private val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName) - - private val cache = new util.HashMap[UUID, ICarbonLock]() - - override def onQueryStarted(event: StreamingQueryListener.QueryStartedEvent): Unit = { - val streamQuery = spark.streams.get(event.id) - val qry = if (streamQuery.isInstanceOf[StreamExecution]) { - // adapt spark 2.1 - streamQuery.asInstanceOf[StreamExecution] - } else { - // adapt spark 2.2 and later version - val clazz = Class.forName("org.apache.spark.sql.execution.streaming.StreamingQueryWrapper") - val method = clazz.getMethod("streamingQuery") - method.invoke(streamQuery).asInstanceOf[StreamExecution] - } - if (qry.sink.isInstanceOf[CarbonAppendableStreamSink]) { - LOGGER.info("Carbon streaming query started: " + event.id) - val sink = qry.sink.asInstanceOf[CarbonAppendableStreamSink] - val carbonTable = sink.carbonTable - val lock = CarbonLockFactory.getCarbonLockObj(carbonTable.getAbsoluteTableIdentifier, - LockUsage.STREAMING_LOCK) - if (lock.lockWithRetries()) { - LOGGER.info("Acquired the lock for stream table: " + carbonTable.getDatabaseName + "." 
+ - carbonTable.getTableName) - cache.put(event.id, lock) - } else { - LOGGER.error("Not able to acquire the lock for stream table:" + - carbonTable.getDatabaseName + "." + carbonTable.getTableName) - throw new InterruptedException( - "Not able to acquire the lock for stream table: " + carbonTable.getDatabaseName + "." + - carbonTable.getTableName) - } - } - } - - override def onQueryProgress(event: StreamingQueryListener.QueryProgressEvent): Unit = { - } - - override def onQueryTerminated(event: StreamingQueryListener.QueryTerminatedEvent): Unit = { - val lock = cache.remove(event.id) - if (null != lock) { - LOGGER.info("Carbon streaming query: " + event.id) - lock.unlock() - } - } -} http://git-wip-us.apache.org/repos/asf/carbondata/blob/c723947a/streaming/src/test/java/org/apache/carbondata/streaming/CarbonStreamInputFormatTest.java ---------------------------------------------------------------------- diff --git a/streaming/src/test/java/org/apache/carbondata/streaming/CarbonStreamInputFormatTest.java b/streaming/src/test/java/org/apache/carbondata/streaming/CarbonStreamInputFormatTest.java new file mode 100644 index 0000000..a224446 --- /dev/null +++ b/streaming/src/test/java/org/apache/carbondata/streaming/CarbonStreamInputFormatTest.java @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.carbondata.streaming; + +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Date; +import java.util.List; +import java.util.UUID; + +import org.apache.carbondata.core.datastore.impl.FileFactory; +import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier; +import org.apache.carbondata.core.metadata.CarbonTableIdentifier; +import org.apache.carbondata.core.statusmanager.FileFormat; +import org.apache.carbondata.hadoop.CarbonInputSplit; +import org.apache.carbondata.hadoop.CarbonMultiBlockSplit; +import org.apache.carbondata.hadoop.util.CarbonInputFormatUtil; + +import junit.framework.TestCase; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.JobID; +import org.apache.hadoop.mapreduce.RecordReader; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.hadoop.mapreduce.TaskAttemptID; +import org.apache.hadoop.mapreduce.TaskID; +import org.apache.hadoop.mapreduce.TaskType; +import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl; +import org.junit.Assert; +import org.junit.Test; + +public class CarbonStreamInputFormatTest extends TestCase { + + private TaskAttemptID taskAttemptId; + private TaskAttemptContext taskAttemptContext; + private Configuration hadoopConf; + private AbsoluteTableIdentifier identifier; + private String tablePath; + + + @Override protected void setUp() throws Exception { + tablePath = new File("target/stream_input").getCanonicalPath(); + String dbName = "default"; + String tableName = "stream_table_input"; + identifier = AbsoluteTableIdentifier.from( + tablePath, + new CarbonTableIdentifier(dbName, tableName, UUID.randomUUID().toString())); + + JobID jobId = CarbonInputFormatUtil.getJobId(new Date(), 0); + TaskID taskId = new TaskID(jobId, TaskType.MAP, 0); + taskAttemptId = new TaskAttemptID(taskId, 0); + + hadoopConf = new Configuration(); + taskAttemptContext = new TaskAttemptContextImpl(hadoopConf, taskAttemptId); + } + + private InputSplit buildInputSplit() throws IOException { + CarbonInputSplit carbonInputSplit = new CarbonInputSplit(); + List<CarbonInputSplit> splitList = new ArrayList<>(); + splitList.add(carbonInputSplit); + return new CarbonMultiBlockSplit(splitList, new String[] { "localhost" }, + FileFormat.ROW_V1); + } + + @Test public void testCreateRecordReader() { + try { + InputSplit inputSplit = buildInputSplit(); + CarbonStreamInputFormat inputFormat = new CarbonStreamInputFormat(); + RecordReader recordReader = inputFormat.createRecordReader(inputSplit, taskAttemptContext); + Assert.assertNotNull("Failed to create record reader", recordReader); + } catch (Exception e) { + e.printStackTrace(); + Assert.assertTrue(e.getMessage(), false); + } + } + + @Override protected void tearDown() throws Exception { + super.tearDown(); + if (tablePath != null) { + FileFactory.deleteAllFilesOfDir(new File(tablePath)); + } + } +} http://git-wip-us.apache.org/repos/asf/carbondata/blob/c723947a/streaming/src/test/java/org/apache/carbondata/streaming/CarbonStreamOutputFormatTest.java ---------------------------------------------------------------------- diff --git a/streaming/src/test/java/org/apache/carbondata/streaming/CarbonStreamOutputFormatTest.java b/streaming/src/test/java/org/apache/carbondata/streaming/CarbonStreamOutputFormatTest.java new file mode 100644 index 0000000..af79483 --- /dev/null +++ 
b/streaming/src/test/java/org/apache/carbondata/streaming/CarbonStreamOutputFormatTest.java @@ -0,0 +1,121 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.streaming; + +import java.io.File; +import java.io.IOException; +import java.util.Date; +import java.util.UUID; + +import org.apache.carbondata.core.datastore.impl.FileFactory; +import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier; +import org.apache.carbondata.core.metadata.CarbonTableIdentifier; +import org.apache.carbondata.core.metadata.schema.table.CarbonTable; +import org.apache.carbondata.hadoop.testutil.StoreCreator; +import org.apache.carbondata.hadoop.util.CarbonInputFormatUtil; +import org.apache.carbondata.processing.loading.model.CarbonLoadModel; + +import junit.framework.TestCase; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.mapreduce.JobID; +import org.apache.hadoop.mapreduce.RecordWriter; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.hadoop.mapreduce.TaskAttemptID; +import org.apache.hadoop.mapreduce.TaskID; +import org.apache.hadoop.mapreduce.TaskType; +import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl; +import org.junit.Assert; +import org.junit.Test; + +public class CarbonStreamOutputFormatTest extends TestCase { + + private Configuration hadoopConf; + private TaskAttemptID taskAttemptId; + private CarbonLoadModel carbonLoadModel; + private String tablePath; + + @Override protected void setUp() throws Exception { + super.setUp(); + JobID jobId = CarbonInputFormatUtil.getJobId(new Date(), 0); + TaskID taskId = new TaskID(jobId, TaskType.MAP, 0); + taskAttemptId = new TaskAttemptID(taskId, 0); + + hadoopConf = new Configuration(); + hadoopConf.set("mapred.job.id", jobId.toString()); + hadoopConf.set("mapred.tip.id", taskAttemptId.getTaskID().toString()); + hadoopConf.set("mapred.task.id", taskAttemptId.toString()); + hadoopConf.setBoolean("mapred.task.is.map", true); + hadoopConf.setInt("mapred.task.partition", 0); + + tablePath = new File("target/stream_output").getCanonicalPath(); + String dbName = "default"; + String tableName = "stream_table_output"; + AbsoluteTableIdentifier identifier = + AbsoluteTableIdentifier.from( + tablePath, + new CarbonTableIdentifier(dbName, tableName, UUID.randomUUID().toString())); + + CarbonTable table = StoreCreator.createTable(identifier); + + String factFilePath = new File("../hadoop/src/test/resources/data.csv").getCanonicalPath(); + carbonLoadModel = StoreCreator.buildCarbonLoadModel(table, factFilePath, identifier); + } + + @Test public void testSetCarbonLoadModel() { + try { + CarbonStreamOutputFormat.setCarbonLoadModel(hadoopConf, carbonLoadModel); + } catch (IOException e) { + Assert.assertTrue("Failed to config 
CarbonLoadModel for CarbonStreamOutputFromat", false); + } + } + + @Test public void testGetCarbonLoadModel() { + try { + CarbonStreamOutputFormat.setCarbonLoadModel(hadoopConf, carbonLoadModel); + CarbonLoadModel model = CarbonStreamOutputFormat.getCarbonLoadModel(hadoopConf); + + Assert.assertNotNull("Failed to get CarbonLoadModel", model); + Assert.assertTrue("CarbonLoadModel should be same with previous", + carbonLoadModel.getFactTimeStamp() == model.getFactTimeStamp()); + + } catch (IOException e) { + Assert.assertTrue("Failed to get CarbonLoadModel for CarbonStreamOutputFromat", false); + } + } + + @Test public void testGetRecordWriter() { + CarbonStreamOutputFormat outputFormat = new CarbonStreamOutputFormat(); + try { + CarbonStreamOutputFormat.setCarbonLoadModel(hadoopConf, carbonLoadModel); + TaskAttemptContext taskAttemptContext = + new TaskAttemptContextImpl(hadoopConf, taskAttemptId); + RecordWriter recordWriter = outputFormat.getRecordWriter(taskAttemptContext); + Assert.assertNotNull("Failed to get CarbonStreamRecordWriter", recordWriter); + } catch (Exception e) { + e.printStackTrace(); + Assert.assertTrue(e.getMessage(), false); + } + } + + @Override protected void tearDown() throws Exception { + super.tearDown(); + if (tablePath != null) { + FileFactory.deleteAllFilesOfDir(new File(tablePath)); + } + } +}
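
For reference, below is a minimal usage sketch (not part of this commit) of the new FieldConverter.objectToString utility introduced in streaming/src/main/scala/org/apache/carbondata/streaming/parser/FieldConverter.scala. The delimiter strings, null-format token and date/timestamp formats used here are illustrative assumptions for the example only, not CarbonData defaults.

    import java.text.SimpleDateFormat

    import org.apache.carbondata.streaming.parser.FieldConverter

    object FieldConverterExample {
      def main(args: Array[String]): Unit = {
        // illustrative formats and delimiters (assumed values for this sketch)
        val timestampFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
        val dateFormat = new SimpleDateFormat("yyyy-MM-dd")
        val nullFormat = "\\N"

        // a null value is replaced by the configured null-format string
        val nullField = FieldConverter.objectToString(
          null, nullFormat, "$", ":", timestampFormat, dateFormat)
        println(nullField) // \N

        // a Seq (complex type) is joined with the level-1 delimiter
        val arrayField = FieldConverter.objectToString(
          Seq(1, 2, 3), nullFormat, "$", ":", timestampFormat, dateFormat)
        println(arrayField) // 1$2$3
      }
    }

As the RowStreamParserImp.scala change in this commit shows, each field of a decoded Row is passed through this method (replacing the previous CarbonScalaUtil.getString call) before the row is handed to the stream writer.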
