This is an automated email from the ASF dual-hosted git repository.

kfaraz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new e5b40b0b8c3 Miscellaneous cleanup of load queue references (#16367)
e5b40b0b8c3 is described below

commit e5b40b0b8c3cd9f69975ef3976cbe87ee62e7ade
Author: Kashif Faraz <[email protected]>
AuthorDate: Thu May 2 15:59:50 2024 +0530

    Miscellaneous cleanup of load queue references (#16367)
    
    Changes:
    - Rename `DataSegmentChangeRequestAndStatus` to `DataSegmentChangeResponse`
    - Rename `SegmentLoadDropHandler.Status` to `SegmentChangeStatus`
    - Remove method `CoordinatorRunStats.getSnapshotAndReset()` as it was used only in
    load queue peon implementations. Using an atomic reference is much simpler.
    - Remove `ServerTestHelper.MAPPER`. Use existing `TestHelper.makeJsonMapper()` instead.
---
 integration-tests/k8s/tiny-cluster.yaml            |   1 -
 .../coordination/DataSegmentChangeResponse.java    |  64 +++++++++
 .../server/coordination/SegmentChangeStatus.java   |  82 ++++++++++++
 .../coordination/SegmentLoadDropHandler.java       | 148 ++++-----------------
 .../coordinator/loading/CuratorLoadQueuePeon.java  |   9 +-
 .../coordinator/loading/HttpLoadQueuePeon.java     |  31 +++--
 .../server/coordinator/loading/LoadQueuePeon.java  |   3 +-
 .../coordinator/stats/CoordinatorRunStats.java     |  27 ----
 .../druid/server/http/SegmentListerResource.java   |   7 +-
 .../server/metrics/HistoricalMetricsMonitor.java   |   2 +-
 .../druid/client/CachingClusteredClientTest.java   |   3 +-
 .../plumber/CustomVersioningPolicyTest.java        |   8 +-
 .../org/apache/druid/server/ServerTestHelper.java  |  38 ------
 .../coordination/SegmentLoadDropHandlerTest.java   |  37 +++---
 .../server/coordination/ZkCoordinatorTest.java     |   6 +-
 .../coordinator/CoordinatorRunStatsTest.java       |  31 -----
 ...PeonTest.java => CuratorLoadQueuePeonTest.java} |   4 +-
 .../coordinator/loading/HttpLoadQueuePeonTest.java |  24 ++--
 .../simulate/TestSegmentLoadingHttpClient.java     |  16 +--
 .../metrics/HistoricalMetricsMonitorTest.java      |   2 +-
 20 files changed, 251 insertions(+), 292 deletions(-)

diff --git a/integration-tests/k8s/tiny-cluster.yaml 
b/integration-tests/k8s/tiny-cluster.yaml
index 6db4dbf421b..0d44522c8ef 100644
--- a/integration-tests/k8s/tiny-cluster.yaml
+++ b/integration-tests/k8s/tiny-cluster.yaml
@@ -73,7 +73,6 @@ spec:
     druid.discovery.type=k8s
     druid.discovery.k8s.clusterIdentifier=druid-it
     druid.serverview.type=http
-    druid.coordinator.loadqueuepeon.type=http
     druid.indexer.runner.type=httpRemote
 
     # Metadata Store
diff --git 
a/server/src/main/java/org/apache/druid/server/coordination/DataSegmentChangeResponse.java
 
b/server/src/main/java/org/apache/druid/server/coordination/DataSegmentChangeResponse.java
new file mode 100644
index 00000000000..06625409ca7
--- /dev/null
+++ 
b/server/src/main/java/org/apache/druid/server/coordination/DataSegmentChangeResponse.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.server.coordination;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+/**
+ * Response of a {@link DataSegmentChangeRequest}. Contains the request itself
+ * and the result {@link SegmentChangeStatus}.
+ */
+public class DataSegmentChangeResponse
+{
+  private final DataSegmentChangeRequest request;
+  private final SegmentChangeStatus status;
+
+  @JsonCreator
+  public DataSegmentChangeResponse(
+      @JsonProperty("request") DataSegmentChangeRequest request,
+      @JsonProperty("status") SegmentChangeStatus status
+  )
+  {
+    this.request = request;
+    this.status = status;
+  }
+
+  @JsonProperty
+  public DataSegmentChangeRequest getRequest()
+  {
+    return request;
+  }
+
+  @JsonProperty
+  public SegmentChangeStatus getStatus()
+  {
+    return status;
+  }
+
+  @Override
+  public String toString()
+  {
+    return "DataSegmentChangeResponse{" +
+           "request=" + request +
+           ", status=" + status +
+           '}';
+  }
+}
diff --git 
a/server/src/main/java/org/apache/druid/server/coordination/SegmentChangeStatus.java
 
b/server/src/main/java/org/apache/druid/server/coordination/SegmentChangeStatus.java
new file mode 100644
index 00000000000..c19d1e7914a
--- /dev/null
+++ 
b/server/src/main/java/org/apache/druid/server/coordination/SegmentChangeStatus.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.server.coordination;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.base.Preconditions;
+
+import javax.annotation.Nullable;
+
+/**
+ * Contains {@link State} of a {@link DataSegmentChangeRequest} and failure
+ * message, if any.
+ */
+public class SegmentChangeStatus
+{
+  public enum State
+  {
+    SUCCESS, FAILED, PENDING
+  }
+
+  private final State state;
+  @Nullable
+  private final String failureCause;
+
+  public static final SegmentChangeStatus SUCCESS = new 
SegmentChangeStatus(State.SUCCESS, null);
+  public static final SegmentChangeStatus PENDING = new 
SegmentChangeStatus(State.PENDING, null);
+
+  public static SegmentChangeStatus failed(String cause)
+  {
+    return new SegmentChangeStatus(State.FAILED, cause);
+  }
+
+  @JsonCreator
+  private SegmentChangeStatus(
+      @JsonProperty("state") State state,
+      @JsonProperty("failureCause") @Nullable String failureCause
+  )
+  {
+    this.state = Preconditions.checkNotNull(state, "state must be non-null");
+    this.failureCause = failureCause;
+  }
+
+  @JsonProperty
+  public State getState()
+  {
+    return state;
+  }
+
+  @Nullable
+  @JsonProperty
+  public String getFailureCause()
+  {
+    return failureCause;
+  }
+
+  @Override
+  public String toString()
+  {
+    return "SegmentChangeStatus{" +
+           "state=" + state +
+           ", failureCause='" + failureCause + '\'' +
+           '}';
+  }
+}
diff --git 
a/server/src/main/java/org/apache/druid/server/coordination/SegmentLoadDropHandler.java
 
b/server/src/main/java/org/apache/druid/server/coordination/SegmentLoadDropHandler.java
index 636894d214f..791de9b55da 100644
--- 
a/server/src/main/java/org/apache/druid/server/coordination/SegmentLoadDropHandler.java
+++ 
b/server/src/main/java/org/apache/druid/server/coordination/SegmentLoadDropHandler.java
@@ -19,11 +19,8 @@
 
 package org.apache.druid.server.coordination;
 
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
 import com.google.common.base.Throwables;
 import com.google.common.cache.Cache;
 import com.google.common.cache.CacheBuilder;
@@ -98,7 +95,7 @@ public class SegmentLoadDropHandler implements 
DataSegmentChangeHandler
   // Keep history of load/drop request status in a LRU cache to maintain 
idempotency if same request shows up
   // again and to return status of a completed request. Maximum size of this 
cache must be significantly greater
   // than number of pending load/drop requests. so that history is not lost 
too quickly.
-  private final Cache<DataSegmentChangeRequest, AtomicReference<Status>> 
requestStatuses;
+  private final Cache<DataSegmentChangeRequest, 
AtomicReference<SegmentChangeStatus>> requestStatuses;
   private final Object requestStatusesLock = new Object();
 
   // This is the list of unresolved futures returned to callers of 
processBatch(List<DataSegmentChangeRequest>)
@@ -320,7 +317,7 @@ public class SegmentLoadDropHandler implements 
DataSegmentChangeHandler
   @Override
   public void addSegment(DataSegment segment, @Nullable 
DataSegmentChangeCallback callback)
   {
-    Status result = null;
+    SegmentChangeStatus result = null;
     try {
       log.info("Loading segment %s", segment.getId());
       /*
@@ -349,13 +346,13 @@ public class SegmentLoadDropHandler implements 
DataSegmentChangeHandler
         throw new SegmentLoadingException(e, "Failed to announce segment[%s]", 
segment.getId());
       }
 
-      result = Status.SUCCESS;
+      result = SegmentChangeStatus.SUCCESS;
     }
     catch (Throwable e) {
       log.makeAlert(e, "Failed to load segment for dataSource")
          .addData("segment", segment)
          .emit();
-      result = Status.failed(e.toString());
+      result = SegmentChangeStatus.failed(e.toString());
     }
     finally {
       updateRequestStatus(new SegmentChangeRequestLoad(segment), result);
@@ -466,7 +463,7 @@ public class SegmentLoadDropHandler implements 
DataSegmentChangeHandler
       final boolean scheduleDrop
   )
   {
-    Status result = null;
+    SegmentChangeStatus result = null;
     try {
       announcer.unannounceSegment(segment);
       segmentsToDelete.add(segment);
@@ -506,13 +503,13 @@ public class SegmentLoadDropHandler implements 
DataSegmentChangeHandler
         runnable.run();
       }
 
-      result = Status.SUCCESS;
+      result = SegmentChangeStatus.SUCCESS;
     }
     catch (Exception e) {
       log.makeAlert(e, "Failed to remove segment")
          .addData("segment", segment)
          .emit();
-      result = Status.failed(e.getMessage());
+      result = SegmentChangeStatus.failed(e.getMessage());
     }
     finally {
       updateRequestStatus(new SegmentChangeRequestDrop(segment), result);
@@ -522,20 +519,20 @@ public class SegmentLoadDropHandler implements 
DataSegmentChangeHandler
     }
   }
 
-  public Collection<DataSegment> getPendingDeleteSnapshot()
+  public Collection<DataSegment> getSegmentsToDelete()
   {
     return ImmutableList.copyOf(segmentsToDelete);
   }
 
-  public ListenableFuture<List<DataSegmentChangeRequestAndStatus>> 
processBatch(List<DataSegmentChangeRequest> changeRequests)
+  public ListenableFuture<List<DataSegmentChangeResponse>> 
processBatch(List<DataSegmentChangeRequest> changeRequests)
   {
     boolean isAnyRequestDone = false;
 
-    Map<DataSegmentChangeRequest, AtomicReference<Status>> statuses = 
Maps.newHashMapWithExpectedSize(changeRequests.size());
+    Map<DataSegmentChangeRequest, AtomicReference<SegmentChangeStatus>> 
statuses = Maps.newHashMapWithExpectedSize(changeRequests.size());
 
     for (DataSegmentChangeRequest cr : changeRequests) {
-      AtomicReference<Status> status = processRequest(cr);
-      if (status.get().getState() != Status.STATE.PENDING) {
+      AtomicReference<SegmentChangeStatus> status = processRequest(cr);
+      if (status.get().getState() != SegmentChangeStatus.State.PENDING) {
         isAnyRequestDone = true;
       }
       statuses.put(cr, status);
@@ -554,20 +551,20 @@ public class SegmentLoadDropHandler implements 
DataSegmentChangeHandler
     return future;
   }
 
-  private AtomicReference<Status> processRequest(DataSegmentChangeRequest 
changeRequest)
+  private AtomicReference<SegmentChangeStatus> 
processRequest(DataSegmentChangeRequest changeRequest)
   {
     synchronized (requestStatusesLock) {
-      AtomicReference<Status> status = 
requestStatuses.getIfPresent(changeRequest);
+      AtomicReference<SegmentChangeStatus> status = 
requestStatuses.getIfPresent(changeRequest);
 
       // If last load/drop request status is failed, here can try that again
-      if (status == null || status.get().getState() == Status.STATE.FAILED) {
+      if (status == null || status.get().getState() == 
SegmentChangeStatus.State.FAILED) {
         changeRequest.go(
             new DataSegmentChangeHandler()
             {
               @Override
               public void addSegment(DataSegment segment, 
DataSegmentChangeCallback callback)
               {
-                requestStatuses.put(changeRequest, new 
AtomicReference<>(Status.PENDING));
+                requestStatuses.put(changeRequest, new 
AtomicReference<>(SegmentChangeStatus.PENDING));
                 exec.submit(
                     () -> SegmentLoadDropHandler.this.addSegment(
                         ((SegmentChangeRequestLoad) 
changeRequest).getSegment(),
@@ -579,7 +576,7 @@ public class SegmentLoadDropHandler implements 
DataSegmentChangeHandler
               @Override
               public void removeSegment(DataSegment segment, 
DataSegmentChangeCallback callback)
               {
-                requestStatuses.put(changeRequest, new 
AtomicReference<>(Status.PENDING));
+                requestStatuses.put(changeRequest, new 
AtomicReference<>(SegmentChangeStatus.PENDING));
                 SegmentLoadDropHandler.this.removeSegment(
                     ((SegmentChangeRequestDrop) changeRequest).getSegment(),
                     () -> resolveWaitingFutures(),
@@ -589,7 +586,7 @@ public class SegmentLoadDropHandler implements 
DataSegmentChangeHandler
             },
             this::resolveWaitingFutures
         );
-      } else if (status.get().getState() == Status.STATE.SUCCESS) {
+      } else if (status.get().getState() == SegmentChangeStatus.State.SUCCESS) 
{
         // SUCCESS case, we'll clear up the cached success while serving it to 
this client
         // Not doing this can lead to an incorrect response to upcoming 
clients for a reload
         requestStatuses.invalidate(changeRequest);
@@ -599,13 +596,13 @@ public class SegmentLoadDropHandler implements 
DataSegmentChangeHandler
     }
   }
 
-  private void updateRequestStatus(DataSegmentChangeRequest changeRequest, 
Status result)
+  private void updateRequestStatus(DataSegmentChangeRequest changeRequest, 
SegmentChangeStatus result)
   {
     if (result == null) {
-      result = Status.failed("Unknown reason. Check server logs.");
+      result = SegmentChangeStatus.failed("Unknown reason. Check server 
logs.");
     }
     synchronized (requestStatusesLock) {
-      AtomicReference<Status> statusRef = 
requestStatuses.getIfPresent(changeRequest);
+      AtomicReference<SegmentChangeStatus> statusRef = 
requestStatuses.getIfPresent(changeRequest);
       if (statusRef != null) {
         statusRef.set(result);
       }
@@ -773,14 +770,14 @@ public class SegmentLoadDropHandler implements 
DataSegmentChangeHandler
   }
 
   // Future with cancel() implementation to remove it from "waitingFutures" 
list
-  private class CustomSettableFuture extends 
AbstractFuture<List<DataSegmentChangeRequestAndStatus>>
+  private class CustomSettableFuture extends 
AbstractFuture<List<DataSegmentChangeResponse>>
   {
     private final LinkedHashSet<CustomSettableFuture> waitingFutures;
-    private final Map<DataSegmentChangeRequest, AtomicReference<Status>> 
statusRefs;
+    private final Map<DataSegmentChangeRequest, 
AtomicReference<SegmentChangeStatus>> statusRefs;
 
     private CustomSettableFuture(
         LinkedHashSet<CustomSettableFuture> waitingFutures,
-        Map<DataSegmentChangeRequest, AtomicReference<Status>> statusRefs
+        Map<DataSegmentChangeRequest, AtomicReference<SegmentChangeStatus>> 
statusRefs
     )
     {
       this.waitingFutures = waitingFutures;
@@ -794,14 +791,14 @@ public class SegmentLoadDropHandler implements 
DataSegmentChangeHandler
           return;
         }
 
-        final List<DataSegmentChangeRequestAndStatus> result = new 
ArrayList<>(statusRefs.size());
+        final List<DataSegmentChangeResponse> result = new 
ArrayList<>(statusRefs.size());
         statusRefs.forEach((request, statusRef) -> {
           // Remove complete statuses from the cache
-          final Status status = statusRef.get();
-          if (status != null && status.getState() != Status.STATE.PENDING) {
+          final SegmentChangeStatus status = statusRef.get();
+          if (status != null && status.getState() != 
SegmentChangeStatus.State.PENDING) {
             requestStatuses.invalidate(request);
           }
-          result.add(new DataSegmentChangeRequestAndStatus(request, status));
+          result.add(new DataSegmentChangeResponse(request, status));
         });
 
         set(result);
@@ -818,94 +815,5 @@ public class SegmentLoadDropHandler implements 
DataSegmentChangeHandler
     }
   }
 
-  public static class Status
-  {
-    public enum STATE
-    {
-      SUCCESS, FAILED, PENDING
-    }
-
-    private final STATE state;
-    @Nullable
-    private final String failureCause;
-
-    public static final Status SUCCESS = new Status(STATE.SUCCESS, null);
-    public static final Status PENDING = new Status(STATE.PENDING, null);
-
-    @JsonCreator
-    Status(
-        @JsonProperty("state") STATE state,
-        @JsonProperty("failureCause") @Nullable String failureCause
-    )
-    {
-      Preconditions.checkNotNull(state, "state must be non-null");
-      this.state = state;
-      this.failureCause = failureCause;
-    }
-
-    public static Status failed(String cause)
-    {
-      return new Status(STATE.FAILED, cause);
-    }
-
-    @JsonProperty
-    public STATE getState()
-    {
-      return state;
-    }
-
-    @Nullable
-    @JsonProperty
-    public String getFailureCause()
-    {
-      return failureCause;
-    }
-
-    @Override
-    public String toString()
-    {
-      return "Status{" +
-             "state=" + state +
-             ", failureCause='" + failureCause + '\'' +
-             '}';
-    }
-  }
-
-  public static class DataSegmentChangeRequestAndStatus
-  {
-    private final DataSegmentChangeRequest request;
-    private final Status status;
-
-    @JsonCreator
-    public DataSegmentChangeRequestAndStatus(
-        @JsonProperty("request") DataSegmentChangeRequest request,
-        @JsonProperty("status") Status status
-    )
-    {
-      this.request = request;
-      this.status = status;
-    }
-
-    @JsonProperty
-    public DataSegmentChangeRequest getRequest()
-    {
-      return request;
-    }
-
-    @JsonProperty
-    public Status getStatus()
-    {
-      return status;
-    }
-
-    @Override
-    public String toString()
-    {
-      return "DataSegmentChangeRequestAndStatus{" +
-             "request=" + request +
-             ", status=" + status +
-             '}';
-    }
-  }
 }
 
diff --git 
a/server/src/main/java/org/apache/druid/server/coordinator/loading/CuratorLoadQueuePeon.java
 
b/server/src/main/java/org/apache/druid/server/coordinator/loading/CuratorLoadQueuePeon.java
index a508251ab58..52b012a81d4 100644
--- 
a/server/src/main/java/org/apache/druid/server/coordinator/loading/CuratorLoadQueuePeon.java
+++ 
b/server/src/main/java/org/apache/druid/server/coordinator/loading/CuratorLoadQueuePeon.java
@@ -47,6 +47,7 @@ import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
 
 /**
  * Use {@link HttpLoadQueuePeon} instead.
@@ -75,7 +76,7 @@ public class CuratorLoadQueuePeon implements LoadQueuePeon
   private final Duration loadTimeout;
 
   private final AtomicLong queuedSize = new AtomicLong(0);
-  private final CoordinatorRunStats stats = new CoordinatorRunStats();
+  private final AtomicReference<CoordinatorRunStats> stats = new 
AtomicReference<>(new CoordinatorRunStats());
 
   /**
    * Needs to be thread safe since it can be concurrently accessed via
@@ -172,7 +173,7 @@ public class CuratorLoadQueuePeon implements LoadQueuePeon
   @Override
   public CoordinatorRunStats getAndResetStats()
   {
-    return stats.getSnapshotAndReset();
+    return stats.getAndSet(new CoordinatorRunStats());
   }
 
   @Override
@@ -360,7 +361,7 @@ public class CuratorLoadQueuePeon implements LoadQueuePeon
 
     timedOutSegments.clear();
     queuedSize.set(0L);
-    stats.clear();
+    stats.get().clear();
   }
 
   private void onZkNodeDeleted(SegmentHolder segmentHolder, String path)
@@ -388,7 +389,7 @@ public class CuratorLoadQueuePeon implements LoadQueuePeon
     if (e != null) {
       log.error(e, "Server[%s], throwable caught when submitting [%s].", 
basePath, segmentHolder);
     }
-    stats.add(Stats.SegmentQueue.FAILED_ACTIONS, 1);
+    stats.get().add(Stats.SegmentQueue.FAILED_ACTIONS, 1);
 
     if (handleTimeout) {
       // Avoid removing the segment entry from the load/drop list in case 
config.getLoadTimeoutDelay() expires.
diff --git 
a/server/src/main/java/org/apache/druid/server/coordinator/loading/HttpLoadQueuePeon.java
 
b/server/src/main/java/org/apache/druid/server/coordinator/loading/HttpLoadQueuePeon.java
index 4ec6f2427a3..cb32f95516b 100644
--- 
a/server/src/main/java/org/apache/druid/server/coordinator/loading/HttpLoadQueuePeon.java
+++ 
b/server/src/main/java/org/apache/druid/server/coordinator/loading/HttpLoadQueuePeon.java
@@ -35,7 +35,8 @@ import org.apache.druid.java.util.http.client.Request;
 import org.apache.druid.server.coordination.DataSegmentChangeCallback;
 import org.apache.druid.server.coordination.DataSegmentChangeHandler;
 import org.apache.druid.server.coordination.DataSegmentChangeRequest;
-import org.apache.druid.server.coordination.SegmentLoadDropHandler;
+import org.apache.druid.server.coordination.DataSegmentChangeResponse;
+import org.apache.druid.server.coordination.SegmentChangeStatus;
 import org.apache.druid.server.coordinator.BytesAccumulatingResponseHandler;
 import org.apache.druid.server.coordinator.config.HttpLoadQueuePeonConfig;
 import org.apache.druid.server.coordinator.stats.CoordinatorRunStats;
@@ -66,6 +67,7 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
 
 /**
  *
@@ -77,15 +79,15 @@ public class HttpLoadQueuePeon implements LoadQueuePeon
       {
       };
 
-  public static final 
TypeReference<List<SegmentLoadDropHandler.DataSegmentChangeRequestAndStatus>> 
RESPONSE_ENTITY_TYPE_REF =
-      new 
TypeReference<List<SegmentLoadDropHandler.DataSegmentChangeRequestAndStatus>>()
+  public static final TypeReference<List<DataSegmentChangeResponse>> 
RESPONSE_ENTITY_TYPE_REF =
+      new TypeReference<List<DataSegmentChangeResponse>>()
       {
       };
 
   private static final EmittingLogger log = new 
EmittingLogger(HttpLoadQueuePeon.class);
 
   private final AtomicLong queuedSize = new AtomicLong(0);
-  private final CoordinatorRunStats stats = new CoordinatorRunStats();
+  private final AtomicReference<CoordinatorRunStats> stats = new 
AtomicReference<>(new CoordinatorRunStats());
 
   private final ConcurrentMap<DataSegment, SegmentHolder> segmentsToLoad = new 
ConcurrentHashMap<>();
   private final ConcurrentMap<DataSegment, SegmentHolder> segmentsToDrop = new 
ConcurrentHashMap<>();
@@ -191,10 +193,7 @@ public class HttpLoadQueuePeon implements LoadQueuePeon
     if (newRequests.size() == 0) {
       log.trace(
           "[%s]Found no load/drop requests. SegmentsToLoad[%d], 
SegmentsToDrop[%d], batchSize[%d].",
-          serverId,
-          segmentsToLoad.size(),
-          segmentsToDrop.size(),
-          config.getBatchSize()
+          serverId, segmentsToLoad.size(), segmentsToDrop.size(), 
config.getBatchSize()
       );
       mainLoopInProgress.set(false);
       return;
@@ -225,7 +224,7 @@ public class HttpLoadQueuePeon implements LoadQueuePeon
                   log.trace("Received NO CONTENT reseponse from [%s]", 
serverId);
                 } else if (HttpServletResponse.SC_OK == 
responseHandler.getStatus()) {
                   try {
-                    
List<SegmentLoadDropHandler.DataSegmentChangeRequestAndStatus> statuses =
+                    List<DataSegmentChangeResponse> statuses =
                         jsonMapper.readValue(result, RESPONSE_ENTITY_TYPE_REF);
                     log.trace("Server[%s] returned status response [%s].", 
serverId, statuses);
                     synchronized (lock) {
@@ -235,7 +234,7 @@ public class HttpLoadQueuePeon implements LoadQueuePeon
                         return;
                       }
 
-                      for 
(SegmentLoadDropHandler.DataSegmentChangeRequestAndStatus e : statuses) {
+                      for (DataSegmentChangeResponse e : statuses) {
                         switch (e.getStatus().getState()) {
                           case SUCCESS:
                           case FAILED:
@@ -300,7 +299,7 @@ public class HttpLoadQueuePeon implements LoadQueuePeon
     }
   }
 
-  private void handleResponseStatus(DataSegmentChangeRequest changeRequest, 
SegmentLoadDropHandler.Status status)
+  private void handleResponseStatus(DataSegmentChangeRequest changeRequest, 
SegmentChangeStatus status)
   {
     changeRequest.go(
         new DataSegmentChangeHandler()
@@ -317,7 +316,7 @@ public class HttpLoadQueuePeon implements LoadQueuePeon
             updateSuccessOrFailureInHolder(segmentsToDrop.remove(segment), 
status);
           }
 
-          private void updateSuccessOrFailureInHolder(SegmentHolder holder, 
SegmentLoadDropHandler.Status status)
+          private void updateSuccessOrFailureInHolder(SegmentHolder holder, 
SegmentChangeStatus status)
           {
             if (holder == null) {
               return;
@@ -325,7 +324,7 @@ public class HttpLoadQueuePeon implements LoadQueuePeon
 
             queuedSegments.remove(holder);
             activeRequestSegments.remove(holder.getSegment());
-            if (status.getState() == 
SegmentLoadDropHandler.Status.STATE.FAILED) {
+            if (status.getState() == SegmentChangeStatus.State.FAILED) {
               onRequestFailed(holder, status.getFailureCause());
             } else {
               onRequestCompleted(holder, RequestStatus.SUCCESS);
@@ -380,7 +379,7 @@ public class HttpLoadQueuePeon implements LoadQueuePeon
       queuedSegments.clear();
       activeRequestSegments.clear();
       queuedSize.set(0L);
-      stats.clear();
+      stats.get().clear();
     }
   }
 
@@ -485,7 +484,7 @@ public class HttpLoadQueuePeon implements LoadQueuePeon
   @Override
   public CoordinatorRunStats getAndResetStats()
   {
-    return stats.getSnapshotAndReset();
+    return stats.getAndSet(new CoordinatorRunStats());
   }
 
   @Override
@@ -547,7 +546,7 @@ public class HttpLoadQueuePeon implements LoadQueuePeon
   {
     RowKey rowKey = RowKey.with(Dimension.DATASOURCE, 
holder.getSegment().getDataSource())
                           .and(Dimension.DESCRIPTION, 
holder.getAction().name());
-    stats.add(status.datasourceStat, rowKey, 1);
+    stats.get().add(status.datasourceStat, rowKey, 1);
   }
 
   private void executeCallbacks(SegmentHolder holder, boolean success)
diff --git 
a/server/src/main/java/org/apache/druid/server/coordinator/loading/LoadQueuePeon.java
 
b/server/src/main/java/org/apache/druid/server/coordinator/loading/LoadQueuePeon.java
index 3ab6d30faf5..49e5f9a7c08 100644
--- 
a/server/src/main/java/org/apache/druid/server/coordinator/loading/LoadQueuePeon.java
+++ 
b/server/src/main/java/org/apache/druid/server/coordinator/loading/LoadQueuePeon.java
@@ -25,8 +25,7 @@ import org.apache.druid.timeline.DataSegment;
 import java.util.Set;
 
 /**
- * This interface exists only to support configurable load queue management 
via curator or http. Once HttpLoadQueuePeon
- * has been verified enough in production, CuratorLoadQueuePeon and this 
interface would be removed.
+ * Supports load queue management.
  */
 @Deprecated
 public interface LoadQueuePeon
diff --git 
a/server/src/main/java/org/apache/druid/server/coordinator/stats/CoordinatorRunStats.java
 
b/server/src/main/java/org/apache/druid/server/coordinator/stats/CoordinatorRunStats.java
index cbee9744524..5412230c2cc 100644
--- 
a/server/src/main/java/org/apache/druid/server/coordinator/stats/CoordinatorRunStats.java
+++ 
b/server/src/main/java/org/apache/druid/server/coordinator/stats/CoordinatorRunStats.java
@@ -26,10 +26,8 @@ import javax.annotation.concurrent.ThreadSafe;
 import java.util.Collections;
 import java.util.EnumMap;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.Map;
 import java.util.Objects;
-import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicInteger;
 
@@ -223,31 +221,6 @@ public class CoordinatorRunStats
             .mergeLong(stat, value, Math::max);
   }
 
-  /**
-   * Creates a new {@code CoordinatorRunStats} which represents the snapshot of
-   * the stats collected so far in this instance.
-   * <p>
-   * While this method is in progress, any updates made to the stats of this
-   * instance by another thread are not guaranteed to be present in the 
snapshot.
-   * But the snapshots are consistent, i.e. stats present in the snapshot 
created
-   * in one invocation of this method are permanently removed from this 
instance
-   * and will not be present in subsequent snapshots.
-   *
-   * @return Snapshot of the current state of this {@code CoordinatorRunStats}.
-   */
-  public CoordinatorRunStats getSnapshotAndReset()
-  {
-    final CoordinatorRunStats snapshot = new 
CoordinatorRunStats(debugDimensions);
-
-    // Get a snapshot of all the keys, remove and copy each of them atomically
-    final Set<RowKey> keys = new HashSet<>(allStats.keySet());
-    for (RowKey key : keys) {
-      snapshot.allStats.put(key, allStats.remove(key));
-    }
-
-    return snapshot;
-  }
-
   /**
    * Checks if the given rowKey has any of the debug dimensions.
    */
diff --git 
a/server/src/main/java/org/apache/druid/server/http/SegmentListerResource.java 
b/server/src/main/java/org/apache/druid/server/http/SegmentListerResource.java
index 1b6f8326773..1486355a776 100644
--- 
a/server/src/main/java/org/apache/druid/server/http/SegmentListerResource.java
+++ 
b/server/src/main/java/org/apache/druid/server/http/SegmentListerResource.java
@@ -35,6 +35,7 @@ import 
org.apache.druid.server.coordination.BatchDataSegmentAnnouncer;
 import org.apache.druid.server.coordination.ChangeRequestHistory;
 import org.apache.druid.server.coordination.ChangeRequestsSnapshot;
 import org.apache.druid.server.coordination.DataSegmentChangeRequest;
+import org.apache.druid.server.coordination.DataSegmentChangeResponse;
 import org.apache.druid.server.coordination.SegmentLoadDropHandler;
 import org.apache.druid.server.coordinator.loading.HttpLoadQueuePeon;
 import org.apache.druid.server.http.security.StateResourceFilter;
@@ -250,7 +251,7 @@ public class SegmentListerResource
     }
 
     final ResponseContext context = createContext(req.getHeader("Accept"));
-    final 
ListenableFuture<List<SegmentLoadDropHandler.DataSegmentChangeRequestAndStatus>>
 future =
+    final ListenableFuture<List<DataSegmentChangeResponse>> future =
         loadDropRequestHandler.processBatch(changeRequestList);
 
     final AsyncContext asyncContext = req.startAsync();
@@ -286,10 +287,10 @@ public class SegmentListerResource
 
     Futures.addCallback(
         future,
-        new 
FutureCallback<List<SegmentLoadDropHandler.DataSegmentChangeRequestAndStatus>>()
+        new FutureCallback<List<DataSegmentChangeResponse>>()
         {
           @Override
-          public void 
onSuccess(List<SegmentLoadDropHandler.DataSegmentChangeRequestAndStatus> result)
+          public void onSuccess(List<DataSegmentChangeResponse> result)
           {
             try {
               HttpServletResponse response = (HttpServletResponse) 
asyncContext.getResponse();
diff --git 
a/server/src/main/java/org/apache/druid/server/metrics/HistoricalMetricsMonitor.java
 
b/server/src/main/java/org/apache/druid/server/metrics/HistoricalMetricsMonitor.java
index ce21a2cef91..e7d6a74275b 100644
--- 
a/server/src/main/java/org/apache/druid/server/metrics/HistoricalMetricsMonitor.java
+++ 
b/server/src/main/java/org/apache/druid/server/metrics/HistoricalMetricsMonitor.java
@@ -58,7 +58,7 @@ public class HistoricalMetricsMonitor extends AbstractMonitor
 
     final Object2LongOpenHashMap<String> pendingDeleteSizes = new 
Object2LongOpenHashMap<>();
 
-    for (DataSegment segment : segmentLoadDropMgr.getPendingDeleteSnapshot()) {
+    for (DataSegment segment : segmentLoadDropMgr.getSegmentsToDelete()) {
       pendingDeleteSizes.addTo(segment.getDataSource(), segment.getSize());
     }
 
diff --git 
a/server/src/test/java/org/apache/druid/client/CachingClusteredClientTest.java 
b/server/src/test/java/org/apache/druid/client/CachingClusteredClientTest.java
index 818c6b77a53..d7464e25a6c 100644
--- 
a/server/src/test/java/org/apache/druid/client/CachingClusteredClientTest.java
+++ 
b/server/src/test/java/org/apache/druid/client/CachingClusteredClientTest.java
@@ -116,7 +116,6 @@ import org.apache.druid.query.topn.TopNQueryQueryToolChest;
 import org.apache.druid.query.topn.TopNResultValue;
 import org.apache.druid.segment.TestHelper;
 import org.apache.druid.server.QueryScheduler;
-import org.apache.druid.server.ServerTestHelper;
 import org.apache.druid.server.coordination.ServerType;
 import org.apache.druid.server.initialization.ServerConfig;
 import org.apache.druid.server.metrics.NoopServiceEmitter;
@@ -1753,7 +1752,7 @@ public class CachingClusteredClientTest
             partitions,
             partitionDimensions,
             partitionFunction,
-            ServerTestHelper.MAPPER
+            TestHelper.makeJsonMapper()
         ),
         null,
         9,
diff --git 
a/server/src/test/java/org/apache/druid/segment/realtime/plumber/CustomVersioningPolicyTest.java
 
b/server/src/test/java/org/apache/druid/segment/realtime/plumber/CustomVersioningPolicyTest.java
index 2def644cf5a..f145b0e1d79 100644
--- 
a/server/src/test/java/org/apache/druid/segment/realtime/plumber/CustomVersioningPolicyTest.java
+++ 
b/server/src/test/java/org/apache/druid/segment/realtime/plumber/CustomVersioningPolicyTest.java
@@ -19,7 +19,8 @@
 
 package org.apache.druid.segment.realtime.plumber;
 
-import org.apache.druid.server.ServerTestHelper;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.druid.segment.TestHelper;
 import org.joda.time.DateTime;
 import org.joda.time.DateTimeZone;
 import org.joda.time.Interval;
@@ -37,8 +38,9 @@ public class CustomVersioningPolicyTest
 
     CustomVersioningPolicy policy = new CustomVersioningPolicy(version);
 
-    CustomVersioningPolicy serialized = ServerTestHelper.MAPPER.readValue(
-        ServerTestHelper.MAPPER.writeValueAsBytes(policy),
+    final ObjectMapper mapper = TestHelper.makeJsonMapper();
+    CustomVersioningPolicy serialized = mapper.readValue(
+        mapper.writeValueAsBytes(policy),
         CustomVersioningPolicy.class
     );
 
diff --git a/server/src/test/java/org/apache/druid/server/ServerTestHelper.java 
b/server/src/test/java/org/apache/druid/server/ServerTestHelper.java
deleted file mode 100644
index 784b79a7665..00000000000
--- a/server/src/test/java/org/apache/druid/server/ServerTestHelper.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.druid.server;
-
-import com.fasterxml.jackson.databind.InjectableValues;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import org.apache.druid.jackson.DefaultObjectMapper;
-import org.apache.druid.timeline.DataSegment.PruneSpecsHolder;
-
-public class ServerTestHelper
-{
-  public static final ObjectMapper MAPPER = new DefaultObjectMapper();
-
-  static {
-    MAPPER.setInjectableValues(
-        new InjectableValues.Std()
-            .addValue(ObjectMapper.class.getName(), MAPPER)
-            .addValue(PruneSpecsHolder.class, PruneSpecsHolder.DEFAULT)
-    );
-  }
-}
diff --git 
a/server/src/test/java/org/apache/druid/server/coordination/SegmentLoadDropHandlerTest.java
 
b/server/src/test/java/org/apache/druid/server/coordination/SegmentLoadDropHandlerTest.java
index 7000dd54480..5e3fac5c44a 100644
--- 
a/server/src/test/java/org/apache/druid/server/coordination/SegmentLoadDropHandlerTest.java
+++ 
b/server/src/test/java/org/apache/druid/server/coordination/SegmentLoadDropHandlerTest.java
@@ -39,8 +39,7 @@ import org.apache.druid.segment.loading.SegmentLoaderConfig;
 import org.apache.druid.segment.loading.StorageLocationConfig;
 import org.apache.druid.segment.realtime.appenderator.SegmentSchemas;
 import org.apache.druid.server.SegmentManager;
-import 
org.apache.druid.server.coordination.SegmentLoadDropHandler.DataSegmentChangeRequestAndStatus;
-import 
org.apache.druid.server.coordination.SegmentLoadDropHandler.Status.STATE;
+import org.apache.druid.server.coordination.SegmentChangeStatus.State;
 import org.apache.druid.server.metrics.NoopServiceEmitter;
 import org.apache.druid.timeline.DataSegment;
 import org.apache.druid.timeline.partition.NoneShardSpec;
@@ -503,14 +502,14 @@ public class SegmentLoadDropHandlerTest
         new SegmentChangeRequestDrop(segment2)
     );
 
-    
ListenableFuture<List<SegmentLoadDropHandler.DataSegmentChangeRequestAndStatus>>
 future = segmentLoadDropHandler
+    ListenableFuture<List<DataSegmentChangeResponse>> future = 
segmentLoadDropHandler
         .processBatch(batch);
 
-    Map<DataSegmentChangeRequest, SegmentLoadDropHandler.Status> 
expectedStatusMap = new HashMap<>();
-    expectedStatusMap.put(batch.get(0), SegmentLoadDropHandler.Status.PENDING);
-    expectedStatusMap.put(batch.get(1), SegmentLoadDropHandler.Status.SUCCESS);
-    List<SegmentLoadDropHandler.DataSegmentChangeRequestAndStatus> result = 
future.get();
-    for (SegmentLoadDropHandler.DataSegmentChangeRequestAndStatus 
requestAndStatus : result) {
+    Map<DataSegmentChangeRequest, SegmentChangeStatus> expectedStatusMap = new 
HashMap<>();
+    expectedStatusMap.put(batch.get(0), SegmentChangeStatus.PENDING);
+    expectedStatusMap.put(batch.get(1), SegmentChangeStatus.SUCCESS);
+    List<DataSegmentChangeResponse> result = future.get();
+    for (DataSegmentChangeResponse requestAndStatus : result) {
       
Assert.assertEquals(expectedStatusMap.get(requestAndStatus.getRequest()), 
requestAndStatus.getStatus());
     }
 
@@ -519,7 +518,7 @@ public class SegmentLoadDropHandlerTest
     }
 
     result = segmentLoadDropHandler.processBatch(ImmutableList.of(new 
SegmentChangeRequestLoad(segment1))).get();
-    Assert.assertEquals(SegmentLoadDropHandler.Status.SUCCESS, 
result.get(0).getStatus());
+    Assert.assertEquals(SegmentChangeStatus.SUCCESS, 
result.get(0).getStatus());
 
     segmentLoadDropHandler.stop();
   }
@@ -549,21 +548,21 @@ public class SegmentLoadDropHandlerTest
 
     List<DataSegmentChangeRequest> batch = ImmutableList.of(new 
SegmentChangeRequestLoad(segment1));
 
-    ListenableFuture<List<DataSegmentChangeRequestAndStatus>> future = 
segmentLoadDropHandler
+    ListenableFuture<List<DataSegmentChangeResponse>> future = 
segmentLoadDropHandler
         .processBatch(batch);
 
     for (Runnable runnable : scheduledRunnable) {
       runnable.run();
     }
-    List<DataSegmentChangeRequestAndStatus> result = future.get();
-    Assert.assertEquals(STATE.FAILED, result.get(0).getStatus().getState());
+    List<DataSegmentChangeResponse> result = future.get();
+    Assert.assertEquals(State.FAILED, result.get(0).getStatus().getState());
 
     future = segmentLoadDropHandler.processBatch(batch);
     for (Runnable runnable : scheduledRunnable) {
       runnable.run();
     }
     result = future.get();
-    Assert.assertEquals(STATE.SUCCESS, result.get(0).getStatus().getState());
+    Assert.assertEquals(State.SUCCESS, result.get(0).getStatus().getState());
 
     segmentLoadDropHandler.stop();
   }
@@ -597,13 +596,13 @@ public class SegmentLoadDropHandlerTest
     List<DataSegmentChangeRequest> batch = ImmutableList.of(new 
SegmentChangeRequestLoad(segment1));
 
     // Request 1: Load the segment
-    ListenableFuture<List<DataSegmentChangeRequestAndStatus>> future = 
segmentLoadDropHandler
+    ListenableFuture<List<DataSegmentChangeResponse>> future = 
segmentLoadDropHandler
         .processBatch(batch);
     for (Runnable runnable : scheduledRunnable) {
       runnable.run();
     }
-    List<DataSegmentChangeRequestAndStatus> result = future.get();
-    Assert.assertEquals(STATE.SUCCESS, result.get(0).getStatus().getState());
+    List<DataSegmentChangeResponse> result = future.get();
+    Assert.assertEquals(State.SUCCESS, result.get(0).getStatus().getState());
     scheduledRunnable.clear();
 
     // Request 2: Drop the segment
@@ -613,7 +612,7 @@ public class SegmentLoadDropHandlerTest
       runnable.run();
     }
     result = future.get();
-    Assert.assertEquals(STATE.SUCCESS, result.get(0).getStatus().getState());
+    Assert.assertEquals(State.SUCCESS, result.get(0).getStatus().getState());
     scheduledRunnable.clear();
 
     // check invocations after a load-drop sequence
@@ -633,7 +632,7 @@ public class SegmentLoadDropHandlerTest
       runnable.run();
     }
     result = future.get();
-    Assert.assertEquals(STATE.SUCCESS, result.get(0).getStatus().getState());
+    Assert.assertEquals(State.SUCCESS, result.get(0).getStatus().getState());
     scheduledRunnable.clear();
 
     // check invocations - 1 more load has happened
@@ -653,7 +652,7 @@ public class SegmentLoadDropHandlerTest
       runnable.run();
     }
     result = future.get();
