Repository: hadoop Updated Branches: refs/heads/YARN-2928 bcd755eba -> f40c73548 (forced update)
YARN-3240. Implement client API to put generic entities. Contributed by Zhijie Shen (cherry picked from commit 4487da249f448d5c67b712cd0aa723e764eed77d) Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/6b7c5538 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/6b7c5538 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/6b7c5538 Branch: refs/heads/YARN-2928 Commit: 6b7c553833f12fcc2e64d8fb62b7c6d04e297f27 Parents: 4f38ace Author: Junping Du <[email protected]> Authored: Wed Feb 25 02:40:55 2015 -0800 Committer: Vinod Kumar Vavilapalli <[email protected]> Committed: Fri Aug 14 11:23:21 2015 -0700 ---------------------------------------------------------------------- hadoop-project/pom.xml | 7 ++ hadoop-yarn-project/CHANGES.txt | 3 + .../timelineservice/TimelineEntities.java | 58 ++++++++++ .../hadoop-yarn/hadoop-yarn-common/pom.xml | 1 + .../hadoop/yarn/client/api/TimelineClient.java | 60 +++++++++- .../client/api/impl/TimelineClientImpl.java | 110 ++++++++++++++++--- .../TestTimelineServiceRecords.java | 7 ++ .../hadoop-yarn-server-tests/pom.xml | 12 ++ .../TestTimelineServiceClientIntegration.java | 54 +++++++++ .../hadoop-yarn-server-timelineservice/pom.xml | 11 ++ .../aggregator/BaseAggregatorService.java | 7 +- .../aggregator/PerNodeAggregatorServer.java | 9 +- .../aggregator/PerNodeAggregatorWebService.java | 54 +++++---- 13 files changed, 344 insertions(+), 49 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/6b7c5538/hadoop-project/pom.xml ---------------------------------------------------------------------- diff --git a/hadoop-project/pom.xml b/hadoop-project/pom.xml index 20eee21..acc021d 100644 --- a/hadoop-project/pom.xml +++ b/hadoop-project/pom.xml @@ -294,6 +294,13 @@ <version>${project.version}</version> </dependency> + <dependency> + <groupId>org.apache.hadoop</groupId> + 
<artifactId>hadoop-yarn-server-timelineservice</artifactId> + <version>${project.version}</version> + <type>test-jar</type> + </dependency> + <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-yarn-applications-distributedshell</artifactId> http://git-wip-us.apache.org/repos/asf/hadoop/blob/6b7c5538/hadoop-yarn-project/CHANGES.txt ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index ad8a5dc..a768480 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -14,6 +14,9 @@ Branch YARN-2928: Timeline Server Next Generation: Phase 1 YARN-3041. Added the overall data model of timeline service next gen. (zjshen) + YARN-3240. Implement client API to put generic entities. (Zhijie Shen via + junping_du) + IMPROVEMENTS OPTIMIZATIONS http://git-wip-us.apache.org/repos/asf/hadoop/blob/6b7c5538/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/TimelineEntities.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/TimelineEntities.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/TimelineEntities.java new file mode 100644 index 0000000..39504cc --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/TimelineEntities.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.api.records.timelineservice; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; + +import javax.xml.bind.annotation.XmlAccessType; +import javax.xml.bind.annotation.XmlAccessorType; +import javax.xml.bind.annotation.XmlElement; +import javax.xml.bind.annotation.XmlRootElement; +import java.util.HashSet; +import java.util.Set; + +@XmlRootElement(name = "entities") +@XmlAccessorType(XmlAccessType.NONE) +@InterfaceAudience.Public +@InterfaceStability.Unstable +public class TimelineEntities { + + private Set<TimelineEntity> entities = new HashSet<>(); + + public TimelineEntities() { + + } + + @XmlElement(name = "entities") + public Set<TimelineEntity> getEntities() { + return entities; + } + + public void setEntities(Set<TimelineEntity> entities) { + this.entities = entities; + } + + public void addEntities(Set<TimelineEntity> entities) { + this.entities.addAll(entities); + } + + public void addEntity(TimelineEntity entity) { + entities.add(entity); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/6b7c5538/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/pom.xml ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/pom.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/pom.xml index 3b47cdd..7c6e719 100644 ---
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/pom.xml +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/pom.xml @@ -207,6 +207,7 @@ <exclude>src/main/resources/webapps/jobhistory/.keep</exclude> <exclude>src/main/resources/webapps/yarn/.keep</exclude> <exclude>src/main/resources/webapps/applicationhistory/.keep</exclude> + <exclude>src/main/resources/webapps/timeline/.keep</exclude> <exclude>src/main/resources/webapps/cluster/.keep</exclude> <exclude>src/main/resources/webapps/test/.keep</exclude> <exclude>src/main/resources/webapps/proxy/.keep</exclude> http://git-wip-us.apache.org/repos/asf/hadoop/blob/6b7c5538/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/TimelineClient.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/TimelineClient.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/TimelineClient.java index a3766f9..f7f6fc0 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/TimelineClient.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/TimelineClient.java @@ -26,8 +26,9 @@ import org.apache.hadoop.classification.InterfaceStability.Evolving; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.token.Token; import org.apache.hadoop.service.AbstractService; -import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity; +import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.timeline.TimelineDomain; +import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity; import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse; import org.apache.hadoop.yarn.client.api.impl.TimelineClientImpl; 
import org.apache.hadoop.yarn.exceptions.YarnException; @@ -50,15 +51,25 @@ public abstract class TimelineClient extends AbstractService { * * @return a timeline client */ + protected ApplicationId contextAppId; + protected String timelineServiceAddress; + @Public public static TimelineClient createTimelineClient() { TimelineClient client = new TimelineClientImpl(); return client; } + @Public + public static TimelineClient createTimelineClient(ApplicationId appId) { + TimelineClient client = new TimelineClientImpl(appId); + return client; + } + @Private - protected TimelineClient(String name) { + protected TimelineClient(String name, ApplicationId appId) { super(name); + contextAppId = appId; } /** @@ -142,4 +153,49 @@ public abstract class TimelineClient extends AbstractService { public abstract void cancelDelegationToken( Token<TimelineDelegationTokenIdentifier> timelineDT) throws IOException, YarnException; + + /** + * <p> + * Send the information of a number of conceptual entities to the timeline + * aggregator. It is a blocking API. The method will not return until all the + * put entities have been persisted. + * </p> + * + * @param entities + * the collection of {@link org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity} + * @throws IOException + * @throws YarnException + */ + @Public + public abstract void putEntities( + org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity... entities) + throws IOException, YarnException; + + /** + * <p> + * Send the information of a number of conceptual entities to the timeline + * aggregator. It is an asynchronous API. The method will return once all the + * entities are received. + * </p> + * + * @param entities + * the collection of {@link org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity} + * @throws IOException + * @throws YarnException + */ + @Public + public abstract void putEntitiesAsync( + org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity... 
entities) + throws IOException, YarnException; + + /** + * <p> + * Update the timeline service address where the request will be sent to + * </p> + * @param address + * the timeline service address + */ + public void setTimelineServiceAddress(String address) { + timelineServiceAddress = address; + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/6b7c5538/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineClientImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineClientImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineClientImpl.java index 04c84ca..5bdc3b2 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineClientImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineClientImpl.java @@ -35,6 +35,7 @@ import javax.net.ssl.HostnameVerifier; import javax.net.ssl.HttpsURLConnection; import javax.net.ssl.SSLSocketFactory; import javax.ws.rs.core.MediaType; +import javax.ws.rs.core.MultivaluedMap; import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.GnuParser; @@ -45,8 +46,8 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceStability.Evolving; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.SecurityUtil; +import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.authentication.client.AuthenticationException; import org.apache.hadoop.security.authentication.client.ConnectionConfigurator; 
import org.apache.hadoop.security.ssl.SSLFactory; @@ -55,6 +56,7 @@ import org.apache.hadoop.security.token.delegation.web.DelegationTokenAuthentica import org.apache.hadoop.security.token.delegation.web.DelegationTokenAuthenticator; import org.apache.hadoop.security.token.delegation.web.KerberosDelegationTokenAuthenticator; import org.apache.hadoop.security.token.delegation.web.PseudoDelegationTokenAuthenticator; +import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.timeline.TimelineDomain; import org.apache.hadoop.yarn.api.records.timeline.TimelineDomains; import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities; @@ -81,13 +83,16 @@ import com.sun.jersey.api.client.config.DefaultClientConfig; import com.sun.jersey.api.client.filter.ClientFilter; import com.sun.jersey.client.urlconnection.HttpURLConnectionFactory; import com.sun.jersey.client.urlconnection.URLConnectionClientHandler; +import com.sun.jersey.core.util.MultivaluedMapImpl; + @Private @Evolving public class TimelineClientImpl extends TimelineClient { private static final Log LOG = LogFactory.getLog(TimelineClientImpl.class); - private static final String RESOURCE_URI_STR = "/ws/v1/timeline/"; + private static final String RESOURCE_URI_STR_V1 = "/ws/v1/timeline/"; + private static final String RESOURCE_URI_STR_V2 = "/ws/v2/timeline/"; private static final Joiner JOINER = Joiner.on(""); public final static int DEFAULT_SOCKET_TIMEOUT = 1 * 60 * 1000; // 1 minute @@ -251,7 +256,11 @@ public class TimelineClientImpl extends TimelineClient { } public TimelineClientImpl() { - super(TimelineClientImpl.class.getName()); + super(TimelineClientImpl.class.getName(), null); + } + + public TimelineClientImpl(ApplicationId applicationId) { + super(TimelineClientImpl.class.getName(), applicationId); } protected void serviceInit(Configuration conf) throws Exception { @@ -282,18 +291,15 @@ public class TimelineClientImpl extends TimelineClient { 
client.addFilter(retryFilter); if (YarnConfiguration.useHttps(conf)) { - resURI = URI - .create(JOINER.join("https://", conf.get( - YarnConfiguration.TIMELINE_SERVICE_WEBAPP_HTTPS_ADDRESS, - YarnConfiguration.DEFAULT_TIMELINE_SERVICE_WEBAPP_HTTPS_ADDRESS), - RESOURCE_URI_STR)); + timelineServiceAddress = conf.get( + YarnConfiguration.TIMELINE_SERVICE_WEBAPP_HTTPS_ADDRESS, + YarnConfiguration.DEFAULT_TIMELINE_SERVICE_WEBAPP_HTTPS_ADDRESS); } else { - resURI = URI.create(JOINER.join("http://", conf.get( + timelineServiceAddress = conf.get( YarnConfiguration.TIMELINE_SERVICE_WEBAPP_ADDRESS, - YarnConfiguration.DEFAULT_TIMELINE_SERVICE_WEBAPP_ADDRESS), - RESOURCE_URI_STR)); + YarnConfiguration.DEFAULT_TIMELINE_SERVICE_WEBAPP_ADDRESS); } - LOG.info("Timeline service address: " + resURI); + LOG.info("Timeline service address: " + timelineServiceAddress); super.serviceInit(conf); } @@ -306,6 +312,39 @@ public class TimelineClientImpl extends TimelineClient { return resp.getEntity(TimelinePutResponse.class); } + @Override + public void putEntities( + org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity... entities) + throws IOException, YarnException { + putEntities(false, entities); + } + + @Override + public void putEntitiesAsync( + org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity... entities) + throws IOException, YarnException { + putEntities(true, entities); + } + + private void putEntities(boolean async, + org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity... 
entities) + throws IOException, YarnException { + org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities + entitiesContainer = + new org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities(); + for (org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity entity : entities) { + entitiesContainer.addEntity(entity); + } + MultivaluedMap<String, String> params = new MultivaluedMapImpl(); + if (contextAppId != null) { + params.add("appid", contextAppId.toString()); + } + if (async) { + params.add("async", Boolean.TRUE.toString()); + } + putObjects(constructResURI(getConfig(), timelineServiceAddress, true), + "entities", params, entitiesContainer); + } @Override public void putDomain(TimelineDomain domain) throws IOException, @@ -313,6 +352,36 @@ public class TimelineClientImpl extends TimelineClient { doPosting(domain, "domain"); } + private void putObjects( + URI base, String path, MultivaluedMap<String, String> params, Object obj) + throws IOException, YarnException { + ClientResponse resp; + try { + resp = client.resource(base).path(path).queryParams(params) + .accept(MediaType.APPLICATION_JSON) + .type(MediaType.APPLICATION_JSON) + .put(ClientResponse.class, obj); + } catch (RuntimeException re) { + // runtime exception is expected if the client cannot connect the server + String msg = + "Failed to get the response from the timeline server."; + LOG.error(msg, re); + throw new IOException(re); + } + if (resp == null || + resp.getClientResponseStatus() != ClientResponse.Status.OK) { + String msg = + "Failed to get the response from the timeline server."; + LOG.error(msg); + if (LOG.isDebugEnabled() && resp != null) { + String output = resp.getEntity(String.class); + LOG.debug("HTTP error code: " + resp.getStatus() + + " Server response:\n" + output); + } + throw new YarnException(msg); + } + } + private ClientResponse doPosting(final Object obj, final String path) throws IOException, YarnException { ClientResponse resp; @@ -357,7 
+426,8 @@ public class TimelineClientImpl extends TimelineClient { new DelegationTokenAuthenticatedURL(authenticator, connConfigurator); return (Token) authUrl.getDelegationToken( - resURI.toURL(), token, renewer, doAsUser); + constructResURI(getConfig(), timelineServiceAddress, false).toURL(), + token, renewer, doAsUser); } }; return (Token<TimelineDelegationTokenIdentifier>) operateDelegationToken(getDTAction); @@ -392,7 +462,7 @@ public class TimelineClientImpl extends TimelineClient { // the configured service address. final URI serviceURI = isTokenServiceAddrEmpty ? resURI : new URI(scheme, null, address.getHostName(), - address.getPort(), RESOURCE_URI_STR, null, null); + address.getPort(), RESOURCE_URI_STR_V1, null, null); return authUrl .renewDelegationToken(serviceURI.toURL(), token, doAsUser); } @@ -429,7 +499,7 @@ public class TimelineClientImpl extends TimelineClient { // the configured service address. final URI serviceURI = isTokenServiceAddrEmpty ? resURI : new URI(scheme, null, address.getHostName(), - address.getPort(), RESOURCE_URI_STR, null, null); + address.getPort(), RESOURCE_URI_STR_V1, null, null); authUrl.cancelDelegationToken(serviceURI.toURL(), token, doAsUser); return null; } @@ -469,7 +539,8 @@ public class TimelineClientImpl extends TimelineClient { @Private @VisibleForTesting public ClientResponse doPostingObject(Object object, String path) { - WebResource webResource = client.resource(resURI); + WebResource webResource = client.resource( + constructResURI(getConfig(), timelineServiceAddress, false)); if (path == null) { return webResource.accept(MediaType.APPLICATION_JSON) .type(MediaType.APPLICATION_JSON) @@ -553,6 +624,13 @@ public class TimelineClientImpl extends TimelineClient { connection.setReadTimeout(socketTimeout); } + private static URI constructResURI( + Configuration conf, String address, boolean v2) { + return URI.create( + JOINER.join(YarnConfiguration.useHttps(conf) ? "https://" : "http://", + address, v2 ? 
RESOURCE_URI_STR_V2 : RESOURCE_URI_STR_V1)); + } + public static void main(String[] argv) throws Exception { CommandLine cliParser = new GnuParser().parse(opts, argv); if (cliParser.hasOption("put")) { http://git-wip-us.apache.org/repos/asf/hadoop/blob/6b7c5538/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/records/timelineservice/TestTimelineServiceRecords.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/records/timelineservice/TestTimelineServiceRecords.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/records/timelineservice/TestTimelineServiceRecords.java index 6bab239..4f8ab94 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/records/timelineservice/TestTimelineServiceRecords.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/records/timelineservice/TestTimelineServiceRecords.java @@ -76,6 +76,13 @@ public class TestTimelineServiceRecords { entity.addIsRelatedToEntity("test type 4", "test id 4"); entity.addIsRelatedToEntity("test type 5", "test id 5"); LOG.info(TimelineUtils.dumpTimelineRecordtoJSON(entity, true)); + + TimelineEntities entities = new TimelineEntities(); + TimelineEntity entity1 = new TimelineEntity(); + entities.addEntity(entity1); + TimelineEntity entity2 = new TimelineEntity(); + entities.addEntity(entity2); + LOG.info(TimelineUtils.dumpTimelineRecordtoJSON(entities, true)); } @Test http://git-wip-us.apache.org/repos/asf/hadoop/blob/6b7c5538/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/pom.xml ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/pom.xml 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/pom.xml index 6a229de..3b7fba0 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/pom.xml +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/pom.xml @@ -88,6 +88,18 @@ <!-- 'mvn dependency:analyze' fails to detect use of this dependency --> <dependency> <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-yarn-server-timelineservice</artifactId> + </dependency> + <!-- 'mvn dependency:analyze' fails to detect use of this dependency --> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-yarn-server-timelineservice</artifactId> + <type>test-jar</type> + <scope>test</scope> + </dependency> + <!-- 'mvn dependency:analyze' fails to detect use of this dependency --> + <dependency> + <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-minikdc</artifactId> <scope>test</scope> </dependency> http://git-wip-us.apache.org/repos/asf/hadoop/blob/6b7c5538/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/TestTimelineServiceClientIntegration.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/TestTimelineServiceClientIntegration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/TestTimelineServiceClientIntegration.java new file mode 100644 index 0000000..a5159a2 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/TestTimelineServiceClientIntegration.java @@ -0,0 +1,54 @@ +package org.apache.hadoop.yarn.server.timelineservice; + + +import 
org.apache.hadoop.util.ExitUtil; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity; +import org.apache.hadoop.yarn.client.api.TimelineClient; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.server.timelineservice.aggregator.PerNodeAggregatorServer; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; + +import static org.junit.Assert.fail; + +public class TestTimelineServiceClientIntegration { + private static PerNodeAggregatorServer server; + + @BeforeClass + public static void setupClass() throws Exception { + try { + server = PerNodeAggregatorServer.launchServer(new String[0]); + server.addApplication(ApplicationId.newInstance(0, 1)); + } catch (ExitUtil.ExitException e) { + fail(); + } + } + + @AfterClass + public static void tearDownClass() throws Exception { + if (server != null) { + server.stop(); + } + } + + @Test + public void testPutEntities() throws Exception { + TimelineClient client = + TimelineClient.createTimelineClient(ApplicationId.newInstance(0, 1)); + try { + client.init(new YarnConfiguration()); + client.start(); + TimelineEntity entity = new TimelineEntity(); + entity.setType("test entity type"); + entity.setId("test entity id"); + client.putEntities(entity); + client.putEntitiesAsync(entity); + } catch(Exception e) { + fail(); + } finally { + client.stop(); + } + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/6b7c5538/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/pom.xml ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/pom.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/pom.xml index 3154ca3..26790f1 100644 --- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/pom.xml +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/pom.xml @@ -114,6 +114,17 @@ <build> <plugins> + <plugin> + <artifactId>maven-jar-plugin</artifactId> + <executions> + <execution> + <goals> + <goal>test-jar</goal> + </goals> + <phase>test-compile</phase> + </execution> + </executions> + </plugin> </plugins> </build> </project> http://git-wip-us.apache.org/repos/asf/hadoop/blob/6b7c5538/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/BaseAggregatorService.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/BaseAggregatorService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/BaseAggregatorService.java index 994c66f..46e5574 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/BaseAggregatorService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/BaseAggregatorService.java @@ -25,8 +25,7 @@ import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.service.CompositeService; -import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities; -import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse; +import 
org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities; /** * Service that handles writes to the timeline service and writes them to the @@ -70,16 +69,14 @@ public class BaseAggregatorService extends CompositeService { * * @param entities entities to post * @param callerUgi the caller UGI - * @return the response that contains the result of the post. */ - public TimelinePutResponse postEntities(TimelineEntities entities, + public void postEntities(TimelineEntities entities, UserGroupInformation callerUgi) { // TODO implement if (LOG.isDebugEnabled()) { LOG.debug("postEntities(entities=" + entities + ", callerUgi=" + callerUgi + ")"); } - return null; } /** http://git-wip-us.apache.org/repos/asf/hadoop/blob/6b7c5538/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/PerNodeAggregatorServer.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/PerNodeAggregatorServer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/PerNodeAggregatorServer.java index 6371e82..ef30b22 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/PerNodeAggregatorServer.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/PerNodeAggregatorServer.java @@ -20,6 +20,7 @@ package org.apache.hadoop.yarn.server.timelineservice.aggregator; import java.nio.ByteBuffer; +import com.google.inject.Inject; import org.apache.commons.logging.Log; import 
org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience.Private; @@ -39,9 +40,7 @@ import org.apache.hadoop.yarn.server.api.AuxiliaryService; import org.apache.hadoop.yarn.server.api.ContainerContext; import org.apache.hadoop.yarn.server.api.ContainerInitializationContext; import org.apache.hadoop.yarn.server.api.ContainerTerminationContext; -import org.apache.hadoop.yarn.webapp.WebApp; -import org.apache.hadoop.yarn.webapp.WebApps; -import org.apache.hadoop.yarn.webapp.YarnWebParams; +import org.apache.hadoop.yarn.webapp.*; import org.apache.hadoop.yarn.webapp.util.WebAppUtils; import com.google.common.annotations.VisibleForTesting; @@ -120,6 +119,8 @@ public class PerNodeAggregatorServer extends AuxiliaryService { extends WebApp implements YarnWebParams { @Override public void setup() { + bind(YarnJacksonJaxbJsonProvider.class); + bind(GenericExceptionHandler.class); bind(PerNodeAggregatorWebService.class); // bind to the global singleton bind(AppLevelServiceManager.class). 
@@ -214,7 +215,7 @@ public class PerNodeAggregatorServer extends AuxiliaryService { } @VisibleForTesting - static PerNodeAggregatorServer launchServer(String[] args) { + public static PerNodeAggregatorServer launchServer(String[] args) { Thread .setDefaultUncaughtExceptionHandler(new YarnUncaughtExceptionHandler()); StringUtils.startupShutdownMessage(PerNodeAggregatorServer.class, args, http://git-wip-us.apache.org/repos/asf/hadoop/blob/6b7c5538/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/PerNodeAggregatorWebService.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/PerNodeAggregatorWebService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/PerNodeAggregatorWebService.java index 2d96699..28e6a52 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/PerNodeAggregatorWebService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/PerNodeAggregatorWebService.java @@ -20,12 +20,7 @@ package org.apache.hadoop.yarn.server.timelineservice.aggregator; import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; -import javax.ws.rs.Consumes; -import javax.ws.rs.GET; -import javax.ws.rs.POST; -import javax.ws.rs.Path; -import javax.ws.rs.Produces; -import javax.ws.rs.WebApplicationException; +import javax.ws.rs.*; import javax.ws.rs.core.Context; import javax.ws.rs.core.MediaType; import 
javax.ws.rs.core.Response; @@ -40,14 +35,17 @@ import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceAudience.Public; import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.security.UserGroupInformation; -import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities; -import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities; +import org.apache.hadoop.yarn.util.ConverterUtils; import org.apache.hadoop.yarn.webapp.ForbiddenException; import org.apache.hadoop.yarn.webapp.NotFoundException; import com.google.inject.Inject; import com.google.inject.Singleton; +import java.net.URI; + /** * The main per-node REST end point for timeline service writes. It is * essentially a container service that routes requests to the appropriate @@ -112,11 +110,14 @@ public class PerNodeAggregatorWebService { * the request to the app level aggregator. It expects an application as a * context. 
*/ - @POST + @PUT + @Path("/entities") @Consumes({ MediaType.APPLICATION_JSON /* , MediaType.APPLICATION_XML */}) - public TimelinePutResponse postEntities( + public Response putEntities( @Context HttpServletRequest req, @Context HttpServletResponse res, + @QueryParam("async") String async, + @QueryParam("appid") String appId, TimelineEntities entities) { init(res); UserGroupInformation callerUgi = getUser(req); @@ -127,13 +128,20 @@ public class PerNodeAggregatorWebService { } // TODO how to express async posts and handle them + boolean isAsync = async != null && async.trim().equalsIgnoreCase("true"); + try { - AppLevelAggregatorService service = getAggregatorService(req); + appId = parseApplicationId(appId); + if (appId == null) { + return Response.status(Response.Status.BAD_REQUEST).build(); + } + AppLevelAggregatorService service = serviceManager.getService(appId); if (service == null) { LOG.error("Application not found"); throw new NotFoundException(); // different exception? } - return service.postEntities(entities, callerUgi); + service.postEntities(entities, callerUgi); + return Response.ok().build(); } catch (Exception e) { LOG.error("Error putting entities", e); throw new WebApplicationException(e, @@ -141,16 +149,18 @@ public class PerNodeAggregatorWebService { } } - private AppLevelAggregatorService - getAggregatorService(HttpServletRequest req) { - String appIdString = getApplicationId(req); - return serviceManager.getService(appIdString); - } - - private String getApplicationId(HttpServletRequest req) { - // TODO the application id from the request - // (most likely from the URI) - return null; + private String parseApplicationId(String appId) { + // Make sure the appId is not null and is valid + ApplicationId appID; + try { + if (appId != null) { + return ConverterUtils.toApplicationId(appId.trim()).toString(); + } else { + return null; + } + } catch (Exception e) { + return null; + } } private void init(HttpServletResponse response) {
