http://git-wip-us.apache.org/repos/asf/flume/blob/cfbf1156/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/encryption/EncryptionTestUtils.java ---------------------------------------------------------------------- diff --git a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/encryption/EncryptionTestUtils.java b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/encryption/EncryptionTestUtils.java index 6ca3246..f290035 100644 --- a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/encryption/EncryptionTestUtils.java +++ b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/encryption/EncryptionTestUtils.java @@ -18,18 +18,6 @@ */ package org.apache.flume.channel.file.encryption; -import java.io.File; -import java.io.FileOutputStream; -import java.io.IOException; -import java.security.Key; -import java.security.KeyStore; -import java.util.List; -import java.util.Map; - -import javax.crypto.KeyGenerator; - -import org.apache.flume.channel.file.TestUtils; - import com.google.common.base.Charsets; import com.google.common.base.Joiner; import com.google.common.base.Throwables; @@ -37,6 +25,16 @@ import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.io.Files; import com.google.common.io.Resources; +import org.apache.flume.channel.file.TestUtils; + +import javax.crypto.KeyGenerator; +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.security.Key; +import java.security.KeyStore; +import java.util.List; +import java.util.Map; public class EncryptionTestUtils { @@ -50,33 +48,32 @@ public class EncryptionTestUtils { throw Throwables.propagate(e); } } - public static void createKeyStore(File keyStoreFile, - File keyStorePasswordFile, Map<String, File> keyAliasPassword) - throws Exception { + + public static void createKeyStore(File keyStoreFile, File keyStorePasswordFile, + Map<String, File> keyAliasPassword) throws Exception { KeyStore ks = KeyStore.getInstance("jceks"); ks.load(null); List<String> keysWithSeperatePasswords = Lists.newArrayList(); - for(String alias : keyAliasPassword.keySet()) { + for (String alias : keyAliasPassword.keySet()) { Key key = newKey(); char[] password = null; File passwordFile = keyAliasPassword.get(alias); - if(passwordFile == null) { - password = Files.toString(keyStorePasswordFile, Charsets.UTF_8) - .toCharArray(); + if (passwordFile == null) { + password = Files.toString(keyStorePasswordFile, Charsets.UTF_8).toCharArray(); } else { keysWithSeperatePasswords.add(alias); password = Files.toString(passwordFile, Charsets.UTF_8).toCharArray(); } ks.setKeyEntry(alias, key, password, null); } - char[] keyStorePassword = Files. - toString(keyStorePasswordFile, Charsets.UTF_8).toCharArray(); + char[] keyStorePassword = Files.toString(keyStorePasswordFile, Charsets.UTF_8).toCharArray(); FileOutputStream outputStream = new FileOutputStream(keyStoreFile); ks.store(outputStream, keyStorePassword); outputStream.close(); } - public static Map<String, File> configureTestKeyStore(File baseDir, - File keyStoreFile) throws IOException { + + public static Map<String, File> configureTestKeyStore(File baseDir, File keyStoreFile) + throws IOException { Map<String, File> result = Maps.newHashMap(); if (System.getProperty("java.vendor").contains("IBM")) { @@ -86,50 +83,52 @@ public class EncryptionTestUtils { Resources.copy(Resources.getResource("sun-test.keystore"), new FileOutputStream(keyStoreFile)); } - /* - Commands below: - keytool -genseckey -alias key-0 -keypass keyPassword -keyalg AES \ - -keysize 128 -validity 9000 -keystore src/test/resources/test.keystore \ - -storetype jceks -storepass keyStorePassword - keytool -genseckey -alias key-1 -keyalg AES -keysize 128 -validity 9000 \ - -keystore src/test/resources/test.keystore -storetype jceks \ - -storepass keyStorePassword + + /* Commands below: + * keytool -genseckey -alias key-0 -keypass keyPassword -keyalg AES \ + * -keysize 128 -validity 9000 -keystore src/test/resources/test.keystore \ + * -storetype jceks -storepass keyStorePassword + * keytool -genseckey -alias key-1 -keyalg AES -keysize 128 -validity 9000 \ + * -keystore src/test/resources/test.keystore -storetype jceks \ + * -storepass keyStorePassword */ -// key-0 has own password, key-1 used key store password - result.put("key-0", - TestUtils.writeStringToFile(baseDir, "key-0", "keyPassword")); + // key-0 has own password, key-1 used key store password + result.put("key-0", TestUtils.writeStringToFile(baseDir, "key-0", "keyPassword")); result.put("key-1", null); return result; } - public static Map<String,String> configureForKeyStore(File keyStoreFile, - File keyStorePasswordFile, Map<String, File> keyAliasPassword) - throws Exception { + + public static Map<String, String> configureForKeyStore(File keyStoreFile, + File keyStorePasswordFile, + Map<String, File> keyAliasPassword) + throws Exception { Map<String, String> context = Maps.newHashMap(); List<String> keys = Lists.newArrayList(); Joiner joiner = Joiner.on("."); - for(String alias : keyAliasPassword.keySet()) { + for (String alias : keyAliasPassword.keySet()) { File passwordFile = keyAliasPassword.get(alias); - if(passwordFile == null) { + if (passwordFile == null) { keys.add(alias); } else { String propertyName = joiner.join(EncryptionConfiguration.KEY_PROVIDER, - EncryptionConfiguration.JCE_FILE_KEYS, alias, - EncryptionConfiguration.JCE_FILE_KEY_PASSWORD_FILE); + EncryptionConfiguration.JCE_FILE_KEYS, + alias, + EncryptionConfiguration.JCE_FILE_KEY_PASSWORD_FILE); keys.add(alias); context.put(propertyName, passwordFile.getAbsolutePath()); } } context.put(joiner.join(EncryptionConfiguration.KEY_PROVIDER, - EncryptionConfiguration.JCE_FILE_KEY_STORE_FILE), - keyStoreFile.getAbsolutePath()); - if(keyStorePasswordFile != null) { + EncryptionConfiguration.JCE_FILE_KEY_STORE_FILE), + keyStoreFile.getAbsolutePath()); + if (keyStorePasswordFile != null) { context.put(joiner.join(EncryptionConfiguration.KEY_PROVIDER, - EncryptionConfiguration.JCE_FILE_KEY_STORE_PASSWORD_FILE), - keyStorePasswordFile.getAbsolutePath()); + EncryptionConfiguration.JCE_FILE_KEY_STORE_PASSWORD_FILE), + keyStorePasswordFile.getAbsolutePath()); } context.put(joiner.join(EncryptionConfiguration.KEY_PROVIDER, - EncryptionConfiguration.JCE_FILE_KEYS), - Joiner.on(" ").join(keys)); + EncryptionConfiguration.JCE_FILE_KEYS), + Joiner.on(" ").join(keys)); return context; } }
http://git-wip-us.apache.org/repos/asf/flume/blob/cfbf1156/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/encryption/TestAESCTRNoPaddingProvider.java ---------------------------------------------------------------------- diff --git a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/encryption/TestAESCTRNoPaddingProvider.java b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/encryption/TestAESCTRNoPaddingProvider.java index a7c7cb2..d483fcc 100644 --- a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/encryption/TestAESCTRNoPaddingProvider.java +++ b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/encryption/TestAESCTRNoPaddingProvider.java @@ -35,13 +35,13 @@ public class TestAESCTRNoPaddingProvider { public void setup() throws Exception { KeyGenerator keyGen = KeyGenerator.getInstance("AES"); key = keyGen.generateKey(); - encryptor = CipherProviderFactory. - getEncrypter(CipherProviderType.AESCTRNOPADDING.name(), key); - decryptor = CipherProviderFactory. - getDecrypter(CipherProviderType.AESCTRNOPADDING.name(), key, - encryptor.getParameters()); + encryptor = CipherProviderFactory.getEncrypter( + CipherProviderType.AESCTRNOPADDING.name(), key); + decryptor = CipherProviderFactory.getDecrypter( + CipherProviderType.AESCTRNOPADDING.name(), key, encryptor.getParameters()); cipherProviderTestSuite = new CipherProviderTestSuite(encryptor, decryptor); } + @Test public void test() throws Exception { cipherProviderTestSuite.test(); http://git-wip-us.apache.org/repos/asf/flume/blob/cfbf1156/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/encryption/TestFileChannelEncryption.java ---------------------------------------------------------------------- diff --git a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/encryption/TestFileChannelEncryption.java b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/encryption/TestFileChannelEncryption.java index d4537a8..4e5ab6f 100644 --- a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/encryption/TestFileChannelEncryption.java +++ b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/encryption/TestFileChannelEncryption.java @@ -18,18 +18,10 @@ */ package org.apache.flume.channel.file.encryption; -import static org.apache.flume.channel.file.TestUtils.*; - -import java.io.File; -import java.util.Collections; -import java.util.HashSet; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.Executor; -import java.util.concurrent.Executors; -import java.util.concurrent.atomic.AtomicBoolean; - +import com.google.common.base.Charsets; +import com.google.common.base.Joiner; +import com.google.common.collect.Maps; +import com.google.common.io.Files; import org.apache.flume.ChannelException; import org.apache.flume.FlumeException; import org.apache.flume.channel.file.FileChannelConfiguration; @@ -42,10 +34,21 @@ import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.google.common.base.Charsets; -import com.google.common.base.Joiner; -import com.google.common.collect.Maps; -import com.google.common.io.Files; +import java.io.File; +import java.util.Collections; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Executor; +import java.util.concurrent.Executors; +import java.util.concurrent.atomic.AtomicBoolean; + +import static org.apache.flume.channel.file.TestUtils.compareInputAndOut; +import static org.apache.flume.channel.file.TestUtils.consumeChannel; +import static org.apache.flume.channel.file.TestUtils.fillChannel; +import static org.apache.flume.channel.file.TestUtils.putEvents; +import static org.apache.flume.channel.file.TestUtils.takeEvents; public class TestFileChannelEncryption extends TestFileChannelBase { protected static final Logger LOGGER = @@ -62,36 +65,39 @@ public class TestFileChannelEncryption extends TestFileChannelBase { keyStoreFile = new File(baseDir, "keyStoreFile"); Assert.assertTrue(keyStoreFile.createNewFile()); keyAliasPassword = Maps.newHashMap(); - keyAliasPassword.putAll(EncryptionTestUtils. - configureTestKeyStore(baseDir, keyStoreFile)); + keyAliasPassword.putAll(EncryptionTestUtils.configureTestKeyStore(baseDir, keyStoreFile)); } + @After public void teardown() { super.teardown(); } + private Map<String, String> getOverrides() throws Exception { Map<String, String> overrides = Maps.newHashMap(); overrides.put(FileChannelConfiguration.CAPACITY, String.valueOf(100)); - overrides.put(FileChannelConfiguration.TRANSACTION_CAPACITY, - String.valueOf(100)); + overrides.put(FileChannelConfiguration.TRANSACTION_CAPACITY, String.valueOf(100)); return overrides; } + private Map<String, String> getOverridesForEncryption() throws Exception { Map<String, String> overrides = getOverrides(); - Map<String, String> encryptionProps = EncryptionTestUtils. - configureForKeyStore(keyStoreFile, - keyStorePasswordFile, keyAliasPassword); + Map<String, String> encryptionProps = + EncryptionTestUtils.configureForKeyStore(keyStoreFile, + keyStorePasswordFile, + keyAliasPassword); encryptionProps.put(EncryptionConfiguration.KEY_PROVIDER, - KeyProviderType.JCEKSFILE.name()); + KeyProviderType.JCEKSFILE.name()); encryptionProps.put(EncryptionConfiguration.CIPHER_PROVIDER, - CipherProviderType.AESCTRNOPADDING.name()); + CipherProviderType.AESCTRNOPADDING.name()); encryptionProps.put(EncryptionConfiguration.ACTIVE_KEY, "key-1"); - for(String key : encryptionProps.keySet()) { + for (String key : encryptionProps.keySet()) { overrides.put(EncryptionConfiguration.ENCRYPTION_PREFIX + "." + key, - encryptionProps.get(key)); + encryptionProps.get(key)); } return overrides; } + /** * Test fails without FLUME-1565 */ @@ -222,15 +228,16 @@ public class TestFileChannelEncryption extends TestFileChannelBase { channel = createFileChannel(noEncryptionOverrides); channel.start(); - if(channel.isOpen()) { + if (channel.isOpen()) { try { takeEvents(channel, 1, 1); Assert.fail("Channel was opened and take did not throw exception"); - } catch(ChannelException ex) { + } catch (ChannelException ex) { // expected } } } + @Test public void testUnencyrptedAndEncryptedLogs() throws Exception { Map<String, String> noEncryptionOverrides = getOverrides(); @@ -252,41 +259,46 @@ public class TestFileChannelEncryption extends TestFileChannelBase { Set<String> out = consumeChannel(channel); compareInputAndOut(in, out); } + @Test public void testBadKeyProviderInvalidValue() throws Exception { Map<String, String> overrides = getOverridesForEncryption(); overrides.put(Joiner.on(".").join(EncryptionConfiguration.ENCRYPTION_PREFIX, - EncryptionConfiguration.KEY_PROVIDER), "invalid"); + EncryptionConfiguration.KEY_PROVIDER), + "invalid"); try { channel = createFileChannel(overrides); Assert.fail(); - } catch(FlumeException ex) { - Assert.assertEquals("java.lang.ClassNotFoundException: invalid", - ex.getMessage()); + } catch (FlumeException ex) { + Assert.assertEquals("java.lang.ClassNotFoundException: invalid", ex.getMessage()); } } + @Test public void testBadKeyProviderInvalidClass() throws Exception { Map<String, String> overrides = getOverridesForEncryption(); overrides.put(Joiner.on(".").join(EncryptionConfiguration.ENCRYPTION_PREFIX, - EncryptionConfiguration.KEY_PROVIDER), String.class.getName()); + EncryptionConfiguration.KEY_PROVIDER), + String.class.getName()); try { channel = createFileChannel(overrides); Assert.fail(); - } catch(FlumeException ex) { - Assert.assertEquals("Unable to instantiate Builder from java.lang.String", - ex.getMessage()); + } catch (FlumeException ex) { + Assert.assertEquals("Unable to instantiate Builder from java.lang.String", ex.getMessage()); } } + @Test public void testBadCipherProviderInvalidValue() throws Exception { Map<String, String> overrides = getOverridesForEncryption(); overrides.put(Joiner.on(".").join(EncryptionConfiguration.ENCRYPTION_PREFIX, - EncryptionConfiguration.CIPHER_PROVIDER), "invalid"); + EncryptionConfiguration.CIPHER_PROVIDER), + "invalid"); channel = createFileChannel(overrides); channel.start(); Assert.assertFalse(channel.isOpen()); } + @Test public void testBadCipherProviderInvalidClass() throws Exception { Map<String, String> overrides = getOverridesForEncryption(); @@ -296,6 +308,7 @@ public class TestFileChannelEncryption extends TestFileChannelBase { channel.start(); Assert.assertFalse(channel.isOpen()); } + @Test public void testMissingKeyStoreFile() throws Exception { Map<String, String> overrides = getOverridesForEncryption(); @@ -306,11 +319,12 @@ public class TestFileChannelEncryption extends TestFileChannelBase { try { channel = createFileChannel(overrides); Assert.fail(); - } catch(RuntimeException ex) { + } catch (RuntimeException ex) { Assert.assertTrue("Exception message is incorrect: " + ex.getMessage(), ex.getMessage().startsWith("java.io.FileNotFoundException: /path/does/not/exist ")); } } + @Test public void testMissingKeyStorePasswordFile() throws Exception { Map<String, String> overrides = getOverridesForEncryption(); @@ -321,11 +335,12 @@ public class TestFileChannelEncryption extends TestFileChannelBase { try { channel = createFileChannel(overrides); Assert.fail(); - } catch(RuntimeException ex) { + } catch (RuntimeException ex) { Assert.assertTrue("Exception message is incorrect: " + ex.getMessage(), ex.getMessage().startsWith("java.io.FileNotFoundException: /path/does/not/exist ")); } } + @Test public void testBadKeyStorePassword() throws Exception { Files.write("invalid", keyStorePasswordFile, Charsets.UTF_8); @@ -334,11 +349,12 @@ public class TestFileChannelEncryption extends TestFileChannelBase { channel = TestUtils.createFileChannel(checkpointDir.getAbsolutePath(), dataDir, overrides); Assert.fail(); - } catch(RuntimeException ex) { + } catch (RuntimeException ex) { Assert.assertEquals("java.io.IOException: Keystore was tampered with, or " + "password was incorrect", ex.getMessage()); } } + @Test public void testBadKeyAlias() throws Exception { Map<String, String> overrides = getOverridesForEncryption(); http://git-wip-us.apache.org/repos/asf/flume/blob/cfbf1156/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/encryption/TestJCEFileKeyProvider.java ---------------------------------------------------------------------- diff --git a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/encryption/TestJCEFileKeyProvider.java b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/encryption/TestJCEFileKeyProvider.java index f33cada..8214a05 100644 --- a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/encryption/TestJCEFileKeyProvider.java +++ b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/encryption/TestJCEFileKeyProvider.java @@ -18,12 +18,10 @@ */ package org.apache.flume.channel.file.encryption; -import java.io.File; -import java.security.Key; -import java.util.Map; - +import com.google.common.base.Charsets; +import com.google.common.collect.Maps; +import com.google.common.io.Files; import junit.framework.Assert; - import org.apache.commons.io.FileUtils; import org.apache.flume.Context; import org.apache.flume.channel.file.TestUtils; @@ -31,9 +29,9 @@ import org.junit.After; import org.junit.Before; import org.junit.Test; -import com.google.common.base.Charsets; -import com.google.common.collect.Maps; -import com.google.common.io.Files; +import java.io.File; +import java.security.Key; +import java.util.Map; public class TestJCEFileKeyProvider { private CipherProvider.Encryptor encryptor; @@ -53,62 +51,69 @@ public class TestJCEFileKeyProvider { Assert.assertTrue(keyStoreFile.createNewFile()); } + @After public void cleanup() { FileUtils.deleteQuietly(baseDir); } + private void initializeForKey(Key key) { - encryptor = - new AESCTRNoPaddingProvider.EncryptorBuilder().setKey(key).build(); - decryptor = - new AESCTRNoPaddingProvider.DecryptorBuilder() - .setKey(key).setParameters(encryptor.getParameters()).build(); + encryptor = new AESCTRNoPaddingProvider.EncryptorBuilder() + .setKey(key) + .build(); + decryptor = new AESCTRNoPaddingProvider.DecryptorBuilder() + .setKey(key) + .setParameters(encryptor.getParameters()) + .build(); } + @Test public void testWithNewKeyStore() throws Exception { createNewKeyStore(); EncryptionTestUtils.createKeyStore(keyStoreFile, keyStorePasswordFile, keyAliasPassword); - Context context = new Context(EncryptionTestUtils. - configureForKeyStore(keyStoreFile, - keyStorePasswordFile, keyAliasPassword)); + Context context = new Context( + EncryptionTestUtils.configureForKeyStore(keyStoreFile, + keyStorePasswordFile, + keyAliasPassword)); Context keyProviderContext = new Context( context.getSubProperties(EncryptionConfiguration.KEY_PROVIDER + ".")); - KeyProvider keyProvider = KeyProviderFactory. - getInstance(KeyProviderType.JCEKSFILE.name(), keyProviderContext); + KeyProvider keyProvider = + KeyProviderFactory.getInstance(KeyProviderType.JCEKSFILE.name(), keyProviderContext); testKeyProvider(keyProvider); } + @Test public void testWithExistingKeyStore() throws Exception { - keyAliasPassword.putAll(EncryptionTestUtils. - configureTestKeyStore(baseDir, keyStoreFile)); - Context context = new Context(EncryptionTestUtils. - configureForKeyStore(keyStoreFile, - keyStorePasswordFile, keyAliasPassword)); + keyAliasPassword.putAll(EncryptionTestUtils.configureTestKeyStore(baseDir, keyStoreFile)); + Context context = new Context( + EncryptionTestUtils.configureForKeyStore(keyStoreFile, + keyStorePasswordFile, + keyAliasPassword)); Context keyProviderContext = new Context( context.getSubProperties(EncryptionConfiguration.KEY_PROVIDER + ".")); - KeyProvider keyProvider = KeyProviderFactory. - getInstance(KeyProviderType.JCEKSFILE.name(), keyProviderContext); + KeyProvider keyProvider = + KeyProviderFactory.getInstance(KeyProviderType.JCEKSFILE.name(), keyProviderContext); testKeyProvider(keyProvider); } + private void createNewKeyStore() throws Exception { - for(int i = 0; i < 10; i++) { + for (int i = 0; i < 10; i++) { // create some with passwords, some without - if(i % 2 == 0) { + if (i % 2 == 0) { String alias = "test-" + i; String password = String.valueOf(i); - keyAliasPassword.put(alias, - TestUtils.writeStringToFile(baseDir, alias, password)); + keyAliasPassword.put(alias, TestUtils.writeStringToFile(baseDir, alias, password)); } } } + private void testKeyProvider(KeyProvider keyProvider) { - for(String alias : keyAliasPassword.keySet()) { + for (String alias : keyAliasPassword.keySet()) { Key key = keyProvider.getKey(alias); initializeForKey(key); String expected = "some text here " + alias; - byte[] cipherText = encryptor. - encrypt(expected.getBytes(Charsets.UTF_8)); + byte[] cipherText = encryptor.encrypt(expected.getBytes(Charsets.UTF_8)); byte[] clearText = decryptor.decrypt(cipherText); Assert.assertEquals(expected, new String(clearText, Charsets.UTF_8)); } http://git-wip-us.apache.org/repos/asf/flume/blob/cfbf1156/flume-ng-channels/flume-jdbc-channel/src/test/java/org/apache/flume/channel/jdbc/BaseJdbcChannelProviderTest.java ---------------------------------------------------------------------- diff --git a/flume-ng-channels/flume-jdbc-channel/src/test/java/org/apache/flume/channel/jdbc/BaseJdbcChannelProviderTest.java b/flume-ng-channels/flume-jdbc-channel/src/test/java/org/apache/flume/channel/jdbc/BaseJdbcChannelProviderTest.java index 85ad7fe..7031eb7 100644 --- a/flume-ng-channels/flume-jdbc-channel/src/test/java/org/apache/flume/channel/jdbc/BaseJdbcChannelProviderTest.java +++ b/flume-ng-channels/flume-jdbc-channel/src/test/java/org/apache/flume/channel/jdbc/BaseJdbcChannelProviderTest.java @@ -17,6 +17,17 @@ */ package org.apache.flume.channel.jdbc; +import org.apache.flume.Context; +import org.apache.flume.Event; +import org.apache.flume.Transaction; +import org.apache.flume.channel.jdbc.impl.JdbcChannelProviderImpl; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import java.io.File; import java.io.IOException; import java.util.ArrayList; @@ -32,17 +43,6 @@ import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; -import org.apache.flume.Context; -import org.apache.flume.Event; -import org.apache.flume.Transaction; -import org.apache.flume.channel.jdbc.impl.JdbcChannelProviderImpl; -import org.junit.After; -import org.junit.Assert; -import org.junit.Before; -import org.junit.Test; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - public abstract class BaseJdbcChannelProviderTest { private static final Logger LOGGER = LoggerFactory.getLogger(BaseJdbcChannelProviderTest.class); @@ -103,7 +103,7 @@ public abstract class BaseJdbcChannelProviderTest { Set<MockEvent> events = new HashSet<MockEvent>(); for (int i = 1; i < 12; i++) { - events.add(MockEventUtils.generateMockEvent(i, i, i, 61%i, 1)); + events.add(MockEventUtils.generateMockEvent(i, i, i, 61 % i, 1)); } Iterator<MockEvent> meIt = events.iterator(); @@ -170,7 +170,7 @@ public abstract class BaseJdbcChannelProviderTest { new HashMap<String, List<MockEvent>>(); for (int i = 1; i < 121; i++) { - MockEvent me = MockEventUtils.generateMockEvent(i, i, i, 61%i, 10); + MockEvent me = MockEventUtils.generateMockEvent(i, i, i, 61 % i, 10); List<MockEvent> meList = eventMap.get(me.getChannel()); if (meList == null) { meList = new ArrayList<MockEvent>(); @@ -227,7 +227,7 @@ public abstract class BaseJdbcChannelProviderTest { Set<MockEvent> events = new HashSet<MockEvent>(); for (int i = 1; i < 81; i++) { - events.add(MockEventUtils.generateMockEvent(i, i, i, 61%i, 5)); + events.add(MockEventUtils.generateMockEvent(i, i, i, 61 % i, 5)); } Iterator<MockEvent> meIt = events.iterator(); http://git-wip-us.apache.org/repos/asf/flume/blob/cfbf1156/flume-ng-channels/flume-jdbc-channel/src/test/java/org/apache/flume/channel/jdbc/MockEvent.java ---------------------------------------------------------------------- diff --git a/flume-ng-channels/flume-jdbc-channel/src/test/java/org/apache/flume/channel/jdbc/MockEvent.java b/flume-ng-channels/flume-jdbc-channel/src/test/java/org/apache/flume/channel/jdbc/MockEvent.java index 1e412c5..6804a9f 100644 --- a/flume-ng-channels/flume-jdbc-channel/src/test/java/org/apache/flume/channel/jdbc/MockEvent.java +++ b/flume-ng-channels/flume-jdbc-channel/src/test/java/org/apache/flume/channel/jdbc/MockEvent.java @@ -27,8 +27,7 @@ public class MockEvent implements Event { private final Map<String, String> headers; private final String channel; - public MockEvent(byte[] payload, Map<String, String> headers, String channel) - { + public MockEvent(byte[] payload, Map<String, String> headers, String channel) { this.payload = payload; this.headers = headers; this.channel = channel; http://git-wip-us.apache.org/repos/asf/flume/blob/cfbf1156/flume-ng-channels/flume-jdbc-channel/src/test/java/org/apache/flume/channel/jdbc/MockEventUtils.java ---------------------------------------------------------------------- diff --git a/flume-ng-channels/flume-jdbc-channel/src/test/java/org/apache/flume/channel/jdbc/MockEventUtils.java b/flume-ng-channels/flume-jdbc-channel/src/test/java/org/apache/flume/channel/jdbc/MockEventUtils.java index 10d8b51..e5ee324 100644 --- a/flume-ng-channels/flume-jdbc-channel/src/test/java/org/apache/flume/channel/jdbc/MockEventUtils.java +++ b/flume-ng-channels/flume-jdbc-channel/src/test/java/org/apache/flume/channel/jdbc/MockEventUtils.java @@ -17,13 +17,13 @@ */ package org.apache.flume.channel.jdbc; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import java.util.HashMap; import java.util.Map; import java.util.Random; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - public final class MockEventUtils { public static final Logger LOGGER = @@ -70,20 +70,20 @@ public final class MockEventUtils { * @param numChannels * @return */ - public static MockEvent generateMockEvent(int payloadMargin, - int headerNameMargin, int headerValueMargin, int numHeaders, - int numChannels) { + public static MockEvent generateMockEvent(int payloadMargin, int headerNameMargin, + int headerValueMargin, int numHeaders, + int numChannels) { int chIndex = 0; if (numChannels > 1) { - chIndex = Math.abs(RANDOM.nextInt())%numChannels; + chIndex = Math.abs(RANDOM.nextInt()) % numChannels; } - String channel = "test-"+chIndex; + String channel = "test-" + chIndex; StringBuilder sb = new StringBuilder("New Event[payload size:"); int plTh = ConfigurationConstants.PAYLOAD_LENGTH_THRESHOLD; - int plSize = Math.abs(RANDOM.nextInt())%plTh + payloadMargin; + int plSize = Math.abs(RANDOM.nextInt()) % plTh + payloadMargin; sb.append(plSize).append(", numHeaders:").append(numHeaders); sb.append(", channel:").append(channel); @@ -93,8 +93,8 @@ public final class MockEventUtils { Map<String, String> headers = new HashMap<String, String>(); for (int i = 0; i < numHeaders; i++) { - int nmSize = Math.abs(RANDOM.nextInt())%nmTh + headerNameMargin; - int vlSize = Math.abs(RANDOM.nextInt())%vlTh + headerValueMargin; + int nmSize = Math.abs(RANDOM.nextInt()) % nmTh + headerNameMargin; + int vlSize = Math.abs(RANDOM.nextInt()) % vlTh + headerValueMargin; String name = generateHeaderString(nmSize); String value = generateHeaderString(vlSize); http://git-wip-us.apache.org/repos/asf/flume/blob/cfbf1156/flume-ng-channels/flume-jdbc-channel/src/test/java/org/apache/flume/channel/jdbc/TestDerbySchemaHandlerQueries.java ---------------------------------------------------------------------- diff --git a/flume-ng-channels/flume-jdbc-channel/src/test/java/org/apache/flume/channel/jdbc/TestDerbySchemaHandlerQueries.java b/flume-ng-channels/flume-jdbc-channel/src/test/java/org/apache/flume/channel/jdbc/TestDerbySchemaHandlerQueries.java index 362bcfa..cad972c 100644 --- a/flume-ng-channels/flume-jdbc-channel/src/test/java/org/apache/flume/channel/jdbc/TestDerbySchemaHandlerQueries.java +++ b/flume-ng-channels/flume-jdbc-channel/src/test/java/org/apache/flume/channel/jdbc/TestDerbySchemaHandlerQueries.java @@ -107,7 +107,6 @@ public class TestDerbySchemaHandlerQueries { = "INSERT INTO FLUME.FL_HEADER (FLH_EVENT, FLH_NAME, FLH_VALUE, " + "FLH_NMSPILL, FLH_VLSPILL) VALUES ( ?, ?, ?, ?, ?)"; - public static final String EXPECTED_STMT_INSERT_HEADER_NAME_SPILL = "INSERT INTO FLUME.FL_NMSPILL (FLN_HEADER, FLN_SPILL) VALUES ( ?, ?)"; @@ -119,7 +118,6 @@ public class TestDerbySchemaHandlerQueries { + "FLE_ID = (SELECT MIN(FLE_ID) FROM FLUME.FL_EVENT WHERE " + "FLE_CHANNEL = ?)"; - public static final String EXPECTED_STMT_FETCH_PAYLOAD_SPILL = "SELECT FLP_SPILL FROM FLUME.FL_PLSPILL WHERE FLP_EVENT = ?"; http://git-wip-us.apache.org/repos/asf/flume/blob/cfbf1156/flume-ng-channels/flume-kafka-channel/src/test/java/org/apache/flume/channel/kafka/TestKafkaChannel.java ---------------------------------------------------------------------- diff --git a/flume-ng-channels/flume-kafka-channel/src/test/java/org/apache/flume/channel/kafka/TestKafkaChannel.java b/flume-ng-channels/flume-kafka-channel/src/test/java/org/apache/flume/channel/kafka/TestKafkaChannel.java index d01346a..b63ac9b 100644 --- a/flume-ng-channels/flume-kafka-channel/src/test/java/org/apache/flume/channel/kafka/TestKafkaChannel.java +++ b/flume-ng-channels/flume-kafka-channel/src/test/java/org/apache/flume/channel/kafka/TestKafkaChannel.java @@ -130,19 +130,19 @@ public class TestKafkaChannel { Properties consumerProps = channel.getConsumerProps(); Properties producerProps = channel.getProducerProps(); - Assert.assertEquals(producerProps.getProperty(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG),testUtil.getKafkaServerUrl()); - Assert.assertEquals(consumerProps.getProperty(ConsumerConfig.GROUP_ID_CONFIG), "flume-something"); - Assert.assertEquals(consumerProps.getProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG), "earliest"); - + Assert.assertEquals(producerProps.getProperty(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG), + testUtil.getKafkaServerUrl()); + Assert.assertEquals(consumerProps.getProperty(ConsumerConfig.GROUP_ID_CONFIG), + "flume-something"); + Assert.assertEquals(consumerProps.getProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG), + "earliest"); } - @Test public void testSuccess() throws Exception { doTestSuccessRollback(false, false); } - @Test public void testSuccessInterleave() throws Exception { doTestSuccessRollback(false, true); @@ -212,8 +212,10 @@ public class TestKafkaChannel { KafkaChannel channel = startChannel(false); - KafkaProducer<String, byte[]> producer = new KafkaProducer<String, byte[]>(channel.getProducerProps()); - ProducerRecord<String, byte[]> data = new ProducerRecord<String, byte[]>(topic, "header-" + message, message.getBytes()); + KafkaProducer<String, byte[]> producer = + new KafkaProducer<String, byte[]>(channel.getProducerProps()); + ProducerRecord<String, byte[]> data = + new ProducerRecord<String, byte[]>(topic, "header-" + message, message.getBytes()); producer.send(data).get(); producer.flush(); producer.close(); @@ -234,7 +236,7 @@ public class TestKafkaChannel { } private Event takeEventWithoutCommittingTxn(KafkaChannel channel) { - for (int i=0; i < 5; i++) { + for (int i = 0; i < 5; i++) { Transaction txn = channel.getTransaction(); txn.begin(); @@ -255,17 +257,19 @@ public class TestKafkaChannel { KafkaProducer<String, byte[]> producer = new KafkaProducer<String, byte[]>(props); for (int i = 0; i < 50; i++) { - ProducerRecord<String, byte[]> data = new ProducerRecord<String, byte[]>(topic, String.valueOf(i) + "-header", String.valueOf(i).getBytes()); + ProducerRecord<String, byte[]> data = + new ProducerRecord<String, byte[]>(topic, String.valueOf(i) + "-header", + String.valueOf(i).getBytes()); producer.send(data).get(); } ExecutorCompletionService<Void> submitterSvc = new - ExecutorCompletionService<Void>(Executors.newCachedThreadPool()); - List<Event> events = pullEvents(channel, submitterSvc, - 50, false, false); + ExecutorCompletionService<Void>(Executors.newCachedThreadPool()); + List<Event> events = pullEvents(channel, submitterSvc, 50, false, false); wait(submitterSvc, 5); Map<Integer, String> finals = new HashMap<Integer, String>(); for (int i = 0; i < 50; i++) { - finals.put(Integer.parseInt(new String(events.get(i).getBody())), events.get(i).getHeaders().get(KEY_HEADER)); + finals.put(Integer.parseInt(new String(events.get(i).getBody())), + events.get(i).getHeaders().get(KEY_HEADER)); } for (int i = 0; i < 50; i++) { Assert.assertTrue(finals.keySet().contains(i)); @@ -284,7 +288,8 @@ public class TestKafkaChannel { KafkaProducer<String, byte[]> producer = new KafkaProducer<String, byte[]>(props); for (int i = 0; i < 50; i++) { - ProducerRecord<String, byte[]> data = new ProducerRecord<String, byte[]>(topic, null, String.valueOf(i).getBytes()); + ProducerRecord<String, byte[]> data = + new ProducerRecord<String, byte[]>(topic, null, String.valueOf(i).getBytes()); producer.send(data).get(); } ExecutorCompletionService<Void> submitterSvc = new @@ -323,14 +328,14 @@ public class TestKafkaChannel { channel.put(EventBuilder.withBody(msgs.get(i).getBytes(), headers)); } tx.commit(); - ExecutorCompletionService<Void> submitterSvc = new - ExecutorCompletionService<Void>(Executors.newCachedThreadPool()); - List<Event> events = pullEvents(channel, submitterSvc, - 50, false, false); + ExecutorCompletionService<Void> submitterSvc = + new ExecutorCompletionService<Void>(Executors.newCachedThreadPool()); + List<Event> events = pullEvents(channel, submitterSvc, 50, false, false); wait(submitterSvc, 5); Map<Integer, String> finals = new HashMap<Integer, String>(); for (int i = 0; i < 50; i++) { - finals.put(Integer.parseInt(new String(events.get(i).getBody())), events.get(i).getHeaders().get(KEY_HEADER)); + finals.put(Integer.parseInt(new String(events.get(i).getBody())), + events.get(i).getHeaders().get(KEY_HEADER)); } for (int i = 0; i < 50; i++) { Assert.assertTrue(finals.keySet().contains(i)); @@ -403,13 +408,13 @@ public class TestKafkaChannel { } private void writeAndVerify(final boolean testRollbacks, - final KafkaChannel channel, final boolean interleave) throws Exception { + final KafkaChannel channel, final boolean interleave) + throws Exception { final List<List<Event>> events = createBaseList(); ExecutorCompletionService<Void> submitterSvc = - new ExecutorCompletionService<Void>(Executors - .newCachedThreadPool()); + new ExecutorCompletionService<Void>(Executors.newCachedThreadPool()); putEvents(channel, events, submitterSvc); @@ -418,11 +423,9 @@ public class TestKafkaChannel { } ExecutorCompletionService<Void> submitterSvc2 = - new ExecutorCompletionService<Void>(Executors - .newCachedThreadPool()); + new ExecutorCompletionService<Void>(Executors.newCachedThreadPool()); - final List<Event> eventsPulled = - pullEvents(channel, submitterSvc2, 50, testRollbacks, true); + final List<Event> eventsPulled = pullEvents(channel, submitterSvc2, 50, testRollbacks, true); if (!interleave) { wait(submitterSvc, 5); @@ -586,18 +589,18 @@ public class TestKafkaChannel { int numPartitions = 5; int sessionTimeoutMs = 10000; int connectionTimeoutMs = 10000; - ZkUtils zkUtils = ZkUtils.apply(testUtil.getZkUrl(), sessionTimeoutMs, connectionTimeoutMs, false); - + ZkUtils zkUtils = + ZkUtils.apply(testUtil.getZkUrl(), sessionTimeoutMs, connectionTimeoutMs, false); int replicationFactor = 1; Properties topicConfig = new Properties(); - AdminUtils.createTopic(zkUtils, topicName, numPartitions, - replicationFactor, topicConfig); + AdminUtils.createTopic(zkUtils, topicName, numPartitions, replicationFactor, topicConfig); } public static void deleteTopic(String topicName) { int sessionTimeoutMs = 10000; int connectionTimeoutMs = 10000; - ZkUtils zkUtils = ZkUtils.apply(testUtil.getZkUrl(), sessionTimeoutMs, connectionTimeoutMs, false); + ZkUtils zkUtils = + ZkUtils.apply(testUtil.getZkUrl(), sessionTimeoutMs, connectionTimeoutMs, false); AdminUtils.deleteTopic(zkUtils, topicName); } } http://git-wip-us.apache.org/repos/asf/flume/blob/cfbf1156/flume-ng-channels/flume-spillable-memory-channel/src/test/java/org/apache/flume/channel/TestSpillableMemoryChannel.java ---------------------------------------------------------------------- diff --git a/flume-ng-channels/flume-spillable-memory-channel/src/test/java/org/apache/flume/channel/TestSpillableMemoryChannel.java b/flume-ng-channels/flume-spillable-memory-channel/src/test/java/org/apache/flume/channel/TestSpillableMemoryChannel.java index 1e4e819..848636b 100644 --- a/flume-ng-channels/flume-spillable-memory-channel/src/test/java/org/apache/flume/channel/TestSpillableMemoryChannel.java +++ b/flume-ng-channels/flume-spillable-memory-channel/src/test/java/org/apache/flume/channel/TestSpillableMemoryChannel.java @@ -24,10 +24,13 @@ import java.io.File; import java.util.ArrayList; import java.util.HashMap; import java.util.Map; - import java.util.UUID; -import org.apache.flume.*; +import org.apache.flume.ChannelException; +import org.apache.flume.ChannelFullException; +import org.apache.flume.Context; +import org.apache.flume.Event; +import org.apache.flume.Transaction; import org.apache.flume.conf.Configurables; import org.apache.flume.event.EventBuilder; import org.apache.flume.channel.file.FileChannelConfiguration; @@ -39,7 +42,6 @@ import org.junit.Test; import org.junit.Rule; import org.junit.rules.TemporaryFolder; - public class TestSpillableMemoryChannel { private SpillableMemoryChannel channel; @@ -51,14 +53,14 @@ public class TestSpillableMemoryChannel { Context context = new Context(); File checkPointDir = fileChannelDir.newFolder("checkpoint"); File dataDir = fileChannelDir.newFolder("data"); - context.put(FileChannelConfiguration.CHECKPOINT_DIR - , checkPointDir.getAbsolutePath()); + context.put(FileChannelConfiguration.CHECKPOINT_DIR, checkPointDir.getAbsolutePath()); context.put(FileChannelConfiguration.DATA_DIRS, dataDir.getAbsolutePath()); // Set checkpoint for 5 seconds otherwise test will run out of memory context.put(FileChannelConfiguration.CHECKPOINT_INTERVAL, "5000"); - if (overrides != null) + if (overrides != null) { context.putAll(overrides); + } Configurables.configure(channel, context); } @@ -81,9 +83,9 @@ public class TestSpillableMemoryChannel { startChannel(params); } - static class NullFound extends RuntimeException { public int expectedValue; + public NullFound(int expected) { super("Expected " + expected + ", but null found"); expectedValue = expected; @@ -92,9 +94,10 @@ public class TestSpillableMemoryChannel { static class TooManyNulls extends RuntimeException { private int nullsFound; + public TooManyNulls(int count) { super("Total nulls found in thread (" - + Thread.currentThread().getName() + ") : " + count); + + Thread.currentThread().getName() + ") : " + count); nullsFound = count; } } @@ -102,7 +105,7 @@ public class TestSpillableMemoryChannel { @Before public void setUp() { channel = new SpillableMemoryChannel(); - channel.setName("spillChannel-" + UUID.randomUUID() ); + channel.setName("spillChannel-" + UUID.randomUUID()); } @After @@ -117,7 +120,7 @@ public class TestSpillableMemoryChannel { } private static void takeNull(AbstractChannel channel) { - channel.take(); + channel.take(); } private static void takeN(int first, int count, AbstractChannel channel) { @@ -127,7 +130,7 @@ public class TestSpillableMemoryChannel { if (e == null) { throw new NullFound(i); } - Event expected = EventBuilder.withBody( String.valueOf(i).getBytes() ); + Event expected = EventBuilder.withBody(String.valueOf(i).getBytes()); Assert.assertArrayEquals(e.getBody(), expected.getBody()); } } @@ -140,16 +143,14 @@ public class TestSpillableMemoryChannel { if (e == null) { try { Thread.sleep(0); - } catch (InterruptedException ex) - { /* ignore */ } + } catch (InterruptedException ex) { /* ignore */ } return i; } } return i; } - private static void transactionalPutN(int first, int count, - AbstractChannel channel) { + private static void transactionalPutN(int first, int count, AbstractChannel channel) { Transaction tx = channel.getTransaction(); tx.begin(); try { @@ -163,8 +164,7 @@ public class TestSpillableMemoryChannel { } } - private static void transactionalTakeN(int first, int count, - AbstractChannel channel) { + private static void transactionalTakeN(int first, int count, AbstractChannel channel) { Transaction tx = channel.getTransaction(); tx.begin(); try { @@ -184,14 +184,13 @@ public class TestSpillableMemoryChannel { } } - private static int transactionalTakeN_NoCheck(int count - , AbstractChannel channel) { + private static int transactionalTakeN_NoCheck(int count, AbstractChannel channel) { Transaction tx = channel.getTransaction(); tx.begin(); try { int eventCount = takeN_NoCheck(count, channel); tx.commit(); - return eventCount; + return eventCount; } catch (RuntimeException e) { tx.rollback(); throw e; @@ -204,8 +203,9 @@ public class TestSpillableMemoryChannel { Transaction tx = channel.getTransaction(); tx.begin(); try { - for (int i = 0; i < count; ++i) + for (int i = 0; i < count; ++i) { takeNull(channel); + } tx.commit(); } catch (AssertionError e) { tx.rollback(); @@ -218,68 +218,63 @@ public class TestSpillableMemoryChannel { } } - private Thread makePutThread(String threadName - , final int first, final int count, final int batchSize - , final AbstractChannel channel) { - return - new Thread(threadName) { - public void run() { - int maxdepth = 0; - StopWatch watch = new StopWatch(); - for (int i = first; i<first+count; i=i+batchSize) { - transactionalPutN(i, batchSize, channel); + private Thread makePutThread(String threadName, final int first, final int count, + final int batchSize, final AbstractChannel channel) { + return new Thread(threadName) { + public void run() { + int maxdepth = 0; + StopWatch watch = new StopWatch(); + for (int i = first; i < first + count; i = i + batchSize) { + transactionalPutN(i, batchSize, channel); + } + watch.elapsed(); + } + }; + } + + private static Thread makeTakeThread(String threadName, final int first, final int count, + final int batchSize, final AbstractChannel channel) { + return new Thread(threadName) { + public void run() { + StopWatch watch = new StopWatch(); + for (int i = first; i < first + count; ) { + try { + transactionalTakeN(i, batchSize, channel); + i = i + batchSize; + } catch (NullFound e) { + i = e.expectedValue; } - watch.elapsed(); } - }; - } - - private static Thread makeTakeThread(String threadName, final int first - , final int count, final int batchSize, final AbstractChannel channel) { - return - new Thread(threadName) { - public void run() { - StopWatch watch = new StopWatch(); - for (int i = first; i < first+count; ) { + watch.elapsed(); + } + }; + } + + private static Thread makeTakeThread_noCheck(String threadName, final int totalEvents, + final int batchSize, final AbstractChannel channel) { + return new Thread(threadName) { + public void run() { + int batchSz = batchSize; + StopWatch watch = new StopWatch(); + int i = 0, attempts = 0; + while (i < totalEvents) { + int remaining = totalEvents - i; + batchSz = (remaining > batchSz) ? batchSz : remaining; + int takenCount = transactionalTakeN_NoCheck(batchSz, channel); + if (takenCount < batchSz) { try { - transactionalTakeN(i, batchSize, channel); - i = i + batchSize; - } catch (NullFound e) { - i = e.expectedValue; - } + Thread.sleep(20); + } catch (InterruptedException ex) { /* ignore */ } } - watch.elapsed(); - } - }; - } - - private static Thread makeTakeThread_noCheck(String threadName - , final int totalEvents, final int batchSize, final AbstractChannel channel) { - return - new Thread(threadName) { - public void run() { - int batchSz = batchSize; - StopWatch watch = new StopWatch(); - int i = 0, attempts = 0 ; - while(i < totalEvents) { - int remaining = totalEvents - i; - batchSz = (remaining > batchSz) ? batchSz : remaining; - int takenCount = transactionalTakeN_NoCheck(batchSz, channel); - if(takenCount < batchSz) { - try { - Thread.sleep(20); - } catch (InterruptedException ex) - { /* ignore */ } - } - i += takenCount; - ++attempts; - if(attempts > totalEvents * 3 ) { - throw new TooManyNulls(attempts); - } + i += takenCount; + ++attempts; + if (attempts > totalEvents * 3) { + throw new TooManyNulls(attempts); } - watch.elapsed(" items = " + i + ", attempts = " + attempts); } - }; + watch.elapsed(" items = " + i + ", attempts = " + attempts); + } + }; } @Test @@ -292,37 +287,36 @@ public class TestSpillableMemoryChannel { Transaction tx = channel.getTransaction(); tx.begin(); - putN(0,2,channel); + putN(0, 2, channel); tx.commit(); tx.close(); tx = channel.getTransaction(); tx.begin(); - takeN(0,2,channel); + takeN(0, 2, channel); tx.commit(); tx.close(); } - @Test - public void testCapacityDisableOverflow() { + public void testCapacityDisableOverflow() { Map<String, String> params = new HashMap<String, String>(); params.put("memoryCapacity", "2"); params.put("overflowCapacity", "0"); // overflow is disabled effectively - params.put("overflowTimeout", "0" ); + params.put("overflowTimeout", "0"); startChannel(params); - transactionalPutN(0,2,channel); + transactionalPutN(0, 2, channel); boolean threw = false; try { - transactionalPutN(2,1,channel); + transactionalPutN(2, 1, channel); } catch (ChannelException e) { threw = true; } Assert.assertTrue("Expecting ChannelFullException to be thrown", threw); - transactionalTakeN(0,2, channel); + transactionalTakeN(0, 2, channel); Transaction tx = channel.getTransaction(); tx.begin(); @@ -332,7 +326,7 @@ public class TestSpillableMemoryChannel { } @Test - public void testCapacityWithOverflow() { + public void testCapacityWithOverflow() { Map<String, String> params = new HashMap<String, String>(); params.put("memoryCapacity", "2"); params.put("overflowCapacity", "4"); @@ -346,19 +340,19 @@ public class TestSpillableMemoryChannel { boolean threw = false; try { - transactionalPutN(7,2,channel); // cannot fit in channel + transactionalPutN(7, 2, channel); // cannot fit in channel } catch (ChannelFullException e) { threw = true; } Assert.assertTrue("Expecting ChannelFullException to be thrown", threw); - transactionalTakeN(1,2, channel); - transactionalTakeN(3,2, channel); - transactionalTakeN(5,2, channel); + transactionalTakeN(1, 2, channel); + transactionalTakeN(3, 2, channel); + transactionalTakeN(5, 2, channel); } @Test - public void testRestart() { + public void testRestart() { Map<String, String> params = new HashMap<String, String>(); params.put("memoryCapacity", "2"); params.put("overflowCapacity", "10"); @@ -372,8 +366,7 @@ public class TestSpillableMemoryChannel { restartChannel(params); // from overflow, as in memory stuff should be lost - transactionalTakeN(3,2, channel); - + transactionalTakeN(3, 2, channel); } @Test @@ -382,35 +375,34 @@ public class TestSpillableMemoryChannel { params.put("memoryCapacity", "10000000"); params.put("overflowCapacity", "20000000"); params.put(FileChannelConfiguration.TRANSACTION_CAPACITY, "10"); - params.put("overflowTimeout", "1" ); + params.put("overflowTimeout", "1"); startChannel(params); - transactionalPutN( 1,5,channel); - transactionalPutN( 6,5,channel); - transactionalPutN(11,5,channel); // these should go to overflow + transactionalPutN(1, 5, channel); + transactionalPutN(6, 5, channel); + transactionalPutN(11, 5, channel); // these should go to overflow - transactionalTakeN(1,10, channel); - transactionalTakeN(11,5, channel); + transactionalTakeN(1, 10, channel); + transactionalTakeN(11, 5, channel); } @Test public void testOverflow() { - Map<String, String> params = new HashMap<String, String>(); params.put("memoryCapacity", "10"); params.put("overflowCapacity", "20"); params.put(FileChannelConfiguration.TRANSACTION_CAPACITY, "10"); - params.put("overflowTimeout", "1" ); + params.put("overflowTimeout", "1"); startChannel(params); - transactionalPutN( 1,5,channel); - transactionalPutN( 6,5,channel); - transactionalPutN(11,5,channel); // these should go to overflow + transactionalPutN(1, 5, channel); + transactionalPutN(6, 5, channel); + transactionalPutN(11, 5, channel); // these should go to overflow - transactionalTakeN(1,10, channel); - transactionalTakeN(11,5, channel); + transactionalTakeN(1, 10, channel); + transactionalTakeN(11, 5, channel); } @Test @@ -419,29 +411,29 @@ public class TestSpillableMemoryChannel { params.put("memoryCapacity", "10"); params.put("overflowCapacity", "10"); params.put(FileChannelConfiguration.TRANSACTION_CAPACITY, "5"); - params.put("overflowTimeout", "1" ); + params.put("overflowTimeout", "1"); startChannel(params); - transactionalPutN( 1,5,channel); - transactionalPutN( 6,5,channel); - transactionalPutN(11,5,channel); // into overflow - transactionalPutN(16,5,channel); // into overflow + transactionalPutN(1, 5, channel); + transactionalPutN(6, 5, channel); + transactionalPutN(11, 5, channel); // into overflow + transactionalPutN(16, 5, channel); // into overflow transactionalTakeN(1, 1, channel); - transactionalTakeN(2, 5,channel); - transactionalTakeN(7, 4,channel); + transactionalTakeN(2, 5, channel); + transactionalTakeN(7, 4, channel); - transactionalPutN( 20,2,channel); - transactionalPutN( 22,3,channel); + transactionalPutN(20, 2, channel); + transactionalPutN(22, 3, channel); - transactionalTakeN( 11,3,channel); // from overflow - transactionalTakeN( 14,5,channel); // from overflow - transactionalTakeN( 19,2,channel); // from overflow + transactionalTakeN(11, 3, channel); // from overflow + transactionalTakeN(14, 5, channel); // from overflow + transactionalTakeN(19, 2, channel); // from overflow } @Test - public void testByteCapacity() { + public void testByteCapacity() { Map<String, String> params = new HashMap<String, String>(); params.put("memoryCapacity", "1000"); // configure to hold 8 events of 10 bytes each (plus 20% event header space) @@ -449,12 +441,12 @@ public class TestSpillableMemoryChannel { params.put("avgEventSize", "10"); params.put("overflowCapacity", "20"); params.put(FileChannelConfiguration.TRANSACTION_CAPACITY, "10"); - params.put("overflowTimeout", "1" ); + params.put("overflowTimeout", "1"); startChannel(params); transactionalPutN(1, 8, channel); // this wil max the byteCapacity transactionalPutN(9, 10, channel); - transactionalPutN(19,10, channel); // this will fill up the overflow + transactionalPutN(19, 10, channel); // this will fill up the overflow boolean threw = false; try { @@ -463,7 +455,6 @@ public class TestSpillableMemoryChannel { threw = true; } Assert.assertTrue("byteCapacity did not throw as expected", threw); - } @Test @@ -473,7 +464,7 @@ public class TestSpillableMemoryChannel { params.put("memoryCapacity", "5"); params.put("overflowCapacity", "15"); params.put(FileChannelConfiguration.TRANSACTION_CAPACITY, "10"); - params.put("overflowTimeout", "1" ); + params.put("overflowTimeout", "1"); startChannel(params); transactionalPutN(1, 5, channel); @@ -493,13 +484,13 @@ public class TestSpillableMemoryChannel { transactionalTakeN(6, 5, channel); // from overflow transactionalTakeN(11, 5, channel); // from overflow - transactionalTakeN(16, 2,channel); // from overflow + transactionalTakeN(16, 2, channel); // from overflow transactionalPutN(21, 5, channel); tx = channel.getTransaction(); tx.begin(); - takeN(18,3, channel); // from overflow + takeN(18, 3, channel); // from overflow takeNull(channel); // expect null since next event is in primary tx.commit(); tx.close(); @@ -516,9 +507,8 @@ public class TestSpillableMemoryChannel { params.put("overflowTimeout", "0"); startChannel(params); - //1 Rollback for Puts - transactionalPutN(1,5, channel); + transactionalPutN(1, 5, channel); Transaction tx = channel.getTransaction(); tx.begin(); putN(6, 5, channel); @@ -530,8 +520,7 @@ public class TestSpillableMemoryChannel { //2. verify things back to normal after put rollback transactionalPutN(11, 5, channel); - transactionalTakeN(11,5,channel); - + transactionalTakeN(11, 5, channel); //3 Rollback for Takes transactionalPutN(16, 5, channel); @@ -545,13 +534,12 @@ public class TestSpillableMemoryChannel { transactionalTakeN_NoCheck(5, channel); //4. verify things back to normal after take rollback - transactionalPutN(21,5, channel); - transactionalTakeN(21,5,channel); + transactionalPutN(21, 5, channel); + transactionalTakeN(21, 5, channel); } - @Test - public void testReconfigure() { + public void testReconfigure() { //1) bring up with small capacity Map<String, String> params = new HashMap<String, String>(); params.put("memoryCapacity", "10"); @@ -559,12 +547,12 @@ public class TestSpillableMemoryChannel { params.put("overflowTimeout", "0"); startChannel(params); - Assert.assertTrue("overflowTimeout setting did not reconfigure correctly" - , channel.getOverflowTimeout() == 0); - Assert.assertTrue("memoryCapacity did not reconfigure correctly" - , channel.getMemoryCapacity() == 10); - Assert.assertTrue("overflowCapacity did not reconfigure correctly" - , channel.isOverflowDisabled() ); + Assert.assertTrue("overflowTimeout setting did not reconfigure correctly", + channel.getOverflowTimeout() == 0); + Assert.assertTrue("memoryCapacity did not reconfigure correctly", + channel.getMemoryCapacity() == 10); + Assert.assertTrue("overflowCapacity did not reconfigure correctly", + channel.isOverflowDisabled()); transactionalPutN(1, 10, channel); boolean threw = false; @@ -574,8 +562,7 @@ public class TestSpillableMemoryChannel { threw = true; } Assert.assertTrue("Expected the channel to fill up and throw an exception, " - + "but it did not throw", threw); - + + "but it did not throw", threw); //2) Resize and verify params = new HashMap<String, String>(); @@ -583,12 +570,13 @@ public class TestSpillableMemoryChannel { params.put("overflowCapacity", "0"); reconfigureChannel(params); - Assert.assertTrue("overflowTimeout setting did not reconfigure correctly" - , channel.getOverflowTimeout() == SpillableMemoryChannel.defaultOverflowTimeout); - Assert.assertTrue("memoryCapacity did not reconfigure correctly" - , channel.getMemoryCapacity() == 20); - Assert.assertTrue("overflowCapacity did not reconfigure correctly" - , channel.isOverflowDisabled() ); + Assert.assertTrue("overflowTimeout setting did not reconfigure correctly", + channel.getOverflowTimeout() == + SpillableMemoryChannel.defaultOverflowTimeout); + Assert.assertTrue("memoryCapacity did not reconfigure correctly", + channel.getMemoryCapacity() == 20); + Assert.assertTrue("overflowCapacity did not reconfigure correctly", + channel.isOverflowDisabled()); // pull out the values inserted prior to reconfiguration transactionalTakeN(1, 10, channel); @@ -603,25 +591,25 @@ public class TestSpillableMemoryChannel { threw = true; } Assert.assertTrue("Expected the channel to fill up and throw an exception, " - + "but it did not throw", threw); + + "but it did not throw", threw); transactionalTakeN(11, 10, channel); transactionalTakeN(21, 10, channel); - // 3) Reconfigure with empty config and verify settings revert to default params = new HashMap<String, String>(); reconfigureChannel(params); - Assert.assertTrue("overflowTimeout setting did not reconfigure correctly" - , channel.getOverflowTimeout() == SpillableMemoryChannel.defaultOverflowTimeout); - Assert.assertTrue("memoryCapacity did not reconfigure correctly" - , channel.getMemoryCapacity() == SpillableMemoryChannel.defaultMemoryCapacity); - Assert.assertTrue("overflowCapacity did not reconfigure correctly" - , channel.getOverflowCapacity() == SpillableMemoryChannel.defaultOverflowCapacity); - Assert.assertFalse("overflowCapacity did not reconfigure correctly" - , channel.isOverflowDisabled()); - + Assert.assertTrue("overflowTimeout setting did not reconfigure correctly", + channel.getOverflowTimeout() == + SpillableMemoryChannel.defaultOverflowTimeout); + Assert.assertTrue("memoryCapacity did not reconfigure correctly", + channel.getMemoryCapacity() == SpillableMemoryChannel.defaultMemoryCapacity); + Assert.assertTrue("overflowCapacity did not reconfigure correctly", + channel.getOverflowCapacity() == + SpillableMemoryChannel.defaultOverflowCapacity); + Assert.assertFalse("overflowCapacity did not reconfigure correctly", + channel.isOverflowDisabled()); // 4) Reconfiguring of overflow params = new HashMap<String, String>(); @@ -631,19 +619,18 @@ public class TestSpillableMemoryChannel { params.put("overflowTimeout", "1"); reconfigureChannel(params); - transactionalPutN( 1,5, channel); - transactionalPutN( 6,5, channel); - transactionalPutN(11,5, channel); - transactionalPutN(16,5, channel); - threw=false; + transactionalPutN(1, 5, channel); + transactionalPutN(6, 5, channel); + transactionalPutN(11, 5, channel); + transactionalPutN(16, 5, channel); + threw = false; try { // should error out as both primary & overflow are full - transactionalPutN(21,5, channel); + transactionalPutN(21, 5, channel); } catch (ChannelException e) { threw = true; } - Assert.assertTrue("Expected the last insertion to fail, but it didn't." - , threw); + Assert.assertTrue("Expected the last insertion to fail, but it didn't.", threw); // reconfig the overflow params = new HashMap<String, String>(); @@ -654,10 +641,10 @@ public class TestSpillableMemoryChannel { reconfigureChannel(params); // should succeed now as we have made room in the overflow - transactionalPutN(21,5, channel); + transactionalPutN(21, 5, channel); - transactionalTakeN(1,10, channel); - transactionalTakeN(11,5, channel); + transactionalTakeN(1, 10, channel); + transactionalTakeN(11, 5, channel); transactionalTakeN(16, 5, channel); transactionalTakeN(21, 5, channel); } @@ -666,13 +653,13 @@ public class TestSpillableMemoryChannel { public void testParallelSingleSourceAndSink() throws InterruptedException { Map<String, String> params = new HashMap<String, String>(); params.put("memoryCapacity", "1000020"); - params.put("overflowCapacity", "0"); + params.put("overflowCapacity", "0"); params.put("overflowTimeout", "3"); startChannel(params); // run source and sink concurrently Thread sourceThd = makePutThread("src", 1, 500000, 100, channel); - Thread sinkThd = makeTakeThread("sink", 1, 500000, 100, channel); + Thread sinkThd = makeTakeThread("sink", 1, 500000, 100, channel); StopWatch watch = new StopWatch(); @@ -683,15 +670,15 @@ public class TestSpillableMemoryChannel { sinkThd.join(); watch.elapsed(); - System.out.println("Max Queue size " + channel.getMaxMemQueueSize() ); + System.out.println("Max Queue size " + channel.getMaxMemQueueSize()); } @Test public void testCounters() throws InterruptedException { Map<String, String> params = new HashMap<String, String>(); - params.put("memoryCapacity", "5000"); - params.put("overflowCapacity","5000"); - params.put("transactionCapacity","5000"); + params.put("memoryCapacity", "5000"); + params.put("overflowCapacity", "5000"); + params.put("transactionCapacity", "5000"); params.put("overflowTimeout", "0"); startChannel(params); @@ -706,7 +693,7 @@ public class TestSpillableMemoryChannel { Assert.assertEquals(5000, channel.channelCounter.getEventPutSuccessCount()); //2. empty mem queue - Thread sinkThd = makeTakeThread("sink", 1, 5000, 1000, channel); + Thread sinkThd = makeTakeThread("sink", 1, 5000, 1000, channel); sinkThd.start(); sinkThd.join(); Assert.assertEquals(0, channel.getTotalStored()); @@ -714,7 +701,6 @@ public class TestSpillableMemoryChannel { Assert.assertEquals(5000, channel.channelCounter.getEventTakeAttemptCount()); Assert.assertEquals(5000, channel.channelCounter.getEventTakeSuccessCount()); - //3. fill up mem & overflow sourceThd = makePutThread("src", 1, 10000, 1000, channel); sourceThd.start(); @@ -724,9 +710,8 @@ public class TestSpillableMemoryChannel { Assert.assertEquals(15000, channel.channelCounter.getEventPutAttemptCount()); Assert.assertEquals(15000, channel.channelCounter.getEventPutSuccessCount()); - //4. empty memory - sinkThd = makeTakeThread("sink", 1, 5000, 1000, channel); + sinkThd = makeTakeThread("sink", 1, 5000, 1000, channel); sinkThd.start(); sinkThd.join(); Assert.assertEquals(5000, channel.getTotalStored()); @@ -745,12 +730,10 @@ public class TestSpillableMemoryChannel { Assert.assertEquals(15000, channel.channelCounter.getEventTakeAttemptCount()); Assert.assertEquals(15000, channel.channelCounter.getEventTakeSuccessCount()); - - //6. now do it concurrently sourceThd = makePutThread("src1", 1, 5000, 1000, channel); Thread sourceThd2 = makePutThread("src2", 1, 5000, 500, channel); - sinkThd = makeTakeThread_noCheck("sink1", 5000, 1000, channel); + sinkThd = makeTakeThread_noCheck("sink1", 5000, 1000, channel); sourceThd.start(); sourceThd2.start(); sinkThd.start(); @@ -759,8 +742,8 @@ public class TestSpillableMemoryChannel { sinkThd.join(); Assert.assertEquals(5000, channel.getTotalStored()); Assert.assertEquals(5000, channel.channelCounter.getChannelSize()); - Thread sinkThd2 = makeTakeThread_noCheck("sink2", 2500, 500, channel); - Thread sinkThd3 = makeTakeThread_noCheck("sink3", 2500, 1000, channel); + Thread sinkThd2 = makeTakeThread_noCheck("sink2", 2500, 500, channel); + Thread sinkThd3 = makeTakeThread_noCheck("sink3", 2500, 1000, channel); sinkThd2.start(); sinkThd3.start(); sinkThd2.join(); @@ -769,30 +752,26 @@ public class TestSpillableMemoryChannel { Assert.assertEquals(0, channel.channelCounter.getChannelSize()); Assert.assertEquals(25000, channel.channelCounter.getEventTakeSuccessCount()); Assert.assertEquals(25000, channel.channelCounter.getEventPutSuccessCount()); - Assert.assertTrue("TakeAttempt channel counter value larger than expected" , - 25000 <= channel.channelCounter.getEventTakeAttemptCount()); + Assert.assertTrue("TakeAttempt channel counter value larger than expected", + 25000 <= channel.channelCounter.getEventTakeAttemptCount()); Assert.assertTrue("PutAttempt channel counter value larger than expected", - 25000 <= channel.channelCounter.getEventPutAttemptCount()); + 25000 <= channel.channelCounter.getEventPutAttemptCount()); } - public ArrayList<Thread> createSourceThreads(int count, int totalEvents - , int batchSize) { + public ArrayList<Thread> createSourceThreads(int count, int totalEvents, int batchSize) { ArrayList<Thread> sourceThds = new ArrayList<Thread>(); for (int i = 0; i < count; ++i) { - sourceThds.add( makePutThread("src" + i, 1, totalEvents/count - , batchSize, channel) ); + sourceThds.add(makePutThread("src" + i, 1, totalEvents / count, batchSize, channel)); } return sourceThds; } - public ArrayList<Thread> createSinkThreads(int count, int totalEvents - , int batchSize) { + public ArrayList<Thread> createSinkThreads(int count, int totalEvents, int batchSize) { ArrayList<Thread> sinkThreads = new ArrayList<Thread>(count); for (int i = 0; i < count; ++i) { - sinkThreads.add( makeTakeThread_noCheck("sink"+i, totalEvents/count - , batchSize, channel) ); + sinkThreads.add(makeTakeThread_noCheck("sink" + i, totalEvents / count, batchSize, channel)); } return sinkThreads; } @@ -803,13 +782,12 @@ public class TestSpillableMemoryChannel { } } - public void joinThreads(ArrayList<Thread> threads) - throws InterruptedException { + public void joinThreads(ArrayList<Thread> threads) throws InterruptedException { for (Thread thread : threads) { try { thread.join(); } catch (InterruptedException e) { - System.out.println("Interrupted while waiting on " + thread.getName() ); + System.out.println("Interrupted while waiting on " + thread.getName()); throw e; } } @@ -824,15 +802,13 @@ public class TestSpillableMemoryChannel { Map<String, String> params = new HashMap<String, String>(); params.put("memoryCapacity", "0"); - params.put("overflowCapacity", "500020"); + params.put("overflowCapacity", "500020"); params.put("overflowTimeout", "3"); startChannel(params); ArrayList<Thread> sinks = createSinkThreads(sinkCount, eventCount, batchSize); - ArrayList<Thread> sources = createSourceThreads(sourceCount - , eventCount, batchSize); - + ArrayList<Thread> sources = createSourceThreads(sourceCount, eventCount, batchSize); StopWatch watch = new StopWatch(); startThreads(sinks); @@ -845,7 +821,7 @@ public class TestSpillableMemoryChannel { System.out.println("Total puts " + channel.drainOrder.totalPuts); - System.out.println("Max Queue size " + channel.getMaxMemQueueSize() ); + System.out.println("Max Queue size " + channel.getMaxMemQueueSize()); System.out.println(channel.memQueue.size()); System.out.println("done"); @@ -872,10 +848,10 @@ public class TestSpillableMemoryChannel { if (elapsed < 10000) { System.out.println(Thread.currentThread().getName() - + " : [ " + elapsed + " ms ]. " + suffix); + + " : [ " + elapsed + " ms ]. " + suffix); } else { System.out.println(Thread.currentThread().getName() - + " : [ " + elapsed / 1000 + " sec ]. " + suffix); + + " : [ " + elapsed / 1000 + " sec ]. " + suffix); } } } http://git-wip-us.apache.org/repos/asf/flume/blob/cfbf1156/flume-ng-clients/flume-ng-log4jappender/src/test/java/org/apache/flume/clients/log4jappender/TestLoadBalancingLog4jAppender.java ---------------------------------------------------------------------- diff --git a/flume-ng-clients/flume-ng-log4jappender/src/test/java/org/apache/flume/clients/log4jappender/TestLoadBalancingLog4jAppender.java b/flume-ng-clients/flume-ng-log4jappender/src/test/java/org/apache/flume/clients/log4jappender/TestLoadBalancingLog4jAppender.java index 267ac1d..53795fb 100644 --- a/flume-ng-clients/flume-ng-log4jappender/src/test/java/org/apache/flume/clients/log4jappender/TestLoadBalancingLog4jAppender.java +++ b/flume-ng-clients/flume-ng-log4jappender/src/test/java/org/apache/flume/clients/log4jappender/TestLoadBalancingLog4jAppender.java @@ -65,7 +65,7 @@ public class TestLoadBalancingLog4jAppender { private boolean slowDown = false; @Before - public void initiate() throws InterruptedException{ + public void initiate() throws InterruptedException { ch = new MemoryChannel(); configureChannel(); @@ -164,9 +164,9 @@ public class TestLoadBalancingLog4jAppender { @Test public void testRandomBackoffUnsafeMode() throws Exception { File TESTFILE = new File(TestLoadBalancingLog4jAppender.class - .getClassLoader() - .getResource("flume-loadbalancing-backoff-log4jtest.properties") - .getFile()); + .getClassLoader() + .getResource("flume-loadbalancing-backoff-log4jtest.properties") + .getFile()); startSources(TESTFILE, true, new int[]{25430, 25431, 25432}); sources.get(0).setFail(); @@ -179,9 +179,9 @@ public class TestLoadBalancingLog4jAppender { @Test (expected = EventDeliveryException.class) public void testTimeout() throws Throwable { File TESTFILE = new File(TestLoadBalancingLog4jAppender.class - .getClassLoader() - .getResource("flume-loadbalancinglog4jtest.properties") - .getFile()); + .getClassLoader() + .getResource("flume-loadbalancinglog4jtest.properties") + .getFile()); ch = new TestLog4jAppender.SlowMemoryChannel(2000); configureChannel(); @@ -200,9 +200,9 @@ public class TestLoadBalancingLog4jAppender { @Test(expected = EventDeliveryException.class) public void testRandomBackoffNotUnsafeMode() throws Throwable { File TESTFILE = new File(TestLoadBalancingLog4jAppender.class - .getClassLoader() - .getResource("flume-loadbalancing-backoff-log4jtest.properties") - .getFile()); + .getClassLoader() + .getResource("flume-loadbalancing-backoff-log4jtest.properties") + .getFile()); startSources(TESTFILE, false, new int[]{25430, 25431, 25432}); sources.get(0).setFail(); @@ -224,17 +224,17 @@ public class TestLoadBalancingLog4jAppender { } private void sendAndAssertFail() throws IOException { - int level = 20000; - String msg = "This is log message number" + String.valueOf(level); - fixture.log(Level.toLevel(level), msg); + int level = 20000; + String msg = "This is log message number" + String.valueOf(level); + fixture.log(Level.toLevel(level), msg); - Transaction transaction = ch.getTransaction(); - transaction.begin(); - Event event = ch.take(); - Assert.assertNull(event); + Transaction transaction = ch.getTransaction(); + transaction.begin(); + Event event = ch.take(); + Assert.assertNull(event); - transaction.commit(); - transaction.close(); + transaction.commit(); + transaction.close(); } @@ -271,8 +271,7 @@ public class TestLoadBalancingLog4jAppender { } private void startSources(File log4jProps, boolean unsafeMode, int... ports) - throws - IOException { + throws IOException { for (int port : ports) { CountingAvroSource source = new CountingAvroSource(port); Context context = new Context(); @@ -297,8 +296,8 @@ public class TestLoadBalancingLog4jAppender { Properties props = new Properties(); props.load(reader); props.setProperty("log4j.appender.out2.UnsafeMode", - String.valueOf(unsafeMode)); - if(slowDown) { + String.valueOf(unsafeMode)); + if (slowDown) { props.setProperty("log4j.appender.out2.Timeout", String.valueOf(1000)); } PropertyConfigurator.configure(props); @@ -308,13 +307,13 @@ public class TestLoadBalancingLog4jAppender { static class CountingAvroSource extends AvroSource { AtomicInteger appendCount = new AtomicInteger(); volatile boolean isFail = false; - private final int port2; + private final int port2; public CountingAvroSource(int port) { - port2 = port; + port2 = port; } - public void setOk() { + public void setOk() { this.isFail = false; } http://git-wip-us.apache.org/repos/asf/flume/blob/cfbf1156/flume-ng-clients/flume-ng-log4jappender/src/test/java/org/apache/flume/clients/log4jappender/TestLog4jAppender.java ---------------------------------------------------------------------- diff --git a/flume-ng-clients/flume-ng-log4jappender/src/test/java/org/apache/flume/clients/log4jappender/TestLog4jAppender.java b/flume-ng-clients/flume-ng-log4jappender/src/test/java/org/apache/flume/clients/log4jappender/TestLog4jAppender.java index 1b840f3..c087b67 100644 --- a/flume-ng-clients/flume-ng-log4jappender/src/test/java/org/apache/flume/clients/log4jappender/TestLog4jAppender.java +++ b/flume-ng-clients/flume-ng-log4jappender/src/test/java/org/apache/flume/clients/log4jappender/TestLog4jAppender.java @@ -21,7 +21,10 @@ package org.apache.flume.clients.log4jappender; import java.io.File; import java.io.FileReader; import java.io.IOException; -import java.util.*; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Properties; import java.util.concurrent.TimeUnit; import junit.framework.Assert; @@ -46,13 +49,13 @@ import org.junit.After; import org.junit.Before; import org.junit.Test; -public class TestLog4jAppender{ +public class TestLog4jAppender { private AvroSource source; private Channel ch; private Properties props; @Before - public void initiate() throws Exception{ + public void initiate() throws Exception { int port = 25430; source = new AvroSource(); ch = new MemoryChannel(); @@ -88,13 +91,13 @@ public class TestLog4jAppender{ configureSource(); PropertyConfigurator.configure(props); Logger logger = LogManager.getLogger(TestLog4jAppender.class); - for(int count = 0; count <= 1000; count++){ + for (int count = 0; count <= 1000; count++) { /* * Log4j internally defines levels as multiples of 10000. So if we * create levels directly using count, the level will be set as the * default. */ - int level = ((count % 5)+1)*10000; + int level = ((count % 5) + 1) * 10000; String msg = "This is log message number" + String.valueOf(count); logger.log(Level.toLevel(level), msg); @@ -146,11 +149,11 @@ public class TestLog4jAppender{ } private void sendAndAssertFail(Logger logger) throws Throwable { - /* - * Log4j internally defines levels as multiples of 10000. So if we - * create levels directly using count, the level will be set as the - * default. - */ + /* + * Log4j internally defines levels as multiples of 10000. So if we + * create levels directly using count, the level will be set as the + * default. + */ int level = 20000; try { logger.log(Level.toLevel(level), "Test Msg"); @@ -177,13 +180,13 @@ public class TestLog4jAppender{ PropertyConfigurator.configure(props); Logger logger = LogManager.getLogger(TestLog4jAppender.class); Thread.currentThread().setName("Log4jAppenderTest"); - for(int count = 0; count <= 100; count++){ + for (int count = 0; count <= 100; count++) { /* * Log4j internally defines levels as multiples of 10000. So if we * create levels directly using count, the level will be set as the * default. */ - int level = ((count % 5)+1)*10000; + int level = ((count % 5) + 1) * 10000; String msg = "This is log message number" + String.valueOf(count); logger.log(Level.toLevel(level), msg); @@ -230,7 +233,7 @@ public class TestLog4jAppender{ props.put("log4j.appender.out2.Timeout", "1000"); props.put("log4j.appender.out2.layout", "org.apache.log4j.PatternLayout"); props.put("log4j.appender.out2.layout.ConversionPattern", - "%-5p [%t]: %m%n"); + "%-5p [%t]: %m%n"); PropertyConfigurator.configure(props); Logger logger = LogManager.getLogger(TestLog4jAppender.class); Thread.currentThread().setName("Log4jAppenderTest"); @@ -251,13 +254,12 @@ public class TestLog4jAppender{ @After - public void cleanUp(){ + public void cleanUp() { source.stop(); ch.stop(); props.clear(); } - static class SlowMemoryChannel extends MemoryChannel { private final int slowTime; http://git-wip-us.apache.org/repos/asf/flume/blob/cfbf1156/flume-ng-clients/flume-ng-log4jappender/src/test/java/org/apache/flume/clients/log4jappender/TestLog4jAppenderWithAvro.java ---------------------------------------------------------------------- diff --git a/flume-ng-clients/flume-ng-log4jappender/src/test/java/org/apache/flume/clients/log4jappender/TestLog4jAppenderWithAvro.java b/flume-ng-clients/flume-ng-log4jappender/src/test/java/org/apache/flume/clients/log4jappender/TestLog4jAppenderWithAvro.java index 5899c62..0607e3a 100644 --- a/flume-ng-clients/flume-ng-log4jappender/src/test/java/org/apache/flume/clients/log4jappender/TestLog4jAppenderWithAvro.java +++ b/flume-ng-clients/flume-ng-log4jappender/src/test/java/org/apache/flume/clients/log4jappender/TestLog4jAppenderWithAvro.java @@ -125,8 +125,7 @@ public class TestLog4jAppenderWithAvro { Assert.assertNull(hdrs.get(Log4jAvroHeaders.MESSAGE_ENCODING.toString())); Assert.assertEquals("Schema URL should be set", - "file:///tmp/myrecord.avsc", hdrs.get(Log4jAvroHeaders.AVRO_SCHEMA_URL.toString - ())); + "file:///tmp/myrecord.avsc", hdrs.get(Log4jAvroHeaders.AVRO_SCHEMA_URL.toString())); Assert.assertNull("Schema string should not be set", hdrs.get(Log4jAvroHeaders.AVRO_SCHEMA_LITERAL.toString())); @@ -174,7 +173,7 @@ public class TestLog4jAppenderWithAvro { } @After - public void cleanUp(){ + public void cleanUp() { source.stop(); ch.stop(); props.clear(); http://git-wip-us.apache.org/repos/asf/flume/blob/cfbf1156/flume-ng-core/src/test/java/org/apache/flume/channel/AbstractBasicChannelSemanticsTest.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/test/java/org/apache/flume/channel/AbstractBasicChannelSemanticsTest.java b/flume-ng-core/src/test/java/org/apache/flume/channel/AbstractBasicChannelSemanticsTest.java index 59a804c..7600856 100644 --- a/flume-ng-core/src/test/java/org/apache/flume/channel/AbstractBasicChannelSemanticsTest.java +++ b/flume-ng-core/src/test/java/org/apache/flume/channel/AbstractBasicChannelSemanticsTest.java @@ -40,6 +40,7 @@ import com.google.common.base.Preconditions; public abstract class AbstractBasicChannelSemanticsTest { protected static List<Event> events; + static { Event[] array = new Event[7]; for (int i = 0; i < array.length; ++i) { @@ -61,7 +62,7 @@ public abstract class AbstractBasicChannelSemanticsTest { THROW_RUNTIME, THROW_CHANNEL, SLEEP - }; + } private Mode mode = Mode.NORMAL; private boolean lastTransactionCommitted = false; @@ -158,11 +159,11 @@ public abstract class AbstractBasicChannelSemanticsTest { protected static class TestError extends Error { static final long serialVersionUID = -1; - }; + } protected static class TestRuntimeException extends RuntimeException { static final long serialVersionUID = -1; - }; + } protected void testException(Class<? extends Throwable> exceptionClass, Runnable test) { http://git-wip-us.apache.org/repos/asf/flume/blob/cfbf1156/flume-ng-core/src/test/java/org/apache/flume/channel/TestChannelProcessor.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/test/java/org/apache/flume/channel/TestChannelProcessor.java b/flume-ng-core/src/test/java/org/apache/flume/channel/TestChannelProcessor.java index b37b823..93bc0cf 100644 --- a/flume-ng-core/src/test/java/org/apache/flume/channel/TestChannelProcessor.java +++ b/flume-ng-core/src/test/java/org/apache/flume/channel/TestChannelProcessor.java @@ -20,16 +20,22 @@ package org.apache.flume.channel; import com.google.common.base.Charsets; import com.google.common.collect.Lists; - -import java.util.ArrayList; -import java.util.List; - -import org.apache.flume.*; +import org.apache.flume.Channel; +import org.apache.flume.ChannelException; +import org.apache.flume.ChannelSelector; +import org.apache.flume.Context; +import org.apache.flume.Event; +import org.apache.flume.Transaction; import org.apache.flume.conf.Configurables; import org.apache.flume.event.EventBuilder; import org.junit.Assert; import org.junit.Test; -import static org.mockito.Mockito.*; + +import java.util.ArrayList; +import java.util.List; + +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; public class TestChannelProcessor { @@ -88,9 +94,9 @@ public class TestChannelProcessor { public void testRequiredAndOptionalChannels() { Context context = new Context(); ArrayList<Channel> channels = new ArrayList<Channel>(); - for(int i = 0; i < 4; i++) { + for (int i = 0; i < 4; i++) { Channel ch = new MemoryChannel(); - ch.setName("ch"+i); + ch.setName("ch" + i); Configurables.configure(ch, context); channels.add(ch); } @@ -114,7 +120,7 @@ public class TestChannelProcessor { } catch (InterruptedException e) { } - for(Channel channel : channels) { + for (Channel channel : channels) { Transaction transaction = channel.getTransaction(); transaction.begin(); Event event_ch = channel.take(); @@ -124,18 +130,18 @@ public class TestChannelProcessor { } List<Event> events = Lists.newArrayList(); - for(int i = 0; i < 100; i ++) { - events.add(EventBuilder.withBody("event "+i, Charsets.UTF_8)); + for (int i = 0; i < 100; i++) { + events.add(EventBuilder.withBody("event " + i, Charsets.UTF_8)); } processor.processEventBatch(events); try { Thread.sleep(3000); } catch (InterruptedException e) { } - for(Channel channel : channels) { + for (Channel channel : channels) { Transaction transaction = channel.getTransaction(); transaction.begin(); - for(int i = 0; i < 100; i ++) { + for (int i = 0; i < 100; i++) { Event event_ch = channel.take(); Assert.assertNotNull(event_ch); }
