Hi,

I had the same problem. Try creating your NotificationConsumer this way:

---
   consumer = NotificationConsumerManager.getInstance();
   consumer.startListening();

   List<QName> topicPath = new LinkedList<QName>();
   topicPath.add(RFTConstants.OVERALL_STATUS_RESOURCE);

   consumerEPR = m_NotificationConsumerManager
       .createNotificationConsumer(topicPath, client, resDesc);
---

Yours,

Oliver

wtk wrote:
Hi,

I 've encountered a problem when using the RFT service.
I implemented the NotifyCallback interface( method: deliver(*){*} )in order to check whether my filetransfer jobs finished or failed,etc.
But here comes the problem:
After i run my code , the file transfer finished successfully but i can not get the OverallStatus's elements in the callback method. It seems like the method( deliver ) did not perform any action.

i wonder if any one knows the reason or have some suggestion?!

here is my code:

hope you can find something wrong in these lines.
*******************************************************************************************************************
method:runRFT

public int runRFT(String delortrans) throws GT4ClientException {
TransferRequestType transferType = null;
        DeleteRequestType deleteRequestType = null;
if ((!delortrans.equals(RFT_TANSFER)) &&
                (!delortrans.equals(RFT_DELETE))) {
throw new GT4ClientException("option error: delortrans should be "
                    + "RFT_TANSFER " + RFT_TANSFER + "\n"
                    + "RFT_DELETE" + RFT_DELETE);
        }
if (PORT == null) {
            if (authType.equals (GSIConstants.GSI_TRANSPORT)) {
                PORT = "8443";
            } else {
                PORT ="8080";
            }
        }
        if (authType.equals(GSIConstants.GSI_TRANSPORT )) {
            PROTOCOL = "https";
        }
        String rftFactoryAddress =
            PROTOCOL + "://"+ HOST+ ":" + PORT + SERVICE_URL_ROOT
            + "ReliableFileTransferFactoryService";
        String rftServiceAddress = PROTOCOL+ "://" + HOST + ":" + PORT +
            SERVICE_URL_ROOT + "ReliableFileTransferService";
EndpointReferenceType credEPR = null;
        EndpointReferenceType rftepr = null;
        //add proxy-init:
        try {
            credEPR = delegateCredential(HOST, PORT);
            if (delortrans == RFT_TANSFER) {
                transferType = getTransfer(PATH_TO_FILE, credEPR);
                rftepr = createRFT(rftFactoryAddress,
                        transferType);
            } else {
deleteRequestType = getDeleteRequest(PATH_TO_FILE, credEPR);
                rftepr = createRFT(rftFactoryAddress,
                        deleteRequestType);
            }
} catch (Exception e) {
            e.printStackTrace();
            throw new GT4ClientException("Error: " + e);
} if (outFileName != null) { FileWriter writer = null; try { writer = new FileWriter(outFileName);
                QName qName = new QName("", "RFT_EPR");
                writer.write(ObjectSerializer.toString(rftepr,
                                                       qName));
            } catch(Exception e) {
                throw new GT4ClientException("Errot: " + e);
            } finally {
                if (writer != null) {
                    try {
                        writer.close();
                    } catch(Exception e) {}
                }
            }
        }
        try {
            rftepr.setAddress(new Address(rftServiceAddress));
            ReliableFileTransferPortType rft = rftLocator
                    .getReliableFileTransferPortTypePort(rftepr);
            setSecurity((Stub)rft);
//For secure notifications subscribe(rft, this);//"this" stands for the GlobusRFTClient entity(extends BaseWSRFClient implements NotifyCallback) //System.out.println("Subscribed for overall status");
            //End subscription code
            Calendar termTime = Calendar.getInstance();
            termTime.add (Calendar.MINUTE, TERM_TIME);
            SetTerminationTime reqTermTime = new SetTerminationTime();
            reqTermTime.setRequestedTerminationTime(termTime);
//System.out.println("Termination time to set: " + TERM_TIME + " minutes"); SetTerminationTimeResponse termRes = rft.setTerminationTime(reqTermTime);
            StartOutputType startresp = rft.start(new Start());
            while (finished < transferCount && transferCount !=0) {
//My program do infinite loop here!!!because the parameter (finished) isn't changed by the method deliver().
                if (failed == transferCount ||
                        (failed + finished == transferCount)
                        || finished == transferCount) {
                    break;
                } else {
                    Thread.sleep(1000);
                }
            }
        } catch(Exception ee) {
            throw new GT4ClientException("Errot: " + ee);
        }
        / /check the result
        if ((finished == transferCount) && (finished != 0)) {
            //System.out.println( "All Transfers are completed");
            return TRANSFER_COMPLETE;
        }
        else if ((failed == transferCount) && (failed != 0)) {
            //System.out.println( "All Transfers failed !");
            return TRANSFER_FAILED;
        }
        else if ((failed + finished) == transferCount) {
            //System.out.println( "Transfers Done");
            return TRANSFER_PART_FAILED;
        }
        else {
            return TRANSFER_DONE; // uncertain condition
        }
    }
