Repository: carbondata
Updated Branches:
  refs/heads/master f810389fa -> f664805bf

Fix unsupported data type exception for streaming

Background: when Spark uses Kryo serialization, the streaming
application throws an "Unsupported data type" exception.

Root cause:
1. The measure data type list was collected on the driver side from the
   executor side.
2. With Kryo, the DataType singleton instances do not survive the round
   trip: deserialization creates new objects, so identity-based
   comparisons against the shared instances no longer match.

Solution: do not collect the measure data type list from the executor
side; derive it on the driver side instead, so it is never serialized.

This closes #2832
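
The failure mode is easy to reproduce outside CarbonData. The sketch
below is illustrative only: FakeDataType is a hypothetical stand-in for
a singleton-style descriptor such as DataTypes.INT, and the rest is the
plain Kryo API as bundled with Spark; none of this is CarbonData code.

  import java.io.{ByteArrayInputStream, ByteArrayOutputStream}

  import com.esotericsoftware.kryo.Kryo
  import com.esotericsoftware.kryo.io.{Input, Output}

  // Stand-in for a singleton type descriptor such as DataTypes.INT.
  class FakeDataType(var name: String) extends Serializable {
    def this() = this("") // Kryo's FieldSerializer needs a no-arg constructor
  }
  object FakeDataType { val INT = new FakeDataType("INT") }

  object KryoSingletonDemo extends App {
    val kryo = new Kryo()
    kryo.setRegistrationRequired(false) // serialize unregistered classes

    // Write the "singleton" out and read it back.
    val buffer = new ByteArrayOutputStream()
    val output = new Output(buffer)
    kryo.writeClassAndObject(output, FakeDataType.INT)
    output.close()

    val input = new Input(new ByteArrayInputStream(buffer.toByteArray))
    val copy = kryo.readClassAndObject(input).asInstanceOf[FakeDataType]

    // Kryo rebuilds the object field by field and knows nothing about
    // the companion-object singleton, so identity is lost:
    println(copy eq FakeDataType.INT) // false
    println(copy.name)                // INT
  }

Because the deserialized copy is a fresh object, code that compares
data types against the shared instances by reference (for example, the
comparator lookup by measure data type during the index merge below)
can fall through to the "Unsupported data type" error path.
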
Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/f664805b
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/f664805b
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/f664805b

Branch: refs/heads/master
Commit: f664805bf6fc9dea1683123aa7a25e7913dac9ae
Parents: f810389
Author: QiangCai <[email protected]>
Authored: Thu Oct 18 10:56:13 2018 +0800
Committer: Zhang Zhichao <[email protected]>
Committed: Fri Oct 19 23:14:33 2018 +0800

----------------------------------------------------------------------
 .../streaming/CarbonAppendableStreamSink.scala | 25 +++++++++++++++-----
 .../streaming/index/StreamFileIndex.java       | 10 --------
 .../streaming/segment/StreamSegment.java       | 19 +++++++--------
 3 files changed, 28 insertions(+), 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/f664805b/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/streaming/CarbonAppendableStreamSink.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/streaming/CarbonAppendableStreamSink.scala b/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/streaming/CarbonAppendableStreamSink.scala
index 2fdbc86..3d8170e 100644
--- a/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/streaming/CarbonAppendableStreamSink.scala
+++ b/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/streaming/CarbonAppendableStreamSink.scala
@@ -19,6 +19,8 @@ package org.apache.spark.sql.execution.streaming
 
 import java.util.Date
 
+import scala.collection.JavaConverters._
+
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.mapreduce._
 import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
@@ -36,6 +38,7 @@ import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.dictionary.server.DictionaryServer
+import org.apache.carbondata.core.metadata.datatype.DataType
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
 import org.apache.carbondata.core.stats.QueryStatistic
 import org.apache.carbondata.core.util.CarbonProperties
@@ -43,7 +46,6 @@ import org.apache.carbondata.core.util.path.CarbonTablePath
 import org.apache.carbondata.events.{OperationContext, OperationListenerBus}
 import org.apache.carbondata.hadoop.util.CarbonInputFormatUtil
 import org.apache.carbondata.processing.loading.constants.DataLoadProcessorConstants
-import org.apache.carbondata.processing.loading.csvinput.CSVInputFormat
 import org.apache.carbondata.processing.loading.events.LoadEvents.{LoadTablePostExecutionEvent, LoadTablePreExecutionEvent}
 import org.apache.carbondata.processing.loading.model.CarbonLoadModel
 import org.apache.carbondata.spark.rdd.StreamHandoffRDD
@@ -102,6 +104,17 @@ class CarbonAppendableStreamSink(
       CarbonProperties.getInstance().isEnableAutoHandoff
   )
 
+  // measure data type array
+  private lazy val msrDataTypes = {
+    carbonLoadModel
+      .getCarbonDataLoadSchema
+      .getCarbonTable
+      .getMeasures
+      .asScala
+      .map(_.getDataType)
+      .toArray
+  }
+
   override def addBatch(batchId: Long, data: DataFrame): Unit = {
     if (batchId <= fileLog.getLatest().map(_._1).getOrElse(-1L)) {
       CarbonAppendableStreamSink.LOGGER.info(s"Skipping already committed batch $batchId")
@@ -133,14 +146,14 @@ class CarbonAppendableStreamSink(
       CarbonAppendableStreamSink.writeDataFileJob(
         sparkSession,
         carbonTable,
-        parameters,
         batchId,
         currentSegmentId,
         data.queryExecution,
         committer,
         hadoopConf,
         carbonLoadModel,
-        server)
+        server,
+        msrDataTypes)
       // fire post event on every batch add
       val loadTablePostExecutionEvent = new LoadTablePostExecutionEvent(
         carbonTable.getCarbonTableIdentifier,
@@ -212,14 +225,14 @@ object CarbonAppendableStreamSink {
   def writeDataFileJob(
       sparkSession: SparkSession,
       carbonTable: CarbonTable,
-      parameters: Map[String, String],
       batchId: Long,
       segmentId: String,
       queryExecution: QueryExecution,
       committer: FileCommitProtocol,
       hadoopConf: Configuration,
       carbonLoadModel: CarbonLoadModel,
-      server: Option[DictionaryServer]): Unit = {
+      server: Option[DictionaryServer],
+      msrDataTypes: Array[DataType]): Unit = {
 
     // create job
     val job = Job.getInstance(hadoopConf)
@@ -276,7 +289,7 @@ object CarbonAppendableStreamSink {
 
         // update data file info in index file
         StreamSegment.updateIndexFile(
           CarbonTablePath.getSegmentPath(carbonTable.getTablePath, segmentId),
-          result.map(_._2))
+          result.map(_._2), msrDataTypes)
       } catch {
         // catch fault of executor side


http://git-wip-us.apache.org/repos/asf/carbondata/blob/f664805b/streaming/src/main/java/org/apache/carbondata/streaming/index/StreamFileIndex.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/carbondata/streaming/index/StreamFileIndex.java b/streaming/src/main/java/org/apache/carbondata/streaming/index/StreamFileIndex.java
index fa8a694..0a5113e 100644
--- a/streaming/src/main/java/org/apache/carbondata/streaming/index/StreamFileIndex.java
+++ b/streaming/src/main/java/org/apache/carbondata/streaming/index/StreamFileIndex.java
@@ -21,7 +21,6 @@ import java.io.Serializable;
 
 import org.apache.carbondata.common.annotations.InterfaceAudience;
 import org.apache.carbondata.core.metadata.blocklet.index.BlockletMinMaxIndex;
-import org.apache.carbondata.core.metadata.datatype.DataType;
 
 @InterfaceAudience.Internal
 public class StreamFileIndex implements Serializable {
@@ -35,8 +34,6 @@ public class StreamFileIndex implements Serializable {
 
   private long rowCount;
 
-  private DataType[] msrDataTypes;
-
   public StreamFileIndex(String fileName, BlockletMinMaxIndex minMaxIndex, long rowCount) {
     this.fileName = fileName;
     this.minMaxIndex = minMaxIndex;
@@ -67,11 +64,4 @@ public class StreamFileIndex implements Serializable {
     this.rowCount = rowCount;
   }
 
-  public DataType[] getMsrDataTypes() {
-    return msrDataTypes;
-  }
-
-  public void setMsrDataTypes(DataType[] msrDataTypes) {
-    this.msrDataTypes = msrDataTypes;
-  }
 }


http://git-wip-us.apache.org/repos/asf/carbondata/blob/f664805b/streaming/src/main/java/org/apache/carbondata/streaming/segment/StreamSegment.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/carbondata/streaming/segment/StreamSegment.java b/streaming/src/main/java/org/apache/carbondata/streaming/segment/StreamSegment.java
index b436b18..ba3d64a 100644
--- a/streaming/src/main/java/org/apache/carbondata/streaming/segment/StreamSegment.java
+++ b/streaming/src/main/java/org/apache/carbondata/streaming/segment/StreamSegment.java
@@ -269,10 +269,9 @@
    * create a StreamBlockIndex from the SimpleStatsResult array
    */
   private static StreamFileIndex createStreamBlockIndex(String fileName,
-      BlockletMinMaxIndex minMaxIndex, DataType[] msrDataTypes, int blockletRowCount) {
+      BlockletMinMaxIndex minMaxIndex, int blockletRowCount) {
     StreamFileIndex streamFileIndex =
         new StreamFileIndex(fileName, minMaxIndex, blockletRowCount);
-    streamFileIndex.setMsrDataTypes(msrDataTypes);
     return streamFileIndex;
   }
 
@@ -298,7 +297,7 @@
       inputIterators.close();
 
       return createStreamBlockIndex(writer.getFileName(), writer.getBatchMinMaxIndex(),
-          writer.getMeasureDataTypes(), blockletRowCount);
+          blockletRowCount);
     } catch (Throwable ex) {
       if (writer != null) {
         LOGGER.error("Failed to append batch data to stream segment: " +
@@ -449,8 +448,8 @@
    * 2.1 if blocklet index is null, use the BlockletMinMaxIndex index of stream
    * 2.2 if blocklet index is not null, combine these two index
    */
-  private static void mergeBatchMinMax(StreamFileIndex blockletIndex, BlockletMinMaxIndex fileIndex)
-      throws IOException {
+  private static void mergeBatchMinMax(StreamFileIndex blockletIndex,
+      BlockletMinMaxIndex fileIndex, DataType[] msrDataTypes) throws IOException {
     if (fileIndex == null) {
       // backward compatibility
       // it will not create a min/max index for the old stream file(without min/max index).
@@ -465,7 +464,6 @@
       return;
     }
-    DataType[] msrDataTypes = blockletIndex.getMsrDataTypes();
     SerializableComparator[] comparators = new SerializableComparator[msrDataTypes.length];
     for (int index = 0; index < comparators.length; index++) {
       comparators[index] = Comparator.getComparatorByDataTypeForMeasure(msrDataTypes[index]);
     }
@@ -594,7 +592,8 @@
    * merge new blocklet index and old file index to create new file index
    */
   private static void updateStreamFileIndex(Map<String, StreamFileIndex> indexMap,
-      String indexPath, FileFactory.FileType fileType) throws IOException {
+      String indexPath, FileFactory.FileType fileType, DataType[] msrDataTypes
+  ) throws IOException {
     List<BlockIndex> blockIndexList = readIndexFile(indexPath, fileType);
     for (BlockIndex blockIndex : blockIndexList) {
       BlockletMinMaxIndex fileIndex = CarbonMetadataUtil
@@ -607,7 +606,7 @@
       } else {
         // merge minMaxIndex into StreamBlockIndex
         blockletIndex.setRowCount(blockletIndex.getRowCount() + blockIndex.getNum_rows());
-        mergeBatchMinMax(blockletIndex, fileIndex);
+        mergeBatchMinMax(blockletIndex, fileIndex, msrDataTypes);
       }
     }
   }
@@ -616,7 +615,7 @@
    * update carbon index file after a stream batch.
    */
   public static void updateIndexFile(String segmentDir,
-      StreamFileIndex[] blockIndexes) throws IOException {
+      StreamFileIndex[] blockIndexes, DataType[] msrDataTypes) throws IOException {
     FileFactory.FileType fileType = FileFactory.getFileType(segmentDir);
     String filePath = CarbonTablePath.getCarbonStreamIndexFilePath(segmentDir);
     // update min/max index
@@ -624,7 +623,7 @@
     for (StreamFileIndex fileIndex : blockIndexes) {
       indexMap.put(fileIndex.getFileName(), fileIndex);
     }
-    updateStreamFileIndex(indexMap, filePath, fileType);
+    updateStreamFileIndex(indexMap, filePath, fileType, msrDataTypes);
     String tempFilePath = filePath + CarbonCommonConstants.TEMPWRITEFILEEXTENSION;
     CarbonIndexFileWriter writer = new CarbonIndexFileWriter();
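
For completeness, the two pieces around this fix on the user and driver
side. The SparkSession settings below are illustrative (this is the
standard switch that enables Kryo and thus exposed the bug), and
measureDataTypes is a hypothetical helper that simply restates the
derivation the patched sink performs in its lazy msrDataTypes field:

  import scala.collection.JavaConverters._

  import org.apache.spark.sql.SparkSession

  import org.apache.carbondata.core.metadata.datatype.DataType
  import org.apache.carbondata.core.metadata.schema.table.CarbonTable

  // Kryo serialization is what exposed the bug; this is the standard
  // Spark configuration switch for it.
  val spark = SparkSession.builder()
    .appName("carbon-streaming-app") // illustrative name
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    .getOrCreate()

  // Driver-side derivation of the measure data types: built once from
  // the table schema, so the DataType singletons never have to travel
  // back from executors inside StreamFileIndex.
  def measureDataTypes(table: CarbonTable): Array[DataType] =
    table.getMeasures.asScala.map(_.getDataType).toArray

With the array derived on the driver, StreamFileIndex stays a plain
Serializable holder with no DataType fields, so nothing that depends on
singleton identity ever crosses the serialization boundary.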
