Repository: hbase Updated Branches: refs/heads/master ac31ceb83 -> fa033b6a0
HBASE-15793 Port over AsyncCall improvements Signed-off-by: stack <st...@apache.org> Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/fa033b6a Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/fa033b6a Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/fa033b6a Branch: refs/heads/master Commit: fa033b6a08020282004ec87353f743fc205140ef Parents: ac31ceb Author: Jurriaan Mous <jurm...@jurmo.us> Authored: Sat May 7 12:46:58 2016 +0200 Committer: stack <st...@apache.org> Committed: Sat May 7 10:38:38 2016 -0700 ---------------------------------------------------------------------- .../org/apache/hadoop/hbase/client/Future.java | 31 ++++++ .../hbase/client/ResponseFutureListener.java | 30 ++++++ .../org/apache/hadoop/hbase/ipc/AsyncCall.java | 87 ++++++++++++---- .../hadoop/hbase/ipc/AsyncRpcChannel.java | 21 +++- .../hadoop/hbase/ipc/AsyncRpcChannelImpl.java | 50 ++++----- .../apache/hadoop/hbase/ipc/AsyncRpcClient.java | 101 ++++++++++++------- .../hadoop/hbase/ipc/IOExceptionConverter.java | 34 +++++++ .../hadoop/hbase/ipc/MessageConverter.java | 47 +++++++++ .../org/apache/hadoop/hbase/ipc/Promise.java | 38 +++++++ 9 files changed, 346 insertions(+), 93 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/fa033b6a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Future.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Future.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Future.java new file mode 100644 index 0000000..1247fd4 --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Future.java @@ -0,0 +1,31 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client; + +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.classification.InterfaceStability; + +/** + * Future for responses + * @param <V> Value type + */ +@InterfaceAudience.Public +@InterfaceStability.Evolving +public interface Future<V> extends io.netty.util.concurrent.Future<V> { + +} http://git-wip-us.apache.org/repos/asf/hbase/blob/fa033b6a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ResponseFutureListener.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ResponseFutureListener.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ResponseFutureListener.java new file mode 100644 index 0000000..f23dc8f --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ResponseFutureListener.java @@ -0,0 +1,30 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client; + +import io.netty.util.concurrent.GenericFutureListener; +import org.apache.hadoop.hbase.classification.InterfaceAudience; + +/** + * Specific interface for the Response future listener + * @param <V> Value type. + */ +@InterfaceAudience.Private +public interface ResponseFutureListener<V> + extends GenericFutureListener<Future<V>> { +} http://git-wip-us.apache.org/repos/asf/hbase/blob/fa033b6a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncCall.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncCall.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncCall.java index a5da0dc..3acf280 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncCall.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncCall.java @@ -19,8 +19,7 @@ package org.apache.hadoop.hbase.ipc; import com.google.protobuf.Descriptors; import com.google.protobuf.Message; -import io.netty.channel.EventLoop; -import io.netty.util.concurrent.DefaultPromise; +import java.io.IOException; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hbase.CellScanner; @@ -31,51 +30,72 @@ import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import 
org.apache.hadoop.hbase.util.ExceptionUtil; import org.apache.hadoop.ipc.RemoteException; -import java.io.IOException; - /** * Represents an Async Hbase call and its response. * * Responses are passed on to its given doneHandler and failures to the rpcController + * + * @param <T> Type of message returned + * @param <M> Message returned in communication to be converted */ @InterfaceAudience.Private -public class AsyncCall extends DefaultPromise<Message> { +public class AsyncCall<M extends Message, T> extends Promise<T> { private static final Log LOG = LogFactory.getLog(AsyncCall.class.getName()); final int id; + private final AsyncRpcChannelImpl channel; + final Descriptors.MethodDescriptor method; final Message param; - final PayloadCarryingRpcController controller; final Message responseDefaultType; + + private final MessageConverter<M,T> messageConverter; final long startTime; final long rpcTimeout; + private final IOExceptionConverter exceptionConverter; + + // For only the request + private final CellScanner cellScanner; + private final int priority; + final MetricsConnection.CallStats callStats; /** * Constructor * - * @param eventLoop for call + * @param channel which initiated call * @param connectId connection id * @param md the method descriptor * @param param parameters to send to Server - * @param controller controller for response + * @param cellScanner cellScanner containing cells to send as request * @param responseDefaultType the default response type + * @param messageConverter converts the messages to what is the expected output + * @param rpcTimeout timeout for this call in ms + * @param priority for this request */ - public AsyncCall(EventLoop eventLoop, int connectId, Descriptors.MethodDescriptor md, Message - param, PayloadCarryingRpcController controller, Message responseDefaultType, + public AsyncCall(AsyncRpcChannelImpl channel, int connectId, Descriptors.MethodDescriptor + md, Message param, CellScanner cellScanner, M responseDefaultType, 
MessageConverter<M, T> + messageConverter, IOExceptionConverter exceptionConverter, long rpcTimeout, int priority, MetricsConnection.CallStats callStats) { - super(eventLoop); + super(channel.getEventExecutor()); + this.channel = channel; this.id = connectId; this.method = md; this.param = param; - this.controller = controller; this.responseDefaultType = responseDefaultType; + this.messageConverter = messageConverter; + this.exceptionConverter = exceptionConverter; + this.startTime = EnvironmentEdgeManager.currentTime(); - this.rpcTimeout = controller.hasCallTimeout() ? controller.getCallTimeout() : 0; + this.rpcTimeout = rpcTimeout; + + this.priority = priority; + this.cellScanner = cellScanner; + this.callStats = callStats; } @@ -101,17 +121,19 @@ public class AsyncCall extends DefaultPromise<Message> { * @param value to set * @param cellBlockScanner to set */ - public void setSuccess(Message value, CellScanner cellBlockScanner) { - if (cellBlockScanner != null) { - controller.setCellScanner(cellBlockScanner); - } - + public void setSuccess(M value, CellScanner cellBlockScanner) { if (LOG.isTraceEnabled()) { long callTime = EnvironmentEdgeManager.currentTime() - startTime; LOG.trace("Call: " + method.getName() + ", callTime: " + callTime + "ms"); } - this.setSuccess(value); + try { + this.setSuccess( + this.messageConverter.convert(value, cellBlockScanner) + ); + } catch (IOException e) { + this.setFailed(e); + } } /** @@ -127,6 +149,10 @@ public class AsyncCall extends DefaultPromise<Message> { exception = ((RemoteException) exception).unwrapRemoteException(); } + if (this.exceptionConverter != null) { + exception = this.exceptionConverter.convert(exception); + } + this.setFailure(exception); } @@ -138,4 +164,27 @@ public class AsyncCall extends DefaultPromise<Message> { public long getRpcTimeout() { return rpcTimeout; } + + + /** + * @return Priority for this call + */ + public int getPriority() { + return priority; + } + + /** + * Get the cellScanner for this 
request. + * @return CellScanner + */ + public CellScanner cellScanner() { + return cellScanner; + } + + @Override + public boolean cancel(boolean mayInterupt){ + this.channel.removePendingCall(this.id); + return super.cancel(mayInterupt); + } + } http://git-wip-us.apache.org/repos/asf/hbase/blob/fa033b6a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcChannel.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcChannel.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcChannel.java index 60dc5e4..bd4be5a 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcChannel.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcChannel.java @@ -21,11 +21,12 @@ import com.google.protobuf.Descriptors; import com.google.protobuf.Message; import io.netty.util.concurrent.EventExecutor; -import io.netty.util.concurrent.Promise; import java.net.InetSocketAddress; +import org.apache.hadoop.hbase.CellScanner; import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.client.Future; import org.apache.hadoop.hbase.client.MetricsConnection; /** @@ -37,13 +38,23 @@ public interface AsyncRpcChannel { /** * Calls method on channel * @param method to call - * @param controller to run call with * @param request to send + * @param cellScanner with cells to send * @param responsePrototype to construct response with + * @param messageConverter converts the response message to the expected result + * @param exceptionConverter for converting exceptions + * @param rpcTimeout timeout for request + * @param priority for request + * @param callStats collects stats of the call + * @return Future for the converted response */ - Promise<Message> callMethod(final Descriptors.MethodDescriptor method, - final PayloadCarryingRpcController controller, final Message request, - final Message responsePrototype, 
MetricsConnection.CallStats callStats); + + <R extends Message, O> Future<O> callMethod( + final Descriptors.MethodDescriptor method, + final Message request,final CellScanner cellScanner, + R responsePrototype, MessageConverter<R, O> messageConverter, IOExceptionConverter + exceptionConverter, long rpcTimeout, int priority, MetricsConnection.CallStats callStats); + /** * Get the EventLoop on which this channel operated http://git-wip-us.apache.org/repos/asf/hbase/blob/fa033b6a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcChannelImpl.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcChannelImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcChannelImpl.java index 7cc9e78..5af2354 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcChannelImpl.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcChannelImpl.java @@ -19,8 +19,6 @@ package org.apache.hadoop.hbase.ipc; import com.google.protobuf.Descriptors; import com.google.protobuf.Message; -import com.google.protobuf.RpcCallback; - import io.netty.bootstrap.Bootstrap; import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufOutputStream; @@ -32,7 +30,6 @@ import io.netty.handler.codec.LengthFieldBasedFrameDecoder; import io.netty.util.Timeout; import io.netty.util.TimerTask; import io.netty.util.concurrent.GenericFutureListener; -import io.netty.util.concurrent.Promise; import java.io.IOException; import java.net.ConnectException; @@ -51,8 +48,10 @@ import javax.security.sasl.SaslException; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.CellScanner; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.client.Future; import org.apache.hadoop.hbase.client.MetricsConnection; 
import org.apache.hadoop.hbase.exceptions.ConnectionClosingException; import org.apache.hadoop.hbase.protobuf.ProtobufUtil; @@ -291,36 +290,25 @@ public class AsyncRpcChannelImpl implements AsyncRpcChannel { /** * Calls method on channel * @param method to call - * @param controller to run call with * @param request to send + * @param cellScanner with cells to send * @param responsePrototype to construct response with + * @param rpcTimeout timeout for request + * @param priority for request + * @return Promise for the response Message */ - public Promise<Message> callMethod(final Descriptors.MethodDescriptor method, - final PayloadCarryingRpcController controller, final Message request, - final Message responsePrototype, MetricsConnection.CallStats callStats) { - final AsyncCall call = new AsyncCall(channel.eventLoop(), client.callIdCnt.getAndIncrement(), - method, request, controller, responsePrototype, callStats); - controller.notifyOnCancel(new RpcCallback<Object>() { - @Override - public void run(Object parameter) { - // TODO: do not need to call AsyncCall.setFailed? - synchronized (pendingCalls) { - pendingCalls.remove(call.id); - } - } - }); - // TODO: this should be handled by PayloadCarryingRpcController. 
- if (controller.isCanceled()) { - // To finish if the call was cancelled before we set the notification (race condition) - call.cancel(true); - return call; - } - + public <R extends Message, O> Future<O> callMethod( + final Descriptors.MethodDescriptor method, + final Message request,final CellScanner cellScanner, + R responsePrototype, MessageConverter<R, O> messageConverter, IOExceptionConverter + exceptionConverter, long rpcTimeout, int priority, MetricsConnection.CallStats callStats) { + final AsyncCall<R, O> call = new AsyncCall<>(this, client.callIdCnt.getAndIncrement(), + method, request, cellScanner, responsePrototype, messageConverter, exceptionConverter, + rpcTimeout, priority, callStats); synchronized (pendingCalls) { if (closed) { - Promise<Message> promise = channel.eventLoop().newPromise(); - promise.setFailure(new ConnectException()); - return promise; + call.setFailure(new ConnectException()); + return call; } pendingCalls.put(call.id, call); // Add timeout for cleanup if none is present @@ -398,7 +386,7 @@ public class AsyncRpcChannelImpl implements AsyncRpcChannel { .setParentId(s.getSpanId()).setTraceId(s.getTraceId())); } - ByteBuffer cellBlock = client.buildCellBlock(call.controller.cellScanner()); + ByteBuffer cellBlock = client.buildCellBlock(call.cellScanner()); if (cellBlock != null) { final RPCProtos.CellBlockMeta.Builder cellBlockBuilder = RPCProtos.CellBlockMeta .newBuilder(); @@ -406,8 +394,8 @@ public class AsyncRpcChannelImpl implements AsyncRpcChannel { requestHeaderBuilder.setCellBlockMeta(cellBlockBuilder.build()); } // Only pass priority if there is one. Let zero be same as no priority. 
- if (call.controller.getPriority() != PayloadCarryingRpcController.PRIORITY_UNSET) { - requestHeaderBuilder.setPriority(call.controller.getPriority()); + if (call.getPriority() != PayloadCarryingRpcController.PRIORITY_UNSET) { + requestHeaderBuilder.setPriority(call.getPriority()); } RPCProtos.RequestHeader rh = requestHeaderBuilder.build(); http://git-wip-us.apache.org/repos/asf/hbase/blob/fa033b6a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcClient.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcClient.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcClient.java index 8d9a5b3..2fdc1ec 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcClient.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcClient.java @@ -17,6 +17,12 @@ */ package org.apache.hadoop.hbase.ipc; +import com.google.common.annotations.VisibleForTesting; +import com.google.protobuf.Descriptors; +import com.google.protobuf.Message; +import com.google.protobuf.RpcCallback; +import com.google.protobuf.RpcChannel; +import com.google.protobuf.RpcController; import io.netty.bootstrap.Bootstrap; import io.netty.channel.Channel; import io.netty.channel.ChannelInitializer; @@ -30,9 +36,6 @@ import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.util.HashedWheelTimer; import io.netty.util.Timeout; import io.netty.util.TimerTask; -import io.netty.util.concurrent.Future; -import io.netty.util.concurrent.GenericFutureListener; -import io.netty.util.concurrent.Promise; import java.io.IOException; import java.net.InetSocketAddress; @@ -52,7 +55,9 @@ import org.apache.hadoop.hbase.HBaseInterfaceAudience; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.client.Future; import 
org.apache.hadoop.hbase.client.MetricsConnection; +import org.apache.hadoop.hbase.client.ResponseFutureListener; import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.JVM; @@ -60,13 +65,6 @@ import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.util.PoolMap; import org.apache.hadoop.hbase.util.Threads; -import com.google.common.annotations.VisibleForTesting; -import com.google.protobuf.Descriptors; -import com.google.protobuf.Message; -import com.google.protobuf.RpcCallback; -import com.google.protobuf.RpcChannel; -import com.google.protobuf.RpcController; - /** * Netty client for the requests and responses */ @@ -242,7 +240,18 @@ public class AsyncRpcClient extends AbstractRpcClient { } final AsyncRpcChannel connection = createRpcChannel(md.getService().getName(), addr, ticket); - Promise<Message> promise = connection.callMethod(md, pcrc, param, returnType, callStats); + final Future<Message> promise = connection.callMethod(md, param, pcrc.cellScanner(), returnType, + getMessageConverterWithRpcController(pcrc), null, pcrc.getCallTimeout(), pcrc.getPriority(), + callStats); + + pcrc.notifyOnCancel(new RpcCallback<Object>() { + @Override + public void run(Object parameter) { + // Will automatically fail the promise with CancellationException + promise.cancel(true); + } + }); + long timeout = pcrc.hasCallTimeout() ? pcrc.getCallTimeout() : 0; try { Message response = timeout > 0 ? 
promise.get(timeout, TimeUnit.MILLISECONDS) : promise.get(); @@ -259,6 +268,18 @@ public class AsyncRpcClient extends AbstractRpcClient { } } + private MessageConverter<Message, Message> getMessageConverterWithRpcController( + final PayloadCarryingRpcController pcrc) { + return new + MessageConverter<Message, Message>() { + @Override + public Message convert(Message msg, CellScanner cellScanner) { + pcrc.setCellScanner(cellScanner); + return msg; + } + }; + } + /** * Call method async */ @@ -269,42 +290,46 @@ public class AsyncRpcClient extends AbstractRpcClient { try { connection = createRpcChannel(md.getService().getName(), addr, ticket); final MetricsConnection.CallStats cs = MetricsConnection.newCallStats(); - GenericFutureListener<Future<Message>> listener = - new GenericFutureListener<Future<Message>>() { - @Override - public void operationComplete(Future<Message> future) throws Exception { - cs.setCallTimeMs(EnvironmentEdgeManager.currentTime() - cs.getStartTime()); - if (metrics != null) { - metrics.updateRpc(md, param, cs); - } - if (LOG.isTraceEnabled()) { - LOG.trace("Call: " + md.getName() + ", callTime: " + cs.getCallTimeMs() + "ms"); + + ResponseFutureListener<Message> listener = + new ResponseFutureListener<Message>() { + @Override + public void operationComplete(Future<Message> future) throws Exception { + cs.setCallTimeMs(EnvironmentEdgeManager.currentTime() - cs.getStartTime()); + if (metrics != null) { + metrics.updateRpc(md, param, cs); + } + if (LOG.isTraceEnabled()) { + LOG.trace("Call: " + md.getName() + ", callTime: " + cs.getCallTimeMs() + "ms"); + } + if (!future.isSuccess()) { + Throwable cause = future.cause(); + if (cause instanceof IOException) { + pcrc.setFailed((IOException) cause); + } else { + pcrc.setFailed(new IOException(cause)); } - if (!future.isSuccess()) { - Throwable cause = future.cause(); + } else { + try { + done.run(future.get()); + } catch (ExecutionException e) { + Throwable cause = e.getCause(); if (cause instanceof 
IOException) { pcrc.setFailed((IOException) cause); } else { pcrc.setFailed(new IOException(cause)); } - } else { - try { - done.run(future.get()); - } catch (ExecutionException e) { - Throwable cause = e.getCause(); - if (cause instanceof IOException) { - pcrc.setFailed((IOException) cause); - } else { - pcrc.setFailed(new IOException(cause)); - } - } catch (InterruptedException e) { - pcrc.setFailed(new IOException(e)); - } + } catch (InterruptedException e) { + pcrc.setFailed(new IOException(e)); } } - }; + } + }; cs.setStartTime(EnvironmentEdgeManager.currentTime()); - connection.callMethod(md, pcrc, param, returnType, cs).addListener(listener); + connection.callMethod(md, param, pcrc.cellScanner(), returnType, + getMessageConverterWithRpcController(pcrc), null, + pcrc.getCallTimeout(), pcrc.getPriority(), cs) + .addListener(listener); } catch (StoppedRpcClientException|FailedServerException e) { pcrc.setFailed(e); } http://git-wip-us.apache.org/repos/asf/hbase/blob/fa033b6a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IOExceptionConverter.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IOExceptionConverter.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IOExceptionConverter.java new file mode 100644 index 0000000..09dda09 --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IOExceptionConverter.java @@ -0,0 +1,34 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.ipc; + +import java.io.IOException; +import org.apache.hadoop.hbase.classification.InterfaceAudience; + +/** + * Converts exceptions to other exceptions + */ +@InterfaceAudience.Private +public interface IOExceptionConverter { + /** + * Converts given IOException + * @param e exception to convert + * @return converted IOException + */ + IOException convert(IOException e); +} http://git-wip-us.apache.org/repos/asf/hbase/blob/fa033b6a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/MessageConverter.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/MessageConverter.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/MessageConverter.java new file mode 100644 index 0000000..527ac95 --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/MessageConverter.java @@ -0,0 +1,47 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.ipc; + +import com.google.protobuf.Message; +import java.io.IOException; +import org.apache.hadoop.hbase.CellScanner; +import org.apache.hadoop.hbase.classification.InterfaceAudience; + +/** + * Interface to convert Messages to specific types + * @param <M> Message Type to convert + * @param <O> Output Type + */ +@InterfaceAudience.Private +public interface MessageConverter<M,O> { + /** + * Converts Message to Output + * @param msg to convert + * @param cellScanner to use for conversion + * @return Output + * @throws IOException if message could not be converted to response + */ + O convert(M msg, CellScanner cellScanner) throws IOException; + + MessageConverter<Message,Message> NO_CONVERTER = new MessageConverter<Message, Message>() { + @Override + public Message convert(Message msg, CellScanner cellScanner) throws IOException { + return null; + } + }; +} http://git-wip-us.apache.org/repos/asf/hbase/blob/fa033b6a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/Promise.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/Promise.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/Promise.java new file mode 100644 index 0000000..0d05db8 --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/Promise.java @@ -0,0 +1,38 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.ipc; + +import io.netty.util.concurrent.DefaultPromise; +import io.netty.util.concurrent.EventExecutor; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.client.Future; + +/** + * Abstract response promise + * @param <T> Type of result contained in Promise + */ +@InterfaceAudience.Private +public class Promise<T> extends DefaultPromise<T> implements Future<T> { + /** + * Constructor + * @param eventLoop to handle events on + */ + public Promise(EventExecutor eventLoop) { + super(eventLoop); + } +}