This is an automated email from the ASF dual-hosted git repository.

suneet pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new ad437dd  Add shuffle metrics for parallel indexing (#10359)
ad437dd is described below

commit ad437dd655ef24d8d9a4120fa8e99885ec353eb3
Author: Jihoon Son <[email protected]>
AuthorDate: Sat Oct 10 19:35:17 2020 -0700

    Add shuffle metrics for parallel indexing (#10359)
    
    * Add shuffle metrics for parallel indexing
    
    * javadoc and concurrency test
    
    * concurrency
    
    * fix javadoc
    
    * Feature flag
    
    * doc
    
    * fix doc and add a test
    
    * checkstyle
    
    * add tests
    
    * fix build and address comments
---
 core/pom.xml                                       |   6 +
 .../apache/druid/java/util/metrics/Monitor.java    |   5 +
 .../druid/java/util/metrics/MonitorScheduler.java  |  12 ++
 .../java/util/metrics/MonitorSchedulerTest.java    |  86 +++++++++
 docs/operations/metrics.md                         |  12 +-
 indexing-service/pom.xml                           |   6 +
 .../apache/druid/indexing/common/TaskToolbox.java  |   2 +-
 .../druid/indexing/common/TaskToolboxFactory.java  |   2 +-
 .../parallel/ParallelIndexSupervisorTask.java      |   3 +-
 .../parallel/PartialHashSegmentGenerateTask.java   |   3 +-
 .../parallel/PartialRangeSegmentGenerateTask.java  |   2 +-
 .../batch/parallel/PartialSegmentGenerateTask.java |   2 +-
 .../common/task/batch/parallel/ShuffleClient.java  |   4 +-
 .../{ => shuffle}/IntermediaryDataManager.java     |   7 +-
 .../{ => shuffle}/ShuffleDataSegmentPusher.java    |   2 +-
 .../indexing/worker/shuffle/ShuffleMetrics.java    | 121 ++++++++++++
 .../indexing/worker/shuffle/ShuffleModule.java     |  60 ++++++
 .../indexing/worker/shuffle/ShuffleMonitor.java    |  67 +++++++
 .../worker/{http => shuffle}/ShuffleResource.java  |   9 +-
 .../AbstractParallelIndexSupervisorTaskTest.java   |   2 +-
 .../IntermediaryDataManagerAutoCleanupTest.java    |   2 +-
 ...ermediaryDataManagerManualAddAndDeleteTest.java |   2 +-
 .../ShuffleDataSegmentPusherTest.java              |   2 +-
 .../worker/shuffle/ShuffleMetricsTest.java         | 186 ++++++++++++++++++
 .../indexing/worker/shuffle/ShuffleModuleTest.java |  85 +++++++++
 .../worker/shuffle/ShuffleMonitorTest.java         |  68 +++++++
 .../worker/shuffle/ShuffleResourceTest.java        | 212 +++++++++++++++++++++
 .../main/java/org/apache/druid/cli/CliIndexer.java |   4 +-
 .../org/apache/druid/cli/CliMiddleManager.java     |   5 +-
 website/.spelling                                  |   1 +
 30 files changed, 956 insertions(+), 24 deletions(-)

diff --git a/core/pom.xml b/core/pom.xml
index efec90a..e0ccfa1 100644
--- a/core/pom.xml
+++ b/core/pom.xml
@@ -331,6 +331,12 @@
       <groupId>com.google.errorprone</groupId>
       <artifactId>error_prone_annotations</artifactId>
     </dependency>
+    <dependency>
+      <groupId>org.mockito</groupId>
+      <artifactId>mockito-core</artifactId>
+      <version>${mockito.version}</version>
+      <scope>test</scope>
+    </dependency>
   </dependencies>
 
   <build>
diff --git a/core/src/main/java/org/apache/druid/java/util/metrics/Monitor.java 
b/core/src/main/java/org/apache/druid/java/util/metrics/Monitor.java
index ac926be..2ccd5db 100644
--- a/core/src/main/java/org/apache/druid/java/util/metrics/Monitor.java
+++ b/core/src/main/java/org/apache/druid/java/util/metrics/Monitor.java
@@ -29,5 +29,10 @@ public interface Monitor
 
   void stop();
 
+  /**
+   * Emit metrics using the given emitter.
+   *
+   * @return true if this monitor needs to continue monitoring. False 
otherwise.
+   */
   boolean monitor(ServiceEmitter emitter);
 }
diff --git 
a/core/src/main/java/org/apache/druid/java/util/metrics/MonitorScheduler.java 
b/core/src/main/java/org/apache/druid/java/util/metrics/MonitorScheduler.java
index 118f283..2adbe95 100644
--- 
a/core/src/main/java/org/apache/druid/java/util/metrics/MonitorScheduler.java
+++ 
b/core/src/main/java/org/apache/druid/java/util/metrics/MonitorScheduler.java
@@ -27,6 +27,7 @@ import 
org.apache.druid.java.util.common.lifecycle.LifecycleStop;
 import org.apache.druid.java.util.emitter.service.ServiceEmitter;
 
 import java.util.List;
+import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ScheduledExecutorService;
@@ -93,6 +94,17 @@ public class MonitorScheduler
     }
   }
 
