http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-configuration/src/main/java/org/apache/flume/conf/FlumeConfiguration.java ---------------------------------------------------------------------- diff --git a/flume-ng-configuration/src/main/java/org/apache/flume/conf/FlumeConfiguration.java b/flume-ng-configuration/src/main/java/org/apache/flume/conf/FlumeConfiguration.java index 5997406..9b3a434 100644 --- a/flume-ng-configuration/src/main/java/org/apache/flume/conf/FlumeConfiguration.java +++ b/flume-ng-configuration/src/main/java/org/apache/flume/conf/FlumeConfiguration.java @@ -5,9 +5,9 @@ * licenses this file to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance with the License. * You may obtain a copy of the License at - * + * <p> * http://www.apache.org/licenses/LICENSE-2.0 - * + * <p> * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the @@ -16,6 +16,19 @@ */ package org.apache.flume.conf; +import org.apache.flume.Context; +import org.apache.flume.conf.ComponentConfiguration.ComponentType; +import org.apache.flume.conf.FlumeConfigurationError.ErrorOrWarning; +import org.apache.flume.conf.channel.ChannelConfiguration; +import org.apache.flume.conf.channel.ChannelType; +import org.apache.flume.conf.sink.SinkConfiguration; +import org.apache.flume.conf.sink.SinkGroupConfiguration; +import org.apache.flume.conf.sink.SinkType; +import org.apache.flume.conf.source.SourceConfiguration; +import org.apache.flume.conf.source.SourceType; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -30,19 +43,6 @@ import java.util.Properties; import java.util.Set; import java.util.StringTokenizer; -import org.apache.flume.Context; -import org.apache.flume.conf.ComponentConfiguration.ComponentType; -import org.apache.flume.conf.FlumeConfigurationError.ErrorOrWarning; -import org.apache.flume.conf.channel.ChannelConfiguration; -import org.apache.flume.conf.channel.ChannelType; -import org.apache.flume.conf.sink.SinkConfiguration; -import org.apache.flume.conf.sink.SinkGroupConfiguration; -import org.apache.flume.conf.sink.SinkType; -import org.apache.flume.conf.source.SourceConfiguration; -import org.apache.flume.conf.source.SourceType; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - /** * <p> * FlumeConfiguration is an in memory representation of the hierarchical @@ -58,14 +58,13 @@ import org.slf4j.LoggerFactory; */ public class FlumeConfiguration { - private static final Logger logger = LoggerFactory - .getLogger(FlumeConfiguration.class); + private static final Logger logger = LoggerFactory.getLogger(FlumeConfiguration.class); private final Map<String, AgentConfiguration> agentConfigMap; private final LinkedList<FlumeConfigurationError> errors; - public static final String NEWLINE = System.getProperty("line.separator", - "\n"); + public static final String NEWLINE = System.getProperty("line.separator", "\n"); public static final String INDENTSTEP = " "; + /** * Creates a populated Flume Configuration object. * @deprecated please use the other constructor @@ -75,7 +74,7 @@ public class FlumeConfiguration { agentConfigMap = new HashMap<String, AgentConfiguration>(); errors = new LinkedList<FlumeConfigurationError>(); // Construct the in-memory component hierarchy - for(Object name : properties.keySet()) { + for (Object name : properties.keySet()) { Object value = properties.get(name); if (!addRawProperty(name.toString(), value.toString())) { logger.warn("Configuration property ignored: " + name + " = " + value); @@ -86,8 +85,8 @@ public class FlumeConfiguration { // validate and remove improperly configured components validateConfiguration(); - } + /** * Creates a populated Flume Configuration object. */ @@ -95,7 +94,7 @@ public class FlumeConfiguration { agentConfigMap = new HashMap<String, AgentConfiguration>(); errors = new LinkedList<FlumeConfigurationError>(); // Construct the in-memory component hierarchy - for(String name : properties.keySet()) { + for (String name : properties.keySet()) { String value = properties.get(name); if (!addRawProperty(name, value)) { @@ -146,18 +145,18 @@ public class FlumeConfiguration { // Null names and values not supported if (name == null || value == null) { errors - .add(new FlumeConfigurationError("", "", - FlumeConfigurationErrorType.AGENT_NAME_MISSING, - ErrorOrWarning.ERROR)); + .add(new FlumeConfigurationError("", "", + FlumeConfigurationErrorType.AGENT_NAME_MISSING, + ErrorOrWarning.ERROR)); return false; } // Empty values are not supported if (value.trim().length() == 0) { errors - .add(new FlumeConfigurationError(name, "", - FlumeConfigurationErrorType.PROPERTY_VALUE_NULL, - ErrorOrWarning.ERROR)); + .add(new FlumeConfigurationError(name, "", + FlumeConfigurationErrorType.PROPERTY_VALUE_NULL, + ErrorOrWarning.ERROR)); return false; } @@ -170,9 +169,9 @@ public class FlumeConfiguration { // All configuration keys must have a prefix defined as agent name if (index == -1) { errors - .add(new FlumeConfigurationError(name, "", - FlumeConfigurationErrorType.AGENT_NAME_MISSING, - ErrorOrWarning.ERROR)); + .add(new FlumeConfigurationError(name, "", + FlumeConfigurationErrorType.AGENT_NAME_MISSING, + ErrorOrWarning.ERROR)); return false; } @@ -181,9 +180,9 @@ public class FlumeConfiguration { // Agent name must be specified for all properties if (agentName.length() == 0) { errors - .add(new FlumeConfigurationError(name, "", - FlumeConfigurationErrorType.AGENT_NAME_MISSING, - ErrorOrWarning.ERROR)); + .add(new FlumeConfigurationError(name, "", + FlumeConfigurationErrorType.AGENT_NAME_MISSING, + ErrorOrWarning.ERROR)); return false; } @@ -192,9 +191,9 @@ public class FlumeConfiguration { // Configuration key must be specified for every property if (configKey.length() == 0) { errors - .add(new FlumeConfigurationError(name, "", - FlumeConfigurationErrorType.PROPERTY_NAME_NULL, - ErrorOrWarning.ERROR)); + .add(new FlumeConfigurationError(name, "", + FlumeConfigurationErrorType.PROPERTY_NAME_NULL, + ErrorOrWarning.ERROR)); return false; } @@ -236,7 +235,7 @@ public class FlumeConfiguration { private final List<FlumeConfigurationError> errorList; private AgentConfiguration(String agentName, - List<FlumeConfigurationError> errorList) { + List<FlumeConfigurationError> errorList) { this.agentName = agentName; this.errorList = errorList; sourceConfigMap = new HashMap<String, ComponentConfiguration>(); @@ -411,9 +410,6 @@ public class FlumeConfiguration { /** * If it is a known component it will do the full validation required for * that component, else it will do the validation required for that class. - * - * @param channelSet - * @return */ private Set<String> validateChannels(Set<String> channelSet) { Iterator<String> iter = channelSet.iterator(); @@ -445,13 +441,12 @@ public class FlumeConfiguration { if (channelContext != null) { // Get the configuration object for the channel: ChannelType chType = getKnownChannel(channelContext.getString( - BasicConfigurationConstants.CONFIG_TYPE)); + BasicConfigurationConstants.CONFIG_TYPE)); boolean configSpecified = false; String config = null; // Not a known channel - cannot do specific validation to this channel if (chType == null) { - config = channelContext.getString - (BasicConfigurationConstants.CONFIG_CONFIG); + config = channelContext.getString(BasicConfigurationConstants.CONFIG_CONFIG); if (config == null || config.isEmpty()) { config = "OTHER"; } else { @@ -470,14 +465,15 @@ public class FlumeConfiguration { if (conf != null) { conf.configure(channelContext); } - if((configSpecified && conf.isNotFoundConfigClass()) || - !configSpecified){ + if ((configSpecified && conf.isNotFoundConfigClass()) || + !configSpecified) { newContextMap.put(channelName, channelContext); } else if (configSpecified) { channelConfigMap.put(channelName, conf); } - if (conf != null) + if (conf != null) { errorList.addAll(conf.getErrors()); + } } catch (ConfigurationException e) { // Could not configure channel - skip it. // No need to add to error list - already added before exception is @@ -504,7 +500,7 @@ public class FlumeConfiguration { private Set<String> validateSources(Set<String> channelSet) { //Arrays.split() call will throw NPE if the sources string is empty - if(sources == null || sources.isEmpty()){ + if (sources == null || sources.isEmpty()) { logger.warn("Agent configuration for '" + agentName + "' has no sources."); errorList.add(new FlumeConfigurationError(agentName, @@ -570,17 +566,17 @@ public class FlumeConfiguration { channels.addAll(srcConf.getChannels()); } channels.retainAll(channelSet); - if(channels.isEmpty()){ + if (channels.isEmpty()) { throw new ConfigurationException( - "No Channels configured for " + sourceName); + "No Channels configured for " + sourceName); } srcContext.put(BasicConfigurationConstants.CONFIG_CHANNELS, - this.getSpaceDelimitedList(channels)); + this.getSpaceDelimitedList(channels)); } if ((configSpecified && srcConf.isNotFoundConfigClass()) || !configSpecified) { newContextMap.put(sourceName, srcContext); - } else if (configSpecified){ + } else if (configSpecified) { sourceConfigMap.put(sourceName, srcConf); } if (srcConf != null) errorList.addAll(srcConf.getErrors()); @@ -664,7 +660,7 @@ public class FlumeConfiguration { BasicConfigurationConstants.CONFIG_CONFIG); if (config == null || config.isEmpty()) { config = "OTHER"; - } else{ + } else { configSpecified = true; } } else { @@ -681,9 +677,9 @@ public class FlumeConfiguration { sinkConf.configure(sinkContext); } - if(!channelSet.contains(sinkConf.getChannel())){ + if (!channelSet.contains(sinkConf.getChannel())) { throw new ConfigurationException("Channel " + - sinkConf.getChannel() + " not in active set."); + sinkConf.getChannel() + " not in active set."); } if ((configSpecified && sinkConf.isNotFoundConfigClass()) || !configSpecified) { @@ -753,15 +749,15 @@ public class FlumeConfiguration { if (conf != null) errorList.addAll(conf.getErrors()); throw new ConfigurationException( "No available sinks for sinkgroup: " + sinkgroupName - + ". Sinkgroup will be removed"); + + ". Sinkgroup will be removed"); } } catch (ConfigurationException e) { iter.remove(); errorList - .add(new FlumeConfigurationError(agentName, sinkgroupName, - FlumeConfigurationErrorType.CONFIG_ERROR, - ErrorOrWarning.ERROR)); + .add(new FlumeConfigurationError(agentName, sinkgroupName, + FlumeConfigurationErrorType.CONFIG_ERROR, + ErrorOrWarning.ERROR)); logger.warn("Could not configure sink group " + sinkgroupName + " due to: " + e.getMessage(), e); } @@ -791,10 +787,10 @@ public class FlumeConfiguration { * @return List of sinks available and reserved for group */ private Set<String> validGroupSinks(Set<String> sinkSet, - Map<String, String> usedSinks, SinkGroupConfiguration groupConf) { + Map<String, String> usedSinks, + SinkGroupConfiguration groupConf) { Set<String> groupSinks = - Collections - .synchronizedSet(new HashSet<String>(groupConf.getSinks())); + Collections.synchronizedSet(new HashSet<String>(groupConf.getSinks())); if (groupSinks.isEmpty()) return null; Iterator<String> sinkIt = groupSinks.iterator(); @@ -866,40 +862,40 @@ public class FlumeConfiguration { public String getPostvalidationConfig() { StringBuilder sb = new StringBuilder( "AgentConfiguration created without Configuration stubs " + - "for which only basic syntactical validation was performed["); + "for which only basic syntactical validation was performed["); sb.append(agentName).append("]").append(NEWLINE); - if(!sourceContextMap.isEmpty() || + if (!sourceContextMap.isEmpty() || !sinkContextMap.isEmpty() || !channelContextMap.isEmpty()) { - if(!sourceContextMap.isEmpty()){ + if (!sourceContextMap.isEmpty()) { sb.append("SOURCES: ").append(sourceContextMap).append(NEWLINE); } - if(!channelContextMap.isEmpty()){ + if (!channelContextMap.isEmpty()) { sb.append("CHANNELS: ").append(channelContextMap).append(NEWLINE); } - if(!sinkContextMap.isEmpty()){ + if (!sinkContextMap.isEmpty()) { sb.append("SINKS: ").append(sinkContextMap).append(NEWLINE); } } - if(!sourceConfigMap.isEmpty() || + if (!sourceConfigMap.isEmpty() || !sinkConfigMap.isEmpty() || !channelConfigMap.isEmpty()) { sb.append("AgentConfiguration created with Configuration stubs " + "for which full validation was performed["); sb.append(agentName).append("]").append(NEWLINE); - if(!sourceConfigMap.isEmpty()){ + if (!sourceConfigMap.isEmpty()) { sb.append("SOURCES: ").append(sourceConfigMap).append(NEWLINE); } - if(!channelConfigMap.isEmpty()){ + if (!channelConfigMap.isEmpty()) { sb.append("CHANNELS: ").append(channelConfigMap).append(NEWLINE); } - if(!sinkConfigMap.isEmpty()){ + if (!sinkConfigMap.isEmpty()) { sb.append("SINKS: ").append(sinkConfigMap).append(NEWLINE); } } @@ -915,7 +911,7 @@ public class FlumeConfiguration { return true; } else { logger - .warn("Duplicate source list specified for agent: " + agentName); + .warn("Duplicate source list specified for agent: " + agentName); errorList.add(new FlumeConfigurationError(agentName, BasicConfigurationConstants.CONFIG_SOURCES, FlumeConfigurationErrorType.DUPLICATE_PROPERTY, @@ -965,7 +961,7 @@ public class FlumeConfiguration { return true; } else { logger - .warn("Duplicate sinkgroup list specfied for agent: " + agentName); + .warn("Duplicate sinkgroup list specfied for agent: " + agentName); errorList.add(new FlumeConfigurationError(agentName, BasicConfigurationConstants.CONFIG_SINKGROUPS, FlumeConfigurationErrorType.DUPLICATE_PROPERTY,
http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-configuration/src/main/java/org/apache/flume/conf/FlumeConfigurationError.java ---------------------------------------------------------------------- diff --git a/flume-ng-configuration/src/main/java/org/apache/flume/conf/FlumeConfigurationError.java b/flume-ng-configuration/src/main/java/org/apache/flume/conf/FlumeConfigurationError.java index 94025a4..5fac11c 100644 --- a/flume-ng-configuration/src/main/java/org/apache/flume/conf/FlumeConfigurationError.java +++ b/flume-ng-configuration/src/main/java/org/apache/flume/conf/FlumeConfigurationError.java @@ -32,7 +32,7 @@ public class FlumeConfigurationError { public FlumeConfigurationError(String component, String key, FlumeConfigurationErrorType error, ErrorOrWarning err) { this.error = err; - if (component != null){ + if (component != null) { this.componentName = component; } else { this.componentName = ""; http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-configuration/src/main/java/org/apache/flume/conf/sink/SinkConfiguration.java ---------------------------------------------------------------------- diff --git a/flume-ng-configuration/src/main/java/org/apache/flume/conf/sink/SinkConfiguration.java b/flume-ng-configuration/src/main/java/org/apache/flume/conf/sink/SinkConfiguration.java index b34a367..1acd291 100644 --- a/flume-ng-configuration/src/main/java/org/apache/flume/conf/sink/SinkConfiguration.java +++ b/flume-ng-configuration/src/main/java/org/apache/flume/conf/sink/SinkConfiguration.java @@ -54,7 +54,7 @@ public class SinkConfiguration extends ComponentConfiguration { } @Override - public String toString(int indentCount){ + public String toString(int indentCount) { StringBuilder indentSb = new StringBuilder(""); for (int i = 0; i < indentCount; i++) { @@ -187,7 +187,7 @@ public class SinkConfiguration extends ComponentConfiguration { instance = new SinkConfiguration(name); // Let the caller know that this was created because of this exception. instance.setNotFoundConfigClass(); - } catch (Exception e){ + } catch (Exception e) { throw new ConfigurationException("Couldn't create configuration", e); } return instance; http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-configuration/src/main/java/org/apache/flume/conf/sink/SinkGroupConfiguration.java ---------------------------------------------------------------------- diff --git a/flume-ng-configuration/src/main/java/org/apache/flume/conf/sink/SinkGroupConfiguration.java b/flume-ng-configuration/src/main/java/org/apache/flume/conf/sink/SinkGroupConfiguration.java index 6b487e5..c3fc2bb 100644 --- a/flume-ng-configuration/src/main/java/org/apache/flume/conf/sink/SinkGroupConfiguration.java +++ b/flume-ng-configuration/src/main/java/org/apache/flume/conf/sink/SinkGroupConfiguration.java @@ -92,8 +92,7 @@ public class SinkGroupConfiguration extends ComponentConfiguration { for (SinkProcessorType value : values) { if (value.toString().equalsIgnoreCase(type)) return value; String sinkProcessClassName = value.getSinkProcessorClassName(); - if (sinkProcessClassName != null - && sinkProcessClassName.equalsIgnoreCase(type)){ + if (sinkProcessClassName != null && sinkProcessClassName.equalsIgnoreCase(type)) { return value; } } http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-configuration/src/main/java/org/apache/flume/conf/source/SourceConfiguration.java ---------------------------------------------------------------------- diff --git a/flume-ng-configuration/src/main/java/org/apache/flume/conf/source/SourceConfiguration.java b/flume-ng-configuration/src/main/java/org/apache/flume/conf/source/SourceConfiguration.java index 068bd69..8d7318e 100644 --- a/flume-ng-configuration/src/main/java/org/apache/flume/conf/source/SourceConfiguration.java +++ b/flume-ng-configuration/src/main/java/org/apache/flume/conf/source/SourceConfiguration.java @@ -113,11 +113,11 @@ public class SourceConfiguration extends ComponentConfiguration { } @Override - public String toString(int indentCount){ + public String toString(int indentCount) { String basicStr = super.toString(indentCount); StringBuilder sb = new StringBuilder(); sb.append(basicStr).append("CHANNELS:"); - for(String channel : this.channels){ + for (String channel : this.channels) { sb.append(FlumeConfiguration.INDENTSTEP).append( channel).append(FlumeConfiguration.NEWLINE); } http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-core/src/main/java/org/apache/flume/ChannelFactory.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/main/java/org/apache/flume/ChannelFactory.java b/flume-ng-core/src/main/java/org/apache/flume/ChannelFactory.java index 26af8e1..e08e4f3 100644 --- a/flume-ng-core/src/main/java/org/apache/flume/ChannelFactory.java +++ b/flume-ng-core/src/main/java/org/apache/flume/ChannelFactory.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -17,12 +17,9 @@ */ package org.apache.flume; - public interface ChannelFactory { - public Channel create(String name, String type) throws FlumeException; - - public Class<? extends Channel> getClass(String type) - throws FlumeException; + Channel create(String name, String type) throws FlumeException; + Class<? extends Channel> getClass(String type) throws FlumeException; } http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-core/src/main/java/org/apache/flume/ChannelSelector.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/main/java/org/apache/flume/ChannelSelector.java b/flume-ng-core/src/main/java/org/apache/flume/ChannelSelector.java index f86aec7..fba2dcb 100644 --- a/flume-ng-core/src/main/java/org/apache/flume/ChannelSelector.java +++ b/flume-ng-core/src/main/java/org/apache/flume/ChannelSelector.java @@ -21,7 +21,6 @@ package org.apache.flume; import java.util.List; import org.apache.flume.conf.Configurable; -import org.apache.flume.conf.ConfigurableComponent; /** * <p> http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-core/src/main/java/org/apache/flume/Clock.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/main/java/org/apache/flume/Clock.java b/flume-ng-core/src/main/java/org/apache/flume/Clock.java index fc719bc..119df48 100644 --- a/flume-ng-core/src/main/java/org/apache/flume/Clock.java +++ b/flume-ng-core/src/main/java/org/apache/flume/Clock.java @@ -22,7 +22,5 @@ package org.apache.flume; * Facade for System.currentTimeMillis for Testing */ public interface Clock { - - public long currentTimeMillis(); - + long currentTimeMillis(); } http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-core/src/main/java/org/apache/flume/SinkFactory.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/main/java/org/apache/flume/SinkFactory.java b/flume-ng-core/src/main/java/org/apache/flume/SinkFactory.java index e9e52a8..db4a49b 100644 --- a/flume-ng-core/src/main/java/org/apache/flume/SinkFactory.java +++ b/flume-ng-core/src/main/java/org/apache/flume/SinkFactory.java @@ -19,12 +19,10 @@ package org.apache.flume; - public interface SinkFactory { - - public Sink create(String name, String type) + Sink create(String name, String type) throws FlumeException; - public Class<? extends Sink> getClass(String type) - throws FlumeException; + Class<? extends Sink> getClass(String type) + throws FlumeException; } http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-core/src/main/java/org/apache/flume/SinkRunner.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/main/java/org/apache/flume/SinkRunner.java b/flume-ng-core/src/main/java/org/apache/flume/SinkRunner.java index c353d1f..f513beb 100644 --- a/flume-ng-core/src/main/java/org/apache/flume/SinkRunner.java +++ b/flume-ng-core/src/main/java/org/apache/flume/SinkRunner.java @@ -104,10 +104,8 @@ public class SinkRunner implements LifecycleAware { logger.debug("Waiting for runner thread to exit"); runnerThread.join(500); } catch (InterruptedException e) { - logger - .debug( - "Interrupted while waiting for runner thread to exit. Exception follows.", - e); + logger.debug("Interrupted while waiting for runner thread to exit. Exception follows.", + e); } } } http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-core/src/main/java/org/apache/flume/SourceFactory.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/main/java/org/apache/flume/SourceFactory.java b/flume-ng-core/src/main/java/org/apache/flume/SourceFactory.java index e147410..946cc37 100644 --- a/flume-ng-core/src/main/java/org/apache/flume/SourceFactory.java +++ b/flume-ng-core/src/main/java/org/apache/flume/SourceFactory.java @@ -19,12 +19,11 @@ package org.apache.flume; - public interface SourceFactory { - public Source create(String sourceName, String type) + Source create(String sourceName, String type) throws FlumeException; - public Class<? extends Source> getClass(String type) - throws FlumeException; + Class<? extends Source> getClass(String type) + throws FlumeException; } http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-core/src/main/java/org/apache/flume/SourceRunner.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/main/java/org/apache/flume/SourceRunner.java b/flume-ng-core/src/main/java/org/apache/flume/SourceRunner.java index 3246151..2687287 100644 --- a/flume-ng-core/src/main/java/org/apache/flume/SourceRunner.java +++ b/flume-ng-core/src/main/java/org/apache/flume/SourceRunner.java @@ -28,7 +28,7 @@ import org.apache.flume.source.PollableSourceRunner; * * This is an abstract class used for instantiating derived classes. */ -abstract public class SourceRunner implements LifecycleAware { +public abstract class SourceRunner implements LifecycleAware { private Source source; http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-core/src/main/java/org/apache/flume/SystemClock.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/main/java/org/apache/flume/SystemClock.java b/flume-ng-core/src/main/java/org/apache/flume/SystemClock.java index f176807..4e326cc 100644 --- a/flume-ng-core/src/main/java/org/apache/flume/SystemClock.java +++ b/flume-ng-core/src/main/java/org/apache/flume/SystemClock.java @@ -23,8 +23,8 @@ package org.apache.flume; */ public class SystemClock implements Clock { - public long currentTimeMillis() { - return System.currentTimeMillis(); - } + public long currentTimeMillis() { + return System.currentTimeMillis(); + } } http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-core/src/main/java/org/apache/flume/Transaction.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/main/java/org/apache/flume/Transaction.java b/flume-ng-core/src/main/java/org/apache/flume/Transaction.java index 24f12a3..ffb07bd 100644 --- a/flume-ng-core/src/main/java/org/apache/flume/Transaction.java +++ b/flume-ng-core/src/main/java/org/apache/flume/Transaction.java @@ -52,7 +52,8 @@ import org.apache.flume.channel.BasicTransactionSemantics; */ public interface Transaction { -public enum TransactionState {Started, Committed, RolledBack, Closed }; + enum TransactionState { Started, Committed, RolledBack, Closed } + /** * <p>Starts a transaction boundary for the current channel operation. If a * transaction is already in progress, this method will join that transaction @@ -62,19 +63,19 @@ public enum TransactionState {Started, Committed, RolledBack, Closed }; * to ensure this can lead to dangling transactions and unpredictable results. * </p> */ - public void begin(); + void begin(); /** * Indicates that the transaction can be successfully committed. It is * required that a transaction be in progress when this method is invoked. */ - public void commit(); + void commit(); /** * Indicates that the transaction can must be aborted. It is * required that a transaction be in progress when this method is invoked. */ - public void rollback(); + void rollback(); /** * <p>Ends a transaction boundary for the current channel operation. If a @@ -86,5 +87,5 @@ public enum TransactionState {Started, Committed, RolledBack, Closed }; * to ensure this can lead to dangling transactions and unpredictable results. * </p> */ - public void close(); + void close(); } http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-core/src/main/java/org/apache/flume/annotations/Disposable.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/main/java/org/apache/flume/annotations/Disposable.java b/flume-ng-core/src/main/java/org/apache/flume/annotations/Disposable.java index d8db82c..10b1884 100644 --- a/flume-ng-core/src/main/java/org/apache/flume/annotations/Disposable.java +++ b/flume-ng-core/src/main/java/org/apache/flume/annotations/Disposable.java @@ -18,8 +18,10 @@ */ package org.apache.flume.annotations; + import java.lang.annotation.Target; import java.lang.annotation.Retention; + import static java.lang.annotation.RetentionPolicy.RUNTIME; import static java.lang.annotation.ElementType.TYPE; http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-core/src/main/java/org/apache/flume/annotations/InterfaceStability.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/main/java/org/apache/flume/annotations/InterfaceStability.java b/flume-ng-core/src/main/java/org/apache/flume/annotations/InterfaceStability.java index e946ac7..70bc347 100644 --- a/flume-ng-core/src/main/java/org/apache/flume/annotations/InterfaceStability.java +++ b/flume-ng-core/src/main/java/org/apache/flume/annotations/InterfaceStability.java @@ -29,6 +29,7 @@ import java.lang.annotation.Documented; * <li>Classes that are {@link Private} are to be considered unstable unless * a different InterfaceStability annotation states otherwise.</li> * <li>Incompatible changes must not be made to classes marked as stable.</li> + * </ul> */ @InterfaceAudience.Public @InterfaceStability.Evolving http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-core/src/main/java/org/apache/flume/annotations/Recyclable.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/main/java/org/apache/flume/annotations/Recyclable.java b/flume-ng-core/src/main/java/org/apache/flume/annotations/Recyclable.java index a732c83..bc9ed0e 100644 --- a/flume-ng-core/src/main/java/org/apache/flume/annotations/Recyclable.java +++ b/flume-ng-core/src/main/java/org/apache/flume/annotations/Recyclable.java @@ -18,8 +18,10 @@ */ package org.apache.flume.annotations; + import java.lang.annotation.Target; import java.lang.annotation.Retention; + import static java.lang.annotation.RetentionPolicy.RUNTIME; import static java.lang.annotation.ElementType.TYPE; http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-core/src/main/java/org/apache/flume/channel/AbstractChannel.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/main/java/org/apache/flume/channel/AbstractChannel.java b/flume-ng-core/src/main/java/org/apache/flume/channel/AbstractChannel.java index b991650..750cd91 100644 --- a/flume-ng-core/src/main/java/org/apache/flume/channel/AbstractChannel.java +++ b/flume-ng-core/src/main/java/org/apache/flume/channel/AbstractChannel.java @@ -67,12 +67,10 @@ public abstract class AbstractChannel } @Override - public void configure(Context context) { - - } + public void configure(Context context) {} public String toString() { - return this.getClass().getName() + "{name: " + name + "}"; + return this.getClass().getName() + "{name: " + name + "}"; } } http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-core/src/main/java/org/apache/flume/channel/AbstractChannelSelector.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/main/java/org/apache/flume/channel/AbstractChannelSelector.java b/flume-ng-core/src/main/java/org/apache/flume/channel/AbstractChannelSelector.java index d69087f..3b1e6e0 100644 --- a/flume-ng-core/src/main/java/org/apache/flume/channel/AbstractChannelSelector.java +++ b/flume-ng-core/src/main/java/org/apache/flume/channel/AbstractChannelSelector.java @@ -73,7 +73,7 @@ public abstract class AbstractChannelSelector implements ChannelSelector { protected List<Channel> getChannelListFromNames(String channels, Map<String, Channel> channelNameMap) { List<Channel> configuredChannels = new ArrayList<Channel>(); - if(channels == null || channels.isEmpty()) { + if (channels == null || channels.isEmpty()) { return configuredChannels; } String[] chNames = channels.split(" "); http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-core/src/main/java/org/apache/flume/channel/BasicTransactionSemantics.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/main/java/org/apache/flume/channel/BasicTransactionSemantics.java b/flume-ng-core/src/main/java/org/apache/flume/channel/BasicTransactionSemantics.java index 403cbca..20231f4 100644 --- a/flume-ng-core/src/main/java/org/apache/flume/channel/BasicTransactionSemantics.java +++ b/flume-ng-core/src/main/java/org/apache/flume/channel/BasicTransactionSemantics.java @@ -210,6 +210,7 @@ public abstract class BasicTransactionSemantics implements Transaction { * perform any further operations beyond closing it.</dd> * <dt>CLOSED</dt> * <dd>A closed transaction. No further operations are permitted.</dd> + * </dl> */ protected static enum State { NEW, OPEN, COMPLETED, CLOSED http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-core/src/main/java/org/apache/flume/channel/ChannelSelectorFactory.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/main/java/org/apache/flume/channel/ChannelSelectorFactory.java b/flume-ng-core/src/main/java/org/apache/flume/channel/ChannelSelectorFactory.java index 80b1453..fac6271 100644 --- a/flume-ng-core/src/main/java/org/apache/flume/channel/ChannelSelectorFactory.java +++ b/flume-ng-core/src/main/java/org/apache/flume/channel/ChannelSelectorFactory.java @@ -56,7 +56,7 @@ public class ChannelSelectorFactory { public static ChannelSelector create(List<Channel> channels, ChannelSelectorConfiguration conf) { String type = ChannelSelectorType.REPLICATING.toString(); - if (conf != null){ + if (conf != null) { type = conf.getType(); } ChannelSelector selector = getSelectorForType(type); http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-core/src/main/java/org/apache/flume/channel/DefaultChannelFactory.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/main/java/org/apache/flume/channel/DefaultChannelFactory.java b/flume-ng-core/src/main/java/org/apache/flume/channel/DefaultChannelFactory.java index 1dd124e..640fe21 100644 --- a/flume-ng-core/src/main/java/org/apache/flume/channel/DefaultChannelFactory.java +++ b/flume-ng-core/src/main/java/org/apache/flume/channel/DefaultChannelFactory.java @@ -51,8 +51,7 @@ public class DefaultChannelFactory implements ChannelFactory { @SuppressWarnings("unchecked") @Override - public Class<? extends Channel> getClass(String type) - throws FlumeException { + public Class<? extends Channel> getClass(String type) throws FlumeException { String channelClassName = type; ChannelType channelType = ChannelType.OTHER; try { http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-core/src/main/java/org/apache/flume/channel/MemoryChannel.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/main/java/org/apache/flume/channel/MemoryChannel.java b/flume-ng-core/src/main/java/org/apache/flume/channel/MemoryChannel.java index 6575d10..4393783 100644 --- a/flume-ng-core/src/main/java/org/apache/flume/channel/MemoryChannel.java +++ b/flume-ng-core/src/main/java/org/apache/flume/channel/MemoryChannel.java @@ -18,12 +18,7 @@ */ package org.apache.flume.channel; -import java.util.concurrent.LinkedBlockingDeque; -import java.util.concurrent.Semaphore; -import java.util.concurrent.TimeUnit; - -import javax.annotation.concurrent.GuardedBy; - +import com.google.common.base.Preconditions; import org.apache.flume.ChannelException; import org.apache.flume.ChannelFullException; import org.apache.flume.Context; @@ -35,7 +30,10 @@ import org.apache.flume.instrumentation.ChannelCounter; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.google.common.base.Preconditions; +import javax.annotation.concurrent.GuardedBy; +import java.util.concurrent.LinkedBlockingDeque; +import java.util.concurrent.Semaphore; +import java.util.concurrent.TimeUnit; /** * <p> @@ -78,11 +76,11 @@ public class MemoryChannel extends BasicChannelSemantics { @Override protected void doPut(Event event) throws InterruptedException { channelCounter.incrementEventPutAttemptCount(); - int eventByteSize = (int)Math.ceil(estimateEventSize(event)/byteCapacitySlotSize); + int eventByteSize = (int) Math.ceil(estimateEventSize(event) / byteCapacitySlotSize); if (!putList.offer(event)) { throw new ChannelException( - "Put queue for MemoryTransaction of capacity " + + "Put queue for MemoryTransaction of capacity " + putList.size() + " full, consider committing more frequently, " + "increasing capacity or increasing thread count"); } @@ -92,23 +90,23 @@ public class MemoryChannel extends BasicChannelSemantics { @Override protected Event doTake() throws InterruptedException { channelCounter.incrementEventTakeAttemptCount(); - if(takeList.remainingCapacity() == 0) { + if (takeList.remainingCapacity() == 0) { throw new ChannelException("Take list for MemoryTransaction, capacity " + takeList.size() + " full, consider committing more frequently, " + "increasing capacity, or increasing thread count"); } - if(!queueStored.tryAcquire(keepAlive, TimeUnit.SECONDS)) { + if (!queueStored.tryAcquire(keepAlive, TimeUnit.SECONDS)) { return null; } Event event; - synchronized(queueLock) { + synchronized (queueLock) { event = queue.poll(); } Preconditions.checkNotNull(event, "Queue.poll returned NULL despite semaphore " + "signalling existence of entry"); takeList.put(event); - int eventByteSize = (int)Math.ceil(estimateEventSize(event)/byteCapacitySlotSize); + int eventByteSize = (int) Math.ceil(estimateEventSize(event) / byteCapacitySlotSize); takeByteCounter += eventByteSize; return event; @@ -117,15 +115,14 @@ public class MemoryChannel extends BasicChannelSemantics { @Override protected void doCommit() throws InterruptedException { int remainingChange = takeList.size() - putList.size(); - if(remainingChange < 0) { - if(!bytesRemaining.tryAcquire(putByteCounter, keepAlive, - TimeUnit.SECONDS)) { + if (remainingChange < 0) { + if (!bytesRemaining.tryAcquire(putByteCounter, keepAlive, TimeUnit.SECONDS)) { throw new ChannelException("Cannot commit transaction. Byte capacity " + - "allocated to store event body " + byteCapacity * byteCapacitySlotSize + - "reached. Please increase heap space/byte capacity allocated to " + - "the channel as the sinks may not be keeping up with the sources"); + "allocated to store event body " + byteCapacity * byteCapacitySlotSize + + "reached. Please increase heap space/byte capacity allocated to " + + "the channel as the sinks may not be keeping up with the sources"); } - if(!queueRemaining.tryAcquire(-remainingChange, keepAlive, TimeUnit.SECONDS)) { + if (!queueRemaining.tryAcquire(-remainingChange, keepAlive, TimeUnit.SECONDS)) { bytesRemaining.release(putByteCounter); throw new ChannelFullException("Space for commit to queue couldn't be acquired." + " Sinks are likely not keeping up with sources, or the buffer size is too tight"); @@ -133,10 +130,10 @@ public class MemoryChannel extends BasicChannelSemantics { } int puts = putList.size(); int takes = takeList.size(); - synchronized(queueLock) { - if(puts > 0 ) { - while(!putList.isEmpty()) { - if(!queue.offer(putList.removeFirst())) { + synchronized (queueLock) { + if (puts > 0) { + while (!putList.isEmpty()) { + if (!queue.offer(putList.removeFirst())) { throw new RuntimeException("Queue add failed, this shouldn't be able to happen"); } } @@ -149,7 +146,7 @@ public class MemoryChannel extends BasicChannelSemantics { putByteCounter = 0; queueStored.release(puts); - if(remainingChange > 0) { + if (remainingChange > 0) { queueRemaining.release(remainingChange); } if (puts > 0) { @@ -165,10 +162,11 @@ public class MemoryChannel extends BasicChannelSemantics { @Override protected void doRollback() { int takes = takeList.size(); - synchronized(queueLock) { - Preconditions.checkState(queue.remainingCapacity() >= takeList.size(), "Not enough space in memory channel " + + synchronized (queueLock) { + Preconditions.checkState(queue.remainingCapacity() >= takeList.size(), + "Not enough space in memory channel " + "queue to rollback takes. This should never happen, please report"); - while(!takeList.isEmpty()) { + while (!takeList.isEmpty()) { queue.addFirst(takeList.removeLast()); } putList.clear(); @@ -195,10 +193,12 @@ public class MemoryChannel extends BasicChannelSemantics { // this allows local threads waiting for space in the queue to commit without denying access to the // shared lock to threads that would make more space on the queue private Semaphore queueRemaining; + // used to make "reservations" to grab data from the queue. // by using this we can block for a while to get data without locking all other threads out // like we would if we tried to use a blocking call on queue private Semaphore queueStored; + // maximum items in a transaction queue private volatile Integer transCapacity; private volatile int keepAlive; @@ -208,7 +208,6 @@ public class MemoryChannel extends BasicChannelSemantics { private Semaphore bytesRemaining; private ChannelCounter channelCounter; - public MemoryChannel() { super(); } @@ -226,7 +225,7 @@ public class MemoryChannel extends BasicChannelSemantics { Integer capacity = null; try { capacity = context.getInteger("capacity", defaultCapacity); - } catch(NumberFormatException e) { + } catch (NumberFormatException e) { capacity = defaultCapacity; LOGGER.warn("Invalid capacity specified, initializing channel to " + "default capacity of {}", defaultCapacity); @@ -239,7 +238,7 @@ public class MemoryChannel extends BasicChannelSemantics { } try { transCapacity = context.getInteger("transactionCapacity", defaultTransCapacity); - } catch(NumberFormatException e) { + } catch (NumberFormatException e) { transCapacity = defaultTransCapacity; LOGGER.warn("Invalid transation capacity specified, initializing channel" + " to default capacity of {}", defaultTransCapacity); @@ -255,34 +254,37 @@ public class MemoryChannel extends BasicChannelSemantics { "the capacity."); try { - byteCapacityBufferPercentage = context.getInteger("byteCapacityBufferPercentage", defaultByteCapacityBufferPercentage); - } catch(NumberFormatException e) { + byteCapacityBufferPercentage = context.getInteger("byteCapacityBufferPercentage", + defaultByteCapacityBufferPercentage); + } catch (NumberFormatException e) { byteCapacityBufferPercentage = defaultByteCapacityBufferPercentage; } try { - byteCapacity = (int)((context.getLong("byteCapacity", defaultByteCapacity).longValue() * (1 - byteCapacityBufferPercentage * .01 )) /byteCapacitySlotSize); + byteCapacity = (int) ((context.getLong("byteCapacity", defaultByteCapacity).longValue() * + (1 - byteCapacityBufferPercentage * .01)) / byteCapacitySlotSize); if (byteCapacity < 1) { byteCapacity = Integer.MAX_VALUE; } - } catch(NumberFormatException e) { - byteCapacity = (int)((defaultByteCapacity * (1 - byteCapacityBufferPercentage * .01 )) /byteCapacitySlotSize); + } catch (NumberFormatException e) { + byteCapacity = (int) ((defaultByteCapacity * (1 - byteCapacityBufferPercentage * .01)) / + byteCapacitySlotSize); } try { keepAlive = context.getInteger("keep-alive", defaultKeepAlive); - } catch(NumberFormatException e) { + } catch (NumberFormatException e) { keepAlive = defaultKeepAlive; } - if(queue != null) { + if (queue != null) { try { resizeQueue(capacity); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } } else { - synchronized(queueLock) { + synchronized (queueLock) { queue = new LinkedBlockingDeque<Event>(capacity); queueRemaining = new Semaphore(capacity); queueStored = new Semaphore(0); @@ -298,7 +300,8 @@ public class MemoryChannel extends BasicChannelSemantics { lastByteCapacity = byteCapacity; } else { try { - if(!bytesRemaining.tryAcquire(lastByteCapacity - byteCapacity, keepAlive, TimeUnit.SECONDS)) { + if (!bytesRemaining.tryAcquire(lastByteCapacity - byteCapacity, keepAlive, + TimeUnit.SECONDS)) { LOGGER.warn("Couldn't acquire permits to downsize the byte capacity, resizing has been aborted"); } else { lastByteCapacity = byteCapacity; @@ -316,24 +319,24 @@ public class MemoryChannel extends BasicChannelSemantics { private void resizeQueue(int capacity) throws InterruptedException { int oldCapacity; - synchronized(queueLock) { + synchronized (queueLock) { oldCapacity = queue.size() + queue.remainingCapacity(); } - if(oldCapacity == capacity) { + if (oldCapacity == capacity) { return; } else if (oldCapacity > capacity) { - if(!queueRemaining.tryAcquire(oldCapacity - capacity, keepAlive, TimeUnit.SECONDS)) { + if (!queueRemaining.tryAcquire(oldCapacity - capacity, keepAlive, TimeUnit.SECONDS)) { LOGGER.warn("Couldn't acquire permits to downsize the queue, resizing has been aborted"); } else { - synchronized(queueLock) { + synchronized (queueLock) { LinkedBlockingDeque<Event> newQueue = new LinkedBlockingDeque<Event>(capacity); newQueue.addAll(queue); queue = newQueue; } } } else { - synchronized(queueLock) { + synchronized (queueLock) { LinkedBlockingDeque<Event> newQueue = new LinkedBlockingDeque<Event>(capacity); newQueue.addAll(queue); queue = newQueue; @@ -363,10 +366,9 @@ public class MemoryChannel extends BasicChannelSemantics { return new MemoryTransaction(transCapacity, channelCounter); } - private long estimateEventSize(Event event) - { + private long estimateEventSize(Event event) { byte[] body = event.getBody(); - if(body != null && body.length != 0) { + if (body != null && body.length != 0) { return body.length; } //Each event occupies at least 1 slot, so return 1. http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-core/src/main/java/org/apache/flume/channel/MultiplexingChannelSelector.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/main/java/org/apache/flume/channel/MultiplexingChannelSelector.java b/flume-ng-core/src/main/java/org/apache/flume/channel/MultiplexingChannelSelector.java index 3e32804..e57a223 100644 --- a/flume-ng-core/src/main/java/org/apache/flume/channel/MultiplexingChannelSelector.java +++ b/flume-ng-core/src/main/java/org/apache/flume/channel/MultiplexingChannelSelector.java @@ -40,8 +40,7 @@ public class MultiplexingChannelSelector extends AbstractChannelSelector { public static final String CONFIG_PREFIX_OPTIONAL = "optional"; @SuppressWarnings("unused") - private static final Logger LOG = LoggerFactory - .getLogger(MultiplexingChannelSelector.class); + private static final Logger LOG = LoggerFactory.getLogger(MultiplexingChannelSelector.class); private static final List<Channel> EMPTY_LIST = Collections.emptyList(); @@ -51,6 +50,7 @@ public class MultiplexingChannelSelector extends AbstractChannelSelector { private Map<String, List<Channel>> channelMapping; private Map<String, List<Channel>> optionalChannels; private List<Channel> defaultChannels; + @Override public List<Channel> getRequiredChannels(Event event) { String headerValue = event.getHeaders().get(headerName); @@ -74,7 +74,7 @@ public class MultiplexingChannelSelector extends AbstractChannelSelector { String hdr = event.getHeaders().get(headerName); List<Channel> channels = optionalChannels.get(hdr); - if(channels == null) { + if (channels == null) { channels = EMPTY_LIST; } return channels; @@ -128,7 +128,7 @@ public class MultiplexingChannelSelector extends AbstractChannelSelector { List<Channel> reqdChannels = channelMapping.get(hdr); //Check if there are required channels, else defaults to default channels - if(reqdChannels == null || reqdChannels.isEmpty()) { + if (reqdChannels == null || reqdChannels.isEmpty()) { reqdChannels = defaultChannels; } for (Channel c : reqdChannels) { http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-core/src/main/java/org/apache/flume/channel/PseudoTxnMemoryChannel.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/main/java/org/apache/flume/channel/PseudoTxnMemoryChannel.java b/flume-ng-core/src/main/java/org/apache/flume/channel/PseudoTxnMemoryChannel.java index cc391c4..22e1002 100644 --- a/flume-ng-core/src/main/java/org/apache/flume/channel/PseudoTxnMemoryChannel.java +++ b/flume-ng-core/src/main/java/org/apache/flume/channel/PseudoTxnMemoryChannel.java @@ -100,13 +100,13 @@ public class PseudoTxnMemoryChannel extends AbstractChannel { } queue = new ArrayBlockingQueue<Event>(capacity); - if(channelCounter == null) { + if (channelCounter == null) { channelCounter = new ChannelCounter(getName()); } } @Override - public void start(){ + public void start() { channelCounter.start(); channelCounter.setChannelSize(queue.size()); channelCounter.setChannelSize( @@ -115,7 +115,7 @@ public class PseudoTxnMemoryChannel extends AbstractChannel { } @Override - public void stop(){ + public void stop() { channelCounter.setChannelSize(queue.size()); channelCounter.stop(); super.stop(); http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-core/src/main/java/org/apache/flume/channel/ReplicatingChannelSelector.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/main/java/org/apache/flume/channel/ReplicatingChannelSelector.java b/flume-ng-core/src/main/java/org/apache/flume/channel/ReplicatingChannelSelector.java index 8a0d2bd..47c6aca 100644 --- a/flume-ng-core/src/main/java/org/apache/flume/channel/ReplicatingChannelSelector.java +++ b/flume-ng-core/src/main/java/org/apache/flume/channel/ReplicatingChannelSelector.java @@ -47,7 +47,7 @@ public class ReplicatingChannelSelector extends AbstractChannelSelector { * configure method. It is conceiveable that custom component tests too * do that. So in that case, revert to old behavior. */ - if(requiredChannels == null) { + if (requiredChannels == null) { return getAllChannels(); } return requiredChannels; @@ -63,8 +63,8 @@ public class ReplicatingChannelSelector extends AbstractChannelSelector { String optionalList = context.getString(CONFIG_OPTIONAL); requiredChannels = new ArrayList<Channel>(getAllChannels()); Map<String, Channel> channelNameMap = getChannelNameMap(); - if(optionalList != null && !optionalList.isEmpty()) { - for(String optional : optionalList.split("\\s+")) { + if (optionalList != null && !optionalList.isEmpty()) { + for (String optional : optionalList.split("\\s+")) { Channel optionalChannel = channelNameMap.get(optional); requiredChannels.remove(optionalChannel); if (!optionalChannels.contains(optionalChannel)) { http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-core/src/main/java/org/apache/flume/client/avro/AvroCLIClient.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/main/java/org/apache/flume/client/avro/AvroCLIClient.java b/flume-ng-core/src/main/java/org/apache/flume/client/avro/AvroCLIClient.java index 3c8c267..dd2aeef 100644 --- a/flume-ng-core/src/main/java/org/apache/flume/client/avro/AvroCLIClient.java +++ b/flume-ng-core/src/main/java/org/apache/flume/client/avro/AvroCLIClient.java @@ -98,8 +98,7 @@ public class AvroCLIClient { for (Map.Entry<Object, Object> propertiesEntry : properties.entrySet()) { String key = (String) propertiesEntry.getKey(); String value = (String) propertiesEntry.getValue(); - logger.debug("Inserting Header Key [" + key + "] header value [" + - value + "]"); + logger.debug("Inserting Header Key [" + key + "] header value [" + value + "]"); headers.put(key, value); } } @@ -108,12 +107,12 @@ public class AvroCLIClient { return; } finally { if (fs != null) { - try { - fs.close(); - }catch (Exception e) { - logger.error("Unable to close headerFile", e); - return; - } + try { + fs.close(); + } catch (Exception e) { + logger.error("Unable to close headerFile", e); + return; + } } } } @@ -179,7 +178,7 @@ public class AvroCLIClient { fileName = commandLine.getOptionValue("filename"); dirName = commandLine.getOptionValue("dirname"); - if (commandLine.hasOption("headerFile")){ + if (commandLine.hasOption("headerFile")) { parseHeaders(commandLine); } http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-core/src/main/java/org/apache/flume/client/avro/ReliableSpoolingFileEventReader.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/main/java/org/apache/flume/client/avro/ReliableSpoolingFileEventReader.java b/flume-ng-core/src/main/java/org/apache/flume/client/avro/ReliableSpoolingFileEventReader.java index 36d80f0..4dc0207 100644 --- a/flume-ng-core/src/main/java/org/apache/flume/client/avro/ReliableSpoolingFileEventReader.java +++ b/flume-ng-core/src/main/java/org/apache/flume/client/avro/ReliableSpoolingFileEventReader.java @@ -24,15 +24,18 @@ import com.google.common.base.Charsets; import com.google.common.base.Optional; import com.google.common.base.Preconditions; import com.google.common.io.Files; -import org.apache.commons.io.FileUtils; -import org.apache.commons.io.filefilter.IOFileFilter; -import org.apache.commons.lang.StringUtils; import org.apache.flume.Context; import org.apache.flume.Event; import org.apache.flume.FlumeException; import org.apache.flume.annotations.InterfaceAudience; import org.apache.flume.annotations.InterfaceStability; -import org.apache.flume.serialization.*; +import org.apache.flume.serialization.DecodeErrorPolicy; +import org.apache.flume.serialization.DurablePositionTracker; +import org.apache.flume.serialization.EventDeserializer; +import org.apache.flume.serialization.EventDeserializerFactory; +import org.apache.flume.serialization.PositionTracker; +import org.apache.flume.serialization.ResettableFileInputStream; +import org.apache.flume.serialization.ResettableInputStream; import org.apache.flume.source.SpoolDirectorySourceConfigurationConstants; import org.apache.flume.source.SpoolDirectorySourceConfigurationConstants.ConsumeOrder; import org.apache.flume.tools.PlatformDetect; @@ -44,9 +47,12 @@ import java.io.FileFilter; import java.io.FileNotFoundException; import java.io.IOException; import java.nio.charset.Charset; -import java.util.*; -import java.util.regex.Pattern; import java.util.ArrayList; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.Locale; +import java.util.regex.Pattern; /** * <p/>A {@link ReliableEventReader} which reads log data from files stored @@ -137,9 +143,11 @@ public class ReliableSpoolingFileEventReader implements ReliableEventReader { if (logger.isDebugEnabled()) { logger.debug("Initializing {} with directory={}, metaDir={}, " + - "deserializer={}", - new Object[] { ReliableSpoolingFileEventReader.class.getSimpleName(), - spoolDirectory, trackerDirPath, deserializerType }); + "deserializer={}", + new Object[] { + ReliableSpoolingFileEventReader.class.getSimpleName(), + spoolDirectory, trackerDirPath, deserializerType + }); } // Verify directory exists and is readable/writable @@ -202,7 +210,7 @@ public class ReliableSpoolingFileEventReader implements ReliableEventReader { this.metaFile = new File(trackerDirectory, metaFileName); - if(metaFile.exists() && metaFile.length() == 0) { + if (metaFile.exists() && metaFile.length() == 0) { deleteMetaFile(); } } @@ -214,7 +222,8 @@ public class ReliableSpoolingFileEventReader implements ReliableEventReader { public boolean accept(File candidate) { if (candidate.isDirectory()) { String directoryName = candidate.getName(); - if (!recursiveDirectorySearch || directoryName.startsWith(".") || + if (!recursiveDirectorySearch || + directoryName.startsWith(".") || ignorePattern.matcher(directoryName).matches()) { return false; @@ -222,7 +231,8 @@ public class ReliableSpoolingFileEventReader implements ReliableEventReader { return true; } String fileName = candidate.getName(); - if (fileName.endsWith(completedSuffix) || fileName.startsWith(".") || + if (fileName.endsWith(completedSuffix) || + fileName.startsWith(".") || ignorePattern.matcher(fileName).matches()) { return false; } @@ -243,11 +253,10 @@ public class ReliableSpoolingFileEventReader implements ReliableEventReader { return candidateFiles; } - for(File file : directory.listFiles(filter)){ + for (File file : directory.listFiles(filter)) { if (file.isDirectory()) { candidateFiles.addAll(getCandidateFiles(file)); - } - else { + } else { candidateFiles.add(file); } } @@ -555,8 +564,8 @@ public class ReliableSpoolingFileEventReader implements ReliableEventReader { new ResettableFileInputStream(file, tracker, ResettableFileInputStream.DEFAULT_BUF_SIZE, inputCharset, decodeErrorPolicy); - EventDeserializer deserializer = EventDeserializerFactory.getInstance - (deserializerType, deserializerContext, in); + EventDeserializer deserializer = + EventDeserializerFactory.getInstance(deserializerType, deserializerContext, in); return Optional.of(new FileInfo(file, deserializer)); } catch (FileNotFoundException e) { @@ -589,10 +598,21 @@ public class ReliableSpoolingFileEventReader implements ReliableEventReader { this.deserializer = deserializer; } - public long getLength() { return length; } - public long getLastModified() { return lastModified; } - public EventDeserializer getDeserializer() { return deserializer; } - public File getFile() { return file; } + public long getLength() { + return length; + } + + public long getLastModified() { + return lastModified; + } + + public EventDeserializer getDeserializer() { + return deserializer; + } + + public File getFile() { + return file; + } } @InterfaceAudience.Private http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-core/src/main/java/org/apache/flume/event/EventHelper.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/main/java/org/apache/flume/event/EventHelper.java b/flume-ng-core/src/main/java/org/apache/flume/event/EventHelper.java index a2c0a57..0e3398e 100644 --- a/flume-ng-core/src/main/java/org/apache/flume/event/EventHelper.java +++ b/flume-ng-core/src/main/java/org/apache/flume/event/EventHelper.java @@ -53,18 +53,18 @@ public class EventHelper { HexDump.dump(data, 0, out, 0); String hexDump = new String(out.toByteArray()); // remove offset since it's not relevant for such a small dataset - if(hexDump.startsWith(HEXDUMP_OFFSET)) { + if (hexDump.startsWith(HEXDUMP_OFFSET)) { hexDump = hexDump.substring(HEXDUMP_OFFSET.length()); } buffer.append(hexDump); } catch (Exception e) { - if(LOGGER.isInfoEnabled()) { - LOGGER.info("Exception while dumping event", e); - } + if (LOGGER.isInfoEnabled()) { + LOGGER.info("Exception while dumping event", e); + } buffer.append("...Exception while dumping: ").append(e.getMessage()); } String result = buffer.toString(); - if(result.endsWith(EOL) && buffer.length() > EOL.length()) { + if (result.endsWith(EOL) && buffer.length() > EOL.length()) { buffer.delete(buffer.length() - EOL.length(), buffer.length()).toString(); } } http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-core/src/main/java/org/apache/flume/formatter/output/BucketPath.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/main/java/org/apache/flume/formatter/output/BucketPath.java b/flume-ng-core/src/main/java/org/apache/flume/formatter/output/BucketPath.java index e565192..b2fe3f0 100644 --- a/flume-ng-core/src/main/java/org/apache/flume/formatter/output/BucketPath.java +++ b/flume-ng-core/src/main/java/org/apache/flume/formatter/output/BucketPath.java @@ -18,32 +18,29 @@ package org.apache.flume.formatter.output; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; +import org.apache.flume.Clock; +import org.apache.flume.SystemClock; +import org.apache.flume.tools.TimestampRoundDownUtil; + import java.text.SimpleDateFormat; import java.util.Calendar; import java.util.Date; import java.util.HashMap; import java.util.Map; import java.util.TimeZone; -import java.util.concurrent.ConcurrentHashMap; import java.util.regex.Matcher; import java.util.regex.Pattern; -import com.google.common.annotations.VisibleForTesting; - -import org.apache.flume.Clock; -import org.apache.flume.SystemClock; -import org.apache.flume.tools.TimestampRoundDownUtil; - -import com.google.common.base.Preconditions; - public class BucketPath { /** * These are useful to other classes which might want to search for tags in * strings. */ - final public static String TAG_REGEX = "\\%(\\w|\\%)|\\%\\{([\\w\\.-]+)\\}"; - final public static Pattern tagPattern = Pattern.compile(TAG_REGEX); + public static final String TAG_REGEX = "\\%(\\w|\\%)|\\%\\{([\\w\\.-]+)\\}"; + public static final Pattern tagPattern = Pattern.compile(TAG_REGEX); private static Clock clock = new SystemClock(); @@ -62,57 +59,57 @@ public class BucketPath { public static String expandShorthand(char c) { // It's a date switch (c) { - case 'a': - return "weekday_short"; - case 'A': - return "weekday_full"; - case 'b': - return "monthname_short"; - case 'B': - return "monthname_full"; - case 'c': - return "datetime"; - case 'd': - return "day_of_month_xx"; // two digit - case 'e': - return "day_of_month_x"; // 1 or 2 digit - case 'D': - return "date_short"; // "MM/dd/yy"; - case 'H': - return "hour_24_xx"; - case 'I': - return "hour_12_xx"; - case 'j': - return "day_of_year_xxx"; // three digits - case 'k': - return "hour_24"; // 1 or 2 digits - case 'l': - return "hour_12"; // 1 or 2 digits - case 'm': - return "month_xx"; - case 'n': - return "month_x"; // 1 or 2 digits - case 'M': - return "minute_xx"; - case 'p': - return "am_pm"; - case 's': - return "unix_seconds"; - case 'S': - return "seconds_xx"; - case 't': - // This is different from unix date (which would insert a tab character - // here) - return "unix_millis"; - case 'y': - return "year_xx"; - case 'Y': - return "year_xxxx"; - case 'z': - return "timezone_delta"; - default: -// LOG.warn("Unrecognized escape in event format string: %" + c); - return "" + c; + case 'a': + return "weekday_short"; + case 'A': + return "weekday_full"; + case 'b': + return "monthname_short"; + case 'B': + return "monthname_full"; + case 'c': + return "datetime"; + case 'd': + return "day_of_month_xx"; // two digit + case 'e': + return "day_of_month_x"; // 1 or 2 digit + case 'D': + return "date_short"; // "MM/dd/yy"; + case 'H': + return "hour_24_xx"; + case 'I': + return "hour_12_xx"; + case 'j': + return "day_of_year_xxx"; // three digits + case 'k': + return "hour_24"; // 1 or 2 digits + case 'l': + return "hour_12"; // 1 or 2 digits + case 'm': + return "month_xx"; + case 'n': + return "month_x"; // 1 or 2 digits + case 'M': + return "minute_xx"; + case 'p': + return "am_pm"; + case 's': + return "unix_seconds"; + case 'S': + return "seconds_xx"; + case 't': + // This is different from unix date (which would insert a tab character + // here) + return "unix_millis"; + case 'y': + return "year_xx"; + case 'Y': + return "year_xxxx"; + case 'z': + return "timezone_delta"; + default: + // LOG.warn("Unrecognized escape in event format string: %" + c); + return "" + c; } } @@ -181,8 +178,8 @@ public class BucketPath { @VisibleForTesting @Deprecated public static String replaceShorthand(char c, Map<String, String> headers, - TimeZone timeZone, boolean needRounding, int unit, int roundDown, - boolean useLocalTimestamp) { + TimeZone timeZone, boolean needRounding, int unit, int roundDown, + boolean useLocalTimestamp) { long ts = 0; if (useLocalTimestamp) { ts = clock.currentTimeMillis(); @@ -191,27 +188,27 @@ public class BucketPath { roundDown, false, ts); } - protected static final ThreadLocal<HashMap<String, SimpleDateFormat>> simpleDateFormatCache = new ThreadLocal<HashMap<String, SimpleDateFormat>>() { - - @Override - protected HashMap<String, SimpleDateFormat> initialValue() { - return new HashMap<String, SimpleDateFormat>(); - } - }; - + protected static final ThreadLocal<HashMap<String, SimpleDateFormat>> simpleDateFormatCache = + new ThreadLocal<HashMap<String, SimpleDateFormat>>() { + + @Override + protected HashMap<String, SimpleDateFormat> initialValue() { + return new HashMap<String, SimpleDateFormat>(); + } + }; + protected static SimpleDateFormat getSimpleDateFormat(String string) { - HashMap<String, SimpleDateFormat> localCache = simpleDateFormatCache.get(); - - SimpleDateFormat simpleDateFormat = localCache.get(string); - if (simpleDateFormat == null) { - simpleDateFormat = new SimpleDateFormat(string); - localCache.put(string, simpleDateFormat); - simpleDateFormatCache.set(localCache); - } - - return simpleDateFormat; + HashMap<String, SimpleDateFormat> localCache = simpleDateFormatCache.get(); + + SimpleDateFormat simpleDateFormat = localCache.get(string); + if (simpleDateFormat == null) { + simpleDateFormat = new SimpleDateFormat(string); + localCache.put(string, simpleDateFormat); + simpleDateFormatCache.set(localCache); + } + + return simpleDateFormat; } - /** * Not intended as a public API @@ -223,10 +220,10 @@ public class BucketPath { String timestampHeader = null; try { - if(!useLocalTimestamp) { + if (!useLocalTimestamp) { timestampHeader = headers.get("timestamp"); Preconditions.checkNotNull(timestampHeader, "Expected timestamp in " + - "the Flume event headers, but it was null"); + "the Flume event headers, but it was null"); ts = Long.valueOf(timestampHeader); } else { timestampHeader = String.valueOf(ts); @@ -238,87 +235,87 @@ public class BucketPath { + " TimestampInterceptor source interceptor).", e); } - if(needRounding){ + if (needRounding) { ts = roundDown(roundDown, unit, ts); } // It's a date String formatString = ""; switch (c) { - case '%': - return "%"; - case 'a': - formatString = "EEE"; - break; - case 'A': - formatString = "EEEE"; - break; - case 'b': - formatString = "MMM"; - break; - case 'B': - formatString = "MMMM"; - break; - case 'c': - formatString = "EEE MMM d HH:mm:ss yyyy"; - break; - case 'd': - formatString = "dd"; - break; - case 'e': - formatString = "d"; - break; - case 'D': - formatString = "MM/dd/yy"; - break; - case 'H': - formatString = "HH"; - break; - case 'I': - formatString = "hh"; - break; - case 'j': - formatString = "DDD"; - break; - case 'k': - formatString = "H"; - break; - case 'l': - formatString = "h"; - break; - case 'm': - formatString = "MM"; - break; - case 'M': - formatString = "mm"; - break; - case 'n': - formatString = "M"; - break; - case 'p': - formatString = "a"; - break; - case 's': - return "" + (ts/1000); - case 'S': - formatString = "ss"; - break; - case 't': - // This is different from unix date (which would insert a tab character - // here) - return timestampHeader; - case 'y': - formatString = "yy"; - break; - case 'Y': - formatString = "yyyy"; - break; - case 'z': - formatString = "ZZZ"; - break; - default: -// LOG.warn("Unrecognized escape in event format string: %" + c); - return ""; + case '%': + return "%"; + case 'a': + formatString = "EEE"; + break; + case 'A': + formatString = "EEEE"; + break; + case 'b': + formatString = "MMM"; + break; + case 'B': + formatString = "MMMM"; + break; + case 'c': + formatString = "EEE MMM d HH:mm:ss yyyy"; + break; + case 'd': + formatString = "dd"; + break; + case 'e': + formatString = "d"; + break; + case 'D': + formatString = "MM/dd/yy"; + break; + case 'H': + formatString = "HH"; + break; + case 'I': + formatString = "hh"; + break; + case 'j': + formatString = "DDD"; + break; + case 'k': + formatString = "H"; + break; + case 'l': + formatString = "h"; + break; + case 'm': + formatString = "MM"; + break; + case 'M': + formatString = "mm"; + break; + case 'n': + formatString = "M"; + break; + case 'p': + formatString = "a"; + break; + case 's': + return "" + (ts / 1000); + case 'S': + formatString = "ss"; + break; + case 't': + // This is different from unix date (which would insert a tab character + // here) + return timestampHeader; + case 'y': + formatString = "yy"; + break; + case 'Y': + formatString = "yyyy"; + break; + case 'z': + formatString = "ZZZ"; + break; + default: + // LOG.warn("Unrecognized escape in event format string: %" + c); + return ""; } SimpleDateFormat format = getSimpleDateFormat(formatString); @@ -332,9 +329,9 @@ public class BucketPath { return format.format(date); } - private static long roundDown(int roundDown, int unit, long ts){ + private static long roundDown(int roundDown, int unit, long ts) { long timestamp = ts; - if(roundDown <= 0){ + if (roundDown <= 0) { roundDown = 1; } switch (unit) { @@ -366,7 +363,7 @@ public class BucketPath { * TODO(henry): we may want to consider taking this out of Event and into a * more general class when we get more use cases for this pattern. */ - public static String escapeString(String in, Map<String, String> headers){ + public static String escapeString(String in, Map<String, String> headers) { return escapeString(in, headers, false, 0, 0); } @@ -404,8 +401,8 @@ public class BucketPath { * @return Escaped string. */ public static String escapeString(String in, Map<String, String> headers, - TimeZone timeZone, boolean needRounding, int unit, int roundDown, - boolean useLocalTimeStamp) { + TimeZone timeZone, boolean needRounding, int unit, int roundDown, + boolean useLocalTimeStamp) { long ts = clock.currentTimeMillis(); http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-core/src/main/java/org/apache/flume/formatter/output/DefaultPathManager.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/main/java/org/apache/flume/formatter/output/DefaultPathManager.java b/flume-ng-core/src/main/java/org/apache/flume/formatter/output/DefaultPathManager.java index 176db7f..a6e203a 100644 --- a/flume-ng-core/src/main/java/org/apache/flume/formatter/output/DefaultPathManager.java +++ b/flume-ng-core/src/main/java/org/apache/flume/formatter/output/DefaultPathManager.java @@ -51,7 +51,7 @@ public class DefaultPathManager implements PathManager { sb.append(filePrefix).append(seriesTimestamp).append("-"); sb.append(fileIndex.incrementAndGet()); if (extension.length() > 0) { - sb.append(".").append(extension); + sb.append(".").append(extension); } currentFile = new File(baseDirectory, sb.toString()); http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-core/src/main/java/org/apache/flume/formatter/output/PathManager.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/main/java/org/apache/flume/formatter/output/PathManager.java b/flume-ng-core/src/main/java/org/apache/flume/formatter/output/PathManager.java index 5a3066a..baad5d0 100644 --- a/flume-ng-core/src/main/java/org/apache/flume/formatter/output/PathManager.java +++ b/flume-ng-core/src/main/java/org/apache/flume/formatter/output/PathManager.java @@ -25,26 +25,26 @@ import org.apache.flume.Context; * Creates the files used by the RollingFileSink. */ public interface PathManager { - /** - * {@link Context} prefix - */ - public static String CTX_PREFIX = "pathManager."; + /** + * {@link Context} prefix + */ + public static String CTX_PREFIX = "pathManager."; - File nextFile(); + File nextFile(); - File getCurrentFile(); + File getCurrentFile(); - void rotate(); + void rotate(); - File getBaseDirectory(); + File getBaseDirectory(); - void setBaseDirectory(File baseDirectory); + void setBaseDirectory(File baseDirectory); - /** - * Knows how to construct this path manager.<br/> - * <b>Note: Implementations MUST provide a public a no-arg constructor.</b> - */ - public interface Builder { - public PathManager build(Context context); - } + /** + * Knows how to construct this path manager.<br/> + * <b>Note: Implementations MUST provide a public a no-arg constructor.</b> + */ + public interface Builder { + public PathManager build(Context context); + } } http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-core/src/main/java/org/apache/flume/formatter/output/PathManagerFactory.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/main/java/org/apache/flume/formatter/output/PathManagerFactory.java b/flume-ng-core/src/main/java/org/apache/flume/formatter/output/PathManagerFactory.java index 4dbe083..4446e5f 100644 --- a/flume-ng-core/src/main/java/org/apache/flume/formatter/output/PathManagerFactory.java +++ b/flume-ng-core/src/main/java/org/apache/flume/formatter/output/PathManagerFactory.java @@ -19,64 +19,65 @@ package org.apache.flume.formatter.output; import com.google.common.base.Preconditions; -import java.util.Locale; import org.apache.flume.Context; import org.apache.flume.FlumeException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.Locale; + /** * Create PathManager instances. */ public class PathManagerFactory { - private static final Logger logger = LoggerFactory.getLogger(PathManagerFactory.class); + private static final Logger logger = LoggerFactory.getLogger(PathManagerFactory.class); - public static PathManager getInstance(String managerType, Context context) { + public static PathManager getInstance(String managerType, Context context) { - Preconditions.checkNotNull(managerType, "path manager type must not be null"); - - // try to find builder class in enum of known output serializers - PathManagerType type; - try { - type = PathManagerType.valueOf(managerType.toUpperCase(Locale.ENGLISH)); - } catch (IllegalArgumentException e) { - logger.debug("Not in enum, loading builder class: {}", managerType); - type = PathManagerType.OTHER; - } - Class<? extends PathManager.Builder> builderClass = type.getBuilderClass(); + Preconditions.checkNotNull(managerType, "path manager type must not be null"); - // handle the case where they have specified their own builder in the config - if (builderClass == null) { - try { - Class c = Class.forName(managerType); - if (c != null && PathManager.Builder.class.isAssignableFrom(c)) { - builderClass = (Class<? extends PathManager.Builder>) c; - } else { - String errMessage = "Unable to instantiate Builder from " + - managerType + ": does not appear to implement " + - PathManager.Builder.class.getName(); - throw new FlumeException(errMessage); - } - } catch (ClassNotFoundException ex) { - logger.error("Class not found: " + managerType, ex); - throw new FlumeException(ex); - } - } + // try to find builder class in enum of known output serializers + PathManagerType type; + try { + type = PathManagerType.valueOf(managerType.toUpperCase(Locale.ENGLISH)); + } catch (IllegalArgumentException e) { + logger.debug("Not in enum, loading builder class: {}", managerType); + type = PathManagerType.OTHER; + } + Class<? extends PathManager.Builder> builderClass = type.getBuilderClass(); - // build the builder - PathManager.Builder builder; - try { - builder = builderClass.newInstance(); - } catch (InstantiationException ex) { - String errMessage = "Cannot instantiate builder: " + managerType; - logger.error(errMessage, ex); - throw new FlumeException(errMessage, ex); - } catch (IllegalAccessException ex) { - String errMessage = "Cannot instantiate builder: " + managerType; - logger.error(errMessage, ex); - throw new FlumeException(errMessage, ex); + // handle the case where they have specified their own builder in the config + if (builderClass == null) { + try { + Class c = Class.forName(managerType); + if (c != null && PathManager.Builder.class.isAssignableFrom(c)) { + builderClass = (Class<? extends PathManager.Builder>) c; + } else { + String errMessage = "Unable to instantiate Builder from " + + managerType + ": does not appear to implement " + + PathManager.Builder.class.getName(); + throw new FlumeException(errMessage); } + } catch (ClassNotFoundException ex) { + logger.error("Class not found: " + managerType, ex); + throw new FlumeException(ex); + } + } - return builder.build(context); + // build the builder + PathManager.Builder builder; + try { + builder = builderClass.newInstance(); + } catch (InstantiationException ex) { + String errMessage = "Cannot instantiate builder: " + managerType; + logger.error(errMessage, ex); + throw new FlumeException(errMessage, ex); + } catch (IllegalAccessException ex) { + String errMessage = "Cannot instantiate builder: " + managerType; + logger.error(errMessage, ex); + throw new FlumeException(errMessage, ex); } + + return builder.build(context); + } } http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-core/src/main/java/org/apache/flume/formatter/output/PathManagerType.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/main/java/org/apache/flume/formatter/output/PathManagerType.java b/flume-ng-core/src/main/java/org/apache/flume/formatter/output/PathManagerType.java index 4f1fa93..f131ff3 100644 --- a/flume-ng-core/src/main/java/org/apache/flume/formatter/output/PathManagerType.java +++ b/flume-ng-core/src/main/java/org/apache/flume/formatter/output/PathManagerType.java @@ -27,17 +27,17 @@ import org.apache.flume.annotations.InterfaceStability; @InterfaceAudience.Private @InterfaceStability.Unstable public enum PathManagerType { - DEFAULT(DefaultPathManager.Builder.class), - ROLLTIME(RollTimePathManager.Builder.class), - OTHER(null); + DEFAULT(DefaultPathManager.Builder.class), + ROLLTIME(RollTimePathManager.Builder.class), + OTHER(null); - private final Class<? extends PathManager.Builder> builderClass; + private final Class<? extends PathManager.Builder> builderClass; - PathManagerType(Class<? extends PathManager.Builder> builderClass) { - this.builderClass = builderClass; - } + PathManagerType(Class<? extends PathManager.Builder> builderClass) { + this.builderClass = builderClass; + } - public Class<? extends PathManager.Builder> getBuilderClass() { - return builderClass; - } + public Class<? extends PathManager.Builder> getBuilderClass() { + return builderClass; + } } http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-core/src/main/java/org/apache/flume/formatter/output/RollTimePathManager.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/main/java/org/apache/flume/formatter/output/RollTimePathManager.java b/flume-ng-core/src/main/java/org/apache/flume/formatter/output/RollTimePathManager.java index 6883a9c..b14a812 100644 --- a/flume-ng-core/src/main/java/org/apache/flume/formatter/output/RollTimePathManager.java +++ b/flume-ng-core/src/main/java/org/apache/flume/formatter/output/RollTimePathManager.java @@ -19,48 +19,48 @@ package org.apache.flume.formatter.output; -import java.io.File; - import org.apache.flume.Context; import org.joda.time.LocalDateTime; import org.joda.time.format.DateTimeFormat; import org.joda.time.format.DateTimeFormatter; +import java.io.File; + /** * */ public class RollTimePathManager extends DefaultPathManager { - private final DateTimeFormatter formatter = DateTimeFormat.forPattern("yyyyMMddHHmmss"); - private String lastRoll; - - public RollTimePathManager(Context context) { - super(context); - } + private final DateTimeFormatter formatter = DateTimeFormat.forPattern("yyyyMMddHHmmss"); + private String lastRoll; - @Override - public File nextFile() { - StringBuilder sb = new StringBuilder(); - String date = formatter.print(LocalDateTime.now()); - if (!date.equals(lastRoll)) { - getFileIndex().set(0); - lastRoll = date; - } - sb.append(getPrefix()).append(date).append("-"); - sb.append(getFileIndex().incrementAndGet()); - if (getExtension().length() > 0) { - sb.append(".").append(getExtension()); - } - currentFile = new File(getBaseDirectory(), sb.toString()); + public RollTimePathManager(Context context) { + super(context); + } - return currentFile; + @Override + public File nextFile() { + StringBuilder sb = new StringBuilder(); + String date = formatter.print(LocalDateTime.now()); + if (!date.equals(lastRoll)) { + getFileIndex().set(0); + lastRoll = date; + } + sb.append(getPrefix()).append(date).append("-"); + sb.append(getFileIndex().incrementAndGet()); + if (getExtension().length() > 0) { + sb.append(".").append(getExtension()); } + currentFile = new File(getBaseDirectory(), sb.toString()); - public static class Builder implements PathManager.Builder { - @Override - public PathManager build(Context context) { - return new RollTimePathManager(context); - } + return currentFile; + } + + public static class Builder implements PathManager.Builder { + @Override + public PathManager build(Context context) { + return new RollTimePathManager(context); } + } }
