kfaraz commented on code in PR #16685:
URL: https://github.com/apache/druid/pull/16685#discussion_r1663451383


##########
server/src/main/java/org/apache/druid/server/coordination/SegmentBootstrapper.java:
##########
@@ -0,0 +1,438 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.server.coordination;
+
+import com.google.common.base.Throwables;
+import com.google.common.collect.ImmutableList;
+import com.google.common.util.concurrent.SettableFuture;
+import com.google.inject.Inject;
+import org.apache.druid.client.BootstrapSegmentsResponse;
+import org.apache.druid.client.coordinator.CoordinatorClient;
+import org.apache.druid.common.guava.FutureUtils;
+import org.apache.druid.guice.ManageLifecycle;
+import org.apache.druid.guice.ServerTypeConfig;
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.Stopwatch;
+import org.apache.druid.java.util.common.concurrent.Execs;
+import org.apache.druid.java.util.common.lifecycle.LifecycleStart;
+import org.apache.druid.java.util.common.lifecycle.LifecycleStop;
+import org.apache.druid.java.util.emitter.EmittingLogger;
+import org.apache.druid.java.util.emitter.service.ServiceEmitter;
+import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
+import org.apache.druid.segment.loading.SegmentLoaderConfig;
+import org.apache.druid.segment.loading.SegmentLoadingException;
+import org.apache.druid.server.SegmentManager;
+import org.apache.druid.timeline.DataSegment;
+
+import javax.annotation.Nullable;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Responsible for bootstrapping cached segments from disk and bootstrap 
segments from the coordinator.
+ * Also, responsible for announcing itself as a data server if applicable once 
the bootstrapping functions

Review Comment:
   ```suggestion
    * Also responsible for announcing the node as a data server if applicable, once the bootstrapping operations
   ```



