Repository: falcon
Updated Branches:
  refs/heads/master 830ab368b -> 9979a1fd8


 FALCON-1584 Falcon allows invalid hadoop queue name for schedulable feed 
entities. Contributed by Venkatesan Ramachandran via Balu Vellanki.


Project: http://git-wip-us.apache.org/repos/asf/falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/9979a1fd
Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/9979a1fd
Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/9979a1fd

Branch: refs/heads/master
Commit: 9979a1fd815495a04f3ce2a0ce673f95c655e20d
Parents: 830ab36
Author: bvellanki <[email protected]>
Authored: Thu Feb 18 14:16:55 2016 -0800
Committer: bvellanki <[email protected]>
Committed: Thu Feb 18 14:16:55 2016 -0800

----------------------------------------------------------------------
 CHANGES.txt                                     |   2 +
 .../org/apache/falcon/entity/EntityUtil.java    |   2 +
 .../org/apache/falcon/entity/FeedHelper.java    |  99 +++++++
 .../falcon/entity/parser/FeedEntityParser.java  |  65 ++++-
 .../entity/parser/ProcessEntityParser.java      |  44 +++
 .../org/apache/falcon/util/HadoopQueueUtil.java | 179 ++++++++++++
 .../apache/falcon/entity/FeedHelperTest.java    |   1 -
 .../apache/falcon/util/HadoopQueueUtilTest.java |  63 +++++
 .../config/feed/feed-schedulerinfo-1.json       | 276 +++++++++++++++++++
 .../config/feed/feed-schedulerinfo-2.json       |  19 ++
 10 files changed, 748 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/falcon/blob/9979a1fd/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 6861519..191f641 100755
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -11,6 +11,8 @@ Trunk
     FALCON-1230 Data based notification Service to notify execution instances 
when data becomes available(Pavan Kumar Kolamuri via Ajay Yadava)
 
   IMPROVEMENTS
+    FALCON-1584 Falcon allows invalid hadoop queue name for schedulable feed 
entities (Venkatesan Ramachandran via Balu Vellanki)
+
     FALCON-1774 Falcon to honour PRISM_URL env var (Praveen Adlakha) 
 
     FALCON-1721 Checkstyle doesn't extend parent.

http://git-wip-us.apache.org/repos/asf/falcon/blob/9979a1fd/common/src/main/java/org/apache/falcon/entity/EntityUtil.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/EntityUtil.java 
b/common/src/main/java/org/apache/falcon/entity/EntityUtil.java
index 9c03de3..96befa1 100644
--- a/common/src/main/java/org/apache/falcon/entity/EntityUtil.java
+++ b/common/src/main/java/org/apache/falcon/entity/EntityUtil.java
@@ -81,6 +81,8 @@ import java.util.TimeZone;
 public final class EntityUtil {
     public static final Logger LOG = LoggerFactory.getLogger(EntityUtil.class);
 
+    public static final String MR_QUEUE_NAME = "queueName";
+
     private static final long MINUTE_IN_MS = 60 * 1000L;
     private static final long HOUR_IN_MS = 60 * MINUTE_IN_MS;
     private static final long DAY_IN_MS = 24 * HOUR_IN_MS;

http://git-wip-us.apache.org/repos/asf/falcon/blob/9979a1fd/common/src/main/java/org/apache/falcon/entity/FeedHelper.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/FeedHelper.java 
b/common/src/main/java/org/apache/falcon/entity/FeedHelper.java
index b3aaaab..cca2d8b 100644
--- a/common/src/main/java/org/apache/falcon/entity/FeedHelper.java
+++ b/common/src/main/java/org/apache/falcon/entity/FeedHelper.java
@@ -1043,6 +1043,40 @@ public final class FeedHelper {
     }
 
     /**
+     * Placeholder for looking up the hadoop cluster queue for replication jobs
+     * in the Lifecycle section of the target cluster section of the feed entity.
+     *
+     * NOTE: Lifecycle for replication is not implemented, so this method
+     * currently always returns null. getReplicationQueue() treats that blank
+     * result as "not set" and falls back to the queueName property value.
+     *
+     * @param feed feed entity (not read by the current implementation)
+     * @param clusterName feed cluster name (not read by the current implementation)
+     * @return always null in the current implementation
+     * @throws FalconException declared in the signature; the current body never throws it
+     */
+
+    public static String getLifecycleReplicationQueue(Feed feed, String 
clusterName) throws FalconException {
+        return null;
+    }
+
+    /**
+     * Looks up the hadoop cluster queue configured for retention jobs in the
+     * Lifecycle section of the feed entity.
+     *
+     * @param feed feed entity to inspect
+     * @param clusterName name of the feed cluster whose retention stage is consulted
+     * @return queue of the retention stage, or null when no retention stage is found
+     * @throws FalconException declared in the signature; presumably raised by the
+     *                         retention stage lookup (confirm against getRetentionStage)
+     */
+    public static String getLifecycleRetentionQueue(Feed feed, String clusterName) throws FalconException {
+        final RetentionStage stage = getRetentionStage(feed, clusterName);
+        return (stage == null) ? null : stage.getQueue();
+    }
+
+    /**
      * Returns the data source type associated with the Feed's export policy.
      *
      * @param clusterEntity
@@ -1190,4 +1224,69 @@ public final class FeedHelper {
         Long freqInMillis = DateUtil.getFrequencyInMillis(retentionLimit);
         return (int) (freqInMillis/1000);
     }
+
+    /**
+     * Resolves the queue name for the replication job of a feed.
+     * The Lifecycle-level queue wins; when it is blank, the queueName property
+     * of the feed definition is used instead.
+     *
+     * @param feed feed entity
+     * @param feedCluster feed cluster whose name is used for the Lifecycle lookup
+     * @return resolved queue name, possibly null when neither source defines one
+     * @throws FalconException declared by the Lifecycle lookup
+     */
+    public static String getReplicationQueue(Feed feed, Cluster feedCluster) throws FalconException {
+        final String lifecycleQueue = getLifecycleReplicationQueue(feed, feedCluster.getName());
+        return StringUtils.isBlank(lifecycleQueue) ? getQueueFromProperties(feed) : lifecycleQueue;
+    }
+
+    /**
+     * Resolves the queue name for the retention job of a feed.
+     * The queue of the Lifecycle retention stage wins; when it is blank, the
+     * queueName property of the feed definition is used instead.
+     *
+     * @param feed feed entity
+     * @param feedCluster feed cluster whose name is used for the Lifecycle lookup
+     * @return resolved queue name, possibly null when neither source defines one
+     * @throws FalconException declared by the Lifecycle lookup
+     */
+    public static String getRetentionQueue(Feed feed, Cluster feedCluster) throws FalconException {
+        final String lifecycleQueue = getLifecycleRetentionQueue(feed, feedCluster.getName());
+        if (!StringUtils.isBlank(lifecycleQueue)) {
+            return lifecycleQueue;
+        }
+        return getQueueFromProperties(feed);
+    }
+
+    /**
+     * Reads the queueName property (EntityUtil.MR_QUEUE_NAME) of the feed entity definition.
+     *
+     * @param feed feed entity
+     * @return value of the queueName property as returned by getPropertyValue
+     */
+    public static String getQueueFromProperties(Feed feed) {
+        final String queueName = getPropertyValue(feed, EntityUtil.MR_QUEUE_NAME);
+        return queueName;
+    }
+
+    /**
+     * Returns the value of the first feed property whose name equals the given name.
+     *
+     * @param feed feed entity whose properties are searched
+     * @param propName name of the property to look up
+     * @return value of the first matching property, or null when the feed has no
+     *         properties section, propName is null, or no property matches
+     */
+    public static String getPropertyValue(Feed feed, String propName) {
+        if (propName == null || feed.getProperties() == null) {
+            return null;
+        }
+        for (Property prop : feed.getProperties().getProperties()) {
+            // compare from the non-null side so a property without a name cannot cause an NPE
+            if ((prop != null) && propName.equals(prop.getName())) {
+                return prop.getValue();
+            }
+        }
+        return null;
+    }
 }

