Author: johnh
Date: Mon Jun 21 17:52:27 2010
New Revision: 956654

URL: http://svn.apache.org/viewvc?rev=956654&view=rev
Log:
Patch by Vikas Arora.                                                        
                                                                             
Allow concat servlet to fetch resources concurrently. This behavior is 
configured by the injection of a @Named Executor.
                                                                             
Detail from original CL (http://codereview.appspot.com/1681044/show):
Employs the Java 1.5 concurrency constructs 'Executor', 'FutureTask' and
'Callable': each HttpRequest is wrapped in a Callable, which is wrapped in a
FutureTask and submitted to the configured Executor. The default Executor
implementation is a sequential one; it can be bound to an appropriate
threaded implementation in the Guice module.


Modified:
    
shindig/trunk/java/gadgets/src/main/java/org/apache/shindig/gadgets/servlet/ConcatProxyServlet.java
    
shindig/trunk/java/gadgets/src/test/java/org/apache/shindig/gadgets/servlet/ConcatProxyServletTest.java

Modified: 
shindig/trunk/java/gadgets/src/main/java/org/apache/shindig/gadgets/servlet/ConcatProxyServlet.java
URL: 
http://svn.apache.org/viewvc/shindig/trunk/java/gadgets/src/main/java/org/apache/shindig/gadgets/servlet/ConcatProxyServlet.java?rev=956654&r1=956653&r2=956654&view=diff
==============================================================================
--- 
shindig/trunk/java/gadgets/src/main/java/org/apache/shindig/gadgets/servlet/ConcatProxyServlet.java
 (original)
+++ 
shindig/trunk/java/gadgets/src/main/java/org/apache/shindig/gadgets/servlet/ConcatProxyServlet.java
 Mon Jun 21 17:52:27 2010
@@ -19,11 +19,14 @@
 package org.apache.shindig.gadgets.servlet;
 
 import com.google.inject.Inject;
+import com.google.inject.name.Named;
 
 import org.apache.commons.lang.StringEscapeUtils;
 import org.apache.commons.lang.StringUtils;
 import org.apache.shindig.common.servlet.HttpUtil;
 import org.apache.shindig.common.servlet.InjectedServlet;
+import org.apache.shindig.common.Pair;
+import org.apache.shindig.common.Pairs;
 import org.apache.shindig.common.uri.Uri;
 import org.apache.shindig.common.uri.UriBuilder;
 import org.apache.shindig.gadgets.GadgetException;
@@ -36,6 +39,12 @@ import org.apache.shindig.gadgets.uri.Co
 import org.apache.shindig.gadgets.uri.UriCommon.Param;
 
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executor;
+import java.util.concurrent.FutureTask;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 import java.util.regex.Pattern;
@@ -60,11 +69,18 @@ public class ConcatProxyServlet extends 
 
   private static final Logger LOG 
       = Logger.getLogger(ConcatProxyServlet.class.getName());
-
+  
   private RequestPipeline requestPipeline;
   private ConcatUriManager concatUriManager;
   private ResponseRewriterRegistry contentRewriterRegistry;
 
+  private Executor executor = new Executor() {
+    public void execute(Runnable r) {
+      // Sequential version of 'execute' by default.
+      r.run();
+    }
+  };
+
   @Inject
   public void setRequestPipeline(RequestPipeline requestPipeline) {
     this.requestPipeline = requestPipeline;
@@ -79,6 +95,13 @@ public class ConcatProxyServlet extends 
   public void setContentRewriterRegistry(ResponseRewriterRegistry 
contentRewriterRegistry) {
     this.contentRewriterRegistry = contentRewriterRegistry;
   }
+  
+  @Inject
+  public void setExecutor(@Named("shindig.concat.executor") Executor executor) 
{
+    // Executor is independently named to allow separate configuration of
+    // concat fetch parallelism and other Shindig job execution.
+    this.executor = executor;
+  }
 
   @SuppressWarnings("boxing")
   @Override
@@ -110,6 +133,21 @@ public class ConcatProxyServlet extends 
     response.setHeader("Content-Type", concatType.getMimeType() + "; 
charset=UTF8");
     response.setHeader("Content-Disposition", "attachment;filename=p.txt");
 
+    if (doFetchConcatResources(response, concatUri)) {
+      response.setStatus(HttpResponse.SC_OK);
+    } else {
+      response.setStatus(HttpResponse.SC_BAD_REQUEST);
+    }
+  }
+
+  /**
+   * @param response HttpservletResponse.
+   * @param concatUri URI representing the concatenated list of resources 
requested.
+   * @return false for cases where concat resources could not be fetched, true 
for success cases.
+   * @throws IOException
+   */
+  private boolean doFetchConcatResources(HttpServletResponse response,
+      ConcatUriManager.ConcatUri concatUri) throws IOException {
     // Check for json concat and set output stream.
     ConcatOutputStream cos = null;
     
@@ -122,38 +160,67 @@ public class ConcatProxyServlet extends 
         response.getOutputStream().println(
             formatHttpError(HttpServletResponse.SC_BAD_REQUEST,
                 "Bad json variable name " + jsonVar, null));
-        response.setStatus(HttpServletResponse.SC_BAD_REQUEST);
-        return;
+        return false;
       }
     } else {
       // Standard concat output mode.
       cos = new VerbatimConcatOutputStream(response.getOutputStream());
     }
 
+    List<Pair<Uri, FutureTask<RequestContext>>> futureTasks =
+        new ArrayList<Pair<Uri, FutureTask<RequestContext>>>();
+    
     for (Uri resourceUri : concatUri.getBatch()) {
       try {
         HttpRequest httpReq = concatUri.makeHttpRequest(resourceUri);
-        HttpResponse httpResp = requestPipeline.execute(httpReq);
-        if (contentRewriterRegistry != null) {
-          try {
-            httpResp = contentRewriterRegistry.rewriteHttpResponse(httpReq, 
httpResp);
-          } catch (RewritingException e) {
-            throw new 
GadgetException(GadgetException.Code.INTERNAL_SERVER_ERROR, e,
-                e.getHttpStatusCode());
-          }
-        }
-        cos.output(resourceUri, httpResp);
+        FutureTask<RequestContext> httpFetcher =
+            new FutureTask<RequestContext>(new HttpFetchCallable(httpReq));
+        futureTasks.add(Pairs.newPair(httpReq.getUri(), httpFetcher));
+        executor.execute(httpFetcher);
       } catch (GadgetException ge) {
-        response.setStatus(HttpResponse.SC_BAD_REQUEST);
         if (cos.outputError(resourceUri, ge)) {
           // True returned from outputError indicates a terminal error.
-          return;
+          return false;
         }
       }
     }
-    
-    cos.close();
-    response.setStatus(HttpResponse.SC_OK);
+  
+    for (Pair<Uri, FutureTask<RequestContext>> futureTask : futureTasks) {
+      RequestContext requestCxt = null;
+      try {
+        try {
+          requestCxt = futureTask.two.get();
+        } catch (InterruptedException ie) {
+          throw new 
GadgetException(GadgetException.Code.INTERNAL_SERVER_ERROR, ie);
+        } catch (ExecutionException ee) {
+          throw new 
GadgetException(GadgetException.Code.INTERNAL_SERVER_ERROR, ee);
+        }
+        if (requestCxt.getGadgetException() != null) {
+          throw requestCxt.getGadgetException();
+        }
+        HttpResponse httpResp = requestCxt.getHttpResp();
+        if (httpResp != null) {
+          if (contentRewriterRegistry != null) {
+            try {
+              httpResp = 
contentRewriterRegistry.rewriteHttpResponse(requestCxt.getHttpReq(), 
+                  httpResp);
+            } catch (RewritingException e) {
+              throw new 
GadgetException(GadgetException.Code.INTERNAL_SERVER_ERROR, e,
+                  e.getHttpStatusCode());
+            }
+          }
+          cos.output(futureTask.one, httpResp);
+        } else {
+          return false;
+        }        
+      } catch (GadgetException ge) {
+        if (cos.outputError(futureTask.one, ge)) {
+          return false;
+        }    
+      }
+    }
+    cos.close();    
+    return true;
   }
 
   private static String formatHttpError(int status, String errorMessage, Uri 
uri) {
@@ -277,5 +344,50 @@ public class ConcatProxyServlet extends 
     }
     
   }
