APACHE-KYLIN-2746: Separate filter row count & aggregated row count for metrics 
collection returned by coprocessor

Signed-off-by: lidongsjtu <lid...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/5c8e4d8a
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/5c8e4d8a
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/5c8e4d8a

Branch: refs/heads/master
Commit: 5c8e4d8a2b6e2315e932458ef9c1c77152567970
Parents: e1479a7
Author: Zhong <nju_y...@apache.org>
Authored: Sun Oct 29 11:19:19 2017 +0800
Committer: lidongsjtu <lid...@apache.org>
Committed: Thu Nov 2 17:36:02 2017 +0800

----------------------------------------------------------------------
 .../kylin/gridtable/GTAggregateScanner.java     |   8 +-
 .../apache/kylin/gridtable/GTFilterScanner.java |   6 +
 .../hbase/cube/v2/CubeHBaseEndpointRPC.java     |   6 +-
 .../coprocessor/endpoint/CubeVisitService.java  |  10 +-
 .../endpoint/generated/CubeVisitProtos.java     | 115 +++++++++++++++++--
 .../endpoint/protobuf/CubeVisit.proto           |   1 +
 6 files changed, 128 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/5c8e4d8a/core-cube/src/main/java/org/apache/kylin/gridtable/GTAggregateScanner.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/GTAggregateScanner.java b/core-cube/src/main/java/org/apache/kylin/gridtable/GTAggregateScanner.java