http://git-wip-us.apache.org/repos/asf/falcon/blob/9979a1fd/common/src/main/java/org/apache/falcon/entity/parser/FeedEntityParser.java
----------------------------------------------------------------------
diff --git 
a/common/src/main/java/org/apache/falcon/entity/parser/FeedEntityParser.java 
b/common/src/main/java/org/apache/falcon/entity/parser/FeedEntityParser.java
index 981e730..c942862 100644
--- a/common/src/main/java/org/apache/falcon/entity/parser/FeedEntityParser.java
+++ b/common/src/main/java/org/apache/falcon/entity/parser/FeedEntityParser.java
@@ -52,6 +52,7 @@ import org.apache.falcon.group.FeedGroup;
 import org.apache.falcon.group.FeedGroupMap;
 import org.apache.falcon.service.LifecyclePolicyMap;
 import org.apache.falcon.util.DateUtil;
+import org.apache.falcon.util.HadoopQueueUtil;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.security.authorize.AuthorizationException;
 import org.slf4j.Logger;
@@ -119,6 +120,7 @@ public class FeedEntityParser extends EntityParser<Feed> {
         validateFeedGroups(feed);
         validateFeedSLA(feed);
         validateProperties(feed);
+        validateHadoopQueue(feed);
 
         // Seems like a good enough entity object for a new one
         // But is this an update ?
@@ -527,6 +529,66 @@ public class FeedEntityParser extends EntityParser<Feed> {
         }
     }
 
