This is an automated email from the ASF dual-hosted git repository.
cwylie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 42a395e86ff add vsf AcquireSegmentResult metrics to ChannelCounters
(#18971)
42a395e86ff is described below
commit 42a395e86ffc92aa820a4ed2eb0eedfedf9d4675
Author: Clint Wylie <[email protected]>
AuthorDate: Mon Feb 2 12:09:17 2026 -0800
add vsf AcquireSegmentResult metrics to ChannelCounters (#18971)
---
.../embedded/query/QueryVirtualStorageTest.java | 69 +++++++++-
.../apache/druid/msq/counters/ChannelCounters.java | 144 ++++++++++++++++++++-
.../druid/msq/querykit/ReadableInputQueue.java | 5 +
.../msq/counters/CountersSnapshotTreeTest.java | 5 +-
.../sql/resources/SqlStatementResourceTest.java | 4 +
.../druid/msq/test/CounterSnapshotMatcher.java | 40 ++++++
web-console/src/druid-models/stages/stages.spec.ts | 100 ++++++++++++++
web-console/src/druid-models/stages/stages.ts | 27 +++-
8 files changed, 385 insertions(+), 9 deletions(-)
diff --git
a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/query/QueryVirtualStorageTest.java
b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/query/QueryVirtualStorageTest.java
index b7a5f96a4ba..02d0fae66f8 100644
---
a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/query/QueryVirtualStorageTest.java
+++
b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/query/QueryVirtualStorageTest.java
@@ -25,13 +25,17 @@ import org.apache.druid.indexer.TaskState;
import org.apache.druid.java.util.common.HumanReadableBytes;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
+import org.apache.druid.msq.counters.ChannelCounters;
import org.apache.druid.msq.indexing.report.MSQTaskReportPayload;
import org.apache.druid.query.DefaultQueryMetrics;
import org.apache.druid.query.DruidProcessingConfigTest;
+import org.apache.druid.query.QueryContexts;
+import org.apache.druid.query.http.ClientSqlQuery;
import org.apache.druid.server.coordinator.stats.Stats;
import org.apache.druid.server.metrics.LatchableEmitter;
import org.apache.druid.server.metrics.StorageMonitor;
import org.apache.druid.sql.calcite.planner.Calcites;
+import org.apache.druid.sql.http.GetQueryReportResponse;
import org.apache.druid.testing.embedded.EmbeddedBroker;
import org.apache.druid.testing.embedded.EmbeddedCoordinator;
import org.apache.druid.testing.embedded.EmbeddedDruidCluster;
@@ -53,6 +57,8 @@ import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.util.Collections;
+import java.util.Map;
+import java.util.UUID;
import java.util.concurrent.ThreadLocalRandom;
/**
@@ -73,7 +79,8 @@ class QueryVirtualStorageTest extends EmbeddedClusterTestBase
@Override
public EmbeddedDruidCluster createCluster()
{
- historical.addProperty("druid.segmentCache.virtualStorage", "true")
+ historical.setServerMemory(500_000_000)
+ .addProperty("druid.segmentCache.virtualStorage", "true")
.addProperty("druid.segmentCache.virtualStorageLoadThreads",
String.valueOf(Runtime.getRuntime().availableProcessors()))
.addBeforeStartHook(
(cluster, self) -> self.addProperty(
@@ -87,6 +94,11 @@ class QueryVirtualStorageTest extends EmbeddedClusterTestBase
)
.addProperty("druid.server.maxSize",
String.valueOf(HumanReadableBytes.parse("100MiB")));
+ broker.setServerMemory(200_000_000)
+ .addProperty("druid.msq.dart.controller.maxRetainedReportCount",
"10")
+ .addProperty("druid.query.default.context.maxConcurrentStages", "1")
+ .addProperty("druid.sql.planner.enableSysQueriesTable", "true");
+
coordinator.addProperty("druid.manager.segments.useIncrementalCache",
"always");
overlord.addProperty("druid.manager.segments.useIncrementalCache",
"always")
@@ -102,6 +114,7 @@ class QueryVirtualStorageTest extends
EmbeddedClusterTestBase
.useLatchableEmitter()
.useDefaultTimeoutForLatchableEmitter(20)
.addResource(storageResource)
+ .addCommonProperty("druid.msq.dart.enabled", "true")
.addCommonProperty("druid.storage.zip", "false")
.addCommonProperty("druid.indexer.task.buildV10", "true")
.addCommonProperty("druid.monitoring.emissionPeriod", "PT1s")
@@ -136,7 +149,7 @@ class QueryVirtualStorageTest extends
EmbeddedClusterTestBase
{
Throwable t = Assertions.assertThrows(
RuntimeException.class,
- () -> cluster.runSql("select * from \"%s\"", dataSource)
+ () -> cluster.runSql("select count(*) from \"%s\"", dataSource)
);
Assertions.assertTrue(t.getMessage().contains("Unable to load segment"));
Assertions.assertTrue(t.getMessage().contains("] on demand, ensure enough
disk space has been allocated to load all segments involved in the query"));
@@ -233,6 +246,58 @@ class QueryVirtualStorageTest extends
EmbeddedClusterTestBase
);
}
+ @Test
+ void testQueryTooMuchDataButWithDart()
+ {
+ // dart uses vsf in a totally rad way so it can query all of the segments
at once due to how it chunks up and
+ // fetches segments to do the work
+ final String sqlQueryId = UUID.randomUUID().toString(); // unique id, reused below to look up the report
+ final String resultString = cluster.callApi().onAnyBroker(
+ b -> b.submitSqlQuery(
+ new ClientSqlQuery(
+ StringUtils.format("select count(*) from \"%s\"", dataSource),
+ "CSV",
+ false,
+ false,
+ false,
+ Map.of(
+ QueryContexts.CTX_SQL_QUERY_ID, sqlQueryId,
+ QueryContexts.ENGINE, "msq-dart"
+ ),
+ null
+ )
+ )
+ ).trim(); // trimmed so the body can be parsed as a single long below
+
+ final Long result = Long.parseLong(resultString);
+ Assertions.assertEquals(39244L, result); // expected count(*) over the test datasource
+
+ // Now fetch the report using the SQL query ID
+ final GetQueryReportResponse reportResponse =
msqApis.getDartQueryReport(sqlQueryId, broker);
+
+ // Verify the report response
+ Assertions.assertNotNull(reportResponse, "Report response should not be
null");
+ ChannelCounters.Snapshot segmentChannelCounters =
+ (ChannelCounters.Snapshot) reportResponse.getReportMap()
+ .findReport("multiStageQuery")
+ .map(r ->
+
((MSQTaskReportPayload) r.getPayload()).getCounters()
+
.snapshotForStage(0)
+
.get(0)
+
.getMap()
+
.get("input0")
+ ).orElse(null); // null when no "multiStageQuery" report is present
+
+ Assertions.assertNotNull(segmentChannelCounters);
+ Assertions.assertArrayEquals(new long[]{24L},
segmentChannelCounters.getFiles());
+ Assertions.assertTrue(segmentChannelCounters.getLoadFiles()[0] > 0 &&
segmentChannelCounters.getLoadFiles()[0] <=
segmentChannelCounters.getFiles()[0]);
+ // size of all segments at time of writing, possibly we have to load all
of them, but possibly less depending on
+ // test order
+ Assertions.assertTrue(segmentChannelCounters.getLoadBytes()[0] > 0 &&
segmentChannelCounters.getLoadBytes()[0] <= 3776682L);
+ Assertions.assertTrue(segmentChannelCounters.getLoadTime()[0] > 0); // only checks that some load time was recorded
+ Assertions.assertTrue(segmentChannelCounters.getLoadWait()[0] > 0); // only checks that some wait time was recorded
+ }
+
private void assertQueryMetrics(int expectedEventCount, @Nullable Long
expectedLoadCount)
{
diff --git
a/multi-stage-query/src/main/java/org/apache/druid/msq/counters/ChannelCounters.java
b/multi-stage-query/src/main/java/org/apache/druid/msq/counters/ChannelCounters.java
index c51ec0cad13..becd4552a95 100644
---
a/multi-stage-query/src/main/java/org/apache/druid/msq/counters/ChannelCounters.java
+++
b/multi-stage-query/src/main/java/org/apache/druid/msq/counters/ChannelCounters.java
@@ -28,9 +28,11 @@ import it.unimi.dsi.fastutil.longs.LongArrayList;
import it.unimi.dsi.fastutil.longs.LongList;
import org.apache.druid.frame.Frame;
import org.apache.druid.query.rowsandcols.RowsAndColumns;
+import org.apache.druid.segment.loading.AcquireSegmentResult;
import javax.annotation.Nullable;
import java.util.Arrays;
+import java.util.concurrent.TimeUnit;
/**
* Counters for inputs and outputs. Created by {@link CounterTracker#channel}.
@@ -54,6 +56,18 @@ public class ChannelCounters implements QueryCounter
@GuardedBy("this")
private final LongList totalFiles = new LongArrayList();
+ @GuardedBy("this")
+ private final LongList loadFiles = new LongArrayList();
+
+ @GuardedBy("this")
+ private final LongList loadBytes = new LongArrayList();
+
+ @GuardedBy("this")
+ private final LongList loadTime = new LongArrayList();
+
+ @GuardedBy("this")
+ private final LongList loadWait = new LongArrayList();
+
public void incrementRowCount()
{
incrementRowCount(NO_PARTITION);
@@ -79,6 +93,19 @@ public class ChannelCounters implements QueryCounter
add(NO_PARTITION, nRows, nBytes, 0, 1);
}
+ public void addLoad(AcquireSegmentResult loadResult) // records one segment load under NO_PARTITION
+ {
+ if (loadResult.getLoadSizeBytes() > 0) { // results reporting zero loaded bytes are not counted at all
+ addLoad(
+ NO_PARTITION,
+ loadResult.getLoadSizeBytes(),
+ TimeUnit.NANOSECONDS.toMillis(loadResult.getLoadTimeNanos()), // nanos -> millis, truncating sub-millisecond values to 0
+ TimeUnit.NANOSECONDS.toMillis(loadResult.getWaitTimeNanos()), // nanos -> millis, truncating sub-millisecond values to 0
+ 1 // each counted result adds one to loadFiles
+ );
+ }
+ }
+
/**
* Increment counts by one frame, and if this {@link RowsAndColumns} is a
{@link Frame}, also increment
* bytes by {@link Frame#numBytes()}.
@@ -116,6 +143,23 @@ public class ChannelCounters implements QueryCounter
}
}
+ private void addLoad( // adds the given deltas to the four load counters of one partition
+ final int partitionNumber,
+ final long nBytes,
+ final long nTime, // the public addLoad overload passes milliseconds here
+ final long nWait, // the public addLoad overload passes milliseconds here
+ final long nFiles
+ )
+ {
+ synchronized (this) { // the load lists are declared @GuardedBy("this")
+ ensureCapacityForPartitionLoad(partitionNumber); // grow the lists so index partitionNumber exists
+ loadBytes.set(partitionNumber, loadBytes.getLong(partitionNumber) +
nBytes);
+ loadTime.set(partitionNumber, loadTime.getLong(partitionNumber) + nTime);
+ loadWait.set(partitionNumber, loadWait.getLong(partitionNumber) + nWait);
+ loadFiles.set(partitionNumber, loadFiles.getLong(partitionNumber) +
nFiles);
+ }
+ }
+
@GuardedBy("this")
private void ensureCapacityForPartition(final int partitionNumber)
{
@@ -140,6 +184,26 @@ public class ChannelCounters implements QueryCounter
}
}
+ @GuardedBy("this")
+ private void ensureCapacityForPartitionLoad(final int partitionNumber) // zero-pads the four load lists up to partitionNumber
+ {
+ while (partitionNumber >= loadFiles.size()) { // each list is padded on its own, so differing sizes are tolerated
+ loadFiles.add(0);
+ }
+
+ while (partitionNumber >= loadBytes.size()) {
+ loadBytes.add(0);
+ }
+
+ while (partitionNumber >= loadTime.size()) {
+ loadTime.add(0);
+ }
+
+ while (partitionNumber >= loadWait.size()) {
+ loadWait.add(0);
+ }
+ }
+
@Override
@Nullable
public Snapshot snapshot()
@@ -149,6 +213,10 @@ public class ChannelCounters implements QueryCounter
final long[] framesArray;
final long[] filesArray;
final long[] totalFilesArray;
+ final long[] loadBytesArray;
+ final long[] loadTimeArray;
+ final long[] loadWaitArray;
+ final long[] loadFilesArray;
synchronized (this) {
rowsArray = listToArray(rows);
@@ -156,16 +224,35 @@ public class ChannelCounters implements QueryCounter
framesArray = listToArray(frames);
filesArray = listToArray(files);
totalFilesArray = listToArray(totalFiles);
+ loadBytesArray = listToArray(loadBytes);
+ loadTimeArray = listToArray(loadTime);
+ loadWaitArray = listToArray(loadWait);
+ loadFilesArray = listToArray(loadFiles);
}
if (rowsArray == null
&& bytesArray == null
&& framesArray == null
&& filesArray == null
- && totalFilesArray == null) {
+ && totalFilesArray == null
+ && loadBytesArray == null
+ && loadTimeArray == null
+ && loadWaitArray == null
+ && loadFilesArray == null
+ ) {
return null;
} else {
- return new Snapshot(rowsArray, bytesArray, framesArray, filesArray,
totalFilesArray);
+ return new Snapshot(
+ rowsArray,
+ bytesArray,
+ framesArray,
+ filesArray,
+ totalFilesArray,
+ loadBytesArray,
+ loadTimeArray,
+ loadWaitArray,
+ loadFilesArray
+ );
}
}
@@ -196,6 +283,10 @@ public class ChannelCounters implements QueryCounter
private final long[] frames;
private final long[] files;
private final long[] totalFiles;
+ private final long[] loadBytes;
+ private final long[] loadTime;
+ private final long[] loadWait;
+ private final long[] loadFiles;
@JsonCreator
public Snapshot(
@@ -203,7 +294,11 @@ public class ChannelCounters implements QueryCounter
@Nullable @JsonProperty("bytes") final long[] bytes,
@Nullable @JsonProperty("frames") final long[] frames,
@Nullable @JsonProperty("files") final long[] files,
- @Nullable @JsonProperty("totalFiles") final long[] totalFiles
+ @Nullable @JsonProperty("totalFiles") final long[] totalFiles,
+ @Nullable @JsonProperty("loadBytes") final long[] loadBytes,
+ @Nullable @JsonProperty("loadTime") final long[] loadTime,
+ @Nullable @JsonProperty("loadWait") final long[] loadWait,
+ @Nullable @JsonProperty("loadFiles") final long[] loadFiles
)
{
this.rows = rows;
@@ -211,6 +306,10 @@ public class ChannelCounters implements QueryCounter
this.frames = frames;
this.files = files;
this.totalFiles = totalFiles;
+ this.loadBytes = loadBytes;
+ this.loadTime = loadTime;
+ this.loadWait = loadWait;
+ this.loadFiles = loadFiles;
}
@JsonProperty
@@ -248,6 +347,34 @@ public class ChannelCounters implements QueryCounter
return totalFiles;
}
+ @JsonProperty
+ @JsonInclude(JsonInclude.Include.NON_NULL)
+ public long[] getLoadBytes() // "loadBytes" property; left out of the JSON when null
+ {
+ return loadBytes;
+ }
+
+ @JsonProperty
+ @JsonInclude(JsonInclude.Include.NON_NULL)
+ public long[] getLoadTime() // "loadTime" property; left out of the JSON when null
+ {
+ return loadTime;
+ }
+
+ @JsonProperty
+ @JsonInclude(JsonInclude.Include.NON_NULL)
+ public long[] getLoadWait() // "loadWait" property; left out of the JSON when null
+ {
+ return loadWait; // must expose the loadWait constructor property, which is stored separately from loadTime
+ }
+
+ @JsonProperty
+ @JsonInclude(JsonInclude.Include.NON_NULL)
+ public long[] getLoadFiles() // "loadFiles" property; left out of the JSON when null
+ {
+ return loadFiles;
+ }
+
@Override
public boolean equals(Object o)
{
@@ -262,7 +389,11 @@ public class ChannelCounters implements QueryCounter
&& Arrays.equals(bytes, snapshot.bytes)
&& Arrays.equals(frames, snapshot.frames)
&& Arrays.equals(files, snapshot.files)
- && Arrays.equals(totalFiles, snapshot.totalFiles);
+ && Arrays.equals(totalFiles, snapshot.totalFiles)
+ && Arrays.equals(loadBytes, snapshot.loadBytes)
+ && Arrays.equals(loadTime, snapshot.loadTime)
+ && Arrays.equals(loadWait, snapshot.loadWait)
+ && Arrays.equals(loadFiles, snapshot.loadFiles); // all four load arrays take part in equality
}
@Override
@@ -273,6 +404,10 @@ public class ChannelCounters implements QueryCounter
result = 31 * result + Arrays.hashCode(frames);
result = 31 * result + Arrays.hashCode(files);
result = 31 * result + Arrays.hashCode(totalFiles);
+ result = 31 * result + Arrays.hashCode(loadBytes);
+ result = 31 * result + Arrays.hashCode(loadTime);
+ result = 31 * result + Arrays.hashCode(loadWait);
+ result = 31 * result + Arrays.hashCode(loadFiles); // same field set as equals
return result;
}
@@ -285,6 +420,10 @@ public class ChannelCounters implements QueryCounter
", frames=" + Arrays.toString(frames) +
", files=" + Arrays.toString(files) +
", totalFiles=" + Arrays.toString(totalFiles) +
+ ", loadBytes=" + Arrays.toString(loadBytes) +
+ ", loadTime=" + Arrays.toString(loadTime) +
+ ", loadWait=" + Arrays.toString(loadWait) +
+ ", loadFiles=" + Arrays.toString(loadFiles) +
'}';
}
}
diff --git
a/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/ReadableInputQueue.java
b/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/ReadableInputQueue.java
index 3af54aa54e9..4b3ed148596 100644
---
a/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/ReadableInputQueue.java
+++
b/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/ReadableInputQueue.java
@@ -27,6 +27,7 @@ import org.apache.druid.common.guava.FutureUtils;
import org.apache.druid.error.DruidException;
import org.apache.druid.frame.channel.ReadableFrameChannel;
import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.msq.counters.ChannelCounters;
import org.apache.druid.msq.exec.DataServerQueryHandler;
import org.apache.druid.msq.exec.std.StandardPartitionReader;
import org.apache.druid.msq.input.LoadableSegment;
@@ -305,6 +306,10 @@ public class ReadableInputQueue implements Closeable
// Transfer segment from "loadingSegments" to "loadedSegments"
and return a reference to it.
if (loadingSegments.remove(acquireSegmentAction)) {
try {
+ final ChannelCounters inputCounters =
nextLoadableSegment.inputCounters();
+ if (inputCounters != null) {
+ inputCounters.addLoad(segment);
+ }
final SegmentReferenceHolder referenceHolder = new
SegmentReferenceHolder(
new SegmentReference(
nextLoadableSegment.descriptor(),
diff --git
a/multi-stage-query/src/test/java/org/apache/druid/msq/counters/CountersSnapshotTreeTest.java
b/multi-stage-query/src/test/java/org/apache/druid/msq/counters/CountersSnapshotTreeTest.java
index f595ee643dd..6084240765e 100644
---
a/multi-stage-query/src/test/java/org/apache/druid/msq/counters/CountersSnapshotTreeTest.java
+++
b/multi-stage-query/src/test/java/org/apache/druid/msq/counters/CountersSnapshotTreeTest.java
@@ -26,6 +26,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableMap;
import org.apache.druid.msq.guice.MSQIndexingModule;
import org.apache.druid.segment.TestHelper;
+import org.apache.druid.segment.loading.AcquireSegmentResult;
import org.hamcrest.CoreMatchers;
import org.hamcrest.MatcherAssert;
import org.junit.Assert;
@@ -44,14 +45,16 @@ public class CountersSnapshotTreeTest
final ChannelCounters channelCounters = new ChannelCounters();
channelCounters.addFile(10, 13);
channelCounters.setTotalFiles(14);
+ // fake load to set some counters
+ channelCounters.addLoad(new AcquireSegmentResult(null, 1234L, 1L, 1L));
final CounterSnapshotsTree snapshotsTree = new CounterSnapshotsTree();
snapshotsTree.put(1, 2, new CounterSnapshots(ImmutableMap.of("ctr",
channelCounters.snapshot())));
final String json = mapper.writeValueAsString(snapshotsTree);
final CounterSnapshotsTree snapshotsTree2 = mapper.readValue(json,
CounterSnapshotsTree.class);
Assert.assertEquals(snapshotsTree.copyMap(), snapshotsTree2.copyMap());
}
@Test
diff --git
a/multi-stage-query/src/test/java/org/apache/druid/msq/sql/resources/SqlStatementResourceTest.java
b/multi-stage-query/src/test/java/org/apache/druid/msq/sql/resources/SqlStatementResourceTest.java
index 8f757afe0ca..41a0cf288b8 100644
---
a/multi-stage-query/src/test/java/org/apache/druid/msq/sql/resources/SqlStatementResourceTest.java
+++
b/multi-stage-query/src/test/java/org/apache/druid/msq/sql/resources/SqlStatementResourceTest.java
@@ -272,6 +272,10 @@ public class SqlStatementResourceTest extends MSQTestBase
new long[]{3L, 5L},
new long[]{},
new long[]{},
+ new long[]{},
+ new long[]{},
+ new long[]{},
+ new long[]{},
new long[]{}
)
)
diff --git
a/multi-stage-query/src/test/java/org/apache/druid/msq/test/CounterSnapshotMatcher.java
b/multi-stage-query/src/test/java/org/apache/druid/msq/test/CounterSnapshotMatcher.java
index 608b3c08990..7c841e6d8b0 100644
---
a/multi-stage-query/src/test/java/org/apache/druid/msq/test/CounterSnapshotMatcher.java
+++
b/multi-stage-query/src/test/java/org/apache/druid/msq/test/CounterSnapshotMatcher.java
@@ -34,6 +34,10 @@ public class CounterSnapshotMatcher
private long[] frames;
private long[] files;
private long[] totalFiles;
+ private long[] loadBytes;
+ private long[] loadTime;
+ private long[] loadWait;
+ private long[] loadFiles;
private Long segmentRowsProcessed;
public static CounterSnapshotMatcher with()
@@ -76,6 +80,30 @@ public class CounterSnapshotMatcher
return this;
}
+ public CounterSnapshotMatcher loadBytes(long... loadBytes) // expected getLoadBytes() values; if never set, that check is skipped
+ {
+ this.loadBytes = loadBytes;
+ return this; // returns the matcher itself so calls can be chained
+ }
+
+ public CounterSnapshotMatcher loadTime(long... loadTime) // expected getLoadTime() values; if never set, that check is skipped
+ {
+ this.loadTime = loadTime;
+ return this;
+ }
+
+ public CounterSnapshotMatcher loadWait(long... loadWait) // expected getLoadWait() values; if never set, that check is skipped
+ {
+ this.loadWait = loadWait;
+ return this;
+ }
+
+ public CounterSnapshotMatcher loadFiles(long... loadFiles) // expected getLoadFiles() values; if never set, that check is skipped
+ {
+ this.loadFiles = loadFiles;
+ return this;
+ }
+
/**
* Asserts that the matcher matches the queryCounterSnapshot parameter. If a
parameter in this class is null, the
* match is not checked
@@ -97,6 +125,18 @@ public class CounterSnapshotMatcher
if (totalFiles != null) {
Assert.assertArrayEquals(errorMessageFormat, totalFiles,
((ChannelCounters.Snapshot) queryCounterSnapshot).getTotalFiles());
}
+ if (loadBytes != null) {
+ Assert.assertArrayEquals(errorMessageFormat, loadBytes,
((ChannelCounters.Snapshot) queryCounterSnapshot).getLoadBytes());
+ }
+ if (loadTime != null) {
+ Assert.assertArrayEquals(errorMessageFormat, loadTime,
((ChannelCounters.Snapshot) queryCounterSnapshot).getLoadTime());
+ }
+ if (loadWait != null) {
+ Assert.assertArrayEquals(errorMessageFormat, loadWait,
((ChannelCounters.Snapshot) queryCounterSnapshot).getLoadWait());
+ }
+ if (loadFiles != null) {
+ Assert.assertArrayEquals(errorMessageFormat, loadFiles,
((ChannelCounters.Snapshot) queryCounterSnapshot).getLoadFiles());
+ }
if (segmentRowsProcessed != null) {
Assert.assertEquals(errorMessageFormat,
segmentRowsProcessed.longValue(), ((SegmentGenerationProgressCounter.Snapshot)
queryCounterSnapshot).getRowsProcessed());
}
diff --git a/web-console/src/druid-models/stages/stages.spec.ts
b/web-console/src/druid-models/stages/stages.spec.ts
index 9b931215930..b0564e67b5e 100644
--- a/web-console/src/druid-models/stages/stages.spec.ts
+++ b/web-console/src/druid-models/stages/stages.spec.ts
@@ -434,6 +434,10 @@ describe('Stages', () => {
"bytes": 10943622,
"files": 0,
"frames": 21,
+ "loadBytes": 0,
+ "loadFiles": 0,
+ "loadTime": 0,
+ "loadWait": 0,
"rows": 39742,
"totalFiles": 0,
},
@@ -451,6 +455,10 @@ describe('Stages', () => {
"bytes": 257524,
"files": 0,
"frames": 1,
+ "loadBytes": 0,
+ "loadFiles": 0,
+ "loadTime": 0,
+ "loadWait": 0,
"rows": 888,
"totalFiles": 0,
},
@@ -461,6 +469,10 @@ describe('Stages', () => {
"bytes": 289731,
"files": 0,
"frames": 1,
+ "loadBytes": 0,
+ "loadFiles": 0,
+ "loadTime": 0,
+ "loadWait": 0,
"rows": 995,
"totalFiles": 0,
},
@@ -471,6 +483,10 @@ describe('Stages', () => {
"bytes": 412396,
"files": 0,
"frames": 1,
+ "loadBytes": 0,
+ "loadFiles": 0,
+ "loadTime": 0,
+ "loadWait": 0,
"rows": 1419,
"totalFiles": 0,
},
@@ -481,6 +497,10 @@ describe('Stages', () => {
"bytes": 262388,
"files": 0,
"frames": 1,
+ "loadBytes": 0,
+ "loadFiles": 0,
+ "loadTime": 0,
+ "loadWait": 0,
"rows": 905,
"totalFiles": 0,
},
@@ -491,6 +511,10 @@ describe('Stages', () => {
"bytes": 170554,
"files": 0,
"frames": 1,
+ "loadBytes": 0,
+ "loadFiles": 0,
+ "loadTime": 0,
+ "loadWait": 0,
"rows": 590,
"totalFiles": 0,
},
@@ -501,6 +525,10 @@ describe('Stages', () => {
"bytes": 188324,
"files": 0,
"frames": 1,
+ "loadBytes": 0,
+ "loadFiles": 0,
+ "loadTime": 0,
+ "loadWait": 0,
"rows": 652,
"totalFiles": 0,
},
@@ -511,6 +539,10 @@ describe('Stages', () => {
"bytes": 92275,
"files": 0,
"frames": 1,
+ "loadBytes": 0,
+ "loadFiles": 0,
+ "loadTime": 0,
+ "loadWait": 0,
"rows": 322,
"totalFiles": 0,
},
@@ -521,6 +553,10 @@ describe('Stages', () => {
"bytes": 69531,
"files": 0,
"frames": 1,
+ "loadBytes": 0,
+ "loadFiles": 0,
+ "loadTime": 0,
+ "loadWait": 0,
"rows": 247,
"totalFiles": 0,
},
@@ -531,6 +567,10 @@ describe('Stages', () => {
"bytes": 65844,
"files": 0,
"frames": 1,
+ "loadBytes": 0,
+ "loadFiles": 0,
+ "loadTime": 0,
+ "loadWait": 0,
"rows": 236,
"totalFiles": 0,
},
@@ -541,6 +581,10 @@ describe('Stages', () => {
"bytes": 85875,
"files": 0,
"frames": 1,
+ "loadBytes": 0,
+ "loadFiles": 0,
+ "loadTime": 0,
+ "loadWait": 0,
"rows": 309,
"totalFiles": 0,
},
@@ -551,6 +595,10 @@ describe('Stages', () => {
"bytes": 71852,
"files": 0,
"frames": 1,
+ "loadBytes": 0,
+ "loadFiles": 0,
+ "loadTime": 0,
+ "loadWait": 0,
"rows": 256,
"totalFiles": 0,
},
@@ -561,6 +609,10 @@ describe('Stages', () => {
"bytes": 72512,
"files": 0,
"frames": 1,
+ "loadBytes": 0,
+ "loadFiles": 0,
+ "loadTime": 0,
+ "loadWait": 0,
"rows": 260,
"totalFiles": 0,
},
@@ -571,6 +623,10 @@ describe('Stages', () => {
"bytes": 123204,
"files": 0,
"frames": 1,
+ "loadBytes": 0,
+ "loadFiles": 0,
+ "loadTime": 0,
+ "loadWait": 0,
"rows": 440,
"totalFiles": 0,
},
@@ -581,6 +637,10 @@ describe('Stages', () => {
"bytes": 249217,
"files": 0,
"frames": 1,
+ "loadBytes": 0,
+ "loadFiles": 0,
+ "loadTime": 0,
+ "loadWait": 0,
"rows": 876,
"totalFiles": 0,
},
@@ -591,6 +651,10 @@ describe('Stages', () => {
"bytes": 399583,
"files": 0,
"frames": 1,
+ "loadBytes": 0,
+ "loadFiles": 0,
+ "loadTime": 0,
+ "loadWait": 0,
"rows": 1394,
"totalFiles": 0,
},
@@ -601,6 +665,10 @@ describe('Stages', () => {
"bytes": 256916,
"files": 0,
"frames": 1,
+ "loadBytes": 0,
+ "loadFiles": 0,
+ "loadTime": 0,
+ "loadWait": 0,
"rows": 892,
"totalFiles": 0,
},
@@ -611,6 +679,10 @@ describe('Stages', () => {
"bytes": 1039927,
"files": 0,
"frames": 2,
+ "loadBytes": 0,
+ "loadFiles": 0,
+ "loadTime": 0,
+ "loadWait": 0,
"rows": 3595,
"totalFiles": 0,
},
@@ -621,6 +693,10 @@ describe('Stages', () => {
"bytes": 1887893,
"files": 0,
"frames": 4,
+ "loadBytes": 0,
+ "loadFiles": 0,
+ "loadTime": 0,
+ "loadWait": 0,
"rows": 6522,
"totalFiles": 0,
},
@@ -631,6 +707,10 @@ describe('Stages', () => {
"bytes": 1307287,
"files": 0,
"frames": 3,
+ "loadBytes": 0,
+ "loadFiles": 0,
+ "loadTime": 0,
+ "loadWait": 0,
"rows": 4525,
"totalFiles": 0,
},
@@ -641,6 +721,10 @@ describe('Stages', () => {
"bytes": 1248166,
"files": 0,
"frames": 3,
+ "loadBytes": 0,
+ "loadFiles": 0,
+ "loadTime": 0,
+ "loadWait": 0,
"rows": 4326,
"totalFiles": 0,
},
@@ -651,6 +735,10 @@ describe('Stages', () => {
"bytes": 1195593,
"files": 0,
"frames": 3,
+ "loadBytes": 0,
+ "loadFiles": 0,
+ "loadTime": 0,
+ "loadWait": 0,
"rows": 4149,
"totalFiles": 0,
},
@@ -661,6 +749,10 @@ describe('Stages', () => {
"bytes": 738804,
"files": 0,
"frames": 2,
+ "loadBytes": 0,
+ "loadFiles": 0,
+ "loadTime": 0,
+ "loadWait": 0,
"rows": 2561,
"totalFiles": 0,
},
@@ -671,6 +763,10 @@ describe('Stages', () => {
"bytes": 552485,
"files": 0,
"frames": 2,
+ "loadBytes": 0,
+ "loadFiles": 0,
+ "loadTime": 0,
+ "loadWait": 0,
"rows": 1914,
"totalFiles": 0,
},
@@ -681,6 +777,10 @@ describe('Stages', () => {
"bytes": 418062,
"files": 0,
"frames": 1,
+ "loadBytes": 0,
+ "loadFiles": 0,
+ "loadTime": 0,
+ "loadWait": 0,
"rows": 1452,
"totalFiles": 0,
},
diff --git a/web-console/src/druid-models/stages/stages.ts
b/web-console/src/druid-models/stages/stages.ts
index 13131dc344c..cbb67a26b5b 100644
--- a/web-console/src/druid-models/stages/stages.ts
+++ b/web-console/src/druid-models/stages/stages.ts
@@ -178,9 +178,22 @@ export interface ChannelCounter {
frames?: number[];
files?: number[];
totalFiles?: number[];
+ loadBytes?: number[];
+ loadTime?: number[];
+ loadWait?: number[];
+ loadFiles?: number[];
}
-export type ChannelFields = 'rows' | 'bytes' | 'frames' | 'files' |
'totalFiles';
+export type ChannelFields =
+ | 'rows'
+ | 'bytes'
+ | 'frames'
+ | 'files'
+ | 'totalFiles'
+ | 'loadBytes'
+ | 'loadTime'
+ | 'loadWait'
+ | 'loadFiles';
export interface SortProgressCounter {
type: 'sortProgress';
@@ -310,6 +323,10 @@ function zeroChannelFields(): Record<ChannelFields,
number> {
frames: 0,
files: 0,
totalFiles: 0,
+ loadBytes: 0,
+ loadTime: 0,
+ loadWait: 0,
+ loadFiles: 0,
};
}
@@ -608,6 +625,10 @@ export class Stages {
frames: sum(c.frames || []),
files: sum(c.files || []),
totalFiles: sum(c.totalFiles || []),
+ loadBytes: sum(c.loadBytes || []),
+ loadTime: sum(c.loadTime || []),
+ loadWait: sum(c.loadWait || []),
+ loadFiles: sum(c.loadFiles || []),
}
: zeroChannelFields();
}
@@ -700,6 +721,10 @@ export class Stages {
c.frames += channelCounter.frames?.[i] || 0;
c.files += channelCounter.files?.[i] || 0;
c.totalFiles += channelCounter.totalFiles?.[i] || 0;
+ c.loadBytes += channelCounter.loadBytes?.[i] || 0;
+ c.loadTime += channelCounter.loadTime?.[i] || 0;
+ c.loadWait += channelCounter.loadWait?.[i] || 0;
+ c.loadFiles += channelCounter.loadFiles?.[i] || 0;
}
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]