http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/encryption/CipherProvider.java ---------------------------------------------------------------------- diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/encryption/CipherProvider.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/encryption/CipherProvider.java index 2c71bc9..2c516f9 100644 --- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/encryption/CipherProvider.java +++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/encryption/CipherProvider.java @@ -27,44 +27,45 @@ public abstract class CipherProvider { public abstract Encryptor.Builder<?> newEncryptorBuilder(); public abstract Decryptor.Builder<?> newDecryptorBuilder(); - public static abstract class Encryptor { + public abstract static class Encryptor { public abstract byte[] encrypt(byte[] clearText); public abstract byte[] getParameters(); public abstract String getCodec(); /** Builder implementations MUST have a no-arg constructor */ - public static abstract class Builder<T extends Encryptor> { + public abstract static class Builder<T extends Encryptor> { protected Key key; + public Builder<T> setKey(Key key) { this.key = Preconditions.checkNotNull(key, "key cannot be null"); return this; } + public abstract T build(); } - } - public static abstract class Decryptor { + public abstract static class Decryptor { public abstract byte[] decrypt(byte[] cipherText); public abstract String getCodec(); /** Builder implementations MUST have a no-arg constructor */ - public static abstract class Builder<T extends Decryptor> { + public abstract static class Builder<T extends Decryptor> { protected byte[] parameters; protected Key key; + public Builder<T> setKey(Key key) { this.key = Preconditions.checkNotNull(key, "key cannot be null"); return this; } + public Builder<T> setParameters(byte[] parameters) { this.parameters = parameters; return this; } + public abstract T build(); } - } - - }
http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/encryption/CipherProviderFactory.java ---------------------------------------------------------------------- diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/encryption/CipherProviderFactory.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/encryption/CipherProviderFactory.java index ca11f6b..85b0fbb 100644 --- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/encryption/CipherProviderFactory.java +++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/encryption/CipherProviderFactory.java @@ -33,7 +33,7 @@ public class CipherProviderFactory { public static CipherProvider.Encryptor getEncrypter(String cipherProviderType, Key key) { - if(cipherProviderType == null) { + if (cipherProviderType == null) { return null; } CipherProvider provider = getProvider(cipherProviderType); @@ -41,7 +41,7 @@ public class CipherProviderFactory { } public static CipherProvider.Decryptor getDecrypter(String cipherProviderType, Key key, byte[] parameters) { - if(cipherProviderType == null) { + if (cipherProviderType == null) { return null; } CipherProvider provider = getProvider(cipherProviderType); http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/encryption/CipherProviderType.java ---------------------------------------------------------------------- diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/encryption/CipherProviderType.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/encryption/CipherProviderType.java index 87834d7..73bc720 100644 --- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/encryption/CipherProviderType.java +++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/encryption/CipherProviderType.java @@ -18,7 +18,6 @@ */ package org.apache.flume.channel.file.encryption; - public enum CipherProviderType { AESCTRNOPADDING(AESCTRNoPaddingProvider.class), OTHER(null); http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/encryption/DecryptionFailureException.java ---------------------------------------------------------------------- diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/encryption/DecryptionFailureException.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/encryption/DecryptionFailureException.java index 0155c39..beffd9e 100644 --- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/encryption/DecryptionFailureException.java +++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/encryption/DecryptionFailureException.java @@ -27,7 +27,6 @@ import org.apache.flume.FlumeException; public class DecryptionFailureException extends FlumeException { private static final long serialVersionUID = 6646810195384793646L; - public DecryptionFailureException(String msg) { super(msg); } http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/encryption/JCEFileKeyProvider.java ---------------------------------------------------------------------- diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/encryption/JCEFileKeyProvider.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/encryption/JCEFileKeyProvider.java index f961ef9..c96cf0a 100644 --- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/encryption/JCEFileKeyProvider.java +++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/encryption/JCEFileKeyProvider.java @@ -55,7 +55,7 @@ public class JCEFileKeyProvider extends KeyProvider { keyStorePassword = Files.toString(keyStorePasswordFile, Charsets.UTF_8) .trim().toCharArray(); ks.load(new FileInputStream(keyStoreFile), keyStorePassword); - } catch(Exception ex) { + } catch (Exception ex) { throw Throwables.propagate(ex); } } @@ -65,14 +65,14 @@ public class JCEFileKeyProvider extends KeyProvider { String passwordFile = keyStorePasswordFile.getAbsolutePath(); try { char[] keyPassword = keyStorePassword; - if(aliasPasswordFileMap.containsKey(alias)) { + if (aliasPasswordFileMap.containsKey(alias)) { File keyPasswordFile = aliasPasswordFileMap.get(alias); keyPassword = Files.toString(keyPasswordFile, Charsets.UTF_8).trim().toCharArray(); passwordFile = keyPasswordFile.getAbsolutePath(); } Key key = ks.getKey(alias, keyPassword); - if(key == null) { + if (key == null) { throw new IllegalStateException("KeyStore returned null for " + alias); } return key; @@ -99,13 +99,13 @@ public class JCEFileKeyProvider extends KeyProvider { EncryptionConfiguration.JCE_FILE_KEYS); Preconditions.checkState(!Strings.isNullOrEmpty(passwordProtectedKeys), "Keys available to KeyStore was not specified or empty"); - for(String passwordName : passwordProtectedKeys.trim().split("\\s+")) { + for (String passwordName : passwordProtectedKeys.trim().split("\\s+")) { String propertyName = Joiner.on(".").join(EncryptionConfiguration.JCE_FILE_KEYS, passwordName, EncryptionConfiguration.JCE_FILE_KEY_PASSWORD_FILE); String passwordFileName = context.getString(propertyName, keyStorePasswordFileName); File passwordFile = new File(passwordFileName.trim()); - if(passwordFile.isFile()) { + if (passwordFile.isFile()) { aliasPasswordFileMap.put(passwordName, passwordFile); } else { logger.warn("Password file for alias " + passwordName + http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/encryption/KeyProviderType.java ---------------------------------------------------------------------- diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/encryption/KeyProviderType.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/encryption/KeyProviderType.java index 0fef6dc..3263615 100644 --- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/encryption/KeyProviderType.java +++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/encryption/KeyProviderType.java @@ -18,7 +18,6 @@ */ package org.apache.flume.channel.file.encryption; - public enum KeyProviderType { JCEKSFILE(JCEFileKeyProvider.Builder.class), OTHER(null); http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/proto/ProtosFactory.java ---------------------------------------------------------------------- diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/proto/ProtosFactory.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/proto/ProtosFactory.java index 34f93d9..50492cc 100644 --- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/proto/ProtosFactory.java +++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/proto/ProtosFactory.java @@ -5010,18 +5010,21 @@ public final class ProtosFactory { public interface RollbackOrBuilder extends com.google.protobuf.MessageOrBuilder { } + /** * Protobuf type {@code Rollback} */ - public static final class Rollback extends - com.google.protobuf.GeneratedMessage - implements RollbackOrBuilder { + public static final class Rollback extends com.google.protobuf.GeneratedMessage + implements RollbackOrBuilder { // Use Rollback.newBuilder() to construct. private Rollback(com.google.protobuf.GeneratedMessage.Builder<?> builder) { super(builder); this.unknownFields = builder.getUnknownFields(); } - private Rollback(boolean noInit) { this.unknownFields = com.google.protobuf.UnknownFieldSet.getDefaultInstance(); } + + private Rollback(boolean noInit) { + this.unknownFields = com.google.protobuf.UnknownFieldSet.getDefaultInstance(); + } private static final Rollback defaultInstance; public static Rollback getDefaultInstance() { http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/ConfigurationConstants.java ---------------------------------------------------------------------- diff --git a/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/ConfigurationConstants.java b/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/ConfigurationConstants.java index 3b97684..7138b41 100644 --- a/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/ConfigurationConstants.java +++ b/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/ConfigurationConstants.java @@ -89,7 +89,6 @@ public final class ConfigurationConstants { public static final String OLD_CONFIG_CREATE_SCHEMA = PREFIX + CONFIG_CREATE_SCHEMA; - public static final String CONFIG_CREATE_INDEX = "create.index"; /** http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/JdbcChannel.java ---------------------------------------------------------------------- diff --git a/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/JdbcChannel.java b/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/JdbcChannel.java index 1192452..fba6e7b 100644 --- a/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/JdbcChannel.java +++ b/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/JdbcChannel.java @@ -27,6 +27,7 @@ import org.apache.flume.annotations.Disposable; import org.apache.flume.channel.AbstractChannel; import org.slf4j.Logger; import org.slf4j.LoggerFactory; + /** * <p> * A JDBC based channel implementation. http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/JdbcChannelProvider.java ---------------------------------------------------------------------- diff --git a/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/JdbcChannelProvider.java b/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/JdbcChannelProvider.java index e445d61..76bc627 100644 --- a/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/JdbcChannelProvider.java +++ b/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/JdbcChannelProvider.java @@ -29,7 +29,7 @@ public interface JdbcChannelProvider { /** * Initializes the channel provider. This method must be called before * the channel can be used in any way. - * @param properties the configuration for the system + * @param context the configuration for the system */ public void initialize(Context context); http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/DerbySchemaHandler.java ---------------------------------------------------------------------- diff --git a/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/DerbySchemaHandler.java b/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/DerbySchemaHandler.java index 2dc3fcc..56eebfd 100644 --- a/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/DerbySchemaHandler.java +++ b/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/DerbySchemaHandler.java @@ -127,7 +127,6 @@ public class DerbySchemaHandler implements SchemaHandler { private static final Logger LOGGER = LoggerFactory.getLogger(DerbySchemaHandler.class); - private static final String QUREY_SYSCHEMA_FLUME = "SELECT SCHEMAID FROM SYS.SYSSCHEMAS WHERE SCHEMANAME = 'FLUME'"; @@ -613,15 +612,15 @@ public class DerbySchemaHandler implements SchemaHandler { // Persist the payload spill if (hasSpillPayload) { - spillEventStmt = connection.prepareStatement(STMT_INSERT_EVENT_SPILL); - spillEventStmt.setLong(1, eventId); - spillEventStmt.setBinaryStream(2, - new ByteArrayInputStream(spillPayload), spillPayload.length); - int spillEventCount = spillEventStmt.executeUpdate(); - if (spillEventCount != 1) { - throw new JdbcChannelException("Invalid update count on spill " - + "event insert: " + spillEventCount); - } + spillEventStmt = connection.prepareStatement(STMT_INSERT_EVENT_SPILL); + spillEventStmt.setLong(1, eventId); + spillEventStmt.setBinaryStream(2, + new ByteArrayInputStream(spillPayload), spillPayload.length); + int spillEventCount = spillEventStmt.executeUpdate(); + if (spillEventCount != 1) { + throw new JdbcChannelException("Invalid update count on spill " + + "event insert: " + spillEventCount); + } } // Persist the headers @@ -645,8 +644,7 @@ public class DerbySchemaHandler implements SchemaHandler { int updateCount = baseHeaderStmt.executeUpdate(); if (updateCount != 1) { - throw new JdbcChannelException("Unexpected update header count: " - + updateCount); + throw new JdbcChannelException("Unexpected update header count: " + updateCount); } ResultSet headerIdResultSet = baseHeaderStmt.getGeneratedKeys(); if (!headerIdResultSet.next()) { @@ -705,7 +703,7 @@ public class DerbySchemaHandler implements SchemaHandler { headerValueSpillStmt = connection.prepareStatement(STMT_INSERT_HEADER_VALUE_SPILL); - for(HeaderEntry entry : headerWithValueSpill) { + for (HeaderEntry entry : headerWithValueSpill) { String valueSpill = entry.getValue().getSpill(); headerValueSpillStmt.setLong(1, entry.getId()); http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/JdbcChannelProviderImpl.java ---------------------------------------------------------------------- diff --git a/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/JdbcChannelProviderImpl.java b/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/JdbcChannelProviderImpl.java index f42b4dd..845b794 100644 --- a/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/JdbcChannelProviderImpl.java +++ b/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/JdbcChannelProviderImpl.java @@ -54,12 +54,10 @@ public class JdbcChannelProviderImpl implements JdbcChannelProvider { private static final Logger LOGGER = LoggerFactory.getLogger(JdbcChannelProviderImpl.class); - private static final String EMBEDDED_DERBY_DRIVER_CLASSNAME - = "org.apache.derby.jdbc.EmbeddedDriver"; + = "org.apache.derby.jdbc.EmbeddedDriver"; - private static final String DEFAULT_DRIVER_CLASSNAME - = EMBEDDED_DERBY_DRIVER_CLASSNAME; + private static final String DEFAULT_DRIVER_CLASSNAME = EMBEDDED_DERBY_DRIVER_CLASSNAME; private static final String DEFAULT_USERNAME = "sa"; private static final String DEFAULT_PASSWORD = ""; private static final String DEFAULT_DBTYPE = "DERBY"; @@ -133,7 +131,7 @@ public class JdbcChannelProviderImpl implements JdbcChannelProvider { for (String key: sysProps.keySet()) { String value = sysProps.get(key); - if(key != null && value != null) { + if (key != null && value != null) { System.setProperty(key, value); } } @@ -254,7 +252,7 @@ public class JdbcChannelProviderImpl implements JdbcChannelProvider { int index = connectUrl.indexOf(";"); String baseUrl = null; if (index != -1) { - baseUrl = connectUrl.substring(0, index+1); + baseUrl = connectUrl.substring(0, index + 1); } else { baseUrl = connectUrl + ";"; } @@ -440,12 +438,12 @@ public class JdbcChannelProviderImpl implements JdbcChannelProvider { databaseType = DatabaseType.getByName(dbTypeName); switch (databaseType) { - case DERBY: - case MYSQL: - break; - default: - throw new JdbcChannelException("Database " + databaseType - + " not supported at this time"); + case DERBY: + case MYSQL: + break; + default: + throw new JdbcChannelException("Database " + databaseType + + " not supported at this time"); } // Register driver http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/JdbcTransactionImpl.java ---------------------------------------------------------------------- diff --git a/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/JdbcTransactionImpl.java b/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/JdbcTransactionImpl.java index 13b14f5..6f3aecd 100644 --- a/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/JdbcTransactionImpl.java +++ b/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/JdbcTransactionImpl.java @@ -28,7 +28,6 @@ import org.apache.flume.channel.jdbc.JdbcChannelException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; - public class JdbcTransactionImpl implements Transaction { private static final Logger LOGGER = http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/SchemaHandler.java ---------------------------------------------------------------------- diff --git a/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/SchemaHandler.java b/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/SchemaHandler.java index dba96fc..9bfc227 100644 --- a/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/SchemaHandler.java +++ b/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/SchemaHandler.java @@ -26,14 +26,12 @@ import java.sql.Connection; public interface SchemaHandler { /** - * @param connection the connection to check for schema. * @return true if the schema exists. False otherwise. */ public boolean schemaExists(); /** * Validates the schema. - * @param connection */ public void validateSchema(); @@ -74,8 +72,6 @@ public interface SchemaHandler { * must have an active transaction ongoing. This allows the provider impl to * enforce channel capacity limits when persisting events. * @return the current size of the channel. - * @param connection - * @return */ public long getChannelSize(Connection connection); } http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/SchemaHandlerFactory.java ---------------------------------------------------------------------- diff --git a/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/SchemaHandlerFactory.java b/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/SchemaHandlerFactory.java index 2543848..35f4c61 100644 --- a/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/SchemaHandlerFactory.java +++ b/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/SchemaHandlerFactory.java @@ -27,19 +27,17 @@ import org.apache.flume.channel.jdbc.JdbcChannelException; */ public final class SchemaHandlerFactory { - public static SchemaHandler getHandler(DatabaseType dbType, - DataSource dataSource) { + public static SchemaHandler getHandler(DatabaseType dbType, DataSource dataSource) { SchemaHandler handler = null; - switch(dbType) { - case DERBY: - handler = new DerbySchemaHandler(dataSource); - break; - case MYSQL: - handler = new MySQLSchemaHandler(dataSource); - break; - default: - throw new JdbcChannelException("Database " + dbType - + " not supported yet"); + switch (dbType) { + case DERBY: + handler = new DerbySchemaHandler(dataSource); + break; + case MYSQL: + handler = new MySQLSchemaHandler(dataSource); + break; + default: + throw new JdbcChannelException("Database " + dbType + " not supported yet"); } return handler; http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannel.java ---------------------------------------------------------------------- diff --git a/flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannel.java b/flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannel.java index dfc95bc..90e3288 100644 --- a/flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannel.java +++ b/flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannel.java @@ -73,7 +73,7 @@ import static org.apache.flume.channel.kafka.KafkaChannelConfiguration.*; public class KafkaChannel extends BasicChannelSemantics { - private final static Logger logger = + private static final Logger logger = LoggerFactory.getLogger(KafkaChannel.class); private final Properties consumerProps = new Properties(); @@ -97,27 +97,27 @@ public class KafkaChannel extends BasicChannelSemantics { private KafkaChannelCounter counter; - /* Each Consumer commit will commit all partitions owned by it. To - * ensure that each partition is only committed when all events are - * actually done, we will need to keep a Consumer per thread. - */ + /* Each Consumer commit will commit all partitions owned by it. To + * ensure that each partition is only committed when all events are + * actually done, we will need to keep a Consumer per thread. + */ - private final ThreadLocal<ConsumerAndRecords> consumerAndRecords = new - ThreadLocal<ConsumerAndRecords>() { - @Override - public ConsumerAndRecords initialValue() { - return createConsumerAndRecords(); - } - }; + private final ThreadLocal<ConsumerAndRecords> consumerAndRecords = + new ThreadLocal<ConsumerAndRecords>() { + @Override + public ConsumerAndRecords initialValue() { + return createConsumerAndRecords(); + } + }; @Override public void start() { - logger.info("Starting Kafka Channel: {}", getName()); - producer = new KafkaProducer<String, byte[]>(producerProps); - // We always have just one topic being read by one thread - logger.info("Topic = {}", topic.get()); - counter.start(); - super.start(); + logger.info("Starting Kafka Channel: {}", getName()); + producer = new KafkaProducer<String, byte[]>(producerProps); + // We always have just one topic being read by one thread + logger.info("Topic = {}", topic.get()); + counter.start(); + super.start(); } @Override @@ -185,17 +185,19 @@ public class KafkaChannel extends BasicChannelSemantics { throw new ConfigurationException("Bootstrap Servers must be specified"); } else { ctx.put(BOOTSTRAP_SERVERS_CONFIG, brokerList); - logger.warn("{} is deprecated. Please use the parameter {}", BROKER_LIST_FLUME_KEY, BOOTSTRAP_SERVERS_CONFIG); + logger.warn("{} is deprecated. Please use the parameter {}", + BROKER_LIST_FLUME_KEY, BOOTSTRAP_SERVERS_CONFIG); } } //GroupId // If there is an old Group Id set, then use that if no groupId is set. if (!(ctx.containsKey(KAFKA_CONSUMER_PREFIX + ConsumerConfig.GROUP_ID_CONFIG))) { - String oldGroupId = ctx.getString(GROUP_ID_FLUME); - if ( oldGroupId != null && !oldGroupId.isEmpty()) { + String oldGroupId = ctx.getString(GROUP_ID_FLUME); + if (oldGroupId != null && !oldGroupId.isEmpty()) { ctx.put(KAFKA_CONSUMER_PREFIX + ConsumerConfig.GROUP_ID_CONFIG, oldGroupId); - logger.warn("{} is deprecated. Please use the parameter {}", GROUP_ID_FLUME, KAFKA_CONSUMER_PREFIX + ConsumerConfig.GROUP_ID_CONFIG); + logger.warn("{} is deprecated. Please use the parameter {}", + GROUP_ID_FLUME, KAFKA_CONSUMER_PREFIX + ConsumerConfig.GROUP_ID_CONFIG); } } @@ -209,7 +211,9 @@ public class KafkaChannel extends BasicChannelSemantics { auto = "latest"; } ctx.put(KAFKA_CONSUMER_PREFIX + ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,auto); - logger.warn("{} is deprecated. Please use the parameter {}", READ_SMALLEST_OFFSET,KAFKA_CONSUMER_PREFIX + ConsumerConfig.AUTO_OFFSET_RESET_CONFIG); + logger.warn("{} is deprecated. Please use the parameter {}", + READ_SMALLEST_OFFSET, + KAFKA_CONSUMER_PREFIX + ConsumerConfig.AUTO_OFFSET_RESET_CONFIG); } } @@ -249,15 +253,17 @@ public class KafkaChannel extends BasicChannelSemantics { logger.info(consumerProps.toString()); } - protected Properties getConsumerProps() { return consumerProps; } - + protected Properties getConsumerProps() { + return consumerProps; + } private synchronized ConsumerAndRecords createConsumerAndRecords() { try { KafkaConsumer<String, byte[]> consumer = new KafkaConsumer<String, byte[]>(consumerProps); ConsumerAndRecords car = new ConsumerAndRecords(consumer, channelUUID); logger.info("Created new consumer to connect to Kafka"); - car.consumer.subscribe(Arrays.asList(topic.get()), new ChannelRebalanceListener(rebalanceFlag)); + car.consumer.subscribe(Arrays.asList(topic.get()), + new ChannelRebalanceListener(rebalanceFlag)); car.offsets = new HashMap<TopicPartition, OffsetAndMetadata>(); consumers.add(car); return car; @@ -286,14 +292,14 @@ public class KafkaChannel extends BasicChannelSemantics { NONE } - private class KafkaTransaction extends BasicTransactionSemantics { private TransactionType type = TransactionType.NONE; private Optional<ByteArrayOutputStream> tempOutStream = Optional .absent(); // For put transactions, serialize the events and hold them until the commit goes is requested. - private Optional<LinkedList<ProducerRecord<String, byte[]>>> producerRecords = Optional.absent(); + private Optional<LinkedList<ProducerRecord<String, byte[]>>> producerRecords = + Optional.absent(); // For take transactions, deserialize and hold them till commit goes through private Optional<LinkedList<Event>> events = Optional.absent(); private Optional<SpecificDatumWriter<AvroFlumeEvent>> writer = @@ -323,8 +329,9 @@ public class KafkaChannel extends BasicChannelSemantics { } String key = event.getHeaders().get(KEY_HEADER); try { - producerRecords.get().add(new ProducerRecord<String, byte[]> - (topic.get(), key, serializeValue(event, parseAsFlumeEvent))); + producerRecords.get().add( + new ProducerRecord<String, byte[]>(topic.get(), key, + serializeValue(event, parseAsFlumeEvent))); } catch (Exception e) { throw new ChannelException("Error while serializing event", e); } @@ -382,7 +389,8 @@ public class KafkaChannel extends BasicChannelSemantics { } if (logger.isDebugEnabled()) { - logger.debug("Processed output from partition {} offset {}", record.partition(), record.offset()); + logger.debug("Processed output from partition {} offset {}", + record.partition(), record.offset()); } long endTime = System.nanoTime(); @@ -391,10 +399,10 @@ public class KafkaChannel extends BasicChannelSemantics { return null; } } catch (Exception ex) { - logger.warn("Error while getting events from Kafka. This is usually caused by trying to read " + - "a non-flume event. Ensure the setting for parseAsFlumeEvent is correct", ex); - throw new ChannelException("Error while getting events from Kafka", - ex); + logger.warn("Error while getting events from Kafka. This is usually caused by " + + "trying to read a non-flume event. Ensure the setting for " + + "parseAsFlumeEvent is correct", ex); + throw new ChannelException("Error while getting events from Kafka", ex); } } eventTaken = true; @@ -564,8 +572,9 @@ public class KafkaChannel extends BasicChannelSemantics { StringBuilder sb = new StringBuilder(); for (TopicPartition tp : this.consumer.assignment()) { try { - sb.append("Committed: [").append(tp).append(",").append(this.consumer.committed(tp).offset()) - .append(",").append(this.consumer.committed(tp).metadata()).append("]"); + sb.append("Committed: [").append(tp).append(",") + .append(this.consumer.committed(tp).offset()) + .append(",").append(this.consumer.committed(tp).metadata()).append("]"); if (logger.isDebugEnabled()) { logger.debug(sb.toString()); } @@ -596,8 +605,8 @@ class ChannelCallback implements Callback { } if (log.isDebugEnabled()) { long batchElapsedTime = System.currentTimeMillis() - startTime; - log.debug("Acked message_no " + index + ": " + metadata.topic() + "-" + metadata.partition() + "-" + - metadata.offset() + "-" + batchElapsedTime); + log.debug("Acked message_no " + index + ": " + metadata.topic() + "-" + + metadata.partition() + "-" + metadata.offset() + "-" + batchElapsedTime); } } } http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannelConfiguration.java ---------------------------------------------------------------------- diff --git a/flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannelConfiguration.java b/flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannelConfiguration.java index faf46b6..ccf46d9 100644 --- a/flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannelConfiguration.java +++ b/flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannelConfiguration.java @@ -26,12 +26,17 @@ public class KafkaChannelConfiguration { public static final String KAFKA_CONSUMER_PREFIX = KAFKA_PREFIX + "consumer."; public static final String KAFKA_PRODUCER_PREFIX = KAFKA_PREFIX + "producer."; public static final String DEFAULT_ACKS = "all"; - public static final String DEFAULT_KEY_SERIALIZER = "org.apache.kafka.common.serialization.StringSerializer"; - public static final String DEFAULT_VALUE_SERIAIZER = "org.apache.kafka.common.serialization.ByteArraySerializer"; - public static final String DEFAULT_KEY_DESERIALIZER = "org.apache.kafka.common.serialization.StringDeserializer"; - public static final String DEFAULT_VALUE_DESERIAIZER = "org.apache.kafka.common.serialization.ByteArrayDeserializer"; + public static final String DEFAULT_KEY_SERIALIZER = + "org.apache.kafka.common.serialization.StringSerializer"; + public static final String DEFAULT_VALUE_SERIAIZER = + "org.apache.kafka.common.serialization.ByteArraySerializer"; + public static final String DEFAULT_KEY_DESERIALIZER = + "org.apache.kafka.common.serialization.StringDeserializer"; + public static final String DEFAULT_VALUE_DESERIAIZER = + "org.apache.kafka.common.serialization.ByteArrayDeserializer"; public static final String TOPIC_CONFIG = KAFKA_PREFIX + "topic"; - public static final String BOOTSTRAP_SERVERS_CONFIG = KAFKA_PREFIX + CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG; + public static final String BOOTSTRAP_SERVERS_CONFIG = + KAFKA_PREFIX + CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG; public static final String DEFAULT_TOPIC = "flume-channel"; public static final String DEFAULT_GROUP_ID = "flume"; public static final String POLL_TIMEOUT = KAFKA_PREFIX + "pollTimeout"; http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-channels/flume-spillable-memory-channel/src/main/java/org/apache/flume/channel/SpillableMemoryChannel.java ---------------------------------------------------------------------- diff --git a/flume-ng-channels/flume-spillable-memory-channel/src/main/java/org/apache/flume/channel/SpillableMemoryChannel.java b/flume-ng-channels/flume-spillable-memory-channel/src/main/java/org/apache/flume/channel/SpillableMemoryChannel.java index bdf42cd..b46d646 100644 --- a/flume-ng-channels/flume-spillable-memory-channel/src/main/java/org/apache/flume/channel/SpillableMemoryChannel.java +++ b/flume-ng-channels/flume-spillable-memory-channel/src/main/java/org/apache/flume/channel/SpillableMemoryChannel.java @@ -18,26 +18,26 @@ */ package org.apache.flume.channel; -import java.util.ArrayDeque; -import java.util.concurrent.Semaphore; -import java.util.concurrent.TimeUnit; - -import javax.annotation.concurrent.GuardedBy; - import com.google.common.annotations.VisibleForTesting; -import org.apache.flume.*; -import org.apache.flume.annotations.Recyclable; - +import com.google.common.base.Preconditions; +import org.apache.flume.ChannelException; +import org.apache.flume.ChannelFullException; +import org.apache.flume.Context; +import org.apache.flume.Event; +import org.apache.flume.Transaction; import org.apache.flume.annotations.InterfaceAudience; import org.apache.flume.annotations.InterfaceStability; +import org.apache.flume.annotations.Recyclable; import org.apache.flume.channel.file.FileChannel; import org.apache.flume.instrumentation.ChannelCounter; - import org.apache.flume.lifecycle.LifecycleState; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.google.common.base.Preconditions; +import javax.annotation.concurrent.GuardedBy; +import java.util.ArrayDeque; +import java.util.concurrent.Semaphore; +import java.util.concurrent.TimeUnit; /** * <p> @@ -50,28 +50,45 @@ import com.google.common.base.Preconditions; @Recyclable public class SpillableMemoryChannel extends FileChannel { // config settings - /** Max number of events to be stored in memory */ + /** + * Max number of events to be stored in memory + */ public static final String MEMORY_CAPACITY = "memoryCapacity"; - /** Seconds to wait before enabling disk overflow when memory fills up */ + /** + * Seconds to wait before enabling disk overflow when memory fills up + */ public static final String OVERFLOW_TIMEOUT = "overflowTimeout"; - /** Internal use only. To remain undocumented in User guide. Determines the + /** + * Internal use only. To remain undocumented in User guide. Determines the * percent free space available in mem queue when we stop spilling to overflow */ public static final String OVERFLOW_DEACTIVATION_THRESHOLD - = "overflowDeactivationThreshold"; - /** percent of buffer between byteCapacity and the estimated event size. */ + = "overflowDeactivationThreshold"; + /** + * percent of buffer between byteCapacity and the estimated event size. + */ public static final String BYTE_CAPACITY_BUFFER_PERCENTAGE - = "byteCapacityBufferPercentage"; + = "byteCapacityBufferPercentage"; - /** max number of bytes used for all events in the queue. */ + /** + * max number of bytes used for all events in the queue. + */ public static final String BYTE_CAPACITY = "byteCapacity"; - /** max number of events in overflow. */ + /** + * max number of events in overflow. + */ public static final String OVERFLOW_CAPACITY = "overflowCapacity"; - /** file channel setting that is overriden by Spillable Channel */ + /** + * file channel setting that is overriden by Spillable Channel + */ public static final String KEEP_ALIVE = "keep-alive"; - /** file channel capacity overridden by Spillable Channel */ + /** + * file channel capacity overridden by Spillable Channel + */ public static final String CAPACITY = "capacity"; - /** Estimated average size of events expected to be in the channel */ + /** + * Estimated average size of events expected to be in the channel + */ public static final String AVG_EVENT_SIZE = "avgEventSize"; private static Logger LOGGER = LoggerFactory.getLogger(SpillableMemoryChannel.class); @@ -84,7 +101,7 @@ public class SpillableMemoryChannel extends FileChannel { // memory consumption control private static final int defaultAvgEventSize = 500; private static final Long defaultByteCapacity - = (long)(Runtime.getRuntime().maxMemory() * .80); + = (long) (Runtime.getRuntime().maxMemory() * .80); private static final int defaultByteCapacityBufferPercentage = 20; private volatile int byteCapacity; @@ -94,7 +111,7 @@ public class SpillableMemoryChannel extends FileChannel { private Semaphore bytesRemaining; // for synchronizing access to primary/overflow channels & drain order - final private Object queueLock = new Object(); + private final Object queueLock = new Object(); @GuardedBy(value = "queueLock") public ArrayDeque<Event> memQueue; @@ -109,8 +126,10 @@ public class SpillableMemoryChannel extends FileChannel { private int maxMemQueueSize = 0; // max sie of memory Queue - private boolean overflowDisabled; // if true indicates the overflow should not be used at all. - private boolean overflowActivated=false; // indicates if overflow can be used. invariant: false if overflowDisabled is true. + private boolean overflowDisabled; // if true indicates the overflow should not be used at all. + + // indicates if overflow can be used. invariant: false if overflowDisabled is true. + private boolean overflowActivated = false; // if true overflow can be used. invariant: false if overflowDisabled is true. private int memoryCapacity = -1; // max events that the channel can hold in memory @@ -120,7 +139,7 @@ public class SpillableMemoryChannel extends FileChannel { // mem full % at which we stop spill to overflow private double overflowDeactivationThreshold - = defaultOverflowDeactivationThreshold / 100; + = defaultOverflowDeactivationThreshold / 100; public SpillableMemoryChannel() { super(); @@ -133,6 +152,7 @@ public class SpillableMemoryChannel extends FileChannel { public int getMemoryCapacity() { return memoryCapacity; } + public int getOverflowTimeout() { return overflowTimeout; } @@ -160,7 +180,6 @@ public class SpillableMemoryChannel extends FileChannel { } } - private static class MutableInteger { private int value; @@ -186,7 +205,7 @@ public class SpillableMemoryChannel extends FileChannel { public int totalPuts = 0; // for debugging only private long overflowCounter = 0; // # of items in overflow channel - public String dump() { + public String dump() { StringBuilder sb = new StringBuilder(); sb.append(" [ "); @@ -195,12 +214,12 @@ public class SpillableMemoryChannel extends FileChannel { sb.append(" "); } sb.append("]"); - return sb.toString(); + return sb.toString(); } public void putPrimary(Integer eventCount) { totalPuts += eventCount; - if ( (queue.peekLast() == null) || queue.getLast().intValue() < 0) { + if ((queue.peekLast() == null) || queue.getLast().intValue() < 0) { queue.addLast(new MutableInteger(eventCount)); } else { queue.getLast().add(eventCount); @@ -208,7 +227,7 @@ public class SpillableMemoryChannel extends FileChannel { } public void putFirstPrimary(Integer eventCount) { - if ( (queue.peekFirst() == null) || queue.getFirst().intValue() < 0) { + if ((queue.peekFirst() == null) || queue.getFirst().intValue() < 0) { queue.addFirst(new MutableInteger(eventCount)); } else { queue.getFirst().add(eventCount); @@ -217,7 +236,7 @@ public class SpillableMemoryChannel extends FileChannel { public void putOverflow(Integer eventCount) { totalPuts += eventCount; - if ( (queue.peekLast() == null) || queue.getLast().intValue() > 0) { + if ((queue.peekLast() == null) || queue.getLast().intValue() > 0) { queue.addLast(new MutableInteger(-eventCount)); } else { queue.getLast().add(-eventCount); @@ -226,9 +245,9 @@ public class SpillableMemoryChannel extends FileChannel { } public void putFirstOverflow(Integer eventCount) { - if ( (queue.peekFirst() == null) || queue.getFirst().intValue() > 0) { + if ((queue.peekFirst() == null) || queue.getFirst().intValue() > 0) { queue.addFirst(new MutableInteger(-eventCount)); - } else { + } else { queue.getFirst().add(-eventCount); } overflowCounter += eventCount; @@ -247,9 +266,9 @@ public class SpillableMemoryChannel extends FileChannel { // this condition is optimization to avoid redundant conversions of // int -> Integer -> string in hot path - if (headValue.intValue() < takeCount) { + if (headValue.intValue() < takeCount) { throw new IllegalStateException("Cannot take " + takeCount + - " from " + headValue.intValue() + " in DrainOrder Queue"); + " from " + headValue.intValue() + " in DrainOrder Queue"); } headValue.add(-takeCount); @@ -260,9 +279,9 @@ public class SpillableMemoryChannel extends FileChannel { public void takeOverflow(int takeCount) { MutableInteger headValue = queue.getFirst(); - if(headValue.intValue() > -takeCount) { + if (headValue.intValue() > -takeCount) { throw new IllegalStateException("Cannot take " + takeCount + " from " - + headValue.intValue() + " in DrainOrder Queue head " ); + + headValue.intValue() + " in DrainOrder Queue head "); } headValue.add(takeCount); @@ -293,7 +312,6 @@ public class SpillableMemoryChannel extends FileChannel { ArrayDeque<Event> putList; private final ChannelCounter channelCounter; - public SpillableMemoryTransaction(ChannelCounter counter) { takeList = new ArrayDeque<Event>(largestTakeTxSize); putList = new ArrayDeque<Event>(largestPutTxSize); @@ -307,26 +325,25 @@ public class SpillableMemoryChannel extends FileChannel { @Override public void close() { - if (overflowTakeTx!=null) { + if (overflowTakeTx != null) { overflowTakeTx.close(); } - if (overflowPutTx!=null) { + if (overflowPutTx != null) { overflowPutTx.close(); } super.close(); } - @Override protected void doPut(Event event) throws InterruptedException { channelCounter.incrementEventPutAttemptCount(); putCalled = true; - int eventByteSize = (int)Math.ceil(estimateEventSize(event)/ avgEventSize); + int eventByteSize = (int) Math.ceil(estimateEventSize(event) / avgEventSize); if (!putList.offer(event)) { throw new ChannelFullException("Put queue in " + getName() + - " channel's Transaction having capacity " + putList.size() + - " full, consider reducing batch size of sources"); + " channel's Transaction having capacity " + putList.size() + + " full, consider reducing batch size of sources"); } putListByteCount += eventByteSize; } @@ -344,7 +361,7 @@ public class SpillableMemoryChannel extends FileChannel { boolean takeSuceeded = false; try { Event event; - synchronized(queueLock) { + synchronized (queueLock) { int drainOrderTop = drainOrder.front(); if (!takeCalled) { @@ -375,11 +392,11 @@ public class SpillableMemoryChannel extends FileChannel { ++takeCount; drainOrder.takePrimary(1); Preconditions.checkNotNull(event, "Queue.poll returned NULL despite" - + " semaphore signalling existence of entry"); + + " semaphore signalling existence of entry"); } } - int eventByteSize = (int)Math.ceil(estimateEventSize(event)/ avgEventSize); + int eventByteSize = (int) Math.ceil(estimateEventSize(event) / avgEventSize); if (!useOverflow) { // takeList is thd pvt, so no need to do this in synchronized block takeList.offer(event); @@ -389,7 +406,7 @@ public class SpillableMemoryChannel extends FileChannel { takeSuceeded = true; return event; } finally { - if(!takeSuceeded) { + if (!takeSuceeded) { totalStored.release(); } } @@ -400,37 +417,35 @@ public class SpillableMemoryChannel extends FileChannel { if (putCalled) { putCommit(); if (LOGGER.isDebugEnabled()) { - LOGGER.debug("Put Committed. Drain Order Queue state : " - + drainOrder.dump()); + LOGGER.debug("Put Committed. Drain Order Queue state : " + drainOrder.dump()); } } else if (takeCalled) { takeCommit(); if (LOGGER.isDebugEnabled()) { - LOGGER.debug("Take Committed. Drain Order Queue state : " - + drainOrder.dump()); + LOGGER.debug("Take Committed. Drain Order Queue state : " + drainOrder.dump()); } } } private void takeCommit() { - if (takeCount > largestTakeTxSize) + if (takeCount > largestTakeTxSize) { largestTakeTxSize = takeCount; + } synchronized (queueLock) { - if (overflowTakeTx!=null) { + if (overflowTakeTx != null) { overflowTakeTx.commit(); } - double memoryPercentFree = (memoryCapacity == 0) ? 0 - : (memoryCapacity - memQueue.size() + takeCount ) / (double)memoryCapacity ; + double memoryPercentFree = (memoryCapacity == 0) ? 0 : + (memoryCapacity - memQueue.size() + takeCount) / (double) memoryCapacity; - if (overflowActivated - && memoryPercentFree >= overflowDeactivationThreshold) { + if (overflowActivated && memoryPercentFree >= overflowDeactivationThreshold) { overflowActivated = false; LOGGER.info("Overflow Deactivated"); } channelCounter.setChannelSize(getTotalStored()); } - if (!useOverflow) { + if (!useOverflow) { memQueRemaining.release(takeCount); bytesRemaining.release(takeListByteCount); } @@ -440,30 +455,29 @@ public class SpillableMemoryChannel extends FileChannel { private void putCommit() throws InterruptedException { // decide if overflow needs to be used - int timeout = overflowActivated ? 0 : overflowTimeout; + int timeout = overflowActivated ? 0 : overflowTimeout; if (memoryCapacity != 0) { // check for enough event slots(memoryCapacity) for using memory queue if (!memQueRemaining.tryAcquire(putList.size(), timeout, - TimeUnit.SECONDS)) { + TimeUnit.SECONDS)) { if (overflowDisabled) { throw new ChannelFullException("Spillable Memory Channel's " + - "memory capacity has been reached and overflow is " + - "disabled. Consider increasing memoryCapacity."); + "memory capacity has been reached and overflow is " + + "disabled. Consider increasing memoryCapacity."); } overflowActivated = true; useOverflow = true; - } // check if we have enough byteCapacity for using memory queue - else if (!bytesRemaining.tryAcquire(putListByteCount, overflowTimeout - , TimeUnit.SECONDS)) { + } else if (!bytesRemaining.tryAcquire(putListByteCount, + overflowTimeout, TimeUnit.SECONDS)) { memQueRemaining.release(putList.size()); if (overflowDisabled) { throw new ChannelFullException("Spillable Memory Channel's " - + "memory capacity has been reached. " - + (bytesRemaining.availablePermits() * (int) avgEventSize) - + " bytes are free and overflow is disabled. Consider " - + "increasing byteCapacity or capacity."); + + "memory capacity has been reached. " + + (bytesRemaining.availablePermits() * (int) avgEventSize) + + " bytes are free and overflow is disabled. Consider " + + "increasing byteCapacity or capacity."); } overflowActivated = true; useOverflow = true; @@ -496,22 +510,21 @@ public class SpillableMemoryChannel extends FileChannel { } private void commitPutsToOverflow_core(Transaction overflowPutTx) - throws InterruptedException { + throws InterruptedException { // reattempt only once if overflow is full first time around - for (int i = 0; i < 2; ++i) { + for (int i = 0; i < 2; ++i) { try { - synchronized(queueLock) { + synchronized (queueLock) { overflowPutTx.commit(); drainOrder.putOverflow(putList.size()); channelCounter.setChannelSize(memQueue.size() - + drainOrder.overflowCounter); + + drainOrder.overflowCounter); break; } - } catch (ChannelFullException e) { // drop lock & reattempt - if (i==0) { - Thread.sleep(overflowTimeout *1000); - } - else { + } catch (ChannelFullException e) { // drop lock & reattempt + if (i == 0) { + Thread.sleep(overflowTimeout * 1000); + } else { throw e; } } @@ -523,14 +536,14 @@ public class SpillableMemoryChannel extends FileChannel { for (Event e : putList) { if (!memQueue.offer(e)) { throw new ChannelException("Unable to insert event into memory " + - "queue in spite of spare capacity, this is very unexpected"); + "queue in spite of spare capacity, this is very unexpected"); } } drainOrder.putPrimary(putList.size()); - maxMemQueueSize = (memQueue.size() > maxMemQueueSize) ? memQueue.size() - : maxMemQueueSize; + maxMemQueueSize = (memQueue.size() > maxMemQueueSize) ? memQueue.size() + : maxMemQueueSize; channelCounter.setChannelSize(memQueue.size() - + drainOrder.overflowCounter); + + drainOrder.overflowCounter); } // update counters and semaphores totalStored.release(putList.size()); @@ -540,10 +553,10 @@ public class SpillableMemoryChannel extends FileChannel { @Override protected void doRollback() { LOGGER.debug("Rollback() of " + - (takeCalled ? " Take Tx" : (putCalled ? " Put Tx" : "Empty Tx"))); + (takeCalled ? " Take Tx" : (putCalled ? " Put Tx" : "Empty Tx"))); if (putCalled) { - if (overflowPutTx!=null) { + if (overflowPutTx != null) { overflowPutTx.rollback(); } if (!useOverflow) { @@ -552,8 +565,8 @@ public class SpillableMemoryChannel extends FileChannel { } putListByteCount = 0; } else if (takeCalled) { - synchronized(queueLock) { - if (overflowTakeTx!=null) { + synchronized (queueLock) { + if (overflowTakeTx != null) { overflowTakeTx.rollback(); } if (useOverflow) { @@ -561,8 +574,8 @@ public class SpillableMemoryChannel extends FileChannel { } else { int remainingCapacity = memoryCapacity - memQueue.size(); Preconditions.checkState(remainingCapacity >= takeCount, - "Not enough space in memory queue to rollback takes. This" + - " should never happen, please report"); + "Not enough space in memory queue to rollback takes. This" + + " should never happen, please report"); while (!takeList.isEmpty()) { memQueue.addFirst(takeList.removeLast()); } @@ -582,15 +595,18 @@ public class SpillableMemoryChannel extends FileChannel { * <li>memoryCapacity = total number of events allowed at one time in the memory queue. * <li>overflowCapacity = total number of events allowed at one time in the overflow file channel. * <li>byteCapacity = the max number of bytes used for events in the memory queue. - * <li>byteCapacityBufferPercentage = type int. Defines the percent of buffer between byteCapacity and the estimated event size. - * <li>overflowTimeout = type int. Number of seconds to wait on a full memory before deciding to enable overflow + * <li>byteCapacityBufferPercentage = type int. Defines the percent of buffer between byteCapacity + * and the estimated event size. + * <li>overflowTimeout = type int. Number of seconds to wait on a full memory before deciding to + * enable overflow */ @Override public void configure(Context context) { - if (getLifecycleState() == LifecycleState.START // does not support reconfig when running - || getLifecycleState() == LifecycleState.ERROR) + if (getLifecycleState() == LifecycleState.START || // does not support reconfig when running + getLifecycleState() == LifecycleState.ERROR) { stop(); + } if (totalStored == null) { totalStored = new Semaphore(0); @@ -603,8 +619,7 @@ public class SpillableMemoryChannel extends FileChannel { // 1) Memory Capacity Integer newMemoryCapacity; try { - newMemoryCapacity = context.getInteger(MEMORY_CAPACITY - , defaultMemoryCapacity); + newMemoryCapacity = context.getInteger(MEMORY_CAPACITY, defaultMemoryCapacity); if (newMemoryCapacity == null) { newMemoryCapacity = defaultMemoryCapacity; } @@ -612,7 +627,7 @@ public class SpillableMemoryChannel extends FileChannel { throw new NumberFormatException(MEMORY_CAPACITY + " must be >= 0"); } - } catch(NumberFormatException e) { + } catch (NumberFormatException e) { newMemoryCapacity = defaultMemoryCapacity; LOGGER.warn("Invalid " + MEMORY_CAPACITY + " specified, initializing " + getName() + " channel to default value of {}", defaultMemoryCapacity); @@ -626,60 +641,60 @@ public class SpillableMemoryChannel extends FileChannel { // overflowTimeout - wait time before switching to overflow when mem is full try { Integer newOverflowTimeout = - context.getInteger(OVERFLOW_TIMEOUT, defaultOverflowTimeout); + context.getInteger(OVERFLOW_TIMEOUT, defaultOverflowTimeout); overflowTimeout = (newOverflowTimeout != null) ? newOverflowTimeout - : defaultOverflowTimeout; - } catch(NumberFormatException e) { + : defaultOverflowTimeout; + } catch (NumberFormatException e) { LOGGER.warn("Incorrect value for " + getName() + "'s " + OVERFLOW_TIMEOUT - + " setting. Using default value {}", defaultOverflowTimeout); + + " setting. Using default value {}", defaultOverflowTimeout); overflowTimeout = defaultOverflowTimeout; } try { Integer newThreshold = context.getInteger(OVERFLOW_DEACTIVATION_THRESHOLD); - overflowDeactivationThreshold = (newThreshold != null) ? - newThreshold/100.0 - : defaultOverflowDeactivationThreshold / 100.0; - } catch(NumberFormatException e) { + overflowDeactivationThreshold = (newThreshold != null) ? + newThreshold / 100.0 + : defaultOverflowDeactivationThreshold / 100.0; + } catch (NumberFormatException e) { LOGGER.warn("Incorrect value for " + getName() + "'s " + OVERFLOW_DEACTIVATION_THRESHOLD + ". Using default value {} %", - defaultOverflowDeactivationThreshold); + defaultOverflowDeactivationThreshold); overflowDeactivationThreshold = defaultOverflowDeactivationThreshold / 100.0; } // 3) Memory consumption control try { byteCapacityBufferPercentage = - context.getInteger(BYTE_CAPACITY_BUFFER_PERCENTAGE - , defaultByteCapacityBufferPercentage); - } catch(NumberFormatException e) { + context.getInteger(BYTE_CAPACITY_BUFFER_PERCENTAGE, defaultByteCapacityBufferPercentage); + } catch (NumberFormatException e) { LOGGER.warn("Error parsing " + BYTE_CAPACITY_BUFFER_PERCENTAGE + " for " - + getName() + ". Using default=" - + defaultByteCapacityBufferPercentage + ". " + e.getMessage()); + + getName() + ". Using default=" + + defaultByteCapacityBufferPercentage + ". " + e.getMessage()); byteCapacityBufferPercentage = defaultByteCapacityBufferPercentage; } try { avgEventSize = context.getInteger(AVG_EVENT_SIZE, defaultAvgEventSize); - } catch ( NumberFormatException e) { + } catch (NumberFormatException e) { LOGGER.warn("Error parsing " + AVG_EVENT_SIZE + " for " + getName() - + ". Using default = " + defaultAvgEventSize + ". " - + e.getMessage()); + + ". Using default = " + defaultAvgEventSize + ". " + + e.getMessage()); avgEventSize = defaultAvgEventSize; } try { - byteCapacity = (int) ((context.getLong(BYTE_CAPACITY, defaultByteCapacity) * (1 - byteCapacityBufferPercentage * .01 )) / avgEventSize); + byteCapacity = (int) ((context.getLong(BYTE_CAPACITY, defaultByteCapacity) * + (1 - byteCapacityBufferPercentage * .01)) / avgEventSize); if (byteCapacity < 1) { byteCapacity = Integer.MAX_VALUE; } - } catch(NumberFormatException e) { + } catch (NumberFormatException e) { LOGGER.warn("Error parsing " + BYTE_CAPACITY + " setting for " + getName() - + ". Using default = " + defaultByteCapacity + ". " - + e.getMessage()); + + ". Using default = " + defaultByteCapacity + ". " + + e.getMessage()); byteCapacity = (int) - ( (defaultByteCapacity * (1 - byteCapacityBufferPercentage * .01 )) - / avgEventSize); + ((defaultByteCapacity * (1 - byteCapacityBufferPercentage * .01)) + / avgEventSize); } @@ -692,8 +707,10 @@ public class SpillableMemoryChannel extends FileChannel { lastByteCapacity = byteCapacity; } else { try { - if (!bytesRemaining.tryAcquire(lastByteCapacity - byteCapacity, overflowTimeout, TimeUnit.SECONDS)) { - LOGGER.warn("Couldn't acquire permits to downsize the byte capacity, resizing has been aborted"); + if (!bytesRemaining.tryAcquire(lastByteCapacity - byteCapacity, + overflowTimeout, TimeUnit.SECONDS)) { + LOGGER.warn("Couldn't acquire permits to downsize the byte capacity, " + + "resizing has been aborted"); } else { lastByteCapacity = byteCapacity; } @@ -704,51 +721,53 @@ public class SpillableMemoryChannel extends FileChannel { } try { - overflowCapacity = context.getInteger(OVERFLOW_CAPACITY, defaultOverflowCapacity); // file channel capacity + // file channel capacity + overflowCapacity = context.getInteger(OVERFLOW_CAPACITY, defaultOverflowCapacity); // Determine if File channel needs to be disabled - if ( memoryCapacity < 1 && overflowCapacity < 1) { - LOGGER.warn("For channel " + getName() + OVERFLOW_CAPACITY + - " cannot be set to 0 if " + MEMORY_CAPACITY + " is also 0. " + - "Using default value " + OVERFLOW_CAPACITY + " = " + - defaultOverflowCapacity); - overflowCapacity = defaultOverflowCapacity; - } - overflowDisabled = (overflowCapacity < 1) ; - if (overflowDisabled) { - overflowActivated = false; - } - } catch(NumberFormatException e) { + if (memoryCapacity < 1 && overflowCapacity < 1) { + LOGGER.warn("For channel " + getName() + OVERFLOW_CAPACITY + + " cannot be set to 0 if " + MEMORY_CAPACITY + " is also 0. " + + "Using default value " + OVERFLOW_CAPACITY + " = " + + defaultOverflowCapacity); + overflowCapacity = defaultOverflowCapacity; + } + overflowDisabled = (overflowCapacity < 1); + if (overflowDisabled) { + overflowActivated = false; + } + } catch (NumberFormatException e) { overflowCapacity = defaultOverflowCapacity; } // Configure File channel - context.put(KEEP_ALIVE,"0"); // override keep-alive for File channel - context.put(CAPACITY, Integer.toString(overflowCapacity) ); // file channel capacity + context.put(KEEP_ALIVE, "0"); // override keep-alive for File channel + context.put(CAPACITY, Integer.toString(overflowCapacity)); // file channel capacity super.configure(context); } private void resizePrimaryQueue(int newMemoryCapacity) throws InterruptedException { - if (memQueue != null && memoryCapacity == newMemoryCapacity) { + if (memQueue != null && memoryCapacity == newMemoryCapacity) { return; } if (memoryCapacity > newMemoryCapacity) { int diff = memoryCapacity - newMemoryCapacity; if (!memQueRemaining.tryAcquire(diff, overflowTimeout, TimeUnit.SECONDS)) { - LOGGER.warn("Memory buffer currently contains more events than the new size. Downsizing has been aborted."); + LOGGER.warn("Memory buffer currently contains more events than the new size. " + + "Downsizing has been aborted."); return; } - synchronized(queueLock) { + synchronized (queueLock) { ArrayDeque<Event> newQueue = new ArrayDeque<Event>(newMemoryCapacity); newQueue.addAll(memQueue); memQueue = newQueue; memoryCapacity = newMemoryCapacity; } - } else { // if (memoryCapacity <= newMemoryCapacity) - synchronized(queueLock) { + } else { // if (memoryCapacity <= newMemoryCapacity) + synchronized (queueLock) { ArrayDeque<Event> newQueue = new ArrayDeque<Event>(newMemoryCapacity); - if (memQueue !=null) { + if (memQueue != null) { newQueue.addAll(memQueue); } memQueue = newQueue; @@ -771,14 +790,14 @@ public class SpillableMemoryChannel extends FileChannel { drainOrder.putOverflow(overFlowCount); totalStored.release(overFlowCount); } - int totalCount = overFlowCount + memQueue.size(); + int totalCount = overFlowCount + memQueue.size(); channelCounter.setChannelCapacity(memoryCapacity + getOverflowCapacity()); channelCounter.setChannelSize(totalCount); } @Override public synchronized void stop() { - if (getLifecycleState()==LifecycleState.STOP) { + if (getLifecycleState() == LifecycleState.STOP) { return; } channelCounter.setChannelSize(memQueue.size() + drainOrder.overflowCounter); http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-clients/flume-ng-log4jappender/src/main/java/org/apache/flume/clients/log4jappender/LoadBalancingLog4jAppender.java ---------------------------------------------------------------------- diff --git a/flume-ng-clients/flume-ng-log4jappender/src/main/java/org/apache/flume/clients/log4jappender/LoadBalancingLog4jAppender.java b/flume-ng-clients/flume-ng-log4jappender/src/main/java/org/apache/flume/clients/log4jappender/LoadBalancingLog4jAppender.java index 713234f..ae31916 100644 --- a/flume-ng-clients/flume-ng-log4jappender/src/main/java/org/apache/flume/clients/log4jappender/LoadBalancingLog4jAppender.java +++ b/flume-ng-clients/flume-ng-log4jappender/src/main/java/org/apache/flume/clients/log4jappender/LoadBalancingLog4jAppender.java @@ -99,11 +99,11 @@ public class LoadBalancingLog4jAppender extends Log4jAppender { @Override public synchronized void append(LoggingEvent event) { - if(!configured) { + if (!configured) { String errorMsg = "Flume Log4jAppender not configured correctly! Cannot" + - " send events to Flume."; + " send events to Flume."; LogLog.error(errorMsg); - if(getUnsafeMode()) { + if (getUnsafeMode()) { return; } throw new FlumeException(errorMsg); @@ -121,10 +121,9 @@ public class LoadBalancingLog4jAppender extends Log4jAppender { @Override public void activateOptions() throws FlumeException { try { - final Properties properties = getProperties(hosts, selector, - maxBackoff, getTimeout()); + final Properties properties = getProperties(hosts, selector, maxBackoff, getTimeout()); rpcClient = RpcClientFactory.getInstance(properties); - if(layout != null) { + if (layout != null) { layout.activateOptions(); } configured = true; @@ -169,14 +168,13 @@ public class LoadBalancingLog4jAppender extends Log4jAppender { throw new FlumeException( "Misconfigured max backoff, value must be greater than 0"); } - props.put(RpcClientConfigurationConstants.CONFIG_BACKOFF, - String.valueOf(true)); + props.put(RpcClientConfigurationConstants.CONFIG_BACKOFF, String.valueOf(true)); props.put(RpcClientConfigurationConstants.CONFIG_MAX_BACKOFF, maxBackoff); } props.setProperty(RpcClientConfigurationConstants.CONFIG_CONNECT_TIMEOUT, - String.valueOf(timeout)); + String.valueOf(timeout)); props.setProperty(RpcClientConfigurationConstants.CONFIG_REQUEST_TIMEOUT, - String.valueOf(timeout)); + String.valueOf(timeout)); return props; } } http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-clients/flume-ng-log4jappender/src/main/java/org/apache/flume/clients/log4jappender/Log4jAppender.java ---------------------------------------------------------------------- diff --git a/flume-ng-clients/flume-ng-log4jappender/src/main/java/org/apache/flume/clients/log4jappender/Log4jAppender.java b/flume-ng-clients/flume-ng-log4jappender/src/main/java/org/apache/flume/clients/log4jappender/Log4jAppender.java index 7c483db..f9803e4 100644 --- a/flume-ng-clients/flume-ng-log4jappender/src/main/java/org/apache/flume/clients/log4jappender/Log4jAppender.java +++ b/flume-ng-clients/flume-ng-log4jappender/src/main/java/org/apache/flume/clients/log4jappender/Log4jAppender.java @@ -75,8 +75,7 @@ public class Log4jAppender extends AppenderSkeleton { private String hostname; private int port; private boolean unsafeMode = false; - private long timeout = RpcClientConfigurationConstants - .DEFAULT_REQUEST_TIMEOUT_MILLIS; + private long timeout = RpcClientConfigurationConstants.DEFAULT_REQUEST_TIMEOUT_MILLIS; private boolean avroReflectionEnabled; private String avroSchemaUrl; @@ -99,7 +98,7 @@ public class Log4jAppender extends AppenderSkeleton { * @param port The port to connect on the host. * */ - public Log4jAppender(String hostname, int port){ + public Log4jAppender(String hostname, int port) { this.hostname = hostname; this.port = port; } @@ -112,14 +111,14 @@ public class Log4jAppender extends AppenderSkeleton { * was a connection error. */ @Override - public synchronized void append(LoggingEvent event) throws FlumeException{ + public synchronized void append(LoggingEvent event) throws FlumeException { //If rpcClient is null, it means either this appender object was never //setup by setting hostname and port and then calling activateOptions //or this appender object was closed by calling close(), so we throw an //exception to show the appender is no longer accessible. if (rpcClient == null) { String errorMsg = "Cannot Append to Appender! Appender either closed or" + - " not setup correctly!"; + " not setup correctly!"; LogLog.error(errorMsg); if (unsafeMode) { return; @@ -127,7 +126,7 @@ public class Log4jAppender extends AppenderSkeleton { throw new FlumeException(errorMsg); } - if(!rpcClient.isActive()){ + if (!rpcClient.isActive()) { reconnect(); } @@ -231,7 +230,7 @@ public class Log4jAppender extends AppenderSkeleton { } else { String errorMsg = "Flume log4jappender already closed!"; LogLog.error(errorMsg); - if(unsafeMode) { + if (unsafeMode) { return; } throw new FlumeException(errorMsg); @@ -251,7 +250,7 @@ public class Log4jAppender extends AppenderSkeleton { * Set the first flume hop hostname. * @param hostname The first hop where the client should connect to. */ - public void setHostname(String hostname){ + public void setHostname(String hostname) { this.hostname = hostname; } @@ -259,7 +258,7 @@ public class Log4jAppender extends AppenderSkeleton { * Set the port on the hostname to connect to. * @param port The port to connect on the host. */ - public void setPort(int port){ + public void setPort(int port) { this.port = port; } @@ -299,19 +298,18 @@ public class Log4jAppender extends AppenderSkeleton { Properties props = new Properties(); props.setProperty(RpcClientConfigurationConstants.CONFIG_HOSTS, "h1"); props.setProperty(RpcClientConfigurationConstants.CONFIG_HOSTS_PREFIX + "h1", - hostname + ":" + port); + hostname + ":" + port); props.setProperty(RpcClientConfigurationConstants.CONFIG_CONNECT_TIMEOUT, - String.valueOf(timeout)); + String.valueOf(timeout)); props.setProperty(RpcClientConfigurationConstants.CONFIG_REQUEST_TIMEOUT, - String.valueOf(timeout)); + String.valueOf(timeout)); try { rpcClient = RpcClientFactory.getInstance(props); if (layout != null) { layout.activateOptions(); } } catch (FlumeException e) { - String errormsg = "RPC client creation failed! " + - e.getMessage(); + String errormsg = "RPC client creation failed! " + e.getMessage(); LogLog.error(errormsg); if (unsafeMode) { return; http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-clients/flume-ng-log4jappender/src/main/java/org/apache/flume/clients/log4jappender/Log4jAvroHeaders.java ---------------------------------------------------------------------- diff --git a/flume-ng-clients/flume-ng-log4jappender/src/main/java/org/apache/flume/clients/log4jappender/Log4jAvroHeaders.java b/flume-ng-clients/flume-ng-log4jappender/src/main/java/org/apache/flume/clients/log4jappender/Log4jAvroHeaders.java index b68e749..22983d3 100644 --- a/flume-ng-clients/flume-ng-log4jappender/src/main/java/org/apache/flume/clients/log4jappender/Log4jAvroHeaders.java +++ b/flume-ng-clients/flume-ng-log4jappender/src/main/java/org/apache/flume/clients/log4jappender/Log4jAvroHeaders.java @@ -30,24 +30,23 @@ public enum Log4jAvroHeaders { AVRO_SCHEMA_URL("flume.avro.schema.url"); private String headerName; - private Log4jAvroHeaders(String headerName){ + private Log4jAvroHeaders(String headerName) { this.headerName = headerName; } - public String getName(){ + public String getName() { return headerName; } - public String toString(){ + public String toString() { return getName(); } - public static Log4jAvroHeaders getByName(String headerName){ + public static Log4jAvroHeaders getByName(String headerName) { Log4jAvroHeaders hdrs = null; - try{ + try { hdrs = Log4jAvroHeaders.valueOf(headerName.toLowerCase(Locale.ENGLISH).trim()); - } - catch(IllegalArgumentException e){ + } catch (IllegalArgumentException e) { hdrs = Log4jAvroHeaders.OTHER; } return hdrs; http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-configuration/src/main/java/org/apache/flume/Context.java ---------------------------------------------------------------------- diff --git a/flume-ng-configuration/src/main/java/org/apache/flume/Context.java b/flume-ng-configuration/src/main/java/org/apache/flume/Context.java index c0460d2..f00b571 100644 --- a/flume-ng-configuration/src/main/java/org/apache/flume/Context.java +++ b/flume-ng-configuration/src/main/java/org/apache/flume/Context.java @@ -86,7 +86,7 @@ public class Context { Preconditions.checkArgument(prefix.endsWith("."), "The given prefix does not end with a period (" + prefix + ")"); Map<String, String> result = Maps.newHashMap(); - synchronized(parameters) { + synchronized (parameters) { for (String key : parameters.keySet()) { if (key.startsWith(prefix)) { String name = key.substring(prefix.length()); @@ -129,7 +129,7 @@ public class Context { */ public Boolean getBoolean(String key, Boolean defaultValue) { String value = get(key); - if(value != null) { + if (value != null) { return Boolean.parseBoolean(value.trim()); } return defaultValue; @@ -158,7 +158,7 @@ public class Context { */ public Integer getInteger(String key, Integer defaultValue) { String value = get(key); - if(value != null) { + if (value != null) { return Integer.parseInt(value.trim()); } return defaultValue; @@ -187,7 +187,7 @@ public class Context { */ public Long getLong(String key, Long defaultValue) { String value = get(key); - if(value != null) { + if (value != null) { return Long.parseLong(value.trim()); } return defaultValue; @@ -227,7 +227,7 @@ public class Context { } private String get(String key, String defaultValue) { String result = parameters.get(key); - if(result != null) { + if (result != null) { return result; } return defaultValue; http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-configuration/src/main/java/org/apache/flume/conf/BasicConfigurationConstants.java ---------------------------------------------------------------------- diff --git a/flume-ng-configuration/src/main/java/org/apache/flume/conf/BasicConfigurationConstants.java b/flume-ng-configuration/src/main/java/org/apache/flume/conf/BasicConfigurationConstants.java index d6aa33a..9089122 100644 --- a/flume-ng-configuration/src/main/java/org/apache/flume/conf/BasicConfigurationConstants.java +++ b/flume-ng-configuration/src/main/java/org/apache/flume/conf/BasicConfigurationConstants.java @@ -22,7 +22,6 @@ public final class BasicConfigurationConstants { public static final String CONFIG_SOURCES_PREFIX = CONFIG_SOURCES + "."; public static final String CONFIG_SOURCE_CHANNELSELECTOR_PREFIX = "selector."; - public static final String CONFIG_SINKS = "sinks"; public static final String CONFIG_SINKS_PREFIX = CONFIG_SINKS + "."; public static final String CONFIG_SINK_PROCESSOR_PREFIX = "processor."; http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-configuration/src/main/java/org/apache/flume/conf/ComponentConfiguration.java ---------------------------------------------------------------------- diff --git a/flume-ng-configuration/src/main/java/org/apache/flume/conf/ComponentConfiguration.java b/flume-ng-configuration/src/main/java/org/apache/flume/conf/ComponentConfiguration.java index 0e0614e..477a3e6 100644 --- a/flume-ng-configuration/src/main/java/org/apache/flume/conf/ComponentConfiguration.java +++ b/flume-ng-configuration/src/main/java/org/apache/flume/conf/ComponentConfiguration.java @@ -64,7 +64,7 @@ public abstract class ComponentConfiguration { failIfConfigured(); String confType = context.getString( BasicConfigurationConstants.CONFIG_TYPE); - if (confType != null && !confType.isEmpty()){ + if (confType != null && !confType.isEmpty()) { this.type = confType; } // Type can be set by child class constructors, so check if it was. @@ -74,12 +74,12 @@ public abstract class ComponentConfiguration { FlumeConfigurationErrorType.ATTRS_MISSING, ErrorOrWarning.ERROR)); throw new ConfigurationException( - "Component has no type. Cannot configure. "+ componentName); + "Component has no type. Cannot configure. " + componentName); } } protected void failIfConfigured() throws ConfigurationException { - if (configured){ + if (configured) { throw new ConfigurationException("Already configured component." + componentName); } @@ -134,12 +134,13 @@ public abstract class ComponentConfiguration { CHANNELSELECTOR("ChannelSelector"); private final String componentType; - private ComponentType(String type){ + + private ComponentType(String type) { componentType = type; } + public String getComponentType() { return componentType; } - } } http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-configuration/src/main/java/org/apache/flume/conf/ComponentConfigurationFactory.java ---------------------------------------------------------------------- diff --git a/flume-ng-configuration/src/main/java/org/apache/flume/conf/ComponentConfigurationFactory.java b/flume-ng-configuration/src/main/java/org/apache/flume/conf/ComponentConfigurationFactory.java index 0433c9c..16860c3 100644 --- a/flume-ng-configuration/src/main/java/org/apache/flume/conf/ComponentConfigurationFactory.java +++ b/flume-ng-configuration/src/main/java/org/apache/flume/conf/ComponentConfigurationFactory.java @@ -27,9 +27,9 @@ import org.apache.flume.conf.sink.SinkProcessorConfiguration.SinkProcessorConfig import org.apache.flume.conf.source.SourceConfiguration.SourceConfigurationType; public class ComponentConfigurationFactory { + @SuppressWarnings("unchecked") - public static ComponentConfiguration - create(String name, String type, ComponentType component) + public static ComponentConfiguration create(String name, String type, ComponentType component) throws ConfigurationException { Class<? extends ComponentConfiguration> confType = null; @@ -43,7 +43,7 @@ public class ComponentConfigurationFactory { } catch (Exception ignored) { try { type = type.toUpperCase(Locale.ENGLISH); - switch(component){ + switch (component) { case SOURCE: return SourceConfigurationType.valueOf(type.toUpperCase(Locale.ENGLISH)) .getConfiguration(name); @@ -63,8 +63,7 @@ public class ComponentConfigurationFactory { return new SinkGroupConfiguration(name); default: throw new ConfigurationException( - "Cannot create configuration. Unknown Type specified: " + - type); + "Cannot create configuration. Unknown Type specified: " + type); } } catch (ConfigurationException e) { throw e;
