This is an automated email from the ASF dual-hosted git repository.

abhishek pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 4d8015578dd Remove unused WorkerManagerClient interface. (#17073)
4d8015578dd is described below

commit 4d8015578dd19b3b8ba5642890145a552720b0a6
Author: Gian Merlino <[email protected]>
AuthorDate: Mon Sep 16 05:30:47 2024 -0700

    Remove unused WorkerManagerClient interface. (#17073)
---
 .../apache/druid/msq/exec/WorkerManagerClient.java |  57 -----------
 .../client/IndexerWorkerManagerClient.java         | 105 ---------------------
 .../client/IndexerWorkerManagerClientTest.java     | 104 --------------------
 3 files changed, 266 deletions(-)

diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerManagerClient.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerManagerClient.java
deleted file mode 100644
index 415c93a8599..00000000000
--- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerManagerClient.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.druid.msq.exec;
-
-import org.apache.druid.indexer.TaskLocation;
-import org.apache.druid.indexer.TaskStatus;
-import org.apache.druid.msq.indexing.MSQWorkerTask;
-
-import java.io.Closeable;
-import java.util.Map;
-import java.util.Set;
-
-/**
- * Generic interface to the "worker manager" mechanism which starts, cancels and monitors worker tasks.
- */
-public interface WorkerManagerClient extends Closeable
-{
-  String run(String taskId, MSQWorkerTask task);
-
-  /**
-   * @param workerId the task ID
-   *
-   * @return a {@code TaskLocation} associated with the task or
-   * {@code TaskLocation.unknown()} if no associated entry could be found
-   */
-  TaskLocation location(String workerId);
-
-  /**
-   * Fetches status map corresponding to a group of task ids
-   */
-  Map<String, TaskStatus> statuses(Set<String> taskIds);
-
-  /**
-   * Cancel the task corresponding to the provided workerId
-   */
-  void cancel(String workerId);
-
-  @Override
-  void close();
-}
diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/client/IndexerWorkerManagerClient.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/client/IndexerWorkerManagerClient.java
deleted file mode 100644
index 927130e0ca7..00000000000
--- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/client/IndexerWorkerManagerClient.java
+++ /dev/null
@@ -1,105 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.druid.msq.indexing.client;
-
-import com.google.common.collect.ImmutableSet;
-import org.apache.druid.client.indexing.TaskStatusResponse;
-import org.apache.druid.common.guava.FutureUtils;
-import org.apache.druid.indexer.TaskLocation;
-import org.apache.druid.indexer.TaskStatus;
-import org.apache.druid.msq.exec.WorkerManagerClient;
-import org.apache.druid.msq.indexing.MSQWorkerTask;
-import org.apache.druid.rpc.indexing.OverlordClient;
-
-import java.util.Map;
-import java.util.Set;
-
-/**
- * Worker manager client backed by the Indexer service. Glues together
- * three different mechanisms to provide the single multi-stage query interface.
- */
-public class IndexerWorkerManagerClient implements WorkerManagerClient
-{
-  private final OverlordClient overlordClient;
-  private final TaskLocationFetcher locationFetcher = new TaskLocationFetcher();
-
-  public IndexerWorkerManagerClient(final OverlordClient overlordClient)
-  {
-    this.overlordClient = overlordClient;
-  }
-
-  @Override
-  public String run(String taskId, MSQWorkerTask task)
-  {
-    FutureUtils.getUnchecked(overlordClient.runTask(taskId, task), true);
-    return taskId;
-  }
-
-  @Override
-  public void cancel(String taskId)
-  {
-    FutureUtils.getUnchecked(overlordClient.cancelTask(taskId), true);
-  }
-
-  @Override
-  public Map<String, TaskStatus> statuses(Set<String> taskIds)
-  {
-    return FutureUtils.getUnchecked(overlordClient.taskStatuses(taskIds), true);
-  }
-
-  @Override
-  public TaskLocation location(String workerId)
-  {
-    return locationFetcher.getLocation(workerId);
-  }
-
-  @Override
-  public void close()
-  {
-    // Nothing to do. The OverlordServiceClient is closed by the JVM lifecycle.
-  }
-
-  private class TaskLocationFetcher
-  {
-    TaskLocation getLocation(String workerId)
-    {
-      final TaskStatus taskStatus = FutureUtils.getUnchecked(
-          overlordClient.taskStatuses(ImmutableSet.of(workerId)),
-          true
-      ).get(workerId);
-
-      if (taskStatus != null
-          && !TaskLocation.unknown().equals(taskStatus.getLocation())) {
-        return taskStatus.getLocation();
-      }
-
-      // Retry with the single status API
-      final TaskStatusResponse statusResponse = FutureUtils.getUnchecked(
-          overlordClient.taskStatus(workerId),
-          true
-      );
-      if (statusResponse == null || statusResponse.getStatus() == null) {
-        return TaskLocation.unknown();
-      } else {
-        return statusResponse.getStatus().getLocation();
-      }
-    }
-  }
-}
diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/client/IndexerWorkerManagerClientTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/client/IndexerWorkerManagerClientTest.java
deleted file mode 100644
index 4b53420cbb9..00000000000
--- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/client/IndexerWorkerManagerClientTest.java
+++ /dev/null
@@ -1,104 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.druid.msq.indexing.client;
-
-import com.google.common.util.concurrent.Futures;
-import org.apache.druid.client.indexing.TaskStatusResponse;
-import org.apache.druid.indexer.TaskLocation;
-import org.apache.druid.indexer.TaskState;
-import org.apache.druid.indexer.TaskStatus;
-import org.apache.druid.indexer.TaskStatusPlus;
-import org.apache.druid.java.util.common.DateTimes;
-import org.apache.druid.rpc.indexing.OverlordClient;
-import org.junit.Assert;
-import org.junit.Test;
-import org.mockito.ArgumentMatchers;
-import org.mockito.Mockito;
-
-import java.util.Collections;
-
-public class IndexerWorkerManagerClientTest
-{
-
-  @Test
-  public void testGetLocationCallsMultiStatusApiByDefault()
-  {
-    final OverlordClient overlordClient = Mockito.mock(OverlordClient.class);
-
-    final String taskId = "worker1";
-    final TaskLocation expectedLocation = new TaskLocation("localhost", 1000, 1100, null);
-    Mockito.when(overlordClient.taskStatuses(Collections.singleton(taskId))).thenReturn(
-        Futures.immediateFuture(
-            Collections.singletonMap(
-                taskId,
-                new TaskStatus(taskId, TaskState.RUNNING, 100L, null, expectedLocation)
-            )
-        )
-    );
-
-    final IndexerWorkerManagerClient managerClient = new IndexerWorkerManagerClient(overlordClient);
-    Assert.assertEquals(managerClient.location(taskId), expectedLocation);
-
-    Mockito.verify(overlordClient, Mockito.times(1)).taskStatuses(ArgumentMatchers.anySet());
-    Mockito.verify(overlordClient, Mockito.never()).taskStatus(ArgumentMatchers.anyString());
-  }
-
-  @Test
-  public void testGetLocationFallsBackToSingleTaskApiIfLocationIsUnknown()
-  {
-    final OverlordClient overlordClient = Mockito.mock(OverlordClient.class);
-
-    final String taskId = "worker1";
-    Mockito.when(overlordClient.taskStatuses(Collections.singleton(taskId))).thenReturn(
-        Futures.immediateFuture(
-            Collections.singletonMap(
-                taskId,
-                new TaskStatus(taskId, TaskState.RUNNING, 100L, null, TaskLocation.unknown())
-            )
-        )
-    );
-
-    final TaskLocation expectedLocation = new TaskLocation("localhost", 1000, 1100, null);
-    final TaskStatusPlus taskStatus = new TaskStatusPlus(
-        taskId,
-        null,
-        null,
-        DateTimes.nowUtc(),
-        DateTimes.nowUtc(),
-        TaskState.RUNNING,
-        null,
-        100L,
-        expectedLocation,
-        "wiki",
-        null
-    );
-
-    Mockito.when(overlordClient.taskStatus(taskId)).thenReturn(
-        Futures.immediateFuture(new TaskStatusResponse(taskId, taskStatus))
-    );
-
-    final IndexerWorkerManagerClient managerClient = new IndexerWorkerManagerClient(overlordClient);
-    Assert.assertEquals(managerClient.location(taskId), expectedLocation);
-
-    Mockito.verify(overlordClient, Mockito.times(1)).taskStatuses(ArgumentMatchers.anySet());
-    Mockito.verify(overlordClient, Mockito.times(1)).taskStatus(ArgumentMatchers.anyString());
-  }
-
-}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to