+  
+  // Encapsulates the response context of a single resource fetch.
+  private class RequestContext {
+    private HttpRequest httpReq;
+    private HttpResponse httpResp;
+    private GadgetException gadgetException;
+
+    public HttpRequest getHttpReq() {
+      return httpReq;
+    }
+
+    public HttpResponse getHttpResp() {
+      return httpResp;
+    }
+
+    public GadgetException getGadgetException() {
+      return gadgetException;
+    }
+
+    public RequestContext(HttpRequest httpReq, HttpResponse httpResp, 
GadgetException ge) {
+      this.httpReq = httpReq;
+      this.httpResp = httpResp;
+      this.gadgetException = ge;
+    }
+  }
+
+  // Worker class responsible for fetching a single resource.
+  public class HttpFetchCallable implements Callable<RequestContext> {
+    private HttpRequest httpReq;
+
+    public HttpFetchCallable(HttpRequest httpReq) {
+      this.httpReq = httpReq;
+    }
+    
+    public RequestContext call() {
+      HttpResponse httpResp = null;
+      GadgetException gEx = null;
+      try {
+        httpResp = requestPipeline.execute(httpReq);
+      } catch (GadgetException ge){
+        gEx = ge;
+      }
+      return new RequestContext(httpReq, httpResp, gEx);
+    }
+  }  
 }
 