-    Assert.assertEquals(STATE.SUCCESS, result.get(0).getStatus().getState());
+    Assert.assertEquals(State.SUCCESS, result.get(0).getStatus().getState());
     scheduledRunnable.clear();
 
     // check invocations - the load segment counter should bump up
diff --git 
a/server/src/test/java/org/apache/druid/server/coordination/ZkCoordinatorTest.java
 
b/server/src/test/java/org/apache/druid/server/coordination/ZkCoordinatorTest.java
index 8356a26d33a..fd0547517b8 100644
--- 
a/server/src/test/java/org/apache/druid/server/coordination/ZkCoordinatorTest.java
+++ 
b/server/src/test/java/org/apache/druid/server/coordination/ZkCoordinatorTest.java
@@ -28,11 +28,11 @@ import org.apache.druid.java.util.common.Intervals;
 import org.apache.druid.java.util.common.logger.Logger;
 import org.apache.druid.java.util.emitter.EmittingLogger;
 import org.apache.druid.segment.IndexIO;
+import org.apache.druid.segment.TestHelper;
 import org.apache.druid.segment.loading.SegmentCacheManager;
 import org.apache.druid.segment.loading.SegmentLoaderConfig;
 import org.apache.druid.segment.loading.StorageLocationConfig;
 import org.apache.druid.server.SegmentManager;
