This is an automated email from the ASF dual-hosted git repository.
manishswaminathan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new b5afb89074 Make ThreadResourceUsageProvider a Helper/Utility Class.
(#16051)
b5afb89074 is described below
commit b5afb890741909c84e327c1d5a088c685502373d
Author: Rajat Venkatesh <[email protected]>
AuthorDate: Fri Jun 20 11:04:29 2025 +0530
Make ThreadResourceUsageProvider a Helper/Utility Class. (#16051)
* ThreadResourceUsageProvider is a helper class. ThreadResourceContext
tracks resource usage.
Fix updateConcurrently
* Rename to ThreadResourceSnapshot
* Clean up
* Add javadoc
* Don't use auto closeable
* Checkstyle
* Fix compilation error
* Add back removed functions in SPI
* Remove private constructor because japicmp complains.
* Add setThreadResourceUsageProvider because of backward-incompatible checks
* Add setThreadResourceUsageProvider because of backward-incompatible checks
* Fix test
* Fix ThreadResourceSnapshot usage and tests
* Store cpu sample in nanoseconds.
---
.../pinot/common/datatable/DataTableImplV4.java | 10 ++--
.../CPUMemThreadLevelAccountingObjects.java | 18 +++++++
.../PerQueryCPUMemAccountantFactory.java | 42 ++++++---------
.../core/operator/InstanceResponseOperator.java | 9 ++--
.../core/operator/combine/BaseCombineOperator.java | 9 ++--
.../pinot/core/transport/DataTableHandler.java | 6 ++-
.../pinot/core/accounting/TestThreadMXBean.java | 41 +++++++-------
.../perf/BenchmarkThreadResourceUsageProvider.java | 9 ++--
.../spi/accounting/ThreadResourceSnapshot.java | 63 ++++++++++++++++++++++
.../accounting/ThreadResourceUsageAccountant.java | 4 ++
.../accounting/ThreadResourceUsageProvider.java | 42 ++++++---------
.../java/org/apache/pinot/spi/trace/Tracing.java | 24 ++++++---
12 files changed, 177 insertions(+), 100 deletions(-)
diff --git
a/pinot-common/src/main/java/org/apache/pinot/common/datatable/DataTableImplV4.java
b/pinot-common/src/main/java/org/apache/pinot/common/datatable/DataTableImplV4.java
index c97e6a06e3..f709f79a72 100644
---
a/pinot-common/src/main/java/org/apache/pinot/common/datatable/DataTableImplV4.java
+++
b/pinot-common/src/main/java/org/apache/pinot/common/datatable/DataTableImplV4.java
@@ -34,6 +34,7 @@ import org.apache.pinot.common.datablock.DataBlockUtils;
import org.apache.pinot.common.utils.DataSchema;
import org.apache.pinot.common.utils.HashUtil;
import org.apache.pinot.common.utils.RoaringBitmapUtils;
+import org.apache.pinot.spi.accounting.ThreadResourceSnapshot;
import org.apache.pinot.spi.accounting.ThreadResourceUsageProvider;
import org.apache.pinot.spi.trace.Tracing;
import org.apache.pinot.spi.utils.BigDecimalUtils;
@@ -415,7 +416,7 @@ public class DataTableImplV4 implements DataTable {
@Override
public byte[] toBytes()
throws IOException {
- ThreadResourceUsageProvider threadTimer = new
ThreadResourceUsageProvider();
+ ThreadResourceSnapshot resourceSnapshot = new ThreadResourceSnapshot();
ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
DataOutputStream dataOutputStream = new
DataOutputStream(byteArrayOutputStream);
@@ -425,13 +426,12 @@ public class DataTableImplV4 implements DataTable {
// TODO: The check on cpu time and memory measurement is not needed. We
can remove it. But keeping it around for
// backward compatibility.
if (ThreadResourceUsageProvider.isThreadCpuTimeMeasurementEnabled()) {
- long responseSerializationCpuTimeNs = threadTimer.getThreadTimeNs();
- getMetadata().put(MetadataKey.RESPONSE_SER_CPU_TIME_NS.getName(),
String.valueOf(responseSerializationCpuTimeNs));
+ getMetadata().put(MetadataKey.RESPONSE_SER_CPU_TIME_NS.getName(),
+ String.valueOf(resourceSnapshot.getCpuTimeNs()));
}
if (ThreadResourceUsageProvider.isThreadMemoryMeasurementEnabled()) {
- long responseSerializationAllocatedBytes =
threadTimer.getThreadAllocatedBytes();
getMetadata().put(MetadataKey.RESPONSE_SER_MEM_ALLOCATED_BYTES.getName(),
- String.valueOf(responseSerializationAllocatedBytes));
+ String.valueOf(resourceSnapshot.getAllocatedBytes()));
}
// Write metadata: length followed by actual metadata bytes.
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/accounting/CPUMemThreadLevelAccountingObjects.java
b/pinot-core/src/main/java/org/apache/pinot/core/accounting/CPUMemThreadLevelAccountingObjects.java
index eb7ac33e2b..94cd00eef5 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/accounting/CPUMemThreadLevelAccountingObjects.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/accounting/CPUMemThreadLevelAccountingObjects.java
@@ -23,6 +23,7 @@ import java.util.concurrent.atomic.AtomicReference;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.apache.pinot.spi.accounting.ThreadExecutionContext;
+import org.apache.pinot.spi.accounting.ThreadResourceSnapshot;
import org.apache.pinot.spi.accounting.ThreadResourceTracker;
import org.apache.pinot.spi.utils.CommonConstants;
@@ -43,6 +44,9 @@ public class CPUMemThreadLevelAccountingObjects {
volatile long _currentThreadCPUTimeSampleMS = 0;
volatile long _currentThreadMemoryAllocationSampleBytes = 0;
+ // reference point for start time/bytes
+ private final ThreadResourceSnapshot _threadResourceSnapshot = new
ThreadResourceSnapshot();
+
// previous query_id, task_id of the thread, this field should only be
accessed by the accountant
TaskEntry _previousThreadTaskStatus = null;
// previous cpu time and memory allocation of the thread
@@ -113,6 +117,20 @@ public class CPUMemThreadLevelAccountingObjects {
public void setThreadTaskStatus(String queryId, int taskId,
ThreadExecutionContext.TaskType taskType,
@Nonnull Thread anchorThread) {
_currentThreadTaskStatus.set(new TaskEntry(queryId, taskId, taskType,
anchorThread));
+ _threadResourceSnapshot.reset();
+ }
+
+ /**
+ * Note that the precision does not match the name of the variable.
+ * _currentThreadCPUTimeSampleMS is in nanoseconds, but the variable name
suggests milliseconds.
+ * This is to maintain backward compatibility. It replaces code that set
the value in nanoseconds.
+ */
+ public void updateCpuSnapshot() {
+ _currentThreadCPUTimeSampleMS = _threadResourceSnapshot.getCpuTimeNs();
+ }
+
+ public void updateMemorySnapshot() {
+ _currentThreadMemoryAllocationSampleBytes =
_threadResourceSnapshot.getAllocatedBytes();
}
}
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/accounting/PerQueryCPUMemAccountantFactory.java
b/pinot-core/src/main/java/org/apache/pinot/core/accounting/PerQueryCPUMemAccountantFactory.java
index 2430ce7895..df110358cb 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/accounting/PerQueryCPUMemAccountantFactory.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/accounting/PerQueryCPUMemAccountantFactory.java
@@ -114,9 +114,6 @@ public class PerQueryCPUMemAccountantFactory implements
ThreadAccountantFactory
}
);
- // ThreadResourceUsageProvider(ThreadMXBean wrapper) per runner/worker
thread
- private final ThreadLocal<ThreadResourceUsageProvider>
_threadResourceUsageProvider;
-
// track thread cpu time
private final boolean _isThreadCPUSamplingEnabled;
@@ -168,9 +165,6 @@ public class PerQueryCPUMemAccountantFactory implements
ThreadAccountantFactory
CommonConstants.Accounting.DEFAULT_ENABLE_THREAD_SAMPLING_MSE);
LOGGER.info("_isThreadSamplingEnabledForMSE: {}",
_isThreadSamplingEnabledForMSE);
- // ThreadMXBean wrapper
- _threadResourceUsageProvider = new ThreadLocal<>();
-
// task/query tracking
_inactiveQuery = new HashSet<>();
@@ -277,19 +271,26 @@ public class PerQueryCPUMemAccountantFactory implements
ThreadAccountantFactory
}
@Override
- public void updateQueryUsageConcurrently(String queryId) {
+ @Deprecated
+ public void setThreadResourceUsageProvider(ThreadResourceUsageProvider
threadResourceUsageProvider) {
+ }
+
+ @Override
+ public void updateQueryUsageConcurrently(String queryId, long cpuTimeNs,
long memoryAllocatedBytes) {
if (_isThreadCPUSamplingEnabled) {
- long cpuUsageNS = getThreadResourceUsageProvider().getThreadTimeNs();
_concurrentTaskCPUStatsAggregator.compute(queryId,
- (key, value) -> (value == null) ? cpuUsageNS : (value +
cpuUsageNS));
+ (key, value) -> (value == null) ? cpuTimeNs : (value + cpuTimeNs));
}
if (_isThreadMemorySamplingEnabled) {
- long memoryAllocatedBytes =
getThreadResourceUsageProvider().getThreadAllocatedBytes();
_concurrentTaskMemStatsAggregator.compute(queryId,
(key, value) -> (value == null) ? memoryAllocatedBytes : (value +
memoryAllocatedBytes));
}
}
+ @Override
+ @Deprecated
+ public void updateQueryUsageConcurrently(String queryId) {
+ }
/**
* The thread would need to do {@code setThreadResourceUsageProvider}
first upon it is scheduled.
@@ -297,9 +298,8 @@ public class PerQueryCPUMemAccountantFactory implements
ThreadAccountantFactory
*/
@SuppressWarnings("ConstantConditions")
public void sampleThreadCPUTime() {
- ThreadResourceUsageProvider provider = getThreadResourceUsageProvider();
- if (_isThreadCPUSamplingEnabled && provider != null) {
- _threadLocalEntry.get()._currentThreadCPUTimeSampleMS =
provider.getThreadTimeNs();
+ if (_isThreadCPUSamplingEnabled) {
+ _threadLocalEntry.get().updateCpuSnapshot();
}
}
@@ -309,21 +309,11 @@ public class PerQueryCPUMemAccountantFactory implements
ThreadAccountantFactory
*/
@SuppressWarnings("ConstantConditions")
public void sampleThreadBytesAllocated() {
- ThreadResourceUsageProvider provider = getThreadResourceUsageProvider();
- if (_isThreadMemorySamplingEnabled && provider != null) {
- _threadLocalEntry.get()._currentThreadMemoryAllocationSampleBytes =
provider.getThreadAllocatedBytes();
+ if (_isThreadMemorySamplingEnabled) {
+ _threadLocalEntry.get().updateMemorySnapshot();
}
}
- private ThreadResourceUsageProvider getThreadResourceUsageProvider() {
- return _threadResourceUsageProvider.get();
- }
-
- @Override
- public void setThreadResourceUsageProvider(ThreadResourceUsageProvider
threadResourceUsageProvider) {
- _threadResourceUsageProvider.set(threadResourceUsageProvider);
- }
-
@Override
public void setupRunner(@Nullable String queryId, int taskId,
ThreadExecutionContext.TaskType taskType) {
_threadLocalEntry.get()._errorStatus.set(null);
@@ -362,8 +352,6 @@ public class PerQueryCPUMemAccountantFactory implements
ThreadAccountantFactory
CPUMemThreadLevelAccountingObjects.ThreadEntry threadEntry =
_threadLocalEntry.get();
// clear task info + stats
threadEntry.setToIdle();
- // clear threadResourceUsageProvider
- _threadResourceUsageProvider.remove();
}
@Override
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/operator/InstanceResponseOperator.java
b/pinot-core/src/main/java/org/apache/pinot/core/operator/InstanceResponseOperator.java
index 9e8fa43b41..6def833b06 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/operator/InstanceResponseOperator.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/operator/InstanceResponseOperator.java
@@ -30,7 +30,7 @@ import
org.apache.pinot.core.operator.combine.BaseCombineOperator;
import org.apache.pinot.core.query.request.context.QueryContext;
import org.apache.pinot.segment.spi.FetchContext;
import org.apache.pinot.segment.spi.SegmentContext;
-import org.apache.pinot.spi.accounting.ThreadResourceUsageProvider;
+import org.apache.pinot.spi.accounting.ThreadResourceSnapshot;
import org.apache.pinot.spi.exception.EarlyTerminationException;
import org.apache.pinot.spi.exception.QueryErrorCode;
import org.apache.pinot.spi.exception.QueryErrorMessage;
@@ -102,13 +102,12 @@ public class InstanceResponseOperator extends
BaseOperator<InstanceResponseBlock
protected BaseResultsBlock getBaseBlock() {
long startWallClockTimeNs = System.nanoTime();
- ThreadResourceUsageProvider mainThreadResourceUsageProvider = new
ThreadResourceUsageProvider();
+ ThreadResourceSnapshot resourceSnapshot = new ThreadResourceSnapshot();
BaseResultsBlock resultsBlock = getCombinedResults();
- // No-ops if CPU time measurement and/or memory allocation measurements
are not enabled.
- long mainThreadCpuTimeNs =
mainThreadResourceUsageProvider.getThreadTimeNs();
- long mainThreadMemAllocatedBytes =
mainThreadResourceUsageProvider.getThreadAllocatedBytes();
+ long mainThreadCpuTimeNs = resourceSnapshot.getCpuTimeNs();
+ long mainThreadMemAllocatedBytes = resourceSnapshot.getAllocatedBytes();
long totalWallClockTimeNs = System.nanoTime() - startWallClockTimeNs;
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/BaseCombineOperator.java
b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/BaseCombineOperator.java
index 8a1a90dffe..c6a9070d6f 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/BaseCombineOperator.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/BaseCombineOperator.java
@@ -34,7 +34,7 @@ import
org.apache.pinot.core.query.request.context.QueryContext;
import org.apache.pinot.core.util.QueryMultiThreadingUtils;
import org.apache.pinot.core.util.trace.TraceRunnable;
import org.apache.pinot.spi.accounting.ThreadExecutionContext;
-import org.apache.pinot.spi.accounting.ThreadResourceUsageProvider;
+import org.apache.pinot.spi.accounting.ThreadResourceSnapshot;
import org.apache.pinot.spi.exception.EarlyTerminationException;
import org.apache.pinot.spi.exception.QueryErrorCode;
import org.apache.pinot.spi.exception.QueryErrorMessage;
@@ -103,8 +103,7 @@ public abstract class BaseCombineOperator<T extends
BaseResultsBlock> extends Ba
_futures[i] = _executorService.submit(new TraceRunnable() {
@Override
public void runJob() {
- ThreadResourceUsageProvider threadResourceUsageProvider = new
ThreadResourceUsageProvider();
-
+ ThreadResourceSnapshot resourceSnapshot = new
ThreadResourceSnapshot();
Tracing.ThreadAccountantOps.setupWorker(taskId, parentContext);
// Register the task to the phaser
@@ -136,8 +135,8 @@ public abstract class BaseCombineOperator<T extends
BaseResultsBlock> extends Ba
Tracing.ThreadAccountantOps.clear();
}
-
_totalWorkerThreadCpuTimeNs.getAndAdd(threadResourceUsageProvider.getThreadTimeNs());
-
_totalWorkerThreadMemAllocatedBytes.getAndAdd(threadResourceUsageProvider.getThreadAllocatedBytes());
+
_totalWorkerThreadCpuTimeNs.getAndAdd(resourceSnapshot.getCpuTimeNs());
+
_totalWorkerThreadMemAllocatedBytes.getAndAdd(resourceSnapshot.getAllocatedBytes());
}
});
}
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/transport/DataTableHandler.java
b/pinot-core/src/main/java/org/apache/pinot/core/transport/DataTableHandler.java
index 3a117a4991..f5c9252d3a 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/transport/DataTableHandler.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/transport/DataTableHandler.java
@@ -25,6 +25,7 @@ import org.apache.pinot.common.datatable.DataTable;
import org.apache.pinot.common.datatable.DataTableFactory;
import org.apache.pinot.common.metrics.BrokerMeter;
import org.apache.pinot.common.metrics.BrokerMetrics;
+import org.apache.pinot.spi.accounting.ThreadResourceSnapshot;
import org.apache.pinot.spi.trace.Tracing;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -62,16 +63,17 @@ public class DataTableHandler extends
SimpleChannelInboundHandler<ByteBuf> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) {
- Tracing.ThreadAccountantOps.setThreadResourceUsageProvider();
int responseSize = msg.readableBytes();
_brokerMetrics.addMeteredGlobalValue(BrokerMeter.NETTY_CONNECTION_BYTES_RECEIVED,
responseSize);
try {
long deserializationStartTimeMs = System.currentTimeMillis();
+ ThreadResourceSnapshot resourceSnapshot = new ThreadResourceSnapshot();
DataTable dataTable = DataTableFactory.getDataTable(msg.nioBuffer());
_queryRouter.receiveDataTable(_serverRoutingInstance, dataTable,
responseSize,
(int) (System.currentTimeMillis() - deserializationStartTimeMs));
long requestID =
Long.parseLong(dataTable.getMetadata().get(DataTable.MetadataKey.REQUEST_ID.getName()));
-
Tracing.ThreadAccountantOps.updateQueryUsageConcurrently(String.valueOf(requestID));
+
Tracing.ThreadAccountantOps.updateQueryUsageConcurrently(String.valueOf(requestID),
+ resourceSnapshot.getCpuTimeNs(),
resourceSnapshot.getAllocatedBytes());
} catch (Exception e) {
LOGGER.error("Caught exception while deserializing data table of size:
{} from server: {}", responseSize,
_serverRoutingInstance, e);
diff --git
a/pinot-core/src/test/java/org/apache/pinot/core/accounting/TestThreadMXBean.java
b/pinot-core/src/test/java/org/apache/pinot/core/accounting/TestThreadMXBean.java
index 6ca2a8c9e6..572c2afaa8 100644
---
a/pinot-core/src/test/java/org/apache/pinot/core/accounting/TestThreadMXBean.java
+++
b/pinot-core/src/test/java/org/apache/pinot/core/accounting/TestThreadMXBean.java
@@ -26,6 +26,7 @@ import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.log4j.Level;
import org.apache.log4j.LogManager;
+import org.apache.pinot.spi.accounting.ThreadResourceSnapshot;
import org.apache.pinot.spi.accounting.ThreadResourceUsageProvider;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -49,11 +50,11 @@ public class TestThreadMXBean {
@Test
public void testThreadMXBeanSimpleMemAllocTracking() {
if (ThreadResourceUsageProvider.isThreadMemoryMeasurementEnabled()) {
- ThreadResourceUsageProvider threadResourceUsageProvider = new
ThreadResourceUsageProvider();
+ ThreadResourceSnapshot threadResourceSnapshot = new
ThreadResourceSnapshot();
long[] ll = new long[10000];
ll[2] = 4;
LOGGER.trace(String.valueOf(ll[2]));
- long result = threadResourceUsageProvider.getThreadAllocatedBytes();
+ long result = threadResourceSnapshot.getAllocatedBytes();
Assert.assertTrue(result >= 80000 && result <= 85000);
}
}
@@ -75,29 +76,29 @@ public class TestThreadMXBean {
System.gc();
long heapPrev = memoryMXBean.getHeapMemoryUsage().getUsed();
- ThreadResourceUsageProvider threadResourceUsageProvider0 = new
ThreadResourceUsageProvider();
+ ThreadResourceSnapshot threadResourceSnapshot0 = new
ThreadResourceSnapshot();
executor.submit(() -> {
- ThreadResourceUsageProvider threadResourceUsageProvider = new
ThreadResourceUsageProvider();
+ ThreadResourceSnapshot threadResourceSnapshot = new
ThreadResourceSnapshot();
for (int i = 0; i < 100000; i++) {
concurrentHashMap.put(i, i);
}
- a.set(threadResourceUsageProvider.getThreadAllocatedBytes());
+ a.set(threadResourceSnapshot.getAllocatedBytes());
});
executor.submit(() -> {
- ThreadResourceUsageProvider threadResourceUsageProvider = new
ThreadResourceUsageProvider();
+ ThreadResourceSnapshot threadResourceSnapshot = new
ThreadResourceSnapshot();
for (int i = 100000; i < 200000; i++) {
concurrentHashMap.put(i, i);
}
- b.set(threadResourceUsageProvider.getThreadAllocatedBytes());
+ b.set(threadResourceSnapshot.getAllocatedBytes());
});
executor.submit(() -> {
- ThreadResourceUsageProvider threadResourceUsageProvider = new
ThreadResourceUsageProvider();
+ ThreadResourceSnapshot threadResourceSnapshot = new
ThreadResourceSnapshot();
for (int i = 0; i < 200000; i++) {
concurrentHashMap2.put(i, i);
}
- c.set(threadResourceUsageProvider.getThreadAllocatedBytes());
+ c.set(threadResourceSnapshot.getAllocatedBytes());
});
try {
@@ -105,7 +106,7 @@ public class TestThreadMXBean {
} catch (InterruptedException ignored) {
}
- long d = threadResourceUsageProvider0.getThreadAllocatedBytes();
+ long d = threadResourceSnapshot0.getAllocatedBytes();
long threadAllocatedBytes = a.get() + b.get() + c.get() + d;
float heapUsedBytes = (float)
memoryMXBean.getHeapMemoryUsage().getUsed() - heapPrev;
float ratio = threadAllocatedBytes / heapUsedBytes;
@@ -132,29 +133,29 @@ public class TestThreadMXBean {
System.gc();
long heapPrev = memoryMXBean.getHeapMemoryUsage().getUsed();
- ThreadResourceUsageProvider threadResourceUsageProvider0 = new
ThreadResourceUsageProvider();
+ ThreadResourceSnapshot threadResourceSnapshot0 = new
ThreadResourceSnapshot();
executor.submit(() -> {
- ThreadResourceUsageProvider threadResourceUsageProvider = new
ThreadResourceUsageProvider();
+ ThreadResourceSnapshot threadResourceSnapshot = new
ThreadResourceSnapshot();
for (int i = 0; i < 100; i++) {
concurrentHashMap.put(i, new NestedArray());
}
- a.set(threadResourceUsageProvider.getThreadAllocatedBytes());
+ a.set(threadResourceSnapshot.getAllocatedBytes());
});
executor.submit(() -> {
- ThreadResourceUsageProvider threadResourceUsageProvider = new
ThreadResourceUsageProvider();
+ ThreadResourceSnapshot threadResourceSnapshot = new
ThreadResourceSnapshot();
for (int i = 100; i < 200; i++) {
concurrentHashMap.put(i, new NestedArray());
}
- b.set(threadResourceUsageProvider.getThreadAllocatedBytes());
+ b.set(threadResourceSnapshot.getAllocatedBytes());
});
executor.submit(() -> {
- ThreadResourceUsageProvider threadResourceUsageProvider = new
ThreadResourceUsageProvider();
+ ThreadResourceSnapshot threadResourceSnapshot = new
ThreadResourceSnapshot();
for (int i = 0; i < 200; i++) {
concurrentHashMap2.put(i, new NestedArray());
}
- c.set(threadResourceUsageProvider.getThreadAllocatedBytes());
+ c.set(threadResourceSnapshot.getAllocatedBytes());
});
try {
@@ -162,7 +163,7 @@ public class TestThreadMXBean {
} catch (InterruptedException ignored) {
}
- long d = threadResourceUsageProvider0.getThreadAllocatedBytes();
+ long d = threadResourceSnapshot0.getAllocatedBytes();
long threadAllocatedBytes = a.get() + b.get() + c.get() + d;
float heapUsedBytes = (float)
memoryMXBean.getHeapMemoryUsage().getUsed() - heapPrev;
float ratio = threadAllocatedBytes / heapUsedBytes;
@@ -181,14 +182,14 @@ public class TestThreadMXBean {
LogManager.getLogger(TestThreadMXBean.class).setLevel(Level.INFO);
MemoryMXBean memoryMXBean = ManagementFactory.getMemoryMXBean();
System.gc();
- ThreadResourceUsageProvider threadResourceUsageProvider0 = new
ThreadResourceUsageProvider();
+ ThreadResourceSnapshot threadResourceSnapshot0 = new
ThreadResourceSnapshot();
long heapPrev = memoryMXBean.getHeapMemoryUsage().getUsed();
for (int i = 0; i < 3; i++) {
long[] ignored = new long[100000000];
}
System.gc();
long heapResult = memoryMXBean.getHeapMemoryUsage().getUsed() - heapPrev;
- long result = threadResourceUsageProvider0.getThreadAllocatedBytes();
+ long result = threadResourceSnapshot0.getAllocatedBytes();
LOGGER.info("Measured thread allocated bytes {}, heap used bytes {}",
result, heapResult);
}
diff --git
a/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkThreadResourceUsageProvider.java
b/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkThreadResourceUsageProvider.java
index ea82389e14..2c93f06771 100644
---
a/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkThreadResourceUsageProvider.java
+++
b/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkThreadResourceUsageProvider.java
@@ -19,6 +19,7 @@
package org.apache.pinot.perf;
import java.util.concurrent.TimeUnit;
+import org.apache.pinot.spi.accounting.ThreadResourceSnapshot;
import org.apache.pinot.spi.accounting.ThreadResourceUsageProvider;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
@@ -65,24 +66,24 @@ public class BenchmarkThreadResourceUsageProvider {
@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.MILLISECONDS)
public void benchThreadMXBeanThreadCPUTime(MyState myState, Blackhole bh) {
- bh.consume(myState._threadResourceUsageProvider.getThreadTimeNs());
+ bh.consume(myState._threadResourceSnapshot.getCpuTimeNs());
}
@Benchmark
@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.MILLISECONDS)
public void benchThreadMXBeanMem(MyState myState, Blackhole bh) {
- bh.consume(myState._threadResourceUsageProvider.getThreadAllocatedBytes());
+ bh.consume(myState._threadResourceSnapshot.getAllocatedBytes());
}
@State(Scope.Benchmark)
public static class MyState {
- ThreadResourceUsageProvider _threadResourceUsageProvider;
+ ThreadResourceSnapshot _threadResourceSnapshot;
long[] _allocation;
@Setup(Level.Iteration)
public void doSetup() {
- _threadResourceUsageProvider = new ThreadResourceUsageProvider();
+ _threadResourceSnapshot = new ThreadResourceSnapshot();
}
@Setup(Level.Invocation)
diff --git
a/pinot-spi/src/main/java/org/apache/pinot/spi/accounting/ThreadResourceSnapshot.java
b/pinot-spi/src/main/java/org/apache/pinot/spi/accounting/ThreadResourceSnapshot.java
new file mode 100644
index 0000000000..befad65a21
--- /dev/null
+++
b/pinot-spi/src/main/java/org/apache/pinot/spi/accounting/ThreadResourceSnapshot.java
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.spi.accounting;
+
+/**
+ * ThreadResourceSnapshot is a utility class that helps to track the CPU time
and memory allocated.
+ * {@link ThreadResourceUsageProvider} provides cumulative CPU time and memory
allocated for the current thread.
+ * This class uses that provider to snapshot start for a task executed by that
thread.
+ */
+public class ThreadResourceSnapshot {
+ private long _startCpuTime;
+ private long _startAllocatedBytes;
+
+ /**
+ * Creates a new tracker and takes initial snapshots.
+ */
+ public ThreadResourceSnapshot() {
+ reset();
+ }
+
+ public void reset() {
+ _startCpuTime = ThreadResourceUsageProvider.getCurrentThreadCpuTime();
+ _startAllocatedBytes =
ThreadResourceUsageProvider.getCurrentThreadAllocatedBytes();
+ }
+
+ /**
+ * Gets the CPU time used so far in nanoseconds.
+ * This is the difference between the current CPU time and the start CPU
time.
+ */
+ public long getCpuTimeNs() {
+ return ThreadResourceUsageProvider.getCurrentThreadCpuTime() -
_startCpuTime;
+ }
+
+ /**
+ * Gets the memory allocated so far in bytes.
+ * This is the difference between the current allocated bytes and the start
allocated bytes.
+ */
+ public long getAllocatedBytes() {
+ return ThreadResourceUsageProvider.getCurrentThreadAllocatedBytes() -
_startAllocatedBytes;
+ }
+
+ @Override
+ public String toString() {
+ return "ThreadResourceSnapshot{" + "cpuTime=" + (getCpuTimeNs()) + ",
allocatedBytes="
+ + (getAllocatedBytes()) + '}';
+ }
+}
diff --git
a/pinot-spi/src/main/java/org/apache/pinot/spi/accounting/ThreadResourceUsageAccountant.java
b/pinot-spi/src/main/java/org/apache/pinot/spi/accounting/ThreadResourceUsageAccountant.java
index 6a33ac3ac0..dd9a8e9c3f 100644
---
a/pinot-spi/src/main/java/org/apache/pinot/spi/accounting/ThreadResourceUsageAccountant.java
+++
b/pinot-spi/src/main/java/org/apache/pinot/spi/accounting/ThreadResourceUsageAccountant.java
@@ -70,6 +70,7 @@ public interface ThreadResourceUsageAccountant {
/**
* set resource usage provider
*/
+ @Deprecated
void setThreadResourceUsageProvider(ThreadResourceUsageProvider
threadResourceUsageProvider);
/**
@@ -87,6 +88,9 @@ public interface ThreadResourceUsageAccountant {
* ser/de threads where the thread execution context cannot be setup before
hands as
* queryId/taskId is unknown and the execution process is hard to instrument
*/
+ void updateQueryUsageConcurrently(String queryId, long cpuTimeNs, long
allocatedBytes);
+
+ @Deprecated
void updateQueryUsageConcurrently(String queryId);
/**
diff --git
a/pinot-spi/src/main/java/org/apache/pinot/spi/accounting/ThreadResourceUsageProvider.java
b/pinot-spi/src/main/java/org/apache/pinot/spi/accounting/ThreadResourceUsageProvider.java
index a103fe73c5..f5ebc5ed2f 100644
---
a/pinot-spi/src/main/java/org/apache/pinot/spi/accounting/ThreadResourceUsageProvider.java
+++
b/pinot-spi/src/main/java/org/apache/pinot/spi/accounting/ThreadResourceUsageProvider.java
@@ -51,22 +51,28 @@ public class ThreadResourceUsageProvider {
private static boolean _isThreadCpuTimeMeasurementEnabled = false;
private static boolean _isThreadMemoryMeasurementEnabled = false;
- // reference point for start time/bytes
- private final long _startTimeNs;
- private final long _startBytesAllocated;
+ @Deprecated
+ public long getThreadTimeNs() {
+ return 0;
+ }
+
+ @Deprecated
+ public long getThreadAllocatedBytes() {
+ return 0;
+ }
- public ThreadResourceUsageProvider() {
- _startTimeNs = _isThreadCpuTimeMeasurementEnabled ?
MX_BEAN.getCurrentThreadCpuTime() : -1;
+ public static long getCurrentThreadCpuTime() {
+ return _isThreadCpuTimeMeasurementEnabled ?
MX_BEAN.getCurrentThreadCpuTime() : 0;
+ }
- long startBytesAllocated1;
+ public static long getCurrentThreadAllocatedBytes() {
try {
- startBytesAllocated1 = _isThreadMemoryMeasurementEnabled
- ? (long)
SUN_THREAD_MXBEAN_GET_BYTES_ALLOCATED_METHOD.invoke(MX_BEAN,
Thread.currentThread().getId()) : -1;
+ return _isThreadMemoryMeasurementEnabled ? (long)
SUN_THREAD_MXBEAN_GET_BYTES_ALLOCATED_METHOD
+ .invoke(MX_BEAN, Thread.currentThread().getId()) : 0;
} catch (IllegalAccessException | InvocationTargetException e) {
- startBytesAllocated1 = -1;
- LOGGER.error("Exception happened during the invocation of getting
initial bytes allocated", e);
+ LOGGER.error("Exception happened during the invocation of getting
current bytes allocated", e);
+ return 0;
}
- _startBytesAllocated = startBytesAllocated1;
}
public static boolean isThreadCpuTimeMeasurementEnabled() {
@@ -100,20 +106,6 @@ public class ThreadResourceUsageProvider {
_isThreadMemoryMeasurementEnabled = enable &&
IS_THREAD_ALLOCATED_MEMORY_SUPPORTED && isThreadAllocateMemoryEnabled;
}
- public long getThreadTimeNs() {
- return _isThreadCpuTimeMeasurementEnabled ?
MX_BEAN.getCurrentThreadCpuTime() - _startTimeNs : 0;
- }
-
- public long getThreadAllocatedBytes() {
- try {
- return _isThreadMemoryMeasurementEnabled ? (long)
SUN_THREAD_MXBEAN_GET_BYTES_ALLOCATED_METHOD
- .invoke(MX_BEAN, Thread.currentThread().getId()) -
_startBytesAllocated : 0;
- } catch (IllegalAccessException | InvocationTargetException e) {
- LOGGER.error("Exception happened during the invocation of getting
initial bytes allocated", e);
- return 0;
- }
- }
-
//initialize the com.sun.management.ThreadMXBean related variables using
reflection
static {
Class<?> sunThreadMXBeanClass;
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/trace/Tracing.java
b/pinot-spi/src/main/java/org/apache/pinot/spi/trace/Tracing.java
index 198d083c57..551cf2968f 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/trace/Tracing.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/trace/Tracing.java
@@ -191,19 +191,27 @@ public class Tracing {
}
@Override
- public void setThreadResourceUsageProvider(ThreadResourceUsageProvider
threadResourceUsageProvider) {
+ public void sampleUsage() {
}
@Override
- public void sampleUsage() {
+ public void sampleUsageMSE() {
}
@Override
- public void sampleUsageMSE() {
+ @Deprecated
+ public void setThreadResourceUsageProvider(ThreadResourceUsageProvider
threadResourceUsageProvider) {
}
@Override
+ @Deprecated
public void updateQueryUsageConcurrently(String queryId) {
+ // No-op for default accountant
+ }
+
+ @Override
+ public void updateQueryUsageConcurrently(String queryId, long cpuTimeNs,
long allocatedBytes) {
+ // No-op for default accountant
}
@Override
@@ -256,7 +264,6 @@ public class Tracing {
}
public static void setupRunner(String queryId,
ThreadExecutionContext.TaskType taskType) {
- Tracing.getThreadAccountant().setThreadResourceUsageProvider(new
ThreadResourceUsageProvider());
Tracing.getThreadAccountant().setupRunner(queryId,
CommonConstants.Accounting.ANCHOR_TASK_ID, taskType);
}
@@ -276,7 +283,6 @@ public class Tracing {
*/
public static void setupWorker(int taskId, ThreadExecutionContext.TaskType
taskType,
@Nullable ThreadExecutionContext threadExecutionContext) {
- Tracing.getThreadAccountant().setThreadResourceUsageProvider(new
ThreadResourceUsageProvider());
Tracing.getThreadAccountant().setupWorker(taskId, taskType,
threadExecutionContext);
}
@@ -326,12 +332,16 @@ public class Tracing {
sample();
}
+ @Deprecated
public static void updateQueryUsageConcurrently(String queryId) {
- Tracing.getThreadAccountant().updateQueryUsageConcurrently(queryId);
}
+ public static void updateQueryUsageConcurrently(String queryId, long
cpuTimeNs, long allocatedBytes) {
+ Tracing.getThreadAccountant().updateQueryUsageConcurrently(queryId,
cpuTimeNs, allocatedBytes);
+ }
+
+ @Deprecated
public static void setThreadResourceUsageProvider() {
- Tracing.getThreadAccountant().setThreadResourceUsageProvider(new
ThreadResourceUsageProvider());
}
// Check for thread interruption, every time after merging 8192 keys
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]