This is an automated email from the ASF dual-hosted git repository. slfan1989 pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/trunk by this push: new 4baa167f0a0 YARN-11786. Upgrade hadoop-yarn-server-timelineservice-hbase-tests to Support Trunk Compilation and Remove compatible hadoop version. (#7453) 4baa167f0a0 is described below commit 4baa167f0a0b9f963f6d191ba69230a3eceffc7c Author: slfan1989 <55643692+slfan1...@users.noreply.github.com> AuthorDate: Mon Mar 10 09:22:35 2025 +0800 YARN-11786. Upgrade hadoop-yarn-server-timelineservice-hbase-tests to Support Trunk Compilation and Remove compatible hadoop version. (#7453) * Upgrade hadoop-yarn-server-timelineservice-hbase-tests to Support Trunk Compilation and Remove compatible hadoop version. Co-authored-by: Chris Nauroth <cnaur...@apache.org> Co-authored-by: Hualong Zhang <hualon...@hotmail.com> Reviewed-by: Chris Nauroth <cnaur...@apache.org> Reviewed-by: Hualong Zhang <hualon...@hotmail.com> Signed-off-by: Shilun Fan <slfan1...@apache.org> --- BUILDING.txt | 2 +- hadoop-project/pom.xml | 2 +- .../api/records/timelineservice/FlowRunEntity.java | 4 +- .../writer/TimelineEntitySetWriter.java | 4 +- .../pom.xml | 37 +- .../AbstractTimelineReaderHBaseTestBase.java | 81 ++-- .../reader/FlowActivityEntityListReader.java | 123 ++++++ .../reader/FlowActivityEntityReader.java} | 44 +- .../reader/FlowActivityEntitySetReader.java | 124 ++++++ .../reader/FlowRunEntityReader.java} | 43 +- .../reader/FlowRunEntitySetReader.java} | 48 ++- .../TestTimelineReaderWebServicesHBaseStorage.java | 454 ++++++++++----------- .../reader/TimelineEntityListReader.java | 122 ++++++ .../reader/TimelineEntitySetReader.java | 125 ++++++ .../storage/flow/TestHBaseStorageFlowRun.java | 2 +- .../storage/HBaseTimelineWriterImpl.java | 8 +- .../hadoop-yarn/hadoop-yarn-server/pom.xml | 1 - 17 files changed, 859 insertions(+), 365 deletions(-) diff --git a/BUILDING.txt b/BUILDING.txt index 191df097b21..71a7b8332eb 100644 --- a/BUILDING.txt +++ b/BUILDING.txt @@ -163,7 +163,7 @@ Maven build goals: YARN Application Timeline Service V2 build options: YARN Timeline Service v.2 chooses Apache HBase as the primary backing storage. The supported - version of Apache HBase is 2.5.8. + version of Apache HBase is 2.6.1. Snappy build options: diff --git a/hadoop-project/pom.xml b/hadoop-project/pom.xml index 5c2b226d61c..1c046529a0e 100644 --- a/hadoop-project/pom.xml +++ b/hadoop-project/pom.xml @@ -222,7 +222,7 @@ <swagger-annotations-version>1.5.4</swagger-annotations-version> <snakeyaml.version>2.0</snakeyaml.version> <sshd.version>2.11.0</sshd.version> - <hbase.version>2.5.8-hadoop3</hbase.version> + <hbase.version>2.6.1-hadoop3</hbase.version> <junit.version>4.13.2</junit.version> <junit.jupiter.version>5.8.2</junit.jupiter.version> <junit.vintage.version>5.8.2</junit.vintage.version> diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/FlowRunEntity.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/FlowRunEntity.java index 513df166bd5..f968476269b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/FlowRunEntity.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/FlowRunEntity.java @@ -19,6 +19,7 @@ import javax.xml.bind.annotation.XmlElement; +import com.fasterxml.jackson.annotation.JsonInclude; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; @@ -27,6 +28,7 @@ */ @InterfaceAudience.Public @InterfaceStability.Unstable +@JsonInclude(JsonInclude.Include.NON_NULL) public class FlowRunEntity extends HierarchicalTimelineEntity { public static final String USER_INFO_KEY = TimelineEntity.SYSTEM_INFO_KEY_PREFIX + "USER"; @@ -107,7 +109,7 @@ public void setRunId(long runId) { addInfo(FLOW_RUN_ID_INFO_KEY, runId); } - public long getStartTime() { + public Long getStartTime() { return getCreatedTime(); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/writer/TimelineEntitySetWriter.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/writer/TimelineEntitySetWriter.java index 3d5978d5714..81269c23e01 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/writer/TimelineEntitySetWriter.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/writer/TimelineEntitySetWriter.java @@ -43,11 +43,13 @@ public class TimelineEntitySetWriter implements MessageBodyWriter<Set<TimelineEntity>> { private ObjectMapper objectMapper = new ObjectMapper(); + private String timelineEntityType = + "java.util.Set<org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity>"; @Override public boolean isWriteable(Class<?> type, Type genericType, Annotation[] annotations, MediaType mediaType) { - return true; + return timelineEntityType.equals(genericType.getTypeName()); } @Override diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/pom.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/pom.xml index 025bc3f139e..97cde1b819f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/pom.xml +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/pom.xml @@ -89,7 +89,6 @@ <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-common</artifactId> - <version>${hbase-compatible-hadoop.version}</version> <scope>test</scope> <exclusions> <exclusion> @@ -104,10 +103,6 @@ <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-hdfs-client</artifactId> </exclusion> - <exclusion> - <groupId>com.sun.jersey</groupId> - <artifactId>jersey-json</artifactId> - </exclusion> </exclusions> </dependency> @@ -131,13 +126,22 @@ <artifactId>junit-platform-launcher</artifactId> <scope>test</scope> </dependency> + <dependency> + <groupId>org.junit.vintage</groupId> + <artifactId>junit-vintage-engine</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.glassfish.jersey.core</groupId> + <artifactId>jersey-server</artifactId> + <scope>test</scope> + </dependency> <!-- 'mvn dependency:analyze' fails to detect use of this direct dependency --> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-auth</artifactId> - <version>${hbase-compatible-hadoop.version}</version> <scope>test</scope> <exclusions> <exclusion> @@ -242,13 +246,6 @@ <scope>runtime</scope> </dependency> - <dependency> - <groupId>com.sun.jersey</groupId> - <artifactId>jersey-client</artifactId> - <version>1.19.4</version> - <scope>test</scope> - </dependency> - <dependency> <groupId>javax.ws.rs</groupId> <artifactId>jsr311-api</artifactId> @@ -369,7 +366,6 @@ <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-common</artifactId> - <version>${hbase-compatible-hadoop.version}</version> <type>test-jar</type> <scope>test</scope> <exclusions> @@ -377,10 +373,6 @@ <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-auth</artifactId> </exclusion> - <exclusion> - <groupId>com.sun.jersey</groupId> - <artifactId>jersey-json</artifactId> - </exclusion> <exclusion> <groupId>commons-logging</groupId> <artifactId>commons-logging</artifactId> @@ -393,7 +385,6 @@ <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-hdfs</artifactId> - <version>${hbase-compatible-hadoop.version}</version> <scope>test</scope> <exclusions> <exclusion> @@ -412,7 +403,6 @@ <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-hdfs</artifactId> - <version>${hbase-compatible-hadoop.version}</version> <type>test-jar</type> <scope>test</scope> <exclusions> @@ -430,7 +420,6 @@ <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-hdfs-client</artifactId> - <version>${hbase-compatible-hadoop.version}</version> <scope>test</scope> </dependency> @@ -507,6 +496,12 @@ <artifactId>mockito-inline</artifactId> <scope>test</scope> </dependency> + + <dependency> + <groupId>org.glassfish.jersey.media</groupId> + <artifactId>jersey-media-json-jettison</artifactId> + <scope>test</scope> + </dependency> </dependencies> <build> diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/reader/AbstractTimelineReaderHBaseTestBase.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/reader/AbstractTimelineReaderHBaseTestBase.java index 471fb6c36f3..dbe9546d0f4 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/reader/AbstractTimelineReaderHBaseTestBase.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/reader/AbstractTimelineReaderHBaseTestBase.java @@ -22,32 +22,29 @@ import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; +import javax.ws.rs.client.Client; +import javax.ws.rs.client.ClientBuilder; + import java.io.IOException; -import java.lang.reflect.UndeclaredThrowableException; import java.net.HttpURLConnection; import java.net.URI; -import java.net.URL; import java.util.List; +import javax.ws.rs.core.GenericType; import javax.ws.rs.core.MediaType; +import javax.ws.rs.core.Response; +import org.apache.hadoop.classification.VisibleForTesting; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.yarn.api.records.timelineservice.FlowActivityEntity; +import org.apache.hadoop.yarn.api.records.timelineservice.reader.TimelineEntityReader; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.server.timelineservice.storage.DataGeneratorForTest; -import org.apache.hadoop.yarn.webapp.YarnJacksonJaxbJsonProvider; +import org.glassfish.jersey.client.ClientConfig; +import org.glassfish.jersey.client.HttpUrlConnectorProvider; import org.junit.Assert; -import com.sun.jersey.api.client.Client; -import com.sun.jersey.api.client.ClientResponse; -import com.sun.jersey.api.client.ClientResponse.Status; -import com.sun.jersey.api.client.GenericType; -import com.sun.jersey.api.client.config.ClientConfig; -import com.sun.jersey.api.client.config.DefaultClientConfig; -import com.sun.jersey.client.urlconnection.HttpURLConnectionFactory; -import com.sun.jersey.client.urlconnection.URLConnectionClientHandler; - /** * Test Base for TimelineReaderServer HBase tests. */ @@ -109,19 +106,25 @@ protected void addFilters(Configuration conf) { } protected Client createClient() { - ClientConfig cfg = new DefaultClientConfig(); - cfg.getClasses().add(YarnJacksonJaxbJsonProvider.class); - return new Client( - new URLConnectionClientHandler(new DummyURLConnectionFactory()), cfg); + final ClientConfig cc = new ClientConfig(); + cc.connectorProvider(getHttpURLConnectionFactory()); + return ClientBuilder.newClient(cc) + .register(TimelineEntityReader.class) + .register(TimelineEntitySetReader.class) + .register(TimelineEntityListReader.class) + .register(FlowActivityEntityReader.class) + .register(FlowRunEntityReader.class) + .register(FlowActivityEntitySetReader.class) + .register(FlowActivityEntityListReader.class) + .register(FlowRunEntitySetReader.class); } - protected ClientResponse getResponse(Client client, URI uri) + protected Response getResponse(Client client, URI uri) throws Exception { - ClientResponse resp = - client.resource(uri).accept(MediaType.APPLICATION_JSON) - .type(MediaType.APPLICATION_JSON).get(ClientResponse.class); + Response resp = + client.target(uri).request(MediaType.APPLICATION_JSON).get(); if (resp == null || resp.getStatusInfo() - .getStatusCode() != ClientResponse.Status.OK.getStatusCode()) { + .getStatusCode() != HttpURLConnection.HTTP_OK) { String msg = ""; if (resp != null) { msg = String.valueOf(resp.getStatusInfo().getStatusCode()); @@ -132,39 +135,37 @@ protected ClientResponse getResponse(Client client, URI uri) return resp; } - protected void verifyHttpResponse(Client client, URI uri, Status status) { - ClientResponse resp = - client.resource(uri).accept(MediaType.APPLICATION_JSON) - .type(MediaType.APPLICATION_JSON).get(ClientResponse.class); + protected void verifyHttpResponse(Client client, URI uri, Response.Status status) { + Response resp = client.target(uri).request(MediaType.APPLICATION_JSON).get(); assertNotNull(resp); assertTrue("Response from server should have been " + status, resp.getStatusInfo().getStatusCode() == status.getStatusCode()); - System.out.println("Response is: " + resp.getEntity(String.class)); + System.out.println("Response is: " + resp.readEntity(String.class)); } protected List<FlowActivityEntity> verifyFlowEntites(Client client, URI uri, int noOfEntities) throws Exception { - ClientResponse resp = getResponse(client, uri); + Response resp = getResponse(client, uri); List<FlowActivityEntity> entities = - resp.getEntity(new GenericType<List<FlowActivityEntity>>() { + resp.readEntity(new GenericType<List<FlowActivityEntity>>() { }); assertNotNull(entities); assertEquals(noOfEntities, entities.size()); return entities; } - protected static class DummyURLConnectionFactory - implements HttpURLConnectionFactory { - - @Override - public HttpURLConnection getHttpURLConnection(final URL url) - throws IOException { - try { - return (HttpURLConnection) url.openConnection(); - } catch (UndeclaredThrowableException e) { - throw new IOException(e.getCause()); - } - } + @VisibleForTesting + protected HttpUrlConnectorProvider getHttpURLConnectionFactory() { + return new HttpUrlConnectorProvider().connectionFactory( + url -> { + HttpURLConnection conn; + try { + conn = (HttpURLConnection) url.openConnection(); + } catch (Exception e) { + throw new IOException(e); + } + return conn; + }); } protected static HBaseTestingUtility getHBaseTestingUtility() { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/reader/FlowActivityEntityListReader.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/reader/FlowActivityEntityListReader.java new file mode 100644 index 00000000000..db6eb8cbefe --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/reader/FlowActivityEntityListReader.java @@ -0,0 +1,123 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.timelineservice.reader; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.hadoop.yarn.api.records.timelineservice.FlowActivityEntity; +import org.apache.hadoop.yarn.api.records.timelineservice.FlowRunEntity; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity; + +import javax.ws.rs.Consumes; +import javax.ws.rs.WebApplicationException; +import javax.ws.rs.core.MediaType; +import javax.ws.rs.core.MultivaluedMap; +import javax.ws.rs.ext.MessageBodyReader; +import javax.ws.rs.ext.Provider; +import java.io.IOException; +import java.io.InputStream; +import java.lang.annotation.Annotation; +import java.lang.reflect.Type; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +/** + * We have defined a dedicated Reader for `List<FlowActivityEntity>`, + * aimed at adapting to the Jersey2 framework + * to ensure that JSON can be converted into `List<FlowActivityEntity>`. + */ +@Provider +@Consumes(MediaType.APPLICATION_JSON) +public class FlowActivityEntityListReader implements MessageBodyReader<List<FlowActivityEntity>> { + + private ObjectMapper objectMapper = new ObjectMapper(); + private String timelineEntityType = + "java.util.List<org.apache.hadoop.yarn.api.records.timelineservice.FlowActivityEntity>"; + + @Override + public boolean isReadable(Class<?> type, Type genericType, + Annotation[] annotations, MediaType mediaType) { + return timelineEntityType.equals(genericType.getTypeName()); + } + + @Override + public List<FlowActivityEntity> readFrom(Class<List<FlowActivityEntity>> type, + Type genericType, Annotation[] annotations, MediaType mediaType, + MultivaluedMap<String, String> httpHeaders, + InputStream entityStream) throws IOException, WebApplicationException { + List<FlowActivityEntity> flowActivityEntityList = new ArrayList<>(); + + JsonNode jsonNode = objectMapper.readTree(entityStream); + if (jsonNode.isArray()) { + for (JsonNode jNode : jsonNode) { + FlowActivityEntity entity = new FlowActivityEntity(); + + // Get Identifier + JsonNode jnIdentifier = jNode.get("identifier"); + JsonNode jnType = jnIdentifier.get("type"); + JsonNode jnId = jnIdentifier.get("id"); + TimelineEntity.Identifier identifier = + new TimelineEntity.Identifier(jnType.asText(), jnId.asText()); + entity.setIdentifier(identifier); + + // Get Type + JsonNode jnAppType = jNode.get("type"); + entity.setType(jnAppType.asText()); + + // Get Createdtime + JsonNode jnCreatedTime = jNode.get("createdtime"); + entity.setCreatedTime(jnCreatedTime.asLong()); + + // Get configs + JsonNode jnConfigs = jNode.get("configs"); + if (jnConfigs != null) { + Map<String, String> configInfos = + objectMapper.treeToValue(jnConfigs, Map.class); + entity.setConfigs(configInfos); + } + + // Get info + JsonNode jnInfos = jNode.get("info"); + if (jnInfos != null) { + Map<String, Object> entityInfos = + objectMapper.treeToValue(jnInfos, Map.class); + entity.setInfo(entityInfos); + } + + // Get BasicInfo + entity.setDate(jNode.get("date").asLong()); + entity.setCluster(jNode.get("cluster").asText()); + entity.setUser(jNode.get("user").asText()); + entity.setFlowName(jNode.get("flowName").asText()); + + // Get flowRuns + JsonNode jnflowRuns = jNode.get("flowRuns"); + if (jnflowRuns != null) { + for (JsonNode jnflow : jnflowRuns) { + FlowRunEntity flowRunEntity = objectMapper.treeToValue(jnflow, FlowRunEntity.class); + entity.addFlowRun(flowRunEntity); + } + } + flowActivityEntityList.add(entity); + } + } + + return flowActivityEntityList; + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/writer/TimelineEntitySetWriter.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/reader/FlowActivityEntityReader.java similarity index 52% copy from hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/writer/TimelineEntitySetWriter.java copy to hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/reader/FlowActivityEntityReader.java index 3d5978d5714..c85f701c875 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/writer/TimelineEntitySetWriter.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/reader/FlowActivityEntityReader.java @@ -15,53 +15,49 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.hadoop.yarn.api.records.timelineservice.writer; +package org.apache.hadoop.yarn.server.timelineservice.reader; import com.fasterxml.jackson.databind.ObjectMapper; -import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity; +import org.apache.hadoop.yarn.api.records.timelineservice.FlowActivityEntity; import javax.ws.rs.Consumes; import javax.ws.rs.WebApplicationException; import javax.ws.rs.core.MediaType; import javax.ws.rs.core.MultivaluedMap; -import javax.ws.rs.ext.MessageBodyWriter; +import javax.ws.rs.ext.MessageBodyReader; import javax.ws.rs.ext.Provider; import java.io.IOException; -import java.io.OutputStream; +import java.io.InputStream; import java.lang.annotation.Annotation; import java.lang.reflect.Type; -import java.nio.charset.StandardCharsets; -import java.util.Set; /** - * We have defined a dedicated Writer for TimelineEntity, - * aimed at adapting to the Jersey2 framework to ensure - * that TimelineEntity can be converted into JSON format. + * We have defined a dedicated Reader for FlowActivityEntity, + * aimed at adapting to the Jersey2 framework + * to ensure that JSON can be converted into FlowActivityEntity. */ @Provider @Consumes(MediaType.APPLICATION_JSON) -public class TimelineEntitySetWriter implements MessageBodyWriter<Set<TimelineEntity>> { +public class FlowActivityEntityReader implements MessageBodyReader<FlowActivityEntity> { private ObjectMapper objectMapper = new ObjectMapper(); @Override - public boolean isWriteable(Class<?> type, Type genericType, + public boolean isReadable(Class<?> type, Type genericType, Annotation[] annotations, MediaType mediaType) { - return true; + return type == FlowActivityEntity.class; } @Override - public void writeTo(Set<TimelineEntity> timelinePutResponse, Class<?> type, - Type genericType, Annotation[] annotations, MediaType mediaType, - MultivaluedMap<String, Object> httpHeaders, OutputStream entityStream) - throws IOException, WebApplicationException { - String entity = objectMapper.writeValueAsString(timelinePutResponse); - entityStream.write(entity.getBytes(StandardCharsets.UTF_8)); - } - - @Override - public long getSize(Set<TimelineEntity> timelineEntities, Class<?> type, Type genericType, - Annotation[] annotations, MediaType mediaType) { - return -1L; + public FlowActivityEntity readFrom(Class<FlowActivityEntity> type, Type genericType, + Annotation[] annotations, MediaType mediaType, MultivaluedMap<String, String> httpHeaders, + InputStream entityStream) throws IOException, WebApplicationException { + try { + FlowActivityEntity timelineEntity = + objectMapper.readValue(entityStream, FlowActivityEntity.class); + return timelineEntity; + } catch (Exception e) { + return new FlowActivityEntity(); + } } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/reader/FlowActivityEntitySetReader.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/reader/FlowActivityEntitySetReader.java new file mode 100644 index 00000000000..06a96da3fca --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/reader/FlowActivityEntitySetReader.java @@ -0,0 +1,124 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.timelineservice.reader; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.hadoop.yarn.api.records.timelineservice.FlowActivityEntity; +import org.apache.hadoop.yarn.api.records.timelineservice.FlowRunEntity; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity; + +import javax.ws.rs.Consumes; +import javax.ws.rs.WebApplicationException; +import javax.ws.rs.core.MediaType; +import javax.ws.rs.core.MultivaluedMap; +import javax.ws.rs.ext.MessageBodyReader; +import javax.ws.rs.ext.Provider; +import java.io.IOException; +import java.io.InputStream; +import java.lang.annotation.Annotation; +import java.lang.reflect.Type; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + +/** + * We have defined a dedicated Reader for `Set<FlowActivityEntity>`, + * aimed at adapting to the Jersey2 framework + * to ensure that JSON can be converted into `Set<FlowActivityEntity>`. + */ +@Provider +@Consumes(MediaType.APPLICATION_JSON) +public class FlowActivityEntitySetReader implements MessageBodyReader<Set<FlowActivityEntity>> { + + private ObjectMapper objectMapper = new ObjectMapper(); + private String timelineEntityType = + "java.util.Set<org.apache.hadoop.yarn.api.records.timelineservice.FlowActivityEntity>"; + + @Override + public boolean isReadable(Class<?> type, Type genericType, + Annotation[] annotations, MediaType mediaType) { + return timelineEntityType.equals(genericType.getTypeName()); + } + + @Override + public Set<FlowActivityEntity> readFrom(Class<Set<FlowActivityEntity>> type, + Type genericType, Annotation[] annotations, MediaType mediaType, + MultivaluedMap<String, String> httpHeaders, + InputStream entityStream) throws IOException, WebApplicationException { + Set<FlowActivityEntity> flowActivityEntitySet = new HashSet<>(); + + JsonNode jsonNode = objectMapper.readTree(entityStream); + if (jsonNode.isArray()) { + for (JsonNode jNode : jsonNode) { + FlowActivityEntity entity = new FlowActivityEntity(); + + // Get Identifier + JsonNode jnIdentifier = jNode.get("identifier"); + JsonNode jnType = jnIdentifier.get("type"); + JsonNode jnId = jnIdentifier.get("id"); + TimelineEntity.Identifier identifier = + new TimelineEntity.Identifier(jnType.asText(), jnId.asText()); + entity.setIdentifier(identifier); + + // Get Type + JsonNode jnAppType = jNode.get("type"); + entity.setType(jnAppType.asText()); + + // Get Createdtime + JsonNode jnCreatedTime = jNode.get("createdtime"); + entity.setCreatedTime(jnCreatedTime.asLong()); + + // Get configs + JsonNode jnConfigs = jNode.get("configs"); + if (jnConfigs != null) { + Map<String, String> configInfos = + objectMapper.treeToValue(jnConfigs, Map.class); + entity.setConfigs(configInfos); + } + + // Get info + JsonNode jnInfos = jNode.get("info"); + if (jnInfos != null) { + Map<String, Object> entityInfos = + objectMapper.treeToValue(jnInfos, Map.class); + entity.setInfo(entityInfos); + } + + // Get BasicInfo + entity.setDate(jNode.get("date").asLong()); + entity.setCluster(jNode.get("cluster").asText()); + entity.setUser(jNode.get("user").asText()); + entity.setFlowName(jNode.get("flowName").asText()); + + // Get flowRuns + JsonNode jnflowRuns = jNode.get("flowRuns"); + if (jnflowRuns != null) { + for (JsonNode jnflow : jnflowRuns) { + FlowRunEntity flowRunEntity = objectMapper.treeToValue(jnflow, FlowRunEntity.class); + entity.addFlowRun(flowRunEntity); + } + } + + flowActivityEntitySet.add(entity); + } + } + + return flowActivityEntitySet; + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/writer/TimelineEntitySetWriter.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/reader/FlowRunEntityReader.java similarity index 52% copy from hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/writer/TimelineEntitySetWriter.java copy to hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/reader/FlowRunEntityReader.java index 3d5978d5714..87453fd6689 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/writer/TimelineEntitySetWriter.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/reader/FlowRunEntityReader.java @@ -15,53 +15,48 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.hadoop.yarn.api.records.timelineservice.writer; +package org.apache.hadoop.yarn.server.timelineservice.reader; import com.fasterxml.jackson.databind.ObjectMapper; -import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity; +import org.apache.hadoop.yarn.api.records.timelineservice.FlowRunEntity; import javax.ws.rs.Consumes; import javax.ws.rs.WebApplicationException; import javax.ws.rs.core.MediaType; import javax.ws.rs.core.MultivaluedMap; -import javax.ws.rs.ext.MessageBodyWriter; +import javax.ws.rs.ext.MessageBodyReader; import javax.ws.rs.ext.Provider; import java.io.IOException; -import java.io.OutputStream; +import java.io.InputStream; import java.lang.annotation.Annotation; import java.lang.reflect.Type; -import java.nio.charset.StandardCharsets; -import java.util.Set; /** - * We have defined a dedicated Writer for TimelineEntity, - * aimed at adapting to the Jersey2 framework to ensure - * that TimelineEntity can be converted into JSON format. + * We have defined a dedicated Reader for FlowRunEntity, + * aimed at adapting to the Jersey2 framework + * to ensure that JSON can be converted into FlowRunEntity. */ @Provider @Consumes(MediaType.APPLICATION_JSON) -public class TimelineEntitySetWriter implements MessageBodyWriter<Set<TimelineEntity>> { +public class FlowRunEntityReader implements MessageBodyReader<FlowRunEntity> { private ObjectMapper objectMapper = new ObjectMapper(); @Override - public boolean isWriteable(Class<?> type, Type genericType, + public boolean isReadable(Class<?> type, Type genericType, Annotation[] annotations, MediaType mediaType) { - return true; + return type == FlowRunEntity.class; } @Override - public void writeTo(Set<TimelineEntity> timelinePutResponse, Class<?> type, - Type genericType, Annotation[] annotations, MediaType mediaType, - MultivaluedMap<String, Object> httpHeaders, OutputStream entityStream) - throws IOException, WebApplicationException { - String entity = objectMapper.writeValueAsString(timelinePutResponse); - entityStream.write(entity.getBytes(StandardCharsets.UTF_8)); - } - - @Override - public long getSize(Set<TimelineEntity> timelineEntities, Class<?> type, Type genericType, - Annotation[] annotations, MediaType mediaType) { - return -1L; + public FlowRunEntity readFrom(Class<FlowRunEntity> type, Type genericType, + Annotation[] annotations, MediaType mediaType, MultivaluedMap<String, String> httpHeaders, + InputStream entityStream) throws IOException, WebApplicationException { + try { + FlowRunEntity timelineEntity = objectMapper.readValue(entityStream, FlowRunEntity.class); + return timelineEntity; + } catch (Exception e) { + return new FlowRunEntity(); + } } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/writer/TimelineEntitySetWriter.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/reader/FlowRunEntitySetReader.java similarity index 51% copy from hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/writer/TimelineEntitySetWriter.java copy to hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/reader/FlowRunEntitySetReader.java index 3d5978d5714..4598a275ab8 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/writer/TimelineEntitySetWriter.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/reader/FlowRunEntitySetReader.java @@ -15,53 +15,59 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.hadoop.yarn.api.records.timelineservice.writer; +package org.apache.hadoop.yarn.server.timelineservice.reader; +import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; -import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity; +import org.apache.hadoop.yarn.api.records.timelineservice.FlowRunEntity; import javax.ws.rs.Consumes; import javax.ws.rs.WebApplicationException; import javax.ws.rs.core.MediaType; import javax.ws.rs.core.MultivaluedMap; -import javax.ws.rs.ext.MessageBodyWriter; +import javax.ws.rs.ext.MessageBodyReader; import javax.ws.rs.ext.Provider; import java.io.IOException; -import java.io.OutputStream; +import java.io.InputStream; import java.lang.annotation.Annotation; import java.lang.reflect.Type; -import java.nio.charset.StandardCharsets; +import java.util.HashSet; import java.util.Set; /** - * We have defined a dedicated Writer for TimelineEntity, - * aimed at adapting to the Jersey2 framework to ensure - * that TimelineEntity can be converted into JSON format. + * We have defined a dedicated Reader for `Set<FlowActivityEntity>`, + * aimed at adapting to the Jersey2 framework + * to ensure that JSON can be converted into `Set<FlowActivityEntity>`. */ @Provider @Consumes(MediaType.APPLICATION_JSON) -public class TimelineEntitySetWriter implements MessageBodyWriter<Set<TimelineEntity>> { +public class FlowRunEntitySetReader implements MessageBodyReader<Set<FlowRunEntity>> { private ObjectMapper objectMapper = new ObjectMapper(); + private String timelineEntityType = + "java.util.Set<org.apache.hadoop.yarn.api.records.timelineservice.FlowRunEntity>"; @Override - public boolean isWriteable(Class<?> type, Type genericType, + public boolean isReadable(Class<?> type, Type genericType, Annotation[] annotations, MediaType mediaType) { - return true; + return timelineEntityType.equals(genericType.getTypeName()); } @Override - public void writeTo(Set<TimelineEntity> timelinePutResponse, Class<?> type, + public Set<FlowRunEntity> readFrom(Class<Set<FlowRunEntity>> type, Type genericType, Annotation[] annotations, MediaType mediaType, - MultivaluedMap<String, Object> httpHeaders, OutputStream entityStream) - throws IOException, WebApplicationException { - String entity = objectMapper.writeValueAsString(timelinePutResponse); - entityStream.write(entity.getBytes(StandardCharsets.UTF_8)); - } + MultivaluedMap<String, String> httpHeaders, + InputStream entityStream) throws IOException, WebApplicationException { + Set<FlowRunEntity> flowRunEntitySet = new HashSet<>(); - @Override - public long getSize(Set<TimelineEntity> timelineEntities, Class<?> type, Type genericType, - Annotation[] annotations, MediaType mediaType) { - return -1L; + JsonNode jsonNode = objectMapper.readTree(entityStream); + if (jsonNode.isArray()) { + for (JsonNode jNode : jsonNode) { + FlowRunEntity flowRunEntity = objectMapper.treeToValue(jNode, FlowRunEntity.class); + flowRunEntitySet.add(flowRunEntity); + } + } + + return flowRunEntitySet; } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/reader/TestTimelineReaderWebServicesHBaseStorage.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/reader/TestTimelineReaderWebServicesHBaseStorage.java index 2a55e0ebe90..e54885369a7 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/reader/TestTimelineReaderWebServicesHBaseStorage.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/reader/TestTimelineReaderWebServicesHBaseStorage.java @@ -23,6 +23,8 @@ import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; +import javax.ws.rs.client.Client; + import java.io.IOException; import java.net.URI; import java.text.DateFormat; @@ -35,7 +37,9 @@ import java.util.Map; import java.util.Set; +import javax.ws.rs.core.GenericType; import javax.ws.rs.core.MediaType; +import javax.ws.rs.core.Response; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.security.UserGroupInformation; @@ -60,10 +64,6 @@ import org.junit.Test; import org.apache.hadoop.thirdparty.com.google.common.collect.ImmutableMap; -import com.sun.jersey.api.client.Client; -import com.sun.jersey.api.client.ClientResponse; -import com.sun.jersey.api.client.ClientResponse.Status; -import com.sun.jersey.api.client.GenericType; /** * Test TimelineReder Web Service REST API's. @@ -452,10 +452,10 @@ public void testGetFlowRun() throws Exception { URI uri = URI.create("http://localhost:" + getServerPort() + "/ws/v2/" + "timeline/clusters/cluster1/users/user1/flows/flow_name/runs/" + "1002345678919"); - ClientResponse resp = getResponse(client, uri); - FlowRunEntity entity = resp.getEntity(FlowRunEntity.class); - assertEquals(MediaType.APPLICATION_JSON_TYPE + "; charset=utf-8", - resp.getType().toString()); + Response resp = getResponse(client, uri); + FlowRunEntity entity = resp.readEntity(FlowRunEntity.class); + assertEquals(MediaType.APPLICATION_JSON_TYPE + ";charset=utf-8", + resp.getMediaType().toString()); assertNotNull(entity); assertEquals("user1@flow_name/1002345678919", entity.getId()); assertEquals(3, entity.getMetrics().size()); @@ -473,7 +473,7 @@ public void testGetFlowRun() throws Exception { uri = URI.create("http://localhost:" + getServerPort() + "/ws/v2/" + "timeline/users/user1/flows/flow_name/runs/1002345678919"); resp = getResponse(client, uri); - entity = resp.getEntity(FlowRunEntity.class); + entity = resp.readEntity(FlowRunEntity.class); assertNotNull(entity); assertEquals("user1@flow_name/1002345678919", entity.getId()); assertEquals(3, entity.getMetrics().size()); @@ -487,7 +487,7 @@ public void testGetFlowRun() throws Exception { assertTrue(verifyMetrics(metric, m1, m2, m3)); } } finally { - client.destroy(); + client.close(); } } @@ -497,11 +497,11 @@ public void testGetFlowRuns() throws Exception { try { URI uri = URI.create("http://localhost:" + getServerPort() + "/ws/v2/" + "timeline/clusters/cluster1/users/user1/flows/flow_name/runs"); - ClientResponse resp = getResponse(client, uri); + Response resp = getResponse(client, uri); Set<FlowRunEntity> entities = - resp.getEntity(new GenericType<Set<FlowRunEntity>>(){}); - assertEquals(MediaType.APPLICATION_JSON_TYPE + "; charset=utf-8", - resp.getType().toString()); + resp.readEntity(new GenericType<Set<FlowRunEntity>>(){}); + assertEquals(MediaType.APPLICATION_JSON_TYPE + ";charset=utf-8", + resp.getMediaType().toString()); assertNotNull(entities); assertEquals(2, entities.size()); for (FlowRunEntity entity : entities) { @@ -519,9 +519,9 @@ public void testGetFlowRuns() throws Exception { URI.create("http://localhost:" + getServerPort() + "/ws/v2/timeline/" + "clusters/cluster1/users/user1/flows/flow_name/runs?limit=1"); resp = getResponse(client, uri); - entities = resp.getEntity(new GenericType<Set<FlowRunEntity>>(){}); - assertEquals(MediaType.APPLICATION_JSON_TYPE + "; charset=utf-8", - resp.getType().toString()); + entities = resp.readEntity(new GenericType<Set<FlowRunEntity>>(){}); + assertEquals(MediaType.APPLICATION_JSON_TYPE + ";charset=utf-8", + resp.getMediaType().toString()); assertNotNull(entities); assertEquals(1, entities.size()); for (FlowRunEntity entity : entities) { @@ -536,9 +536,9 @@ public void testGetFlowRuns() throws Exception { "timeline/clusters/cluster1/users/user1/flows/flow_name/runs?" + "createdtimestart=1425016501030"); resp = getResponse(client, uri); - entities = resp.getEntity(new GenericType<Set<FlowRunEntity>>(){}); - assertEquals(MediaType.APPLICATION_JSON_TYPE + "; charset=utf-8", - resp.getType().toString()); + entities = resp.readEntity(new GenericType<Set<FlowRunEntity>>(){}); + assertEquals(MediaType.APPLICATION_JSON_TYPE + ";charset=utf-8", + resp.getMediaType().toString()); assertNotNull(entities); assertEquals(1, entities.size()); for (FlowRunEntity entity : entities) { @@ -553,9 +553,9 @@ public void testGetFlowRuns() throws Exception { "timeline/clusters/cluster1/users/user1/flows/flow_name/runs?" + "createdtimestart=1425016500999&createdtimeend=1425016501035"); resp = getResponse(client, uri); - entities = resp.getEntity(new GenericType<Set<FlowRunEntity>>(){}); - assertEquals(MediaType.APPLICATION_JSON_TYPE + "; charset=utf-8", - resp.getType().toString()); + entities = resp.readEntity(new GenericType<Set<FlowRunEntity>>(){}); + assertEquals(MediaType.APPLICATION_JSON_TYPE + ";charset=utf-8", + resp.getMediaType().toString()); assertNotNull(entities); assertEquals(2, entities.size()); for (FlowRunEntity entity : entities) { @@ -573,9 +573,9 @@ public void testGetFlowRuns() throws Exception { "timeline/clusters/cluster1/users/user1/flows/flow_name/runs?" + "createdtimeend=1425016501030"); resp = getResponse(client, uri); - entities = resp.getEntity(new GenericType<Set<FlowRunEntity>>(){}); - assertEquals(MediaType.APPLICATION_JSON_TYPE + "; charset=utf-8", - resp.getType().toString()); + entities = resp.readEntity(new GenericType<Set<FlowRunEntity>>(){}); + assertEquals(MediaType.APPLICATION_JSON_TYPE + ";charset=utf-8", + resp.getMediaType().toString()); assertNotNull(entities); assertEquals(1, entities.size()); for (FlowRunEntity entity : entities) { @@ -590,9 +590,9 @@ public void testGetFlowRuns() throws Exception { "timeline/clusters/cluster1/users/user1/flows/flow_name/runs?" + "fields=metrics"); resp = getResponse(client, uri); - entities = resp.getEntity(new GenericType<Set<FlowRunEntity>>(){}); - assertEquals(MediaType.APPLICATION_JSON_TYPE + "; charset=utf-8", - resp.getType().toString()); + entities = resp.readEntity(new GenericType<Set<FlowRunEntity>>(){}); + assertEquals(MediaType.APPLICATION_JSON_TYPE + ";charset=utf-8", + resp.getMediaType().toString()); assertNotNull(entities); assertEquals(2, entities.size()); for (FlowRunEntity entity : entities) { @@ -612,9 +612,9 @@ public void testGetFlowRuns() throws Exception { uri = URI.create("http://localhost:" + getServerPort() + "/ws/v2/" + "timeline/clusters/cluster1/users/user1/flows/flow_name/runs?" + "fields=CONFIGS"); - verifyHttpResponse(client, uri, Status.BAD_REQUEST); + verifyHttpResponse(client, uri, Response.Status.BAD_REQUEST); } finally { - client.destroy(); + client.close(); } } @@ -625,11 +625,11 @@ public void testGetFlowRunsMetricsToRetrieve() throws Exception { URI uri = URI.create("http://localhost:" + getServerPort() + "/ws/v2/" + "timeline/clusters/cluster1/users/user1/flows/flow_name/runs?" + "metricstoretrieve=MAP_,HDFS_"); - ClientResponse resp = getResponse(client, uri); + Response resp = getResponse(client, uri); Set<FlowRunEntity> entities = - resp.getEntity(new GenericType<Set<FlowRunEntity>>(){}); - assertEquals(MediaType.APPLICATION_JSON_TYPE + "; charset=utf-8", - resp.getType().toString()); + resp.readEntity(new GenericType<Set<FlowRunEntity>>(){}); + assertEquals(MediaType.APPLICATION_JSON_TYPE + ";charset=utf-8", + resp.getMediaType().toString()); assertNotNull(entities); assertEquals(2, entities.size()); int metricCnt = 0; @@ -646,9 +646,9 @@ public void testGetFlowRunsMetricsToRetrieve() throws Exception { "timeline/clusters/cluster1/users/user1/flows/flow_name/runs?" + "metricstoretrieve=!(MAP_,HDFS_)"); resp = getResponse(client, uri); - entities = resp.getEntity(new GenericType<Set<FlowRunEntity>>(){}); - assertEquals(MediaType.APPLICATION_JSON_TYPE + "; charset=utf-8", - resp.getType().toString()); + entities = resp.readEntity(new GenericType<Set<FlowRunEntity>>(){}); + assertEquals(MediaType.APPLICATION_JSON_TYPE + ";charset=utf-8", + resp.getMediaType().toString()); assertNotNull(entities); assertEquals(2, entities.size()); metricCnt = 0; @@ -660,7 +660,7 @@ public void testGetFlowRunsMetricsToRetrieve() throws Exception { } assertEquals(1, metricCnt); } finally { - client.destroy(); + client.close(); } } @@ -670,10 +670,10 @@ public void testGetEntitiesByUID() throws Exception { try { // Query all flows. URI uri = URI.create("http://localhost:" + getServerPort() + "/ws/v2/" + - "timeline/flows"); - ClientResponse resp = getResponse(client, uri); + "timeline/flows/"); + Response resp = getResponse(client, uri); Set<FlowActivityEntity> flowEntities = - resp.getEntity(new GenericType<Set<FlowActivityEntity>>(){}); + resp.readEntity(new GenericType<Set<FlowActivityEntity>>(){}); assertNotNull(flowEntities); assertEquals(3, flowEntities.size()); List<String> listFlowUIDs = new ArrayList<String>(); @@ -699,7 +699,7 @@ public void testGetEntitiesByUID() throws Exception { "timeline/flow-uid/" + flowUID + "/runs"); resp = getResponse(client, uri); Set<FlowRunEntity> frEntities = - resp.getEntity(new GenericType<Set<FlowRunEntity>>(){}); + resp.readEntity(new GenericType<Set<FlowRunEntity>>(){}); assertNotNull(frEntities); for (FlowRunEntity entity : frEntities) { String flowRunUID = @@ -718,7 +718,7 @@ public void testGetEntitiesByUID() throws Exception { uri = URI.create("http://localhost:" + getServerPort() + "/ws/v2/" + "timeline/run-uid/" + flowRunUID); resp = getResponse(client, uri); - FlowRunEntity entity = resp.getEntity(FlowRunEntity.class); + FlowRunEntity entity = resp.readEntity(FlowRunEntity.class); assertNotNull(entity); } @@ -731,7 +731,7 @@ public void testGetEntitiesByUID() throws Exception { "timeline/run-uid/" + flowRunUID + "/apps"); resp = getResponse(client, uri); Set<TimelineEntity> appEntities = - resp.getEntity(new GenericType<Set<TimelineEntity>>(){}); + resp.readEntity(new GenericType<Set<TimelineEntity>>(){}); assertNotNull(appEntities); for (TimelineEntity entity : appEntities) { String appUID = @@ -750,7 +750,7 @@ public void testGetEntitiesByUID() throws Exception { uri = URI.create("http://localhost:" + getServerPort() + "/ws/v2/" + "timeline/app-uid/" + appUID); resp = getResponse(client, uri); - TimelineEntity entity = resp.getEntity(TimelineEntity.class); + TimelineEntity entity = resp.readEntity(TimelineEntity.class); assertNotNull(entity); } @@ -764,7 +764,7 @@ public void testGetEntitiesByUID() throws Exception { "timeline/app-uid/" + appUID + "/entities/type1"); resp = getResponse(client, uri); Set<TimelineEntity> entities = - resp.getEntity(new GenericType<Set<TimelineEntity>>(){}); + resp.readEntity(new GenericType<Set<TimelineEntity>>(){}); assertNotNull(entities); for (TimelineEntity entity : entities) { String entityUID = @@ -785,40 +785,40 @@ public void testGetEntitiesByUID() throws Exception { uri = URI.create("http://localhost:" + getServerPort() + "/ws/v2/" + "timeline/entity-uid/" + entityUID); resp = getResponse(client, uri); - TimelineEntity entity = resp.getEntity(TimelineEntity.class); + TimelineEntity entity = resp.readEntity(TimelineEntity.class); assertNotNull(entity); } uri = URI.create("http://localhost:" + getServerPort() + "/ws/v2/" + "timeline/flow-uid/dummy:flow/runs"); - verifyHttpResponse(client, uri, Status.BAD_REQUEST); + verifyHttpResponse(client, uri, Response.Status.BAD_REQUEST); uri = URI.create("http://localhost:" + getServerPort() + "/ws/v2/" + "timeline/run-uid/dummy:flowrun"); - verifyHttpResponse(client, uri, Status.BAD_REQUEST); + verifyHttpResponse(client, uri, Response.Status.BAD_REQUEST); // Run Id is not a numerical value. uri = URI.create("http://localhost:" + getServerPort() + "/ws/v2/" + "timeline/run-uid/some:dummy:flow:123v456"); - verifyHttpResponse(client, uri, Status.BAD_REQUEST); + verifyHttpResponse(client, uri, Response.Status.BAD_REQUEST); uri = URI.create("http://localhost:" + getServerPort() + "/ws/v2/" + "timeline/run-uid/dummy:flowrun/apps"); - verifyHttpResponse(client, uri, Status.BAD_REQUEST); + verifyHttpResponse(client, uri, Response.Status.BAD_REQUEST); uri = URI.create("http://localhost:" + getServerPort() + "/ws/v2/" + "timeline/app-uid/dummy:app"); - verifyHttpResponse(client, uri, Status.BAD_REQUEST); + verifyHttpResponse(client, uri, Response.Status.BAD_REQUEST); uri = URI.create("http://localhost:" + getServerPort() + "/ws/v2/" + "timeline/app-uid/dummy:app/entities/type1"); - verifyHttpResponse(client, uri, Status.BAD_REQUEST); + verifyHttpResponse(client, uri, Response.Status.BAD_REQUEST); uri = URI.create("http://localhost:" + getServerPort() + "/ws/v2/" + "timeline/entity-uid/dummy:entity"); - verifyHttpResponse(client, uri, Status.BAD_REQUEST); + verifyHttpResponse(client, uri, Response.Status.BAD_REQUEST); } finally { - client.destroy(); + client.close(); } } @@ -830,8 +830,8 @@ public void testUIDQueryWithAndWithoutFlowContextInfo() throws Exception { "cluster1!user1!flow_name!1002345678919!application_1111111111_1111"; URI uri = URI.create("http://localhost:" + getServerPort() + "/ws/v2/"+ "timeline/app-uid/" + appUIDWithFlowInfo); - ClientResponse resp = getResponse(client, uri); - TimelineEntity appEntity1 = resp.getEntity(TimelineEntity.class); + Response resp = getResponse(client, uri); + TimelineEntity appEntity1 = resp.readEntity(TimelineEntity.class); assertNotNull(appEntity1); assertEquals( TimelineEntityType.YARN_APPLICATION.toString(), appEntity1.getType()); @@ -842,7 +842,7 @@ public void testUIDQueryWithAndWithoutFlowContextInfo() throws Exception { + "app-uid/" + appUIDWithFlowInfo + "/entities/type1"); resp = getResponse(client, uri); Set<TimelineEntity> entities1 = - resp.getEntity(new GenericType<Set<TimelineEntity>>(){}); + resp.readEntity(new GenericType<Set<TimelineEntity>>(){}); assertNotNull(entities1); assertEquals(2, entities1.size()); for (TimelineEntity entity : entities1) { @@ -859,7 +859,7 @@ public void testUIDQueryWithAndWithoutFlowContextInfo() throws Exception { uri = URI.create("http://localhost:" + getServerPort() + "/ws/v2/timeline/" + "app-uid/" + appUIDWithoutFlowInfo); resp = getResponse(client, uri); - TimelineEntity appEntity2 = resp.getEntity(TimelineEntity.class); + TimelineEntity appEntity2 = resp.readEntity(TimelineEntity.class); assertNotNull(appEntity2); assertEquals( TimelineEntityType.YARN_APPLICATION.toString(), appEntity2.getType()); @@ -870,7 +870,7 @@ public void testUIDQueryWithAndWithoutFlowContextInfo() throws Exception { + "app-uid/" + appUIDWithoutFlowInfo + "/entities/type1"); resp = getResponse(client, uri); Set<TimelineEntity> entities2 = - resp.getEntity(new GenericType<Set<TimelineEntity>>(){}); + resp.readEntity(new GenericType<Set<TimelineEntity>>(){}); assertNotNull(entities2); assertEquals(2, entities2.size()); for (TimelineEntity entity : entities2) { @@ -887,7 +887,7 @@ public void testUIDQueryWithAndWithoutFlowContextInfo() throws Exception { uri = URI.create("http://localhost:" + getServerPort() + "/ws/v2/timeline/" + "entity-uid/" + entityUIDWithFlowInfo); resp = getResponse(client, uri); - TimelineEntity singleEntity1 = resp.getEntity(TimelineEntity.class); + TimelineEntity singleEntity1 = resp.readEntity(TimelineEntity.class); assertNotNull(singleEntity1); assertEquals("type1", singleEntity1.getType()); assertEquals("entity1", singleEntity1.getId()); @@ -897,12 +897,12 @@ public void testUIDQueryWithAndWithoutFlowContextInfo() throws Exception { uri = URI.create("http://localhost:" + getServerPort() + "/ws/v2/timeline/" + "entity-uid/" + entityUIDWithoutFlowInfo); resp = getResponse(client, uri); - TimelineEntity singleEntity2 = resp.getEntity(TimelineEntity.class); + TimelineEntity singleEntity2 = resp.readEntity(TimelineEntity.class); assertNotNull(singleEntity2); assertEquals("type1", singleEntity2.getType()); assertEquals("entity1", singleEntity2.getId()); } finally { - client.destroy(); + client.close(); } } @@ -914,9 +914,9 @@ public void testUIDNotProperlyEscaped() throws Exception { "cluster1!user*1!flow_name!1002345678919!application_1111111111_1111"; URI uri = URI.create("http://localhost:" + getServerPort() + "/ws/v2/"+ "timeline/app-uid/" + appUID); - verifyHttpResponse(client, uri, Status.BAD_REQUEST); + verifyHttpResponse(client, uri, Response.Status.BAD_REQUEST); } finally { - client.destroy(); + client.close(); } } @@ -971,21 +971,21 @@ public void testGetFlows() throws Exception { uri = URI.create("http://localhost:" + getServerPort() + "/ws/v2/" + "timeline/clusters/cluster1/flows?daterange=20150711:20150714"); - verifyHttpResponse(client, uri, Status.BAD_REQUEST); + verifyHttpResponse(client, uri, Response.Status.BAD_REQUEST); uri = URI.create("http://localhost:" + getServerPort() + "/ws/v2/" + "timeline/clusters/cluster1/flows?daterange=20150714-20150711"); - verifyHttpResponse(client, uri, Status.BAD_REQUEST); + verifyHttpResponse(client, uri, Response.Status.BAD_REQUEST); uri = URI.create("http://localhost:" + getServerPort() + "/ws/v2/" + "timeline/clusters/cluster1/flows?daterange=2015071129-20150712"); - verifyHttpResponse(client, uri, Status.BAD_REQUEST); + verifyHttpResponse(client, uri, Response.Status.BAD_REQUEST); uri = URI.create("http://localhost:" + getServerPort() + "/ws/v2/" + "timeline/clusters/cluster1/flows?daterange=20150711-2015071243"); - verifyHttpResponse(client, uri, Status.BAD_REQUEST); + verifyHttpResponse(client, uri, Response.Status.BAD_REQUEST); } finally { - client.destroy(); + client.close(); } } @@ -1022,7 +1022,7 @@ public void testGetFlowsForPagination() throws Exception { flowEntites = verifyFlowEntites(client, uri, 1); assertEquals(fEntity3, flowEntites.get(0)); } finally { - client.destroy(); + client.close(); } } @@ -1033,8 +1033,8 @@ public void testGetApp() throws Exception { URI uri = URI.create("http://localhost:" + getServerPort() + "/ws/v2/" + "timeline/clusters/cluster1/apps/application_1111111111_1111?" + "userid=user1&fields=ALL&flowname=flow_name&flowrunid=1002345678919"); - ClientResponse resp = getResponse(client, uri); - TimelineEntity entity = resp.getEntity(TimelineEntity.class); + Response resp = getResponse(client, uri); + TimelineEntity entity = resp.readEntity(TimelineEntity.class); assertNotNull(entity); assertEquals("application_1111111111_1111", entity.getId()); assertEquals(3, entity.getMetrics().size()); @@ -1052,7 +1052,7 @@ public void testGetApp() throws Exception { "timeline/apps/application_1111111111_2222?userid=user1" + "&fields=metrics&flowname=flow_name&flowrunid=1002345678919"); resp = getResponse(client, uri); - entity = resp.getEntity(TimelineEntity.class); + entity = resp.readEntity(TimelineEntity.class); assertNotNull(entity); assertEquals("application_1111111111_2222", entity.getId()); assertEquals(1, entity.getMetrics().size()); @@ -1062,7 +1062,7 @@ public void testGetApp() throws Exception { assertTrue(verifyMetrics(metric, m4)); } } finally { - client.destroy(); + client.close(); } } @@ -1073,8 +1073,8 @@ public void testGetAppWithoutFlowInfo() throws Exception { URI uri = URI.create("http://localhost:" + getServerPort() + "/ws/v2/" + "timeline/clusters/cluster1/apps/application_1111111111_1111?" + "fields=ALL"); - ClientResponse resp = getResponse(client, uri); - TimelineEntity entity = resp.getEntity(TimelineEntity.class); + Response resp = getResponse(client, uri); + TimelineEntity entity = resp.readEntity(TimelineEntity.class); assertNotNull(entity); assertEquals("application_1111111111_1111", entity.getId()); assertEquals(1, entity.getConfigs().size()); @@ -1093,7 +1093,7 @@ public void testGetAppWithoutFlowInfo() throws Exception { "timeline/clusters/cluster1/apps/application_1111111111_1111?" + "fields=ALL&metricslimit=10"); resp = getResponse(client, uri); - entity = resp.getEntity(TimelineEntity.class); + entity = resp.readEntity(TimelineEntity.class); assertNotNull(entity); assertEquals("application_1111111111_1111", entity.getId()); assertEquals(1, entity.getConfigs().size()); @@ -1111,7 +1111,7 @@ public void testGetAppWithoutFlowInfo() throws Exception { assertTrue(verifyMetrics(metric, m1, m2, m3)); } } finally { - client.destroy(); + client.close(); } } @@ -1122,13 +1122,13 @@ public void testGetEntityWithoutFlowInfo() throws Exception { URI uri = URI.create("http://localhost:" + getServerPort() + "/ws/v2/" + "timeline/clusters/cluster1/apps/application_1111111111_1111/" + "entities/type1/entity1"); - ClientResponse resp = getResponse(client, uri); - TimelineEntity entity = resp.getEntity(TimelineEntity.class); + Response resp = getResponse(client, uri); + TimelineEntity entity = resp.readEntity(TimelineEntity.class); assertNotNull(entity); assertEquals("entity1", entity.getId()); assertEquals("type1", entity.getType()); } finally { - client.destroy(); + client.close(); } } @@ -1139,9 +1139,9 @@ public void testGetEntitiesWithoutFlowInfo() throws Exception { URI uri = URI.create("http://localhost:" + getServerPort() + "/ws/v2/" + "timeline/clusters/cluster1/apps/application_1111111111_1111/" + "entities/type1"); - ClientResponse resp = getResponse(client, uri); + Response resp = getResponse(client, uri); Set<TimelineEntity> entities = - resp.getEntity(new GenericType<Set<TimelineEntity>>(){}); + resp.readEntity(new GenericType<Set<TimelineEntity>>(){}); assertNotNull(entities); assertEquals(2, entities.size()); for (TimelineEntity entity : entities) { @@ -1149,7 +1149,7 @@ public void testGetEntitiesWithoutFlowInfo() throws Exception { entity.getId().equals("entity2")); } } finally { - client.destroy(); + client.close(); } } @@ -1163,9 +1163,9 @@ public void testGetEntitiesDataToRetrieve() throws Exception { URI uri = URI.create("http://localhost:" + getServerPort() + "/ws/v2/" + "timeline/clusters/cluster1/apps/application_1111111111_1111/" + "entities/type1?confstoretrieve=cfg_"); - ClientResponse resp = getResponse(client, uri); + Response resp = getResponse(client, uri); Set<TimelineEntity> entities = - resp.getEntity(new GenericType<Set<TimelineEntity>>(){}); + resp.readEntity(new GenericType<Set<TimelineEntity>>(){}); assertNotNull(entities); assertEquals(2, entities.size()); int cfgCnt = 0; @@ -1181,7 +1181,7 @@ public void testGetEntitiesDataToRetrieve() throws Exception { "timeline/clusters/cluster1/apps/application_1111111111_1111/" + "entities/type1?confstoretrieve=cfg_,config_"); resp = getResponse(client, uri); - entities = resp.getEntity(new GenericType<Set<TimelineEntity>>(){}); + entities = resp.readEntity(new GenericType<Set<TimelineEntity>>(){}); assertNotNull(entities); assertEquals(2, entities.size()); cfgCnt = 0; @@ -1198,7 +1198,7 @@ public void testGetEntitiesDataToRetrieve() throws Exception { "timeline/clusters/cluster1/apps/application_1111111111_1111/" + "entities/type1?confstoretrieve=!(cfg_,config_)"); resp = getResponse(client, uri); - entities = resp.getEntity(new GenericType<Set<TimelineEntity>>(){}); + entities = resp.readEntity(new GenericType<Set<TimelineEntity>>(){}); assertNotNull(entities); assertEquals(2, entities.size()); cfgCnt = 0; @@ -1214,7 +1214,7 @@ public void testGetEntitiesDataToRetrieve() throws Exception { "timeline/clusters/cluster1/apps/application_1111111111_1111/" + "entities/type1?metricstoretrieve=MAP_"); resp = getResponse(client, uri); - entities = resp.getEntity(new GenericType<Set<TimelineEntity>>(){}); + entities = resp.readEntity(new GenericType<Set<TimelineEntity>>(){}); assertNotNull(entities); assertEquals(2, entities.size()); int metricCnt = 0; @@ -1230,7 +1230,7 @@ public void testGetEntitiesDataToRetrieve() throws Exception { "timeline/clusters/cluster1/apps/application_1111111111_1111/" + "entities/type1?metricstoretrieve=MAP1_,HDFS_"); resp = getResponse(client, uri); - entities = resp.getEntity(new GenericType<Set<TimelineEntity>>(){}); + entities = resp.readEntity(new GenericType<Set<TimelineEntity>>(){}); assertNotNull(entities); assertEquals(2, entities.size()); metricCnt = 0; @@ -1247,7 +1247,7 @@ public void testGetEntitiesDataToRetrieve() throws Exception { "timeline/clusters/cluster1/apps/application_1111111111_1111/" + "entities/type1?metricstoretrieve=!(MAP1_,HDFS_)"); resp = getResponse(client, uri); - entities = resp.getEntity(new GenericType<Set<TimelineEntity>>(){}); + entities = resp.readEntity(new GenericType<Set<TimelineEntity>>(){}); assertNotNull(entities); assertEquals(2, entities.size()); metricCnt = 0; @@ -1260,7 +1260,7 @@ public void testGetEntitiesDataToRetrieve() throws Exception { } assertEquals(2, metricCnt); } finally { - client.destroy(); + client.close(); } } @@ -1272,9 +1272,9 @@ public void testGetEntitiesConfigFilters() throws Exception { "timeline/clusters/cluster1/apps/application_1111111111_1111/" + "entities/type1?conffilters=config_param1%20eq%20value1%20OR%20" + "config_param1%20eq%20value3"); - ClientResponse resp = getResponse(client, uri); + Response resp = getResponse(client, uri); Set<TimelineEntity> entities = - resp.getEntity(new GenericType<Set<TimelineEntity>>(){}); + resp.readEntity(new GenericType<Set<TimelineEntity>>(){}); assertNotNull(entities); assertEquals(2, entities.size()); for (TimelineEntity entity : entities) { @@ -1287,7 +1287,7 @@ public void testGetEntitiesConfigFilters() throws Exception { "entities/type1?conffilters=config_param1%20eq%20value1%20AND" + "%20configuration_param2%20eq%20value2"); resp = getResponse(client, uri); - entities = resp.getEntity(new GenericType<Set<TimelineEntity>>(){}); + entities = resp.readEntity(new GenericType<Set<TimelineEntity>>(){}); assertNotNull(entities); assertEquals(0, entities.size()); @@ -1299,7 +1299,7 @@ public void testGetEntitiesConfigFilters() throws Exception { "%20configuration_param2%20eq%20value2)%20OR%20(config_param1%20eq" + "%20value3%20AND%20cfg_param3%20eq%20value1)"); resp = getResponse(client, uri); - entities = resp.getEntity(new GenericType<Set<TimelineEntity>>(){}); + entities = resp.readEntity(new GenericType<Set<TimelineEntity>>(){}); assertNotNull(entities); assertEquals(1, entities.size()); int cfgCnt = 0; @@ -1317,7 +1317,7 @@ public void testGetEntitiesConfigFilters() throws Exception { "%20configuration_param2%20eq%20value2)%20OR%20(config_param1%20eq" + "%20value3%20AND%20cfg_param3%20eq%20value1)&fields=CONFIGS"); resp = getResponse(client, uri); - entities = resp.getEntity(new GenericType<Set<TimelineEntity>>(){}); + entities = resp.readEntity(new GenericType<Set<TimelineEntity>>(){}); assertNotNull(entities); assertEquals(1, entities.size()); cfgCnt = 0; @@ -1334,7 +1334,7 @@ public void testGetEntitiesConfigFilters() throws Exception { "%20value3%20AND%20cfg_param3%20eq%20value1)&confstoretrieve=cfg_," + "configuration_"); resp = getResponse(client, uri); - entities = resp.getEntity(new GenericType<Set<TimelineEntity>>(){}); + entities = resp.readEntity(new GenericType<Set<TimelineEntity>>(){}); assertNotNull(entities); assertEquals(1, entities.size()); cfgCnt = 0; @@ -1357,7 +1357,7 @@ public void testGetEntitiesConfigFilters() throws Exception { "timeline/clusters/cluster1/apps/application_1111111111_1111/" + "entities/type1?conffilters=configuration_param2%20ne%20value3"); resp = getResponse(client, uri); - entities = resp.getEntity(new GenericType<Set<TimelineEntity>>(){}); + entities = resp.readEntity(new GenericType<Set<TimelineEntity>>(){}); assertNotNull(entities); assertEquals(2, entities.size()); for (TimelineEntity entity : entities) { @@ -1369,14 +1369,14 @@ public void testGetEntitiesConfigFilters() throws Exception { "timeline/clusters/cluster1/apps/application_1111111111_1111/" + "entities/type1?conffilters=configuration_param2%20ene%20value3"); resp = getResponse(client, uri); - entities = resp.getEntity(new GenericType<Set<TimelineEntity>>(){}); + entities = resp.readEntity(new GenericType<Set<TimelineEntity>>(){}); assertNotNull(entities); assertEquals(1, entities.size()); for (TimelineEntity entity : entities) { assertEquals("entity2", entity.getId()); } } finally { - client.destroy(); + client.close(); } } @@ -1389,9 +1389,9 @@ public void testGetEntitiesInfoFilters() throws Exception { "timeline/clusters/cluster1/apps/application_1111111111_1111/" + "entities/type1?infofilters=info1%20eq%20cluster1%20OR%20info1%20eq" + "%20cluster2"); - ClientResponse resp = getResponse(client, uri); + Response resp = getResponse(client, uri); Set<TimelineEntity> entities = - resp.getEntity(new GenericType<Set<TimelineEntity>>(){}); + resp.readEntity(new GenericType<Set<TimelineEntity>>(){}); assertNotNull(entities); assertEquals(2, entities.size()); for (TimelineEntity entity : entities) { @@ -1405,7 +1405,7 @@ public void testGetEntitiesInfoFilters() throws Exception { "entities/type1?infofilters=info1%20eq%20cluster1%20AND%20info4%20" + "eq%2035000"); resp = getResponse(client, uri); - entities = resp.getEntity(new GenericType<Set<TimelineEntity>>(){}); + entities = resp.readEntity(new GenericType<Set<TimelineEntity>>(){}); assertNotNull(entities); assertEquals(0, entities.size()); @@ -1415,7 +1415,7 @@ public void testGetEntitiesInfoFilters() throws Exception { "entities/type1?infofilters=info4%20eq%2035000%20OR%20info4%20eq" + "%2036000"); resp = getResponse(client, uri); - entities = resp.getEntity(new GenericType<Set<TimelineEntity>>(){}); + entities = resp.readEntity(new GenericType<Set<TimelineEntity>>(){}); assertNotNull(entities); assertEquals(2, entities.size()); for (TimelineEntity entity : entities) { @@ -1431,7 +1431,7 @@ public void testGetEntitiesInfoFilters() throws Exception { "eq%2035000)%20OR%20(info1%20eq%20cluster2%20AND%20info2%20eq%202.0" + ")"); resp = getResponse(client, uri); - entities = resp.getEntity(new GenericType<Set<TimelineEntity>>(){}); + entities = resp.readEntity(new GenericType<Set<TimelineEntity>>(){}); assertNotNull(entities); assertEquals(1, entities.size()); int infoCnt = 0; @@ -1451,7 +1451,7 @@ public void testGetEntitiesInfoFilters() throws Exception { "eq%2035000)%20OR%20(info1%20eq%20cluster2%20AND%20info2%20eq%20" + "2.0)&fields=INFO"); resp = getResponse(client, uri); - entities = resp.getEntity(new GenericType<Set<TimelineEntity>>(){}); + entities = resp.readEntity(new GenericType<Set<TimelineEntity>>(){}); assertNotNull(entities); assertEquals(1, entities.size()); infoCnt = 0; @@ -1471,7 +1471,7 @@ public void testGetEntitiesInfoFilters() throws Exception { "timeline/clusters/cluster1/apps/application_1111111111_1111/" + "entities/type1?infofilters=info3%20ne%2039000"); resp = getResponse(client, uri); - entities = resp.getEntity(new GenericType<Set<TimelineEntity>>(){}); + entities = resp.readEntity(new GenericType<Set<TimelineEntity>>(){}); assertNotNull(entities); assertEquals(2, entities.size()); for (TimelineEntity entity : entities) { @@ -1483,14 +1483,14 @@ public void testGetEntitiesInfoFilters() throws Exception { "timeline/clusters/cluster1/apps/application_1111111111_1111/" + "entities/type1?infofilters=info3%20ene%2039000"); resp = getResponse(client, uri); - entities = resp.getEntity(new GenericType<Set<TimelineEntity>>(){}); + entities = resp.readEntity(new GenericType<Set<TimelineEntity>>(){}); assertNotNull(entities); assertEquals(1, entities.size()); for (TimelineEntity entity : entities) { assertEquals("entity1", entity.getId()); } } finally { - client.destroy(); + client.close(); } } @@ -1503,9 +1503,9 @@ public void testGetEntitiesMetricFilters() throws Exception { "timeline/clusters/cluster1/apps/application_1111111111_1111/" + "entities/type1?metricfilters=HDFS_BYTES_READ%20lt%2060%20OR%20" + "HDFS_BYTES_READ%20eq%20157"); - ClientResponse resp = getResponse(client, uri); + Response resp = getResponse(client, uri); Set<TimelineEntity> entities = - resp.getEntity(new GenericType<Set<TimelineEntity>>(){}); + resp.readEntity(new GenericType<Set<TimelineEntity>>(){}); assertNotNull(entities); assertEquals(2, entities.size()); for (TimelineEntity entity : entities) { @@ -1519,7 +1519,7 @@ public void testGetEntitiesMetricFilters() throws Exception { "entities/type1?metricfilters=HDFS_BYTES_READ%20lt%2060%20AND%20" + "MAP_SLOT_MILLIS%20gt%2040"); resp = getResponse(client, uri); - entities = resp.getEntity(new GenericType<Set<TimelineEntity>>(){}); + entities = resp.readEntity(new GenericType<Set<TimelineEntity>>(){}); assertNotNull(entities); assertEquals(0, entities.size()); @@ -1531,7 +1531,7 @@ public void testGetEntitiesMetricFilters() throws Exception { "MAP_SLOT_MILLIS%20gt%2040)%20OR%20(MAP1_SLOT_MILLIS%20ge" + "%20140%20AND%20MAP11_SLOT_MILLIS%20le%20122)"); resp = getResponse(client, uri); - entities = resp.getEntity(new GenericType<Set<TimelineEntity>>(){}); + entities = resp.readEntity(new GenericType<Set<TimelineEntity>>(){}); assertNotNull(entities); assertEquals(1, entities.size()); int metricCnt = 0; @@ -1549,7 +1549,7 @@ public void testGetEntitiesMetricFilters() throws Exception { "MAP_SLOT_MILLIS%20gt%2040)%20OR%20(MAP1_SLOT_MILLIS%20ge" + "%20140%20AND%20MAP11_SLOT_MILLIS%20le%20122)&fields=METRICS"); resp = getResponse(client, uri); - entities = resp.getEntity(new GenericType<Set<TimelineEntity>>(){}); + entities = resp.readEntity(new GenericType<Set<TimelineEntity>>(){}); assertNotNull(entities); assertEquals(1, entities.size()); metricCnt = 0; @@ -1568,7 +1568,7 @@ public void testGetEntitiesMetricFilters() throws Exception { "%20140%20AND%20MAP11_SLOT_MILLIS%20le%20122)&metricstoretrieve=" + "!(HDFS)"); resp = getResponse(client, uri); - entities = resp.getEntity(new GenericType<Set<TimelineEntity>>(){}); + entities = resp.readEntity(new GenericType<Set<TimelineEntity>>(){}); assertNotNull(entities); assertEquals(1, entities.size()); metricCnt = 0; @@ -1589,7 +1589,7 @@ public void testGetEntitiesMetricFilters() throws Exception { "%20140%20AND%20MAP11_SLOT_MILLIS%20le%20122)&metricstoretrieve=" + "!(HDFS)&metricslimit=10"); resp = getResponse(client, uri); - entities = resp.getEntity(new GenericType<Set<TimelineEntity>>(){}); + entities = resp.readEntity(new GenericType<Set<TimelineEntity>>(){}); assertNotNull(entities); assertEquals(1, entities.size()); metricCnt = 0; @@ -1619,7 +1619,7 @@ public void testGetEntitiesMetricFilters() throws Exception { "timeline/clusters/cluster1/apps/application_1111111111_1111/" + "entities/type1?metricfilters=MAP11_SLOT_MILLIS%20ne%20100"); resp = getResponse(client, uri); - entities = resp.getEntity(new GenericType<Set<TimelineEntity>>(){}); + entities = resp.readEntity(new GenericType<Set<TimelineEntity>>(){}); assertNotNull(entities); assertEquals(2, entities.size()); for (TimelineEntity entity : entities) { @@ -1631,14 +1631,14 @@ public void testGetEntitiesMetricFilters() throws Exception { "timeline/clusters/cluster1/apps/application_1111111111_1111/" + "entities/type1?metricfilters=MAP11_SLOT_MILLIS%20ene%20100"); resp = getResponse(client, uri); - entities = resp.getEntity(new GenericType<Set<TimelineEntity>>(){}); + entities = resp.readEntity(new GenericType<Set<TimelineEntity>>(){}); assertNotNull(entities); assertEquals(1, entities.size()); for (TimelineEntity entity : entities) { assertEquals("entity2", entity.getId()); } } finally { - client.destroy(); + client.close(); } } @@ -1649,9 +1649,9 @@ public void testGetEntitiesEventFilters() throws Exception { URI uri = URI.create("http://localhost:" + getServerPort() + "/ws/v2/" + "timeline/clusters/cluster1/apps/application_1111111111_1111/" + "entities/type1?eventfilters=event1,event3"); - ClientResponse resp = getResponse(client, uri); + Response resp = getResponse(client, uri); Set<TimelineEntity> entities = - resp.getEntity(new GenericType<Set<TimelineEntity>>(){}); + resp.readEntity(new GenericType<Set<TimelineEntity>>(){}); assertNotNull(entities); assertEquals(2, entities.size()); for (TimelineEntity entity : entities) { @@ -1663,7 +1663,7 @@ public void testGetEntitiesEventFilters() throws Exception { "timeline/clusters/cluster1/apps/application_1111111111_1111/" + "entities/type1?eventfilters=!(event1,event3)"); resp = getResponse(client, uri); - entities = resp.getEntity(new GenericType<Set<TimelineEntity>>(){}); + entities = resp.readEntity(new GenericType<Set<TimelineEntity>>(){}); assertNotNull(entities); assertEquals(0, entities.size()); @@ -1672,7 +1672,7 @@ public void testGetEntitiesEventFilters() throws Exception { "timeline/clusters/cluster1/apps/application_1111111111_1111/" + "entities/type1?eventfilters=!(event1,event3)%20OR%20event5,event6"); resp = getResponse(client, uri); - entities = resp.getEntity(new GenericType<Set<TimelineEntity>>(){}); + entities = resp.readEntity(new GenericType<Set<TimelineEntity>>(){}); assertNotNull(entities); assertEquals(1, entities.size()); for (TimelineEntity entity : entities) { @@ -1686,7 +1686,7 @@ public void testGetEntitiesEventFilters() throws Exception { "entities/type1?eventfilters=(!(event1,event3)%20OR%20event5," + "event6)%20OR%20(event1,event2%20AND%20(event3,event4))"); resp = getResponse(client, uri); - entities = resp.getEntity(new GenericType<Set<TimelineEntity>>(){}); + entities = resp.readEntity(new GenericType<Set<TimelineEntity>>(){}); assertNotNull(entities); assertEquals(2, entities.size()); for (TimelineEntity entity : entities) { @@ -1694,7 +1694,7 @@ public void testGetEntitiesEventFilters() throws Exception { entity.getId().equals("entity2")); } } finally { - client.destroy(); + client.close(); } } @@ -1705,9 +1705,9 @@ public void testGetEntitiesRelationFilters() throws Exception { URI uri = URI.create("http://localhost:" + getServerPort() + "/ws/v2/" + "timeline/clusters/cluster1/apps/application_1111111111_1111/" + "entities/type1?isrelatedto=type3:entity31,type2:entity21:entity22"); - ClientResponse resp = getResponse(client, uri); + Response resp = getResponse(client, uri); Set<TimelineEntity> entities = - resp.getEntity(new GenericType<Set<TimelineEntity>>(){}); + resp.readEntity(new GenericType<Set<TimelineEntity>>(){}); assertNotNull(entities); assertEquals(2, entities.size()); for (TimelineEntity entity : entities) { @@ -1720,7 +1720,7 @@ public void testGetEntitiesRelationFilters() throws Exception { "clusters/cluster1/apps/application_1111111111_1111/entities/type1" + "?isrelatedto=!(type3:entity31,type2:entity21:entity22)"); resp = getResponse(client, uri); - entities = resp.getEntity(new GenericType<Set<TimelineEntity>>(){}); + entities = resp.readEntity(new GenericType<Set<TimelineEntity>>(){}); assertNotNull(entities); assertEquals(0, entities.size()); @@ -1732,7 +1732,7 @@ public void testGetEntitiesRelationFilters() throws Exception { "?isrelatedto=!(type3:entity31,type2:entity21:entity22)%20OR%20" + "type5:entity51,type6:entity61:entity66"); resp = getResponse(client, uri); - entities = resp.getEntity(new GenericType<Set<TimelineEntity>>(){}); + entities = resp.readEntity(new GenericType<Set<TimelineEntity>>(){}); assertNotNull(entities); assertEquals(1, entities.size()); for (TimelineEntity entity : entities) { @@ -1750,7 +1750,7 @@ public void testGetEntitiesRelationFilters() throws Exception { "type2:entity21:entity22%20AND%20(type3:entity32:entity35,"+ "type4:entity42))"); resp = getResponse(client, uri); - entities = resp.getEntity(new GenericType<Set<TimelineEntity>>(){}); + entities = resp.readEntity(new GenericType<Set<TimelineEntity>>(){}); assertNotNull(entities); assertEquals(2, entities.size()); for (TimelineEntity entity : entities) { @@ -1766,7 +1766,7 @@ public void testGetEntitiesRelationFilters() throws Exception { "?relatesto=!%20(type3:entity31,type2:entity21:entity22%20)%20OR%20" + "type5:entity51,type6:entity61:entity66"); resp = getResponse(client, uri); - entities = resp.getEntity(new GenericType<Set<TimelineEntity>>(){}); + entities = resp.readEntity(new GenericType<Set<TimelineEntity>>(){}); assertNotNull(entities); assertEquals(1, entities.size()); for (TimelineEntity entity : entities) { @@ -1785,7 +1785,7 @@ public void testGetEntitiesRelationFilters() throws Exception { "type2:entity21:entity22%20AND%20(type3:entity32:entity35%20,%20"+ "type4:entity42))"); resp = getResponse(client, uri); - entities = resp.getEntity(new GenericType<Set<TimelineEntity>>(){}); + entities = resp.readEntity(new GenericType<Set<TimelineEntity>>(){}); assertNotNull(entities); assertEquals(2, entities.size()); for (TimelineEntity entity : entities) { @@ -1793,7 +1793,7 @@ public void testGetEntitiesRelationFilters() throws Exception { entity.getId().equals("entity2")); } } finally { - client.destroy(); + client.close(); } } @@ -1829,9 +1829,9 @@ public void testGetEntitiesMetricsTimeRange() throws Exception { "timeline/clusters/cluster1/apps/application_1111111111_1111/" + "entities/type1?fields=ALL&metricslimit=100&metricstimestart=" + (ts - 90000) + "&metricstimeend=" + (ts - 80000)); - ClientResponse resp = getResponse(client, uri); + Response resp = getResponse(client, uri); Set<TimelineEntity> entities = - resp.getEntity(new GenericType<Set<TimelineEntity>>(){}); + resp.readEntity(new GenericType<Set<TimelineEntity>>(){}); assertNotNull(entities); assertEquals(2, entities.size()); verifyMetricsCount(entities, 4, 4); @@ -1841,7 +1841,7 @@ public void testGetEntitiesMetricsTimeRange() throws Exception { "entities/type1?fields=ALL&metricslimit=100&metricstimestart=" + (ts - 100000) + "&metricstimeend=" + (ts - 80000)); resp = getResponse(client, uri); - entities = resp.getEntity(new GenericType<Set<TimelineEntity>>(){}); + entities = resp.readEntity(new GenericType<Set<TimelineEntity>>(){}); assertNotNull(entities); assertEquals(2, entities.size()); verifyMetricsCount(entities, 5, 9); @@ -1851,7 +1851,7 @@ public void testGetEntitiesMetricsTimeRange() throws Exception { "entities/type1?fields=ALL&metricslimit=100&metricstimestart=" + (ts - 100000)); resp = getResponse(client, uri); - entities = resp.getEntity(new GenericType<Set<TimelineEntity>>(){}); + entities = resp.readEntity(new GenericType<Set<TimelineEntity>>(){}); assertNotNull(entities); assertEquals(2, entities.size()); verifyMetricsCount(entities, 5, 9); @@ -1861,7 +1861,7 @@ public void testGetEntitiesMetricsTimeRange() throws Exception { "entities/type1?fields=ALL&metricslimit=100&metricstimeend=" + (ts - 90000)); resp = getResponse(client, uri); - entities = resp.getEntity(new GenericType<Set<TimelineEntity>>(){}); + entities = resp.readEntity(new GenericType<Set<TimelineEntity>>(){}); assertNotNull(entities); assertEquals(2, entities.size()); verifyMetricsCount(entities, 5, 5); @@ -1871,7 +1871,7 @@ public void testGetEntitiesMetricsTimeRange() throws Exception { "entities/type1?fields=ALL&metricstimestart=" + (ts - 100000)); resp = getResponse(client, uri); - entities = resp.getEntity(new GenericType<Set<TimelineEntity>>(){}); + entities = resp.readEntity(new GenericType<Set<TimelineEntity>>(){}); assertNotNull(entities); assertEquals(2, entities.size()); verifyMetricsCount(entities, 5, 5); @@ -1881,7 +1881,7 @@ public void testGetEntitiesMetricsTimeRange() throws Exception { "entities/type1/entity2?fields=ALL&metricstimestart=" + (ts - 100000) + "&metricstimeend=" + (ts - 80000)); resp = getResponse(client, uri); - TimelineEntity entity = resp.getEntity(TimelineEntity.class); + TimelineEntity entity = resp.readEntity(TimelineEntity.class); assertNotNull(entity); verifyMetricCount(entity, 3, 3); @@ -1890,7 +1890,7 @@ public void testGetEntitiesMetricsTimeRange() throws Exception { "entities/type1/entity2?fields=ALL&metricslimit=5&metricstimestart=" + (ts - 100000) + "&metricstimeend=" + (ts - 80000)); resp = getResponse(client, uri); - entity = resp.getEntity(TimelineEntity.class); + entity = resp.readEntity(TimelineEntity.class); assertNotNull(entity); verifyMetricCount(entity, 3, 5); @@ -1898,9 +1898,9 @@ public void testGetEntitiesMetricsTimeRange() throws Exception { "timeline/clusters/cluster1/apps/application_1111111111_1111/" + "entities/type1?fields=ALL&metricslimit=100&metricstimestart=" + (ts - 80000) + "&metricstimeend=" + (ts - 90000)); - verifyHttpResponse(client, uri, Status.BAD_REQUEST); + verifyHttpResponse(client, uri, Response.Status.BAD_REQUEST); } finally { - client.destroy(); + client.close(); } } @@ -1914,8 +1914,8 @@ public void testGetEntityDataToRetrieve() throws Exception { URI uri = URI.create("http://localhost:" + getServerPort() + "/ws/v2/" + "timeline/clusters/cluster1/apps/application_1111111111_1111/" + "entities/type1/entity2?confstoretrieve=cfg_,configuration_"); - ClientResponse resp = getResponse(client, uri); - TimelineEntity entity = resp.getEntity(TimelineEntity.class); + Response resp = getResponse(client, uri); + TimelineEntity entity = resp.readEntity(TimelineEntity.class); assertNotNull(entity); assertEquals("entity2", entity.getId()); assertEquals("type1", entity.getType()); @@ -1929,7 +1929,7 @@ public void testGetEntityDataToRetrieve() throws Exception { "timeline/clusters/cluster1/apps/application_1111111111_1111/" + "entities/type1/entity2?confstoretrieve=!(cfg_,configuration_)"); resp = getResponse(client, uri); - entity = resp.getEntity(TimelineEntity.class); + entity = resp.readEntity(TimelineEntity.class); assertNotNull(entity); assertEquals("entity2", entity.getId()); assertEquals("type1", entity.getType()); @@ -1942,7 +1942,7 @@ public void testGetEntityDataToRetrieve() throws Exception { "timeline/clusters/cluster1/apps/application_1111111111_1111/" + "entities/type1/entity2?metricstoretrieve=MAP1_,HDFS_"); resp = getResponse(client, uri); - entity = resp.getEntity(TimelineEntity.class); + entity = resp.readEntity(TimelineEntity.class); assertNotNull(entity); assertEquals("entity2", entity.getId()); assertEquals("type1", entity.getType()); @@ -1956,7 +1956,7 @@ public void testGetEntityDataToRetrieve() throws Exception { "timeline/clusters/cluster1/apps/application_1111111111_1111/" + "entities/type1/entity2?metricstoretrieve=!(MAP1_,HDFS_)"); resp = getResponse(client, uri); - entity = resp.getEntity(TimelineEntity.class); + entity = resp.readEntity(TimelineEntity.class); assertNotNull(entity); assertEquals("entity2", entity.getId()); assertEquals("type1", entity.getType()); @@ -1972,7 +1972,7 @@ public void testGetEntityDataToRetrieve() throws Exception { "entities/type1/entity2?metricstoretrieve=!(MAP1_,HDFS_)&" + "metricslimit=5"); resp = getResponse(client, uri); - entity = resp.getEntity(TimelineEntity.class); + entity = resp.readEntity(TimelineEntity.class); assertNotNull(entity); assertEquals("entity2", entity.getId()); assertEquals("type1", entity.getType()); @@ -1982,7 +1982,7 @@ public void testGetEntityDataToRetrieve() throws Exception { assertEquals(TimelineMetric.Type.SINGLE_VALUE, metric.getType()); } } finally { - client.destroy(); + client.close(); } } @@ -1993,9 +1993,9 @@ public void testGetFlowRunApps() throws Exception { URI uri = URI.create("http://localhost:" + getServerPort() + "/ws/v2/" + "timeline/clusters/cluster1/users/user1/flows/flow_name/runs/" + "1002345678919/apps?fields=ALL"); - ClientResponse resp = getResponse(client, uri); + Response resp = getResponse(client, uri); Set<TimelineEntity> entities = - resp.getEntity(new GenericType<Set<TimelineEntity>>(){}); + resp.readEntity(new GenericType<Set<TimelineEntity>>(){}); assertNotNull(entities); assertEquals(2, entities.size()); for (TimelineEntity entity : entities) { @@ -2014,7 +2014,7 @@ public void testGetFlowRunApps() throws Exception { "timeline/clusters/cluster1/users/user1/flows/flow_name/runs/" + "1002345678919/apps?fields=ALL&metricslimit=2"); resp = getResponse(client, uri); - entities = resp.getEntity(new GenericType<Set<TimelineEntity>>(){}); + entities = resp.readEntity(new GenericType<Set<TimelineEntity>>(){}); assertNotNull(entities); assertEquals(2, entities.size()); for (TimelineEntity entity : entities) { @@ -2033,7 +2033,7 @@ public void testGetFlowRunApps() throws Exception { uri = URI.create("http://localhost:" + getServerPort() + "/ws/v2/" + "timeline/users/user1/flows/flow_name/runs/1002345678919/apps"); resp = getResponse(client, uri); - entities = resp.getEntity(new GenericType<Set<TimelineEntity>>(){}); + entities = resp.readEntity(new GenericType<Set<TimelineEntity>>(){}); assertNotNull(entities); assertEquals(2, entities.size()); @@ -2041,11 +2041,11 @@ public void testGetFlowRunApps() throws Exception { "timeline/users/user1/flows/flow_name/runs/1002345678919/" + "apps?limit=1"); resp = getResponse(client, uri); - entities = resp.getEntity(new GenericType<Set<TimelineEntity>>(){}); + entities = resp.readEntity(new GenericType<Set<TimelineEntity>>(){}); assertNotNull(entities); assertEquals(1, entities.size()); } finally { - client.destroy(); + client.close(); } } @@ -2056,9 +2056,9 @@ public void testGetFlowApps() throws Exception { URI uri = URI.create("http://localhost:" + getServerPort() + "/ws/v2/" + "timeline/clusters/cluster1/users/user1/flows/flow_name/apps?" + "fields=ALL"); - ClientResponse resp = getResponse(client, uri); + Response resp = getResponse(client, uri); Set<TimelineEntity> entities = - resp.getEntity(new GenericType<Set<TimelineEntity>>(){}); + resp.readEntity(new GenericType<Set<TimelineEntity>>(){}); assertNotNull(entities); assertEquals(3, entities.size()); for (TimelineEntity entity : entities) { @@ -2096,7 +2096,7 @@ public void testGetFlowApps() throws Exception { "timeline/clusters/cluster1/users/user1/flows/flow_name/apps?" + "fields=ALL&metricslimit=6"); resp = getResponse(client, uri); - entities = resp.getEntity(new GenericType<Set<TimelineEntity>>(){}); + entities = resp.readEntity(new GenericType<Set<TimelineEntity>>(){}); assertNotNull(entities); assertEquals(3, entities.size()); for (TimelineEntity entity : entities) { @@ -2139,18 +2139,18 @@ public void testGetFlowApps() throws Exception { uri = URI.create("http://localhost:" + getServerPort() + "/ws/v2/" + "timeline/users/user1/flows/flow_name/apps"); resp = getResponse(client, uri); - entities = resp.getEntity(new GenericType<Set<TimelineEntity>>(){}); + entities = resp.readEntity(new GenericType<Set<TimelineEntity>>(){}); assertNotNull(entities); assertEquals(3, entities.size()); uri = URI.create("http://localhost:" + getServerPort() + "/ws/v2/" + "timeline/users/user1/flows/flow_name/apps?limit=1"); resp = getResponse(client, uri); - entities = resp.getEntity(new GenericType<Set<TimelineEntity>>(){}); + entities = resp.readEntity(new GenericType<Set<TimelineEntity>>(){}); assertNotNull(entities); assertEquals(1, entities.size()); } finally { - client.destroy(); + client.close(); } } @@ -2162,9 +2162,9 @@ public void testGetFlowAppsFilters() throws Exception { URI uri = URI.create("http://localhost:" + getServerPort() + "/ws/v2/" + "timeline/clusters/cluster1/users/user1/flows/flow_name/apps?" + "eventfilters=" + ApplicationMetricsConstants.FINISHED_EVENT_TYPE); - ClientResponse resp = getResponse(client, uri); + Response resp = getResponse(client, uri); Set<TimelineEntity> entities = - resp.getEntity(new GenericType<Set<TimelineEntity>>(){}); + resp.readEntity(new GenericType<Set<TimelineEntity>>(){}); assertNotNull(entities); assertEquals(1, entities.size()); assertTrue("Unexpected app in result", entities.contains( @@ -2174,7 +2174,7 @@ public void testGetFlowAppsFilters() throws Exception { "timeline/clusters/cluster1/users/user1/flows/flow_name/apps?" + "metricfilters=HDFS_BYTES_READ%20ge%200"); resp = getResponse(client, uri); - entities = resp.getEntity(new GenericType<Set<TimelineEntity>>(){}); + entities = resp.readEntity(new GenericType<Set<TimelineEntity>>(){}); assertNotNull(entities); assertEquals(1, entities.size()); assertTrue("Unexpected app in result", entities.contains( @@ -2184,13 +2184,13 @@ public void testGetFlowAppsFilters() throws Exception { "timeline/clusters/cluster1/users/user1/flows/flow_name/apps?" + "conffilters=cfg1%20eq%20value1"); resp = getResponse(client, uri); - entities = resp.getEntity(new GenericType<Set<TimelineEntity>>(){}); + entities = resp.readEntity(new GenericType<Set<TimelineEntity>>(){}); assertNotNull(entities); assertEquals(1, entities.size()); assertTrue("Unexpected app in result", entities.contains( newEntity(entityType, "application_1111111111_2222"))); } finally { - client.destroy(); + client.close(); } } @@ -2201,9 +2201,9 @@ public void testGetFlowRunNotPresent() throws Exception { URI uri = URI.create("http://localhost:" + getServerPort() + "/ws/v2/" + "timeline/clusters/cluster1/users/user1/flows/flow_name/runs/" + "1002345678929"); - verifyHttpResponse(client, uri, Status.NOT_FOUND); + verifyHttpResponse(client, uri, Response.Status.NOT_FOUND); } finally { - client.destroy(); + client.close(); } } @@ -2213,15 +2213,15 @@ public void testGetFlowsNotPresent() throws Exception { try { URI uri = URI.create("http://localhost:" + getServerPort() + "/ws/v2/" + "timeline/clusters/cluster2/flows"); - ClientResponse resp = getResponse(client, uri); + Response resp = getResponse(client, uri); Set<FlowActivityEntity> entities = - resp.getEntity(new GenericType<Set<FlowActivityEntity>>(){}); - assertEquals(MediaType.APPLICATION_JSON_TYPE + "; charset=utf-8", - resp.getType().toString()); + resp.readEntity(new GenericType<Set<FlowActivityEntity>>(){}); + assertEquals(MediaType.APPLICATION_JSON_TYPE + ";charset=utf-8", + resp.getMediaType().toString()); assertNotNull(entities); assertEquals(0, entities.size()); } finally { - client.destroy(); + client.close(); } } @@ -2231,9 +2231,9 @@ public void testGetAppNotPresent() throws Exception { try { URI uri = URI.create("http://localhost:" + getServerPort() + "/ws/v2/" + "timeline/clusters/cluster1/apps/application_1111111111_1378"); - verifyHttpResponse(client, uri, Status.NOT_FOUND); + verifyHttpResponse(client, uri, Response.Status.NOT_FOUND); } finally { - client.destroy(); + client.close(); } } @@ -2244,15 +2244,15 @@ public void testGetFlowRunAppsNotPresent() throws Exception { URI uri = URI.create("http://localhost:" + getServerPort() + "/ws/v2/" + "timeline/clusters/cluster2/users/user1/flows/flow_name/runs/" + "1002345678919/apps"); - ClientResponse resp = getResponse(client, uri); + Response resp = getResponse(client, uri); Set<TimelineEntity> entities = - resp.getEntity(new GenericType<Set<TimelineEntity>>(){}); - assertEquals(MediaType.APPLICATION_JSON_TYPE + "; charset=utf-8", - resp.getType().toString()); + resp.readEntity(new GenericType<Set<TimelineEntity>>(){}); + assertEquals(MediaType.APPLICATION_JSON_TYPE + ";charset=utf-8", + resp.getMediaType().toString()); assertNotNull(entities); assertEquals(0, entities.size()); } finally { - client.destroy(); + client.close(); } } @@ -2262,15 +2262,15 @@ public void testGetFlowAppsNotPresent() throws Exception { try { URI uri = URI.create("http://localhost:" + getServerPort() + "/ws/v2/" + "timeline/clusters/cluster2/users/user1/flows/flow_name55/apps"); - ClientResponse resp = getResponse(client, uri); + Response resp = getResponse(client, uri); Set<TimelineEntity> entities = - resp.getEntity(new GenericType<Set<TimelineEntity>>(){}); - assertEquals(MediaType.APPLICATION_JSON_TYPE + "; charset=utf-8", - resp.getType().toString()); + resp.readEntity(new GenericType<Set<TimelineEntity>>(){}); + assertEquals(MediaType.APPLICATION_JSON_TYPE + ";charset=utf-8", + resp.getMediaType().toString()); assertNotNull(entities); assertEquals(0, entities.size()); } finally { - client.destroy(); + client.close(); } } @@ -2287,7 +2287,7 @@ public void testGenericEntitiesForPagination() throws Exception { + "/entities/entitytype"; verifyEntitiesForPagination(client, resourceUri); } finally { - client.destroy(); + client.close(); } } @@ -2297,9 +2297,9 @@ private void verifyEntitiesForPagination(Client client, String resourceUri) String queryParam = "?limit=" + limit; URI uri = URI.create(resourceUri + queryParam); - ClientResponse resp = getResponse(client, uri); + Response resp = getResponse(client, uri); List<TimelineEntity> entities = - resp.getEntity(new GenericType<List<TimelineEntity>>() { + resp.readEntity(new GenericType<List<TimelineEntity>>() { }); // verify for entity-10 to entity-1 in descending order. verifyPaginatedEntites(entities, limit, limit); @@ -2308,7 +2308,7 @@ private void verifyEntitiesForPagination(Client client, String resourceUri) queryParam = "?limit=" + limit; uri = URI.create(resourceUri + queryParam); resp = getResponse(client, uri); - entities = resp.getEntity(new GenericType<List<TimelineEntity>>() { + entities = resp.readEntity(new GenericType<List<TimelineEntity>>() { }); // verify for entity-10 to entity-7 in descending order. TimelineEntity entity = verifyPaginatedEntites(entities, limit, 10); @@ -2317,7 +2317,7 @@ private void verifyEntitiesForPagination(Client client, String resourceUri) + entity.getInfo().get(TimelineReaderUtils.FROMID_KEY); uri = URI.create(resourceUri + queryParam); resp = getResponse(client, uri); - entities = resp.getEntity(new GenericType<List<TimelineEntity>>() { + entities = resp.readEntity(new GenericType<List<TimelineEntity>>() { }); // verify for entity-7 to entity-4 in descending order. entity = verifyPaginatedEntites(entities, limit, 7); @@ -2326,7 +2326,7 @@ private void verifyEntitiesForPagination(Client client, String resourceUri) + entity.getInfo().get(TimelineReaderUtils.FROMID_KEY); uri = URI.create(resourceUri + queryParam); resp = getResponse(client, uri); - entities = resp.getEntity(new GenericType<List<TimelineEntity>>() { + entities = resp.readEntity(new GenericType<List<TimelineEntity>>() { }); // verify for entity-4 to entity-1 in descending order. entity = verifyPaginatedEntites(entities, limit, 4); @@ -2335,7 +2335,7 @@ private void verifyEntitiesForPagination(Client client, String resourceUri) + entity.getInfo().get(TimelineReaderUtils.FROMID_KEY); uri = URI.create(resourceUri + queryParam); resp = getResponse(client, uri); - entities = resp.getEntity(new GenericType<List<TimelineEntity>>() { + entities = resp.readEntity(new GenericType<List<TimelineEntity>>() { }); // always entity-1 will be retrieved entity = verifyPaginatedEntites(entities, 1, 1); @@ -2358,9 +2358,9 @@ private TimelineEntity verifyPaginatedEntites(List<TimelineEntity> entities, private List<FlowActivityEntity> verifyFlowEntites(Client client, URI uri, int noOfEntities, int[] a, String[] flowsInSequence) throws Exception { - ClientResponse resp = getResponse(client, uri); + Response resp = getResponse(client, uri); List<FlowActivityEntity> entities = - resp.getEntity(new GenericType<List<FlowActivityEntity>>() { + resp.readEntity(new GenericType<List<FlowActivityEntity>>() { }); assertNotNull(entities); assertEquals(noOfEntities, entities.size()); @@ -2384,9 +2384,9 @@ public void testForFlowAppsPagination() throws Exception { String resourceUri = "http://localhost:" + getServerPort() + "/ws/v2/" + "timeline/clusters/cluster1/users/user1/flows/flow1/apps"; URI uri = URI.create(resourceUri); - ClientResponse resp = getResponse(client, uri); + Response resp = getResponse(client, uri); List<TimelineEntity> entities = - resp.getEntity(new GenericType<List<TimelineEntity>>() { + resp.readEntity(new GenericType<List<TimelineEntity>>() { }); assertNotNull(entities); assertEquals(totalAppEntities, entities.size()); @@ -2397,7 +2397,7 @@ public void testForFlowAppsPagination() throws Exception { String queryParam = "?limit=" + limit; uri = URI.create(resourceUri + queryParam); resp = getResponse(client, uri); - entities = resp.getEntity(new GenericType<List<TimelineEntity>>() { + entities = resp.readEntity(new GenericType<List<TimelineEntity>>() { }); assertNotNull(entities); assertEquals(limit, entities.size()); @@ -2408,7 +2408,7 @@ public void testForFlowAppsPagination() throws Exception { URI.create(resourceUri + queryParam + "&fromid=" + entity10.getInfo().get(TimelineReaderUtils.FROMID_KEY)); resp = getResponse(client, uri); - entities = resp.getEntity(new GenericType<List<TimelineEntity>>() { + entities = resp.readEntity(new GenericType<List<TimelineEntity>>() { }); assertNotNull(entities); assertEquals(6, entities.size()); @@ -2416,7 +2416,7 @@ public void testForFlowAppsPagination() throws Exception { assertEquals(entity15, entities.get(5)); } finally { - client.destroy(); + client.close(); } } @@ -2429,9 +2429,9 @@ public void testForFlowRunAppsPagination() throws Exception { String resourceUri = "http://localhost:" + getServerPort() + "/ws/v2/" + "timeline/clusters/cluster1/users/user1/flows/flow1/runs/1/apps"; URI uri = URI.create(resourceUri); - ClientResponse resp = getResponse(client, uri); + Response resp = getResponse(client, uri); List<TimelineEntity> entities = - resp.getEntity(new GenericType<List<TimelineEntity>>() { + resp.readEntity(new GenericType<List<TimelineEntity>>() { }); assertNotNull(entities); assertEquals(totalAppEntities, entities.size()); @@ -2442,7 +2442,7 @@ public void testForFlowRunAppsPagination() throws Exception { String queryParam = "?limit=" + limit; uri = URI.create(resourceUri + queryParam); resp = getResponse(client, uri); - entities = resp.getEntity(new GenericType<List<TimelineEntity>>() { + entities = resp.readEntity(new GenericType<List<TimelineEntity>>() { }); assertNotNull(entities); assertEquals(limit, entities.size()); @@ -2453,7 +2453,7 @@ public void testForFlowRunAppsPagination() throws Exception { URI.create(resourceUri + queryParam + "&fromid=" + entity3.getInfo().get(TimelineReaderUtils.FROMID_KEY)); resp = getResponse(client, uri); - entities = resp.getEntity(new GenericType<List<TimelineEntity>>() { + entities = resp.readEntity(new GenericType<List<TimelineEntity>>() { }); assertNotNull(entities); assertEquals(3, entities.size()); @@ -2461,7 +2461,7 @@ public void testForFlowRunAppsPagination() throws Exception { assertEquals(entity5, entities.get(2)); } finally { - client.destroy(); + client.close(); } } @@ -2474,9 +2474,9 @@ public void testForFlowRunsPagination() throws Exception { String resourceUri = "http://localhost:" + getServerPort() + "/ws/v2/" + "timeline/clusters/cluster1/users/user1/flows/flow1/runs"; URI uri = URI.create(resourceUri); - ClientResponse resp = getResponse(client, uri); + Response resp = getResponse(client, uri); List<TimelineEntity> entities = - resp.getEntity(new GenericType<List<TimelineEntity>>() { + resp.readEntity(new GenericType<List<TimelineEntity>>() { }); assertNotNull(entities); assertEquals(totalRuns, entities.size()); @@ -2487,7 +2487,7 @@ public void testForFlowRunsPagination() throws Exception { String queryParam = "?limit=" + limit; uri = URI.create(resourceUri + queryParam); resp = getResponse(client, uri); - entities = resp.getEntity(new GenericType<List<TimelineEntity>>() { + entities = resp.readEntity(new GenericType<List<TimelineEntity>>() { }); assertNotNull(entities); assertEquals(limit, entities.size()); @@ -2497,7 +2497,7 @@ public void testForFlowRunsPagination() throws Exception { uri = URI.create(resourceUri + queryParam + "&fromid=" + entity2.getInfo().get(TimelineReaderUtils.FROMID_KEY)); resp = getResponse(client, uri); - entities = resp.getEntity(new GenericType<List<TimelineEntity>>() { + entities = resp.readEntity(new GenericType<List<TimelineEntity>>() { }); assertNotNull(entities); assertEquals(limit, entities.size()); @@ -2507,13 +2507,13 @@ public void testForFlowRunsPagination() throws Exception { uri = URI.create(resourceUri + queryParam + "&fromid=" + entity3.getInfo().get(TimelineReaderUtils.FROMID_KEY)); resp = getResponse(client, uri); - entities = resp.getEntity(new GenericType<List<TimelineEntity>>() { + entities = resp.readEntity(new GenericType<List<TimelineEntity>>() { }); assertNotNull(entities); assertEquals(1, entities.size()); assertEquals(entity3, entities.get(0)); } finally { - client.destroy(); + client.close(); } } @@ -2525,9 +2525,9 @@ public void testGetAppsMetricsRange() throws Exception { "timeline/clusters/cluster1/users/user1/flows/flow_name/runs/" + "1002345678919/apps?fields=ALL&metricslimit=100&metricstimestart=" + (ts - 200000) + "&metricstimeend=" + (ts - 100000)); - ClientResponse resp = getResponse(client, uri); + Response resp = getResponse(client, uri); Set<TimelineEntity> entities = - resp.getEntity(new GenericType<Set<TimelineEntity>>(){}); + resp.readEntity(new GenericType<Set<TimelineEntity>>(){}); assertNotNull(entities); assertEquals(2, entities.size()); verifyMetricsCount(entities, 4, 4); @@ -2536,7 +2536,7 @@ public void testGetAppsMetricsRange() throws Exception { "timeline/clusters/cluster1/users/user1/flows/flow_name/runs/" + "1002345678919/apps?fields=ALL&metricslimit=100"); resp = getResponse(client, uri); - entities = resp.getEntity(new GenericType<Set<TimelineEntity>>(){}); + entities = resp.readEntity(new GenericType<Set<TimelineEntity>>(){}); assertNotNull(entities); assertEquals(2, entities.size()); verifyMetricsCount(entities, 4, 10); @@ -2546,7 +2546,7 @@ public void testGetAppsMetricsRange() throws Exception { "apps?fields=ALL&metricslimit=100&metricstimestart=" + (ts - 200000) + "&metricstimeend=" + (ts - 100000)); resp = getResponse(client, uri); - entities = resp.getEntity(new GenericType<Set<TimelineEntity>>(){}); + entities = resp.readEntity(new GenericType<Set<TimelineEntity>>(){}); assertNotNull(entities); assertEquals(3, entities.size()); verifyMetricsCount(entities, 5, 5); @@ -2555,7 +2555,7 @@ public void testGetAppsMetricsRange() throws Exception { "timeline/clusters/cluster1/users/user1/flows/flow_name/" + "apps?fields=ALL&metricslimit=100"); resp = getResponse(client, uri); - entities = resp.getEntity(new GenericType<Set<TimelineEntity>>(){}); + entities = resp.readEntity(new GenericType<Set<TimelineEntity>>(){}); assertNotNull(entities); assertEquals(3, entities.size()); verifyMetricsCount(entities, 5, 12); @@ -2565,7 +2565,7 @@ public void testGetAppsMetricsRange() throws Exception { "1002345678919/apps?fields=ALL&metricslimit=100&metricstimestart=" + (ts - 200000)); resp = getResponse(client, uri); - entities = resp.getEntity(new GenericType<Set<TimelineEntity>>(){}); + entities = resp.readEntity(new GenericType<Set<TimelineEntity>>(){}); assertNotNull(entities); assertEquals(2, entities.size()); verifyMetricsCount(entities, 4, 10); @@ -2575,7 +2575,7 @@ public void testGetAppsMetricsRange() throws Exception { "1002345678919/apps?fields=ALL&metricslimit=100&metricstimeend=" + (ts - 100000)); resp = getResponse(client, uri); - entities = resp.getEntity(new GenericType<Set<TimelineEntity>>(){}); + entities = resp.readEntity(new GenericType<Set<TimelineEntity>>(){}); assertNotNull(entities); assertEquals(2, entities.size()); verifyMetricsCount(entities, 4, 4); @@ -2586,7 +2586,7 @@ public void testGetAppsMetricsRange() throws Exception { "&metricstimestart=" +(ts - 200000) + "&metricstimeend=" + (ts - 100000)); resp = getResponse(client, uri); - TimelineEntity entity = resp.getEntity(TimelineEntity.class); + TimelineEntity entity = resp.readEntity(TimelineEntity.class); assertNotNull(entity); verifyMetricCount(entity, 3, 3); @@ -2594,9 +2594,9 @@ public void testGetAppsMetricsRange() throws Exception { "timeline/clusters/cluster1/users/user1/flows/flow_name/" + "apps?fields=ALL&metricslimit=100&metricstimestart=" + (ts - 100000) + "&metricstimeend=" + (ts - 200000)); - verifyHttpResponse(client, uri, Status.BAD_REQUEST); + verifyHttpResponse(client, uri, Response.Status.BAD_REQUEST); } finally { - client.destroy(); + client.close(); } } @@ -2607,12 +2607,12 @@ public void testGetEntityWithSystemEntityType() throws Exception { URI uri = URI.create("http://localhost:" + getServerPort() + "/ws/v2/" + "timeline/apps/application_1111111111_1111/" + "entities/YARN_APPLICATION"); - ClientResponse resp = getResponse(client, uri); + Response resp = getResponse(client, uri); Set<TimelineEntity> entities = - resp.getEntity(new GenericType<Set<TimelineEntity>>(){}); + resp.readEntity(new GenericType<Set<TimelineEntity>>(){}); assertEquals(0, entities.size()); } finally { - client.destroy(); + client.close(); } } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineEntityListReader.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineEntityListReader.java new file mode 100644 index 00000000000..5304cc7db40 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineEntityListReader.java @@ -0,0 +1,122 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.timelineservice.reader; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity.Identifier; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric; + +import javax.ws.rs.Consumes; +import javax.ws.rs.WebApplicationException; +import javax.ws.rs.core.MediaType; +import javax.ws.rs.core.MultivaluedMap; +import javax.ws.rs.ext.MessageBodyReader; +import javax.ws.rs.ext.Provider; +import java.io.IOException; +import java.io.InputStream; +import java.lang.annotation.Annotation; +import java.lang.reflect.Type; +import java.util.*; + +/** + * We have defined a dedicated Reader for `Set<TimelineEntity>`, + * aimed at adapting to the Jersey2 framework + * to ensure that JSON can be converted into `Set<TimelineEntity>`. + */ +@Provider +@Consumes(MediaType.APPLICATION_JSON) +public class TimelineEntityListReader implements MessageBodyReader<List<TimelineEntity>> { + + private ObjectMapper objectMapper = new ObjectMapper(); + private String timelineEntityType = + "java.util.List<org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity>"; + + @Override + public boolean isReadable(Class<?> type, Type genericType, + Annotation[] annotations, MediaType mediaType) { + return timelineEntityType.equals(genericType.getTypeName()); + } + + @Override + public List<TimelineEntity> readFrom(Class<List<TimelineEntity>> type, + Type genericType, Annotation[] annotations, MediaType mediaType, + MultivaluedMap<String, String> httpHeaders, + InputStream entityStream) throws IOException, WebApplicationException { + List<TimelineEntity> timelineEntityList = new ArrayList<>(); + + JsonNode jsonNode = objectMapper.readTree(entityStream); + if (jsonNode.isArray()) { + for (JsonNode jNode : jsonNode) { + TimelineEntity entity = new TimelineEntity(); + + // Get Identifier + JsonNode jnIdentifier = jNode.get("identifier"); + JsonNode jnType = jnIdentifier.get("type"); + JsonNode jnId = jnIdentifier.get("id"); + Identifier identifier = new Identifier(jnType.asText(), jnId.asText()); + entity.setIdentifier(identifier); + + // Get Type + JsonNode jnAppType = jNode.get("type"); + entity.setType(jnAppType.asText()); + + // Get Createdtime + JsonNode jnCreatedTime = jNode.get("createdtime"); + entity.setCreatedTime(jnCreatedTime.asLong()); + + JsonNode jnMetrics = jNode.get("metrics"); + Set<TimelineMetric> metricSet = new HashSet<>(); + + if (jnMetrics.isArray()) { + for (JsonNode metric : jnMetrics) { + TimelineMetric timelineMetric = objectMapper.treeToValue(metric, TimelineMetric.class); + metricSet.add(timelineMetric); + System.out.println(metric); + } + } + entity.setMetrics(metricSet); + + // Get configs + JsonNode jnConfigs = jNode.get("configs"); + if (jnConfigs != null) { + Map<String, String> configInfos = + objectMapper.treeToValue(jnConfigs, Map.class); + entity.setConfigs(configInfos); + } + + // Get info + JsonNode jnInfos = jNode.get("info"); + if (jnInfos != null) { + Map<String, Object> entityInfos = + objectMapper.treeToValue(jnInfos, Map.class); + entity.setInfo(entityInfos); + } + + // Get idprefix + JsonNode jnIdprefix = jNode.get("idprefix"); + entity.setIdPrefix(jnIdprefix.asLong()); + + timelineEntityList.add(entity); + } + } + + return timelineEntityList; + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineEntitySetReader.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineEntitySetReader.java new file mode 100644 index 00000000000..0e56a1040ea --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineEntitySetReader.java @@ -0,0 +1,125 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.timelineservice.reader; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity.Identifier; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric; + +import javax.ws.rs.Consumes; +import javax.ws.rs.WebApplicationException; +import javax.ws.rs.core.MediaType; +import javax.ws.rs.core.MultivaluedMap; +import javax.ws.rs.ext.MessageBodyReader; +import javax.ws.rs.ext.Provider; +import java.io.IOException; +import java.io.InputStream; +import java.lang.annotation.Annotation; +import java.lang.reflect.Type; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + +/** + * We have defined a dedicated Reader for `Set<TimelineEntity>`, + * aimed at adapting to the Jersey2 framework + * to ensure that JSON can be converted into `Set<TimelineEntity>`. + */ +@Provider +@Consumes(MediaType.APPLICATION_JSON) +public class TimelineEntitySetReader implements MessageBodyReader<Set<TimelineEntity>> { + + private ObjectMapper objectMapper = new ObjectMapper(); + private String timelineEntityType = + "java.util.Set<org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity>"; + + @Override + public boolean isReadable(Class<?> type, Type genericType, + Annotation[] annotations, MediaType mediaType) { + return timelineEntityType.equals(genericType.getTypeName()); + } + + @Override + public Set<TimelineEntity> readFrom(Class<Set<TimelineEntity>> type, + Type genericType, Annotation[] annotations, MediaType mediaType, + MultivaluedMap<String, String> httpHeaders, + InputStream entityStream) throws IOException, WebApplicationException { + Set<TimelineEntity> timelineEntitySet = new HashSet<>(); + + JsonNode jsonNode = objectMapper.readTree(entityStream); + if (jsonNode.isArray()) { + for (JsonNode jNode : jsonNode) { + TimelineEntity entity = new TimelineEntity(); + + // Get Identifier + JsonNode jnIdentifier = jNode.get("identifier"); + JsonNode jnType = jnIdentifier.get("type"); + JsonNode jnId = jnIdentifier.get("id"); + Identifier identifier = new Identifier(jnType.asText(), jnId.asText()); + entity.setIdentifier(identifier); + + // Get Type + JsonNode jnAppType = jNode.get("type"); + entity.setType(jnAppType.asText()); + + // Get Createdtime + JsonNode jnCreatedTime = jNode.get("createdtime"); + entity.setCreatedTime(jnCreatedTime.asLong()); + + // Get metrics + JsonNode jnMetrics = jNode.get("metrics"); + Set<TimelineMetric> metricSet = new HashSet<>(); + + if (jnMetrics.isArray()) { + for (JsonNode metric : jnMetrics) { + TimelineMetric timelineMetric = objectMapper.treeToValue(metric, TimelineMetric.class); + metricSet.add(timelineMetric); + } + } + entity.setMetrics(metricSet); + + // Get configs + JsonNode jnConfigs = jNode.get("configs"); + if (jnConfigs != null) { + Map<String, String> configInfos = + objectMapper.treeToValue(jnConfigs, Map.class); + entity.setConfigs(configInfos); + } + + // Get info + JsonNode jnInfos = jNode.get("info"); + if (jnInfos != null) { + Map<String, Object> entityInfos = + objectMapper.treeToValue(jnInfos, Map.class); + entity.setInfo(entityInfos); + } + + + // Get idprefix + JsonNode jnIdprefix = jNode.get("idprefix"); + entity.setIdPrefix(jnIdprefix.asLong()); + + timelineEntitySet.add(entity); + } + } + + return timelineEntitySet; + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRun.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRun.java index c3ee758294f..f34a6dcacf4 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRun.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRun.java @@ -251,7 +251,7 @@ public void testWriteFlowRunMinMax() throws Exception { new TimelineDataToRetrieve()); assertTrue(TimelineEntityType.YARN_FLOW_RUN.matches(entity.getType())); FlowRunEntity flowRun = (FlowRunEntity)entity; - assertEquals(minStartTs, flowRun.getStartTime()); + assertEquals(minStartTs, flowRun.getStartTime().longValue()); assertEquals(endTs, flowRun.getMaxEndTime()); } finally { if (hbr != null) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java index 7233dab3457..93008253850 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java @@ -646,7 +646,9 @@ public void flush() throws IOException { protected void serviceStop() throws Exception { boolean isStorageUp = true; try { - storageMonitor.checkStorageIsUp(); + if (storageMonitor != null) { + storageMonitor.checkStorageIsUp(); + } } catch (IOException e) { LOG.warn("Failed to close the timeline tables as Hbase is down", e); isStorageUp = false; @@ -688,7 +690,9 @@ protected void serviceStop() throws Exception { conn.close(); } } - storageMonitor.stop(); + if (storageMonitor != null) { + storageMonitor.stop(); + } super.serviceStop(); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/pom.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/pom.xml index f2e7c5c4a0e..c0f61f10d95 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/pom.xml +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/pom.xml @@ -29,7 +29,6 @@ <properties> <hadoop.common.build.dir>${basedir}/../../../../hadoop-common-project/hadoop-common/target</hadoop.common.build.dir> - <hbase-compatible-hadoop.version>3.3.6</hbase-compatible-hadoop.version> </properties> <!-- Do not add dependencies here, add them to the POM of the leaf module --> --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org