Repository: airavata Updated Branches: refs/heads/master 7b8e933a4 -> 43c313f9a
adding token id to submit and cancel methods Project: http://git-wip-us.apache.org/repos/asf/airavata/repo Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/43c313f9 Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/43c313f9 Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/43c313f9 Branch: refs/heads/master Commit: 43c313f9a1234154e7093a5b63cbf8e6bf0efe49 Parents: 7b8e933 Author: Chathuri Wimalasena <[email protected]> Authored: Mon May 11 10:57:12 2015 -0400 Committer: Chathuri Wimalasena <[email protected]> Committed: Mon May 11 10:57:12 2015 -0400 ---------------------------------------------------------------------- .../airavata/common/utils/AiravataZKUtils.java | 9 - .../apache/airavata/gfac/cpi/GfacService.java | 250 +++++++++++++++++-- .../airavata/gfac/server/GfacServerHandler.java | 12 +- .../airavata/gfac/core/cpi/BetterGfacImpl.java | 7 +- .../org/apache/airavata/gfac/core/cpi/GFac.java | 4 +- .../airavata/gfac/core/utils/GFacUtils.java | 6 - .../gfac/core/utils/InputHandlerWorker.java | 12 +- .../gfac.cpi.service.thrift | 6 +- .../core/impl/GFACEmbeddedJobSubmitter.java | 2 +- pom.xml | 2 +- 10 files changed, 248 insertions(+), 62 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/airavata/blob/43c313f9/modules/commons/utils/src/main/java/org/apache/airavata/common/utils/AiravataZKUtils.java ---------------------------------------------------------------------- diff --git a/modules/commons/utils/src/main/java/org/apache/airavata/common/utils/AiravataZKUtils.java b/modules/commons/utils/src/main/java/org/apache/airavata/common/utils/AiravataZKUtils.java index 3a38ba1..9b42df9 100644 --- a/modules/commons/utils/src/main/java/org/apache/airavata/common/utils/AiravataZKUtils.java +++ b/modules/commons/utils/src/main/java/org/apache/airavata/common/utils/AiravataZKUtils.java @@ -79,15 +79,6 @@ public class AiravataZKUtils implements Watcher { "state"; } - public static String getExpTokenId(ZooKeeper zk, String expId) throws ApplicationSettingsException, - KeeperException, InterruptedException { - Stat exists = zk.exists(getExpZnodePath(expId), false); - if (exists != null) { - return new String(zk.getData(getExpZnodePath(expId), false, exists)); - } - return null; - } - public static String getExpState(ZooKeeper zk, String expId) throws ApplicationSettingsException, KeeperException, InterruptedException { Stat exists = zk.exists(getExpStatePath(expId), false); http://git-wip-us.apache.org/repos/asf/airavata/blob/43c313f9/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/cpi/GfacService.java ---------------------------------------------------------------------- diff --git a/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/cpi/GfacService.java b/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/cpi/GfacService.java index 24f4de8..213b834 100644 --- a/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/cpi/GfacService.java +++ b/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/cpi/GfacService.java @@ -73,8 +73,9 @@ import org.slf4j.LoggerFactory; * @param experimentId * @param taskId * @param gatewayId + * @param tokenId */ - public boolean submitJob(String experimentId, String taskId, String gatewayId) throws org.apache.thrift.TException; + public boolean submitJob(String experimentId, String taskId, String gatewayId, String tokenId) throws org.apache.thrift.TException; /** * * @@ -92,8 +93,9 @@ import org.slf4j.LoggerFactory; * @param experimentId * @param taskId * @param gatewayId + * @param tokenId */ - public boolean cancelJob(String experimentId, String taskId, String gatewayId) throws org.apache.thrift.TException; + public boolean cancelJob(String experimentId, String taskId, String gatewayId, String tokenId) throws org.apache.thrift.TException; } @@ -101,9 +103,9 @@ import org.slf4j.LoggerFactory; public void getGFACServiceVersion(org.apache.thrift.async.AsyncMethodCallback resultHandler) throws org.apache.thrift.TException; - public void submitJob(String experimentId, String taskId, String gatewayId, org.apache.thrift.async.AsyncMethodCallback resultHandler) throws org.apache.thrift.TException; + public void submitJob(String experimentId, String taskId, String gatewayId, String tokenId, org.apache.thrift.async.AsyncMethodCallback resultHandler) throws org.apache.thrift.TException; - public void cancelJob(String experimentId, String taskId, String gatewayId, org.apache.thrift.async.AsyncMethodCallback resultHandler) throws org.apache.thrift.TException; + public void cancelJob(String experimentId, String taskId, String gatewayId, String tokenId, org.apache.thrift.async.AsyncMethodCallback resultHandler) throws org.apache.thrift.TException; } @@ -149,18 +151,19 @@ import org.slf4j.LoggerFactory; throw new org.apache.thrift.TApplicationException(org.apache.thrift.TApplicationException.MISSING_RESULT, "getGFACServiceVersion failed: unknown result"); } - public boolean submitJob(String experimentId, String taskId, String gatewayId) throws org.apache.thrift.TException + public boolean submitJob(String experimentId, String taskId, String gatewayId, String tokenId) throws org.apache.thrift.TException { - send_submitJob(experimentId, taskId, gatewayId); + send_submitJob(experimentId, taskId, gatewayId, tokenId); return recv_submitJob(); } - public void send_submitJob(String experimentId, String taskId, String gatewayId) throws org.apache.thrift.TException + public void send_submitJob(String experimentId, String taskId, String gatewayId, String tokenId) throws org.apache.thrift.TException { submitJob_args args = new submitJob_args(); args.setExperimentId(experimentId); args.setTaskId(taskId); args.setGatewayId(gatewayId); + args.setTokenId(tokenId); sendBase("submitJob", args); } @@ -174,18 +177,19 @@ import org.slf4j.LoggerFactory; throw new org.apache.thrift.TApplicationException(org.apache.thrift.TApplicationException.MISSING_RESULT, "submitJob failed: unknown result"); } - public boolean cancelJob(String experimentId, String taskId, String gatewayId) throws org.apache.thrift.TException + public boolean cancelJob(String experimentId, String taskId, String gatewayId, String tokenId) throws org.apache.thrift.TException { - send_cancelJob(experimentId, taskId, gatewayId); + send_cancelJob(experimentId, taskId, gatewayId, tokenId); return recv_cancelJob(); } - public void send_cancelJob(String experimentId, String taskId, String gatewayId) throws org.apache.thrift.TException + public void send_cancelJob(String experimentId, String taskId, String gatewayId, String tokenId) throws org.apache.thrift.TException { cancelJob_args args = new cancelJob_args(); args.setExperimentId(experimentId); args.setTaskId(taskId); args.setGatewayId(gatewayId); + args.setTokenId(tokenId); sendBase("cancelJob", args); } @@ -246,9 +250,9 @@ import org.slf4j.LoggerFactory; } } - public void submitJob(String experimentId, String taskId, String gatewayId, org.apache.thrift.async.AsyncMethodCallback resultHandler) throws org.apache.thrift.TException { + public void submitJob(String experimentId, String taskId, String gatewayId, String tokenId, org.apache.thrift.async.AsyncMethodCallback resultHandler) throws org.apache.thrift.TException { checkReady(); - submitJob_call method_call = new submitJob_call(experimentId, taskId, gatewayId, resultHandler, this, ___protocolFactory, ___transport); + submitJob_call method_call = new submitJob_call(experimentId, taskId, gatewayId, tokenId, resultHandler, this, ___protocolFactory, ___transport); this.___currentMethod = method_call; ___manager.call(method_call); } @@ -257,11 +261,13 @@ import org.slf4j.LoggerFactory; private String experimentId; private String taskId; private String gatewayId; - public submitJob_call(String experimentId, String taskId, String gatewayId, org.apache.thrift.async.AsyncMethodCallback resultHandler, org.apache.thrift.async.TAsyncClient client, org.apache.thrift.protocol.TProtocolFactory protocolFactory, org.apache.thrift.transport.TNonblockingTransport transport) throws org.apache.thrift.TException { + private String tokenId; + public submitJob_call(String experimentId, String taskId, String gatewayId, String tokenId, org.apache.thrift.async.AsyncMethodCallback resultHandler, org.apache.thrift.async.TAsyncClient client, org.apache.thrift.protocol.TProtocolFactory protocolFactory, org.apache.thrift.transport.TNonblockingTransport transport) throws org.apache.thrift.TException { super(client, protocolFactory, transport, resultHandler, false); this.experimentId = experimentId; this.taskId = taskId; this.gatewayId = gatewayId; + this.tokenId = tokenId; } public void write_args(org.apache.thrift.protocol.TProtocol prot) throws org.apache.thrift.TException { @@ -270,6 +276,7 @@ import org.slf4j.LoggerFactory; args.setExperimentId(experimentId); args.setTaskId(taskId); args.setGatewayId(gatewayId); + args.setTokenId(tokenId); args.write(prot); prot.writeMessageEnd(); } @@ -284,9 +291,9 @@ import org.slf4j.LoggerFactory; } } - public void cancelJob(String experimentId, String taskId, String gatewayId, org.apache.thrift.async.AsyncMethodCallback resultHandler) throws org.apache.thrift.TException { + public void cancelJob(String experimentId, String taskId, String gatewayId, String tokenId, org.apache.thrift.async.AsyncMethodCallback resultHandler) throws org.apache.thrift.TException { checkReady(); - cancelJob_call method_call = new cancelJob_call(experimentId, taskId, gatewayId, resultHandler, this, ___protocolFactory, ___transport); + cancelJob_call method_call = new cancelJob_call(experimentId, taskId, gatewayId, tokenId, resultHandler, this, ___protocolFactory, ___transport); this.___currentMethod = method_call; ___manager.call(method_call); } @@ -295,11 +302,13 @@ import org.slf4j.LoggerFactory; private String experimentId; private String taskId; private String gatewayId; - public cancelJob_call(String experimentId, String taskId, String gatewayId, org.apache.thrift.async.AsyncMethodCallback resultHandler, org.apache.thrift.async.TAsyncClient client, org.apache.thrift.protocol.TProtocolFactory protocolFactory, org.apache.thrift.transport.TNonblockingTransport transport) throws org.apache.thrift.TException { + private String tokenId; + public cancelJob_call(String experimentId, String taskId, String gatewayId, String tokenId, org.apache.thrift.async.AsyncMethodCallback resultHandler, org.apache.thrift.async.TAsyncClient client, org.apache.thrift.protocol.TProtocolFactory protocolFactory, org.apache.thrift.transport.TNonblockingTransport transport) throws org.apache.thrift.TException { super(client, protocolFactory, transport, resultHandler, false); this.experimentId = experimentId; this.taskId = taskId; this.gatewayId = gatewayId; + this.tokenId = tokenId; } public void write_args(org.apache.thrift.protocol.TProtocol prot) throws org.apache.thrift.TException { @@ -308,6 +317,7 @@ import org.slf4j.LoggerFactory; args.setExperimentId(experimentId); args.setTaskId(taskId); args.setGatewayId(gatewayId); + args.setTokenId(tokenId); args.write(prot); prot.writeMessageEnd(); } @@ -376,7 +386,7 @@ import org.slf4j.LoggerFactory; public submitJob_result getResult(I iface, submitJob_args args) throws org.apache.thrift.TException { submitJob_result result = new submitJob_result(); - result.success = iface.submitJob(args.experimentId, args.taskId, args.gatewayId); + result.success = iface.submitJob(args.experimentId, args.taskId, args.gatewayId, args.tokenId); result.setSuccessIsSet(true); return result; } @@ -397,7 +407,7 @@ import org.slf4j.LoggerFactory; public cancelJob_result getResult(I iface, cancelJob_args args) throws org.apache.thrift.TException { cancelJob_result result = new cancelJob_result(); - result.success = iface.cancelJob(args.experimentId, args.taskId, args.gatewayId); + result.success = iface.cancelJob(args.experimentId, args.taskId, args.gatewayId, args.tokenId); result.setSuccessIsSet(true); return result; } @@ -521,7 +531,7 @@ import org.slf4j.LoggerFactory; } public void start(I iface, submitJob_args args, org.apache.thrift.async.AsyncMethodCallback<Boolean> resultHandler) throws TException { - iface.submitJob(args.experimentId, args.taskId, args.gatewayId,resultHandler); + iface.submitJob(args.experimentId, args.taskId, args.gatewayId, args.tokenId,resultHandler); } } @@ -573,7 +583,7 @@ import org.slf4j.LoggerFactory; } public void start(I iface, cancelJob_args args, org.apache.thrift.async.AsyncMethodCallback<Boolean> resultHandler) throws TException { - iface.cancelJob(args.experimentId, args.taskId, args.gatewayId,resultHandler); + iface.cancelJob(args.experimentId, args.taskId, args.gatewayId, args.tokenId,resultHandler); } } @@ -1185,6 +1195,7 @@ import org.slf4j.LoggerFactory; private static final org.apache.thrift.protocol.TField EXPERIMENT_ID_FIELD_DESC = new org.apache.thrift.protocol.TField("experimentId", org.apache.thrift.protocol.TType.STRING, (short)1); private static final org.apache.thrift.protocol.TField TASK_ID_FIELD_DESC = new org.apache.thrift.protocol.TField("taskId", org.apache.thrift.protocol.TType.STRING, (short)2); private static final org.apache.thrift.protocol.TField GATEWAY_ID_FIELD_DESC = new org.apache.thrift.protocol.TField("gatewayId", org.apache.thrift.protocol.TType.STRING, (short)3); + private static final org.apache.thrift.protocol.TField TOKEN_ID_FIELD_DESC = new org.apache.thrift.protocol.TField("tokenId", org.apache.thrift.protocol.TType.STRING, (short)4); private static final Map<Class<? extends IScheme>, SchemeFactory> schemes = new HashMap<Class<? extends IScheme>, SchemeFactory>(); static { @@ -1195,12 +1206,14 @@ import org.slf4j.LoggerFactory; public String experimentId; // required public String taskId; // required public String gatewayId; // required + public String tokenId; // required /** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */ @SuppressWarnings("all") public enum _Fields implements org.apache.thrift.TFieldIdEnum { EXPERIMENT_ID((short)1, "experimentId"), TASK_ID((short)2, "taskId"), - GATEWAY_ID((short)3, "gatewayId"); + GATEWAY_ID((short)3, "gatewayId"), + TOKEN_ID((short)4, "tokenId"); private static final Map<String, _Fields> byName = new HashMap<String, _Fields>(); @@ -1221,6 +1234,8 @@ import org.slf4j.LoggerFactory; return TASK_ID; case 3: // GATEWAY_ID return GATEWAY_ID; + case 4: // TOKEN_ID + return TOKEN_ID; default: return null; } @@ -1270,6 +1285,8 @@ import org.slf4j.LoggerFactory; new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING))); tmpMap.put(_Fields.GATEWAY_ID, new org.apache.thrift.meta_data.FieldMetaData("gatewayId", org.apache.thrift.TFieldRequirementType.REQUIRED, new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING))); + tmpMap.put(_Fields.TOKEN_ID, new org.apache.thrift.meta_data.FieldMetaData("tokenId", org.apache.thrift.TFieldRequirementType.REQUIRED, + new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING))); metaDataMap = Collections.unmodifiableMap(tmpMap); org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(submitJob_args.class, metaDataMap); } @@ -1280,12 +1297,14 @@ import org.slf4j.LoggerFactory; public submitJob_args( String experimentId, String taskId, - String gatewayId) + String gatewayId, + String tokenId) { this(); this.experimentId = experimentId; this.taskId = taskId; this.gatewayId = gatewayId; + this.tokenId = tokenId; } /** @@ -1301,6 +1320,9 @@ import org.slf4j.LoggerFactory; if (other.isSetGatewayId()) { this.gatewayId = other.gatewayId; } + if (other.isSetTokenId()) { + this.tokenId = other.tokenId; + } } public submitJob_args deepCopy() { @@ -1312,6 +1334,7 @@ import org.slf4j.LoggerFactory; this.experimentId = null; this.taskId = null; this.gatewayId = null; + this.tokenId = null; } public String getExperimentId() { @@ -1386,6 +1409,30 @@ import org.slf4j.LoggerFactory; } } + public String getTokenId() { + return this.tokenId; + } + + public submitJob_args setTokenId(String tokenId) { + this.tokenId = tokenId; + return this; + } + + public void unsetTokenId() { + this.tokenId = null; + } + + /** Returns true if field tokenId is set (has been assigned a value) and false otherwise */ + public boolean isSetTokenId() { + return this.tokenId != null; + } + + public void setTokenIdIsSet(boolean value) { + if (!value) { + this.tokenId = null; + } + } + public void setFieldValue(_Fields field, Object value) { switch (field) { case EXPERIMENT_ID: @@ -1412,6 +1459,14 @@ import org.slf4j.LoggerFactory; } break; + case TOKEN_ID: + if (value == null) { + unsetTokenId(); + } else { + setTokenId((String)value); + } + break; + } } @@ -1426,6 +1481,9 @@ import org.slf4j.LoggerFactory; case GATEWAY_ID: return getGatewayId(); + case TOKEN_ID: + return getTokenId(); + } throw new IllegalStateException(); } @@ -1443,6 +1501,8 @@ import org.slf4j.LoggerFactory; return isSetTaskId(); case GATEWAY_ID: return isSetGatewayId(); + case TOKEN_ID: + return isSetTokenId(); } throw new IllegalStateException(); } @@ -1487,6 +1547,15 @@ import org.slf4j.LoggerFactory; return false; } + boolean this_present_tokenId = true && this.isSetTokenId(); + boolean that_present_tokenId = true && that.isSetTokenId(); + if (this_present_tokenId || that_present_tokenId) { + if (!(this_present_tokenId && that_present_tokenId)) + return false; + if (!this.tokenId.equals(that.tokenId)) + return false; + } + return true; } @@ -1533,6 +1602,16 @@ import org.slf4j.LoggerFactory; return lastComparison; } } + lastComparison = Boolean.valueOf(isSetTokenId()).compareTo(other.isSetTokenId()); + if (lastComparison != 0) { + return lastComparison; + } + if (isSetTokenId()) { + lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.tokenId, other.tokenId); + if (lastComparison != 0) { + return lastComparison; + } + } return 0; } @@ -1576,6 +1655,14 @@ import org.slf4j.LoggerFactory; sb.append(this.gatewayId); } first = false; + if (!first) sb.append(", "); + sb.append("tokenId:"); + if (this.tokenId == null) { + sb.append("null"); + } else { + sb.append(this.tokenId); + } + first = false; sb.append(")"); return sb.toString(); } @@ -1591,6 +1678,9 @@ import org.slf4j.LoggerFactory; if (gatewayId == null) { throw new org.apache.thrift.protocol.TProtocolException("Required field 'gatewayId' was not present! Struct: " + toString()); } + if (tokenId == null) { + throw new org.apache.thrift.protocol.TProtocolException("Required field 'tokenId' was not present! Struct: " + toString()); + } // check for sub-struct validity } @@ -1652,6 +1742,14 @@ import org.slf4j.LoggerFactory; org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); } break; + case 4: // TOKEN_ID + if (schemeField.type == org.apache.thrift.protocol.TType.STRING) { + struct.tokenId = iprot.readString(); + struct.setTokenIdIsSet(true); + } else { + org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); + } + break; default: org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); } @@ -1682,6 +1780,11 @@ import org.slf4j.LoggerFactory; oprot.writeString(struct.gatewayId); oprot.writeFieldEnd(); } + if (struct.tokenId != null) { + oprot.writeFieldBegin(TOKEN_ID_FIELD_DESC); + oprot.writeString(struct.tokenId); + oprot.writeFieldEnd(); + } oprot.writeFieldStop(); oprot.writeStructEnd(); } @@ -1702,6 +1805,7 @@ import org.slf4j.LoggerFactory; oprot.writeString(struct.experimentId); oprot.writeString(struct.taskId); oprot.writeString(struct.gatewayId); + oprot.writeString(struct.tokenId); } @Override @@ -1713,6 +1817,8 @@ import org.slf4j.LoggerFactory; struct.setTaskIdIsSet(true); struct.gatewayId = iprot.readString(); struct.setGatewayIdIsSet(true); + struct.tokenId = iprot.readString(); + struct.setTokenIdIsSet(true); } } @@ -2078,6 +2184,7 @@ import org.slf4j.LoggerFactory; private static final org.apache.thrift.protocol.TField EXPERIMENT_ID_FIELD_DESC = new org.apache.thrift.protocol.TField("experimentId", org.apache.thrift.protocol.TType.STRING, (short)1); private static final org.apache.thrift.protocol.TField TASK_ID_FIELD_DESC = new org.apache.thrift.protocol.TField("taskId", org.apache.thrift.protocol.TType.STRING, (short)2); private static final org.apache.thrift.protocol.TField GATEWAY_ID_FIELD_DESC = new org.apache.thrift.protocol.TField("gatewayId", org.apache.thrift.protocol.TType.STRING, (short)3); + private static final org.apache.thrift.protocol.TField TOKEN_ID_FIELD_DESC = new org.apache.thrift.protocol.TField("tokenId", org.apache.thrift.protocol.TType.STRING, (short)4); private static final Map<Class<? extends IScheme>, SchemeFactory> schemes = new HashMap<Class<? extends IScheme>, SchemeFactory>(); static { @@ -2088,12 +2195,14 @@ import org.slf4j.LoggerFactory; public String experimentId; // required public String taskId; // required public String gatewayId; // required + public String tokenId; // required /** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */ @SuppressWarnings("all") public enum _Fields implements org.apache.thrift.TFieldIdEnum { EXPERIMENT_ID((short)1, "experimentId"), TASK_ID((short)2, "taskId"), - GATEWAY_ID((short)3, "gatewayId"); + GATEWAY_ID((short)3, "gatewayId"), + TOKEN_ID((short)4, "tokenId"); private static final Map<String, _Fields> byName = new HashMap<String, _Fields>(); @@ -2114,6 +2223,8 @@ import org.slf4j.LoggerFactory; return TASK_ID; case 3: // GATEWAY_ID return GATEWAY_ID; + case 4: // TOKEN_ID + return TOKEN_ID; default: return null; } @@ -2163,6 +2274,8 @@ import org.slf4j.LoggerFactory; new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING))); tmpMap.put(_Fields.GATEWAY_ID, new org.apache.thrift.meta_data.FieldMetaData("gatewayId", org.apache.thrift.TFieldRequirementType.REQUIRED, new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING))); + tmpMap.put(_Fields.TOKEN_ID, new org.apache.thrift.meta_data.FieldMetaData("tokenId", org.apache.thrift.TFieldRequirementType.REQUIRED, + new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING))); metaDataMap = Collections.unmodifiableMap(tmpMap); org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(cancelJob_args.class, metaDataMap); } @@ -2173,12 +2286,14 @@ import org.slf4j.LoggerFactory; public cancelJob_args( String experimentId, String taskId, - String gatewayId) + String gatewayId, + String tokenId) { this(); this.experimentId = experimentId; this.taskId = taskId; this.gatewayId = gatewayId; + this.tokenId = tokenId; } /** @@ -2194,6 +2309,9 @@ import org.slf4j.LoggerFactory; if (other.isSetGatewayId()) { this.gatewayId = other.gatewayId; } + if (other.isSetTokenId()) { + this.tokenId = other.tokenId; + } } public cancelJob_args deepCopy() { @@ -2205,6 +2323,7 @@ import org.slf4j.LoggerFactory; this.experimentId = null; this.taskId = null; this.gatewayId = null; + this.tokenId = null; } public String getExperimentId() { @@ -2279,6 +2398,30 @@ import org.slf4j.LoggerFactory; } } + public String getTokenId() { + return this.tokenId; + } + + public cancelJob_args setTokenId(String tokenId) { + this.tokenId = tokenId; + return this; + } + + public void unsetTokenId() { + this.tokenId = null; + } + + /** Returns true if field tokenId is set (has been assigned a value) and false otherwise */ + public boolean isSetTokenId() { + return this.tokenId != null; + } + + public void setTokenIdIsSet(boolean value) { + if (!value) { + this.tokenId = null; + } + } + public void setFieldValue(_Fields field, Object value) { switch (field) { case EXPERIMENT_ID: @@ -2305,6 +2448,14 @@ import org.slf4j.LoggerFactory; } break; + case TOKEN_ID: + if (value == null) { + unsetTokenId(); + } else { + setTokenId((String)value); + } + break; + } } @@ -2319,6 +2470,9 @@ import org.slf4j.LoggerFactory; case GATEWAY_ID: return getGatewayId(); + case TOKEN_ID: + return getTokenId(); + } throw new IllegalStateException(); } @@ -2336,6 +2490,8 @@ import org.slf4j.LoggerFactory; return isSetTaskId(); case GATEWAY_ID: return isSetGatewayId(); + case TOKEN_ID: + return isSetTokenId(); } throw new IllegalStateException(); } @@ -2380,6 +2536,15 @@ import org.slf4j.LoggerFactory; return false; } + boolean this_present_tokenId = true && this.isSetTokenId(); + boolean that_present_tokenId = true && that.isSetTokenId(); + if (this_present_tokenId || that_present_tokenId) { + if (!(this_present_tokenId && that_present_tokenId)) + return false; + if (!this.tokenId.equals(that.tokenId)) + return false; + } + return true; } @@ -2426,6 +2591,16 @@ import org.slf4j.LoggerFactory; return lastComparison; } } + lastComparison = Boolean.valueOf(isSetTokenId()).compareTo(other.isSetTokenId()); + if (lastComparison != 0) { + return lastComparison; + } + if (isSetTokenId()) { + lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.tokenId, other.tokenId); + if (lastComparison != 0) { + return lastComparison; + } + } return 0; } @@ -2469,6 +2644,14 @@ import org.slf4j.LoggerFactory; sb.append(this.gatewayId); } first = false; + if (!first) sb.append(", "); + sb.append("tokenId:"); + if (this.tokenId == null) { + sb.append("null"); + } else { + sb.append(this.tokenId); + } + first = false; sb.append(")"); return sb.toString(); } @@ -2484,6 +2667,9 @@ import org.slf4j.LoggerFactory; if (gatewayId == null) { throw new org.apache.thrift.protocol.TProtocolException("Required field 'gatewayId' was not present! Struct: " + toString()); } + if (tokenId == null) { + throw new org.apache.thrift.protocol.TProtocolException("Required field 'tokenId' was not present! Struct: " + toString()); + } // check for sub-struct validity } @@ -2545,6 +2731,14 @@ import org.slf4j.LoggerFactory; org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); } break; + case 4: // TOKEN_ID + if (schemeField.type == org.apache.thrift.protocol.TType.STRING) { + struct.tokenId = iprot.readString(); + struct.setTokenIdIsSet(true); + } else { + org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); + } + break; default: org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); } @@ -2575,6 +2769,11 @@ import org.slf4j.LoggerFactory; oprot.writeString(struct.gatewayId); oprot.writeFieldEnd(); } + if (struct.tokenId != null) { + oprot.writeFieldBegin(TOKEN_ID_FIELD_DESC); + oprot.writeString(struct.tokenId); + oprot.writeFieldEnd(); + } oprot.writeFieldStop(); oprot.writeStructEnd(); } @@ -2595,6 +2794,7 @@ import org.slf4j.LoggerFactory; oprot.writeString(struct.experimentId); oprot.writeString(struct.taskId); oprot.writeString(struct.gatewayId); + oprot.writeString(struct.tokenId); } @Override @@ -2606,6 +2806,8 @@ import org.slf4j.LoggerFactory; struct.setTaskIdIsSet(true); struct.gatewayId = iprot.readString(); struct.setGatewayIdIsSet(true); + struct.tokenId = iprot.readString(); + struct.setTokenIdIsSet(true); } } http://git-wip-us.apache.org/repos/asf/airavata/blob/43c313f9/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java ---------------------------------------------------------------------- diff --git a/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java b/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java index 9b282db..8e17a38 100644 --- a/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java +++ b/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java @@ -237,12 +237,12 @@ public class GfacServerHandler implements GfacService.Iface, Watcher { * @param taskId * @param gatewayId */ - public boolean submitJob(String experimentId, String taskId, String gatewayId) throws TException { + public boolean submitJob(String experimentId, String taskId, String gatewayId, String tokenId) throws TException { requestCount++; logger.info("-----------------------------------------------------" + requestCount + "-----------------------------------------------------"); logger.infoId(experimentId, "GFac Received submit job request for the Experiment: {} TaskId: {}", experimentId, taskId); GFac gfac = getGfac(); - InputHandlerWorker inputHandlerWorker = new InputHandlerWorker(gfac, experimentId, taskId, gatewayId); + InputHandlerWorker inputHandlerWorker = new InputHandlerWorker(gfac, experimentId, taskId, gatewayId, tokenId); // try { // if( gfac.submitJob(experimentId, taskId, gatewayId)){ logger.debugId(experimentId, "Submitted job to the Gfac Implementation, experiment {}, task {}, gateway " + @@ -254,11 +254,11 @@ public class GfacServerHandler implements GfacService.Iface, Watcher { return true; } - public boolean cancelJob(String experimentId, String taskId, String gatewayId) throws TException { + public boolean cancelJob(String experimentId, String taskId, String gatewayId, String tokenId) throws TException { logger.infoId(experimentId, "GFac Received cancel job request for Experiment: {} TaskId: {} ", experimentId, taskId); GFac gfac = getGfac(); try { - if (gfac.cancel(experimentId, taskId, gatewayId)) { + if (gfac.cancel(experimentId, taskId, gatewayId, tokenId)) { logger.debugId(experimentId, "Successfully cancelled job, experiment {} , task {}", experimentId, taskId); return true; } else { @@ -375,7 +375,7 @@ public class GfacServerHandler implements GfacService.Iface, Watcher { try { GFacUtils.createExperimentEntryForPassive(event.getExperimentId(), event.getTaskId(), zk, experimentNode, nodeName, event.getTokenId(), message.getDeliveryTag()); AiravataZKUtils.getExpStatePath(event.getExperimentId()); - submitJob(event.getExperimentId(), event.getTaskId(), event.getGatewayId()); + submitJob(event.getExperimentId(), event.getTaskId(), event.getGatewayId(), event.getTokenId()); } catch (KeeperException e) { logger.error(nodeName + " was interrupted."); rabbitMQTaskLaunchConsumer.sendAck(message.getDeliveryTag()); @@ -395,7 +395,7 @@ public class GfacServerHandler implements GfacService.Iface, Watcher { TBase messageEvent = message.getEvent(); byte[] bytes = ThriftUtils.serializeThriftObject(messageEvent); ThriftUtils.createThriftFromBytes(bytes, event); - cancelJob(event.getExperimentId(), event.getTaskId(), event.getGatewayId()); + cancelJob(event.getExperimentId(), event.getTaskId(), event.getGatewayId(), event.getTokenId()); System.out.println(" Message Received with message id '" + message.getMessageId() + "' and with message type '" + message.getType()); } catch (TException e) { http://git-wip-us.apache.org/repos/asf/airavata/blob/43c313f9/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java index d5e623e..6eeef28 100644 --- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java +++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java @@ -209,10 +209,11 @@ public class BetterGfacImpl implements GFac,Watcher { * @return * @throws GFacException */ - public boolean submitJob(String experimentID, String taskID, String gatewayID) throws GFacException { + public boolean submitJob(String experimentID, String taskID, String gatewayID, String tokenId) throws GFacException { JobExecutionContext jobExecutionContext = null; try { jobExecutionContext = createJEC(experimentID, taskID, gatewayID); + jobExecutionContext.setCredentialStoreToken(tokenId); return submitJob(jobExecutionContext); } catch (Exception e) { log.error("Error inovoking the job with experiment ID: " + experimentID + ":"+e.getMessage()); @@ -316,7 +317,6 @@ public class BetterGfacImpl implements GFac,Watcher { jobExecutionContext.setProperty(Constants.PROP_TOPIC, experimentID); jobExecutionContext.setGfac(this); jobExecutionContext.setZk(zk); - jobExecutionContext.setCredentialStoreToken(AiravataZKUtils.getExpTokenId(zk, experimentID)); // handle job submission protocol List<JobSubmissionInterface> jobSubmissionInterfaces = computeResource.getJobSubmissionInterfaces(); @@ -550,10 +550,11 @@ public class BetterGfacImpl implements GFac,Watcher { } } - public boolean cancel(String experimentID, String taskID, String gatewayID) throws GFacException { + public boolean cancel(String experimentID, String taskID, String gatewayID, String tokenId) throws GFacException { JobExecutionContext jobExecutionContext = null; try { jobExecutionContext = createJEC(experimentID, taskID, gatewayID); + jobExecutionContext.setCredentialStoreToken(tokenId); return cancel(jobExecutionContext); } catch (Exception e) { GFacUtils.saveErrorDetails(jobExecutionContext, e.getCause().toString(), CorrectiveAction.CONTACT_SUPPORT, ErrorCategory.AIRAVATA_INTERNAL_ERROR); http://git-wip-us.apache.org/repos/asf/airavata/blob/43c313f9/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/GFac.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/GFac.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/GFac.java index 10656b7..5cf9d00 100644 --- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/GFac.java +++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/GFac.java @@ -37,7 +37,7 @@ public interface GFac { * @return boolean Successful acceptence of the jobExecution returns a true value * @throws org.apache.airavata.gfac.GFacException */ - public boolean submitJob(String experimentID,String taskID, String gatewayID) throws GFacException; + public boolean submitJob(String experimentID,String taskID, String gatewayID, String tokenId) throws GFacException; /** * This method can be used in a handler to ivvoke outhandler asynchronously @@ -58,6 +58,6 @@ public interface GFac { * @return Successful cancellation will return true * @throws GFacException */ - public boolean cancel(String experimentID, String taskID, String gatewayID)throws GFacException; + public boolean cancel(String experimentID, String taskID, String gatewayID, String tokenId)throws GFacException; } http://git-wip-us.apache.org/repos/asf/airavata/blob/43c313f9/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/utils/GFacUtils.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/utils/GFacUtils.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/utils/GFacUtils.java index 434f6d4..1e9e212 100644 --- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/utils/GFacUtils.java +++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/utils/GFacUtils.java @@ -1166,13 +1166,7 @@ public class GFacUtils { zk.create(newExpNode, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); - Stat expParent = zk.exists(newExpNode, false); - if (tokenId != null && expParent != null) { - zk.setData(newExpNode, tokenId.getBytes(), - expParent.getVersion()); - } - String token = AiravataZKUtils.getExpTokenId(zk, experimentID); String s = zk.create(newExpNode + File.separator + "state", String .valueOf(GfacExperimentState.LAUNCHED.getValue()) .getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, http://git-wip-us.apache.org/repos/asf/airavata/blob/43c313f9/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/utils/InputHandlerWorker.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/utils/InputHandlerWorker.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/utils/InputHandlerWorker.java index 24d8b75..36caf76 100644 --- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/utils/InputHandlerWorker.java +++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/utils/InputHandlerWorker.java @@ -20,35 +20,31 @@ */ package org.apache.airavata.gfac.core.utils; -import org.apache.airavata.gfac.GFacException; -import org.apache.airavata.gfac.core.context.JobExecutionContext; import org.apache.airavata.gfac.core.cpi.GFac; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.concurrent.Callable; - public class InputHandlerWorker implements Runnable { private static Logger log = LoggerFactory.getLogger(InputHandlerWorker.class); String experimentId; - String taskId; - String gatewayId; + String tokenId; GFac gfac; - public InputHandlerWorker(GFac gfac, String experimentId,String taskId,String gatewayId) { + public InputHandlerWorker(GFac gfac, String experimentId,String taskId,String gatewayId, String tokenId) { this.gfac = gfac; this.experimentId = experimentId; this.taskId = taskId; this.gatewayId = gatewayId; + this.tokenId = tokenId; } @Override public void run() { try { - gfac.submitJob(experimentId, taskId, gatewayId); + gfac.submitJob(experimentId, taskId, gatewayId, tokenId); } catch (Exception e) { log.error(e.getMessage(), e); } http://git-wip-us.apache.org/repos/asf/airavata/blob/43c313f9/modules/gfac/gfac-thrift-descriptions/gfac.cpi.service.thrift ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-thrift-descriptions/gfac.cpi.service.thrift b/modules/gfac/gfac-thrift-descriptions/gfac.cpi.service.thrift index 30f19a6..93d62c7 100644 --- a/modules/gfac/gfac-thrift-descriptions/gfac.cpi.service.thrift +++ b/modules/gfac/gfac-thrift-descriptions/gfac.cpi.service.thrift @@ -46,7 +46,8 @@ service GfacService { **/ bool submitJob (1: required string experimentId, 2: required string taskId - 3: required string gatewayId) + 3: required string gatewayId, + 4: required string tokenId) /** * @@ -62,5 +63,6 @@ service GfacService { **/ bool cancelJob (1: required string experimentId, 2: required string taskId, - 3: required string gatewayId) + 3: required string gatewayId, + 4: required string tokenId) } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/airavata/blob/43c313f9/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/GFACEmbeddedJobSubmitter.java ---------------------------------------------------------------------- diff --git a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/GFACEmbeddedJobSubmitter.java b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/GFACEmbeddedJobSubmitter.java index 9f4d919..d6fe40f 100644 --- a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/GFACEmbeddedJobSubmitter.java +++ b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/GFACEmbeddedJobSubmitter.java @@ -84,7 +84,7 @@ public class GFACEmbeddedJobSubmitter implements JobSubmitter { if(gatewayId == null || gatewayId.isEmpty()){ gatewayId = ServerSettings.getDefaultUserGateway(); } - return gfac.submitJob(experimentID, taskID, gatewayId); + return gfac.submitJob(experimentID, taskID, gatewayId, tokenId); } catch (Exception e) { String error = "Error launching the job : " + experimentID; logger.error(error); http://git-wip-us.apache.org/repos/asf/airavata/blob/43c313f9/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index 32a5e69..5ebefcd 100644 --- a/pom.xml +++ b/pom.xml @@ -532,7 +532,7 @@ <module>modules/test-suite</module> <module>modules/distribution</module> <module>modules/messaging</module> - <module>modules/integration-tests</module> + <!--module>modules/integration-tests</module--> <module>modules/workflow</module> <module>modules/xbaya-gui</module> </modules>
