Updated Branches: refs/heads/flume-1.4 4a9786468 -> 3f9408b2c
FLUME-2044. HDFS Sink impersonation fails after the first file.

(Hari Shreedharan via Mike Percy)

Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/3f9408b2
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/3f9408b2
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/3f9408b2

Branch: refs/heads/flume-1.4
Commit: 3f9408b2c983fa5904f733b053319a1e735b9a0a
Parents: 4a97864
Author: Mike Percy <[email protected]>
Authored: Tue May 14 22:32:30 2013 -0700
Committer: Mike Percy <[email protected]>
Committed: Tue May 14 22:33:24 2013 -0700

----------------------------------------------------------------------
 .../org/apache/flume/sink/hdfs/BucketWriter.java | 68 ++++++---------
 1 files changed, 28 insertions(+), 40 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flume/blob/3f9408b2/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/BucketWriter.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/BucketWriter.java b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/BucketWriter.java
index 0897c97..af65167 100644
--- a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/BucketWriter.java
+++ b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/BucketWriter.java
@@ -187,20 +187,6 @@ class BucketWriter {
    * @throws InterruptedException
    */
   private void open() throws IOException, InterruptedException {
-    runPrivileged(new PrivilegedExceptionAction<Void>() {
-      public Void run() throws Exception {
-        doOpen();
-        return null;
-      }
-    });
-  }
-
-  /**
-   * doOpen() must only be called by open()
-   * @throws IOException
-   * @throws InterruptedException
-   */
-  private void doOpen() throws IOException, InterruptedException {
     if ((filePath == null) || (writer == null)) {
       throw new IOException("Invalid file settings");
     }
@@ -233,7 +219,7 @@ class BucketWriter {
     targetPath = filePath + DIRECTORY_DELIMITER + fullFileName;
 
     LOG.info("Creating " + bucketPath);
-    callWithTimeout(new Callable<Void>() {
+    callWithTimeout(new CallRunner<Void>() {
       @Override
       public Void call() throws Exception {
         if (codeC == null) {
@@ -291,23 +277,10 @@ class BucketWriter {
   public synchronized void close() throws IOException, InterruptedException {
     checkAndThrowInterruptedException();
     flush();
-    runPrivileged(new PrivilegedExceptionAction<Void>() {
-      public Void run() throws Exception {
-        doClose();
-        return null;
-      }
-    });
-  }
-
-  /**
-   * doClose() must only be called by close()
-   * @throws IOException
-   */
-  private void doClose() throws IOException, InterruptedException {
     LOG.debug("Closing {}", bucketPath);
     if (isOpen) {
       try {
-        callWithTimeout(new Callable<Void>() {
+        callWithTimeout(new CallRunner<Void>() {
           @Override
           public Void call() throws Exception {
             writer.close(); // could block
@@ -345,12 +318,7 @@ class BucketWriter {
   public synchronized void flush() throws IOException, InterruptedException {
     checkAndThrowInterruptedException();
     if (!isBatchComplete()) {
-      runPrivileged(new PrivilegedExceptionAction<Void>() {
-        public Void run() throws Exception {
-          doFlush();
-          return null;
-        }
-      });
+      doFlush();
 
       if(idleTimeout > 0) {
         // if the future exists and couldn't be cancelled, that would mean it has already run
@@ -384,7 +352,7 @@ class BucketWriter {
    * @throws IOException
    */
   private void doFlush() throws IOException, InterruptedException {
-    callWithTimeout(new Callable<Void>() {
+    callWithTimeout(new CallRunner<Void>() {
       @Override
       public Void call() throws Exception {
         writer.sync(); // could block
@@ -447,7 +415,7 @@ class BucketWriter {
     // write the event
     try {
       sinkCounter.incrementEventDrainAttemptCount();
-      callWithTimeout(new Callable<Void>() {
+      callWithTimeout(new CallRunner<Void>() {
         @Override
         public Void call() throws Exception {
           writer.append(event); // could block
@@ -514,7 +482,7 @@ class BucketWriter {
     final Path srcPath = new Path(bucketPath);
     final Path dstPath = new Path(targetPath);
 
-    callWithTimeout(new Callable<Object>() {
+    callWithTimeout(new CallRunner<Object>() {
       @Override
       public Object call() throws Exception {
         if(fileSystem.exists(srcPath)) { // could block
@@ -559,9 +527,19 @@ class BucketWriter {
    * for the specified amount of time in milliseconds. In case of timeout
    * cancel the callable and throw an IOException
    */
-  private <T> T callWithTimeout(Callable<T> callable)
+  private <T> T callWithTimeout(final CallRunner<T> callRunner)
     throws IOException, InterruptedException {
-    Future<T> future = callTimeoutPool.submit(callable);
+    Future<T> future = callTimeoutPool.submit(new Callable<T>() {
+      @Override
+      public T call() throws Exception {
+        return runPrivileged(new PrivilegedExceptionAction<T>() {
+          @Override
+          public T run() throws Exception {
+            return callRunner.call();
+          }
+        });
+      }
+    });
     try {
       if (callTimeout > 0) {
         return future.get(callTimeout, TimeUnit.MILLISECONDS);
@@ -597,4 +575,14 @@ class BucketWriter {
     }
   }
 
+  /**
+   * Simple interface whose <tt>call</tt> method is called by
+   * {#callWithTimeout} in a new thread inside a
+   * {@linkplain java.security.PrivilegedExceptionAction#run()} call.
+   * @param <T>
+   */
+  private interface CallRunner<T> {
+    T call() throws Exception;
+  }
+
 }
