APACHE-KYLIN-2746: Separate filtered row count and aggregated row count in the metrics returned by the coprocessor
Signed-off-by: lidongsjtu <lid...@apache.org> Project: http://git-wip-us.apache.org/repos/asf/kylin/repo Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/5c8e4d8a Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/5c8e4d8a Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/5c8e4d8a Branch: refs/heads/master Commit: 5c8e4d8a2b6e2315e932458ef9c1c77152567970 Parents: e1479a7 Author: Zhong <nju_y...@apache.org> Authored: Sun Oct 29 11:19:19 2017 +0800 Committer: lidongsjtu <lid...@apache.org> Committed: Thu Nov 2 17:36:02 2017 +0800 ---------------------------------------------------------------------- .../kylin/gridtable/GTAggregateScanner.java | 8 +- .../apache/kylin/gridtable/GTFilterScanner.java | 6 + .../hbase/cube/v2/CubeHBaseEndpointRPC.java | 6 +- .../coprocessor/endpoint/CubeVisitService.java | 10 +- .../endpoint/generated/CubeVisitProtos.java | 115 +++++++++++++++++-- .../endpoint/protobuf/CubeVisit.proto | 1 + 6 files changed, 128 insertions(+), 18 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kylin/blob/5c8e4d8a/core-cube/src/main/java/org/apache/kylin/gridtable/GTAggregateScanner.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/GTAggregateScanner.java b/core-cube/src/main/java/org/apache/kylin/gridtable/GTAggregateScanner.java index a3ad0f6..1928e0b 100644 --- a/core-cube/src/main/java/org/apache/kylin/gridtable/GTAggregateScanner.java +++ b/core-cube/src/main/java/org/apache/kylin/gridtable/GTAggregateScanner.java @@ -79,7 +79,7 @@ public class GTAggregateScanner implements IGTScanner, IGTBypassChecker { final boolean spillEnabled; final TupleFilter havingFilter; - private int aggregatedRowCount = 0; + private long inputRowCount = 0L; private MemoryWaterLevel memTracker; private boolean[] aggrMask; @@ -151,6 +151,10 @@ public class GTAggregateScanner implements 
IGTScanner, IGTBypassChecker { return info; } + public long getInputRowCount() { + return inputRowCount; + } + @Override public void close() throws IOException { inputScanner.close(); @@ -371,7 +375,7 @@ public class GTAggregateScanner implements IGTScanner, IGTBypassChecker { } boolean aggregate(GTRecord r) { - if (++aggregatedRowCount % 100000 == 0) { + if (++inputRowCount % 100000 == 0) { if (memTracker != null) { memTracker.markHigh(); } http://git-wip-us.apache.org/repos/asf/kylin/blob/5c8e4d8a/core-cube/src/main/java/org/apache/kylin/gridtable/GTFilterScanner.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/GTFilterScanner.java b/core-cube/src/main/java/org/apache/kylin/gridtable/GTFilterScanner.java index 11a23d6..12074bd 100644 --- a/core-cube/src/main/java/org/apache/kylin/gridtable/GTFilterScanner.java +++ b/core-cube/src/main/java/org/apache/kylin/gridtable/GTFilterScanner.java @@ -39,6 +39,7 @@ public class GTFilterScanner extends GTForwardingScanner { private IEvaluatableTuple oneTuple; // avoid instance creation private GTRecord next = null; + private long inputRowCount = 0L; private IGTBypassChecker checker = null; @@ -65,6 +66,10 @@ public class GTFilterScanner extends GTForwardingScanner { this.checker = checker; } + public long getInputRowCount() { + return inputRowCount; + } + @Override public Iterator<GTRecord> iterator() { return new Iterator<GTRecord>() { @@ -79,6 +84,7 @@ public class GTFilterScanner extends GTForwardingScanner { while (inputIterator.hasNext()) { next = inputIterator.next(); + inputRowCount++; if (!evaluate()) { continue; } http://git-wip-us.apache.org/repos/asf/kylin/blob/5c8e4d8a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java ---------------------------------------------------------------------- diff --git 
a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java index 0de1cc1..03f8937 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java @@ -214,7 +214,8 @@ public class CubeHBaseEndpointRPC extends CubeHBaseRPC { cuboid.getId(), storageContext.getFilterMask(), rpcException, stats.getServiceEndTime() - stats.getServiceStartTime(), 0, stats.getScannedRowCount(), - stats.getScannedRowCount() - stats.getAggregatedRowCount(), + stats.getScannedRowCount() - stats.getAggregatedRowCount() + - stats.getFilteredRowCount(), stats.getAggregatedRowCount(), stats.getScannedBytes()); // if any other region has responded with error, skip further processing @@ -309,7 +310,8 @@ public class CubeHBaseEndpointRPC extends CubeHBaseRPC { sb.append("Endpoint RPC returned from HTable ").append(cubeSeg.getStorageLocationIdentifier()).append(" Shard ").append(BytesUtil.toHex(region)).append(" on host: ").append(stats.getHostname()).append("."); sb.append("Total scanned row: ").append(stats.getScannedRowCount()).append(". "); sb.append("Total scanned bytes: ").append(stats.getScannedBytes()).append(". "); - sb.append("Total filtered/aggred row: ").append(stats.getAggregatedRowCount()).append(". "); + sb.append("Total filtered row: ").append(stats.getFilteredRowCount()).append(". "); + sb.append("Total aggred row: ").append(stats.getAggregatedRowCount()).append(". "); sb.append("Time elapsed in EP: ").append(stats.getServiceEndTime() - stats.getServiceStartTime()).append("(ms). 
"); sb.append("Server CPU usage: ").append(stats.getSystemCpuLoad()).append(", server physical mem left: ").append(stats.getFreePhysicalMemorySize()).append(", server swap mem left:").append(stats.getFreeSwapSpaceSize()).append("."); sb.append("Etc message: ").append(stats.getEtcMsg()).append("."); http://git-wip-us.apache.org/repos/asf/kylin/blob/5c8e4d8a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java index d94b547..d99e6c5 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java @@ -48,6 +48,7 @@ import org.apache.kylin.common.util.BytesUtil; import org.apache.kylin.common.util.CompressionUtils; import org.apache.kylin.common.util.SetThreadName; import org.apache.kylin.cube.kv.RowConstants; +import org.apache.kylin.gridtable.GTAggregateScanner; import org.apache.kylin.gridtable.GTRecord; import org.apache.kylin.gridtable.GTScanRequest; import org.apache.kylin.gridtable.IGTScanner; @@ -310,7 +311,7 @@ public class CubeVisitService extends CubeVisitProtos.CubeVisitService implement ByteBuffer buffer = ByteBuffer.allocate(BufferedMeasureCodec.DEFAULT_BUFFER_SIZE); ByteArrayOutputStream outputStream = new ByteArrayOutputStream(BufferedMeasureCodec.DEFAULT_BUFFER_SIZE);//ByteArrayOutputStream will auto grow - int finalRowCount = 0; + long finalRowCount = 0L; try { for (GTRecord oneRecord : finalScanner) { @@ -349,6 +350,10 @@ public class CubeVisitService extends CubeVisitProtos.CubeVisitService implement finalScanner.close(); 
} + long rowCountBeforeAggr = finalScanner instanceof GTAggregateScanner + ? ((GTAggregateScanner) finalScanner).getInputRowCount() + : finalRowCount; + appendProfileInfo(sb, "agg done", serviceStartTime); logger.info("Total scanned {} rows and {} bytes", cellListIterator.getTotalScannedRowCount(), cellListIterator.getTotalScannedRowBytes()); @@ -385,7 +390,8 @@ public class CubeVisitService extends CubeVisitProtos.CubeVisitService implement done.run(responseBuilder.// setCompressedRows(HBaseZeroCopyByteString.wrap(compressedAllRows)).//too many array copies setStats(CubeVisitProtos.CubeVisitResponse.Stats.newBuilder() - .setAggregatedRowCount(cellListIterator.getTotalScannedRowCount() - finalRowCount) + .setFilteredRowCount(cellListIterator.getTotalScannedRowCount() - rowCountBeforeAggr) + .setAggregatedRowCount(rowCountBeforeAggr - finalRowCount) .setScannedRowCount(cellListIterator.getTotalScannedRowCount()) .setScannedBytes(cellListIterator.getTotalScannedRowBytes()) .setServiceStartTime(serviceStartTime).setServiceEndTime(System.currentTimeMillis()) http://git-wip-us.apache.org/repos/asf/kylin/blob/5c8e4d8a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/generated/CubeVisitProtos.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/generated/CubeVisitProtos.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/generated/CubeVisitProtos.java index 4c662c9..b3e8e4e 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/generated/CubeVisitProtos.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/generated/CubeVisitProtos.java @@ -2599,6 +2599,16 @@ public final class CubeVisitProtos { * <code>optional int64 scannedBytes = 11;</code> */ long getScannedBytes(); + + // 
optional int64 filteredRowCount = 12; + /** + * <code>optional int64 filteredRowCount = 12;</code> + */ + boolean hasFilteredRowCount(); + /** + * <code>optional int64 filteredRowCount = 12;</code> + */ + long getFilteredRowCount(); } /** * Protobuf type {@code CubeVisitResponse.Stats} @@ -2706,6 +2716,11 @@ public final class CubeVisitProtos { scannedBytes_ = input.readInt64(); break; } + case 96: { + bitField0_ |= 0x00000800; + filteredRowCount_ = input.readInt64(); + break; + } } } } catch (com.google.protobuf.InvalidProtocolBufferException e) { @@ -2976,6 +2991,22 @@ public final class CubeVisitProtos { return scannedBytes_; } + // optional int64 filteredRowCount = 12; + public static final int FILTEREDROWCOUNT_FIELD_NUMBER = 12; + private long filteredRowCount_; + /** + * <code>optional int64 filteredRowCount = 12;</code> + */ + public boolean hasFilteredRowCount() { + return ((bitField0_ & 0x00000800) == 0x00000800); + } + /** + * <code>optional int64 filteredRowCount = 12;</code> + */ + public long getFilteredRowCount() { + return filteredRowCount_; + } + private void initFields() { serviceStartTime_ = 0L; serviceEndTime_ = 0L; @@ -2988,6 +3019,7 @@ public final class CubeVisitProtos { etcMsg_ = ""; normalComplete_ = 0; scannedBytes_ = 0L; + filteredRowCount_ = 0L; } private byte memoizedIsInitialized = -1; public final boolean isInitialized() { @@ -3034,6 +3066,9 @@ public final class CubeVisitProtos { if (((bitField0_ & 0x00000400) == 0x00000400)) { output.writeInt64(11, scannedBytes_); } + if (((bitField0_ & 0x00000800) == 0x00000800)) { + output.writeInt64(12, filteredRowCount_); + } getUnknownFields().writeTo(output); } @@ -3087,6 +3122,10 @@ public final class CubeVisitProtos { size += com.google.protobuf.CodedOutputStream .computeInt64Size(11, scannedBytes_); } + if (((bitField0_ & 0x00000800) == 0x00000800)) { + size += com.google.protobuf.CodedOutputStream + .computeInt64Size(12, filteredRowCount_); + } size += 
getUnknownFields().getSerializedSize(); memoizedSerializedSize = size; return size; @@ -3162,6 +3201,11 @@ public final class CubeVisitProtos { result = result && (getScannedBytes() == other.getScannedBytes()); } + result = result && (hasFilteredRowCount() == other.hasFilteredRowCount()); + if (hasFilteredRowCount()) { + result = result && (getFilteredRowCount() + == other.getFilteredRowCount()); + } result = result && getUnknownFields().equals(other.getUnknownFields()); return result; @@ -3222,6 +3266,10 @@ public final class CubeVisitProtos { hash = (37 * hash) + SCANNEDBYTES_FIELD_NUMBER; hash = (53 * hash) + hashLong(getScannedBytes()); } + if (hasFilteredRowCount()) { + hash = (37 * hash) + FILTEREDROWCOUNT_FIELD_NUMBER; + hash = (53 * hash) + hashLong(getFilteredRowCount()); + } hash = (29 * hash) + getUnknownFields().hashCode(); memoizedHashCode = hash; return hash; @@ -3353,6 +3401,8 @@ public final class CubeVisitProtos { bitField0_ = (bitField0_ & ~0x00000200); scannedBytes_ = 0L; bitField0_ = (bitField0_ & ~0x00000400); + filteredRowCount_ = 0L; + bitField0_ = (bitField0_ & ~0x00000800); return this; } @@ -3425,6 +3475,10 @@ public final class CubeVisitProtos { to_bitField0_ |= 0x00000400; } result.scannedBytes_ = scannedBytes_; + if (((from_bitField0_ & 0x00000800) == 0x00000800)) { + to_bitField0_ |= 0x00000800; + } + result.filteredRowCount_ = filteredRowCount_; result.bitField0_ = to_bitField0_; onBuilt(); return result; @@ -3478,6 +3532,9 @@ public final class CubeVisitProtos { if (other.hasScannedBytes()) { setScannedBytes(other.getScannedBytes()); } + if (other.hasFilteredRowCount()) { + setFilteredRowCount(other.getFilteredRowCount()); + } this.mergeUnknownFields(other.getUnknownFields()); return this; } @@ -3950,6 +4007,39 @@ public final class CubeVisitProtos { return this; } + // optional int64 filteredRowCount = 12; + private long filteredRowCount_ ; + /** + * <code>optional int64 filteredRowCount = 12;</code> + */ + public boolean 
hasFilteredRowCount() { + return ((bitField0_ & 0x00000800) == 0x00000800); + } + /** + * <code>optional int64 filteredRowCount = 12;</code> + */ + public long getFilteredRowCount() { + return filteredRowCount_; + } + /** + * <code>optional int64 filteredRowCount = 12;</code> + */ + public Builder setFilteredRowCount(long value) { + bitField0_ |= 0x00000800; + filteredRowCount_ = value; + onChanged(); + return this; + } + /** + * <code>optional int64 filteredRowCount = 12;</code> + */ + public Builder clearFilteredRowCount() { + bitField0_ = (bitField0_ & ~0x00000800); + filteredRowCount_ = 0L; + onChanged(); + return this; + } + // @@protoc_insertion_point(builder_scope:CubeVisitResponse.Stats) } @@ -5614,25 +5704,26 @@ public final class CubeVisitProtos { "\030\005 \002(\t\022\017\n\007queryId\030\006 \001(\t\022\032\n\014spillEnabled\030" + "\007 \001(\010:\004true\022\024\n\014maxScanBytes\030\010 \001(\003\022\037\n\020isE" + "xactAggregate\030\t \001(\010:\005false\032\027\n\007IntList\022\014\n", - "\004ints\030\001 \003(\005\"\253\004\n\021CubeVisitResponse\022\026\n\016com" + + "\004ints\030\001 \003(\005\"\305\004\n\021CubeVisitResponse\022\026\n\016com" + "pressedRows\030\001 \002(\014\022\'\n\005stats\030\002 \002(\0132\030.CubeV" + "isitResponse.Stats\022/\n\terrorInfo\030\003 \001(\0132\034." 
+ - "CubeVisitResponse.ErrorInfo\032\220\002\n\005Stats\022\030\n" + + "CubeVisitResponse.ErrorInfo\032\252\002\n\005Stats\022\030\n" + "\020serviceStartTime\030\001 \001(\003\022\026\n\016serviceEndTim" + "e\030\002 \001(\003\022\027\n\017scannedRowCount\030\003 \001(\003\022\032\n\022aggr" + "egatedRowCount\030\004 \001(\003\022\025\n\rsystemCpuLoad\030\005 " + "\001(\001\022\036\n\026freePhysicalMemorySize\030\006 \001(\001\022\031\n\021f" + "reeSwapSpaceSize\030\007 \001(\001\022\020\n\010hostname\030\010 \001(\t" + "\022\016\n\006etcMsg\030\t \001(\t\022\026\n\016normalComplete\030\n \001(\005", - "\022\024\n\014scannedBytes\030\013 \001(\003\032H\n\tErrorInfo\022*\n\004t" + - "ype\030\001 \002(\0162\034.CubeVisitResponse.ErrorType\022" + - "\017\n\007message\030\002 \002(\t\"G\n\tErrorType\022\020\n\014UNKNOWN" + - "_TYPE\020\000\022\013\n\007TIMEOUT\020\001\022\033\n\027RESOURCE_LIMIT_E" + - "XCEEDED\020\0022F\n\020CubeVisitService\0222\n\tvisitCu" + - "be\022\021.CubeVisitRequest\032\022.CubeVisitRespons" + - "eB`\nEorg.apache.kylin.storage.hbase.cube" + - ".v2.coprocessor.endpoint.generatedB\017Cube" + - "VisitProtosH\001\210\001\001\240\001\001" + "\022\024\n\014scannedBytes\030\013 \001(\003\022\030\n\020filteredRowCou" + + "nt\030\014 \001(\003\032H\n\tErrorInfo\022*\n\004type\030\001 \002(\0162\034.Cu" + + "beVisitResponse.ErrorType\022\017\n\007message\030\002 \002" + + "(\t\"G\n\tErrorType\022\020\n\014UNKNOWN_TYPE\020\000\022\013\n\007TIM" + + "EOUT\020\001\022\033\n\027RESOURCE_LIMIT_EXCEEDED\020\0022F\n\020C" + + "ubeVisitService\0222\n\tvisitCube\022\021.CubeVisit" + + "Request\032\022.CubeVisitResponseB`\nEorg.apach" + + "e.kylin.storage.hbase.cube.v2.coprocesso" + + "r.endpoint.generatedB\017CubeVisitProtosH\001\210" + + "\001\001\240\001\001" }; com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner = new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() { @@ -5662,7 +5753,7 @@ public final 
class CubeVisitProtos { internal_static_CubeVisitResponse_Stats_fieldAccessorTable = new com.google.protobuf.GeneratedMessage.FieldAccessorTable( internal_static_CubeVisitResponse_Stats_descriptor, - new java.lang.String[] { "ServiceStartTime", "ServiceEndTime", "ScannedRowCount", "AggregatedRowCount", "SystemCpuLoad", "FreePhysicalMemorySize", "FreeSwapSpaceSize", "Hostname", "EtcMsg", "NormalComplete", "ScannedBytes", }); + new java.lang.String[] { "ServiceStartTime", "ServiceEndTime", "ScannedRowCount", "AggregatedRowCount", "SystemCpuLoad", "FreePhysicalMemorySize", "FreeSwapSpaceSize", "Hostname", "EtcMsg", "NormalComplete", "ScannedBytes", "FilteredRowCount", }); internal_static_CubeVisitResponse_ErrorInfo_descriptor = internal_static_CubeVisitResponse_descriptor.getNestedTypes().get(1); internal_static_CubeVisitResponse_ErrorInfo_fieldAccessorTable = new http://git-wip-us.apache.org/repos/asf/kylin/blob/5c8e4d8a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/protobuf/CubeVisit.proto ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/protobuf/CubeVisit.proto b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/protobuf/CubeVisit.proto index 8ca8756..40dbc68 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/protobuf/CubeVisit.proto +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/protobuf/CubeVisit.proto @@ -57,6 +57,7 @@ message CubeVisitResponse { optional string etcMsg = 9; optional int32 normalComplete =10; optional int64 scannedBytes = 11; + optional int64 filteredRowCount = 12; } enum ErrorType { UNKNOWN_TYPE = 0;