-import org.apache.druid.server.ServerTestHelper;
 import org.apache.druid.server.initialization.ZkPathsConfig;
 import org.apache.druid.server.metrics.NoopServiceEmitter;
 import org.apache.druid.timeline.DataSegment;
@@ -59,7 +59,7 @@ public class ZkCoordinatorTest extends CuratorTestBase
 {
   private static final Logger log = new Logger(ZkCoordinatorTest.class);
 
-  private final ObjectMapper jsonMapper = ServerTestHelper.MAPPER;
+  private final ObjectMapper jsonMapper = TestHelper.makeJsonMapper();
   private final DruidServerMetadata me = new DruidServerMetadata(
       "dummyServer",
       "dummyHost",
@@ -135,7 +135,7 @@ public class ZkCoordinatorTest extends CuratorTestBase
     CountDownLatch dropLatch = new CountDownLatch(1);
 
     SegmentLoadDropHandler segmentLoadDropHandler = new SegmentLoadDropHandler(
-        ServerTestHelper.MAPPER,
+        jsonMapper,
         new SegmentLoaderConfig() {
           @Override
           public File getInfoDir()
diff --git 
a/server/src/test/java/org/apache/druid/server/coordinator/CoordinatorRunStatsTest.java
 
b/server/src/test/java/org/apache/druid/server/coordinator/CoordinatorRunStatsTest.java
index 6b4a16f1402..6b9dc844396 100644
--- 
a/server/src/test/java/org/apache/druid/server/coordinator/CoordinatorRunStatsTest.java
+++ 
b/server/src/test/java/org/apache/druid/server/coordinator/CoordinatorRunStatsTest.java
@@ -74,37 +74,6 @@ public class CoordinatorRunStatsTest
     Assert.assertEquals(1, stats.get(Stat.INFO_1, Key.TIER_1));
   }
 
-  @Test
-  public void testGetSnapshotAndReset()
-  {
-    stats.add(Stat.ERROR_1, 1);
-    stats.add(Stat.INFO_1, 3);
-    stats.add(Stat.ERROR_1, Key.TIER_1, 5);
-    stats.add(Stat.ERROR_1, Key.DUTY_1, 7);
-
-    final CoordinatorRunStats firstFlush = stats.getSnapshotAndReset();
-    Assert.assertEquals(1, firstFlush.get(Stat.ERROR_1));
-    Assert.assertEquals(3, firstFlush.get(Stat.INFO_1));
-    Assert.assertEquals(5, firstFlush.get(Stat.ERROR_1, Key.TIER_1));
-    Assert.assertEquals(7, firstFlush.get(Stat.ERROR_1, Key.DUTY_1));
-
-    Assert.assertEquals(0, stats.rowCount());
-
-    stats.add(Stat.ERROR_1, 7);
-    stats.add(Stat.ERROR_1, Key.TIER_1, 5);
-    stats.add(Stat.INFO_1, Key.DUTY_1, 3);
-    stats.add(Stat.INFO_2, Key.TIER_1, 1);
-
-    final CoordinatorRunStats secondFlush = stats.getSnapshotAndReset();
-
-    Assert.assertEquals(7, secondFlush.get(Stat.ERROR_1));
-    Assert.assertEquals(5, secondFlush.get(Stat.ERROR_1, Key.TIER_1));
-    Assert.assertEquals(3, secondFlush.get(Stat.INFO_1, Key.DUTY_1));
-    Assert.assertEquals(1, secondFlush.get(Stat.INFO_2, Key.TIER_1));
-
-    Assert.assertEquals(0, stats.rowCount());
-  }
-
   @Test
   public void testUpdateMax()
   {
diff --git 
a/server/src/test/java/org/apache/druid/server/coordinator/loading/LoadQueuePeonTest.java
 
b/server/src/test/java/org/apache/druid/server/coordinator/loading/CuratorLoadQueuePeonTest.java
similarity index 99%
rename from 
server/src/test/java/org/apache/druid/server/coordinator/loading/LoadQueuePeonTest.java
rename to 
server/src/test/java/org/apache/druid/server/coordinator/loading/CuratorLoadQueuePeonTest.java
index 3e1c816716c..e22b037707a 100644
--- 
a/server/src/test/java/org/apache/druid/server/coordinator/loading/LoadQueuePeonTest.java
+++ 
b/server/src/test/java/org/apache/druid/server/coordinator/loading/CuratorLoadQueuePeonTest.java
@@ -53,13 +53,13 @@ import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 
-public class LoadQueuePeonTest extends CuratorTestBase
+public class CuratorLoadQueuePeonTest extends CuratorTestBase
 {
   private static final String LOAD_QUEUE_PATH = 
"/druid/loadqueue/localhost:1234";
 
   private final ObjectMapper jsonMapper = TestHelper.makeJsonMapper();
 
-  private LoadQueuePeon loadQueuePeon;
+  private CuratorLoadQueuePeon loadQueuePeon;
   private PathChildrenCache loadQueueCache;
 
   @Before
diff --git 
a/server/src/test/java/org/apache/druid/server/coordinator/loading/HttpLoadQueuePeonTest.java
 
b/server/src/test/java/org/apache/druid/server/coordinator/loading/HttpLoadQueuePeonTest.java
index de1801d7ad7..16251130e77 100644
--- 
a/server/src/test/java/org/apache/druid/server/coordinator/loading/HttpLoadQueuePeonTest.java
+++ 
b/server/src/test/java/org/apache/druid/server/coordinator/loading/HttpLoadQueuePeonTest.java
@@ -20,6 +20,7 @@
 package org.apache.druid.server.coordinator.loading;
 
 import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
 import org.apache.druid.java.util.common.RE;
@@ -27,11 +28,12 @@ import 
org.apache.druid.java.util.common.granularity.Granularities;
 import org.apache.druid.java.util.http.client.HttpClient;
 import org.apache.druid.java.util.http.client.Request;
 import org.apache.druid.java.util.http.client.response.HttpResponseHandler;
-import org.apache.druid.server.ServerTestHelper;
+import org.apache.druid.segment.TestHelper;
 import org.apache.druid.server.coordination.DataSegmentChangeCallback;
 import org.apache.druid.server.coordination.DataSegmentChangeHandler;
 import org.apache.druid.server.coordination.DataSegmentChangeRequest;
-import org.apache.druid.server.coordination.SegmentLoadDropHandler;
+import org.apache.druid.server.coordination.DataSegmentChangeResponse;
+import org.apache.druid.server.coordination.SegmentChangeStatus;
 import org.apache.druid.server.coordinator.CreateDataSegments;
 import org.apache.druid.server.coordinator.config.HttpLoadQueuePeonConfig;
 import org.apache.druid.server.coordinator.simulate.BlockingExecutorService;
@@ -63,6 +65,7 @@ import java.util.stream.Collectors;
  */
 public class HttpLoadQueuePeonTest
 {
+  private static final ObjectMapper MAPPER = TestHelper.makeJsonMapper();
   private final List<DataSegment> segments =
       CreateDataSegments.ofDatasource("test")
                         .forIntervals(1, Granularities.DAY)
@@ -87,7 +90,7 @@ public class HttpLoadQueuePeonTest
 
     httpLoadQueuePeon = new HttpLoadQueuePeon(
         "http://dummy:4000",
-        ServerTestHelper.MAPPER,
+        MAPPER,
         httpClient,
         new HttpLoadQueuePeonConfig(null, null, 10),
         new WrappingScheduledExecutorService("HttpLoadQueuePeonTest-%s", 
processingExecutor, true),
@@ -313,23 +316,22 @@ public class HttpLoadQueuePeonTest
       httpResponse.setContent(ChannelBuffers.buffer(0));
       httpResponseHandler.handleResponse(httpResponse, null);
       try {
-        List<DataSegmentChangeRequest> changeRequests = 
ServerTestHelper.MAPPER.readValue(
+        List<DataSegmentChangeRequest> changeRequests = MAPPER.readValue(
             request.getContent().array(), new 
TypeReference<List<DataSegmentChangeRequest>>()
             {
             }
         );
 
-        List<SegmentLoadDropHandler.DataSegmentChangeRequestAndStatus> 
statuses = new ArrayList<>(changeRequests.size());
+        List<DataSegmentChangeResponse> statuses = new 
ArrayList<>(changeRequests.size());
         for (DataSegmentChangeRequest cr : changeRequests) {
           cr.go(this, null);
-          statuses.add(new 
SegmentLoadDropHandler.DataSegmentChangeRequestAndStatus(
-              cr,
-              SegmentLoadDropHandler.Status.SUCCESS
-          ));
+          statuses.add(
+              new DataSegmentChangeResponse(cr, SegmentChangeStatus.SUCCESS)
+          );
         }
-        return (ListenableFuture) Futures.immediateFuture(
+        return (ListenableFuture<Final>) Futures.immediateFuture(
             new ByteArrayInputStream(
-                ServerTestHelper.MAPPER
+                MAPPER
                     .writerWithType(HttpLoadQueuePeon.RESPONSE_ENTITY_TYPE_REF)
                     .writeValueAsBytes(statuses)
             )
diff --git 
a/server/src/test/java/org/apache/druid/server/coordinator/simulate/TestSegmentLoadingHttpClient.java
 
b/server/src/test/java/org/apache/druid/server/coordinator/simulate/TestSegmentLoadingHttpClient.java
index 0b91e700902..f7007bfd833 100644
--- 
a/server/src/test/java/org/apache/druid/server/coordinator/simulate/TestSegmentLoadingHttpClient.java
+++ 
b/server/src/test/java/org/apache/druid/server/coordinator/simulate/TestSegmentLoadingHttpClient.java
@@ -30,7 +30,8 @@ import 
org.apache.druid.java.util.http.client.response.HttpResponseHandler;
 import org.apache.druid.server.coordination.DataSegmentChangeCallback;
 import org.apache.druid.server.coordination.DataSegmentChangeHandler;
 import org.apache.druid.server.coordination.DataSegmentChangeRequest;
-import org.apache.druid.server.coordination.SegmentLoadDropHandler;
+import org.apache.druid.server.coordination.DataSegmentChangeResponse;
+import org.apache.druid.server.coordination.SegmentChangeStatus;
 import org.jboss.netty.buffer.ChannelBuffers;
 import org.jboss.netty.handler.codec.http.DefaultHttpResponse;
 import org.jboss.netty.handler.codec.http.HttpResponse;
@@ -126,7 +127,7 @@ public class TestSegmentLoadingHttpClient implements 
HttpClient
   /**
    * Processes all the changes in the request.
    */
-  private List<SegmentLoadDropHandler.DataSegmentChangeRequestAndStatus> 
processRequest(
+  private List<DataSegmentChangeResponse> processRequest(
       Request request,
       DataSegmentChangeHandler changeHandler
   ) throws IOException
@@ -147,21 +148,20 @@ public class TestSegmentLoadingHttpClient implements 
HttpClient
   /**
    * Processes each DataSegmentChangeRequest using the handler.
    */
-  private SegmentLoadDropHandler.DataSegmentChangeRequestAndStatus 
processRequest(
+  private DataSegmentChangeResponse processRequest(
       DataSegmentChangeRequest request,
       DataSegmentChangeHandler handler
   )
   {
-    SegmentLoadDropHandler.Status status;
+    SegmentChangeStatus status;
     try {
       request.go(handler, NOOP_CALLBACK);
-      status = SegmentLoadDropHandler.Status.SUCCESS;
+      status = SegmentChangeStatus.SUCCESS;
     }
     catch (Exception e) {
-      status = SegmentLoadDropHandler.Status.failed(e.getMessage());
+      status = SegmentChangeStatus.failed(e.getMessage());
     }
 
-    return new SegmentLoadDropHandler
-        .DataSegmentChangeRequestAndStatus(request, status);
+    return new DataSegmentChangeResponse(request, status);
   }
 }
diff --git 
a/server/src/test/java/org/apache/druid/server/metrics/HistoricalMetricsMonitorTest.java
 
b/server/src/test/java/org/apache/druid/server/metrics/HistoricalMetricsMonitorTest.java
index 1806903a9ba..92c2a1064e7 100644
--- 
a/server/src/test/java/org/apache/druid/server/metrics/HistoricalMetricsMonitorTest.java
+++ 
b/server/src/test/java/org/apache/druid/server/metrics/HistoricalMetricsMonitorTest.java
@@ -74,7 +74,7 @@ public class HistoricalMetricsMonitorTest extends 
EasyMockSupport
     final String tier = "tier";
 
     EasyMock.expect(druidServerConfig.getMaxSize()).andReturn(maxSize).once();
-    
EasyMock.expect(segmentLoadDropMgr.getPendingDeleteSnapshot()).andReturn(ImmutableList.of(dataSegment)).once();
+    
EasyMock.expect(segmentLoadDropMgr.getSegmentsToDelete()).andReturn(ImmutableList.of(dataSegment)).once();
     EasyMock.expect(druidServerConfig.getTier()).andReturn(tier).once();
     
EasyMock.expect(druidServerConfig.getPriority()).andReturn(priority).once();
     
EasyMock.expect(segmentManager.getDataSourceSizes()).andReturn(ImmutableMap.of(dataSource,
 size));


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to