Repository: carbondata Updated Branches: refs/heads/master ee11bb1eb -> 3312ee72c
[CARBONDATA-2067] Fix NPE exception in StreamHandoffRDD To avoid NPE of carbon table in StreamHandoffRDD, add carbon table to cache in internalCompute method This closes #1846 Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/3312ee72 Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/3312ee72 Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/3312ee72 Branch: refs/heads/master Commit: 3312ee72c5e67a3e7f00a47f925b6832a00b14a6 Parents: ee11bb1 Author: QiangCai <[email protected]> Authored: Tue Jan 23 16:20:19 2018 +0800 Committer: Jacky Li <[email protected]> Committed: Thu Jan 25 15:46:04 2018 +0800 ---------------------------------------------------------------------- .../org/apache/carbondata/streaming/StreamHandoffRDD.scala | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/carbondata/blob/3312ee72/streaming/src/main/scala/org/apache/carbondata/streaming/StreamHandoffRDD.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/carbondata/streaming/StreamHandoffRDD.scala b/streaming/src/main/scala/org/apache/carbondata/streaming/StreamHandoffRDD.scala index b91be0c..d092580 100644 --- a/streaming/src/main/scala/org/apache/carbondata/streaming/StreamHandoffRDD.scala +++ b/streaming/src/main/scala/org/apache/carbondata/streaming/StreamHandoffRDD.scala @@ -17,7 +17,6 @@ package org.apache.carbondata.streaming -import java.io.IOException import java.text.SimpleDateFormat import java.util import java.util.Date @@ -32,7 +31,8 @@ import org.apache.spark.sql.SparkSession import org.apache.carbondata.common.logging.LogServiceFactory import org.apache.carbondata.core.datastore.block.SegmentProperties import org.apache.carbondata.core.datastore.impl.FileFactory -import org.apache.carbondata.core.locks.{CarbonLockFactory, ICarbonLock, LockUsage} +import org.apache.carbondata.core.locks.{CarbonLockFactory, LockUsage} +import org.apache.carbondata.core.metadata.CarbonMetadata import org.apache.carbondata.core.metadata.schema.table.CarbonTable import org.apache.carbondata.core.scan.result.iterator.RawResultIterator import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatus, SegmentStatusManager} @@ -46,7 +46,6 @@ import org.apache.carbondata.processing.merger.{CompactionResultSortProcessor, C import org.apache.carbondata.processing.util.CarbonLoaderUtil import org.apache.carbondata.spark.{HandoffResult, HandoffResultImpl} import org.apache.carbondata.spark.rdd.CarbonRDD -import org.apache.carbondata.streaming.segment.StreamSegment /** * partition of the handoff segment @@ -106,6 +105,7 @@ class StreamHandoffRDD[K, V]( carbonLoadModel.setPartitionId("0") carbonLoadModel.setTaskNo("" + split.index) val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable + CarbonMetadata.getInstance().addCarbonTable(carbonTable) // the input iterator is using raw row val iteratorList = prepareInputIterator(split, carbonTable) // use CompactionResultSortProcessor to sort data dan write to columnar files
