This is an automated email from the ASF dual-hosted git repository.
lakshsingla pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 7c625356c52 Add logging for sketches on workers (#16697)
7c625356c52 is described below
commit 7c625356c52b2a45edf75b28755e70e6dc0da826
Author: Adarsh Sanjeev <[email protected]>
AuthorDate: Tue Jul 9 14:37:43 2024 +0530
Add logging for sketches on workers (#16697)
Improve the logging of sketches on workers.
---
.../src/main/java/org/apache/druid/msq/exec/WorkerImpl.java | 1 +
.../org/apache/druid/msq/kernel/GlobalSortMaxCountShuffleSpec.java | 1 +
.../apache/druid/msq/kernel/GlobalSortTargetSizeShuffleSpec.java | 1 +
.../apache/druid/msq/statistics/ClusterByStatisticsCollector.java | 5 +++++
.../druid/msq/statistics/ClusterByStatisticsCollectorImpl.java | 7 ++-----
5 files changed, 10 insertions(+), 5 deletions(-)
diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerImpl.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerImpl.java
index 84453eaf98b..61939d82373 100644
--- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerImpl.java
+++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerImpl.java
@@ -1772,6 +1772,7 @@ public class WorkerImpl implements Worker
@Override
public void onSuccess(final ClusterByStatisticsCollector result)
{
+ result.logSketches();
kernelManipulationQueue.add(
holder ->
holder.getStageKernelMap().get(stageDefinition.getId())
diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/GlobalSortMaxCountShuffleSpec.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/GlobalSortMaxCountShuffleSpec.java
index 8be2108a57a..e38fc778bb8 100644
--- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/GlobalSortMaxCountShuffleSpec.java
+++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/GlobalSortMaxCountShuffleSpec.java
@@ -100,6 +100,7 @@ public class GlobalSortMaxCountShuffleSpec implements GlobalSortShuffleSpec
} else if (maxPartitions > maxNumPartitions) {
return Either.error((long) maxPartitions);
} else {
+ collector.logSketches();
final ClusterByPartitions generatedPartitions =
collector.generatePartitionsWithMaxCount(maxPartitions);
if (generatedPartitions.size() <= maxNumPartitions) {
return Either.value(generatedPartitions);
diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/GlobalSortTargetSizeShuffleSpec.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/GlobalSortTargetSizeShuffleSpec.java
index 61ae457d626..7e03a0664a8 100644
--- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/GlobalSortTargetSizeShuffleSpec.java
+++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/GlobalSortTargetSizeShuffleSpec.java
@@ -99,6 +99,7 @@ public class GlobalSortTargetSizeShuffleSpec implements GlobalSortShuffleSpec
if (expectedPartitions > maxNumPartitions) {
return Either.error(expectedPartitions);
} else {
+ collector.logSketches();
final ClusterByPartitions generatedPartitions =
collector.generatePartitionsWithTargetWeight(targetSize);
if (generatedPartitions.size() <= maxNumPartitions) {
return Either.value(generatedPartitions);
diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/statistics/ClusterByStatisticsCollector.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/statistics/ClusterByStatisticsCollector.java
index fb8c8232fb8..971bf757321 100644
--- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/statistics/ClusterByStatisticsCollector.java
+++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/statistics/ClusterByStatisticsCollector.java
@@ -90,6 +90,11 @@ public interface ClusterByStatisticsCollector
*/
ClusterByPartitions generatePartitionsWithMaxCount(int maxNumPartitions);
+ /**
+ * Logs some information regarding the collector. This is useful in seeing which sketches were downsampled the most.
+ */
+ void logSketches();
+
/**
* Returns an immutable, JSON-serializable snapshot of this collector.
*/
diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/statistics/ClusterByStatisticsCollectorImpl.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/statistics/ClusterByStatisticsCollectorImpl.java
index aad5d3d5483..d89a4e80952 100644
--- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/statistics/ClusterByStatisticsCollectorImpl.java
+++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/statistics/ClusterByStatisticsCollectorImpl.java
@@ -243,8 +243,6 @@ public class ClusterByStatisticsCollectorImpl implements ClusterByStatisticsColl
@Override
public ClusterByPartitions generatePartitionsWithTargetWeight(final long targetWeight)
{
- logSketches();
-
if (targetWeight < 1) {
throw new IAE("Target weight must be positive");
}
@@ -288,8 +286,6 @@ public class ClusterByStatisticsCollectorImpl implements ClusterByStatisticsColl
@Override
public ClusterByPartitions generatePartitionsWithMaxCount(final int maxNumPartitions)
{
- logSketches();
-
if (maxNumPartitions < 1) {
throw new IAE("Must have at least one partition");
} else if (buckets.isEmpty()) {
@@ -331,7 +327,8 @@ public class ClusterByStatisticsCollectorImpl implements ClusterByStatisticsColl
return ranges;
}
- private void logSketches()
+ @Override
+ public void logSketches()
{
if (log.isDebugEnabled()) {
// Log all sketches
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]