Updated Branches: refs/heads/flume-1.3.0 31309887a -> ee25a3bc6
FLUME-1565. FileChannel Decryption in RandomReader is not thread safe. (Brock Noland via Mike Percy) Project: http://git-wip-us.apache.org/repos/asf/flume/repo Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/ee25a3bc Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/ee25a3bc Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/ee25a3bc Branch: refs/heads/flume-1.3.0 Commit: ee25a3bc60e492b0f856fb1fd005d916fefcbfed Parents: 3130988 Author: Mike Percy <[email protected]> Authored: Wed Sep 12 12:09:10 2012 -0700 Committer: Mike Percy <[email protected]> Committed: Wed Sep 12 12:45:34 2012 -0700 ---------------------------------------------------------------------- .../org/apache/flume/channel/file/LogFileV3.java | 52 ++++++++-- .../file/encryption/TestFileChannelEncryption.java | 80 +++++++++++++++ 2 files changed, 121 insertions(+), 11 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flume/blob/ee25a3bc/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFileV3.java ---------------------------------------------------------------------- diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFileV3.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFileV3.java index ddd1744..5de6e82 100644 --- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFileV3.java +++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFileV3.java @@ -25,6 +25,8 @@ import java.io.FileOutputStream; import java.io.IOException; import java.io.RandomAccessFile; import java.security.Key; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingDeque; import javax.annotation.Nullable; @@ -173,7 +175,12 @@ class LogFileV3 extends LogFile { static class RandomReader extends LogFile.RandomReader { private volatile boolean initialized; - private CipherProvider.Decryptor decryptor; + private volatile boolean encryptionEnabled; + private volatile Key key; + private volatile String cipherProvider; + private volatile byte[] parameters; + private BlockingQueue<CipherProvider.Decryptor> decryptors = + new LinkedBlockingDeque<CipherProvider.Decryptor>(); RandomReader(File file, @Nullable KeyProvider encryptionKeyProvider) throws IOException { super(file, encryptionKeyProvider); @@ -190,16 +197,17 @@ class LogFileV3 extends LogFile { " expected " + Integer.toHexString(getVersion()) + " file: " + getFile().getCanonicalPath()); } + encryptionEnabled = false; if(metaData.hasEncryption()) { if(getKeyProvider() == null) { throw new IllegalStateException("Data file is encrypted but no " + " provider was specified"); } ProtosFactory.LogFileEncryption encryption = metaData.getEncryption(); - Key key = getKeyProvider().getKey(encryption.getKeyAlias()); - decryptor = CipherProviderFactory. - getDecrypter(encryption.getCipherProvider(), key, - encryption.getParameters().toByteArray()); + key = getKeyProvider().getKey(encryption.getKeyAlias()); + cipherProvider = encryption.getCipherProvider(); + parameters = encryption.getParameters().toByteArray(); + encryptionEnabled = true; } } finally { try { @@ -209,6 +217,14 @@ class LogFileV3 extends LogFile { } } } + private CipherProvider.Decryptor getDecryptor() { + CipherProvider.Decryptor decryptor = decryptors.poll(); + if(decryptor == null) { + decryptor = CipherProviderFactory.getDecrypter(cipherProvider, key, + parameters); + } + return decryptor; + } @Override int getVersion() { return Serialization.VERSION_3; @@ -219,15 +235,29 @@ class LogFileV3 extends LogFile { // readers are opened right when the file is created and thus // empty. As such we wait to initialize until there is some // data before we we initialize - if(!initialized) { - initialized = true; - initialize(); + synchronized (this) { + if(!initialized) { + initialized = true; + initialize(); + } } byte[] buffer = readDelimitedBuffer(fileHandle); - if(decryptor != null) { - buffer = decryptor.decrypt(buffer); + CipherProvider.Decryptor decryptor = null; + boolean success = false; + try { + if(encryptionEnabled) { + decryptor = getDecryptor(); + buffer = decryptor.decrypt(buffer); + } + TransactionEventRecord event = TransactionEventRecord. + fromByteArray(buffer); + success = true; + return event; + } finally { + if(success && encryptionEnabled && decryptor != null) { + decryptors.offer(decryptor); + } } - return TransactionEventRecord.fromByteArray(buffer); } } http://git-wip-us.apache.org/repos/asf/flume/blob/ee25a3bc/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/encryption/TestFileChannelEncryption.java ---------------------------------------------------------------------- diff --git a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/encryption/TestFileChannelEncryption.java b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/encryption/TestFileChannelEncryption.java index 44af4c9..a0037b8 100644 --- a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/encryption/TestFileChannelEncryption.java +++ b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/encryption/TestFileChannelEncryption.java @@ -21,8 +21,14 @@ package org.apache.flume.channel.file.encryption; import static org.apache.flume.channel.file.TestUtils.*; import java.io.File; +import java.util.Collections; +import java.util.HashSet; import java.util.Map; import java.util.Set; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Executor; +import java.util.concurrent.Executors; +import java.util.concurrent.atomic.AtomicBoolean; import org.apache.flume.ChannelException; import org.apache.flume.FlumeException; @@ -83,6 +89,80 @@ public class TestFileChannelEncryption extends TestFileChannelBase { } return overrides; } + /** + * Test fails without FLUME-1565 + */ + @Test + public void testThreadedConsume() throws Exception { + int numThreads = 20; + Map<String, String> overrides = getOverridesForEncryption(); + overrides.put(FileChannelConfiguration.CAPACITY, String.valueOf(10000)); + channel = createFileChannel(overrides); + channel.start(); + Assert.assertTrue(channel.isOpen()); + Executor executor = Executors.newFixedThreadPool(numThreads); + Set<String> in = fillChannel(channel, "threaded-consume"); + final AtomicBoolean error = new AtomicBoolean(false); + final CountDownLatch startLatch = new CountDownLatch(numThreads); + final CountDownLatch stopLatch = new CountDownLatch(numThreads); + final Set<String> out = Collections.synchronizedSet(new HashSet<String>()); + for (int i = 0; i < numThreads; i++) { + executor.execute(new Runnable() { + @Override + public void run() { + try { + startLatch.countDown(); + startLatch.await(); + out.addAll(takeEvents(channel, 10)); + } catch (Throwable t) { + error.set(true); + LOGGER.error("Error in take thread", t); + } finally { + stopLatch.countDown(); + } + } + }); + } + stopLatch.await(); + Assert.assertFalse(error.get()); + compareInputAndOut(in, out); + } + @Test + public void testThreadedProduce() throws Exception { + int numThreads = 20; + Map<String, String> overrides = getOverridesForEncryption(); + overrides.put(FileChannelConfiguration.CAPACITY, String.valueOf(10000)); + channel = createFileChannel(overrides); + channel.start(); + Assert.assertTrue(channel.isOpen()); + Executor executor = Executors.newFixedThreadPool(numThreads); + final AtomicBoolean error = new AtomicBoolean(false); + final CountDownLatch startLatch = new CountDownLatch(numThreads); + final CountDownLatch stopLatch = new CountDownLatch(numThreads); + final Set<String> in = Collections.synchronizedSet(new HashSet<String>()); + for (int i = 0; i < numThreads; i++) { + executor.execute(new Runnable() { + @Override + public void run() { + try { + startLatch.countDown(); + startLatch.await(); + in.addAll(putEvents(channel, "thread-produce", 10, 10000, true)); + } catch (Throwable t) { + error.set(true); + LOGGER.error("Error in put thread", t); + } finally { + stopLatch.countDown(); + } + } + }); + } + stopLatch.await(); + Set<String> out = consumeChannel(channel); + + Assert.assertFalse(error.get()); + compareInputAndOut(in, out); + } @Test public void testConfiguration() throws Exception { Map<String, String> overrides = Maps.newHashMap();
