Updated Branches:
  refs/heads/flume-1.3.0 31309887a -> ee25a3bc6

FLUME-1565. FileChannel Decryption in RandomReader is not thread safe.

(Brock Noland via Mike Percy)


Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/ee25a3bc
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/ee25a3bc
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/ee25a3bc

Branch: refs/heads/flume-1.3.0
Commit: ee25a3bc60e492b0f856fb1fd005d916fefcbfed
Parents: 3130988
Author: Mike Percy <[email protected]>
Authored: Wed Sep 12 12:09:10 2012 -0700
Committer: Mike Percy <[email protected]>
Committed: Wed Sep 12 12:45:34 2012 -0700

----------------------------------------------------------------------
 .../org/apache/flume/channel/file/LogFileV3.java   |   52 ++++++++--
 .../file/encryption/TestFileChannelEncryption.java |   80 +++++++++++++++
 2 files changed, 121 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flume/blob/ee25a3bc/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFileV3.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFileV3.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFileV3.java
index ddd1744..5de6e82 100644
--- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFileV3.java
+++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFileV3.java
@@ -25,6 +25,8 @@ import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.RandomAccessFile;
 import java.security.Key;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingDeque;
 
 import javax.annotation.Nullable;
 
@@ -173,7 +175,12 @@ class LogFileV3 extends LogFile {
 
   static class RandomReader extends LogFile.RandomReader {
     private volatile boolean initialized;
-    private CipherProvider.Decryptor decryptor;
+    private volatile boolean encryptionEnabled;
+    private volatile Key key;
+    private volatile String cipherProvider;
+    private volatile byte[] parameters;
+    private BlockingQueue<CipherProvider.Decryptor> decryptors =
+        new LinkedBlockingDeque<CipherProvider.Decryptor>();
     RandomReader(File file, @Nullable KeyProvider encryptionKeyProvider)
         throws IOException {
       super(file, encryptionKeyProvider);
@@ -190,16 +197,17 @@ class LogFileV3 extends LogFile {
               " expected " + Integer.toHexString(getVersion())
               + " file: " + getFile().getCanonicalPath());
         }
+        encryptionEnabled = false;
         if(metaData.hasEncryption()) {
           if(getKeyProvider() == null) {
             throw new IllegalStateException("Data file is encrypted but no " +
                 " provider was specified");
           }
           ProtosFactory.LogFileEncryption encryption = metaData.getEncryption();
-          Key key = getKeyProvider().getKey(encryption.getKeyAlias());
-          decryptor = CipherProviderFactory.
-              getDecrypter(encryption.getCipherProvider(), key,
-                  encryption.getParameters().toByteArray());
+          key = getKeyProvider().getKey(encryption.getKeyAlias());
+          cipherProvider = encryption.getCipherProvider();
+          parameters = encryption.getParameters().toByteArray();
+          encryptionEnabled = true;
         }
       } finally {
         try {
@@ -209,6 +217,14 @@ class LogFileV3 extends LogFile {
         }
       }
     }
+    private CipherProvider.Decryptor getDecryptor() {
+      CipherProvider.Decryptor decryptor = decryptors.poll();
+      if(decryptor == null) {
+        decryptor = CipherProviderFactory.getDecrypter(cipherProvider, key,
+            parameters);
+      }
+      return decryptor;
+    }
     @Override
     int getVersion() {
       return Serialization.VERSION_3;
@@ -219,15 +235,29 @@ class LogFileV3 extends LogFile {
       // readers are opened right when the file is created and thus
       // empty. As such we wait to initialize until there is some
       // data before we we initialize
-      if(!initialized) {
-        initialized = true;
-        initialize();
+      synchronized (this) {
+        if(!initialized) {
+          initialized = true;
+          initialize();
+        }
       }
       byte[] buffer = readDelimitedBuffer(fileHandle);
-      if(decryptor != null) {
-        buffer = decryptor.decrypt(buffer);
+      CipherProvider.Decryptor decryptor = null;
+      boolean success = false;
+      try {
+        if(encryptionEnabled) {
+          decryptor = getDecryptor();
+          buffer = decryptor.decrypt(buffer);
+        }
+        TransactionEventRecord event = TransactionEventRecord.
+            fromByteArray(buffer);
+        success = true;
+        return event;
+      } finally {
+        if(success && encryptionEnabled && decryptor != null) {
+          decryptors.offer(decryptor);
+        }
       }
-      return TransactionEventRecord.fromByteArray(buffer);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/flume/blob/ee25a3bc/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/encryption/TestFileChannelEncryption.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/encryption/TestFileChannelEncryption.java b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/encryption/TestFileChannelEncryption.java
index 44af4c9..a0037b8 100644
--- a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/encryption/TestFileChannelEncryption.java
+++ b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/encryption/TestFileChannelEncryption.java
@@ -21,8 +21,14 @@ package org.apache.flume.channel.file.encryption;
 import static org.apache.flume.channel.file.TestUtils.*;
 
 import java.io.File;
+import java.util.Collections;
+import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.flume.ChannelException;
 import org.apache.flume.FlumeException;
@@ -83,6 +89,80 @@ public class TestFileChannelEncryption extends TestFileChannelBase {
     }
     return overrides;
   }
+  /**
+   * Test fails without FLUME-1565
+   */
+  @Test
+  public void testThreadedConsume() throws Exception {
+    int numThreads = 20;
+    Map<String, String> overrides = getOverridesForEncryption();
+    overrides.put(FileChannelConfiguration.CAPACITY, String.valueOf(10000));
+    channel = createFileChannel(overrides);
+    channel.start();
+    Assert.assertTrue(channel.isOpen());
+    Executor executor = Executors.newFixedThreadPool(numThreads);
+    Set<String> in = fillChannel(channel, "threaded-consume");
+    final AtomicBoolean error = new AtomicBoolean(false);
+    final CountDownLatch startLatch = new CountDownLatch(numThreads);
+    final CountDownLatch stopLatch = new CountDownLatch(numThreads);
+    final Set<String> out = Collections.synchronizedSet(new HashSet<String>());
+    for (int i = 0; i < numThreads; i++) {
+      executor.execute(new Runnable() {
+        @Override
+        public void run() {
+          try {
+            startLatch.countDown();
+            startLatch.await();
+            out.addAll(takeEvents(channel, 10));
+          } catch (Throwable t) {
+            error.set(true);
+            LOGGER.error("Error in take thread", t);
+          } finally {
+            stopLatch.countDown();
+          }
+        }
+      });
+    }
+    stopLatch.await();
+    Assert.assertFalse(error.get());
+    compareInputAndOut(in, out);
+  }
+  @Test
+  public void testThreadedProduce() throws Exception {
+    int numThreads = 20;
+    Map<String, String> overrides = getOverridesForEncryption();
+    overrides.put(FileChannelConfiguration.CAPACITY, String.valueOf(10000));
+    channel = createFileChannel(overrides);
+    channel.start();
+    Assert.assertTrue(channel.isOpen());
+    Executor executor = Executors.newFixedThreadPool(numThreads);
+    final AtomicBoolean error = new AtomicBoolean(false);
+    final CountDownLatch startLatch = new CountDownLatch(numThreads);
+    final CountDownLatch stopLatch = new CountDownLatch(numThreads);
+    final Set<String> in = Collections.synchronizedSet(new HashSet<String>());
+    for (int i = 0; i < numThreads; i++) {
+      executor.execute(new Runnable() {
+        @Override
+        public void run() {
+          try {
+            startLatch.countDown();
+            startLatch.await();
+            in.addAll(putEvents(channel, "thread-produce", 10, 10000, true));
+          } catch (Throwable t) {
+            error.set(true);
+            LOGGER.error("Error in put thread", t);
+          } finally {
+            stopLatch.countDown();
+          }
+        }
+      });
+    }
+    stopLatch.await();
+    Set<String> out = consumeChannel(channel);
+
+    Assert.assertFalse(error.get());
+    compareInputAndOut(in, out);
+  }
   @Test
   public void testConfiguration() throws Exception {
     Map<String, String> overrides = Maps.newHashMap();

Reply via email to