Hi,
I've encountered a problem when using the RFT service.
I implemented the NotifyCallback interface (method: deliver(*){*}) in
order to check whether my file transfer jobs finished, failed, etc.
But here comes the problem:
After I run my code, the file transfer finishes successfully, but I
cannot get the OverallStatus's elements in the callback method. It
seems like the method (deliver) does not perform any action.
I wonder if anyone knows the reason or has a suggestion?
Here is my code:
I hope you can find what is wrong in these lines.
*******************************************************************************************************************
method:runRFT
public int runRFT(String delortrans) throws GT4ClientException {
TransferRequestType transferType = null;
DeleteRequestType deleteRequestType = null;
if ((!delortrans.equals(RFT_TANSFER)) &&
(!delortrans.equals(RFT_DELETE))) {
throw new GT4ClientException("option error: delortrans
should be "
+ "RFT_TANSFER " + RFT_TANSFER + "\n"
+ "RFT_DELETE" + RFT_DELETE);
}
if (PORT == null) {
if (authType.equals (GSIConstants.GSI_TRANSPORT)) {
PORT = "8443";
} else {
PORT ="8080";
}
}
if (authType.equals(GSIConstants.GSI_TRANSPORT )) {
PROTOCOL = "https";
}
String rftFactoryAddress =
PROTOCOL + "://"+ HOST+ ":" + PORT + SERVICE_URL_ROOT
+ "ReliableFileTransferFactoryService";
String rftServiceAddress = PROTOCOL+ "://" + HOST + ":" + PORT +
SERVICE_URL_ROOT + "ReliableFileTransferService";
EndpointReferenceType credEPR = null;
EndpointReferenceType rftepr = null;
//add proxy-init:
try {
credEPR = delegateCredential(HOST, PORT);
if (delortrans == RFT_TANSFER) {
transferType = getTransfer(PATH_TO_FILE, credEPR);
rftepr = createRFT(rftFactoryAddress,
transferType);
} else {
deleteRequestType = getDeleteRequest(PATH_TO_FILE,
credEPR);
rftepr = createRFT(rftFactoryAddress,
deleteRequestType);
}
} catch (Exception e) {
e.printStackTrace();
throw new GT4ClientException("Error: " + e);
}
if (outFileName != null) {
FileWriter writer = null;
try {
writer = new FileWriter(outFileName);
QName qName = new QName("", "RFT_EPR");
writer.write(ObjectSerializer.toString(rftepr,
qName));
} catch(Exception e) {
throw new GT4ClientException("Errot: " + e);
} finally {
if (writer != null) {
try {
writer.close();
} catch(Exception e) {}
}
}
}
try {
rftepr.setAddress(new Address(rftServiceAddress));
ReliableFileTransferPortType rft = rftLocator
.getReliableFileTransferPortTypePort(rftepr);
setSecurity((Stub)rft);
//For secure notifications
subscribe(rft, this);//"this" stands for the
GlobusRFTClient entity(extends BaseWSRFClient implements NotifyCallback)
//System.out.println("Subscribed for overall status");
//End subscription code
Calendar termTime = Calendar.getInstance();
termTime.add (Calendar.MINUTE, TERM_TIME);
SetTerminationTime reqTermTime = new SetTerminationTime();
reqTermTime.setRequestedTerminationTime(termTime);
//System.out.println("Termination time to set: " +
TERM_TIME + " minutes");
SetTerminationTimeResponse termRes =
rft.setTerminationTime(reqTermTime);
StartOutputType startresp = rft.start(new Start());
while (finished < transferCount && transferCount !=0) {
//My program do infinite loop here!!!because the parameter (finished)
isn't changed by the method deliver().
if (failed == transferCount ||
(failed + finished == transferCount)
|| finished == transferCount) {
break;
} else {
Thread.sleep(1000);
}
}
} catch(Exception ee) {
throw new GT4ClientException("Errot: " + ee);
}
/ /check the result
if ((finished == transferCount) && (finished != 0)) {
//System.out.println( "All Transfers are completed");
return TRANSFER_COMPLETE;
}
else if ((failed == transferCount) && (failed != 0)) {
//System.out.println( "All Transfers failed !");
return TRANSFER_FAILED;
}
else if ((failed + finished) == transferCount) {
//System.out.println( "Transfers Done");
return TRANSFER_PART_FAILED;
}
else {
return TRANSFER_DONE; // uncertain condition
}
}
*******************************************************************************************************************
method: subscribe (maybe something is wrong in this method! Thanks for
your attention)
/**
 * Registers {@code client} as a notification consumer and subscribes it to
 * the {@code RFTConstants.OVERALL_STATUS_RESOURCE} topic of the given RFT
 * port.
 *
 * <p>The notification consumer manager is chosen from {@code PROTOCOL}
 * ("http" uses the default instance, "https" one configured with the GSI
 * service container). The consumer resource is protected by a security
 * descriptor built from {@code AUTHZ}, {@code PROTOCOL} and
 * {@code authType}.
 *
 * @param rft    the RFT port to subscribe on
 * @param client the callback object handed to the consumer manager
 * @throws IllegalStateException if no consumer manager is available
 *         because {@code PROTOCOL} is neither "http" nor "https"
 * @throws Exception if any of the container, security or subscription
 *         calls throws
 */
