jihoonson commented on a change in pull request #11828:
URL: https://github.com/apache/druid/pull/11828#discussion_r761496254
##########
File path:
processing/src/main/java/org/apache/druid/query/context/ResponseContext.java
##########
@@ -129,214 +349,389 @@
*
* @see org.apache.druid.query.Query#getMostSpecificId
*/
- REMAINING_RESPONSES_FROM_QUERY_SERVERS(
+ public static final Key REMAINING_RESPONSES_FROM_QUERY_SERVERS = new
AbstractKey(
"remainingResponsesFromQueryServers",
- (totalRemainingPerId, idAndNumResponses) -> {
- final ConcurrentHashMap<String, Integer> map =
(ConcurrentHashMap<String, Integer>) totalRemainingPerId;
- final NonnullPair<String, Integer> pair = (NonnullPair<String,
Integer>) idAndNumResponses;
- map.compute(
- pair.lhs,
- (id, remaining) -> remaining == null ? pair.rhs : remaining
+ pair.rhs
- );
- return map;
- }
- ),
+ false, true,
+ Object.class)
+ {
+ @Override
+ @SuppressWarnings("unchecked")
+ public Object mergeValues(Object totalRemainingPerId, Object
idAndNumResponses)
+ {
+ final ConcurrentHashMap<String, Integer> map =
(ConcurrentHashMap<String, Integer>) totalRemainingPerId;
+ final NonnullPair<String, Integer> pair = (NonnullPair<String,
Integer>) idAndNumResponses;
+ map.compute(
+ pair.lhs,
+ (id, remaining) -> remaining == null ? pair.rhs : remaining +
pair.rhs);
+ return map;
+ }
+ };
+
/**
* Lists missing segments.
*/
- MISSING_SEGMENTS(
+ public static final Key MISSING_SEGMENTS = new AbstractKey(
"missingSegments",
- (oldValue, newValue) -> {
- final ArrayList<SegmentDescriptor> result = new
ArrayList<SegmentDescriptor>((List) oldValue);
- result.addAll((List) newValue);
- return result;
- }
- ),
+ true, true,
+ new TypeReference<List<SegmentDescriptor>>() {})
+ {
+ @Override
+ @SuppressWarnings("unchecked")
+ public Object mergeValues(Object oldValue, Object newValue)
+ {
+ final List<SegmentDescriptor> result = new
ArrayList<SegmentDescriptor>((List<SegmentDescriptor>) oldValue);
+ result.addAll((List<SegmentDescriptor>) newValue);
+ return result;
+ }
+ };
+
/**
* Entity tag. A part of HTTP cache validation mechanism.
* Is being removed from the context before sending and used as a separate
HTTP header.
*/
- ETAG("ETag"),
+ public static final Key ETAG = new StringKey("ETag", false, true);
+
/**
- * Query fail time (current time + timeout).
- * It is not updated continuously as {@link Key#TIMEOUT_AT}.
+ * Query total bytes gathered.
*/
- QUERY_FAIL_DEADLINE_MILLIS("queryFailTime"),
+ public static final Key QUERY_TOTAL_BYTES_GATHERED = new AbstractKey(
+ "queryTotalBytesGathered",
+ false, false,
+ new TypeReference<AtomicLong>() {})
+ {
+ @Override
+ public Object mergeValues(Object oldValue, Object newValue)
+ {
+ return ((AtomicLong) newValue).addAndGet(((AtomicLong)
newValue).get());
+ }
+ };
+
/**
- * Query total bytes gathered.
+ * Query fail time (current time + timeout).
+ * It is not updated continuously as {@link Keys#TIMEOUT_AT}.
*/
- QUERY_TOTAL_BYTES_GATHERED("queryTotalBytesGathered"),
+ public static final Key QUERY_FAIL_DEADLINE_MILLIS = new LongKey(
+ "queryFailTime",
+ false);
+
/**
* This variable indicates when a running query should be expired,
* and is effective only when 'timeout' of queryContext has a positive
value.
* Continuously updated by {@link
org.apache.druid.query.scan.ScanQueryEngine}
* by reducing its value on the time of every scan iteration.
*/
- TIMEOUT_AT("timeoutAt"),
+ public static final Key TIMEOUT_AT = new LongKey(
+ "timeoutAt",
+ false);
+
/**
- * The number of scanned rows.
- * For backward compatibility the context key name still equals to "count".
+ * The number of rows scanned by {@link
org.apache.druid.query.scan.ScanQueryEngine}.
+ *
+ * Named "count" for backwards compatibility with older data servers that
still send this, even though it's now
+ * marked as internal.
*/
- NUM_SCANNED_ROWS(
+ public static final Key NUM_SCANNED_ROWS = new CounterKey(
"count",
- (oldValue, newValue) -> ((Number) oldValue).longValue() +
((Number) newValue).longValue()
- ),
+ false);
+
/**
* The total CPU time for threads related to Sequence processing of the
query.
* Resulting value on a Broker is a sum of downstream values from
historicals / realtime nodes.
* For additional information see {@link
org.apache.druid.query.CPUTimeMetricQueryRunner}
*/
- CPU_CONSUMED_NANOS(
+ public static final Key CPU_CONSUMED_NANOS = new CounterKey(
"cpuConsumed",
- (oldValue, newValue) -> ((Number) oldValue).longValue() +
((Number) newValue).longValue()
- ),
+ false);
+
/**
* Indicates if a {@link ResponseContext} was truncated during
serialization.
*/
- TRUNCATED(
+ public static final Key TRUNCATED = new BooleanKey(
"truncated",
- (oldValue, newValue) -> (boolean) oldValue || (boolean) newValue
- );
+ false);
+
+ /**
+ * One and only global list of keys. This is a semi-constant: it is mutable
+ * at start-up time, but then is not thread-safe, and must remain unchanged
+ * for the duration of the server run.
+ */
+ public static final Keys INSTANCE = new Keys();
/**
* ConcurrentSkipListMap is used to have the natural ordering of its keys.
- * Thread-safe structure is required since there is no guarantee that
{@link #registerKey(BaseKey)}
+ * Thread-safe structure is required since there is no guarantee that
{@link #registerKey(Key)}
* would be called only from class static blocks.
*/
- private static final ConcurrentMap<String, BaseKey> REGISTERED_KEYS = new
ConcurrentSkipListMap<>();
+ private final ConcurrentMap<String, Key> registered_keys = new
ConcurrentSkipListMap<>();
Review comment:
```suggestion
private final ConcurrentMap<String, Key> registeredKeys = new
ConcurrentSkipListMap<>();
```
```suggestion
private final ConcurrentMap<String, Key> registered_keys = new
ConcurrentSkipListMap<>();
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]