Repository: carbondata
Updated Branches:
  refs/heads/master 001795ce2 -> 067b833f1


[CARBONDATA-2161] update mergeTo column for compacted segment of streaming table

This closes #1971


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/067b833f
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/067b833f
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/067b833f

Branch: refs/heads/master
Commit: 067b833f1f41bc4103a1d8248eb3d99e5990a127
Parents: 001795c
Author: BJangir <babulaljangir...@gmail.com>
Authored: Mon Feb 12 01:02:30 2018 +0530
Committer: QiangCai <qiang...@qq.com>
Committed: Fri Feb 23 14:36:24 2018 +0800

----------------------------------------------------------------------
 .../apache/spark/carbondata/TestStreamingTableOperation.scala   | 5 +++++
 .../org/apache/carbondata/streaming/StreamHandoffRDD.scala      | 1 +
 2 files changed, 6 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/067b833f/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala b/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala
index a368cef..4b3a957 100644
--- a/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala
+++ b/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala
@@ -637,6 +637,11 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll {
     assertResult(newSegments.length / 2)(newSegments.filter(_.getString(1).equals("Success")).length)
     assertResult(newSegments.length / 2)(newSegments.filter(_.getString(1).equals("Compacted")).length)
 
+    //Verify MergeTO column entry for compacted Segments
+    newSegments.filter(_.getString(1).equals("Compacted")).foreach{ rw =>
+      assertResult("Compacted")(rw.getString(1))
+      assertResult((Integer.parseInt(rw.getString(0))+2).toString)(rw.getString(4))
+    }
     checkAnswer(
       sql("select count(*) from streaming.stream_table_reopen"),
       Seq(Row(2 * 100 * 2))

http://git-wip-us.apache.org/repos/asf/carbondata/blob/067b833f/streaming/src/main/scala/org/apache/carbondata/streaming/StreamHandoffRDD.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/carbondata/streaming/StreamHandoffRDD.scala b/streaming/src/main/scala/org/apache/carbondata/streaming/StreamHandoffRDD.scala
index 41dfa50..4caa401 100644
--- a/streaming/src/main/scala/org/apache/carbondata/streaming/StreamHandoffRDD.scala
+++ b/streaming/src/main/scala/org/apache/carbondata/streaming/StreamHandoffRDD.scala
@@ -398,6 +398,7 @@ object StreamHandoffRDD {
           throw new Exception("Failed to update table status for streaming segment")
         } else {
           streamSegment.get.setSegmentStatus(SegmentStatus.COMPACTED)
+          streamSegment.get.setMergedLoadName(loadModel.getSegmentId)
         }
 
         // refresh table status file

Reply via email to