index a3ad0f6..1928e0b 100644
--- a/core-cube/src/main/java/org/apache/kylin/gridtable/GTAggregateScanner.java
+++ b/core-cube/src/main/java/org/apache/kylin/gridtable/GTAggregateScanner.java
@@ -79,7 +79,7 @@ public class GTAggregateScanner implements IGTScanner, IGTBypassChecker {
     final boolean spillEnabled;
     final TupleFilter havingFilter;
 
-    private int aggregatedRowCount = 0;
+    private long inputRowCount = 0L;
     private MemoryWaterLevel memTracker;
     private boolean[] aggrMask;
 
@@ -151,6 +151,10 @@ public class GTAggregateScanner implements IGTScanner, IGTBypassChecker {
         return info;
     }
 
+    public long getInputRowCount() {
+        return inputRowCount;
+    }
+
     @Override
     public void close() throws IOException {
         inputScanner.close();
@@ -371,7 +375,7 @@ public class GTAggregateScanner implements IGTScanner, IGTBypassChecker {
         }
 
         boolean aggregate(GTRecord r) {
-            if (++aggregatedRowCount % 100000 == 0) {
+            if (++inputRowCount % 100000 == 0) {
                 if (memTracker != null) {
                     memTracker.markHigh();
                 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/5c8e4d8a/core-cube/src/main/java/org/apache/kylin/gridtable/GTFilterScanner.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/GTFilterScanner.java b/core-cube/src/main/java/org/apache/kylin/gridtable/GTFilterScanner.java
index 11a23d6..12074bd 100644
--- a/core-cube/src/main/java/org/apache/kylin/gridtable/GTFilterScanner.java
+++ b/core-cube/src/main/java/org/apache/kylin/gridtable/GTFilterScanner.java
@@ -39,6 +39,7 @@ public class GTFilterScanner extends GTForwardingScanner {
     private IEvaluatableTuple oneTuple; // avoid instance creation
 
     private GTRecord next = null;
+    private long inputRowCount = 0L;
 
     private IGTBypassChecker checker = null;
 
@@ -65,6 +66,10 @@ public class GTFilterScanner extends GTForwardingScanner {
         this.checker = checker;
     }
 
+    public long getInputRowCount() {
+        return inputRowCount;
+    }
+
     @Override
     public Iterator<GTRecord> iterator() {
         return new Iterator<GTRecord>() {
@@ -79,6 +84,7 @@ public class GTFilterScanner extends GTForwardingScanner {
 
                 while (inputIterator.hasNext()) {
                     next = inputIterator.next();
+                    inputRowCount++;
                     if (!evaluate()) {
                         continue;
                     }

http://git-wip-us.apache.org/repos/asf/kylin/blob/5c8e4d8a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java
index 0de1cc1..03f8937 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java
@@ -214,7 +214,8 @@ public class CubeHBaseEndpointRPC extends CubeHBaseRPC {
                                                 cuboid.getId(), 
storageContext.getFilterMask(), rpcException,
                                                 stats.getServiceEndTime() - 
stats.getServiceStartTime(), 0,
                                                 stats.getScannedRowCount(),
-                                                stats.getScannedRowCount() - 
stats.getAggregatedRowCount(),
+                                                stats.getScannedRowCount() - 
stats.getAggregatedRowCount()
+                                                        - 
stats.getFilteredRowCount(),
                                                 stats.getAggregatedRowCount(), 
stats.getScannedBytes());
 
                                         // if any other region has responded 
with error, skip further processing
@@ -309,7 +310,8 @@ public class CubeHBaseEndpointRPC extends CubeHBaseRPC {
         sb.append("Endpoint RPC returned from HTable 
").append(cubeSeg.getStorageLocationIdentifier()).append(" Shard 
").append(BytesUtil.toHex(region)).append(" on host: 
").append(stats.getHostname()).append(".");
         sb.append("Total scanned row: 
").append(stats.getScannedRowCount()).append(". ");
         sb.append("Total scanned bytes: 
").append(stats.getScannedBytes()).append(". ");
-        sb.append("Total filtered/aggred row: 
").append(stats.getAggregatedRowCount()).append(". ");
+        sb.append("Total filtered row: 
").append(stats.getFilteredRowCount()).append(". ");
+        sb.append("Total aggred row: 
").append(stats.getAggregatedRowCount()).append(". ");
         sb.append("Time elapsed in EP: ").append(stats.getServiceEndTime() - 
stats.getServiceStartTime()).append("(ms). ");
         sb.append("Server CPU usage: 
").append(stats.getSystemCpuLoad()).append(", server physical mem left: 
").append(stats.getFreePhysicalMemorySize()).append(", server swap mem 
left:").append(stats.getFreeSwapSpaceSize()).append(".");
         sb.append("Etc message: ").append(stats.getEtcMsg()).append(".");

http://git-wip-us.apache.org/repos/asf/kylin/blob/5c8e4d8a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java
index d94b547..d99e6c5 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java
@@ -48,6 +48,7 @@ import org.apache.kylin.common.util.BytesUtil;
 import org.apache.kylin.common.util.CompressionUtils;
 import org.apache.kylin.common.util.SetThreadName;
 import org.apache.kylin.cube.kv.RowConstants;
+import org.apache.kylin.gridtable.GTAggregateScanner;
 import org.apache.kylin.gridtable.GTRecord;
 import org.apache.kylin.gridtable.GTScanRequest;
 import org.apache.kylin.gridtable.IGTScanner;
@@ -310,7 +311,7 @@ public class CubeVisitService extends CubeVisitProtos.CubeVisitService implement
             ByteBuffer buffer = 
ByteBuffer.allocate(BufferedMeasureCodec.DEFAULT_BUFFER_SIZE);
 
             ByteArrayOutputStream outputStream = new 
ByteArrayOutputStream(BufferedMeasureCodec.DEFAULT_BUFFER_SIZE);//ByteArrayOutputStream
 will auto grow
-            int finalRowCount = 0;
+            long finalRowCount = 0L;
 
             try {
                 for (GTRecord oneRecord : finalScanner) {
@@ -349,6 +350,10 @@ public class CubeVisitService extends CubeVisitProtos.CubeVisitService implement
                 finalScanner.close();
             }
 
+            long rowCountBeforeAggr = finalScanner instanceof 
GTAggregateScanner
+                    ? ((GTAggregateScanner) finalScanner).getInputRowCount()
+                    : finalRowCount;
+
             appendProfileInfo(sb, "agg done", serviceStartTime);
             logger.info("Total scanned {} rows and {} bytes", 
cellListIterator.getTotalScannedRowCount(),
                     cellListIterator.getTotalScannedRowBytes());
@@ -385,7 +390,8 @@ public class CubeVisitService extends CubeVisitProtos.CubeVisitService implement
             done.run(responseBuilder.//
                     
setCompressedRows(HBaseZeroCopyByteString.wrap(compressedAllRows)).//too many 
array copies 
                     
setStats(CubeVisitProtos.CubeVisitResponse.Stats.newBuilder()
-                            
.setAggregatedRowCount(cellListIterator.getTotalScannedRowCount() - 
finalRowCount)
+                            
.setFilteredRowCount(cellListIterator.getTotalScannedRowCount() - 
rowCountBeforeAggr)
+                            .setAggregatedRowCount(rowCountBeforeAggr - 
finalRowCount)
                             
.setScannedRowCount(cellListIterator.getTotalScannedRowCount())
                             
.setScannedBytes(cellListIterator.getTotalScannedRowBytes())
                             
.setServiceStartTime(serviceStartTime).setServiceEndTime(System.currentTimeMillis())

http://git-wip-us.apache.org/repos/asf/kylin/blob/5c8e4d8a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/generated/CubeVisitProtos.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/generated/CubeVisitProtos.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/generated/CubeVisitProtos.java
index 4c662c9..b3e8e4e 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/generated/CubeVisitProtos.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/generated/CubeVisitProtos.java
@@ -2599,6 +2599,16 @@ public final class CubeVisitProtos {
        * <code>optional int64 scannedBytes = 11;</code>
        */
       long getScannedBytes();
+
+      // optional int64 filteredRowCount = 12;
+      /**
+       * <code>optional int64 filteredRowCount = 12;</code>
+       */
+      boolean hasFilteredRowCount();
+      /**
+       * <code>optional int64 filteredRowCount = 12;</code>
+       */
+      long getFilteredRowCount();
     }
     /**
      * Protobuf type {@code CubeVisitResponse.Stats}
@@ -2706,6 +2716,11 @@ public final class CubeVisitProtos {
                 scannedBytes_ = input.readInt64();
                 break;
               }
+              case 96: {
+                bitField0_ |= 0x00000800;
+                filteredRowCount_ = input.readInt64();
+                break;
+              }
             }
           }
         } catch (com.google.protobuf.InvalidProtocolBufferException e) {
@@ -2976,6 +2991,22 @@ public final class CubeVisitProtos {
         return scannedBytes_;
       }
 
+      // optional int64 filteredRowCount = 12;
+      public static final int FILTEREDROWCOUNT_FIELD_NUMBER = 12;
+      private long filteredRowCount_;
+      /**
+       * <code>optional int64 filteredRowCount = 12;</code>
+       */
+      public boolean hasFilteredRowCount() {
+        return ((bitField0_ & 0x00000800) == 0x00000800);
+      }
+      /**
+       * <code>optional int64 filteredRowCount = 12;</code>
+       */
+      public long getFilteredRowCount() {
+        return filteredRowCount_;
+      }
+
       private void initFields() {
         serviceStartTime_ = 0L;
         serviceEndTime_ = 0L;
@@ -2988,6 +3019,7 @@ public final class CubeVisitProtos {
         etcMsg_ = "";
         normalComplete_ = 0;
         scannedBytes_ = 0L;
+        filteredRowCount_ = 0L;
       }
       private byte memoizedIsInitialized = -1;
       public final boolean isInitialized() {
@@ -3034,6 +3066,9 @@ public final class CubeVisitProtos {
         if (((bitField0_ & 0x00000400) == 0x00000400)) {
           output.writeInt64(11, scannedBytes_);
         }
+        if (((bitField0_ & 0x00000800) == 0x00000800)) {
+          output.writeInt64(12, filteredRowCount_);
+        }
         getUnknownFields().writeTo(output);
       }
 
@@ -3087,6 +3122,10 @@ public final class CubeVisitProtos {
           size += com.google.protobuf.CodedOutputStream
             .computeInt64Size(11, scannedBytes_);
         }
+        if (((bitField0_ & 0x00000800) == 0x00000800)) {
+          size += com.google.protobuf.CodedOutputStream
+            .computeInt64Size(12, filteredRowCount_);
+        }
         size += getUnknownFields().getSerializedSize();
         memoizedSerializedSize = size;
         return size;
@@ -3162,6 +3201,11 @@ public final class CubeVisitProtos {
           result = result && (getScannedBytes()
               == other.getScannedBytes());
         }
+        result = result && (hasFilteredRowCount() == 
other.hasFilteredRowCount());
+        if (hasFilteredRowCount()) {
+          result = result && (getFilteredRowCount()
+              == other.getFilteredRowCount());
+        }
         result = result &&
             getUnknownFields().equals(other.getUnknownFields());
         return result;
@@ -3222,6 +3266,10 @@ public final class CubeVisitProtos {
           hash = (37 * hash) + SCANNEDBYTES_FIELD_NUMBER;
           hash = (53 * hash) + hashLong(getScannedBytes());
         }
+        if (hasFilteredRowCount()) {
+          hash = (37 * hash) + FILTEREDROWCOUNT_FIELD_NUMBER;
+          hash = (53 * hash) + hashLong(getFilteredRowCount());
+        }
         hash = (29 * hash) + getUnknownFields().hashCode();
         memoizedHashCode = hash;
         return hash;
@@ -3353,6 +3401,8 @@ public final class CubeVisitProtos {
           bitField0_ = (bitField0_ & ~0x00000200);
           scannedBytes_ = 0L;
           bitField0_ = (bitField0_ & ~0x00000400);
+          filteredRowCount_ = 0L;
+          bitField0_ = (bitField0_ & ~0x00000800);
           return this;
         }
 
@@ -3425,6 +3475,10 @@ public final class CubeVisitProtos {
             to_bitField0_ |= 0x00000400;
           }
           result.scannedBytes_ = scannedBytes_;
+          if (((from_bitField0_ & 0x00000800) == 0x00000800)) {
+            to_bitField0_ |= 0x00000800;
+          }
+          result.filteredRowCount_ = filteredRowCount_;
           result.bitField0_ = to_bitField0_;
           onBuilt();
           return result;
@@ -3478,6 +3532,9 @@ public final class CubeVisitProtos {
           if (other.hasScannedBytes()) {
             setScannedBytes(other.getScannedBytes());
           }
+          if (other.hasFilteredRowCount()) {
+            setFilteredRowCount(other.getFilteredRowCount());
+          }
           this.mergeUnknownFields(other.getUnknownFields());
           return this;
         }
@@ -3950,6 +4007,39 @@ public final class CubeVisitProtos {
           return this;
         }
 
+        // optional int64 filteredRowCount = 12;
+        private long filteredRowCount_ ;
+        /**
+         * <code>optional int64 filteredRowCount = 12;</code>
+         */
+        public boolean hasFilteredRowCount() {
+          return ((bitField0_ & 0x00000800) == 0x00000800);
+        }
+        /**
+         * <code>optional int64 filteredRowCount = 12;</code>
+         */
+        public long getFilteredRowCount() {
+          return filteredRowCount_;
+        }
+        /**
+         * <code>optional int64 filteredRowCount = 12;</code>
+         */
+        public Builder setFilteredRowCount(long value) {
+          bitField0_ |= 0x00000800;
+          filteredRowCount_ = value;
+          onChanged();
+          return this;
+        }
+        /**
+         * <code>optional int64 filteredRowCount = 12;</code>
+         */
+        public Builder clearFilteredRowCount() {
+          bitField0_ = (bitField0_ & ~0x00000800);
+          filteredRowCount_ = 0L;
+          onChanged();
+          return this;
+        }
+
         // @@protoc_insertion_point(builder_scope:CubeVisitResponse.Stats)
       }
 
@@ -5614,25 +5704,26 @@ public final class CubeVisitProtos {
       "\030\005 \002(\t\022\017\n\007queryId\030\006 
\001(\t\022\032\n\014spillEnabled\030" +
       "\007 \001(\010:\004true\022\024\n\014maxScanBytes\030\010 
\001(\003\022\037\n\020isE" +
       "xactAggregate\030\t \001(\010:\005false\032\027\n\007IntList\022\014\n",
-      "\004ints\030\001 
\003(\005\"\253\004\n\021CubeVisitResponse\022\026\n\016com" +
+      "\004ints\030\001 
\003(\005\"\305\004\n\021CubeVisitResponse\022\026\n\016com" +
       "pressedRows\030\001 \002(\014\022\'\n\005stats\030\002 
\002(\0132\030.CubeV" +
       "isitResponse.Stats\022/\n\terrorInfo\030\003 \001(\0132\034." +
-      "CubeVisitResponse.ErrorInfo\032\220\002\n\005Stats\022\030\n" +
+      "CubeVisitResponse.ErrorInfo\032\252\002\n\005Stats\022\030\n" +
       "\020serviceStartTime\030\001 \001(\003\022\026\n\016serviceEndTim" +
       "e\030\002 \001(\003\022\027\n\017scannedRowCount\030\003 
\001(\003\022\032\n\022aggr" +
       "egatedRowCount\030\004 \001(\003\022\025\n\rsystemCpuLoad\030\005 " +
       "\001(\001\022\036\n\026freePhysicalMemorySize\030\006 
\001(\001\022\031\n\021f" +
       "reeSwapSpaceSize\030\007 \001(\001\022\020\n\010hostname\030\010 
\001(\t" +
       "\022\016\n\006etcMsg\030\t \001(\t\022\026\n\016normalComplete\030\n 
\001(\005",
-      "\022\024\n\014scannedBytes\030\013 
\001(\003\032H\n\tErrorInfo\022*\n\004t" +
-      "ype\030\001 \002(\0162\034.CubeVisitResponse.ErrorType\022" +
-      "\017\n\007message\030\002 \002(\t\"G\n\tErrorType\022\020\n\014UNKNOWN" 
+
-      
"_TYPE\020\000\022\013\n\007TIMEOUT\020\001\022\033\n\027RESOURCE_LIMIT_E" +
-      "XCEEDED\020\0022F\n\020CubeVisitService\0222\n\tvisitCu" +
-      "be\022\021.CubeVisitRequest\032\022.CubeVisitRespons" +
-      "eB`\nEorg.apache.kylin.storage.hbase.cube" +
-      ".v2.coprocessor.endpoint.generatedB\017Cube" +
-      "VisitProtosH\001\210\001\001\240\001\001"
+      "\022\024\n\014scannedBytes\030\013 
\001(\003\022\030\n\020filteredRowCou" +
+      "nt\030\014 \001(\003\032H\n\tErrorInfo\022*\n\004type\030\001 
\002(\0162\034.Cu" +
+      "beVisitResponse.ErrorType\022\017\n\007message\030\002 \002" +
+      "(\t\"G\n\tErrorType\022\020\n\014UNKNOWN_TYPE\020\000\022\013\n\007TIM" 
+
+      "EOUT\020\001\022\033\n\027RESOURCE_LIMIT_EXCEEDED\020\0022F\n\020C" +
+      "ubeVisitService\0222\n\tvisitCube\022\021.CubeVisit" +
+      "Request\032\022.CubeVisitResponseB`\nEorg.apach" +
+      "e.kylin.storage.hbase.cube.v2.coprocesso" +
+      "r.endpoint.generatedB\017CubeVisitProtosH\001\210" +
+      "\001\001\240\001\001"
     };
     com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner 
assigner =
       new 
com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
@@ -5662,7 +5753,7 @@ public final class CubeVisitProtos {
           internal_static_CubeVisitResponse_Stats_fieldAccessorTable = new
             com.google.protobuf.GeneratedMessage.FieldAccessorTable(
               internal_static_CubeVisitResponse_Stats_descriptor,
-              new java.lang.String[] { "ServiceStartTime", "ServiceEndTime", 
"ScannedRowCount", "AggregatedRowCount", "SystemCpuLoad", 
"FreePhysicalMemorySize", "FreeSwapSpaceSize", "Hostname", "EtcMsg", 
"NormalComplete", "ScannedBytes", });
+              new java.lang.String[] { "ServiceStartTime", "ServiceEndTime", 
"ScannedRowCount", "AggregatedRowCount", "SystemCpuLoad", 
"FreePhysicalMemorySize", "FreeSwapSpaceSize", "Hostname", "EtcMsg", 
"NormalComplete", "ScannedBytes", "FilteredRowCount", });
           internal_static_CubeVisitResponse_ErrorInfo_descriptor =
             
internal_static_CubeVisitResponse_descriptor.getNestedTypes().get(1);
           internal_static_CubeVisitResponse_ErrorInfo_fieldAccessorTable = new

http://git-wip-us.apache.org/repos/asf/kylin/blob/5c8e4d8a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/protobuf/CubeVisit.proto
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/protobuf/CubeVisit.proto b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/protobuf/CubeVisit.proto
index 8ca8756..40dbc68 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/protobuf/CubeVisit.proto
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/protobuf/CubeVisit.proto
@@ -57,6 +57,7 @@ message CubeVisitResponse {
         optional string etcMsg = 9;
         optional int32 normalComplete =10;
         optional int64 scannedBytes = 11;
+        optional int64 filteredRowCount = 12;
     }
     enum ErrorType {
         UNKNOWN_TYPE = 0;

Reply via email to