gortiz commented on code in PR #12704:
URL: https://github.com/apache/pinot/pull/12704#discussion_r1582764851


##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/LeafStageTransferableBlockOperator.java:
##########
@@ -419,4 +550,122 @@ public void send(BaseResultsBlock block)
       addResultsBlock(block);
     }
   }
+
+  public enum StatKey implements StatMap.Key {
+    TABLE(StatMap.Type.STRING),
+    EXECUTION_TIME_MS(StatMap.Type.LONG, null, DataTable.MetadataKey.TIME_USED_MS) {
+      @Override
+      public boolean includeDefaultInJson() {
+        return true;
+      }
+    },
+    EMITTED_ROWS(StatMap.Type.LONG, null, DataTable.MetadataKey.NUM_ROWS) {
+      @Override
+      public boolean includeDefaultInJson() {
+        return true;
+      }
+    },
+    NUM_DOCS_SCANNED(StatMap.Type.LONG),
+    NUM_ENTRIES_SCANNED_IN_FILTER(StatMap.Type.LONG),
+    NUM_ENTRIES_SCANNED_POST_FILTER(StatMap.Type.LONG),
+    NUM_SEGMENTS_QUERIED(StatMap.Type.INT),
+    NUM_SEGMENTS_PROCESSED(StatMap.Type.INT),
+    NUM_SEGMENTS_MATCHED(StatMap.Type.INT),
+    NUM_CONSUMING_SEGMENTS_QUERIED(StatMap.Type.INT),
+    // the timestamp indicating the freshness of the data queried in consuming segments.
+    // This can be ingestion timestamp if provided by the stream, or the last index time
+    MIN_CONSUMING_FRESHNESS_TIME_MS(StatMap.Type.LONG) {
+      @Override
+      public long merge(long value1, long value2) {
+        return StatMap.Key.minPositive(value1, value2);
+      }
+    },
+    TOTAL_DOCS(StatMap.Type.LONG),
+    NUM_GROUPS_LIMIT_REACHED(StatMap.Type.BOOLEAN),
+    //TRACE_INFO(StatMap.Type.STRING),
+    //REQUEST_ID(StatMap.Type.LONG),
+    NUM_RESIZES(StatMap.Type.INT),
+    RESIZE_TIME_MS(StatMap.Type.LONG),
+    THREAD_CPU_TIME_NS(StatMap.Type.LONG),
+    SYSTEM_ACTIVITIES_CPU_TIME_NS(StatMap.Type.LONG),
+    RESPONSE_SER_CPU_TIME_NS(StatMap.Type.LONG, "responseSerializationCpuTimeNs"),
+    NUM_SEGMENTS_PRUNED_BY_SERVER(StatMap.Type.INT),
+    NUM_SEGMENTS_PRUNED_INVALID(StatMap.Type.INT),
+    NUM_SEGMENTS_PRUNED_BY_LIMIT(StatMap.Type.INT),
+    NUM_SEGMENTS_PRUNED_BY_VALUE(StatMap.Type.INT),
+    //EXPLAIN_PLAN_NUM_EMPTY_FILTER_SEGMENTS(StatMap.Type.INT),
+    //EXPLAIN_PLAN_NUM_MATCH_ALL_FILTER_SEGMENTS(StatMap.Type.INT),
+    NUM_CONSUMING_SEGMENTS_PROCESSED(StatMap.Type.INT),
+    NUM_CONSUMING_SEGMENTS_MATCHED(StatMap.Type.INT),
+    NUM_BLOCKS(StatMap.Type.INT),
+    OPERATOR_EXECUTION_TIME_MS(StatMap.Type.LONG),
+    OPERATOR_ID(StatMap.Type.STRING),
+    OPERATOR_EXEC_START_TIME_MS(StatMap.Type.LONG) {
+      @Override
+      public long merge(long value1, long value2) {
+        return StatMap.Key.minPositive(value1, value2);
+      }
+    },
+    OPERATOR_EXEC_END_TIME_MS(StatMap.Type.LONG) {
+      @Override
+      public long merge(long value1, long value2) {
+        return Math.max(value1, value2);
+      }
+    },;
+    private final StatMap.Type _type;
+    @Nullable
+    private final DataTable.MetadataKey _v1Key;
+    private final String _statName;
+
+    StatKey(StatMap.Type type) {
+      this(type, null);
+    }
+
+    StatKey(StatMap.Type type, @Nullable String statName) {
+      _type = type;
+      _statName = statName == null ? StatMap.getDefaultStatName(this) : statName;
+      _v1Key = DataTable.MetadataKey.getByName(getStatName());
+    }
+
+    StatKey(StatMap.Type type, @Nullable String statName, @Nullable DataTable.MetadataKey v1Key) {
+      _type = type;
+      _statName = statName == null ? StatMap.getDefaultStatName(this) : statName;
+      _v1Key = v1Key == null ? DataTable.MetadataKey.getByName(getStatName()) : v1Key;
+    }
+
+    @Override
+    public String getStatName() {
+      return _statName;
+    }
+
+    @Override
+    public StatMap.Type getType() {
+      return _type;
+    }
+
+    public void updateV1Metadata(StatMap<DataTable.MetadataKey> oldMetadata, StatMap<StatKey> stats) {

Review Comment:
   The stats returned by `BrokerResponseNative` are based on the stats in 
`MetadataKey`, extended with some extra stats that are not present there. But 
instead of using the map from `MetadataKey` to values that we already have, 
`BrokerResponseNative` stores each of them in its own attribute. That makes 
`BrokerResponseNative` hard to understand; in particular, it is hard to tell 
which attributes are unique to `BrokerResponseNative` and which are just copies 
of some `MetadataKey`.
   
   In master, `BrokerResponseNativeV2` extends `BrokerResponseNative`, so it 
inherits the same pattern. This PR is improving the situation by storing a 
`StatMap` with the `MetadataKey` values. Now it is easy to see which values are 
just `MetadataKey` and which ones are actual new concepts added in the broker.
   
   > If we remove the mapping from v2 to v1, can we make v1 stats not 
implementing StatMap.Key?
   
   This is not the first time you have asked about that. My answer is: why is 
that a target? What is the problem with `MetadataKey` implementing 
`StatMap.Key`? To me it looks like an improvement. Now, for example, each 
`MetadataKey` can define how it is merged in a clean way. `StatMap` doesn't have 
to be limited to storing the V2 stats; it can store any stat-like feature in an 
efficient way.
   
   > Does it work if we do not fill the v1 stats, but directly use v2 stats to 
fill the broker response? 
   
   There is no V2 stat that can assume this role, because `BrokerResponse` 
stats show an aggregated view of the query stats, while V2 stats are kept per 
operator. For example, a query may have 2 different aggregations. 
One of them may have reached its num groups limit and the other may not.
   
   In that case, there would be a single value for 
`BrokerResponse.isNumGroupsLimitReached`, which would be true. But from the V2 
stats point of view there are two instances of a 
`StatMap<LeafStageTransferableBlockOperator.StatKey>` (or 
`StatMap<AggregateOperator.StatKey>`). One of them will say that the limit has 
been reached and the other will say the opposite. In order to generate the 
actual `BrokerResponse` we need to merge both.
   
   Then we would need to create a new StatKey for BrokerResponse and that new 
enum would end up being almost exactly equal to `MetadataKey`.
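
   To make the two-aggregation case concrete, here is a minimal sketch. It is not Pinot code: `NumGroupsLimitSketch`, `OperatorStats` and the operator ids are made-up names, and the only point is that the broker-level flag has to be derived by merging the per-operator values.

```java
import java.util.List;

public class NumGroupsLimitSketch {
  // Hypothetical per-operator stats holder (an assumption, not a Pinot class).
  static final class OperatorStats {
    final String operatorId;
    final boolean numGroupsLimitReached;

    OperatorStats(String operatorId, boolean numGroupsLimitReached) {
      this.operatorId = operatorId;
      this.numGroupsLimitReached = numGroupsLimitReached;
    }
  }

  // The broker-level flag is true if any operator reached its limit.
  static boolean isNumGroupsLimitReached(List<OperatorStats> perOperator) {
    for (OperatorStats stats : perOperator) {
      if (stats.numGroupsLimitReached) {
        return true;
      }
    }
    return false;
  }
}
```

   With one operator reporting `true` and the other `false`, the merged value is `true`, which is the single value the broker response can expose.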
   
   Could we implement `BrokerResponseNativeV2` without using a `StatMap`? Yes, 
we could, but then we would need to add a lot of logic. We already have that in 
`ExecutionStatsAggregator.setStats` for V1 and we already had that for V2. Now, 
instead of having at least two almost identical large methods merging the stats, 
we have the merge logic defined in the stats themselves, so it is harder to get 
wrong. These large methods are problematic because they mostly add stats 
together, but sometimes the merge logic is different, and we need to be aware of 
that each time we merge the stats. With `StatKey` we end up having smaller 
methods that call `StatKey.merge`, which is where the actual logic is defined.
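
   As a rough illustration of "merge logic defined in the stats", the sketch below uses plain enums and maps rather than the real `StatMap` API. `KeyDefinedMergeSketch`, its `Key` enum and `mergeAll` are hypothetical names, and `minPositive` is only my assumption about what a "minimum ignoring non-positive values" helper would do, not a copy of `StatMap.Key.minPositive`.

```java
import java.util.EnumMap;
import java.util.Map;

public class KeyDefinedMergeSketch {
  // Hypothetical keys: each key decides how two values are combined.
  enum Key {
    EMITTED_ROWS,
    EXEC_START_TIME_MS {
      @Override
      long merge(long a, long b) {
        return minPositive(a, b); // earliest start wins
      }
    },
    EXEC_END_TIME_MS {
      @Override
      long merge(long a, long b) {
        return Math.max(a, b); // latest end wins
      }
    };

    // Default merge: most stats are simply added.
    long merge(long a, long b) {
      return a + b;
    }
  }

  // Assumed semantics: smallest strictly positive value, treating <= 0 as "unset".
  static long minPositive(long a, long b) {
    if (a <= 0) {
      return b;
    }
    if (b <= 0) {
      return a;
    }
    return Math.min(a, b);
  }

  // The generic merge loop has no per-stat special cases; it delegates to the key.
  static Map<Key, Long> mergeAll(Map<Key, Long> left, Map<Key, Long> right) {
    Map<Key, Long> out = new EnumMap<>(Key.class);
    out.putAll(left);
    right.forEach((key, value) -> out.merge(key, value, key::merge));
    return out;
  }
}
```

   The caller never needs to remember which stats are summed and which are not; adding a new stat with unusual merge semantics only touches the enum constant.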
   
   > The overall goal here is to decouple the v2 and v1 handling so that in the 
future we can remove v1 easier if necessary.
   
   This PR doesn't make that more difficult; in fact, it makes it easier. 
Apart from collecting stats in V1, the main blocker to removing `MetadataKey` 
(the V1 stats) is that `BrokerResponse` and `MetadataKey` are tightly coupled.
   
   The idea of mapping V2 stats into V1 is there just to make it easier to 
return them in `BrokerResponse`. If in the future we want to remove 
`MetadataKey`, we can just create a new class that implements `StatKey` and map 
from V2 to that.
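
   A sketch of what such a mapping could look like, assuming hypothetical enums (`BrokerKey`, `OperatorKey`) and a hypothetical `updateBrokerStats` helper; none of these names exist in the PR, and plain maps stand in for `StatMap`:

```java
import java.util.Map;

public class KeyMappingSketch {
  // Hypothetical broker-level keys that could one day replace MetadataKey.
  enum BrokerKey { TIME_USED_MS, NUM_ROWS }

  // Hypothetical operator-level keys; each one optionally names its broker-level target.
  enum OperatorKey {
    EXECUTION_TIME_MS(BrokerKey.TIME_USED_MS),
    EMITTED_ROWS(BrokerKey.NUM_ROWS),
    NUM_BLOCKS(null); // no broker-level equivalent, so it is not copied

    final BrokerKey target;

    OperatorKey(BrokerKey target) {
      this.target = target;
    }
  }

  // Folds one operator's stats into the broker-level map, summing mapped values.
  static void updateBrokerStats(Map<BrokerKey, Long> broker, Map<OperatorKey, Long> operator) {
    operator.forEach((key, value) -> {
      if (key.target != null) {
        broker.merge(key.target, value, Long::sum);
      }
    });
  }
}
```

   Swapping the target of the mapping only changes the type of `target`; the operator-level keys and the collection code stay the same.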



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