******************************************************************************************************************* method:subscrib (maybe something wrong in this method! thanks for your attention)

 public void
        subscribe(ReliableFileTransferPortType rft, BaseRFTClient client)
        throws Exception {
        Subscribe request = new Subscribe();
        request.setUseNotify(Boolean.TRUE);
        if(PROTOCOL.equals("http")) {
            consumer = NotificationConsumerManager.getInstance();
        } else if (PROTOCOL.equals("https")) {
Map<String, String> properties = new HashMap<String, String>();
            properties.put(ServiceContainer.CLASS,
            "org.globus.wsrf.container.GSIServiceContainer");
consumer = NotificationConsumerManager.getInstance(properties);
        }
        consumer.startListening();
        EndpointReferenceType consumerEPR = null;
ResourceSecurityDescriptor resDesc = new ResourceSecurityDescriptor();
        Vector<AuthMethod> authMethod = new Vector<AuthMethod>();
        if(AUTHZ.equalsIgnoreCase("host")) {
            ResourcePDPConfig pdpConfig = new ResourcePDPConfig("host");
            pdpConfig.setProperty(Authorization.HOST_PREFIX,
                HostAuthorization.URL_PROPERTY,endpoint);
ServiceAuthorizationChain authz = new ServiceAuthorizationChain();
            authz.initialize(pdpConfig, "chainName", "someId");
            resDesc.setAuthzChain(authz);
        } else if(AUTHZ.equalsIgnoreCase("self")) {
            resDesc.setAuthz("self");
        }
        if (PROTOCOL.equals("http")) {
            if (authType.equals(Constants.GSI_SEC_MSG)) {
                authMethod.add(GSISecureMsgAuthMethod.BOTH);
            } else if (authType.equals(Constants.GSI_SEC_CONV)) {
                authMethod.add(GSISecureConvAuthMethod.BOTH);
            }
        } else if (PROTOCOL.equals("https")) {
            authMethod.add(GSITransportAuthMethod.BOTH);
        }
        resDesc.setAuthMethods(authMethod);
        consumerEPR = consumer.createNotificationConsumer(
                /*new BaseRFTClient()*/client, resDesc);
        request.setConsumerReference(consumerEPR);
        TopicExpressionType topicExpression = new TopicExpressionType();
        topicExpression.setDialect(WSNConstants.SIMPLE_TOPIC_DIALECT);
        topicExpression.setValue(RFTConstants.OVERALL_STATUS_RESOURCE);
        request.setTopicExpression(topicExpression);

        rft.subscribe(request);
    }

*******************************************************************************************************************
method: deliver

   public void deliver(List topicPath, EndpointReferenceType producer,
            Object message) {
        ResourcePropertyValueChangeNotificationType changeMessage =
((ResourcePropertyValueChangeNotificationElementType) message)
                .getResourcePropertyValueChangeNotification();
        BaseFaultType fault = null;
        try {
if (changeMessage != null) {

OverallStatus overallStatus = (OverallStatus) changeMessage
                        .getNewValue().get_any()[0].getValueAsType(
                        RFTConstants.OVERALL_STATUS_RESOURCE,
                        OverallStatus.class);
                System.out.println ("\n Overall status of transfer:");
System.out.println("Finished/Active/Failed/Retrying/Pending"); System.out.print(overallStatus.getTransfersFinished() + "/"); System.out.print(overallStatus.getTransfersActive() + "/"); System.out.print(overallStatus.getTransfersFailed() + "/"); System.out.print(overallStatus.getTransfersRestarted () + "/");
                System.out.println(overallStatus.getTransfersPending());
if ( finished < overallStatus.getTransfersFinished()) {//i wanna get the execution resualt here
                    finished = overallStatus.getTransfersFinished();
                }
                if (failed < overallStatus.getTransfersFailed()) {
                    failed = overallStatus.getTransfersFailed ();
                }
transferCount = overallStatus.getTransfersFinished() + overallStatus.getTransfersActive() + overallStatus.getTransfersFailed() + overallStatus.getTransfersRestarted ()
                    + overallStatus.getTransfersPending();
RFTFaultResourcePropertyType faultRP = overallStatus.getFault();
                if(faultRP != null) {
                    fault = getFaultFromRP(faultRP);
                }
                if (fault != null) {
System.err.println("Error:" + fault.getDescription(0));
                }

            }
        } catch (Exception e) {
            System.err.println(e.getMessage());
        }
    }

I'd appreciate any help!
Thanks very much!


--
Dipl.-Phys. Oliver Mangold
High Performance Computing Center Stuttgart (HLRS)
University of Stuttgart
Nobelstraße 19
70569 Stuttgart
Germany
Mail: [EMAIL PROTECTED]
Phone: +49 711 685 87264

Reply via email to