Repository: flume Updated Branches: refs/heads/trunk 18453d3ef -> f215374bd
FLUME-3049. Make HDFS sink rotate more reliably in secure mode It was reported that the HDFS sink had a bug where file rotation was not reliable in secure mode. After investigating, it turns out that this was caused by a bug in the FlumeAuthenticator code: A "try" block in UGIExecutor.execute() was wrapping exceptions (such as IOException) with a SecurityException. That exception wrapping was breaking the contract of BucketWriter because the caller (HDFSEventSink) did not understand how to react to the SecurityException. This also likely had other negative effects in exceptional cases. The following changes are included in this patch: * Remove the exception wrapping in UGIExecutor.execute(). * Add tests for exception propagation in FlumeAuthenticator implementations. * Add testRotateBucketOnIOException() to TestBucketWriter as a regression test for the HDFS sink issue. Closes #106. Reviewers: Attila Simon, Mike Percy (Denes Arvay via Mike Percy) Project: http://git-wip-us.apache.org/repos/asf/flume/repo Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/f215374b Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/f215374b Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/f215374b Branch: refs/heads/trunk Commit: f215374bdf9a08b16fa7ccd3b40098024afe8677 Parents: 18453d3 Author: Denes Arvay <[email protected]> Authored: Fri Jan 27 11:43:27 2017 +0100 Committer: Mike Percy <[email protected]> Committed: Wed Feb 1 09:34:44 2017 -0800 ---------------------------------------------------------------------- .../java/org/apache/flume/auth/UGIExecutor.java | 9 +--- .../flume/auth/TestFlumeAuthenticator.java | 50 ++++++++++++++++++-- .../flume/sink/hdfs/TestBucketWriter.java | 49 +++++++++++++++++++ 3 files changed, 95 insertions(+), 13 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flume/blob/f215374b/flume-ng-auth/src/main/java/org/apache/flume/auth/UGIExecutor.java ---------------------------------------------------------------------- diff --git a/flume-ng-auth/src/main/java/org/apache/flume/auth/UGIExecutor.java b/flume-ng-auth/src/main/java/org/apache/flume/auth/UGIExecutor.java index a6ebd86..41308b4 100644 --- a/flume-ng-auth/src/main/java/org/apache/flume/auth/UGIExecutor.java +++ b/flume-ng-auth/src/main/java/org/apache/flume/auth/UGIExecutor.java @@ -43,14 +43,7 @@ class UGIExecutor implements PrivilegedExecutor { @Override public <T> T execute(PrivilegedExceptionAction<T> action) throws Exception { ensureValidAuth(); - try { - return ugi.doAs(action); - } catch (IOException ex) { - throw new SecurityException("Privileged action failed", ex); - } catch (InterruptedException ex) { - Thread.interrupted(); - throw new SecurityException(ex); - } + return ugi.doAs(action); } private void ensureValidAuth() { http://git-wip-us.apache.org/repos/asf/flume/blob/f215374b/flume-ng-auth/src/test/java/org/apache/flume/auth/TestFlumeAuthenticator.java ---------------------------------------------------------------------- diff --git a/flume-ng-auth/src/test/java/org/apache/flume/auth/TestFlumeAuthenticator.java b/flume-ng-auth/src/test/java/org/apache/flume/auth/TestFlumeAuthenticator.java index 0dc8872..be84646 100644 --- a/flume-ng-auth/src/test/java/org/apache/flume/auth/TestFlumeAuthenticator.java +++ b/flume-ng-auth/src/test/java/org/apache/flume/auth/TestFlumeAuthenticator.java @@ -18,12 +18,14 @@ package org.apache.flume.auth; import org.apache.hadoop.minikdc.MiniKdc; +import org.junit.After; import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.Test; import java.io.File; import java.io.IOException; +import java.security.PrivilegedExceptionAction; import java.util.Properties; import static org.junit.Assert.assertEquals; @@ -66,6 +68,12 @@ public class TestFlumeAuthenticator { } } + @After + public void tearDown() { + // Clear the previous statically stored logged in credentials + FlumeAuthenticationUtil.clearCredentials(); + } + @Test public void testNullLogin() throws IOException { String principal = null; @@ -82,8 +90,6 @@ public class TestFlumeAuthenticator { String keytab = flumeKeytab.getAbsolutePath(); String expResult = principal; - // Clear the previous statically stored logged in credentials - FlumeAuthenticationUtil.clearCredentials(); FlumeAuthenticator authenticator = FlumeAuthenticationUtil.getAuthenticator( principal, keytab); assertTrue(authenticator.isAuthenticated()); @@ -110,6 +116,43 @@ public class TestFlumeAuthenticator { } } + /** + * Test whether the exception raised in the <code>PrivilegedExceptionAction</code> gets + * propagated as-is from {@link KerberosAuthenticator#execute(PrivilegedExceptionAction)}. + */ + @Test(expected = IOException.class) + public void testKerberosAuthenticatorExceptionInExecute() throws Exception { + String principal = flumePrincipal; + String keytab = flumeKeytab.getAbsolutePath(); + + FlumeAuthenticator authenticator = FlumeAuthenticationUtil.getAuthenticator(principal, keytab); + assertTrue(authenticator instanceof KerberosAuthenticator); + + authenticator.execute(new PrivilegedExceptionAction<Object>() { + @Override + public Object run() throws Exception { + throw new IOException(); + } + }); + } + + /** + * Test whether the exception raised in the <code>PrivilegedExceptionAction</code> gets + * propagated as-is from {@link SimpleAuthenticator#execute(PrivilegedExceptionAction)}. + */ + @Test(expected = IOException.class) + public void testSimpleAuthenticatorExceptionInExecute() throws Exception { + FlumeAuthenticator authenticator = FlumeAuthenticationUtil.getAuthenticator(null, null); + assertTrue(authenticator instanceof SimpleAuthenticator); + + authenticator.execute(new PrivilegedExceptionAction<Object>() { + @Override + public Object run() throws Exception { + throw new IOException(); + } + }); + } + @Test public void testProxyAs() throws IOException { String username = "alice"; @@ -138,9 +181,6 @@ public class TestFlumeAuthenticator { kdc.createPrincipal(keytab, principal); String expResult = principal + "@" + kdc.getRealm(); - // Clear the previous statically stored logged in credentials - FlumeAuthenticationUtil.clearCredentials(); - FlumeAuthenticator authenticator = FlumeAuthenticationUtil.getAuthenticator( principal, keytab.getAbsolutePath()); assertTrue(authenticator.isAuthenticated()); http://git-wip-us.apache.org/repos/asf/flume/blob/f215374b/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestBucketWriter.java ---------------------------------------------------------------------- diff --git a/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestBucketWriter.java b/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestBucketWriter.java index 78241a1..4221a5d 100644 --- a/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestBucketWriter.java +++ b/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestBucketWriter.java @@ -36,6 +36,7 @@ import org.junit.AfterClass; import org.junit.Assert; import org.junit.BeforeClass; import org.junit.Test; +import org.mockito.Mockito; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -448,4 +449,52 @@ public class TestBucketWriter { "but got " + bucketWriter.renameTries.get(), bucketWriter.renameTries.get() == numberOfRetriesRequired); } + + // Test that we don't swallow IOExceptions in secure mode. We should close the bucket writer + // and rethrow the exception. Regression test for FLUME-3049. + @Test + public void testRotateBucketOnIOException() throws IOException, InterruptedException { + MockHDFSWriter hdfsWriter = Mockito.spy(new MockHDFSWriter()); + PrivilegedExecutor ugiProxy = + FlumeAuthenticationUtil.getAuthenticator(null, null).proxyAs("alice"); + + final int ROLL_COUNT = 1; // Cause a roll after every successful append(). + BucketWriter bucketWriter = new BucketWriter( + 0, 0, ROLL_COUNT, 0, ctx, "/tmp", "file", "", ".tmp", null, null, + SequenceFile.CompressionType.NONE, hdfsWriter, timedRollerPool, ugiProxy, + new SinkCounter("test-bucket-writer-" + System.currentTimeMillis()), 0, null, null, 30000, + Executors.newSingleThreadExecutor(), 0, 0); + + Event e = EventBuilder.withBody("foo", Charsets.UTF_8); + + // Write one event successfully. + bucketWriter.append(e); + + // Fail the next write. + IOException expectedIOException = new IOException("Test injected IOException"); + Mockito.doThrow(expectedIOException).when(hdfsWriter) + .append(Mockito.any(Event.class)); + + // The second time we try to write we should get an IOException. + try { + bucketWriter.append(e); + Assert.fail("Expected IOException wasn't thrown during append"); + } catch (IOException ex) { + Assert.assertEquals(expectedIOException, ex); + logger.info("Caught expected IOException", ex); + } + + // The third time we try to write we should get a BucketClosedException, because the + // BucketWriter should attempt to close itself before rethrowing the IOException on the first + // call. + try { + bucketWriter.append(e); + Assert.fail("BucketWriter should be already closed, BucketClosedException expected"); + } catch (BucketClosedException ex) { + logger.info("Caught expected BucketClosedException", ex); + } + + Assert.assertEquals("events written", 1, hdfsWriter.getEventsWritten()); + Assert.assertEquals("2 files should be closed", 2, hdfsWriter.getFilesClosed()); + } }
