Repository: carbondata Updated Branches: refs/heads/master 4b113b865 -> 65b69a9cd
[CARBONDATA-2261] Support Set segment command for Streaming Table Problem Statement: Currently Set Segment is not supported for Streaming segments. This PR is to support Set segment command for "Streaming", "Streaming Finished" segments. This closes #2075 Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/65b69a9c Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/65b69a9c Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/65b69a9c Branch: refs/heads/master Commit: 65b69a9cd902168ffec16cda9d84457d08bae22c Parents: 4b113b8 Author: BJangir <[email protected]> Authored: Sun Mar 18 23:00:30 2018 +0530 Committer: manishgupta88 <[email protected]> Committed: Wed Mar 21 20:27:16 2018 +0530 ---------------------------------------------------------------------- .../hadoop/api/CarbonTableInputFormat.java | 3 +- .../carbondata/spark/rdd/CarbonScanRDD.scala | 4 +++ .../TestStreamingTableOperation.scala | 30 ++++++++++++++++++++ 3 files changed, 36 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/carbondata/blob/65b69a9c/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java ---------------------------------------------------------------------- diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java index bcc487e..11121e9 100644 --- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java +++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java @@ -358,6 +358,7 @@ public class CarbonTableInputFormat<T> extends FileInputFormat<Void, T> { if (getValidateSegmentsToAccess(job.getConfiguration())) { List<Segment> validSegments = segments.getValidSegments(); streamSegments = 
segments.getStreamSegments(); + streamSegments = getFilteredSegment(job,streamSegments); if (validSegments.size() == 0) { return getSplitsOfStreaming(job, identifier, streamSegments); } @@ -366,7 +367,7 @@ public class CarbonTableInputFormat<T> extends FileInputFormat<Void, T> { List<Segment> filteredSegmentToAccess = getFilteredSegment(job, segments.getValidSegments()); if (filteredSegmentToAccess.size() == 0) { - return new ArrayList<>(0); + return getSplitsOfStreaming(job, identifier, streamSegments); } else { setSegmentsToAccess(job.getConfiguration(), filteredSegmentToAccess); } http://git-wip-us.apache.org/repos/asf/carbondata/blob/65b69a9c/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala index 49a8023..b5c9543 100644 --- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala +++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala @@ -130,6 +130,10 @@ class CarbonScanRDD( if (batchPartitions.isEmpty) { streamPartitions.toArray } else { + logInfo( + s""" + | Identified no.of Streaming Blocks: ${streamPartitions.size}, + """.stripMargin) // should keep the order by index of partition batchPartitions.appendAll(streamPartitions) batchPartitions.toArray http://git-wip-us.apache.org/repos/asf/carbondata/blob/65b69a9c/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala 
b/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala index aa00d07..00e1140 100644 --- a/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala +++ b/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala @@ -997,6 +997,36 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll { sql("select count(*) from streaming.stream_table_handoff"), Seq(Row(2 * 100)) ) + try{ + sql("set carbon.input.segments.streaming.stream_table_handoff = 1") + checkAnswer( + sql("select count(*) from streaming.stream_table_handoff"), + Seq(Row(100)) + ) + sql("set carbon.input.segments.streaming.stream_table_handoff = *") + checkAnswer( + sql("select count(*) from streaming.stream_table_handoff"), + Seq(Row(2 * 100)) + ) + sql("set carbon.input.segments.streaming.stream_table_handoff = 2") + checkAnswer( + sql("select count(*) from streaming.stream_table_handoff"), + Seq(Row(1 * 100)) + ) + sql("set carbon.input.segments.streaming.stream_table_handoff = 1,2") + checkAnswer( + sql("select count(*) from streaming.stream_table_handoff"), + Seq(Row(2 * 100)) + ) + sql("set carbon.input.segments.streaming.stream_table_handoff = 3") + checkAnswer( + sql("select count(*) from streaming.stream_table_handoff"), + Seq(Row(0)) + ) + } + finally { + sql("set carbon.input.segments.streaming.stream_table_handoff = *") + } } test("auto hand off, close and reopen streaming table") {