public void subscribe(ReliableFileTransferPortType rft, BaseRFTClient client)
        throws Exception {
    Subscribe request = new Subscribe();
    request.setUseNotify(Boolean.TRUE);

    if (PROTOCOL.equals("http")) {
        consumer = NotificationConsumerManager.getInstance();
    } else if (PROTOCOL.equals("https")) {
        Map<String, String> properties = new HashMap<String, String>();
        properties.put(ServiceContainer.CLASS,
                "org.globus.wsrf.container.GSIServiceContainer");
        consumer = NotificationConsumerManager.getInstance(properties);
    }
    // Neither branch above ran and nothing assigned the field earlier:
    // fail with a clear message instead of a bare NullPointerException.
    if (consumer == null) {
        throw new IllegalStateException(
                "No notification consumer for protocol: " + PROTOCOL);
    }
    consumer.startListening();

    EndpointReferenceType consumerEPR = null;
    ResourceSecurityDescriptor resDesc = new ResourceSecurityDescriptor();
    Vector<AuthMethod> authMethod = new Vector<AuthMethod>();

    // Authorization applied to incoming notifications.
    if (AUTHZ.equalsIgnoreCase("host")) {
        ResourcePDPConfig pdpConfig = new ResourcePDPConfig("host");
        pdpConfig.setProperty(Authorization.HOST_PREFIX,
                HostAuthorization.URL_PROPERTY, endpoint);
        ServiceAuthorizationChain authz = new ServiceAuthorizationChain();
        authz.initialize(pdpConfig, "chainName", "someId");
        resDesc.setAuthzChain(authz);
    } else if (AUTHZ.equalsIgnoreCase("self")) {
        resDesc.setAuthz("self");
    }

    // Authentication method required on the consumer resource. For "http"
    // with an authType other than the two checked below the list stays
    // empty.
    if (PROTOCOL.equals("http")) {
        if (authType.equals(Constants.GSI_SEC_MSG)) {
            authMethod.add(GSISecureMsgAuthMethod.BOTH);
        } else if (authType.equals(Constants.GSI_SEC_CONV)) {
            authMethod.add(GSISecureConvAuthMethod.BOTH);
        }
    } else if (PROTOCOL.equals("https")) {
        authMethod.add(GSITransportAuthMethod.BOTH);
    }
    resDesc.setAuthMethods(authMethod);

    // The caller-supplied client is registered as the callback
    // (previously: new BaseRFTClient()).
    consumerEPR = consumer.createNotificationConsumer(client, resDesc);
    request.setConsumerReference(consumerEPR);

    TopicExpressionType topicExpression = new TopicExpressionType();
    topicExpression.setDialect(WSNConstants.SIMPLE_TOPIC_DIALECT);
    topicExpression.setValue(RFTConstants.OVERALL_STATUS_RESOURCE);
    request.setTopicExpression(topicExpression);

    rft.subscribe(request);
}
*******************************************************************************************************************
method: deliver
/**
 * Notification callback: reads the new {@code OverallStatus} value out of a
 * resource-property change notification, prints it, and updates the
 * {@code finished}, {@code failed} and {@code transferCount} counters.
 *
 * <p>{@code finished} and {@code failed} are only ever raised, never
 * lowered. {@code transferCount} is recomputed on every notification as the
 * sum of the five status counts.
 *
 * <p>Anything that prevents the notification from being processed (an
 * unexpected payload type, an empty value, an exception while
 * deserializing) is reported on {@code System.err} rather than dropped
 * silently, so that a non-working callback can be diagnosed.
 *
 * @param topicPath topic path of the notification (not used here)
 * @param producer  endpoint of the notification producer (not used here)
 * @param message   the notification payload; expected to be a
 *                  {@code ResourcePropertyValueChangeNotificationElementType}
 */
