[CARBONDATA-2984][Streaming] Fix NPE when there is no data in the task of a batch
Fix NPE when there is no data in the task of a batch

A streaming batch may contain no data, in which case the task has no blocklet
to append to the streaming file. The min/max index of the streaming file then
does not need to be updated; the min/max index of the old file is reused
instead.

This closes #2782
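The guard this patch introduces can be read as a merge-with-fallback: the
min/max statistics of a new blocklet are merged into the existing stream
file's index, and an empty batch contributes no statistics at all, so the
old index must be kept rather than dereferenced. Below is a minimal,
self-contained Java sketch of that pattern; the MinMax type and mergeMinMax
method are hypothetical simplifications for illustration only, not
CarbonData's actual BlockletMinMaxIndex API.

    import java.util.Arrays;

    // Sketch of the fallback applied by this fix: when a micro-batch task
    // writes no rows, the new blocklet carries no min/max statistics, so the
    // merge must keep the old file's statistics instead of dereferencing a
    // null index (the NPE this commit fixes).
    public class MinMaxMergeSketch {

      static final class MinMax {
        final int[] min;
        final int[] max;

        MinMax(int[] min, int[] max) {
          this.min = min;
          this.max = max;
        }

        @Override
        public String toString() {
          return "min=" + Arrays.toString(min) + ", max=" + Arrays.toString(max);
        }
      }

      // Merge the index of a newly appended blocklet into the index of the
      // existing stream file. A null blockletIndex stands for an empty batch.
      static MinMax mergeMinMax(MinMax blockletIndex, MinMax fileIndex) {
        if (blockletIndex == null) {
          // empty batch: nothing was appended, so the old file index stays valid
          return fileIndex;
        }
        int[] min = new int[fileIndex.min.length];
        int[] max = new int[fileIndex.max.length];
        for (int i = 0; i < min.length; i++) {
          min[i] = Math.min(blockletIndex.min[i], fileIndex.min[i]);
          max[i] = Math.max(blockletIndex.max[i], fileIndex.max[i]);
        }
        return new MinMax(min, max);
      }

      public static void main(String[] args) {
        MinMax fileIndex = new MinMax(new int[]{3, 10}, new int[]{8, 42});
        MinMax blocklet = new MinMax(new int[]{1, 20}, new int[]{5, 50});
        System.out.println(mergeMinMax(blocklet, fileIndex)); // min=[1, 10], max=[8, 50]
        System.out.println(mergeMinMax(null, fileIndex));     // min=[3, 10], max=[8, 42]
      }
    }

In the real patch, StreamSegment additionally hoists the repeated
blockletIndex.getMinMaxIndex() calls into a local minMaxIndex variable, so
the single null check at the top guards every later use.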
Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/fa9c8323
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/fa9c8323
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/fa9c8323

Branch: refs/heads/branch-1.5
Commit: fa9c8323c11c083452d75886cbbdad1f23d6dfb7
Parents: 0b16816
Author: QiangCai <[email protected]>
Authored: Fri Sep 28 14:48:39 2018 +0800
Committer: ravipesala <[email protected]>
Committed: Wed Oct 3 20:13:50 2018 +0530

----------------------------------------------------------------------
 .../TestStreamingTableOperation.scala       | 49 +++++++++++++++++++-
 .../streaming/CarbonStreamRecordWriter.java |  5 +-
 .../streaming/segment/StreamSegment.java    | 15 ++++--
 3 files changed, 61 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/fa9c8323/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala b/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala
index 43c1e5a..607c429 100644
--- a/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala
+++ b/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala
@@ -37,6 +37,7 @@ import org.scalatest.BeforeAndAfterAll
 import org.apache.carbondata.common.exceptions.NoSuchStreamException
 import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
 import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.metadata.schema.datamap.DataMapClassProvider.TIMESERIES
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
 import org.apache.carbondata.core.statusmanager.{FileFormat, SegmentStatus}
@@ -125,6 +126,8 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll {
 
     createTable(tableName = "agg_table", streaming = true, withBatchLoad = false)
 
+    createTable(tableName = "stream_table_empty", streaming = true, withBatchLoad = false)
+
     var csvDataDir = integrationPath + "/spark2/target/csvdatanew"
     generateCSVDataFile(spark, idStart = 10, rowNums = 5, csvDataDir)
     generateCSVDataFile(spark, idStart = 10, rowNums = 5, csvDataDir, SaveMode.Append)
@@ -213,6 +216,7 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll {
     sql("drop table if exists streaming.stream_table_reopen")
     sql("drop table if exists streaming.stream_table_drop")
     sql("drop table if exists streaming.agg_table_block")
+    sql("drop table if exists streaming.stream_table_empty")
   }
 
   // normal table not support streaming ingest
@@ -226,7 +230,7 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll {
       .asInstanceOf[CarbonRelation].metaData.carbonTable
     var server: ServerSocket = null
     try {
-      server = getServerSocket
+      server = getServerSocket()
       val thread1 = createWriteSocketThread(server, 2, 10, 1)
       thread1.start()
       // use thread pool to catch the exception of sink thread
@@ -2253,6 +2257,46 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll {
     sql("DROP TABLE IF EXISTS dim")
   }
 
+  // test empty batch
+  test("test empty batch") {
+    executeStreamingIngest(
+      tableName = "stream_table_empty",
+      batchNums = 1,
+      rowNumsEachBatch = 10,
+      intervalOfSource = 1,
+      intervalOfIngest = 3,
+      continueSeconds = 10,
+      generateBadRecords = false,
+      badRecordAction = "force",
+      autoHandoff = false
+    )
+    var result = sql("select count(*) from streaming.stream_table_empty").collect()
+    assert(result(0).getLong(0) == 10)
+
+    // clean checkpointDir and logDir
+    val carbonTable = CarbonEnv.getCarbonTable(Option("streaming"), "stream_table_empty")(spark)
+    FileFactory
+      .deleteAllFilesOfDir(new File(CarbonTablePath.getStreamingLogDir(carbonTable.getTablePath)))
+    FileFactory
+      .deleteAllFilesOfDir(new File(CarbonTablePath
+        .getStreamingCheckpointDir(carbonTable.getTablePath)))
+
+    // some batches don't have data
+    executeStreamingIngest(
+      tableName = "stream_table_empty",
+      batchNums = 1,
+      rowNumsEachBatch = 1,
+      intervalOfSource = 1,
+      intervalOfIngest = 1,
+      continueSeconds = 10,
+      generateBadRecords = false,
+      badRecordAction = "force",
+      autoHandoff = false
+    )
+    result = sql("select count(*) from streaming.stream_table_empty").collect()
+    assert(result(0).getLong(0) == 11)
+  }
+
   def createWriteSocketThread(
       serverSocket: ServerSocket,
       writeNums: Int,
@@ -2330,7 +2374,8 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll {
       .load()
 
     // Write data from socket stream to carbondata file
-    qry = readSocketDF.writeStream
+    // repartition to simulate an empty partition when readSocketDF has only one row
+    qry = readSocketDF.repartition(2).writeStream
       .format("carbondata")
       .trigger(ProcessingTime(s"$intervalSecond seconds"))
       .option("checkpointLocation", CarbonTablePath.getStreamingCheckpointDir(carbonTable.getTablePath))


http://git-wip-us.apache.org/repos/asf/carbondata/blob/fa9c8323/streaming/src/main/java/org/apache/carbondata/streaming/CarbonStreamRecordWriter.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/carbondata/streaming/CarbonStreamRecordWriter.java b/streaming/src/main/java/org/apache/carbondata/streaming/CarbonStreamRecordWriter.java
index 0d2a889..672f6a6 100644
--- a/streaming/src/main/java/org/apache/carbondata/streaming/CarbonStreamRecordWriter.java
+++ b/streaming/src/main/java/org/apache/carbondata/streaming/CarbonStreamRecordWriter.java
@@ -139,9 +139,7 @@ public class CarbonStreamRecordWriter extends RecordWriter<Void, Object> {
     segmentDir = CarbonTablePath.getSegmentPath(
         carbonTable.getAbsoluteTableIdentifier().getTablePath(), segmentId);
     fileName = CarbonTablePath.getCarbonDataFileName(0, taskNo, 0, 0, "0", segmentId);
-  }
 
-  private void initializeAtFirstRow() throws IOException, InterruptedException {
     // initialize metadata
     isNoDictionaryDimensionColumn =
         CarbonDataProcessorUtil.getNoDictionaryMapping(configuration.getDataFields());
@@ -153,6 +151,9 @@ public class CarbonStreamRecordWriter extends RecordWriter<Void, Object> {
       measureDataTypes[i] =
           dataFields[dimensionWithComplexCount + i].getColumn().getDataType();
     }
+  }
+
+  private void initializeAtFirstRow() throws IOException, InterruptedException {
     // initialize parser and converter
     rowParser = new RowParserImpl(dataFields, configuration);
     badRecordLogger = BadRecordsLoggerProvider.createBadRecordLogger(configuration);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/fa9c8323/streaming/src/main/java/org/apache/carbondata/streaming/segment/StreamSegment.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/carbondata/streaming/segment/StreamSegment.java b/streaming/src/main/java/org/apache/carbondata/streaming/segment/StreamSegment.java
index 51417c4..6ee6876 100644
--- a/streaming/src/main/java/org/apache/carbondata/streaming/segment/StreamSegment.java
+++ b/streaming/src/main/java/org/apache/carbondata/streaming/segment/StreamSegment.java
@@ -458,6 +458,13 @@ public class StreamSegment {
       return;
     }
 
+    BlockletMinMaxIndex minMaxIndex = blockletIndex.getMinMaxIndex();
+    // if min/max of new blocklet is null, use min/max of old file
+    if (minMaxIndex == null) {
+      blockletIndex.setMinMaxIndex(fileIndex);
+      return;
+    }
+
     DataType[] msrDataTypes = blockletIndex.getMsrDataTypes();
     SerializableComparator[] comparators = new SerializableComparator[msrDataTypes.length];
     for (int index = 0; index < comparators.length; index++) {
@@ -465,11 +472,11 @@ public class StreamSegment {
     }
 
     // min value
-    byte[][] minValues = blockletIndex.getMinMaxIndex().getMinValues();
+    byte[][] minValues = minMaxIndex.getMinValues();
     byte[][] mergedMinValues = fileIndex.getMinValues();
     if (minValues == null || minValues.length == 0) {
       // use file index
-      blockletIndex.getMinMaxIndex().setMinValues(mergedMinValues);
+      minMaxIndex.setMinValues(mergedMinValues);
     } else if (mergedMinValues != null && mergedMinValues.length != 0) {
       if (minValues.length != mergedMinValues.length) {
         throw new IOException("the lengths of the min values should be same.");
@@ -494,10 +501,10 @@ public class StreamSegment {
     }
 
     // max value
-    byte[][] maxValues = blockletIndex.getMinMaxIndex().getMaxValues();
+    byte[][] maxValues = minMaxIndex.getMaxValues();
     byte[][] mergedMaxValues = fileIndex.getMaxValues();
     if (maxValues == null || maxValues.length == 0) {
-      blockletIndex.getMinMaxIndex().setMaxValues(mergedMaxValues);
+      minMaxIndex.setMaxValues(mergedMaxValues);
     } else if (mergedMaxValues != null && mergedMaxValues.length != 0) {
       if (maxValues.length != mergedMaxValues.length) {
         throw new IOException("the lengths of the max values should be same.");
