Repository: carbondata
Updated Branches:
  refs/heads/master e32551b8f -> 0528a7985
[CARBONDATA-2923] Log the hit information of streaming segments Log the hit information of streaming segments after the hit information of batch segments This closes #2700 Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/0528a798 Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/0528a798 Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/0528a798 Branch: refs/heads/master Commit: 0528a79855a70cfa103d3083ffa977de6e11a839 Parents: e32551b Author: QiangCai <[email protected]> Authored: Mon Sep 10 11:05:43 2018 +0800 Committer: Jacky Li <[email protected]> Committed: Mon Sep 10 17:52:32 2018 +0800 ---------------------------------------------------------------------- .../org/apache/carbondata/core/stream/StreamPruner.java | 7 +++++++ .../apache/carbondata/hadoop/api/CarbonInputFormat.java | 10 ++++++++++ .../carbondata/hadoop/api/CarbonTableInputFormat.java | 4 +++- .../org/apache/carbondata/spark/rdd/CarbonScanRDD.scala | 12 ++++++++---- 4 files changed, 28 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/carbondata/blob/0528a798/core/src/main/java/org/apache/carbondata/core/stream/StreamPruner.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/stream/StreamPruner.java b/core/src/main/java/org/apache/carbondata/core/stream/StreamPruner.java index ac3589f..b40355b 100644 --- a/core/src/main/java/org/apache/carbondata/core/stream/StreamPruner.java +++ b/core/src/main/java/org/apache/carbondata/core/stream/StreamPruner.java @@ -45,6 +45,8 @@ public class StreamPruner { private CarbonTable carbonTable; private FilterExecuter filterExecuter; + private int totalFileNums = 0; + public StreamPruner(CarbonTable carbonTable) { this.carbonTable = carbonTable; } @@ -138,6 +140,11 @@ public class StreamPruner 
{ } } } + totalFileNums = streamFileList.size(); return streamFileList; } + + public int getTotalFileNums() { + return totalFileNums; + } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/0528a798/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java ---------------------------------------------------------------------- diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java index b497e3a..d0dff5a 100644 --- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java +++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java @@ -123,6 +123,8 @@ public abstract class CarbonInputFormat<T> extends FileInputFormat<Void, T> { // record segment number and hit blocks protected int numSegments = 0; protected int numStreamSegments = 0; + protected int numStreamFiles = 0; + protected int hitedStreamFiles = 0; protected int numBlocks = 0; public int getNumSegments() { @@ -133,6 +135,14 @@ public abstract class CarbonInputFormat<T> extends FileInputFormat<Void, T> { return numStreamSegments; } + public int getNumStreamFiles() { + return numStreamFiles; + } + + public int getHitedStreamFiles() { + return hitedStreamFiles; + } + public int getNumBlocks() { return numBlocks; } http://git-wip-us.apache.org/repos/asf/carbondata/blob/0528a798/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java ---------------------------------------------------------------------- diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java index 05b73dd..ba6e043 100644 --- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java +++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java @@ -363,7 +363,9 @@ public 
class CarbonTableInputFormat<T> extends CarbonInputFormat<T> { StreamPruner streamPruner = new StreamPruner(carbonTable); streamPruner.init(filterResolverIntf); List<StreamFile> streamFiles = streamPruner.prune(streamSegments); - + // record the hit information of the streaming files + this.hitedStreamFiles = streamFiles.size(); + this.numStreamFiles = streamPruner.getTotalFileNums(); for (StreamFile streamFile : streamFiles) { Path path = new Path(streamFile.getFilePath()); long length = streamFile.getFileSize(); http://git-wip-us.apache.org/repos/asf/carbondata/blob/0528a798/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala index 01c77f0..eb7abbc 100644 --- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala +++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala @@ -165,14 +165,18 @@ class CarbonScanRDD[T: ClassTag]( if (batchPartitions.isEmpty) { partitions = streamPartitions.toArray } else { - logInfo( - s""" - | Identified no.of Streaming Blocks: ${ streamPartitions.size }, - """.stripMargin) // should keep the order by index of partition batchPartitions.appendAll(streamPartitions) partitions = batchPartitions.toArray } + logInfo( + s""" + | Identified no.of.streaming splits/tasks: ${ streamPartitions.size }, + | no.of.streaming files: ${format.getHitedStreamFiles}, + | no.of.total streaming files: ${format.getNumStreamFiles}, + | no.of.total streaming segement: ${format.getNumStreamSegments} + """.stripMargin) + } partitions } finally {
