(TWILL-230) Get the resource report based on the calling user - Also, by default, get the resource report from the tracking URL, then fall back to the original tracking URL.
This closes #53 on Github. Signed-off-by: Terence Yim <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/twill/repo Commit: http://git-wip-us.apache.org/repos/asf/twill/commit/c310b694 Tree: http://git-wip-us.apache.org/repos/asf/twill/tree/c310b694 Diff: http://git-wip-us.apache.org/repos/asf/twill/diff/c310b694 Branch: refs/heads/site Commit: c310b694582fb31eca4ba6f217254cb853a65a7e Parents: 10fff16 Author: Terence Yim <[email protected]> Authored: Mon Apr 3 16:49:52 2017 -0700 Committer: Terence Yim <[email protected]> Committed: Mon Apr 3 20:55:39 2017 -0700 ---------------------------------------------------------------------- .../yarn/Hadoop20YarnApplicationReport.java | 4 +- .../apache/twill/yarn/ResourceReportClient.java | 28 ++++++---- .../apache/twill/yarn/YarnTwillController.java | 56 +++++++++++++++----- .../twill/yarn/ResourceReportTestRun.java | 47 +++++++++++++--- twill-yarn/src/test/resources/logback-test.xml | 1 + 5 files changed, 104 insertions(+), 32 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/twill/blob/c310b694/twill-yarn/src/main/hadoop20/org/apache/twill/internal/yarn/Hadoop20YarnApplicationReport.java ---------------------------------------------------------------------- diff --git a/twill-yarn/src/main/hadoop20/org/apache/twill/internal/yarn/Hadoop20YarnApplicationReport.java b/twill-yarn/src/main/hadoop20/org/apache/twill/internal/yarn/Hadoop20YarnApplicationReport.java index 6c1b764..8d6e2df 100644 --- a/twill-yarn/src/main/hadoop20/org/apache/twill/internal/yarn/Hadoop20YarnApplicationReport.java +++ b/twill-yarn/src/main/hadoop20/org/apache/twill/internal/yarn/Hadoop20YarnApplicationReport.java @@ -77,12 +77,12 @@ public final class Hadoop20YarnApplicationReport implements YarnApplicationRepor @Override public String getTrackingUrl() { - return report.getTrackingUrl(); + return "http://" + report.getTrackingUrl(); } @Override public String 
getOriginalTrackingUrl() { - return report.getOriginalTrackingUrl(); + return "http://" + report.getOriginalTrackingUrl(); } @Override http://git-wip-us.apache.org/repos/asf/twill/blob/c310b694/twill-yarn/src/main/java/org/apache/twill/yarn/ResourceReportClient.java ---------------------------------------------------------------------- diff --git a/twill-yarn/src/main/java/org/apache/twill/yarn/ResourceReportClient.java b/twill-yarn/src/main/java/org/apache/twill/yarn/ResourceReportClient.java index 3d5bcf3..fb8b7e8 100644 --- a/twill-yarn/src/main/java/org/apache/twill/yarn/ResourceReportClient.java +++ b/twill-yarn/src/main/java/org/apache/twill/yarn/ResourceReportClient.java @@ -25,9 +25,11 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.BufferedReader; +import java.io.IOException; import java.io.InputStreamReader; import java.io.Reader; import java.net.URL; +import java.util.List; /** * Package private class to get {@link ResourceReport} from the application master. @@ -36,10 +38,10 @@ final class ResourceReportClient { private static final Logger LOG = LoggerFactory.getLogger(ResourceReportClient.class); private final ResourceReportAdapter reportAdapter; - private final URL resourceUrl; + private final List<URL> resourceUrls; - ResourceReportClient(URL resourceUrl) { - this.resourceUrl = resourceUrl; + ResourceReportClient(List<URL> resourceUrls) { + this.resourceUrls = resourceUrls; this.reportAdapter = ResourceReportAdapter.create(); } @@ -48,16 +50,20 @@ final class ResourceReportClient { * @return A {@link ResourceReport} or {@code null} if failed to fetch the report. 
*/ public ResourceReport get() { - try { - Reader reader = new BufferedReader(new InputStreamReader(resourceUrl.openStream(), Charsets.UTF_8)); + for (URL url : resourceUrls) { try { - return reportAdapter.fromJson(reader); - } finally { - Closeables.closeQuietly(reader); + Reader reader = new BufferedReader(new InputStreamReader(url.openStream(), Charsets.UTF_8)); + try { + LOG.trace("Report returned by {}", url); + return reportAdapter.fromJson(reader); + } finally { + Closeables.closeQuietly(reader); + } + } catch (IOException e) { + // Just log a trace as it's ok to not able to fetch resource report + LOG.trace("Exception raised when getting resource report from {}.", url, e); } - } catch (Exception e) { - LOG.error("Exception getting resource report from {}.", resourceUrl, e); - return null; } + return null; } } http://git-wip-us.apache.org/repos/asf/twill/blob/c310b694/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillController.java ---------------------------------------------------------------------- diff --git a/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillController.java b/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillController.java index 1945731..6ea7d8f 100644 --- a/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillController.java +++ b/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillController.java @@ -43,10 +43,12 @@ import org.apache.zookeeper.data.Stat; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.IOException; -import java.net.URI; +import java.net.MalformedURLException; import java.net.URL; +import java.util.ArrayList; +import java.util.Arrays; import java.util.Collections; +import java.util.List; import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; @@ -66,7 +68,6 @@ final class YarnTwillController extends AbstractTwillController implements Twill private final TimeUnit startTimeoutUnit; private volatile 
ApplicationMasterLiveNodeData amLiveNodeData; private ProcessController<YarnApplicationReport> processController; - private ResourceReportClient resourcesClient; // Thread for polling yarn for application status if application got ZK session expire. // Only used by the instanceUpdate/Delete method, which is from serialized call from ZK callback. @@ -101,7 +102,6 @@ final class YarnTwillController extends AbstractTwillController implements Twill this.startTimeoutUnit = startTimeoutUnit; } - /** * Sends a message to application to notify the secure store has be updated. */ @@ -140,14 +140,6 @@ final class YarnTwillController extends AbstractTwillController implements Twill if (state != YarnApplicationState.RUNNING) { LOG.info("Yarn application {} {} is not in running state. Shutting down controller.", appName, appId); forceShutDown(); - } else { - try { - URL resourceUrl = URI.create(String.format("http://%s:%d", report.getHost(), report.getRpcPort())) - .resolve(TrackerService.PATH).toURL(); - resourcesClient = new ResourceReportClient(resourceUrl); - } catch (IOException e) { - resourcesClient = null; - } } } catch (Exception e) { throw Throwables.propagate(e); @@ -322,7 +314,45 @@ final class YarnTwillController extends AbstractTwillController implements Twill @Override public ResourceReport getResourceReport() { - // in case the user calls this before starting, return null + // Only has resource report if the app is running. + if (state() != State.RUNNING) { + return null; + } + ResourceReportClient resourcesClient = getResourcesClient(); return (resourcesClient == null) ? null : resourcesClient.get(); } + + /** + * Returns the {@link ResourceReportClient} for fetching resource report from the AM. + * It first consults the RM for the tracking URL and get the resource report from there. 
+ */ + @Nullable + private ResourceReportClient getResourcesClient() { + YarnApplicationReport report = processController.getReport(); + List<URL> urls = new ArrayList<>(2); + + // Try getting the report from the proxy tracking URL as well as the original tracking URL directly + // This is mainly to workaround for unit-test that the proxy tracking URL doesn't work well with local address. + for (String url : Arrays.asList(report.getTrackingUrl(), report.getOriginalTrackingUrl())) { + if (url != null && !url.equals("N/A")) { + try { + URL trackingUrl = new URL(url); + String path = trackingUrl.getPath(); + if (path.endsWith("/")) { + path = path.substring(0, path.length() - 1); + } + urls.add(new URL(trackingUrl.getProtocol(), trackingUrl.getHost(), + trackingUrl.getPort(), path + TrackerService.PATH)); + } catch (MalformedURLException e) { + LOG.debug("Invalid tracking URL {} from YARN application report for {}:{}", url, appName, getRunId()); + } + } + } + + if (urls.isEmpty()) { + return null; + } + + return new ResourceReportClient(urls); + } } http://git-wip-us.apache.org/repos/asf/twill/blob/c310b694/twill-yarn/src/test/java/org/apache/twill/yarn/ResourceReportTestRun.java ---------------------------------------------------------------------- diff --git a/twill-yarn/src/test/java/org/apache/twill/yarn/ResourceReportTestRun.java b/twill-yarn/src/test/java/org/apache/twill/yarn/ResourceReportTestRun.java index 32e1fd6..a61880f 100644 --- a/twill-yarn/src/test/java/org/apache/twill/yarn/ResourceReportTestRun.java +++ b/twill-yarn/src/test/java/org/apache/twill/yarn/ResourceReportTestRun.java @@ -18,15 +18,18 @@ package org.apache.twill.yarn; import com.google.common.base.Charsets; +import com.google.common.base.Stopwatch; import com.google.common.collect.ImmutableSet; import com.google.common.collect.Maps; import com.google.common.io.LineReader; +import com.google.common.util.concurrent.Uninterruptibles; import org.apache.twill.api.ResourceReport; import 
org.apache.twill.api.ResourceSpecification; import org.apache.twill.api.TwillApplication; import org.apache.twill.api.TwillController; import org.apache.twill.api.TwillRunResources; import org.apache.twill.api.TwillRunner; +import org.apache.twill.api.TwillRunnerService; import org.apache.twill.api.TwillSpecification; import org.apache.twill.api.logging.PrinterLogHandler; import org.apache.twill.common.Threads; @@ -159,7 +162,7 @@ public final class ResourceReportTestRun extends BaseYarnTest { Iterable<Discoverable> echoServices = controller.discoverService("echo"); Assert.assertTrue(waitForSize(echoServices, 2, 120)); // check that we have 2 runnables. - ResourceReport report = controller.getResourceReport(); + ResourceReport report = getResourceReport(controller, 10000); Assert.assertEquals(2, report.getRunnableResources("BuggyServer").size()); // cause a divide by 0 in one server @@ -175,7 +178,7 @@ public final class ResourceReportTestRun extends BaseYarnTest { // takes some time for app master to find out the container completed... int count = 0; while (count < 100) { - report = controller.getResourceReport(); + report = getResourceReport(controller, 10000); // check that we have 1 runnable, not 2. 
if (report.getRunnableResources("BuggyServer").size() == 1) { break; @@ -216,7 +219,7 @@ public final class ResourceReportTestRun extends BaseYarnTest { // wait for 3 echo servers to come up Iterable<Discoverable> echoServices = controller.discoverService("echo"); Assert.assertTrue(waitForSize(echoServices, 3, 120)); - ResourceReport report = controller.getResourceReport(); + ResourceReport report = getResourceReport(controller, 10000); // make sure resources for echo1 and echo2 are there Map<String, Collection<TwillRunResources>> usedResources = report.getResources(); Assert.assertEquals(2, usedResources.keySet().size()); @@ -226,10 +229,10 @@ public final class ResourceReportTestRun extends BaseYarnTest { waitForSize(new Iterable<String>() { @Override public Iterator<String> iterator() { - return controller.getResourceReport().getServices().iterator(); + return getResourceReport(controller, 10000).getServices().iterator(); } }, 3, 120); - report = controller.getResourceReport(); + report = getResourceReport(controller, 10000); Assert.assertEquals(ImmutableSet.of("echo", "echo1", "echo2"), ImmutableSet.copyOf(report.getServices())); Collection<TwillRunResources> echo1Resources = usedResources.get("echo1"); @@ -252,7 +255,7 @@ public final class ResourceReportTestRun extends BaseYarnTest { controller.changeInstances("echo1", 1).get(60, TimeUnit.SECONDS); echoServices = controller.discoverService("echo1"); Assert.assertTrue(waitForSize(echoServices, 1, 60)); - report = controller.getResourceReport(); + report = getResourceReport(controller, 10000); // make sure resources for echo1 and echo2 are there usedResources = report.getResources(); @@ -276,8 +279,40 @@ public final class ResourceReportTestRun extends BaseYarnTest { Assert.assertEquals(512, resources.getMemoryMB()); } + // Create a new TwillRunner, it should be able to get the same resource report + TwillRunnerService newRunnerService = TWILL_TESTER.createTwillRunnerService(); + newRunnerService.start(); + try 
{ + TwillController newController = newRunnerService.lookup("ResourceApplication", controller.getRunId()); + // Get the controller of the application + int trials = 60; + while (newController == null && trials-- > 0) { + TimeUnit.SECONDS.sleep(1); + newController = newRunnerService.lookup("ResourceApplication", controller.getRunId()); + } + Assert.assertNotNull(newController); + + ResourceReport newReport = getResourceReport(newController, 10000); + Assert.assertEquals(report.getResources(), newReport.getResources()); + + } finally { + newRunnerService.stop(); + } + controller.terminate().get(120, TimeUnit.SECONDS); // Sleep a bit before exiting. TimeUnit.SECONDS.sleep(2); } + + private ResourceReport getResourceReport(TwillController controller, long timeoutMillis) { + ResourceReport report = controller.getResourceReport(); + Stopwatch stopwatch = new Stopwatch(); + while (report == null && stopwatch.elapsedMillis() < timeoutMillis) { + Uninterruptibles.sleepUninterruptibly(200, TimeUnit.MILLISECONDS); + report = controller.getResourceReport(); + } + + Assert.assertNotNull(report); + return report; + } } http://git-wip-us.apache.org/repos/asf/twill/blob/c310b694/twill-yarn/src/test/resources/logback-test.xml ---------------------------------------------------------------------- diff --git a/twill-yarn/src/test/resources/logback-test.xml b/twill-yarn/src/test/resources/logback-test.xml index 4bcdb42..2b210cb 100644 --- a/twill-yarn/src/test/resources/logback-test.xml +++ b/twill-yarn/src/test/resources/logback-test.xml @@ -24,6 +24,7 @@ limitations under the License. </encoder> </appender> + <logger name="org.mortbay" level="OFF" /> <logger name="org.apache.twill" level="DEBUG" /> <root level="WARN">
