This is an automated email from the ASF dual-hosted git repository. xiangfu pushed a commit to branch adding-minion-clients in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
commit 35e781224a08fd7b32cee67f5c7d24a186998e76 Author: Xiang Fu <[email protected]> AuthorDate: Tue Dec 8 16:22:05 2020 -0800 Adding minion client --- .../apache/pinot/common/minion/MinionClient.java | 115 +++++++++++++++++++++ .../common/minion/MinionRequestURLBuilder.java | 83 +++++++++++++++ .../pinot/common/minion/MinionClientTest.java | 91 ++++++++++++++++ 3 files changed, 289 insertions(+) diff --git a/pinot-common/src/main/java/org/apache/pinot/common/minion/MinionClient.java b/pinot-common/src/main/java/org/apache/pinot/common/minion/MinionClient.java new file mode 100644 index 0000000..88bcc39 --- /dev/null +++ b/pinot-common/src/main/java/org/apache/pinot/common/minion/MinionClient.java @@ -0,0 +1,115 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.common.minion; + +import java.io.IOException; +import java.util.Map; +import org.apache.commons.httpclient.HttpException; +import org.apache.commons.io.IOUtils; +import org.apache.http.HttpResponse; +import org.apache.http.client.methods.HttpGet; +import org.apache.http.client.methods.HttpPost; +import org.apache.http.impl.client.CloseableHttpClient; +import org.apache.http.impl.client.HttpClientBuilder; +import org.apache.pinot.spi.utils.JsonUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * MinionClient is the client-side APIs for Pinot Controller tasks APIs. + */ +public class MinionClient { + + private static final Logger LOGGER = LoggerFactory.getLogger(MinionClient.class); + private static final CloseableHttpClient HTTP_CLIENT = HttpClientBuilder.create().build(); + private static final String ACCEPT = "accept"; + private static final String APPLICATION_JSON = "application/json"; + private static final String HTTP = "http"; + + private final String _controllerUrl; + + public MinionClient(String controllerHost, String controllerPort) { + this(HTTP, controllerHost, controllerPort); + } + + public MinionClient(String scheme, String controllerHost, String controllerPort) { + this(String.format("%s://%s:%s", scheme, controllerHost, controllerPort)); + } + + public MinionClient(String controllerUrl) { + _controllerUrl = controllerUrl; + } + + public String getControllerUrl() { + return _controllerUrl; + } + + public Map<String, String> scheduleMinionTasks() + throws IOException { + HttpPost httpPost = createHttpPostRequest(MinionRequestURLBuilder.baseUrl(getControllerUrl()).forTaskSchedule()); + HttpResponse response = HTTP_CLIENT.execute(httpPost); + int statusCode = response.getStatusLine().getStatusCode(); + final String responseString = IOUtils.toString(response.getEntity().getContent()); + if (statusCode >= 400) { + throw new HttpException(String + .format("Unable to schedule minion tasks. Error code %d, Error message: %s", statusCode, responseString)); + } + return JsonUtils.stringToObject(responseString, Map.class); + } + + public Map<String, String> getTasksStates(String taskType) + throws IOException { + HttpGet httpGet = + createHttpGetRequest(MinionRequestURLBuilder.baseUrl(getControllerUrl()).forTasksStates(taskType)); + HttpResponse response = HTTP_CLIENT.execute(httpGet); + int statusCode = response.getStatusLine().getStatusCode(); + final String responseString = IOUtils.toString(response.getEntity().getContent()); + if (statusCode >= 400) { + throw new HttpException(String + .format("Unable to get tasks states map. Error code %d, Error message: %s", statusCode, responseString)); + } + return JsonUtils.stringToObject(responseString, Map.class); + } + + public String getTaskState(String taskName) + throws IOException { + HttpGet httpGet = createHttpGetRequest(MinionRequestURLBuilder.baseUrl(getControllerUrl()).forTaskState(taskName)); + HttpResponse response = HTTP_CLIENT.execute(httpGet); + int statusCode = response.getStatusLine().getStatusCode(); + String responseString = IOUtils.toString(response.getEntity().getContent()); + if (statusCode >= 400) { + throw new HttpException( + String.format("Unable to get task state. Error code %d, Error message: %s", statusCode, responseString)); + } + return responseString; + } + + private HttpGet createHttpGetRequest(String uri) { + HttpGet httpGet = new HttpGet(uri); + httpGet.setHeader(ACCEPT, APPLICATION_JSON); + return httpGet; + } + + private HttpPost createHttpPostRequest(String uri) { + HttpPost httpPost = new HttpPost(uri); + httpPost.setHeader(ACCEPT, APPLICATION_JSON); + return httpPost; + } +} diff --git a/pinot-common/src/main/java/org/apache/pinot/common/minion/MinionRequestURLBuilder.java b/pinot-common/src/main/java/org/apache/pinot/common/minion/MinionRequestURLBuilder.java new file mode 100644 index 0000000..4432bc9 --- /dev/null +++ b/pinot-common/src/main/java/org/apache/pinot/common/minion/MinionRequestURLBuilder.java @@ -0,0 +1,83 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.common.minion; + +import org.apache.commons.lang3.StringUtils; +import org.apache.pinot.common.utils.StringUtil; + + +/** + * MinionRequestURLBuilder is the helper class to generate URLs for task APIs. + */ +public class MinionRequestURLBuilder { + + private final String _baseUrl; + + private MinionRequestURLBuilder(String baseUrl) { + _baseUrl = StringUtils.removeEnd(baseUrl, "/"); + } + + public static MinionRequestURLBuilder baseUrl(String baseUrl) { + return new MinionRequestURLBuilder(baseUrl); + } + + public String forTaskSchedule() { + return StringUtil.join("/", _baseUrl, "tasks/schedule"); + } + + public String forListAllTasks(String taskType) { + return StringUtil.join("/", _baseUrl, "tasks", taskType, "tasks"); + } + + public String forListAllTaskTypes() { + return StringUtil.join("/", _baseUrl, "tasks/tasktypes"); + } + + public String forTaskTypeState(String taskType) { + return StringUtil.join("/", _baseUrl, "tasks", taskType, "state"); + } + + public String forTasksStates(String taskType) { + return StringUtil.join("/", _baseUrl, "tasks", taskType, "taskstates"); + } + + public String forTaskState(String taskName) { + return StringUtil.join("/", _baseUrl, "tasks/task", taskName, "state"); + } + + public String forTaskConfig(String taskName) { + return StringUtil.join("/", _baseUrl, "tasks/task", taskName, "config"); + } + + public String forTaskTypeCleanup(String taskType) { + return StringUtil.join("/", _baseUrl, "tasks", taskType, "clenaup"); + } + + public String forTaskTypeStop(String taskType) { + return StringUtil.join("/", _baseUrl, "tasks", taskType, "stop"); + } + + public String forTaskTypeResume(String taskType) { + return StringUtil.join("/", _baseUrl, "tasks", taskType, "resume"); + } + + public String forTaskTypeDelete(String taskType) { + return StringUtil.join("/", _baseUrl, "tasks", taskType); + } +} diff --git a/pinot-common/src/test/java/org/apache/pinot/common/minion/MinionClientTest.java b/pinot-common/src/test/java/org/apache/pinot/common/minion/MinionClientTest.java new file mode 100644 index 0000000..869dde1 --- /dev/null +++ b/pinot-common/src/test/java/org/apache/pinot/common/minion/MinionClientTest.java @@ -0,0 +1,91 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.common.minion; + +import com.sun.net.httpserver.HttpHandler; +import com.sun.net.httpserver.HttpServer; +import java.io.IOException; +import java.io.OutputStream; +import java.net.InetSocketAddress; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testng.Assert; +import org.testng.annotations.Test; + + +public class MinionClientTest { + private static final Logger LOGGER = LoggerFactory.getLogger(MinionClientTest.class); + + private HttpHandler createHandler(int status, String msg, int sleepTimeMs) { + return httpExchange -> { + if (sleepTimeMs > 0) { + try { + Thread.sleep(sleepTimeMs); + } catch (InterruptedException e) { + LOGGER.warn("Handler interrupted during sleep"); + } + } + httpExchange.sendResponseHeaders(status, msg.length()); + OutputStream responseBody = httpExchange.getResponseBody(); + responseBody.write(msg.getBytes()); + responseBody.close(); + }; + } + + private HttpServer startServer(int port, String path, HttpHandler handler) + throws IOException { + HttpServer server = HttpServer.create(new InetSocketAddress(port), 0); + server.createContext(path, handler); + new Thread(() -> server.start()).start(); + return server; + } + + @Test + public void testTaskSchedule() + throws IOException { + HttpServer httpServer = startServer(14202, "/tasks/schedule", + createHandler(200, "{\"SegmentGenerationAndPushTask\":\"Task_SegmentGenerationAndPushTask_1607470525615\"}", + 0)); + MinionClient minionClient = new MinionClient("localhost", "14202"); + Assert.assertEquals(minionClient.scheduleMinionTasks().get("SegmentGenerationAndPushTask"), + "Task_SegmentGenerationAndPushTask_1607470525615"); + httpServer.stop(0); + } + + @Test + public void testTasksStates() + throws IOException { + HttpServer httpServer = startServer(14203, "/tasks/SegmentGenerationAndPushTask/taskstates", + createHandler(200, "{\"Task_SegmentGenerationAndPushTask_1607470525615\":\"IN_PROGRESS\"}", 0)); + MinionClient minionClient = new MinionClient("http", "localhost", "14203"); + Assert.assertEquals(minionClient.getTasksStates("SegmentGenerationAndPushTask") + .get("Task_SegmentGenerationAndPushTask_1607470525615"), "IN_PROGRESS"); + httpServer.stop(0); + } + + @Test + public void testTaskState() + throws IOException { + HttpServer httpServer = startServer(14204, "/tasks/task/Task_SegmentGenerationAndPushTask_1607470525615/state", + createHandler(200, "\"COMPLETED\"", 0)); + MinionClient minionClient = new MinionClient("http://localhost:14204"); + Assert.assertEquals(minionClient.getTaskState("Task_SegmentGenerationAndPushTask_1607470525615"), "\"COMPLETED\""); + httpServer.stop(0); + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
