Repository: carbondata
Updated Branches:
  refs/heads/master 001795ce2 -> 067b833f1
[CARBONDATA-2161] update mergeTo column for compacted segment of streaming table

This closes #1971

Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/067b833f
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/067b833f
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/067b833f

Branch: refs/heads/master
Commit: 067b833f1f41bc4103a1d8248eb3d99e5990a127
Parents: 001795c
Author: BJangir <[email protected]>
Authored: Mon Feb 12 01:02:30 2018 +0530
Committer: QiangCai <[email protected]>
Committed: Fri Feb 23 14:36:24 2018 +0800

----------------------------------------------------------------------
 .../apache/spark/carbondata/TestStreamingTableOperation.scala | 5 +++++
 .../org/apache/carbondata/streaming/StreamHandoffRDD.scala | 1 +
 2 files changed, 6 insertions(+)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/carbondata/blob/067b833f/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala b/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala
index a368cef..4b3a957 100644
--- a/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala
+++ b/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala
@@ -637,6 +637,11 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll {
     assertResult(newSegments.length / 2)(newSegments.filter(_.getString(1).equals("Success")).length)
     assertResult(newSegments.length / 2)(newSegments.filter(_.getString(1).equals("Compacted")).length)
 
+    //Verify MergeTO column entry for compacted Segments
+    newSegments.filter(_.getString(1).equals("Compacted")).foreach{ rw =>
+      assertResult("Compacted")(rw.getString(1))
+      assertResult((Integer.parseInt(rw.getString(0))+2).toString)(rw.getString(4))
+    }
     checkAnswer(
       sql("select count(*) from streaming.stream_table_reopen"),
       Seq(Row(2 * 100 * 2))

http://git-wip-us.apache.org/repos/asf/carbondata/blob/067b833f/streaming/src/main/scala/org/apache/carbondata/streaming/StreamHandoffRDD.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/carbondata/streaming/StreamHandoffRDD.scala b/streaming/src/main/scala/org/apache/carbondata/streaming/StreamHandoffRDD.scala
index 41dfa50..4caa401 100644
--- a/streaming/src/main/scala/org/apache/carbondata/streaming/StreamHandoffRDD.scala
+++ b/streaming/src/main/scala/org/apache/carbondata/streaming/StreamHandoffRDD.scala
@@ -398,6 +398,7 @@ object StreamHandoffRDD {
         throw new Exception("Failed to update table status for streaming segment")
       } else {
         streamSegment.get.setSegmentStatus(SegmentStatus.COMPACTED)
+        streamSegment.get.setMergedLoadName(loadModel.getSegmentId)
       }
 
       // refresh table status file
