This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 1a4a5309c5d Cleanup deprecated methods in
ThreadResourceUsageAccountant (#16479)
1a4a5309c5d is described below
commit 1a4a5309c5d684df8a8a39051f388324531b76ab
Author: Xiaotian (Jackie) Jiang <[email protected]>
AuthorDate: Fri Aug 1 12:36:54 2025 -0600
Cleanup deprecated methods in ThreadResourceUsageAccountant (#16479)
---
.../restlet/resources/SystemResourceInfo.java | 7 +--
.../apache/pinot/common/utils/PinotAppConfigs.java | 13 ++---
.../HeapUsagePublishingAccountantFactory.java | 7 +--
.../PerQueryCPUMemAccountantFactory.java | 42 ++------------
.../pinot/core/accounting/QueryAggregator.java | 11 ++--
.../pinot/core/accounting/ResourceAggregator.java | 50 ++++++++--------
.../accounting/ResourceUsageAccountantFactory.java | 67 ++++------------------
.../pinot/core/accounting/WorkloadAggregator.java | 1 +
.../core/query/scheduler/WorkloadScheduler.java | 2 +-
.../accounting/ResourceManagerAccountingTest.java | 1 +
.../pinot/core/accounting/TestThreadMXBean.java | 18 +++---
.../pinot/perf/BenchmarkWorkloadBudgetManager.java | 2 +-
.../minion/tasks/purge/PurgeTaskExecutor.java | 11 +---
.../accounting/ThreadResourceUsageAccountant.java | 38 +-----------
.../accounting/ThreadResourceUsageProvider.java | 13 +++--
.../accounting/WorkloadBudgetManager.java | 2 +-
.../java/org/apache/pinot/spi/trace/Tracing.java | 62 ++------------------
.../apache/pinot/spi/utils/ResourceUsageUtils.java | 43 ++++++++++++++
.../spi}/accounting/WorkloadBudgetManagerTest.java | 6 +-
.../ThrottleOnCriticalHeapUsageExecutorTest.java | 30 ++--------
20 files changed, 133 insertions(+), 293 deletions(-)
diff --git
a/pinot-common/src/main/java/org/apache/pinot/common/restlet/resources/SystemResourceInfo.java
b/pinot-common/src/main/java/org/apache/pinot/common/restlet/resources/SystemResourceInfo.java
index b188ad3f1ad..8220a12ed46 100644
---
a/pinot-common/src/main/java/org/apache/pinot/common/restlet/resources/SystemResourceInfo.java
+++
b/pinot-common/src/main/java/org/apache/pinot/common/restlet/resources/SystemResourceInfo.java
@@ -20,11 +20,10 @@ package org.apache.pinot.common.restlet.resources;
import com.google.common.collect.ImmutableMap;
import java.lang.management.ManagementFactory;
-import java.lang.management.MemoryMXBean;
-import java.lang.management.MemoryUsage;
import java.lang.management.OperatingSystemMXBean;
import java.util.HashMap;
import java.util.Map;
+import org.apache.pinot.spi.utils.ResourceUsageUtils;
/**
@@ -58,9 +57,7 @@ public class SystemResourceInfo {
_totalMemoryMB = runtime.totalMemory() / MEGA_BYTES;
}
- MemoryMXBean memoryMXBean = ManagementFactory.getMemoryMXBean();
- MemoryUsage heapMemoryUsage = memoryMXBean.getHeapMemoryUsage();
- _maxHeapSizeMB = heapMemoryUsage.getMax() / MEGA_BYTES;
+ _maxHeapSizeMB = ResourceUsageUtils.getMaxHeapSize() / MEGA_BYTES;
}
/**
diff --git
a/pinot-common/src/main/java/org/apache/pinot/common/utils/PinotAppConfigs.java
b/pinot-common/src/main/java/org/apache/pinot/common/utils/PinotAppConfigs.java
index 0f29c3a2871..67c36e719dc 100644
---
a/pinot-common/src/main/java/org/apache/pinot/common/utils/PinotAppConfigs.java
+++
b/pinot-common/src/main/java/org/apache/pinot/common/utils/PinotAppConfigs.java
@@ -25,20 +25,20 @@ import com.fasterxml.jackson.annotation.JsonPropertyOrder;
import com.fasterxml.jackson.core.JsonProcessingException;
import java.lang.management.GarbageCollectorMXBean;
import java.lang.management.ManagementFactory;
-import java.lang.management.MemoryMXBean;
import java.lang.management.MemoryManagerMXBean;
import java.lang.management.MemoryUsage;
import java.lang.management.OperatingSystemMXBean;
import java.lang.management.RuntimeMXBean;
-import java.lang.management.ThreadMXBean;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;
import org.apache.commons.io.FileUtils;
+import org.apache.pinot.spi.accounting.ThreadResourceUsageProvider;
import org.apache.pinot.spi.env.PinotConfiguration;
import org.apache.pinot.spi.utils.JsonUtils;
import org.apache.pinot.spi.utils.Obfuscator;
+import org.apache.pinot.spi.utils.ResourceUsageUtils;
/**
@@ -126,12 +126,9 @@ public class PinotAppConfigs {
}
private RuntimeConfig buildRuntimeConfig() {
- MemoryMXBean memoryMXBean = ManagementFactory.getMemoryMXBean();
- MemoryUsage heapMemoryUsage = memoryMXBean.getHeapMemoryUsage();
-
- ThreadMXBean threadMXBean = ManagementFactory.getThreadMXBean();
- return new RuntimeConfig(threadMXBean.getTotalStartedThreadCount(),
threadMXBean.getThreadCount(),
- FileUtils.byteCountToDisplaySize(heapMemoryUsage.getMax()),
+ MemoryUsage heapMemoryUsage = ResourceUsageUtils.getHeapMemoryUsage();
+ return new
RuntimeConfig(ThreadResourceUsageProvider.getTotalStartedThreadCount(),
+ ThreadResourceUsageProvider.getThreadCount(),
FileUtils.byteCountToDisplaySize(heapMemoryUsage.getMax()),
FileUtils.byteCountToDisplaySize(heapMemoryUsage.getUsed()));
}
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/accounting/HeapUsagePublishingAccountantFactory.java
b/pinot-core/src/main/java/org/apache/pinot/core/accounting/HeapUsagePublishingAccountantFactory.java
index 7e82550286e..c9861b74df3 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/accounting/HeapUsagePublishingAccountantFactory.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/accounting/HeapUsagePublishingAccountantFactory.java
@@ -18,8 +18,6 @@
*/
package org.apache.pinot.core.accounting;
-import java.lang.management.ManagementFactory;
-import java.lang.management.MemoryMXBean;
import java.util.Timer;
import java.util.TimerTask;
import org.apache.pinot.common.metrics.ServerGauge;
@@ -30,6 +28,7 @@ import org.apache.pinot.spi.config.instance.InstanceType;
import org.apache.pinot.spi.env.PinotConfiguration;
import org.apache.pinot.spi.trace.Tracing;
import org.apache.pinot.spi.utils.CommonConstants;
+import org.apache.pinot.spi.utils.ResourceUsageUtils;
/**
@@ -46,7 +45,6 @@ public class HeapUsagePublishingAccountantFactory implements
ThreadAccountantFac
}
public static class HeapUsagePublishingResourceUsageAccountant extends
Tracing.DefaultThreadResourceUsageAccountant {
- static final MemoryMXBean MEMORY_MX_BEAN =
ManagementFactory.getMemoryMXBean();
private final Timer _timer;
private final int _period;
@@ -56,8 +54,7 @@ public class HeapUsagePublishingAccountantFactory implements
ThreadAccountantFac
}
public void publishHeapUsageMetrics() {
- ServerMetrics.get()
- .setValueOfGlobalGauge(ServerGauge.JVM_HEAP_USED_BYTES,
MEMORY_MX_BEAN.getHeapMemoryUsage().getUsed());
+
ServerMetrics.get().setValueOfGlobalGauge(ServerGauge.JVM_HEAP_USED_BYTES,
ResourceUsageUtils.getUsedHeapSize());
}
@Override
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/accounting/PerQueryCPUMemAccountantFactory.java
b/pinot-core/src/main/java/org/apache/pinot/core/accounting/PerQueryCPUMemAccountantFactory.java
index 46e700fe843..b88f06f2917 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/accounting/PerQueryCPUMemAccountantFactory.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/accounting/PerQueryCPUMemAccountantFactory.java
@@ -21,8 +21,6 @@ package org.apache.pinot.core.accounting;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
-import java.lang.management.ManagementFactory;
-import java.lang.management.MemoryMXBean;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
@@ -57,6 +55,7 @@ import
org.apache.pinot.spi.config.provider.PinotClusterConfigChangeListener;
import org.apache.pinot.spi.env.PinotConfiguration;
import org.apache.pinot.spi.metrics.PinotMetricUtils;
import org.apache.pinot.spi.utils.CommonConstants;
+import org.apache.pinot.spi.utils.ResourceUsageUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -78,11 +77,6 @@ public class PerQueryCPUMemAccountantFactory implements
ThreadAccountantFactory
}
public static class PerQueryCPUMemResourceUsageAccountant implements
ThreadResourceUsageAccountant {
-
- /**
- * MemoryMXBean to get total heap used memory
- */
- static final MemoryMXBean MEMORY_MX_BEAN =
ManagementFactory.getMemoryMXBean();
private static final Logger LOGGER =
LoggerFactory.getLogger(PerQueryCPUMemResourceUsageAccountant.class);
private static final boolean IS_DEBUG_MODE_ENABLED =
LOGGER.isDebugEnabled();
/**
@@ -326,22 +320,6 @@ public class PerQueryCPUMemAccountantFactory implements
ThreadAccountantFactory
return false;
}
- @Override
- @Deprecated
- public void createExecutionContext(String queryId, int taskId,
ThreadExecutionContext.TaskType taskType,
- @Nullable ThreadExecutionContext parentContext) {
- }
-
- @Override
- @Deprecated
- public void setThreadResourceUsageProvider(ThreadResourceUsageProvider
threadResourceUsageProvider) {
- }
-
- @Override
- @Deprecated
- public void updateQueryUsageConcurrently(String queryId, long cpuTimeNs,
long memoryAllocatedBytes) {
- }
-
@Override
public void updateQueryUsageConcurrently(String queryId, long cpuTimeNs,
long memoryAllocatedBytes,
TrackingScope trackingScope) {
@@ -359,11 +337,6 @@ public class PerQueryCPUMemAccountantFactory implements
ThreadAccountantFactory
}
}
- @Override
- @Deprecated
- public void updateQueryUsageConcurrently(String queryId) {
- }
-
/**
* The thread would need to do {@code setThreadResourceUsageProvider}
first upon it is scheduled.
* This is to be called from a worker or a runner thread to update its
corresponding cpu usage entry
@@ -386,13 +359,6 @@ public class PerQueryCPUMemAccountantFactory implements
ThreadAccountantFactory
}
}
- @Deprecated
- @Override
- public void setupRunner(String queryId, int taskId,
ThreadExecutionContext.TaskType taskType) {
- setupRunner(queryId, taskId, taskType,
CommonConstants.Accounting.DEFAULT_WORKLOAD_NAME);
- }
-
-
@Override
public void setupRunner(@Nullable String queryId, int taskId,
ThreadExecutionContext.TaskType taskType,
String workloadName) {
@@ -559,8 +525,8 @@ public class PerQueryCPUMemAccountantFactory implements
ThreadAccountantFactory
Thread thread = entry.getKey();
if (!thread.isAlive()) {
+ LOGGER.debug("Thread: {} is no longer alive, removing it from
_threadEntriesMap", thread.getName());
_threadEntriesMap.remove(thread);
- LOGGER.debug("Removing thread from _threadLocalEntry: {}",
thread.getName());
}
}
_cancelSentQueries = cancellingQueries;
@@ -701,7 +667,7 @@ public class PerQueryCPUMemAccountantFactory implements
ThreadAccountantFactory
private final AbstractMetrics.Gauge _memoryUsageGauge;
WatcherTask() {
- _queryMonitorConfig.set(new QueryMonitorConfig(_config,
MEMORY_MX_BEAN.getHeapMemoryUsage().getMax()));
+ _queryMonitorConfig.set(new QueryMonitorConfig(_config,
ResourceUsageUtils.getMaxHeapSize()));
logQueryMonitorConfig();
switch (_instanceType) {
@@ -835,7 +801,7 @@ public class PerQueryCPUMemAccountantFactory implements
ThreadAccountantFactory
}
private void collectTriggerMetrics() {
- _usedBytes = MEMORY_MX_BEAN.getHeapMemoryUsage().getUsed();
+ _usedBytes = ResourceUsageUtils.getUsedHeapSize();
LOGGER.debug("Heap used bytes {}", _usedBytes);
}
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/accounting/QueryAggregator.java
b/pinot-core/src/main/java/org/apache/pinot/core/accounting/QueryAggregator.java
index 7c8bb815109..4c051c1b5b8 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/accounting/QueryAggregator.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/accounting/QueryAggregator.java
@@ -19,8 +19,6 @@
package org.apache.pinot.core.accounting;
import com.fasterxml.jackson.annotation.JsonIgnore;
-import java.lang.management.ManagementFactory;
-import java.lang.management.MemoryMXBean;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
@@ -42,6 +40,7 @@ import org.apache.pinot.spi.config.instance.InstanceType;
import org.apache.pinot.spi.env.PinotConfiguration;
import org.apache.pinot.spi.metrics.PinotMetricUtils;
import org.apache.pinot.spi.utils.CommonConstants;
+import org.apache.pinot.spi.utils.ResourceUsageUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -56,8 +55,6 @@ import org.slf4j.LoggerFactory;
public class QueryAggregator implements ResourceAggregator {
private static final Logger LOGGER =
LoggerFactory.getLogger(QueryAggregator.class);
- static final MemoryMXBean MEMORY_MX_BEAN =
ManagementFactory.getMemoryMXBean();
-
enum TriggeringLevel {
Normal, HeapMemoryAlarmingVerbose, CPUTimeBasedKilling,
HeapMemoryCritical, HeapMemoryPanic
}
@@ -81,7 +78,7 @@ public class QueryAggregator implements ResourceAggregator {
private final String _instanceId;
// max heap usage, Xmx
- private final long _maxHeapSize =
MEMORY_MX_BEAN.getHeapMemoryUsage().getMax();
+ private final long _maxHeapSize = ResourceUsageUtils.getMaxHeapSize();
// don't kill a query if its memory footprint is below some ratio of
_maxHeapSize
private final long _minMemoryFootprintForKill;
@@ -420,7 +417,7 @@ public class QueryAggregator implements ResourceAggregator {
Thread.sleep(_gcWaitTime);
} catch (InterruptedException ignored) {
}
- _usedBytes = MEMORY_MX_BEAN.getHeapMemoryUsage().getUsed();
+ _usedBytes = ResourceUsageUtils.getUsedHeapSize();
if (_usedBytes < _criticalLevelAfterGC) {
return;
}
@@ -637,7 +634,7 @@ public class QueryAggregator implements ResourceAggregator {
}
private void collectTriggerMetrics() {
- _usedBytes = MEMORY_MX_BEAN.getHeapMemoryUsage().getUsed();
+ _usedBytes = ResourceUsageUtils.getUsedHeapSize();
LOGGER.debug("Heap used bytes {}", _usedBytes);
}
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/accounting/ResourceAggregator.java
b/pinot-core/src/main/java/org/apache/pinot/core/accounting/ResourceAggregator.java
index 59649c1061a..30566e2f1f4 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/accounting/ResourceAggregator.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/accounting/ResourceAggregator.java
@@ -21,39 +21,35 @@ package org.apache.pinot.core.accounting;
import java.util.List;
-/**
- * Interface for aggregating CPU and memory usage of threads.
- */
+/// Interface for aggregating CPU and memory usage of threads.
public interface ResourceAggregator {
- /**
- * Update CPU usage for one-off cases where identifier is known before-hand.
For example: broker inbound netty
- * thread where queryId and workloadName are already known.
- *
- * @param name identifier name - workload name, queryId, etc.
- * @param cpuTimeNs CPU time in nanoseconds
- */
- public void updateConcurrentCpuUsage(String name, long cpuTimeNs);
+ /// Updates CPU usage for one-off cases where identifier is known
beforehand. For example: broker inbound netty thread
+ /// where queryId and workloadName are already known.
+ ///
+ /// @param name identifier name - queryId, workload name, etc.
+ /// @param cpuTimeNs CPU time in nanoseconds
+ void updateConcurrentCpuUsage(String name, long cpuTimeNs);
- /**
- * Update CPU usage for one-off cases where identifier is known before-hand.
For example: broker inbound netty
- * @param name identifier name - workload name, queryId, etc.
- * @param memBytes memory usage in bytes
- */
- public void updateConcurrentMemUsage(String name, long memBytes);
+ /// Updates memory usage for one-off cases where identifier is known
beforehand. For example: broker inbound netty
+ /// thread where queryId and workloadName are already known.
+ ///
+ /// @param name identifier name - queryId, workload name, etc.
+ /// @param memBytes memory usage in bytes
+ void updateConcurrentMemUsage(String name, long memBytes);
- // Cleanup of state after periodic aggregation is complete.
- public void cleanUpPostAggregation();
+ /// Cleans up state after periodic aggregation is complete.
+ void cleanUpPostAggregation();
- // Sleep time between aggregations.
- public int getAggregationSleepTimeMs();
+ /// Sleep time between aggregations.
+ int getAggregationSleepTimeMs();
- // Pre-aggregation step to be called before the aggregation of all thread
entries.
- public void
preAggregate(List<CPUMemThreadLevelAccountingObjects.ThreadEntry>
anchorThreadEntries);
+ /// Pre-aggregation step to be called before the aggregation of all thread
entries.
+ void preAggregate(List<CPUMemThreadLevelAccountingObjects.ThreadEntry>
anchorThreadEntries);
- // Aggregation of each thread entry
- public void aggregate(Thread thread,
CPUMemThreadLevelAccountingObjects.ThreadEntry threadEntry);
+ /// Aggregates on a thread entry.
+ void aggregate(Thread thread, CPUMemThreadLevelAccountingObjects.ThreadEntry
threadEntry);
- // Post-aggregation step to be called after the aggregation of all thread
entries.
- public void postAggregate();
+ /// Post-aggregation step to be called after the aggregation of all thread
entries.
+ void postAggregate();
}
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/accounting/ResourceUsageAccountantFactory.java
b/pinot-core/src/main/java/org/apache/pinot/core/accounting/ResourceUsageAccountantFactory.java
index 3b3d509bf5d..e00f74d888d 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/accounting/ResourceUsageAccountantFactory.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/accounting/ResourceUsageAccountantFactory.java
@@ -20,8 +20,7 @@ package org.apache.pinot.core.accounting;
import java.util.ArrayList;
import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
+import java.util.EnumMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@@ -42,6 +41,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+// TODO: Incorporate query OOM kill handling in
PerQueryCPUMemAccountantFactory into this class
public class ResourceUsageAccountantFactory implements ThreadAccountantFactory
{
@Override
@@ -62,8 +62,6 @@ public class ResourceUsageAccountantFactory implements
ThreadAccountantFactory {
return thread;
});
- private final PinotConfiguration _config;
-
// the map to track stats entry for each thread, the entry will
automatically be added when one calls
// setThreadResourceUsageProvider on the thread, including but not limited
to
// server worker thread, runner thread, broker jetty thread, or broker
netty thread
@@ -87,19 +85,12 @@ public class ResourceUsageAccountantFactory implements
ThreadAccountantFactory {
// is sampling allowed for MSE queries
private final boolean _isThreadSamplingEnabledForMSE;
- // instance id of the current instance, for logging purpose
- private final String _instanceId;
-
private final WatcherTask _watcherTask;
- private final Map<TrackingScope, ResourceAggregator> _resourceAggregators;
-
- private final InstanceType _instanceType;
+ private final EnumMap<TrackingScope, ResourceAggregator>
_resourceAggregators;
public ResourceUsageAccountant(PinotConfiguration config, String
instanceId, InstanceType instanceType) {
LOGGER.info("Initializing ResourceUsageAccountant");
- _config = config;
- _instanceId = instanceId;
boolean threadCpuTimeMeasurementEnabled =
ThreadResourceUsageProvider.isThreadCpuTimeMeasurementEnabled();
boolean threadMemoryMeasurementEnabled =
ThreadResourceUsageProvider.isThreadMemoryMeasurementEnabled();
@@ -113,7 +104,6 @@ public class ResourceUsageAccountantFactory implements
ThreadAccountantFactory {
CommonConstants.Accounting.DEFAULT_ENABLE_THREAD_MEMORY_SAMPLING);
LOGGER.info("cpuSamplingConfig: {}, memorySamplingConfig: {}",
cpuSamplingConfig, memorySamplingConfig);
- _instanceType = instanceType;
_isThreadCPUSamplingEnabled = cpuSamplingConfig &&
threadCpuTimeMeasurementEnabled;
_isThreadMemorySamplingEnabled = memorySamplingConfig &&
threadMemoryMeasurementEnabled;
LOGGER.info("_isThreadCPUSamplingEnabled: {},
_isThreadMemorySamplingEnabled: {}", _isThreadCPUSamplingEnabled,
@@ -126,15 +116,15 @@ public class ResourceUsageAccountantFactory implements
ThreadAccountantFactory {
_watcherTask = new WatcherTask();
- _resourceAggregators = new HashMap<>();
+ _resourceAggregators = new EnumMap<>(TrackingScope.class);
// Add all aggregators. Configs of enabling/disabling cost
collection/enforcement are handled in the aggregators.
_resourceAggregators.put(TrackingScope.WORKLOAD,
- new WorkloadAggregator(_isThreadCPUSamplingEnabled,
_isThreadMemorySamplingEnabled, _config, _instanceType,
- _instanceId));
+ new WorkloadAggregator(_isThreadCPUSamplingEnabled,
_isThreadMemorySamplingEnabled, config, instanceType,
+ instanceId));
_resourceAggregators.put(TrackingScope.QUERY,
- new QueryAggregator(_isThreadCPUSamplingEnabled,
_isThreadMemorySamplingEnabled, _config, _instanceType,
- _instanceId));
+ new QueryAggregator(_isThreadCPUSamplingEnabled,
_isThreadMemorySamplingEnabled, config, instanceType,
+ instanceId));
}
@Override
@@ -166,32 +156,6 @@ public class ResourceUsageAccountantFactory implements
ThreadAccountantFactory {
return false;
}
- @Override
- @Deprecated
- public void createExecutionContext(String queryId, int taskId,
ThreadExecutionContext.TaskType taskType,
- @Nullable ThreadExecutionContext parentContext) {
- }
-
- @Override
- @Deprecated
- public void setThreadResourceUsageProvider(ThreadResourceUsageProvider
threadResourceUsageProvider) {
- }
-
- @Override
- @Deprecated
- public void updateQueryUsageConcurrently(String queryId, long cpuTimeNs,
long memoryAllocatedBytes) {
- }
-
- @Override
- @Deprecated
- public void updateQueryUsageConcurrently(String queryId) {
- }
-
- @Override
- public void setupRunner(String queryId, int taskId,
ThreadExecutionContext.TaskType taskType) {
- setupRunner(queryId, taskId, taskType,
CommonConstants.Accounting.DEFAULT_WORKLOAD_NAME);
- }
-
@Override
public void setupRunner(String queryId, int taskId,
ThreadExecutionContext.TaskType taskType, String workloadName) {
_threadLocalEntry.get()._errorStatus.set(null);
@@ -204,11 +168,12 @@ public class ResourceUsageAccountantFactory implements
ThreadAccountantFactory {
@Override
public void setupWorker(int taskId, ThreadExecutionContext.TaskType
taskType,
- @Nullable ThreadExecutionContext parentContext) {
+ @Nullable ThreadExecutionContext parentContext) {
_threadLocalEntry.get()._errorStatus.set(null);
if (parentContext != null && parentContext.getQueryId() != null &&
parentContext.getAnchorThread() != null) {
-
_threadLocalEntry.get().setThreadTaskStatus(parentContext.getQueryId(), taskId,
parentContext.getTaskType(),
- parentContext.getAnchorThread(), parentContext.getWorkloadName());
+ _threadLocalEntry.get()
+ .setThreadTaskStatus(parentContext.getQueryId(), taskId,
parentContext.getTaskType(),
+ parentContext.getAnchorThread(),
parentContext.getWorkloadName());
}
}
@@ -226,10 +191,6 @@ public class ResourceUsageAccountantFactory implements
ThreadAccountantFactory {
@Override
public Map<String, ? extends QueryResourceTracker> getQueryResources() {
QueryAggregator queryAggregator = (QueryAggregator)
_resourceAggregators.get(TrackingScope.QUERY);
- if (queryAggregator == null) {
- return Collections.emptyMap();
- }
-
return queryAggregator.getQueryResources(_threadEntriesMap);
}
@@ -237,9 +198,6 @@ public class ResourceUsageAccountantFactory implements
ThreadAccountantFactory {
public void updateQueryUsageConcurrently(String identifier, long
cpuTimeNs, long memoryAllocatedBytes,
TrackingScope trackingScope) {
ResourceAggregator resourceAggregator =
_resourceAggregators.get(trackingScope);
- if (resourceAggregator == null) {
- return;
- }
if (_isThreadCPUSamplingEnabled) {
resourceAggregator.updateConcurrentCpuUsage(identifier, cpuTimeNs);
}
@@ -312,7 +270,6 @@ public class ResourceUsageAccountantFactory implements
ThreadAccountantFactory {
return anchorThreadEntries;
}
-
class WatcherTask implements Runnable {
WatcherTask() {
}
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/accounting/WorkloadAggregator.java
b/pinot-core/src/main/java/org/apache/pinot/core/accounting/WorkloadAggregator.java
index 33de4bf2a1b..72d0d818cda 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/accounting/WorkloadAggregator.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/accounting/WorkloadAggregator.java
@@ -23,6 +23,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
+import org.apache.pinot.spi.accounting.WorkloadBudgetManager;
import org.apache.pinot.spi.config.instance.InstanceType;
import org.apache.pinot.spi.env.PinotConfiguration;
import org.apache.pinot.spi.trace.Tracing;
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/WorkloadScheduler.java
b/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/WorkloadScheduler.java
index 9d0fbb06215..e90f624e22e 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/WorkloadScheduler.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/WorkloadScheduler.java
@@ -26,12 +26,12 @@ import org.apache.pinot.common.metrics.ServerMeter;
import org.apache.pinot.common.metrics.ServerMetrics;
import org.apache.pinot.common.metrics.ServerQueryPhase;
import org.apache.pinot.common.utils.config.QueryOptionsUtils;
-import org.apache.pinot.core.accounting.WorkloadBudgetManager;
import org.apache.pinot.core.query.executor.QueryExecutor;
import org.apache.pinot.core.query.request.ServerQueryRequest;
import org.apache.pinot.core.query.scheduler.resources.QueryExecutorService;
import
org.apache.pinot.core.query.scheduler.resources.UnboundedResourceManager;
import org.apache.pinot.spi.accounting.ThreadResourceUsageAccountant;
+import org.apache.pinot.spi.accounting.WorkloadBudgetManager;
import org.apache.pinot.spi.env.PinotConfiguration;
import org.apache.pinot.spi.query.QueryThreadContext;
import org.apache.pinot.spi.trace.Tracing;
diff --git
a/pinot-core/src/test/java/org/apache/pinot/core/accounting/ResourceManagerAccountingTest.java
b/pinot-core/src/test/java/org/apache/pinot/core/accounting/ResourceManagerAccountingTest.java
index 4986e7684d3..2641cd60bd3 100644
---
a/pinot-core/src/test/java/org/apache/pinot/core/accounting/ResourceManagerAccountingTest.java
+++
b/pinot-core/src/test/java/org/apache/pinot/core/accounting/ResourceManagerAccountingTest.java
@@ -61,6 +61,7 @@ import org.apache.pinot.segment.spi.memory.PinotDataBuffer;
import org.apache.pinot.spi.accounting.ThreadExecutionContext;
import org.apache.pinot.spi.accounting.ThreadResourceUsageAccountant;
import org.apache.pinot.spi.accounting.ThreadResourceUsageProvider;
+import org.apache.pinot.spi.accounting.WorkloadBudgetManager;
import org.apache.pinot.spi.config.instance.InstanceType;
import org.apache.pinot.spi.config.table.JsonIndexConfig;
import org.apache.pinot.spi.env.PinotConfiguration;
diff --git
a/pinot-core/src/test/java/org/apache/pinot/core/accounting/TestThreadMXBean.java
b/pinot-core/src/test/java/org/apache/pinot/core/accounting/TestThreadMXBean.java
index 572c2afaa8e..6d5614f43e0 100644
---
a/pinot-core/src/test/java/org/apache/pinot/core/accounting/TestThreadMXBean.java
+++
b/pinot-core/src/test/java/org/apache/pinot/core/accounting/TestThreadMXBean.java
@@ -18,8 +18,6 @@
*/
package org.apache.pinot.core.accounting;
-import java.lang.management.ManagementFactory;
-import java.lang.management.MemoryMXBean;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@@ -28,6 +26,7 @@ import org.apache.log4j.Level;
import org.apache.log4j.LogManager;
import org.apache.pinot.spi.accounting.ThreadResourceSnapshot;
import org.apache.pinot.spi.accounting.ThreadResourceUsageProvider;
+import org.apache.pinot.spi.utils.ResourceUsageUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
@@ -72,10 +71,9 @@ public class TestThreadMXBean {
AtomicLong b = new AtomicLong();
AtomicLong c = new AtomicLong();
ExecutorService executor = Executors.newFixedThreadPool(3);
- MemoryMXBean memoryMXBean = ManagementFactory.getMemoryMXBean();
System.gc();
- long heapPrev = memoryMXBean.getHeapMemoryUsage().getUsed();
+ long heapPrev = ResourceUsageUtils.getUsedHeapSize();
ThreadResourceSnapshot threadResourceSnapshot0 = new
ThreadResourceSnapshot();
executor.submit(() -> {
ThreadResourceSnapshot threadResourceSnapshot = new
ThreadResourceSnapshot();
@@ -108,7 +106,7 @@ public class TestThreadMXBean {
long d = threadResourceSnapshot0.getAllocatedBytes();
long threadAllocatedBytes = a.get() + b.get() + c.get() + d;
- float heapUsedBytes = (float)
memoryMXBean.getHeapMemoryUsage().getUsed() - heapPrev;
+ float heapUsedBytes = (float) ResourceUsageUtils.getUsedHeapSize() -
heapPrev;
float ratio = threadAllocatedBytes / heapUsedBytes;
LOGGER.info("Measured thread allocated bytes {}, heap used bytes {},
ratio {}",
@@ -129,10 +127,9 @@ public class TestThreadMXBean {
AtomicLong b = new AtomicLong();
AtomicLong c = new AtomicLong();
ExecutorService executor = Executors.newFixedThreadPool(3);
- MemoryMXBean memoryMXBean = ManagementFactory.getMemoryMXBean();
System.gc();
- long heapPrev = memoryMXBean.getHeapMemoryUsage().getUsed();
+ long heapPrev = ResourceUsageUtils.getUsedHeapSize();
ThreadResourceSnapshot threadResourceSnapshot0 = new
ThreadResourceSnapshot();
executor.submit(() -> {
ThreadResourceSnapshot threadResourceSnapshot = new
ThreadResourceSnapshot();
@@ -165,7 +162,7 @@ public class TestThreadMXBean {
long d = threadResourceSnapshot0.getAllocatedBytes();
long threadAllocatedBytes = a.get() + b.get() + c.get() + d;
- float heapUsedBytes = (float)
memoryMXBean.getHeapMemoryUsage().getUsed() - heapPrev;
+ float heapUsedBytes = (float) ResourceUsageUtils.getUsedHeapSize() -
heapPrev;
float ratio = threadAllocatedBytes / heapUsedBytes;
LOGGER.info("Measured thread allocated bytes {}, heap used bytes {},
ratio {}",
@@ -180,15 +177,14 @@ public class TestThreadMXBean {
@SuppressWarnings("unused")
public void testThreadMXBeanMemAllocGCTracking() {
LogManager.getLogger(TestThreadMXBean.class).setLevel(Level.INFO);
- MemoryMXBean memoryMXBean = ManagementFactory.getMemoryMXBean();
System.gc();
ThreadResourceSnapshot threadResourceSnapshot0 = new
ThreadResourceSnapshot();
- long heapPrev = memoryMXBean.getHeapMemoryUsage().getUsed();
+ long heapPrev = ResourceUsageUtils.getUsedHeapSize();
for (int i = 0; i < 3; i++) {
long[] ignored = new long[100000000];
}
System.gc();
- long heapResult = memoryMXBean.getHeapMemoryUsage().getUsed() - heapPrev;
+ long heapResult = ResourceUsageUtils.getUsedHeapSize() - heapPrev;
long result = threadResourceSnapshot0.getAllocatedBytes();
LOGGER.info("Measured thread allocated bytes {}, heap used bytes {}",
result, heapResult);
diff --git
a/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkWorkloadBudgetManager.java
b/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkWorkloadBudgetManager.java
index 05805c2bb32..3904920ec28 100644
---
a/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkWorkloadBudgetManager.java
+++
b/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkWorkloadBudgetManager.java
@@ -19,7 +19,7 @@
package org.apache.pinot.perf;
import java.util.concurrent.TimeUnit;
-import org.apache.pinot.core.accounting.WorkloadBudgetManager;
+import org.apache.pinot.spi.accounting.WorkloadBudgetManager;
import org.apache.pinot.spi.env.PinotConfiguration;
import org.apache.pinot.spi.utils.CommonConstants;
import org.openjdk.jmh.annotations.Benchmark;
diff --git
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/purge/PurgeTaskExecutor.java
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/purge/PurgeTaskExecutor.java
index 58b0d16c2b4..2877f1b3ccd 100644
---
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/purge/PurgeTaskExecutor.java
+++
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/purge/PurgeTaskExecutor.java
@@ -19,8 +19,6 @@
package org.apache.pinot.plugin.minion.tasks.purge;
import java.io.File;
-import java.lang.management.ManagementFactory;
-import java.lang.management.ThreadMXBean;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.TimeUnit;
@@ -32,21 +30,18 @@ import org.apache.pinot.core.minion.PinotTaskConfig;
import org.apache.pinot.core.minion.SegmentPurger;
import
org.apache.pinot.plugin.minion.tasks.BaseSingleSegmentConversionExecutor;
import org.apache.pinot.plugin.minion.tasks.SegmentConversionResult;
+import org.apache.pinot.spi.accounting.ThreadResourceUsageProvider;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
public class PurgeTaskExecutor extends BaseSingleSegmentConversionExecutor {
- private static final Logger LOGGER =
LoggerFactory.getLogger(PurgeTaskExecutor.class);
protected final MinionMetrics _minionMetrics = MinionMetrics.get();
public static final String RECORD_PURGER_KEY = "recordPurger";
public static final String RECORD_MODIFIER_KEY = "recordModifier";
public static final String NUM_RECORDS_PURGED_KEY = "numRecordsPurged";
public static final String NUM_RECORDS_MODIFIED_KEY = "numRecordsModified";
- private static final ThreadMXBean MX_BEAN =
ManagementFactory.getThreadMXBean();
@Override
protected SegmentConversionResult convert(PinotTaskConfig pinotTaskConfig,
File indexDir, File workingDir)
@@ -68,9 +63,9 @@ public class PurgeTaskExecutor extends
BaseSingleSegmentConversionExecutor {
_eventObserver.notifyProgress(pinotTaskConfig, "Purging segment: " +
indexDir);
SegmentPurger segmentPurger =
new SegmentPurger(indexDir, workingDir, tableConfig, schema,
recordPurger, recordModifier, null);
- long purgeTaskStartTimeNs = MX_BEAN.getCurrentThreadCpuTime();
+ long purgeTaskStartTimeNs =
ThreadResourceUsageProvider.getCurrentThreadCpuTime();
File purgedSegmentFile = segmentPurger.purgeSegment();
- long purgeTaskEndTimeNs = MX_BEAN.getCurrentThreadCpuTime();
+ long purgeTaskEndTimeNs =
ThreadResourceUsageProvider.getCurrentThreadCpuTime();
_minionMetrics.addTimedTableValue(tableNameWithType, taskType,
MinionTimer.TASK_THREAD_CPU_TIME_NS,
purgeTaskEndTimeNs - purgeTaskStartTimeNs, TimeUnit.NANOSECONDS);
if (purgedSegmentFile == null) {
diff --git
a/pinot-spi/src/main/java/org/apache/pinot/spi/accounting/ThreadResourceUsageAccountant.java
b/pinot-spi/src/main/java/org/apache/pinot/spi/accounting/ThreadResourceUsageAccountant.java
index dc912371980..db41723d5be 100644
---
a/pinot-spi/src/main/java/org/apache/pinot/spi/accounting/ThreadResourceUsageAccountant.java
+++
b/pinot-spi/src/main/java/org/apache/pinot/spi/accounting/ThreadResourceUsageAccountant.java
@@ -46,23 +46,6 @@ public interface ThreadResourceUsageAccountant {
return false;
}
- /**
- * This method has been deprecated and replaced by {@link
setupRunner(String, int, ThreadExecutionContext.TaskType)}
- * and {@link setupWorker(int, ThreadExecutionContext.TaskType,
ThreadExecutionContext)}.
- */
- @Deprecated
- void createExecutionContext(String queryId, int taskId,
ThreadExecutionContext.TaskType taskType,
- @Nullable ThreadExecutionContext parentContext);
-
- /**
- * Set up the thread execution context for an anchor a.k.a runner thread.
- * @param queryId query id string
- * @param taskId a unique task id
- * @param taskType the type of the task - SSE or MSE
- */
- @Deprecated
- void setupRunner(String queryId, int taskId, ThreadExecutionContext.TaskType
taskType);
-
/**
* Set up the thread execution context for an anchor a.k.a runner thread.
* @param queryId query id string
@@ -79,7 +62,7 @@ public interface ThreadResourceUsageAccountant {
* @param parentContext the parent execution context
*/
void setupWorker(int taskId, ThreadExecutionContext.TaskType taskType,
- @Nullable ThreadExecutionContext parentContext);
+ @Nullable ThreadExecutionContext parentContext);
/**
* get the executon context of current thread
@@ -87,12 +70,6 @@ public interface ThreadResourceUsageAccountant {
@Nullable
ThreadExecutionContext getThreadExecutionContext();
- /**
- * set resource usage provider
- */
- @Deprecated
- void setThreadResourceUsageProvider(ThreadResourceUsageProvider
threadResourceUsageProvider);
-
/**
* call to sample usage
*/
@@ -117,24 +94,13 @@ public interface ThreadResourceUsageAccountant {
// Default implementation does nothing. Subclasses can override to
register a cancel callback.
}
- /**
- * special interface to aggregate usage to the stats store only once, it is
used for response
- * ser/de threads where the thread execution context cannot be setup before
hands as
- * queryId/taskId is unknown and the execution process is hard to instrument
- */
- @Deprecated
- void updateQueryUsageConcurrently(String queryId, long cpuTimeNs, long
allocatedBytes);
-
/**
* special interface to aggregate usage to the stats store only once, it is
used for response
* ser/de threads where the thread execution context cannot be setup before
hands as
* queryId/taskId/workloadName is unknown and the execution process is hard
to instrument
*/
void updateQueryUsageConcurrently(String identifier, long cpuTimeNs, long
allocatedBytes,
- TrackingScope trackingScope);
-
- @Deprecated
- void updateQueryUsageConcurrently(String queryId);
+ TrackingScope trackingScope);
/**
* start the periodical task
diff --git
a/pinot-spi/src/main/java/org/apache/pinot/spi/accounting/ThreadResourceUsageProvider.java
b/pinot-spi/src/main/java/org/apache/pinot/spi/accounting/ThreadResourceUsageProvider.java
index f5ebc5ed2fa..44fb9f6cea3 100644
---
a/pinot-spi/src/main/java/org/apache/pinot/spi/accounting/ThreadResourceUsageProvider.java
+++
b/pinot-spi/src/main/java/org/apache/pinot/spi/accounting/ThreadResourceUsageProvider.java
@@ -31,6 +31,9 @@ import org.slf4j.LoggerFactory;
* and allocateBytes (JVM heap) for the current thread.
*/
public class ThreadResourceUsageProvider {
+ private ThreadResourceUsageProvider() {
+ }
+
private static final Logger LOGGER =
LoggerFactory.getLogger(ThreadResourceUsageProvider.class);
// used for getting the memory allocation function in hotspot jvm through
reflection
@@ -51,14 +54,12 @@ public class ThreadResourceUsageProvider {
private static boolean _isThreadCpuTimeMeasurementEnabled = false;
private static boolean _isThreadMemoryMeasurementEnabled = false;
- @Deprecated
- public long getThreadTimeNs() {
- return 0;
+ public static int getThreadCount() {
+ return MX_BEAN.getThreadCount();
}
- @Deprecated
- public long getThreadAllocatedBytes() {
- return 0;
+ public static long getTotalStartedThreadCount() {
+ return MX_BEAN.getTotalStartedThreadCount();
}
public static long getCurrentThreadCpuTime() {
diff --git
a/pinot-spi/src/main/java/org/apache/pinot/core/accounting/WorkloadBudgetManager.java
b/pinot-spi/src/main/java/org/apache/pinot/spi/accounting/WorkloadBudgetManager.java
similarity index 99%
rename from
pinot-spi/src/main/java/org/apache/pinot/core/accounting/WorkloadBudgetManager.java
rename to
pinot-spi/src/main/java/org/apache/pinot/spi/accounting/WorkloadBudgetManager.java
index 5d0004dd628..dc1642a374c 100644
---
a/pinot-spi/src/main/java/org/apache/pinot/core/accounting/WorkloadBudgetManager.java
+++
b/pinot-spi/src/main/java/org/apache/pinot/spi/accounting/WorkloadBudgetManager.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.pinot.core.accounting;
+package org.apache.pinot.spi.accounting;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/trace/Tracing.java
b/pinot-spi/src/main/java/org/apache/pinot/spi/trace/Tracing.java
index 0f2ee9deb72..6d9993cd4b7 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/trace/Tracing.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/trace/Tracing.java
@@ -25,14 +25,13 @@ import java.util.Collections;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;
import javax.annotation.Nullable;
-import org.apache.pinot.core.accounting.WorkloadBudgetManager;
import org.apache.pinot.spi.accounting.QueryResourceTracker;
import org.apache.pinot.spi.accounting.ThreadAccountantFactory;
import org.apache.pinot.spi.accounting.ThreadExecutionContext;
import org.apache.pinot.spi.accounting.ThreadResourceTracker;
import org.apache.pinot.spi.accounting.ThreadResourceUsageAccountant;
-import org.apache.pinot.spi.accounting.ThreadResourceUsageProvider;
import org.apache.pinot.spi.accounting.TrackingScope;
+import org.apache.pinot.spi.accounting.WorkloadBudgetManager;
import org.apache.pinot.spi.config.instance.InstanceType;
import org.apache.pinot.spi.env.PinotConfiguration;
import org.apache.pinot.spi.exception.EarlyTerminationException;
@@ -199,17 +198,6 @@ public class Tracing {
return false;
}
- @Override
- public void createExecutionContext(String queryId, int taskId,
ThreadExecutionContext.TaskType taskType,
- @Nullable ThreadExecutionContext
parentContext) {
- }
-
- @Deprecated
- public void createExecutionContextInner(@Nullable String queryId, int
taskId,
- ThreadExecutionContext.TaskType
taskType,
- @Nullable ThreadExecutionContext
parentContext) {
- }
-
@Override
public void clear() {
}
@@ -222,38 +210,19 @@ public class Tracing {
public void sampleUsageMSE() {
}
- @Deprecated
- public void setThreadResourceUsageProvider(ThreadResourceUsageProvider
threadResourceUsageProvider) {
- }
-
- @Override
- @Deprecated
- public void updateQueryUsageConcurrently(String queryId) {
- // No-op for default accountant
- }
-
- @Override
- public void updateQueryUsageConcurrently(String queryId, long cpuTimeNs,
long allocatedBytes) {
- // No-op for default accountant
- }
-
@Override
public void updateQueryUsageConcurrently(String queryId, long cpuTimeNs,
long allocatedBytes,
- TrackingScope trackingScope) {
+ TrackingScope trackingScope) {
// No-op for default accountant
}
- @Override
- public void setupRunner(String queryId, int taskId,
ThreadExecutionContext.TaskType taskType) {
- }
-
@Override
public void setupRunner(String queryId, int taskId,
ThreadExecutionContext.TaskType taskType, String workloadName) {
}
@Override
public void setupWorker(int taskId, ThreadExecutionContext.TaskType
taskType,
- @Nullable ThreadExecutionContext parentContext) {
+ @Nullable ThreadExecutionContext parentContext) {
}
@Override
@@ -293,22 +262,14 @@ public class Tracing {
private ThreadAccountantOps() {
}
- @Deprecated
- public static void setupRunner(String queryId) {
- }
-
- @Deprecated
- public static void setupRunner(String queryId,
ThreadExecutionContext.TaskType taskType) {
- }
-
public static void setupRunner(String queryId, String workloadName) {
setupRunner(queryId, ThreadExecutionContext.TaskType.SSE, workloadName);
}
public static void setupRunner(String queryId,
ThreadExecutionContext.TaskType taskType, String workloadName) {
// Set up the runner thread with the given query ID and workload name
- Tracing.getThreadAccountant().setupRunner(queryId,
CommonConstants.Accounting.ANCHOR_TASK_ID, taskType,
- workloadName);
+ Tracing.getThreadAccountant()
+ .setupRunner(queryId, CommonConstants.Accounting.ANCHOR_TASK_ID,
taskType, workloadName);
}
/**
@@ -398,24 +359,11 @@ public class Tracing {
accountant.sampleUsage();
}
- @Deprecated
- public static void updateQueryUsageConcurrently(String queryId) {
- }
-
- @Deprecated
- public static void updateQueryUsageConcurrently(String queryId, long
cpuTimeNs, long allocatedBytes) {
- Tracing.getThreadAccountant().updateQueryUsageConcurrently(queryId,
cpuTimeNs, allocatedBytes);
- }
-
public static void updateQueryUsageConcurrently(String queryId, long
cpuTimeNs, long allocatedBytes,
TrackingScope trackingScope) {
Tracing.getThreadAccountant().updateQueryUsageConcurrently(queryId,
cpuTimeNs, allocatedBytes, trackingScope);
}
- @Deprecated
- public static void setThreadResourceUsageProvider() {
- }
-
// Check for thread interruption, every time after merging 8192 keys
public static void sampleAndCheckInterruptionPeriodically(int mergedKeys) {
if ((mergedKeys & MAX_ENTRIES_KEYS_MERGED_PER_INTERRUPTION_CHECK_MASK)
== 0) {
diff --git
a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/ResourceUsageUtils.java
b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/ResourceUsageUtils.java
new file mode 100644
index 00000000000..d6ba26b0e4e
--- /dev/null
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/ResourceUsageUtils.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.spi.utils;
+
+import java.lang.management.ManagementFactory;
+import java.lang.management.MemoryMXBean;
+import java.lang.management.MemoryUsage;
+
+
+public class ResourceUsageUtils {
+ private ResourceUsageUtils() {
+ }
+
+ private static final MemoryMXBean MEMORY_MX_BEAN =
ManagementFactory.getMemoryMXBean();
+
+ public static MemoryUsage getHeapMemoryUsage() {
+ return MEMORY_MX_BEAN.getHeapMemoryUsage();
+ }
+
+ public static long getMaxHeapSize() {
+ return MEMORY_MX_BEAN.getHeapMemoryUsage().getMax();
+ }
+
+ public static long getUsedHeapSize() {
+ return MEMORY_MX_BEAN.getHeapMemoryUsage().getUsed();
+ }
+}
diff --git
a/pinot-core/src/test/java/org/apache/pinot/core/accounting/WorkloadBudgetManagerTest.java
b/pinot-spi/src/test/java/org/apache/pinot/spi/accounting/WorkloadBudgetManagerTest.java
similarity index 97%
rename from
pinot-core/src/test/java/org/apache/pinot/core/accounting/WorkloadBudgetManagerTest.java
rename to
pinot-spi/src/test/java/org/apache/pinot/spi/accounting/WorkloadBudgetManagerTest.java
index 21cc2df4f0b..d73ca4173a5 100644
---
a/pinot-core/src/test/java/org/apache/pinot/core/accounting/WorkloadBudgetManagerTest.java
+++
b/pinot-spi/src/test/java/org/apache/pinot/spi/accounting/WorkloadBudgetManagerTest.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.pinot.core.accounting;
+package org.apache.pinot.spi.accounting;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
@@ -26,7 +26,9 @@ import org.apache.pinot.spi.utils.CommonConstants;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
-import static org.testng.Assert.*;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertTrue;
public class WorkloadBudgetManagerTest {
diff --git
a/pinot-spi/src/test/java/org/apache/pinot/spi/executor/ThrottleOnCriticalHeapUsageExecutorTest.java
b/pinot-spi/src/test/java/org/apache/pinot/spi/executor/ThrottleOnCriticalHeapUsageExecutorTest.java
index bab8167e3f9..141b55129f0 100644
---
a/pinot-spi/src/test/java/org/apache/pinot/spi/executor/ThrottleOnCriticalHeapUsageExecutorTest.java
+++
b/pinot-spi/src/test/java/org/apache/pinot/spi/executor/ThrottleOnCriticalHeapUsageExecutorTest.java
@@ -30,7 +30,6 @@ import org.apache.pinot.spi.accounting.QueryResourceTracker;
import org.apache.pinot.spi.accounting.ThreadExecutionContext;
import org.apache.pinot.spi.accounting.ThreadResourceTracker;
import org.apache.pinot.spi.accounting.ThreadResourceUsageAccountant;
-import org.apache.pinot.spi.accounting.ThreadResourceUsageProvider;
import org.apache.pinot.spi.accounting.TrackingScope;
import org.apache.pinot.spi.exception.QueryErrorCode;
import org.apache.pinot.spi.exception.QueryException;
@@ -42,9 +41,11 @@ import static org.testng.Assert.fail;
public class ThrottleOnCriticalHeapUsageExecutorTest {
@Test
- void testThrottle() throws Exception {
+ void testThrottle()
+ throws Exception {
ThreadResourceUsageAccountant accountant = new
ThreadResourceUsageAccountant() {
final AtomicLong _numCalls = new AtomicLong(0);
+
@Override
public void clear() {
}
@@ -54,15 +55,6 @@ public class ThrottleOnCriticalHeapUsageExecutorTest {
return false;
}
- @Override
- public void createExecutionContext(String queryId, int taskId,
ThreadExecutionContext.TaskType taskType,
- @Nullable ThreadExecutionContext parentContext) {
- }
-
- @Override
- public void setupRunner(String queryId, int taskId,
ThreadExecutionContext.TaskType taskType) {
- }
-
@Override
public void setupRunner(String queryId, int taskId,
ThreadExecutionContext.TaskType taskType,
String workloadName) {
@@ -79,10 +71,6 @@ public class ThrottleOnCriticalHeapUsageExecutorTest {
return null;
}
- @Override
- public void setThreadResourceUsageProvider(ThreadResourceUsageProvider
threadResourceUsageProvider) {
- }
-
@Override
public void sampleUsage() {
}
@@ -96,14 +84,6 @@ public class ThrottleOnCriticalHeapUsageExecutorTest {
return _numCalls.getAndIncrement() > 1;
}
- @Override
- public void updateQueryUsageConcurrently(String queryId, long cpuTimeNs,
long allocatedBytes) {
- }
-
- @Override
- public void updateQueryUsageConcurrently(String queryId) {
- }
-
@Override
public void updateQueryUsageConcurrently(String queryId, long cpuTimeNs,
long allocatedBytes,
TrackingScope trackingScope) {
@@ -129,8 +109,8 @@ public class ThrottleOnCriticalHeapUsageExecutorTest {
}
};
- ThrottleOnCriticalHeapUsageExecutor executor = new
ThrottleOnCriticalHeapUsageExecutor(
- Executors.newCachedThreadPool(), accountant);
+ ThrottleOnCriticalHeapUsageExecutor executor =
+ new
ThrottleOnCriticalHeapUsageExecutor(Executors.newCachedThreadPool(),
accountant);
CyclicBarrier barrier = new CyclicBarrier(2);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]