This is an automated email from the ASF dual-hosted git repository.
abhishek pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 4d8015578dd Remove unused WorkerManagerClient interface. (#17073)
4d8015578dd is described below
commit 4d8015578dd19b3b8ba5642890145a552720b0a6
Author: Gian Merlino <[email protected]>
AuthorDate: Mon Sep 16 05:30:47 2024 -0700
Remove unused WorkerManagerClient interface. (#17073)
---
.../apache/druid/msq/exec/WorkerManagerClient.java | 57 -----------
.../client/IndexerWorkerManagerClient.java | 105 ---------------------
.../client/IndexerWorkerManagerClientTest.java | 104 --------------------
3 files changed, 266 deletions(-)
diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerManagerClient.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerManagerClient.java
deleted file mode 100644
index 415c93a8599..00000000000
--- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerManagerClient.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.druid.msq.exec;
-
-import org.apache.druid.indexer.TaskLocation;
-import org.apache.druid.indexer.TaskStatus;
-import org.apache.druid.msq.indexing.MSQWorkerTask;
-
-import java.io.Closeable;
-import java.util.Map;
-import java.util.Set;
-
-/**
- * Generic interface to the "worker manager" mechanism which starts, cancels and monitors worker tasks.
- */
-public interface WorkerManagerClient extends Closeable
-{
- String run(String taskId, MSQWorkerTask task);
-
- /**
- * @param workerId the task ID
- *
- * @return a {@code TaskLocation} associated with the task or
- * {@code TaskLocation.unknown()} if no associated entry could be found
- */
- TaskLocation location(String workerId);
-
- /**
- * Fetches status map corresponding to a group of task ids
- */
- Map<String, TaskStatus> statuses(Set<String> taskIds);
-
- /**
- * Cancel the task corresponding to the provided workerId
- */
- void cancel(String workerId);
-
- @Override
- void close();
-}
diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/client/IndexerWorkerManagerClient.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/client/IndexerWorkerManagerClient.java
deleted file mode 100644
index 927130e0ca7..00000000000
--- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/client/IndexerWorkerManagerClient.java
+++ /dev/null
@@ -1,105 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.druid.msq.indexing.client;
-
-import com.google.common.collect.ImmutableSet;
-import org.apache.druid.client.indexing.TaskStatusResponse;
-import org.apache.druid.common.guava.FutureUtils;
-import org.apache.druid.indexer.TaskLocation;
-import org.apache.druid.indexer.TaskStatus;
-import org.apache.druid.msq.exec.WorkerManagerClient;
-import org.apache.druid.msq.indexing.MSQWorkerTask;
-import org.apache.druid.rpc.indexing.OverlordClient;
-
-import java.util.Map;
-import java.util.Set;
-
-/**
- * Worker manager client backed by the Indexer service. Glues together
- * three different mechanisms to provide the single multi-stage query interface.
- */
-public class IndexerWorkerManagerClient implements WorkerManagerClient
-{
- private final OverlordClient overlordClient;
- private final TaskLocationFetcher locationFetcher = new TaskLocationFetcher();
-
- public IndexerWorkerManagerClient(final OverlordClient overlordClient)
- {
- this.overlordClient = overlordClient;
- }
-
- @Override
- public String run(String taskId, MSQWorkerTask task)
- {
- FutureUtils.getUnchecked(overlordClient.runTask(taskId, task), true);
- return taskId;
- }
-
- @Override
- public void cancel(String taskId)
- {
- FutureUtils.getUnchecked(overlordClient.cancelTask(taskId), true);
- }
-
- @Override
- public Map<String, TaskStatus> statuses(Set<String> taskIds)
- {
- return FutureUtils.getUnchecked(overlordClient.taskStatuses(taskIds), true);
- }
-
- @Override
- public TaskLocation location(String workerId)
- {
- return locationFetcher.getLocation(workerId);
- }
-
- @Override
- public void close()
- {
- // Nothing to do. The OverlordServiceClient is closed by the JVM lifecycle.
- }
-
- private class TaskLocationFetcher
- {
- TaskLocation getLocation(String workerId)
- {
- final TaskStatus taskStatus = FutureUtils.getUnchecked(
- overlordClient.taskStatuses(ImmutableSet.of(workerId)),
- true
- ).get(workerId);
-
- if (taskStatus != null
- && !TaskLocation.unknown().equals(taskStatus.getLocation())) {
- return taskStatus.getLocation();
- }
-
- // Retry with the single status API
- final TaskStatusResponse statusResponse = FutureUtils.getUnchecked(
- overlordClient.taskStatus(workerId),
- true
- );
- if (statusResponse == null || statusResponse.getStatus() == null) {
- return TaskLocation.unknown();
- } else {
- return statusResponse.getStatus().getLocation();
- }
- }
- }
-}
diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/client/IndexerWorkerManagerClientTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/client/IndexerWorkerManagerClientTest.java
deleted file mode 100644
index 4b53420cbb9..00000000000
--- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/client/IndexerWorkerManagerClientTest.java
+++ /dev/null
@@ -1,104 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.druid.msq.indexing.client;
-
-import com.google.common.util.concurrent.Futures;
-import org.apache.druid.client.indexing.TaskStatusResponse;
-import org.apache.druid.indexer.TaskLocation;
-import org.apache.druid.indexer.TaskState;
-import org.apache.druid.indexer.TaskStatus;
-import org.apache.druid.indexer.TaskStatusPlus;
-import org.apache.druid.java.util.common.DateTimes;
-import org.apache.druid.rpc.indexing.OverlordClient;
-import org.junit.Assert;
-import org.junit.Test;
-import org.mockito.ArgumentMatchers;
-import org.mockito.Mockito;
-
-import java.util.Collections;
-
-public class IndexerWorkerManagerClientTest
-{
-
- @Test
- public void testGetLocationCallsMultiStatusApiByDefault()
- {
- final OverlordClient overlordClient = Mockito.mock(OverlordClient.class);
-
- final String taskId = "worker1";
- final TaskLocation expectedLocation = new TaskLocation("localhost", 1000, 1100, null);
- Mockito.when(overlordClient.taskStatuses(Collections.singleton(taskId))).thenReturn(
- Futures.immediateFuture(
- Collections.singletonMap(
- taskId,
- new TaskStatus(taskId, TaskState.RUNNING, 100L, null, expectedLocation)
- )
- )
- );
-
- final IndexerWorkerManagerClient managerClient = new IndexerWorkerManagerClient(overlordClient);
- Assert.assertEquals(managerClient.location(taskId), expectedLocation);
-
- Mockito.verify(overlordClient, Mockito.times(1)).taskStatuses(ArgumentMatchers.anySet());
- Mockito.verify(overlordClient, Mockito.never()).taskStatus(ArgumentMatchers.anyString());
- }
-
- @Test
- public void testGetLocationFallsBackToSingleTaskApiIfLocationIsUnknown()
- {
- final OverlordClient overlordClient = Mockito.mock(OverlordClient.class);
-
- final String taskId = "worker1";
- Mockito.when(overlordClient.taskStatuses(Collections.singleton(taskId))).thenReturn(
- Futures.immediateFuture(
- Collections.singletonMap(
- taskId,
- new TaskStatus(taskId, TaskState.RUNNING, 100L, null, TaskLocation.unknown())
- )
- )
- );
-
- final TaskLocation expectedLocation = new TaskLocation("localhost", 1000, 1100, null);
- final TaskStatusPlus taskStatus = new TaskStatusPlus(
- taskId,
- null,
- null,
- DateTimes.nowUtc(),
- DateTimes.nowUtc(),
- TaskState.RUNNING,
- null,
- 100L,
- expectedLocation,
- "wiki",
- null
- );
-
- Mockito.when(overlordClient.taskStatus(taskId)).thenReturn(
- Futures.immediateFuture(new TaskStatusResponse(taskId, taskStatus))
- );
-
- final IndexerWorkerManagerClient managerClient = new IndexerWorkerManagerClient(overlordClient);
- Assert.assertEquals(managerClient.location(taskId), expectedLocation);
-
- Mockito.verify(overlordClient, Mockito.times(1)).taskStatuses(ArgumentMatchers.anySet());
- Mockito.verify(overlordClient, Mockito.times(1)).taskStatus(ArgumentMatchers.anyString());
- }
-
-}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]