This is an automated email from the ASF dual-hosted git repository.

kfaraz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new bf2be938a93 Refactor `SegmentLoadDropHandler` code (#16685)
bf2be938a93 is described below

commit bf2be938a93aa3819901bfcc65adac66ce2bc75a
Author: Abhishek Radhakrishnan <[email protected]>
AuthorDate: Sun Jul 7 20:59:55 2024 -0700

    Refactor `SegmentLoadDropHandler` code (#16685)
    
    Motivation:
    - Improve code hygeiene
    - Make `SegmentLoadDropHandler` easily extensible
    
    Changes:
    - Add `SegmentBootstrapper`
    - Move code for bootstrapping segments already cached on disk and fetched 
from coordinator to
    `SegmentBootstrapper`.
    - No functional change
    - Use separate executor service in `SegmentBootstrapper`
    - Bind `SegmentBootstrapper` to `ManageLifecycle` explicitly in 
`CliBroker`, `CliHistorical` etc.
---
 .../coordination/ChangeRequestsSnapshot.java       |   1 -
 .../server/coordination/SegmentBootstrapper.java   | 439 +++++++++++++++++++++
 .../coordination/SegmentChangeRequestDrop.java     |   1 -
 .../coordination/SegmentChangeRequestLoad.java     |   1 -
 .../coordination/SegmentLoadDropHandler.java       | 384 +-----------------
 .../druid/server/http/HistoricalResource.java      |  12 +-
 ...Test.java => SegmentBootstrapperCacheTest.java} |  51 ++-
 .../coordination/SegmentBootstrapperTest.java      | 306 ++++++++++++++
 .../coordination/SegmentLoadDropHandlerTest.java   | 383 ++----------------
 .../coordination/TestSegmentCacheManager.java      | 175 ++++++++
 .../server/coordination/ZkCoordinatorTest.java     |   9 +-
 .../main/java/org/apache/druid/cli/CliBroker.java  |   2 +
 .../java/org/apache/druid/cli/CliHistorical.java   |   2 +
 .../main/java/org/apache/druid/cli/CliIndexer.java |   2 +
 .../main/java/org/apache/druid/cli/CliPeon.java    |   2 +
 15 files changed, 1001 insertions(+), 769 deletions(-)

diff --git 
a/server/src/main/java/org/apache/druid/server/coordination/ChangeRequestsSnapshot.java
 
b/server/src/main/java/org/apache/druid/server/coordination/ChangeRequestsSnapshot.java
index 14113bed6b2..a86453df66d 100644
--- 
a/server/src/main/java/org/apache/druid/server/coordination/ChangeRequestsSnapshot.java
+++ 
b/server/src/main/java/org/apache/druid/server/coordination/ChangeRequestsSnapshot.java
@@ -24,7 +24,6 @@ import com.fasterxml.jackson.annotation.JsonProperty;
 import com.google.common.base.Preconditions;
 
 import javax.annotation.Nullable;
-
 import java.util.List;
 
 /**
diff --git 
a/server/src/main/java/org/apache/druid/server/coordination/SegmentBootstrapper.java
 
b/server/src/main/java/org/apache/druid/server/coordination/SegmentBootstrapper.java
new file mode 100644
index 00000000000..c5b71fbcddc
--- /dev/null
+++ 
b/server/src/main/java/org/apache/druid/server/coordination/SegmentBootstrapper.java
@@ -0,0 +1,439 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.server.coordination;
+
+import com.google.common.base.Throwables;
+import com.google.common.collect.ImmutableList;
+import com.google.common.util.concurrent.SettableFuture;
+import com.google.inject.Inject;
+import org.apache.druid.client.BootstrapSegmentsResponse;
+import org.apache.druid.client.coordinator.CoordinatorClient;
+import org.apache.druid.common.guava.FutureUtils;
+import org.apache.druid.guice.ManageLifecycle;
+import org.apache.druid.guice.ServerTypeConfig;
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.Stopwatch;
+import org.apache.druid.java.util.common.concurrent.Execs;
+import org.apache.druid.java.util.common.lifecycle.LifecycleStart;
+import org.apache.druid.java.util.common.lifecycle.LifecycleStop;
+import org.apache.druid.java.util.emitter.EmittingLogger;
+import org.apache.druid.java.util.emitter.service.ServiceEmitter;
+import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
+import org.apache.druid.segment.loading.SegmentLoaderConfig;
+import org.apache.druid.segment.loading.SegmentLoadingException;
+import org.apache.druid.server.SegmentManager;
+import org.apache.druid.timeline.DataSegment;
+
+import javax.annotation.Nullable;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Responsible for bootstrapping segments already cached on disk and bootstrap 
segments fetched from the coordinator.
+ * Also responsible for announcing the node as a data server if applicable, 
once the bootstrapping operations
+ * are complete.
+ */
+@ManageLifecycle
+public class SegmentBootstrapper
+{
+  private final SegmentLoadDropHandler loadDropHandler;
+  private final SegmentLoaderConfig config;
+  private final DataSegmentAnnouncer segmentAnnouncer;
+  private final DataSegmentServerAnnouncer serverAnnouncer;
+  private final SegmentManager segmentManager;
+  private final ServerTypeConfig serverTypeConfig;
+  private final CoordinatorClient coordinatorClient;
+  private final ServiceEmitter emitter;
+
+  private volatile boolean isComplete = false;
+
+  // Synchronizes start/stop of this object.
+  private final Object startStopLock = new Object();
+
+  private static final EmittingLogger log = new 
EmittingLogger(SegmentBootstrapper.class);
+
+  @Inject
+  public SegmentBootstrapper(
+      SegmentLoadDropHandler loadDropHandler,
+      SegmentLoaderConfig config,
+      DataSegmentAnnouncer segmentAnnouncer,
+      DataSegmentServerAnnouncer serverAnnouncer,
+      SegmentManager segmentManager,
+      ServerTypeConfig serverTypeConfig,
+      CoordinatorClient coordinatorClient,
+      ServiceEmitter emitter
+  )
+  {
+    this.loadDropHandler = loadDropHandler;
+    this.config = config;
+    this.segmentAnnouncer = segmentAnnouncer;
+    this.serverAnnouncer = serverAnnouncer;
+    this.segmentManager = segmentManager;
+    this.serverTypeConfig = serverTypeConfig;
+    this.coordinatorClient = coordinatorClient;
+    this.emitter = emitter;
+  }
+
+  @LifecycleStart
+  public void start() throws IOException
+  {
+    synchronized (startStopLock) {
+      if (isComplete) {
+        return;
+      }
+
+      log.info("Starting...");
+      try {
+        if (segmentManager.canHandleSegments()) {
+          loadSegmentsOnStartup();
+        }
+
+        if (shouldAnnounce()) {
+          serverAnnouncer.announce();
+        }
+      }
+      catch (Exception e) {
+        Throwables.propagateIfPossible(e, IOException.class);
+        throw new RuntimeException(e);
+      }
+      isComplete = true;
+      log.info("Started.");
+    }
+  }
+
+  @LifecycleStop
+  public void stop()
+  {
+    synchronized (startStopLock) {
+      if (!isComplete) {
+        return;
+      }
+
+      log.info("Stopping...");
+      try {
+        if (shouldAnnounce()) {
+          serverAnnouncer.unannounce();
+        }
+      }
+      catch (Exception e) {
+        throw new RuntimeException(e);
+      }
+      finally {
+        isComplete = false;
+      }
+      log.info("Stopped.");
+    }
+  }
+
+  public boolean isBootstrappingComplete()
+  {
+    return isComplete;
+  }
+
+  /**
+   * Bulk loading of the following segments into the page cache at startup:
+   * <li> Previously cached segments </li>
+   * <li> Bootstrap segments from the coordinator </li>
+   */
+  private void loadSegmentsOnStartup() throws IOException
+  {
+    final List<DataSegment> segmentsOnStartup = new ArrayList<>();
+    segmentsOnStartup.addAll(segmentManager.getCachedSegments());
+    segmentsOnStartup.addAll(getBootstrapSegments());
+
+    final Stopwatch stopwatch = Stopwatch.createStarted();
+
+    // Start a temporary thread pool to load segments into page cache during 
bootstrap
+    final ExecutorService bootstrapExecutor = Execs.multiThreaded(
+        config.getNumBootstrapThreads(), "Segment-Bootstrap-%s"
+    );
+
+    // Start a temporary scheduled executor for background segment announcing
+    final ScheduledExecutorService backgroundAnnouncerExecutor = 
Executors.newScheduledThreadPool(
+        config.getNumLoadingThreads(), 
Execs.makeThreadFactory("Background-Segment-Announcer-%s")
+    );
+
+    try (final BackgroundSegmentAnnouncer backgroundSegmentAnnouncer =
+             new BackgroundSegmentAnnouncer(segmentAnnouncer, 
backgroundAnnouncerExecutor, config.getAnnounceIntervalMillis())) {
+
+      backgroundSegmentAnnouncer.startAnnouncing();
+
+      final int numSegments = segmentsOnStartup.size();
+      final CountDownLatch latch = new CountDownLatch(numSegments);
+      final AtomicInteger counter = new AtomicInteger(0);
+      final CopyOnWriteArrayList<DataSegment> failedSegments = new 
CopyOnWriteArrayList<>();
+      for (final DataSegment segment : segmentsOnStartup) {
+        bootstrapExecutor.submit(
+            () -> {
+              try {
+                log.info(
+                    "Loading segment[%d/%d][%s]",
+                    counter.incrementAndGet(), numSegments, segment.getId()
+                );
+                try {
+                  segmentManager.loadSegmentOnBootstrap(
+                      segment,
+                      () -> loadDropHandler.removeSegment(segment, 
DataSegmentChangeCallback.NOOP, false)
+                  );
+                }
+                catch (Exception e) {
+                  loadDropHandler.removeSegment(segment, 
DataSegmentChangeCallback.NOOP, false);
+                  throw new SegmentLoadingException(e, "Exception loading 
segment[%s]", segment.getId());
+                }
+                try {
+                  backgroundSegmentAnnouncer.announceSegment(segment);
+                }
+                catch (InterruptedException e) {
+                  Thread.currentThread().interrupt();
+                  throw new SegmentLoadingException(e, "Loading Interrupted");
+                }
+              }
+              catch (SegmentLoadingException e) {
+                log.error(e, "[%s] failed to load", segment.getId());
+                failedSegments.add(segment);
+              }
+              finally {
+                latch.countDown();
+              }
+            }
+        );
+      }
+
+      try {
+        latch.await();
+
+        if (failedSegments.size() > 0) {
+          log.makeAlert("[%,d] errors seen while loading segments on startup", 
failedSegments.size())
+             .addData("failedSegments", failedSegments)
+             .emit();
+        }
+      }
+      catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+        log.makeAlert(e, "LoadingInterrupted").emit();
+      }
+
+      backgroundSegmentAnnouncer.finishAnnouncing();
+    }
+    catch (SegmentLoadingException e) {
+      log.makeAlert(e, "Failed to load segments on startup -- likely problem 
with announcing.")
+         .addData("numSegments", segmentsOnStartup.size())
+         .emit();
+    }
+    finally {
+      bootstrapExecutor.shutdownNow();
+      backgroundAnnouncerExecutor.shutdownNow();
+      stopwatch.stop();
+      // At this stage, all tasks have been submitted, send a shutdown command 
to cleanup any resources alloted
+      // for the bootstrapping function.
+      segmentManager.shutdownBootstrap();
+      log.info("Loaded [%d] segments on startup in [%,d]ms.", 
segmentsOnStartup.size(), stopwatch.millisElapsed());
+    }
+  }
+
+  /**
+   * @return a list of bootstrap segments. When bootstrap segments cannot be 
found, an empty list is returned.
+   */
+  private List<DataSegment> getBootstrapSegments()
+  {
+    log.info("Fetching bootstrap segments from the coordinator.");
+    final Stopwatch stopwatch = Stopwatch.createStarted();
+
+    List<DataSegment> bootstrapSegments = new ArrayList<>();
+
+    try {
+      final BootstrapSegmentsResponse response =
+          FutureUtils.getUnchecked(coordinatorClient.fetchBootstrapSegments(), 
true);
+      bootstrapSegments = ImmutableList.copyOf(response.getIterator());
+    }
+    catch (Exception e) {
+      log.warn("Error fetching bootstrap segments from the coordinator: [%s]. 
", e.getMessage());
+    }
+    finally {
+      stopwatch.stop();
+      final long fetchRunMillis = stopwatch.millisElapsed();
+      emitter.emit(new 
ServiceMetricEvent.Builder().setMetric("segment/bootstrap/time", 
fetchRunMillis));
+      emitter.emit(new 
ServiceMetricEvent.Builder().setMetric("segment/bootstrap/count", 
bootstrapSegments.size()));
+      log.info("Fetched [%d] bootstrap segments in [%d]ms.", 
bootstrapSegments.size(), fetchRunMillis);
+    }
+
+    return bootstrapSegments;
+  }
+
+  /**
+   * Returns whether or not we should announce ourselves as a data server 
using {@link DataSegmentServerAnnouncer}.
+   *
+   * Returns true if _either_:
+   *
+   * <li> Our {@link #serverTypeConfig} indicates we are a segment server. 
This is necessary for Brokers to be able
+   * to detect that we exist.</li>
+   * <li> The segment manager is able to handle segments. This is necessary 
for Coordinators to be able to
+   * assign segments to us.</li>
+   */
+  private boolean shouldAnnounce()
+  {
+    return serverTypeConfig.getServerType().isSegmentServer() || 
segmentManager.canHandleSegments();
+  }
+
+  private static class BackgroundSegmentAnnouncer implements AutoCloseable
+  {
+    private static final EmittingLogger log = new 
EmittingLogger(BackgroundSegmentAnnouncer.class);
+
+    private final int announceIntervalMillis;
+    private final DataSegmentAnnouncer segmentAnnouncer;
+    private final ScheduledExecutorService exec;
+    private final LinkedBlockingQueue<DataSegment> queue;
+    private final SettableFuture<Boolean> doneAnnouncing;
+
+    private final Object lock = new Object();
+
+    private volatile boolean finished = false;
+    @Nullable
+    private volatile ScheduledFuture startedAnnouncing = null;
+    @Nullable
+    private volatile ScheduledFuture nextAnnoucement = null;
+
+    BackgroundSegmentAnnouncer(
+        DataSegmentAnnouncer segmentAnnouncer,
+        ScheduledExecutorService exec,
+        int announceIntervalMillis
+    )
+    {
+      this.segmentAnnouncer = segmentAnnouncer;
+      this.exec = exec;
+      this.announceIntervalMillis = announceIntervalMillis;
+      this.queue = new LinkedBlockingQueue<>();
+      this.doneAnnouncing = SettableFuture.create();
+    }
+
+    public void announceSegment(final DataSegment segment) throws 
InterruptedException
+    {
+      if (finished) {
+        throw new ISE("Announce segment called after finishAnnouncing");
+      }
+      queue.put(segment);
+    }
+
+    public void startAnnouncing()
+    {
+      if (announceIntervalMillis <= 0) {
+        log.info("Skipping background segment announcing as 
announceIntervalMillis is [%d].", announceIntervalMillis);
+        return;
+      }
+
+      log.info("Starting background segment announcing task");
+
+      // schedule background announcing task
+      nextAnnoucement = startedAnnouncing = exec.schedule(
+          new Runnable()
+          {
+            @Override
+            public void run()
+            {
+              synchronized (lock) {
+                try {
+                  if (!(finished && queue.isEmpty())) {
+                    final List<DataSegment> segments = new ArrayList<>();
+                    queue.drainTo(segments);
+                    try {
+                      segmentAnnouncer.announceSegments(segments);
+                      nextAnnoucement = exec.schedule(this, 
announceIntervalMillis, TimeUnit.MILLISECONDS);
+                    }
+                    catch (IOException e) {
+                      doneAnnouncing.setException(
+                          new SegmentLoadingException(e, "Failed to announce 
segments[%s]", segments)
+                      );
+                    }
+                  } else {
+                    doneAnnouncing.set(true);
+                  }
+                }
+                catch (Exception e) {
+                  doneAnnouncing.setException(e);
+                }
+              }
+            }
+          },
+          announceIntervalMillis,
+          TimeUnit.MILLISECONDS
+      );
+    }
+
+    public void finishAnnouncing() throws SegmentLoadingException
+    {
+      synchronized (lock) {
+        finished = true;
+        // announce any remaining segments
+        try {
+          final List<DataSegment> segments = new ArrayList<>();
+          queue.drainTo(segments);
+          segmentAnnouncer.announceSegments(segments);
+        }
+        catch (Exception e) {
+          throw new SegmentLoadingException(e, "Failed to announce 
segments[%s]", queue);
+        }
+
+        // get any exception that may have been thrown in background announcing
+        try {
+          // check in case intervalMillis is <= 0
+          if (startedAnnouncing != null) {
+            startedAnnouncing.cancel(false);
+          }
+          // - if the task is waiting on the lock, then the queue will be 
empty by the time it runs
+          // - if the task just released it, then the lock ensures any 
exception is set in doneAnnouncing
+          if (doneAnnouncing.isDone()) {
+            doneAnnouncing.get();
+          }
+        }
+        catch (InterruptedException e) {
+          Thread.currentThread().interrupt();
+          throw new SegmentLoadingException(e, "Loading Interrupted");
+        }
+        catch (ExecutionException e) {
+          throw new SegmentLoadingException(e.getCause(), "Background 
Announcing Task Failed");
+        }
+      }
+      log.info("Completed background segment announcing");
+    }
+
+    @Override
+    public void close()
+    {
+      // stop background scheduling
+      synchronized (lock) {
+        finished = true;
+        if (nextAnnoucement != null) {
+          nextAnnoucement.cancel(false);
+        }
+      }
+    }
+  }
+}
diff --git 
a/server/src/main/java/org/apache/druid/server/coordination/SegmentChangeRequestDrop.java
 
b/server/src/main/java/org/apache/druid/server/coordination/SegmentChangeRequestDrop.java
index c4229a02880..ddee89b9763 100644
--- 
a/server/src/main/java/org/apache/druid/server/coordination/SegmentChangeRequestDrop.java
+++ 
b/server/src/main/java/org/apache/druid/server/coordination/SegmentChangeRequestDrop.java
@@ -26,7 +26,6 @@ import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.timeline.DataSegment;
 
 import javax.annotation.Nullable;
-
 import java.util.Objects;
 
 /**
diff --git 
a/server/src/main/java/org/apache/druid/server/coordination/SegmentChangeRequestLoad.java
 
b/server/src/main/java/org/apache/druid/server/coordination/SegmentChangeRequestLoad.java
index 130c7b50d80..1bb9997980c 100644
--- 
a/server/src/main/java/org/apache/druid/server/coordination/SegmentChangeRequestLoad.java
+++ 
b/server/src/main/java/org/apache/druid/server/coordination/SegmentChangeRequestLoad.java
@@ -26,7 +26,6 @@ import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.timeline.DataSegment;
 
 import javax.annotation.Nullable;
-
 import java.util.Objects;
 
 /**
diff --git 
a/server/src/main/java/org/apache/druid/server/coordination/SegmentLoadDropHandler.java
 
b/server/src/main/java/org/apache/druid/server/coordination/SegmentLoadDropHandler.java
index 4a9086ab572..12462adab2f 100644
--- 
a/server/src/main/java/org/apache/druid/server/coordination/SegmentLoadDropHandler.java
+++ 
b/server/src/main/java/org/apache/druid/server/coordination/SegmentLoadDropHandler.java
@@ -20,28 +20,16 @@
 package org.apache.druid.server.coordination;
 
 import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Throwables;
 import com.google.common.cache.Cache;
 import com.google.common.cache.CacheBuilder;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Maps;
 import com.google.common.util.concurrent.AbstractFuture;
 import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.SettableFuture;
 import com.google.inject.Inject;
-import org.apache.druid.client.BootstrapSegmentsResponse;
-import org.apache.druid.client.coordinator.CoordinatorClient;
-import org.apache.druid.common.guava.FutureUtils;
 import org.apache.druid.guice.ManageLifecycle;
-import org.apache.druid.guice.ServerTypeConfig;
-import org.apache.druid.java.util.common.ISE;
-import org.apache.druid.java.util.common.Stopwatch;
 import org.apache.druid.java.util.common.concurrent.Execs;
-import org.apache.druid.java.util.common.lifecycle.LifecycleStart;
-import org.apache.druid.java.util.common.lifecycle.LifecycleStop;
 import org.apache.druid.java.util.emitter.EmittingLogger;
-import org.apache.druid.java.util.emitter.service.ServiceEmitter;
-import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
 import org.apache.druid.segment.loading.SegmentLoaderConfig;
 import org.apache.druid.segment.loading.SegmentLoadingException;
 import org.apache.druid.server.SegmentManager;
@@ -56,20 +44,13 @@ import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentSkipListSet;
-import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
-import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 
 /**
- *
+ * Responsible for loading and dropping of segments by a process that can 
serve segments.
  */
 @ManageLifecycle
 public class SegmentLoadDropHandler implements DataSegmentChangeHandler
@@ -79,20 +60,12 @@ public class SegmentLoadDropHandler implements 
DataSegmentChangeHandler
   // Synchronizes removals from segmentsToDelete
   private final Object segmentDeleteLock = new Object();
 
-  // Synchronizes start/stop of this object.
-  private final Object startStopLock = new Object();
-
   private final SegmentLoaderConfig config;
   private final DataSegmentAnnouncer announcer;
-  private final DataSegmentServerAnnouncer serverAnnouncer;
   private final SegmentManager segmentManager;
   private final ScheduledExecutorService exec;
-  private final ServerTypeConfig serverTypeConfig;
-  private final CoordinatorClient coordinatorClient;
-  private final ServiceEmitter emitter;
-  private final ConcurrentSkipListSet<DataSegment> segmentsToDelete;
 
-  private volatile boolean started = false;
+  private final ConcurrentSkipListSet<DataSegment> segmentsToDelete;
 
   // Keep history of load/drop request status in a LRU cache to maintain 
idempotency if same request shows up
   // again and to return status of a completed request. Maximum size of this 
cache must be significantly greater
@@ -108,25 +81,17 @@ public class SegmentLoadDropHandler implements 
DataSegmentChangeHandler
   public SegmentLoadDropHandler(
       SegmentLoaderConfig config,
       DataSegmentAnnouncer announcer,
-      DataSegmentServerAnnouncer serverAnnouncer,
-      SegmentManager segmentManager,
-      ServerTypeConfig serverTypeConfig,
-      CoordinatorClient coordinatorClient,
-      ServiceEmitter emitter
+      SegmentManager segmentManager
   )
   {
     this(
         config,
         announcer,
-        serverAnnouncer,
         segmentManager,
         Executors.newScheduledThreadPool(
             config.getNumLoadingThreads(),
             Execs.makeThreadFactory("SimpleDataSegmentChangeHandler-%s")
-        ),
-        serverTypeConfig,
-        coordinatorClient,
-        emitter
+        )
     );
   }
 
@@ -134,83 +99,19 @@ public class SegmentLoadDropHandler implements 
DataSegmentChangeHandler
   SegmentLoadDropHandler(
       SegmentLoaderConfig config,
       DataSegmentAnnouncer announcer,
-      DataSegmentServerAnnouncer serverAnnouncer,
       SegmentManager segmentManager,
-      ScheduledExecutorService exec,
-      ServerTypeConfig serverTypeConfig,
-      CoordinatorClient coordinatorClient,
-      ServiceEmitter emitter
+      ScheduledExecutorService exec
   )
   {
     this.config = config;
     this.announcer = announcer;
-    this.serverAnnouncer = serverAnnouncer;
     this.segmentManager = segmentManager;
     this.exec = exec;
-    this.serverTypeConfig = serverTypeConfig;
-    this.coordinatorClient = coordinatorClient;
-    this.emitter = emitter;
 
     this.segmentsToDelete = new ConcurrentSkipListSet<>();
     requestStatuses = 
CacheBuilder.newBuilder().maximumSize(config.getStatusQueueMaxSize()).initialCapacity(8).build();
   }
 
-  @LifecycleStart
-  public void start() throws IOException
-  {
-    synchronized (startStopLock) {
-      if (started) {
-        return;
-      }
-
-      log.info("Starting...");
-      try {
-        if (segmentManager.canHandleSegments()) {
-          loadSegmentsOnStartup();
-        }
-
-        if (shouldAnnounce()) {
-          serverAnnouncer.announce();
-        }
-      }
-      catch (Exception e) {
-        Throwables.propagateIfPossible(e, IOException.class);
-        throw new RuntimeException(e);
-      }
-      started = true;
-      log.info("Started.");
-    }
-  }
-
-  @LifecycleStop
-  public void stop()
-  {
-    synchronized (startStopLock) {
-      if (!started) {
-        return;
-      }
-
-      log.info("Stopping...");
-      try {
-        if (shouldAnnounce()) {
-          serverAnnouncer.unannounce();
-        }
-      }
-      catch (Exception e) {
-        throw new RuntimeException(e);
-      }
-      finally {
-        started = false;
-      }
-      log.info("Stopped.");
-    }
-  }
-
-  public boolean isStarted()
-  {
-    return started;
-  }
-
   public Map<String, Long> getAverageNumOfRowsPerSegmentForDatasource()
   {
     return segmentManager.getAverageRowCountForDatasource();
@@ -221,132 +122,6 @@ public class SegmentLoadDropHandler implements 
DataSegmentChangeHandler
     return segmentManager.getRowCountDistribution();
   }
 
-  /**
-   * Bulk loading of the following segments into the page cache at startup:
-   * <li> Previously cached segments </li>
-   * <li> Bootstrap segments from the coordinator </li>
-   */
-  private void loadSegmentsOnStartup() throws IOException
-  {
-    final List<DataSegment> segmentsOnStartup = new ArrayList<>();
-    segmentsOnStartup.addAll(segmentManager.getCachedSegments());
-    segmentsOnStartup.addAll(getBootstrapSegments());
-
-    final Stopwatch stopwatch = Stopwatch.createStarted();
-
-    // Start a temporary thread pool to load segments into page cache during 
bootstrap
-    final ExecutorService loadingExecutor = Execs.multiThreaded(
-        config.getNumBootstrapThreads(), "Segment-Load-Startup-%s"
-    );
-
-    try (final BackgroundSegmentAnnouncer backgroundSegmentAnnouncer =
-             new BackgroundSegmentAnnouncer(announcer, exec, 
config.getAnnounceIntervalMillis())) {
-
-      backgroundSegmentAnnouncer.startAnnouncing();
-
-      final int numSegments = segmentsOnStartup.size();
-      final CountDownLatch latch = new CountDownLatch(numSegments);
-      final AtomicInteger counter = new AtomicInteger(0);
-      final CopyOnWriteArrayList<DataSegment> failedSegments = new 
CopyOnWriteArrayList<>();
-      for (final DataSegment segment : segmentsOnStartup) {
-        loadingExecutor.submit(
-            () -> {
-              try {
-                log.info(
-                    "Loading segment[%d/%d][%s]",
-                    counter.incrementAndGet(), numSegments, segment.getId()
-                );
-                try {
-                  segmentManager.loadSegmentOnBootstrap(
-                      segment,
-                      () -> this.removeSegment(segment, 
DataSegmentChangeCallback.NOOP, false)
-                  );
-                }
-                catch (Exception e) {
-                  removeSegment(segment, DataSegmentChangeCallback.NOOP, 
false);
-                  throw new SegmentLoadingException(e, "Exception loading 
segment[%s]", segment.getId());
-                }
-                try {
-                  backgroundSegmentAnnouncer.announceSegment(segment);
-                }
-                catch (InterruptedException e) {
-                  Thread.currentThread().interrupt();
-                  throw new SegmentLoadingException(e, "Loading Interrupted");
-                }
-              }
-              catch (SegmentLoadingException e) {
-                log.error(e, "[%s] failed to load", segment.getId());
-                failedSegments.add(segment);
-              }
-              finally {
-                latch.countDown();
-              }
-            }
-        );
-      }
-
-      try {
-        latch.await();
-
-        if (failedSegments.size() > 0) {
-          log.makeAlert("[%,d] errors seen while loading segments on startup", 
failedSegments.size())
-             .addData("failedSegments", failedSegments)
-             .emit();
-        }
-      }
-      catch (InterruptedException e) {
-        Thread.currentThread().interrupt();
-        log.makeAlert(e, "LoadingInterrupted").emit();
-      }
-
-      backgroundSegmentAnnouncer.finishAnnouncing();
-    }
-    catch (SegmentLoadingException e) {
-      log.makeAlert(e, "Failed to load segments on startup -- likely problem 
with announcing.")
-         .addData("numSegments", segmentsOnStartup.size())
-         .emit();
-    }
-    finally {
-      loadingExecutor.shutdownNow();
-      stopwatch.stop();
-      // At this stage, all tasks have been submitted, send a shutdown command 
to cleanup any resources alloted
-      // for the bootstrapping function.
-      segmentManager.shutdownBootstrap();
-      log.info("Loaded [%d] segments on startup in [%,d]ms.", 
segmentsOnStartup.size(), stopwatch.millisElapsed());
-    }
-  }
-
-  /**
-   * @return a list of bootstrap segments. When bootstrap segments cannot be 
found, an empty list is returned.
-   */
-  private List<DataSegment> getBootstrapSegments()
-  {
-    log.info("Fetching bootstrap segments from the coordinator.");
-    final Stopwatch stopwatch = Stopwatch.createStarted();
-
-    List<DataSegment> bootstrapSegments = new ArrayList<>();
-
-    try {
-      final BootstrapSegmentsResponse response =
-          FutureUtils.getUnchecked(coordinatorClient.fetchBootstrapSegments(), 
true);
-      bootstrapSegments = ImmutableList.copyOf(response.getIterator());
-    }
-    catch (Exception e) {
-      // By default, we "fail open" when there is any error -- finding the 
coordinator, or if the API endpoint cannot
-      // be found during rolling upgrades, or even if it's irrecoverable.
-      log.warn("Error fetching bootstrap segments from the coordinator: [%s]. 
", e.getMessage());
-    }
-    finally {
-      stopwatch.stop();
-      final long fetchRunMillis = stopwatch.millisElapsed();
-      emitter.emit(new 
ServiceMetricEvent.Builder().setMetric("segment/bootstrap/time", 
fetchRunMillis));
-      emitter.emit(new 
ServiceMetricEvent.Builder().setMetric("segment/bootstrap/count", 
bootstrapSegments.size()));
-      log.info("Fetched [%d] bootstrap segments in [%d]ms.", 
bootstrapSegments.size(), fetchRunMillis);
-    }
-
-    return bootstrapSegments;
-  }
-
   @Override
   public void addSegment(DataSegment segment, @Nullable 
DataSegmentChangeCallback callback)
   {
@@ -566,154 +341,6 @@ public class SegmentLoadDropHandler implements 
DataSegmentChangeHandler
     }
   }
 
-  /**
-   * Returns whether or not we should announce ourselves as a data server 
using {@link DataSegmentServerAnnouncer}.
-   *
-   * Returns true if _either_:
-   *
-   * <li> Our {@link #serverTypeConfig} indicates we are a segment server. 
This is necessary for Brokers to be able
-   * to detect that we exist.</li>
-   * <li> The segment manager is able to handle segments. This is necessary 
for Coordinators to be able to
-   * assign segments to us.</li>
-   */
-  private boolean shouldAnnounce()
-  {
-    return serverTypeConfig.getServerType().isSegmentServer() || 
segmentManager.canHandleSegments();
-  }
-
-  private static class BackgroundSegmentAnnouncer implements AutoCloseable
-  {
-    private static final EmittingLogger log = new 
EmittingLogger(BackgroundSegmentAnnouncer.class);
-
-    private final int intervalMillis;
-    private final DataSegmentAnnouncer announcer;
-    private final ScheduledExecutorService exec;
-    private final LinkedBlockingQueue<DataSegment> queue;
-    private final SettableFuture<Boolean> doneAnnouncing;
-
-    private final Object lock = new Object();
-
-    private volatile boolean finished = false;
-    @Nullable
-    private volatile ScheduledFuture startedAnnouncing = null;
-    @Nullable
-    private volatile ScheduledFuture nextAnnoucement = null;
-
-    public BackgroundSegmentAnnouncer(
-        DataSegmentAnnouncer announcer,
-        ScheduledExecutorService exec,
-        int intervalMillis
-    )
-    {
-      this.announcer = announcer;
-      this.exec = exec;
-      this.intervalMillis = intervalMillis;
-      this.queue = new LinkedBlockingQueue<>();
-      this.doneAnnouncing = SettableFuture.create();
-    }
-
-    public void announceSegment(final DataSegment segment) throws 
InterruptedException
-    {
-      if (finished) {
-        throw new ISE("Announce segment called after finishAnnouncing");
-      }
-      queue.put(segment);
-    }
-
-    public void startAnnouncing()
-    {
-      if (intervalMillis <= 0) {
-        return;
-      }
-
-      log.info("Starting background segment announcing task");
-
-      // schedule background announcing task
-      nextAnnoucement = startedAnnouncing = exec.schedule(
-          new Runnable()
-          {
-            @Override
-            public void run()
-            {
-              synchronized (lock) {
-                try {
-                  if (!(finished && queue.isEmpty())) {
-                    final List<DataSegment> segments = new ArrayList<>();
-                    queue.drainTo(segments);
-                    try {
-                      announcer.announceSegments(segments);
-                      nextAnnoucement = exec.schedule(this, intervalMillis, 
TimeUnit.MILLISECONDS);
-                    }
-                    catch (IOException e) {
-                      doneAnnouncing.setException(
-                          new SegmentLoadingException(e, "Failed to announce 
segments[%s]", segments)
-                      );
-                    }
-                  } else {
-                    doneAnnouncing.set(true);
-                  }
-                }
-                catch (Exception e) {
-                  doneAnnouncing.setException(e);
-                }
-              }
-            }
-          },
-          intervalMillis,
-          TimeUnit.MILLISECONDS
-      );
-    }
-
-    public void finishAnnouncing() throws SegmentLoadingException
-    {
-      synchronized (lock) {
-        finished = true;
-        // announce any remaining segments
-        try {
-          final List<DataSegment> segments = new ArrayList<>();
-          queue.drainTo(segments);
-          announcer.announceSegments(segments);
-        }
-        catch (Exception e) {
-          throw new SegmentLoadingException(e, "Failed to announce 
segments[%s]", queue);
-        }
-
-        // get any exception that may have been thrown in background announcing
-        try {
-          // check in case intervalMillis is <= 0
-          if (startedAnnouncing != null) {
-            startedAnnouncing.cancel(false);
-          }
-          // - if the task is waiting on the lock, then the queue will be 
empty by the time it runs
-          // - if the task just released it, then the lock ensures any 
exception is set in doneAnnouncing
-          if (doneAnnouncing.isDone()) {
-            doneAnnouncing.get();
-          }
-        }
-        catch (InterruptedException e) {
-          Thread.currentThread().interrupt();
-          throw new SegmentLoadingException(e, "Loading Interrupted");
-        }
-        catch (ExecutionException e) {
-          throw new SegmentLoadingException(e.getCause(), "Background 
Announcing Task Failed");
-        }
-      }
-      log.info("Completed background segment announcing");
-    }
-
-    @Override
-    public void close()
-    {
-      // stop background scheduling
-      synchronized (lock) {
-        finished = true;
-        if (nextAnnoucement != null) {
-          nextAnnoucement.cancel(false);
-        }
-      }
-    }
-  }
-
   // Future with cancel() implementation to remove it from "waitingFutures" 
list
   private class CustomSettableFuture extends 
AbstractFuture<List<DataSegmentChangeResponse>>
   {
@@ -759,6 +386,5 @@ public class SegmentLoadDropHandler implements 
DataSegmentChangeHandler
       return true;
     }
   }
-
 }
 
diff --git 
a/server/src/main/java/org/apache/druid/server/http/HistoricalResource.java 
b/server/src/main/java/org/apache/druid/server/http/HistoricalResource.java
index 4bc48f444df..223e1a2dea2 100644
--- a/server/src/main/java/org/apache/druid/server/http/HistoricalResource.java
+++ b/server/src/main/java/org/apache/druid/server/http/HistoricalResource.java
@@ -21,7 +21,7 @@ package org.apache.druid.server.http;
 
 import com.google.common.collect.ImmutableMap;
 import com.sun.jersey.spi.container.ResourceFilters;
-import org.apache.druid.server.coordination.SegmentLoadDropHandler;
+import org.apache.druid.server.coordination.SegmentBootstrapper;
 import org.apache.druid.server.http.security.StateResourceFilter;
 
 import javax.inject.Inject;
@@ -34,14 +34,14 @@ import javax.ws.rs.core.Response;
 @Path("/druid/historical/v1")
 public class HistoricalResource
 {
-  private final SegmentLoadDropHandler segmentLoadDropHandler;
+  private final SegmentBootstrapper segmentBootstrapper;
 
   @Inject
   public HistoricalResource(
-      SegmentLoadDropHandler segmentLoadDropHandler
+      SegmentBootstrapper segmentBootstrapper
   )
   {
-    this.segmentLoadDropHandler = segmentLoadDropHandler;
+    this.segmentBootstrapper = segmentBootstrapper;
   }
 
   @GET
@@ -50,14 +50,14 @@ public class HistoricalResource
   @Produces(MediaType.APPLICATION_JSON)
   public Response getLoadStatus()
   {
-    return Response.ok(ImmutableMap.of("cacheInitialized", 
segmentLoadDropHandler.isStarted())).build();
+    return Response.ok(ImmutableMap.of("cacheInitialized", 
segmentBootstrapper.isBootstrappingComplete())).build();
   }
 
   @GET
   @Path("/readiness")
   public Response getReadiness()
   {
-    if (segmentLoadDropHandler.isStarted()) {
+    if (segmentBootstrapper.isBootstrappingComplete()) {
       return Response.ok().build();
     } else {
       return Response.status(Response.Status.SERVICE_UNAVAILABLE).build();
diff --git 
a/server/src/test/java/org/apache/druid/server/coordination/SegmentLoadDropHandlerCacheTest.java
 
b/server/src/test/java/org/apache/druid/server/coordination/SegmentBootstrapperCacheTest.java
similarity index 86%
rename from 
server/src/test/java/org/apache/druid/server/coordination/SegmentLoadDropHandlerCacheTest.java
rename to 
server/src/test/java/org/apache/druid/server/coordination/SegmentBootstrapperCacheTest.java
index f6b1c39c59d..7629a6b875c 100644
--- 
a/server/src/test/java/org/apache/druid/server/coordination/SegmentLoadDropHandlerCacheTest.java
+++ 
b/server/src/test/java/org/apache/druid/server/coordination/SegmentBootstrapperCacheTest.java
@@ -50,10 +50,10 @@ import java.util.Collections;
 import java.util.List;
 
 /**
- * Similar to {@link SegmentLoadDropHandlerTest}. This class includes tests 
that cover the
+ * Similar to {@link SegmentBootstrapperTest}. This class includes tests that 
cover the
  * storage location layer as well.
  */
-public class SegmentLoadDropHandlerCacheTest
+public class SegmentBootstrapperCacheTest
 {
   private static final long MAX_SIZE = 1000L;
   private static final long SEGMENT_SIZE = 100L;
@@ -101,8 +101,8 @@ public class SegmentLoadDropHandlerCacheTest
         objectMapper
     );
     segmentManager = new SegmentManager(cacheManager);
-    segmentAnnouncer = new TestDataSegmentAnnouncer();
     serverAnnouncer = new TestDataServerAnnouncer();
+    segmentAnnouncer = new TestDataSegmentAnnouncer();
     coordinatorClient = new TestCoordinatorClient();
     emitter = new StubServiceEmitter();
     EmittingLogger.registerEmitter(emitter);
@@ -112,10 +112,11 @@ public class SegmentLoadDropHandlerCacheTest
   public void testLoadStartStopWithEmptyLocations() throws IOException
   {
     final List<StorageLocation> emptyLocations = ImmutableList.of();
+    final SegmentLoaderConfig loaderConfig = new SegmentLoaderConfig();
     segmentManager = new SegmentManager(
         new SegmentLocalCacheManager(
             emptyLocations,
-            new SegmentLoaderConfig(),
+            loaderConfig,
             new LeastBytesUsedStorageLocationSelectorStrategy(emptyLocations),
             TestIndex.INDEX_IO,
             objectMapper
@@ -123,19 +124,26 @@ public class SegmentLoadDropHandlerCacheTest
     );
 
     final SegmentLoadDropHandler loadDropHandler = new SegmentLoadDropHandler(
-        new SegmentLoaderConfig(),
+        loaderConfig,
+        segmentAnnouncer,
+        segmentManager
+    );
+
+    final SegmentBootstrapper bootstrapper = new SegmentBootstrapper(
+        loadDropHandler,
+        loaderConfig,
         segmentAnnouncer,
         serverAnnouncer,
         segmentManager,
-        new ServerTypeConfig(ServerType.BROKER),
+        new ServerTypeConfig(ServerType.HISTORICAL),
         coordinatorClient,
         emitter
     );
 
-    loadDropHandler.start();
-    Assert.assertEquals(0, serverAnnouncer.getObservedCount());
+    bootstrapper.start();
+    Assert.assertEquals(1, serverAnnouncer.getObservedCount());
 
-    loadDropHandler.stop();
+    bootstrapper.stop();
     Assert.assertEquals(0, serverAnnouncer.getObservedCount());
   }
 
@@ -143,19 +151,26 @@ public class SegmentLoadDropHandlerCacheTest
   public void testLoadStartStop() throws IOException
   {
     final SegmentLoadDropHandler loadDropHandler = new SegmentLoadDropHandler(
+        loaderConfig,
+        segmentAnnouncer,
+        segmentManager
+    );
+
+    final SegmentBootstrapper bootstrapper = new SegmentBootstrapper(
+        loadDropHandler,
         loaderConfig,
         segmentAnnouncer,
         serverAnnouncer,
         segmentManager,
-        new ServerTypeConfig(ServerType.BROKER),
+        new ServerTypeConfig(ServerType.HISTORICAL),
         coordinatorClient,
         emitter
     );
 
-    loadDropHandler.start();
+    bootstrapper.start();
     Assert.assertEquals(1, serverAnnouncer.getObservedCount());
 
-    loadDropHandler.stop();
+    bootstrapper.stop();
     Assert.assertEquals(0, serverAnnouncer.getObservedCount());
   }
 
@@ -176,6 +191,13 @@ public class SegmentLoadDropHandlerCacheTest
     }
 
     final SegmentLoadDropHandler loadDropHandler = new SegmentLoadDropHandler(
+        loaderConfig,
+        segmentAnnouncer,
+        segmentManager
+    );
+
+    final SegmentBootstrapper bootstrapper = new SegmentBootstrapper(
+        loadDropHandler,
         loaderConfig,
         segmentAnnouncer,
         serverAnnouncer,
@@ -185,8 +207,7 @@ public class SegmentLoadDropHandlerCacheTest
         emitter
     );
 
-    // Start the load drop handler
-    loadDropHandler.start();
+    bootstrapper.start();
     Assert.assertEquals(1, serverAnnouncer.getObservedCount());
 
     // Verify the expected announcements
@@ -202,7 +223,7 @@ public class SegmentLoadDropHandlerCacheTest
     loadDropHandler.addSegment(newSegment, null);
     
Assert.assertTrue(segmentAnnouncer.getObservedSegments().contains(newSegment));
 
-    loadDropHandler.stop();
+    bootstrapper.stop();
     Assert.assertEquals(0, serverAnnouncer.getObservedCount());
   }
 }
diff --git 
a/server/src/test/java/org/apache/druid/server/coordination/SegmentBootstrapperTest.java
 
b/server/src/test/java/org/apache/druid/server/coordination/SegmentBootstrapperTest.java
new file mode 100644
index 00000000000..c41763f1824
--- /dev/null
+++ 
b/server/src/test/java/org/apache/druid/server/coordination/SegmentBootstrapperTest.java
@@ -0,0 +1,306 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.server.coordination;
+
+import com.google.common.collect.ImmutableList;
+import org.apache.druid.guice.ServerTypeConfig;
+import org.apache.druid.java.util.common.Intervals;
+import org.apache.druid.java.util.emitter.EmittingLogger;
+import org.apache.druid.java.util.metrics.StubServiceEmitter;
+import org.apache.druid.segment.loading.SegmentLoaderConfig;
+import org.apache.druid.segment.loading.StorageLocationConfig;
+import org.apache.druid.server.SegmentManager;
+import org.apache.druid.timeline.DataSegment;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import static org.apache.druid.server.TestSegmentUtils.makeSegment;
+
+public class SegmentBootstrapperTest
+{
+  private static final int COUNT = 50;
+
+  private TestDataSegmentAnnouncer segmentAnnouncer;
+  private TestDataServerAnnouncer serverAnnouncer;
+  private SegmentLoaderConfig segmentLoaderConfig;
+  private TestCoordinatorClient coordinatorClient;
+  private StubServiceEmitter serviceEmitter;
+
+  @Rule
+  public TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+  @Before
+  public void setUp() throws IOException
+  {
+    final File segmentCacheDir = temporaryFolder.newFolder();
+
+    segmentAnnouncer = new TestDataSegmentAnnouncer();
+    serverAnnouncer = new TestDataServerAnnouncer();
+    segmentLoaderConfig = new SegmentLoaderConfig()
+    {
+      @Override
+      public File getInfoDir()
+      {
+        return segmentCacheDir;
+      }
+
+      @Override
+      public int getNumLoadingThreads()
+      {
+        return 5;
+      }
+
+      @Override
+      public int getAnnounceIntervalMillis()
+      {
+        return 50;
+      }
+
+      @Override
+      public List<StorageLocationConfig> getLocations()
+      {
+        return Collections.singletonList(
+            new StorageLocationConfig(segmentCacheDir, null, null)
+        );
+      }
+    };
+
+    coordinatorClient = new TestCoordinatorClient();
+    serviceEmitter = new StubServiceEmitter();
+    EmittingLogger.registerEmitter(serviceEmitter);
+  }
+
+
+  @Test
+  public void testStartStop() throws Exception
+  {
+    final Set<DataSegment> segments = new HashSet<>();
+    for (int i = 0; i < COUNT; ++i) {
+      segments.add(makeSegment("test" + i, "1", 
Intervals.of("P1d/2011-04-01")));
+      segments.add(makeSegment("test" + i, "1", 
Intervals.of("P1d/2011-04-02")));
+      segments.add(makeSegment("test" + i, "2", 
Intervals.of("P1d/2011-04-02")));
+      segments.add(makeSegment("test_two" + i, "1", 
Intervals.of("P1d/2011-04-01")));
+      segments.add(makeSegment("test_two" + i, "1", 
Intervals.of("P1d/2011-04-02")));
+    }
+
+    final TestSegmentCacheManager cacheManager = new 
TestSegmentCacheManager(segments);
+    final SegmentManager segmentManager = new SegmentManager(cacheManager);
+    final SegmentLoadDropHandler handler = new SegmentLoadDropHandler(
+        segmentLoaderConfig,
+        segmentAnnouncer,
+        segmentManager
+    );
+    final SegmentBootstrapper bootstrapper = new SegmentBootstrapper(
+        handler,
+        segmentLoaderConfig,
+        segmentAnnouncer,
+        serverAnnouncer,
+        segmentManager,
+        new ServerTypeConfig(ServerType.HISTORICAL),
+        coordinatorClient,
+        serviceEmitter
+    );
+
+    Assert.assertTrue(segmentManager.getDataSourceCounts().isEmpty());
+    bootstrapper.start();
+
+    Assert.assertEquals(1, serverAnnouncer.getObservedCount());
+    Assert.assertFalse(segmentManager.getDataSourceCounts().isEmpty());
+
+    for (int i = 0; i < COUNT; ++i) {
+      Assert.assertEquals(3L, segmentManager.getDataSourceCounts().get("test" 
+ i).longValue());
+      Assert.assertEquals(2L, 
segmentManager.getDataSourceCounts().get("test_two" + i).longValue());
+    }
+
+    Assert.assertEquals(ImmutableList.copyOf(segments), 
segmentAnnouncer.getObservedSegments());
+
+    final ImmutableList<DataSegment> expectedBootstrapSegments = 
ImmutableList.copyOf(segments);
+    Assert.assertEquals(expectedBootstrapSegments, 
cacheManager.getObservedBootstrapSegments());
+    Assert.assertEquals(expectedBootstrapSegments, 
cacheManager.getObservedBootstrapSegmentsLoadedIntoPageCache());
+    Assert.assertEquals(ImmutableList.of(), 
cacheManager.getObservedSegments());
+    Assert.assertEquals(ImmutableList.of(), 
cacheManager.getObservedSegmentsLoadedIntoPageCache());
+
+    bootstrapper.stop();
+
+    Assert.assertEquals(0, serverAnnouncer.getObservedCount());
+    Assert.assertEquals(1, 
cacheManager.getObservedShutdownBootstrapCount().get());
+  }
+
+  @Test
+  public void testLoadCachedSegments() throws Exception
+  {
+    final Set<DataSegment> segments = new HashSet<>();
+    for (int i = 0; i < COUNT; ++i) {
+      segments.add(makeSegment("test" + i, "1", 
Intervals.of("P1d/2011-04-01")));
+      segments.add(makeSegment("test" + i, "1", 
Intervals.of("P1d/2011-04-02")));
+      segments.add(makeSegment("test" + i, "2", 
Intervals.of("P1d/2011-04-02")));
+      segments.add(makeSegment("test" + i, "1", 
Intervals.of("P1d/2011-04-03")));
+      segments.add(makeSegment("test" + i, "1", 
Intervals.of("P1d/2011-04-04")));
+      segments.add(makeSegment("test" + i, "1", 
Intervals.of("P1d/2011-04-05")));
+      segments.add(makeSegment("test" + i, "2", 
Intervals.of("PT1h/2011-04-04T01")));
+      segments.add(makeSegment("test" + i, "2", 
Intervals.of("PT1h/2011-04-04T02")));
+      segments.add(makeSegment("test" + i, "2", 
Intervals.of("PT1h/2011-04-04T03")));
+      segments.add(makeSegment("test" + i, "2", 
Intervals.of("PT1h/2011-04-04T05")));
+      segments.add(makeSegment("test" + i, "2", 
Intervals.of("PT1h/2011-04-04T06")));
+      segments.add(makeSegment("test_two" + i, "1", 
Intervals.of("P1d/2011-04-01")));
+      segments.add(makeSegment("test_two" + i, "1", 
Intervals.of("P1d/2011-04-02")));
+    }
+
+    final TestSegmentCacheManager cacheManager = new 
TestSegmentCacheManager(segments);
+    final SegmentManager segmentManager = new SegmentManager(cacheManager);
+    final SegmentLoadDropHandler handler = new 
SegmentLoadDropHandler(segmentLoaderConfig, segmentAnnouncer, segmentManager);
+    final SegmentBootstrapper bootstrapper = new SegmentBootstrapper(
+        handler,
+        segmentLoaderConfig,
+        segmentAnnouncer,
+        serverAnnouncer,
+        segmentManager,
+        new ServerTypeConfig(ServerType.HISTORICAL),
+        coordinatorClient,
+        serviceEmitter
+    );
+
+    Assert.assertTrue(segmentManager.getDataSourceCounts().isEmpty());
+
+    bootstrapper.start();
+
+    Assert.assertEquals(1, serverAnnouncer.getObservedCount());
+    Assert.assertFalse(segmentManager.getDataSourceCounts().isEmpty());
+
+    for (int i = 0; i < COUNT; ++i) {
+      Assert.assertEquals(11L, segmentManager.getDataSourceCounts().get("test" 
+ i).longValue());
+      Assert.assertEquals(2L, 
segmentManager.getDataSourceCounts().get("test_two" + i).longValue());
+    }
+
+    Assert.assertEquals(ImmutableList.copyOf(segments), 
segmentAnnouncer.getObservedSegments());
+
+    final ImmutableList<DataSegment> expectedBootstrapSegments = 
ImmutableList.copyOf(segments);
+    Assert.assertEquals(expectedBootstrapSegments, 
cacheManager.getObservedBootstrapSegments());
+    Assert.assertEquals(expectedBootstrapSegments, 
cacheManager.getObservedBootstrapSegmentsLoadedIntoPageCache());
+    Assert.assertEquals(ImmutableList.of(), 
cacheManager.getObservedSegments());
+    Assert.assertEquals(ImmutableList.of(), 
cacheManager.getObservedSegmentsLoadedIntoPageCache());
+
+    bootstrapper.stop();
+
+    Assert.assertEquals(0, serverAnnouncer.getObservedCount());
+    Assert.assertEquals(1, 
cacheManager.getObservedShutdownBootstrapCount().get());
+  }
+
+  @Test
+  public void testLoadBootstrapSegments() throws Exception
+  {
+    final Set<DataSegment> segments = new HashSet<>();
+    for (int i = 0; i < COUNT; ++i) {
+      segments.add(makeSegment("test" + i, "1", 
Intervals.of("P1d/2011-04-01")));
+      segments.add(makeSegment("test" + i, "1", 
Intervals.of("P1d/2011-04-02")));
+      segments.add(makeSegment("test_two" + i, "1", 
Intervals.of("P1d/2011-04-01")));
+      segments.add(makeSegment("test_two" + i, "1", 
Intervals.of("P1d/2011-04-02")));
+    }
+
+    final TestCoordinatorClient coordinatorClient = new 
TestCoordinatorClient(segments);
+    final TestSegmentCacheManager cacheManager = new TestSegmentCacheManager();
+    final SegmentManager segmentManager = new SegmentManager(cacheManager);
+    final SegmentLoadDropHandler handler = new SegmentLoadDropHandler(
+        segmentLoaderConfig,
+        segmentAnnouncer,
+        segmentManager
+    );
+    final SegmentBootstrapper bootstrapper = new SegmentBootstrapper(
+        handler,
+        segmentLoaderConfig,
+        segmentAnnouncer,
+        serverAnnouncer,
+        segmentManager,
+        new ServerTypeConfig(ServerType.HISTORICAL),
+        coordinatorClient,
+        serviceEmitter
+    );
+
+    Assert.assertTrue(segmentManager.getDataSourceCounts().isEmpty());
+
+    bootstrapper.start();
+
+    Assert.assertEquals(1, serverAnnouncer.getObservedCount());
+    Assert.assertFalse(segmentManager.getDataSourceCounts().isEmpty());
+
+    for (int i = 0; i < COUNT; ++i) {
+      Assert.assertEquals(2L, segmentManager.getDataSourceCounts().get("test" 
+ i).longValue());
+      Assert.assertEquals(2L, 
segmentManager.getDataSourceCounts().get("test_two" + i).longValue());
+    }
+
+    final ImmutableList<DataSegment> expectedBootstrapSegments = 
ImmutableList.copyOf(segments);
+
+    Assert.assertEquals(expectedBootstrapSegments, 
segmentAnnouncer.getObservedSegments());
+
+    Assert.assertEquals(expectedBootstrapSegments, 
cacheManager.getObservedBootstrapSegments());
+    Assert.assertEquals(expectedBootstrapSegments, 
cacheManager.getObservedBootstrapSegmentsLoadedIntoPageCache());
+    serviceEmitter.verifyValue("segment/bootstrap/count", 
expectedBootstrapSegments.size());
+    serviceEmitter.verifyEmitted("segment/bootstrap/time", 1);
+
+    bootstrapper.stop();
+  }
+
+  @Test
+  public void testLoadBootstrapSegmentsWhenExceptionThrown() throws Exception
+  {
+    final TestSegmentCacheManager cacheManager = new TestSegmentCacheManager();
+    final SegmentManager segmentManager = new SegmentManager(cacheManager);
+    final SegmentLoadDropHandler handler = new SegmentLoadDropHandler(
+        segmentLoaderConfig,
+        segmentAnnouncer,
+        segmentManager
+    );
+    final SegmentBootstrapper bootstrapper = new SegmentBootstrapper(
+        handler,
+        segmentLoaderConfig,
+        segmentAnnouncer,
+        serverAnnouncer,
+        segmentManager,
+        new ServerTypeConfig(ServerType.HISTORICAL),
+        coordinatorClient,
+        serviceEmitter
+    );
+
+    Assert.assertTrue(segmentManager.getDataSourceCounts().isEmpty());
+
+    bootstrapper.start();
+
+    Assert.assertEquals(1, serverAnnouncer.getObservedCount());
+    Assert.assertTrue(segmentManager.getDataSourceCounts().isEmpty());
+
+    Assert.assertEquals(ImmutableList.of(), 
segmentAnnouncer.getObservedSegments());
+    Assert.assertEquals(ImmutableList.of(), 
cacheManager.getObservedBootstrapSegments());
+    Assert.assertEquals(ImmutableList.of(), 
cacheManager.getObservedBootstrapSegmentsLoadedIntoPageCache());
+    serviceEmitter.verifyValue("segment/bootstrap/count", 0);
+    serviceEmitter.verifyEmitted("segment/bootstrap/time", 1);
+
+    bootstrapper.stop();
+  }
+}
diff --git 
a/server/src/test/java/org/apache/druid/server/coordination/SegmentLoadDropHandlerTest.java
 
b/server/src/test/java/org/apache/druid/server/coordination/SegmentLoadDropHandlerTest.java
index 9fe04d60d5b..cd2fe2dbd63 100644
--- 
a/server/src/test/java/org/apache/druid/server/coordination/SegmentLoadDropHandlerTest.java
+++ 
b/server/src/test/java/org/apache/druid/server/coordination/SegmentLoadDropHandlerTest.java
@@ -20,33 +20,21 @@
 package org.apache.druid.server.coordination;
 
 import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableSet;
 import com.google.common.util.concurrent.ListenableFuture;
-import org.apache.druid.client.coordinator.CoordinatorClient;
-import org.apache.druid.client.coordinator.NoopCoordinatorClient;
-import org.apache.druid.guice.ServerTypeConfig;
 import org.apache.druid.java.util.common.Intervals;
-import org.apache.druid.java.util.common.MapUtils;
 import org.apache.druid.java.util.common.concurrent.Execs;
 import org.apache.druid.java.util.common.concurrent.ScheduledExecutorFactory;
 import org.apache.druid.java.util.emitter.EmittingLogger;
 import org.apache.druid.java.util.metrics.StubServiceEmitter;
-import org.apache.druid.segment.ReferenceCountingSegment;
-import org.apache.druid.segment.SegmentLazyLoadFailCallback;
-import org.apache.druid.segment.loading.NoopSegmentCacheManager;
 import org.apache.druid.segment.loading.SegmentLoaderConfig;
 import org.apache.druid.segment.loading.StorageLocationConfig;
-import org.apache.druid.segment.loading.TombstoneSegmentizerFactory;
 import org.apache.druid.server.SegmentManager;
-import org.apache.druid.server.TestSegmentUtils;
 import org.apache.druid.server.coordination.SegmentChangeStatus.State;
 import org.apache.druid.timeline.DataSegment;
-import org.joda.time.Interval;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
-import org.junit.rules.ExpectedException;
 import org.junit.rules.TemporaryFolder;
 import org.mockito.ArgumentMatchers;
 import org.mockito.Mockito;
@@ -56,29 +44,20 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.apache.druid.server.TestSegmentUtils.makeSegment;
 
 public class SegmentLoadDropHandlerTest
 {
-  private static final int COUNT = 50;
-
   private TestDataSegmentAnnouncer segmentAnnouncer;
-  private TestDataServerAnnouncer serverAnnouncer;
   private List<Runnable> scheduledRunnable;
   private SegmentLoaderConfig segmentLoaderConfig;
   private ScheduledExecutorFactory scheduledExecutorFactory;
-  private TestCoordinatorClient coordinatorClient;
-  private StubServiceEmitter serviceEmitter;
-
-  @Rule
-  public ExpectedException expectedException = ExpectedException.none();
 
   @Rule
   public TemporaryFolder temporaryFolder = new TemporaryFolder();
@@ -90,7 +69,6 @@ public class SegmentLoadDropHandlerTest
 
     scheduledRunnable = new ArrayList<>();
     segmentAnnouncer = new TestDataSegmentAnnouncer();
-    serverAnnouncer = new TestDataServerAnnouncer();
     segmentLoaderConfig = new SegmentLoaderConfig()
     {
       @Override
@@ -140,9 +118,7 @@ public class SegmentLoadDropHandlerTest
       };
     };
 
-    coordinatorClient = new TestCoordinatorClient();
-    serviceEmitter = new StubServiceEmitter();
-    EmittingLogger.registerEmitter(serviceEmitter);
+    EmittingLogger.registerEmitter(new StubServiceEmitter());
   }
 
   /**
@@ -154,16 +130,12 @@ public class SegmentLoadDropHandlerTest
    * </ul>
    */
   @Test
-  public void testSegmentLoading1() throws Exception
+  public void testSegmentLoading1()
   {
     final TestSegmentCacheManager cacheManager = new TestSegmentCacheManager();
     final SegmentManager segmentManager = new SegmentManager(cacheManager);
     final SegmentLoadDropHandler handler = 
initSegmentLoadDropHandler(segmentManager);
 
-    handler.start();
-
-    Assert.assertEquals(1, serverAnnouncer.getObservedCount());
-
     final DataSegment segment = makeSegment("test", "1", 
Intervals.of("P1d/2011-04-01"));
 
     handler.removeSegment(segment, DataSegmentChangeCallback.NOOP);
@@ -178,19 +150,16 @@ public class SegmentLoadDropHandlerTest
     for (Runnable runnable : scheduledRunnable) {
       runnable.run();
     }
-    Assert.assertEquals(ImmutableList.of(segment), 
cacheManager.observedSegments);
-    Assert.assertEquals(ImmutableList.of(segment), 
cacheManager.observedSegmentsLoadedIntoPageCache);
-    Assert.assertEquals(ImmutableList.of(), 
cacheManager.observedBootstrapSegments);
-    Assert.assertEquals(ImmutableList.of(), 
cacheManager.observedBootstrapSegmentsLoadedIntoPageCache);
+    Assert.assertEquals(ImmutableList.of(segment), 
cacheManager.getObservedSegments());
+    Assert.assertEquals(ImmutableList.of(segment), 
cacheManager.getObservedSegmentsLoadedIntoPageCache());
+    Assert.assertEquals(ImmutableList.of(), 
cacheManager.getObservedBootstrapSegments());
+    Assert.assertEquals(ImmutableList.of(), 
cacheManager.getObservedBootstrapSegmentsLoadedIntoPageCache());
 
     Assert.assertEquals(ImmutableList.of(segment), 
segmentAnnouncer.getObservedSegments());
     Assert.assertFalse(
         "segment files shouldn't be deleted",
-        cacheManager.observedSegmentsRemovedFromCache.contains(segment)
+        cacheManager.getObservedSegmentsRemovedFromCache().contains(segment)
     );
-
-    handler.stop();
-    Assert.assertEquals(0, serverAnnouncer.getObservedCount());
   }
 
   /**
@@ -203,15 +172,15 @@ public class SegmentLoadDropHandlerTest
    * </ul>
    */
   @Test
-  public void testSegmentLoading2() throws Exception
+  public void testSegmentLoading2()
   {
     final TestSegmentCacheManager cacheManager = new TestSegmentCacheManager();
     final SegmentManager segmentManager = new SegmentManager(cacheManager);
     final SegmentLoadDropHandler handler = 
initSegmentLoadDropHandler(segmentManager);
 
-    handler.start();
+    // handler.start();
 
-    Assert.assertEquals(1, serverAnnouncer.getObservedCount());
+    // Assert.assertEquals(1, serverAnnouncer.getObservedCount());
 
     final DataSegment segment = makeSegment("test", "1", 
Intervals.of("P1d/2011-04-01"));
 
@@ -234,176 +203,16 @@ public class SegmentLoadDropHandlerTest
 
     // The same segment reference will be fetched more than once in the above 
sequence, but the segment should
     // be loaded only once onto the page cache.
-    Assert.assertEquals(ImmutableList.of(segment, segment), 
cacheManager.observedSegments);
-    Assert.assertEquals(ImmutableList.of(segment), 
cacheManager.observedSegmentsLoadedIntoPageCache);
-    Assert.assertEquals(ImmutableList.of(), 
cacheManager.observedBootstrapSegments);
-    Assert.assertEquals(ImmutableList.of(), 
cacheManager.observedBootstrapSegmentsLoadedIntoPageCache);
+    Assert.assertEquals(ImmutableList.of(segment, segment), 
cacheManager.getObservedSegments());
+    Assert.assertEquals(ImmutableList.of(segment), 
cacheManager.getObservedSegmentsLoadedIntoPageCache());
+    Assert.assertEquals(ImmutableList.of(), 
cacheManager.getObservedBootstrapSegments());
+    Assert.assertEquals(ImmutableList.of(), 
cacheManager.getObservedBootstrapSegmentsLoadedIntoPageCache());
 
     
Assert.assertTrue(segmentAnnouncer.getObservedSegments().contains(segment));
     Assert.assertFalse(
         "segment files shouldn't be deleted",
-        cacheManager.observedSegmentsRemovedFromCache.contains(segment)
+        cacheManager.getObservedSegmentsRemovedFromCache().contains(segment)
     );
-
-    handler.stop();
-    Assert.assertEquals(0, serverAnnouncer.getObservedCount());
-  }
-
-  @Test
-  public void testLoadCache() throws Exception
-  {
-    Set<DataSegment> segments = new HashSet<>();
-    for (int i = 0; i < COUNT; ++i) {
-      segments.add(makeSegment("test" + i, "1", 
Intervals.of("P1d/2011-04-01")));
-      segments.add(makeSegment("test" + i, "1", 
Intervals.of("P1d/2011-04-02")));
-      segments.add(makeSegment("test" + i, "2", 
Intervals.of("P1d/2011-04-02")));
-      segments.add(makeSegment("test" + i, "1", 
Intervals.of("P1d/2011-04-03")));
-      segments.add(makeSegment("test" + i, "1", 
Intervals.of("P1d/2011-04-04")));
-      segments.add(makeSegment("test" + i, "1", 
Intervals.of("P1d/2011-04-05")));
-      segments.add(makeSegment("test" + i, "2", 
Intervals.of("PT1h/2011-04-04T01")));
-      segments.add(makeSegment("test" + i, "2", 
Intervals.of("PT1h/2011-04-04T02")));
-      segments.add(makeSegment("test" + i, "2", 
Intervals.of("PT1h/2011-04-04T03")));
-      segments.add(makeSegment("test" + i, "2", 
Intervals.of("PT1h/2011-04-04T05")));
-      segments.add(makeSegment("test" + i, "2", 
Intervals.of("PT1h/2011-04-04T06")));
-      segments.add(makeSegment("test_two" + i, "1", 
Intervals.of("P1d/2011-04-01")));
-      segments.add(makeSegment("test_two" + i, "1", 
Intervals.of("P1d/2011-04-02")));
-    }
-
-    final TestSegmentCacheManager cacheManager = new 
TestSegmentCacheManager(segments);
-    final SegmentManager segmentManager = new SegmentManager(cacheManager);
-    final SegmentLoadDropHandler handler = 
initSegmentLoadDropHandler(segmentManager);
-
-    Assert.assertTrue(segmentManager.getDataSourceCounts().isEmpty());
-
-    handler.start();
-
-    Assert.assertEquals(1, serverAnnouncer.getObservedCount());
-    Assert.assertFalse(segmentManager.getDataSourceCounts().isEmpty());
-
-    for (int i = 0; i < COUNT; ++i) {
-      Assert.assertEquals(11L, segmentManager.getDataSourceCounts().get("test" 
+ i).longValue());
-      Assert.assertEquals(2L, 
segmentManager.getDataSourceCounts().get("test_two" + i).longValue());
-    }
-
-    Assert.assertEquals(ImmutableList.copyOf(segments), 
segmentAnnouncer.getObservedSegments());
-
-    final ImmutableList<DataSegment> expectedBootstrapSegments = 
ImmutableList.copyOf(segments);
-    Assert.assertEquals(expectedBootstrapSegments, 
cacheManager.observedBootstrapSegments);
-    Assert.assertEquals(expectedBootstrapSegments, 
cacheManager.observedBootstrapSegmentsLoadedIntoPageCache);
-    Assert.assertEquals(ImmutableList.of(), cacheManager.observedSegments);
-    Assert.assertEquals(ImmutableList.of(), 
cacheManager.observedSegmentsLoadedIntoPageCache);
-
-    handler.stop();
-
-    Assert.assertEquals(0, serverAnnouncer.getObservedCount());
-    Assert.assertEquals(1, cacheManager.observedShutdownBootstrapCount.get());
-  }
-
-  @Test
-  public void testLoadBootstrapSegments() throws Exception
-  {
-    final Set<DataSegment> segments = new HashSet<>();
-    for (int i = 0; i < COUNT; ++i) {
-      segments.add(makeSegment("test" + i, "1", 
Intervals.of("P1d/2011-04-01")));
-      segments.add(makeSegment("test" + i, "1", 
Intervals.of("P1d/2011-04-02")));
-      segments.add(makeSegment("test_two" + i, "1", 
Intervals.of("P1d/2011-04-01")));
-      segments.add(makeSegment("test_two" + i, "1", 
Intervals.of("P1d/2011-04-02")));
-    }
-
-    final TestCoordinatorClient coordinatorClient = new 
TestCoordinatorClient(segments);
-    final TestSegmentCacheManager cacheManager = new TestSegmentCacheManager();
-    final SegmentManager segmentManager = new SegmentManager(cacheManager);
-
-    final SegmentLoadDropHandler handler = 
initSegmentLoadDropHandler(segmentManager, coordinatorClient);
-
-    Assert.assertTrue(segmentManager.getDataSourceCounts().isEmpty());
-
-    handler.start();
-
-    Assert.assertEquals(1, serverAnnouncer.getObservedCount());
-    Assert.assertFalse(segmentManager.getDataSourceCounts().isEmpty());
-
-    for (int i = 0; i < COUNT; ++i) {
-      Assert.assertEquals(2L, segmentManager.getDataSourceCounts().get("test" 
+ i).longValue());
-      Assert.assertEquals(2L, 
segmentManager.getDataSourceCounts().get("test_two" + i).longValue());
-    }
-
-    final ImmutableList<DataSegment> expectedBootstrapSegments = 
ImmutableList.copyOf(segments);
-
-    Assert.assertEquals(expectedBootstrapSegments, 
segmentAnnouncer.getObservedSegments());
-
-    Assert.assertEquals(expectedBootstrapSegments, 
cacheManager.observedBootstrapSegments);
-    Assert.assertEquals(expectedBootstrapSegments, 
cacheManager.observedBootstrapSegmentsLoadedIntoPageCache);
-    serviceEmitter.verifyValue("segment/bootstrap/count", 
expectedBootstrapSegments.size());
-    serviceEmitter.verifyEmitted("segment/bootstrap/time", 1);
-
-    handler.stop();
-  }
-
-  @Test
-  public void testLoadBootstrapSegmentsWhenExceptionThrown() throws Exception
-  {
-    final TestSegmentCacheManager cacheManager = new TestSegmentCacheManager();
-    final SegmentManager segmentManager = new SegmentManager(cacheManager);
-
-    final SegmentLoadDropHandler handler = 
initSegmentLoadDropHandler(segmentManager, new NoopCoordinatorClient());
-
-    Assert.assertTrue(segmentManager.getDataSourceCounts().isEmpty());
-
-    handler.start();
-
-    Assert.assertEquals(1, serverAnnouncer.getObservedCount());
-    Assert.assertTrue(segmentManager.getDataSourceCounts().isEmpty());
-
-    Assert.assertEquals(ImmutableList.of(), 
segmentAnnouncer.getObservedSegments());
-    Assert.assertEquals(ImmutableList.of(), 
cacheManager.observedBootstrapSegments);
-    Assert.assertEquals(ImmutableList.of(), 
cacheManager.observedBootstrapSegmentsLoadedIntoPageCache);
-    serviceEmitter.verifyValue("segment/bootstrap/count", 0);
-    serviceEmitter.verifyEmitted("segment/bootstrap/time", 1);
-
-    handler.stop();
-  }
-
-  @Test
-  public void testStartStop() throws Exception
-  {
-    final Set<DataSegment> segments = new HashSet<>();
-    for (int i = 0; i < COUNT; ++i) {
-      segments.add(makeSegment("test" + i, "1", 
Intervals.of("P1d/2011-04-01")));
-      segments.add(makeSegment("test" + i, "1", 
Intervals.of("P1d/2011-04-02")));
-      segments.add(makeSegment("test" + i, "2", 
Intervals.of("P1d/2011-04-02")));
-      segments.add(makeSegment("test_two" + i, "1", 
Intervals.of("P1d/2011-04-01")));
-      segments.add(makeSegment("test_two" + i, "1", 
Intervals.of("P1d/2011-04-02")));
-    }
-
-    final TestSegmentCacheManager cacheManager = new 
TestSegmentCacheManager(segments);
-    final SegmentManager segmentManager = new SegmentManager(cacheManager);
-    final SegmentLoadDropHandler handler = 
initSegmentLoadDropHandler(segmentManager);
-
-    Assert.assertTrue(segmentManager.getDataSourceCounts().isEmpty());
-
-    handler.start();
-
-    Assert.assertEquals(1, serverAnnouncer.getObservedCount());
-    Assert.assertFalse(segmentManager.getDataSourceCounts().isEmpty());
-
-    for (int i = 0; i < COUNT; ++i) {
-      Assert.assertEquals(3L, segmentManager.getDataSourceCounts().get("test" 
+ i).longValue());
-      Assert.assertEquals(2L, 
segmentManager.getDataSourceCounts().get("test_two" + i).longValue());
-    }
-
-    Assert.assertEquals(ImmutableList.copyOf(segments), 
segmentAnnouncer.getObservedSegments());
-
-    final ImmutableList<DataSegment> expectedBootstrapSegments = 
ImmutableList.copyOf(segments);
-    Assert.assertEquals(expectedBootstrapSegments, 
cacheManager.observedBootstrapSegments);
-    Assert.assertEquals(expectedBootstrapSegments, 
cacheManager.observedBootstrapSegmentsLoadedIntoPageCache);
-    Assert.assertEquals(ImmutableList.of(), cacheManager.observedSegments);
-    Assert.assertEquals(ImmutableList.of(), 
cacheManager.observedSegmentsLoadedIntoPageCache);
-
-    handler.stop();
-
-    Assert.assertEquals(0, serverAnnouncer.getObservedCount());
-    Assert.assertEquals(1, cacheManager.observedShutdownBootstrapCount.get());
   }
 
   @Test(timeout = 60_000L)
@@ -413,10 +222,6 @@ public class SegmentLoadDropHandlerTest
     final SegmentManager segmentManager = new SegmentManager(cacheManager);
     final SegmentLoadDropHandler handler = 
initSegmentLoadDropHandler(segmentManager);
 
-    handler.start();
-
-    Assert.assertEquals(1, serverAnnouncer.getObservedCount());
-
     DataSegment segment1 = makeSegment("batchtest1", "1", 
Intervals.of("P1d/2011-04-01"));
     DataSegment segment2 = makeSegment("batchtest2", "1", 
Intervals.of("P1d/2011-04-01"));
 
@@ -445,13 +250,10 @@ public class SegmentLoadDropHandlerTest
     Assert.assertEquals(ImmutableList.of(segment1), 
segmentAnnouncer.getObservedSegments());
 
     final ImmutableList<DataSegment> expectedSegments = 
ImmutableList.of(segment1);
-    Assert.assertEquals(expectedSegments, cacheManager.observedSegments);
-    Assert.assertEquals(expectedSegments, 
cacheManager.observedSegmentsLoadedIntoPageCache);
-    Assert.assertEquals(ImmutableList.of(), 
cacheManager.observedBootstrapSegments);
-    Assert.assertEquals(ImmutableList.of(), 
cacheManager.observedBootstrapSegmentsLoadedIntoPageCache);
-
-    handler.stop();
-    Assert.assertEquals(0, serverAnnouncer.getObservedCount());
+    Assert.assertEquals(expectedSegments, cacheManager.getObservedSegments());
+    Assert.assertEquals(expectedSegments, 
cacheManager.getObservedSegmentsLoadedIntoPageCache());
+    Assert.assertEquals(ImmutableList.of(), 
cacheManager.getObservedBootstrapSegments());
+    Assert.assertEquals(ImmutableList.of(), 
cacheManager.getObservedBootstrapSegmentsLoadedIntoPageCache());
   }
 
   @Test(timeout = 60_000L)
@@ -465,9 +267,6 @@ public class SegmentLoadDropHandlerTest
 
     final SegmentLoadDropHandler handler = 
initSegmentLoadDropHandler(segmentManager);
 
-    handler.start();
-
-    Assert.assertEquals(1, serverAnnouncer.getObservedCount());
 
     DataSegment segment1 = makeSegment("batchtest1", "1", 
Intervals.of("P1d/2011-04-01"));
     List<DataSegmentChangeRequest> batch = ImmutableList.of(new 
SegmentChangeRequestLoad(segment1));
@@ -489,8 +288,6 @@ public class SegmentLoadDropHandlerTest
     Assert.assertEquals(State.SUCCESS, result.get(0).getStatus().getState());
     Assert.assertEquals(ImmutableList.of(segment1, segment1), 
segmentAnnouncer.getObservedSegments());
 
-    handler.stop();
-    Assert.assertEquals(0, serverAnnouncer.getObservedCount());
   }
 
   @Test(timeout = 60_000L)
@@ -538,13 +335,9 @@ public class SegmentLoadDropHandlerTest
 
     final SegmentLoadDropHandler handler = initSegmentLoadDropHandler(
         noAnnouncerSegmentLoaderConfig,
-        segmentManager,
-        coordinatorClient
+        segmentManager
     );
 
-    handler.start();
-
-    Assert.assertEquals(1, serverAnnouncer.getObservedCount());
 
     final DataSegment segment1 = makeSegment("batchtest1", "1", 
Intervals.of("P1d/2011-04-01"));
     List<DataSegmentChangeRequest> batch = ImmutableList.of(new 
SegmentChangeRequestLoad(segment1));
@@ -611,149 +404,23 @@ public class SegmentLoadDropHandlerTest
     Mockito.verify(segmentManager, Mockito.times(1))
            .dropSegment(ArgumentMatchers.any());
 
-    handler.stop();
-    Assert.assertEquals(0, serverAnnouncer.getObservedCount());
-  }
-
-  private SegmentLoadDropHandler initSegmentLoadDropHandler(SegmentManager 
segmentManager, CoordinatorClient coordinatorClient)
-  {
-    return initSegmentLoadDropHandler(segmentLoaderConfig, segmentManager, 
coordinatorClient);
   }
 
   private SegmentLoadDropHandler initSegmentLoadDropHandler(SegmentManager 
segmentManager)
   {
-    return initSegmentLoadDropHandler(segmentLoaderConfig, segmentManager, 
coordinatorClient);
+    return initSegmentLoadDropHandler(segmentLoaderConfig, segmentManager);
   }
 
   private SegmentLoadDropHandler initSegmentLoadDropHandler(
       SegmentLoaderConfig config,
-      SegmentManager segmentManager,
-      CoordinatorClient coordinatorClient
+      SegmentManager segmentManager
   )
   {
     return new SegmentLoadDropHandler(
         config,
         segmentAnnouncer,
-        serverAnnouncer,
         segmentManager,
-        scheduledExecutorFactory.create(5, "SegmentLoadDropHandlerTest-[%d]"),
-        new ServerTypeConfig(ServerType.HISTORICAL),
-        coordinatorClient,
-        serviceEmitter
+        scheduledExecutorFactory.create(5, "SegmentLoadDropHandlerTest-[%d]")
     );
   }
-
-  private DataSegment makeSegment(String dataSource, String version, Interval 
interval)
-  {
-    return TestSegmentUtils.makeSegment(dataSource, version, interval);
-  }
-
-  /**
-   * A local cache manager to test the bootstrapping and segment add/remove 
operations. It stubs only the necessary
-   * methods to support these operations; any other method invoked will throw 
an exception from the base class,
-   * {@link NoopSegmentCacheManager}.
-   */
-  private static class TestSegmentCacheManager extends NoopSegmentCacheManager
-  {
-    private final List<DataSegment> cachedSegments;
-
-    private final List<DataSegment> observedBootstrapSegments;
-    private final List<DataSegment> 
observedBootstrapSegmentsLoadedIntoPageCache;
-    private final List<DataSegment> observedSegments;
-    private final List<DataSegment> observedSegmentsLoadedIntoPageCache;
-    private final List<DataSegment> observedSegmentsRemovedFromCache;
-    private final AtomicInteger observedShutdownBootstrapCount;
-
-    TestSegmentCacheManager()
-    {
-      this(ImmutableSet.of());
-    }
-
-    TestSegmentCacheManager(final Set<DataSegment> segmentsToCache)
-    {
-      this.cachedSegments = ImmutableList.copyOf(segmentsToCache);
-      this.observedBootstrapSegments = new ArrayList<>();
-      this.observedBootstrapSegmentsLoadedIntoPageCache = new ArrayList<>();
-      this.observedSegments = new ArrayList<>();
-      this.observedSegmentsLoadedIntoPageCache = new ArrayList<>();
-      this.observedSegmentsRemovedFromCache = new ArrayList<>();
-      this.observedShutdownBootstrapCount = new AtomicInteger(0);
-    }
-
-    @Override
-    public boolean canHandleSegments()
-    {
-      return true;
-    }
-
-    @Override
-    public List<DataSegment> getCachedSegments()
-    {
-      return cachedSegments;
-    }
-
-    @Override
-    public ReferenceCountingSegment getBootstrapSegment(DataSegment segment, 
SegmentLazyLoadFailCallback loadFailed)
-    {
-      observedBootstrapSegments.add(segment);
-      return getSegmentInternal(segment);
-    }
-
-    @Override
-    public ReferenceCountingSegment getSegment(final DataSegment segment)
-    {
-      observedSegments.add(segment);
-      return getSegmentInternal(segment);
-    }
-
-    private ReferenceCountingSegment getSegmentInternal(final DataSegment 
segment)
-    {
-      if (segment.isTombstone()) {
-        return ReferenceCountingSegment
-            
.wrapSegment(TombstoneSegmentizerFactory.segmentForTombstone(segment), 
segment.getShardSpec());
-      } else {
-        return ReferenceCountingSegment.wrapSegment(
-            new TestSegmentUtils.SegmentForTesting(
-                segment.getDataSource(),
-                (Interval) segment.getLoadSpec().get("interval"),
-                MapUtils.getString(segment.getLoadSpec(), "version")
-            ), segment.getShardSpec()
-        );
-      }
-    }
-
-    @Override
-    public void loadSegmentIntoPageCache(DataSegment segment)
-    {
-      observedSegmentsLoadedIntoPageCache.add(segment);
-    }
-
-    @Override
-    public void loadSegmentIntoPageCacheOnBootstrap(DataSegment segment)
-    {
-      observedBootstrapSegmentsLoadedIntoPageCache.add(segment);
-    }
-
-    @Override
-    public void shutdownBootstrap()
-    {
-      observedShutdownBootstrapCount.incrementAndGet();
-    }
-
-    @Override
-    public void storeInfoFile(DataSegment segment)
-    {
-    }
-
-    @Override
-    public void removeInfoFile(DataSegment segment)
-    {
-    }
-
-    @Override
-    public void cleanup(DataSegment segment)
-    {
-      observedSegmentsRemovedFromCache.add(segment);
-    }
-  }
 }
diff --git 
a/server/src/test/java/org/apache/druid/server/coordination/TestSegmentCacheManager.java
 
b/server/src/test/java/org/apache/druid/server/coordination/TestSegmentCacheManager.java
new file mode 100644
index 00000000000..2cd5e8e61fe
--- /dev/null
+++ 
b/server/src/test/java/org/apache/druid/server/coordination/TestSegmentCacheManager.java
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.server.coordination;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+import org.apache.druid.java.util.common.MapUtils;
+import org.apache.druid.segment.ReferenceCountingSegment;
+import org.apache.druid.segment.SegmentLazyLoadFailCallback;
+import org.apache.druid.segment.loading.NoopSegmentCacheManager;
+import org.apache.druid.segment.loading.TombstoneSegmentizerFactory;
+import org.apache.druid.server.TestSegmentUtils;
+import org.apache.druid.timeline.DataSegment;
+import org.joda.time.Interval;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * A local cache manager to test the bootstrapping and segment add/remove 
operations. It stubs only the necessary
+ * methods to support these operations; any other method invoked will throw an 
exception from the base class,
+ * {@link NoopSegmentCacheManager}.
+ */
+class TestSegmentCacheManager extends NoopSegmentCacheManager
+{
+  private final List<DataSegment> cachedSegments;
+
+  private final List<DataSegment> observedBootstrapSegments;
+  private final List<DataSegment> observedBootstrapSegmentsLoadedIntoPageCache;
+  private final List<DataSegment> observedSegments;
+  private final List<DataSegment> observedSegmentsLoadedIntoPageCache;
+  private final List<DataSegment> observedSegmentsRemovedFromCache;
+  private final AtomicInteger observedShutdownBootstrapCount;
+
+  TestSegmentCacheManager()
+  {
+    this(ImmutableSet.of());
+  }
+
+  TestSegmentCacheManager(final Set<DataSegment> segmentsToCache)
+  {
+    this.cachedSegments = ImmutableList.copyOf(segmentsToCache);
+    this.observedBootstrapSegments = new ArrayList<>();
+    this.observedBootstrapSegmentsLoadedIntoPageCache = new ArrayList<>();
+    this.observedSegments = new ArrayList<>();
+    this.observedSegmentsLoadedIntoPageCache = new ArrayList<>();
+    this.observedSegmentsRemovedFromCache = new ArrayList<>();
+    this.observedShutdownBootstrapCount = new AtomicInteger(0);
+  }
+
+  @Override
+  public boolean canHandleSegments()
+  {
+    return true;
+  }
+
+  @Override
+  public List<DataSegment> getCachedSegments()
+  {
+    return cachedSegments;
+  }
+
+  @Override
+  public ReferenceCountingSegment getBootstrapSegment(DataSegment segment, 
SegmentLazyLoadFailCallback loadFailed)
+  {
+    observedBootstrapSegments.add(segment);
+    return getSegmentInternal(segment);
+  }
+
+  @Override
+  public ReferenceCountingSegment getSegment(final DataSegment segment)
+  {
+    observedSegments.add(segment);
+    return getSegmentInternal(segment);
+  }
+
+  private ReferenceCountingSegment getSegmentInternal(final DataSegment 
segment)
+  {
+    if (segment.isTombstone()) {
+      return ReferenceCountingSegment
+          
.wrapSegment(TombstoneSegmentizerFactory.segmentForTombstone(segment), 
segment.getShardSpec());
+    } else {
+      return ReferenceCountingSegment.wrapSegment(
+          new TestSegmentUtils.SegmentForTesting(
+              segment.getDataSource(),
+              (Interval) segment.getLoadSpec().get("interval"),
+              MapUtils.getString(segment.getLoadSpec(), "version")
+          ), segment.getShardSpec()
+      );
+    }
+  }
+
+  @Override
+  public void loadSegmentIntoPageCache(DataSegment segment)
+  {
+    observedSegmentsLoadedIntoPageCache.add(segment);
+  }
+
+  @Override
+  public void loadSegmentIntoPageCacheOnBootstrap(DataSegment segment)
+  {
+    observedBootstrapSegmentsLoadedIntoPageCache.add(segment);
+  }
+
+  @Override
+  public void shutdownBootstrap()
+  {
+    observedShutdownBootstrapCount.incrementAndGet();
+  }
+
+  @Override
+  public void storeInfoFile(DataSegment segment)
+  {
+  }
+
+  @Override
+  public void removeInfoFile(DataSegment segment)
+  {
+  }
+
+  @Override
+  public void cleanup(DataSegment segment)
+  {
+    observedSegmentsRemovedFromCache.add(segment);
+  }
+
+  public List<DataSegment> getObservedBootstrapSegments()
+  {
+    return observedBootstrapSegments;
+  }
+
+  public List<DataSegment> getObservedBootstrapSegmentsLoadedIntoPageCache()
+  {
+    return observedBootstrapSegmentsLoadedIntoPageCache;
+  }
+
+  public List<DataSegment> getObservedSegments()
+  {
+    return observedSegments;
+  }
+
+  public List<DataSegment> getObservedSegmentsLoadedIntoPageCache()
+  {
+    return observedSegmentsLoadedIntoPageCache;
+  }
+
+  public List<DataSegment> getObservedSegmentsRemovedFromCache()
+  {
+    return observedSegmentsRemovedFromCache;
+  }
+
+  public AtomicInteger getObservedShutdownBootstrapCount()
+  {
+    return observedShutdownBootstrapCount;
+  }
+}
diff --git 
a/server/src/test/java/org/apache/druid/server/coordination/ZkCoordinatorTest.java
 
b/server/src/test/java/org/apache/druid/server/coordination/ZkCoordinatorTest.java
index 9f5291af598..a9f7772e59d 100644
--- 
a/server/src/test/java/org/apache/druid/server/coordination/ZkCoordinatorTest.java
+++ 
b/server/src/test/java/org/apache/druid/server/coordination/ZkCoordinatorTest.java
@@ -23,7 +23,6 @@ import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.collect.ImmutableMap;
 import org.apache.curator.utils.ZKPaths;
 import org.apache.druid.curator.CuratorTestBase;
-import org.apache.druid.guice.ServerTypeConfig;
 import org.apache.druid.java.util.common.Intervals;
 import org.apache.druid.java.util.emitter.EmittingLogger;
 import org.apache.druid.segment.IndexIO;
@@ -42,7 +41,6 @@ import org.junit.Test;
 
 import java.util.Arrays;
 import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ScheduledExecutorService;
 
 /**
  */
@@ -103,12 +101,7 @@ public class ZkCoordinatorTest extends CuratorTestBase
     SegmentLoadDropHandler segmentLoadDropHandler = new SegmentLoadDropHandler(
         new SegmentLoaderConfig(),
         EasyMock.createNiceMock(DataSegmentAnnouncer.class),
-        EasyMock.createNiceMock(DataSegmentServerAnnouncer.class),
-        EasyMock.createNiceMock(SegmentManager.class),
-        EasyMock.createNiceMock(ScheduledExecutorService.class),
-        new ServerTypeConfig(ServerType.HISTORICAL),
-        new TestCoordinatorClient(),
-        new NoopServiceEmitter()
+        EasyMock.createNiceMock(SegmentManager.class)
     )
     {
       @Override
diff --git a/services/src/main/java/org/apache/druid/cli/CliBroker.java 
b/services/src/main/java/org/apache/druid/cli/CliBroker.java
index 7fe5ec57631..94160f55779 100644
--- a/services/src/main/java/org/apache/druid/cli/CliBroker.java
+++ b/services/src/main/java/org/apache/druid/cli/CliBroker.java
@@ -63,6 +63,7 @@ import org.apache.druid.server.ResponseContextConfig;
 import org.apache.druid.server.SegmentManager;
 import org.apache.druid.server.SubqueryGuardrailHelper;
 import org.apache.druid.server.SubqueryGuardrailHelperProvider;
+import org.apache.druid.server.coordination.SegmentBootstrapper;
 import org.apache.druid.server.coordination.ServerType;
 import org.apache.druid.server.coordination.ZkCoordinator;
 import org.apache.druid.server.http.BrokerResource;
@@ -172,6 +173,7 @@ public class CliBroker extends ServerRunnable
           if (isZkEnabled) {
             LifecycleModule.register(binder, ZkCoordinator.class);
           }
+          LifecycleModule.register(binder, SegmentBootstrapper.class);
 
           bindAnnouncer(
               binder,
diff --git a/services/src/main/java/org/apache/druid/cli/CliHistorical.java 
b/services/src/main/java/org/apache/druid/cli/CliHistorical.java
index dc1acc41f87..2e231bcdcc3 100644
--- a/services/src/main/java/org/apache/druid/cli/CliHistorical.java
+++ b/services/src/main/java/org/apache/druid/cli/CliHistorical.java
@@ -49,6 +49,7 @@ import org.apache.druid.query.lookup.LookupModule;
 import org.apache.druid.server.QueryResource;
 import org.apache.druid.server.ResponseContextConfig;
 import org.apache.druid.server.SegmentManager;
+import org.apache.druid.server.coordination.SegmentBootstrapper;
 import org.apache.druid.server.coordination.ServerManager;
 import org.apache.druid.server.coordination.ServerType;
 import org.apache.druid.server.coordination.ZkCoordinator;
@@ -125,6 +126,7 @@ public class CliHistorical extends ServerRunnable
           if (isZkEnabled) {
             LifecycleModule.register(binder, ZkCoordinator.class);
           }
+          LifecycleModule.register(binder, SegmentBootstrapper.class);
 
           JsonConfigProvider.bind(binder, "druid.historical.cache", 
CacheConfig.class);
           binder.install(new CacheModule());
diff --git a/services/src/main/java/org/apache/druid/cli/CliIndexer.java 
b/services/src/main/java/org/apache/druid/cli/CliIndexer.java
index c6b817fa4a9..312b6f6b05a 100644
--- a/services/src/main/java/org/apache/druid/cli/CliIndexer.java
+++ b/services/src/main/java/org/apache/druid/cli/CliIndexer.java
@@ -74,6 +74,7 @@ import 
org.apache.druid.segment.realtime.appenderator.UnifiedIndexerAppenderator
 import org.apache.druid.server.DruidNode;
 import org.apache.druid.server.ResponseContextConfig;
 import org.apache.druid.server.SegmentManager;
+import org.apache.druid.server.coordination.SegmentBootstrapper;
 import org.apache.druid.server.coordination.ServerType;
 import org.apache.druid.server.coordination.ZkCoordinator;
 import org.apache.druid.server.http.HistoricalResource;
@@ -187,6 +188,7 @@ public class CliIndexer extends ServerRunnable
             if (isZkEnabled) {
               LifecycleModule.register(binder, ZkCoordinator.class);
             }
+            LifecycleModule.register(binder, SegmentBootstrapper.class);
 
             bindAnnouncer(
                 binder,
diff --git a/services/src/main/java/org/apache/druid/cli/CliPeon.java 
b/services/src/main/java/org/apache/druid/cli/CliPeon.java
index 1ca8ddf539f..f78a763cec4 100644
--- a/services/src/main/java/org/apache/druid/cli/CliPeon.java
+++ b/services/src/main/java/org/apache/druid/cli/CliPeon.java
@@ -125,6 +125,7 @@ import 
org.apache.druid.segment.realtime.firehose.ServiceAnnouncingChatHandlerPr
 import org.apache.druid.server.DruidNode;
 import org.apache.druid.server.ResponseContextConfig;
 import org.apache.druid.server.SegmentManager;
+import org.apache.druid.server.coordination.SegmentBootstrapper;
 import org.apache.druid.server.coordination.ServerType;
 import org.apache.druid.server.coordination.ZkCoordinator;
 import org.apache.druid.server.http.HistoricalResource;
@@ -553,6 +554,7 @@ public class CliPeon extends GuiceRunnable
       if (isZkEnabled) {
         LifecycleModule.register(binder, ZkCoordinator.class);
       }
+      LifecycleModule.register(binder, SegmentBootstrapper.class);
     }
 
     @Provides


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to