+    /**
+     * Validate Hadoop cluster queue names specified in the Feed entity definition.
+     *
+     * For every feed cluster, the queue names used by the feed (Lifecycle queue first,
+     * next the queueName property) are checked against the Hadoop cluster scheduler info.
+     *
+     * Hadoop cluster queue is validated only if the feed names a queue for the cluster and
+     * the YARN RM webaddress is specified in the cluster entity properties.
+     *
+     * @param feed feed entity being validated
+     * @throws FalconException if a specified queue name is not a valid hadoop cluster queue
+     */
+
+    protected void validateHadoopQueue(Feed feed) throws FalconException {
+        for (Cluster cluster : feed.getClusters().getClusters()) {
+            Set<String> feedQueue = getQueueNamesUsedInFeed(feed, cluster);
+            if (feedQueue.isEmpty()) {
+                // nothing to validate for this cluster, so skip the REST call to the resource manager
+                continue;
+            }
+
+            org.apache.falcon.entity.v0.cluster.Cluster clusterEntity =
+                    EntityUtil.getEntity(EntityType.CLUSTER, cluster.getName());
+
+            // prefer the https web address, fall back to the plain web address
+            String rmURL = ClusterHelper.getPropertyValue(clusterEntity, "yarn.resourcemanager.webapp.https.address");
+            if (StringUtils.isBlank(rmURL)) {
+                rmURL = ClusterHelper.getPropertyValue(clusterEntity, "yarn.resourcemanager.webapp.address");
+            }
+
+            if (StringUtils.isNotBlank(rmURL)) {
+                LOG.info("Fetching hadoop queue names from cluster {} RM URL {}", cluster.getName(), rmURL);
+                Set<String> queueNames = HadoopQueueUtil.getHadoopClusterQueueNames(rmURL);
+
+                for (String q : feedQueue) {
+                    if (queueNames.contains(q)) {
+                        // the set holds retention and replication queues, so do not call it "retention"
+                        LOG.info("Validated presence of queue specified in feed - {}", q);
+                    } else {
+                        String strMsg = String.format("The hadoop queue name %s specified "
+                                + "for cluster %s is invalid.", q, cluster.getName());
+                        LOG.info(strMsg);
+                        throw new FalconException(strMsg);
+                    }
+                }
+            }
+        }
+    }
+
+    /**
+     * Collects the queue names the feed uses on the given feed cluster: the retention
+     * queue always, and the replication queue when the cluster type is TARGET.
+     * Each candidate name is handed to addToQueueList, which decides whether it is kept.
+     *
+     * @param feed feed entity
+     * @param cluster feed cluster being inspected
+     * @return set of queue names collected for this cluster
+     * @throws FalconException declared by the queue lookups in FeedHelper
+     */
+    protected Set<String> getQueueNamesUsedInFeed(Feed feed, Cluster cluster) throws FalconException {
+        final Set<String> names = new HashSet<>();
+        final String retentionQueue = FeedHelper.getRetentionQueue(feed, cluster);
+        addToQueueList(retentionQueue, names);
+        if (ClusterType.TARGET == cluster.getType()) {
+            final String replicationQueue = FeedHelper.getReplicationQueue(feed, cluster);
+            addToQueueList(replicationQueue, names);
+        }
+        return names;
+    }
+
+    /**
+     * Adds a queue name to the given set, ignoring null, empty and whitespace-only names.
+     *
+     * @param queueName candidate queue name, may be null or blank
+     * @param queueList set that receives the name when it is not blank
+     */
+    private void addToQueueList(String queueName, Set<String> queueList) {
+        // only real names are collected; a blank entry could never match a cluster queue
+        if (StringUtils.isNotBlank(queueName)) {
+            queueList.add(queueName);
+        }
+    }
+
     protected void validateProperties(Feed feed) throws ValidationException {
         Properties properties = feed.getProperties();
         if (properties == null) {
@@ -634,7 +696,7 @@ public class FeedEntityParser extends EntityParser<Feed> {
      */
     private void validateFeedExportArgs(Cluster feedCluster) throws 
FalconException {
         Map<String, String> args = FeedHelper.getExportArguments(feedCluster);
-        Map<String, String> validArgs = new HashMap<String, String>();
+        Map<String, String> validArgs = new HashMap<>();
         validArgs.put("--num-mappers", "");
         validArgs.put("--update-key" , "");
         validArgs.put("--input-null-string", "");
@@ -653,4 +715,5 @@ public class FeedEntityParser extends EntityParser<Feed> {
                     + "currently in Feed import policy"));
         }
     }
+
 }

http://git-wip-us.apache.org/repos/asf/falcon/blob/9979a1fd/common/src/main/java/org/apache/falcon/entity/parser/ProcessEntityParser.java
----------------------------------------------------------------------
diff --git 
a/common/src/main/java/org/apache/falcon/entity/parser/ProcessEntityParser.java 
b/common/src/main/java/org/apache/falcon/entity/parser/ProcessEntityParser.java
index eec6f69..16fd8b3 100644
--- 
a/common/src/main/java/org/apache/falcon/entity/parser/ProcessEntityParser.java
+++ 
b/common/src/main/java/org/apache/falcon/entity/parser/ProcessEntityParser.java
@@ -41,10 +41,13 @@ import org.apache.falcon.entity.v0.process.Process;
 import org.apache.falcon.expression.ExpressionHelper;
 import org.apache.falcon.hadoop.HadoopClientFactory;
 import org.apache.falcon.util.DateUtil;
+import org.apache.falcon.util.HadoopQueueUtil;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.security.authorize.AuthorizationException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.util.Date;
@@ -60,6 +63,8 @@ import java.util.TimeZone;
  */
 public class ProcessEntityParser extends EntityParser<Process> {
 
+    private static final Logger LOG = 
LoggerFactory.getLogger(ProcessEntityParser.class);
+
     public ProcessEntityParser() {
         super(EntityType.PROCESS);
     }
@@ -114,6 +119,7 @@ public class ProcessEntityParser extends 
EntityParser<Process> {
         validateDatasetName(process.getInputs(), process.getOutputs());
         validateLateInputs(process);
         validateProcessSLA(process);
+        validateHadoopQueue(process);
     }
 
 
@@ -322,4 +328,42 @@ public class ProcessEntityParser extends 
EntityParser<Process> {
         }
     }
 
