Avro RPC implementation
Project: http://git-wip-us.apache.org/repos/asf/oodt/repo Commit: http://git-wip-us.apache.org/repos/asf/oodt/commit/287d4e89 Tree: http://git-wip-us.apache.org/repos/asf/oodt/tree/287d4e89 Diff: http://git-wip-us.apache.org/repos/asf/oodt/diff/287d4e89 Branch: refs/heads/master Commit: 287d4e8979e9c7d54648688d06374e8d8a2dddc4 Parents: f91720d Author: Radu Manole <manole.v.r...@gmail.com> Authored: Mon Aug 17 18:11:09 2015 +0300 Committer: Radu Manole <manole.v.r...@gmail.com> Committed: Mon Aug 17 18:11:09 2015 +0300 ---------------------------------------------------------------------- resource/pom.xml | 52 +++ resource/src/main/avro/types/AvroJob.avsc | 17 + resource/src/main/avro/types/AvroJobInput.avsc | 11 + .../main/avro/types/AvroNameValueJobInput.avsc | 10 + .../src/main/avro/types/AvroResourceNode.avsc | 11 + .../avro/types/resource_manager_protocol.avdl | 53 +++ .../src/main/avro/types/tatchmgr_protocol.avdl | 27 ++ .../cas/resource/batchmgr/AvroRpcBatchMgr.java | 180 ++++++++ .../batchmgr/AvroRpcBatchMgrFactory.java | 32 ++ .../resource/batchmgr/AvroRpcBatchMgrProxy.java | 135 ++++++ .../cas/resource/structs/AvroTypeFactory.java | 168 ++++++++ .../cas/resource/structs/NameValueJobInput.java | 4 + .../resource/system/AvroRpcResourceManager.java | 425 +++++++++++++++++++ .../system/AvroRpcResourceManagerClient.java | 305 +++++++++++++ .../cas/resource/system/ResourceManager.java | 31 ++ .../resource/system/ResourceManagerClient.java | 80 ++++ .../system/XmlRpcResourceManagerClient.java | 26 +- .../system/extern/AvroRpcBatchStub.java | 212 +++++++++ .../cas/resource/batchmgr/TestBatchMgr.java | 54 +++ .../resource/structs/TestAvroTypeFactory.java | 112 +++++ .../system/TestAvroRpcResourceManager.java | 159 +++++++ .../system/TestXmlRpcResourceManager.java | 4 + 22 files changed, 2107 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/oodt/blob/287d4e89/resource/pom.xml 
---------------------------------------------------------------------- diff --git a/resource/pom.xml b/resource/pom.xml index de807d6..b91fa84 100644 --- a/resource/pom.xml +++ b/resource/pom.xml @@ -35,6 +35,48 @@ the License. </scm> <build> <plugins> + + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-compiler-plugin</artifactId> + <version>2.3.2</version> + </plugin> + <plugin> + <groupId>org.apache.avro</groupId> + <artifactId>avro-maven-plugin</artifactId> + <version>1.7.7</version> + <configuration> + <stringType>String</stringType> + <detail>true</detail> + </configuration> + <executions> + <execution> + <id>schemas</id> + <configuration> + <imports> + <import>${basedir}/src/main/avro/types/AvroJob.avsc</import> + <import>${basedir}/src/main/avro/types/AvroNameValueJobInput.avsc</import> + <import>${basedir}/src/main/avro/types/AvroJobInput.avsc</import> + </imports> + </configuration> + <goals> + <goal>schema</goal> + </goals> + </execution> + <execution> + <id>protocol</id> + <configuration><imports> + <import>${basedir}/src/main/avro/types</import> + + </imports> + </configuration> + <goals> + <goal>idl-protocol</goal> + </goals> + </execution> + </executions> + </plugin> + <plugin> <artifactId>maven-surefire-plugin</artifactId> <version>2.4</version> @@ -81,6 +123,16 @@ the License. 
</build> <dependencies> <dependency> + <groupId>org.apache.avro</groupId> + <artifactId>avro</artifactId> + <version>1.7.7</version> + </dependency> + <dependency> + <groupId>org.apache.avro</groupId> + <artifactId>avro-ipc</artifactId> + <version>1.7.7</version> + </dependency> + <dependency> <groupId>org.apache.oodt</groupId> <artifactId>cas-metadata</artifactId> <version>${project.parent.version}</version> http://git-wip-us.apache.org/repos/asf/oodt/blob/287d4e89/resource/src/main/avro/types/AvroJob.avsc ---------------------------------------------------------------------- diff --git a/resource/src/main/avro/types/AvroJob.avsc b/resource/src/main/avro/types/AvroJob.avsc new file mode 100644 index 0000000..7efa842 --- /dev/null +++ b/resource/src/main/avro/types/AvroJob.avsc @@ -0,0 +1,17 @@ +{ + "type":"record", + "name":"AvroJob", + "default":null, + "namespace":"org.apache.oodt.cas.resource.structs.avrotypes", + "imports":[], + "fields":[ + {"name":"id","type":"string"}, + {"name":"name","type":"string"}, + {"name":"jobInstanceClassName","type":"string"}, + {"name":"jobInputClassName","type":"string"}, + {"name":"queueName","type":"string"}, + {"name":"loadValue","type":"int"}, + {"name":"status","type":"string"} + + ] +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/oodt/blob/287d4e89/resource/src/main/avro/types/AvroJobInput.avsc ---------------------------------------------------------------------- diff --git a/resource/src/main/avro/types/AvroJobInput.avsc b/resource/src/main/avro/types/AvroJobInput.avsc new file mode 100644 index 0000000..d915769 --- /dev/null +++ b/resource/src/main/avro/types/AvroJobInput.avsc @@ -0,0 +1,11 @@ +{ + "type":"record", + "name":"AvroJobInput", + "default":null, + "namespace":"org.apache.oodt.cas.resource.structs.avrotypes", + "imports":["AvroNameValueJobInput.avsc"], + "fields":[ + {"name":"className","type":"string"}, + {"name":"imple","type":["AvroNameValueJobInput","null"]} + ] +} \ No newline at 
end of file http://git-wip-us.apache.org/repos/asf/oodt/blob/287d4e89/resource/src/main/avro/types/AvroNameValueJobInput.avsc ---------------------------------------------------------------------- diff --git a/resource/src/main/avro/types/AvroNameValueJobInput.avsc b/resource/src/main/avro/types/AvroNameValueJobInput.avsc new file mode 100644 index 0000000..8a1607a --- /dev/null +++ b/resource/src/main/avro/types/AvroNameValueJobInput.avsc @@ -0,0 +1,10 @@ +{ + "type":"record", + "name":"AvroNameValueJobInput", + "default":null, + "namespace":"org.apache.oodt.cas.resource.structs.avrotypes", + "imports":[], + "fields":[ + {"name":"props","type":[{"type":"map","values":"string"},"null"]} + ] +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/oodt/blob/287d4e89/resource/src/main/avro/types/AvroResourceNode.avsc ---------------------------------------------------------------------- diff --git a/resource/src/main/avro/types/AvroResourceNode.avsc b/resource/src/main/avro/types/AvroResourceNode.avsc new file mode 100644 index 0000000..11509f3 --- /dev/null +++ b/resource/src/main/avro/types/AvroResourceNode.avsc @@ -0,0 +1,11 @@ +{ + "type":"record", + "name":"AvroResourceNode", + "default":null, + "namespace":"org.apache.oodt.cas.resource.structs.avrotypes", + "fields":[ + {"name":"nodeId","type":"string"}, + {"name":"ipAddr","type":"string"}, + {"name":"capacity","type":"int","default":0} + ] +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/oodt/blob/287d4e89/resource/src/main/avro/types/resource_manager_protocol.avdl ---------------------------------------------------------------------- diff --git a/resource/src/main/avro/types/resource_manager_protocol.avdl b/resource/src/main/avro/types/resource_manager_protocol.avdl new file mode 100644 index 0000000..8b3f43f --- /dev/null +++ b/resource/src/main/avro/types/resource_manager_protocol.avdl @@ -0,0 +1,53 @@ +@namespace("org.apache.oodt.cas.resource.structs.avrotypes") + 
+protocol ResourceManager { +import schema "AvroJob.avsc"; +import schema "AvroNameValueJobInput.avsc"; +import schema "AvroJobInput.avsc"; +import schema "AvroResourceNode.avsc"; + + boolean isJobComplete(string jobId); + + AvroJob getJobInfo(string jobId); + + boolean isAlive(); + + int getJobQueueSize(); + + int getJobQueueCapacity(); + + boolean killJob(string jobId); + + string getExecutionNode(string jobId); + + string handleJob(AvroJob exec, AvroJobInput into); + + boolean handleJobWithUrl(AvroJob exec, AvroJobInput in, string hostUrl); + + array<AvroResourceNode> getNodes(); + + AvroResourceNode getNodeById(string nodeId); + + boolean addQueue(string queueName); + + boolean removeQueue(string queueName); + + boolean addNode(AvroResourceNode node); + + boolean removeNode(string nodeId); + + boolean setNodeCapacity(string nodeId, int capacity); + + boolean addNodeToQueue(string nodeId, string queueName); + + boolean removeNodeFromQueue(string nodeId, string queueName); + + array<string> getQueues(); + + array<string> getNodesInQueue(string queueName); + + array<string> getQueuesWithNode(string nodeId); + + string getNodeLoad(string nodeId); + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/oodt/blob/287d4e89/resource/src/main/avro/types/tatchmgr_protocol.avdl ---------------------------------------------------------------------- diff --git a/resource/src/main/avro/types/tatchmgr_protocol.avdl b/resource/src/main/avro/types/tatchmgr_protocol.avdl new file mode 100644 index 0000000..3e424ff --- /dev/null +++ b/resource/src/main/avro/types/tatchmgr_protocol.avdl @@ -0,0 +1,27 @@ +@namespace("org.apache.oodt.cas.resource.structs.avrotypes") + +protocol AvroIntrBatchmgr { +import schema "AvroJob.avsc"; +import schema "AvroNameValueJobInput.avsc"; +import schema "AvroJobInput.avsc"; +import schema "AvroResourceNode.avsc"; + + boolean isAlive(); + + boolean executeJob(AvroJob avroJob, AvroJobInput jobInput); + +// public boolean 
executeJob(Hashtable jobHash, Date jobInput); +// +// public boolean executeJob(Hashtable jobHash, double jobInput); +// +// public boolean executeJob(Hashtable jobHash, int jobInput); +// +// public boolean executeJob(Hashtable jobHash, boolean jobInput); +// +// public boolean executeJob(Hashtable jobHash, Vector jobInput); +// +// public boolean executeJob(Hashtable jobHash, byte[] jobInput); + + boolean killJob(AvroJob jobHash); + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/oodt/blob/287d4e89/resource/src/main/java/org/apache/oodt/cas/resource/batchmgr/AvroRpcBatchMgr.java ---------------------------------------------------------------------- diff --git a/resource/src/main/java/org/apache/oodt/cas/resource/batchmgr/AvroRpcBatchMgr.java b/resource/src/main/java/org/apache/oodt/cas/resource/batchmgr/AvroRpcBatchMgr.java new file mode 100644 index 0000000..483754f --- /dev/null +++ b/resource/src/main/java/org/apache/oodt/cas/resource/batchmgr/AvroRpcBatchMgr.java @@ -0,0 +1,180 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.oodt.cas.resource.batchmgr; + +import org.apache.oodt.cas.resource.jobrepo.JobRepository; +import org.apache.oodt.cas.resource.monitor.Monitor; +import org.apache.oodt.cas.resource.structs.Job; +import org.apache.oodt.cas.resource.structs.JobSpec; +import org.apache.oodt.cas.resource.structs.JobStatus; +import org.apache.oodt.cas.resource.structs.ResourceNode; +import org.apache.oodt.cas.resource.structs.exceptions.JobExecutionException; +import org.apache.oodt.cas.resource.structs.exceptions.JobRepositoryException; +import org.apache.oodt.cas.resource.structs.exceptions.MonitorException; + +import java.util.HashMap; +import java.util.Map; +import java.util.logging.Level; +import java.util.logging.Logger; + +public class AvroRpcBatchMgr implements Batchmgr { + + /* our log stream */ + private static final Logger LOG = Logger.getLogger(XmlRpcBatchMgr.class + .getName()); + + private Monitor mon; + + private JobRepository repo; + + private Map nodeToJobMap; + + private Map specToProxyMap; + + public AvroRpcBatchMgr(){ + nodeToJobMap = new HashMap(); + specToProxyMap = new HashMap(); + } + + @Override + public boolean executeRemotely(JobSpec jobSpec, ResourceNode resNode) throws JobExecutionException { + AvroRpcBatchMgrProxy proxy = new AvroRpcBatchMgrProxy(jobSpec,resNode,this); + if (!proxy.nodeAlive()) { + throw new JobExecutionException("Node: [" + resNode.getNodeId() + + "] is down: Unable to execute job!"); + } + + synchronized (this.specToProxyMap) { + specToProxyMap.put(jobSpec.getJob().getId(), proxy); + } + + synchronized (this.nodeToJobMap) { + this.nodeToJobMap + .put(jobSpec.getJob().getId(), resNode.getNodeId()); + } + + proxy.start(); + + return true; + + } + + @Override + public void setMonitor(Monitor monitor) { + this.mon = monitor; + } + + @Override + public void setJobRepository(JobRepository repository) { + this.repo = repository; + } + + @Override + public String getExecutionNode(String jobId) { + return (String) 
nodeToJobMap.get(jobId); + } + + @Override + public boolean killJob(String jobId, ResourceNode node) { + JobSpec spec = null; + try { + spec = repo.getJobById(jobId); + } catch (Exception e) { + LOG.log(Level.WARNING, "Unable to get job by id: [" + jobId + + "] to kill it: Message: " + e.getMessage()); + return false; + } + + AvroRpcBatchMgrProxy proxy = new AvroRpcBatchMgrProxy(spec, node, this); + return proxy.killJob(); + } + + protected void notifyMonitor(ResourceNode node, JobSpec jobSpec) { + Job job = jobSpec.getJob(); + int reducedLoad = job.getLoadValue().intValue(); + try { + mon.reduceLoad(node, reducedLoad); + } catch (MonitorException e) { + } + } + + protected void jobSuccess(JobSpec spec) { + spec.getJob().setStatus(JobStatus.SUCCESS); + synchronized (this.nodeToJobMap) { + this.nodeToJobMap.remove(spec.getJob().getId()); + } + synchronized (this.specToProxyMap) { + XmlRpcBatchMgrProxy proxy = (XmlRpcBatchMgrProxy) this.specToProxyMap + .remove(spec.getJob().getId()); + if (proxy != null) { + proxy = null; + } + } + + try { + repo.updateJob(spec); + } catch (JobRepositoryException e) { + LOG.log(Level.WARNING, "Error set job completion status for job: [" + + spec.getJob().getId() + "]: Message: " + e.getMessage()); + } + } + + protected void jobFailure(JobSpec spec) { + spec.getJob().setStatus(JobStatus.FAILURE); + synchronized (this.nodeToJobMap) { + this.nodeToJobMap.remove(spec.getJob().getId()); + } + synchronized (this.specToProxyMap) { + XmlRpcBatchMgrProxy proxy = (XmlRpcBatchMgrProxy) this.specToProxyMap + .remove(spec.getJob().getId()); + if (proxy != null) { + proxy = null; + } + } + + try { + repo.updateJob(spec); + } catch (JobRepositoryException e) { + LOG.log(Level.WARNING, "Error set job completion status for job: [" + + spec.getJob().getId() + "]: Message: " + e.getMessage()); + } + } + + protected void jobKilled(JobSpec spec) { + spec.getJob().setStatus(JobStatus.KILLED); + nodeToJobMap.remove(spec.getJob().getId()); + try { + 
repo.updateJob(spec); + } catch (JobRepositoryException e) { + LOG.log(Level.WARNING, "Error setting job killed status for job: [" + + spec.getJob().getId() + "]: Message: " + e.getMessage()); + } + } + + protected void jobExecuting(JobSpec spec) { + spec.getJob().setStatus(JobStatus.EXECUTED); + try { + repo.updateJob(spec); + } catch (JobRepositoryException e) { + LOG.log(Level.WARNING, + "Error setting job execution status for job: [" + + spec.getJob().getId() + "]: Message: " + + e.getMessage()); + } + } +} http://git-wip-us.apache.org/repos/asf/oodt/blob/287d4e89/resource/src/main/java/org/apache/oodt/cas/resource/batchmgr/AvroRpcBatchMgrFactory.java ---------------------------------------------------------------------- diff --git a/resource/src/main/java/org/apache/oodt/cas/resource/batchmgr/AvroRpcBatchMgrFactory.java b/resource/src/main/java/org/apache/oodt/cas/resource/batchmgr/AvroRpcBatchMgrFactory.java new file mode 100644 index 0000000..fe00741 --- /dev/null +++ b/resource/src/main/java/org/apache/oodt/cas/resource/batchmgr/AvroRpcBatchMgrFactory.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.oodt.cas.resource.batchmgr; + +import org.apache.oodt.cas.resource.monitor.Monitor; + +public class AvroRpcBatchMgrFactory implements BatchmgrFactory { + + private Monitor mon = null; + + public AvroRpcBatchMgrFactory(){} + + public Batchmgr createBatchmgr() { + return new AvroRpcBatchMgr(); + } + +} http://git-wip-us.apache.org/repos/asf/oodt/blob/287d4e89/resource/src/main/java/org/apache/oodt/cas/resource/batchmgr/AvroRpcBatchMgrProxy.java ---------------------------------------------------------------------- diff --git a/resource/src/main/java/org/apache/oodt/cas/resource/batchmgr/AvroRpcBatchMgrProxy.java b/resource/src/main/java/org/apache/oodt/cas/resource/batchmgr/AvroRpcBatchMgrProxy.java new file mode 100644 index 0000000..98a717a --- /dev/null +++ b/resource/src/main/java/org/apache/oodt/cas/resource/batchmgr/AvroRpcBatchMgrProxy.java @@ -0,0 +1,135 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.oodt.cas.resource.batchmgr; + +import org.apache.avro.AvroRemoteException; +import org.apache.avro.ipc.NettyTransceiver; +import org.apache.avro.ipc.Transceiver; +import org.apache.avro.ipc.specific.SpecificRequestor; +import org.apache.oodt.cas.resource.structs.AvroTypeFactory; +import org.apache.oodt.cas.resource.structs.JobSpec; +import org.apache.oodt.cas.resource.structs.ResourceNode; +import org.apache.oodt.cas.resource.system.extern.AvroRpcBatchStub; +import org.apache.oodt.cas.resource.util.XmlRpcStructFactory; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.util.Vector; +import java.util.logging.Level; +import java.util.logging.Logger; + +public class AvroRpcBatchMgrProxy extends Thread implements Runnable { + + private static final Logger LOG = Logger.getLogger(XmlRpcBatchMgrProxy.class.getName()); + + private JobSpec jobSpec; + + private ResourceNode remoteHost; + + private Transceiver client; + + private AvroRpcBatchStub proxy; + + private AvroRpcBatchMgr parent; + + public AvroRpcBatchMgrProxy(JobSpec jobSpec, ResourceNode remoteHost, + AvroRpcBatchMgr par) { + this.jobSpec = jobSpec; + this.remoteHost = remoteHost; + this.parent = par; + } + + public boolean nodeAlive() { + + try { + this.client = new NettyTransceiver(new InetSocketAddress(remoteHost.getIpAddr().getPort())); + this.proxy = (AvroRpcBatchStub) SpecificRequestor.getClient(AvroRpcBatchStub.class, client); + } catch (IOException e) { + LOG.log(Level.SEVERE, "Failed connection with the server.", e); + } + + + + boolean alive = false; + + try { + alive = proxy.isAlive(); + } catch (AvroRemoteException e) { + alive = false; + } + return alive; + + } + + public boolean killJob() { + + try { + this.client = new NettyTransceiver(new InetSocketAddress(remoteHost.getIpAddr().getPort())); + this.proxy = (AvroRpcBatchStub) SpecificRequestor.getClient(AvroRpcBatchStub.class, client); + } catch (IOException e) { + LOG.log(Level.SEVERE, "Failed 
connection with the server.", e); + } + + + boolean result = false; + try { + result = proxy.killJob(AvroTypeFactory.getAvroJob(jobSpec.getJob())); + } catch (AvroRemoteException e) { + e.printStackTrace(); + result = false; + } + + if (result) { + parent.jobKilled(jobSpec); + } + + return result; + } + + public void run() { + try { + this.client = new NettyTransceiver(new InetSocketAddress(remoteHost.getIpAddr().getPort())); + this.proxy = (AvroRpcBatchStub) SpecificRequestor.getClient(AvroRpcBatchStub.class, client); + } catch (IOException e) { + LOG.log(Level.SEVERE, "Failed connection with the server.", e); + } + + boolean result = false; + try { + parent.jobExecuting(jobSpec); + result = proxy.executeJob(AvroTypeFactory.getAvroJob(jobSpec.getJob()), + AvroTypeFactory.getAvroJobInput(jobSpec.getIn())); + if (result) + parent.jobSuccess(jobSpec); + else + throw new Exception("batchstub.executeJob returned false"); + } catch (Exception e) { + LOG.log(Level.SEVERE, "Job execution failed for jobId '" + jobSpec.getJob().getId() + "' : " + e.getMessage(), e); + parent.jobFailure(jobSpec); + }finally { + parent.notifyMonitor(remoteHost, jobSpec); + } + + } + + + + + + +} http://git-wip-us.apache.org/repos/asf/oodt/blob/287d4e89/resource/src/main/java/org/apache/oodt/cas/resource/structs/AvroTypeFactory.java ---------------------------------------------------------------------- diff --git a/resource/src/main/java/org/apache/oodt/cas/resource/structs/AvroTypeFactory.java b/resource/src/main/java/org/apache/oodt/cas/resource/structs/AvroTypeFactory.java new file mode 100644 index 0000000..70a2d88 --- /dev/null +++ b/resource/src/main/java/org/apache/oodt/cas/resource/structs/AvroTypeFactory.java @@ -0,0 +1,168 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.oodt.cas.resource.structs; + +import org.apache.avro.ipc.Responder; +import org.apache.avro.reflect.AvroName; +import org.apache.oodt.cas.metadata.Metadata; +import org.apache.oodt.cas.resource.structs.avrotypes.*; +import org.apache.oodt.cas.resource.util.GenericResourceManagerObjectFactory; + +import java.net.MalformedURLException; +import java.net.URL; +import java.util.*; + +public class AvroTypeFactory { + + public static Job getJob(AvroJob avroJob) { + Job job = new Job(); + job.setId(avroJob.getId()); + job.setName(avroJob.getName()); + job.setJobInstanceClassName(avroJob.getJobInstanceClassName()); + job.setJobInputClassName(avroJob.getJobInputClassName()); + job.setQueueName(avroJob.getQueueName()); + job.setLoadValue(avroJob.getLoadValue()); + job.setStatus(avroJob.getStatus()); + + return job; + } + + public static AvroJob getAvroJob(Job job) { + AvroJob avroJob = new AvroJob(); + avroJob.setId(job.getId()); + avroJob.setName(job.getName()); + avroJob.setJobInstanceClassName(job.getJobInstanceClassName()); + avroJob.setJobInputClassName(job.getJobInputClassName()); + avroJob.setQueueName(job.getQueueName()); + avroJob.setLoadValue(avroJob.getLoadValue()); + avroJob.setStatus(avroJob.getStatus()); + + return avroJob; + } + + // + + public static JobInput getJobInput(AvroJobInput avroJobInput){ + JobInput jobInput = GenericResourceManagerObjectFactory + 
.getJobInputFromClassName(avroJobInput.getClassName()); + + return setJobInputInplementation(jobInput,avroJobInput); + } + + public static AvroJobInput getAvroJobInput(JobInput jobInput){ + AvroJobInput avroJobInput = new AvroJobInput(); + avroJobInput.setClassName(jobInput.getClass().getCanonicalName()); + + return setAvroJobInputInplementation(avroJobInput,jobInput); + } + + private static JobInput setJobInputInplementation(JobInput jobInput,AvroJobInput avroJobInput){ + + if(jobInput instanceof NameValueJobInput){ + NameValueJobInput nameValueJobInput = (NameValueJobInput)jobInput; + AvroNameValueJobInput avroNameValueJobInput = (AvroNameValueJobInput) avroJobInput.getImple(); + setPropertiesToNameValueJobInput(getHashtable(avroNameValueJobInput.getProps()), nameValueJobInput); + return nameValueJobInput; + } + + return jobInput; + } + + private static NameValueJobInput setPropertiesToNameValueJobInput(Hashtable hashProp, NameValueJobInput nameValueJobInput){ + for (Object key : hashProp.keySet()){ + nameValueJobInput.setNameValuePair((String)key,(String)hashProp.get(key)); + } + return nameValueJobInput; + } + + + + private static AvroJobInput setAvroJobInputInplementation(AvroJobInput avroJobInput,JobInput jobInput){ + + if (jobInput instanceof NameValueJobInput){ + NameValueJobInput nameValueJobInput = (NameValueJobInput) jobInput; + + AvroNameValueJobInput avroNameValueJobInput = new AvroNameValueJobInput(); + avroNameValueJobInput.setProps(getMap(nameValueJobInput.getProps())); + avroJobInput.setImple(avroNameValueJobInput); + return avroJobInput; + } + return avroJobInput; + } + + private static Hashtable getHashtable(Map<String,String> map){ + Hashtable hashtable = new Hashtable(); + + for (String s : map.keySet()){ + hashtable.put(s,map.get(s)); + } + return hashtable; + } + + private static Map<String,String> getMap(Hashtable hashtable){ + Map<String,String> map = new HashMap<String, String>(); + for (Object o : hashtable.keySet()){ + 
map.put((String)o,(String)hashtable.get(o)); + } + return map; + } + + // + + public static ResourceNode getResourceNode(AvroResourceNode avroResourceNode){ + ResourceNode resourceNode = new ResourceNode(); + resourceNode.setId(avroResourceNode.getNodeId()); + try { + resourceNode.setIpAddr(new URL(avroResourceNode.getIpAddr())); + } catch (MalformedURLException e) { + e.printStackTrace(); + } + resourceNode.setCapacity(avroResourceNode.getCapacity()); + return resourceNode; + } + + public static AvroResourceNode getAvroResourceNode(ResourceNode resourceNode){ + AvroResourceNode avroResourceNode = new AvroResourceNode(); + avroResourceNode.setNodeId(resourceNode.getNodeId()); + avroResourceNode.setIpAddr(resourceNode.getIpAddr().toString()); + avroResourceNode.setCapacity(resourceNode.getCapacity()); + return avroResourceNode; + } + + + public static List<AvroResourceNode> getListAvroResourceNode(List<ResourceNode> resourceNodes){ + List<AvroResourceNode> avroResourceNodes = new ArrayList<AvroResourceNode>(); + + for (ResourceNode rn : resourceNodes){ + avroResourceNodes.add(getAvroResourceNode(rn)); + } + return avroResourceNodes; + } + + public static List<ResourceNode> getListResourceNode(List<AvroResourceNode> avroResourceNodes){ + List<ResourceNode> resourceNodes = new ArrayList<ResourceNode>(); + + for (AvroResourceNode arn : avroResourceNodes){ + resourceNodes.add(getResourceNode(arn)); + } + + return resourceNodes; + } + + +} http://git-wip-us.apache.org/repos/asf/oodt/blob/287d4e89/resource/src/main/java/org/apache/oodt/cas/resource/structs/NameValueJobInput.java ---------------------------------------------------------------------- diff --git a/resource/src/main/java/org/apache/oodt/cas/resource/structs/NameValueJobInput.java b/resource/src/main/java/org/apache/oodt/cas/resource/structs/NameValueJobInput.java index a7fcb7a..c3cc6fc 100644 --- a/resource/src/main/java/org/apache/oodt/cas/resource/structs/NameValueJobInput.java +++ 
b/resource/src/main/java/org/apache/oodt/cas/resource/structs/NameValueJobInput.java @@ -114,4 +114,8 @@ public class NameValueJobInput implements JobInput { } } + public Properties getProps(){ + return this.props; + } + } http://git-wip-us.apache.org/repos/asf/oodt/blob/287d4e89/resource/src/main/java/org/apache/oodt/cas/resource/system/AvroRpcResourceManager.java ---------------------------------------------------------------------- diff --git a/resource/src/main/java/org/apache/oodt/cas/resource/system/AvroRpcResourceManager.java b/resource/src/main/java/org/apache/oodt/cas/resource/system/AvroRpcResourceManager.java new file mode 100644 index 0000000..47ea2df --- /dev/null +++ b/resource/src/main/java/org/apache/oodt/cas/resource/system/AvroRpcResourceManager.java @@ -0,0 +1,425 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.oodt.cas.resource.system; + +import org.apache.avro.AvroRemoteException; +import org.apache.avro.ipc.NettyServer; +import org.apache.avro.ipc.Server; +import org.apache.avro.ipc.specific.SpecificResponder; +import org.apache.oodt.cas.resource.scheduler.Scheduler; +import org.apache.oodt.cas.resource.structs.*; +import org.apache.oodt.cas.resource.structs.avrotypes.*; +import org.apache.oodt.cas.resource.structs.exceptions.*; +import org.apache.oodt.cas.resource.util.GenericResourceManagerObjectFactory; +import org.apache.oodt.cas.resource.util.XmlRpcStructFactory; +import org.apache.xmlrpc.WebServer; + +import java.io.File; +import java.io.FileInputStream; +import java.net.InetSocketAddress; +import java.net.MalformedURLException; +import java.net.URL; +import java.util.Hashtable; +import java.util.List; +import java.util.Vector; +import java.util.logging.Level; +import java.util.logging.Logger; + +public class AvroRpcResourceManager implements org.apache.oodt.cas.resource.structs.avrotypes.ResourceManager, ResourceManager{ + + private int port = 2000; + + private Logger LOG = Logger + .getLogger(XmlRpcResourceManager.class.getName()); + + private Server server; + + /* our scheduler */ + private Scheduler scheduler = null; + + public AvroRpcResourceManager(int port) throws Exception{ + // load properties from workflow manager properties file, if specified + if (System.getProperty("org.apache.oodt.cas.resource.properties") != null) { + String configFile = System + .getProperty("org.apache.oodt.cas.resource.properties"); + LOG.log(Level.INFO, + "Loading Resource Manager Configuration Properties from: [" + + configFile + "]"); + System.getProperties().load( + new FileInputStream(new File(configFile))); + } + + String schedulerClassStr = System.getProperty( + "resource.scheduler.factory", + "org.apache.oodt.cas.resource.scheduler.LRUSchedulerFactory"); + + scheduler = GenericResourceManagerObjectFactory + 
.getSchedulerServiceFromFactory(schedulerClassStr); + + // start up the scheduler + new Thread(scheduler).start(); + + this.port = port; + + // start up the web server + server = new NettyServer(new SpecificResponder(org.apache.oodt.cas.resource.structs.avrotypes.ResourceManager.class,this), + new InetSocketAddress(this.port)); + server.start(); + + LOG.log(Level.INFO, "Resource Manager started by " + + System.getProperty("user.name", "unknown")); + + } + + @Override + public boolean isAlive() throws AvroRemoteException { + return true; + } + + @Override + public int getJobQueueSize() throws AvroRemoteException { + try { + return this.scheduler.getJobQueue().getSize(); + }catch (Exception e) { + throw new AvroRemoteException(new JobRepositoryException("Failed to get size of JobQueue : " + e.getMessage(), e)); + } + } + + + @Override + public int getJobQueueCapacity() throws AvroRemoteException { + try { + return this.scheduler.getJobQueue().getCapacity(); + }catch (Exception e) { + throw new AvroRemoteException(new JobRepositoryException("Failed to get capacity of JobQueue : " + e.getMessage(), e)); + } + } + + @Override + public boolean isJobComplete(String jobId) throws AvroRemoteException { + try { + JobSpec spec = scheduler.getJobQueue().getJobRepository().getJobById( + jobId); + return scheduler.getJobQueue().getJobRepository().jobFinished(spec); + + } catch(JobRepositoryException e ){ + throw new AvroRemoteException(e); + } + } + @Override + public AvroJob getJobInfo(String jobId) throws AvroRemoteException { + JobSpec spec = null; + + try { + spec = scheduler.getJobQueue().getJobRepository() + .getJobById(jobId); + } catch (JobRepositoryException e) { + LOG.log(Level.WARNING, + "Exception communicating with job repository for job: [" + + jobId + "]: Message: " + e.getMessage()); + throw new AvroRemoteException(new JobRepositoryException("Unable to get job: [" + jobId + + "] from repository!")); + } + + return AvroTypeFactory.getAvroJob(spec.getJob()); + + } 
+ + @Override + public String handleJob(AvroJob exec, AvroJobInput into) throws AvroRemoteException { + try { + return genericHandleJob(exec, into); + } catch (SchedulerException e) { + throw new AvroRemoteException(e); + } + } + + @Override + public boolean handleJobWithUrl(AvroJob exec, AvroJobInput in, String hostUrl) throws AvroRemoteException { + try { + return genericHandleJob(exec,in,hostUrl); + } catch (JobExecutionException e) { + throw new AvroRemoteException(e); + } + } + + @Override + public List<AvroResourceNode> getNodes() throws AvroRemoteException { + + List resNodes = null; + try { + resNodes = scheduler.getMonitor().getNodes(); + } catch (MonitorException e) { + throw new AvroRemoteException(e); + } + + return AvroTypeFactory.getListAvroResourceNode(resNodes); + } + + @Override + public AvroResourceNode getNodeById(String nodeId) throws AvroRemoteException { + ResourceNode node = null; + try { + node = scheduler.getMonitor().getNodeById(nodeId); + } catch (MonitorException e) { + throw new AvroRemoteException(e); + } + return AvroTypeFactory.getAvroResourceNode(node); + } + + @Override + public boolean killJob(String jobId) throws AvroRemoteException { + String resNodeId = scheduler.getBatchmgr().getExecutionNode(jobId); + if (resNodeId == null) { + LOG.log(Level.WARNING, "Attempt to kill job: [" + jobId + + "]: cannot find execution node" + + " (has the job already finished?)"); + return false; + } + ResourceNode node = null; + try { + node = scheduler.getMonitor().getNodeById(resNodeId); + } catch (MonitorException e) { + throw new AvroRemoteException(e); + } + return scheduler.getBatchmgr().killJob(jobId, node); + + } + + @Override + public String getExecutionNode(String jobId) throws AvroRemoteException { + String execNode = scheduler.getBatchmgr().getExecutionNode(jobId); + if (execNode == null) { + LOG.log(Level.WARNING, "Job: [" + jobId + + "] not currently executing on any known node"); + return ""; + } else + return execNode; + } + + 
@Override + public List<String> getQueues() throws AvroRemoteException { + try { + return this.scheduler.getQueueManager().getQueues(); + } catch (QueueManagerException e) { + throw new AvroRemoteException(e); + } + } + + @Override + public boolean addQueue(String queueName) throws AvroRemoteException { + try { + this.scheduler.getQueueManager().addQueue(queueName); + } catch (QueueManagerException e) { + e.printStackTrace(); + } + return true; + + } + + @Override + public boolean removeQueue(String queueName) throws AvroRemoteException { + try { + this.scheduler.getQueueManager().removeQueue(queueName); + } catch (QueueManagerException e) { + throw new AvroRemoteException(e); + } + return true; + + } + + @Override + public boolean addNode(AvroResourceNode node) throws AvroRemoteException { + try { + this.scheduler.getMonitor().addNode(AvroTypeFactory.getResourceNode(node)); + } catch (MonitorException e) { + throw new AvroRemoteException(e); + } + return true; + } + + @Override + public boolean removeNode(String nodeId) throws AvroRemoteException { + try{ + for(String queueName: this.getQueuesWithNode(nodeId)){ + this.removeNodeFromQueue(nodeId, queueName); + } + this.scheduler.getMonitor().removeNodeById(nodeId); + }catch(Exception e){ + throw new AvroRemoteException(new MonitorException(e.getMessage(), e)); + } + + return true; + } + + @Override + public boolean addNodeToQueue(String nodeId, String queueName) throws AvroRemoteException { + try { + this.scheduler.getQueueManager().addNodeToQueue(nodeId, queueName); + } catch (QueueManagerException e) { + throw new AvroRemoteException(e); + } + return true; + + } + + @Override + public boolean removeNodeFromQueue(String nodeId, String queueName) throws AvroRemoteException { + try { + this.scheduler.getQueueManager().removeNodeFromQueue(nodeId, queueName); + } catch (QueueManagerException e) { + throw new AvroRemoteException(e); + } + return true; + + } + + @Override + public List<String> getNodesInQueue(String 
queueName) throws AvroRemoteException { + try { + return this.scheduler.getQueueManager().getNodes(queueName); + } catch (QueueManagerException e) { + throw new AvroRemoteException(e); + } + } + + @Override + public List<String> getQueuesWithNode(String nodeId) throws AvroRemoteException { + try { + return this.scheduler.getQueueManager().getQueues(nodeId); + } catch (QueueManagerException e) { + throw new AvroRemoteException(e); + } + } + + public boolean shutdown(){ + if (this.server != null) { + this.server.close(); + this.server = null; + return true; + } else + return false; + } + + @Override + public String getNodeLoad(String nodeId) throws AvroRemoteException { + ResourceNode node = null; + try { + node = this.scheduler.getMonitor().getNodeById(nodeId); + int capacity = node.getCapacity(); + int load = (this.scheduler.getMonitor().getLoad(node)) * -1 + capacity; + return load + "/" + capacity; + } catch (MonitorException e) { + throw new AvroRemoteException(e); + } + } + + public static void main(String[] args) throws Exception { + int portNum = -1; + String usage = "AvroRpcResourceManager --portNum <port number for xml rpc service>\n"; + + for (int i = 0; i < args.length; i++) { + if (args[i].equals("--portNum")) { + portNum = Integer.parseInt(args[++i]); + } + } + + if (portNum == -1) { + System.err.println(usage); + System.exit(1); + } + + AvroRpcResourceManager manager = new AvroRpcResourceManager(portNum); + + for (;;) + try { + Thread.currentThread().join(); + } catch (InterruptedException ignore) { + } + } + + + @Override + public boolean setNodeCapacity(String nodeId, int capacity) throws AvroRemoteException { + try{ + this.scheduler.getMonitor().getNodeById(nodeId).setCapacity(capacity); + }catch (MonitorException e){ + LOG.log(Level.WARNING, "Exception setting capacity on node " + + nodeId + ": " + e.getMessage()); + return false; + } + return true; + + } + + + private String genericHandleJob(AvroJob avroJob, AvroJobInput avroJobInput) + throws 
SchedulerException { + + Job exec = AvroTypeFactory.getJob(avroJob); + JobInput in = AvroTypeFactory.getJobInput(avroJobInput); + JobSpec spec = new JobSpec(in, exec); + + // queue the job up + String jobId = null; + + try { + jobId = scheduler.getJobQueue().addJob(spec); + } catch (JobQueueException e) { + LOG.log(Level.WARNING, "JobQueue exception adding job: Message: " + + e.getMessage()); + throw new SchedulerException(e.getMessage()); + } + return jobId; + } + + private boolean genericHandleJob(AvroJob avroJob, AvroJobInput avroJobInput, + String urlStr) throws JobExecutionException { + Job exec = AvroTypeFactory.getJob(avroJob); + JobInput in = AvroTypeFactory.getJobInput(avroJobInput); + + JobSpec spec = new JobSpec(in, exec); + + URL remoteUrl = safeGetUrlFromString(urlStr); + ResourceNode remoteNode = null; + + try { + remoteNode = scheduler.getMonitor().getNodeByURL(remoteUrl); + } catch (MonitorException e) { + } + + if (remoteNode != null) { + return scheduler.getBatchmgr().executeRemotely(spec, remoteNode); + } else + return false; + } + + private URL safeGetUrlFromString(String urlStr) { + URL url = null; + + try { + url = new URL(urlStr); + } catch (MalformedURLException e) { + LOG.log(Level.WARNING, "Error converting string: [" + urlStr + + "] to URL object: Message: " + e.getMessage()); + } + + return url; + } + +} http://git-wip-us.apache.org/repos/asf/oodt/blob/287d4e89/resource/src/main/java/org/apache/oodt/cas/resource/system/AvroRpcResourceManagerClient.java ---------------------------------------------------------------------- diff --git a/resource/src/main/java/org/apache/oodt/cas/resource/system/AvroRpcResourceManagerClient.java b/resource/src/main/java/org/apache/oodt/cas/resource/system/AvroRpcResourceManagerClient.java new file mode 100644 index 0000000..fa0e84b --- /dev/null +++ b/resource/src/main/java/org/apache/oodt/cas/resource/system/AvroRpcResourceManagerClient.java @@ -0,0 +1,305 @@ +/* + * Licensed to the Apache Software 
Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.oodt.cas.resource.system; + +import org.apache.avro.AvroRemoteException; +import org.apache.avro.ipc.NettyTransceiver; +import org.apache.avro.ipc.Transceiver; +import org.apache.avro.ipc.specific.SpecificRequestor; +import org.apache.oodt.cas.cli.CmdLineUtility; +import org.apache.oodt.cas.resource.structs.AvroTypeFactory; +import org.apache.oodt.cas.resource.structs.Job; +import org.apache.oodt.cas.resource.structs.JobInput; +import org.apache.oodt.cas.resource.structs.ResourceNode; +import org.apache.oodt.cas.resource.structs.exceptions.JobExecutionException; +import org.apache.oodt.cas.resource.structs.exceptions.JobRepositoryException; +import org.apache.oodt.cas.resource.structs.exceptions.MonitorException; +import org.apache.oodt.cas.resource.structs.exceptions.QueueManagerException; +import org.apache.oodt.cas.resource.structs.avrotypes.ResourceManager; + +import java.io.File; +import java.io.FileInputStream; +import java.io.IOException; +import java.net.InetSocketAddress; +import java.net.URL; +import java.util.List; +import java.util.logging.Level; +import java.util.logging.Logger; + +public class AvroRpcResourceManagerClient implements ResourceManagerClient { + + /* our log 
stream */ + private static Logger LOG = Logger + .getLogger(XmlRpcResourceManagerClient.class.getName()); + + /* resource manager url */ + private URL resMgrUrl = null; + + Transceiver client; + ResourceManager proxy; + + public AvroRpcResourceManagerClient(URL url) { + // set up the configuration, if there is any + if (System.getProperty("org.apache.oodt.cas.resource.properties") != null) { + String configFile = System + .getProperty("org.apache.oodt.cas.resource.properties"); + LOG.log(Level.INFO, + "Loading Resource Manager Configuration Properties from: [" + + configFile + "]"); + try { + System.getProperties().load( + new FileInputStream(new File(configFile))); + } catch (Exception e) { + LOG.log(Level.INFO, + "Error loading configuration properties from: [" + + configFile + "]"); + } + } + + try { + this.client = new NettyTransceiver(new InetSocketAddress(url.getPort())); + proxy = (ResourceManager) SpecificRequestor.getClient(ResourceManager.class, client); + } catch (IOException e) { + e.printStackTrace(); + } + + } + + public static void main(String[] args) { + CmdLineUtility cmdLineUtility = new CmdLineUtility(); + cmdLineUtility.run(args); + } + + + @Override + public boolean isJobComplete(String jobId) throws JobRepositoryException { + try { + return proxy.isJobComplete(jobId); + } catch (AvroRemoteException e) { + throw new JobRepositoryException(e); + } + } + + @Override + public Job getJobInfo(String jobId) throws JobRepositoryException { + try { + return AvroTypeFactory.getJob(proxy.getJobInfo(jobId)); + } catch (AvroRemoteException e) { + throw new JobRepositoryException(e); + } + } + + @Override + public boolean isAlive() { + try { + return proxy.isAlive(); + } catch (AvroRemoteException e) { + e.printStackTrace(); + } + return false; + } + + @Override + public int getJobQueueSize() throws JobRepositoryException { + try { + return proxy.getJobQueueSize(); + } catch (AvroRemoteException e) { + throw new JobRepositoryException(e); + } + } + + 
@Override + public int getJobQueueCapacity() throws JobRepositoryException { + try { + return proxy.getJobQueueCapacity(); + } catch (AvroRemoteException e) { + throw new JobRepositoryException(e); + } + } + + @Override + public boolean killJob(String jobId) { + try { + return proxy.killJob(jobId); + } catch (AvroRemoteException e) { + LOG.log(Level.SEVERE, + "Server error!"); + } + return false; + } + + @Override + public String getExecutionNode(String jobId) { + try { + return proxy.getExecutionNode(jobId); + } catch (AvroRemoteException e) { + LOG.log(Level.SEVERE, + "Server error!"); + } + return null; + } + + @Override + public String submitJob(Job exec, JobInput in) throws JobExecutionException { + try { + return proxy.handleJob(AvroTypeFactory.getAvroJob(exec),AvroTypeFactory.getAvroJobInput(in)); + } catch (AvroRemoteException e) { + LOG.log(Level.SEVERE, + "Server error!"); + + } + return null; + } + + @Override + public boolean submitJob(Job exec, JobInput in, URL hostUrl) throws JobExecutionException { + try { + return proxy.handleJobWithUrl(AvroTypeFactory.getAvroJob(exec), AvroTypeFactory.getAvroJobInput(in), hostUrl.toString()); + } catch (AvroRemoteException e) { + throw new JobExecutionException(e); + } + } + + @Override + public List getNodes() throws MonitorException { + try { + return AvroTypeFactory.getListResourceNode(proxy.getNodes()); + } catch (AvroRemoteException e) { + throw new MonitorException(e); + } + } + + @Override + public ResourceNode getNodeById(String nodeId) throws MonitorException { + try { + return AvroTypeFactory.getResourceNode(proxy.getNodeById(nodeId)); + } catch (AvroRemoteException e) { + throw new MonitorException(e); + } + } + + @Override + public URL getResMgrUrl() { + return this.resMgrUrl; + } + + @Override + public void setResMgrUrl(URL resMgrUrl) { + this.resMgrUrl = resMgrUrl; + } + + @Override + public void addQueue(String queueName) throws QueueManagerException { + try { + proxy.addQueue(queueName); + } catch 
(AvroRemoteException e) { + throw new QueueManagerException(e); + } + } + + @Override + public void removeQueue(String queueName) throws QueueManagerException { + try { + proxy.removeQueue(queueName); + } catch (AvroRemoteException e) { + throw new QueueManagerException(e); + } + + } + + @Override + public void addNode(ResourceNode node) throws MonitorException { + try { + proxy.addNode(AvroTypeFactory.getAvroResourceNode(node)); + } catch (AvroRemoteException e) { + throw new MonitorException(e); + } + } + + @Override + public void removeNode(String nodeId) throws MonitorException { + try { + proxy.removeNode(nodeId); + } catch (AvroRemoteException e) { + throw new MonitorException(e); + } + } + + @Override + public void setNodeCapacity(String nodeId, int capacity) throws MonitorException { + try { + proxy.setNodeCapacity(nodeId,capacity); + } catch (AvroRemoteException e) { + throw new MonitorException(e); + } + } + + @Override + public void addNodeToQueue(String nodeId, String queueName) throws QueueManagerException { + try { + proxy.addNodeToQueue(nodeId,queueName); + } catch (AvroRemoteException e) { + throw new QueueManagerException(e); + } + } + + @Override + public void removeNodeFromQueue(String nodeId, String queueName) throws QueueManagerException { + try { + proxy.removeNodeFromQueue(nodeId,queueName); + } catch (AvroRemoteException e) { + throw new QueueManagerException(e); + } + } + + @Override + public List<String> getQueues() throws QueueManagerException { + try { + return proxy.getQueues(); + } catch (AvroRemoteException e) { + throw new QueueManagerException(e); + } + } + + @Override + public List<String> getNodesInQueue(String queueName) throws QueueManagerException { + try { + return proxy.getNodesInQueue(queueName); + } catch (AvroRemoteException e) { + throw new QueueManagerException(e); + } + } + + @Override + public List<String> getQueuesWithNode(String nodeId) throws QueueManagerException { + try { + return proxy.getQueuesWithNode(nodeId); 
+ } catch (AvroRemoteException e) { + throw new QueueManagerException(e); + } + } + + @Override + public String getNodeLoad(String nodeId) throws MonitorException { + try { + return proxy.getNodeLoad(nodeId); + } catch (AvroRemoteException e) { + throw new MonitorException(e); + } + } +} http://git-wip-us.apache.org/repos/asf/oodt/blob/287d4e89/resource/src/main/java/org/apache/oodt/cas/resource/system/ResourceManager.java ---------------------------------------------------------------------- diff --git a/resource/src/main/java/org/apache/oodt/cas/resource/system/ResourceManager.java b/resource/src/main/java/org/apache/oodt/cas/resource/system/ResourceManager.java new file mode 100644 index 0000000..5cbf6d3 --- /dev/null +++ b/resource/src/main/java/org/apache/oodt/cas/resource/system/ResourceManager.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.oodt.cas.resource.system; + +import org.apache.oodt.cas.resource.structs.exceptions.*; + +import java.util.Date; +import java.util.Hashtable; +import java.util.List; +import java.util.Vector; + +public interface ResourceManager { + + boolean shutdown(); + +} http://git-wip-us.apache.org/repos/asf/oodt/blob/287d4e89/resource/src/main/java/org/apache/oodt/cas/resource/system/ResourceManagerClient.java ---------------------------------------------------------------------- diff --git a/resource/src/main/java/org/apache/oodt/cas/resource/system/ResourceManagerClient.java b/resource/src/main/java/org/apache/oodt/cas/resource/system/ResourceManagerClient.java new file mode 100644 index 0000000..dd4444b --- /dev/null +++ b/resource/src/main/java/org/apache/oodt/cas/resource/system/ResourceManagerClient.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.oodt.cas.resource.system; + +import org.apache.oodt.cas.resource.structs.Job; +import org.apache.oodt.cas.resource.structs.JobInput; +import org.apache.oodt.cas.resource.structs.ResourceNode; +import org.apache.oodt.cas.resource.structs.exceptions.JobExecutionException; +import org.apache.oodt.cas.resource.structs.exceptions.JobRepositoryException; +import org.apache.oodt.cas.resource.structs.exceptions.MonitorException; +import org.apache.oodt.cas.resource.structs.exceptions.QueueManagerException; + +import java.net.URL; +import java.util.List; + +public interface ResourceManagerClient { + boolean isJobComplete(String jobId) throws JobRepositoryException; + + Job getJobInfo(String jobId) throws JobRepositoryException; + + boolean isAlive(); + + int getJobQueueSize() throws JobRepositoryException; + + int getJobQueueCapacity() throws JobRepositoryException; + + boolean killJob(String jobId); + + String getExecutionNode(String jobId); + + String submitJob(Job exec, JobInput in) throws JobExecutionException; + + boolean submitJob(Job exec, JobInput in, URL hostUrl) + throws JobExecutionException; + + List getNodes() throws MonitorException; + + ResourceNode getNodeById(String nodeId) throws MonitorException; + + URL getResMgrUrl(); + + void setResMgrUrl(URL resMgrUrl); + + void addQueue(String queueName) throws QueueManagerException; + + void removeQueue(String queueName) throws QueueManagerException; + + void addNode(ResourceNode node) throws MonitorException; + + void removeNode(String nodeId) throws MonitorException; + + void setNodeCapacity(String nodeId, int capacity) throws MonitorException; + + void addNodeToQueue(String nodeId, String queueName) throws QueueManagerException; + + void removeNodeFromQueue(String nodeId, String queueName) throws QueueManagerException; + + List<String> getQueues() throws QueueManagerException; + + List<String> getNodesInQueue(String queueName) throws QueueManagerException; + + List<String> 
getQueuesWithNode(String nodeId) throws QueueManagerException; + + String getNodeLoad(String nodeId) throws MonitorException; +} http://git-wip-us.apache.org/repos/asf/oodt/blob/287d4e89/resource/src/main/java/org/apache/oodt/cas/resource/system/XmlRpcResourceManagerClient.java ---------------------------------------------------------------------- diff --git a/resource/src/main/java/org/apache/oodt/cas/resource/system/XmlRpcResourceManagerClient.java b/resource/src/main/java/org/apache/oodt/cas/resource/system/XmlRpcResourceManagerClient.java index 0fb0520..9110807 100644 --- a/resource/src/main/java/org/apache/oodt/cas/resource/system/XmlRpcResourceManagerClient.java +++ b/resource/src/main/java/org/apache/oodt/cas/resource/system/XmlRpcResourceManagerClient.java @@ -56,7 +56,7 @@ import java.io.IOException; * </p> * */ -public class XmlRpcResourceManagerClient { +public class XmlRpcResourceManagerClient implements ResourceManagerClient { /* our xml rpc client */ private XmlRpcClient client = null; @@ -119,6 +119,7 @@ public class XmlRpcResourceManagerClient { cmdLineUtility.run(args); } + @Override public boolean isJobComplete(String jobId) throws JobRepositoryException { Vector argList = new Vector(); argList.add(jobId); @@ -137,6 +138,7 @@ public class XmlRpcResourceManagerClient { return complete; } + @Override public Job getJobInfo(String jobId) throws JobRepositoryException { Vector argList = new Vector(); argList.add(jobId); @@ -155,6 +157,7 @@ public class XmlRpcResourceManagerClient { return XmlRpcStructFactory.getJobFromXmlRpc(jobHash); } + @Override public boolean isAlive() { Vector argList = new Vector(); @@ -174,6 +177,7 @@ public class XmlRpcResourceManagerClient { * @return Number of Jobs in JobQueue * @throws JobRepositoryException On Any Exception */ + @Override public int getJobQueueSize() throws JobRepositoryException { try { Vector argList = new Vector(); @@ -188,6 +192,7 @@ public class XmlRpcResourceManagerClient { * @return Max number of 
Jobs * @throws JobRepositoryException On Any Exception */ + @Override public int getJobQueueCapacity() throws JobRepositoryException { try { Vector argList = new Vector(); @@ -197,6 +202,7 @@ public class XmlRpcResourceManagerClient { } } + @Override public boolean killJob(String jobId) { Vector argList = new Vector(); argList.add(jobId); @@ -211,6 +217,7 @@ public class XmlRpcResourceManagerClient { } } + @Override public String getExecutionNode(String jobId) { Vector argList = new Vector(); argList.add(jobId); @@ -224,6 +231,7 @@ public class XmlRpcResourceManagerClient { } } + @Override public String submitJob(Job exec, JobInput in) throws JobExecutionException { Vector argList = new Vector(); argList.add(XmlRpcStructFactory.getXmlRpcJob(exec)); @@ -245,6 +253,7 @@ public class XmlRpcResourceManagerClient { } + @Override public boolean submitJob(Job exec, JobInput in, URL hostUrl) throws JobExecutionException { Vector argList = new Vector(); @@ -267,6 +276,7 @@ public class XmlRpcResourceManagerClient { } + @Override public List getNodes() throws MonitorException { Vector argList = new Vector(); @@ -285,6 +295,7 @@ public class XmlRpcResourceManagerClient { } + @Override public ResourceNode getNodeById(String nodeId) throws MonitorException { Vector argList = new Vector(); argList.add(nodeId); @@ -307,6 +318,7 @@ public class XmlRpcResourceManagerClient { /** * @return the resMgrUrl */ + @Override public URL getResMgrUrl() { return resMgrUrl; } @@ -315,6 +327,7 @@ public class XmlRpcResourceManagerClient { * @param resMgrUrl * the resMgrUrl to set */ + @Override public void setResMgrUrl(URL resMgrUrl) { this.resMgrUrl = resMgrUrl; } @@ -324,6 +337,7 @@ public class XmlRpcResourceManagerClient { * @param queueName The name of the queue to be created * @throws QueueManagerException on any error */ + @Override public void addQueue(String queueName) throws QueueManagerException { try { Vector<Object> argList = new Vector<Object>(); @@ -339,6 +353,7 @@ public class 
XmlRpcResourceManagerClient { * @param queueName The name of the queue to be removed * @throws QueueManagerException on any error */ + @Override public void removeQueue(String queueName) throws QueueManagerException { try { Vector<Object> argList = new Vector<Object>(); @@ -354,6 +369,7 @@ public class XmlRpcResourceManagerClient { * @param node The node to be added * @throws MonitorException on any error */ + @Override public void addNode(ResourceNode node) throws MonitorException { try { Vector<Object> argList = new Vector<Object>(); @@ -369,6 +385,7 @@ public class XmlRpcResourceManagerClient { * @param nodeId The id of the node to be removed * @throws MonitorException on any error */ + @Override public void removeNode(String nodeId) throws MonitorException { try { Vector<Object> argList = new Vector<Object>(); @@ -379,6 +396,7 @@ public class XmlRpcResourceManagerClient { } } + @Override public void setNodeCapacity(String nodeId, int capacity) throws MonitorException{ try{ Vector<Object> argList = new Vector<Object>(); @@ -396,6 +414,7 @@ public class XmlRpcResourceManagerClient { * @param queueName The name of the queue to add the given node * @throws QueueManagerException on any error */ + @Override public void addNodeToQueue(String nodeId, String queueName) throws QueueManagerException { try { Vector<Object> argList = new Vector<Object>(); @@ -413,6 +432,7 @@ public class XmlRpcResourceManagerClient { * @param queueName The name of the queue from which to remove the given node * @throws QueueManagerException on any error */ + @Override public void removeNodeFromQueue(String nodeId, String queueName) throws QueueManagerException { try { Vector<Object> argList = new Vector<Object>(); @@ -429,6 +449,7 @@ public class XmlRpcResourceManagerClient { * @return A list of currently supported queue names * @throws QueueManagerException on any error */ + @Override public List<String> getQueues() throws QueueManagerException { try { Vector<Object> argList = new 
Vector<Object>(); @@ -444,6 +465,7 @@ public class XmlRpcResourceManagerClient { * @return List of node ids in the given queueName * @throws QueueManagerException on any error */ + @Override public List<String> getNodesInQueue(String queueName) throws QueueManagerException { try { Vector<Object> argList = new Vector<Object>(); @@ -460,6 +482,7 @@ public class XmlRpcResourceManagerClient { * @return List of queues which contain the give node * @throws QueueManagerException on any error */ + @Override public List<String> getQueuesWithNode(String nodeId) throws QueueManagerException { try { Vector<Object> argList = new Vector<Object>(); @@ -476,6 +499,7 @@ public class XmlRpcResourceManagerClient { * @return A String showing a fraction of the loads node over its capacity * @throws MonitorException on any error */ + @Override public String getNodeLoad(String nodeId) throws MonitorException{ try{ Vector<Object> argList = new Vector<Object>(); http://git-wip-us.apache.org/repos/asf/oodt/blob/287d4e89/resource/src/main/java/org/apache/oodt/cas/resource/system/extern/AvroRpcBatchStub.java ---------------------------------------------------------------------- diff --git a/resource/src/main/java/org/apache/oodt/cas/resource/system/extern/AvroRpcBatchStub.java b/resource/src/main/java/org/apache/oodt/cas/resource/system/extern/AvroRpcBatchStub.java new file mode 100644 index 0000000..2d55d19 --- /dev/null +++ b/resource/src/main/java/org/apache/oodt/cas/resource/system/extern/AvroRpcBatchStub.java @@ -0,0 +1,212 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */
package org.apache.oodt.cas.resource.system.extern;

import org.apache.avro.AvroRemoteException;
import org.apache.avro.ipc.NettyServer;
import org.apache.avro.ipc.Server;
import org.apache.avro.ipc.specific.SpecificResponder;
import org.apache.oodt.cas.resource.structs.AvroTypeFactory;
import org.apache.oodt.cas.resource.structs.Job;
import org.apache.oodt.cas.resource.structs.JobInput;
import org.apache.oodt.cas.resource.structs.JobInstance;
import org.apache.oodt.cas.resource.structs.avrotypes.AvroIntrBatchmgr;
import org.apache.oodt.cas.resource.structs.avrotypes.AvroJob;
import org.apache.oodt.cas.resource.structs.avrotypes.AvroJobInput;
import org.apache.oodt.cas.resource.structs.avrotypes.AvroResourceNode;
import org.apache.oodt.cas.resource.structs.exceptions.JobException;
import org.apache.oodt.cas.resource.structs.exceptions.JobInputException;
import org.apache.oodt.cas.resource.util.GenericResourceManagerObjectFactory;
import org.apache.oodt.cas.resource.util.XmlRpcStructFactory;
import org.apache.xmlrpc.WebServer;

import java.net.InetSocketAddress;
import java.util.HashMap;
import java.util.Hashtable;
import java.util.Map;
import java.util.logging.Level;
import java.util.logging.Logger;

/**
 * Avro RPC implementation of the batch stub: exposes {@link AvroIntrBatchmgr}
 * over a Netty-backed Avro server and runs each submitted job on its own
 * thread so that it can later be interrupted through {@link #killJob(AvroJob)}.
 */
public class AvroRpcBatchStub implements AvroIntrBatchmgr {

  /* the port to run the Avro RPC server on, default is 2000 */
  private int port = 2000;

  /* our avro rpc server */
  Server server;

  /* our log stream */
  private static final Logger LOG = Logger.getLogger(AvroRpcBatchStub.class
      .getName());

  /*
   * Job id -> thread currently executing that job. Created once, eagerly, so
   * that it exists before the server starts accepting requests and is not
   * reset when another stub instance is constructed. All access is guarded
   * by synchronizing on the map itself.
   */
  private static final Map<String, Thread> jobThreadMap =
      new HashMap<String, Thread>();

  /**
   * Starts an Avro RPC server bound to the given port, served by this stub.
   *
   * @param port the port the Avro RPC server listens on
   * @throws Exception if the server cannot be created or started
   */
  public AvroRpcBatchStub(int port) throws Exception {
    this.port = port;

    // start up the avro rpc server
    server = new NettyServer(
        new SpecificResponder(AvroIntrBatchmgr.class, this),
        new InetSocketAddress(this.port));
    server.start();

    LOG.log(Level.INFO, "AvroRpc Batch Stub started by "
        + System.getProperty("user.name", "unknown"));
  }

  /**
   * Liveness probe.
   *
   * @return always {@code true}
   */
  @Override
  public boolean isAlive() throws AvroRemoteException {
    return true;
  }

  /**
   * Executes the given job with the given input and blocks until it ends.
   *
   * @param avroJob the job description
   * @param jobInput the input handed to the job instance
   * @return {@code true} if the job reported success, {@code false} otherwise
   * @throws AvroRemoteException wrapping any {@link JobException}
   */
  @Override
  public boolean executeJob(AvroJob avroJob, AvroJobInput jobInput)
      throws AvroRemoteException {
    try {
      return genericExecuteJob(avroJob, jobInput);
    } catch (JobException e) {
      throw new AvroRemoteException(e);
    }
  }

  /**
   * Interrupts the thread running the given job, if this stub is running it.
   *
   * @param jobHash the job to kill
   * @return {@code true} if a thread was found and interrupted, {@code false}
   *         if the job is not managed by this stub
   */
  @Override
  public boolean killJob(AvroJob jobHash) throws AvroRemoteException {
    Job job = AvroTypeFactory.getJob(jobHash);
    Thread jobThread;
    synchronized (jobThreadMap) {
      jobThread = jobThreadMap.get(job.getId());
    }
    if (jobThread == null) {
      LOG.log(Level.WARNING, "Job: [" + job.getId()
          + "] not managed by this batch stub");
      return false;
    }

    // okay, so interrupt it, which should cause it to stop
    jobThread.interrupt();
    return true;
  }

  /**
   * Builds the job instance and its input, runs the job on a dedicated thread
   * and waits for it to finish. The thread is registered in the job/thread map
   * while it runs and is always unregistered afterwards.
   *
   * @return the job's own success flag, or {@code false} if the wait was
   *         interrupted or any exception occurred while setting up the job
   */
  private boolean genericExecuteJob(AvroJob avroJob, AvroJobInput jobInput)
      throws JobException {
    String jobId = null;
    Thread threadRunner = null;
    try {
      Job job = AvroTypeFactory.getJob(avroJob);

      LOG.log(Level.INFO, "stub attempting to execute class: ["
          + job.getJobInstanceClassName() + "]");

      JobInstance exec = GenericResourceManagerObjectFactory
          .getJobInstanceFromClassName(job.getJobInstanceClassName());
      // load the input obj
      JobInput in = AvroTypeFactory.getJobInput(jobInput);

      // create threaded job so that it can be interrupted
      RunnableJob runner = new RunnableJob(exec, in);
      threadRunner = new Thread(runner);
      jobId = job.getId();

      /* save this job thread in a map so we can kill it later */
      synchronized (jobThreadMap) {
        jobThreadMap.put(jobId, threadRunner);
      }
      threadRunner.start();

      try {
        threadRunner.join();
      } catch (InterruptedException e) {
        LOG.log(Level.INFO, "Current job: [" + job.getName()
            + "]: killed: exiting gracefully");
        // keep the interrupt visible to whoever owns this thread
        Thread.currentThread().interrupt();
        return false;
      }

      return runner.wasSuccessful();
    } catch (Exception e) {
      LOG.log(Level.SEVERE, "Unable to execute job: " + e.getMessage(), e);
      return false;
    } finally {
      if (threadRunner != null) {
        unregister(jobId, threadRunner);
      }
    }
  }

  /**
   * Removes the map entry for the given job id, but only if it still points
   * at the given thread, so a newer job registered under the same id is left
   * alone.
   */
  private static void unregister(String jobId, Thread thread) {
    synchronized (jobThreadMap) {
      if (jobThreadMap.get(jobId) == thread) {
        jobThreadMap.remove(jobId);
      }
    }
  }

  /** Runs one job instance and remembers whether it reported success. */
  private static class RunnableJob implements Runnable {

    private final JobInput in;

    private final JobInstance job;

    private boolean successful;

    public RunnableJob(JobInstance job, JobInput in) {
      this.job = job;
      this.in = in;
      this.successful = false;
    }

    /*
     * (non-Javadoc)
     *
     * @see java.lang.Runnable#run()
     */
    public void run() {
      try {
        this.successful = job.execute(in);
      } catch (JobInputException e) {
        LOG.log(Level.WARNING, "Job input rejected: " + e.getMessage(), e);
        this.successful = false;
      }
    }

    public boolean wasSuccessful() {
      return this.successful;
    }
  }

  /**
   * Command line entry point: {@code --portNum <port>} starts an Avro RPC
   * batch stub on that port and then blocks forever.
   */
  public static void main(String[] args) throws Exception {
    int portNum = -1;
    String usage = "AvroRpcBatchStub --portNum <port number for avro rpc service>\n";

    for (int i = 0; i < args.length; i++) {
      // only consume a value when one actually follows the flag
      if (args[i].equals("--portNum") && i + 1 < args.length) {
        portNum = Integer.parseInt(args[++i]);
      }
    }

    if (portNum == -1) {
      System.err.println(usage);
      System.exit(1);
    }

    new AvroRpcBatchStub(portNum);

    // joining the current thread never returns normally: park main forever
    for (;;) {
      try {
        Thread.currentThread().join();
      } catch (InterruptedException ignore) {
        // deliberately ignored: the stub keeps serving until the JVM exits
      }
    }
  }

}
http://git-wip-us.apache.org/repos/asf/oodt/blob/287d4e89/resource/src/test/java/org/apache/oodt/cas/resource/batchmgr/TestBatchMgr.java ---------------------------------------------------------------------- diff --git a/resource/src/test/java/org/apache/oodt/cas/resource/batchmgr/TestBatchMgr.java 
b/resource/src/test/java/org/apache/oodt/cas/resource/batchmgr/TestBatchMgr.java new file mode 100644 index 0000000..0fa28ef --- /dev/null +++ b/resource/src/test/java/org/apache/oodt/cas/resource/batchmgr/TestBatchMgr.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
package org.apache.oodt.cas.resource.batchmgr;

import junit.framework.TestCase;
import org.apache.oodt.cas.resource.structs.JobSpec;
import org.apache.oodt.cas.resource.structs.ResourceNode;
import org.apache.oodt.cas.resource.system.extern.AvroRpcBatchStub;

import java.net.MalformedURLException;
import java.net.URL;

/**
 * Checks that an {@link AvroRpcBatchMgrProxy} can reach a locally started
 * {@link AvroRpcBatchStub}.
 */
public class TestBatchMgr extends TestCase {

  /* port shared by the stub under test and the node that points at it */
  private static final int STUB_PORT = 50001;

  public void testAvroBatchMgr() {
    AvroRpcBatchMgrFactory avroRpcBatchMgrFactory = new AvroRpcBatchMgrFactory();
    Batchmgr batchmgr = avroRpcBatchMgrFactory.createBatchmgr();
    assertNotNull(batchmgr);

    try {
      // constructing the stub starts its server
      new AvroRpcBatchStub(STUB_PORT);
    } catch (Exception e) {
      e.printStackTrace();
      fail(e.getMessage());
    }

    ResourceNode resNode = new ResourceNode();
    try {
      // a well-formed URL; "http//:host" has no protocol and is rejected
      resNode.setIpAddr(new URL("http://localhost:" + STUB_PORT));
    } catch (MalformedURLException e) {
      fail(e.getMessage());
    }

    // hand the proxy the node configured above, not a fresh empty one
    AvroRpcBatchMgrProxy bmc = new AvroRpcBatchMgrProxy(new JobSpec(),
        resNode, (AvroRpcBatchMgr) batchmgr);

    assertTrue(bmc.nodeAlive());
  }
}
http://git-wip-us.apache.org/repos/asf/oodt/blob/287d4e89/resource/src/test/java/org/apache/oodt/cas/resource/structs/TestAvroTypeFactory.java ---------------------------------------------------------------------- diff --git a/resource/src/test/java/org/apache/oodt/cas/resource/structs/TestAvroTypeFactory.java b/resource/src/test/java/org/apache/oodt/cas/resource/structs/TestAvroTypeFactory.java new file mode 100644 index 0000000..1ad4e18 --- /dev/null +++ b/resource/src/test/java/org/apache/oodt/cas/resource/structs/TestAvroTypeFactory.java @@ -0,0 +1,112 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */
package org.apache.oodt.cas.resource.structs;

import junit.framework.TestCase;
import org.apache.oodt.cas.resource.util.GenericResourceManagerObjectFactory;

import java.net.MalformedURLException;
import java.net.URL;
import java.util.Properties;

/**
 * Round-trip tests for {@link AvroTypeFactory}: each test converts a resource
 * manager struct to its Avro form and back, then compares the result with the
 * starting object.
 */
public class TestAvroTypeFactory extends TestCase {

  /** A job input created by class name keeps its id across the round trip. */
  public void testAvroJobInput() {
    JobInput jobInput = GenericResourceManagerObjectFactory
        .getJobInputFromClassName("org.apache.oodt.cas.resource.structs.NameValueJobInput");

    assertNotNull(jobInput);
    Properties properties = new Properties();
    properties.setProperty("key", "prop1");
    jobInput.configure(properties);

    JobInput afterJobInput = AvroTypeFactory.getJobInput(
        AvroTypeFactory.getAvroJobInput(jobInput));

    assertNotNull(afterJobInput);
    assertEquals(jobInput.getId(), afterJobInput.getId());
  }

  /** Every field set on a job is returned unchanged after the round trip. */
  public void testAvroJob() {
    Job initJob = new Job();

    initJob.setId("id");
    initJob.setJobInputClassName("classname");
    initJob.setJobInstanceClassName("instClassName");
    initJob.setLoadValue(42);
    initJob.setQueueName("queueName");
    initJob.setStatus("status");
    initJob.setName("name");

    Job afterJob = AvroTypeFactory.getJob(AvroTypeFactory.getAvroJob(initJob));

    assertEquals("id", afterJob.getId());
    assertEquals("classname", afterJob.getJobInputClassName());
    assertEquals("instClassName", afterJob.getJobInstanceClassName());
    assertEquals(Integer.valueOf(42), afterJob.getLoadValue());
    assertEquals("name", afterJob.getName());
    assertEquals("queueName", afterJob.getQueueName());
    assertEquals("status", afterJob.getStatus());
  }

  /** A name/value pair and the input id both survive the round trip. */
  public void testNameValueJobInput() {
    NameValueJobInput initNameValueJobInput = new NameValueJobInput();

    initNameValueJobInput.setNameValuePair("name", "value");

    NameValueJobInput afterNameValueJobInput = (NameValueJobInput) AvroTypeFactory
        .getJobInput(AvroTypeFactory.getAvroJobInput(initNameValueJobInput));

    assertEquals(initNameValueJobInput.getId(), afterNameValueJobInput.getId());
    assertEquals("value", afterNameValueJobInput.getProps().getProperty("name"));
  }

  /** Capacity, address and node id of a resource node survive the round trip. */
  public void testAvroResourceNode() {
    ResourceNode initResourceNode = new ResourceNode();

    initResourceNode.setCapacity(42);
    initResourceNode.setId("id");

    try {
      // a well-formed URL; "http//:localhost" has no protocol and is rejected
      initResourceNode.setIpAddr(new URL("http://localhost"));
    } catch (MalformedURLException e) {
      fail(e.getMessage());
    }

    ResourceNode afterResourceNode = AvroTypeFactory.getResourceNode(
        AvroTypeFactory.getAvroResourceNode(initResourceNode));

    assertEquals(initResourceNode.getCapacity(), afterResourceNode.getCapacity());
    assertEquals(initResourceNode.getIpAddr(), afterResourceNode.getIpAddr());
    assertEquals(initResourceNode.getNodeId(), afterResourceNode.getNodeId());
  }
}