Repository: samza Updated Branches: refs/heads/master 373181c57 -> 668a952ac
SAMZA-930: fix issue with json deserialisation in YarnUtil Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/668a952a Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/668a952a Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/668a952a Branch: refs/heads/master Commit: 668a952ac628eb1d9cac58ec21797de9be2b9747 Parents: 373181c Author: Alex Buck <[email protected]> Authored: Mon May 2 11:06:21 2016 -0700 Committer: Yi Pan (Data Infrastructure) <[email protected]> Committed: Mon May 2 11:06:21 2016 -0700 ---------------------------------------------------------------------- checkstyle/import-control.xml | 1 + .../samza/autoscaling/utils/YarnUtil.java | 12 +- .../samza/autoscaling/utils/YarnUtilTest.java | 38 ++++++ .../resources/exampleResourceManagerOutput.json | 121 +++++++++++++++++++ 4 files changed, 168 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/668a952a/checkstyle/import-control.xml ---------------------------------------------------------------------- diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml index c15b8e7..9afab88 100644 --- a/checkstyle/import-control.xml +++ b/checkstyle/import-control.xml @@ -30,6 +30,7 @@ <allow pkg="org.mockito" /> <allow pkg="org.apache.log4j" /> <allow pkg="org.apache.kafka" /> + <allow pkg="org.apache.commons" /> <subpackage name="config"> <allow class="org.apache.samza.SamzaException" /> http://git-wip-us.apache.org/repos/asf/samza/blob/668a952a/samza-autoscaling/src/main/java/org/apache/samza/autoscaling/utils/YarnUtil.java ---------------------------------------------------------------------- diff --git a/samza-autoscaling/src/main/java/org/apache/samza/autoscaling/utils/YarnUtil.java b/samza-autoscaling/src/main/java/org/apache/samza/autoscaling/utils/YarnUtil.java index 376c549..cab46b9 100644 --- a/samza-autoscaling/src/main/java/org/apache/samza/autoscaling/utils/YarnUtil.java +++ b/samza-autoscaling/src/main/java/org/apache/samza/autoscaling/utils/YarnUtil.java @@ -74,11 +74,8 @@ public class YarnUtil { String applications = EntityUtils.toString(httpResponse.getEntity()); log.debug("applications: " + applications); - ObjectMapper mapper = new ObjectMapper(); - Map<String, Map<String, List<Map<String, String>>>> yarnApplications = mapper.readValue(applications, new TypeReference<Map<String, Map<String, List<Map<String, String>>>>>() { - }); + List<Map<String, String>> applicationList = parseYarnApplications(applications); String name = jobName + "_" + jobID; - List<Map<String, String>> applicationList = yarnApplications.get("apps").get("app"); for (Map<String, String> application : applicationList) { if (application.containsKey("state") && application.containsKey("name") && application.containsKey("id")) { if (application.get("state").toString().equals("RUNNING") && application.get("name").toString().equals(name)) { @@ -94,6 +91,13 @@ public class YarnUtil { return null; } + List<Map<String, String>> parseYarnApplications(String applications) throws IOException { + ObjectMapper mapper = new ObjectMapper(); + Map<String, Map<String, List<Map<String, String>>>> yarnApplications = mapper.readValue(applications, new TypeReference<Map<String, Map<String, List<Map<String, Object>>>>>() { + }); + return yarnApplications.get("apps").get("app"); + } + /** * This function returns the state of a given application. This state can be on of the * {"NEW", "NEW_SAVING", "SUBMITTED", "ACCEPTED", "RUNNING", "FINISHED", "FAILED", "KILLED"} http://git-wip-us.apache.org/repos/asf/samza/blob/668a952a/samza-autoscaling/src/test/java/org/apache/samza/autoscaling/utils/YarnUtilTest.java ---------------------------------------------------------------------- diff --git a/samza-autoscaling/src/test/java/org/apache/samza/autoscaling/utils/YarnUtilTest.java b/samza-autoscaling/src/test/java/org/apache/samza/autoscaling/utils/YarnUtilTest.java new file mode 100644 index 0000000..97ccb2d --- /dev/null +++ b/samza-autoscaling/src/test/java/org/apache/samza/autoscaling/utils/YarnUtilTest.java @@ -0,0 +1,38 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.samza.autoscaling.utils; + +import org.apache.commons.io.IOUtils; +import org.junit.Test; + +import java.io.IOException; +import java.util.List; +import java.util.Map; + +import static org.junit.Assert.assertEquals; + +public class YarnUtilTest { + + @Test + public void handleJsonArraysAsWellAsStrings() throws IOException { + YarnUtil yarnUtil = new YarnUtil("rm", 0); + List<Map<String, String>> applications = yarnUtil.parseYarnApplications(IOUtils.toString(getClass().getClassLoader().getResourceAsStream("exampleResourceManagerOutput.json"))); + assertEquals("RUNNING", applications.get(0).get("state")); + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/668a952a/samza-autoscaling/src/test/resources/exampleResourceManagerOutput.json ---------------------------------------------------------------------- diff --git a/samza-autoscaling/src/test/resources/exampleResourceManagerOutput.json b/samza-autoscaling/src/test/resources/exampleResourceManagerOutput.json new file mode 100644 index 0000000..9f8a025 --- /dev/null +++ b/samza-autoscaling/src/test/resources/exampleResourceManagerOutput.json @@ -0,0 +1,121 @@ +{ + "apps": { + "app": [ + { + "id": "application_1459790549146_0003", + "user": "root", + "name": "protodeserializer_1", + "queue": "default", + "state": "RUNNING", + "finalStatus": "UNDEFINED", + "progress": 0, + "trackingUI": "ApplicationMaster", + "trackingUrl": "http://yarnrm:8088/proxy/application_1459790549146_0003/", + "diagnostics": "", + "clusterId": 1459790549146, + "applicationType": "Samza", + "applicationTags": "", + "startedTime": 1459792852675, + "finishedTime": 0, + "elapsedTime": 738921, + "amContainerLogs": "http://yarnnm:8042/node/containerlogs/container_1459790549146_0003_01_000001/root", + "amHostHttpAddress": "yarnnm:8042", + "allocatedMB": 1024, + "allocatedVCores": 2, + "runningContainers": 2, + "memorySeconds": 749045, + "vcoreSeconds": 1462, + "preemptedResourceMB": 0, + "preemptedResourceVCores": 0, + "numNonAMContainerPreempted": 0, + "numAMContainerPreempted": 0, + "resourceRequests": [ + { + "capability": { + "memory": 512, + "virtualCores": 1 + }, + "nodeLabelExpression": "", + "numContainers": 0, + "priority": { + "priority": 0 + }, + "relaxLocality": true, + "resourceName": "*" + }, + { + "capability": { + "memory": 512, + "virtualCores": 1 + }, + "nodeLabelExpression": "", + "numContainers": 0, + "priority": { + "priority": 0 + }, + "relaxLocality": true, + "resourceName": "/default-rack" + } + ] + }, + { + "id": "application_1459790549146_0002", + "user": "root", + "name": "protodeserializer_1", + "queue": "default", + "state": "KILLED", + "finalStatus": "KILLED", + "progress": 100, + "trackingUI": "History", + "trackingUrl": "http://yarnrm:8088/cluster/app/application_1459790549146_0002", + "diagnostics": "Application killed by user.", + "clusterId": 1459790549146, + "applicationType": "Samza", + "applicationTags": "", + "startedTime": 1459791820396, + "finishedTime": 1459792284264, + "elapsedTime": 463868, + "amContainerLogs": "http://yarnnm:8042/node/containerlogs/container_1459790549146_0002_01_000001/root", + "amHostHttpAddress": "yarnnm:8042", + "allocatedMB": -1, + "allocatedVCores": -1, + "runningContainers": -1, + "memorySeconds": 462177, + "vcoreSeconds": 902, + "preemptedResourceMB": 0, + "preemptedResourceVCores": 0, + "numNonAMContainerPreempted": 0, + "numAMContainerPreempted": 0 + }, + { + "id": "application_1459790549146_0001", + "user": "root", + "name": "protodeserializer_1", + "queue": "default", + "state": "KILLED", + "finalStatus": "KILLED", + "progress": 100, + "trackingUI": "History", + "trackingUrl": "http://yarnrm:8088/cluster/app/application_1459790549146_0001", + "diagnostics": "Application killed by user.", + "clusterId": 1459790549146, + "applicationType": "Samza", + "applicationTags": "", + "startedTime": 1459791108916, + "finishedTime": 1459791813659, + "elapsedTime": 704743, + "amContainerLogs": "http://yarnnm:8042/node/containerlogs/container_1459790549146_0001_01_000001/root", + "amHostHttpAddress": "yarnnm:8042", + "allocatedMB": -1, + "allocatedVCores": -1, + "runningContainers": -1, + "memorySeconds": 711605, + "vcoreSeconds": 1389, + "preemptedResourceMB": 0, + "preemptedResourceVCores": 0, + "numNonAMContainerPreempted": 0, + "numAMContainerPreempted": 0 + } + ] + } +} \ No newline at end of file