+    /**
+     * Validates the hadoop queue named by the queueName property of the process entity.
+     *
+     * Nothing is validated when the process does not define the property. Otherwise, for
+     * every cluster of the process whose cluster entity specifies a YARN RM web address
+     * (the https address property first, then the plain web address property), the queue
+     * must be among the queue names reported by that resource manager.
+     *
+     * @param process process entity being validated
+     * @throws FalconException if the queue name is not a valid queue on one of the clusters
+     */
+    private void validateHadoopQueue(Process process) throws FalconException {
+        // get queue name specified in the process entity
+        java.util.Properties props = EntityUtil.getEntityProperties(process);
+        if ((props == null) || (!props.containsKey(EntityUtil.MR_QUEUE_NAME))) {
+            return;
+        }
+        String processQueueName = props.getProperty(EntityUtil.MR_QUEUE_NAME);
+
+        // iterate through each cluster in process entity to check if the cluster has the process entity queue
+        for (org.apache.falcon.entity.v0.process.Cluster cluster : process.getClusters().getClusters()) {
+            String clusterName = cluster.getName();
+            org.apache.falcon.entity.v0.cluster.Cluster clusterEntity =
+                    ConfigurationStore.get().get(EntityType.CLUSTER, clusterName);
+
+            // treat an empty property like a missing one, as the feed entity parser does
+            String rmURL = ClusterHelper.getPropertyValue(clusterEntity, "yarn.resourcemanager.webapp.https.address");
+            if ((rmURL == null) || rmURL.trim().isEmpty()) {
+                rmURL = ClusterHelper.getPropertyValue(clusterEntity, "yarn.resourcemanager.webapp.address");
+            }
+            if ((rmURL == null) || rmURL.trim().isEmpty()) {
+                // no RM web address configured for this cluster, so the queue cannot be checked
+                continue;
+            }
+
+            LOG.info("Fetching hadoop queue names from cluster {} RM URL {}", cluster.getName(), rmURL);
+            Set<String> queueNames = HadoopQueueUtil.getHadoopClusterQueueNames(rmURL);
+
+            if (queueNames.contains(processQueueName)) {
+                LOG.info("Validated presence of queue {} specified in process "
+                        + "entity for cluster {}", processQueueName, clusterName);
+            } else {
+                String strMsg = String.format("The hadoop queue name %s specified in process "
+                        + "entity for cluster %s is invalid.", processQueueName, cluster.getName());
+                LOG.info(strMsg);
+                throw new FalconException(strMsg);
+            }
+        }
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/falcon/blob/9979a1fd/common/src/main/java/org/apache/falcon/util/HadoopQueueUtil.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/util/HadoopQueueUtil.java 
b/common/src/main/java/org/apache/falcon/util/HadoopQueueUtil.java
new file mode 100644
index 0000000..cc48402
--- /dev/null
+++ b/common/src/main/java/org/apache/falcon/util/HadoopQueueUtil.java
@@ -0,0 +1,179 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.util;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.falcon.FalconException;
+import org.apache.hadoop.security.authentication.client.AuthenticatedURL;
+import org.apache.hadoop.security.authentication.client.KerberosAuthenticator;
+import org.json.JSONArray;
+import org.json.JSONException;
+import org.json.JSONObject;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedReader;
+import java.io.InputStreamReader;
+import java.net.HttpURLConnection;
+import java.net.URL;
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * Utility class to get the Hadoop Queue names by querying resource manager.
+ */
+public final class HadoopQueueUtil {
+
+    private static final Logger LOG = LoggerFactory.getLogger(HadoopQueueUtil.class);
+
+    /** JSON key that introduces the queue hierarchy in the scheduler info document. */
+    private static final String QUEUES_KEY = "\"queues\":";
+
+    private HadoopQueueUtil() {
+        // utility class, not meant to be instantiated
+    }
+
+    /**
+     * Uses Resource Manager REST API to get the hadoop scheduler info.
+     *
+     * @param rmBaseUrlStr base URL of the resource manager web application; the path
+     *                     ws/v1/cluster/scheduler is appended to it
+     * @return JSON string representing hadoop Scheduler Info (line breaks removed)
+     * @throws FalconException if the scheduler info cannot be retrieved for any reason
+     */
+    public static String getHadoopClusterSchedulerInfo(String rmBaseUrlStr) throws FalconException {
+        KerberosAuthenticator authenticator = new KerberosAuthenticator();
+        AuthenticatedURL.Token authenticationToken = new AuthenticatedURL.Token();
+        String rmSchedulerInfoURL = rmBaseUrlStr;
+        if (!rmSchedulerInfoURL.endsWith("/")) {
+            rmSchedulerInfoURL += "/";
+        }
+        rmSchedulerInfoURL += "ws/v1/cluster/scheduler";
+        HttpURLConnection conn = null;
+        BufferedReader reader = null;
+
+        try {
+            URL url = new URL(rmSchedulerInfoURL);
+            conn = new AuthenticatedURL(authenticator).openConnection(url, authenticationToken);
+            // read with an explicit charset instead of the platform default
+            reader = new BufferedReader(
+                    new InputStreamReader(conn.getInputStream(), java.nio.charset.StandardCharsets.UTF_8));
+            StringBuilder jsonResponse = new StringBuilder();
+            String line;
+            while ((line = reader.readLine()) != null) {
+                jsonResponse.append(line);
+            }
+            return jsonResponse.toString();
+        } catch (Exception ex) {
+            // report the failure through the declared checked exception, not a RuntimeException
+            throw new FalconException("Could not authenticate, " + ex.getMessage(), ex);
+        } finally {
+            IOUtils.closeQuietly(reader);
+            if (conn != null) {
+                conn.disconnect();
+            }
+        }
+    }
+
+    /**
+     * Extracts all queue names from a scheduler info JSON document.
+     *
+     * @param jsonResult scheduler info as returned by the resource manager
+     * @param qNames output set that receives the queue names
+     * @return the qNames set passed in, after the queue names were added
+     * @throws JSONException if the extracted queues element cannot be parsed
+     * @throws IllegalArgumentException if jsonResult has no complete 'queues' element
+     */
+    public static Set<String> getHadoopClusterQueueNamesHelper(String jsonResult, Set<String> qNames)
+        throws JSONException {
+        String qJson = extractRootQueuesElement(jsonResult);
+        LOG.debug("Extracted Queue JSON - {}", qJson);
+        JSONObject jObject = new JSONObject(qJson);
+        LOG.debug("Parsing Json result done");
+        JSONObject queues = jObject.getJSONObject("queues");
+        jsonParseForHadoopQueueNames(queues, qNames);
+        return qNames;
+    }
+
+    /**
+     * Recursively parses JSON hadoop cluster scheduler info and returns all the sub queue
+     * names in the output parameter.
+     *
+     * @param queues JSON document queues element
+     * @param qNames Output parameter that will have all hadoop cluster queue names
+     * @throws JSONException if an expected element is missing or has the wrong type
+     */
+    public static void jsonParseForHadoopQueueNames(JSONObject queues, Set<String> qNames) throws JSONException {
+        JSONArray qs = queues.getJSONArray("queue");
+        for (int i = 0; i < qs.length(); i++) {
+            JSONObject q = qs.getJSONObject(i);
+            qNames.add(q.getString("queueName"));
+
+            // every queue that is not marked as a capacity scheduler leaf is descended into
+            if ((q.isNull("type"))
+                    || (!q.getString("type").equalsIgnoreCase("capacitySchedulerLeafQueueInfo"))) {
+                jsonParseForHadoopQueueNames(q.getJSONObject("queues"), qNames);
+            }
+        }
+    }
+
+    /**
+     * Parse the hadoop cluster scheduler info to extract JSON element 'queues'.
+     *
+     * NOTE: the JSON returned by Resource Manager REST API is not well formed
+     * and trying to parse the entire returned document results in parse exception
+     * using latest JSON parsers. The element is therefore cut out by matching braces;
+     * braces inside string values are not treated specially.
+     *
+     * @param json scheduler info document
+     * @return a JSON object string that contains only the first 'queues' element
+     * @throws IllegalArgumentException if json is null, has no 'queues' element, or the
+     *                                  braces of that element are not balanced
+     */
+    public static String extractRootQueuesElement(String json) {
+        int start = (json == null) ? -1 : json.indexOf(QUEUES_KEY);
+        if (start < 0) {
+            throw new IllegalArgumentException("Scheduler info does not contain a 'queues' element");
+        }
+        int open = json.indexOf('{', start);
+        if (open < 0) {
+            throw new IllegalArgumentException("The 'queues' element of the scheduler info has no body");
+        }
+        int depth = 0;
+        for (int i = open; i < json.length(); i++) {
+            char c = json.charAt(i);
+            if (c == '{') {
+                depth++;
+            } else if (c == '}') {
+                depth--;
+                if (depth == 0) {
+                    // i is the brace that closes the queues element
+                    return "{" + json.substring(start, i + 1) + "}";
+                }
+            }
+        }
+        throw new IllegalArgumentException("The 'queues' element of the scheduler info is not terminated");
+    }
+
+    /**
+     * Retrieves scheduler info JSON from the resource manager and extracts hadoop cluster
+     * queue names into a set of strings.
+     *
+     * @param rmBaseUrlStr base URL of the resource manager web application
+     * @return set of queue names found in the scheduler info
+     * @throws FalconException if the scheduler info cannot be retrieved or parsed
+     */
+    public static Set<String> getHadoopClusterQueueNames(String rmBaseUrlStr) throws FalconException {
+        String jsonResult = getHadoopClusterSchedulerInfo(rmBaseUrlStr);
+        LOG.debug("Scheduler Info Result : {} ", jsonResult);
+        Set<String> qNames = new HashSet<>();
+        try {
+            return getHadoopClusterQueueNamesHelper(jsonResult, qNames);
+        } catch (JSONException | IllegalArgumentException ex) {
+            // malformed scheduler info is reported like any other parse failure
+            throw new FalconException(ex);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/falcon/blob/9979a1fd/common/src/test/java/org/apache/falcon/entity/FeedHelperTest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/falcon/entity/FeedHelperTest.java 
b/common/src/test/java/org/apache/falcon/entity/FeedHelperTest.java
index 95d10c4..450b251 100644
--- a/common/src/test/java/org/apache/falcon/entity/FeedHelperTest.java
+++ b/common/src/test/java/org/apache/falcon/entity/FeedHelperTest.java
@@ -941,7 +941,6 @@ public class FeedHelperTest extends AbstractTestBase {
         Assert.assertEquals(startInstResult, 
feed.getClusters().getClusters().get(0).getValidity().getStart());
     }
 
-    @Test
     public void testGetFeedClusterValidity() throws  Exception {
         Cluster cluster = publishCluster();
         Feed feed = publishFeed(cluster, "hours(1)",  "2012-02-07 00:00 UTC", 
"2020-02-25 00:00 UTC");

http://git-wip-us.apache.org/repos/asf/falcon/blob/9979a1fd/common/src/test/java/org/apache/falcon/util/HadoopQueueUtilTest.java
----------------------------------------------------------------------
diff --git 
a/common/src/test/java/org/apache/falcon/util/HadoopQueueUtilTest.java 
b/common/src/test/java/org/apache/falcon/util/HadoopQueueUtilTest.java
new file mode 100644
index 0000000..bb37343
--- /dev/null
+++ b/common/src/test/java/org/apache/falcon/util/HadoopQueueUtilTest.java
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.util;
+
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import java.io.BufferedReader;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * Tests for the queue name extraction in HadoopQueueUtil, driven by scheduler info
+ * JSON documents stored as test resources.
+ */
+public final class HadoopQueueUtilTest {
+
+    @Test
+    public void testGetHadoopClusterQueueNamesHelper1() throws Exception {
+        String jsonResult = readResource("/config/feed/feed-schedulerinfo-1.json");
+        Set<String> qNames = new HashSet<>();
+        HadoopQueueUtil.getHadoopClusterQueueNamesHelper(jsonResult, qNames);
+        Assert.assertEquals(qNames.size(), 9);
+    }
+
+    @Test
+    public void testGetHadoopClusterQueueNamesHelper2() throws Exception {
+        String jsonResult = readResource("/config/feed/feed-schedulerinfo-2.json");
+        Set<String> qNames = new HashSet<>();
+        HadoopQueueUtil.getHadoopClusterQueueNamesHelper(jsonResult, qNames);
+        Assert.assertTrue(qNames.contains("default"));
+    }
+
+    /**
+     * Reads a classpath resource into a single string with the line breaks removed,
+     * closing the underlying stream afterwards.
+     *
+     * @param resourcePath classpath location of the resource
+     * @return content of the resource, lines concatenated without separators
+     * @throws Exception if the resource cannot be read
+     */
+    private String readResource(String resourcePath) throws Exception {
+        final InputStream inputStream = this.getClass().getResourceAsStream(resourcePath);
+        Assert.assertNotNull(inputStream, "Missing test resource " + resourcePath);
+        try (BufferedReader br = new BufferedReader(new InputStreamReader(inputStream))) {
+            StringBuilder content = new StringBuilder();
+            String line;
+            while ((line = br.readLine()) != null) {
+                content.append(line);
+            }
+            return content.toString();
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/falcon/blob/9979a1fd/common/src/test/resources/config/feed/feed-schedulerinfo-1.json
----------------------------------------------------------------------
diff --git a/common/src/test/resources/config/feed/feed-schedulerinfo-1.json 
b/common/src/test/resources/config/feed/feed-schedulerinfo-1.json
new file mode 100644
index 0000000..6525c7d
--- /dev/null
+++ b/common/src/test/resources/config/feed/feed-schedulerinfo-1.json
@@ -0,0 +1,276 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+{
+    "scheduler": {
+        "schedulerInfo": {
+            "capacity": 100.0,
+            "maxCapacity": 100.0,
+            "queueName": "root",
+            "queues": {
+                "queue": [
+                    {
+                        "absoluteCapacity": 10.5,
+                        "absoluteMaxCapacity": 50.0,
+                        "absoluteUsedCapacity": 0.0,
+                        "capacity": 10.5,
+                        "maxCapacity": 50.0,
+                        "numApplications": 0,
+                        "queueName": "a",
+                        "queues": {
+                            "queue": [
+                                {
+                                    "absoluteCapacity": 3.15,
+                                    "absoluteMaxCapacity": 25.0,
+                                    "absoluteUsedCapacity": 0.0,
+                                    "capacity": 30.000002,
+                                    "maxCapacity": 50.0,
+                                    "numApplications": 0,
+                                    "queueName": "a1",
+                                    "queues": {
+                                        "queue": [
+                                            {
+                                                "absoluteCapacity": 2.6775,
+                                                "absoluteMaxCapacity": 25.0,
+                                                "absoluteUsedCapacity": 0.0,
+                                                "capacity": 85.0,
+                                                "maxActiveApplications": 1,
+                                                "maxActiveApplicationsPerUser": 1,
+                                                "maxApplications": 267,
+                                                "maxApplicationsPerUser": 267,
+                                                "maxCapacity": 100.0,
+                                                "numActiveApplications": 0,
+                                                "numApplications": 0,
+                                                "numContainers": 0,
+                                                "numPendingApplications": 0,
+                                                "queueName": "a1a",
+                                                "resourcesUsed": {
+                                                    "memory": 0,
+                                                    "vCores": 0
+                                                },
+                                                "state": "RUNNING",
+                                                "type": "capacitySchedulerLeafQueueInfo",
+                                                "usedCapacity": 0.0,
+                                                "usedResources": "<memory:0, vCores:0>",
+                                                "userLimit": 100,
+                                                "userLimitFactor": 1.0,
+                                                "users": null
+                                            },
+                                            {
+                                                "absoluteCapacity": 0.47250003,
+                                                "absoluteMaxCapacity": 25.0,
+                                                "absoluteUsedCapacity": 0.0,
+                                                "capacity": 15.000001,
+                                                "maxActiveApplications": 1,
+                                                "maxActiveApplicationsPerUser": 1,
+                                                "maxApplications": 47,
+                                                "maxApplicationsPerUser": 47,
+                                                "maxCapacity": 100.0,
+                                                "numActiveApplications": 0,
+                                                "numApplications": 0,
+                                                "numContainers": 0,
+                                                "numPendingApplications": 0,
+                                                "queueName": "a1b",
+                                                "resourcesUsed": {
+                                                    "memory": 0,
+                                                    "vCores": 0
+                                                },
+                                                "state": "RUNNING",
+                                                "type": "capacitySchedulerLeafQueueInfo",
+                                                "usedCapacity": 0.0,
+                                                "usedResources": "<memory:0, vCores:0>",
+                                                "userLimit": 100,
+                                                "userLimitFactor": 1.0,
+                                                "users": null
+                                            }
+                                        ]
+                                    },
+                                    "resourcesUsed": {
+                                        "memory": 0,
+                                        "vCores": 0
+                                    },
+                                    "state": "RUNNING",
+                                    "usedCapacity": 0.0,
+                                    "usedResources": "<memory:0, vCores:0>"
+                                },
+                                {
+                                    "absoluteCapacity": 7.35,
+                                    "absoluteMaxCapacity": 50.0,
+                                    "absoluteUsedCapacity": 0.0,
+                                    "capacity": 70.0,
+                                    "maxActiveApplications": 1,
+                                    "maxActiveApplicationsPerUser": 100,
+                                    "maxApplications": 735,
+                                    "maxApplicationsPerUser": 73500,
+                                    "maxCapacity": 100.0,
+                                    "numActiveApplications": 0,
+                                    "numApplications": 0,
+                                    "numContainers": 0,
+                                    "numPendingApplications": 0,
+                                    "queueName": "a2",
+                                    "resourcesUsed": {
+                                        "memory": 0,
+                                        "vCores": 0
+                                    },
+                                    "state": "RUNNING",
+                                    "type": "capacitySchedulerLeafQueueInfo",
+                                    "usedCapacity": 0.0,
+                                    "usedResources": "<memory:0, vCores:0>",
+                                    "userLimit": 100,
+                                    "userLimitFactor": 100.0,
+                                    "users": null
+                                }
+                            ]
+                        },
+                        "resourcesUsed": {
+                            "memory": 0,
+                            "vCores": 0
+                        },
+                        "state": "RUNNING",
+                        "usedCapacity": 0.0,
+                        "usedResources": "<memory:0, vCores:0>"
+                    },
+                    {
+                        "absoluteCapacity": 89.5,
+                        "absoluteMaxCapacity": 100.0,
+                        "absoluteUsedCapacity": 0.0,
+                        "capacity": 89.5,
+                        "maxCapacity": 100.0,
+                        "numApplications": 2,
+                        "queueName": "b",
+                        "queues": {
+                            "queue": [
+                                {
+                                    "absoluteCapacity": 53.7,
+                                    "absoluteMaxCapacity": 100.0,
+                                    "absoluteUsedCapacity": 0.0,
+                                    "capacity": 60.000004,
+                                    "maxActiveApplications": 1,
+                                    "maxActiveApplicationsPerUser": 100,
+                                    "maxApplications": 5370,
+                                    "maxApplicationsPerUser": 537000,
+                                    "maxCapacity": 100.0,
+                                    "numActiveApplications": 1,
+                                    "numApplications": 2,
+                                    "numContainers": 0,
+                                    "numPendingApplications": 1,
+                                    "queueName": "b1",
+                                    "resourcesUsed": {
+                                        "memory": 0,
+                                        "vCores": 0
+                                    },
+                                    "state": "RUNNING",
+                                    "type": "capacitySchedulerLeafQueueInfo",
+                                    "usedCapacity": 0.0,
+                                    "usedResources": "<memory:0, vCores:0>",
+                                    "userLimit": 100,
+                                    "userLimitFactor": 100.0,
+                                    "users": {
+                                        "user": [
+                                            {
+                                                "numActiveApplications": 0,
+                                                "numPendingApplications": 1,
+                                                "resourcesUsed": {
+                                                    "memory": 0,
+                                                    "vCores": 0
+                                                },
+                                                "username": "user2"
+                                            },
+                                            {
+                                                "numActiveApplications": 1,
+                                                "numPendingApplications": 0,
+                                                "resourcesUsed": {
+                                                    "memory": 0,
+                                                    "vCores": 0
+                                                },
+                                                "username": "user1"
+                                            }
+                                        ]
+                                    }
+                                },
+                                {
+                                    "absoluteCapacity": 35.3525,
+                                    "absoluteMaxCapacity": 100.0,
+                                    "absoluteUsedCapacity": 0.0,
+                                    "capacity": 39.5,
+                                    "maxActiveApplications": 1,
+                                    "maxActiveApplicationsPerUser": 100,
+                                    "maxApplications": 3535,
+                                    "maxApplicationsPerUser": 353500,
+                                    "maxCapacity": 100.0,
+                                    "numActiveApplications": 0,
+                                    "numApplications": 0,
+                                    "numContainers": 0,
+                                    "numPendingApplications": 0,
+                                    "queueName": "b2",
+                                    "resourcesUsed": {
+                                        "memory": 0,
+                                        "vCores": 0
+                                    },
+                                    "state": "RUNNING",
+                                    "type": "capacitySchedulerLeafQueueInfo",
+                                    "usedCapacity": 0.0,
+                                    "usedResources": "<memory:0, vCores:0>",
+                                    "userLimit": 100,
+                                    "userLimitFactor": 100.0,
+                                    "users": null
+                                },
+                                {
+                                    "absoluteCapacity": 0.4475,
+                                    "absoluteMaxCapacity": 100.0,
+                                    "absoluteUsedCapacity": 0.0,
+                                    "capacity": 0.5,
+                                    "maxActiveApplications": 1,
+                                    "maxActiveApplicationsPerUser": 100,
+                                    "maxApplications": 44,
+                                    "maxApplicationsPerUser": 4400,
+                                    "maxCapacity": 100.0,
+                                    "numActiveApplications": 0,
+                                    "numApplications": 0,
+                                    "numContainers": 0,
+                                    "numPendingApplications": 0,
+                                    "queueName": "b3",
+                                    "resourcesUsed": {
+                                        "memory": 0,
+                                        "vCores": 0
+                                    },
+                                    "state": "RUNNING",
+                                    "type": "capacitySchedulerLeafQueueInfo",
+                                    "usedCapacity": 0.0,
+                                    "usedResources": "<memory:0, vCores:0>",
+                                    "userLimit": 100,
+                                    "userLimitFactor": 100.0,
+                                    "users": null
+                                }
+                            ]
+                        },
+                        "resourcesUsed": {
+                            "memory": 0,
+                            "vCores": 0
+                        },
+                        "state": "RUNNING",
+                        "usedCapacity": 0.0,
+                        "usedResources": "<memory:0, vCores:0>"
+                    }
+                ]
+            },
+            "type": "capacityScheduler",
+            "usedCapacity": 0.0
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/falcon/blob/9979a1fd/common/src/test/resources/config/feed/feed-schedulerinfo-2.json
----------------------------------------------------------------------
diff --git a/common/src/test/resources/config/feed/feed-schedulerinfo-2.json b/common/src/test/resources/config/feed/feed-schedulerinfo-2.json
new file mode 100644
index 0000000..d49bc81
--- /dev/null
+++ b/common/src/test/resources/config/feed/feed-schedulerinfo-2.json
@@ -0,0 +1,19 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+{"scheduler":{"schedulerInfo":{"type":"capacityScheduler","capacity":100.0,"usedCapacity":0.0,"maxCapacity":100.0,"queueName":"root","queues":{"queue":[{"type":"capacitySchedulerLeafQueueInfo","capacity":100.0,"usedCapacity":0.0,"maxCapacity":100.0,"absoluteCapacity":100.0,"absoluteMaxCapacity":100.0,"absoluteUsedCapacity":0.0,"numApplications":0,"queueName":"default","state":"RUNNING","resourcesUsed":{"memory":0,"vCores":0},"hideReservationQueues":false,"nodeLabels":["*"],"numActiveApplications":0,"numPendingApplications":0,"numContainers":0,"maxApplications":10000,"maxApplicationsPerUser":10000,"userLimit":100,"users":null,"userLimitFactor":1.0,"AMResourceLimit":{"memory":2046,"vCores":1},"usedAMResource":{"memory":0,"vCores":0},"userAMResourceLimit":{"memory":2046,"vCores":1},"preemptionDisabled":true}]},"health":{"lastrun":1451957838430,"operationsInfo":{"entry":{"key":"last-preemption","value":{"nodeId":"N/A","containerId":"N/A","queue":"N/A"}},"entry":{"key":"last-reservation","value":{"nodeId":"N/A","containerId":"N/A","queue":"N/A"}},"entry":{"key":"last-allocation","value":{"nodeId":"c6401.ambari.apache.org:45454","containerId":"container_e11_1450120354929_5040_01_000002","queue":"root.default"}},"entry":{"key":"last-release","value":{"nodeId":"c6401.ambari.apache.org:45454","containerId":"container_e11_1450120354929_5040_01_000001","queue":"root.default"}}},"lastRunDetails":[{"operation":"releases","count":0,"resources":{"memory":0,"vCores":0}},{"operation":"allocations","count":0,"resources":{"memory":0,"vCores":0}},{"operation":"reservations","count":0,"resources":{"memory":0,"vCores":0}}]}}}}

Reply via email to