Repository: carbondata Updated Branches: refs/heads/master 8f08c4abb -> daa91c88e
[HOTFIX] Fix streaming CI issue for Spark 2.3 integration This closes #2712 Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/daa91c88 Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/daa91c88 Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/daa91c88 Branch: refs/heads/master Commit: daa91c88e1b85d4bce869c92e0ab48f97e55b005 Parents: 8f08c4a Author: QiangCai <[email protected]> Authored: Wed Sep 12 12:00:27 2018 +0800 Committer: Jacky Li <[email protected]> Committed: Wed Sep 12 18:34:15 2018 +0800 ---------------------------------------------------------------------- .../spark/carbondata/TestStreamingTableOperation.scala | 13 ++++++------- .../carbondata/streaming/CarbonStreamRecordWriter.java | 4 ++++ 2 files changed, 10 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/carbondata/blob/daa91c88/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala b/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala index f43b637..31c9597 100644 --- a/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala +++ b/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala @@ -161,10 +161,10 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll { test("test blocking update and delete operation on streaming table") { val exceptionMsgUpdate = intercept[MalformedCarbonCommandException] { - sql("""UPDATE source d SET (d.c2) = (d.c2 + 1) WHERE d.c1 = 'a'""").show() + sql("""UPDATE source d SET (d.c2) = (d.c2 + 1) WHERE d.c1 = 'a'""").collect() } val exceptionMsgDelete = intercept[MalformedCarbonCommandException] { - sql("""DELETE FROM source WHERE d.c1 = 'a'""").show() + sql("""DELETE FROM source WHERE d.c1 = 'a'""").collect() } assert(exceptionMsgUpdate.getMessage.equals("Data update is not allowed for streaming table")) assert(exceptionMsgDelete.getMessage.equals("Data delete is not allowed for streaming table")) @@ -172,16 +172,16 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll { test("test blocking alter table operation on streaming table") { val addColException = intercept[MalformedCarbonCommandException] { - sql("""ALTER TABLE source ADD COLUMNS (c6 string)""").show() + sql("""ALTER TABLE source ADD COLUMNS (c6 string)""").collect() } val dropColException = intercept[MalformedCarbonCommandException] { - sql("""ALTER TABLE source DROP COLUMNS (c1)""").show() + sql("""ALTER TABLE source DROP COLUMNS (c1)""").collect() } val renameException = intercept[MalformedCarbonCommandException] { - sql("""ALTER TABLE source RENAME to t""").show() + sql("""ALTER TABLE source RENAME to t""").collect() } val changeDataTypeException = intercept[MalformedCarbonCommandException] { - sql("""ALTER TABLE source CHANGE c2 c2 bigint""").show() + sql("""ALTER TABLE source CHANGE c2 c2 bigint""").collect() } assertResult("Alter table add column is not allowed for streaming table")(addColException.getMessage) assertResult("Alter table drop column is not allowed for streaming table")(dropColException.getMessage) @@ -419,7 +419,6 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll { Row("name_12", 480000.0), Row("name_11", 440000.0), Row("name_13", 520000.0))) - // sql("select * from agg_table2_p1").show() checkAnswer(sql("select * from agg_table2_p1"), Seq( Row("name_10", 200000.0), http://git-wip-us.apache.org/repos/asf/carbondata/blob/daa91c88/streaming/src/main/java/org/apache/carbondata/streaming/CarbonStreamRecordWriter.java ---------------------------------------------------------------------- diff --git a/streaming/src/main/java/org/apache/carbondata/streaming/CarbonStreamRecordWriter.java b/streaming/src/main/java/org/apache/carbondata/streaming/CarbonStreamRecordWriter.java index f7ce1f2..eeb9cc1 100644 --- a/streaming/src/main/java/org/apache/carbondata/streaming/CarbonStreamRecordWriter.java +++ b/streaming/src/main/java/org/apache/carbondata/streaming/CarbonStreamRecordWriter.java @@ -337,6 +337,10 @@ public class CarbonStreamRecordWriter extends RecordWriter<Void, Object> { } public BlockletMinMaxIndex getBatchMinMaxIndex() { + if (output == null) { + return StreamSegment.mergeBlockletMinMax( + batchMinMaxIndex, null, measureDataTypes); + } return StreamSegment.mergeBlockletMinMax( batchMinMaxIndex, output.generateBlockletMinMax(), measureDataTypes); }
