Revision: 6423 Author: [email protected] Date: Tue Oct 20 09:01:37 2009 Log: A simple API for exchanging requests and responses. It will be used for communication between the DevModeServer and remote Viewers.
Review by: mmendez, rdayal (pair prog) http://code.google.com/p/google-web-toolkit/source/detail?r=6423 Added: /branches/remote-ui-communication/dev/core/src/com/google/gwt/dev/shell/remoteui/MessageTransport.java /branches/remote-ui-communication/dev/core/src/com/google/gwt/dev/shell/remoteui/RequestProcessor.java /branches/remote-ui-communication/dev/core/test/com/google/gwt/dev/shell/remoteui /branches/remote-ui-communication/dev/core/test/com/google/gwt/dev/shell/remoteui/MessageTransportTest.java ======================================= --- /dev/null +++ /branches/remote-ui-communication/dev/core/src/com/google/gwt/dev/shell/remoteui/MessageTransport.java Tue Oct 20 09:01:37 2009 @@ -0,0 +1,335 @@ +/* + * Copyright 2009 Google Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package com.google.gwt.dev.shell.remoteui; + +import com.google.gwt.dev.shell.remoteui.RemoteMessageProto.Message; +import com.google.gwt.dev.shell.remoteui.RemoteMessageProto.Message.Request; +import com.google.gwt.dev.shell.remoteui.RemoteMessageProto.Message.Response; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + +/** + * Responsible for exchanging requests and responses between services. + */ +public class MessageTransport { + class PendingRequest extends PendingSend { + private final ReentrantLock lock = new ReentrantLock(); + private final Condition availableResponseCondition = lock.newCondition(); + private Response responseMessage; + private Exception exception; + private final Message message; + + public PendingRequest(Message message) { + this.message = message; + } + + @Override + public void failed(Exception e) { + pendingRequestMap.remove(message.getRequest().getRequestId()); + + lock.lock(); + try { + exception = e; + availableResponseCondition.signal(); + } finally { + lock.unlock(); + } + } + + @Override + public void send(OutputStream outputStream) throws IOException { + Request request = message.getRequest(); + int requestId = request.getRequestId(); + pendingRequestMap.put(requestId, this); + message.writeDelimitedTo(outputStream); + } + + /** + * Sets the response that was received from the server, and signals the + * thread that is waiting on the response. + * + * @param responseMessage the server's response + * @throws InterruptedException + */ + public void setResponse(Message.Response responseMessage) + throws InterruptedException { + assert (responseMessage != null); + lock.lock(); + try { + if (this.responseMessage != null) { + throw new IllegalStateException("Response has already been set."); + } + this.responseMessage = responseMessage; + availableResponseCondition.signal(); + } finally { + lock.unlock(); + } + } + + /** + * Waits for a response to be returned for a given request. + * + * @return the response from the server + * @throws Exception if an exception occurred while processing the request + */ + public Response waitForResponse() throws Exception { + lock.lock(); + + try { + while (responseMessage == null && exception == null) { + availableResponseCondition.await(); + } + + if (exception != null) { + throw exception; + } + + return responseMessage; + } finally { + lock.unlock(); + } + } + } + + static class PendingRequestMap { + private final Lock mapLock = new ReentrantLock(); + private final Map<Integer, PendingRequest> requestIdToPendingServerRequest = new HashMap<Integer, PendingRequest>(); + private boolean noMoreAdds; + + public void blockAdds(Exception e) { + mapLock.lock(); + try { + noMoreAdds = true; + for (PendingRequest pendingRequest : requestIdToPendingServerRequest.values()) { + pendingRequest.failed(e); + } + } finally { + mapLock.unlock(); + } + } + + public PendingRequest remove(int requestId) { + mapLock.lock(); + try { + return requestIdToPendingServerRequest.remove(requestId); + } finally { + mapLock.unlock(); + } + } + + void put(int requestId, PendingRequest pendingServerRequest) { + mapLock.lock(); + try { + if (noMoreAdds) { + pendingServerRequest.failed(new IllegalStateException( + "InputStream is closed")); + } else { + requestIdToPendingServerRequest.put(requestId, pendingServerRequest); + } + } finally { + mapLock.unlock(); + } + } + } + + class PendingResponse extends PendingSend { + Message message; + + public PendingResponse(Message message) { + this.message = message; + } + + @Override + public void failed(Exception e) { + // Do nothing + } + + @Override + public void send(OutputStream outputStream) throws IOException { + message.writeDelimitedTo(outputStream); + } + } + + abstract class PendingSend { + public abstract void failed(Exception e); + + public abstract void send(OutputStream outputStream) throws IOException; + } + + private static final int DEFAULT_SERVICE_THREADS = 2; + + private final Thread messageProcessingThread; + private final AtomicInteger nextMessageId = new AtomicInteger(); + private final RequestProcessor requestProcessor; + private final LinkedBlockingQueue<PendingSend> sendQueue = new LinkedBlockingQueue<PendingSend>(); + private final Thread sendThread; + private final ExecutorService serverRequestExecutor; + private final PendingRequestMap pendingRequestMap = new PendingRequestMap(); + + /** + * Create a new instance using the given streams and request processor. + * Closing either stream will cause the termination of the transport. + * + * @param inputStream an input stream for reading messages + * @param outputStream an output stream for writing messages + * @param requestProcessor a callback interface for handling remote client + * requests + */ + public MessageTransport(final InputStream inputStream, + final OutputStream outputStream, RequestProcessor requestProcessor) { + this.requestProcessor = requestProcessor; + serverRequestExecutor = Executors.newFixedThreadPool(DEFAULT_SERVICE_THREADS); + + // This thread terminates on interruption or IO failure + messageProcessingThread = new Thread(new Runnable() { + public void run() { + try { + while (true) { + Message message = Message.parseDelimitedFrom(inputStream); + processMessage(message); + } + } catch (IOException e) { + terminateDueToException(e); + } catch (InterruptedException e) { + terminateDueToException(e); + } + } + }); + messageProcessingThread.start(); + + // This thread only terminates if it is interrupted + sendThread = new Thread(new Runnable() { + public void run() { + while (true) { + try { + PendingSend pendingSend = sendQueue.take(); + try { + pendingSend.send(outputStream); + } catch (IOException e) { + pendingSend.failed(e); + } + } catch (InterruptedException e) { + break; + } + } + } + }); + sendThread.setDaemon(true); + sendThread.start(); + } + + /** + * Asynchronously executes the request on a remote server. + * + * @param requestMessage The request to execute + * + * @return a {...@link Future} that can be used to access the server's response + */ + public Future<Response> executeRequestAsync(final Request requestMessage) { + Future<Response> responseFuture = serverRequestExecutor.submit(new Callable<Response>() { + public Response call() throws Exception { + int requestId = nextMessageId.getAndIncrement(); + + Message.Request.Builder requestBuilder = Message.Request.newBuilder(requestMessage); + requestBuilder.setRequestId(requestId); + + Message.Builder messageBuilder = Message.newBuilder(); + messageBuilder.setMessageType(Message.MessageType.REQUEST); + messageBuilder.setRequest(requestBuilder); + + Message message = messageBuilder.build(); + PendingRequest pendingRequest = new PendingRequest(message); + sendQueue.put(pendingRequest); + + return pendingRequest.waitForResponse(); + } + }); + + return responseFuture; + } + + private void processClientRequest(int requestId, Request request) + throws InterruptedException { + Message.Builder messageBuilder = Message.newBuilder(); + messageBuilder.setMessageType(Message.MessageType.RESPONSE); + + Response response = null; + try { + response = requestProcessor.execute(request); + } catch (Exception e) { + // TODO: Write error information into the builder and set the request Id + return; + } + + // This would not be necessary if the request id was not part of request + // or response. + Response.Builder responseBuilder = Response.newBuilder(response); + responseBuilder.setRequestId(requestId); + + messageBuilder.setResponse(responseBuilder); + PendingResponse pendingResponse = new PendingResponse( + messageBuilder.build()); + sendQueue.put(pendingResponse); + } + + private void processMessage(final Message message) + throws InterruptedException { + switch (message.getMessageType()) { + case RESPONSE: { + processServerResponse(message.getResponse().getRequestId(), + message.getResponse()); + break; + } + + case REQUEST: { + processClientRequest(message.getRequest().getRequestId(), + message.getRequest()); + break; + } + + default: { + // TODO: Return a response indicating that the message type + // is unknown + break; + } + } + } + + private void processServerResponse(int requestId, Response response) + throws InterruptedException { + PendingRequest pendingServerRequest = pendingRequestMap.remove(requestId); + if (pendingServerRequest != null) { + pendingServerRequest.setResponse(response); + } + } + + private void terminateDueToException(Exception e) { + pendingRequestMap.blockAdds(e); + } +} ======================================= --- /dev/null +++ /branches/remote-ui-communication/dev/core/src/com/google/gwt/dev/shell/remoteui/RequestProcessor.java Tue Oct 20 09:01:37 2009 @@ -0,0 +1,36 @@ +/* + * Copyright 2009 Google Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package com.google.gwt.dev.shell.remoteui; + +import com.google.gwt.dev.shell.remoteui.RemoteMessageProto.Message.Request; +import com.google.gwt.dev.shell.remoteui.RemoteMessageProto.Message.Response; + +/** + * Handles in-bound client requests. + */ +public interface RequestProcessor { + + /** + * Execute the given client request. + * + * @param request The request to execute. + * + * @return the response resulting from executing the request + * + * @throws Exception if a problem occurred while executing the request + */ + Response execute(Request request) throws Exception; +} ======================================= --- /dev/null +++ /branches/remote-ui-communication/dev/core/test/com/google/gwt/dev/shell/remoteui/MessageTransportTest.java Tue Oct 20 09:01:37 2009 @@ -0,0 +1,360 @@ +package com.google.gwt.dev.shell.remoteui; + +import com.google.gwt.dev.shell.remoteui.RemoteMessageProto.Message; +import com.google.gwt.dev.shell.remoteui.RemoteMessageProto.Message.Request; +import com.google.gwt.dev.shell.remoteui.RemoteMessageProto.Message.Response; + +import junit.framework.TestCase; + +import java.io.IOException; +import java.net.ServerSocket; +import java.net.Socket; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +public class MessageTransportTest extends TestCase { + + private static class MockNetwork { + private final Socket clientSocket; + private final Socket serverSocket; + + private final ServerSocket listenSocket; + + public MockNetwork(Socket clientSocket, Socket serverSocket, + ServerSocket listenSocket) { + this.clientSocket = clientSocket; + this.serverSocket = serverSocket; + this.listenSocket = listenSocket; + } + + public Socket getClientSocket() { + return clientSocket; + } + + public Socket getServerSocket() { + return serverSocket; + } + + public void shutdown() { + try { + clientSocket.close(); + } catch (IOException e) { + // Ignore + } + + try { + serverSocket.close(); + } catch (IOException e) { + // Ignore + } + + try { + listenSocket.close(); + } catch (IOException e) { + // Ignore + } + } + } + + private static MockNetwork createMockNetwork() throws IOException, + InterruptedException, ExecutionException { + final ServerSocket listenSocket = new ServerSocket(0); + ExecutorService executorService = Executors.newFixedThreadPool(1); + Future<Socket> future = executorService.submit(new Callable<Socket>() { + public Socket call() throws Exception { + return listenSocket.accept(); + } + }); + + Socket clientSocket = new Socket("localhost", listenSocket.getLocalPort()); + Socket serverSocket = future.get(); + + return new MockNetwork(clientSocket, serverSocket, listenSocket); + } + + /** + * Tests that a client request is successfully received by the + * RequestProcessor, and the response generated by the RequestProcessor is + * successfully received by the client. + * + * @throws IOException + * @throws ExecutionException + * @throws InterruptedException + */ + public void testClientRequestReceived() throws IOException, + InterruptedException, ExecutionException { + MockNetwork network = createMockNetwork(); + + Message.Request.Builder clientRequestBuilder = Message.Request.newBuilder(); + clientRequestBuilder.setRequestId(25); + clientRequestBuilder.setServiceType(Message.Request.ServiceType.DEV_MODE); + final Message.Request clientRequest = clientRequestBuilder.build(); + + Message.Response.Builder clientResponseBuilder = Message.Response.newBuilder(); + clientResponseBuilder.setRequestId(25); + final Message.Response clientResponse = clientResponseBuilder.build(); + + RequestProcessor requestProcessor = new RequestProcessor() { + public Response execute(Request request) throws Exception { + assertEquals(clientRequest, request); + return clientResponse; + } + }; + + MessageTransport messageTransport = new MessageTransport( + network.getClientSocket().getInputStream(), + network.getClientSocket().getOutputStream(), requestProcessor); + + Message.Builder clientRequestMsgBuilder = Message.newBuilder(); + clientRequestMsgBuilder.setMessageType(Message.MessageType.REQUEST); + clientRequestMsgBuilder.setRequest(clientRequest); + Message clientRequestMsg = clientRequestMsgBuilder.build(); + clientRequestMsg.writeDelimitedTo(network.getServerSocket().getOutputStream()); + + Message receivedResponseMsg = Message.parseDelimitedFrom(network.getServerSocket().getInputStream()); + assertEquals(receivedResponseMsg.getResponse(), clientResponse); + + network.shutdown(); + } + + /** + * Tests that a client request is successfully received by the + * RequestProcessor, and the exception thrown by the RequestProcessor is + * passed back in the form of an error response to the client. + * + * @throws IOException + * @throws ExecutionException + * @throws InterruptedException + */ + public void testClientRequestReceivedHandlerThrowsException() { + // TODO: Implement once we add error message information to the protobufs + // fail("Not yet implemented"); + } + + /** + * Tests that sending an async request to a server when the sending stream is + * closed will result in: + * + * 1) A rejection of the request to the executor 2) An ExecutionException on a + * call to future.get() + * + * @throws ExecutionException + * @throws InterruptedException + * @throws IOException + */ + public void testExecuteAsyncRequestWithClosedSendStream() throws IOException, + InterruptedException, ExecutionException { + MockNetwork network = createMockNetwork(); + + RequestProcessor requestProcessor = new RequestProcessor() { + public Response execute(Request request) throws Exception { + fail("Should not reach here."); + return null; + } + }; + + MessageTransport messageTransport = new MessageTransport( + network.getClientSocket().getInputStream(), + network.getClientSocket().getOutputStream(), requestProcessor); + + Message.Request.Builder requestMessageBuilder = Message.Request.newBuilder(); + // FIXME: We are assuming what the request id is, based on knowledge of + // MessageTransport's implementation. We need to provide a testing-only + // method that allows us to specify (or peek at) the request id. + requestMessageBuilder.setRequestId(0); + requestMessageBuilder.setServiceType(Message.Request.ServiceType.DEV_MODE); + Message.Request request = requestMessageBuilder.build(); + + // Close the server's input stream; that will close the client's output + // stream + network.getServerSocket().getInputStream().close(); + + Future<Response> responseFuture = null; + responseFuture = messageTransport.executeRequestAsync(request); + assertNotNull(responseFuture); + + try { + responseFuture.get(2, TimeUnit.SECONDS); + fail("Should have thrown an exception"); + } catch (TimeoutException te) { + fail("Should not have timed out"); + } catch (ExecutionException e) { + assertTrue(e.getCause() instanceof IllegalStateException); + } catch (Exception e) { + fail("Should not have thrown any other exception"); + } + + network.shutdown(); + } + + /** + * Tests that an async request to a remote server is successfully sent, and + * the server's response is successfully received. + */ + public void testExecuteRequestAsync() throws InterruptedException, + ExecutionException, IOException, TimeoutException { + + MockNetwork network = createMockNetwork(); + + RequestProcessor requestProcessor = new RequestProcessor() { + public Response execute(Request request) throws Exception { + fail("Should not reach here."); + return null; + } + }; + + MessageTransport messageTransport = new MessageTransport( + network.getClientSocket().getInputStream(), + network.getClientSocket().getOutputStream(), requestProcessor); + + Message.Request.Builder requestMessageBuilder = Message.Request.newBuilder(); + // FIXME: We are assuming what the request id is, based on knowledge of + // MessageTransport's implementation. We need to provide a testing-only + // method that allows us to specify (or peek at) the request id. + requestMessageBuilder.setRequestId(0); + requestMessageBuilder.setServiceType(Message.Request.ServiceType.DEV_MODE); + Message.Request request = requestMessageBuilder.build(); + + Future<Response> responseFuture = messageTransport.executeRequestAsync(request); + assertNotNull(responseFuture); + + Message receivedRequest = Message.parseDelimitedFrom(network.getServerSocket().getInputStream()); + assertEquals(receivedRequest.getRequest(), request); + + Message.Response.Builder responseBuilder = Message.Response.newBuilder(); + // FIXME: We are assuming what the request id is, based on knowledge of + // MessageTransport's implementation. We need to provide a testing-only + // method that allows us to specify (or peek at) the request id. + responseBuilder.setRequestId(0); + Message.Response response = responseBuilder.build(); + + Message.Builder responseMsgBuilder = Message.newBuilder(); + responseMsgBuilder.setMessageType(Message.MessageType.RESPONSE); + responseMsgBuilder.setResponse(response); + Message responseMsg = responseMsgBuilder.build(); + + responseMsg.writeDelimitedTo(network.getServerSocket().getOutputStream()); + assertEquals(responseFuture.get(2, TimeUnit.SECONDS), response); + + network.shutdown(); + } + + /** + * Tests that a future for an async request to a remote server will hang if a + * matching response is not received. + * + * @throws ExecutionException + * @throws InterruptedException + * @throws IOException + */ + public void testExecuteRequestAsyncReceiveUnmatchedResponse() + throws IOException, InterruptedException, ExecutionException { + MockNetwork network = createMockNetwork(); + + RequestProcessor requestProcessor = new RequestProcessor() { + public Response execute(Request request) throws Exception { + fail("Should not reach here."); + return null; + } + }; + + MessageTransport messageTransport = new MessageTransport( + network.getClientSocket().getInputStream(), + network.getClientSocket().getOutputStream(), requestProcessor); + + Message.Request.Builder requestMessageBuilder = Message.Request.newBuilder(); + // FIXME: We are assuming what the request id is, based on knowledge of + // MessageTransport's implementation. We need to provide a testing-only + // method that allows us to specify (or peek at) the request id. + requestMessageBuilder.setRequestId(0); + requestMessageBuilder.setServiceType(Message.Request.ServiceType.DEV_MODE); + Message.Request request = requestMessageBuilder.build(); + + Future<Response> responseFuture = messageTransport.executeRequestAsync(request); + assertNotNull(responseFuture); + + Message receivedRequest = Message.parseDelimitedFrom(network.getServerSocket().getInputStream()); + assertEquals(receivedRequest.getRequest(), request); + + Message.Response.Builder responseBuilder = Message.Response.newBuilder(); + // Unmatched request id + responseBuilder.setRequestId(20); + Message.Response response = responseBuilder.build(); + + Message.Builder responseMsgBuilder = Message.newBuilder(); + responseMsgBuilder.setMessageType(Message.MessageType.RESPONSE); + responseMsgBuilder.setResponse(response); + Message responseMsg = responseMsgBuilder.build(); + + responseMsg.writeDelimitedTo(network.getServerSocket().getOutputStream()); + + try { + responseFuture.get(2, TimeUnit.SECONDS); + fail("Should have thrown an exception"); + } catch (TimeoutException te) { + // This is where we should hit + } + + network.shutdown(); + } + + /** + * Tests that a future for an async request to a remote server will be + * interrupted if the server closes the connection before the response is + * received. + */ + public void testExecuteRequestAsyncWithClosedReceiveStreamBeforeResponse() + throws IOException, InterruptedException, ExecutionException, + TimeoutException { + MockNetwork network = createMockNetwork(); + + RequestProcessor requestProcessor = new RequestProcessor() { + public Response execute(Request request) throws Exception { + fail("Should not reach here."); + return null; + } + }; + + MessageTransport messageTransport = new MessageTransport( + network.getClientSocket().getInputStream(), + network.getClientSocket().getOutputStream(), requestProcessor); + + Message.Request.Builder requestMessageBuilder = Message.Request.newBuilder(); + // FIXME: We are assuming what the request id is, based on knowledge of + // MessageTransport's implementation. We need to provide a testing-only + // method that allows us to specify (or peek at) the request id. + requestMessageBuilder.setRequestId(0); + requestMessageBuilder.setServiceType(Message.Request.ServiceType.DEV_MODE); + Message.Request request = requestMessageBuilder.build(); + + // assertNotNull(response); + + // This will close the client's input stream + network.getServerSocket().getOutputStream().close(); + + try { + Future<Response> response = messageTransport.executeRequestAsync(request); + response.get(2, TimeUnit.SECONDS); + fail("Should have thrown an exception"); + } catch (TimeoutException te) { + fail("Should not have timed out"); + } catch (ExecutionException e) { + // This is where we should hit + assertTrue(e.getCause() instanceof IllegalStateException); + } catch (Exception e) { + fail("Should not have thrown any other exception"); + } + + network.shutdown(); + } + + public void testMessageTransport() { + // TODO: Implement tests for the constructor + // fail("Not yet implemented"); + } +} --~--~---------~--~----~------------~-------~--~----~ http://groups.google.com/group/Google-Web-Toolkit-Contributors -~----------~----~----~----~------~----~------~--~---