##########
server/src/test/java/org/apache/druid/server/coordination/TestSegmentCacheManager.java:
##########
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.server.coordination;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+import org.apache.druid.java.util.common.MapUtils;
+import org.apache.druid.segment.ReferenceCountingSegment;
+import org.apache.druid.segment.SegmentLazyLoadFailCallback;
+import org.apache.druid.segment.loading.NoopSegmentCacheManager;
+import org.apache.druid.segment.loading.TombstoneSegmentizerFactory;
+import org.apache.druid.server.TestSegmentUtils;
+import org.apache.druid.timeline.DataSegment;
+import org.joda.time.Interval;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * A local cache manager to test the bootstrapping and segment add/remove 
operations. It stubs only the necessary
+ * methods to support these operations; any other method invoked will throw an 
exception from the base class,
+ * {@link NoopSegmentCacheManager}.
+ */
+class TestSegmentCacheManager extends NoopSegmentCacheManager
+{
+  private final List<DataSegment> cachedSegments;
+
+  final List<DataSegment> observedBootstrapSegments;
+  final List<DataSegment> observedBootstrapSegmentsLoadedIntoPageCache;
+  final List<DataSegment> observedSegments;
+  final List<DataSegment> observedSegmentsLoadedIntoPageCache;
+  final List<DataSegment> observedSegmentsRemovedFromCache;
+  final AtomicInteger observedShutdownBootstrapCount;

Review Comment:
   Should all of these be private? We can expose getters if needed.



##########
server/src/main/java/org/apache/druid/server/coordination/SegmentBootstrapper.java:
##########
@@ -0,0 +1,439 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.server.coordination;
+
+import com.google.common.base.Throwables;
+import com.google.common.collect.ImmutableList;
+import com.google.common.util.concurrent.SettableFuture;
+import com.google.inject.Inject;
+import org.apache.druid.client.BootstrapSegmentsResponse;
+import org.apache.druid.client.coordinator.CoordinatorClient;
+import org.apache.druid.common.guava.FutureUtils;
+import org.apache.druid.guice.ManageLifecycle;
+import org.apache.druid.guice.ServerTypeConfig;
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.Stopwatch;
+import org.apache.druid.java.util.common.concurrent.Execs;
+import org.apache.druid.java.util.common.lifecycle.LifecycleStart;
+import org.apache.druid.java.util.common.lifecycle.LifecycleStop;
+import org.apache.druid.java.util.emitter.EmittingLogger;
+import org.apache.druid.java.util.emitter.service.ServiceEmitter;
+import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
+import org.apache.druid.segment.loading.SegmentLoaderConfig;
+import org.apache.druid.segment.loading.SegmentLoadingException;
+import org.apache.druid.server.SegmentManager;
+import org.apache.druid.timeline.DataSegment;
+
+import javax.annotation.Nullable;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Responsible for bootstrapping cached segments from disk and bootstrap 
segments from the coordinator.

Review Comment:
   ```suggestion
    * Responsible for bootstrapping segments already cached on disk and bootstrap segments fetched from the coordinator.
   ```



##########
server/src/main/java/org/apache/druid/server/coordination/SegmentBootstrapper.java:
##########
@@ -0,0 +1,439 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.server.coordination;
+
+import com.google.common.base.Throwables;
+import com.google.common.collect.ImmutableList;
+import com.google.common.util.concurrent.SettableFuture;
+import com.google.inject.Inject;
+import org.apache.druid.client.BootstrapSegmentsResponse;
+import org.apache.druid.client.coordinator.CoordinatorClient;
+import org.apache.druid.common.guava.FutureUtils;
+import org.apache.druid.guice.ManageLifecycle;
+import org.apache.druid.guice.ServerTypeConfig;
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.Stopwatch;
+import org.apache.druid.java.util.common.concurrent.Execs;
+import org.apache.druid.java.util.common.lifecycle.LifecycleStart;
+import org.apache.druid.java.util.common.lifecycle.LifecycleStop;
+import org.apache.druid.java.util.emitter.EmittingLogger;
+import org.apache.druid.java.util.emitter.service.ServiceEmitter;
+import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
+import org.apache.druid.segment.loading.SegmentLoaderConfig;
+import org.apache.druid.segment.loading.SegmentLoadingException;
+import org.apache.druid.server.SegmentManager;
+import org.apache.druid.timeline.DataSegment;
+
+import javax.annotation.Nullable;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Responsible for bootstrapping cached segments from disk and bootstrap 
segments from the coordinator.
+ * Also, responsible for announcing itself as a data server if applicable once 
the bootstrapping functions
+ * are complete.
+ */
+@ManageLifecycle
+public class SegmentBootstrapper
+{
+  private final SegmentLoadDropHandler loadDropHandler;
+  private final SegmentLoaderConfig config;
+  private final DataSegmentAnnouncer segmentAnnouncer;
+  private final DataSegmentServerAnnouncer serverAnnouncer;
+  private final SegmentManager segmentManager;
+  private final ServerTypeConfig serverTypeConfig;
+  private final CoordinatorClient coordinatorClient;
+  private final ServiceEmitter emitter;
+
+  private volatile boolean started = false;
+
+  // Synchronizes start/stop of this object.
+  private final Object startStopLock = new Object();
+
+  private static final EmittingLogger log = new 
EmittingLogger(SegmentBootstrapper.class);
+
+  @Inject
+  SegmentBootstrapper(

Review Comment:
   Better to keep this `public`. It would make future testing easier.



##########
server/src/main/java/org/apache/druid/server/coordination/SegmentBootstrapper.java:
##########
@@ -0,0 +1,439 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.server.coordination;
+
+import com.google.common.base.Throwables;
+import com.google.common.collect.ImmutableList;
+import com.google.common.util.concurrent.SettableFuture;
+import com.google.inject.Inject;
+import org.apache.druid.client.BootstrapSegmentsResponse;
+import org.apache.druid.client.coordinator.CoordinatorClient;
+import org.apache.druid.common.guava.FutureUtils;
+import org.apache.druid.guice.ManageLifecycle;
+import org.apache.druid.guice.ServerTypeConfig;
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.Stopwatch;
+import org.apache.druid.java.util.common.concurrent.Execs;
+import org.apache.druid.java.util.common.lifecycle.LifecycleStart;
+import org.apache.druid.java.util.common.lifecycle.LifecycleStop;
+import org.apache.druid.java.util.emitter.EmittingLogger;
+import org.apache.druid.java.util.emitter.service.ServiceEmitter;
+import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
+import org.apache.druid.segment.loading.SegmentLoaderConfig;
+import org.apache.druid.segment.loading.SegmentLoadingException;
+import org.apache.druid.server.SegmentManager;
+import org.apache.druid.timeline.DataSegment;
+
+import javax.annotation.Nullable;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Responsible for bootstrapping cached segments from disk and bootstrap 
segments from the coordinator.
+ * Also, responsible for announcing itself as a data server if applicable once 
the bootstrapping functions
+ * are complete.
+ */
+@ManageLifecycle
+public class SegmentBootstrapper
+{
+  private final SegmentLoadDropHandler loadDropHandler;
+  private final SegmentLoaderConfig config;
+  private final DataSegmentAnnouncer segmentAnnouncer;
+  private final DataSegmentServerAnnouncer serverAnnouncer;
+  private final SegmentManager segmentManager;
+  private final ServerTypeConfig serverTypeConfig;
+  private final CoordinatorClient coordinatorClient;
+  private final ServiceEmitter emitter;
+
+  private volatile boolean started = false;
+
+  // Synchronizes start/stop of this object.
+  private final Object startStopLock = new Object();
+
+  private static final EmittingLogger log = new 
EmittingLogger(SegmentBootstrapper.class);
+
+  @Inject
+  SegmentBootstrapper(
+      SegmentLoadDropHandler loadDropHandler,
+      SegmentLoaderConfig config,
+      DataSegmentAnnouncer segmentAnnouncer,
+      DataSegmentServerAnnouncer serverAnnouncer,
+      SegmentManager segmentManager,
+      ServerTypeConfig serverTypeConfig,
+      CoordinatorClient coordinatorClient,
+      ServiceEmitter emitter
+  )
+  {
+    this.loadDropHandler = loadDropHandler;
+    this.config = config;
+    this.segmentAnnouncer = segmentAnnouncer;
+    this.serverAnnouncer = serverAnnouncer;
+    this.segmentManager = segmentManager;
+    this.serverTypeConfig = serverTypeConfig;
+    this.coordinatorClient = coordinatorClient;
+    this.emitter = emitter;
+  }
+
+  @LifecycleStart
+  public void start() throws IOException
+  {
+    synchronized (startStopLock) {
+      if (started) {
+        return;
+      }
+
+      log.info("Starting...");
+      try {
+        if (segmentManager.canHandleSegments()) {
+          loadSegmentsOnStartup();
+        }
+
+        if (shouldAnnounce()) {
+          serverAnnouncer.announce();
+        }
+      }
+      catch (Exception e) {
+        Throwables.propagateIfPossible(e, IOException.class);
+        throw new RuntimeException(e);
+      }
+      started = true;
+      log.info("Started.");
+    }
+  }
+
+  @LifecycleStop
+  public void stop()
+  {
+    synchronized (startStopLock) {
+      if (!started) {
+        return;
+      }
+
+      log.info("Stopping...");
+      try {
+        if (shouldAnnounce()) {
+          serverAnnouncer.unannounce();
+        }
+      }
+      catch (Exception e) {
+        throw new RuntimeException(e);
+      }
+      finally {
+        started = false;
+      }
+      log.info("Stopped.");
+    }
+  }
+
+  public boolean isStarted()

Review Comment:
   `isStarted()` made sense for `SegmentLoadDropHandler` as it was considered started once bootstrapping was complete.
   In `SegmentBootstrapper`, a name that states this directly would be clearer.
   
   ```suggestion
     public boolean isBootstrappingComplete()
   ```



##########
server/src/main/java/org/apache/druid/server/coordination/SegmentBootstrapper.java:
##########
@@ -0,0 +1,439 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.server.coordination;
+
+import com.google.common.base.Throwables;
+import com.google.common.collect.ImmutableList;
+import com.google.common.util.concurrent.SettableFuture;
+import com.google.inject.Inject;
+import org.apache.druid.client.BootstrapSegmentsResponse;
+import org.apache.druid.client.coordinator.CoordinatorClient;
+import org.apache.druid.common.guava.FutureUtils;
+import org.apache.druid.guice.ManageLifecycle;
+import org.apache.druid.guice.ServerTypeConfig;
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.Stopwatch;
+import org.apache.druid.java.util.common.concurrent.Execs;
+import org.apache.druid.java.util.common.lifecycle.LifecycleStart;
+import org.apache.druid.java.util.common.lifecycle.LifecycleStop;
+import org.apache.druid.java.util.emitter.EmittingLogger;
+import org.apache.druid.java.util.emitter.service.ServiceEmitter;
+import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
+import org.apache.druid.segment.loading.SegmentLoaderConfig;
+import org.apache.druid.segment.loading.SegmentLoadingException;
+import org.apache.druid.server.SegmentManager;
+import org.apache.druid.timeline.DataSegment;
+
+import javax.annotation.Nullable;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Responsible for bootstrapping cached segments from disk and bootstrap 
segments from the coordinator.
+ * Also, responsible for announcing itself as a data server if applicable once 
the bootstrapping functions
+ * are complete.
+ */
+@ManageLifecycle
+public class SegmentBootstrapper

Review Comment:
   Verified that the new code in `SegmentBootstrapper` is essentially the same as the old code in `SegmentLoadDropHandler`.
   
   Built the following diff to go through the differences:
   
   ```diff
   diff --git a/MovedCode.java b/MovedCode.java
   index a853554..0c285bb 100644
   --- a/MovedCode.java
   +++ b/MovedCode.java
   @@ -68,12 +68,17 @@
        final Stopwatch stopwatch = Stopwatch.createStarted();
    
        // Start a temporary thread pool to load segments into page cache 
during bootstrap
   -    final ExecutorService loadingExecutor = Execs.multiThreaded(
   -        config.getNumBootstrapThreads(), "Segment-Load-Startup-%s"
   +    final ExecutorService bootstrapExecutor = Execs.multiThreaded(
   +        config.getNumBootstrapThreads(), "Segment-Bootstrap-%s"
   +    );
   +
   +    // Start a temporary scheduled executor for background segment 
announcing
   +    final ScheduledExecutorService backgroundAnnouncerExecutor = 
Executors.newScheduledThreadPool(
   +        config.getNumLoadingThreads(), 
Execs.makeThreadFactory("Background-Segment-Announcer-%s")
        );
    
        try (final BackgroundSegmentAnnouncer backgroundSegmentAnnouncer =
   -             new BackgroundSegmentAnnouncer(announcer, exec, 
config.getAnnounceIntervalMillis())) {
   +             new BackgroundSegmentAnnouncer(segmentAnnouncer, 
backgroundAnnouncerExecutor, config.getAnnounceIntervalMillis())) {
    
          backgroundSegmentAnnouncer.startAnnouncing();
    
   @@ -82,7 +87,7 @@
          final AtomicInteger counter = new AtomicInteger(0);
          final CopyOnWriteArrayList<DataSegment> failedSegments = new 
CopyOnWriteArrayList<>();
          for (final DataSegment segment : segmentsOnStartup) {
   -        loadingExecutor.submit(
   +        bootstrapExecutor.submit(
                () -> {
                  try {
                    log.info(
   @@ -92,11 +97,11 @@
                    try {
                      segmentManager.loadSegmentOnBootstrap(
                          segment,
   -                      () -> this.removeSegment(segment, 
DataSegmentChangeCallback.NOOP, false)
   +                      () -> loadDropHandler.removeSegment(segment, 
DataSegmentChangeCallback.NOOP, false)
                      );
                    }
                    catch (Exception e) {
   -                  removeSegment(segment, DataSegmentChangeCallback.NOOP, 
false);
   +                  loadDropHandler.removeSegment(segment, 
DataSegmentChangeCallback.NOOP, false);
                      throw new SegmentLoadingException(e, "Exception loading 
segment[%s]", segment.getId());
                    }
                    try {
   @@ -140,7 +145,8 @@
             .emit();
        }
        finally {
   -      loadingExecutor.shutdownNow();
   +      bootstrapExecutor.shutdownNow();
   +      backgroundAnnouncerExecutor.shutdownNow();
          stopwatch.stop();
          // At this stage, all tasks have been submitted, send a shutdown 
command to cleanup any resources alloted
          // for the bootstrapping function.
   @@ -165,8 +171,6 @@
          bootstrapSegments = ImmutableList.copyOf(response.getIterator());
        }
        catch (Exception e) {
   -      // By default, we "fail open" when there is any error -- finding the 
coordinator, or if the API endpoint cannot
   -      // be found during rolling upgrades, or even if it's irrecoverable.
          log.warn("Error fetching bootstrap segments from the coordinator: 
[%s]. ", e.getMessage());
        }
        finally {
   @@ -199,8 +203,8 @@
      {
        private static final EmittingLogger log = new 
EmittingLogger(BackgroundSegmentAnnouncer.class);
    
   -    private final int intervalMillis;
   -    private final DataSegmentAnnouncer announcer;
   +    private final int announceIntervalMillis;
   +    private final DataSegmentAnnouncer segmentAnnouncer;
        private final ScheduledExecutorService exec;
        private final LinkedBlockingQueue<DataSegment> queue;
        private final SettableFuture<Boolean> doneAnnouncing;
   @@ -213,15 +217,15 @@
        @Nullable
        private volatile ScheduledFuture nextAnnoucement = null;
    
   -    public BackgroundSegmentAnnouncer(
   -        DataSegmentAnnouncer announcer,
   +    BackgroundSegmentAnnouncer(
   +        DataSegmentAnnouncer segmentAnnouncer,
            ScheduledExecutorService exec,
   -        int intervalMillis
   +        int announceIntervalMillis
        )
        {
   -      this.announcer = announcer;
   +      this.segmentAnnouncer = segmentAnnouncer;
          this.exec = exec;
   -      this.intervalMillis = intervalMillis;
   +      this.announceIntervalMillis = announceIntervalMillis;
          this.queue = new LinkedBlockingQueue<>();
          this.doneAnnouncing = SettableFuture.create();
        }
   @@ -236,7 +240,8 @@
    
        public void startAnnouncing()
        {
   -      if (intervalMillis <= 0) {
   +      if (announceIntervalMillis <= 0) {
   +        log.info("Skipping background segment announcing as 
announceIntervalMillis is [%d].", announceIntervalMillis);
            return;
          }
    
   @@ -255,8 +260,8 @@
                        final List<DataSegment> segments = new ArrayList<>();
                        queue.drainTo(segments);
                        try {
   -                      announcer.announceSegments(segments);
   -                      nextAnnoucement = exec.schedule(this, intervalMillis, 
TimeUnit.MILLISECONDS);
   +                      segmentAnnouncer.announceSegments(segments);
   +                      nextAnnoucement = exec.schedule(this, 
announceIntervalMillis, TimeUnit.MILLISECONDS);
                        }
                        catch (IOException e) {
                          doneAnnouncing.setException(
   @@ -273,7 +278,7 @@
                  }
                }
              },
   -          intervalMillis,
   +          announceIntervalMillis,
              TimeUnit.MILLISECONDS
          );
        }
   @@ -286,7 +291,7 @@
            try {
              final List<DataSegment> segments = new ArrayList<>();
              queue.drainTo(segments);
   -          announcer.announceSegments(segments);
   +          segmentAnnouncer.announceSegments(segments);
            }
            catch (Exception e) {
              throw new SegmentLoadingException(e, "Failed to announce 
segments[%s]", queue);
   ```



##########
server/src/main/java/org/apache/druid/server/coordination/SegmentBootstrapper.java:
##########
@@ -0,0 +1,439 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.server.coordination;
+
+import com.google.common.base.Throwables;
+import com.google.common.collect.ImmutableList;
+import com.google.common.util.concurrent.SettableFuture;
+import com.google.inject.Inject;
+import org.apache.druid.client.BootstrapSegmentsResponse;
+import org.apache.druid.client.coordinator.CoordinatorClient;
+import org.apache.druid.common.guava.FutureUtils;
+import org.apache.druid.guice.ManageLifecycle;
+import org.apache.druid.guice.ServerTypeConfig;
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.Stopwatch;
+import org.apache.druid.java.util.common.concurrent.Execs;
+import org.apache.druid.java.util.common.lifecycle.LifecycleStart;
+import org.apache.druid.java.util.common.lifecycle.LifecycleStop;
+import org.apache.druid.java.util.emitter.EmittingLogger;
+import org.apache.druid.java.util.emitter.service.ServiceEmitter;
+import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
+import org.apache.druid.segment.loading.SegmentLoaderConfig;
+import org.apache.druid.segment.loading.SegmentLoadingException;
+import org.apache.druid.server.SegmentManager;
+import org.apache.druid.timeline.DataSegment;
+
+import javax.annotation.Nullable;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Responsible for bootstrapping cached segments from disk and bootstrap 
segments from the coordinator.
+ * Also, responsible for announcing itself as a data server if applicable once 
the bootstrapping functions
+ * are complete.
+ */
+@ManageLifecycle
+public class SegmentBootstrapper
+{
+  private final SegmentLoadDropHandler loadDropHandler;
+  private final SegmentLoaderConfig config;
+  private final DataSegmentAnnouncer segmentAnnouncer;
+  private final DataSegmentServerAnnouncer serverAnnouncer;
+  private final SegmentManager segmentManager;
+  private final ServerTypeConfig serverTypeConfig;
+  private final CoordinatorClient coordinatorClient;
+  private final ServiceEmitter emitter;
+
+  private volatile boolean started = false;

Review Comment:
   ```suggestion
     private volatile boolean isComplete = false;
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to