This is an automated email from the ASF dual-hosted git repository. hulee pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/helix.git
commit a4f5faae717571468afc191cd836e678a0567f3a Author: Yi Wang <[email protected]> AuthorDate: Fri Apr 19 17:27:19 2019 -0700 implementation of CustomRestClient (post request and get health checks) RB=1638858 G=helix-reviewers R=cjerian A=jxue Signed-off-by: Hunter Lee <[email protected]> --- helix-rest/pom.xml | 5 + .../apache/helix/rest/client/CustomRestClient.java | 27 ++-- .../helix/rest/client/CustomRestClientFactory.java | 26 +++- .../helix/rest/client/CustomRestClientImpl.java | 120 ++++++++++++++++- .../helix/rest/server/service/InstanceService.java | 3 +- .../rest/server/service/InstanceServiceImpl.java | 15 ++- .../helix/rest/client/TestCustomRestClient.java | 149 +++++++++++++++++++++ 7 files changed, 320 insertions(+), 25 deletions(-) diff --git a/helix-rest/pom.xml b/helix-rest/pom.xml index 390d578..baaf1f5 100644 --- a/helix-rest/pom.xml +++ b/helix-rest/pom.xml @@ -67,6 +67,11 @@ under the License. <version>3.8.1</version> </dependency> <dependency> + <groupId>org.apache.httpcomponents</groupId> + <artifactId>httpclient</artifactId> + <version>4.5.8</version> + </dependency> + <dependency> <groupId>org.eclipse.jetty</groupId> <artifactId>jetty-server</artifactId> <version>9.1.0.RC0</version> diff --git a/helix-rest/src/main/java/org/apache/helix/rest/client/CustomRestClient.java b/helix-rest/src/main/java/org/apache/helix/rest/client/CustomRestClient.java index f8ce0e5..afa4ef3 100644 --- a/helix-rest/src/main/java/org/apache/helix/rest/client/CustomRestClient.java +++ b/helix-rest/src/main/java/org/apache/helix/rest/client/CustomRestClient.java @@ -19,25 +19,34 @@ package org.apache.helix.rest.client; * under the License. */ +import java.io.IOException; +import java.util.List; import java.util.Map; - /** - * Interface for interacting with client side rest endpoints + * Interacting with participant side to query for its health checks */ public interface CustomRestClient { /** * Get stoppable check result on instance - * + * @param baseUrl the base url of the participant * @param customPayloads generic payloads required from client side and helix only works as proxy - * @return a map where key is custom stoppable check name and boolean value indicates if the check succeeds + * @return a map where key is custom stoppable check name and boolean value indicates if the check + * succeeds + * @throws IOException */ - Map<String, Boolean> getInstanceStoppableCheck(Map<String, String> customPayloads); + Map<String, Boolean> getInstanceStoppableCheck(String baseUrl, Map<String, String> customPayloads) + throws IOException; + /** * Get stoppable check result on partition - * + * @param baseUrl the base url of the participant + * @param partitions a list of partitions maintained by the participant * @param customPayloads generic payloads required from client side and helix only works as proxy - * @return a map where key is custom stoppable check name and boolean value indicates if the check succeeds - */ - Map<String, Boolean> getPartitionStoppableCheck(Map<String, String> customPayloads); + * @return a map where key is partition name and boolean value indicates if the partition is + * healthy + * @throws IOException + */ + Map<String, Boolean> getPartitionStoppableCheck(String baseUrl, List<String> partitions, + Map<String, String> customPayloads) throws IOException; } diff --git a/helix-rest/src/main/java/org/apache/helix/rest/client/CustomRestClientFactory.java b/helix-rest/src/main/java/org/apache/helix/rest/client/CustomRestClientFactory.java index acbfef7..362818c 100644 --- a/helix-rest/src/main/java/org/apache/helix/rest/client/CustomRestClientFactory.java +++ b/helix-rest/src/main/java/org/apache/helix/rest/client/CustomRestClientFactory.java @@ -19,17 +19,33 @@ package org.apache.helix.rest.client; * under the License. */ +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + /** * The memory efficient factory to create instances for {@link CustomRestClient} */ public class CustomRestClientFactory { - private static final String INSTANCE_HEALTH_STATUS = "/instanceHealthStatus"; - private static final String PARTITION_HEALTH_STATUS = "/partitionHealthStatus"; + private static final Logger LOG = LoggerFactory.getLogger(CustomRestClientFactory.class); + + private static CustomRestClient INSTANCE = null; - private CustomRestClientFactory() {} + private CustomRestClientFactory() { + } public static CustomRestClient get(String jsonContent) { - //TODO: add implementation - return new CustomRestClientImpl(); + if (INSTANCE == null) { + synchronized (CustomRestClientFactory.class) { + if (INSTANCE == null) { + try { + INSTANCE = new CustomRestClientImpl(); + } catch (Exception e) { + LOG.error("Exception when initializing CustomRestClient", e); + } + } + } + } + + return INSTANCE; } } diff --git a/helix-rest/src/main/java/org/apache/helix/rest/client/CustomRestClientImpl.java b/helix-rest/src/main/java/org/apache/helix/rest/client/CustomRestClientImpl.java index 133a338..0db5a9b 100644 --- a/helix-rest/src/main/java/org/apache/helix/rest/client/CustomRestClientImpl.java +++ b/helix-rest/src/main/java/org/apache/helix/rest/client/CustomRestClientImpl.java @@ -19,20 +19,130 @@ package org.apache.helix.rest.client; * under the License. */ +import java.io.IOException; import java.util.HashMap; +import java.util.List; import java.util.Map; +import java.util.stream.Collectors; +import org.apache.http.HttpEntity; +import org.apache.http.HttpResponse; +import org.apache.http.NameValuePair; +import org.apache.http.client.ClientProtocolException; +import org.apache.http.client.HttpClient; +import org.apache.http.client.entity.UrlEncodedFormEntity; +import org.apache.http.client.methods.HttpPost; +import org.apache.http.impl.client.HttpClients; +import org.apache.http.message.BasicNameValuePair; +import org.apache.http.util.EntityUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.annotations.VisibleForTesting; -//TODO: add implementation details class CustomRestClientImpl implements CustomRestClient { + private static final Logger LOG = LoggerFactory.getLogger(CustomRestClient.class); + + // postfix used to append at the end of base url + private static final String INSTANCE_HEALTH_STATUS = "/instanceHealthStatus"; + private static final String PARTITION_HEALTH_STATUS = "/partitionHealthStatus"; + + private static final String IS_HEALTHY_FIELD = "IS_HEALTHY"; + private static final String PARTITIONS = "partitions"; + private static final String ACCEPT_CONTENT_TYPE = "application/json"; + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + + private HttpClient _httpClient; + + private interface JsonConverter { + Map<String, Boolean> convert(JsonNode jsonNode); + } + + /** + * TODO: create Config to initialize SSLContext for Https endpoint + * Override the constructor if https endpoint is expected + */ + public CustomRestClientImpl() { + _httpClient = HttpClients.createDefault(); + } + + public CustomRestClientImpl(HttpClient httpClient) { + _httpClient = httpClient; + } @Override - public Map<String, Boolean> getInstanceStoppableCheck(Map<String, String> customPayloads) { - return new HashMap<>(); + public Map<String, Boolean> getInstanceStoppableCheck(String baseUrl, + Map<String, String> customPayloads) throws IOException { + // example url: http://<baseUrl>/instanceHealthStatus, assuming the base url already directly + // queries at the instance + String url = baseUrl + INSTANCE_HEALTH_STATUS; + JsonConverter jsonConverter = jsonNode -> { + Map<String, Boolean> result = new HashMap<>(); + jsonNode.fields() + .forEachRemaining(kv -> result.put(kv.getKey(), kv.getValue().asBoolean())); + return result; + }; + return handleResponse(post(url, customPayloads), jsonConverter); } @Override - public Map<String, Boolean> getPartitionStoppableCheck(Map<String, String> customPayloads) { - return new HashMap<>(); + public Map<String, Boolean> getPartitionStoppableCheck(String baseUrl, List<String> partitions, + Map<String, String> customPayloads) throws IOException { + /* + * example url: http://<baseUrl>/partitionHealthStatus -d { + * "partitions" : ["p1", "p3", "p9"], + * "<key>": "<value>", + * ... + * } + */ + String url = baseUrl + PARTITION_HEALTH_STATUS; + // To avoid ImmutableMap as parameter + Map<String, String> payLoads = new HashMap<>(customPayloads); + // Add the entry: "partitions" : ["p1", "p3", "p9"] + payLoads.put(PARTITIONS, partitions.toString()); + JsonConverter jsonConverter = jsonNode -> { + Map<String, Boolean> result = new HashMap<>(); + jsonNode.fields() + .forEachRemaining(kv -> result.put(kv.getKey(), kv.getValue().get(IS_HEALTHY_FIELD).asBoolean())); + return result; + }; + return handleResponse(post(url, payLoads), jsonConverter); + } + + @VisibleForTesting + protected JsonNode getJsonObject(HttpResponse httpResponse) throws IOException { + HttpEntity httpEntity = httpResponse.getEntity(); + String str = EntityUtils.toString(httpEntity); + return OBJECT_MAPPER.readTree(str); + } + + private Map<String, Boolean> handleResponse(HttpResponse httpResponse, + JsonConverter jsonConverter) throws IOException { + int status = httpResponse.getStatusLine().getStatusCode(); + if (status == 200) { + return jsonConverter.convert(getJsonObject(httpResponse)); + } else { + throw new ClientProtocolException("Unexpected response status: " + status + ", reason: " + + httpResponse.getStatusLine().getReasonPhrase()); + } + } + + private HttpResponse post(String url, Map<String, String> payloads) throws IOException { + List<NameValuePair> params = payloads.entrySet().stream() + .map(entry -> new BasicNameValuePair(entry.getKey(), entry.getValue())) + .collect(Collectors.toList()); + try { + HttpPost postRequest = new HttpPost(url); + postRequest.setHeader("Accept", ACCEPT_CONTENT_TYPE); + postRequest.setEntity(new UrlEncodedFormEntity(params, "UTF-8")); + LOG.info("Executing request {}", postRequest.getRequestLine()); + return _httpClient.execute(postRequest); + } catch (IOException e) { + LOG.error("Failed to perform customized health check. Is participant endpoint {} available?", + url, e); + throw e; + } } } diff --git a/helix-rest/src/main/java/org/apache/helix/rest/server/service/InstanceService.java b/helix-rest/src/main/java/org/apache/helix/rest/server/service/InstanceService.java index 471a4ec..3e642cb 100644 --- a/helix-rest/src/main/java/org/apache/helix/rest/server/service/InstanceService.java +++ b/helix-rest/src/main/java/org/apache/helix/rest/server/service/InstanceService.java @@ -19,6 +19,7 @@ package org.apache.helix.rest.server.service; * under the License. */ +import java.io.IOException; import java.util.Arrays; import java.util.List; import java.util.Map; @@ -97,5 +98,5 @@ public interface InstanceService { List<HealthCheck> healthChecks); StoppableCheck checkSingleInstanceStoppable(String clusterId, String instanceName, - String jsonContent); + String jsonContent) throws IOException; } diff --git a/helix-rest/src/main/java/org/apache/helix/rest/server/service/InstanceServiceImpl.java b/helix-rest/src/main/java/org/apache/helix/rest/server/service/InstanceServiceImpl.java index 8db4d42..a01912a 100644 --- a/helix-rest/src/main/java/org/apache/helix/rest/server/service/InstanceServiceImpl.java +++ b/helix-rest/src/main/java/org/apache/helix/rest/server/service/InstanceServiceImpl.java @@ -19,6 +19,7 @@ package org.apache.helix.rest.server.service; * under the License. */ +import java.io.IOException; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; @@ -151,15 +152,19 @@ public class InstanceServiceImpl implements InstanceService { @Override public StoppableCheck checkSingleInstanceStoppable(String clusterId, String instanceName, - String jsonContent) { + String jsonContent) throws IOException { // TODO reduce GC by dependency injection Map<String, Boolean> helixStoppableCheck = getInstanceHealthStatus(clusterId, instanceName, InstanceService.HealthCheck.STOPPABLE_CHECK_LIST); CustomRestClient customClient = CustomRestClientFactory.get(jsonContent); - // TODO add the json content parse logic - Map<String, Boolean> customStoppableCheck = - customClient.getInstanceStoppableCheck(Collections.emptyMap()); - return StoppableCheck.mergeStoppableChecks(helixStoppableCheck, customStoppableCheck); + try { + Map<String, Boolean> customStoppableCheck = + customClient.getInstanceStoppableCheck("", Collections.emptyMap()); + return StoppableCheck.mergeStoppableChecks(helixStoppableCheck, customStoppableCheck); + } catch (IOException e) { + LOG.error("Failed to perform customized health check for {}/{}", clusterId, instanceName, e); + throw e; + } } public PartitionHealth generatePartitionHealthMapFromZK() { diff --git a/helix-rest/src/test/java/org/apache/helix/rest/client/TestCustomRestClient.java b/helix-rest/src/test/java/org/apache/helix/rest/client/TestCustomRestClient.java new file mode 100644 index 0000000..c43578b --- /dev/null +++ b/helix-rest/src/test/java/org/apache/helix/rest/client/TestCustomRestClient.java @@ -0,0 +1,149 @@ +package org.apache.helix.rest.client; + +import static org.mockito.Matchers.*; +import static org.mockito.Mockito.*; + +import java.io.IOException; +import java.util.Collections; +import java.util.Map; + +import org.apache.http.HttpResponse; +import org.apache.http.HttpStatus; +import org.apache.http.StatusLine; +import org.apache.http.client.ClientProtocolException; +import org.apache.http.client.HttpClient; +import org.apache.http.client.methods.HttpPost; +import org.junit.Assert; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.collect.ImmutableList; + +public class TestCustomRestClient { + private static final String HTTP_LOCALHOST = "http://localhost:1000"; + @Mock + HttpClient _httpClient; + + @BeforeMethod + public void init() { + MockitoAnnotations.initMocks(this); + } + + @Test + public void testGetInstanceStoppableCheck() throws IOException { + MockCustomRestClient customRestClient = new MockCustomRestClient(_httpClient); + String jsonResponse = "{\n" + " \"check1\": \"false\",\n" + " \"check2\": \"true\"\n" + "}"; + + HttpResponse httpResponse = mock(HttpResponse.class); + StatusLine statusLine = mock(StatusLine.class); + + when(statusLine.getStatusCode()).thenReturn(HttpStatus.SC_OK); + when(httpResponse.getStatusLine()).thenReturn(statusLine); + customRestClient.setJsonResponse(jsonResponse); + when(_httpClient.execute(any(HttpPost.class))).thenReturn(httpResponse); + + Map<String, Boolean> healthCheck = + customRestClient.getInstanceStoppableCheck(HTTP_LOCALHOST, Collections.emptyMap()); + Assert.assertFalse(healthCheck.get("check1")); + Assert.assertTrue(healthCheck.get("check2")); + } + + @Test(expectedExceptions = ClientProtocolException.class) + public void testGetInstanceStoppableCheck_when_url_404() throws IOException { + MockCustomRestClient customRestClient = new MockCustomRestClient(_httpClient); + HttpResponse httpResponse = mock(HttpResponse.class); + StatusLine statusLine = mock(StatusLine.class); + + when(statusLine.getStatusCode()).thenReturn(HttpStatus.SC_NOT_FOUND); + when(httpResponse.getStatusLine()).thenReturn(statusLine); + when(_httpClient.execute(any(HttpPost.class))).thenReturn(httpResponse); + + customRestClient.getInstanceStoppableCheck(HTTP_LOCALHOST, Collections.emptyMap()); + } + + @Test(expectedExceptions = IOException.class) + public void testGetInstanceStoppableCheck_when_response_empty() throws IOException { + MockCustomRestClient customRestClient = new MockCustomRestClient(_httpClient); + HttpResponse httpResponse = mock(HttpResponse.class); + StatusLine statusLine = mock(StatusLine.class); + + when(statusLine.getStatusCode()).thenReturn(HttpStatus.SC_NOT_FOUND); + when(httpResponse.getStatusLine()).thenReturn(statusLine); + when(_httpClient.execute(any(HttpPost.class))).thenReturn(httpResponse); + customRestClient.setJsonResponse(""); + + customRestClient.getInstanceStoppableCheck(HTTP_LOCALHOST, Collections.emptyMap()); + } + + @Test + public void testGetPartitionStoppableCheck() throws IOException { + MockCustomRestClient customRestClient = new MockCustomRestClient(_httpClient); + String jsonResponse = "\n" + "{\n" + " \"db1\": {\n" + " \"IS_HEALTHY\": \"false\"\n" + + " },\n" + " \"db0\": {\n" + " \"IS_HEALTHY\": \"true\"\n" + " }\n" + "}"; + + HttpResponse httpResponse = mock(HttpResponse.class); + StatusLine statusLine = mock(StatusLine.class); + + when(statusLine.getStatusCode()).thenReturn(HttpStatus.SC_OK); + when(httpResponse.getStatusLine()).thenReturn(statusLine); + customRestClient.setJsonResponse(jsonResponse); + when(_httpClient.execute(any(HttpPost.class))).thenReturn(httpResponse); + + Map<String, Boolean> partitionHealth = customRestClient.getPartitionStoppableCheck(HTTP_LOCALHOST, + ImmutableList.of("db0", "db1"), Collections.emptyMap()); + + Assert.assertTrue(partitionHealth.get("db0")); + Assert.assertFalse(partitionHealth.get("db1")); + } + + @Test(expectedExceptions = ClientProtocolException.class) + public void testGetPartitionStoppableCheck_when_url_404() throws IOException { + MockCustomRestClient customRestClient = new MockCustomRestClient(_httpClient); + + HttpResponse httpResponse = mock(HttpResponse.class); + StatusLine statusLine = mock(StatusLine.class); + + when(statusLine.getStatusCode()).thenReturn(HttpStatus.SC_NOT_FOUND); + when(httpResponse.getStatusLine()).thenReturn(statusLine); + when(_httpClient.execute(any(HttpPost.class))).thenReturn(httpResponse); + + customRestClient.getPartitionStoppableCheck(HTTP_LOCALHOST, + ImmutableList.of("db0", "db1"), Collections.emptyMap()); + } + + @Test(expectedExceptions = IOException.class) + public void testGetPartitionStoppableCheck_when_response_empty() throws IOException { + MockCustomRestClient customRestClient = new MockCustomRestClient(_httpClient); + HttpResponse httpResponse = mock(HttpResponse.class); + StatusLine statusLine = mock(StatusLine.class); + + when(statusLine.getStatusCode()).thenReturn(HttpStatus.SC_NOT_FOUND); + when(httpResponse.getStatusLine()).thenReturn(statusLine); + when(_httpClient.execute(any(HttpPost.class))).thenReturn(httpResponse); + customRestClient.setJsonResponse(""); + + customRestClient.getPartitionStoppableCheck(HTTP_LOCALHOST, + ImmutableList.of("db0", "db1"), Collections.emptyMap()); + } + + private class MockCustomRestClient extends CustomRestClientImpl { + private String _jsonResponse = ""; + + MockCustomRestClient(HttpClient mockHttpClient) { + super(mockHttpClient); + } + + void setJsonResponse(String response) { + _jsonResponse = response; + } + + @Override + protected JsonNode getJsonObject(HttpResponse httpResponse) throws IOException { + return new ObjectMapper().readTree(_jsonResponse); + } + } +}