public void deliver(List topicPath, EndpointReferenceType producer,
        Object message) {
    // Check the payload type before casting: a ClassCastException here
    // would otherwise escape the callback.
    if (!(message instanceof
            ResourcePropertyValueChangeNotificationElementType)) {
        System.err.println("Unexpected notification message type: "
                + (message == null ? "null" : message.getClass().getName()));
        return;
    }
    ResourcePropertyValueChangeNotificationType changeMessage =
            ((ResourcePropertyValueChangeNotificationElementType) message)
                    .getResourcePropertyValueChangeNotification();
    BaseFaultType fault = null;
    try {
        if (changeMessage != null) {
            MessageElement[] newValues =
                    changeMessage.getNewValue().get_any();
            if (newValues == null || newValues.length == 0) {
                System.err.println(
                        "Notification carries no new OverallStatus value");
                return;
            }
            OverallStatus overallStatus = (OverallStatus)
                    newValues[0].getValueAsType(
                            RFTConstants.OVERALL_STATUS_RESOURCE,
                            OverallStatus.class);

            System.out.println("\n Overall status of transfer:");
            System.out.println("Finished/Active/Failed/Retrying/Pending");
            System.out.print(overallStatus.getTransfersFinished() + "/");
            System.out.print(overallStatus.getTransfersActive() + "/");
            System.out.print(overallStatus.getTransfersFailed() + "/");
            System.out.print(overallStatus.getTransfersRestarted() + "/");
            System.out.println(overallStatus.getTransfersPending());

            // The execution result is picked up here: these counters are
            // what runRFT polls.
            // NOTE(review): presumably this runs on a different thread than
            // the poller; verify the fields are volatile.
            if (finished < overallStatus.getTransfersFinished()) {
                finished = overallStatus.getTransfersFinished();
            }
            if (failed < overallStatus.getTransfersFailed()) {
                failed = overallStatus.getTransfersFailed();
            }
            transferCount = overallStatus.getTransfersFinished()
                    + overallStatus.getTransfersActive()
                    + overallStatus.getTransfersFailed()
                    + overallStatus.getTransfersRestarted()
                    + overallStatus.getTransfersPending();

            RFTFaultResourcePropertyType faultRP = overallStatus.getFault();
            if (faultRP != null) {
                fault = getFaultFromRP(faultRP);
            }
            if (fault != null) {
                System.err.println("Error:" + fault.getDescription(0));
            }
        }
    } catch (Exception e) {
        // getMessage() alone can be null and hides where the failure
        // happened; print the exception and its stack trace.
        System.err.println("Failed to process notification: " + e);
        e.printStackTrace();
    }
}
I'd appreciate any help!
Thanks very much!