Repository: eagle Updated Branches: refs/heads/master 93f83f4a9 -> 3fe637eb5
http://git-wip-us.apache.org/repos/asf/eagle/blob/3fe637eb/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourcefetch/RMResourceFetcher.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourcefetch/RMResourceFetcher.java b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourcefetch/RMResourceFetcher.java index 225ede9..e1991b0 100644 --- a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourcefetch/RMResourceFetcher.java +++ b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourcefetch/RMResourceFetcher.java @@ -19,6 +19,8 @@ */ package org.apache.eagle.jpm.util.resourcefetch; +import org.apache.commons.lang3.StringUtils; +import org.apache.eagle.common.DateTimeUtil; import org.apache.eagle.jpm.util.Constants; import org.apache.eagle.jpm.util.resourcefetch.connection.InputStreamUtils; import org.apache.eagle.jpm.util.resourcefetch.ha.HAURLSelector; @@ -27,25 +29,24 @@ import org.apache.eagle.jpm.util.resourcefetch.model.AppInfo; import org.apache.eagle.jpm.util.resourcefetch.model.AppsWrapper; import org.apache.eagle.jpm.util.resourcefetch.model.ClusterInfo; import org.apache.eagle.jpm.util.resourcefetch.model.ClusterInfoWrapper; -import org.apache.eagle.jpm.util.resourcefetch.url.JobListServiceURLBuilderImpl; -import org.apache.eagle.jpm.util.resourcefetch.url.ServiceURLBuilder; -import org.apache.eagle.jpm.util.resourcefetch.url.SparkCompleteJobServiceURLBuilderImpl; -import org.apache.eagle.jpm.util.resourcefetch.url.URLUtil; +import org.apache.eagle.jpm.util.resourcefetch.url.*; import com.fasterxml.jackson.core.JsonParser; import com.fasterxml.jackson.databind.ObjectMapper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.IOException; import java.io.InputStream; +import java.security.InvalidParameterException; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; +import java.util.Map; public class RMResourceFetcher implements ResourceFetcher<AppInfo> { private static final Logger LOG = LoggerFactory.getLogger(RMResourceFetcher.class); private final HAURLSelector selector; - private final ServiceURLBuilder jobListServiceURLBuilder; + //private final ServiceURLBuilder jobListServiceURLBuilder; private final ServiceURLBuilder sparkCompleteJobServiceURLBuilder; private static final ObjectMapper OBJ_MAPPER = new ObjectMapper(); @@ -54,31 +55,32 @@ public class RMResourceFetcher implements ResourceFetcher<AppInfo> { } public RMResourceFetcher(String[] rmBasePaths) { - this.jobListServiceURLBuilder = new JobListServiceURLBuilderImpl(); + //this.jobListServiceURLBuilder = new JobListServiceURLBuilderImpl(); this.sparkCompleteJobServiceURLBuilder = new SparkCompleteJobServiceURLBuilderImpl(); - - this.selector = new HAURLSelectorImpl(rmBasePaths, jobListServiceURLBuilder, Constants.CompressionType.GZIP); + this.selector = new HAURLSelectorImpl( + rmBasePaths, + new RmActiveTestURLBuilderImpl(), + Constants.CompressionType.NONE, null); } - private void checkUrl() throws IOException { - if (!selector.checkUrl(jobListServiceURLBuilder.build(selector.getSelectedUrl(), Constants.JobState.RUNNING.name()))) { - selector.reSelectUrl(); - } + public HAURLSelector getSelector() { + return selector; } - private List<AppInfo> doFetchFinishApplicationsList(String urlString, Constants.CompressionType compressionType) throws Exception { - List<AppInfo> result = new ArrayList<>(0); + private List<AppInfo> doFetchApplicationsList(String urlString, Constants.CompressionType compressionType) { + List<AppInfo> result = new ArrayList<>(); InputStream is = null; try { - checkUrl(); - LOG.info("Going to call yarn api to fetch finished application list: " + urlString); + LOG.info("Going to query cluster applications list: " + urlString); is = InputStreamUtils.getInputStream(urlString, null, compressionType); final AppsWrapper appWrapper = OBJ_MAPPER.readValue(is, AppsWrapper.class); if (appWrapper != null && appWrapper.getApps() != null && appWrapper.getApps().getApp() != null) { result = appWrapper.getApps().getApp(); } - return result; + LOG.info("Successfully fetched {} AppInfos from url {}", result.size(), urlString); + } catch (Exception e) { + LOG.error("Fail to query {} due to {}", urlString, e.getMessage()); } finally { if (is != null) { try { @@ -88,21 +90,25 @@ public class RMResourceFetcher implements ResourceFetcher<AppInfo> { } } } + return result; } - private String getSparkRunningJobURL() { - return selector.getSelectedUrl() - + "/" - + Constants.V2_APPS_URL - + "?applicationTypes=SPARK&state=RUNNING&" - + Constants.ANONYMOUS_PARAMETER; - } - - private String getMRRunningJobURL() { - return String.format("%s/%s?applicationTypes=MAPREDUCE&state=RUNNING&%s", - selector.getSelectedUrl(), - Constants.V2_APPS_URL, - Constants.ANONYMOUS_PARAMETER); + public String getRunningJobURL(Constants.JobType jobType, String startTime, String endTime, String limit) { + String condition = ""; + limit = ((limit == null || limit.isEmpty()) ? "" : "&limit=" + limit); + if (startTime == null && endTime == null) { + condition = String.format("applicationTypes=%s%s&", jobType, limit); + } else if (startTime == null) { + condition = String.format("applicationTypes=%s&startedTimeEnd=%s%s&", jobType, endTime, limit); + } else if (endTime == null) { + condition = String.format("applicationTypes=%s&startedTimeBegin=%s%s&", jobType, startTime, limit); + } else { + condition = String.format("applicationTypes=%s&startedTimeBegin=%s&startedTimeEnd=%s%s&", + jobType, startTime, endTime, limit); + } + String url = URLUtil.removeTrailingSlash(selector.getSelectedUrl()); + return String.format("%s/%s?%sstate=RUNNING&%s", url, Constants.V2_APPS_URL, condition, + Constants.ANONYMOUS_PARAMETER); } private String getMRFinishedJobURL(String lastFinishedTime) { @@ -112,40 +118,101 @@ public class RMResourceFetcher implements ResourceFetcher<AppInfo> { + lastFinishedTime + "&" + Constants.ANONYMOUS_PARAMETER; } - private List<AppInfo> doFetchRunningApplicationsList(String urlString, Constants.CompressionType compressionType) throws Exception { - List<AppInfo> result = new ArrayList<>(0); - InputStream is = null; + private String getAcceptedAppURL() { + String baseUrl = URLUtil.removeTrailingSlash(selector.getSelectedUrl()); + return String.format("%s/%s?state=ACCEPTED&%s", baseUrl, Constants.V2_APPS_URL, Constants.ANONYMOUS_PARAMETER); + } + + private List<AppInfo> doFetchRunningApplicationsList(Constants.JobType jobType, + Constants.CompressionType compressionType, + Object... parameter) throws Exception { + Map<String, AppInfo> result = new HashMap(); + List<AppInfo> apps = new ArrayList<>(); try { - checkUrl(); - LOG.info("Going to call yarn api to fetch running application list: " + urlString); - is = InputStreamUtils.getInputStream(urlString, null, compressionType); - final AppsWrapper appWrapper = OBJ_MAPPER.readValue(is, AppsWrapper.class); - if (appWrapper != null && appWrapper.getApps() != null && appWrapper.getApps().getApp() != null) { - result = appWrapper.getApps().getApp(); + selector.checkUrl(); + + String limit = ""; + int requests = 1; + int timeRangePerRequestInMin = 60; + + switch (parameter.length) { + case 0 : + String urlString = getRunningJobURL(jobType, null, null, null); + return doFetchApplicationsList(urlString, compressionType); + case 1 : + limit = String.valueOf(parameter[0]); + break; + case 2 : + limit = String.valueOf(parameter[0]); + requests = (int) parameter[1]; + break; + case 3 : + limit = String.valueOf(parameter[0]); + requests = (int) parameter[1]; + timeRangePerRequestInMin = (int) parameter[2]; + break; + default : + throw new InvalidParameterException("parameter list: limit, requests, requestTimeRange"); } - return result; - } finally { - if (is != null) { - try { - is.close(); - } catch (Exception e) { - LOG.warn("{}", e); - } + + if (requests <= 1) { + String urlString = getRunningJobURL(jobType, null, null, limit); + return doFetchApplicationsList(urlString, compressionType); } + + long interval = timeRangePerRequestInMin * DateTimeUtil.ONEMINUTE; + long currentTime = System.currentTimeMillis() - interval; + + List<String> requestUrls = new ArrayList<>(); + requestUrls.add(getRunningJobURL(jobType, String.valueOf(currentTime), null, limit)); + + for (int cnt = 2; cnt < requests; cnt++) { + long start = currentTime - interval; + requestUrls.add(getRunningJobURL(jobType, String.valueOf(start), String.valueOf(currentTime), limit)); + currentTime -= interval; + } + + requestUrls.add(getRunningJobURL(jobType, null, String.valueOf(currentTime), limit)); + LOG.info("{} requests to fetch running MapReduce applications: \n{}", requestUrls.size(), + StringUtils.join(requestUrls, "\n")); + + requestUrls.forEach(query -> + doFetchApplicationsList(query, compressionType).forEach(app -> result.put(app.getId(), app)) + ); + } catch (Exception e) { + LOG.error("Catch an exception when query url{} : {}", selector.getSelectedUrl(), e.getMessage(), e); + return apps; + } + apps.addAll(result.values()); + return apps; + } + + private List<AppInfo> doFetchAcceptedApplicationList(Constants.CompressionType compressionType, + Object... parameter) throws Exception { + List<AppInfo> apps = new ArrayList<>(); + try { + selector.checkUrl(); + String url = getAcceptedAppURL(); + return doFetchApplicationsList(url, compressionType); + } catch (Exception e) { + LOG.error("Catch an exception when query {} : {}", selector.getSelectedUrl(), e.getMessage(), e); } + return apps; } private List<AppInfo> getResource(Constants.ResourceType resourceType, Constants.CompressionType compressionType, Object... parameter) throws Exception { switch (resourceType) { case COMPLETE_SPARK_JOB: final String urlString = sparkCompleteJobServiceURLBuilder.build(selector.getSelectedUrl(), (String) parameter[0]); - return doFetchFinishApplicationsList(urlString, compressionType); + return doFetchApplicationsList(urlString, compressionType); case RUNNING_SPARK_JOB: - return doFetchRunningApplicationsList(getSparkRunningJobURL(), compressionType); + return doFetchRunningApplicationsList(Constants.JobType.SPARK, compressionType, parameter); case RUNNING_MR_JOB: - return doFetchRunningApplicationsList(getMRRunningJobURL(), compressionType); + return doFetchRunningApplicationsList(Constants.JobType.MAPREDUCE, compressionType, parameter); case COMPLETE_MR_JOB: - return doFetchFinishApplicationsList(getMRFinishedJobURL((String) parameter[0]), compressionType); + return doFetchApplicationsList(getMRFinishedJobURL((String) parameter[0]), compressionType); + case ACCEPTED_JOB: + return doFetchAcceptedApplicationList(compressionType, parameter); default: throw new Exception("Not support resourceType :" + resourceType); } @@ -166,7 +233,7 @@ public class RMResourceFetcher implements ResourceFetcher<AppInfo> { public ClusterInfo getClusterInfo() throws Exception { InputStream is = null; try { - checkUrl(); + selector.checkUrl(); final String urlString = getClusterInfoURL(); LOG.info("Calling yarn api to fetch cluster info: " + urlString); is = InputStreamUtils.getInputStream(urlString, null, Constants.CompressionType.GZIP); http://git-wip-us.apache.org/repos/asf/eagle/blob/3fe637eb/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourcefetch/SparkHistoryServerResourceFetcher.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourcefetch/SparkHistoryServerResourceFetcher.java b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourcefetch/SparkHistoryServerResourceFetcher.java index 6b0f454..9e776be 100644 --- a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourcefetch/SparkHistoryServerResourceFetcher.java +++ b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourcefetch/SparkHistoryServerResourceFetcher.java @@ -53,7 +53,6 @@ public class SparkHistoryServerResourceFetcher implements ResourceFetcher<SparkA this.historyServerURL = historyServerURL; this.sparkDetailJobServiceURLBuilder = new SparkJobServiceURLBuilderImpl(); this.auth = "Basic " + new String(new Base64().encode(String.format("%s:%s", userName, pwd).getBytes())); - ; } private List<SparkApplication> doFetchSparkApplicationDetail(String appId) throws Exception { http://git-wip-us.apache.org/repos/asf/eagle/blob/3fe637eb/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourcefetch/ha/AbstractURLSelector.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourcefetch/ha/AbstractURLSelector.java b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourcefetch/ha/AbstractURLSelector.java deleted file mode 100644 index 2a99d26..0000000 --- a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourcefetch/ha/AbstractURLSelector.java +++ /dev/null @@ -1,110 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -package org.apache.eagle.jpm.util.resourcefetch.ha; - -import org.apache.eagle.jpm.util.Constants; -import org.apache.eagle.jpm.util.resourcefetch.connection.InputStreamUtils; -import org.apache.hadoop.util.StringUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.io.InputStream; -import java.util.Arrays; - -public abstract class AbstractURLSelector implements HAURLSelector { - private final String[] urls; - private volatile String selectedUrl; - - private volatile boolean reselectInProgress; - private final Constants.CompressionType compressionType; - - private static final long MAX_RETRY_TIME = 3; - private static final Logger LOG = LoggerFactory.getLogger(HAURLSelectorImpl.class); - - public AbstractURLSelector(String[] urls, Constants.CompressionType compressionType) { - this.urls = urls; - this.compressionType = compressionType; - } - - public boolean checkUrl(String urlString) { - InputStream is = null; - try { - is = InputStreamUtils.getInputStream(urlString, null, compressionType); - } catch (Exception ex) { - LOG.info("get input stream from url: " + urlString + " failed. "); - return false; - } finally { - if (is != null) { - try { - is.close(); - } catch (IOException e) { - LOG.warn("{}", e); - } - } - } - return true; - } - - @Override - public String getSelectedUrl() { - if (selectedUrl == null) { - selectedUrl = urls[0]; - } - return selectedUrl; - } - - @Override - public void reSelectUrl() throws IOException { - if (reselectInProgress) { - return; - } - synchronized (this) { - if (reselectInProgress) { - return; - } - reselectInProgress = true; - try { - LOG.info("Going to reselect url"); - for (int i = 0; i < urls.length; i++) { - String urlToCheck = urls[i]; - LOG.info("Going to try url :" + urlToCheck); - for (int time = 0; time < MAX_RETRY_TIME; time++) { - if (checkUrl(buildTestURL(urlToCheck))) { - selectedUrl = urls[i]; - LOG.info("Successfully switch to new url : " + selectedUrl); - return; - } - LOG.info("try url " + urlToCheck + " failed for " + (time + 1) + " times, sleep 5 seconds before try again. "); - try { - Thread.sleep(5 * 1000); - } catch (InterruptedException ex) { - LOG.warn("{}", ex); - } - } - } - throw new IOException("No alive url found: " + StringUtils.join(";", Arrays.asList(this.urls))); - } finally { - reselectInProgress = false; - } - } - } - - protected abstract String buildTestURL(String urlToCheck); -} http://git-wip-us.apache.org/repos/asf/eagle/blob/3fe637eb/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourcefetch/ha/HAURLSelector.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourcefetch/ha/HAURLSelector.java b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourcefetch/ha/HAURLSelector.java index fa9b52b..6539263 100644 --- a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourcefetch/ha/HAURLSelector.java +++ b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourcefetch/ha/HAURLSelector.java @@ -20,9 +20,9 @@ import java.io.IOException; public interface HAURLSelector { - boolean checkUrl(String url); + void checkUrl() throws IOException; - void reSelectUrl() throws IOException; + boolean checkUrl(String urlString); String getSelectedUrl(); } http://git-wip-us.apache.org/repos/asf/eagle/blob/3fe637eb/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourcefetch/ha/HAURLSelectorImpl.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourcefetch/ha/HAURLSelectorImpl.java b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourcefetch/ha/HAURLSelectorImpl.java index a083ef2..ca1df25 100644 --- a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourcefetch/ha/HAURLSelectorImpl.java +++ b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourcefetch/ha/HAURLSelectorImpl.java @@ -32,16 +32,38 @@ public class HAURLSelectorImpl implements HAURLSelector { private final String[] urls; private volatile String selectedUrl; private final ServiceURLBuilder builder; + private final Constants.JobState jobState; private volatile boolean reselectInProgress; private final Constants.CompressionType compressionType; private static final long MAX_RETRY_TIME = 2; private static final Logger LOG = LoggerFactory.getLogger(HAURLSelectorImpl.class); - public HAURLSelectorImpl(String[] urls, ServiceURLBuilder builder, Constants.CompressionType compressionType) { + public HAURLSelectorImpl(String[] urls, ServiceURLBuilder builder, Constants.CompressionType compressionType, Constants.JobState jobState) { this.urls = urls; this.compressionType = compressionType; this.builder = builder; + this.jobState = jobState; + } + + private String convertRestApi(Constants.JobState jobState) { + if (jobState == null) { + return null; + } + switch (jobState) { + case RUNNING : return Constants.V2_APPS_RUNNING_URL; + case FINISHED : return Constants.V2_APPS_COMPLETED_URL; + case ALL : return Constants.V2_APPS_URL; + default : + LOG.error("Unsupported JobState={}", jobState); + return null; + } + } + + public void checkUrl() throws IOException { + if (!checkUrl(builder.build(getSelectedUrl(), convertRestApi(jobState)))) { + reSelectUrl(); + } } public boolean checkUrl(String urlString) { @@ -49,7 +71,7 @@ public class HAURLSelectorImpl implements HAURLSelector { try { is = InputStreamUtils.getInputStream(urlString, null, compressionType); } catch (Exception ex) { - LOG.info("get inputstream from url: " + urlString + " failed. "); + LOG.info("get inputStream from url: " + urlString + " failed. "); return false; } finally { if (is != null) { @@ -71,8 +93,7 @@ public class HAURLSelectorImpl implements HAURLSelector { return selectedUrl; } - @Override - public void reSelectUrl() throws IOException { + private void reSelectUrl() throws IOException { if (reselectInProgress) { return; } @@ -87,7 +108,7 @@ public class HAURLSelectorImpl implements HAURLSelector { String urlToCheck = urls[i]; LOG.info("Going to try url :" + urlToCheck); for (int time = 0; time < MAX_RETRY_TIME; time++) { - if (checkUrl(builder.build(urlToCheck, Constants.JobState.RUNNING.name()))) { + if (checkUrl(builder.build(urlToCheck, convertRestApi(jobState)))) { selectedUrl = urls[i]; LOG.info("Successfully switch to new url : " + selectedUrl); return; http://git-wip-us.apache.org/repos/asf/eagle/blob/3fe637eb/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourcefetch/model/AppInfo.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourcefetch/model/AppInfo.java b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourcefetch/model/AppInfo.java index 90709c9..5b0269d 100644 --- a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourcefetch/model/AppInfo.java +++ b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourcefetch/model/AppInfo.java @@ -43,9 +43,12 @@ public class AppInfo implements Serializable { private long elapsedTime; private String amContainerLogs; private String amHostHttpAddress; - private long allocatedMB; + private int allocatedMB; private int allocatedVCores; private int runningContainers; + // for HDP 2.7 + private double queueUsagePercentage; + private double clusterUsagePercentage; public String getId() { return id; @@ -183,11 +186,11 @@ public class AppInfo implements Serializable { this.amHostHttpAddress = amHostHttpAddress; } - public long getAllocatedMB() { + public int getAllocatedMB() { return allocatedMB; } - public void setAllocatedMB(long allocatedMB) { + public void setAllocatedMB(int allocatedMB) { this.allocatedMB = allocatedMB; } @@ -207,6 +210,22 @@ public class AppInfo implements Serializable { this.runningContainers = runningContainers; } + public double getQueueUsagePercentage() { + return queueUsagePercentage; + } + + public void setQueueUsagePercentage(double queueUsagePercentage) { + this.queueUsagePercentage = queueUsagePercentage; + } + + public double getClusterUsagePercentage() { + return clusterUsagePercentage; + } + + public void setClusterUsagePercentage(double clusterUsagePercentage) { + this.clusterUsagePercentage = clusterUsagePercentage; + } + @Override public String toString() { return "AppInfo{" http://git-wip-us.apache.org/repos/asf/eagle/blob/3fe637eb/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourcefetch/url/JobListServiceURLBuilderImpl.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourcefetch/url/JobListServiceURLBuilderImpl.java b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourcefetch/url/JobListServiceURLBuilderImpl.java index 5513771..e5994aa 100644 --- a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourcefetch/url/JobListServiceURLBuilderImpl.java +++ b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourcefetch/url/JobListServiceURLBuilderImpl.java @@ -20,25 +20,17 @@ import org.apache.eagle.jpm.util.Constants; public class JobListServiceURLBuilderImpl implements ServiceURLBuilder { - public String build(String... parameters) { + public String build(String url, String... parameters) { /** * {rmUrl}/ws/v1/cluster/apps?state=RUNNING. * We need to remove tailing slashes to avoid "url//ws/v1" * because it would not be found and would be redirected to * history server ui. */ - String rmUrl = URLUtil.removeTrailingSlash(parameters[0]); + String rmUrl = URLUtil.removeTrailingSlash(url); - String restApi = null; - String jobState = parameters[1]; + String restApi = parameters[0]; - if (jobState.equals(Constants.JobState.RUNNING.name())) { - restApi = Constants.V2_APPS_RUNNING_URL; - } else if (jobState.equals(Constants.JobState.FINISHED.name())) { - restApi = Constants.V2_APPS_COMPLETED_URL; - } else if (jobState.equals(Constants.JobState.ALL.name())) { - restApi = Constants.V2_APPS_URL; - } if (restApi == null) { return null; } http://git-wip-us.apache.org/repos/asf/eagle/blob/3fe637eb/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourcefetch/url/RmActiveTestURLBuilderImpl.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourcefetch/url/RmActiveTestURLBuilderImpl.java b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourcefetch/url/RmActiveTestURLBuilderImpl.java new file mode 100644 index 0000000..f0b963b --- /dev/null +++ b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourcefetch/url/RmActiveTestURLBuilderImpl.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eagle.jpm.util.resourcefetch.url; + +import org.apache.eagle.jpm.util.Constants; + +public class RmActiveTestURLBuilderImpl implements ServiceURLBuilder { + @Override + public String build(String url, String... parameters) { + String rmUrl = URLUtil.removeTrailingSlash(url); + return String.format("%s/%s&limit=1&%s", rmUrl, Constants.V2_APPS_COMPLETED_URL, Constants.ANONYMOUS_PARAMETER); + } +} http://git-wip-us.apache.org/repos/asf/eagle/blob/3fe637eb/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourcefetch/url/ServiceURLBuilder.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourcefetch/url/ServiceURLBuilder.java b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourcefetch/url/ServiceURLBuilder.java index 09fea2f..1fc234f 100644 --- a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourcefetch/url/ServiceURLBuilder.java +++ b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourcefetch/url/ServiceURLBuilder.java @@ -17,5 +17,5 @@ package org.apache.eagle.jpm.util.resourcefetch.url; public interface ServiceURLBuilder { - String build(String... parameters); + String build(String url, String... parameters); } http://git-wip-us.apache.org/repos/asf/eagle/blob/3fe637eb/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourcefetch/url/SparkCompleteJobServiceURLBuilderImpl.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourcefetch/url/SparkCompleteJobServiceURLBuilderImpl.java b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourcefetch/url/SparkCompleteJobServiceURLBuilderImpl.java index ca6e938..063ac5f 100644 --- a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourcefetch/url/SparkCompleteJobServiceURLBuilderImpl.java +++ b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourcefetch/url/SparkCompleteJobServiceURLBuilderImpl.java @@ -23,11 +23,11 @@ import org.apache.eagle.jpm.util.Constants; public class SparkCompleteJobServiceURLBuilderImpl implements ServiceURLBuilder { - public String build(String... parameters) { - String url = URLUtil.removeTrailingSlash(parameters[0]); + public String build(String url, String... parameters) { + String newUrl = URLUtil.removeTrailingSlash(url); - return url + "/" + Constants.V2_APPS_URL + return newUrl + "/" + Constants.V2_APPS_URL + "?applicationTypes=SPARK&state=FINISHED&finishedTimeBegin=" - + parameters[1] + "&" + Constants.ANONYMOUS_PARAMETER; + + parameters[0] + "&" + Constants.ANONYMOUS_PARAMETER; } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/eagle/blob/3fe637eb/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourcefetch/url/SparkJobServiceURLBuilderImpl.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourcefetch/url/SparkJobServiceURLBuilderImpl.java b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourcefetch/url/SparkJobServiceURLBuilderImpl.java index c5ec67a..20006f5 100644 --- a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourcefetch/url/SparkJobServiceURLBuilderImpl.java +++ b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourcefetch/url/SparkJobServiceURLBuilderImpl.java @@ -23,9 +23,9 @@ import org.apache.eagle.jpm.util.Constants; public class SparkJobServiceURLBuilderImpl implements ServiceURLBuilder { - public String build(String... parameters) { - String serverAddress = URLUtil.removeTrailingSlash(parameters[0]); + public String build(String url, String... parameters) { + String serverAddress = URLUtil.removeTrailingSlash(url); - return serverAddress + Constants.SPARK_APPS_URL + parameters[1]; + return serverAddress + Constants.SPARK_APPS_URL + parameters[0]; } } http://git-wip-us.apache.org/repos/asf/eagle/blob/3fe637eb/eagle-jpm/eagle-jpm-util/src/test/java/org/apache/eagle/jpm/util/resourcefetch/ha/HAURLSelectorImplTest.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-util/src/test/java/org/apache/eagle/jpm/util/resourcefetch/ha/HAURLSelectorImplTest.java b/eagle-jpm/eagle-jpm-util/src/test/java/org/apache/eagle/jpm/util/resourcefetch/ha/HAURLSelectorImplTest.java index 8c958a3..9d1f73c 100644 --- a/eagle-jpm/eagle-jpm-util/src/test/java/org/apache/eagle/jpm/util/resourcefetch/ha/HAURLSelectorImplTest.java +++ b/eagle-jpm/eagle-jpm-util/src/test/java/org/apache/eagle/jpm/util/resourcefetch/ha/HAURLSelectorImplTest.java @@ -44,7 +44,7 @@ public class HAURLSelectorImplTest { @Test public void testCheckUrl() throws Exception { String[] rmBasePaths = new String[]{"http://www.xxx.com:8088", "http://www.yyy.com:8088"}; - HAURLSelectorImpl haurlSelector = new HAURLSelectorImpl(rmBasePaths, new JobListServiceURLBuilderImpl(), Constants.CompressionType.GZIP); + HAURLSelectorImpl haurlSelector = new HAURLSelectorImpl(rmBasePaths, new JobListServiceURLBuilderImpl(), Constants.CompressionType.GZIP, Constants.JobState.RUNNING); mockStatic(InputStreamUtils.class); when(InputStreamUtils.getInputStream("http://www.xxx.com:8088", null, Constants.CompressionType.GZIP)).thenReturn(null); Assert.assertTrue(haurlSelector.checkUrl("http://www.xxx.com:8088")); @@ -53,7 +53,7 @@ public class HAURLSelectorImplTest { @Test public void testCheckUrl1() throws Exception { String[] rmBasePaths = new String[]{"http://www.xxx.com:8088", "http://www.yyy.com:8088"}; - HAURLSelectorImpl haurlSelector = new HAURLSelectorImpl(rmBasePaths, new JobListServiceURLBuilderImpl(), Constants.CompressionType.GZIP); + HAURLSelectorImpl haurlSelector = new HAURLSelectorImpl(rmBasePaths, new JobListServiceURLBuilderImpl(), Constants.CompressionType.GZIP, Constants.JobState.RUNNING); mockStatic(InputStreamUtils.class); when(InputStreamUtils.getInputStream("http://www.xxx.com:8088", null, Constants.CompressionType.GZIP)).thenThrow(new Exception()); Assert.assertFalse(haurlSelector.checkUrl("http://www.xxx.com:8088")); @@ -62,29 +62,29 @@ public class HAURLSelectorImplTest { @Test public void testGetSelectedUrl() { String[] rmBasePaths = new String[]{"http://www.xxx.com:8088", "http://www.yyy.com:8088"}; - HAURLSelectorImpl haurlSelector = new HAURLSelectorImpl(rmBasePaths, new JobListServiceURLBuilderImpl(), Constants.CompressionType.GZIP); + HAURLSelectorImpl haurlSelector = new HAURLSelectorImpl(rmBasePaths, new JobListServiceURLBuilderImpl(), Constants.CompressionType.GZIP, Constants.JobState.RUNNING); Assert.assertEquals(rmBasePaths[0], haurlSelector.getSelectedUrl()); } @Test public void testReSelectUrl() throws Exception { String[] rmBasePaths = new String[]{"http://www.xxx.com:8088", "http://www.yyy.com:8088"}; - HAURLSelectorImpl haurlSelector = new HAURLSelectorImpl(rmBasePaths, new JobListServiceURLBuilderImpl(), Constants.CompressionType.GZIP); + HAURLSelectorImpl haurlSelector = new HAURLSelectorImpl(rmBasePaths, new JobListServiceURLBuilderImpl(), Constants.CompressionType.GZIP, Constants.JobState.RUNNING); mockStatic(InputStreamUtils.class); when(InputStreamUtils.getInputStream("http://www.xxx.com:8088/ws/v1/cluster/apps?state=RUNNING&anonymous=true", null, Constants.CompressionType.GZIP)).thenThrow(new Exception()); when(InputStreamUtils.getInputStream("http://www.yyy.com:8088/ws/v1/cluster/apps?state=RUNNING&anonymous=true", null, Constants.CompressionType.GZIP)).thenReturn(null); - haurlSelector.reSelectUrl(); + haurlSelector.checkUrl(); Assert.assertEquals(rmBasePaths[1], haurlSelector.getSelectedUrl()); } @Test public void testReSelectUrl1() throws Exception { String[] rmBasePaths = new String[]{"http://www.xxx.com:8088", "http://www.yyy.com:8088"}; - HAURLSelectorImpl haurlSelector = new HAURLSelectorImpl(rmBasePaths, new JobListServiceURLBuilderImpl(), Constants.CompressionType.GZIP); + HAURLSelectorImpl haurlSelector = new HAURLSelectorImpl(rmBasePaths, new JobListServiceURLBuilderImpl(), Constants.CompressionType.GZIP, Constants.JobState.RUNNING); mockStatic(InputStreamUtils.class); when(InputStreamUtils.getInputStream("http://www.xxx.com:8088/ws/v1/cluster/apps?state=RUNNING&anonymous=true", null, Constants.CompressionType.GZIP)).thenReturn(null); when(InputStreamUtils.getInputStream("http://www.yyy.com:8088/ws/v1/cluster/apps?state=RUNNING&anonymous=true", null, Constants.CompressionType.GZIP)).thenThrow(new Exception()); - haurlSelector.reSelectUrl(); + haurlSelector.checkUrl(); Assert.assertEquals(rmBasePaths[0], haurlSelector.getSelectedUrl()); } @@ -93,10 +93,10 @@ public class HAURLSelectorImplTest { public void testReSelectUrl2() throws Exception { thrown.expect(IOException.class); String[] rmBasePaths = new String[]{"http://www.xxx.com:8088", "http://www.yyy.com:8088"}; - HAURLSelectorImpl haurlSelector = new HAURLSelectorImpl(rmBasePaths, new JobListServiceURLBuilderImpl(), Constants.CompressionType.GZIP); + HAURLSelectorImpl haurlSelector = new HAURLSelectorImpl(rmBasePaths, new JobListServiceURLBuilderImpl(), Constants.CompressionType.GZIP, Constants.JobState.RUNNING); mockStatic(InputStreamUtils.class); when(InputStreamUtils.getInputStream("http://www.xxx.com:8088/ws/v1/cluster/apps?state=RUNNING&anonymous=true", null, Constants.CompressionType.GZIP)).thenThrow(new Exception()); when(InputStreamUtils.getInputStream("http://www.yyy.com:8088/ws/v1/cluster/apps?state=RUNNING&anonymous=true", null, Constants.CompressionType.GZIP)).thenThrow(new Exception()); - haurlSelector.reSelectUrl(); + haurlSelector.checkUrl(); } } http://git-wip-us.apache.org/repos/asf/eagle/blob/3fe637eb/eagle-jpm/eagle-jpm-util/src/test/java/org/apache/eagle/jpm/util/resourcefetch/url/JobListServiceURLBuilderImplTest.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-util/src/test/java/org/apache/eagle/jpm/util/resourcefetch/url/JobListServiceURLBuilderImplTest.java b/eagle-jpm/eagle-jpm-util/src/test/java/org/apache/eagle/jpm/util/resourcefetch/url/JobListServiceURLBuilderImplTest.java index 6f9aa97..81d9309 100644 --- a/eagle-jpm/eagle-jpm-util/src/test/java/org/apache/eagle/jpm/util/resourcefetch/url/JobListServiceURLBuilderImplTest.java +++ b/eagle-jpm/eagle-jpm-util/src/test/java/org/apache/eagle/jpm/util/resourcefetch/url/JobListServiceURLBuilderImplTest.java @@ -18,6 +18,7 @@ package org.apache.eagle.jpm.util.resourcefetch.url; import org.apache.eagle.jpm.util.Constants; +import org.apache.eagle.jpm.util.Constants.JobState; import org.junit.Assert; import org.junit.Test; @@ -25,13 +26,25 @@ public class JobListServiceURLBuilderImplTest { @Test public void testBuild() { JobListServiceURLBuilderImpl jobListServiceURLBuilderImpl = new JobListServiceURLBuilderImpl(); - String finalUrl = jobListServiceURLBuilderImpl.build("http://www.xxx.com:8088/", Constants.JobState.RUNNING.name()); + String finalUrl = jobListServiceURLBuilderImpl.build("http://www.xxx.com:8088/", convertRestApi(JobState.RUNNING)); Assert.assertEquals("http://www.xxx.com:8088/ws/v1/cluster/apps?state=RUNNING&anonymous=true", finalUrl); - finalUrl = jobListServiceURLBuilderImpl.build("http://www.xxx.com:8088/", Constants.JobState.FINISHED.name()); + finalUrl = jobListServiceURLBuilderImpl.build("http://www.xxx.com:8088/", convertRestApi(JobState.FINISHED)); Assert.assertEquals("http://www.xxx.com:8088/ws/v1/cluster/apps?state=FINISHED&anonymous=true", finalUrl); - finalUrl = jobListServiceURLBuilderImpl.build("http://www.xxx.com:8088/", Constants.JobState.ALL.name()); + finalUrl = jobListServiceURLBuilderImpl.build("http://www.xxx.com:8088/", convertRestApi(JobState.ALL)); Assert.assertEquals("http://www.xxx.com:8088/ws/v1/cluster/apps&anonymous=true", finalUrl); - finalUrl = jobListServiceURLBuilderImpl.build("http://www.xxx.com:8088/", ""); + finalUrl = jobListServiceURLBuilderImpl.build("http://www.xxx.com:8088/", convertRestApi(null)); Assert.assertEquals(null, finalUrl); } + + private String convertRestApi(Constants.JobState jobState) { + if (jobState == null) { + return null; + } + switch (jobState) { + case RUNNING : return Constants.V2_APPS_RUNNING_URL; + case FINISHED : return Constants.V2_APPS_COMPLETED_URL; + case ALL : return Constants.V2_APPS_URL; + default : return null; + } + } } http://git-wip-us.apache.org/repos/asf/eagle/blob/3fe637eb/eagle-topology-check/eagle-topology-app/src/main/resources/META-INF/providers/org.apache.eagle.topology.TopologyCheckAppProvider.xml ---------------------------------------------------------------------- diff --git a/eagle-topology-check/eagle-topology-app/src/main/resources/META-INF/providers/org.apache.eagle.topology.TopologyCheckAppProvider.xml b/eagle-topology-check/eagle-topology-app/src/main/resources/META-INF/providers/org.apache.eagle.topology.TopologyCheckAppProvider.xml index 1142e1b..8aa5d8e 100644 --- a/eagle-topology-check/eagle-topology-app/src/main/resources/META-INF/providers/org.apache.eagle.topology.TopologyCheckAppProvider.xml +++ b/eagle-topology-check/eagle-topology-app/src/main/resources/META-INF/providers/org.apache.eagle.topology.TopologyCheckAppProvider.xml @@ -59,17 +59,17 @@ <description>number of sinks connected to alert engine</description> </property> <property> - <name>topology.resolverAPIUrl</name> - <displayName>Rack Resolver APIUrl</displayName> - <description>Use the URL to obtain a Node Object, from a node identified by the nodeid value.</description> - <value>http://sandbox.hortonworks.com:8088/ws/v1/cluster/nodes</value> - </property> - <property> <name>topology.rackResolverCls</name> <displayName>Rack Resolver Class</displayName> <description>rack resolver class</description> <value>org.apache.eagle.topology.resolver.impl.DefaultTopologyRackResolver</value> </property> + <property> + <name>topology.resolverAPIUrl</name> + <displayName>Rack Resolver APIUrl Required by ClusterNodeAPITopologyRackResolver</displayName> + <description>Use the URL to obtain a Node Object, from a node identified by the nodeid value.</description> + <value>http://sandbox.hortonworks.com:8088/ws/v1/cluster/nodes</value> + </property> <property> <name>dataSourceConfig.hbase.enabled</name>
