kfaraz commented on code in PR #16685: URL: https://github.com/apache/druid/pull/16685#discussion_r1663451383
########## server/src/main/java/org/apache/druid/server/coordination/SegmentBootstrapper.java: ########## @@ -0,0 +1,438 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.server.coordination; + +import com.google.common.base.Throwables; +import com.google.common.collect.ImmutableList; +import com.google.common.util.concurrent.SettableFuture; +import com.google.inject.Inject; +import org.apache.druid.client.BootstrapSegmentsResponse; +import org.apache.druid.client.coordinator.CoordinatorClient; +import org.apache.druid.common.guava.FutureUtils; +import org.apache.druid.guice.ManageLifecycle; +import org.apache.druid.guice.ServerTypeConfig; +import org.apache.druid.java.util.common.ISE; +import org.apache.druid.java.util.common.Stopwatch; +import org.apache.druid.java.util.common.concurrent.Execs; +import org.apache.druid.java.util.common.lifecycle.LifecycleStart; +import org.apache.druid.java.util.common.lifecycle.LifecycleStop; +import org.apache.druid.java.util.emitter.EmittingLogger; +import org.apache.druid.java.util.emitter.service.ServiceEmitter; +import org.apache.druid.java.util.emitter.service.ServiceMetricEvent; +import org.apache.druid.segment.loading.SegmentLoaderConfig; +import org.apache.druid.segment.loading.SegmentLoadingException; +import org.apache.druid.server.SegmentManager; +import org.apache.druid.timeline.DataSegment; + +import javax.annotation.Nullable; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * Responsible for bootstrapping cached segments from disk and bootstrap segments from the coordinator. + * Also, responsible for announcing itself as a data server if applicable once the bootstrapping functions Review Comment: ```suggestion * Also responsible for announcing the node as a data server if applicable, once the bootstrapping operations ``` ########## server/src/test/java/org/apache/druid/server/coordination/TestSegmentCacheManager.java: ########## @@ -0,0 +1,145 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.server.coordination; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableSet; +import org.apache.druid.java.util.common.MapUtils; +import org.apache.druid.segment.ReferenceCountingSegment; +import org.apache.druid.segment.SegmentLazyLoadFailCallback; +import org.apache.druid.segment.loading.NoopSegmentCacheManager; +import org.apache.druid.segment.loading.TombstoneSegmentizerFactory; +import org.apache.druid.server.TestSegmentUtils; +import org.apache.druid.timeline.DataSegment; +import org.joda.time.Interval; + +import java.util.ArrayList; +import java.util.List; +import java.util.Set; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * A local cache manager to test the bootstrapping and segment add/remove operations. It stubs only the necessary + * methods to support these operations; any other method invoked will throw an exception from the base class, + * {@link NoopSegmentCacheManager}. + */ +class TestSegmentCacheManager extends NoopSegmentCacheManager +{ + private final List<DataSegment> cachedSegments; + + final List<DataSegment> observedBootstrapSegments; + final List<DataSegment> observedBootstrapSegmentsLoadedIntoPageCache; + final List<DataSegment> observedSegments; + final List<DataSegment> observedSegmentsLoadedIntoPageCache; + final List<DataSegment> observedSegmentsRemovedFromCache; + final AtomicInteger observedShutdownBootstrapCount; Review Comment: Should all of these be private? We can expose getters if needed. ########## server/src/main/java/org/apache/druid/server/coordination/SegmentBootstrapper.java: ########## @@ -0,0 +1,439 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.server.coordination; + +import com.google.common.base.Throwables; +import com.google.common.collect.ImmutableList; +import com.google.common.util.concurrent.SettableFuture; +import com.google.inject.Inject; +import org.apache.druid.client.BootstrapSegmentsResponse; +import org.apache.druid.client.coordinator.CoordinatorClient; +import org.apache.druid.common.guava.FutureUtils; +import org.apache.druid.guice.ManageLifecycle; +import org.apache.druid.guice.ServerTypeConfig; +import org.apache.druid.java.util.common.ISE; +import org.apache.druid.java.util.common.Stopwatch; +import org.apache.druid.java.util.common.concurrent.Execs; +import org.apache.druid.java.util.common.lifecycle.LifecycleStart; +import org.apache.druid.java.util.common.lifecycle.LifecycleStop; +import org.apache.druid.java.util.emitter.EmittingLogger; +import org.apache.druid.java.util.emitter.service.ServiceEmitter; +import org.apache.druid.java.util.emitter.service.ServiceMetricEvent; +import org.apache.druid.segment.loading.SegmentLoaderConfig; +import org.apache.druid.segment.loading.SegmentLoadingException; +import org.apache.druid.server.SegmentManager; +import org.apache.druid.timeline.DataSegment; + +import javax.annotation.Nullable; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * Responsible for bootstrapping cached segments from disk and bootstrap segments from the coordinator. Review Comment: ```suggestion * Responsible for bootstrapping segments already cached on disk and bootstrap segments fetched from the coordinator. ``` ########## server/src/main/java/org/apache/druid/server/coordination/SegmentBootstrapper.java: ########## @@ -0,0 +1,439 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.server.coordination; + +import com.google.common.base.Throwables; +import com.google.common.collect.ImmutableList; +import com.google.common.util.concurrent.SettableFuture; +import com.google.inject.Inject; +import org.apache.druid.client.BootstrapSegmentsResponse; +import org.apache.druid.client.coordinator.CoordinatorClient; +import org.apache.druid.common.guava.FutureUtils; +import org.apache.druid.guice.ManageLifecycle; +import org.apache.druid.guice.ServerTypeConfig; +import org.apache.druid.java.util.common.ISE; +import org.apache.druid.java.util.common.Stopwatch; +import org.apache.druid.java.util.common.concurrent.Execs; +import org.apache.druid.java.util.common.lifecycle.LifecycleStart; +import org.apache.druid.java.util.common.lifecycle.LifecycleStop; +import org.apache.druid.java.util.emitter.EmittingLogger; +import org.apache.druid.java.util.emitter.service.ServiceEmitter; +import org.apache.druid.java.util.emitter.service.ServiceMetricEvent; +import org.apache.druid.segment.loading.SegmentLoaderConfig; +import org.apache.druid.segment.loading.SegmentLoadingException; +import org.apache.druid.server.SegmentManager; +import org.apache.druid.timeline.DataSegment; + +import javax.annotation.Nullable; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * Responsible for bootstrapping cached segments from disk and bootstrap segments from the coordinator. + * Also, responsible for announcing itself as a data server if applicable once the bootstrapping functions + * are complete. + */ +@ManageLifecycle +public class SegmentBootstrapper +{ + private final SegmentLoadDropHandler loadDropHandler; + private final SegmentLoaderConfig config; + private final DataSegmentAnnouncer segmentAnnouncer; + private final DataSegmentServerAnnouncer serverAnnouncer; + private final SegmentManager segmentManager; + private final ServerTypeConfig serverTypeConfig; + private final CoordinatorClient coordinatorClient; + private final ServiceEmitter emitter; + + private volatile boolean started = false; + + // Synchronizes start/stop of this object. + private final Object startStopLock = new Object(); + + private static final EmittingLogger log = new EmittingLogger(SegmentBootstrapper.class); + + @Inject + SegmentBootstrapper( Review Comment: Better to keep this `public`. It would make future testing easier. ########## server/src/main/java/org/apache/druid/server/coordination/SegmentBootstrapper.java: ########## @@ -0,0 +1,439 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.server.coordination; + +import com.google.common.base.Throwables; +import com.google.common.collect.ImmutableList; +import com.google.common.util.concurrent.SettableFuture; +import com.google.inject.Inject; +import org.apache.druid.client.BootstrapSegmentsResponse; +import org.apache.druid.client.coordinator.CoordinatorClient; +import org.apache.druid.common.guava.FutureUtils; +import org.apache.druid.guice.ManageLifecycle; +import org.apache.druid.guice.ServerTypeConfig; +import org.apache.druid.java.util.common.ISE; +import org.apache.druid.java.util.common.Stopwatch; +import org.apache.druid.java.util.common.concurrent.Execs; +import org.apache.druid.java.util.common.lifecycle.LifecycleStart; +import org.apache.druid.java.util.common.lifecycle.LifecycleStop; +import org.apache.druid.java.util.emitter.EmittingLogger; +import org.apache.druid.java.util.emitter.service.ServiceEmitter; +import org.apache.druid.java.util.emitter.service.ServiceMetricEvent; +import org.apache.druid.segment.loading.SegmentLoaderConfig; +import org.apache.druid.segment.loading.SegmentLoadingException; +import org.apache.druid.server.SegmentManager; +import org.apache.druid.timeline.DataSegment; + +import javax.annotation.Nullable; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * Responsible for bootstrapping cached segments from disk and bootstrap segments from the coordinator. + * Also, responsible for announcing itself as a data server if applicable once the bootstrapping functions + * are complete. + */ +@ManageLifecycle +public class SegmentBootstrapper +{ + private final SegmentLoadDropHandler loadDropHandler; + private final SegmentLoaderConfig config; + private final DataSegmentAnnouncer segmentAnnouncer; + private final DataSegmentServerAnnouncer serverAnnouncer; + private final SegmentManager segmentManager; + private final ServerTypeConfig serverTypeConfig; + private final CoordinatorClient coordinatorClient; + private final ServiceEmitter emitter; + + private volatile boolean started = false; + + // Synchronizes start/stop of this object. + private final Object startStopLock = new Object(); + + private static final EmittingLogger log = new EmittingLogger(SegmentBootstrapper.class); + + @Inject + SegmentBootstrapper( + SegmentLoadDropHandler loadDropHandler, + SegmentLoaderConfig config, + DataSegmentAnnouncer segmentAnnouncer, + DataSegmentServerAnnouncer serverAnnouncer, + SegmentManager segmentManager, + ServerTypeConfig serverTypeConfig, + CoordinatorClient coordinatorClient, + ServiceEmitter emitter + ) + { + this.loadDropHandler = loadDropHandler; + this.config = config; + this.segmentAnnouncer = segmentAnnouncer; + this.serverAnnouncer = serverAnnouncer; + this.segmentManager = segmentManager; + this.serverTypeConfig = serverTypeConfig; + this.coordinatorClient = coordinatorClient; + this.emitter = emitter; + } + + @LifecycleStart + public void start() throws IOException + { + synchronized (startStopLock) { + if (started) { + return; + } + + log.info("Starting..."); + try { + if (segmentManager.canHandleSegments()) { + loadSegmentsOnStartup(); + } + + if (shouldAnnounce()) { + serverAnnouncer.announce(); + } + } + catch (Exception e) { + Throwables.propagateIfPossible(e, IOException.class); + throw new RuntimeException(e); + } + started = true; + log.info("Started."); + } + } + + @LifecycleStop + public void stop() + { + synchronized (startStopLock) { + if (!started) { + return; + } + + log.info("Stopping..."); + try { + if (shouldAnnounce()) { + serverAnnouncer.unannounce(); + } + } + catch (Exception e) { + throw new RuntimeException(e); + } + finally { + started = false; + } + log.info("Stopped."); + } + } + + public boolean isStarted() Review Comment: `isStarted()` made sense for `SegmentLoadDropHandler` as it was considered started once bootstrapping was complete. ```suggestion public boolean isBootstrappingComplete() ``` ########## server/src/main/java/org/apache/druid/server/coordination/SegmentBootstrapper.java: ########## @@ -0,0 +1,439 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.server.coordination; + +import com.google.common.base.Throwables; +import com.google.common.collect.ImmutableList; +import com.google.common.util.concurrent.SettableFuture; +import com.google.inject.Inject; +import org.apache.druid.client.BootstrapSegmentsResponse; +import org.apache.druid.client.coordinator.CoordinatorClient; +import org.apache.druid.common.guava.FutureUtils; +import org.apache.druid.guice.ManageLifecycle; +import org.apache.druid.guice.ServerTypeConfig; +import org.apache.druid.java.util.common.ISE; +import org.apache.druid.java.util.common.Stopwatch; +import org.apache.druid.java.util.common.concurrent.Execs; +import org.apache.druid.java.util.common.lifecycle.LifecycleStart; +import org.apache.druid.java.util.common.lifecycle.LifecycleStop; +import org.apache.druid.java.util.emitter.EmittingLogger; +import org.apache.druid.java.util.emitter.service.ServiceEmitter; +import org.apache.druid.java.util.emitter.service.ServiceMetricEvent; +import org.apache.druid.segment.loading.SegmentLoaderConfig; +import org.apache.druid.segment.loading.SegmentLoadingException; +import org.apache.druid.server.SegmentManager; +import org.apache.druid.timeline.DataSegment; + +import javax.annotation.Nullable; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * Responsible for bootstrapping cached segments from disk and bootstrap segments from the coordinator. + * Also, responsible for announcing itself as a data server if applicable once the bootstrapping functions + * are complete. + */ +@ManageLifecycle +public class SegmentBootstrapper Review Comment: Verified that the new code in `SegmentBootstrapper` is the same as the old code in `SegmentLoadDropHandler`. Built the following diff to go through the differences: ```diff diff --git a/MovedCode.java b/MovedCode.java index a853554..0c285bb 100644 --- a/MovedCode.java +++ b/MovedCode.java @@ -68,12 +68,17 @@ final Stopwatch stopwatch = Stopwatch.createStarted(); // Start a temporary thread pool to load segments into page cache during bootstrap - final ExecutorService loadingExecutor = Execs.multiThreaded( - config.getNumBootstrapThreads(), "Segment-Load-Startup-%s" + final ExecutorService bootstrapExecutor = Execs.multiThreaded( + config.getNumBootstrapThreads(), "Segment-Bootstrap-%s" + ); + + // Start a temporary scheduled executor for background segment announcing + final ScheduledExecutorService backgroundAnnouncerExecutor = Executors.newScheduledThreadPool( + config.getNumLoadingThreads(), Execs.makeThreadFactory("Background-Segment-Announcer-%s") ); try (final BackgroundSegmentAnnouncer backgroundSegmentAnnouncer = - new BackgroundSegmentAnnouncer(announcer, exec, config.getAnnounceIntervalMillis())) { + new BackgroundSegmentAnnouncer(segmentAnnouncer, backgroundAnnouncerExecutor, config.getAnnounceIntervalMillis())) { backgroundSegmentAnnouncer.startAnnouncing(); @@ -82,7 +87,7 @@ final AtomicInteger counter = new AtomicInteger(0); final CopyOnWriteArrayList<DataSegment> failedSegments = new CopyOnWriteArrayList<>(); for (final DataSegment segment : segmentsOnStartup) { - loadingExecutor.submit( + bootstrapExecutor.submit( () -> { try { log.info( @@ -92,11 +97,11 @@ try { segmentManager.loadSegmentOnBootstrap( segment, - () -> this.removeSegment(segment, DataSegmentChangeCallback.NOOP, false) + () -> loadDropHandler.removeSegment(segment, DataSegmentChangeCallback.NOOP, false) ); } catch (Exception e) { - removeSegment(segment, DataSegmentChangeCallback.NOOP, false); + loadDropHandler.removeSegment(segment, DataSegmentChangeCallback.NOOP, false); throw new SegmentLoadingException(e, "Exception loading segment[%s]", segment.getId()); } try { @@ -140,7 +145,8 @@ .emit(); } finally { - loadingExecutor.shutdownNow(); + bootstrapExecutor.shutdownNow(); + backgroundAnnouncerExecutor.shutdownNow(); stopwatch.stop(); // At this stage, all tasks have been submitted, send a shutdown command to cleanup any resources alloted // for the bootstrapping function. @@ -165,8 +171,6 @@ bootstrapSegments = ImmutableList.copyOf(response.getIterator()); } catch (Exception e) { - // By default, we "fail open" when there is any error -- finding the coordinator, or if the API endpoint cannot - // be found during rolling upgrades, or even if it's irrecoverable. log.warn("Error fetching bootstrap segments from the coordinator: [%s]. ", e.getMessage()); } finally { @@ -199,8 +203,8 @@ { private static final EmittingLogger log = new EmittingLogger(BackgroundSegmentAnnouncer.class); - private final int intervalMillis; - private final DataSegmentAnnouncer announcer; + private final int announceIntervalMillis; + private final DataSegmentAnnouncer segmentAnnouncer; private final ScheduledExecutorService exec; private final LinkedBlockingQueue<DataSegment> queue; private final SettableFuture<Boolean> doneAnnouncing; @@ -213,15 +217,15 @@ @Nullable private volatile ScheduledFuture nextAnnoucement = null; - public BackgroundSegmentAnnouncer( - DataSegmentAnnouncer announcer, + BackgroundSegmentAnnouncer( + DataSegmentAnnouncer segmentAnnouncer, ScheduledExecutorService exec, - int intervalMillis + int announceIntervalMillis ) { - this.announcer = announcer; + this.segmentAnnouncer = segmentAnnouncer; this.exec = exec; - this.intervalMillis = intervalMillis; + this.announceIntervalMillis = announceIntervalMillis; this.queue = new LinkedBlockingQueue<>(); this.doneAnnouncing = SettableFuture.create(); } @@ -236,7 +240,8 @@ public void startAnnouncing() { - if (intervalMillis <= 0) { + if (announceIntervalMillis <= 0) { + log.info("Skipping background segment announcing as announceIntervalMillis is [%d].", announceIntervalMillis); return; } @@ -255,8 +260,8 @@ final List<DataSegment> segments = new ArrayList<>(); queue.drainTo(segments); try { - announcer.announceSegments(segments); - nextAnnoucement = exec.schedule(this, intervalMillis, TimeUnit.MILLISECONDS); + segmentAnnouncer.announceSegments(segments); + nextAnnoucement = exec.schedule(this, announceIntervalMillis, TimeUnit.MILLISECONDS); } catch (IOException e) { doneAnnouncing.setException( @@ -273,7 +278,7 @@ } } }, - intervalMillis, + announceIntervalMillis, TimeUnit.MILLISECONDS ); } @@ -286,7 +291,7 @@ try { final List<DataSegment> segments = new ArrayList<>(); queue.drainTo(segments); - announcer.announceSegments(segments); + segmentAnnouncer.announceSegments(segments); } catch (Exception e) { throw new SegmentLoadingException(e, "Failed to announce segments[%s]", queue); ``` ########## server/src/main/java/org/apache/druid/server/coordination/SegmentBootstrapper.java: ########## @@ -0,0 +1,439 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.server.coordination; + +import com.google.common.base.Throwables; +import com.google.common.collect.ImmutableList; +import com.google.common.util.concurrent.SettableFuture; +import com.google.inject.Inject; +import org.apache.druid.client.BootstrapSegmentsResponse; +import org.apache.druid.client.coordinator.CoordinatorClient; +import org.apache.druid.common.guava.FutureUtils; +import org.apache.druid.guice.ManageLifecycle; +import org.apache.druid.guice.ServerTypeConfig; +import org.apache.druid.java.util.common.ISE; +import org.apache.druid.java.util.common.Stopwatch; +import org.apache.druid.java.util.common.concurrent.Execs; +import org.apache.druid.java.util.common.lifecycle.LifecycleStart; +import org.apache.druid.java.util.common.lifecycle.LifecycleStop; +import org.apache.druid.java.util.emitter.EmittingLogger; +import org.apache.druid.java.util.emitter.service.ServiceEmitter; +import org.apache.druid.java.util.emitter.service.ServiceMetricEvent; +import org.apache.druid.segment.loading.SegmentLoaderConfig; +import org.apache.druid.segment.loading.SegmentLoadingException; +import org.apache.druid.server.SegmentManager; +import org.apache.druid.timeline.DataSegment; + +import javax.annotation.Nullable; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * Responsible for bootstrapping cached segments from disk and bootstrap segments from the coordinator. + * Also, responsible for announcing itself as a data server if applicable once the bootstrapping functions + * are complete. + */ +@ManageLifecycle +public class SegmentBootstrapper +{ + private final SegmentLoadDropHandler loadDropHandler; + private final SegmentLoaderConfig config; + private final DataSegmentAnnouncer segmentAnnouncer; + private final DataSegmentServerAnnouncer serverAnnouncer; + private final SegmentManager segmentManager; + private final ServerTypeConfig serverTypeConfig; + private final CoordinatorClient coordinatorClient; + private final ServiceEmitter emitter; + + private volatile boolean started = false; Review Comment: ```suggestion private volatile boolean isComplete = false; ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
