This is an automated email from the ASF dual-hosted git repository.
kfaraz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new bf2be938a93 Refactor `SegmentLoadDropHandler` code (#16685)
bf2be938a93 is described below
commit bf2be938a93aa3819901bfcc65adac66ce2bc75a
Author: Abhishek Radhakrishnan <[email protected]>
AuthorDate: Sun Jul 7 20:59:55 2024 -0700
Refactor `SegmentLoadDropHandler` code (#16685)
Motivation:
- Improve code hygiene
- Make `SegmentLoadDropHandler` easily extensible
Changes:
- Add `SegmentBootstrapper`
- Move the code that bootstraps segments already cached on disk, as well as
bootstrap segments fetched from the coordinator, to
`SegmentBootstrapper`.
- No functional change
- Use separate executor service in `SegmentBootstrapper`
- Bind `SegmentBootstrapper` to `ManageLifecycle` explicitly in
`CliBroker`, `CliHistorical` etc.
---
.../coordination/ChangeRequestsSnapshot.java | 1 -
.../server/coordination/SegmentBootstrapper.java | 439 +++++++++++++++++++++
.../coordination/SegmentChangeRequestDrop.java | 1 -
.../coordination/SegmentChangeRequestLoad.java | 1 -
.../coordination/SegmentLoadDropHandler.java | 384 +-----------------
.../druid/server/http/HistoricalResource.java | 12 +-
...Test.java => SegmentBootstrapperCacheTest.java} | 51 ++-
.../coordination/SegmentBootstrapperTest.java | 306 ++++++++++++++
.../coordination/SegmentLoadDropHandlerTest.java | 383 ++----------------
.../coordination/TestSegmentCacheManager.java | 175 ++++++++
.../server/coordination/ZkCoordinatorTest.java | 9 +-
.../main/java/org/apache/druid/cli/CliBroker.java | 2 +
.../java/org/apache/druid/cli/CliHistorical.java | 2 +
.../main/java/org/apache/druid/cli/CliIndexer.java | 2 +
.../main/java/org/apache/druid/cli/CliPeon.java | 2 +
15 files changed, 1001 insertions(+), 769 deletions(-)
diff --git
a/server/src/main/java/org/apache/druid/server/coordination/ChangeRequestsSnapshot.java
b/server/src/main/java/org/apache/druid/server/coordination/ChangeRequestsSnapshot.java
index 14113bed6b2..a86453df66d 100644
---
a/server/src/main/java/org/apache/druid/server/coordination/ChangeRequestsSnapshot.java
+++
b/server/src/main/java/org/apache/druid/server/coordination/ChangeRequestsSnapshot.java
@@ -24,7 +24,6 @@ import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
import javax.annotation.Nullable;
-
import java.util.List;
/**
diff --git
a/server/src/main/java/org/apache/druid/server/coordination/SegmentBootstrapper.java
b/server/src/main/java/org/apache/druid/server/coordination/SegmentBootstrapper.java
new file mode 100644
index 00000000000..c5b71fbcddc
--- /dev/null
+++
b/server/src/main/java/org/apache/druid/server/coordination/SegmentBootstrapper.java
@@ -0,0 +1,439 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.server.coordination;
+
+import com.google.common.base.Throwables;
+import com.google.common.collect.ImmutableList;
+import com.google.common.util.concurrent.SettableFuture;
+import com.google.inject.Inject;
+import org.apache.druid.client.BootstrapSegmentsResponse;
+import org.apache.druid.client.coordinator.CoordinatorClient;
+import org.apache.druid.common.guava.FutureUtils;
+import org.apache.druid.guice.ManageLifecycle;
+import org.apache.druid.guice.ServerTypeConfig;
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.Stopwatch;
+import org.apache.druid.java.util.common.concurrent.Execs;
+import org.apache.druid.java.util.common.lifecycle.LifecycleStart;
+import org.apache.druid.java.util.common.lifecycle.LifecycleStop;
+import org.apache.druid.java.util.emitter.EmittingLogger;
+import org.apache.druid.java.util.emitter.service.ServiceEmitter;
+import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
+import org.apache.druid.segment.loading.SegmentLoaderConfig;
+import org.apache.druid.segment.loading.SegmentLoadingException;
+import org.apache.druid.server.SegmentManager;
+import org.apache.druid.timeline.DataSegment;
+
+import javax.annotation.Nullable;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Responsible for bootstrapping segments already cached on disk and bootstrap
segments fetched from the coordinator.
+ * Also responsible for announcing the node as a data server if applicable,
once the bootstrapping operations
+ * are complete.
+ */
+@ManageLifecycle
+public class SegmentBootstrapper
+{
+ private final SegmentLoadDropHandler loadDropHandler;
+ private final SegmentLoaderConfig config;
+ private final DataSegmentAnnouncer segmentAnnouncer;
+ private final DataSegmentServerAnnouncer serverAnnouncer;
+ private final SegmentManager segmentManager;
+ private final ServerTypeConfig serverTypeConfig;
+ private final CoordinatorClient coordinatorClient;
+ private final ServiceEmitter emitter;
+
+ private volatile boolean isComplete = false;
+
+ // Synchronizes start/stop of this object.
+ private final Object startStopLock = new Object();
+
+ private static final EmittingLogger log = new
EmittingLogger(SegmentBootstrapper.class);
+
+ @Inject
+ public SegmentBootstrapper(
+ SegmentLoadDropHandler loadDropHandler,
+ SegmentLoaderConfig config,
+ DataSegmentAnnouncer segmentAnnouncer,
+ DataSegmentServerAnnouncer serverAnnouncer,
+ SegmentManager segmentManager,
+ ServerTypeConfig serverTypeConfig,
+ CoordinatorClient coordinatorClient,
+ ServiceEmitter emitter
+ )
+ {
+ this.loadDropHandler = loadDropHandler;
+ this.config = config;
+ this.segmentAnnouncer = segmentAnnouncer;
+ this.serverAnnouncer = serverAnnouncer;
+ this.segmentManager = segmentManager;
+ this.serverTypeConfig = serverTypeConfig;
+ this.coordinatorClient = coordinatorClient;
+ this.emitter = emitter;
+ }
+
+ @LifecycleStart
+ public void start() throws IOException
+ {
+ synchronized (startStopLock) {
+ if (isComplete) {
+ return;
+ }
+
+ log.info("Starting...");
+ try {
+ if (segmentManager.canHandleSegments()) {
+ loadSegmentsOnStartup();
+ }
+
+ if (shouldAnnounce()) {
+ serverAnnouncer.announce();
+ }
+ }
+ catch (Exception e) {
+ Throwables.propagateIfPossible(e, IOException.class);
+ throw new RuntimeException(e);
+ }
+ isComplete = true;
+ log.info("Started.");
+ }
+ }
+
+ @LifecycleStop
+ public void stop()
+ {
+ synchronized (startStopLock) {
+ if (!isComplete) {
+ return;
+ }
+
+ log.info("Stopping...");
+ try {
+ if (shouldAnnounce()) {
+ serverAnnouncer.unannounce();
+ }
+ }
+ catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ finally {
+ isComplete = false;
+ }
+ log.info("Stopped.");
+ }
+ }
+
+ public boolean isBootstrappingComplete()
+ {
+ return isComplete;
+ }
+
+ /**
+ * Bulk loading of the following segments into the page cache at startup:
+ * <li> Previously cached segments </li>
+ * <li> Bootstrap segments from the coordinator </li>
+ */
+ private void loadSegmentsOnStartup() throws IOException
+ {
+ final List<DataSegment> segmentsOnStartup = new ArrayList<>();
+ segmentsOnStartup.addAll(segmentManager.getCachedSegments());
+ segmentsOnStartup.addAll(getBootstrapSegments());
+
+ final Stopwatch stopwatch = Stopwatch.createStarted();
+
+ // Start a temporary thread pool to load segments into page cache during
bootstrap
+ final ExecutorService bootstrapExecutor = Execs.multiThreaded(
+ config.getNumBootstrapThreads(), "Segment-Bootstrap-%s"
+ );
+
+ // Start a temporary scheduled executor for background segment announcing
+ final ScheduledExecutorService backgroundAnnouncerExecutor =
Executors.newScheduledThreadPool(
+ config.getNumLoadingThreads(),
Execs.makeThreadFactory("Background-Segment-Announcer-%s")
+ );
+
+ try (final BackgroundSegmentAnnouncer backgroundSegmentAnnouncer =
+ new BackgroundSegmentAnnouncer(segmentAnnouncer,
backgroundAnnouncerExecutor, config.getAnnounceIntervalMillis())) {
+
+ backgroundSegmentAnnouncer.startAnnouncing();
+
+ final int numSegments = segmentsOnStartup.size();
+ final CountDownLatch latch = new CountDownLatch(numSegments);
+ final AtomicInteger counter = new AtomicInteger(0);
+ final CopyOnWriteArrayList<DataSegment> failedSegments = new
CopyOnWriteArrayList<>();
+ for (final DataSegment segment : segmentsOnStartup) {
+ bootstrapExecutor.submit(
+ () -> {
+ try {
+ log.info(
+ "Loading segment[%d/%d][%s]",
+ counter.incrementAndGet(), numSegments, segment.getId()
+ );
+ try {
+ segmentManager.loadSegmentOnBootstrap(
+ segment,
+ () -> loadDropHandler.removeSegment(segment,
DataSegmentChangeCallback.NOOP, false)
+ );
+ }
+ catch (Exception e) {
+ loadDropHandler.removeSegment(segment,
DataSegmentChangeCallback.NOOP, false);
+ throw new SegmentLoadingException(e, "Exception loading
segment[%s]", segment.getId());
+ }
+ try {
+ backgroundSegmentAnnouncer.announceSegment(segment);
+ }
+ catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new SegmentLoadingException(e, "Loading Interrupted");
+ }
+ }
+ catch (SegmentLoadingException e) {
+ log.error(e, "[%s] failed to load", segment.getId());
+ failedSegments.add(segment);
+ }
+ finally {
+ latch.countDown();
+ }
+ }
+ );
+ }
+
+ try {
+ latch.await();
+
+ if (failedSegments.size() > 0) {
+ log.makeAlert("[%,d] errors seen while loading segments on startup",
failedSegments.size())
+ .addData("failedSegments", failedSegments)
+ .emit();
+ }
+ }
+ catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ log.makeAlert(e, "LoadingInterrupted").emit();
+ }
+
+ backgroundSegmentAnnouncer.finishAnnouncing();
+ }
+ catch (SegmentLoadingException e) {
+ log.makeAlert(e, "Failed to load segments on startup -- likely problem
with announcing.")
+ .addData("numSegments", segmentsOnStartup.size())
+ .emit();
+ }
+ finally {
+ bootstrapExecutor.shutdownNow();
+ backgroundAnnouncerExecutor.shutdownNow();
+ stopwatch.stop();
+ // At this stage, all tasks have been submitted, send a shutdown command
to cleanup any resources alloted
+ // for the bootstrapping function.
+ segmentManager.shutdownBootstrap();
+ log.info("Loaded [%d] segments on startup in [%,d]ms.",
segmentsOnStartup.size(), stopwatch.millisElapsed());
+ }
+ }
+
+ /**
+ * @return a list of bootstrap segments. When bootstrap segments cannot be
found, an empty list is returned.
+ */
+ private List<DataSegment> getBootstrapSegments()
+ {
+ log.info("Fetching bootstrap segments from the coordinator.");
+ final Stopwatch stopwatch = Stopwatch.createStarted();
+
+ List<DataSegment> bootstrapSegments = new ArrayList<>();
+
+ try {
+ final BootstrapSegmentsResponse response =
+ FutureUtils.getUnchecked(coordinatorClient.fetchBootstrapSegments(),
true);
+ bootstrapSegments = ImmutableList.copyOf(response.getIterator());
+ }
+ catch (Exception e) {
+ log.warn("Error fetching bootstrap segments from the coordinator: [%s].
", e.getMessage());
+ }
+ finally {
+ stopwatch.stop();
+ final long fetchRunMillis = stopwatch.millisElapsed();
+ emitter.emit(new
ServiceMetricEvent.Builder().setMetric("segment/bootstrap/time",
fetchRunMillis));
+ emitter.emit(new
ServiceMetricEvent.Builder().setMetric("segment/bootstrap/count",
bootstrapSegments.size()));
+ log.info("Fetched [%d] bootstrap segments in [%d]ms.",
bootstrapSegments.size(), fetchRunMillis);
+ }
+
+ return bootstrapSegments;
+ }
+
+ /**
+ * Returns whether or not we should announce ourselves as a data server
using {@link DataSegmentServerAnnouncer}.
+ *
+ * Returns true if _either_:
+ *
+ * <li> Our {@link #serverTypeConfig} indicates we are a segment server.
This is necessary for Brokers to be able
+ * to detect that we exist.</li>
+ * <li> The segment manager is able to handle segments. This is necessary
for Coordinators to be able to
+ * assign segments to us.</li>
+ */
+ private boolean shouldAnnounce()
+ {
+ return serverTypeConfig.getServerType().isSegmentServer() ||
segmentManager.canHandleSegments();
+ }
+
+ private static class BackgroundSegmentAnnouncer implements AutoCloseable
+ {
+ private static final EmittingLogger log = new
EmittingLogger(BackgroundSegmentAnnouncer.class);
+
+ private final int announceIntervalMillis;
+ private final DataSegmentAnnouncer segmentAnnouncer;
+ private final ScheduledExecutorService exec;
+ private final LinkedBlockingQueue<DataSegment> queue;
+ private final SettableFuture<Boolean> doneAnnouncing;
+
+ private final Object lock = new Object();
+
+ private volatile boolean finished = false;
+ @Nullable
+ private volatile ScheduledFuture startedAnnouncing = null;
+ @Nullable
+ private volatile ScheduledFuture nextAnnoucement = null;
+
+ BackgroundSegmentAnnouncer(
+ DataSegmentAnnouncer segmentAnnouncer,
+ ScheduledExecutorService exec,
+ int announceIntervalMillis
+ )
+ {
+ this.segmentAnnouncer = segmentAnnouncer;
+ this.exec = exec;
+ this.announceIntervalMillis = announceIntervalMillis;
+ this.queue = new LinkedBlockingQueue<>();
+ this.doneAnnouncing = SettableFuture.create();
+ }
+
+ public void announceSegment(final DataSegment segment) throws
InterruptedException
+ {
+ if (finished) {
+ throw new ISE("Announce segment called after finishAnnouncing");
+ }
+ queue.put(segment);
+ }
+
+ public void startAnnouncing()
+ {
+ if (announceIntervalMillis <= 0) {
+ log.info("Skipping background segment announcing as
announceIntervalMillis is [%d].", announceIntervalMillis);
+ return;
+ }
+
+ log.info("Starting background segment announcing task");
+
+ // schedule background announcing task
+ nextAnnoucement = startedAnnouncing = exec.schedule(
+ new Runnable()
+ {
+ @Override
+ public void run()
+ {
+ synchronized (lock) {
+ try {
+ if (!(finished && queue.isEmpty())) {
+ final List<DataSegment> segments = new ArrayList<>();
+ queue.drainTo(segments);
+ try {
+ segmentAnnouncer.announceSegments(segments);
+ nextAnnoucement = exec.schedule(this,
announceIntervalMillis, TimeUnit.MILLISECONDS);
+ }
+ catch (IOException e) {
+ doneAnnouncing.setException(
+ new SegmentLoadingException(e, "Failed to announce
segments[%s]", segments)
+ );
+ }
+ } else {
+ doneAnnouncing.set(true);
+ }
+ }
+ catch (Exception e) {
+ doneAnnouncing.setException(e);
+ }
+ }
+ }
+ },
+ announceIntervalMillis,
+ TimeUnit.MILLISECONDS
+ );
+ }
+
+ public void finishAnnouncing() throws SegmentLoadingException
+ {
+ synchronized (lock) {
+ finished = true;
+ // announce any remaining segments
+ try {
+ final List<DataSegment> segments = new ArrayList<>();
+ queue.drainTo(segments);
+ segmentAnnouncer.announceSegments(segments);
+ }
+ catch (Exception e) {
+ throw new SegmentLoadingException(e, "Failed to announce
segments[%s]", queue);
+ }
+
+ // get any exception that may have been thrown in background announcing
+ try {
+ // check in case intervalMillis is <= 0
+ if (startedAnnouncing != null) {
+ startedAnnouncing.cancel(false);
+ }
+ // - if the task is waiting on the lock, then the queue will be
empty by the time it runs
+ // - if the task just released it, then the lock ensures any
exception is set in doneAnnouncing
+ if (doneAnnouncing.isDone()) {
+ doneAnnouncing.get();
+ }
+ }
+ catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new SegmentLoadingException(e, "Loading Interrupted");
+ }
+ catch (ExecutionException e) {
+ throw new SegmentLoadingException(e.getCause(), "Background
Announcing Task Failed");
+ }
+ }
+ log.info("Completed background segment announcing");
+ }
+
+ @Override
+ public void close()
+ {
+ // stop background scheduling
+ synchronized (lock) {
+ finished = true;
+ if (nextAnnoucement != null) {
+ nextAnnoucement.cancel(false);
+ }
+ }
+ }
+ }
+}
diff --git
a/server/src/main/java/org/apache/druid/server/coordination/SegmentChangeRequestDrop.java
b/server/src/main/java/org/apache/druid/server/coordination/SegmentChangeRequestDrop.java
index c4229a02880..ddee89b9763 100644
---
a/server/src/main/java/org/apache/druid/server/coordination/SegmentChangeRequestDrop.java
+++
b/server/src/main/java/org/apache/druid/server/coordination/SegmentChangeRequestDrop.java
@@ -26,7 +26,6 @@ import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.timeline.DataSegment;
import javax.annotation.Nullable;
-
import java.util.Objects;
/**
diff --git
a/server/src/main/java/org/apache/druid/server/coordination/SegmentChangeRequestLoad.java
b/server/src/main/java/org/apache/druid/server/coordination/SegmentChangeRequestLoad.java
index 130c7b50d80..1bb9997980c 100644
---
a/server/src/main/java/org/apache/druid/server/coordination/SegmentChangeRequestLoad.java
+++
b/server/src/main/java/org/apache/druid/server/coordination/SegmentChangeRequestLoad.java
@@ -26,7 +26,6 @@ import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.timeline.DataSegment;
import javax.annotation.Nullable;
-
import java.util.Objects;
/**
diff --git
a/server/src/main/java/org/apache/druid/server/coordination/SegmentLoadDropHandler.java
b/server/src/main/java/org/apache/druid/server/coordination/SegmentLoadDropHandler.java
index 4a9086ab572..12462adab2f 100644
---
a/server/src/main/java/org/apache/druid/server/coordination/SegmentLoadDropHandler.java
+++
b/server/src/main/java/org/apache/druid/server/coordination/SegmentLoadDropHandler.java
@@ -20,28 +20,16 @@
package org.apache.druid.server.coordination;
import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Throwables;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Maps;
import com.google.common.util.concurrent.AbstractFuture;
import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.SettableFuture;
import com.google.inject.Inject;
-import org.apache.druid.client.BootstrapSegmentsResponse;
-import org.apache.druid.client.coordinator.CoordinatorClient;
-import org.apache.druid.common.guava.FutureUtils;
import org.apache.druid.guice.ManageLifecycle;
-import org.apache.druid.guice.ServerTypeConfig;
-import org.apache.druid.java.util.common.ISE;
-import org.apache.druid.java.util.common.Stopwatch;
import org.apache.druid.java.util.common.concurrent.Execs;
-import org.apache.druid.java.util.common.lifecycle.LifecycleStart;
-import org.apache.druid.java.util.common.lifecycle.LifecycleStop;
import org.apache.druid.java.util.emitter.EmittingLogger;
-import org.apache.druid.java.util.emitter.service.ServiceEmitter;
-import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
import org.apache.druid.segment.loading.SegmentLoaderConfig;
import org.apache.druid.segment.loading.SegmentLoadingException;
import org.apache.druid.server.SegmentManager;
@@ -56,20 +44,13 @@ import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentSkipListSet;
-import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
-import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
/**
- *
+ * Responsible for loading and dropping of segments by a process that can
serve segments.
*/
@ManageLifecycle
public class SegmentLoadDropHandler implements DataSegmentChangeHandler
@@ -79,20 +60,12 @@ public class SegmentLoadDropHandler implements
DataSegmentChangeHandler
// Synchronizes removals from segmentsToDelete
private final Object segmentDeleteLock = new Object();
- // Synchronizes start/stop of this object.
- private final Object startStopLock = new Object();
-
private final SegmentLoaderConfig config;
private final DataSegmentAnnouncer announcer;
- private final DataSegmentServerAnnouncer serverAnnouncer;
private final SegmentManager segmentManager;
private final ScheduledExecutorService exec;
- private final ServerTypeConfig serverTypeConfig;
- private final CoordinatorClient coordinatorClient;
- private final ServiceEmitter emitter;
- private final ConcurrentSkipListSet<DataSegment> segmentsToDelete;
- private volatile boolean started = false;
+ private final ConcurrentSkipListSet<DataSegment> segmentsToDelete;
// Keep history of load/drop request status in a LRU cache to maintain
idempotency if same request shows up
// again and to return status of a completed request. Maximum size of this
cache must be significantly greater
@@ -108,25 +81,17 @@ public class SegmentLoadDropHandler implements
DataSegmentChangeHandler
public SegmentLoadDropHandler(
SegmentLoaderConfig config,
DataSegmentAnnouncer announcer,
- DataSegmentServerAnnouncer serverAnnouncer,
- SegmentManager segmentManager,
- ServerTypeConfig serverTypeConfig,
- CoordinatorClient coordinatorClient,
- ServiceEmitter emitter
+ SegmentManager segmentManager
)
{
this(
config,
announcer,
- serverAnnouncer,
segmentManager,
Executors.newScheduledThreadPool(
config.getNumLoadingThreads(),
Execs.makeThreadFactory("SimpleDataSegmentChangeHandler-%s")
- ),
- serverTypeConfig,
- coordinatorClient,
- emitter
+ )
);
}
@@ -134,83 +99,19 @@ public class SegmentLoadDropHandler implements
DataSegmentChangeHandler
SegmentLoadDropHandler(
SegmentLoaderConfig config,
DataSegmentAnnouncer announcer,
- DataSegmentServerAnnouncer serverAnnouncer,
SegmentManager segmentManager,
- ScheduledExecutorService exec,
- ServerTypeConfig serverTypeConfig,
- CoordinatorClient coordinatorClient,
- ServiceEmitter emitter
+ ScheduledExecutorService exec
)
{
this.config = config;
this.announcer = announcer;
- this.serverAnnouncer = serverAnnouncer;
this.segmentManager = segmentManager;
this.exec = exec;
- this.serverTypeConfig = serverTypeConfig;
- this.coordinatorClient = coordinatorClient;
- this.emitter = emitter;
this.segmentsToDelete = new ConcurrentSkipListSet<>();
requestStatuses =
CacheBuilder.newBuilder().maximumSize(config.getStatusQueueMaxSize()).initialCapacity(8).build();
}
- @LifecycleStart
- public void start() throws IOException
- {
- synchronized (startStopLock) {
- if (started) {
- return;
- }
-
- log.info("Starting...");
- try {
- if (segmentManager.canHandleSegments()) {
- loadSegmentsOnStartup();
- }
-
- if (shouldAnnounce()) {
- serverAnnouncer.announce();
- }
- }
- catch (Exception e) {
- Throwables.propagateIfPossible(e, IOException.class);
- throw new RuntimeException(e);
- }
- started = true;
- log.info("Started.");
- }
- }
-
- @LifecycleStop
- public void stop()
- {
- synchronized (startStopLock) {
- if (!started) {
- return;
- }
-
- log.info("Stopping...");
- try {
- if (shouldAnnounce()) {
- serverAnnouncer.unannounce();
- }
- }
- catch (Exception e) {
- throw new RuntimeException(e);
- }
- finally {
- started = false;
- }
- log.info("Stopped.");
- }
- }
-
- public boolean isStarted()
- {
- return started;
- }
-
public Map<String, Long> getAverageNumOfRowsPerSegmentForDatasource()
{
return segmentManager.getAverageRowCountForDatasource();
@@ -221,132 +122,6 @@ public class SegmentLoadDropHandler implements
DataSegmentChangeHandler
return segmentManager.getRowCountDistribution();
}
- /**
- * Bulk loading of the following segments into the page cache at startup:
- * <li> Previously cached segments </li>
- * <li> Bootstrap segments from the coordinator </li>
- */
- private void loadSegmentsOnStartup() throws IOException
- {
- final List<DataSegment> segmentsOnStartup = new ArrayList<>();
- segmentsOnStartup.addAll(segmentManager.getCachedSegments());
- segmentsOnStartup.addAll(getBootstrapSegments());
-
- final Stopwatch stopwatch = Stopwatch.createStarted();
-
- // Start a temporary thread pool to load segments into page cache during
bootstrap
- final ExecutorService loadingExecutor = Execs.multiThreaded(
- config.getNumBootstrapThreads(), "Segment-Load-Startup-%s"
- );
-
- try (final BackgroundSegmentAnnouncer backgroundSegmentAnnouncer =
- new BackgroundSegmentAnnouncer(announcer, exec,
config.getAnnounceIntervalMillis())) {
-
- backgroundSegmentAnnouncer.startAnnouncing();
-
- final int numSegments = segmentsOnStartup.size();
- final CountDownLatch latch = new CountDownLatch(numSegments);
- final AtomicInteger counter = new AtomicInteger(0);
- final CopyOnWriteArrayList<DataSegment> failedSegments = new
CopyOnWriteArrayList<>();
- for (final DataSegment segment : segmentsOnStartup) {
- loadingExecutor.submit(
- () -> {
- try {
- log.info(
- "Loading segment[%d/%d][%s]",
- counter.incrementAndGet(), numSegments, segment.getId()
- );
- try {
- segmentManager.loadSegmentOnBootstrap(
- segment,
- () -> this.removeSegment(segment,
DataSegmentChangeCallback.NOOP, false)
- );
- }
- catch (Exception e) {
- removeSegment(segment, DataSegmentChangeCallback.NOOP,
false);
- throw new SegmentLoadingException(e, "Exception loading
segment[%s]", segment.getId());
- }
- try {
- backgroundSegmentAnnouncer.announceSegment(segment);
- }
- catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- throw new SegmentLoadingException(e, "Loading Interrupted");
- }
- }
- catch (SegmentLoadingException e) {
- log.error(e, "[%s] failed to load", segment.getId());
- failedSegments.add(segment);
- }
- finally {
- latch.countDown();
- }
- }
- );
- }
-
- try {
- latch.await();
-
- if (failedSegments.size() > 0) {
- log.makeAlert("[%,d] errors seen while loading segments on startup",
failedSegments.size())
- .addData("failedSegments", failedSegments)
- .emit();
- }
- }
- catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- log.makeAlert(e, "LoadingInterrupted").emit();
- }
-
- backgroundSegmentAnnouncer.finishAnnouncing();
- }
- catch (SegmentLoadingException e) {
- log.makeAlert(e, "Failed to load segments on startup -- likely problem
with announcing.")
- .addData("numSegments", segmentsOnStartup.size())
- .emit();
- }
- finally {
- loadingExecutor.shutdownNow();
- stopwatch.stop();
- // At this stage, all tasks have been submitted, send a shutdown command
to cleanup any resources alloted
- // for the bootstrapping function.
- segmentManager.shutdownBootstrap();
- log.info("Loaded [%d] segments on startup in [%,d]ms.",
segmentsOnStartup.size(), stopwatch.millisElapsed());
- }
- }
-
- /**
- * @return a list of bootstrap segments. When bootstrap segments cannot be
found, an empty list is returned.
- */
- private List<DataSegment> getBootstrapSegments()
- {
- log.info("Fetching bootstrap segments from the coordinator.");
- final Stopwatch stopwatch = Stopwatch.createStarted();
-
- List<DataSegment> bootstrapSegments = new ArrayList<>();
-
- try {
- final BootstrapSegmentsResponse response =
- FutureUtils.getUnchecked(coordinatorClient.fetchBootstrapSegments(),
true);
- bootstrapSegments = ImmutableList.copyOf(response.getIterator());
- }
- catch (Exception e) {
- // By default, we "fail open" when there is any error -- finding the
coordinator, or if the API endpoint cannot
- // be found during rolling upgrades, or even if it's irrecoverable.
- log.warn("Error fetching bootstrap segments from the coordinator: [%s].
", e.getMessage());
- }
- finally {
- stopwatch.stop();
- final long fetchRunMillis = stopwatch.millisElapsed();
- emitter.emit(new
ServiceMetricEvent.Builder().setMetric("segment/bootstrap/time",
fetchRunMillis));
- emitter.emit(new
ServiceMetricEvent.Builder().setMetric("segment/bootstrap/count",
bootstrapSegments.size()));
- log.info("Fetched [%d] bootstrap segments in [%d]ms.",
bootstrapSegments.size(), fetchRunMillis);
- }
-
- return bootstrapSegments;
- }
-
@Override
public void addSegment(DataSegment segment, @Nullable
DataSegmentChangeCallback callback)
{
@@ -566,154 +341,6 @@ public class SegmentLoadDropHandler implements
DataSegmentChangeHandler
}
}
- /**
- * Returns whether or not we should announce ourselves as a data server
using {@link DataSegmentServerAnnouncer}.
- *
- * Returns true if _either_:
- *
- * <li> Our {@link #serverTypeConfig} indicates we are a segment server.
This is necessary for Brokers to be able
- * to detect that we exist.</li>
- * <li> The segment manager is able to handle segments. This is necessary
for Coordinators to be able to
- * assign segments to us.</li>
- */
- private boolean shouldAnnounce()
- {
- return serverTypeConfig.getServerType().isSegmentServer() ||
segmentManager.canHandleSegments();
- }
-
- private static class BackgroundSegmentAnnouncer implements AutoCloseable
- {
- private static final EmittingLogger log = new
EmittingLogger(BackgroundSegmentAnnouncer.class);
-
- private final int intervalMillis;
- private final DataSegmentAnnouncer announcer;
- private final ScheduledExecutorService exec;
- private final LinkedBlockingQueue<DataSegment> queue;
- private final SettableFuture<Boolean> doneAnnouncing;
-
- private final Object lock = new Object();
-
- private volatile boolean finished = false;
- @Nullable
- private volatile ScheduledFuture startedAnnouncing = null;
- @Nullable
- private volatile ScheduledFuture nextAnnoucement = null;
-
- public BackgroundSegmentAnnouncer(
- DataSegmentAnnouncer announcer,
- ScheduledExecutorService exec,
- int intervalMillis
- )
- {
- this.announcer = announcer;
- this.exec = exec;
- this.intervalMillis = intervalMillis;
- this.queue = new LinkedBlockingQueue<>();
- this.doneAnnouncing = SettableFuture.create();
- }
-
- public void announceSegment(final DataSegment segment) throws
InterruptedException
- {
- if (finished) {
- throw new ISE("Announce segment called after finishAnnouncing");
- }
- queue.put(segment);
- }
-
- public void startAnnouncing()
- {
- if (intervalMillis <= 0) {
- return;
- }
-
- log.info("Starting background segment announcing task");
-
- // schedule background announcing task
- nextAnnoucement = startedAnnouncing = exec.schedule(
- new Runnable()
- {
- @Override
- public void run()
- {
- synchronized (lock) {
- try {
- if (!(finished && queue.isEmpty())) {
- final List<DataSegment> segments = new ArrayList<>();
- queue.drainTo(segments);
- try {
- announcer.announceSegments(segments);
- nextAnnoucement = exec.schedule(this, intervalMillis,
TimeUnit.MILLISECONDS);
- }
- catch (IOException e) {
- doneAnnouncing.setException(
- new SegmentLoadingException(e, "Failed to announce
segments[%s]", segments)
- );
- }
- } else {
- doneAnnouncing.set(true);
- }
- }
- catch (Exception e) {
- doneAnnouncing.setException(e);
- }
- }
- }
- },
- intervalMillis,
- TimeUnit.MILLISECONDS
- );
- }
-
- public void finishAnnouncing() throws SegmentLoadingException
- {
- synchronized (lock) {
- finished = true;
- // announce any remaining segments
- try {
- final List<DataSegment> segments = new ArrayList<>();
- queue.drainTo(segments);
- announcer.announceSegments(segments);
- }
- catch (Exception e) {
- throw new SegmentLoadingException(e, "Failed to announce
segments[%s]", queue);
- }
-
- // get any exception that may have been thrown in background announcing
- try {
- // check in case intervalMillis is <= 0
- if (startedAnnouncing != null) {
- startedAnnouncing.cancel(false);
- }
- // - if the task is waiting on the lock, then the queue will be
empty by the time it runs
- // - if the task just released it, then the lock ensures any
exception is set in doneAnnouncing
- if (doneAnnouncing.isDone()) {
- doneAnnouncing.get();
- }
- }
- catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- throw new SegmentLoadingException(e, "Loading Interrupted");
- }
- catch (ExecutionException e) {
- throw new SegmentLoadingException(e.getCause(), "Background
Announcing Task Failed");
- }
- }
- log.info("Completed background segment announcing");
- }
-
- @Override
- public void close()
- {
- // stop background scheduling
- synchronized (lock) {
- finished = true;
- if (nextAnnoucement != null) {
- nextAnnoucement.cancel(false);
- }
- }
- }
- }
-
// Future with cancel() implementation to remove it from "waitingFutures" list
private class CustomSettableFuture extends
AbstractFuture<List<DataSegmentChangeResponse>>
{
@@ -759,6 +386,5 @@ public class SegmentLoadDropHandler implements
DataSegmentChangeHandler
return true;
}
}
-
}
diff --git
a/server/src/main/java/org/apache/druid/server/http/HistoricalResource.java
b/server/src/main/java/org/apache/druid/server/http/HistoricalResource.java
index 4bc48f444df..223e1a2dea2 100644
--- a/server/src/main/java/org/apache/druid/server/http/HistoricalResource.java
+++ b/server/src/main/java/org/apache/druid/server/http/HistoricalResource.java
@@ -21,7 +21,7 @@ package org.apache.druid.server.http;
import com.google.common.collect.ImmutableMap;
import com.sun.jersey.spi.container.ResourceFilters;
-import org.apache.druid.server.coordination.SegmentLoadDropHandler;
+import org.apache.druid.server.coordination.SegmentBootstrapper;
import org.apache.druid.server.http.security.StateResourceFilter;
import javax.inject.Inject;
@@ -34,14 +34,14 @@ import javax.ws.rs.core.Response;
@Path("/druid/historical/v1")
public class HistoricalResource
{
- private final SegmentLoadDropHandler segmentLoadDropHandler;
+ private final SegmentBootstrapper segmentBootstrapper;
@Inject
public HistoricalResource(
- SegmentLoadDropHandler segmentLoadDropHandler
+ SegmentBootstrapper segmentBootstrapper
)
{
- this.segmentLoadDropHandler = segmentLoadDropHandler;
+ this.segmentBootstrapper = segmentBootstrapper;
}
@GET
@@ -50,14 +50,14 @@ public class HistoricalResource
@Produces(MediaType.APPLICATION_JSON)
public Response getLoadStatus()
{
- return Response.ok(ImmutableMap.of("cacheInitialized",
segmentLoadDropHandler.isStarted())).build();
+ return Response.ok(ImmutableMap.of("cacheInitialized",
segmentBootstrapper.isBootstrappingComplete())).build();
}
@GET
@Path("/readiness")
public Response getReadiness()
{
- if (segmentLoadDropHandler.isStarted()) {
+ if (segmentBootstrapper.isBootstrappingComplete()) {
return Response.ok().build();
} else {
return Response.status(Response.Status.SERVICE_UNAVAILABLE).build();
diff --git
a/server/src/test/java/org/apache/druid/server/coordination/SegmentLoadDropHandlerCacheTest.java
b/server/src/test/java/org/apache/druid/server/coordination/SegmentBootstrapperCacheTest.java
similarity index 86%
rename from
server/src/test/java/org/apache/druid/server/coordination/SegmentLoadDropHandlerCacheTest.java
rename to
server/src/test/java/org/apache/druid/server/coordination/SegmentBootstrapperCacheTest.java
index f6b1c39c59d..7629a6b875c 100644
---
a/server/src/test/java/org/apache/druid/server/coordination/SegmentLoadDropHandlerCacheTest.java
+++
b/server/src/test/java/org/apache/druid/server/coordination/SegmentBootstrapperCacheTest.java
@@ -50,10 +50,10 @@ import java.util.Collections;
import java.util.List;
/**
- * Similar to {@link SegmentLoadDropHandlerTest}. This class includes tests that cover the
+ * Similar to {@link SegmentBootstrapperTest}. This class includes tests that cover the
* storage location layer as well.
*/
-public class SegmentLoadDropHandlerCacheTest
+public class SegmentBootstrapperCacheTest
{
private static final long MAX_SIZE = 1000L;
private static final long SEGMENT_SIZE = 100L;
@@ -101,8 +101,8 @@ public class SegmentLoadDropHandlerCacheTest
objectMapper
);
segmentManager = new SegmentManager(cacheManager);
- segmentAnnouncer = new TestDataSegmentAnnouncer();
serverAnnouncer = new TestDataServerAnnouncer();
+ segmentAnnouncer = new TestDataSegmentAnnouncer();
coordinatorClient = new TestCoordinatorClient();
emitter = new StubServiceEmitter();
EmittingLogger.registerEmitter(emitter);
@@ -112,10 +112,11 @@ public class SegmentLoadDropHandlerCacheTest
public void testLoadStartStopWithEmptyLocations() throws IOException
{
final List<StorageLocation> emptyLocations = ImmutableList.of();
+ final SegmentLoaderConfig loaderConfig = new SegmentLoaderConfig();
segmentManager = new SegmentManager(
new SegmentLocalCacheManager(
emptyLocations,
- new SegmentLoaderConfig(),
+ loaderConfig,
new LeastBytesUsedStorageLocationSelectorStrategy(emptyLocations),
TestIndex.INDEX_IO,
objectMapper
@@ -123,19 +124,26 @@ public class SegmentLoadDropHandlerCacheTest
);
final SegmentLoadDropHandler loadDropHandler = new SegmentLoadDropHandler(
- new SegmentLoaderConfig(),
+ loaderConfig,
+ segmentAnnouncer,
+ segmentManager
+ );
+
+ final SegmentBootstrapper bootstrapper = new SegmentBootstrapper(
+ loadDropHandler,
+ loaderConfig,
segmentAnnouncer,
serverAnnouncer,
segmentManager,
- new ServerTypeConfig(ServerType.BROKER),
+ new ServerTypeConfig(ServerType.HISTORICAL),
coordinatorClient,
emitter
);
- loadDropHandler.start();
- Assert.assertEquals(0, serverAnnouncer.getObservedCount());
+ bootstrapper.start();
+ Assert.assertEquals(1, serverAnnouncer.getObservedCount());
- loadDropHandler.stop();
+ bootstrapper.stop();
Assert.assertEquals(0, serverAnnouncer.getObservedCount());
}
@@ -143,19 +151,26 @@ public class SegmentLoadDropHandlerCacheTest
public void testLoadStartStop() throws IOException
{
final SegmentLoadDropHandler loadDropHandler = new SegmentLoadDropHandler(
+ loaderConfig,
+ segmentAnnouncer,
+ segmentManager
+ );
+
+ final SegmentBootstrapper bootstrapper = new SegmentBootstrapper(
+ loadDropHandler,
loaderConfig,
segmentAnnouncer,
serverAnnouncer,
segmentManager,
- new ServerTypeConfig(ServerType.BROKER),
+ new ServerTypeConfig(ServerType.HISTORICAL),
coordinatorClient,
emitter
);
- loadDropHandler.start();
+ bootstrapper.start();
Assert.assertEquals(1, serverAnnouncer.getObservedCount());
- loadDropHandler.stop();
+ bootstrapper.stop();
Assert.assertEquals(0, serverAnnouncer.getObservedCount());
}
@@ -176,6 +191,13 @@ public class SegmentLoadDropHandlerCacheTest
}
final SegmentLoadDropHandler loadDropHandler = new SegmentLoadDropHandler(
+ loaderConfig,
+ segmentAnnouncer,
+ segmentManager
+ );
+
+ final SegmentBootstrapper bootstrapper = new SegmentBootstrapper(
+ loadDropHandler,
loaderConfig,
segmentAnnouncer,
serverAnnouncer,
@@ -185,8 +207,7 @@ public class SegmentLoadDropHandlerCacheTest
emitter
);
- // Start the load drop handler
- loadDropHandler.start();
+ bootstrapper.start();
Assert.assertEquals(1, serverAnnouncer.getObservedCount());
// Verify the expected announcements
@@ -202,7 +223,7 @@ public class SegmentLoadDropHandlerCacheTest
loadDropHandler.addSegment(newSegment, null);
Assert.assertTrue(segmentAnnouncer.getObservedSegments().contains(newSegment));
- loadDropHandler.stop();
+ bootstrapper.stop();
Assert.assertEquals(0, serverAnnouncer.getObservedCount());
}
}
diff --git
a/server/src/test/java/org/apache/druid/server/coordination/SegmentBootstrapperTest.java
b/server/src/test/java/org/apache/druid/server/coordination/SegmentBootstrapperTest.java
new file mode 100644
index 00000000000..c41763f1824
--- /dev/null
+++
b/server/src/test/java/org/apache/druid/server/coordination/SegmentBootstrapperTest.java
@@ -0,0 +1,306 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.server.coordination;
+
+import com.google.common.collect.ImmutableList;
+import org.apache.druid.guice.ServerTypeConfig;
+import org.apache.druid.java.util.common.Intervals;
+import org.apache.druid.java.util.emitter.EmittingLogger;
+import org.apache.druid.java.util.metrics.StubServiceEmitter;
+import org.apache.druid.segment.loading.SegmentLoaderConfig;
+import org.apache.druid.segment.loading.StorageLocationConfig;
+import org.apache.druid.server.SegmentManager;
+import org.apache.druid.timeline.DataSegment;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import static org.apache.druid.server.TestSegmentUtils.makeSegment;
+
+public class SegmentBootstrapperTest
+{
+ private static final int COUNT = 50;
+
+ private TestDataSegmentAnnouncer segmentAnnouncer;
+ private TestDataServerAnnouncer serverAnnouncer;
+ private SegmentLoaderConfig segmentLoaderConfig;
+ private TestCoordinatorClient coordinatorClient;
+ private StubServiceEmitter serviceEmitter;
+
+ @Rule
+ public TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+ @Before
+ public void setUp() throws IOException
+ {
+ final File segmentCacheDir = temporaryFolder.newFolder();
+
+ segmentAnnouncer = new TestDataSegmentAnnouncer();
+ serverAnnouncer = new TestDataServerAnnouncer();
+ segmentLoaderConfig = new SegmentLoaderConfig()
+ {
+ @Override
+ public File getInfoDir()
+ {
+ return segmentCacheDir;
+ }
+
+ @Override
+ public int getNumLoadingThreads()
+ {
+ return 5;
+ }
+
+ @Override
+ public int getAnnounceIntervalMillis()
+ {
+ return 50;
+ }
+
+ @Override
+ public List<StorageLocationConfig> getLocations()
+ {
+ return Collections.singletonList(
+ new StorageLocationConfig(segmentCacheDir, null, null)
+ );
+ }
+ };
+
+ coordinatorClient = new TestCoordinatorClient();
+ serviceEmitter = new StubServiceEmitter();
+ EmittingLogger.registerEmitter(serviceEmitter);
+ }
+
+
+ @Test
+ public void testStartStop() throws Exception
+ {
+ final Set<DataSegment> segments = new HashSet<>();
+ for (int i = 0; i < COUNT; ++i) {
+ segments.add(makeSegment("test" + i, "1",
Intervals.of("P1d/2011-04-01")));
+ segments.add(makeSegment("test" + i, "1",
Intervals.of("P1d/2011-04-02")));
+ segments.add(makeSegment("test" + i, "2",
Intervals.of("P1d/2011-04-02")));
+ segments.add(makeSegment("test_two" + i, "1",
Intervals.of("P1d/2011-04-01")));
+ segments.add(makeSegment("test_two" + i, "1",
Intervals.of("P1d/2011-04-02")));
+ }
+
+ final TestSegmentCacheManager cacheManager = new
TestSegmentCacheManager(segments);
+ final SegmentManager segmentManager = new SegmentManager(cacheManager);
+ final SegmentLoadDropHandler handler = new SegmentLoadDropHandler(
+ segmentLoaderConfig,
+ segmentAnnouncer,
+ segmentManager
+ );
+ final SegmentBootstrapper bootstrapper = new SegmentBootstrapper(
+ handler,
+ segmentLoaderConfig,
+ segmentAnnouncer,
+ serverAnnouncer,
+ segmentManager,
+ new ServerTypeConfig(ServerType.HISTORICAL),
+ coordinatorClient,
+ serviceEmitter
+ );
+
+ Assert.assertTrue(segmentManager.getDataSourceCounts().isEmpty());
+ bootstrapper.start();
+
+ Assert.assertEquals(1, serverAnnouncer.getObservedCount());
+ Assert.assertFalse(segmentManager.getDataSourceCounts().isEmpty());
+
+ for (int i = 0; i < COUNT; ++i) {
+ Assert.assertEquals(3L, segmentManager.getDataSourceCounts().get("test"
+ i).longValue());
+ Assert.assertEquals(2L,
segmentManager.getDataSourceCounts().get("test_two" + i).longValue());
+ }
+
+ Assert.assertEquals(ImmutableList.copyOf(segments),
segmentAnnouncer.getObservedSegments());
+
+ final ImmutableList<DataSegment> expectedBootstrapSegments =
ImmutableList.copyOf(segments);
+ Assert.assertEquals(expectedBootstrapSegments,
cacheManager.getObservedBootstrapSegments());
+ Assert.assertEquals(expectedBootstrapSegments,
cacheManager.getObservedBootstrapSegmentsLoadedIntoPageCache());
+ Assert.assertEquals(ImmutableList.of(),
cacheManager.getObservedSegments());
+ Assert.assertEquals(ImmutableList.of(),
cacheManager.getObservedSegmentsLoadedIntoPageCache());
+
+ bootstrapper.stop();
+
+ Assert.assertEquals(0, serverAnnouncer.getObservedCount());
+ Assert.assertEquals(1,
cacheManager.getObservedShutdownBootstrapCount().get());
+ }
+
+ @Test
+ public void testLoadCachedSegments() throws Exception
+ {
+ final Set<DataSegment> segments = new HashSet<>();
+ for (int i = 0; i < COUNT; ++i) {
+ segments.add(makeSegment("test" + i, "1",
Intervals.of("P1d/2011-04-01")));
+ segments.add(makeSegment("test" + i, "1",
Intervals.of("P1d/2011-04-02")));
+ segments.add(makeSegment("test" + i, "2",
Intervals.of("P1d/2011-04-02")));
+ segments.add(makeSegment("test" + i, "1",
Intervals.of("P1d/2011-04-03")));
+ segments.add(makeSegment("test" + i, "1",
Intervals.of("P1d/2011-04-04")));
+ segments.add(makeSegment("test" + i, "1",
Intervals.of("P1d/2011-04-05")));
+ segments.add(makeSegment("test" + i, "2",
Intervals.of("PT1h/2011-04-04T01")));
+ segments.add(makeSegment("test" + i, "2",
Intervals.of("PT1h/2011-04-04T02")));
+ segments.add(makeSegment("test" + i, "2",
Intervals.of("PT1h/2011-04-04T03")));
+ segments.add(makeSegment("test" + i, "2",
Intervals.of("PT1h/2011-04-04T05")));
+ segments.add(makeSegment("test" + i, "2",
Intervals.of("PT1h/2011-04-04T06")));
+ segments.add(makeSegment("test_two" + i, "1",
Intervals.of("P1d/2011-04-01")));
+ segments.add(makeSegment("test_two" + i, "1",
Intervals.of("P1d/2011-04-02")));
+ }
+
+ final TestSegmentCacheManager cacheManager = new
TestSegmentCacheManager(segments);
+ final SegmentManager segmentManager = new SegmentManager(cacheManager);
+ final SegmentLoadDropHandler handler = new
SegmentLoadDropHandler(segmentLoaderConfig, segmentAnnouncer, segmentManager);
+ final SegmentBootstrapper bootstrapper = new SegmentBootstrapper(
+ handler,
+ segmentLoaderConfig,
+ segmentAnnouncer,
+ serverAnnouncer,
+ segmentManager,
+ new ServerTypeConfig(ServerType.HISTORICAL),
+ coordinatorClient,
+ serviceEmitter
+ );
+
+ Assert.assertTrue(segmentManager.getDataSourceCounts().isEmpty());
+
+ bootstrapper.start();
+
+ Assert.assertEquals(1, serverAnnouncer.getObservedCount());
+ Assert.assertFalse(segmentManager.getDataSourceCounts().isEmpty());
+
+ for (int i = 0; i < COUNT; ++i) {
+ Assert.assertEquals(11L, segmentManager.getDataSourceCounts().get("test"
+ i).longValue());
+ Assert.assertEquals(2L,
segmentManager.getDataSourceCounts().get("test_two" + i).longValue());
+ }
+
+ Assert.assertEquals(ImmutableList.copyOf(segments),
segmentAnnouncer.getObservedSegments());
+
+ final ImmutableList<DataSegment> expectedBootstrapSegments =
ImmutableList.copyOf(segments);
+ Assert.assertEquals(expectedBootstrapSegments,
cacheManager.getObservedBootstrapSegments());
+ Assert.assertEquals(expectedBootstrapSegments,
cacheManager.getObservedBootstrapSegmentsLoadedIntoPageCache());
+ Assert.assertEquals(ImmutableList.of(),
cacheManager.getObservedSegments());
+ Assert.assertEquals(ImmutableList.of(),
cacheManager.getObservedSegmentsLoadedIntoPageCache());
+
+ bootstrapper.stop();
+
+ Assert.assertEquals(0, serverAnnouncer.getObservedCount());
+ Assert.assertEquals(1,
cacheManager.getObservedShutdownBootstrapCount().get());
+ }
+
+ @Test
+ public void testLoadBootstrapSegments() throws Exception
+ {
+ final Set<DataSegment> segments = new HashSet<>();
+ for (int i = 0; i < COUNT; ++i) {
+ segments.add(makeSegment("test" + i, "1",
Intervals.of("P1d/2011-04-01")));
+ segments.add(makeSegment("test" + i, "1",
Intervals.of("P1d/2011-04-02")));
+ segments.add(makeSegment("test_two" + i, "1",
Intervals.of("P1d/2011-04-01")));
+ segments.add(makeSegment("test_two" + i, "1",
Intervals.of("P1d/2011-04-02")));
+ }
+
+ final TestCoordinatorClient coordinatorClient = new
TestCoordinatorClient(segments);
+ final TestSegmentCacheManager cacheManager = new TestSegmentCacheManager();
+ final SegmentManager segmentManager = new SegmentManager(cacheManager);
+ final SegmentLoadDropHandler handler = new SegmentLoadDropHandler(
+ segmentLoaderConfig,
+ segmentAnnouncer,
+ segmentManager
+ );
+ final SegmentBootstrapper bootstrapper = new SegmentBootstrapper(
+ handler,
+ segmentLoaderConfig,
+ segmentAnnouncer,
+ serverAnnouncer,
+ segmentManager,
+ new ServerTypeConfig(ServerType.HISTORICAL),
+ coordinatorClient,
+ serviceEmitter
+ );
+
+ Assert.assertTrue(segmentManager.getDataSourceCounts().isEmpty());
+
+ bootstrapper.start();
+
+ Assert.assertEquals(1, serverAnnouncer.getObservedCount());
+ Assert.assertFalse(segmentManager.getDataSourceCounts().isEmpty());
+
+ for (int i = 0; i < COUNT; ++i) {
+ Assert.assertEquals(2L, segmentManager.getDataSourceCounts().get("test"
+ i).longValue());
+ Assert.assertEquals(2L,
segmentManager.getDataSourceCounts().get("test_two" + i).longValue());
+ }
+
+ final ImmutableList<DataSegment> expectedBootstrapSegments =
ImmutableList.copyOf(segments);
+
+ Assert.assertEquals(expectedBootstrapSegments,
segmentAnnouncer.getObservedSegments());
+
+ Assert.assertEquals(expectedBootstrapSegments,
cacheManager.getObservedBootstrapSegments());
+ Assert.assertEquals(expectedBootstrapSegments,
cacheManager.getObservedBootstrapSegmentsLoadedIntoPageCache());
+ serviceEmitter.verifyValue("segment/bootstrap/count",
expectedBootstrapSegments.size());
+ serviceEmitter.verifyEmitted("segment/bootstrap/time", 1);
+
+ bootstrapper.stop();
+ }
+
+ @Test
+ public void testLoadBootstrapSegmentsWhenExceptionThrown() throws Exception
+ {
+ final TestSegmentCacheManager cacheManager = new TestSegmentCacheManager();
+ final SegmentManager segmentManager = new SegmentManager(cacheManager);
+ final SegmentLoadDropHandler handler = new SegmentLoadDropHandler(
+ segmentLoaderConfig,
+ segmentAnnouncer,
+ segmentManager
+ );
+ final SegmentBootstrapper bootstrapper = new SegmentBootstrapper(
+ handler,
+ segmentLoaderConfig,
+ segmentAnnouncer,
+ serverAnnouncer,
+ segmentManager,
+ new ServerTypeConfig(ServerType.HISTORICAL),
+ coordinatorClient,
+ serviceEmitter
+ );
+
+ Assert.assertTrue(segmentManager.getDataSourceCounts().isEmpty());
+
+ bootstrapper.start();
+
+ Assert.assertEquals(1, serverAnnouncer.getObservedCount());
+ Assert.assertTrue(segmentManager.getDataSourceCounts().isEmpty());
+
+ Assert.assertEquals(ImmutableList.of(),
segmentAnnouncer.getObservedSegments());
+ Assert.assertEquals(ImmutableList.of(),
cacheManager.getObservedBootstrapSegments());
+ Assert.assertEquals(ImmutableList.of(),
cacheManager.getObservedBootstrapSegmentsLoadedIntoPageCache());
+ serviceEmitter.verifyValue("segment/bootstrap/count", 0);
+ serviceEmitter.verifyEmitted("segment/bootstrap/time", 1);
+
+ bootstrapper.stop();
+ }
+}
diff --git
a/server/src/test/java/org/apache/druid/server/coordination/SegmentLoadDropHandlerTest.java
b/server/src/test/java/org/apache/druid/server/coordination/SegmentLoadDropHandlerTest.java
index 9fe04d60d5b..cd2fe2dbd63 100644
---
a/server/src/test/java/org/apache/druid/server/coordination/SegmentLoadDropHandlerTest.java
+++
b/server/src/test/java/org/apache/druid/server/coordination/SegmentLoadDropHandlerTest.java
@@ -20,33 +20,21 @@
package org.apache.druid.server.coordination;
import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableSet;
import com.google.common.util.concurrent.ListenableFuture;
-import org.apache.druid.client.coordinator.CoordinatorClient;
-import org.apache.druid.client.coordinator.NoopCoordinatorClient;
-import org.apache.druid.guice.ServerTypeConfig;
import org.apache.druid.java.util.common.Intervals;
-import org.apache.druid.java.util.common.MapUtils;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.concurrent.ScheduledExecutorFactory;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.java.util.metrics.StubServiceEmitter;
-import org.apache.druid.segment.ReferenceCountingSegment;
-import org.apache.druid.segment.SegmentLazyLoadFailCallback;
-import org.apache.druid.segment.loading.NoopSegmentCacheManager;
import org.apache.druid.segment.loading.SegmentLoaderConfig;
import org.apache.druid.segment.loading.StorageLocationConfig;
-import org.apache.druid.segment.loading.TombstoneSegmentizerFactory;
import org.apache.druid.server.SegmentManager;
-import org.apache.druid.server.TestSegmentUtils;
import org.apache.druid.server.coordination.SegmentChangeStatus.State;
import org.apache.druid.timeline.DataSegment;
-import org.joda.time.Interval;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
-import org.junit.rules.ExpectedException;
import org.junit.rules.TemporaryFolder;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
@@ -56,29 +44,20 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
-import java.util.HashSet;
import java.util.List;
import java.util.Map;
-import java.util.Set;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.apache.druid.server.TestSegmentUtils.makeSegment;
public class SegmentLoadDropHandlerTest
{
- private static final int COUNT = 50;
-
private TestDataSegmentAnnouncer segmentAnnouncer;
- private TestDataServerAnnouncer serverAnnouncer;
private List<Runnable> scheduledRunnable;
private SegmentLoaderConfig segmentLoaderConfig;
private ScheduledExecutorFactory scheduledExecutorFactory;
- private TestCoordinatorClient coordinatorClient;
- private StubServiceEmitter serviceEmitter;
-
- @Rule
- public ExpectedException expectedException = ExpectedException.none();
@Rule
public TemporaryFolder temporaryFolder = new TemporaryFolder();
@@ -90,7 +69,6 @@ public class SegmentLoadDropHandlerTest
scheduledRunnable = new ArrayList<>();
segmentAnnouncer = new TestDataSegmentAnnouncer();
- serverAnnouncer = new TestDataServerAnnouncer();
segmentLoaderConfig = new SegmentLoaderConfig()
{
@Override
@@ -140,9 +118,7 @@ public class SegmentLoadDropHandlerTest
};
};
- coordinatorClient = new TestCoordinatorClient();
- serviceEmitter = new StubServiceEmitter();
- EmittingLogger.registerEmitter(serviceEmitter);
+ EmittingLogger.registerEmitter(new StubServiceEmitter());
}
/**
@@ -154,16 +130,12 @@ public class SegmentLoadDropHandlerTest
* </ul>
*/
@Test
- public void testSegmentLoading1() throws Exception
+ public void testSegmentLoading1()
{
final TestSegmentCacheManager cacheManager = new TestSegmentCacheManager();
final SegmentManager segmentManager = new SegmentManager(cacheManager);
final SegmentLoadDropHandler handler =
initSegmentLoadDropHandler(segmentManager);
- handler.start();
-
- Assert.assertEquals(1, serverAnnouncer.getObservedCount());
-
final DataSegment segment = makeSegment("test", "1",
Intervals.of("P1d/2011-04-01"));
handler.removeSegment(segment, DataSegmentChangeCallback.NOOP);
@@ -178,19 +150,16 @@ public class SegmentLoadDropHandlerTest
for (Runnable runnable : scheduledRunnable) {
runnable.run();
}
- Assert.assertEquals(ImmutableList.of(segment),
cacheManager.observedSegments);
- Assert.assertEquals(ImmutableList.of(segment),
cacheManager.observedSegmentsLoadedIntoPageCache);
- Assert.assertEquals(ImmutableList.of(),
cacheManager.observedBootstrapSegments);
- Assert.assertEquals(ImmutableList.of(),
cacheManager.observedBootstrapSegmentsLoadedIntoPageCache);
+ Assert.assertEquals(ImmutableList.of(segment),
cacheManager.getObservedSegments());
+ Assert.assertEquals(ImmutableList.of(segment),
cacheManager.getObservedSegmentsLoadedIntoPageCache());
+ Assert.assertEquals(ImmutableList.of(),
cacheManager.getObservedBootstrapSegments());
+ Assert.assertEquals(ImmutableList.of(),
cacheManager.getObservedBootstrapSegmentsLoadedIntoPageCache());
Assert.assertEquals(ImmutableList.of(segment),
segmentAnnouncer.getObservedSegments());
Assert.assertFalse(
"segment files shouldn't be deleted",
- cacheManager.observedSegmentsRemovedFromCache.contains(segment)
+ cacheManager.getObservedSegmentsRemovedFromCache().contains(segment)
);
-
- handler.stop();
- Assert.assertEquals(0, serverAnnouncer.getObservedCount());
}
/**
@@ -203,15 +172,15 @@ public class SegmentLoadDropHandlerTest
* </ul>
*/
@Test
- public void testSegmentLoading2() throws Exception
+ public void testSegmentLoading2()
{
final TestSegmentCacheManager cacheManager = new TestSegmentCacheManager();
final SegmentManager segmentManager = new SegmentManager(cacheManager);
final SegmentLoadDropHandler handler =
initSegmentLoadDropHandler(segmentManager);
- handler.start();
+ // handler.start();
- Assert.assertEquals(1, serverAnnouncer.getObservedCount());
+ // Assert.assertEquals(1, serverAnnouncer.getObservedCount());
final DataSegment segment = makeSegment("test", "1",
Intervals.of("P1d/2011-04-01"));
@@ -234,176 +203,16 @@ public class SegmentLoadDropHandlerTest
// The same segment reference will be fetched more than once in the above sequence, but the segment should
// be loaded only once onto the page cache.
- Assert.assertEquals(ImmutableList.of(segment, segment),
cacheManager.observedSegments);
- Assert.assertEquals(ImmutableList.of(segment),
cacheManager.observedSegmentsLoadedIntoPageCache);
- Assert.assertEquals(ImmutableList.of(),
cacheManager.observedBootstrapSegments);
- Assert.assertEquals(ImmutableList.of(),
cacheManager.observedBootstrapSegmentsLoadedIntoPageCache);
+ Assert.assertEquals(ImmutableList.of(segment, segment),
cacheManager.getObservedSegments());
+ Assert.assertEquals(ImmutableList.of(segment),
cacheManager.getObservedSegmentsLoadedIntoPageCache());
+ Assert.assertEquals(ImmutableList.of(),
cacheManager.getObservedBootstrapSegments());
+ Assert.assertEquals(ImmutableList.of(),
cacheManager.getObservedBootstrapSegmentsLoadedIntoPageCache());
Assert.assertTrue(segmentAnnouncer.getObservedSegments().contains(segment));
Assert.assertFalse(
"segment files shouldn't be deleted",
- cacheManager.observedSegmentsRemovedFromCache.contains(segment)
+ cacheManager.getObservedSegmentsRemovedFromCache().contains(segment)
);
-
- handler.stop();
- Assert.assertEquals(0, serverAnnouncer.getObservedCount());
- }
-
- @Test
- public void testLoadCache() throws Exception
- {
- Set<DataSegment> segments = new HashSet<>();
- for (int i = 0; i < COUNT; ++i) {
- segments.add(makeSegment("test" + i, "1",
Intervals.of("P1d/2011-04-01")));
- segments.add(makeSegment("test" + i, "1",
Intervals.of("P1d/2011-04-02")));
- segments.add(makeSegment("test" + i, "2",
Intervals.of("P1d/2011-04-02")));
- segments.add(makeSegment("test" + i, "1",
Intervals.of("P1d/2011-04-03")));
- segments.add(makeSegment("test" + i, "1",
Intervals.of("P1d/2011-04-04")));
- segments.add(makeSegment("test" + i, "1",
Intervals.of("P1d/2011-04-05")));
- segments.add(makeSegment("test" + i, "2",
Intervals.of("PT1h/2011-04-04T01")));
- segments.add(makeSegment("test" + i, "2",
Intervals.of("PT1h/2011-04-04T02")));
- segments.add(makeSegment("test" + i, "2",
Intervals.of("PT1h/2011-04-04T03")));
- segments.add(makeSegment("test" + i, "2",
Intervals.of("PT1h/2011-04-04T05")));
- segments.add(makeSegment("test" + i, "2",
Intervals.of("PT1h/2011-04-04T06")));
- segments.add(makeSegment("test_two" + i, "1",
Intervals.of("P1d/2011-04-01")));
- segments.add(makeSegment("test_two" + i, "1",
Intervals.of("P1d/2011-04-02")));
- }
-
- final TestSegmentCacheManager cacheManager = new
TestSegmentCacheManager(segments);
- final SegmentManager segmentManager = new SegmentManager(cacheManager);
- final SegmentLoadDropHandler handler =
initSegmentLoadDropHandler(segmentManager);
-
- Assert.assertTrue(segmentManager.getDataSourceCounts().isEmpty());
-
- handler.start();
-
- Assert.assertEquals(1, serverAnnouncer.getObservedCount());
- Assert.assertFalse(segmentManager.getDataSourceCounts().isEmpty());
-
- for (int i = 0; i < COUNT; ++i) {
- Assert.assertEquals(11L, segmentManager.getDataSourceCounts().get("test"
+ i).longValue());
- Assert.assertEquals(2L,
segmentManager.getDataSourceCounts().get("test_two" + i).longValue());
- }
-
- Assert.assertEquals(ImmutableList.copyOf(segments),
segmentAnnouncer.getObservedSegments());
-
- final ImmutableList<DataSegment> expectedBootstrapSegments =
ImmutableList.copyOf(segments);
- Assert.assertEquals(expectedBootstrapSegments,
cacheManager.observedBootstrapSegments);
- Assert.assertEquals(expectedBootstrapSegments,
cacheManager.observedBootstrapSegmentsLoadedIntoPageCache);
- Assert.assertEquals(ImmutableList.of(), cacheManager.observedSegments);
- Assert.assertEquals(ImmutableList.of(),
cacheManager.observedSegmentsLoadedIntoPageCache);
-
- handler.stop();
-
- Assert.assertEquals(0, serverAnnouncer.getObservedCount());
- Assert.assertEquals(1, cacheManager.observedShutdownBootstrapCount.get());
- }
-
- @Test
- public void testLoadBootstrapSegments() throws Exception
- {
- final Set<DataSegment> segments = new HashSet<>();
- for (int i = 0; i < COUNT; ++i) {
- segments.add(makeSegment("test" + i, "1",
Intervals.of("P1d/2011-04-01")));
- segments.add(makeSegment("test" + i, "1",
Intervals.of("P1d/2011-04-02")));
- segments.add(makeSegment("test_two" + i, "1",
Intervals.of("P1d/2011-04-01")));
- segments.add(makeSegment("test_two" + i, "1",
Intervals.of("P1d/2011-04-02")));
- }
-
- final TestCoordinatorClient coordinatorClient = new
TestCoordinatorClient(segments);
- final TestSegmentCacheManager cacheManager = new TestSegmentCacheManager();
- final SegmentManager segmentManager = new SegmentManager(cacheManager);
-
- final SegmentLoadDropHandler handler =
initSegmentLoadDropHandler(segmentManager, coordinatorClient);
-
- Assert.assertTrue(segmentManager.getDataSourceCounts().isEmpty());
-
- handler.start();
-
- Assert.assertEquals(1, serverAnnouncer.getObservedCount());
- Assert.assertFalse(segmentManager.getDataSourceCounts().isEmpty());
-
- for (int i = 0; i < COUNT; ++i) {
- Assert.assertEquals(2L, segmentManager.getDataSourceCounts().get("test"
+ i).longValue());
- Assert.assertEquals(2L,
segmentManager.getDataSourceCounts().get("test_two" + i).longValue());
- }
-
- final ImmutableList<DataSegment> expectedBootstrapSegments =
ImmutableList.copyOf(segments);
-
- Assert.assertEquals(expectedBootstrapSegments,
segmentAnnouncer.getObservedSegments());
-
- Assert.assertEquals(expectedBootstrapSegments,
cacheManager.observedBootstrapSegments);
- Assert.assertEquals(expectedBootstrapSegments,
cacheManager.observedBootstrapSegmentsLoadedIntoPageCache);
- serviceEmitter.verifyValue("segment/bootstrap/count",
expectedBootstrapSegments.size());
- serviceEmitter.verifyEmitted("segment/bootstrap/time", 1);
-
- handler.stop();
- }
-
- @Test
- public void testLoadBootstrapSegmentsWhenExceptionThrown() throws Exception
- {
- final TestSegmentCacheManager cacheManager = new TestSegmentCacheManager();
- final SegmentManager segmentManager = new SegmentManager(cacheManager);
-
- final SegmentLoadDropHandler handler =
initSegmentLoadDropHandler(segmentManager, new NoopCoordinatorClient());
-
- Assert.assertTrue(segmentManager.getDataSourceCounts().isEmpty());
-
- handler.start();
-
- Assert.assertEquals(1, serverAnnouncer.getObservedCount());
- Assert.assertTrue(segmentManager.getDataSourceCounts().isEmpty());
-
- Assert.assertEquals(ImmutableList.of(),
segmentAnnouncer.getObservedSegments());
- Assert.assertEquals(ImmutableList.of(),
cacheManager.observedBootstrapSegments);
- Assert.assertEquals(ImmutableList.of(),
cacheManager.observedBootstrapSegmentsLoadedIntoPageCache);
- serviceEmitter.verifyValue("segment/bootstrap/count", 0);
- serviceEmitter.verifyEmitted("segment/bootstrap/time", 1);
-
- handler.stop();
- }
-
- @Test
- public void testStartStop() throws Exception
- {
- final Set<DataSegment> segments = new HashSet<>();
- for (int i = 0; i < COUNT; ++i) {
- segments.add(makeSegment("test" + i, "1",
Intervals.of("P1d/2011-04-01")));
- segments.add(makeSegment("test" + i, "1",
Intervals.of("P1d/2011-04-02")));
- segments.add(makeSegment("test" + i, "2",
Intervals.of("P1d/2011-04-02")));
- segments.add(makeSegment("test_two" + i, "1",
Intervals.of("P1d/2011-04-01")));
- segments.add(makeSegment("test_two" + i, "1",
Intervals.of("P1d/2011-04-02")));
- }
-
- final TestSegmentCacheManager cacheManager = new
TestSegmentCacheManager(segments);
- final SegmentManager segmentManager = new SegmentManager(cacheManager);
- final SegmentLoadDropHandler handler =
initSegmentLoadDropHandler(segmentManager);
-
- Assert.assertTrue(segmentManager.getDataSourceCounts().isEmpty());
-
- handler.start();
-
- Assert.assertEquals(1, serverAnnouncer.getObservedCount());
- Assert.assertFalse(segmentManager.getDataSourceCounts().isEmpty());
-
- for (int i = 0; i < COUNT; ++i) {
- Assert.assertEquals(3L, segmentManager.getDataSourceCounts().get("test"
+ i).longValue());
- Assert.assertEquals(2L,
segmentManager.getDataSourceCounts().get("test_two" + i).longValue());
- }
-
- Assert.assertEquals(ImmutableList.copyOf(segments),
segmentAnnouncer.getObservedSegments());
-
- final ImmutableList<DataSegment> expectedBootstrapSegments =
ImmutableList.copyOf(segments);
- Assert.assertEquals(expectedBootstrapSegments,
cacheManager.observedBootstrapSegments);
- Assert.assertEquals(expectedBootstrapSegments,
cacheManager.observedBootstrapSegmentsLoadedIntoPageCache);
- Assert.assertEquals(ImmutableList.of(), cacheManager.observedSegments);
- Assert.assertEquals(ImmutableList.of(),
cacheManager.observedSegmentsLoadedIntoPageCache);
-
- handler.stop();
-
- Assert.assertEquals(0, serverAnnouncer.getObservedCount());
- Assert.assertEquals(1, cacheManager.observedShutdownBootstrapCount.get());
}
@Test(timeout = 60_000L)
@@ -413,10 +222,6 @@ public class SegmentLoadDropHandlerTest
final SegmentManager segmentManager = new SegmentManager(cacheManager);
final SegmentLoadDropHandler handler =
initSegmentLoadDropHandler(segmentManager);
- handler.start();
-
- Assert.assertEquals(1, serverAnnouncer.getObservedCount());
-
DataSegment segment1 = makeSegment("batchtest1", "1",
Intervals.of("P1d/2011-04-01"));
DataSegment segment2 = makeSegment("batchtest2", "1",
Intervals.of("P1d/2011-04-01"));
@@ -445,13 +250,10 @@ public class SegmentLoadDropHandlerTest
Assert.assertEquals(ImmutableList.of(segment1),
segmentAnnouncer.getObservedSegments());
final ImmutableList<DataSegment> expectedSegments =
ImmutableList.of(segment1);
- Assert.assertEquals(expectedSegments, cacheManager.observedSegments);
- Assert.assertEquals(expectedSegments,
cacheManager.observedSegmentsLoadedIntoPageCache);
- Assert.assertEquals(ImmutableList.of(),
cacheManager.observedBootstrapSegments);
- Assert.assertEquals(ImmutableList.of(),
cacheManager.observedBootstrapSegmentsLoadedIntoPageCache);
-
- handler.stop();
- Assert.assertEquals(0, serverAnnouncer.getObservedCount());
+ Assert.assertEquals(expectedSegments, cacheManager.getObservedSegments());
+ Assert.assertEquals(expectedSegments,
cacheManager.getObservedSegmentsLoadedIntoPageCache());
+ Assert.assertEquals(ImmutableList.of(),
cacheManager.getObservedBootstrapSegments());
+ Assert.assertEquals(ImmutableList.of(),
cacheManager.getObservedBootstrapSegmentsLoadedIntoPageCache());
}
@Test(timeout = 60_000L)
@@ -465,9 +267,6 @@ public class SegmentLoadDropHandlerTest
final SegmentLoadDropHandler handler =
initSegmentLoadDropHandler(segmentManager);
- handler.start();
-
- Assert.assertEquals(1, serverAnnouncer.getObservedCount());
DataSegment segment1 = makeSegment("batchtest1", "1",
Intervals.of("P1d/2011-04-01"));
List<DataSegmentChangeRequest> batch = ImmutableList.of(new
SegmentChangeRequestLoad(segment1));
@@ -489,8 +288,6 @@ public class SegmentLoadDropHandlerTest
Assert.assertEquals(State.SUCCESS, result.get(0).getStatus().getState());
Assert.assertEquals(ImmutableList.of(segment1, segment1),
segmentAnnouncer.getObservedSegments());
- handler.stop();
- Assert.assertEquals(0, serverAnnouncer.getObservedCount());
}
@Test(timeout = 60_000L)
@@ -538,13 +335,9 @@ public class SegmentLoadDropHandlerTest
final SegmentLoadDropHandler handler = initSegmentLoadDropHandler(
noAnnouncerSegmentLoaderConfig,
- segmentManager,
- coordinatorClient
+ segmentManager
);
- handler.start();
-
- Assert.assertEquals(1, serverAnnouncer.getObservedCount());
final DataSegment segment1 = makeSegment("batchtest1", "1",
Intervals.of("P1d/2011-04-01"));
List<DataSegmentChangeRequest> batch = ImmutableList.of(new
SegmentChangeRequestLoad(segment1));
@@ -611,149 +404,23 @@ public class SegmentLoadDropHandlerTest
Mockito.verify(segmentManager, Mockito.times(1))
.dropSegment(ArgumentMatchers.any());
- handler.stop();
- Assert.assertEquals(0, serverAnnouncer.getObservedCount());
- }
-
- private SegmentLoadDropHandler initSegmentLoadDropHandler(SegmentManager
segmentManager, CoordinatorClient coordinatorClient)
- {
- return initSegmentLoadDropHandler(segmentLoaderConfig, segmentManager,
coordinatorClient);
}
private SegmentLoadDropHandler initSegmentLoadDropHandler(SegmentManager
segmentManager)
{
- return initSegmentLoadDropHandler(segmentLoaderConfig, segmentManager,
coordinatorClient);
+ return initSegmentLoadDropHandler(segmentLoaderConfig, segmentManager);
}
private SegmentLoadDropHandler initSegmentLoadDropHandler(
SegmentLoaderConfig config,
- SegmentManager segmentManager,
- CoordinatorClient coordinatorClient
+ SegmentManager segmentManager
)
{
return new SegmentLoadDropHandler(
config,
segmentAnnouncer,
- serverAnnouncer,
segmentManager,
- scheduledExecutorFactory.create(5, "SegmentLoadDropHandlerTest-[%d]"),
- new ServerTypeConfig(ServerType.HISTORICAL),
- coordinatorClient,
- serviceEmitter
+ scheduledExecutorFactory.create(5, "SegmentLoadDropHandlerTest-[%d]")
);
}
-
- private DataSegment makeSegment(String dataSource, String version, Interval
interval)
- {
- return TestSegmentUtils.makeSegment(dataSource, version, interval);
- }
-
- /**
- * A local cache manager to test the bootstrapping and segment add/remove operations. It stubs only the necessary
- * methods to support these operations; any other method invoked will throw an exception from the base class,
- * {@link NoopSegmentCacheManager}.
- */
- private static class TestSegmentCacheManager extends NoopSegmentCacheManager
- {
- private final List<DataSegment> cachedSegments;
-
- private final List<DataSegment> observedBootstrapSegments;
- private final List<DataSegment>
observedBootstrapSegmentsLoadedIntoPageCache;
- private final List<DataSegment> observedSegments;
- private final List<DataSegment> observedSegmentsLoadedIntoPageCache;
- private final List<DataSegment> observedSegmentsRemovedFromCache;
- private final AtomicInteger observedShutdownBootstrapCount;
-
- TestSegmentCacheManager()
- {
- this(ImmutableSet.of());
- }
-
- TestSegmentCacheManager(final Set<DataSegment> segmentsToCache)
- {
- this.cachedSegments = ImmutableList.copyOf(segmentsToCache);
- this.observedBootstrapSegments = new ArrayList<>();
- this.observedBootstrapSegmentsLoadedIntoPageCache = new ArrayList<>();
- this.observedSegments = new ArrayList<>();
- this.observedSegmentsLoadedIntoPageCache = new ArrayList<>();
- this.observedSegmentsRemovedFromCache = new ArrayList<>();
- this.observedShutdownBootstrapCount = new AtomicInteger(0);
- }
-
- @Override
- public boolean canHandleSegments()
- {
- return true;
- }
-
- @Override
- public List<DataSegment> getCachedSegments()
- {
- return cachedSegments;
- }
-
- @Override
- public ReferenceCountingSegment getBootstrapSegment(DataSegment segment,
SegmentLazyLoadFailCallback loadFailed)
- {
- observedBootstrapSegments.add(segment);
- return getSegmentInternal(segment);
- }
-
- @Override
- public ReferenceCountingSegment getSegment(final DataSegment segment)
- {
- observedSegments.add(segment);
- return getSegmentInternal(segment);
- }
-
- private ReferenceCountingSegment getSegmentInternal(final DataSegment
segment)
- {
- if (segment.isTombstone()) {
- return ReferenceCountingSegment
-
.wrapSegment(TombstoneSegmentizerFactory.segmentForTombstone(segment),
segment.getShardSpec());
- } else {
- return ReferenceCountingSegment.wrapSegment(
- new TestSegmentUtils.SegmentForTesting(
- segment.getDataSource(),
- (Interval) segment.getLoadSpec().get("interval"),
- MapUtils.getString(segment.getLoadSpec(), "version")
- ), segment.getShardSpec()
- );
- }
- }
-
- @Override
- public void loadSegmentIntoPageCache(DataSegment segment)
- {
- observedSegmentsLoadedIntoPageCache.add(segment);
- }
-
- @Override
- public void loadSegmentIntoPageCacheOnBootstrap(DataSegment segment)
- {
- observedBootstrapSegmentsLoadedIntoPageCache.add(segment);
- }
-
- @Override
- public void shutdownBootstrap()
- {
- observedShutdownBootstrapCount.incrementAndGet();
- }
-
- @Override
- public void storeInfoFile(DataSegment segment)
- {
- }
-
- @Override
- public void removeInfoFile(DataSegment segment)
- {
- }
-
- @Override
- public void cleanup(DataSegment segment)
- {
- observedSegmentsRemovedFromCache.add(segment);
- }
- }
}
diff --git a/server/src/test/java/org/apache/druid/server/coordination/TestSegmentCacheManager.java b/server/src/test/java/org/apache/druid/server/coordination/TestSegmentCacheManager.java
new file mode 100644
index 00000000000..2cd5e8e61fe
--- /dev/null
+++ b/server/src/test/java/org/apache/druid/server/coordination/TestSegmentCacheManager.java
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.server.coordination;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+import org.apache.druid.java.util.common.MapUtils;
+import org.apache.druid.segment.ReferenceCountingSegment;
+import org.apache.druid.segment.SegmentLazyLoadFailCallback;
+import org.apache.druid.segment.loading.NoopSegmentCacheManager;
+import org.apache.druid.segment.loading.TombstoneSegmentizerFactory;
+import org.apache.druid.server.TestSegmentUtils;
+import org.apache.druid.timeline.DataSegment;
+import org.joda.time.Interval;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * A local cache manager to test the bootstrapping and segment add/remove operations. It stubs only the necessary
+ * methods to support these operations; any other method invoked will throw an exception from the base class,
+ * {@link NoopSegmentCacheManager}.
+ */
+class TestSegmentCacheManager extends NoopSegmentCacheManager
+{
+ private final List<DataSegment> cachedSegments;
+
+ private final List<DataSegment> observedBootstrapSegments;
+ private final List<DataSegment> observedBootstrapSegmentsLoadedIntoPageCache;
+ private final List<DataSegment> observedSegments;
+ private final List<DataSegment> observedSegmentsLoadedIntoPageCache;
+ private final List<DataSegment> observedSegmentsRemovedFromCache;
+ private final AtomicInteger observedShutdownBootstrapCount;
+
+ TestSegmentCacheManager()
+ {
+ this(ImmutableSet.of());
+ }
+
+ TestSegmentCacheManager(final Set<DataSegment> segmentsToCache)
+ {
+ this.cachedSegments = ImmutableList.copyOf(segmentsToCache);
+ this.observedBootstrapSegments = new ArrayList<>();
+ this.observedBootstrapSegmentsLoadedIntoPageCache = new ArrayList<>();
+ this.observedSegments = new ArrayList<>();
+ this.observedSegmentsLoadedIntoPageCache = new ArrayList<>();
+ this.observedSegmentsRemovedFromCache = new ArrayList<>();
+ this.observedShutdownBootstrapCount = new AtomicInteger(0);
+ }
+
+ @Override
+ public boolean canHandleSegments()
+ {
+ return true;
+ }
+
+ @Override
+ public List<DataSegment> getCachedSegments()
+ {
+ return cachedSegments;
+ }
+
+ @Override
+ public ReferenceCountingSegment getBootstrapSegment(DataSegment segment,
SegmentLazyLoadFailCallback loadFailed)
+ {
+ observedBootstrapSegments.add(segment);
+ return getSegmentInternal(segment);
+ }
+
+ @Override
+ public ReferenceCountingSegment getSegment(final DataSegment segment)
+ {
+ observedSegments.add(segment);
+ return getSegmentInternal(segment);
+ }
+
+ private ReferenceCountingSegment getSegmentInternal(final DataSegment
segment)
+ {
+ if (segment.isTombstone()) {
+ return ReferenceCountingSegment
+
.wrapSegment(TombstoneSegmentizerFactory.segmentForTombstone(segment),
segment.getShardSpec());
+ } else {
+ return ReferenceCountingSegment.wrapSegment(
+ new TestSegmentUtils.SegmentForTesting(
+ segment.getDataSource(),
+ (Interval) segment.getLoadSpec().get("interval"),
+ MapUtils.getString(segment.getLoadSpec(), "version")
+ ), segment.getShardSpec()
+ );
+ }
+ }
+
+ @Override
+ public void loadSegmentIntoPageCache(DataSegment segment)
+ {
+ observedSegmentsLoadedIntoPageCache.add(segment);
+ }
+
+ @Override
+ public void loadSegmentIntoPageCacheOnBootstrap(DataSegment segment)
+ {
+ observedBootstrapSegmentsLoadedIntoPageCache.add(segment);
+ }
+
+ @Override
+ public void shutdownBootstrap()
+ {
+ observedShutdownBootstrapCount.incrementAndGet();
+ }
+
+ @Override
+ public void storeInfoFile(DataSegment segment)
+ {
+ }
+
+ @Override
+ public void removeInfoFile(DataSegment segment)
+ {
+ }
+
+ @Override
+ public void cleanup(DataSegment segment)
+ {
+ observedSegmentsRemovedFromCache.add(segment);
+ }
+
+ public List<DataSegment> getObservedBootstrapSegments()
+ {
+ return observedBootstrapSegments;
+ }
+
+ public List<DataSegment> getObservedBootstrapSegmentsLoadedIntoPageCache()
+ {
+ return observedBootstrapSegmentsLoadedIntoPageCache;
+ }
+
+ public List<DataSegment> getObservedSegments()
+ {
+ return observedSegments;
+ }
+
+ public List<DataSegment> getObservedSegmentsLoadedIntoPageCache()
+ {
+ return observedSegmentsLoadedIntoPageCache;
+ }
+
+ public List<DataSegment> getObservedSegmentsRemovedFromCache()
+ {
+ return observedSegmentsRemovedFromCache;
+ }
+
+ public AtomicInteger getObservedShutdownBootstrapCount()
+ {
+ return observedShutdownBootstrapCount;
+ }
+}
diff --git a/server/src/test/java/org/apache/druid/server/coordination/ZkCoordinatorTest.java b/server/src/test/java/org/apache/druid/server/coordination/ZkCoordinatorTest.java
index 9f5291af598..a9f7772e59d 100644
--- a/server/src/test/java/org/apache/druid/server/coordination/ZkCoordinatorTest.java
+++ b/server/src/test/java/org/apache/druid/server/coordination/ZkCoordinatorTest.java
@@ -23,7 +23,6 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableMap;
import org.apache.curator.utils.ZKPaths;
import org.apache.druid.curator.CuratorTestBase;
-import org.apache.druid.guice.ServerTypeConfig;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.segment.IndexIO;
@@ -42,7 +41,6 @@ import org.junit.Test;
import java.util.Arrays;
import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ScheduledExecutorService;
/**
*/
@@ -103,12 +101,7 @@ public class ZkCoordinatorTest extends CuratorTestBase
SegmentLoadDropHandler segmentLoadDropHandler = new SegmentLoadDropHandler(
new SegmentLoaderConfig(),
EasyMock.createNiceMock(DataSegmentAnnouncer.class),
- EasyMock.createNiceMock(DataSegmentServerAnnouncer.class),
- EasyMock.createNiceMock(SegmentManager.class),
- EasyMock.createNiceMock(ScheduledExecutorService.class),
- new ServerTypeConfig(ServerType.HISTORICAL),
- new TestCoordinatorClient(),
- new NoopServiceEmitter()
+ EasyMock.createNiceMock(SegmentManager.class)
)
{
@Override
diff --git a/services/src/main/java/org/apache/druid/cli/CliBroker.java b/services/src/main/java/org/apache/druid/cli/CliBroker.java
index 7fe5ec57631..94160f55779 100644
--- a/services/src/main/java/org/apache/druid/cli/CliBroker.java
+++ b/services/src/main/java/org/apache/druid/cli/CliBroker.java
@@ -63,6 +63,7 @@ import org.apache.druid.server.ResponseContextConfig;
import org.apache.druid.server.SegmentManager;
import org.apache.druid.server.SubqueryGuardrailHelper;
import org.apache.druid.server.SubqueryGuardrailHelperProvider;
+import org.apache.druid.server.coordination.SegmentBootstrapper;
import org.apache.druid.server.coordination.ServerType;
import org.apache.druid.server.coordination.ZkCoordinator;
import org.apache.druid.server.http.BrokerResource;
@@ -172,6 +173,7 @@ public class CliBroker extends ServerRunnable
if (isZkEnabled) {
LifecycleModule.register(binder, ZkCoordinator.class);
}
+ LifecycleModule.register(binder, SegmentBootstrapper.class);
bindAnnouncer(
binder,
diff --git a/services/src/main/java/org/apache/druid/cli/CliHistorical.java b/services/src/main/java/org/apache/druid/cli/CliHistorical.java
index dc1acc41f87..2e231bcdcc3 100644
--- a/services/src/main/java/org/apache/druid/cli/CliHistorical.java
+++ b/services/src/main/java/org/apache/druid/cli/CliHistorical.java
@@ -49,6 +49,7 @@ import org.apache.druid.query.lookup.LookupModule;
import org.apache.druid.server.QueryResource;
import org.apache.druid.server.ResponseContextConfig;
import org.apache.druid.server.SegmentManager;
+import org.apache.druid.server.coordination.SegmentBootstrapper;
import org.apache.druid.server.coordination.ServerManager;
import org.apache.druid.server.coordination.ServerType;
import org.apache.druid.server.coordination.ZkCoordinator;
@@ -125,6 +126,7 @@ public class CliHistorical extends ServerRunnable
if (isZkEnabled) {
LifecycleModule.register(binder, ZkCoordinator.class);
}
+ LifecycleModule.register(binder, SegmentBootstrapper.class);
JsonConfigProvider.bind(binder, "druid.historical.cache",
CacheConfig.class);
binder.install(new CacheModule());
diff --git a/services/src/main/java/org/apache/druid/cli/CliIndexer.java b/services/src/main/java/org/apache/druid/cli/CliIndexer.java
index c6b817fa4a9..312b6f6b05a 100644
--- a/services/src/main/java/org/apache/druid/cli/CliIndexer.java
+++ b/services/src/main/java/org/apache/druid/cli/CliIndexer.java
@@ -74,6 +74,7 @@ import
org.apache.druid.segment.realtime.appenderator.UnifiedIndexerAppenderator
import org.apache.druid.server.DruidNode;
import org.apache.druid.server.ResponseContextConfig;
import org.apache.druid.server.SegmentManager;
+import org.apache.druid.server.coordination.SegmentBootstrapper;
import org.apache.druid.server.coordination.ServerType;
import org.apache.druid.server.coordination.ZkCoordinator;
import org.apache.druid.server.http.HistoricalResource;
@@ -187,6 +188,7 @@ public class CliIndexer extends ServerRunnable
if (isZkEnabled) {
LifecycleModule.register(binder, ZkCoordinator.class);
}
+ LifecycleModule.register(binder, SegmentBootstrapper.class);
bindAnnouncer(
binder,
diff --git a/services/src/main/java/org/apache/druid/cli/CliPeon.java b/services/src/main/java/org/apache/druid/cli/CliPeon.java
index 1ca8ddf539f..f78a763cec4 100644
--- a/services/src/main/java/org/apache/druid/cli/CliPeon.java
+++ b/services/src/main/java/org/apache/druid/cli/CliPeon.java
@@ -125,6 +125,7 @@ import
org.apache.druid.segment.realtime.firehose.ServiceAnnouncingChatHandlerPr
import org.apache.druid.server.DruidNode;
import org.apache.druid.server.ResponseContextConfig;
import org.apache.druid.server.SegmentManager;
+import org.apache.druid.server.coordination.SegmentBootstrapper;
import org.apache.druid.server.coordination.ServerType;
import org.apache.druid.server.coordination.ZkCoordinator;
import org.apache.druid.server.http.HistoricalResource;
@@ -553,6 +554,7 @@ public class CliPeon extends GuiceRunnable
if (isZkEnabled) {
LifecycleModule.register(binder, ZkCoordinator.class);
}
+ LifecycleModule.register(binder, SegmentBootstrapper.class);
}
@Provides
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]