Hi,

I'm trying to get a grip on composing futures through a small but essential
problem.

My aim is to send HTTP requests to multiple URLs asynchronously and compute
the length of each response as it arrives. I'm using AsyncHttpClient for
this. The code is below. The problem is that when the function passed to
Future.flatMap() in Driver.java runs, the iteration over the values finds
null instead of valid integers.
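
To make the goal concrete, here is a minimal sketch of the composition I am
after, written against the standard library only. It is not the Akka code:
CompletableFuture stands in for Akka's Future, sequence() is my own analogue
of Futures.sequence, and fetchLength() is a hypothetical stand-in that
"fetches" a canned body instead of making a real HTTP call.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Stdlib-only sketch; all names here are illustrative, not Akka or AsyncHttpClient API.
final class MaxLengthSketch {

    // Hypothetical "request": completes asynchronously with the length of a canned body.
    static CompletableFuture<Integer> fetchLength(String body) {
        return CompletableFuture.supplyAsync(() -> body.length());
    }

    // Analogue of a sequence operation: one future that completes once all inputs have.
    static CompletableFuture<List<Integer>> sequence(List<CompletableFuture<Integer>> futures) {
        return CompletableFuture
            .allOf(futures.toArray(new CompletableFuture<?>[0]))
            .thenApply(ignored -> {
                List<Integer> values = new ArrayList<>();
                for (CompletableFuture<Integer> f : futures) {
                    values.add(f.join()); // every input is complete here, so join() does not block
                }
                return values;
            });
    }

    // Start one "request" per body, then reduce the collected lengths to their maximum.
    static CompletableFuture<Integer> maxLength(List<String> bodies) {
        List<CompletableFuture<Integer>> futures = new ArrayList<>();
        for (String body : bodies) {
            futures.add(fetchLength(body));
        }
        return sequence(futures).thenApply(values -> {
            int max = 0;
            for (int value : values) {
                max = Math.max(max, value);
            }
            return max;
        });
    }

    public static void main(String[] args) {
        // Lengths are 3, 5 and 0, so this prints 5.
        System.out.println(maxLength(Arrays.asList("abc", "hello", "")).join());
    }
}
```

In this sketch each future completes only once its value actually exists, so
the reduction step never sees a placeholder.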

The console output is:

Contacting http://www.yahoo.com
Contacting http://www.google.com
Current value: null
AsyncHttpClient completed. Length=12177
AsyncHttpClient completed. Length=22294

Any idea what's causing this problem?

Best regards,
Shiv

===============================

Client.java
=========
package akka.future;

import java.io.IOException;
import java.util.concurrent.Callable;

import com.ning.http.client.AsyncCompletionHandler;
import com.ning.http.client.AsyncHttpClient;
import com.ning.http.client.Response;

public class Client implements Callable<Integer> {
    private AsyncHttpClient asyncHttpClient;
    private String uri;
    private Integer length = 0;
    
    public Client(String uri) {
        this.uri = uri;
        asyncHttpClient = new AsyncHttpClient();
    }
    
    @Override
    public Integer call() throws IOException {
        System.out.println("Contacting " + uri);
        asyncHttpClient.prepareGet(uri)
            .execute(new AsyncCompletionHandler<Integer>() {

                @Override
                public Integer onCompleted(Response response) throws Exception {
                    Integer length = response.getResponseBody().length();
                    System.out.println("AsyncHttpClient completed. Length=" + length);
                    asyncHttpClient.close();
                    return length;
                }
                
                 @Override
                 public void onThrowable(Throwable t) {
                     System.out.println("Something bad happened!");
                     t.printStackTrace();
                     asyncHttpClient.close();
                     length = 0;
                 }
            });
        return length;
    }
}
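
One thing I am unsure about is the timing between call() returning and
onCompleted() firing. The sketch below isolates that question with the
standard library only. FakeClient is a hypothetical callback-style client
(not AsyncHttpClient): get() just registers a handler and deliver() plays
the role of the response arriving later. Pattern A returns a value right
after starting the request; pattern B returns a future that the handler
completes. Whether pattern A is really what Client above does is my
assumption, not something I have confirmed.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;

// Stdlib-only sketch; FakeClient and both patterns are illustrative names.
final class CallbackBridgeSketch {

    // Hypothetical callback-style client: get() only registers the handler,
    // deliver() simulates the response arriving at some later point.
    static final class FakeClient {
        private Consumer<String> handler;

        void get(Consumer<String> onCompleted) {
            this.handler = onCompleted;
        }

        void deliver(String body) {
            handler.accept(body);
        }
    }

    // Pattern A: start the request, then return the slot straight away.
    // The handler has not run yet, so the caller sees the initial value.
    static Integer lengthReadTooEarly(FakeClient client) {
        final Integer[] slot = { null };
        client.get(body -> slot[0] = body.length());
        return slot[0];
    }

    // Pattern B: hand back a future that the handler completes later.
    static CompletableFuture<Integer> lengthAsFuture(FakeClient client) {
        CompletableFuture<Integer> promise = new CompletableFuture<>();
        client.get(body -> promise.complete(body.length()));
        return promise;
    }

    public static void main(String[] args) {
        FakeClient a = new FakeClient();
        System.out.println("Pattern A: " + lengthReadTooEarly(a)); // prints "Pattern A: null"
        a.deliver("hello"); // arrives after the value was already read

        FakeClient b = new FakeClient();
        CompletableFuture<Integer> length = lengthAsFuture(b);
        System.out.println("Pattern B done? " + length.isDone()); // prints "Pattern B done? false"
        b.deliver("hello");
        System.out.println("Pattern B: " + length.join()); // prints "Pattern B: 5"
    }
}
```

With pattern B, anything composed on top of the returned future runs only
after the handler has supplied a real value.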

Driver.java
========
package akka.future;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;

import scala.concurrent.ExecutionContext;
import scala.concurrent.Future;
import akka.actor.ActorSystem;
import akka.dispatch.Futures;
import akka.dispatch.Mapper;
import akka.dispatch.OnSuccess;
import akka.japi.Function2;

public class Driver {
    
    public void length(String[] uris) throws IOException, InterruptedException {
        ActorSystem actorSystem = ActorSystem.create("Driver");
        final ExecutionContext ec = actorSystem.dispatcher();
        
        List<Future<Integer>> futures = new ArrayList<Future<Integer>>();
        for (String uri: uris) {
            Client client = new Client(uri);
            futures.add(Futures.future(client, ec));
        }
        
        Future<Iterable<Integer>> futureSequence = Futures.sequence(futures, ec);
        
        Future<Integer> futureMax = futureSequence.flatMap(
                new Mapper<Iterable<Integer>, Future<Integer>>() {
                    @Override
                    public Future<Integer> apply(final Iterable<Integer> values) {
                        return Futures.future(new Callable<Integer>() {
                            public Integer call() {
                                Integer max = 0;
                                for (Integer value : values) {
                                    System.out.println("Current value: " + value);
                                    if (value > max) {
                                        max = value;
                                    }
                                }
                                return max;
                            }
                        }, ec);
                    }
                }, ec);
        
        
        Thread.sleep(20*1000);
        actorSystem.shutdown();
    }
    
    public static void main(String[] args) throws IOException, InterruptedException {
        String[] uris = { "http://www.yahoo.com", "http://www.google.com" };
        Driver driver = new Driver();
        driver.length(uris);
    }
}
