Repository: airavata-sandbox Updated Branches: refs/heads/master 8f9229d5c -> ec6ba29a8
mavening the code Project: http://git-wip-us.apache.org/repos/asf/airavata-sandbox/repo Commit: http://git-wip-us.apache.org/repos/asf/airavata-sandbox/commit/ec6ba29a Tree: http://git-wip-us.apache.org/repos/asf/airavata-sandbox/tree/ec6ba29a Diff: http://git-wip-us.apache.org/repos/asf/airavata-sandbox/diff/ec6ba29a Branch: refs/heads/master Commit: ec6ba29a823677bd10fefb7fd172d2d4fa14a127 Parents: 8f9229d Author: Suresh Marru <[email protected]> Authored: Tue Aug 19 08:14:52 2014 -0400 Committer: Suresh Marru <[email protected]> Committed: Tue Aug 19 08:14:52 2014 -0400 ---------------------------------------------------------------------- job-throttler/pom.xml | 28 +++ job-throttler/src/MetaScheduleTest.java | 105 --------- job-throttler/src/MetaScheduler.java | 229 ------------------ .../scheduler/jobthrottler/MetaScheduler.java | 230 +++++++++++++++++++ .../jobthrottler/MetaSchedulerTest.java | 107 +++++++++ 5 files changed, 365 insertions(+), 334 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/airavata-sandbox/blob/ec6ba29a/job-throttler/pom.xml ---------------------------------------------------------------------- diff --git a/job-throttler/pom.xml b/job-throttler/pom.xml new file mode 100644 index 0000000..9510206 --- /dev/null +++ b/job-throttler/pom.xml @@ -0,0 +1,28 @@ +<?xml version="1.0" encoding="UTF-8"?> + +<!--Licensed to the Apache Software Foundation (ASF) under one or more contributor + license agreements. See the NOTICE file distributed with this work for additional + information regarding copyright ownership. The ASF licenses this file to + you under the Apache License, Version 2.0 (theà "License"); you may not use + this file except in compliance with the License. You may obtain a copy of + the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required + by applicable law or agreed to in writing, software distributed under the + License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS + OF ANY ~ KIND, either express or implied. See the License for the specific + language governing permissions and limitations under the License. --> + +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> + <!-- the version of maven's project object model --> + + <modelVersion>4.0.0</modelVersion> + <artifactId>airavata-job-throttler</artifactId> + <name>Airavata Job Throttler</name> + <version>0.1-SNAPSHOT</version> + <packaging>jar</packaging> + <groupId>org.apache.airavata</groupId> + <url>http://airavata.apache.org/</url> + + +</project> + http://git-wip-us.apache.org/repos/asf/airavata-sandbox/blob/ec6ba29a/job-throttler/src/MetaScheduleTest.java ---------------------------------------------------------------------- diff --git a/job-throttler/src/MetaScheduleTest.java b/job-throttler/src/MetaScheduleTest.java deleted file mode 100644 index f59f920..0000000 --- a/job-throttler/src/MetaScheduleTest.java +++ /dev/null @@ -1,105 +0,0 @@ - -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -import java.util.*; - -public class MetaScheduleTest { - - public static void main(String[] args) { - ArrayList<String> experimentData = new ArrayList<String>(); - ArrayList<String> statusData = new ArrayList<String>(); - Random rand = new Random(); - String hostID = ""; - String queueID = ""; - String gatewayID = ""; - int submitCtr = 0; - int holdCtr = 0; - int errorCtr = 0; - - String sarg0 = args[0]; - if (sarg0.equals("clear")) { - System.out.println("Clearing Active Jobs"); - MetaScheduler.clearActiveJobs(); - } - - if ((sarg0.equals("run")) || (sarg0.equals("end"))) { - int runCount = Integer.parseInt(args[1]); - int n = rand.nextInt(100); - hostID = "quarry.uits.iu.edu"; - if (n < 50) { - hostID = "bigred2.uits.iu.edu"; - } - queueID = "debug"; - n = rand.nextInt(100); - if (n < 25) { - queueID = "serial"; - } - n = rand.nextInt(100); - if (n < 25) { - queueID = "normal"; - } - n = rand.nextInt(100); - if (n < 25) { - queueID = "long"; - } - n = rand.nextInt(100); - gatewayID = "Gateway1"; - n = rand.nextInt(100); - if (n < 50) { - gatewayID = "Gateway2"; - } - long currentTime = ((long) System.currentTimeMillis()) / 99; - for (int i=0;i<runCount;i++) { - experimentData.add(hostID); - experimentData.add(queueID); - experimentData.add(gatewayID); - experimentData.add(Long.toString(currentTime+i)); - } - if (sarg0.equals("run")) { - System.out.println("Scheduling " + runCount + " jobs on " - + hostID + " : " + queueID + " for " + gatewayID); - statusData = MetaScheduler.submitThrottleJob(experimentData); - // display results summarized - int dataNDX = 0; - while (dataNDX < statusData.size()) { - if (statusData.get(dataNDX).equals("SUBMIT")) { - submitCtr++; - } - if (statusData.get(dataNDX).equals("HOLD")) { - holdCtr++; - } - if (statusData.get(dataNDX).equals("ERROR")) { - errorCtr++; - } - dataNDX++; - } - System.out.println("Job States: submit=" + submitCtr + " hold=" - + holdCtr + " error=" + errorCtr); - } - if (sarg0.equals("end")) { - System.out.println("Changing Status of " + runCount + " jobs on " - + hostID + " : " + queueID + " for " + gatewayID); - MetaScheduler.changeJobStatus(experimentData); - } - } - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/airavata-sandbox/blob/ec6ba29a/job-throttler/src/MetaScheduler.java ---------------------------------------------------------------------- diff --git a/job-throttler/src/MetaScheduler.java b/job-throttler/src/MetaScheduler.java deleted file mode 100644 index 02f7047..0000000 --- a/job-throttler/src/MetaScheduler.java +++ /dev/null @@ -1,229 +0,0 @@ - -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -import java.sql.*; -import java.util.ArrayList; - -public class MetaScheduler { - - public static Connection mySQLConnection() { - String jdbcDriver = "com.mysql.jdbc.Driver"; - String jdbcUser="******"; - String jdbcPwd="********"; - String jdbcUrl="jdbc:mysql://rdc04.uits.iu.edu:3059/scheduler"; - Connection connect = null; - try { - Class.forName(jdbcDriver).newInstance(); - connect = DriverManager.getConnection(jdbcUrl, jdbcUser, jdbcPwd); - } - - catch (Exception e) { - System.out.println("Connection to mysql Failed!"); - e.printStackTrace(); - return(null); - } - return(connect); - } - - public static ArrayList<String> submitThrottleJob(ArrayList<String> experimentData) { - ArrayList<String> statusData = new ArrayList<String>(); - String hostID = ""; - String queueID = ""; - String experimentID = ""; - String gatewayID = ""; - String jobStatus = ""; - int activeJobs = 0; - int queueLimit = 0; - - Connection conn = mySQLConnection(); - - int dataNDX = 0; - while (dataNDX < experimentData.size()) { - hostID = experimentData.get(dataNDX); - queueID = experimentData.get(dataNDX+1); - gatewayID = experimentData.get(dataNDX+2); - experimentID = experimentData.get(dataNDX+3); - queueLimit = getQueueLimits(hostID,queueID,conn); - activeJobs = getActiveJobs(gatewayID, hostID, queueID, conn); - jobStatus = getJobStatus(activeJobs, queueLimit); - if (!updateActiveJobs(gatewayID, hostID, queueID, experimentID, jobStatus, conn)) - jobStatus = "ERROR"; - statusData.add(jobStatus); - dataNDX += 4; - } - return(statusData); - } - - public static Boolean updateActiveJobs(String gatewayID, String hostID, - String queueID, String jobID, String jobStatus, Connection conn ) { - try { - try { - String sql = "insert into activejobs " - + " (gatewayID, hostID, queueName, jobID, jobState) VALUES " - + " (?,?,?,?,?) "; - PreparedStatement insertSQL = conn.prepareStatement(sql); - insertSQL.setString(1, gatewayID); - insertSQL.setString(2, hostID); - insertSQL.setString(3, queueID); - insertSQL.setString(4, jobID); - insertSQL.setString(5, jobStatus); - insertSQL.executeUpdate(); - - } finally { - - } - } catch (SQLException e) { - return false; - } - return(true); - } - - public static String getJobStatus(int activeJobs, int queueLimit) { - String jobStatus = ""; - if (queueLimit > activeJobs) - jobStatus = "SUBMIT"; - else - jobStatus = "HOLD"; - return(jobStatus); - } - - public static int getActiveJobs(String gatewayID, String hostID, String queueID, Connection conn) { - int activeJobs = 0; - try { - Statement statement = null; - try { - statement = conn.createStatement(); - String sql = "select count(*) from activejobs where " + - " gatewayID = ? and hostID = ? and queueName = ?"; - PreparedStatement updateSQL = conn.prepareStatement(sql); - updateSQL.setString(1, gatewayID); - updateSQL.setString(2, hostID); - updateSQL.setString(3, queueID); - ResultSet rs = updateSQL.executeQuery(); - if (rs != null) { - rs.next(); - activeJobs = rs.getInt(1); - rs.close(); - } - } finally { - try { - if (statement != null) { - statement.close(); - } - } catch (SQLException e) { - return -1; - } - } - } catch (SQLException e) { - return -1; - } - return activeJobs; - } - - public static int getQueueLimits(String hostID, String queueID, Connection conn) { - int queueLimit = 0; - try { - Statement statement = null; - try { - statement = conn.createStatement(); - String sql = "select * from queuelimits where hostID = ? and queueName = ?"; - PreparedStatement updateSQL = conn.prepareStatement(sql); - updateSQL.setString(1, hostID); - updateSQL.setString(2, queueID); - ResultSet rs = updateSQL.executeQuery(); - if (rs != null) { - while (rs.next()) { - queueLimit = (Integer) rs.getObject("queueLimit"); - } - rs.close(); - } - } finally { - try { - if (statement != null) { - statement.close(); - } - } catch (SQLException e) { - return -1; - } - } - } catch (SQLException e) { - return -1; - } - return queueLimit; - } - - public static Boolean clearActiveJobs() { - try { - Connection conn = mySQLConnection(); - Statement statement = null; - try { - statement = conn.createStatement(); - String sql = "delete from activejobs"; - statement.executeUpdate(sql); - } finally { - try { - if (statement != null) { - statement.close(); - } - } catch (SQLException e) { - return false; - } - } - } catch (SQLException e) { - return false; - } - return(true); - } - - public static Boolean changeJobStatus(ArrayList<String> experimentData) { - String hostID = ""; - String queueID = ""; - String gatewayID = ""; - int deleteCtr = (experimentData.size() + 1) / 4; - - int dataNDX = 0; - if (deleteCtr > 0) { - hostID = experimentData.get(dataNDX); - queueID = experimentData.get(dataNDX+1); - gatewayID = experimentData.get(dataNDX+2); - } - try { - Connection conn = mySQLConnection(); - try { - // not really meaningful, but useful for testing - String sql = "delete top (?) from activejobs where " + - "hostID = ? and queueID = ? and gatewayID = ? "; - PreparedStatement updateSQL = conn.prepareStatement(sql); - updateSQL.setInt(1, deleteCtr); - updateSQL.setString(2, hostID); - updateSQL.setString(3, queueID); - updateSQL.setString(4, gatewayID); - updateSQL.executeQuery(); - } finally { - - } - } catch (SQLException e) { - return false; - } - return(true); - } -} http://git-wip-us.apache.org/repos/asf/airavata-sandbox/blob/ec6ba29a/job-throttler/src/main/java/org/apache/airavata/scheduler/jobthrottler/MetaScheduler.java ---------------------------------------------------------------------- diff --git a/job-throttler/src/main/java/org/apache/airavata/scheduler/jobthrottler/MetaScheduler.java b/job-throttler/src/main/java/org/apache/airavata/scheduler/jobthrottler/MetaScheduler.java new file mode 100644 index 0000000..cbba3e3 --- /dev/null +++ b/job-throttler/src/main/java/org/apache/airavata/scheduler/jobthrottler/MetaScheduler.java @@ -0,0 +1,230 @@ + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.airavata.scheduler.jobthrottler; + +import java.sql.*; +import java.util.ArrayList; + +public class MetaScheduler { + + public static Connection mySQLConnection() { + String jdbcDriver = "com.mysql.jdbc.Driver"; + String jdbcUser="******"; + String jdbcPwd="********"; + String jdbcUrl="jdbc:mysql://rdc04.uits.iu.edu:3059/scheduler"; + Connection connect = null; + try { + Class.forName(jdbcDriver).newInstance(); + connect = DriverManager.getConnection(jdbcUrl, jdbcUser, jdbcPwd); + } + + catch (Exception e) { + System.out.println("Connection to mysql Failed!"); + e.printStackTrace(); + return(null); + } + return(connect); + } + + public static ArrayList<String> submitThrottleJob(ArrayList<String> experimentData) { + ArrayList<String> statusData = new ArrayList<String>(); + String hostID = ""; + String queueID = ""; + String experimentID = ""; + String gatewayID = ""; + String jobStatus = ""; + int activeJobs = 0; + int queueLimit = 0; + + Connection conn = mySQLConnection(); + + int dataNDX = 0; + while (dataNDX < experimentData.size()) { + hostID = experimentData.get(dataNDX); + queueID = experimentData.get(dataNDX+1); + gatewayID = experimentData.get(dataNDX+2); + experimentID = experimentData.get(dataNDX+3); + queueLimit = getQueueLimits(hostID,queueID,conn); + activeJobs = getActiveJobs(gatewayID, hostID, queueID, conn); + jobStatus = getJobStatus(activeJobs, queueLimit); + if (!updateActiveJobs(gatewayID, hostID, queueID, experimentID, jobStatus, conn)) + jobStatus = "ERROR"; + statusData.add(jobStatus); + dataNDX += 4; + } + return(statusData); + } + + public static Boolean updateActiveJobs(String gatewayID, String hostID, + String queueID, String jobID, String jobStatus, Connection conn ) { + try { + try { + String sql = "insert into activejobs " + + " (gatewayID, hostID, queueName, jobID, jobState) VALUES " + + " (?,?,?,?,?) "; + PreparedStatement insertSQL = conn.prepareStatement(sql); + insertSQL.setString(1, gatewayID); + insertSQL.setString(2, hostID); + insertSQL.setString(3, queueID); + insertSQL.setString(4, jobID); + insertSQL.setString(5, jobStatus); + insertSQL.executeUpdate(); + + } finally { + + } + } catch (SQLException e) { + return false; + } + return(true); + } + + public static String getJobStatus(int activeJobs, int queueLimit) { + String jobStatus = ""; + if (queueLimit > activeJobs) + jobStatus = "SUBMIT"; + else + jobStatus = "HOLD"; + return(jobStatus); + } + + public static int getActiveJobs(String gatewayID, String hostID, String queueID, Connection conn) { + int activeJobs = 0; + try { + Statement statement = null; + try { + statement = conn.createStatement(); + String sql = "select count(*) from activejobs where " + + " gatewayID = ? and hostID = ? and queueName = ?"; + PreparedStatement updateSQL = conn.prepareStatement(sql); + updateSQL.setString(1, gatewayID); + updateSQL.setString(2, hostID); + updateSQL.setString(3, queueID); + ResultSet rs = updateSQL.executeQuery(); + if (rs != null) { + rs.next(); + activeJobs = rs.getInt(1); + rs.close(); + } + } finally { + try { + if (statement != null) { + statement.close(); + } + } catch (SQLException e) { + return -1; + } + } + } catch (SQLException e) { + return -1; + } + return activeJobs; + } + + public static int getQueueLimits(String hostID, String queueID, Connection conn) { + int queueLimit = 0; + try { + Statement statement = null; + try { + statement = conn.createStatement(); + String sql = "select * from queuelimits where hostID = ? and queueName = ?"; + PreparedStatement updateSQL = conn.prepareStatement(sql); + updateSQL.setString(1, hostID); + updateSQL.setString(2, queueID); + ResultSet rs = updateSQL.executeQuery(); + if (rs != null) { + while (rs.next()) { + queueLimit = (Integer) rs.getObject("queueLimit"); + } + rs.close(); + } + } finally { + try { + if (statement != null) { + statement.close(); + } + } catch (SQLException e) { + return -1; + } + } + } catch (SQLException e) { + return -1; + } + return queueLimit; + } + + public static Boolean clearActiveJobs() { + try { + Connection conn = mySQLConnection(); + Statement statement = null; + try { + statement = conn.createStatement(); + String sql = "delete from activejobs"; + statement.executeUpdate(sql); + } finally { + try { + if (statement != null) { + statement.close(); + } + } catch (SQLException e) { + return false; + } + } + } catch (SQLException e) { + return false; + } + return(true); + } + + public static Boolean changeJobStatus(ArrayList<String> experimentData) { + String hostID = ""; + String queueID = ""; + String gatewayID = ""; + int deleteCtr = (experimentData.size() + 1) / 4; + + int dataNDX = 0; + if (deleteCtr > 0) { + hostID = experimentData.get(dataNDX); + queueID = experimentData.get(dataNDX+1); + gatewayID = experimentData.get(dataNDX+2); + } + try { + Connection conn = mySQLConnection(); + try { + // not really meaningful, but useful for testing + String sql = "delete top (?) from activejobs where " + + "hostID = ? and queueID = ? and gatewayID = ? "; + PreparedStatement updateSQL = conn.prepareStatement(sql); + updateSQL.setInt(1, deleteCtr); + updateSQL.setString(2, hostID); + updateSQL.setString(3, queueID); + updateSQL.setString(4, gatewayID); + updateSQL.executeQuery(); + } finally { + + } + } catch (SQLException e) { + return false; + } + return(true); + } +} http://git-wip-us.apache.org/repos/asf/airavata-sandbox/blob/ec6ba29a/job-throttler/src/test/java/org/apache/airavata/scheduler/jobthrottler/MetaSchedulerTest.java ---------------------------------------------------------------------- diff --git a/job-throttler/src/test/java/org/apache/airavata/scheduler/jobthrottler/MetaSchedulerTest.java b/job-throttler/src/test/java/org/apache/airavata/scheduler/jobthrottler/MetaSchedulerTest.java new file mode 100644 index 0000000..08348cc --- /dev/null +++ b/job-throttler/src/test/java/org/apache/airavata/scheduler/jobthrottler/MetaSchedulerTest.java @@ -0,0 +1,107 @@ + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.airavata.scheduler.jobthrottler; + +import java.util.*; + +public class MetaSchedulerTest { + + public static void main(String[] args) { + ArrayList<String> experimentData = new ArrayList<String>(); + ArrayList<String> statusData = new ArrayList<String>(); + Random rand = new Random(); + String hostID = ""; + String queueID = ""; + String gatewayID = ""; + int submitCtr = 0; + int holdCtr = 0; + int errorCtr = 0; + + String sarg0 = args[0]; + if (sarg0.equals("clear")) { + System.out.println("Clearing Active Jobs"); + MetaScheduler.clearActiveJobs(); + } + + if ((sarg0.equals("run")) || (sarg0.equals("end"))) { + int runCount = Integer.parseInt(args[1]); + int n = rand.nextInt(100); + hostID = "quarry.uits.iu.edu"; + if (n < 50) { + hostID = "bigred2.uits.iu.edu"; + } + queueID = "debug"; + n = rand.nextInt(100); + if (n < 25) { + queueID = "serial"; + } + n = rand.nextInt(100); + if (n < 25) { + queueID = "normal"; + } + n = rand.nextInt(100); + if (n < 25) { + queueID = "long"; + } + n = rand.nextInt(100); + gatewayID = "Gateway1"; + n = rand.nextInt(100); + if (n < 50) { + gatewayID = "Gateway2"; + } + long currentTime = ((long) System.currentTimeMillis()) / 99; + for (int i=0;i<runCount;i++) { + experimentData.add(hostID); + experimentData.add(queueID); + experimentData.add(gatewayID); + experimentData.add(Long.toString(currentTime+i)); + } + if (sarg0.equals("run")) { + System.out.println("Scheduling " + runCount + " jobs on " + + hostID + " : " + queueID + " for " + gatewayID); + statusData = MetaScheduler.submitThrottleJob(experimentData); + // display results summarized + int dataNDX = 0; + while (dataNDX < statusData.size()) { + if (statusData.get(dataNDX).equals("SUBMIT")) { + submitCtr++; + } + if (statusData.get(dataNDX).equals("HOLD")) { + holdCtr++; + } + if (statusData.get(dataNDX).equals("ERROR")) { + errorCtr++; + } + dataNDX++; + } + System.out.println("Job States: submit=" + submitCtr + " hold=" + + holdCtr + " error=" + errorCtr); + } + if (sarg0.equals("end")) { + System.out.println("Changing Status of " + runCount + " jobs on " + + hostID + " : " + queueID + " for " + gatewayID); + MetaScheduler.changeJobStatus(experimentData); + } + } + } +} \ No newline at end of file
