Repository: carbondata
Updated Branches:
  refs/heads/branch-1.3 39ac94e46 -> 1997ca235
[CARBONDATA-2161] update mergeTo column for compacted segment of streaming table

This closes #1971

Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/c2785b35
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/c2785b35
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/c2785b35

Branch: refs/heads/branch-1.3
Commit: c2785b352f7b7cb2dd524811b0696fb18c12d5b0
Parents: 39ac94e
Author: BJangir <babulaljangir...@gmail.com>
Authored: Mon Feb 12 01:02:30 2018 +0530
Committer: Venkata Ramana G <ramana.gollam...@huawei.com>
Committed: Tue Feb 27 12:45:55 2018 +0530

----------------------------------------------------------------------
 .../apache/spark/carbondata/TestStreamingTableOperation.scala | 5 +++++
 .../org/apache/carbondata/streaming/StreamHandoffRDD.scala | 1 +
 2 files changed, 6 insertions(+)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/carbondata/blob/c2785b35/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala b/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala
index a368cef..4b3a957 100644
--- a/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala
+++ b/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala
@@ -637,6 +637,11 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll {
 
       assertResult(newSegments.length / 2)(newSegments.filter(_.getString(1).equals("Success")).length)
       assertResult(newSegments.length / 2)(newSegments.filter(_.getString(1).equals("Compacted")).length)
+      //Verify MergeTO column entry for compacted Segments
+      newSegments.filter(_.getString(1).equals("Compacted")).foreach{ rw =>
+        assertResult("Compacted")(rw.getString(1))
+        assertResult((Integer.parseInt(rw.getString(0))+2).toString)(rw.getString(4))
+      }
       checkAnswer(
         sql("select count(*) from streaming.stream_table_reopen"),
         Seq(Row(2 * 100 * 2))

http://git-wip-us.apache.org/repos/asf/carbondata/blob/c2785b35/streaming/src/main/scala/org/apache/carbondata/streaming/StreamHandoffRDD.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/carbondata/streaming/StreamHandoffRDD.scala b/streaming/src/main/scala/org/apache/carbondata/streaming/StreamHandoffRDD.scala
index 41dfa50..4caa401 100644
--- a/streaming/src/main/scala/org/apache/carbondata/streaming/StreamHandoffRDD.scala
+++ b/streaming/src/main/scala/org/apache/carbondata/streaming/StreamHandoffRDD.scala
@@ -398,6 +398,7 @@ object StreamHandoffRDD {
           throw new Exception("Failed to update table status for streaming segment")
         } else {
           streamSegment.get.setSegmentStatus(SegmentStatus.COMPACTED)
+          streamSegment.get.setMergedLoadName(loadModel.getSegmentId)
         }
 
         // refresh table status file