http://git-wip-us.apache.org/repos/asf/avro/blob/da22ffcb/lang/java/ipc/src/main/java/org/apache/avro/ipc/NettyTransceiver.java ---------------------------------------------------------------------- diff --git a/lang/java/ipc/src/main/java/org/apache/avro/ipc/NettyTransceiver.java b/lang/java/ipc/src/main/java/org/apache/avro/ipc/NettyTransceiver.java index a8a2e3d..0d7dfb7 100644 --- a/lang/java/ipc/src/main/java/org/apache/avro/ipc/NettyTransceiver.java +++ b/lang/java/ipc/src/main/java/org/apache/avro/ipc/NettyTransceiver.java @@ -61,24 +61,24 @@ import org.slf4j.LoggerFactory; public class NettyTransceiver extends Transceiver { /** If not specified, the default connection timeout will be used (60 sec). */ public static final long DEFAULT_CONNECTION_TIMEOUT_MILLIS = 60 * 1000L; - public static final String NETTY_CONNECT_TIMEOUT_OPTION = + public static final String NETTY_CONNECT_TIMEOUT_OPTION = "connectTimeoutMillis"; public static final String NETTY_TCP_NODELAY_OPTION = "tcpNoDelay"; public static final String NETTY_KEEPALIVE_OPTION = "keepAlive"; public static final boolean DEFAULT_TCP_NODELAY_VALUE = true; - + private static final Logger LOG = LoggerFactory.getLogger(NettyTransceiver.class .getName()); private final AtomicInteger serialGenerator = new AtomicInteger(0); - private final Map<Integer, Callback<List<ByteBuffer>>> requests = + private final Map<Integer, Callback<List<ByteBuffer>>> requests = new ConcurrentHashMap<Integer, Callback<List<ByteBuffer>>>(); - + private final ChannelFactory channelFactory; private final long connectTimeoutMillis; private final ClientBootstrap bootstrap; private final InetSocketAddress remoteAddr; - + volatile ChannelFuture channelFuture; volatile boolean stopping; private final Object channelFutureLock = new Object(); @@ -101,7 +101,7 @@ public class NettyTransceiver extends Transceiver { /** * Creates a NettyTransceiver, and attempts to connect to the given address. - * {@link #DEFAULT_CONNECTION_TIMEOUT_MILLIS} is used for the connection + * {@link #DEFAULT_CONNECTION_TIMEOUT_MILLIS} is used for the connection * timeout. * @param addr the address to connect to. * @throws IOException if an error occurs connecting to the given address. @@ -109,75 +109,75 @@ public class NettyTransceiver extends Transceiver { public NettyTransceiver(InetSocketAddress addr) throws IOException { this(addr, DEFAULT_CONNECTION_TIMEOUT_MILLIS); } - + /** * Creates a NettyTransceiver, and attempts to connect to the given address. * @param addr the address to connect to. - * @param connectTimeoutMillis maximum amount of time to wait for connection - * establishment in milliseconds, or null to use + * @param connectTimeoutMillis maximum amount of time to wait for connection + * establishment in milliseconds, or null to use * {@link #DEFAULT_CONNECTION_TIMEOUT_MILLIS}. * @throws IOException if an error occurs connecting to the given address. */ - public NettyTransceiver(InetSocketAddress addr, + public NettyTransceiver(InetSocketAddress addr, Long connectTimeoutMillis) throws IOException { this(addr, new NioClientSocketChannelFactory( Executors.newCachedThreadPool(new NettyTransceiverThreadFactory( - "Avro " + NettyTransceiver.class.getSimpleName() + " Boss")), + "Avro " + NettyTransceiver.class.getSimpleName() + " Boss")), Executors.newCachedThreadPool(new NettyTransceiverThreadFactory( - "Avro " + NettyTransceiver.class.getSimpleName() + " I/O Worker"))), + "Avro " + NettyTransceiver.class.getSimpleName() + " I/O Worker"))), connectTimeoutMillis); } /** * Creates a NettyTransceiver, and attempts to connect to the given address. - * {@link #DEFAULT_CONNECTION_TIMEOUT_MILLIS} is used for the connection + * {@link #DEFAULT_CONNECTION_TIMEOUT_MILLIS} is used for the connection * timeout. * @param addr the address to connect to. * @param channelFactory the factory to use to create a new Netty Channel. * @throws IOException if an error occurs connecting to the given address. */ - public NettyTransceiver(InetSocketAddress addr, ChannelFactory channelFactory) + public NettyTransceiver(InetSocketAddress addr, ChannelFactory channelFactory) throws IOException { this(addr, channelFactory, buildDefaultBootstrapOptions(null)); } - + /** * Creates a NettyTransceiver, and attempts to connect to the given address. * @param addr the address to connect to. * @param channelFactory the factory to use to create a new Netty Channel. - * @param connectTimeoutMillis maximum amount of time to wait for connection - * establishment in milliseconds, or null to use + * @param connectTimeoutMillis maximum amount of time to wait for connection + * establishment in milliseconds, or null to use * {@link #DEFAULT_CONNECTION_TIMEOUT_MILLIS}. * @throws IOException if an error occurs connecting to the given address. */ - public NettyTransceiver(InetSocketAddress addr, ChannelFactory channelFactory, + public NettyTransceiver(InetSocketAddress addr, ChannelFactory channelFactory, Long connectTimeoutMillis) throws IOException { - this(addr, channelFactory, + this(addr, channelFactory, buildDefaultBootstrapOptions(connectTimeoutMillis)); } - + /** * Creates a NettyTransceiver, and attempts to connect to the given address. - * It is strongly recommended that the {@link #NETTY_CONNECT_TIMEOUT_OPTION} - * option be set to a reasonable timeout value (a Long value in milliseconds) - * to prevent connect/disconnect attempts from hanging indefinitely. It is - * also recommended that the {@link #NETTY_TCP_NODELAY_OPTION} option be set + * It is strongly recommended that the {@link #NETTY_CONNECT_TIMEOUT_OPTION} + * option be set to a reasonable timeout value (a Long value in milliseconds) + * to prevent connect/disconnect attempts from hanging indefinitely. It is + * also recommended that the {@link #NETTY_TCP_NODELAY_OPTION} option be set * to true to minimize RPC latency. * @param addr the address to connect to. * @param channelFactory the factory to use to create a new Netty Channel. - * @param nettyClientBootstrapOptions map of Netty ClientBootstrap options + * @param nettyClientBootstrapOptions map of Netty ClientBootstrap options * to use. * @throws IOException if an error occurs connecting to the given address. */ - public NettyTransceiver(InetSocketAddress addr, ChannelFactory channelFactory, + public NettyTransceiver(InetSocketAddress addr, ChannelFactory channelFactory, Map<String, Object> nettyClientBootstrapOptions) throws IOException { if (channelFactory == null) { throw new NullPointerException("channelFactory is null"); } - + // Set up. this.channelFactory = channelFactory; - this.connectTimeoutMillis = (Long) + this.connectTimeoutMillis = (Long) nettyClientBootstrapOptions.get(NETTY_CONNECT_TIMEOUT_OPTION); bootstrap = new ClientBootstrap(channelFactory); remoteAddr = addr; @@ -195,7 +195,7 @@ public class NettyTransceiver extends Transceiver { }); if (nettyClientBootstrapOptions != null) { - LOG.debug("Using Netty bootstrap options: " + + LOG.debug("Using Netty bootstrap options: " + nettyClientBootstrapOptions); bootstrap.setOptions(nettyClientBootstrapOptions); } @@ -220,19 +220,19 @@ public class NettyTransceiver extends Transceiver { stateLock.readLock().unlock(); } } - + /** - * Creates a Netty ChannelUpstreamHandler for handling events on the + * Creates a Netty ChannelUpstreamHandler for handling events on the * Netty client channel. * @return the ChannelUpstreamHandler to use. */ protected ChannelUpstreamHandler createNettyClientAvroHandler() { return new NettyClientAvroHandler(); } - + /** * Creates the default options map for the Netty ClientBootstrap. - * @param connectTimeoutMillis connection timeout in milliseconds, or null + * @param connectTimeoutMillis connection timeout in milliseconds, or null * if no timeout is desired. * @return the map of Netty bootstrap options. */ @@ -241,25 +241,25 @@ public class NettyTransceiver extends Transceiver { Map<String, Object> options = new HashMap<String, Object>(3); options.put(NETTY_TCP_NODELAY_OPTION, DEFAULT_TCP_NODELAY_VALUE); options.put(NETTY_KEEPALIVE_OPTION, true); - options.put(NETTY_CONNECT_TIMEOUT_OPTION, - connectTimeoutMillis == null ? DEFAULT_CONNECTION_TIMEOUT_MILLIS : + options.put(NETTY_CONNECT_TIMEOUT_OPTION, + connectTimeoutMillis == null ? DEFAULT_CONNECTION_TIMEOUT_MILLIS : connectTimeoutMillis); return options; } - + /** * Tests whether the given channel is ready for writing. * @return true if the channel is open and ready; false otherwise. */ private static boolean isChannelReady(Channel channel) { - return (channel != null) && + return (channel != null) && channel.isOpen() && channel.isBound() && channel.isConnected(); } - + /** - * Gets the Netty channel. If the channel is not connected, first attempts + * Gets the Netty channel. If the channel is not connected, first attempts * to connect. - * NOTE: The stateLock read lock *must* be acquired before calling this + * NOTE: The stateLock read lock *must* be acquired before calling this * method. * @return the Netty channel * @throws IOException if an error occurs connecting the channel. @@ -283,13 +283,13 @@ public class NettyTransceiver extends Transceiver { channelFuture.await(connectTimeoutMillis); } catch (InterruptedException e) { Thread.currentThread().interrupt(); // Reset interrupt flag - throw new IOException("Interrupted while connecting to " + + throw new IOException("Interrupted while connecting to " + remoteAddr); } synchronized(channelFutureLock) { if (!channelFuture.isSuccess()) { - throw new IOException("Error connecting to " + remoteAddr, + throw new IOException("Error connecting to " + remoteAddr, channelFuture.getCause()); } channel = channelFuture.getChannel(); @@ -305,20 +305,20 @@ public class NettyTransceiver extends Transceiver { } return channel; } - + /** * Closes the connection to the remote peer if connected. */ private void disconnect() { disconnect(false, false, null); } - + /** * Closes the connection to the remote peer if connected. * @param awaitCompletion if true, will block until the close has completed. - * @param cancelPendingRequests if true, will drain the requests map and + * @param cancelPendingRequests if true, will drain the requests map and * send an IOException to all Callbacks. - * @param cause if non-null and cancelPendingRequests is true, this Throwable + * @param cause if non-null and cancelPendingRequests is true, this Throwable * will be passed to all Callbacks. */ private void disconnect(boolean awaitCompletion, boolean cancelPendingRequests, @@ -337,7 +337,7 @@ public class NettyTransceiver extends Transceiver { if (channelFutureToCancel != null) { channelFutureToCancel.cancel(); } - + if (stateReadLockHeld) { stateLock.readLock().unlock(); } @@ -354,9 +354,9 @@ public class NettyTransceiver extends Transceiver { channel = null; remote = null; if (cancelPendingRequests) { - // Remove all pending requests (will be canceled after relinquishing + // Remove all pending requests (will be canceled after relinquishing // write lock). - requestsToCancel = + requestsToCancel = new ConcurrentHashMap<Integer, Callback<List<ByteBuffer>>>(requests); requests.clear(); } @@ -367,17 +367,17 @@ public class NettyTransceiver extends Transceiver { } stateLock.writeLock().unlock(); } - + // Cancel any pending requests by sending errors to the callbacks: if ((requestsToCancel != null) && !requestsToCancel.isEmpty()) { LOG.debug("Removing " + requestsToCancel.size() + " pending request(s)."); for (Callback<List<ByteBuffer>> request : requestsToCancel.values()) { request.handleError( - cause != null ? cause : + cause != null ? cause : new IOException(getClass().getSimpleName() + " closed")); } } - + // Close the channel: if (channelToClose != null) { ChannelFuture closeFuture = channelToClose.close(); @@ -391,35 +391,35 @@ public class NettyTransceiver extends Transceiver { } } } - + /** * Netty channels are thread-safe, so there is no need to acquire locks. * This method is a no-op. */ @Override public void lockChannel() { - + } - + /** * Netty channels are thread-safe, so there is no need to acquire locks. * This method is a no-op. */ @Override public void unlockChannel() { - + } /** * Closes this transceiver and disconnects from the remote peer. - * Cancels all pending RPCs, sends an IOException to all pending callbacks, + * Cancels all pending RPCs, sends an IOException to all pending callbacks, * and blocks until the close has completed. */ @Override public void close() { close(true); } - + /** * Closes this transceiver and disconnects from the remote peer. * Cancels all pending RPCs and sends an IOException to all pending callbacks. @@ -450,7 +450,7 @@ public class NettyTransceiver extends Transceiver { * Override as non-synchronized method because the method is thread safe. */ @Override - public List<ByteBuffer> transceive(List<ByteBuffer> request) + public List<ByteBuffer> transceive(List<ByteBuffer> request) throws IOException { try { CallFuture<List<ByteBuffer>> transceiverFuture = new CallFuture<List<ByteBuffer>>(); @@ -464,9 +464,9 @@ public class NettyTransceiver extends Transceiver { return null; } } - + @Override - public void transceive(List<ByteBuffer> request, + public void transceive(List<ByteBuffer> request, Callback<List<ByteBuffer>> callback) throws IOException { stateLock.readLock().lock(); try { @@ -478,7 +478,7 @@ public class NettyTransceiver extends Transceiver { stateLock.readLock().unlock(); } } - + @Override public void writeBuffers(List<ByteBuffer> buffers) throws IOException { ChannelFuture writeFuture; @@ -489,7 +489,7 @@ public class NettyTransceiver extends Transceiver { } finally { stateLock.readLock().unlock(); } - + if (!writeFuture.isDone()) { try { writeFuture.await(); @@ -502,10 +502,10 @@ public class NettyTransceiver extends Transceiver { throw new IOException("Error writing buffers", writeFuture.getCause()); } } - + /** * Writes a NettyDataPack, reconnecting to the remote peer if necessary. - * NOTE: The stateLock read lock *must* be acquired before calling this + * NOTE: The stateLock read lock *must* be acquired before calling this * method. * @param dataPack the data pack to write. * @return the Netty ChannelFuture for the write operation. @@ -517,9 +517,9 @@ public class NettyTransceiver extends Transceiver { @Override public List<ByteBuffer> readBuffers() throws IOException { - throw new UnsupportedOperationException(); + throw new UnsupportedOperationException(); } - + @Override public Protocol getRemote() { stateLock.readLock().lock(); @@ -549,23 +549,23 @@ public class NettyTransceiver extends Transceiver { stateLock.writeLock().unlock(); } } - + /** - * A ChannelFutureListener for channel write operations that notifies + * A ChannelFutureListener for channel write operations that notifies * a {@link Callback} if an error occurs while writing to the channel. */ protected class WriteFutureListener implements ChannelFutureListener { protected final Callback<List<ByteBuffer>> callback; - + /** - * Creates a WriteFutureListener that notifies the given callback + * Creates a WriteFutureListener that notifies the given callback * if an error occurs writing data to the channel. * @param callback the callback to notify, or null to skip notification. */ public WriteFutureListener(Callback<List<ByteBuffer>> callback) { this.callback = callback; } - + @Override public void operationComplete(ChannelFuture future) throws Exception { if (!future.isSuccess() && (callback != null)) { @@ -576,7 +576,7 @@ public class NettyTransceiver extends Transceiver { } /** - * Avro client handler for the Netty transport + * Avro client handler for the Netty transport */ protected class NettyClientAvroHandler extends SimpleChannelUpstreamHandler { @@ -618,7 +618,7 @@ public class NettyTransceiver extends Transceiver { @Override public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) { - disconnect(false, true, e.getCause()); + disconnect(false, true, e.getCause()); } } @@ -629,18 +629,18 @@ public class NettyTransceiver extends Transceiver { protected static class NettyTransceiverThreadFactory implements ThreadFactory { private final AtomicInteger threadId = new AtomicInteger(0); private final String prefix; - + /** - * Creates a NettyTransceiverThreadFactory that creates threads with the + * Creates a NettyTransceiverThreadFactory that creates threads with the * specified name. - * @param prefix the name prefix to use for all threads created by this - * ThreadFactory. A unique ID will be appended to this prefix to form the + * @param prefix the name prefix to use for all threads created by this + * ThreadFactory. A unique ID will be appended to this prefix to form the * final thread name. */ public NettyTransceiverThreadFactory(String prefix) { this.prefix = prefix; } - + @Override public Thread newThread(Runnable r) { Thread thread = new Thread(r);
http://git-wip-us.apache.org/repos/asf/avro/blob/da22ffcb/lang/java/ipc/src/main/java/org/apache/avro/ipc/NettyTransportCodec.java ---------------------------------------------------------------------- diff --git a/lang/java/ipc/src/main/java/org/apache/avro/ipc/NettyTransportCodec.java b/lang/java/ipc/src/main/java/org/apache/avro/ipc/NettyTransportCodec.java index 1668c5e..9a96a37 100644 --- a/lang/java/ipc/src/main/java/org/apache/avro/ipc/NettyTransportCodec.java +++ b/lang/java/ipc/src/main/java/org/apache/avro/ipc/NettyTransportCodec.java @@ -31,23 +31,23 @@ import org.jboss.netty.handler.codec.frame.FrameDecoder; import org.jboss.netty.handler.codec.oneone.OneToOneEncoder; /** - * Data structure, encoder and decoder classes for the Netty transport. + * Data structure, encoder and decoder classes for the Netty transport. */ public class NettyTransportCodec { /** - * Transport protocol data structure when using Netty. + * Transport protocol data structure when using Netty. */ public static class NettyDataPack { private int serial; // to track each call in client side private List<ByteBuffer> datas; public NettyDataPack() {} - + public NettyDataPack(int serial, List<ByteBuffer> datas) { this.serial = serial; this.datas = datas; } - + public void setSerial(int serial) { this.serial = serial; } @@ -55,7 +55,7 @@ public class NettyTransportCodec { public int getSerial() { return serial; } - + public void setDatas(List<ByteBuffer> datas) { this.datas = datas; } @@ -63,19 +63,19 @@ public class NettyTransportCodec { public List<ByteBuffer> getDatas() { return datas; } - + } - + /** - * Protocol encoder which converts NettyDataPack which contains the - * Responder's output List<ByteBuffer> to ChannelBuffer needed + * Protocol encoder which converts NettyDataPack which contains the + * Responder's output List<ByteBuffer> to ChannelBuffer needed * by Netty. */ public static class NettyFrameEncoder extends OneToOneEncoder { /** * encode msg to ChannelBuffer - * @param msg NettyDataPack from + * @param msg NettyDataPack from * NettyServerAvroHandler/NettyClientAvroHandler in the pipeline * @return encoded ChannelBuffer */ @@ -94,7 +94,7 @@ public class NettyTransportCodec { return ChannelBuffers .wrappedBuffer(bbs.toArray(new ByteBuffer[bbs.size()])); } - + private ByteBuffer getPackHeader(NettyDataPack dataPack) { ByteBuffer header = ByteBuffer.allocate(8); header.putInt(dataPack.getSerial()); @@ -112,8 +112,8 @@ public class NettyTransportCodec { } /** - * Protocol decoder which converts Netty's ChannelBuffer to - * NettyDataPack which contains a List<ByteBuffer> needed + * Protocol decoder which converts Netty's ChannelBuffer to + * NettyDataPack which contains a List<ByteBuffer> needed * by Avro Responder. */ public static class NettyFrameDecoder extends FrameDecoder { @@ -127,7 +127,7 @@ public class NettyTransportCodec { public NettyFrameDecoder() { maxMem = Runtime.getRuntime().maxMemory(); } - + /** * decode buffer to NettyDataPack */ @@ -148,9 +148,9 @@ public class NettyTransportCodec { return null; } } - + } - + private boolean decodePackHeader(ChannelHandlerContext ctx, Channel channel, ChannelBuffer buffer) throws Exception { if (buffer.readableBytes()<8) { @@ -173,7 +173,7 @@ public class NettyTransportCodec { return true; } - + private boolean decodePackBody(ChannelHandlerContext ctx, Channel channel, ChannelBuffer buffer) throws Exception { if (buffer.readableBytes() < 4) { @@ -181,7 +181,7 @@ public class NettyTransportCodec { } buffer.markReaderIndex(); - + int length = buffer.readInt(); if (buffer.readableBytes() < length) { @@ -193,10 +193,10 @@ public class NettyTransportCodec { buffer.readBytes(bb); bb.flip(); dataPack.getDatas().add(bb); - + return dataPack.getDatas().size()==listSize; } } - + } http://git-wip-us.apache.org/repos/asf/avro/blob/da22ffcb/lang/java/ipc/src/main/java/org/apache/avro/ipc/RPCContext.java ---------------------------------------------------------------------- diff --git a/lang/java/ipc/src/main/java/org/apache/avro/ipc/RPCContext.java b/lang/java/ipc/src/main/java/org/apache/avro/ipc/RPCContext.java index 99a88ac..9eeefc4 100644 --- a/lang/java/ipc/src/main/java/org/apache/avro/ipc/RPCContext.java +++ b/lang/java/ipc/src/main/java/org/apache/avro/ipc/RPCContext.java @@ -34,33 +34,33 @@ import org.apache.avro.Protocol.Message; * */ public class RPCContext { - + private HandshakeRequest handshakeRequest; private HandshakeResponse handshakeResponse; protected Map<String,ByteBuffer> requestCallMeta, responseCallMeta; - + protected Object response; protected Exception error; private Message message; List<ByteBuffer> requestPayload; List<ByteBuffer> responsePayload; - + /** Set the handshake request of this RPC. */ public void setHandshakeRequest(HandshakeRequest handshakeRequest) { this.handshakeRequest = handshakeRequest; } - + /** Get the handshake request of this RPC. */ public HandshakeRequest getHandshakeRequest() { return this.handshakeRequest; } - + /** Set the handshake response of this RPC. */ public void setHandshakeResponse(HandshakeResponse handshakeResponse) { this.handshakeResponse = handshakeResponse; } - + /** Get the handshake response of this RPC. */ public HandshakeResponse getHandshakeResponse() { return this.handshakeResponse; @@ -77,11 +77,11 @@ public class RPCContext { handshakeRequest.meta = new HashMap<String,ByteBuffer>(); return handshakeRequest.meta; } - + void setRequestHandshakeMeta(Map<String,ByteBuffer> newmeta) { handshakeRequest.meta = newmeta; } - + /** * This is an access method for the handshake state * provided by the server back to the client @@ -93,11 +93,11 @@ public class RPCContext { handshakeResponse.meta = new HashMap<String,ByteBuffer>(); return handshakeResponse.meta; } - + void setResponseHandshakeMeta(Map<String,ByteBuffer> newmeta) { handshakeResponse.meta = newmeta; } - + /** * This is an access method for the per-call state * provided by the client to the server. @@ -110,11 +110,11 @@ public class RPCContext { } return requestCallMeta; } - + void setRequestCallMeta(Map<String,ByteBuffer> newmeta) { requestCallMeta = newmeta; } - + /** * This is an access method for the per-call state * provided by the server back to the client. @@ -127,16 +127,16 @@ public class RPCContext { } return responseCallMeta; } - + void setResponseCallMeta(Map<String,ByteBuffer> newmeta) { responseCallMeta = newmeta; } - + void setResponse(Object response) { this.response = response; this.error = null; } - + /** * The response object generated at the server, * if it exists. If an exception was generated, @@ -147,12 +147,12 @@ public class RPCContext { public Object response() { return response; } - + void setError(Exception error) { this.response = null; this.error = error; } - + /** * The exception generated at the server, * or null if no such exception has occured @@ -162,7 +162,7 @@ public class RPCContext { public Exception error() { return error; } - + /** * Indicates whether an exception was generated * at the server @@ -172,41 +172,41 @@ public class RPCContext { public boolean isError() { return error != null; } - + /** Sets the {@link Message} corresponding to this RPC */ public void setMessage(Message message) { - this.message = message; + this.message = message; } - + /** Returns the {@link Message} corresponding to this RPC - * @return this RPC's {@link Message} + * @return this RPC's {@link Message} */ public Message getMessage() { return message; } - + /** Sets the serialized payload of the request in this RPC. Will * not include handshake or meta-data. */ public void setRequestPayload(List<ByteBuffer> payload) { this.requestPayload = payload; } - + /** Returns the serialized payload of the request in this RPC. Will only be - * generated from a Requestor and will not include handshake or meta-data. + * generated from a Requestor and will not include handshake or meta-data. * If the request payload has not been set yet, returns null. - * + * * @return this RPC's request payload.*/ public List<ByteBuffer> getRequestPayload() { return this.requestPayload; } - + /** Returns the serialized payload of the response in this RPC. Will only be - * generated from a Responder and will not include handshake or meta-data. + * generated from a Responder and will not include handshake or meta-data. * If the response payload has not been set yet, returns null. - * + * * @return this RPC's response payload.*/ public List<ByteBuffer> getResponsePayload() { return this.responsePayload; } - + /** Sets the serialized payload of the response in this RPC. Will * not include handshake or meta-data. */ public void setResponsePayload(List<ByteBuffer> payload) { http://git-wip-us.apache.org/repos/asf/avro/blob/da22ffcb/lang/java/ipc/src/main/java/org/apache/avro/ipc/RPCPlugin.java ---------------------------------------------------------------------- diff --git a/lang/java/ipc/src/main/java/org/apache/avro/ipc/RPCPlugin.java b/lang/java/ipc/src/main/java/org/apache/avro/ipc/RPCPlugin.java index 64e1231..0dba9af 100644 --- a/lang/java/ipc/src/main/java/org/apache/avro/ipc/RPCPlugin.java +++ b/lang/java/ipc/src/main/java/org/apache/avro/ipc/RPCPlugin.java @@ -24,20 +24,20 @@ package org.apache.avro.ipc; * and can be set or queried by subsequent instrumentation points. */ public class RPCPlugin { - + /** * Called on the client before the initial RPC handshake to * setup any handshake metadata for this plugin * @param context the handshake rpc context */ public void clientStartConnect(RPCContext context) { } - + /** * Called on the server during the RPC handshake * @param context the handshake rpc context */ public void serverConnecting(RPCContext context) { } - + /** * Called on the client after the initial RPC handshake * @param context the handshake rpc context @@ -49,22 +49,22 @@ public class RPCPlugin { * @param context the per-call rpc context (in/out parameter) */ public void clientSendRequest(RPCContext context) { } - - + + /** * This method is invoked at the RPC server when the request is received, * but before the call itself is executed * @param context the per-call rpc context (in/out parameter) */ public void serverReceiveRequest(RPCContext context) { } - + /** * This method is invoked at the server before the response is executed, * but before the response has been formulated * @param context the per-call rpc context (in/out parameter) */ public void serverSendResponse(RPCContext context) { } - + /** * This method is invoked at the client after the call is executed, * and after the client receives the response @@ -72,5 +72,5 @@ public class RPCPlugin { */ public void clientReceiveResponse(RPCContext context) { } - + } http://git-wip-us.apache.org/repos/asf/avro/blob/da22ffcb/lang/java/ipc/src/main/java/org/apache/avro/ipc/Requestor.java ---------------------------------------------------------------------- diff --git a/lang/java/ipc/src/main/java/org/apache/avro/ipc/Requestor.java b/lang/java/ipc/src/main/java/org/apache/avro/ipc/Requestor.java index 5379945..4dfeb7c 100644 --- a/lang/java/ipc/src/main/java/org/apache/avro/ipc/Requestor.java +++ b/lang/java/ipc/src/main/java/org/apache/avro/ipc/Requestor.java @@ -64,7 +64,7 @@ public abstract class Requestor { private volatile boolean sendLocalText; private final Transceiver transceiver; private final ReentrantLock handshakeLock = new ReentrantLock(); - + protected final List<RPCPlugin> rpcMetaPlugins; public Protocol getLocal() { return local; } @@ -77,7 +77,7 @@ public abstract class Requestor { this.rpcMetaPlugins = new CopyOnWriteArrayList<RPCPlugin>(); } - + /** * Adds a new plugin to manipulate RPC metadata. Plugins * are executed in the order that they are added. @@ -88,7 +88,7 @@ public abstract class Requestor { } private static final EncoderFactory ENCODER_FACTORY = new EncoderFactory(); - + /** Writes a request message and reads a response or error message. */ public Object request(String messageName, Object request) throws Exception { @@ -96,10 +96,10 @@ public abstract class Requestor { Request rpcRequest = new Request(messageName, request, new RPCContext()); CallFuture<Object> future = /* only need a Future for two-way messages */ rpcRequest.getMessage().isOneWay() ? null : new CallFuture<Object>(); - + // Send request request(rpcRequest, future); - + if (future == null) // the message is one-way, so return immediately return null; try { // the message is two-way, wait for the result @@ -112,7 +112,7 @@ public abstract class Requestor { } } } - + /** * Writes a request message and returns the result through a Callback. * Clients can also use a Future interface by creating a new CallFuture<T>, @@ -120,15 +120,15 @@ public abstract class Requestor { * @param <T> the return type of the message. * @param messageName the name of the message to invoke. * @param request the request data to send. - * @param callback the callback which will be invoked when the response is returned + * @param callback the callback which will be invoked when the response is returned * or an error occurs. * @throws Exception if an error occurs sending the message. */ - public <T> void request(String messageName, Object request, Callback<T> callback) + public <T> void request(String messageName, Object request, Callback<T> callback) throws Exception { request(new Request(messageName, request, new RPCContext()), callback); } - + /** Writes a request message and returns the result through a Callback. */ <T> void request(Request request, Callback<T> callback) throws Exception { @@ -166,7 +166,7 @@ public abstract class Requestor { } } } - + if (request.getMessage().isOneWay()) { t.lockChannel(); try { @@ -181,7 +181,7 @@ public abstract class Requestor { t.transceive(request.getBytes(), new TransceiverCallback<T>(request, callback)); } - + } private static final ConcurrentMap<String,MD5> REMOTE_HASHES = @@ -212,14 +212,14 @@ public abstract class Requestor { handshake.serverHash = remoteHash; if (sendLocalText) handshake.clientProtocol = local.toString(); - + RPCContext context = new RPCContext(); context.setHandshakeRequest(handshake); for (RPCPlugin plugin : rpcMetaPlugins) { plugin.clientStartConnect(context); } handshake.meta = context.requestHandshakeMeta(); - + HANDSHAKE_WRITER.write(handshake, out); } @@ -246,7 +246,7 @@ public abstract class Requestor { default: throw new AvroRuntimeException("Unexpected match: "+handshake.match); } - + RPCContext context = new RPCContext(); context.setHandshakeResponse(handshake); for (RPCPlugin plugin : rpcMetaPlugins) { @@ -315,14 +315,14 @@ public abstract class Requestor { /** Reads an error message. */ public abstract Exception readError(Schema writer, Schema reader, Decoder in) throws IOException; - + /** * Handles callbacks from transceiver invocations. */ protected class TransceiverCallback<T> implements Callback<List<ByteBuffer>> { private final Request request; private final Callback<T> callback; - + /** * Creates a TransceiverCallback. * @param request the request to set. @@ -332,7 +332,7 @@ public abstract class Requestor { this.request = request; this.callback = callback; } - + @Override @SuppressWarnings("unchecked") public void handleResult(List<ByteBuffer> responseBytes) { @@ -350,7 +350,7 @@ public abstract class Requestor { } catch (Exception e) { LOG.error("Error handling transceiver callback: " + e, e); } - + // Read response; invoke callback Response response = new Response(request, in); Object responseObject; @@ -370,13 +370,13 @@ public abstract class Requestor { LOG.error("Error in callback handler: " + t, t); } } - + @Override public void handleError(Throwable error) { callback.handleError(error); } } - + /** * Encapsulates/generates a request. */ @@ -387,7 +387,7 @@ public abstract class Requestor { private final BinaryEncoder encoder; private Message message; private List<ByteBuffer> requestBytes; - + /** * Creates a Request. * @param messageName the name of the message to invoke. @@ -397,7 +397,7 @@ public abstract class Requestor { public Request(String messageName, Object request, RPCContext context) { this(messageName, request, context, null); } - + /** * Creates a Request. * @param messageName the name of the message to invoke. @@ -413,7 +413,7 @@ public abstract class Requestor { this.encoder = ENCODER_FACTORY.binaryEncoder(new ByteBufferOutputStream(), encoder); } - + /** * Copy constructor. * @param other Request from which to copy fields. @@ -424,7 +424,7 @@ public abstract class Requestor { this.context = other.context; this.encoder = other.encoder; } - + /** * Gets the message name. * @return the message name. @@ -432,7 +432,7 @@ public abstract class Requestor { public String getMessageName() { return messageName; } - + /** * Gets the RPC context. * @return the RPC context. @@ -440,7 +440,7 @@ public abstract class Requestor { public RPCContext getContext() { return context; } - + /** * Gets the Message associated with this request. * @return this request's message. @@ -454,13 +454,13 @@ public abstract class Requestor { } return message; } - + /** * Gets the request data, generating it first if necessary. * @return the request data. * @throws Exception if an error occurs generating the request data. */ - public List<ByteBuffer> getBytes() + public List<ByteBuffer> getBytes() throws Exception { if (requestBytes == null) { ByteBufferOutputStream bbo = new ByteBufferOutputStream(); @@ -493,14 +493,14 @@ public abstract class Requestor { return requestBytes; } } - + /** * Encapsulates/parses a response. */ class Response { private final Request request; private final BinaryDecoder in; - + /** * Creates a Response. * @param request the Request associated with this response. @@ -508,7 +508,7 @@ public abstract class Requestor { public Response(Request request) { this(request, null); } - + /** * Creates a Creates a Response. * @param request the Request associated with this response. @@ -518,13 +518,13 @@ public abstract class Requestor { this.request = request; this.in = in; } - + /** * Gets the RPC response, reading/deserializing it first if necessary. * @return the RPC response. * @throws Exception if an error occurs reading/deserializing the response. */ - public Object getResponse() + public Object getResponse() throws Exception { Message lm = request.getMessage(); Message rm = remote.getMessages().get(request.getMessageName()); @@ -538,7 +538,7 @@ public abstract class Requestor { ("Not both one-way messages: "+request.getMessageName()); if (lm.isOneWay() && t.isConnected()) return null; // one-way w/ handshake - + RPCContext context = request.getContext(); context.setResponseCallMeta(META_READER.read(null, in)); http://git-wip-us.apache.org/repos/asf/avro/blob/da22ffcb/lang/java/ipc/src/main/java/org/apache/avro/ipc/Responder.java ---------------------------------------------------------------------- diff --git a/lang/java/ipc/src/main/java/org/apache/avro/ipc/Responder.java b/lang/java/ipc/src/main/java/org/apache/avro/ipc/Responder.java index 6a1a3ff..322c3fb 100644 --- a/lang/java/ipc/src/main/java/org/apache/avro/ipc/Responder.java +++ b/lang/java/ipc/src/main/java/org/apache/avro/ipc/Responder.java @@ -80,10 +80,10 @@ public abstract class Responder { /** Return the remote protocol. Accesses a {@link ThreadLocal} that's set * around calls to {@link #respond(Protocol.Message, Object)}. */ public static Protocol getRemote() { return REMOTE.get(); } - + /** Return the local protocol. */ public Protocol getLocal() { return local; } - + /** * Adds a new plugin to manipulate per-call metadata. Plugins * are executed in the order that they are added. @@ -98,7 +98,7 @@ public abstract class Responder { public List<ByteBuffer> respond(List<ByteBuffer> buffers) throws IOException { return respond(buffers, null); } - + /** Called by a server to deserialize a request, compute and serialize a * response or error. Transciever is used by connection-based servers to * track handshake status of connection. */ @@ -119,7 +119,7 @@ public abstract class Responder { if (remote == null) // handshake failed return bbo.getBufferList(); handshake = bbo.getBufferList(); - + // read request using remote protocol specification context.setRequestCallMeta(META_READER.read(null, in)); String messageName = in.readString(null).toString(); @@ -134,7 +134,7 @@ public abstract class Responder { +" in "+getLocal()); Object request = readRequest(rm.getRequest(), m.getRequest(), in); - + context.setMessage(rm); for (RPCPlugin plugin : rpcMetaPlugins) { plugin.serverReceiveRequest(context); @@ -145,7 +145,7 @@ public abstract class Responder { throw new AvroRuntimeException("Not both one-way: "+messageName); Object response = null; - + try { REMOTE.set(remote); response = respond(m, request); @@ -157,7 +157,7 @@ public abstract class Responder { } finally { REMOTE.set(null); } - + if (m.isOneWay() && wasConnected) // no response data return null; @@ -183,7 +183,7 @@ public abstract class Responder { } out.flush(); payload = bbo.getBufferList(); - + // Grab meta-data from plugins context.setResponsePayload(payload); for (RPCPlugin plugin : rpcMetaPlugins) { @@ -225,7 +225,7 @@ public abstract class Responder { response.serverProtocol = local.toString(); response.serverHash = localHash; } - + RPCContext context = new RPCContext(); context.setHandshakeRequest(request); context.setHandshakeResponse(response); http://git-wip-us.apache.org/repos/asf/avro/blob/da22ffcb/lang/java/ipc/src/main/java/org/apache/avro/ipc/SaslSocketTransceiver.java ---------------------------------------------------------------------- diff --git a/lang/java/ipc/src/main/java/org/apache/avro/ipc/SaslSocketTransceiver.java b/lang/java/ipc/src/main/java/org/apache/avro/ipc/SaslSocketTransceiver.java index 880c7a5..aba720c 100644 --- a/lang/java/ipc/src/main/java/org/apache/avro/ipc/SaslSocketTransceiver.java +++ b/lang/java/ipc/src/main/java/org/apache/avro/ipc/SaslSocketTransceiver.java @@ -55,7 +55,7 @@ public class SaslSocketTransceiver extends Transceiver { private boolean saslResponsePiggybacked; private Protocol remote; - + private ByteBuffer readHeader = ByteBuffer.allocate(4); private ByteBuffer writeHeader = ByteBuffer.allocate(4); private ByteBuffer zeroHeader = ByteBuffer.allocate(4).putInt(0); @@ -128,7 +128,7 @@ public class SaslSocketTransceiver extends Transceiver { if (sasl.isComplete()) saslResponsePiggybacked = true; } - + while (!sasl.isComplete()) { Status status = readStatus(); ByteBuffer frame = readFrame(); @@ -140,7 +140,7 @@ public class SaslSocketTransceiver extends Transceiver { write(Status.FAIL, "Wrong mechanism: "+mechanism); throw new SaslException("Wrong mechanism: "+mechanism); } - case CONTINUE: + case CONTINUE: byte[] response; try { response = sasl.evaluate(frame.array()); @@ -213,7 +213,7 @@ public class SaslSocketTransceiver extends Transceiver { read(buffer); return buffer; } - + private void read(ByteBuffer buffer) throws IOException { buffer.clear(); while (buffer.hasRemaining()) http://git-wip-us.apache.org/repos/asf/avro/blob/da22ffcb/lang/java/ipc/src/main/java/org/apache/avro/ipc/Server.java ---------------------------------------------------------------------- diff --git a/lang/java/ipc/src/main/java/org/apache/avro/ipc/Server.java b/lang/java/ipc/src/main/java/org/apache/avro/ipc/Server.java index 4ae6053..aa0efe3 100644 --- a/lang/java/ipc/src/main/java/org/apache/avro/ipc/Server.java +++ b/lang/java/ipc/src/main/java/org/apache/avro/ipc/Server.java @@ -28,8 +28,8 @@ public interface Server { /** Stop this server. */ void close(); - + /** Wait for this server to exit. */ void join() throws InterruptedException; - + } http://git-wip-us.apache.org/repos/asf/avro/blob/da22ffcb/lang/java/ipc/src/main/java/org/apache/avro/ipc/SocketServer.java ---------------------------------------------------------------------- diff --git a/lang/java/ipc/src/main/java/org/apache/avro/ipc/SocketServer.java b/lang/java/ipc/src/main/java/org/apache/avro/ipc/SocketServer.java index 8db5d66..580c16e 100644 --- a/lang/java/ipc/src/main/java/org/apache/avro/ipc/SocketServer.java +++ b/lang/java/ipc/src/main/java/org/apache/avro/ipc/SocketServer.java @@ -83,7 +83,7 @@ public class SocketServer extends Thread implements Server { } public void close() { - this.interrupt(); + this.interrupt(); group.interrupt(); } @@ -128,7 +128,7 @@ public class SocketServer extends Thread implements Server { } } - + public static void main(String[] arg) throws Exception { Responder responder = new GenericResponder(Protocol.parse("{\"protocol\": \"X\"}")) { http://git-wip-us.apache.org/repos/asf/avro/blob/da22ffcb/lang/java/ipc/src/main/java/org/apache/avro/ipc/SocketTransceiver.java ---------------------------------------------------------------------- diff --git a/lang/java/ipc/src/main/java/org/apache/avro/ipc/SocketTransceiver.java b/lang/java/ipc/src/main/java/org/apache/avro/ipc/SocketTransceiver.java index e2178c6..50bc5be 100644 --- a/lang/java/ipc/src/main/java/org/apache/avro/ipc/SocketTransceiver.java +++ b/lang/java/ipc/src/main/java/org/apache/avro/ipc/SocketTransceiver.java @@ -43,7 +43,7 @@ public class SocketTransceiver extends Transceiver { private ByteBuffer header = ByteBuffer.allocate(4); private Protocol remote; - + public SocketTransceiver(SocketAddress address) throws IOException { this(SocketChannel.open(address)); } http://git-wip-us.apache.org/repos/asf/avro/blob/da22ffcb/lang/java/ipc/src/main/java/org/apache/avro/ipc/Transceiver.java ---------------------------------------------------------------------- diff --git a/lang/java/ipc/src/main/java/org/apache/avro/ipc/Transceiver.java b/lang/java/ipc/src/main/java/org/apache/avro/ipc/Transceiver.java index 898fd77..820ba17 100644 --- a/lang/java/ipc/src/main/java/org/apache/avro/ipc/Transceiver.java +++ b/lang/java/ipc/src/main/java/org/apache/avro/ipc/Transceiver.java @@ -31,14 +31,14 @@ public abstract class Transceiver implements Closeable { private final ReentrantLock channelLock = new ReentrantLock(); public abstract String getRemoteName() throws IOException; - + /** * Acquires an exclusive lock on the transceiver's channel. */ public void lockChannel() { channelLock.lock(); } - + /** * Releases the lock on the transceiver's channel if held by the calling thread. */ @@ -61,8 +61,8 @@ public abstract class Transceiver implements Closeable { unlockChannel(); } } - - /** + + /** * Called by {@link Requestor#request(String,Object,Callback)} for two-way messages using callbacks. */ public void transceive(List<ByteBuffer> request, Callback<List<ByteBuffer>> callback) http://git-wip-us.apache.org/repos/asf/avro/blob/da22ffcb/lang/java/ipc/src/main/java/org/apache/avro/ipc/generic/GenericResponder.java ---------------------------------------------------------------------- diff --git a/lang/java/ipc/src/main/java/org/apache/avro/ipc/generic/GenericResponder.java b/lang/java/ipc/src/main/java/org/apache/avro/ipc/generic/GenericResponder.java index c5beac0..1ed6bc7 100644 --- a/lang/java/ipc/src/main/java/org/apache/avro/ipc/generic/GenericResponder.java +++ b/lang/java/ipc/src/main/java/org/apache/avro/ipc/generic/GenericResponder.java @@ -38,7 +38,7 @@ public abstract class GenericResponder extends Responder { public GenericResponder(Protocol local) { this(local, GenericData.get()); - + } public GenericResponder(Protocol local, GenericData data) { http://git-wip-us.apache.org/repos/asf/avro/blob/da22ffcb/lang/java/ipc/src/main/java/org/apache/avro/ipc/reflect/ReflectRequestor.java ---------------------------------------------------------------------- diff --git a/lang/java/ipc/src/main/java/org/apache/avro/ipc/reflect/ReflectRequestor.java b/lang/java/ipc/src/main/java/org/apache/avro/ipc/reflect/ReflectRequestor.java index 84d798e..29903ec 100644 --- a/lang/java/ipc/src/main/java/org/apache/avro/ipc/reflect/ReflectRequestor.java +++ b/lang/java/ipc/src/main/java/org/apache/avro/ipc/reflect/ReflectRequestor.java @@ -33,7 +33,7 @@ import org.apache.avro.ipc.specific.SpecificRequestor; /** A {@link org.apache.avro.ipc.Requestor} for existing interfaces. */ public class ReflectRequestor extends SpecificRequestor { - + public ReflectRequestor(Class<?> iface, Transceiver transceiver) throws IOException { this(iface, transceiver, new ReflectData(iface.getClassLoader())); @@ -43,19 +43,19 @@ public class ReflectRequestor extends SpecificRequestor { throws IOException { this(protocol, transceiver, ReflectData.get()); } - + public ReflectRequestor(Class<?> iface, Transceiver transceiver, ReflectData data) throws IOException { this(data.getProtocol(iface), transceiver, data); } - + public ReflectRequestor(Protocol protocol, Transceiver transceiver, ReflectData data) throws IOException { super(protocol, transceiver, data); } - + public ReflectData getReflectData() { return (ReflectData)getSpecificData(); } @Override @@ -69,7 +69,7 @@ public class ReflectRequestor extends SpecificRequestor { } /** Create a proxy instance whose methods invoke RPCs. */ - public static <T> T getClient(Class<T> iface, Transceiver transciever) + public static <T> T getClient(Class<T> iface, Transceiver transciever) throws IOException { return getClient(iface, transciever, new ReflectData(iface.getClassLoader())); @@ -81,16 +81,16 @@ public class ReflectRequestor extends SpecificRequestor { ReflectData reflectData) throws IOException { Protocol protocol = reflectData.getProtocol(iface); return (T)Proxy.newProxyInstance - (reflectData.getClassLoader(), + (reflectData.getClassLoader(), new Class[] { iface }, new ReflectRequestor(protocol, transciever, reflectData)); } - + /** Create a proxy instance whose methods invoke RPCs. */ @SuppressWarnings("unchecked") - public static <T> T getClient(Class<T> iface, ReflectRequestor rreq) + public static <T> T getClient(Class<T> iface, ReflectRequestor rreq) throws IOException { - return (T)Proxy.newProxyInstance(rreq.getReflectData().getClassLoader(), + return (T)Proxy.newProxyInstance(rreq.getReflectData().getClassLoader(), new Class[] { iface }, rreq); } } http://git-wip-us.apache.org/repos/asf/avro/blob/da22ffcb/lang/java/ipc/src/main/java/org/apache/avro/ipc/reflect/ReflectResponder.java ---------------------------------------------------------------------- diff --git a/lang/java/ipc/src/main/java/org/apache/avro/ipc/reflect/ReflectResponder.java b/lang/java/ipc/src/main/java/org/apache/avro/ipc/reflect/ReflectResponder.java index 3e66943..b9d8eff 100644 --- a/lang/java/ipc/src/main/java/org/apache/avro/ipc/reflect/ReflectResponder.java +++ b/lang/java/ipc/src/main/java/org/apache/avro/ipc/reflect/ReflectResponder.java @@ -35,7 +35,7 @@ public class ReflectResponder extends SpecificResponder { public ReflectResponder(Class iface, Object impl) { this(iface, impl, new ReflectData(impl.getClass().getClassLoader())); } - + public ReflectResponder(Protocol protocol, Object impl) { this(protocol, impl, new ReflectData(impl.getClass().getClassLoader())); } http://git-wip-us.apache.org/repos/asf/avro/blob/da22ffcb/lang/java/ipc/src/main/java/org/apache/avro/ipc/specific/SpecificRequestor.java ---------------------------------------------------------------------- diff --git a/lang/java/ipc/src/main/java/org/apache/avro/ipc/specific/SpecificRequestor.java b/lang/java/ipc/src/main/java/org/apache/avro/ipc/specific/SpecificRequestor.java index d464737..5bcddc3 100644 --- a/lang/java/ipc/src/main/java/org/apache/avro/ipc/specific/SpecificRequestor.java +++ b/lang/java/ipc/src/main/java/org/apache/avro/ipc/specific/SpecificRequestor.java @@ -124,12 +124,12 @@ public class SpecificRequestor extends Requestor implements InvocationHandler { throw e; } } - + // Next, check for RuntimeExceptions: if (e instanceof RuntimeException) { throw e; } - + // Not an expected Exception, so wrap it in AvroRemoteException: throw new AvroRemoteException(e); } @@ -157,7 +157,7 @@ public class SpecificRequestor extends Requestor implements InvocationHandler { for (Schema.Field param : schema.getFields()) getDatumWriter(param.schema()).write(args[i++], out); } - + @Override public Object readResponse(Schema writer, Schema reader, Decoder in) throws IOException { @@ -203,7 +203,7 @@ public class SpecificRequestor extends Requestor implements InvocationHandler { /** Return the remote protocol for a proxy. */ public static Protocol getRemote(Object proxy) throws IOException { return ((Requestor)Proxy.getInvocationHandler(proxy)).getRemote(); - + } } http://git-wip-us.apache.org/repos/asf/avro/blob/da22ffcb/lang/java/ipc/src/main/java/org/apache/avro/ipc/specific/SpecificResponder.java ---------------------------------------------------------------------- diff --git a/lang/java/ipc/src/main/java/org/apache/avro/ipc/specific/SpecificResponder.java b/lang/java/ipc/src/main/java/org/apache/avro/ipc/specific/SpecificResponder.java index ae3a30d..5e84d14 100644 --- a/lang/java/ipc/src/main/java/org/apache/avro/ipc/specific/SpecificResponder.java +++ b/lang/java/ipc/src/main/java/org/apache/avro/ipc/specific/SpecificResponder.java @@ -42,7 +42,7 @@ public class SpecificResponder extends GenericResponder { public SpecificResponder(Class iface, Object impl) { this(iface, impl, new SpecificData(impl.getClass().getClassLoader())); } - + public SpecificResponder(Protocol protocol, Object impl) { this(protocol, impl, new SpecificData(impl.getClass().getClassLoader())); } http://git-wip-us.apache.org/repos/asf/avro/blob/da22ffcb/lang/java/ipc/src/main/java/org/apache/avro/ipc/stats/Histogram.java ---------------------------------------------------------------------- diff --git a/lang/java/ipc/src/main/java/org/apache/avro/ipc/stats/Histogram.java b/lang/java/ipc/src/main/java/org/apache/avro/ipc/stats/Histogram.java index 521e1c2..6fef833 100644 --- a/lang/java/ipc/src/main/java/org/apache/avro/ipc/stats/Histogram.java +++ b/lang/java/ipc/src/main/java/org/apache/avro/ipc/stats/Histogram.java @@ -39,8 +39,8 @@ class Histogram<B, T> { /** * How many recent additions we should track. */ - public static final int MAX_HISTORY_SIZE = 20; - + public static final int MAX_HISTORY_SIZE = 20; + private Segmenter<B, T> segmenter; private int[] counts; protected int totalCount; @@ -67,13 +67,13 @@ class Histogram<B, T> { * is consistent with the segment numbers. */ Iterator<B> getBuckets(); - + /** * Returns a List of bucket boundaries. Useful for printing * segmenters. */ List<String> getBoundaryLabels(); - + /** * Returns the bucket labels as an array; */ @@ -116,7 +116,7 @@ class Histogram<B, T> { private String rangeAsString(T a, T b) { return String.format("[%s,%s)", a, b == null ? "infinity" : b); } - + @Override public ArrayList<String> getBoundaryLabels() { ArrayList<String> outArray = new ArrayList<String>(index.keySet().size()); @@ -125,7 +125,7 @@ class Histogram<B, T> { } return outArray; } - + @Override public ArrayList<String> getBucketLabels() { ArrayList<String> outArray = new ArrayList<String>(index.keySet().size()); @@ -135,14 +135,14 @@ class Histogram<B, T> { } return outArray; } - + @Override public Iterator<String> getBuckets() { return new Iterator<String>() { Iterator<T> it = index.keySet().iterator(); T cur = it.next(); // there's always at least one element int pos = 0; - + @Override public boolean hasNext() { return (pos < index.keySet().size()); @@ -190,14 +190,14 @@ class Histogram<B, T> { public int[] getHistogram() { return counts; } - + /** * Returns the underlying segmenter used for this histogram. */ public Segmenter<B, T> getSegmenter() { return this.segmenter; } - + /** * Returns values recently added to this histogram. These are in reverse * order (most recent first). @@ -210,7 +210,7 @@ class Histogram<B, T> { public int getCount() { return totalCount; } - + public String toString() { StringBuilder sb = new StringBuilder(); http://git-wip-us.apache.org/repos/asf/avro/blob/da22ffcb/lang/java/ipc/src/main/java/org/apache/avro/ipc/stats/StaticServlet.java ---------------------------------------------------------------------- diff --git a/lang/java/ipc/src/main/java/org/apache/avro/ipc/stats/StaticServlet.java b/lang/java/ipc/src/main/java/org/apache/avro/ipc/stats/StaticServlet.java index c079ec5..88a50f8 100644 --- a/lang/java/ipc/src/main/java/org/apache/avro/ipc/stats/StaticServlet.java +++ b/lang/java/ipc/src/main/java/org/apache/avro/ipc/stats/StaticServlet.java @@ -29,7 +29,7 @@ import org.mortbay.resource.Resource; */ public class StaticServlet extends DefaultServlet { public Resource getResource(String pathInContext) { - // Take only last slice of the URL as a filename, so we can adjust path. + // Take only last slice of the URL as a filename, so we can adjust path. // This also prevents mischief like '../../foo.css' String[] parts = pathInContext.split("/"); String filename = parts[parts.length - 1]; @@ -43,4 +43,4 @@ public class StaticServlet extends DefaultServlet { return null; } } -} +} http://git-wip-us.apache.org/repos/asf/avro/blob/da22ffcb/lang/java/ipc/src/main/java/org/apache/avro/ipc/stats/StatsPlugin.java ---------------------------------------------------------------------- diff --git a/lang/java/ipc/src/main/java/org/apache/avro/ipc/stats/StatsPlugin.java b/lang/java/ipc/src/main/java/org/apache/avro/ipc/stats/StatsPlugin.java index 565f532..6301bbe 100644 --- a/lang/java/ipc/src/main/java/org/apache/avro/ipc/stats/StatsPlugin.java +++ b/lang/java/ipc/src/main/java/org/apache/avro/ipc/stats/StatsPlugin.java @@ -36,7 +36,7 @@ import org.apache.avro.ipc.stats.Stopwatch.Ticks; /** * Collects count and latency statistics about RPC calls. Keeps * data for every method. Can be added to a Requestor (client) - * or Responder (server). + * or Responder (server). * * This uses milliseconds as the standard unit of measure * throughout the class, stored in floats. @@ -76,9 +76,9 @@ public class StatsPlugin extends RPCPlugin { 2000, 5000, 10000, - 50000, + 50000, 100000))); - + /** Per-method histograms. * Must be accessed while holding a lock. */ Map<Message, FloatHistogram<?>> methodTimings = @@ -86,10 +86,10 @@ public class StatsPlugin extends RPCPlugin { Map<Message, IntegerHistogram<?>> sendPayloads = new HashMap<Message, IntegerHistogram<?>>(); - + Map<Message, IntegerHistogram<?>> receivePayloads = new HashMap<Message, IntegerHistogram<?>>(); - + /** RPCs in flight. */ ConcurrentMap<RPCContext, Stopwatch> activeRpcs = new ConcurrentHashMap<RPCContext, Stopwatch>(); @@ -97,12 +97,12 @@ public class StatsPlugin extends RPCPlugin { /** How long I've been alive */ public Date startupTime = new Date(); - + private Segmenter<?, Float> floatSegmenter; private Segmenter<?, Integer> integerSegmenter; /** Construct a plugin with custom Ticks and Segmenter implementations. */ - StatsPlugin(Ticks ticks, Segmenter<?, Float> floatSegmenter, + StatsPlugin(Ticks ticks, Segmenter<?, Float> floatSegmenter, Segmenter<?, Integer> integerSegmenter) { this.floatSegmenter = floatSegmenter; this.integerSegmenter = integerSegmenter; @@ -114,7 +114,7 @@ public class StatsPlugin extends RPCPlugin { public StatsPlugin() { this(Stopwatch.SYSTEM_TICKS, LATENCY_SEGMENTER, PAYLOAD_SEGMENTER); } - + /** * Helper to get the size of an RPC payload. */ @@ -122,12 +122,12 @@ public class StatsPlugin extends RPCPlugin { if (payload == null) { return 0; } - + int size = 0; for (ByteBuffer bb: payload) { size = size + bb.limit(); } - + return size; } @@ -136,7 +136,7 @@ public class StatsPlugin extends RPCPlugin { Stopwatch t = new Stopwatch(ticks); t.start(); this.activeRpcs.put(context, t); - + synchronized(receivePayloads) { IntegerHistogram<?> h = receivePayloads.get(context.getMessage()); if (h == null) { @@ -146,13 +146,13 @@ public class StatsPlugin extends RPCPlugin { h.add(getPayloadSize(context.getRequestPayload())); } } - + @Override public void serverSendResponse(RPCContext context) { Stopwatch t = this.activeRpcs.remove(context); t.stop(); publish(context, t); - + synchronized(sendPayloads) { IntegerHistogram<?> h = sendPayloads.get(context.getMessage()); if (h == null) { @@ -162,13 +162,13 @@ public class StatsPlugin extends RPCPlugin { h.add(getPayloadSize(context.getResponsePayload())); } } - + @Override public void clientSendRequest(RPCContext context) { Stopwatch t = new Stopwatch(ticks); t.start(); this.activeRpcs.put(context, t); - + synchronized(sendPayloads) { IntegerHistogram<?> h = sendPayloads.get(context.getMessage()); if (h == null) { @@ -178,13 +178,13 @@ public class StatsPlugin extends RPCPlugin { h.add(getPayloadSize(context.getRequestPayload())); } } - + @Override public void clientReceiveResponse(RPCContext context) { Stopwatch t = this.activeRpcs.remove(context); t.stop(); publish(context, t); - + synchronized(receivePayloads) { IntegerHistogram<?> h = receivePayloads.get(context.getMessage()); if (h == null) { @@ -194,7 +194,7 @@ public class StatsPlugin extends RPCPlugin { h.add(getPayloadSize(context.getRequestPayload())); } } - + /** Adds timing to the histograms. */ private void publish(RPCContext context, Stopwatch t) { Message message = context.getMessage(); @@ -218,7 +218,7 @@ public class StatsPlugin extends RPCPlugin { private IntegerHistogram<?> createNewIntegerHistogram() { return new IntegerHistogram(integerSegmenter); } - + /** Converts nanoseconds to milliseconds. */ static float nanosToMillis(long elapsedNanos) { return elapsedNanos / 1000000.0f; http://git-wip-us.apache.org/repos/asf/avro/blob/da22ffcb/lang/java/ipc/src/main/java/org/apache/avro/ipc/stats/StatsServer.java ---------------------------------------------------------------------- diff --git a/lang/java/ipc/src/main/java/org/apache/avro/ipc/stats/StatsServer.java b/lang/java/ipc/src/main/java/org/apache/avro/ipc/stats/StatsServer.java index 3ae8ada..1b2e54b 100644 --- a/lang/java/ipc/src/main/java/org/apache/avro/ipc/stats/StatsServer.java +++ b/lang/java/ipc/src/main/java/org/apache/avro/ipc/stats/StatsServer.java @@ -21,32 +21,32 @@ import org.mortbay.jetty.servlet.Context; import org.mortbay.jetty.servlet.ServletHolder; /* This is a server that displays live information from a StatsPlugin. - * + * * Typical usage is as follows: - * StatsPlugin plugin = new StatsPlugin(); + * StatsPlugin plugin = new StatsPlugin(); * requestor.addPlugin(plugin); * StatsServer server = new StatsServer(plugin, 8080); - * + * * */ public class StatsServer { Server httpServer; StatsPlugin plugin; - - /* Start a stats server on the given port, + + /* Start a stats server on the given port, * responsible for the given plugin. */ public StatsServer(StatsPlugin plugin, int port) throws Exception { this.httpServer = new Server(port); this.plugin = plugin; - + Context staticContext = new Context(httpServer, "/static"); staticContext.addServlet(new ServletHolder(new StaticServlet()), "/"); - + Context context = new Context(httpServer, "/"); context.addServlet(new ServletHolder(new StatsServlet(plugin)), "/"); - + httpServer.start(); } - + /* Stops this server. */ public void stop() throws Exception { this.httpServer.stop(); http://git-wip-us.apache.org/repos/asf/avro/blob/da22ffcb/lang/java/ipc/src/main/java/org/apache/avro/ipc/stats/StatsServlet.java ---------------------------------------------------------------------- diff --git a/lang/java/ipc/src/main/java/org/apache/avro/ipc/stats/StatsServlet.java b/lang/java/ipc/src/main/java/org/apache/avro/ipc/stats/StatsServlet.java index 3af2ffd..075bff2 100644 --- a/lang/java/ipc/src/main/java/org/apache/avro/ipc/stats/StatsServlet.java +++ b/lang/java/ipc/src/main/java/org/apache/avro/ipc/stats/StatsServlet.java @@ -49,49 +49,49 @@ import org.apache.avro.ipc.RPCContext; * This class follows the same synchronization conventions * as StatsPlugin, to avoid requiring StatsPlugin to serve * a copy of the data. - */ + */ public class StatsServlet extends HttpServlet { private final StatsPlugin statsPlugin; private VelocityEngine velocityEngine; - private static final SimpleDateFormat FORMATTER = + private static final SimpleDateFormat FORMATTER = new SimpleDateFormat("dd-MMM-yyyy HH:mm:ss"); public StatsServlet(StatsPlugin statsPlugin) throws UnavailableException { this.statsPlugin = statsPlugin; this.velocityEngine = new VelocityEngine(); - + // These two properties tell Velocity to use its own classpath-based loader velocityEngine.addProperty("resource.loader", "class"); velocityEngine.addProperty("class.resource.loader.class", "org.apache.velocity.runtime.resource.loader.ClasspathResourceLoader"); - + velocityEngine.setProperty("runtime.references.strict", true); String logChuteName = "org.apache.velocity.runtime.log.NullLogChute"; velocityEngine.setProperty("runtime.log.logsystem.class", logChuteName); } - + /* Helper class to store per-message data which is passed to templates. - * + * * The template expects a list of charts, each of which is parameterized by * map key-value string attributes. */ public class RenderableMessage { // Velocity brakes if not public public String name; public int numCalls; public ArrayList<HashMap<String, String>> charts; - + public RenderableMessage(String name) { this.name = name; this.charts = new ArrayList<HashMap<String, String>>(); } - + public ArrayList<HashMap<String, String>> getCharts() { return this.charts; } - + public String getname() { return this.name; } - + public int getNumCalls() { return this.numCalls; } @@ -99,9 +99,9 @@ public class StatsServlet extends HttpServlet { /* Surround each string in an array with * quotation marks and escape existing quotes. - * + * * This is useful when we have an array of strings that we want to turn into - * a javascript array declaration. + * a javascript array declaration. */ protected static List<String> escapeStringArray(List<String> input) { for (int i = 0; i < input.size(); i++) { @@ -109,16 +109,16 @@ public class StatsServlet extends HttpServlet { } return input; } - + @Override protected void doGet(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException { resp.setContentType("text/html"); String url = req.getRequestURL().toString(); String[] parts = url.split("//")[1].split("/"); - + try { - writeStats(resp.getWriter()); + writeStats(resp.getWriter()); } catch (Exception e) { e.printStackTrace(); @@ -127,34 +127,34 @@ public class StatsServlet extends HttpServlet { void writeStats(Writer w) throws IOException { VelocityContext context = new VelocityContext(); - context.put("title", "Avro RPC Stats"); - + context.put("title", "Avro RPC Stats"); + ArrayList<String> rpcs = new ArrayList<String>(); // in flight rpcs - - ArrayList<RenderableMessage> messages = + + ArrayList<RenderableMessage> messages = new ArrayList<RenderableMessage>(); - - for (Entry<RPCContext, Stopwatch> rpc : + + for (Entry<RPCContext, Stopwatch> rpc : this.statsPlugin.activeRpcs.entrySet()) { rpcs.add(renderActiveRpc(rpc.getKey(), rpc.getValue())); } - + // Get set of all seen messages Set<Message> keys = null; synchronized(this.statsPlugin.methodTimings) { keys = this.statsPlugin.methodTimings.keySet(); - + for (Message m: keys) { messages.add(renderMethod(m)); } } - + context.put("inFlightRpcs", rpcs); context.put("messages", messages); - + context.put("currTime", FORMATTER.format(new Date())); context.put("startupTime", FORMATTER.format(statsPlugin.startupTime)); - + Template t; try { t = velocityEngine.getTemplate( @@ -169,22 +169,22 @@ public class StatsServlet extends HttpServlet { t.merge(context, w); } - private String renderActiveRpc(RPCContext rpc, Stopwatch stopwatch) + private String renderActiveRpc(RPCContext rpc, Stopwatch stopwatch) throws IOException { String out = new String(); - out += rpc.getMessage().getName() + ": " + + out += rpc.getMessage().getName() + ": " + formatMillis(StatsPlugin.nanosToMillis(stopwatch.elapsedNanos())); return out; } - + private RenderableMessage renderMethod(Message message) { RenderableMessage out = new RenderableMessage(message.getName()); - + synchronized(this.statsPlugin.methodTimings) { FloatHistogram<?> hist = this.statsPlugin.methodTimings.get(message); out.numCalls = hist.getCount(); - + HashMap<String, String> latencyBar = new HashMap<String, String>(); // Fill in chart attributes for velocity latencyBar.put("type", "bar"); @@ -193,22 +193,22 @@ public class StatsServlet extends HttpServlet { latencyBar.put("numCalls", Integer.toString(hist.getCount())); latencyBar.put("avg", Float.toString(hist.getMean())); latencyBar.put("stdDev", Float.toString(hist.getUnbiasedStdDev())); - latencyBar.put("labelStr", + latencyBar.put("labelStr", Arrays.toString(hist.getSegmenter().getBoundaryLabels().toArray())); latencyBar.put("boundaryStr", Arrays.toString(escapeStringArray(hist.getSegmenter(). getBucketLabels()).toArray())); - latencyBar.put("dataStr", Arrays.toString(hist.getHistogram())); + latencyBar.put("dataStr", Arrays.toString(hist.getHistogram())); out.charts.add(latencyBar); - + HashMap<String, String> latencyDot = new HashMap<String, String>(); latencyDot.put("title", "Latency"); latencyDot.put("type", "dot"); - latencyDot.put("dataStr", + latencyDot.put("dataStr", Arrays.toString(hist.getRecentAdditions().toArray())); out.charts.add(latencyDot); } - + synchronized(this.statsPlugin.sendPayloads) { IntegerHistogram<?> hist = this.statsPlugin.sendPayloads.get(message); HashMap<String, String> latencyBar = new HashMap<String, String>(); @@ -219,22 +219,22 @@ public class StatsServlet extends HttpServlet { latencyBar.put("numCalls", Integer.toString(hist.getCount())); latencyBar.put("avg", Float.toString(hist.getMean())); latencyBar.put("stdDev", Float.toString(hist.getUnbiasedStdDev())); - latencyBar.put("labelStr", + latencyBar.put("labelStr", Arrays.toString(hist.getSegmenter().getBoundaryLabels().toArray())); latencyBar.put("boundaryStr", Arrays.toString(escapeStringArray(hist.getSegmenter(). getBucketLabels()).toArray())); - latencyBar.put("dataStr", Arrays.toString(hist.getHistogram())); + latencyBar.put("dataStr", Arrays.toString(hist.getHistogram())); out.charts.add(latencyBar); - + HashMap<String, String> latencyDot = new HashMap<String, String>(); latencyDot.put("title", "Send Payload"); latencyDot.put("type", "dot"); - latencyDot.put("dataStr", + latencyDot.put("dataStr", Arrays.toString(hist.getRecentAdditions().toArray())); out.charts.add(latencyDot); } - + synchronized(this.statsPlugin.receivePayloads) { IntegerHistogram<?> hist = this.statsPlugin.receivePayloads.get(message); HashMap<String, String> latencyBar = new HashMap<String, String>(); @@ -245,25 +245,25 @@ public class StatsServlet extends HttpServlet { latencyBar.put("numCalls", Integer.toString(hist.getCount())); latencyBar.put("avg", Float.toString(hist.getMean())); latencyBar.put("stdDev", Float.toString(hist.getUnbiasedStdDev())); - latencyBar.put("labelStr", + latencyBar.put("labelStr", Arrays.toString(hist.getSegmenter().getBoundaryLabels().toArray())); latencyBar.put("boundaryStr", Arrays.toString(escapeStringArray(hist.getSegmenter(). getBucketLabels()).toArray())); - latencyBar.put("dataStr", Arrays.toString(hist.getHistogram())); + latencyBar.put("dataStr", Arrays.toString(hist.getHistogram())); out.charts.add(latencyBar); - + HashMap<String, String> latencyDot = new HashMap<String, String>(); latencyDot.put("title", "Recv Payload"); latencyDot.put("type", "dot"); - latencyDot.put("dataStr", + latencyDot.put("dataStr", Arrays.toString(hist.getRecentAdditions().toArray())); out.charts.add(latencyDot); } - + return out; } - + private CharSequence formatMillis(float millis) { return String.format("%.0fms", millis); } http://git-wip-us.apache.org/repos/asf/avro/blob/da22ffcb/lang/java/ipc/src/test/java/org/apache/avro/DataFileInteropTest.java ---------------------------------------------------------------------- diff --git a/lang/java/ipc/src/test/java/org/apache/avro/DataFileInteropTest.java b/lang/java/ipc/src/test/java/org/apache/avro/DataFileInteropTest.java index dd64bf5..60862e2 100644 --- a/lang/java/ipc/src/test/java/org/apache/avro/DataFileInteropTest.java +++ b/lang/java/ipc/src/test/java/org/apache/avro/DataFileInteropTest.java @@ -32,7 +32,7 @@ import org.junit.Test; public class DataFileInteropTest { - private static final File DATAFILE_DIR = + private static final File DATAFILE_DIR = new File(System.getProperty("test.dir", "/tmp")); @BeforeClass @@ -45,8 +45,8 @@ public class DataFileInteropTest { public void testGeneratedGeneric() throws IOException { System.out.println("Reading with generic:"); DatumReaderProvider<Object> provider = new DatumReaderProvider<Object>() { - @Override public DatumReader<Object> get() { - return new GenericDatumReader<Object>(); + @Override public DatumReader<Object> get() { + return new GenericDatumReader<Object>(); } }; readFiles(provider); @@ -56,15 +56,15 @@ public class DataFileInteropTest { public void testGeneratedSpecific() throws IOException { System.out.println("Reading with specific:"); DatumReaderProvider<Interop> provider = new DatumReaderProvider<Interop>() { - @Override public DatumReader<Interop> get() { - return new SpecificDatumReader<Interop>(); + @Override public DatumReader<Interop> get() { + return new SpecificDatumReader<Interop>(); } }; readFiles(provider); } // Can't use same Interop.java as specific for reflect. - // This used to be the case because one used Utf8 and the other Sring, but + // This used to be the case because one used Utf8 and the other Sring, but // we use CharSequence now. // The current incompatibility is now that one uses byte[] and the other ByteBuffer @@ -78,8 +78,8 @@ public class DataFileInteropTest { // @Test // public void testGeneratedReflect() throws IOException { // DatumReaderProvider<Interop> provider = new DatumReaderProvider<Interop>() { -// @Override public DatumReader<Interop> get() { -// return new ReflectDatumReader<Interop>(Interop.class); +// @Override public DatumReader<Interop> get() { +// return new ReflectDatumReader<Interop>(Interop.class); // } // }; // readFiles(provider); @@ -95,7 +95,7 @@ public class DataFileInteropTest { } } } - + interface DatumReaderProvider<T extends Object> { public DatumReader<T> get(); } http://git-wip-us.apache.org/repos/asf/avro/blob/da22ffcb/lang/java/ipc/src/test/java/org/apache/avro/RPCMetaTestPlugin.java ---------------------------------------------------------------------- diff --git a/lang/java/ipc/src/test/java/org/apache/avro/RPCMetaTestPlugin.java b/lang/java/ipc/src/test/java/org/apache/avro/RPCMetaTestPlugin.java index 96e2e62..feefd80 100644 --- a/lang/java/ipc/src/test/java/org/apache/avro/RPCMetaTestPlugin.java +++ b/lang/java/ipc/src/test/java/org/apache/avro/RPCMetaTestPlugin.java @@ -30,157 +30,157 @@ import org.apache.avro.ipc.RPCPlugin; * This plugin tests handshake and call state by passing a string as metadata, * slowly building it up at each instrumentation point, testing it as it goes. * Finally, after the call or handshake is complete, the constructed string is - * tested. It also tests that RPC context data is appropriately filled in + * tested. It also tests that RPC context data is appropriately filled in * along the way by Requestor and Responder classes. */ public final class RPCMetaTestPlugin extends RPCPlugin { - + protected final String key; - + public RPCMetaTestPlugin(String keyname) { key = keyname; } - + @Override public void clientStartConnect(RPCContext context) { ByteBuffer buf = ByteBuffer.wrap("ap".getBytes()); context.requestHandshakeMeta().put(key, buf); } - + @Override public void serverConnecting(RPCContext context) { - + Assert.assertNotNull(context.requestHandshakeMeta()); Assert.assertNotNull(context.responseHandshakeMeta()); Assert.assertNull(context.getRequestPayload()); Assert.assertNull(context.getResponsePayload()); - + if (!context.requestHandshakeMeta().containsKey(key)) return; - + ByteBuffer buf = context.requestHandshakeMeta().get(key); Assert.assertNotNull(buf); Assert.assertNotNull(buf.array()); - + String partialstr = new String(buf.array()); Assert.assertNotNull(partialstr); Assert.assertEquals("partial string mismatch", "ap", partialstr); - + buf = ByteBuffer.wrap((partialstr + "ac").getBytes()); Assert.assertTrue(buf.remaining() > 0); context.responseHandshakeMeta().put(key, buf); } - + @Override public void clientFinishConnect(RPCContext context) { Map<String,ByteBuffer> handshakeMeta = context.responseHandshakeMeta(); - + Assert.assertNull(context.getRequestPayload()); Assert.assertNull(context.getResponsePayload()); Assert.assertNotNull(handshakeMeta); - + if (!handshakeMeta.containsKey(key)) return; - + ByteBuffer buf = handshakeMeta.get(key); Assert.assertNotNull(buf); Assert.assertNotNull(buf.array()); - + String partialstr = new String(buf.array()); Assert.assertNotNull(partialstr); Assert.assertEquals("partial string mismatch", "apac", partialstr); - + buf = ByteBuffer.wrap((partialstr + "he").getBytes()); Assert.assertTrue(buf.remaining() > 0); handshakeMeta.put(key, buf); - + checkRPCMetaMap(handshakeMeta); } - + @Override - public void clientSendRequest(RPCContext context) { + public void clientSendRequest(RPCContext context) { ByteBuffer buf = ByteBuffer.wrap("ap".getBytes()); context.requestCallMeta().put(key, buf); Assert.assertNotNull(context.getMessage()); Assert.assertNotNull(context.getRequestPayload()); Assert.assertNull(context.getResponsePayload()); } - + @Override public void serverReceiveRequest(RPCContext context) { Map<String,ByteBuffer> meta = context.requestCallMeta(); - - Assert.assertNotNull(meta); + + Assert.assertNotNull(meta); Assert.assertNotNull(context.getMessage()); Assert.assertNull(context.getResponsePayload()); - + if (!meta.containsKey(key)) return; - + ByteBuffer buf = meta.get(key); Assert.assertNotNull(buf); Assert.assertNotNull(buf.array()); - + String partialstr = new String(buf.array()); Assert.assertNotNull(partialstr); Assert.assertEquals("partial string mismatch", "ap", partialstr); - + buf = ByteBuffer.wrap((partialstr + "a").getBytes()); Assert.assertTrue(buf.remaining() > 0); meta.put(key, buf); } - + @Override public void serverSendResponse(RPCContext context) { Assert.assertNotNull(context.requestCallMeta()); Assert.assertNotNull(context.responseCallMeta()); Assert.assertNotNull(context.getResponsePayload()); - + if (!context.requestCallMeta().containsKey(key)) return; - + ByteBuffer buf = context.requestCallMeta().get(key); Assert.assertNotNull(buf); Assert.assertNotNull(buf.array()); - + String partialstr = new String(buf.array()); Assert.assertNotNull(partialstr); Assert.assertEquals("partial string mismatch", "apa", partialstr); - + buf = ByteBuffer.wrap((partialstr + "c").getBytes()); Assert.assertTrue(buf.remaining() > 0); context.responseCallMeta().put(key, buf); } - + @Override public void clientReceiveResponse(RPCContext context) { Assert.assertNotNull(context.responseCallMeta()); Assert.assertNotNull(context.getRequestPayload()); - + if (!context.responseCallMeta().containsKey(key)) return; - + ByteBuffer buf = context.responseCallMeta().get(key); Assert.assertNotNull(buf); Assert.assertNotNull(buf.array()); - + String partialstr = new String(buf.array()); Assert.assertNotNull(partialstr); Assert.assertEquals("partial string mismatch", "apac", partialstr); - + buf = ByteBuffer.wrap((partialstr + "he").getBytes()); Assert.assertTrue(buf.remaining() > 0); context.responseCallMeta().put(key, buf); - + checkRPCMetaMap(context.responseCallMeta()); } - + protected void checkRPCMetaMap(Map<String,ByteBuffer> rpcMeta) { Assert.assertNotNull(rpcMeta); Assert.assertTrue("key not present in map", rpcMeta.containsKey(key)); - + ByteBuffer keybuf = rpcMeta.get(key); Assert.assertNotNull(keybuf); Assert.assertTrue("key BB had nothing remaining", keybuf.remaining() > 0); - + String str = new String(keybuf.array()); Assert.assertEquals("apache", str); } - + } http://git-wip-us.apache.org/repos/asf/avro/blob/da22ffcb/lang/java/ipc/src/test/java/org/apache/avro/RandomData.java ---------------------------------------------------------------------- diff --git a/lang/java/ipc/src/test/java/org/apache/avro/RandomData.java b/lang/java/ipc/src/test/java/org/apache/avro/RandomData.java index 49f8857..85da034 100644 --- a/lang/java/ipc/src/test/java/org/apache/avro/RandomData.java +++ b/lang/java/ipc/src/test/java/org/apache/avro/RandomData.java @@ -47,7 +47,7 @@ public class RandomData implements Iterable<Object> { this.seed = seed; this.count = count; } - + public Iterator<Object> iterator() { return new Iterator<Object>() { private int n; @@ -60,7 +60,7 @@ public class RandomData implements Iterable<Object> { public void remove() { throw new UnsupportedOperationException(); } }; } - + @SuppressWarnings(value="unchecked") private static Object generate(Schema schema, Random random, int d) { switch (schema.getType()) { http://git-wip-us.apache.org/repos/asf/avro/blob/da22ffcb/lang/java/ipc/src/test/java/org/apache/avro/TestCompare.java ---------------------------------------------------------------------- diff --git a/lang/java/ipc/src/test/java/org/apache/avro/TestCompare.java b/lang/java/ipc/src/test/java/org/apache/avro/TestCompare.java index 8f02022..20efe17 100644 --- a/lang/java/ipc/src/test/java/org/apache/avro/TestCompare.java +++ b/lang/java/ipc/src/test/java/org/apache/avro/TestCompare.java @@ -184,7 +184,7 @@ public class TestCompare { s2.setKind(Kind.BAZ); check(schema, s1, s2, true, new SpecificDatumWriter<TestRecord>(schema), SpecificData.get()); - } + } private static <T> void check(String schemaJson, T o1, T o2) throws Exception { http://git-wip-us.apache.org/repos/asf/avro/blob/da22ffcb/lang/java/ipc/src/test/java/org/apache/avro/TestDataFileSpecific.java ---------------------------------------------------------------------- diff --git a/lang/java/ipc/src/test/java/org/apache/avro/TestDataFileSpecific.java b/lang/java/ipc/src/test/java/org/apache/avro/TestDataFileSpecific.java index 9d85a20..9f73b86 100644 --- a/lang/java/ipc/src/test/java/org/apache/avro/TestDataFileSpecific.java +++ b/lang/java/ipc/src/test/java/org/apache/avro/TestDataFileSpecific.java @@ -68,7 +68,7 @@ public class TestDataFileSpecific { DataFileReader<Foo> reader = new DataFileReader<Foo>(FILE, new SpecificDatumReader<Foo>()); int i = 0; - for (Foo f : reader) + for (Foo f : reader) Assert.assertEquals(""+(i++), f.getLabel().toString()); Assert.assertEquals(10, i); reader.close(); http://git-wip-us.apache.org/repos/asf/avro/blob/da22ffcb/lang/java/ipc/src/test/java/org/apache/avro/TestProtocolDatagram.java ---------------------------------------------------------------------- diff --git a/lang/java/ipc/src/test/java/org/apache/avro/TestProtocolDatagram.java b/lang/java/ipc/src/test/java/org/apache/avro/TestProtocolDatagram.java index 1f16acc..300ca37 100644 --- a/lang/java/ipc/src/test/java/org/apache/avro/TestProtocolDatagram.java +++ b/lang/java/ipc/src/test/java/org/apache/avro/TestProtocolDatagram.java @@ -35,7 +35,7 @@ public class TestProtocolDatagram extends TestProtocolSpecific { new InetSocketAddress("localhost", new Random().nextInt(10000)+10000)); } - + @Override public Transceiver createTransceiver() throws Exception{ return new DatagramTransceiver(new InetSocketAddress("localhost", server.getPort())); http://git-wip-us.apache.org/repos/asf/avro/blob/da22ffcb/lang/java/ipc/src/test/java/org/apache/avro/TestProtocolGeneric.java ---------------------------------------------------------------------- diff --git a/lang/java/ipc/src/test/java/org/apache/avro/TestProtocolGeneric.java b/lang/java/ipc/src/test/java/org/apache/avro/TestProtocolGeneric.java index 1309ea1..2e3dad2 100644 --- a/lang/java/ipc/src/test/java/org/apache/avro/TestProtocolGeneric.java +++ b/lang/java/ipc/src/test/java/org/apache/avro/TestProtocolGeneric.java @@ -90,7 +90,7 @@ public class TestProtocolGeneric { error.put("message", new Utf8("an error")); throw new AvroRemoteException(error); } - + throw new AvroRuntimeException("unexpected message: "+message.getName()); } @@ -111,7 +111,7 @@ public class TestProtocolGeneric { @Test public void testHello() throws IOException { - GenericRecord params = + GenericRecord params = new GenericData.Record(PROTOCOL.getMessages().get("hello").getRequest()); params.put("greeting", new Utf8("bob")); Utf8 response = (Utf8)requestor.request("hello", params);