+  /**
+   * Returns a {@link Monitor} instance of the given class if any. Note that 
this method searches for the monitor
+   * from the current snapshot of {@link #monitors}.
+   */
+  public <T extends Monitor> Optional<T> findMonitor(Class<T> monitorClass)
+  {
+    synchronized (lock) {
+      return (Optional<T>) monitors.stream().filter(m -> m.getClass() == 
monitorClass).findFirst();
+    }
+  }
+
   @LifecycleStop
   public void stop()
   {
diff --git 
a/core/src/test/java/org/apache/druid/java/util/metrics/MonitorSchedulerTest.java
 
b/core/src/test/java/org/apache/druid/java/util/metrics/MonitorSchedulerTest.java
new file mode 100644
index 0000000..7667196
--- /dev/null
+++ 
b/core/src/test/java/org/apache/druid/java/util/metrics/MonitorSchedulerTest.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.java.util.metrics;
+
+import com.google.common.collect.ImmutableList;
+import org.apache.druid.java.util.common.concurrent.Execs;
+import org.apache.druid.java.util.emitter.service.ServiceEmitter;
+import org.junit.Assert;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import java.util.Optional;
+
+public class MonitorSchedulerTest
+{
+  @Test
+  public void testFindMonitor()
+  {
+    class Monitor1 extends NoopMonitor
+    {
+    }
+    class Monitor2 extends NoopMonitor
+    {
+    }
+    class Monitor3 extends NoopMonitor
+    {
+    }
+
+    final Monitor1 monitor1 = new Monitor1();
+    final Monitor2 monitor2 = new Monitor2();
+
+    final MonitorScheduler scheduler = new MonitorScheduler(
+        Mockito.mock(MonitorSchedulerConfig.class),
+        Execs.scheduledSingleThreaded("monitor-scheduler-test"),
+        Mockito.mock(ServiceEmitter.class),
+        ImmutableList.of(monitor1, monitor2)
+    );
+
+    final Optional<Monitor1> maybeFound1 = 
scheduler.findMonitor(Monitor1.class);
+    final Optional<Monitor2> maybeFound2 = 
scheduler.findMonitor(Monitor2.class);
+    Assert.assertTrue(maybeFound1.isPresent());
+    Assert.assertTrue(maybeFound2.isPresent());
+    Assert.assertSame(monitor1, maybeFound1.get());
+    Assert.assertSame(monitor2, maybeFound2.get());
+
+    Assert.assertFalse(scheduler.findMonitor(Monitor3.class).isPresent());
+  }
+
+  private static class NoopMonitor implements Monitor
+  {
+    @Override
+    public void start()
+    {
+
+    }
+
+    @Override
+    public void stop()
+    {
+
+    }
+
+    @Override
+    public boolean monitor(ServiceEmitter emitter)
+    {
+      return true;
+    }
+  }
+}
diff --git a/docs/operations/metrics.md b/docs/operations/metrics.md
index 1b4ed7f..95bc345 100644
--- a/docs/operations/metrics.md
+++ b/docs/operations/metrics.md
@@ -181,7 +181,7 @@ These metrics are only available if the 
RealtimeMetricsMonitor is included in th
 
 Note: If the JVM does not support CPU time measurement for the current thread, 
ingest/merge/cpu and ingest/persists/cpu will be 0.
 
-### Indexing service
+## Indexing service
 
 |Metric|Description|Dimensions|Normal Value|
 |------|-----------|----------|------------|
@@ -202,6 +202,16 @@ Note: If the JVM does not support CPU time measurement for 
the current thread, i
 |`taskSlot/lazy/count`|Number of total task slots in lazy marked 
MiddleManagers and Indexers per emission period. This metric is only available 
if the TaskSlotCountStatsMonitor module is included.| |Varies.|
 |`taskSlot/blacklisted/count`|Number of total task slots in blacklisted 
MiddleManagers and Indexers per emission period. This metric is only available 
if the TaskSlotCountStatsMonitor module is included.| |Varies.|
 
+## Shuffle metrics (Native parallel task)
+
+The shuffle metrics can be enabled by adding 
`org.apache.druid.indexing.worker.shuffle.ShuffleMonitor` to 
`druid.monitoring.monitors`.
+See [Enabling Metrics](../configuration/index.md#enabling-metrics) for more 
details.
+
+|Metric|Description|Dimensions|Normal Value|
+|------|-----------|----------|------------|
+|`ingest/shuffle/bytes`|Number of bytes shuffled per emission 
period.|supervisorTaskId|Varies|
+|`ingest/shuffle/requests`|Number of shuffle requests per emission 
period.|supervisorTaskId|Varies|
+
 ## Coordination
 
 These metrics are for the Druid Coordinator and are reset each time the 
Coordinator runs the coordination logic.
diff --git a/indexing-service/pom.xml b/indexing-service/pom.xml
index cea66e7..d00de06 100644
--- a/indexing-service/pom.xml
+++ b/indexing-service/pom.xml
@@ -278,6 +278,12 @@
             <artifactId>system-rules</artifactId>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>org.mockito</groupId>
+            <artifactId>mockito-core</artifactId>
+            <version>${mockito.version}</version>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
 
     <build>
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/TaskToolbox.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/TaskToolbox.java
index 5f6f392..cbdfc68 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/TaskToolbox.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/TaskToolbox.java
@@ -42,7 +42,7 @@ import org.apache.druid.indexing.common.config.TaskConfig;
 import org.apache.druid.indexing.common.task.IndexTaskClientFactory;
 import 
org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexSupervisorTaskClient;
 import org.apache.druid.indexing.common.task.batch.parallel.ShuffleClient;
-import org.apache.druid.indexing.worker.IntermediaryDataManager;
+import org.apache.druid.indexing.worker.shuffle.IntermediaryDataManager;
 import org.apache.druid.java.util.emitter.service.ServiceEmitter;
 import org.apache.druid.java.util.metrics.Monitor;
 import org.apache.druid.java.util.metrics.MonitorScheduler;
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/TaskToolboxFactory.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/TaskToolboxFactory.java
index 8c883a4..2ef1f88 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/TaskToolboxFactory.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/TaskToolboxFactory.java
@@ -41,7 +41,7 @@ import 
org.apache.druid.indexing.common.task.IndexTaskClientFactory;
 import org.apache.druid.indexing.common.task.Task;
 import 
org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexSupervisorTaskClient;
 import org.apache.druid.indexing.common.task.batch.parallel.ShuffleClient;
-import org.apache.druid.indexing.worker.IntermediaryDataManager;
+import org.apache.druid.indexing.worker.shuffle.IntermediaryDataManager;
 import org.apache.druid.java.util.emitter.service.ServiceEmitter;
 import org.apache.druid.java.util.metrics.MonitorScheduler;
 import org.apache.druid.query.QueryRunnerFactoryConglomerate;
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java
index acac279..d5d8a8f 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java
@@ -62,6 +62,7 @@ import 
org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexTaskRun
 import 
org.apache.druid.indexing.common.task.batch.parallel.distribution.StringDistribution;
 import 
org.apache.druid.indexing.common.task.batch.parallel.distribution.StringDistributionMerger;
 import 
org.apache.druid.indexing.common.task.batch.parallel.distribution.StringSketchMerger;
+import org.apache.druid.indexing.worker.shuffle.IntermediaryDataManager;
 import org.apache.druid.java.util.common.IAE;
 import org.apache.druid.java.util.common.ISE;
 import org.apache.druid.java.util.common.Pair;
@@ -517,7 +518,7 @@ public class ParallelIndexSupervisorTask extends 
AbstractBatchIndexTask implemen
    * - In the first phase, each task partitions input data and stores those 
partitions in local storage.
    *   - The partition is created based on the segment granularity (primary 
partition key) and the partition dimension
    *     values in {@link PartitionsSpec} (secondary partition key).
-   *   - Partitioned data is maintained by {@link 
org.apache.druid.indexing.worker.IntermediaryDataManager}.
+   *   - Partitioned data is maintained by {@link IntermediaryDataManager}.
    * - In the second phase, each task reads partitioned data from the 
intermediary data server (middleManager
    *   or indexer) and merges them to create the final segments.
    */
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialHashSegmentGenerateTask.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialHashSegmentGenerateTask.java
index ff0090c..3b1f7ba 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialHashSegmentGenerateTask.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialHashSegmentGenerateTask.java
@@ -29,6 +29,7 @@ import 
org.apache.druid.indexing.common.task.SegmentAllocators;
 import org.apache.druid.indexing.common.task.TaskResource;
 import 
org.apache.druid.indexing.common.task.batch.parallel.iterator.DefaultIndexTaskInputRowIteratorBuilder;
 import 
org.apache.druid.indexing.common.task.batch.partition.HashPartitionAnalysis;
+import org.apache.druid.indexing.worker.shuffle.ShuffleDataSegmentPusher;
 import org.apache.druid.segment.indexing.granularity.GranularitySpec;
 import org.apache.druid.timeline.DataSegment;
 import org.apache.druid.timeline.partition.BucketNumberedShardSpec;
@@ -46,7 +47,7 @@ import java.util.stream.Collectors;
 /**
  * The worker task of {@link 
PartialHashSegmentGenerateParallelIndexTaskRunner}. This task partitions input 
data by
  * hashing the segment granularity and partition dimensions in {@link 
HashedPartitionsSpec}. Partitioned segments are
- * stored in local storage using {@link 
org.apache.druid.indexing.worker.ShuffleDataSegmentPusher}.
+ * stored in local storage using {@link ShuffleDataSegmentPusher}.
  */
 public class PartialHashSegmentGenerateTask extends 
PartialSegmentGenerateTask<GeneratedPartitionsMetadataReport>
 {
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialRangeSegmentGenerateTask.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialRangeSegmentGenerateTask.java
index 89b9f80..57978f4 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialRangeSegmentGenerateTask.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialRangeSegmentGenerateTask.java
@@ -31,7 +31,7 @@ import 
org.apache.druid.indexing.common.task.SegmentAllocators;
 import org.apache.druid.indexing.common.task.TaskResource;
 import 
org.apache.druid.indexing.common.task.batch.parallel.iterator.RangePartitionIndexTaskInputRowIteratorBuilder;
 import 
org.apache.druid.indexing.common.task.batch.partition.RangePartitionAnalysis;
-import org.apache.druid.indexing.worker.ShuffleDataSegmentPusher;
+import org.apache.druid.indexing.worker.shuffle.ShuffleDataSegmentPusher;
 import org.apache.druid.timeline.DataSegment;
 import org.apache.druid.timeline.partition.BucketNumberedShardSpec;
 import org.apache.druid.timeline.partition.PartitionBoundaries;
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialSegmentGenerateTask.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialSegmentGenerateTask.java
index 27160c9..ec8530b 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialSegmentGenerateTask.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialSegmentGenerateTask.java
@@ -30,7 +30,7 @@ import 
org.apache.druid.indexing.common.task.SegmentAllocatorForBatch;
 import org.apache.druid.indexing.common.task.SequenceNameFunction;
 import org.apache.druid.indexing.common.task.TaskResource;
 import 
org.apache.druid.indexing.common.task.batch.parallel.iterator.IndexTaskInputRowIteratorBuilder;
-import org.apache.druid.indexing.worker.ShuffleDataSegmentPusher;
+import org.apache.druid.indexing.worker.shuffle.ShuffleDataSegmentPusher;
 import org.apache.druid.query.DruidMetrics;
 import org.apache.druid.segment.incremental.ParseExceptionHandler;
 import org.apache.druid.segment.incremental.RowIngestionMeters;
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ShuffleClient.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ShuffleClient.java
index 4395c7c..b6ea7aa 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ShuffleClient.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ShuffleClient.java
@@ -19,6 +19,8 @@
 
 package org.apache.druid.indexing.common.task.batch.parallel;
 
+import org.apache.druid.indexing.worker.shuffle.IntermediaryDataManager;
+
 import java.io.File;
 import java.io.IOException;
 
@@ -27,7 +29,7 @@ import java.io.IOException;
  * The only available implementation for production code is {@link 
HttpShuffleClient} and
  * this interface is more for easier testing.
  *
- * @see org.apache.druid.indexing.worker.IntermediaryDataManager
+ * @see IntermediaryDataManager
  * @see PartialSegmentMergeTask
  */
 public interface ShuffleClient
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/worker/IntermediaryDataManager.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/worker/shuffle/IntermediaryDataManager.java
similarity index 98%
rename from 
indexing-service/src/main/java/org/apache/druid/indexing/worker/IntermediaryDataManager.java
rename to 
indexing-service/src/main/java/org/apache/druid/indexing/worker/shuffle/IntermediaryDataManager.java
index 408fba5..9afa1c0 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/worker/IntermediaryDataManager.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/worker/shuffle/IntermediaryDataManager.java
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-package org.apache.druid.indexing.worker;
+package org.apache.druid.indexing.worker.shuffle;
 
 import com.google.common.collect.Iterators;
 import com.google.common.io.Files;
@@ -41,6 +41,7 @@ import org.apache.druid.java.util.common.logger.Logger;
 import org.apache.druid.segment.loading.StorageLocation;
 import org.apache.druid.timeline.DataSegment;
 import org.apache.druid.utils.CompressionUtils;
+import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
 import org.joda.time.DateTime;
 import org.joda.time.Interval;
 import org.joda.time.Period;
@@ -67,7 +68,7 @@ import java.util.stream.IntStream;
 /**
  * This class manages intermediary segments for data shuffle between native 
parallel index tasks.
  * In native parallel indexing, phase 1 tasks store segment files in local 
storage of middleManagers (or indexer)
- * and phase 2 tasks read those files via HTTP.
+ * and phase 2 tasks read those files over HTTP.
  *
  * The directory where segment files are placed is structured as
  * {@link 
StorageLocation#path}/supervisorTaskId/startTimeOfSegment/endTimeOfSegment/bucketIdOfSegment.
@@ -100,7 +101,7 @@ public class IntermediaryDataManager
   // but middleManager or indexer could miss the request. This executor is to 
automatically clean up unused intermediary
   // partitions.
   // This can be null until IntermediaryDataManager is started.
-  @Nullable
+  @MonotonicNonNull
   private ScheduledExecutorService supervisorTaskChecker;
 
   @Inject
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/worker/ShuffleDataSegmentPusher.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/worker/shuffle/ShuffleDataSegmentPusher.java
similarity index 97%
rename from 
indexing-service/src/main/java/org/apache/druid/indexing/worker/ShuffleDataSegmentPusher.java
rename to 
indexing-service/src/main/java/org/apache/druid/indexing/worker/shuffle/ShuffleDataSegmentPusher.java
index fcbdf9d..6bc83ba 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/worker/ShuffleDataSegmentPusher.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/worker/shuffle/ShuffleDataSegmentPusher.java
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-package org.apache.druid.indexing.worker;
+package org.apache.druid.indexing.worker.shuffle;
 
 import org.apache.druid.segment.SegmentUtils;
 import org.apache.druid.segment.loading.DataSegmentPusher;
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/worker/shuffle/ShuffleMetrics.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/worker/shuffle/ShuffleMetrics.java
new file mode 100644
index 0000000..500d8e9
--- /dev/null
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/worker/shuffle/ShuffleMetrics.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.worker.shuffle;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.errorprone.annotations.concurrent.GuardedBy;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Shuffle metrics for middleManagers and indexers. This class is thread-safe 
because shuffle can be performed by
+ * multiple HTTP threads while a monitoring thread periodically emits the 
snapshot of metrics.
+ *
+ * @see ShuffleResource
+ * @see org.apache.druid.java.util.metrics.MonitorScheduler
+ */
+public class ShuffleMetrics
+{
+  /**
+   * This lock is used to synchronize accesses to the reference to {@link 
#datasourceMetrics} and the
+   * {@link PerDatasourceShuffleMetrics} values of the map. This means,
+   *
+   * - Any updates on PerDatasourceShuffleMetrics in the map (and thus its key 
as well) should be synchronized
+   * under this lock.
+   * - Any updates on the reference to datasourceMetrics should be 
synchronized under this lock.
+   */
+  private final Object lock = new Object();
+
+  /**
+   * A map of (supervisor task ID) -> {@link PerDatasourceShuffleMetrics}. This 
map is replaced with an empty map
+   * whenever a snapshot is taken since the map can keep growing over time 
otherwise. For concurrent access pattern,
+   * see {@link #shuffleRequested} and {@link #snapshotAndReset()}.
+   */
+  @GuardedBy("lock")
+  private Map<String, PerDatasourceShuffleMetrics> datasourceMetrics = new 
HashMap<>();
+
+  /**
+   * This method is called whenever a new shuffle is requested. Multiple tasks 
can request shuffle at the same time,
+   * while the monitoring thread takes a snapshot of the metrics. There is a 
happens-before relationship between
+   * shuffleRequested and {@link #snapshotAndReset()}.
+   */
+  public void shuffleRequested(String supervisorTaskId, long fileLength)
+  {
+    synchronized (lock) {
+      datasourceMetrics.computeIfAbsent(supervisorTaskId, k -> new 
PerDatasourceShuffleMetrics())
+                       .accumulate(fileLength);
+    }
+  }
+
+  /**
+   * This method is called whenever the monitoring thread takes a snapshot of 
the current metrics.
+   * {@link #datasourceMetrics} will be reset to an empty map after this call. 
This is to return the snapshot
+   * metrics collected during the monitoring period. There is a happens-before 
relationship between snapshotAndReset()
+   * and {@link #shuffleRequested}.
+   */
+  public Map<String, PerDatasourceShuffleMetrics> snapshotAndReset()
+  {
+    synchronized (lock) {
+      final Map<String, PerDatasourceShuffleMetrics> snapshot = 
Collections.unmodifiableMap(datasourceMetrics);
+      datasourceMetrics = new HashMap<>();
+      return snapshot;
+    }
+  }
+
+  /**
+   * This method is visible only for testing. Use {@link #snapshotAndReset()} 
instead to get the current snapshot.
+   */
+  @VisibleForTesting
+  Map<String, PerDatasourceShuffleMetrics> getDatasourceMetrics()
+  {
+    synchronized (lock) {
+      return datasourceMetrics;
+    }
+  }
+
+  /**
+   * This class represents shuffle metrics of one datasource. This class is 
not thread-safe and should never be accessed
+   * by multiple threads at the same time.
+   */
+  public static class PerDatasourceShuffleMetrics
+  {
+    private long shuffleBytes;
+    private int shuffleRequests;
+
+    @VisibleForTesting
+    void accumulate(long shuffleBytes)
+    {
+      this.shuffleBytes += shuffleBytes;
+      this.shuffleRequests++;
+    }
+
+    public long getShuffleBytes()
+    {
+      return shuffleBytes;
+    }
+
+    public int getShuffleRequests()
+    {
+      return shuffleRequests;
+    }
+  }
+}
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/worker/shuffle/ShuffleModule.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/worker/shuffle/ShuffleModule.java
new file mode 100644
index 0000000..1c2afb1
--- /dev/null
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/worker/shuffle/ShuffleModule.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.worker.shuffle;
+
+import com.google.inject.Binder;
+import com.google.inject.Module;
+import com.google.inject.Provides;
+import org.apache.druid.guice.Jerseys;
+import org.apache.druid.guice.LazySingleton;
+import org.apache.druid.java.util.metrics.MonitorScheduler;
+
+import java.util.Optional;
+
+public class ShuffleModule implements Module
+{
+  @Override
+  public void configure(Binder binder)
+  {
+    Jerseys.addResource(binder, ShuffleResource.class);
+  }
+
+  /**
+   * {@link ShuffleMetrics} is used in {@link ShuffleResource} and {@link 
ShuffleMonitor} to collect metrics
+   * and report them, respectively. Unlike ShuffleResource, ShuffleMonitor can 
be created via a user config
+   * ({@link org.apache.druid.server.metrics.MonitorsConfig}) in potentially 
any node type, where it is not
+   * possible to create ShuffleMetrics. This method checks the {@link 
MonitorScheduler} if ShuffleMonitor is
+   * registered on it, and sets the proper ShuffleMetrics.
+   */
+  @Provides
+  @LazySingleton
+  public Optional<ShuffleMetrics> getShuffleMetrics(MonitorScheduler 
monitorScheduler)
+  {
+    // ShuffleMonitor cannot be registered dynamically; it can be registered only via the 
static configuration (MonitorsConfig).
+    // As a result, it is safe to check only one time if it is registered in 
MonitorScheduler.
+    final Optional<ShuffleMonitor> maybeMonitor = 
monitorScheduler.findMonitor(ShuffleMonitor.class);
+    if (maybeMonitor.isPresent()) {
+      final ShuffleMetrics metrics = new ShuffleMetrics();
+      maybeMonitor.get().setShuffleMetrics(metrics);
+      return Optional.of(metrics);
+    }
+    return Optional.empty();
+  }
+}
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/worker/shuffle/ShuffleMonitor.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/worker/shuffle/ShuffleMonitor.java
new file mode 100644
index 0000000..6157698
--- /dev/null
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/worker/shuffle/ShuffleMonitor.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.worker.shuffle;
+
+import 
org.apache.druid.indexing.worker.shuffle.ShuffleMetrics.PerDatasourceShuffleMetrics;
+import org.apache.druid.java.util.emitter.service.ServiceEmitter;
+import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
+import org.apache.druid.java.util.emitter.service.ServiceMetricEvent.Builder;
+import org.apache.druid.java.util.metrics.AbstractMonitor;
+import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
+
+import java.util.Map;
+
+public class ShuffleMonitor extends AbstractMonitor
+{
+  static final String SUPERVISOR_TASK_ID_DIMENSION = "supervisorTaskId";
+  static final String SHUFFLE_BYTES_KEY = "ingest/shuffle/bytes";
+  static final String SHUFFLE_REQUESTS_KEY = "ingest/shuffle/requests";
+
+  /**
+   * ShuffleMonitor can be instantiated in any node type if it is defined in
+   * {@link org.apache.druid.server.metrics.MonitorsConfig}. Since {@link 
ShuffleMetrics} is defined
+   * in the `indexing-service` module, some node types (such as broker) would 
fail to create it
+   * if they don't have the required dependencies. To avoid this problem, this 
variable is lazily initialized
+   * only in the node types which have the {@link ShuffleModule}.
+   */
+  @MonotonicNonNull
+  private ShuffleMetrics shuffleMetrics;
+
+  public void setShuffleMetrics(ShuffleMetrics shuffleMetrics)
+  {
+    this.shuffleMetrics = shuffleMetrics;
+  }
+
+  @Override
+  public boolean doMonitor(ServiceEmitter emitter)
+  {
+    if (shuffleMetrics != null) {
+      final Map<String, PerDatasourceShuffleMetrics> snapshot = 
shuffleMetrics.snapshotAndReset();
+      snapshot.forEach((supervisorTaskId, perDatasourceShuffleMetrics) -> {
+        final Builder metricBuilder = ServiceMetricEvent
+            .builder()
+            .setDimension(SUPERVISOR_TASK_ID_DIMENSION, supervisorTaskId);
+        emitter.emit(metricBuilder.build(SHUFFLE_BYTES_KEY, 
perDatasourceShuffleMetrics.getShuffleBytes()));
+        emitter.emit(metricBuilder.build(SHUFFLE_REQUESTS_KEY, 
perDatasourceShuffleMetrics.getShuffleRequests()));
+      });
+    }
+    return true;
+  }
+}
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/worker/http/ShuffleResource.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/worker/shuffle/ShuffleResource.java
similarity index 92%
rename from 
indexing-service/src/main/java/org/apache/druid/indexing/worker/http/ShuffleResource.java
rename to 
indexing-service/src/main/java/org/apache/druid/indexing/worker/shuffle/ShuffleResource.java
index 0e0e936..dd885a2 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/worker/http/ShuffleResource.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/worker/shuffle/ShuffleResource.java
@@ -17,12 +17,11 @@
  * under the License.
  */
 
-package org.apache.druid.indexing.worker.http;
+package org.apache.druid.indexing.worker.shuffle;
 
 import com.google.common.io.ByteStreams;
 import com.google.inject.Inject;
 import com.sun.jersey.spi.container.ResourceFilters;
-import org.apache.druid.indexing.worker.IntermediaryDataManager;
 import org.apache.druid.java.util.common.DateTimes;
 import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.common.logger.Logger;
@@ -42,6 +41,7 @@ import javax.ws.rs.core.StreamingOutput;
 import java.io.File;
 import java.io.FileInputStream;
 import java.io.IOException;
+import java.util.Optional;
 
 /**
  * HTTP endpoints for shuffle system. The MiddleManager and Indexer use this 
resource to serve intermediary shuffle
@@ -60,11 +60,13 @@ public class ShuffleResource
   private static final Logger log = new Logger(ShuffleResource.class);
 
   private final IntermediaryDataManager intermediaryDataManager;
+  private final Optional<ShuffleMetrics> shuffleMetrics;
 
   @Inject
-  public ShuffleResource(IntermediaryDataManager intermediaryDataManager)
+  public ShuffleResource(IntermediaryDataManager intermediaryDataManager, 
Optional<ShuffleMetrics> shuffleMetrics)
   {
     this.intermediaryDataManager = intermediaryDataManager;
+    this.shuffleMetrics = shuffleMetrics;
   }
 
   @GET
@@ -96,6 +98,7 @@ public class ShuffleResource
       );
       return Response.status(Status.NOT_FOUND).entity(errorMessage).build();
     } else {
+      shuffleMetrics.ifPresent(metrics -> 
metrics.shuffleRequested(supervisorTaskId, partitionFile.length()));
       return Response.ok(
           (StreamingOutput) output -> {
             try (final FileInputStream fileInputStream = new 
FileInputStream(partitionFile)) {
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractParallelIndexSupervisorTaskTest.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractParallelIndexSupervisorTaskTest.java
index af22160..d87d1f0 100644
--- 
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractParallelIndexSupervisorTaskTest.java
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractParallelIndexSupervisorTaskTest.java
@@ -64,8 +64,8 @@ import org.apache.druid.indexing.common.task.Task;
 import org.apache.druid.indexing.common.task.TaskResource;
 import org.apache.druid.indexing.common.task.TestAppenderatorsManager;
 import org.apache.druid.indexing.overlord.Segments;
-import org.apache.druid.indexing.worker.IntermediaryDataManager;
 import org.apache.druid.indexing.worker.config.WorkerConfig;
+import org.apache.druid.indexing.worker.shuffle.IntermediaryDataManager;
 import org.apache.druid.java.util.common.DateTimes;
 import org.apache.druid.java.util.common.IAE;
 import org.apache.druid.java.util.common.ISE;
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/worker/IntermediaryDataManagerAutoCleanupTest.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/worker/shuffle/IntermediaryDataManagerAutoCleanupTest.java
similarity index 98%
rename from 
indexing-service/src/test/java/org/apache/druid/indexing/worker/IntermediaryDataManagerAutoCleanupTest.java
rename to 
indexing-service/src/test/java/org/apache/druid/indexing/worker/shuffle/IntermediaryDataManagerAutoCleanupTest.java
index 7d0233b..dcce480 100644
--- 
a/indexing-service/src/test/java/org/apache/druid/indexing/worker/IntermediaryDataManagerAutoCleanupTest.java
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/worker/shuffle/IntermediaryDataManagerAutoCleanupTest.java
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-package org.apache.druid.indexing.worker;
+package org.apache.druid.indexing.worker.shuffle;
 
 import com.google.common.collect.ImmutableList;
 import org.apache.commons.io.FileUtils;
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/worker/IntermediaryDataManagerManualAddAndDeleteTest.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/worker/shuffle/IntermediaryDataManagerManualAddAndDeleteTest.java
similarity index 99%
rename from 
indexing-service/src/test/java/org/apache/druid/indexing/worker/IntermediaryDataManagerManualAddAndDeleteTest.java
rename to 
indexing-service/src/test/java/org/apache/druid/indexing/worker/shuffle/IntermediaryDataManagerManualAddAndDeleteTest.java
index 15aad92..4db1b39 100644
--- 
a/indexing-service/src/test/java/org/apache/druid/indexing/worker/IntermediaryDataManagerManualAddAndDeleteTest.java
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/worker/shuffle/IntermediaryDataManagerManualAddAndDeleteTest.java
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-package org.apache.druid.indexing.worker;
+package org.apache.druid.indexing.worker.shuffle;
 
 import com.google.common.collect.ImmutableList;
 import org.apache.commons.io.FileUtils;
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/worker/ShuffleDataSegmentPusherTest.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/worker/shuffle/ShuffleDataSegmentPusherTest.java
similarity index 98%
rename from 
indexing-service/src/test/java/org/apache/druid/indexing/worker/ShuffleDataSegmentPusherTest.java
rename to 
indexing-service/src/test/java/org/apache/druid/indexing/worker/shuffle/ShuffleDataSegmentPusherTest.java
index 1531926..0604742 100644
--- 
a/indexing-service/src/test/java/org/apache/druid/indexing/worker/ShuffleDataSegmentPusherTest.java
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/worker/shuffle/ShuffleDataSegmentPusherTest.java
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-package org.apache.druid.indexing.worker;
+package org.apache.druid.indexing.worker.shuffle;
 
 import com.google.common.collect.ImmutableList;
 import com.google.common.io.Files;
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/worker/shuffle/ShuffleMetricsTest.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/worker/shuffle/ShuffleMetricsTest.java
new file mode 100644
index 0000000..d4f6258
--- /dev/null
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/worker/shuffle/ShuffleMetricsTest.java
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.worker.shuffle;
+
+import com.google.common.collect.ImmutableSet;
+import 
org.apache.druid.indexing.worker.shuffle.ShuffleMetrics.PerDatasourceShuffleMetrics;
+import org.apache.druid.java.util.common.concurrent.Execs;
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.ThreadLocalRandom;
+
+public class ShuffleMetricsTest
+{
+  @Rule
+  public ExpectedException expectedException = ExpectedException.none();
+
+  @Test
+  public void testShuffleRequested()
+  {
+    final ShuffleMetrics metrics = new ShuffleMetrics();
+    final String supervisorTask1 = "supervisor1";
+    final String supervisorTask2 = "supervisor2";
+    final String supervisorTask3 = "supervisor3";
+    metrics.shuffleRequested(supervisorTask1, 1024);
+    metrics.shuffleRequested(supervisorTask2, 10);
+    metrics.shuffleRequested(supervisorTask1, 512);
+    metrics.shuffleRequested(supervisorTask3, 10000);
+    metrics.shuffleRequested(supervisorTask2, 30);
+
+    final Map<String, PerDatasourceShuffleMetrics> snapshot = 
metrics.snapshotAndReset();
+    Assert.assertEquals(ImmutableSet.of(supervisorTask1, supervisorTask2, 
supervisorTask3), snapshot.keySet());
+
+    PerDatasourceShuffleMetrics perDatasourceShuffleMetrics = 
snapshot.get(supervisorTask1);
+    Assert.assertEquals(2, perDatasourceShuffleMetrics.getShuffleRequests());
+    Assert.assertEquals(1536, perDatasourceShuffleMetrics.getShuffleBytes());
+
+    perDatasourceShuffleMetrics = snapshot.get(supervisorTask2);
+    Assert.assertEquals(2, perDatasourceShuffleMetrics.getShuffleRequests());
+    Assert.assertEquals(40, perDatasourceShuffleMetrics.getShuffleBytes());
+
+    perDatasourceShuffleMetrics = snapshot.get(supervisorTask3);
+    Assert.assertEquals(1, perDatasourceShuffleMetrics.getShuffleRequests());
+    Assert.assertEquals(10000, perDatasourceShuffleMetrics.getShuffleBytes());
+  }
+
+  @Test
+  public void testSnapshotUnmodifiable()
+  {
+    expectedException.expect(UnsupportedOperationException.class);
+    new ShuffleMetrics().snapshotAndReset().put("k", new 
PerDatasourceShuffleMetrics());
+  }
+
+  @Test
+  public void testResetDatasourceMetricsAfterSnapshot()
+  {
+    final ShuffleMetrics shuffleMetrics = new ShuffleMetrics();
+    shuffleMetrics.shuffleRequested("supervisor", 10);
+    shuffleMetrics.shuffleRequested("supervisor", 10);
+    shuffleMetrics.shuffleRequested("supervisor2", 10);
+    shuffleMetrics.snapshotAndReset();
+
+    Assert.assertEquals(Collections.emptyMap(), 
shuffleMetrics.getDatasourceMetrics());
+  }
+
+  @Test(timeout = 5000L)
+  public void testConcurrency() throws ExecutionException, InterruptedException
+  {
+    final ExecutorService exec = Execs.multiThreaded(3, 
"shuffle-metrics-test-%d"); // 2 for write, 1 for read
+
+    try {
+      final ShuffleMetrics metrics = new ShuffleMetrics();
+      final String supervisorTask1 = "supervisor1";
+      final String supervisorTask2 = "supervisor2";
+
+      final CountDownLatch firstUpdatelatch = new CountDownLatch(2);
+      final List<Future<Void>> futures = new ArrayList<>();
+
+      futures.add(
+          exec.submit(() -> {
+            metrics.shuffleRequested(supervisorTask1, 1024);
+            metrics.shuffleRequested(supervisorTask2, 30);
+            firstUpdatelatch.countDown();
+            Thread.sleep(ThreadLocalRandom.current().nextInt(10));
+            metrics.shuffleRequested(supervisorTask2, 10);
+            return null;
+          })
+      );
+      futures.add(
+          exec.submit(() -> {
+            metrics.shuffleRequested(supervisorTask2, 30);
+            metrics.shuffleRequested(supervisorTask1, 1024);
+            firstUpdatelatch.countDown();
+            Thread.sleep(ThreadLocalRandom.current().nextInt(10));
+            metrics.shuffleRequested(supervisorTask1, 32);
+            return null;
+          })
+      );
+      final Map<String, PerDatasourceShuffleMetrics> firstSnapshot = 
exec.submit(() -> {
+        firstUpdatelatch.await();
+        Thread.sleep(ThreadLocalRandom.current().nextInt(10));
+        return metrics.snapshotAndReset();
+      }).get();
+
+      int expectedSecondSnapshotSize = 0;
+      boolean task1ShouldBeInSecondSnapshot = false;
+      boolean task2ShouldBeInSecondSnapshot = false;
+
+      Assert.assertEquals(2, firstSnapshot.size());
+      Assert.assertNotNull(firstSnapshot.get(supervisorTask1));
+      Assert.assertTrue(
+          2048 == firstSnapshot.get(supervisorTask1).getShuffleBytes()
+          || 2080 == firstSnapshot.get(supervisorTask1).getShuffleBytes()
+      );
+      Assert.assertTrue(
+          2 == firstSnapshot.get(supervisorTask1).getShuffleRequests()
+          || 3 == firstSnapshot.get(supervisorTask1).getShuffleRequests()
+      );
+      if (firstSnapshot.get(supervisorTask1).getShuffleRequests() == 2) {
+        expectedSecondSnapshotSize++;
+        task1ShouldBeInSecondSnapshot = true;
+      }
+      Assert.assertNotNull(firstSnapshot.get(supervisorTask2));
+      Assert.assertTrue(
+          60 == firstSnapshot.get(supervisorTask2).getShuffleBytes()
+          || 70 == firstSnapshot.get(supervisorTask2).getShuffleBytes()
+      );
+      Assert.assertTrue(
+          2 == firstSnapshot.get(supervisorTask2).getShuffleRequests()
+          || 3 == firstSnapshot.get(supervisorTask2).getShuffleRequests()
+      );
+      if (firstSnapshot.get(supervisorTask2).getShuffleRequests() == 2) {
+        expectedSecondSnapshotSize++;
+        task2ShouldBeInSecondSnapshot = true;
+      }
+
+      for (Future<Void> future : futures) {
+        future.get();
+      }
+      final Map<String, PerDatasourceShuffleMetrics> secondSnapshot = 
metrics.snapshotAndReset();
+
+      Assert.assertEquals(expectedSecondSnapshotSize, secondSnapshot.size());
+      Assert.assertEquals(task1ShouldBeInSecondSnapshot, 
secondSnapshot.containsKey(supervisorTask1));
+      if (task1ShouldBeInSecondSnapshot) {
+        Assert.assertEquals(32, 
secondSnapshot.get(supervisorTask1).getShuffleBytes());
+        Assert.assertEquals(1, 
secondSnapshot.get(supervisorTask1).getShuffleRequests());
+      }
+      Assert.assertEquals(task2ShouldBeInSecondSnapshot, 
secondSnapshot.containsKey(supervisorTask2));
+      if (task2ShouldBeInSecondSnapshot) {
+        Assert.assertEquals(10, 
secondSnapshot.get(supervisorTask2).getShuffleBytes());
+        Assert.assertEquals(1, 
secondSnapshot.get(supervisorTask2).getShuffleRequests());
+      }
+
+    }
+    finally {
+      exec.shutdown();
+    }
+  }
+}
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/worker/shuffle/ShuffleModuleTest.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/worker/shuffle/ShuffleModuleTest.java
new file mode 100644
index 0000000..fec860a
--- /dev/null
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/worker/shuffle/ShuffleModuleTest.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.worker.shuffle;
+
+import com.google.inject.Guice;
+import com.google.inject.Injector;
+import com.google.inject.Key;
+import com.google.inject.Scopes;
+import com.google.inject.TypeLiteral;
+import org.apache.druid.guice.LazySingleton;
+import org.apache.druid.java.util.metrics.MonitorScheduler;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.ArgumentMatchers;
+import org.mockito.Mockito;
+
+import java.util.Optional;
+
+public class ShuffleModuleTest
+{
+  private ShuffleModule shuffleModule;
+
+  @Before
+  public void setup()
+  {
+    shuffleModule = new ShuffleModule();
+  }
+
+  @Test
+  public void testGetShuffleMetricsWhenShuffleMonitorExists()
+  {
+    final ShuffleMonitor shuffleMonitor = new ShuffleMonitor();
+    final MonitorScheduler monitorScheduler = 
Mockito.mock(MonitorScheduler.class);
+    Mockito.when(monitorScheduler.findMonitor(ShuffleMonitor.class))
+           .thenReturn(Optional.of(shuffleMonitor));
+    final Injector injector = createInjector(monitorScheduler);
+    final Optional<ShuffleMetrics> optional = injector.getInstance(
+        Key.get(new TypeLiteral<Optional<ShuffleMetrics>>() {})
+    );
+    Assert.assertTrue(optional.isPresent());
+  }
+
+  @Test
+  public void testGetShuffleMetricsWithNoShuffleMonitor()
+  {
+    final MonitorScheduler monitorScheduler = 
Mockito.mock(MonitorScheduler.class);
+    
Mockito.when(monitorScheduler.findMonitor(ArgumentMatchers.eq(ShuffleMonitor.class)))
+           .thenReturn(Optional.empty());
+    final Injector injector = createInjector(monitorScheduler);
+    final Optional<ShuffleMetrics> optional = injector.getInstance(
+        Key.get(new TypeLiteral<Optional<ShuffleMetrics>>() {})
+    );
+    Assert.assertFalse(optional.isPresent());
+  }
+
+  private Injector createInjector(MonitorScheduler monitorScheduler)
+  {
+    return Guice.createInjector(
+        binder -> {
+          binder.bindScope(LazySingleton.class, Scopes.SINGLETON);
+          binder.bind(MonitorScheduler.class).toInstance(monitorScheduler);
+          
binder.bind(IntermediaryDataManager.class).toInstance(Mockito.mock(IntermediaryDataManager.class));
+        },
+        shuffleModule
+    );
+  }
+}
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/worker/shuffle/ShuffleMonitorTest.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/worker/shuffle/ShuffleMonitorTest.java
new file mode 100644
index 0000000..1174bc8
--- /dev/null
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/worker/shuffle/ShuffleMonitorTest.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.worker.shuffle;
+
+import com.google.common.collect.ImmutableMap;
+import 
org.apache.druid.indexing.worker.shuffle.ShuffleMetrics.PerDatasourceShuffleMetrics;
+import org.apache.druid.java.util.emitter.core.Event;
+import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
+import org.apache.druid.java.util.metrics.StubServiceEmitter;
+import org.junit.Assert;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import java.util.List;
+
+public class ShuffleMonitorTest
+{
+  @Test
+  public void testDoMonitor()
+  {
+    final ShuffleMetrics shuffleMetrics = Mockito.mock(ShuffleMetrics.class);
+    final PerDatasourceShuffleMetrics perDatasourceShuffleMetrics = new 
PerDatasourceShuffleMetrics();
+    perDatasourceShuffleMetrics.accumulate(100);
+    perDatasourceShuffleMetrics.accumulate(200);
+    perDatasourceShuffleMetrics.accumulate(10);
+    Mockito.when(shuffleMetrics.snapshotAndReset())
+           .thenReturn(ImmutableMap.of("supervisor", 
perDatasourceShuffleMetrics));
+    final StubServiceEmitter emitter = new StubServiceEmitter("service", 
"host");
+    final ShuffleMonitor monitor = new ShuffleMonitor();
+    monitor.setShuffleMetrics(shuffleMetrics);
+    Assert.assertTrue(monitor.doMonitor(emitter));
+    final List<Event> events = emitter.getEvents();
+    Assert.assertEquals(2, events.size());
+    Assert.assertSame(ServiceMetricEvent.class, events.get(0).getClass());
+    ServiceMetricEvent event = (ServiceMetricEvent) events.get(0);
+    Assert.assertEquals(ShuffleMonitor.SHUFFLE_BYTES_KEY, event.getMetric());
+    Assert.assertEquals(310L, event.getValue());
+    Assert.assertEquals(
+        ImmutableMap.of(ShuffleMonitor.SUPERVISOR_TASK_ID_DIMENSION, 
"supervisor"),
+        event.getUserDims()
+    );
+    Assert.assertSame(ServiceMetricEvent.class, events.get(1).getClass());
+    event = (ServiceMetricEvent) events.get(1);
+    Assert.assertEquals(ShuffleMonitor.SHUFFLE_REQUESTS_KEY, 
event.getMetric());
+    Assert.assertEquals(3, event.getValue());
+    Assert.assertEquals(
+        ImmutableMap.of(ShuffleMonitor.SUPERVISOR_TASK_ID_DIMENSION, 
"supervisor"),
+        event.getUserDims()
+    );
+  }
+}
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/worker/shuffle/ShuffleResourceTest.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/worker/shuffle/ShuffleResourceTest.java
new file mode 100644
index 0000000..bd1b211
--- /dev/null
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/worker/shuffle/ShuffleResourceTest.java
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.worker.shuffle;
+
+import com.google.common.collect.ImmutableList;
+import org.apache.commons.io.FileUtils;
+import org.apache.druid.client.indexing.IndexingServiceClient;
+import org.apache.druid.client.indexing.NoopIndexingServiceClient;
+import org.apache.druid.client.indexing.TaskStatus;
+import org.apache.druid.indexer.TaskState;
+import org.apache.druid.indexing.common.config.TaskConfig;
+import org.apache.druid.indexing.worker.config.WorkerConfig;
+import 
org.apache.druid.indexing.worker.shuffle.ShuffleMetrics.PerDatasourceShuffleMetrics;
+import org.apache.druid.java.util.common.Intervals;
+import org.apache.druid.segment.loading.StorageLocationConfig;
+import org.apache.druid.timeline.DataSegment;
+import org.apache.druid.timeline.partition.NumberedShardSpec;
+import org.easymock.EasyMock;
+import org.joda.time.Interval;
+import org.joda.time.Period;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import javax.ws.rs.core.Response;
+import javax.ws.rs.core.Response.Status;
+import java.io.File;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+
+public class ShuffleResourceTest
+{
+  private static final String DATASOURCE = "datasource";
+
+  @Rule
+  public TemporaryFolder tempDir = new TemporaryFolder();
+
+  private IntermediaryDataManager intermediaryDataManager;
+  private ShuffleMetrics shuffleMetrics;
+  private ShuffleResource shuffleResource;
+
+  @Before
+  public void setup() throws IOException
+  {
+    final WorkerConfig workerConfig = new WorkerConfig()
+    {
+      @Override
+      public long getIntermediaryPartitionDiscoveryPeriodSec()
+      {
+        return 1;
+      }
+
+      @Override
+      public long getIntermediaryPartitionCleanupPeriodSec()
+      {
+        return 2;
+      }
+
+      @Override
+      public Period getIntermediaryPartitionTimeout()
+      {
+        return new Period("PT2S");
+      }
+
+    };
+    final TaskConfig taskConfig = new TaskConfig(
+        null,
+        null,
+        null,
+        null,
+        null,
+        false,
+        null,
+        null,
+        ImmutableList.of(new StorageLocationConfig(tempDir.newFolder(), null, 
null))
+    );
+    final IndexingServiceClient indexingServiceClient = new 
NoopIndexingServiceClient()
+    {
+      @Override
+      public Map<String, TaskStatus> getTaskStatuses(Set<String> taskIds)
+      {
+        final Map<String, TaskStatus> result = new HashMap<>();
+        for (String taskId : taskIds) {
+          result.put(taskId, new TaskStatus(taskId, TaskState.SUCCESS, 10));
+        }
+        return result;
+      }
+    };
+    intermediaryDataManager = new IntermediaryDataManager(workerConfig, 
taskConfig, indexingServiceClient);
+    shuffleMetrics = new ShuffleMetrics();
+    shuffleResource = new ShuffleResource(intermediaryDataManager, 
Optional.of(shuffleMetrics));
+  }
+
+  @Test
+  public void testGetUnknownPartitionReturnNotFound()
+  {
+    final Response response = shuffleResource.getPartition(
+        "unknownSupervisorTask",
+        "unknownSubtask",
+        "2020-01-01",
+        "2020-01-02",
+        0
+    );
+    Assert.assertEquals(Status.NOT_FOUND.getStatusCode(), 
response.getStatus());
+    Assert.assertNotNull(response.getEntity());
+    final String errorMessage = (String) response.getEntity();
+    Assert.assertTrue(errorMessage.contains("Can't find the partition for 
supervisorTask"));
+  }
+
+  @Test
+  public void testGetPartitionWithValidParamsReturnOk() throws IOException
+  {
+    final String supervisorTaskId = "supervisorTask";
+    final String subtaskId = "subtaskId";
+    final Interval interval = Intervals.of("2020-01-01/P1D");
+    final DataSegment segment = newSegment(interval);
+    final File segmentDir = generateSegmentDir("test");
+    intermediaryDataManager.addSegment(supervisorTaskId, subtaskId, segment, 
segmentDir);
+
+    final Response response = shuffleResource.getPartition(
+        supervisorTaskId,
+        subtaskId,
+        interval.getStart().toString(),
+        interval.getEnd().toString(),
+        segment.getId().getPartitionNum()
+    );
+    final Map<String, PerDatasourceShuffleMetrics> snapshot = 
shuffleMetrics.snapshotAndReset();
+    Assert.assertEquals(Status.OK.getStatusCode(), response.getStatus());
+    Assert.assertEquals(1, 
snapshot.get(supervisorTaskId).getShuffleRequests());
+    Assert.assertEquals(134, snapshot.get(supervisorTaskId).getShuffleBytes());
+  }
+
+  @Test
+  public void testDeleteUnknownPartitionReturnOk()
+  {
+    final Response response = 
shuffleResource.deletePartitions("unknownSupervisorTask");
+    Assert.assertEquals(Status.OK.getStatusCode(), response.getStatus());
+  }
+
+  @Test
+  public void testDeletePartitionWithValidParamsReturnOk() throws IOException
+  {
+    final String supervisorTaskId = "supervisorTask";
+    final String subtaskId = "subtaskId";
+    final Interval interval = Intervals.of("2020-01-01/P1D");
+    final DataSegment segment = newSegment(interval);
+    final File segmentDir = generateSegmentDir("test");
+    intermediaryDataManager.addSegment(supervisorTaskId, subtaskId, segment, 
segmentDir);
+
+    final Response response = 
shuffleResource.deletePartitions(supervisorTaskId);
+    Assert.assertEquals(Status.OK.getStatusCode(), response.getStatus());
+  }
+
+  @Test
+  public void testDeletePartitionThrowingExceptionReturnIntervalServerError() 
throws IOException
+  {
+    final IntermediaryDataManager exceptionThrowingManager = 
EasyMock.niceMock(IntermediaryDataManager.class);
+    exceptionThrowingManager.deletePartitions(EasyMock.anyString());
+    EasyMock.expectLastCall().andThrow(new IOException("test"));
+    EasyMock.replay(exceptionThrowingManager);
+    final ShuffleResource shuffleResource = new 
ShuffleResource(exceptionThrowingManager, Optional.of(shuffleMetrics));
+
+    final Response response = 
shuffleResource.deletePartitions("supervisorTask");
+    Assert.assertEquals(Status.INTERNAL_SERVER_ERROR.getStatusCode(), 
response.getStatus());
+  }
+
+  private static DataSegment newSegment(Interval interval)
+  {
+    return new DataSegment(
+        DATASOURCE,
+        interval,
+        "version",
+        null,
+        null,
+        null,
+        new NumberedShardSpec(0, 0),
+        0,
+        10
+    );
+  }
+
+  private File generateSegmentDir(String fileName) throws IOException
+  {
+    // Presumably 134 bytes after compression, as asserted in testGetPartitionWithValidParamsReturnOk
+    final File segmentDir = tempDir.newFolder();
+    FileUtils.write(new File(segmentDir, fileName), "test data.", 
StandardCharsets.UTF_8);
+    return segmentDir;
+  }
+}
diff --git a/services/src/main/java/org/apache/druid/cli/CliIndexer.java 
b/services/src/main/java/org/apache/druid/cli/CliIndexer.java
index 953c03f..10a7014 100644
--- a/services/src/main/java/org/apache/druid/cli/CliIndexer.java
+++ b/services/src/main/java/org/apache/druid/cli/CliIndexer.java
@@ -57,7 +57,7 @@ import org.apache.druid.indexing.overlord.TaskRunner;
 import org.apache.druid.indexing.overlord.ThreadingTaskRunner;
 import org.apache.druid.indexing.worker.Worker;
 import org.apache.druid.indexing.worker.config.WorkerConfig;
-import org.apache.druid.indexing.worker.http.ShuffleResource;
+import org.apache.druid.indexing.worker.shuffle.ShuffleModule;
 import org.apache.druid.java.util.common.logger.Logger;
 import org.apache.druid.metadata.input.InputSourceModule;
 import org.apache.druid.query.QuerySegmentWalker;
@@ -143,7 +143,6 @@ public class CliIndexer extends ServerRunnable
 
             
binder.bind(JettyServerInitializer.class).to(QueryJettyServerInitializer.class);
             Jerseys.addResource(binder, SegmentListerResource.class);
-            Jerseys.addResource(binder, ShuffleResource.class);
 
             LifecycleModule.register(binder, Server.class, 
RemoteChatHandler.class);
 
@@ -201,6 +200,7 @@ public class CliIndexer extends ServerRunnable
             );
           }
         },
+        new ShuffleModule(),
         new IndexingServiceFirehoseModule(),
         new IndexingServiceInputSourceModule(),
         new IndexingServiceTaskLogsModule(),
diff --git a/services/src/main/java/org/apache/druid/cli/CliMiddleManager.java 
b/services/src/main/java/org/apache/druid/cli/CliMiddleManager.java
index 531746f..4438a07 100644
--- a/services/src/main/java/org/apache/druid/cli/CliMiddleManager.java
+++ b/services/src/main/java/org/apache/druid/cli/CliMiddleManager.java
@@ -56,9 +56,9 @@ import org.apache.druid.indexing.worker.Worker;
 import org.apache.druid.indexing.worker.WorkerCuratorCoordinator;
 import org.apache.druid.indexing.worker.WorkerTaskMonitor;
 import org.apache.druid.indexing.worker.config.WorkerConfig;
-import org.apache.druid.indexing.worker.http.ShuffleResource;
 import org.apache.druid.indexing.worker.http.TaskManagementResource;
 import org.apache.druid.indexing.worker.http.WorkerResource;
+import org.apache.druid.indexing.worker.shuffle.ShuffleModule;
 import org.apache.druid.java.util.common.logger.Logger;
 import org.apache.druid.metadata.input.InputSourceModule;
 import org.apache.druid.query.lookup.LookupSerdeModule;
@@ -142,8 +142,6 @@ public class CliMiddleManager extends ServerRunnable
                   .to(DummyForInjectionAppenderatorsManager.class)
                   .in(LazySingleton.class);
 
-            Jerseys.addResource(binder, ShuffleResource.class);
-
             LifecycleModule.register(binder, Server.class);
 
             bindNodeRoleAndAnnouncer(
@@ -184,6 +182,7 @@ public class CliMiddleManager extends ServerRunnable
             );
           }
         },
+        new ShuffleModule(),
         new IndexingServiceFirehoseModule(),
         new IndexingServiceInputSourceModule(),
         new IndexingServiceTaskLogsModule(),
diff --git a/website/.spelling b/website/.spelling
index 5a4453e..343be4d 100644
--- a/website/.spelling
+++ b/website/.spelling
@@ -382,6 +382,7 @@ subsecond
 substring
 subtask
 subtasks
+supervisorTaskId
 symlink
 tiering
 timeseries


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to