This is an automated email from the ASF dual-hosted git repository.

snlee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new d684d5e  Adding the support for sampling logs (#3913)
d684d5e is described below

commit d684d5eb26e6a9b01a2db4505267967fe68d3924
Author: Seunghyun Lee <[email protected]>
AuthorDate: Thu Mar 7 18:24:35 2019 -0800

    Adding the support for sampling logs (#3913)
    
    * Adding the support for sampling logs
    For high-throughput use cases, logging becomes the bottleneck
    on both server and broker. "Category.callAppenders()" uses a
    synchronized block, so all the worker threads can be stalled
    by logging. The long-term solution should be a migration to
    log4j2. As a short-term fix, this PR adds a configuration for
    sampling logs.
    
    * Changed to use the maxRate concept; added forceLog() to ensure that
    some outliers still get logged
    
    * Addressed comments and added a log message for reporting dropped logs
    
    * Added dropped log for server side as well
---
 .../requesthandler/BaseBrokerRequestHandler.java   | 80 ++++++++++++++++++----
 .../apache/pinot/common/utils/CommonConstants.java |  2 +
 .../core/query/scheduler/PriorityScheduler.java    |  8 ++-
 .../pinot/core/query/scheduler/QueryScheduler.java | 72 ++++++++++++++++---
 .../query/scheduler/fcfs/BoundedFCFSScheduler.java |  9 +--
 .../query/scheduler/fcfs/FCFSQueryScheduler.java   |  2 +-
 .../tokenbucket/TokenPriorityScheduler.java        |  8 +--
 .../query/scheduler/PrioritySchedulerTest.java     | 14 ++--
 .../request/ScheduledRequestHandlerTest.java       | 10 +--
 9 files changed, 159 insertions(+), 46 deletions(-)

diff --git 
a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java
 
b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java
index 47d478a..20d384e 100644
--- 
a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java
+++ 
b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java
@@ -20,12 +20,14 @@ package org.apache.pinot.broker.requesthandler;
 
 import com.fasterxml.jackson.databind.JsonNode;
 import com.google.common.base.Splitter;
+import com.google.common.util.concurrent.RateLimiter;
 import java.net.InetAddress;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 import javax.annotation.Nullable;
 import javax.annotation.concurrent.ThreadSafe;
@@ -82,6 +84,10 @@ public abstract class BaseBrokerRequestHandler implements 
BrokerRequestHandler {
   protected final int _queryResponseLimit;
   protected final int _queryLogLength;
 
+  private final RateLimiter _queryLogRateLimiter;
+  private final RateLimiter _numDroppedLogRateLimiter;
+  private final AtomicInteger _numDroppedLog;
+
   public BaseBrokerRequestHandler(Configuration config, RoutingTable 
routingTable,
       TimeBoundaryService timeBoundaryService, AccessControlFactory 
accessControlFactory,
       TableQueryQuotaManager tableQueryQuotaManager, BrokerMetrics 
brokerMetrics) {
@@ -96,9 +102,15 @@ public abstract class BaseBrokerRequestHandler implements 
BrokerRequestHandler {
     _brokerTimeoutMs = config.getLong(CONFIG_OF_BROKER_TIMEOUT_MS, 
DEFAULT_BROKER_TIMEOUT_MS);
     _queryResponseLimit = config.getInt(CONFIG_OF_BROKER_QUERY_RESPONSE_LIMIT, 
DEFAULT_BROKER_QUERY_RESPONSE_LIMIT);
     _queryLogLength = config.getInt(CONFIG_OF_BROKER_QUERY_LOG_LENGTH, 
DEFAULT_BROKER_QUERY_LOG_LENGTH);
+    _queryLogRateLimiter = RateLimiter.create(
+        config.getDouble(CONFIG_OF_BROKER_QUERY_LOG_MAX_RATE_PER_SECOND, 
DEFAULT_BROKER_QUERY_LOG_MAX_RATE_PER_SECOND));
+
+    _numDroppedLog = new AtomicInteger(0);
+    _numDroppedLogRateLimiter = RateLimiter.create(1.0);
 
-    LOGGER.info("Broker Id: {}, timeout: {}ms, query response limit: {}, query 
log length: {}", _brokerId,
-        _brokerTimeoutMs, _queryResponseLimit, _queryLogLength);
+    LOGGER.info(
+        "Broker Id: {}, timeout: {}ms, query response limit: {}, query log 
length: {}, query log max rate: {}qps",
+        _brokerId, _brokerTimeoutMs, _queryResponseLimit, _queryLogLength, 
_queryLogRateLimiter.getRate());
   }
 
   private String getDefaultBrokerId() {
@@ -110,6 +122,7 @@ public abstract class BaseBrokerRequestHandler implements 
BrokerRequestHandler {
     }
   }
 
+  @SuppressWarnings("Duplicates")
   @Override
   public BrokerResponse handleRequest(JsonNode request, @Nullable 
RequesterIdentity requesterIdentity,
       RequestStatistics requestStatistics)
@@ -292,22 +305,61 @@ public abstract class BaseBrokerRequestHandler implements 
BrokerRequestHandler {
 
     LOGGER.debug("Broker Response: {}", brokerResponse);
 
-    // Table name might have been changed (with suffix _OFFLINE/_REALTIME 
appended)
-    LOGGER.info(
-        "RequestId:{}, table:{}, timeMs:{}, docs:{}/{}, entries:{}/{}, 
segments(queried/processed/matched):{}/{}/{} "
-            + "servers:{}/{}, groupLimitReached:{}, exceptions:{}, 
serverStats:{}, query:{}", requestId,
-        brokerRequest.getQuerySource().getTableName(), totalTimeMs, 
brokerResponse.getNumDocsScanned(),
-        brokerResponse.getTotalDocs(), 
brokerResponse.getNumEntriesScannedInFilter(),
-        brokerResponse.getNumEntriesScannedPostFilter(), 
brokerResponse.getNumSegmentsQueried(),
-        brokerResponse.getNumSegmentsProcessed(), 
brokerResponse.getNumSegmentsMatched(),
-        brokerResponse.getNumServersResponded(), 
brokerResponse.getNumServersQueried(),
-        brokerResponse.isNumGroupsLimitReached(), 
brokerResponse.getExceptionsSize(), serverStats.getServerStats(),
-        StringUtils.substring(query, 0, _queryLogLength));
-
+    if(_queryLogRateLimiter.tryAcquire() || forceLog(brokerResponse, 
totalTimeMs)) {
+      // Table name might have been changed (with suffix _OFFLINE/_REALTIME 
appended)
+      LOGGER.info(
+          "RequestId:{}, table:{}, timeMs:{}, docs:{}/{}, entries:{}/{}, 
segments(queried/processed/matched):{}/{}/{} "
+              + "servers:{}/{}, groupLimitReached:{}, exceptions:{}, 
serverStats:{}, query:{}", requestId,
+          brokerRequest.getQuerySource().getTableName(), totalTimeMs, 
brokerResponse.getNumDocsScanned(),
+          brokerResponse.getTotalDocs(), 
brokerResponse.getNumEntriesScannedInFilter(),
+          brokerResponse.getNumEntriesScannedPostFilter(), 
brokerResponse.getNumSegmentsQueried(),
+          brokerResponse.getNumSegmentsProcessed(), 
brokerResponse.getNumSegmentsMatched(),
+          brokerResponse.getNumServersResponded(), 
brokerResponse.getNumServersQueried(),
+          brokerResponse.isNumGroupsLimitReached(), 
brokerResponse.getExceptionsSize(), serverStats.getServerStats(),
+          StringUtils.substring(query, 0, _queryLogLength));
+
+      // Limit the dropping log message at most once per second.
+      if (_numDroppedLogRateLimiter.tryAcquire()) {
+        // NOTE: the reported number may not be accurate since we will be 
missing some increments happened between
+        // get() and set().
+        int numDroppedLog = _numDroppedLog.get();
+        if (numDroppedLog > 0) {
+          LOGGER.info("{} logs were dropped. (log max rate per second: {})", 
numDroppedLog,
+              _queryLogRateLimiter.getRate());
+          _numDroppedLog.set(0);
+        }
+      }
+    } else {
+      // Increment the count for dropped log
+      _numDroppedLog.incrementAndGet();
+    }
     return brokerResponse;
   }
 
   /**
+   * Helper function to decide whether to force the log
+   *
+   * TODO: come up with other criteria for forcing a log and come up with 
better numbers
+   *
+   */
+  private boolean forceLog(BrokerResponse brokerResponse, long totalTimeMs) {
+    if (brokerResponse.isNumGroupsLimitReached()) {
+      return true;
+    }
+
+    if (brokerResponse.getExceptionsSize() > 0) {
+      return true;
+    }
+
+    // If response time is more than 1 sec, force the log
+    if (totalTimeMs > 1000L) {
+      return true;
+    }
+
+    return false;
+  }
+
+  /**
    * Broker side validation on the broker request.
    * <p>Throw RuntimeException if query does not pass validation.
    * <p>Current validations are:
diff --git 
a/pinot-common/src/main/java/org/apache/pinot/common/utils/CommonConstants.java 
b/pinot-common/src/main/java/org/apache/pinot/common/utils/CommonConstants.java
index 0e6f5a5..b76f2b8 100644
--- 
a/pinot-common/src/main/java/org/apache/pinot/common/utils/CommonConstants.java
+++ 
b/pinot-common/src/main/java/org/apache/pinot/common/utils/CommonConstants.java
@@ -111,6 +111,8 @@ public class CommonConstants {
     public static final int DEFAULT_BROKER_QUERY_RESPONSE_LIMIT = 
Integer.MAX_VALUE;
     public static final String CONFIG_OF_BROKER_QUERY_LOG_LENGTH = 
"pinot.broker.query.log.length";
     public static final int DEFAULT_BROKER_QUERY_LOG_LENGTH = 
Integer.MAX_VALUE;
+    public static final String CONFIG_OF_BROKER_QUERY_LOG_MAX_RATE_PER_SECOND 
= "pinot.broker.query.log.maxRatePerSecond";
+    public static final double DEFAULT_BROKER_QUERY_LOG_MAX_RATE_PER_SECOND = 
10_000d;
     public static final String CONFIG_OF_BROKER_TIMEOUT_MS = 
"pinot.broker.timeoutMs";
     public static final long DEFAULT_BROKER_TIMEOUT_MS = 10_000L;
     public static final String CONFIG_OF_BROKER_ID = "pinot.broker.id";
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/PriorityScheduler.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/PriorityScheduler.java
index afb2831..5f968b8 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/PriorityScheduler.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/PriorityScheduler.java
@@ -27,6 +27,7 @@ import java.util.List;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.atomic.LongAccumulator;
 import javax.annotation.Nonnull;
+import org.apache.commons.configuration.Configuration;
 import org.apache.pinot.common.exception.QueryException;
 import org.apache.pinot.common.metrics.ServerMeter;
 import org.apache.pinot.common.metrics.ServerMetrics;
@@ -53,9 +54,10 @@ public abstract class PriorityScheduler extends 
QueryScheduler {
   @VisibleForTesting
   Thread scheduler;
 
-  public PriorityScheduler(@Nonnull ResourceManager resourceManager, @Nonnull 
QueryExecutor queryExecutor,
-      @Nonnull SchedulerPriorityQueue queue, @Nonnull ServerMetrics metrics, 
@Nonnull LongAccumulator latestQueryTime) {
-    super(queryExecutor, resourceManager, metrics, latestQueryTime);
+  public PriorityScheduler(@Nonnull Configuration config, @Nonnull 
ResourceManager resourceManager,
+      @Nonnull QueryExecutor queryExecutor, @Nonnull SchedulerPriorityQueue 
queue, @Nonnull ServerMetrics metrics,
+      @Nonnull LongAccumulator latestQueryTime) {
+    super(config, queryExecutor, resourceManager, metrics, latestQueryTime);
     Preconditions.checkNotNull(queue);
     this.queryQueue = queue;
     this.numRunners = resourceManager.getNumQueryRunnerThreads();
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/QueryScheduler.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/QueryScheduler.java
index 82d2b5d..627cda7 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/QueryScheduler.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/QueryScheduler.java
@@ -22,11 +22,14 @@ import com.google.common.base.Preconditions;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.ListenableFutureTask;
+import com.google.common.util.concurrent.RateLimiter;
 import java.util.Map;
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.LongAccumulator;
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
+import org.apache.commons.configuration.Configuration;
 import org.apache.pinot.common.exception.QueryException;
 import org.apache.pinot.common.metrics.ServerMeter;
 import org.apache.pinot.common.metrics.ServerMetrics;
@@ -48,8 +51,15 @@ import org.slf4j.LoggerFactory;
  */
 public abstract class QueryScheduler {
   private static final Logger LOGGER = 
LoggerFactory.getLogger(QueryScheduler.class);
+
   private static final String INVALID_NUM_SCANNED = "-1";
   private static final String INVALID_SEGMENTS_COUNT = "-1";
+  private static final String QUERY_LOG_MAX_RATE_KEY = 
"query.log.maxRatePerSecond";
+  private static final double DEFAULT_QUERY_LOG_MAX_RATE = 10_000d;
+
+  private final RateLimiter queryLogRateLimiter;
+  private final RateLimiter numDroppedLogRateLimiter;
+  private final AtomicInteger numDroppedLogCounter;
 
   protected final ServerMetrics serverMetrics;
   protected final QueryExecutor queryExecutor;
@@ -63,8 +73,10 @@ public abstract class QueryScheduler {
    * @param resourceManager for managing server thread resources
    * @param serverMetrics server metrics collector
    */
-  public QueryScheduler(@Nonnull QueryExecutor queryExecutor, @Nonnull 
ResourceManager resourceManager,
-      @Nonnull ServerMetrics serverMetrics, @Nonnull LongAccumulator 
latestQueryTime) {
+  public QueryScheduler(@Nonnull Configuration config, @Nonnull QueryExecutor 
queryExecutor,
+      @Nonnull ResourceManager resourceManager, @Nonnull ServerMetrics 
serverMetrics,
+      @Nonnull LongAccumulator latestQueryTime) {
+    Preconditions.checkNotNull(config);
     Preconditions.checkNotNull(queryExecutor);
     Preconditions.checkNotNull(resourceManager);
     Preconditions.checkNotNull(serverMetrics);
@@ -73,6 +85,11 @@ public abstract class QueryScheduler {
     this.resourceManager = resourceManager;
     this.queryExecutor = queryExecutor;
     this.latestQueryTime = latestQueryTime;
+    this.queryLogRateLimiter = 
RateLimiter.create(config.getDouble(QUERY_LOG_MAX_RATE_KEY, 
DEFAULT_QUERY_LOG_MAX_RATE));
+    this.numDroppedLogRateLimiter = RateLimiter.create(1.0d);
+    this.numDroppedLogCounter = new AtomicInteger(0);
+
+    LOGGER.info("Query log max rate: {}", queryLogRateLimiter.getRate());
   }
 
   /**
@@ -122,6 +139,7 @@ public abstract class QueryScheduler {
    * @param executorService Executor service to use for parallelizing query 
processing
    * @return serialized query response
    */
+  @SuppressWarnings("Duplicates")
   @Nullable
   protected byte[] processQueryAndSerialize(@Nonnull ServerQueryRequest 
queryRequest,
       @Nonnull ExecutorService executorService) {
@@ -170,13 +188,30 @@ public abstract class QueryScheduler {
 
     TimerContext timerContext = queryRequest.getTimerContext();
     int numSegmentsQueried = queryRequest.getSegmentsToQuery().size();
-    LOGGER.info(
-        "Processed 
requestId={},table={},segments(queried/processed/matched)={}/{}/{},schedulerWaitMs={},totalExecMs={},totalTimeMs={},broker={},numDocsScanned={},scanInFilter={},scanPostFilter={},sched={}",
-        requestId, tableNameWithType, numSegmentsQueried, 
numSegmentsProcessed, numSegmentsMatched,
-        timerContext.getPhaseDurationMs(ServerQueryPhase.SCHEDULER_WAIT),
-        timerContext.getPhaseDurationMs(ServerQueryPhase.QUERY_PROCESSING),
-        timerContext.getPhaseDurationMs(ServerQueryPhase.TOTAL_QUERY_TIME), 
queryRequest.getBrokerId(), numDocsScanned,
-        numEntriesScannedInFilter, numEntriesScannedPostFilter, name());
+    long schedulerWaitMs = 
timerContext.getPhaseDurationMs(ServerQueryPhase.SCHEDULER_WAIT);
+
+    if (queryLogRateLimiter.tryAcquire() || forceLog(schedulerWaitMs, 
numDocsScanned)) {
+      LOGGER.info(
+          "Processed 
requestId={},table={},segments(queried/processed/matched)={}/{}/{},schedulerWaitMs={},totalExecMs={},totalTimeMs={},broker={},numDocsScanned={},scanInFilter={},scanPostFilter={},sched={}",
+          requestId, tableNameWithType, numSegmentsQueried, 
numSegmentsProcessed, numSegmentsMatched, schedulerWaitMs,
+          timerContext.getPhaseDurationMs(ServerQueryPhase.QUERY_PROCESSING),
+          timerContext.getPhaseDurationMs(ServerQueryPhase.TOTAL_QUERY_TIME), 
queryRequest.getBrokerId(),
+          numDocsScanned, numEntriesScannedInFilter, 
numEntriesScannedPostFilter, name());
+
+      // Limit the dropping log message at most once per second.
+      if (numDroppedLogRateLimiter.tryAcquire()) {
+        // NOTE: the reported number may not be accurate since we will be 
missing some increments happened between
+        // get() and set().
+        int numDroppedLog = numDroppedLogCounter.get();
+        if (numDroppedLog > 0) {
+          LOGGER.info("{} logs were dropped. (log max rate per second: {})", 
numDroppedLog,
+              queryLogRateLimiter.getRate());
+          numDroppedLogCounter.set(0);
+        }
+      }
+    } else {
+      numDroppedLogCounter.incrementAndGet();
+    }
 
     serverMetrics.addMeteredTableValue(tableNameWithType, 
ServerMeter.NUM_SEGMENTS_QUERIED, numSegmentsQueried);
     serverMetrics.addMeteredTableValue(tableNameWithType, 
ServerMeter.NUM_SEGMENTS_PROCESSED, numSegmentsProcessed);
@@ -186,6 +221,25 @@ public abstract class QueryScheduler {
   }
 
   /**
+   * Helper function to decide whether to force the log
+   *
+   * TODO: come up with other criteria for forcing a log and come up with 
better numbers
+   *
+   */
+  private boolean forceLog(long schedulerWaitMs, long numDocsScanned) {
+    // If scheduler wait time is larger than 100ms, force the log
+    if (schedulerWaitMs > 100L) {
+      return true;
+    }
+
+    // If the number of document scanned is larger than 1 million rows, force 
the log
+    if (numDocsScanned > 1_000_000L) {
+      return true;
+    }
+    return false;
+  }
+
+  /**
    * Serialize the DataTable response for query request
    * @param queryRequest Server query request for which response is serialized
    * @param dataTable DataTable to serialize
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/fcfs/BoundedFCFSScheduler.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/fcfs/BoundedFCFSScheduler.java
index 8a425b1..301fb06 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/fcfs/BoundedFCFSScheduler.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/fcfs/BoundedFCFSScheduler.java
@@ -53,12 +53,13 @@ public class BoundedFCFSScheduler extends PriorityScheduler 
{
       }
     };
     MultiLevelPriorityQueue queue = new MultiLevelPriorityQueue(config, rm, 
groupFactory, new TableBasedGroupMapper());
-    return new BoundedFCFSScheduler(rm, queryExecutor, queue, serverMetrics, 
latestQueryTime);
+    return new BoundedFCFSScheduler(config, rm, queryExecutor, queue, 
serverMetrics, latestQueryTime);
   }
 
-  private BoundedFCFSScheduler(@Nonnull ResourceManager resourceManager, 
@Nonnull QueryExecutor queryExecutor,
-      @Nonnull SchedulerPriorityQueue queue, @Nonnull ServerMetrics metrics, 
@Nonnull LongAccumulator latestQueryTime) {
-    super(resourceManager, queryExecutor, queue, metrics, latestQueryTime);
+  private BoundedFCFSScheduler(@Nonnull Configuration config, @Nonnull 
ResourceManager resourceManager,
+      @Nonnull QueryExecutor queryExecutor, @Nonnull SchedulerPriorityQueue 
queue, @Nonnull ServerMetrics metrics,
+      @Nonnull LongAccumulator latestQueryTime) {
+    super(config, resourceManager, queryExecutor, queue, metrics, 
latestQueryTime);
   }
 
   @Override
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/fcfs/FCFSQueryScheduler.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/fcfs/FCFSQueryScheduler.java
index 95004c2..92016d6 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/fcfs/FCFSQueryScheduler.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/fcfs/FCFSQueryScheduler.java
@@ -42,7 +42,7 @@ public class FCFSQueryScheduler extends QueryScheduler {
 
   public FCFSQueryScheduler(@Nonnull Configuration config, @Nonnull 
QueryExecutor queryExecutor,
       @Nonnull ServerMetrics serverMetrics, @Nonnull LongAccumulator 
latestQueryTime) {
-    super(queryExecutor, new UnboundedResourceManager(config), serverMetrics, 
latestQueryTime);
+    super(config, queryExecutor, new UnboundedResourceManager(config), 
serverMetrics, latestQueryTime);
   }
 
   @Nonnull
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/tokenbucket/TokenPriorityScheduler.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/tokenbucket/TokenPriorityScheduler.java
index fdc4670..7a28bf2 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/tokenbucket/TokenPriorityScheduler.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/tokenbucket/TokenPriorityScheduler.java
@@ -59,13 +59,13 @@ public class TokenPriorityScheduler extends 
PriorityScheduler {
     };
 
     MultiLevelPriorityQueue queue = new MultiLevelPriorityQueue(config, rm, 
groupFactory, new TableBasedGroupMapper());
-    return new TokenPriorityScheduler(rm, queryExecutor, queue, metrics, 
latestQueryTime);
+    return new TokenPriorityScheduler(config, rm, queryExecutor, queue, 
metrics, latestQueryTime);
   }
 
-  private TokenPriorityScheduler(@Nonnull ResourceManager resourceManager, 
@Nonnull QueryExecutor queryExecutor,
-      @Nonnull MultiLevelPriorityQueue queue, @Nonnull ServerMetrics metrics,
+  private TokenPriorityScheduler(@Nonnull Configuration config, @Nonnull 
ResourceManager resourceManager,
+      @Nonnull QueryExecutor queryExecutor, @Nonnull MultiLevelPriorityQueue 
queue, @Nonnull ServerMetrics metrics,
       @Nonnull LongAccumulator latestQueryTime) {
-    super(resourceManager, queryExecutor, queue, metrics, latestQueryTime);
+    super(config, resourceManager, queryExecutor, queue, metrics, 
latestQueryTime);
   }
 
   @Override
diff --git 
a/pinot-core/src/test/java/org/apache/pinot/core/query/scheduler/PrioritySchedulerTest.java
 
b/pinot-core/src/test/java/org/apache/pinot/core/query/scheduler/PrioritySchedulerTest.java
index 1bdb81b..06611c2 100644
--- 
a/pinot-core/src/test/java/org/apache/pinot/core/query/scheduler/PrioritySchedulerTest.java
+++ 
b/pinot-core/src/test/java/org/apache/pinot/core/query/scheduler/PrioritySchedulerTest.java
@@ -231,13 +231,13 @@ public class PrioritySchedulerTest {
     static TestSchedulerGroupFactory groupFactory;
     static LongAccumulator latestQueryTime;
 
-    public static TestPriorityScheduler create(Configuration conf) {
-      ResourceManager rm = new PolicyBasedResourceManager(conf);
+    public static TestPriorityScheduler create(Configuration config) {
+      ResourceManager rm = new PolicyBasedResourceManager(config);
       QueryExecutor qe = new TestQueryExecutor();
       groupFactory = new TestSchedulerGroupFactory();
-      MultiLevelPriorityQueue queue = new MultiLevelPriorityQueue(conf, rm, 
groupFactory, new TableBasedGroupMapper());
+      MultiLevelPriorityQueue queue = new MultiLevelPriorityQueue(config, rm, 
groupFactory, new TableBasedGroupMapper());
       latestQueryTime = new LongAccumulator(Long::max, 0);
-      return new TestPriorityScheduler(rm, qe, queue, metrics, 
latestQueryTime);
+      return new TestPriorityScheduler(config, rm, qe, queue, metrics, 
latestQueryTime);
     }
 
     public static TestPriorityScheduler create() {
@@ -246,10 +246,10 @@ public class PrioritySchedulerTest {
     }
 
     // store locally for easy access
-    public TestPriorityScheduler(@Nonnull ResourceManager resourceManager, 
@Nonnull QueryExecutor queryExecutor,
-        @Nonnull SchedulerPriorityQueue queue, @Nonnull ServerMetrics metrics,
+    public TestPriorityScheduler(@Nonnull Configuration config, @Nonnull 
ResourceManager resourceManager,
+        @Nonnull QueryExecutor queryExecutor, @Nonnull SchedulerPriorityQueue 
queue, @Nonnull ServerMetrics metrics,
         @Nonnull LongAccumulator latestQueryTime) {
-      super(resourceManager, queryExecutor, queue, metrics, latestQueryTime);
+      super(config, resourceManager, queryExecutor, queue, metrics, 
latestQueryTime);
     }
 
     ResourceManager getResourceManager() {
diff --git 
a/pinot-server/src/test/java/org/apache/pinot/server/request/ScheduledRequestHandlerTest.java
 
b/pinot-server/src/test/java/org/apache/pinot/server/request/ScheduledRequestHandlerTest.java
index 48578f5..00db1bd 100644
--- 
a/pinot-server/src/test/java/org/apache/pinot/server/request/ScheduledRequestHandlerTest.java
+++ 
b/pinot-server/src/test/java/org/apache/pinot/server/request/ScheduledRequestHandlerTest.java
@@ -34,6 +34,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.LongAccumulator;
 import javax.annotation.Nonnull;
+import org.apache.commons.configuration.Configuration;
 import org.apache.commons.configuration.PropertiesConfiguration;
 import org.apache.pinot.common.exception.QueryException;
 import org.apache.pinot.common.metrics.ServerMetrics;
@@ -65,6 +66,7 @@ import static org.mockito.Mockito.when;
 public class ScheduledRequestHandlerTest {
   private static final BrokerRequest DUMMY_BROKER_REQUEST =
       new Pql2Compiler().compileToBrokerRequest("SELECT * FROM 
myTable_OFFLINE");
+  private static final Configuration DEFAULT_SCHEDULER_CONFIG = new 
PropertiesConfiguration();
 
   private ServerMetrics serverMetrics;
   private ChannelHandlerContext channelHandlerContext;
@@ -119,8 +121,8 @@ public class ScheduledRequestHandlerTest {
   @Test
   public void testQueryProcessingException()
       throws Exception {
-    ScheduledRequestHandler handler =
-        new ScheduledRequestHandler(new QueryScheduler(queryExecutor, 
resourceManager, serverMetrics, latestQueryTime) {
+    ScheduledRequestHandler handler = new ScheduledRequestHandler(
+        new QueryScheduler(DEFAULT_SCHEDULER_CONFIG, queryExecutor, 
resourceManager, serverMetrics, latestQueryTime) {
           @Nonnull
           @Override
           public ListenableFuture<byte[]> submit(@Nonnull ServerQueryRequest 
queryRequest) {
@@ -159,8 +161,8 @@ public class ScheduledRequestHandlerTest {
   @Test
   public void testValidQueryResponse()
       throws InterruptedException, ExecutionException, TimeoutException, 
IOException {
-    ScheduledRequestHandler handler =
-        new ScheduledRequestHandler(new QueryScheduler(queryExecutor, 
resourceManager, serverMetrics, latestQueryTime) {
+    ScheduledRequestHandler handler = new ScheduledRequestHandler(
+        new QueryScheduler(DEFAULT_SCHEDULER_CONFIG, queryExecutor, 
resourceManager, serverMetrics, latestQueryTime) {
           @Nonnull
           @Override
           public ListenableFuture<byte[]> submit(@Nonnull ServerQueryRequest 
queryRequest) {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to