Repository: flume
Updated Branches:
  refs/heads/trunk f46bee03e -> ee4999bc2

FLUME-2910. AsyncHBaseSink: Failure callbacks should log the exception that caused them

(Abraham Fine via Mike Percy)

Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/ee4999bc
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/ee4999bc
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/ee4999bc

Branch: refs/heads/trunk
Commit: ee4999bc23f42bc300ed87b0d46fd96418d6a185
Parents: f46bee0
Author: Mike Percy <[email protected]>
Authored: Fri Jun 10 12:52:29 2016 -0700
Committer: Mike Percy <[email protected]>
Committed: Fri Jun 10 12:52:29 2016 -0700

----------------------------------------------------------------------
 .../java/org/apache/flume/sink/hbase/AsyncHBaseSink.java | 11 ++++++-----
 1 file changed, 6 insertions(+), 5 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flume/blob/ee4999bc/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/AsyncHBaseSink.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/AsyncHBaseSink.java b/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/AsyncHBaseSink.java
index c1ff0c4..28f0de1 100644
--- a/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/AsyncHBaseSink.java
+++ b/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/AsyncHBaseSink.java
@@ -191,15 +191,15 @@ public class AsyncHBaseSink extends AbstractSink implements Configurable {
     Callback<Object, Object> putSuccessCallback =
             new SuccessCallback<Object, Object>(
             lock, callbacksReceived, condition);
-    Callback<Object, Object> putFailureCallback =
-            new FailureCallback<Object, Object>(
+    Callback<Object, Exception> putFailureCallback =
+            new FailureCallback<Object, Exception>(
             lock, callbacksReceived, txnFail, condition);
 
     Callback<Long, Long> incrementSuccessCallback =
             new SuccessCallback<Long, Long>(
             lock, callbacksReceived, condition);
-    Callback<Long, Long> incrementFailureCallback =
-            new FailureCallback<Long, Long>(
+    Callback<Long, Exception> incrementFailureCallback =
+            new FailureCallback<Long, Exception>(
             lock, callbacksReceived, txnFail, condition);
 
     Status status = Status.READY;
@@ -622,7 +622,7 @@ public class AsyncHBaseSink extends AbstractSink implements Configurable {
     }
   }
 
-  private class FailureCallback<R,T> implements Callback<R,T> {
+  private class FailureCallback<R,T extends Exception> implements Callback<R,T> {
     private Lock lock;
     private AtomicInteger callbacksReceived;
     private AtomicBoolean txnFail;
@@ -639,6 +639,7 @@ public class AsyncHBaseSink extends AbstractSink implements Configurable {
 
     @Override
     public R call(T arg) throws Exception {
+      logger.error("failure callback:", arg);
       if (isTimeoutTesting) {
         //tests set timeout to 10 seconds, so sleep for 4 seconds
         try {