Modified: 
shindig/trunk/java/gadgets/src/test/java/org/apache/shindig/gadgets/servlet/ConcatProxyServletTest.java
URL: 
http://svn.apache.org/viewvc/shindig/trunk/java/gadgets/src/test/java/org/apache/shindig/gadgets/servlet/ConcatProxyServletTest.java?rev=956654&r1=956653&r2=956654&view=diff
==============================================================================
--- 
shindig/trunk/java/gadgets/src/test/java/org/apache/shindig/gadgets/servlet/ConcatProxyServletTest.java
 (original)
+++ 
shindig/trunk/java/gadgets/src/test/java/org/apache/shindig/gadgets/servlet/ConcatProxyServletTest.java
 Mon Jun 21 17:52:27 2010
@@ -37,6 +37,7 @@ import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 
 import java.util.Map;
+import java.util.concurrent.Executor;
 
 public class ConcatProxyServletTest extends ServletTestFixture {
   private static final String REQUEST_DOMAIN = "example.org";
@@ -57,6 +58,20 @@ public class ConcatProxyServletTest exte
   private final ConcatProxyServlet servlet = new ConcatProxyServlet();
   private TestConcatUriManager uriManager;
   
+  private final Executor sequentialExecutor = new Executor() {
+    public void execute(Runnable r) {
+      // Sequential version of 'execute'.
+      r.run();
+    }
+  }; 
+
+  private final Executor threadedExecutor = new Executor() {
+    public void execute(Runnable r) {
+      // Threaded version of 'execute'.
+      new Thread(r).start();
+    }
+  }; 
+
   @Before
   public void setUp() throws Exception {
     servlet.setRequestPipeline(pipeline);
@@ -104,17 +119,19 @@ public class ConcatProxyServletTest exte
     return '\"' + url + "\":\"" + data +"\",\r\n";
     
   }
-  
+
   /**
-   * Run a concat test
+   * Run a concat test by fetching resources as configured by given Executor
    * @param result - expected concat results
    * @param uris - list of uris to concat
    * @throws Exception
    */
-  private void runConcat(String result, String tok, Uri... uris) throws 
Exception {
+  private void runConcat(Executor exec, String result, String tok, Uri... uris)
+      throws Exception {
     expectRequestWithUris(Lists.newArrayList(uris), tok);
     
     // Run the servlet
+    servlet.setExecutor(exec);
     servlet.doGet(request, recorder);
     verify();
     assertEquals(result, recorder.getResponseAsString());
@@ -124,17 +141,30 @@ public class ConcatProxyServletTest exte
   @Test
   public void testSimpleConcat() throws Exception {
     String results = addComment(SCRT1, URL1.toString()) + 
addComment(SCRT2,URL2.toString());
-    runConcat(results, null, URL1, URL2);
+    runConcat(sequentialExecutor, results, null, URL1, URL2);
   }
 
   @Test
+  public void testSimpleConcatThreaded() throws Exception {
+    String results = addComment(SCRT1, URL1.toString()) + 
addComment(SCRT2,URL2.toString());
+    runConcat(threadedExecutor, results, null, URL1, URL2);
+  }
+  
+  @Test
   public void testThreeConcat() throws Exception {
     String results = addComment(SCRT1, URL1.toString()) + 
addComment(SCRT2,URL2.toString())
         + addComment(SCRT3, URL3.toString());
-    runConcat(results, null, URL1, URL2, URL3);
+    runConcat(sequentialExecutor, results, null, URL1, URL2, URL3);
   }
 
   @Test
+  public void testThreeConcatThreaded() throws Exception {
+    String results = addComment(SCRT1, URL1.toString()) + 
addComment(SCRT2,URL2.toString())
+        + addComment(SCRT3, URL3.toString());
+    runConcat(threadedExecutor, results, null, URL1, URL2, URL3);
+  }
+  
+  @Test
   public void testConcatBadException() throws Exception {
     final Uri URL4 = Uri.parse("http://example.org/4.js");
 
@@ -178,7 +208,7 @@ public class ConcatProxyServletTest exte
         + addVar(URL1.toString(), SCRT1_ESCAPED)
         + addVar(URL2.toString(), SCRT2_ESCAPED)
         + "};\r\n";
-    runConcat(results, "_js", URL1, URL2);
+    runConcat(sequentialExecutor, results, "_js", URL1, URL2);
   }
 
   @Test
@@ -188,7 +218,7 @@ public class ConcatProxyServletTest exte
         + addVar(URL2.toString(), SCRT2_ESCAPED)
         + addVar(URL3.toString(), SCRT3_ESCAPED)
         + "};\r\n";
-    runConcat(results, "_js", URL1, URL2, URL3);
+    runConcat(sequentialExecutor, results, "_js", URL1, URL2, URL3);
   }
   
   @Test
@@ -213,7 +243,7 @@ public class ConcatProxyServletTest exte
         + addVar(URL1.toString(), SCRT1_ESCAPED)
         + "/* ---- Error 404 (http://example.org/4.js) ---- */\r\n"
         + "};\r\n";
-    runConcat(results, "_js", URL1, URL4);
+    runConcat(sequentialExecutor, results, "_js", URL1, URL4);
   }
   
   @Test


Reply via email to