This is an automated email from the ASF dual-hosted git repository.
kfaraz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new e5b40b0b8c3 Miscellaneous cleanup of load queue references (#16367)
e5b40b0b8c3 is described below
commit e5b40b0b8c3cd9f69975ef3976cbe87ee62e7ade
Author: Kashif Faraz <[email protected]>
AuthorDate: Thu May 2 15:59:50 2024 +0530
Miscellaneous cleanup of load queue references (#16367)
Changes:
- Rename `DataSegmentChangeRequestAndStatus` to `DataSegmentChangeResponse`
- Rename `SegmentLoadDropHandler.Status` to `SegmentChangeStatus`
- Remove method `CoordinatorRunStats.getSnapshotAndReset()` as it was used
only in
load queue peon implementations. Using an atomic reference is much simpler.
- Remove `ServerTestHelper.MAPPER`. Use existing
`TestHelper.makeJsonMapper()` instead.
---
integration-tests/k8s/tiny-cluster.yaml | 1 -
.../coordination/DataSegmentChangeResponse.java | 64 +++++++++
.../server/coordination/SegmentChangeStatus.java | 82 ++++++++++++
.../coordination/SegmentLoadDropHandler.java | 148 ++++-----------------
.../coordinator/loading/CuratorLoadQueuePeon.java | 9 +-
.../coordinator/loading/HttpLoadQueuePeon.java | 31 +++--
.../server/coordinator/loading/LoadQueuePeon.java | 3 +-
.../coordinator/stats/CoordinatorRunStats.java | 27 ----
.../druid/server/http/SegmentListerResource.java | 7 +-
.../server/metrics/HistoricalMetricsMonitor.java | 2 +-
.../druid/client/CachingClusteredClientTest.java | 3 +-
.../plumber/CustomVersioningPolicyTest.java | 8 +-
.../org/apache/druid/server/ServerTestHelper.java | 38 ------
.../coordination/SegmentLoadDropHandlerTest.java | 37 +++---
.../server/coordination/ZkCoordinatorTest.java | 6 +-
.../coordinator/CoordinatorRunStatsTest.java | 31 -----
...PeonTest.java => CuratorLoadQueuePeonTest.java} | 4 +-
.../coordinator/loading/HttpLoadQueuePeonTest.java | 24 ++--
.../simulate/TestSegmentLoadingHttpClient.java | 16 +--
.../metrics/HistoricalMetricsMonitorTest.java | 2 +-
20 files changed, 251 insertions(+), 292 deletions(-)
diff --git a/integration-tests/k8s/tiny-cluster.yaml
b/integration-tests/k8s/tiny-cluster.yaml
index 6db4dbf421b..0d44522c8ef 100644
--- a/integration-tests/k8s/tiny-cluster.yaml
+++ b/integration-tests/k8s/tiny-cluster.yaml
@@ -73,7 +73,6 @@ spec:
druid.discovery.type=k8s
druid.discovery.k8s.clusterIdentifier=druid-it
druid.serverview.type=http
- druid.coordinator.loadqueuepeon.type=http
druid.indexer.runner.type=httpRemote
# Metadata Store
diff --git
a/server/src/main/java/org/apache/druid/server/coordination/DataSegmentChangeResponse.java
b/server/src/main/java/org/apache/druid/server/coordination/DataSegmentChangeResponse.java
new file mode 100644
index 00000000000..06625409ca7
--- /dev/null
+++
b/server/src/main/java/org/apache/druid/server/coordination/DataSegmentChangeResponse.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.server.coordination;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+/**
+ * Response of a {@link DataSegmentChangeRequest}. Contains the request itself
+ * and the result {@link SegmentChangeStatus}.
+ */
+public class DataSegmentChangeResponse
+{
+ private final DataSegmentChangeRequest request;
+ private final SegmentChangeStatus status;
+
+ @JsonCreator
+ public DataSegmentChangeResponse(
+ @JsonProperty("request") DataSegmentChangeRequest request,
+ @JsonProperty("status") SegmentChangeStatus status
+ )
+ {
+ this.request = request;
+ this.status = status;
+ }
+
+ @JsonProperty
+ public DataSegmentChangeRequest getRequest()
+ {
+ return request;
+ }
+
+ @JsonProperty
+ public SegmentChangeStatus getStatus()
+ {
+ return status;
+ }
+
+ @Override
+ public String toString()
+ {
+ return "DataSegmentChangeResponse{" +
+ "request=" + request +
+ ", status=" + status +
+ '}';
+ }
+}
diff --git
a/server/src/main/java/org/apache/druid/server/coordination/SegmentChangeStatus.java
b/server/src/main/java/org/apache/druid/server/coordination/SegmentChangeStatus.java
new file mode 100644
index 00000000000..c19d1e7914a
--- /dev/null
+++
b/server/src/main/java/org/apache/druid/server/coordination/SegmentChangeStatus.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.server.coordination;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.base.Preconditions;
+
+import javax.annotation.Nullable;
+
+/**
+ * Contains {@link State} of a {@link DataSegmentChangeRequest} and failure
+ * message, if any.
+ */
+public class SegmentChangeStatus
+{
+ public enum State
+ {
+ SUCCESS, FAILED, PENDING
+ }
+
+ private final State state;
+ @Nullable
+ private final String failureCause;
+
+ public static final SegmentChangeStatus SUCCESS = new
SegmentChangeStatus(State.SUCCESS, null);
+ public static final SegmentChangeStatus PENDING = new
SegmentChangeStatus(State.PENDING, null);
+
+ public static SegmentChangeStatus failed(String cause)
+ {
+ return new SegmentChangeStatus(State.FAILED, cause);
+ }
+
+ @JsonCreator
+ private SegmentChangeStatus(
+ @JsonProperty("state") State state,
+ @JsonProperty("failureCause") @Nullable String failureCause
+ )
+ {
+ this.state = Preconditions.checkNotNull(state, "state must be non-null");
+ this.failureCause = failureCause;
+ }
+
+ @JsonProperty
+ public State getState()
+ {
+ return state;
+ }
+
+ @Nullable
+ @JsonProperty
+ public String getFailureCause()
+ {
+ return failureCause;
+ }
+
+ @Override
+ public String toString()
+ {
+ return "SegmentChangeStatus{" +
+ "state=" + state +
+ ", failureCause='" + failureCause + '\'' +
+ '}';
+ }
+}
diff --git
a/server/src/main/java/org/apache/druid/server/coordination/SegmentLoadDropHandler.java
b/server/src/main/java/org/apache/druid/server/coordination/SegmentLoadDropHandler.java
index 636894d214f..791de9b55da 100644
---
a/server/src/main/java/org/apache/druid/server/coordination/SegmentLoadDropHandler.java
+++
b/server/src/main/java/org/apache/druid/server/coordination/SegmentLoadDropHandler.java
@@ -19,11 +19,8 @@
package org.apache.druid.server.coordination;
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
@@ -98,7 +95,7 @@ public class SegmentLoadDropHandler implements
DataSegmentChangeHandler
// Keep history of load/drop request status in a LRU cache to maintain
idempotency if same request shows up
// again and to return status of a completed request. Maximum size of this
cache must be significantly greater
// than number of pending load/drop requests. so that history is not lost
too quickly.
- private final Cache<DataSegmentChangeRequest, AtomicReference<Status>>
requestStatuses;
+ private final Cache<DataSegmentChangeRequest,
AtomicReference<SegmentChangeStatus>> requestStatuses;
private final Object requestStatusesLock = new Object();
// This is the list of unresolved futures returned to callers of
processBatch(List<DataSegmentChangeRequest>)
@@ -320,7 +317,7 @@ public class SegmentLoadDropHandler implements
DataSegmentChangeHandler
@Override
public void addSegment(DataSegment segment, @Nullable
DataSegmentChangeCallback callback)
{
- Status result = null;
+ SegmentChangeStatus result = null;
try {
log.info("Loading segment %s", segment.getId());
/*
@@ -349,13 +346,13 @@ public class SegmentLoadDropHandler implements
DataSegmentChangeHandler
throw new SegmentLoadingException(e, "Failed to announce segment[%s]",
segment.getId());
}
- result = Status.SUCCESS;
+ result = SegmentChangeStatus.SUCCESS;
}
catch (Throwable e) {
log.makeAlert(e, "Failed to load segment for dataSource")
.addData("segment", segment)
.emit();
- result = Status.failed(e.toString());
+ result = SegmentChangeStatus.failed(e.toString());
}
finally {
updateRequestStatus(new SegmentChangeRequestLoad(segment), result);
@@ -466,7 +463,7 @@ public class SegmentLoadDropHandler implements
DataSegmentChangeHandler
final boolean scheduleDrop
)
{
- Status result = null;
+ SegmentChangeStatus result = null;
try {
announcer.unannounceSegment(segment);
segmentsToDelete.add(segment);
@@ -506,13 +503,13 @@ public class SegmentLoadDropHandler implements
DataSegmentChangeHandler
runnable.run();
}
- result = Status.SUCCESS;
+ result = SegmentChangeStatus.SUCCESS;
}
catch (Exception e) {
log.makeAlert(e, "Failed to remove segment")
.addData("segment", segment)
.emit();
- result = Status.failed(e.getMessage());
+ result = SegmentChangeStatus.failed(e.getMessage());
}
finally {
updateRequestStatus(new SegmentChangeRequestDrop(segment), result);
@@ -522,20 +519,20 @@ public class SegmentLoadDropHandler implements
DataSegmentChangeHandler
}
}
- public Collection<DataSegment> getPendingDeleteSnapshot()
+ public Collection<DataSegment> getSegmentsToDelete()
{
return ImmutableList.copyOf(segmentsToDelete);
}
- public ListenableFuture<List<DataSegmentChangeRequestAndStatus>>
processBatch(List<DataSegmentChangeRequest> changeRequests)
+ public ListenableFuture<List<DataSegmentChangeResponse>>
processBatch(List<DataSegmentChangeRequest> changeRequests)
{
boolean isAnyRequestDone = false;
- Map<DataSegmentChangeRequest, AtomicReference<Status>> statuses =
Maps.newHashMapWithExpectedSize(changeRequests.size());
+ Map<DataSegmentChangeRequest, AtomicReference<SegmentChangeStatus>>
statuses = Maps.newHashMapWithExpectedSize(changeRequests.size());
for (DataSegmentChangeRequest cr : changeRequests) {
- AtomicReference<Status> status = processRequest(cr);
- if (status.get().getState() != Status.STATE.PENDING) {
+ AtomicReference<SegmentChangeStatus> status = processRequest(cr);
+ if (status.get().getState() != SegmentChangeStatus.State.PENDING) {
isAnyRequestDone = true;
}
statuses.put(cr, status);
@@ -554,20 +551,20 @@ public class SegmentLoadDropHandler implements
DataSegmentChangeHandler
return future;
}
- private AtomicReference<Status> processRequest(DataSegmentChangeRequest
changeRequest)
+ private AtomicReference<SegmentChangeStatus>
processRequest(DataSegmentChangeRequest changeRequest)
{
synchronized (requestStatusesLock) {
- AtomicReference<Status> status =
requestStatuses.getIfPresent(changeRequest);
+ AtomicReference<SegmentChangeStatus> status =
requestStatuses.getIfPresent(changeRequest);
// If last load/drop request status is failed, here can try that again
- if (status == null || status.get().getState() == Status.STATE.FAILED) {
+ if (status == null || status.get().getState() ==
SegmentChangeStatus.State.FAILED) {
changeRequest.go(
new DataSegmentChangeHandler()
{
@Override
public void addSegment(DataSegment segment,
DataSegmentChangeCallback callback)
{
- requestStatuses.put(changeRequest, new
AtomicReference<>(Status.PENDING));
+ requestStatuses.put(changeRequest, new
AtomicReference<>(SegmentChangeStatus.PENDING));
exec.submit(
() -> SegmentLoadDropHandler.this.addSegment(
((SegmentChangeRequestLoad)
changeRequest).getSegment(),
@@ -579,7 +576,7 @@ public class SegmentLoadDropHandler implements
DataSegmentChangeHandler
@Override
public void removeSegment(DataSegment segment,
DataSegmentChangeCallback callback)
{
- requestStatuses.put(changeRequest, new
AtomicReference<>(Status.PENDING));
+ requestStatuses.put(changeRequest, new
AtomicReference<>(SegmentChangeStatus.PENDING));
SegmentLoadDropHandler.this.removeSegment(
((SegmentChangeRequestDrop) changeRequest).getSegment(),
() -> resolveWaitingFutures(),
@@ -589,7 +586,7 @@ public class SegmentLoadDropHandler implements
DataSegmentChangeHandler
},
this::resolveWaitingFutures
);
- } else if (status.get().getState() == Status.STATE.SUCCESS) {
+ } else if (status.get().getState() == SegmentChangeStatus.State.SUCCESS)
{
// SUCCESS case, we'll clear up the cached success while serving it to
this client
// Not doing this can lead to an incorrect response to upcoming
clients for a reload
requestStatuses.invalidate(changeRequest);
@@ -599,13 +596,13 @@ public class SegmentLoadDropHandler implements
DataSegmentChangeHandler
}
}
- private void updateRequestStatus(DataSegmentChangeRequest changeRequest,
Status result)
+ private void updateRequestStatus(DataSegmentChangeRequest changeRequest,
SegmentChangeStatus result)
{
if (result == null) {
- result = Status.failed("Unknown reason. Check server logs.");
+ result = SegmentChangeStatus.failed("Unknown reason. Check server
logs.");
}
synchronized (requestStatusesLock) {
- AtomicReference<Status> statusRef =
requestStatuses.getIfPresent(changeRequest);
+ AtomicReference<SegmentChangeStatus> statusRef =
requestStatuses.getIfPresent(changeRequest);
if (statusRef != null) {
statusRef.set(result);
}
@@ -773,14 +770,14 @@ public class SegmentLoadDropHandler implements
DataSegmentChangeHandler
}
// Future with cancel() implementation to remove it from "waitingFutures"
list
- private class CustomSettableFuture extends
AbstractFuture<List<DataSegmentChangeRequestAndStatus>>
+ private class CustomSettableFuture extends
AbstractFuture<List<DataSegmentChangeResponse>>
{
private final LinkedHashSet<CustomSettableFuture> waitingFutures;
- private final Map<DataSegmentChangeRequest, AtomicReference<Status>>
statusRefs;
+ private final Map<DataSegmentChangeRequest,
AtomicReference<SegmentChangeStatus>> statusRefs;
private CustomSettableFuture(
LinkedHashSet<CustomSettableFuture> waitingFutures,
- Map<DataSegmentChangeRequest, AtomicReference<Status>> statusRefs
+ Map<DataSegmentChangeRequest, AtomicReference<SegmentChangeStatus>>
statusRefs
)
{
this.waitingFutures = waitingFutures;
@@ -794,14 +791,14 @@ public class SegmentLoadDropHandler implements
DataSegmentChangeHandler
return;
}
- final List<DataSegmentChangeRequestAndStatus> result = new
ArrayList<>(statusRefs.size());
+ final List<DataSegmentChangeResponse> result = new
ArrayList<>(statusRefs.size());
statusRefs.forEach((request, statusRef) -> {
// Remove complete statuses from the cache
- final Status status = statusRef.get();
- if (status != null && status.getState() != Status.STATE.PENDING) {
+ final SegmentChangeStatus status = statusRef.get();
+ if (status != null && status.getState() !=
SegmentChangeStatus.State.PENDING) {
requestStatuses.invalidate(request);
}
- result.add(new DataSegmentChangeRequestAndStatus(request, status));
+ result.add(new DataSegmentChangeResponse(request, status));
});
set(result);
@@ -818,94 +815,5 @@ public class SegmentLoadDropHandler implements
DataSegmentChangeHandler
}
}
- public static class Status
- {
- public enum STATE
- {
- SUCCESS, FAILED, PENDING
- }
-
- private final STATE state;
- @Nullable
- private final String failureCause;
-
- public static final Status SUCCESS = new Status(STATE.SUCCESS, null);
- public static final Status PENDING = new Status(STATE.PENDING, null);
-
- @JsonCreator
- Status(
- @JsonProperty("state") STATE state,
- @JsonProperty("failureCause") @Nullable String failureCause
- )
- {
- Preconditions.checkNotNull(state, "state must be non-null");
- this.state = state;
- this.failureCause = failureCause;
- }
-
- public static Status failed(String cause)
- {
- return new Status(STATE.FAILED, cause);
- }
-
- @JsonProperty
- public STATE getState()
- {
- return state;
- }
-
- @Nullable
- @JsonProperty
- public String getFailureCause()
- {
- return failureCause;
- }
-
- @Override
- public String toString()
- {
- return "Status{" +
- "state=" + state +
- ", failureCause='" + failureCause + '\'' +
- '}';
- }
- }
-
- public static class DataSegmentChangeRequestAndStatus
- {
- private final DataSegmentChangeRequest request;
- private final Status status;
-
- @JsonCreator
- public DataSegmentChangeRequestAndStatus(
- @JsonProperty("request") DataSegmentChangeRequest request,
- @JsonProperty("status") Status status
- )
- {
- this.request = request;
- this.status = status;
- }
-
- @JsonProperty
- public DataSegmentChangeRequest getRequest()
- {
- return request;
- }
-
- @JsonProperty
- public Status getStatus()
- {
- return status;
- }
-
- @Override
- public String toString()
- {
- return "DataSegmentChangeRequestAndStatus{" +
- "request=" + request +
- ", status=" + status +
- '}';
- }
- }
}
diff --git
a/server/src/main/java/org/apache/druid/server/coordinator/loading/CuratorLoadQueuePeon.java
b/server/src/main/java/org/apache/druid/server/coordinator/loading/CuratorLoadQueuePeon.java
index a508251ab58..52b012a81d4 100644
---
a/server/src/main/java/org/apache/druid/server/coordinator/loading/CuratorLoadQueuePeon.java
+++
b/server/src/main/java/org/apache/druid/server/coordinator/loading/CuratorLoadQueuePeon.java
@@ -47,6 +47,7 @@ import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
/**
* Use {@link HttpLoadQueuePeon} instead.
@@ -75,7 +76,7 @@ public class CuratorLoadQueuePeon implements LoadQueuePeon
private final Duration loadTimeout;
private final AtomicLong queuedSize = new AtomicLong(0);
- private final CoordinatorRunStats stats = new CoordinatorRunStats();
+ private final AtomicReference<CoordinatorRunStats> stats = new
AtomicReference<>(new CoordinatorRunStats());
/**
* Needs to be thread safe since it can be concurrently accessed via
@@ -172,7 +173,7 @@ public class CuratorLoadQueuePeon implements LoadQueuePeon
@Override
public CoordinatorRunStats getAndResetStats()
{
- return stats.getSnapshotAndReset();
+ return stats.getAndSet(new CoordinatorRunStats());
}
@Override
@@ -360,7 +361,7 @@ public class CuratorLoadQueuePeon implements LoadQueuePeon
timedOutSegments.clear();
queuedSize.set(0L);
- stats.clear();
+ stats.get().clear();
}
private void onZkNodeDeleted(SegmentHolder segmentHolder, String path)
@@ -388,7 +389,7 @@ public class CuratorLoadQueuePeon implements LoadQueuePeon
if (e != null) {
log.error(e, "Server[%s], throwable caught when submitting [%s].",
basePath, segmentHolder);
}
- stats.add(Stats.SegmentQueue.FAILED_ACTIONS, 1);
+ stats.get().add(Stats.SegmentQueue.FAILED_ACTIONS, 1);
if (handleTimeout) {
// Avoid removing the segment entry from the load/drop list in case
config.getLoadTimeoutDelay() expires.
diff --git
a/server/src/main/java/org/apache/druid/server/coordinator/loading/HttpLoadQueuePeon.java
b/server/src/main/java/org/apache/druid/server/coordinator/loading/HttpLoadQueuePeon.java
index 4ec6f2427a3..cb32f95516b 100644
---
a/server/src/main/java/org/apache/druid/server/coordinator/loading/HttpLoadQueuePeon.java
+++
b/server/src/main/java/org/apache/druid/server/coordinator/loading/HttpLoadQueuePeon.java
@@ -35,7 +35,8 @@ import org.apache.druid.java.util.http.client.Request;
import org.apache.druid.server.coordination.DataSegmentChangeCallback;
import org.apache.druid.server.coordination.DataSegmentChangeHandler;
import org.apache.druid.server.coordination.DataSegmentChangeRequest;
-import org.apache.druid.server.coordination.SegmentLoadDropHandler;
+import org.apache.druid.server.coordination.DataSegmentChangeResponse;
+import org.apache.druid.server.coordination.SegmentChangeStatus;
import org.apache.druid.server.coordinator.BytesAccumulatingResponseHandler;
import org.apache.druid.server.coordinator.config.HttpLoadQueuePeonConfig;
import org.apache.druid.server.coordinator.stats.CoordinatorRunStats;
@@ -66,6 +67,7 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
/**
*
@@ -77,15 +79,15 @@ public class HttpLoadQueuePeon implements LoadQueuePeon
{
};
- public static final
TypeReference<List<SegmentLoadDropHandler.DataSegmentChangeRequestAndStatus>>
RESPONSE_ENTITY_TYPE_REF =
- new
TypeReference<List<SegmentLoadDropHandler.DataSegmentChangeRequestAndStatus>>()
+ public static final TypeReference<List<DataSegmentChangeResponse>>
RESPONSE_ENTITY_TYPE_REF =
+ new TypeReference<List<DataSegmentChangeResponse>>()
{
};
private static final EmittingLogger log = new
EmittingLogger(HttpLoadQueuePeon.class);
private final AtomicLong queuedSize = new AtomicLong(0);
- private final CoordinatorRunStats stats = new CoordinatorRunStats();
+ private final AtomicReference<CoordinatorRunStats> stats = new
AtomicReference<>(new CoordinatorRunStats());
private final ConcurrentMap<DataSegment, SegmentHolder> segmentsToLoad = new
ConcurrentHashMap<>();
private final ConcurrentMap<DataSegment, SegmentHolder> segmentsToDrop = new
ConcurrentHashMap<>();
@@ -191,10 +193,7 @@ public class HttpLoadQueuePeon implements LoadQueuePeon
if (newRequests.size() == 0) {
log.trace(
"[%s]Found no load/drop requests. SegmentsToLoad[%d],
SegmentsToDrop[%d], batchSize[%d].",
- serverId,
- segmentsToLoad.size(),
- segmentsToDrop.size(),
- config.getBatchSize()
+ serverId, segmentsToLoad.size(), segmentsToDrop.size(),
config.getBatchSize()
);
mainLoopInProgress.set(false);
return;
@@ -225,7 +224,7 @@ public class HttpLoadQueuePeon implements LoadQueuePeon
log.trace("Received NO CONTENT reseponse from [%s]",
serverId);
} else if (HttpServletResponse.SC_OK ==
responseHandler.getStatus()) {
try {
-
List<SegmentLoadDropHandler.DataSegmentChangeRequestAndStatus> statuses =
+ List<DataSegmentChangeResponse> statuses =
jsonMapper.readValue(result, RESPONSE_ENTITY_TYPE_REF);
log.trace("Server[%s] returned status response [%s].",
serverId, statuses);
synchronized (lock) {
@@ -235,7 +234,7 @@ public class HttpLoadQueuePeon implements LoadQueuePeon
return;
}
- for
(SegmentLoadDropHandler.DataSegmentChangeRequestAndStatus e : statuses) {
+ for (DataSegmentChangeResponse e : statuses) {
switch (e.getStatus().getState()) {
case SUCCESS:
case FAILED:
@@ -300,7 +299,7 @@ public class HttpLoadQueuePeon implements LoadQueuePeon
}
}
- private void handleResponseStatus(DataSegmentChangeRequest changeRequest,
SegmentLoadDropHandler.Status status)
+ private void handleResponseStatus(DataSegmentChangeRequest changeRequest,
SegmentChangeStatus status)
{
changeRequest.go(
new DataSegmentChangeHandler()
@@ -317,7 +316,7 @@ public class HttpLoadQueuePeon implements LoadQueuePeon
updateSuccessOrFailureInHolder(segmentsToDrop.remove(segment),
status);
}
- private void updateSuccessOrFailureInHolder(SegmentHolder holder,
SegmentLoadDropHandler.Status status)
+ private void updateSuccessOrFailureInHolder(SegmentHolder holder,
SegmentChangeStatus status)
{
if (holder == null) {
return;
@@ -325,7 +324,7 @@ public class HttpLoadQueuePeon implements LoadQueuePeon
queuedSegments.remove(holder);
activeRequestSegments.remove(holder.getSegment());
- if (status.getState() ==
SegmentLoadDropHandler.Status.STATE.FAILED) {
+ if (status.getState() == SegmentChangeStatus.State.FAILED) {
onRequestFailed(holder, status.getFailureCause());
} else {
onRequestCompleted(holder, RequestStatus.SUCCESS);
@@ -380,7 +379,7 @@ public class HttpLoadQueuePeon implements LoadQueuePeon
queuedSegments.clear();
activeRequestSegments.clear();
queuedSize.set(0L);
- stats.clear();
+ stats.get().clear();
}
}
@@ -485,7 +484,7 @@ public class HttpLoadQueuePeon implements LoadQueuePeon
@Override
public CoordinatorRunStats getAndResetStats()
{
- return stats.getSnapshotAndReset();
+ return stats.getAndSet(new CoordinatorRunStats());
}
@Override
@@ -547,7 +546,7 @@ public class HttpLoadQueuePeon implements LoadQueuePeon
{
RowKey rowKey = RowKey.with(Dimension.DATASOURCE,
holder.getSegment().getDataSource())
.and(Dimension.DESCRIPTION,
holder.getAction().name());
- stats.add(status.datasourceStat, rowKey, 1);
+ stats.get().add(status.datasourceStat, rowKey, 1);
}
private void executeCallbacks(SegmentHolder holder, boolean success)
diff --git
a/server/src/main/java/org/apache/druid/server/coordinator/loading/LoadQueuePeon.java
b/server/src/main/java/org/apache/druid/server/coordinator/loading/LoadQueuePeon.java
index 3ab6d30faf5..49e5f9a7c08 100644
---
a/server/src/main/java/org/apache/druid/server/coordinator/loading/LoadQueuePeon.java
+++
b/server/src/main/java/org/apache/druid/server/coordinator/loading/LoadQueuePeon.java
@@ -25,8 +25,7 @@ import org.apache.druid.timeline.DataSegment;
import java.util.Set;
/**
- * This interface exists only to support configurable load queue management
via curator or http. Once HttpLoadQueuePeon
- * has been verified enough in production, CuratorLoadQueuePeon and this
interface would be removed.
+ * Supports load queue management.
*/
@Deprecated
public interface LoadQueuePeon
diff --git
a/server/src/main/java/org/apache/druid/server/coordinator/stats/CoordinatorRunStats.java
b/server/src/main/java/org/apache/druid/server/coordinator/stats/CoordinatorRunStats.java
index cbee9744524..5412230c2cc 100644
---
a/server/src/main/java/org/apache/druid/server/coordinator/stats/CoordinatorRunStats.java
+++
b/server/src/main/java/org/apache/druid/server/coordinator/stats/CoordinatorRunStats.java
@@ -26,10 +26,8 @@ import javax.annotation.concurrent.ThreadSafe;
import java.util.Collections;
import java.util.EnumMap;
import java.util.HashMap;
-import java.util.HashSet;
import java.util.Map;
import java.util.Objects;
-import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
@@ -223,31 +221,6 @@ public class CoordinatorRunStats
.mergeLong(stat, value, Math::max);
}
- /**
- * Creates a new {@code CoordinatorRunStats} which represents the snapshot of
- * the stats collected so far in this instance.
- * <p>
- * While this method is in progress, any updates made to the stats of this
- * instance by another thread are not guaranteed to be present in the
snapshot.
- * But the snapshots are consistent, i.e. stats present in the snapshot
created
- * in one invocation of this method are permanently removed from this
instance
- * and will not be present in subsequent snapshots.
- *
- * @return Snapshot of the current state of this {@code CoordinatorRunStats}.
- */
- public CoordinatorRunStats getSnapshotAndReset()
- {
- final CoordinatorRunStats snapshot = new
CoordinatorRunStats(debugDimensions);
-
- // Get a snapshot of all the keys, remove and copy each of them atomically
- final Set<RowKey> keys = new HashSet<>(allStats.keySet());
- for (RowKey key : keys) {
- snapshot.allStats.put(key, allStats.remove(key));
- }
-
- return snapshot;
- }
-
/**
* Checks if the given rowKey has any of the debug dimensions.
*/
diff --git
a/server/src/main/java/org/apache/druid/server/http/SegmentListerResource.java
b/server/src/main/java/org/apache/druid/server/http/SegmentListerResource.java
index 1b6f8326773..1486355a776 100644
---
a/server/src/main/java/org/apache/druid/server/http/SegmentListerResource.java
+++
b/server/src/main/java/org/apache/druid/server/http/SegmentListerResource.java
@@ -35,6 +35,7 @@ import
org.apache.druid.server.coordination.BatchDataSegmentAnnouncer;
import org.apache.druid.server.coordination.ChangeRequestHistory;
import org.apache.druid.server.coordination.ChangeRequestsSnapshot;
import org.apache.druid.server.coordination.DataSegmentChangeRequest;
+import org.apache.druid.server.coordination.DataSegmentChangeResponse;
import org.apache.druid.server.coordination.SegmentLoadDropHandler;
import org.apache.druid.server.coordinator.loading.HttpLoadQueuePeon;
import org.apache.druid.server.http.security.StateResourceFilter;
@@ -250,7 +251,7 @@ public class SegmentListerResource
}
final ResponseContext context = createContext(req.getHeader("Accept"));
- final
ListenableFuture<List<SegmentLoadDropHandler.DataSegmentChangeRequestAndStatus>>
future =
+ final ListenableFuture<List<DataSegmentChangeResponse>> future =
loadDropRequestHandler.processBatch(changeRequestList);
final AsyncContext asyncContext = req.startAsync();
@@ -286,10 +287,10 @@ public class SegmentListerResource
Futures.addCallback(
future,
- new
FutureCallback<List<SegmentLoadDropHandler.DataSegmentChangeRequestAndStatus>>()
+ new FutureCallback<List<DataSegmentChangeResponse>>()
{
@Override
- public void
onSuccess(List<SegmentLoadDropHandler.DataSegmentChangeRequestAndStatus> result)
+ public void onSuccess(List<DataSegmentChangeResponse> result)
{
try {
HttpServletResponse response = (HttpServletResponse)
asyncContext.getResponse();
diff --git
a/server/src/main/java/org/apache/druid/server/metrics/HistoricalMetricsMonitor.java
b/server/src/main/java/org/apache/druid/server/metrics/HistoricalMetricsMonitor.java
index ce21a2cef91..e7d6a74275b 100644
---
a/server/src/main/java/org/apache/druid/server/metrics/HistoricalMetricsMonitor.java
+++
b/server/src/main/java/org/apache/druid/server/metrics/HistoricalMetricsMonitor.java
@@ -58,7 +58,7 @@ public class HistoricalMetricsMonitor extends AbstractMonitor
final Object2LongOpenHashMap<String> pendingDeleteSizes = new
Object2LongOpenHashMap<>();
- for (DataSegment segment : segmentLoadDropMgr.getPendingDeleteSnapshot()) {
+ for (DataSegment segment : segmentLoadDropMgr.getSegmentsToDelete()) {
pendingDeleteSizes.addTo(segment.getDataSource(), segment.getSize());
}
diff --git
a/server/src/test/java/org/apache/druid/client/CachingClusteredClientTest.java
b/server/src/test/java/org/apache/druid/client/CachingClusteredClientTest.java
index 818c6b77a53..d7464e25a6c 100644
---
a/server/src/test/java/org/apache/druid/client/CachingClusteredClientTest.java
+++
b/server/src/test/java/org/apache/druid/client/CachingClusteredClientTest.java
@@ -116,7 +116,6 @@ import org.apache.druid.query.topn.TopNQueryQueryToolChest;
import org.apache.druid.query.topn.TopNResultValue;
import org.apache.druid.segment.TestHelper;
import org.apache.druid.server.QueryScheduler;
-import org.apache.druid.server.ServerTestHelper;
import org.apache.druid.server.coordination.ServerType;
import org.apache.druid.server.initialization.ServerConfig;
import org.apache.druid.server.metrics.NoopServiceEmitter;
@@ -1753,7 +1752,7 @@ public class CachingClusteredClientTest
partitions,
partitionDimensions,
partitionFunction,
- ServerTestHelper.MAPPER
+ TestHelper.makeJsonMapper()
),
null,
9,
diff --git
a/server/src/test/java/org/apache/druid/segment/realtime/plumber/CustomVersioningPolicyTest.java
b/server/src/test/java/org/apache/druid/segment/realtime/plumber/CustomVersioningPolicyTest.java
index 2def644cf5a..f145b0e1d79 100644
---
a/server/src/test/java/org/apache/druid/segment/realtime/plumber/CustomVersioningPolicyTest.java
+++
b/server/src/test/java/org/apache/druid/segment/realtime/plumber/CustomVersioningPolicyTest.java
@@ -19,7 +19,8 @@
package org.apache.druid.segment.realtime.plumber;
-import org.apache.druid.server.ServerTestHelper;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.druid.segment.TestHelper;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
import org.joda.time.Interval;
@@ -37,8 +38,9 @@ public class CustomVersioningPolicyTest
CustomVersioningPolicy policy = new CustomVersioningPolicy(version);
- CustomVersioningPolicy serialized = ServerTestHelper.MAPPER.readValue(
- ServerTestHelper.MAPPER.writeValueAsBytes(policy),
+ final ObjectMapper mapper = TestHelper.makeJsonMapper();
+ CustomVersioningPolicy serialized = mapper.readValue(
+ mapper.writeValueAsBytes(policy),
CustomVersioningPolicy.class
);
diff --git a/server/src/test/java/org/apache/druid/server/ServerTestHelper.java
b/server/src/test/java/org/apache/druid/server/ServerTestHelper.java
deleted file mode 100644
index 784b79a7665..00000000000
--- a/server/src/test/java/org/apache/druid/server/ServerTestHelper.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.druid.server;
-
-import com.fasterxml.jackson.databind.InjectableValues;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import org.apache.druid.jackson.DefaultObjectMapper;
-import org.apache.druid.timeline.DataSegment.PruneSpecsHolder;
-
-public class ServerTestHelper
-{
- public static final ObjectMapper MAPPER = new DefaultObjectMapper();
-
- static {
- MAPPER.setInjectableValues(
- new InjectableValues.Std()
- .addValue(ObjectMapper.class.getName(), MAPPER)
- .addValue(PruneSpecsHolder.class, PruneSpecsHolder.DEFAULT)
- );
- }
-}
diff --git
a/server/src/test/java/org/apache/druid/server/coordination/SegmentLoadDropHandlerTest.java
b/server/src/test/java/org/apache/druid/server/coordination/SegmentLoadDropHandlerTest.java
index 7000dd54480..5e3fac5c44a 100644
---
a/server/src/test/java/org/apache/druid/server/coordination/SegmentLoadDropHandlerTest.java
+++
b/server/src/test/java/org/apache/druid/server/coordination/SegmentLoadDropHandlerTest.java
@@ -39,8 +39,7 @@ import org.apache.druid.segment.loading.SegmentLoaderConfig;
import org.apache.druid.segment.loading.StorageLocationConfig;
import org.apache.druid.segment.realtime.appenderator.SegmentSchemas;
import org.apache.druid.server.SegmentManager;
-import
org.apache.druid.server.coordination.SegmentLoadDropHandler.DataSegmentChangeRequestAndStatus;
-import
org.apache.druid.server.coordination.SegmentLoadDropHandler.Status.STATE;
+import org.apache.druid.server.coordination.SegmentChangeStatus.State;
import org.apache.druid.server.metrics.NoopServiceEmitter;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.partition.NoneShardSpec;
@@ -503,14 +502,14 @@ public class SegmentLoadDropHandlerTest
new SegmentChangeRequestDrop(segment2)
);
-
ListenableFuture<List<SegmentLoadDropHandler.DataSegmentChangeRequestAndStatus>>
future = segmentLoadDropHandler
+ ListenableFuture<List<DataSegmentChangeResponse>> future =
segmentLoadDropHandler
.processBatch(batch);
- Map<DataSegmentChangeRequest, SegmentLoadDropHandler.Status>
expectedStatusMap = new HashMap<>();
- expectedStatusMap.put(batch.get(0), SegmentLoadDropHandler.Status.PENDING);
- expectedStatusMap.put(batch.get(1), SegmentLoadDropHandler.Status.SUCCESS);
- List<SegmentLoadDropHandler.DataSegmentChangeRequestAndStatus> result =
future.get();
- for (SegmentLoadDropHandler.DataSegmentChangeRequestAndStatus
requestAndStatus : result) {
+ Map<DataSegmentChangeRequest, SegmentChangeStatus> expectedStatusMap = new
HashMap<>();
+ expectedStatusMap.put(batch.get(0), SegmentChangeStatus.PENDING);
+ expectedStatusMap.put(batch.get(1), SegmentChangeStatus.SUCCESS);
+ List<DataSegmentChangeResponse> result = future.get();
+ for (DataSegmentChangeResponse requestAndStatus : result) {
Assert.assertEquals(expectedStatusMap.get(requestAndStatus.getRequest()),
requestAndStatus.getStatus());
}
@@ -519,7 +518,7 @@ public class SegmentLoadDropHandlerTest
}
result = segmentLoadDropHandler.processBatch(ImmutableList.of(new
SegmentChangeRequestLoad(segment1))).get();
- Assert.assertEquals(SegmentLoadDropHandler.Status.SUCCESS,
result.get(0).getStatus());
+ Assert.assertEquals(SegmentChangeStatus.SUCCESS,
result.get(0).getStatus());
segmentLoadDropHandler.stop();
}
@@ -549,21 +548,21 @@ public class SegmentLoadDropHandlerTest
List<DataSegmentChangeRequest> batch = ImmutableList.of(new
SegmentChangeRequestLoad(segment1));
- ListenableFuture<List<DataSegmentChangeRequestAndStatus>> future =
segmentLoadDropHandler
+ ListenableFuture<List<DataSegmentChangeResponse>> future =
segmentLoadDropHandler
.processBatch(batch);
for (Runnable runnable : scheduledRunnable) {
runnable.run();
}
- List<DataSegmentChangeRequestAndStatus> result = future.get();
- Assert.assertEquals(STATE.FAILED, result.get(0).getStatus().getState());
+ List<DataSegmentChangeResponse> result = future.get();
+ Assert.assertEquals(State.FAILED, result.get(0).getStatus().getState());
future = segmentLoadDropHandler.processBatch(batch);
for (Runnable runnable : scheduledRunnable) {
runnable.run();
}
result = future.get();
- Assert.assertEquals(STATE.SUCCESS, result.get(0).getStatus().getState());
+ Assert.assertEquals(State.SUCCESS, result.get(0).getStatus().getState());
segmentLoadDropHandler.stop();
}
@@ -597,13 +596,13 @@ public class SegmentLoadDropHandlerTest
List<DataSegmentChangeRequest> batch = ImmutableList.of(new
SegmentChangeRequestLoad(segment1));
// Request 1: Load the segment
- ListenableFuture<List<DataSegmentChangeRequestAndStatus>> future =
segmentLoadDropHandler
+ ListenableFuture<List<DataSegmentChangeResponse>> future =
segmentLoadDropHandler
.processBatch(batch);
for (Runnable runnable : scheduledRunnable) {
runnable.run();
}
- List<DataSegmentChangeRequestAndStatus> result = future.get();
- Assert.assertEquals(STATE.SUCCESS, result.get(0).getStatus().getState());
+ List<DataSegmentChangeResponse> result = future.get();
+ Assert.assertEquals(State.SUCCESS, result.get(0).getStatus().getState());
scheduledRunnable.clear();
// Request 2: Drop the segment
@@ -613,7 +612,7 @@ public class SegmentLoadDropHandlerTest
runnable.run();
}
result = future.get();
- Assert.assertEquals(STATE.SUCCESS, result.get(0).getStatus().getState());
+ Assert.assertEquals(State.SUCCESS, result.get(0).getStatus().getState());
scheduledRunnable.clear();
// check invocations after a load-drop sequence
@@ -633,7 +632,7 @@ public class SegmentLoadDropHandlerTest
runnable.run();
}
result = future.get();
- Assert.assertEquals(STATE.SUCCESS, result.get(0).getStatus().getState());
+ Assert.assertEquals(State.SUCCESS, result.get(0).getStatus().getState());
scheduledRunnable.clear();
// check invocations - 1 more load has happened
@@ -653,7 +652,7 @@ public class SegmentLoadDropHandlerTest
runnable.run();
}
result = future.get();
- Assert.assertEquals(STATE.SUCCESS, result.get(0).getStatus().getState());
+ Assert.assertEquals(State.SUCCESS, result.get(0).getStatus().getState());
scheduledRunnable.clear();
// check invocations - the load segment counter should bump up
diff --git
a/server/src/test/java/org/apache/druid/server/coordination/ZkCoordinatorTest.java
b/server/src/test/java/org/apache/druid/server/coordination/ZkCoordinatorTest.java
index 8356a26d33a..fd0547517b8 100644
---
a/server/src/test/java/org/apache/druid/server/coordination/ZkCoordinatorTest.java
+++
b/server/src/test/java/org/apache/druid/server/coordination/ZkCoordinatorTest.java
@@ -28,11 +28,11 @@ import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.segment.IndexIO;
+import org.apache.druid.segment.TestHelper;
import org.apache.druid.segment.loading.SegmentCacheManager;
import org.apache.druid.segment.loading.SegmentLoaderConfig;
import org.apache.druid.segment.loading.StorageLocationConfig;
import org.apache.druid.server.SegmentManager;
-import org.apache.druid.server.ServerTestHelper;
import org.apache.druid.server.initialization.ZkPathsConfig;
import org.apache.druid.server.metrics.NoopServiceEmitter;
import org.apache.druid.timeline.DataSegment;
@@ -59,7 +59,7 @@ public class ZkCoordinatorTest extends CuratorTestBase
{
private static final Logger log = new Logger(ZkCoordinatorTest.class);
- private final ObjectMapper jsonMapper = ServerTestHelper.MAPPER;
+ private final ObjectMapper jsonMapper = TestHelper.makeJsonMapper();
private final DruidServerMetadata me = new DruidServerMetadata(
"dummyServer",
"dummyHost",
@@ -135,7 +135,7 @@ public class ZkCoordinatorTest extends CuratorTestBase
CountDownLatch dropLatch = new CountDownLatch(1);
SegmentLoadDropHandler segmentLoadDropHandler = new SegmentLoadDropHandler(
- ServerTestHelper.MAPPER,
+ jsonMapper,
new SegmentLoaderConfig() {
@Override
public File getInfoDir()
diff --git
a/server/src/test/java/org/apache/druid/server/coordinator/CoordinatorRunStatsTest.java
b/server/src/test/java/org/apache/druid/server/coordinator/CoordinatorRunStatsTest.java
index 6b4a16f1402..6b9dc844396 100644
---
a/server/src/test/java/org/apache/druid/server/coordinator/CoordinatorRunStatsTest.java
+++
b/server/src/test/java/org/apache/druid/server/coordinator/CoordinatorRunStatsTest.java
@@ -74,37 +74,6 @@ public class CoordinatorRunStatsTest
Assert.assertEquals(1, stats.get(Stat.INFO_1, Key.TIER_1));
}
- @Test
- public void testGetSnapshotAndReset()
- {
- stats.add(Stat.ERROR_1, 1);
- stats.add(Stat.INFO_1, 3);
- stats.add(Stat.ERROR_1, Key.TIER_1, 5);
- stats.add(Stat.ERROR_1, Key.DUTY_1, 7);
-
- final CoordinatorRunStats firstFlush = stats.getSnapshotAndReset();
- Assert.assertEquals(1, firstFlush.get(Stat.ERROR_1));
- Assert.assertEquals(3, firstFlush.get(Stat.INFO_1));
- Assert.assertEquals(5, firstFlush.get(Stat.ERROR_1, Key.TIER_1));
- Assert.assertEquals(7, firstFlush.get(Stat.ERROR_1, Key.DUTY_1));
-
- Assert.assertEquals(0, stats.rowCount());
-
- stats.add(Stat.ERROR_1, 7);
- stats.add(Stat.ERROR_1, Key.TIER_1, 5);
- stats.add(Stat.INFO_1, Key.DUTY_1, 3);
- stats.add(Stat.INFO_2, Key.TIER_1, 1);
-
- final CoordinatorRunStats secondFlush = stats.getSnapshotAndReset();
-
- Assert.assertEquals(7, secondFlush.get(Stat.ERROR_1));
- Assert.assertEquals(5, secondFlush.get(Stat.ERROR_1, Key.TIER_1));
- Assert.assertEquals(3, secondFlush.get(Stat.INFO_1, Key.DUTY_1));
- Assert.assertEquals(1, secondFlush.get(Stat.INFO_2, Key.TIER_1));
-
- Assert.assertEquals(0, stats.rowCount());
- }
-
@Test
public void testUpdateMax()
{
diff --git
a/server/src/test/java/org/apache/druid/server/coordinator/loading/LoadQueuePeonTest.java
b/server/src/test/java/org/apache/druid/server/coordinator/loading/CuratorLoadQueuePeonTest.java
similarity index 99%
rename from
server/src/test/java/org/apache/druid/server/coordinator/loading/LoadQueuePeonTest.java
rename to
server/src/test/java/org/apache/druid/server/coordinator/loading/CuratorLoadQueuePeonTest.java
index 3e1c816716c..e22b037707a 100644
---
a/server/src/test/java/org/apache/druid/server/coordinator/loading/LoadQueuePeonTest.java
+++
b/server/src/test/java/org/apache/druid/server/coordinator/loading/CuratorLoadQueuePeonTest.java
@@ -53,13 +53,13 @@ import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
-public class LoadQueuePeonTest extends CuratorTestBase
+public class CuratorLoadQueuePeonTest extends CuratorTestBase
{
private static final String LOAD_QUEUE_PATH =
"/druid/loadqueue/localhost:1234";
private final ObjectMapper jsonMapper = TestHelper.makeJsonMapper();
- private LoadQueuePeon loadQueuePeon;
+ private CuratorLoadQueuePeon loadQueuePeon;
private PathChildrenCache loadQueueCache;
@Before
diff --git
a/server/src/test/java/org/apache/druid/server/coordinator/loading/HttpLoadQueuePeonTest.java
b/server/src/test/java/org/apache/druid/server/coordinator/loading/HttpLoadQueuePeonTest.java
index de1801d7ad7..16251130e77 100644
---
a/server/src/test/java/org/apache/druid/server/coordinator/loading/HttpLoadQueuePeonTest.java
+++
b/server/src/test/java/org/apache/druid/server/coordinator/loading/HttpLoadQueuePeonTest.java
@@ -20,6 +20,7 @@
package org.apache.druid.server.coordinator.loading;
import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import org.apache.druid.java.util.common.RE;
@@ -27,11 +28,12 @@ import
org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.http.client.HttpClient;
import org.apache.druid.java.util.http.client.Request;
import org.apache.druid.java.util.http.client.response.HttpResponseHandler;
-import org.apache.druid.server.ServerTestHelper;
+import org.apache.druid.segment.TestHelper;
import org.apache.druid.server.coordination.DataSegmentChangeCallback;
import org.apache.druid.server.coordination.DataSegmentChangeHandler;
import org.apache.druid.server.coordination.DataSegmentChangeRequest;
-import org.apache.druid.server.coordination.SegmentLoadDropHandler;
+import org.apache.druid.server.coordination.DataSegmentChangeResponse;
+import org.apache.druid.server.coordination.SegmentChangeStatus;
import org.apache.druid.server.coordinator.CreateDataSegments;
import org.apache.druid.server.coordinator.config.HttpLoadQueuePeonConfig;
import org.apache.druid.server.coordinator.simulate.BlockingExecutorService;
@@ -63,6 +65,7 @@ import java.util.stream.Collectors;
*/
public class HttpLoadQueuePeonTest
{
+ private static final ObjectMapper MAPPER = TestHelper.makeJsonMapper();
private final List<DataSegment> segments =
CreateDataSegments.ofDatasource("test")
.forIntervals(1, Granularities.DAY)
@@ -87,7 +90,7 @@ public class HttpLoadQueuePeonTest
httpLoadQueuePeon = new HttpLoadQueuePeon(
"http://dummy:4000",
- ServerTestHelper.MAPPER,
+ MAPPER,
httpClient,
new HttpLoadQueuePeonConfig(null, null, 10),
new WrappingScheduledExecutorService("HttpLoadQueuePeonTest-%s",
processingExecutor, true),
@@ -313,23 +316,22 @@ public class HttpLoadQueuePeonTest
httpResponse.setContent(ChannelBuffers.buffer(0));
httpResponseHandler.handleResponse(httpResponse, null);
try {
- List<DataSegmentChangeRequest> changeRequests =
ServerTestHelper.MAPPER.readValue(
+ List<DataSegmentChangeRequest> changeRequests = MAPPER.readValue(
request.getContent().array(), new
TypeReference<List<DataSegmentChangeRequest>>()
{
}
);
- List<SegmentLoadDropHandler.DataSegmentChangeRequestAndStatus>
statuses = new ArrayList<>(changeRequests.size());
+ List<DataSegmentChangeResponse> statuses = new
ArrayList<>(changeRequests.size());
for (DataSegmentChangeRequest cr : changeRequests) {
cr.go(this, null);
- statuses.add(new
SegmentLoadDropHandler.DataSegmentChangeRequestAndStatus(
- cr,
- SegmentLoadDropHandler.Status.SUCCESS
- ));
+ statuses.add(
+ new DataSegmentChangeResponse(cr, SegmentChangeStatus.SUCCESS)
+ );
}
- return (ListenableFuture) Futures.immediateFuture(
+ return (ListenableFuture<Final>) Futures.immediateFuture(
new ByteArrayInputStream(
- ServerTestHelper.MAPPER
+ MAPPER
.writerWithType(HttpLoadQueuePeon.RESPONSE_ENTITY_TYPE_REF)
.writeValueAsBytes(statuses)
)
diff --git
a/server/src/test/java/org/apache/druid/server/coordinator/simulate/TestSegmentLoadingHttpClient.java
b/server/src/test/java/org/apache/druid/server/coordinator/simulate/TestSegmentLoadingHttpClient.java
index 0b91e700902..f7007bfd833 100644
---
a/server/src/test/java/org/apache/druid/server/coordinator/simulate/TestSegmentLoadingHttpClient.java
+++
b/server/src/test/java/org/apache/druid/server/coordinator/simulate/TestSegmentLoadingHttpClient.java
@@ -30,7 +30,8 @@ import
org.apache.druid.java.util.http.client.response.HttpResponseHandler;
import org.apache.druid.server.coordination.DataSegmentChangeCallback;
import org.apache.druid.server.coordination.DataSegmentChangeHandler;
import org.apache.druid.server.coordination.DataSegmentChangeRequest;
-import org.apache.druid.server.coordination.SegmentLoadDropHandler;
+import org.apache.druid.server.coordination.DataSegmentChangeResponse;
+import org.apache.druid.server.coordination.SegmentChangeStatus;
import org.jboss.netty.buffer.ChannelBuffers;
import org.jboss.netty.handler.codec.http.DefaultHttpResponse;
import org.jboss.netty.handler.codec.http.HttpResponse;
@@ -126,7 +127,7 @@ public class TestSegmentLoadingHttpClient implements
HttpClient
/**
* Processes all the changes in the request.
*/
- private List<SegmentLoadDropHandler.DataSegmentChangeRequestAndStatus>
processRequest(
+ private List<DataSegmentChangeResponse> processRequest(
Request request,
DataSegmentChangeHandler changeHandler
) throws IOException
@@ -147,21 +148,20 @@ public class TestSegmentLoadingHttpClient implements
HttpClient
/**
* Processes each DataSegmentChangeRequest using the handler.
*/
- private SegmentLoadDropHandler.DataSegmentChangeRequestAndStatus
processRequest(
+ private DataSegmentChangeResponse processRequest(
DataSegmentChangeRequest request,
DataSegmentChangeHandler handler
)
{
- SegmentLoadDropHandler.Status status;
+ SegmentChangeStatus status;
try {
request.go(handler, NOOP_CALLBACK);
- status = SegmentLoadDropHandler.Status.SUCCESS;
+ status = SegmentChangeStatus.SUCCESS;
}
catch (Exception e) {
- status = SegmentLoadDropHandler.Status.failed(e.getMessage());
+ status = SegmentChangeStatus.failed(e.getMessage());
}
- return new SegmentLoadDropHandler
- .DataSegmentChangeRequestAndStatus(request, status);
+ return new DataSegmentChangeResponse(request, status);
}
}
diff --git
a/server/src/test/java/org/apache/druid/server/metrics/HistoricalMetricsMonitorTest.java
b/server/src/test/java/org/apache/druid/server/metrics/HistoricalMetricsMonitorTest.java
index 1806903a9ba..92c2a1064e7 100644
---
a/server/src/test/java/org/apache/druid/server/metrics/HistoricalMetricsMonitorTest.java
+++
b/server/src/test/java/org/apache/druid/server/metrics/HistoricalMetricsMonitorTest.java
@@ -74,7 +74,7 @@ public class HistoricalMetricsMonitorTest extends
EasyMockSupport
final String tier = "tier";
EasyMock.expect(druidServerConfig.getMaxSize()).andReturn(maxSize).once();
-
EasyMock.expect(segmentLoadDropMgr.getPendingDeleteSnapshot()).andReturn(ImmutableList.of(dataSegment)).once();
+
EasyMock.expect(segmentLoadDropMgr.getSegmentsToDelete()).andReturn(ImmutableList.of(dataSegment)).once();
EasyMock.expect(druidServerConfig.getTier()).andReturn(tier).once();
EasyMock.expect(druidServerConfig.getPriority()).andReturn(priority).once();
EasyMock.expect(segmentManager.getDataSourceSizes()).andReturn(ImmutableMap.of(dataSource,
size));
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]