Repository: carbondata Updated Branches: refs/heads/branch-1.3 87de69729 -> 53200ccff
[CARBONDATA-2261] Support Set segment command for Streaming Table Problem Statement: Currently Set Segment is not supported for Streaming segments. This PR is to support the Set segment command for "Streaming" and "Streaming Finished" segments. This closes #2075 Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/53200ccf Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/53200ccf Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/53200ccf Branch: refs/heads/branch-1.3 Commit: 53200ccffa65b11a7be4dbfa86794e4657126619 Parents: 87de697 Author: BJangir <[email protected]> Authored: Sun Mar 18 23:00:30 2018 +0530 Committer: manishgupta88 <[email protected]> Committed: Wed Mar 21 20:39:55 2018 +0530 ---------------------------------------------------------------------- .../hadoop/api/CarbonTableInputFormat.java | 3 +- .../carbondata/spark/rdd/CarbonScanRDD.scala | 4 +++ .../TestStreamingTableOperation.scala | 30 ++++++++++++++++++++ 3 files changed, 36 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/carbondata/blob/53200ccf/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java ---------------------------------------------------------------------- diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java index f6624cd..2d6c03a 100644 --- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java +++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java @@ -364,6 +364,7 @@ public class CarbonTableInputFormat<T> extends FileInputFormat<Void, T> { if (getValidateSegmentsToAccess(job.getConfiguration())) { List<Segment> validSegments = segments.getValidSegments(); streamSegments = 
segments.getStreamSegments(); + streamSegments = getFilteredSegment(job,streamSegments); if (validSegments.size() == 0) { return getSplitsOfStreaming(job, identifier, streamSegments); } @@ -372,7 +373,7 @@ public class CarbonTableInputFormat<T> extends FileInputFormat<Void, T> { List<Segment> filteredSegmentToAccess = getFilteredSegment(job, segments.getValidSegments()); if (filteredSegmentToAccess.size() == 0) { - return new ArrayList<>(0); + return getSplitsOfStreaming(job, identifier, streamSegments); } else { setSegmentsToAccess(job.getConfiguration(), filteredSegmentToAccess); } http://git-wip-us.apache.org/repos/asf/carbondata/blob/53200ccf/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala index 772f702..a62f60a 100644 --- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala +++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala @@ -130,6 +130,10 @@ class CarbonScanRDD( if (batchPartitions.isEmpty) { streamPartitions.toArray } else { + logInfo( + s""" + | Identified no.of Streaming Blocks: ${streamPartitions.size}, + """.stripMargin) // should keep the order by index of partition batchPartitions.appendAll(streamPartitions) batchPartitions.toArray http://git-wip-us.apache.org/repos/asf/carbondata/blob/53200ccf/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala 
b/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala index a7dfabd..53edb9d 100644 --- a/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala +++ b/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala @@ -994,6 +994,36 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll { sql("select count(*) from streaming.stream_table_handoff"), Seq(Row(2 * 100)) ) + try{ + sql("set carbon.input.segments.streaming.stream_table_handoff = 1") + checkAnswer( + sql("select count(*) from streaming.stream_table_handoff"), + Seq(Row(100)) + ) + sql("set carbon.input.segments.streaming.stream_table_handoff = *") + checkAnswer( + sql("select count(*) from streaming.stream_table_handoff"), + Seq(Row(2 * 100)) + ) + sql("set carbon.input.segments.streaming.stream_table_handoff = 2") + checkAnswer( + sql("select count(*) from streaming.stream_table_handoff"), + Seq(Row(1 * 100)) + ) + sql("set carbon.input.segments.streaming.stream_table_handoff = 1,2") + checkAnswer( + sql("select count(*) from streaming.stream_table_handoff"), + Seq(Row(2 * 100)) + ) + sql("set carbon.input.segments.streaming.stream_table_handoff = 3") + checkAnswer( + sql("select count(*) from streaming.stream_table_handoff"), + Seq(Row(0)) + ) + } + finally { + sql("set carbon.input.segments.streaming.stream_table_handoff = *") + } try { sql("ALTER TABLE stream_table_handoff SET TBLPROPERTIES('streaming'='false')")
