Repository: airavata Updated Branches: refs/heads/master 985151fc1 -> a7baac58a
Adding new implementation for monitoring, AMQPbased monitoring : AIRAVATA-1022 Project: http://git-wip-us.apache.org/repos/asf/airavata/repo Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/abb05c8f Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/abb05c8f Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/abb05c8f Branch: refs/heads/master Commit: abb05c8f6f9f17b7868dc2be2a97d1598aa1a2a9 Parents: b1a6edc Author: lahiru <[email protected]> Authored: Thu Feb 20 12:02:35 2014 -0500 Committer: lahiru <[email protected]> Committed: Thu Feb 20 12:02:35 2014 -0500 ---------------------------------------------------------------------- .../job/monitor/core/MessageParser.java | 45 ++++++ .../impl/push/amqp/JSONMessageParser.java | 35 ++++ .../job/monitor/util/AMQPConnectionUtil.java | 80 ++++++++++ .../airavata/job/monitor/util/CommonUtils.java | 32 ++++ .../airavata/job/monitor/util/X509Helper.java | 160 +++++++++++++++++++ .../src/main/resources/monitor.properties | 2 + 6 files changed, 354 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/airavata/blob/abb05c8f/modules/airavata-job-monitor/src/main/java/org/apache/airavata/job/monitor/core/MessageParser.java ---------------------------------------------------------------------- diff --git a/modules/airavata-job-monitor/src/main/java/org/apache/airavata/job/monitor/core/MessageParser.java b/modules/airavata-job-monitor/src/main/java/org/apache/airavata/job/monitor/core/MessageParser.java new file mode 100644 index 0000000..4d54fae --- /dev/null +++ b/modules/airavata-job-monitor/src/main/java/org/apache/airavata/job/monitor/core/MessageParser.java @@ -0,0 +1,45 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * +*/ +package org.apache.airavata.job.monitor.core; + +import org.apache.airavata.job.monitor.MonitorID; +import org.apache.airavata.job.monitor.state.JobStatus; +import org.apache.airavata.model.experiment.JobState; + +/** + * This is an interface to implement messageparser, it could be + * pull based or push based still monitor has to parse the content of + * the message it gets from remote monitoring system and finalize + * them to internal job state, Ex: JSON parser for AMQP and Qstat reader + * for pull based monitor. + */ +public interface MessageParser { + /** + * This method is to implement how to parse the incoming message + * and implement a logic to finalize the status of the job, + * we have to makesure the correct message is given to the messageparser + * parse method, it will not do any filtering + * @param message content of the message + * @param monitorID monitorID object + * @return + */ + JobStatus parseMessage(String message,MonitorID monitorID); +} http://git-wip-us.apache.org/repos/asf/airavata/blob/abb05c8f/modules/airavata-job-monitor/src/main/java/org/apache/airavata/job/monitor/impl/push/amqp/JSONMessageParser.java ---------------------------------------------------------------------- diff --git a/modules/airavata-job-monitor/src/main/java/org/apache/airavata/job/monitor/impl/push/amqp/JSONMessageParser.java b/modules/airavata-job-monitor/src/main/java/org/apache/airavata/job/monitor/impl/push/amqp/JSONMessageParser.java new file mode 100644 index 0000000..42ea2d0 --- /dev/null +++ b/modules/airavata-job-monitor/src/main/java/org/apache/airavata/job/monitor/impl/push/amqp/JSONMessageParser.java @@ -0,0 +1,35 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * +*/ +package org.apache.airavata.job.monitor.impl.push.amqp; + +import org.apache.airavata.job.monitor.MonitorID; +import org.apache.airavata.job.monitor.core.MessageParser; +import org.apache.airavata.job.monitor.state.JobStatus; +import org.apache.hadoop.mapred.jobcontrol.Job; + +import javax.mail.search.MessageIDTerm; + +public class JSONMessageParser implements MessageParser { + public JobStatus parseMessage(String message,MonitorID monitorID) { + /*todo write a json message parser here*/ + return new JobStatus(); + } +} http://git-wip-us.apache.org/repos/asf/airavata/blob/abb05c8f/modules/airavata-job-monitor/src/main/java/org/apache/airavata/job/monitor/util/AMQPConnectionUtil.java ---------------------------------------------------------------------- diff --git a/modules/airavata-job-monitor/src/main/java/org/apache/airavata/job/monitor/util/AMQPConnectionUtil.java b/modules/airavata-job-monitor/src/main/java/org/apache/airavata/job/monitor/util/AMQPConnectionUtil.java new file mode 100644 index 0000000..f100b8f --- /dev/null +++ b/modules/airavata-job-monitor/src/main/java/org/apache/airavata/job/monitor/util/AMQPConnectionUtil.java @@ -0,0 +1,80 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * +*/ +package org.apache.airavata.job.monitor.util; + +import com.rabbitmq.client.Connection; +import com.rabbitmq.client.ConnectionFactory; +import com.rabbitmq.client.DefaultSaslConfig; + +import javax.net.ssl.KeyManagerFactory; +import javax.net.ssl.SSLContext; +import javax.net.ssl.TrustManagerFactory; +import java.security.KeyStore; +import java.util.Collections; +import java.util.Vector; + +public class AMQPConnectionUtil { + public static Connection connect(String vhost, String proxyFile) { + Vector<String> hosts = new Vector<String>(); + hosts.add("info1.dyn.teragrid.org"); + hosts.add("info2.dyn.teragrid.org"); + Collections.shuffle(hosts); + for (String host : hosts) { + Connection connection = connect(host, vhost, proxyFile); + if (host != null) { + System.out.println("connected to " + host); + return connection; + } + } + return null; + } + + public static Connection connect(String host, String vhost, String proxyFile) { + Connection connection; + try { + String keyPassPhrase = "test123"; + KeyStore ks = X509Helper.keyStoreFromPEM(proxyFile, keyPassPhrase); + KeyManagerFactory kmf = KeyManagerFactory.getInstance("SunX509"); + kmf.init(ks, keyPassPhrase.toCharArray()); + + KeyStore tks = X509Helper.trustKeyStoreFromCertDir(); + TrustManagerFactory tmf = TrustManagerFactory.getInstance("SunX509"); + tmf.init(tks); + + SSLContext c = SSLContext.getInstance("SSLv3"); + c.init(kmf.getKeyManagers(), tmf.getTrustManagers(), null); + + ConnectionFactory factory = new ConnectionFactory(); + factory.setHost(host); + factory.setPort(5671); + factory.useSslProtocol(c); + factory.setVirtualHost(vhost); + factory.setSaslConfig(DefaultSaslConfig.EXTERNAL); + + connection = factory.newConnection(); + } catch (Exception e) { + e.printStackTrace(); + return null; + } + return connection; + } + +} http://git-wip-us.apache.org/repos/asf/airavata/blob/abb05c8f/modules/airavata-job-monitor/src/main/java/org/apache/airavata/job/monitor/util/CommonUtils.java ---------------------------------------------------------------------- diff --git a/modules/airavata-job-monitor/src/main/java/org/apache/airavata/job/monitor/util/CommonUtils.java b/modules/airavata-job-monitor/src/main/java/org/apache/airavata/job/monitor/util/CommonUtils.java new file mode 100644 index 0000000..2248ec3 --- /dev/null +++ b/modules/airavata-job-monitor/src/main/java/org/apache/airavata/job/monitor/util/CommonUtils.java @@ -0,0 +1,32 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * +*/ +package org.apache.airavata.job.monitor.util; + +import org.apache.airavata.job.monitor.MonitorID; + +public class CommonUtils { + public static String getChannelID(MonitorID monitorID) { + return monitorID.getUserName() + monitorID.getHost().getType().getHostName(); + } + public static String getRoutingKey(MonitorID monitorID) { + return "*." + monitorID.getUserName() + monitorID.getHost().getType().getHostAddress(); + } +} http://git-wip-us.apache.org/repos/asf/airavata/blob/abb05c8f/modules/airavata-job-monitor/src/main/java/org/apache/airavata/job/monitor/util/X509Helper.java ---------------------------------------------------------------------- diff --git a/modules/airavata-job-monitor/src/main/java/org/apache/airavata/job/monitor/util/X509Helper.java b/modules/airavata-job-monitor/src/main/java/org/apache/airavata/job/monitor/util/X509Helper.java new file mode 100644 index 0000000..2ed0b88 --- /dev/null +++ b/modules/airavata-job-monitor/src/main/java/org/apache/airavata/job/monitor/util/X509Helper.java @@ -0,0 +1,160 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * +*/ +package org.apache.airavata.job.monitor.util; + +import org.bouncycastle.openssl.PEMKeyPair; +import org.bouncycastle.openssl.PEMParser; +import org.bouncycastle.openssl.jcajce.JcaPEMKeyConverter; + +import java.io.*; +import java.security.*; +import java.security.cert.CertificateException; +import java.security.cert.CertificateFactory; +import java.security.cert.CertificateParsingException; +import java.security.cert.X509Certificate; +import java.security.spec.InvalidKeySpecException; + +public class X509Helper { + + static { + // parsing of RSA key fails without this + java.security.Security.addProvider(new org.bouncycastle.jce.provider.BouncyCastleProvider()); + } + + + + public static KeyStore keyStoreFromPEM(String proxyFile, + String keyPassPhrase) throws IOException, + CertificateException, + NoSuchAlgorithmException, + InvalidKeySpecException, + KeyStoreException { + return keyStoreFromPEM(proxyFile,proxyFile,keyPassPhrase); + } + + public static KeyStore keyStoreFromPEM(String certFile, + String keyFile, + String keyPassPhrase) throws IOException, + CertificateException, + NoSuchAlgorithmException, + InvalidKeySpecException, + KeyStoreException { + CertificateFactory cf = CertificateFactory.getInstance("X.509"); + X509Certificate cert = (X509Certificate)cf.generateCertificate(new FileInputStream(certFile)); + //System.out.println(cert.toString()); + + // this works for proxy files, too, since it skips over the certificate + BufferedReader reader = new BufferedReader(new FileReader(keyFile)); + String line = null; + StringBuilder builder = new StringBuilder(); + boolean inKey = false; + while((line=reader.readLine()) != null) { + if (line.contains("-----BEGIN RSA PRIVATE KEY-----")) { + inKey = true; + } + if (inKey) { + builder.append(line); + builder.append(System.getProperty("line.separator")); + } + if (line.contains("-----END RSA PRIVATE KEY-----")) { + inKey = false; + } + } + String privKeyPEM = builder.toString(); + //System.out.println(privKeyPEM); + + // using BouncyCastle + PEMParser pemParser = new PEMParser(new StringReader(privKeyPEM)); + Object object = pemParser.readObject(); + //System.out.println(object); + JcaPEMKeyConverter converter = new JcaPEMKeyConverter().setProvider("BC"); + KeyPair kp = converter.getKeyPair((PEMKeyPair)object); + PrivateKey privKey = kp.getPrivate(); + + // PEMParser from BouncyCastle is good for reading PEM files, but I didn't want to add that dependency + /* + // Base64 decode the data + byte[] encoded = javax.xml.bind.DatatypeConverter.parseBase64Binary(privKeyPEM); + + // PKCS8 decode the encoded RSA private key + java.security.spec.PKCS8EncodedKeySpec keySpec = new PKCS8EncodedKeySpec(encoded); + KeyFactory kf = KeyFactory.getInstance("RSA"); + PrivateKey privKey = kf.generatePrivate(keySpec); + //RSAPrivateKey privKey = (RSAPrivateKey)kf.generatePrivate(keySpec); + */ + //System.out.println(privKey.toString()); + + KeyStore keyStore = KeyStore.getInstance("PKCS12"); + keyStore.load(null,null); + + KeyStore.PrivateKeyEntry entry = + new KeyStore.PrivateKeyEntry(privKey, + new java.security.cert.Certificate[] {(java.security.cert.Certificate)cert}); + KeyStore.PasswordProtection prot = new KeyStore.PasswordProtection(keyPassPhrase.toCharArray()); + keyStore.setEntry(cert.getSubjectX500Principal().getName(), entry, prot); + + return keyStore; + } + + + public static KeyStore trustKeyStoreFromCertDir() throws IOException, + KeyStoreException, + CertificateException, + NoSuchAlgorithmException { + return trustKeyStoreFromCertDir("/Users/lahirugunathilake/Downloads/certificates"); + } + + public static KeyStore trustKeyStoreFromCertDir(String certDir) throws IOException, + KeyStoreException, + CertificateException, + NoSuchAlgorithmException { + KeyStore ks = KeyStore.getInstance("JKS"); + ks.load(null,null); + + File dir = new File(certDir); + for(File file : dir.listFiles()) { + if (!file.isFile()) { + continue; + } + if (!file.getName().endsWith(".0")) { + continue; + } + + try { + //System.out.println("reading file "+file.getName()); + CertificateFactory cf = CertificateFactory.getInstance("X.509"); + X509Certificate cert = (X509Certificate) cf.generateCertificate(new FileInputStream(file)); + //System.out.println(cert.toString()); + + KeyStore.TrustedCertificateEntry entry = new KeyStore.TrustedCertificateEntry(cert); + + ks.setEntry(cert.getSubjectX500Principal().getName(), entry, null); + } catch (KeyStoreException e) { + } catch (CertificateParsingException e) { + continue; + } + + } + + return ks; + } +} + http://git-wip-us.apache.org/repos/asf/airavata/blob/abb05c8f/modules/airavata-job-monitor/src/main/resources/monitor.properties ---------------------------------------------------------------------- diff --git a/modules/airavata-job-monitor/src/main/resources/monitor.properties b/modules/airavata-job-monitor/src/main/resources/monitor.properties new file mode 100644 index 0000000..0b0b5f4 --- /dev/null +++ b/modules/airavata-job-monitor/src/main/resources/monitor.properties @@ -0,0 +1,2 @@ +amqp.hosts=info1.dyn.teragrid.org,info2.dyn.teragrid.org +connection.name=xsede_private \ No newline at end of file
