Repository: carbondata
Updated Branches:
  refs/heads/branch-1.3 39ac94e46 -> 1997ca235


[CARBONDATA-2161] update mergeTo column for compacted segment of streaming table

This closes #1971


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/c2785b35
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/c2785b35
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/c2785b35

Branch: refs/heads/branch-1.3
Commit: c2785b352f7b7cb2dd524811b0696fb18c12d5b0
Parents: 39ac94e
Author: BJangir <babulaljangir...@gmail.com>
Authored: Mon Feb 12 01:02:30 2018 +0530
Committer: Venkata Ramana G <ramana.gollam...@huawei.com>
Committed: Tue Feb 27 12:45:55 2018 +0530

----------------------------------------------------------------------
 .../apache/spark/carbondata/TestStreamingTableOperation.scala   | 5 +++++
 .../org/apache/carbondata/streaming/StreamHandoffRDD.scala      | 1 +
 2 files changed, 6 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/c2785b35/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala b/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala
index a368cef..4b3a957 100644
--- a/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala
+++ b/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala
@@ -637,6 +637,11 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll {
     assertResult(newSegments.length / 2)(newSegments.filter(_.getString(1).equals("Success")).length)
     assertResult(newSegments.length / 2)(newSegments.filter(_.getString(1).equals("Compacted")).length)
 
+    //Verify MergeTO column entry for compacted Segments
+    newSegments.filter(_.getString(1).equals("Compacted")).foreach{ rw =>
+      assertResult("Compacted")(rw.getString(1))
+      assertResult((Integer.parseInt(rw.getString(0))+2).toString)(rw.getString(4))
+    }
     checkAnswer(
       sql("select count(*) from streaming.stream_table_reopen"),
       Seq(Row(2 * 100 * 2))

http://git-wip-us.apache.org/repos/asf/carbondata/blob/c2785b35/streaming/src/main/scala/org/apache/carbondata/streaming/StreamHandoffRDD.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/carbondata/streaming/StreamHandoffRDD.scala b/streaming/src/main/scala/org/apache/carbondata/streaming/StreamHandoffRDD.scala
index 41dfa50..4caa401 100644
--- a/streaming/src/main/scala/org/apache/carbondata/streaming/StreamHandoffRDD.scala
+++ b/streaming/src/main/scala/org/apache/carbondata/streaming/StreamHandoffRDD.scala
@@ -398,6 +398,7 @@ object StreamHandoffRDD {
           throw new Exception("Failed to update table status for streaming segment")
         } else {
           streamSegment.get.setSegmentStatus(SegmentStatus.COMPACTED)
+          streamSegment.get.setMergedLoadName(loadModel.getSegmentId)
         }
 
         // refresh table status file

Reply via email to