This is an automated email from the ASF dual-hosted git repository.
xiangfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 397874e548 Add Debug APIs that return Thread and Query Resource Usage
(#13583)
397874e548 is described below
commit 397874e548a8ba06ff95a0777c10909393f400c2
Author: Rajat Venkatesh <[email protected]>
AuthorDate: Mon Jul 22 19:43:26 2024 +0530
Add Debug APIs that return Thread and Query Resource Usage (#13583)
* Checkpoint
* Checkpoint
* Add javadocs for new interfaces.
* Code linting and java docs.
* Add two quick starts for resource tracking.
* Add a javadoc
* Dont serialize thread info
* Add auth to query debug resource
---
.../broker/api/resources/PinotBrokerDebug.java | 26 ++++++
.../CPUMemThreadLevelAccountingObjects.java | 23 +++++-
.../PerQueryCPUMemAccountantFactory.java | 85 ++++++++++++++++++--
.../java/org/apache/pinot/core/auth/Actions.java | 1 +
.../pinot/server/api/resources/DebugResource.java | 26 ++++++
.../pinot/spi/accounting/QueryResourceTracker.java | 46 +++++++++++
.../spi/accounting/ThreadResourceTracker.java | 52 ++++++++++++
.../accounting/ThreadResourceUsageAccountant.java | 14 ++++
.../java/org/apache/pinot/spi/trace/Tracing.java | 15 ++++
.../tools/MultiStageResourceTrackerQuickStart.java | 56 +++++++++++++
.../pinot/tools/MultistageEngineQuickStart.java | 24 +++---
.../SingleStageResourceTrackingQuickStart.java | 93 ++++++++++++++++++++++
.../apache/pinot/tools/utils/PinotConfigUtils.java | 22 +++++
.../apache/pinot/tools/utils/SampleQueries.java | 42 ++++++++++
14 files changed, 503 insertions(+), 22 deletions(-)
diff --git
a/pinot-broker/src/main/java/org/apache/pinot/broker/api/resources/PinotBrokerDebug.java
b/pinot-broker/src/main/java/org/apache/pinot/broker/api/resources/PinotBrokerDebug.java
index 11e35eb6f7..8311b5de87 100644
---
a/pinot-broker/src/main/java/org/apache/pinot/broker/api/resources/PinotBrokerDebug.java
+++
b/pinot-broker/src/main/java/org/apache/pinot/broker/api/resources/PinotBrokerDebug.java
@@ -27,6 +27,7 @@ import io.swagger.annotations.ApiResponses;
import io.swagger.annotations.Authorization;
import io.swagger.annotations.SecurityDefinition;
import io.swagger.annotations.SwaggerDefinition;
+import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -57,7 +58,11 @@ import org.apache.pinot.core.routing.RoutingTable;
import org.apache.pinot.core.routing.TimeBoundaryInfo;
import org.apache.pinot.core.transport.ServerInstance;
import
org.apache.pinot.core.transport.server.routing.stats.ServerRoutingStatsManager;
+import org.apache.pinot.spi.accounting.QueryResourceTracker;
+import org.apache.pinot.spi.accounting.ThreadResourceTracker;
+import org.apache.pinot.spi.accounting.ThreadResourceUsageAccountant;
import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.trace.Tracing;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
import org.apache.pinot.sql.parsers.CalciteSqlCompiler;
@@ -269,4 +274,25 @@ public class PinotBrokerDebug {
private long getRequestId() {
return _requestIdGenerator.getAndIncrement();
}
+
+ @GET
+ @Produces(MediaType.APPLICATION_JSON)
+ @Path("/debug/threads/resourceUsage")
+ @Authorize(targetType = TargetType.CLUSTER, action =
Actions.Cluster.DEBUG_RESOURCE_USAGE)
+ @ApiOperation(value = "Get resource usage of threads")
+ public Collection<? extends ThreadResourceTracker> getThreadResourceUsage() {
+ ThreadResourceUsageAccountant threadAccountant =
Tracing.getThreadAccountant();
+ return threadAccountant.getThreadResources();
+ }
+
+ @GET
+ @Path("debug/queries/resourceUsage")
+ @Produces(MediaType.APPLICATION_JSON)
+ @Authorize(targetType = TargetType.CLUSTER, action =
Actions.Cluster.DEBUG_RESOURCE_USAGE)
+ @ApiOperation(value = "Get current resource usage of queries in this
service", notes = "This is a debug endpoint, "
+ + "and won't maintain backward compatibility")
+ public Collection<? extends QueryResourceTracker> getQueryUsage() {
+ ThreadResourceUsageAccountant threadAccountant =
Tracing.getThreadAccountant();
+ return threadAccountant.getQueryResources().values();
+ }
}
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/accounting/CPUMemThreadLevelAccountingObjects.java
b/pinot-core/src/main/java/org/apache/pinot/core/accounting/CPUMemThreadLevelAccountingObjects.java
index 431643942a..6a375b95bb 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/accounting/CPUMemThreadLevelAccountingObjects.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/accounting/CPUMemThreadLevelAccountingObjects.java
@@ -18,10 +18,12 @@
*/
package org.apache.pinot.core.accounting;
+import com.fasterxml.jackson.annotation.JsonIgnore;
import java.util.concurrent.atomic.AtomicReference;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.apache.pinot.spi.accounting.ThreadExecutionContext;
+import org.apache.pinot.spi.accounting.ThreadResourceTracker;
import org.apache.pinot.spi.utils.CommonConstants;
@@ -34,7 +36,7 @@ public class CPUMemThreadLevelAccountingObjects {
* Entry to track the task execution status and usage stats of a Thread
* (including but not limited to server worker thread, runner thread, broker
jetty thread, or broker netty thread)
*/
- public static class ThreadEntry {
+ public static class ThreadEntry implements ThreadResourceTracker {
// current query_id, task_id of the thread; this field is accessed by the
thread itself and the accountant
AtomicReference<TaskEntry> _currentThreadTaskStatus = new
AtomicReference<>();
// current sample of thread memory usage/cputime ; this field is accessed
by the thread itself and the accountant
@@ -77,11 +79,30 @@ public class CPUMemThreadLevelAccountingObjects {
*
* @return the current query id on the thread, {@code null} if idle
*/
+ @JsonIgnore
@Nullable
public TaskEntry getCurrentThreadTaskStatus() {
return _currentThreadTaskStatus.get();
}
+ public long getCPUTimeMS() {
+ return _currentThreadCPUTimeSampleMS;
+ }
+
+ public long getAllocatedBytes() {
+ return _currentThreadMemoryAllocationSampleBytes;
+ }
+
+ public String getQueryId() {
+ TaskEntry taskEntry = _currentThreadTaskStatus.get();
+ return taskEntry == null ? "" : taskEntry.getQueryId();
+ }
+
+ public int getTaskId() {
+ TaskEntry taskEntry = _currentThreadTaskStatus.get();
+ return taskEntry == null ? -1 : taskEntry.getTaskId();
+ }
+
public void setThreadTaskStatus(@Nonnull String queryId, int taskId,
@Nonnull Thread anchorThread) {
_currentThreadTaskStatus.set(new TaskEntry(queryId, taskId,
anchorThread));
}
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/accounting/PerQueryCPUMemAccountantFactory.java
b/pinot-core/src/main/java/org/apache/pinot/core/accounting/PerQueryCPUMemAccountantFactory.java
index 598b68b344..34e496d3bf 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/accounting/PerQueryCPUMemAccountantFactory.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/accounting/PerQueryCPUMemAccountantFactory.java
@@ -18,8 +18,10 @@
*/
package org.apache.pinot.core.accounting;
+import com.fasterxml.jackson.annotation.JsonIgnore;
import java.lang.management.ManagementFactory;
import java.lang.management.MemoryMXBean;
+import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
@@ -38,8 +40,10 @@ import org.apache.pinot.common.metrics.BrokerMetrics;
import org.apache.pinot.common.metrics.ServerGauge;
import org.apache.pinot.common.metrics.ServerMeter;
import org.apache.pinot.common.metrics.ServerMetrics;
+import org.apache.pinot.spi.accounting.QueryResourceTracker;
import org.apache.pinot.spi.accounting.ThreadAccountantFactory;
import org.apache.pinot.spi.accounting.ThreadExecutionContext;
+import org.apache.pinot.spi.accounting.ThreadResourceTracker;
import org.apache.pinot.spi.accounting.ThreadResourceUsageAccountant;
import org.apache.pinot.spi.accounting.ThreadResourceUsageProvider;
import org.apache.pinot.spi.config.instance.InstanceType;
@@ -162,6 +166,65 @@ public class PerQueryCPUMemAccountantFactory implements
ThreadAccountantFactory
_watcherTask = new WatcherTask();
}
+ @Override
+ public Collection<? extends ThreadResourceTracker> getThreadResources() {
+ return _threadEntriesMap.values();
+ }
+
+ /**
+ * This function aggregates resource usage from all active threads and
groups by queryId.
+ * It is inspired by {@link
PerQueryCPUMemResourceUsageAccountant#aggregate}. The major difference is that
+ * it only reads from thread entries and does not update them.
+ * @return A map of query id, QueryResourceTracker.
+ */
+ @Override
+ public Map<String, ? extends QueryResourceTracker> getQueryResources() {
+ HashMap<String, AggregatedStats> ret = new HashMap<>();
+
+ // for each thread entry in _threadEntriesMap ({pqr, pqw})
+ for (Map.Entry<Thread, CPUMemThreadLevelAccountingObjects.ThreadEntry>
entry : _threadEntriesMap.entrySet()) {
+ // sample current usage
+ CPUMemThreadLevelAccountingObjects.ThreadEntry threadEntry =
entry.getValue();
+ long currentCPUSample = _isThreadCPUSamplingEnabled ?
threadEntry._currentThreadCPUTimeSampleMS : 0;
+ long currentMemSample =
+ _isThreadMemorySamplingEnabled ?
threadEntry._currentThreadMemoryAllocationSampleBytes : 0;
+ // sample current running task status
+ CPUMemThreadLevelAccountingObjects.TaskEntry currentTaskStatus =
threadEntry.getCurrentThreadTaskStatus();
+ Thread thread = entry.getKey();
+ LOGGER.trace("tid: {}, task: {}", thread.getId(), currentTaskStatus);
+
+ // if current thread is not idle
+ if (currentTaskStatus != null) {
+ // extract query id from queryTask string
+ String queryId = currentTaskStatus.getQueryId();
+ if (queryId != null) {
+ Thread anchorThread = currentTaskStatus.getAnchorThread();
+ boolean isAnchorThread = currentTaskStatus.isAnchorThread();
+ ret.compute(queryId,
+ (k, v) -> v == null ? new AggregatedStats(currentCPUSample,
currentMemSample, anchorThread,
+ isAnchorThread, threadEntry._errorStatus, queryId)
+ : v.merge(currentCPUSample, currentMemSample,
isAnchorThread, threadEntry._errorStatus));
+ }
+ }
+ }
+
+ // add the accumulated stats of finished and concurrent tasks of each active query
+ for (Map.Entry<String, AggregatedStats> queryIdResult : ret.entrySet()) {
+ String activeQueryId = queryIdResult.getKey();
+ long accumulatedCPUValue =
+ _isThreadCPUSamplingEnabled ?
_finishedTaskCPUStatsAggregator.getOrDefault(activeQueryId, 0L) : 0;
+ long concurrentCPUValue =
+ _isThreadCPUSamplingEnabled ?
_concurrentTaskCPUStatsAggregator.getOrDefault(activeQueryId, 0L) : 0;
+ long accumulatedMemValue =
+ _isThreadMemorySamplingEnabled ?
_finishedTaskMemStatsAggregator.getOrDefault(activeQueryId, 0L) : 0;
+ long concurrentMemValue =
+ _isThreadMemorySamplingEnabled ?
_concurrentTaskMemStatsAggregator.getOrDefault(activeQueryId, 0L) : 0;
+ queryIdResult.getValue()
+ .merge(accumulatedCPUValue + concurrentCPUValue,
accumulatedMemValue + concurrentMemValue, false, null);
+ }
+ return ret;
+ }
+
@Override
public void sampleUsage() {
sampleThreadBytesAllocated();
@@ -402,7 +465,7 @@ public class PerQueryCPUMemAccountantFactory implements
ThreadAccountantFactory
/**
* aggregated usage of a query, _thread is the runner
*/
- protected static class AggregatedStats {
+ protected static class AggregatedStats implements QueryResourceTracker {
final String _queryId;
final Thread _anchorThread;
boolean _isAnchorThread;
@@ -432,14 +495,22 @@ public class PerQueryCPUMemAccountantFactory implements
ThreadAccountantFactory
+ '}';
}
- public long getCpuNS() {
- return _cpuNS;
+ @Override
+ public String getQueryId() {
+ return _queryId;
}
+ @Override
public long getAllocatedBytes() {
return _allocatedBytes;
}
+ @Override
+ public long getCpuTimeNs() {
+ return _cpuNS;
+ }
+
+ @JsonIgnore
public Thread getAnchorThread() {
return _anchorThread;
}
@@ -804,7 +875,7 @@ public class PerQueryCPUMemAccountantFactory implements
ThreadAccountantFactory
}
} else {
maxUsageTuple =
Collections.max(_aggregatedUsagePerActiveQuery.values(),
- Comparator.comparing(AggregatedStats::getCpuNS));
+ Comparator.comparing(AggregatedStats::getCpuTimeNs));
if (_oomKillQueryEnabled) {
maxUsageTuple._exceptionAtomicReference
.set(new RuntimeException(String.format(
@@ -828,12 +899,12 @@ public class PerQueryCPUMemAccountantFactory implements
ThreadAccountantFactory
AggregatedStats value = entry.getValue();
if (value._cpuNS > _cpuTimeBasedKillingThresholdNS) {
LOGGER.error("Current task status recorded is {}. Query {} got
picked because using {} ns of cpu time,"
- + " greater than threshold {}", _threadEntriesMap,
value._queryId, value.getCpuNS(),
+ + " greater than threshold {}", _threadEntriesMap,
value._queryId, value.getCpuTimeNs(),
_cpuTimeBasedKillingThresholdNS);
value._exceptionAtomicReference.set(new RuntimeException(
String.format("Query %s got killed on %s: %s because using %d "
- + "CPU time exceeding limit of %d ns CPU time",
- value._queryId, _instanceType, _instanceId,
value.getCpuNS(), _cpuTimeBasedKillingThresholdNS)));
+ + "CPU time exceeding limit of %d ns CPU time",
value._queryId, _instanceType, _instanceId,
+ value.getCpuTimeNs(), _cpuTimeBasedKillingThresholdNS)));
interruptRunnerThread(value.getAnchorThread());
}
}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/auth/Actions.java
b/pinot-core/src/main/java/org/apache/pinot/core/auth/Actions.java
index 877041eb7d..51cab16711 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/auth/Actions.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/auth/Actions.java
@@ -34,6 +34,7 @@ public class Actions {
public static final String CREATE_TENANT = "CreateTenant";
public static final String CREATE_USER = "CreateUser";
public static final String DEBUG_TASK = "DebugTask";
+ public static final String DEBUG_RESOURCE_USAGE = "DebugResourceUsage";
public static final String DELETE_CLUSTER_CONFIG = "DeleteClusterConfig";
public static final String DELETE_INSTANCE = "DeleteInstance";
public static final String DELETE_TASK = "DeleteTask";
diff --git
a/pinot-server/src/main/java/org/apache/pinot/server/api/resources/DebugResource.java
b/pinot-server/src/main/java/org/apache/pinot/server/api/resources/DebugResource.java
index b90f4b42a4..23f7d9a6cc 100644
---
a/pinot-server/src/main/java/org/apache/pinot/server/api/resources/DebugResource.java
+++
b/pinot-server/src/main/java/org/apache/pinot/server/api/resources/DebugResource.java
@@ -26,6 +26,7 @@ import io.swagger.annotations.Authorization;
import io.swagger.annotations.SecurityDefinition;
import io.swagger.annotations.SwaggerDefinition;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@@ -52,8 +53,12 @@ import
org.apache.pinot.segment.local.data.manager.SegmentDataManager;
import org.apache.pinot.segment.local.data.manager.TableDataManager;
import org.apache.pinot.segment.spi.ImmutableSegment;
import org.apache.pinot.server.starter.ServerInstance;
+import org.apache.pinot.spi.accounting.QueryResourceTracker;
+import org.apache.pinot.spi.accounting.ThreadResourceTracker;
+import org.apache.pinot.spi.accounting.ThreadResourceUsageAccountant;
import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.stream.ConsumerPartitionState;
+import org.apache.pinot.spi.trace.Tracing;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
import static org.apache.pinot.spi.utils.CommonConstants.DATABASE;
@@ -123,6 +128,27 @@ public class DebugResource {
}
}
+ @GET
+ @Path("threads/resourceUsage")
+ @Produces(MediaType.APPLICATION_JSON)
+ @ApiOperation(value = "Get current resource usage of threads",
+ notes = "This is a debug endpoint, and won't maintain backward
compatibility")
+ public Collection<? extends ThreadResourceTracker> getThreadUsage() {
+ ThreadResourceUsageAccountant threadAccountant =
Tracing.getThreadAccountant();
+ return threadAccountant.getThreadResources();
+ }
+
+ @GET
+ @Path("queries/resourceUsage")
+ @Produces(MediaType.APPLICATION_JSON)
+ @ApiOperation(value = "Get current resource usage of queries in this
service",
+ notes = "This is a debug endpoint, and won't maintain backward
compatibility")
+ public Collection<? extends QueryResourceTracker> getQueryUsage() {
+ ThreadResourceUsageAccountant threadAccountant =
Tracing.getThreadAccountant();
+ Collection<? extends QueryResourceTracker> resources =
threadAccountant.getQueryResources().values();
+ return resources;
+ }
+
private List<SegmentServerDebugInfo> getSegmentServerDebugInfo(String
tableNameWithType, TableType tableType) {
List<SegmentServerDebugInfo> segmentServerDebugInfos = new ArrayList<>();
diff --git
a/pinot-spi/src/main/java/org/apache/pinot/spi/accounting/QueryResourceTracker.java
b/pinot-spi/src/main/java/org/apache/pinot/spi/accounting/QueryResourceTracker.java
new file mode 100644
index 0000000000..869528bb75
--- /dev/null
+++
b/pinot-spi/src/main/java/org/apache/pinot/spi/accounting/QueryResourceTracker.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.spi.accounting;
+
+import com.fasterxml.jackson.databind.annotation.JsonSerialize;
+
+
+/**
+ * Tracks allocated bytes and CPU time for a query in a server or a broker.
+ */
+@JsonSerialize
+public interface QueryResourceTracker {
+ /**
+ * QueryId tracked by the implementation.
+ * @return a string containing the query id.
+ */
+ String getQueryId();
+
+ /**
+ * Allocated bytes for a query in a server or broker
+ * @return A long containing the number of bytes allocated to execute the
query.
+ */
+ long getAllocatedBytes();
+
+ /**
+ * Total execution CPU Time(nanoseconds) of a query in a server or broker.
+ * @return A long containing the nanoseconds.
+ */
+ long getCpuTimeNs();
+}
diff --git
a/pinot-spi/src/main/java/org/apache/pinot/spi/accounting/ThreadResourceTracker.java
b/pinot-spi/src/main/java/org/apache/pinot/spi/accounting/ThreadResourceTracker.java
new file mode 100644
index 0000000000..ff78a0d33c
--- /dev/null
+++
b/pinot-spi/src/main/java/org/apache/pinot/spi/accounting/ThreadResourceTracker.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.spi.accounting;
+
+import com.fasterxml.jackson.databind.annotation.JsonSerialize;
+
+
+/**
+ * Tracks allocated bytes and CPU time by a thread when executing a task of a
query.
+ */
+@JsonSerialize
+public interface ThreadResourceTracker {
+ /**
+ * Total execution CPU Time(nanoseconds) of a thread when executing a query
task in a server or broker.
+ * @return A long containing the nanoseconds. NOTE(review): the method name says MS; confirm the unit.
+ */
+ long getCPUTimeMS();
+
+ /**
+ * Allocated bytes for a query task in a server or broker
+ * @return A long containing the number of bytes allocated to execute the
query task.
+ */
+ long getAllocatedBytes();
+
+ /**
+ * QueryId of the task the thread is executing.
+ * @return a string containing the query id.
+ */
+ String getQueryId();
+
+ /**
+ * TaskId of the task the thread is executing.
+ * @return an int containing the task id.
+ */
+ int getTaskId();
+}
diff --git
a/pinot-spi/src/main/java/org/apache/pinot/spi/accounting/ThreadResourceUsageAccountant.java
b/pinot-spi/src/main/java/org/apache/pinot/spi/accounting/ThreadResourceUsageAccountant.java
index 41bbd59110..8be0632e6b 100644
---
a/pinot-spi/src/main/java/org/apache/pinot/spi/accounting/ThreadResourceUsageAccountant.java
+++
b/pinot-spi/src/main/java/org/apache/pinot/spi/accounting/ThreadResourceUsageAccountant.java
@@ -18,6 +18,8 @@
*/
package org.apache.pinot.spi.accounting;
+import java.util.Collection;
+import java.util.Map;
import javax.annotation.Nullable;
@@ -74,4 +76,16 @@ public interface ThreadResourceUsageAccountant {
* @return empty string if N/A
*/
Exception getErrorStatus();
+
+ /**
+ * Get all the ThreadResourceTrackers for all threads executing query tasks
+ * @return A collection of ThreadResourceTracker objects
+ */
+ Collection<? extends ThreadResourceTracker> getThreadResources();
+
+ /**
+ * Get all the QueryResourceTrackers for all the queries executing in a
broker or server.
+ * @return A Map of query id to QueryResourceTracker for all the queries.
+ */
+ Map<String, ? extends QueryResourceTracker> getQueryResources();
}
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/trace/Tracing.java
b/pinot-spi/src/main/java/org/apache/pinot/spi/trace/Tracing.java
index a853b03c20..910ebc35cf 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/trace/Tracing.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/trace/Tracing.java
@@ -20,9 +20,14 @@ package org.apache.pinot.spi.trace;
import java.lang.invoke.MethodHandles;
import java.lang.invoke.MethodType;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;
+import org.apache.pinot.spi.accounting.QueryResourceTracker;
import org.apache.pinot.spi.accounting.ThreadAccountantFactory;
import org.apache.pinot.spi.accounting.ThreadExecutionContext;
+import org.apache.pinot.spi.accounting.ThreadResourceTracker;
import org.apache.pinot.spi.accounting.ThreadResourceUsageAccountant;
import org.apache.pinot.spi.accounting.ThreadResourceUsageProvider;
import org.apache.pinot.spi.env.PinotConfiguration;
@@ -226,6 +231,16 @@ public class Tracing {
public Exception getErrorStatus() {
return null;
}
+
+ @Override
+ public Collection<? extends ThreadResourceTracker> getThreadResources() {
+ return Collections.emptyList();
+ }
+
+ @Override
+ public Map<String, ? extends QueryResourceTracker> getQueryResources() {
+ return Collections.emptyMap();
+ }
}
/**
diff --git
a/pinot-tools/src/main/java/org/apache/pinot/tools/MultiStageResourceTrackerQuickStart.java
b/pinot-tools/src/main/java/org/apache/pinot/tools/MultiStageResourceTrackerQuickStart.java
new file mode 100644
index 0000000000..a669011a18
--- /dev/null
+++
b/pinot-tools/src/main/java/org/apache/pinot/tools/MultiStageResourceTrackerQuickStart.java
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.tools;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import org.apache.pinot.spi.utils.CommonConstants;
+import org.apache.pinot.tools.admin.PinotAdministrator;
+import org.apache.pinot.tools.utils.SampleQueries;
+
+
+public class MultiStageResourceTrackerQuickStart extends
SingleStageResourceTrackingQuickStart {
+ @Override
+ protected List<String> getQueries() {
+ return List.of(SampleQueries.COUNT_BASEBALL_STATS,
SampleQueries.BASEBALL_STATS_SELF_JOIN,
+ SampleQueries.BASEBALL_JOIN_DIM_BASEBALL_TEAMS);
+ }
+
+ @Override
+ protected Map<String, String> getQueryOptions() {
+ return Collections.singletonMap("queryOptions",
+ CommonConstants.Broker.Request.QueryOptionKey.USE_MULTISTAGE_ENGINE +
"=true");
+ }
+
+ @Override
+ public List<String> types() {
+ return Collections.singletonList("MULTI_STAGE_RESOURCE_TRACKING");
+ }
+
+ public static void main(String[] args)
+ throws Exception {
+ List<String> arguments = new ArrayList<>();
+ arguments.addAll(Arrays.asList("QuickStart", "-type",
"MULTI_STAGE_RESOURCE_TRACKING"));
+ arguments.addAll(Arrays.asList(args));
+ PinotAdministrator.main(arguments.toArray(new String[arguments.size()]));
+ }
+}
diff --git
a/pinot-tools/src/main/java/org/apache/pinot/tools/MultistageEngineQuickStart.java
b/pinot-tools/src/main/java/org/apache/pinot/tools/MultistageEngineQuickStart.java
index 658aa694dc..e249c37c0b 100644
---
a/pinot-tools/src/main/java/org/apache/pinot/tools/MultistageEngineQuickStart.java
+++
b/pinot-tools/src/main/java/org/apache/pinot/tools/MultistageEngineQuickStart.java
@@ -27,6 +27,7 @@ import java.util.Map;
import org.apache.pinot.spi.utils.CommonConstants;
import org.apache.pinot.tools.admin.PinotAdministrator;
import org.apache.pinot.tools.admin.command.QuickstartRunner;
+import org.apache.pinot.tools.utils.SampleQueries;
public class MultistageEngineQuickStart extends Quickstart {
@@ -58,28 +59,23 @@ public class MultistageEngineQuickStart extends Quickstart {
printStatus(Quickstart.Color.YELLOW, "***** Multi-stage engine quickstart
setup complete *****");
Map<String, String> queryOptions = Collections.singletonMap("queryOptions",
CommonConstants.Broker.Request.QueryOptionKey.USE_MULTISTAGE_ENGINE +
"=true");
- String q1 = "SELECT count(*) FROM baseballStats_OFFLINE LIMIT 10";
printStatus(Quickstart.Color.YELLOW, "Total number of documents in the
table");
- printStatus(Quickstart.Color.CYAN, "Query : " + q1);
- printStatus(Quickstart.Color.YELLOW,
prettyPrintResponse(runner.runQuery(q1, queryOptions)));
+ printStatus(Quickstart.Color.CYAN, "Query : " +
SampleQueries.COUNT_BASEBALL_STATS);
+ printStatus(Quickstart.Color.YELLOW,
+
prettyPrintResponse(runner.runQuery(SampleQueries.COUNT_BASEBALL_STATS,
queryOptions)));
printStatus(Quickstart.Color.GREEN,
"***************************************************");
- String q2 = "SELECT a.playerID, a.runs, a.yearID, b.runs, b.yearID"
- + " FROM baseballStats_OFFLINE AS a JOIN baseballStats_OFFLINE AS b ON
a.playerID = b.playerID"
- + " WHERE a.runs > 160 AND b.runs < 2 LIMIT 10";
printStatus(Quickstart.Color.YELLOW, "Correlate the same player(s) with
more than 160-run some year(s) and"
+ " with less than 2-run some other year(s)");
- printStatus(Quickstart.Color.CYAN, "Query : " + q2);
- printStatus(Quickstart.Color.YELLOW,
prettyPrintResponse(runner.runQuery(q2, queryOptions)));
+ printStatus(Quickstart.Color.CYAN, "Query : " +
SampleQueries.BASEBALL_STATS_SELF_JOIN);
+ printStatus(Quickstart.Color.YELLOW,
+
prettyPrintResponse(runner.runQuery(SampleQueries.BASEBALL_STATS_SELF_JOIN,
queryOptions)));
printStatus(Quickstart.Color.GREEN,
"***************************************************");
- String q3 = "SELECT a.playerName, a.teamID, b.teamName \n"
- + "FROM baseballStats_OFFLINE AS a\n"
- + "JOIN dimBaseballTeams_OFFLINE AS b\n"
- + "ON a.teamID = b.teamID LIMIT 10";
printStatus(Quickstart.Color.YELLOW, "Baseball Stats with joined team
names");
- printStatus(Quickstart.Color.CYAN, "Query : " + q3);
- printStatus(Quickstart.Color.YELLOW,
prettyPrintResponse(runner.runQuery(q3, queryOptions)));
+ printStatus(Quickstart.Color.CYAN, "Query : " +
SampleQueries.BASEBALL_JOIN_DIM_BASEBALL_TEAMS);
+ printStatus(Quickstart.Color.YELLOW,
+
prettyPrintResponse(runner.runQuery(SampleQueries.BASEBALL_JOIN_DIM_BASEBALL_TEAMS,
queryOptions)));
printStatus(Quickstart.Color.GREEN,
"***************************************************");
String q4 =
diff --git
a/pinot-tools/src/main/java/org/apache/pinot/tools/SingleStageResourceTrackingQuickStart.java
b/pinot-tools/src/main/java/org/apache/pinot/tools/SingleStageResourceTrackingQuickStart.java
new file mode 100644
index 0000000000..8fbc4fbdf6
--- /dev/null
+++
b/pinot-tools/src/main/java/org/apache/pinot/tools/SingleStageResourceTrackingQuickStart.java
@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.tools;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import org.apache.pinot.tools.admin.PinotAdministrator;
+import org.apache.pinot.tools.admin.command.QuickstartRunner;
+import org.apache.pinot.tools.utils.PinotConfigUtils;
+import org.apache.pinot.tools.utils.SampleQueries;
+
+public class SingleStageResourceTrackingQuickStart extends Quickstart {
+ @Override
+ protected Map<String, Object> getConfigOverrides() {
+ Map<String, Object> configOverrides = new HashMap<>();
+ // Quickstart.getConfigOverrides may return an ImmutableMap.
+ configOverrides.putAll(super.getConfigOverrides());
+ configOverrides.putAll(PinotConfigUtils.getResourceTrackingConf());
+ return configOverrides;
+ }
+
+ @Override
+ public void runSampleQueries(QuickstartRunner runner)
+ throws Exception {
+ List<String> queries = getQueries();
+ Map<String, String> queryOptions = getQueryOptions();
+
+ printStatus(Color.YELLOW, "***** Running queries for eternity *****");
+ ExecutorService service = Executors.newFixedThreadPool(10);
+
+ for (int i = 0; i < 10; i++) {
+ service.submit(() -> {
+ try {
+ while (true) {
+ for (String query : queries) {
+ runner.runQuery(query, queryOptions);
+ }
+ Thread.sleep(10);
+ }
+ } catch (Exception e) {
+ printStatus(Color.CYAN, e.getMessage());
+ }
+ });
+ }
+ }
+
+ protected List<String> getQueries() {
+ return List.of(SampleQueries.COUNT_BASEBALL_STATS,
+ SampleQueries.BASEBALL_SUM_RUNS_Q1,
+ SampleQueries.BASEBALL_SUM_RUNS_Q2,
+ SampleQueries.BASEBALL_SUM_RUNS_Q3,
+ SampleQueries.BASEBALL_ORDER_BY_YEAR);
+ }
+
+ protected Map<String, String> getQueryOptions() {
+ return Collections.emptyMap();
+ }
+
+ @Override
+ public List<String> types() {
+ return Collections.singletonList("SINGLE_STAGE_RESOURCE_TRACKING");
+ }
+
+ public static void main(String[] args)
+ throws Exception {
+ List<String> arguments = new ArrayList<>();
+ arguments.addAll(Arrays.asList("QuickStart", "-type",
"SINGLE_STAGE_RESOURCE_TRACKING"));
+ arguments.addAll(Arrays.asList(args));
+ PinotAdministrator.main(arguments.toArray(new String[arguments.size()]));
+ }
+}
diff --git
a/pinot-tools/src/main/java/org/apache/pinot/tools/utils/PinotConfigUtils.java
b/pinot-tools/src/main/java/org/apache/pinot/tools/utils/PinotConfigUtils.java
index 77d0eee664..0a9e1cd31e 100644
---
a/pinot-tools/src/main/java/org/apache/pinot/tools/utils/PinotConfigUtils.java
+++
b/pinot-tools/src/main/java/org/apache/pinot/tools/utils/PinotConfigUtils.java
@@ -32,6 +32,7 @@ import org.apache.commons.configuration2.ex.ConfigurationException;
import org.apache.commons.lang3.StringUtils;
import org.apache.pinot.controller.ControllerConf;
import org.apache.pinot.controller.ControllerConf.ControllerPeriodicTasksConf;
+import org.apache.pinot.core.accounting.PerQueryCPUMemAccountantFactory;
import org.apache.pinot.spi.env.CommonsConfigurationUtils;
import org.apache.pinot.spi.utils.CommonConstants;
import org.apache.pinot.spi.utils.NetUtils;
@@ -215,6 +216,27 @@ public class PinotConfigUtils {
return properties;
}
+  /**
+   * Builds the configuration overrides that switch on resource usage tracking.
+   *
+   * <p>The map enables thread CPU time and allocated bytes measurement for both server and broker, sets the two
+   * accounting sampling flags, and registers {@link PerQueryCPUMemAccountantFactory} under the query scheduler prefix.
+   * Under that prefix memory sampling and {@code CONFIG_OF_OOM_PROTECTION_KILLING_QUERY} are set to {@code true},
+   * while CPU sampling is set to {@code false}.
+   *
+   * @return a new, mutable map from config key to value
+   */
+  public static Map<String, Object> getResourceTrackingConf() {
+    // Every accountant-specific key below lives under "<scheduler prefix>.".
+    String schedulerPrefix = CommonConstants.PINOT_QUERY_SCHEDULER_PREFIX + ".";
+    Map<String, Object> trackingConf = new HashMap<>();
+
+    trackingConf.put(CommonConstants.Server.CONFIG_OF_ENABLE_THREAD_CPU_TIME_MEASUREMENT, true);
+    trackingConf.put(CommonConstants.Server.CONFIG_OF_ENABLE_THREAD_ALLOCATED_BYTES_MEASUREMENT, true);
+    trackingConf.put(CommonConstants.Broker.CONFIG_OF_ENABLE_THREAD_CPU_TIME_MEASUREMENT, true);
+    trackingConf.put(CommonConstants.Broker.CONFIG_OF_ENABLE_THREAD_ALLOCATED_BYTES_MEASUREMENT, true);
+    trackingConf.put(CommonConstants.Accounting.CONFIG_OF_ENABLE_THREAD_CPU_SAMPLING, true);
+    trackingConf.put(CommonConstants.Accounting.CONFIG_OF_ENABLE_THREAD_MEMORY_SAMPLING, true);
+
+    trackingConf.put(schedulerPrefix + CommonConstants.Accounting.CONFIG_OF_FACTORY_NAME,
+        PerQueryCPUMemAccountantFactory.class.getCanonicalName());
+    trackingConf.put(schedulerPrefix + CommonConstants.Accounting.CONFIG_OF_ENABLE_THREAD_MEMORY_SAMPLING, true);
+    trackingConf.put(schedulerPrefix + CommonConstants.Accounting.CONFIG_OF_ENABLE_THREAD_CPU_SAMPLING, false);
+    trackingConf.put(schedulerPrefix + CommonConstants.Accounting.CONFIG_OF_OOM_PROTECTION_KILLING_QUERY, true);
+    return trackingConf;
+  }
+
public static int getAvailablePort() {
try {
try (ServerSocket socket = new ServerSocket(0)) {
diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/utils/SampleQueries.java b/pinot-tools/src/main/java/org/apache/pinot/tools/utils/SampleQueries.java
new file mode 100644
index 0000000000..8247de78b7
--- /dev/null
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/utils/SampleQueries.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.tools.utils;
+
+/**
+ * Sample SQL strings over the {@code baseballStats} and {@code dimBaseballTeams} tables, referenced by the quick
+ * starts. Constants only; the class cannot be instantiated.
+ */
+public final class SampleQueries {
+  private SampleQueries() {
+  }
+
+  /** Row count of the offline baseballStats table. */
+  public static final String COUNT_BASEBALL_STATS = "SELECT count(*) FROM baseballStats_OFFLINE LIMIT 10";
+
+  /** Self join of baseballStats on playerID, filtered on runs from each side. */
+  public static final String BASEBALL_STATS_SELF_JOIN = "SELECT a.playerID, a.runs, a.yearID, b.runs, b.yearID"
+      + " FROM baseballStats_OFFLINE AS a JOIN baseballStats_OFFLINE AS b ON a.playerID = b.playerID"
+      + " WHERE a.runs > 160 AND b.runs < 2 LIMIT 10";
+
+  /** Join of baseballStats with dimBaseballTeams on teamID. */
+  public static final String BASEBALL_JOIN_DIM_BASEBALL_TEAMS =
+      "SELECT a.playerName, a.teamID, b.teamName \n" + "FROM baseballStats_OFFLINE AS a\n"
+          + "JOIN dimBaseballTeams_OFFLINE AS b\n" + "ON a.teamID = b.teamID LIMIT 10";
+
+  /** Top 5 players by total runs. */
+  public static final String BASEBALL_SUM_RUNS_Q1 =
+      "select playerName, sum(runs) from baseballStats group by playerName order by sum(runs) desc limit 5";
+
+  /** Top 5 players by total runs for yearID 2000. */
+  public static final String BASEBALL_SUM_RUNS_Q2 =
+      "select playerName, sum(runs) from baseballStats where yearID=2000 group by playerName order by sum(runs) "
+          + "desc limit 5";
+
+  /** Top 10 players by total runs for yearID 2000 and later. */
+  public static final String BASEBALL_SUM_RUNS_Q3 =
+      "select playerName, sum(runs) from baseballStats where yearID>=2000 group by playerName order by sum(runs) "
+          + "desc limit 10";
+
+  /** Selection of three columns ordered by yearID. */
+  public static final String BASEBALL_ORDER_BY_YEAR =
+      "select playerName, runs, homeRuns from baseballStats order by yearID limit 10";
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]