http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-node/src/main/java/org/apache/flume/node/AbstractConfigurationProvider.java ---------------------------------------------------------------------- diff --git a/flume-ng-node/src/main/java/org/apache/flume/node/AbstractConfigurationProvider.java b/flume-ng-node/src/main/java/org/apache/flume/node/AbstractConfigurationProvider.java index 40abba2..130bc64 100644 --- a/flume-ng-node/src/main/java/org/apache/flume/node/AbstractConfigurationProvider.java +++ b/flume-ng-node/src/main/java/org/apache/flume/node/AbstractConfigurationProvider.java @@ -17,8 +17,15 @@ */ package org.apache.flume.node; -import java.util.*; +import java.util.ArrayList; +import java.util.Enumeration; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; import java.util.Map.Entry; +import java.util.Properties; +import java.util.Set; import org.apache.flume.Channel; import org.apache.flume.ChannelFactory; @@ -58,18 +65,15 @@ import com.google.common.collect.ListMultimap; import com.google.common.collect.Lists; import com.google.common.collect.Maps; -public abstract class AbstractConfigurationProvider implements - ConfigurationProvider { +public abstract class AbstractConfigurationProvider implements ConfigurationProvider { - private static final Logger LOGGER = LoggerFactory - .getLogger(AbstractConfigurationProvider.class); + private static final Logger LOGGER = LoggerFactory.getLogger(AbstractConfigurationProvider.class); private final String agentName; private final SourceFactory sourceFactory; private final SinkFactory sinkFactory; private final ChannelFactory channelFactory; - private final Map<Class<? extends Channel>, Map<String, Channel>> channelCache; public AbstractConfigurationProvider(String agentName) { @@ -96,18 +100,16 @@ public abstract class AbstractConfigurationProvider implements loadChannels(agentConf, channelComponentMap); loadSources(agentConf, channelComponentMap, sourceRunnerMap); loadSinks(agentConf, channelComponentMap, sinkRunnerMap); - Set<String> channelNames = - new HashSet<String>(channelComponentMap.keySet()); - for(String channelName : channelNames) { - ChannelComponent channelComponent = channelComponentMap. - get(channelName); - if(channelComponent.components.isEmpty()) { + Set<String> channelNames = new HashSet<String>(channelComponentMap.keySet()); + for (String channelName : channelNames) { + ChannelComponent channelComponent = channelComponentMap.get(channelName); + if (channelComponent.components.isEmpty()) { LOGGER.warn(String.format("Channel %s has no components connected" + " and has been removed.", channelName)); channelComponentMap.remove(channelName); - Map<String, Channel> nameChannelMap = channelCache. 
- get(channelComponent.channel.getClass()); - if(nameChannelMap != null) { + Map<String, Channel> nameChannelMap = + channelCache.get(channelComponent.channel.getClass()); + if (nameChannelMap != null) { nameChannelMap.remove(channelName); } } else { @@ -116,10 +118,10 @@ public abstract class AbstractConfigurationProvider implements conf.addChannel(channelName, channelComponent.channel); } } - for(Map.Entry<String, SourceRunner> entry : sourceRunnerMap.entrySet()) { + for (Map.Entry<String, SourceRunner> entry : sourceRunnerMap.entrySet()) { conf.addSourceRunner(entry.getKey(), entry.getValue()); } - for(Map.Entry<String, SinkRunner> entry : sinkRunnerMap.entrySet()) { + for (Map.Entry<String, SinkRunner> entry : sinkRunnerMap.entrySet()) { conf.addSinkRunner(entry.getKey(), entry.getValue()); } } catch (InstantiationException ex) { @@ -155,21 +157,21 @@ public abstract class AbstractConfigurationProvider implements ListMultimap<Class<? extends Channel>, String> channelsNotReused = ArrayListMultimap.create(); // assume all channels will not be re-used - for(Map.Entry<Class<? extends Channel>, Map<String, Channel>> entry : channelCache.entrySet()) { + for (Map.Entry<Class<? extends Channel>, Map<String, Channel>> entry : + channelCache.entrySet()) { Class<? extends Channel> channelKlass = entry.getKey(); Set<String> channelNames = entry.getValue().keySet(); channelsNotReused.get(channelKlass).addAll(channelNames); } Set<String> channelNames = agentConf.getChannelSet(); - Map<String, ComponentConfiguration> compMap = - agentConf.getChannelConfigMap(); + Map<String, ComponentConfiguration> compMap = agentConf.getChannelConfigMap(); /* * Components which have a ComponentConfiguration object */ for (String chName : channelNames) { ComponentConfiguration comp = compMap.get(chName); - if(comp != null) { + if (comp != null) { Channel channel = getOrCreateChannel(channelsNotReused, comp.getComponentName(), comp.getType()); try { @@ -190,17 +192,16 @@ public abstract class AbstractConfigurationProvider implements */ for (String chName : channelNames) { Context context = agentConf.getChannelContext().get(chName); - if(context != null){ - Channel channel = - getOrCreateChannel(channelsNotReused, chName, context.getString( - BasicConfigurationConstants.CONFIG_TYPE)); + if (context != null) { + Channel channel = getOrCreateChannel(channelsNotReused, chName, + context.getString(BasicConfigurationConstants.CONFIG_TYPE)); try { Configurables.configure(channel, context); channelComponentMap.put(chName, new ChannelComponent(channel)); LOGGER.info("Created channel " + chName); } catch (Exception e) { String msg = String.format("Channel %s has been removed due to an " + - "error during configuration", chName); + "error during configuration", chName); LOGGER.error(msg, e); } } @@ -212,7 +213,7 @@ public abstract class AbstractConfigurationProvider implements Map<String, Channel> channelMap = channelCache.get(channelKlass); if (channelMap != null) { for (String channelName : channelsNotReused.get(channelKlass)) { - if(channelMap.remove(channelName) != null) { + if (channelMap.remove(channelName) != null) { LOGGER.info("Removed {} of type {}", channelName, channelKlass); } } @@ -228,12 +229,11 @@ public abstract class AbstractConfigurationProvider implements String name, String type) throws FlumeException { - Class<? extends Channel> channelClass = channelFactory. - getClass(type); + Class<? 
extends Channel> channelClass = channelFactory.getClass(type); /* * Channel has requested a new instance on each re-configuration */ - if(channelClass.isAnnotationPresent(Disposable.class)) { + if (channelClass.isAnnotationPresent(Disposable.class)) { Channel channel = channelFactory.create(name, type); channel.setName(name); return channel; @@ -244,7 +244,7 @@ public abstract class AbstractConfigurationProvider implements channelCache.put(channelClass, channelMap); } Channel channel = channelMap.get(name); - if(channel == null) { + if (channel == null) { channel = channelFactory.create(name, type); channel.setName(name); channelMap.put(name, channel); @@ -266,7 +266,7 @@ public abstract class AbstractConfigurationProvider implements */ for (String sourceName : sourceNames) { ComponentConfiguration comp = compMap.get(sourceName); - if(comp != null) { + if (comp != null) { SourceConfiguration config = (SourceConfiguration) comp; Source source = sourceFactory.create(comp.getComponentName(), @@ -277,11 +277,11 @@ public abstract class AbstractConfigurationProvider implements List<Channel> sourceChannels = new ArrayList<Channel>(); for (String chName : channelNames) { ChannelComponent channelComponent = channelComponentMap.get(chName); - if(channelComponent != null) { + if (channelComponent != null) { sourceChannels.add(channelComponent.channel); } } - if(sourceChannels.isEmpty()) { + if (sourceChannels.isEmpty()) { String msg = String.format("Source %s is not connected to a " + "channel", sourceName); throw new IllegalStateException(msg); @@ -298,10 +298,10 @@ public abstract class AbstractConfigurationProvider implements source.setChannelProcessor(channelProcessor); sourceRunnerMap.put(comp.getComponentName(), SourceRunner.forSource(source)); - for(Channel channel : sourceChannels) { - ChannelComponent channelComponent = Preconditions. 
- checkNotNull(channelComponentMap.get(channel.getName()), - String.format("Channel %s", channel.getName())); + for (Channel channel : sourceChannels) { + ChannelComponent channelComponent = + Preconditions.checkNotNull(channelComponentMap.get(channel.getName()), + String.format("Channel %s", channel.getName())); channelComponent.components.add(sourceName); } } catch (Exception e) { @@ -318,10 +318,10 @@ public abstract class AbstractConfigurationProvider implements Map<String, Context> sourceContexts = agentConf.getSourceContext(); for (String sourceName : sourceNames) { Context context = sourceContexts.get(sourceName); - if(context != null){ + if (context != null) { Source source = sourceFactory.create(sourceName, - context.getString(BasicConfigurationConstants.CONFIG_TYPE)); + context.getString(BasicConfigurationConstants.CONFIG_TYPE)); try { Configurables.configure(source, context); List<Channel> sourceChannels = new ArrayList<Channel>(); @@ -329,11 +329,11 @@ public abstract class AbstractConfigurationProvider implements BasicConfigurationConstants.CONFIG_CHANNELS).split("\\s+"); for (String chName : channelNames) { ChannelComponent channelComponent = channelComponentMap.get(chName); - if(channelComponent != null) { + if (channelComponent != null) { sourceChannels.add(channelComponent.channel); } } - if(sourceChannels.isEmpty()) { + if (sourceChannels.isEmpty()) { String msg = String.format("Source %s is not connected to a " + "channel", sourceName); throw new IllegalStateException(msg); @@ -349,10 +349,10 @@ public abstract class AbstractConfigurationProvider implements source.setChannelProcessor(channelProcessor); sourceRunnerMap.put(sourceName, SourceRunner.forSource(source)); - for(Channel channel : sourceChannels) { - ChannelComponent channelComponent = Preconditions. - checkNotNull(channelComponentMap.get(channel.getName()), - String.format("Channel %s", channel.getName())); + for (Channel channel : sourceChannels) { + ChannelComponent channelComponent = + Preconditions.checkNotNull(channelComponentMap.get(channel.getName()), + String.format("Channel %s", channel.getName())); channelComponent.components.add(sourceName); } } catch (Exception e) { @@ -376,15 +376,13 @@ public abstract class AbstractConfigurationProvider implements */ for (String sinkName : sinkNames) { ComponentConfiguration comp = compMap.get(sinkName); - if(comp != null) { + if (comp != null) { SinkConfiguration config = (SinkConfiguration) comp; - Sink sink = sinkFactory.create(comp.getComponentName(), - comp.getType()); + Sink sink = sinkFactory.create(comp.getComponentName(), comp.getType()); try { Configurables.configure(sink, config); - ChannelComponent channelComponent = channelComponentMap. - get(config.getChannel()); - if(channelComponent == null) { + ChannelComponent channelComponent = channelComponentMap.get(config.getChannel()); + if (channelComponent == null) { String msg = String.format("Sink %s is not connected to a " + "channel", sinkName); throw new IllegalStateException(msg); @@ -406,14 +404,15 @@ public abstract class AbstractConfigurationProvider implements Map<String, Context> sinkContexts = agentConf.getSinkContext(); for (String sinkName : sinkNames) { Context context = sinkContexts.get(sinkName); - if(context != null) { + if (context != null) { Sink sink = sinkFactory.create(sinkName, context.getString( BasicConfigurationConstants.CONFIG_TYPE)); try { Configurables.configure(sink, context); - ChannelComponent channelComponent = channelComponentMap. 
- get(context.getString(BasicConfigurationConstants.CONFIG_CHANNEL)); - if(channelComponent == null) { + ChannelComponent channelComponent = + channelComponentMap.get( + context.getString(BasicConfigurationConstants.CONFIG_CHANNEL)); + if (channelComponent == null) { String msg = String.format("Sink %s is not connected to a " + "channel", sinkName); throw new IllegalStateException(msg); @@ -441,7 +440,7 @@ public abstract class AbstractConfigurationProvider implements Map<String, String> usedSinks = new HashMap<String, String>(); for (String groupName: sinkGroupNames) { ComponentConfiguration comp = compMap.get(groupName); - if(comp != null) { + if (comp != null) { SinkGroupConfiguration groupConf = (SinkGroupConfiguration) comp; List<Sink> groupSinks = new ArrayList<Sink>(); for (String sink : groupConf.getSinks()) { @@ -475,7 +474,7 @@ public abstract class AbstractConfigurationProvider implements } } // add any unassigned sinks to solo collectors - for(Entry<String, Sink> entry : sinks.entrySet()) { + for (Entry<String, Sink> entry : sinks.entrySet()) { if (!usedSinks.containsValue(entry.getKey())) { try { SinkProcessor pr = new DefaultSinkProcessor(); @@ -483,9 +482,8 @@ public abstract class AbstractConfigurationProvider implements sinkMap.add(entry.getValue()); pr.setSinks(sinkMap); Configurables.configure(pr, new Context()); - sinkRunnerMap.put(entry.getKey(), - new SinkRunner(pr)); - } catch(Exception e) { + sinkRunnerMap.put(entry.getKey(), new SinkRunner(pr)); + } catch (Exception e) { String msg = String.format("SinkGroup %s has been removed due to " + "an error during configuration", entry.getKey()); LOGGER.error(msg, e); @@ -496,6 +494,7 @@ public abstract class AbstractConfigurationProvider implements private static class ChannelComponent { final Channel channel; final List<String> components; + ChannelComponent(Channel channel) { this.channel = channel; components = Lists.newArrayList();
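For readers skimming this diff: the getOrCreateChannel() block reformatted above decides whether a channel instance is reused across reconfigurations or created fresh when the channel class carries @Disposable. A minimal, self-contained sketch of that decision follows, with stub types standing in for Flume's Channel, ChannelFactory and Disposable marker — a simplified illustration, not the exact Flume code.

import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.util.HashMap;
import java.util.Map;

// Stub types stand in for org.apache.flume.Channel, ChannelFactory and the
// @Disposable marker; only the reuse-vs-dispose decision is illustrated.
public class ChannelReuseSketch {

  @Retention(RetentionPolicy.RUNTIME)
  @interface Disposable {}

  interface Channel { void setName(String name); }

  interface ChannelFactory {
    Class<? extends Channel> getClass(String type);
    Channel create(String name, String type);
  }

  // Cache layout mirrors the field in the diff above: class -> name -> instance.
  private final Map<Class<? extends Channel>, Map<String, Channel>> channelCache =
      new HashMap<>();
  private final ChannelFactory channelFactory;

  ChannelReuseSketch(ChannelFactory channelFactory) {
    this.channelFactory = channelFactory;
  }

  Channel getOrCreateChannel(String name, String type) {
    Class<? extends Channel> channelClass = channelFactory.getClass(type);
    // A @Disposable channel asks for a new instance on every re-configuration.
    if (channelClass.isAnnotationPresent(Disposable.class)) {
      Channel channel = channelFactory.create(name, type);
      channel.setName(name);
      return channel;
    }
    // Otherwise instances are cached by (class, name) and reused across reloads.
    Map<String, Channel> channelMap =
        channelCache.computeIfAbsent(channelClass, k -> new HashMap<>());
    Channel channel = channelMap.get(name);
    if (channel == null) {
      channel = channelFactory.create(name, type);
      channel.setName(name);
      channelMap.put(name, channel);
    }
    return channel;
  }
}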
http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-node/src/main/java/org/apache/flume/node/Application.java ---------------------------------------------------------------------- diff --git a/flume-ng-node/src/main/java/org/apache/flume/node/Application.java b/flume-ng-node/src/main/java/org/apache/flume/node/Application.java index 959fa77..d6d92f0 100644 --- a/flume-ng-node/src/main/java/org/apache/flume/node/Application.java +++ b/flume-ng-node/src/main/java/org/apache/flume/node/Application.java @@ -19,15 +19,10 @@ package org.apache.flume.node; -import java.io.File; -import java.io.IOException; -import java.util.ArrayList; -import java.util.List; -import java.util.Locale; -import java.util.Map.Entry; -import java.util.Properties; -import java.util.Set; - +import com.google.common.base.Throwables; +import com.google.common.collect.Lists; +import com.google.common.eventbus.EventBus; +import com.google.common.eventbus.Subscribe; import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.CommandLineParser; import org.apache.commons.cli.GnuParser; @@ -49,10 +44,14 @@ import org.apache.flume.lifecycle.LifecycleSupervisor.SupervisorPolicy; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.google.common.base.Throwables; -import com.google.common.collect.Lists; -import com.google.common.eventbus.EventBus; -import com.google.common.eventbus.Subscribe; +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Locale; +import java.util.Map.Entry; +import java.util.Properties; +import java.util.Set; public class Application { @@ -77,7 +76,7 @@ public class Application { } public synchronized void start() { - for(LifecycleAware component : components) { + for (LifecycleAware component : components) { supervisor.supervise(component, new SupervisorPolicy.AlwaysRestartPolicy(), LifecycleState.START); } @@ -91,7 +90,7 @@ public class Application { public synchronized void stop() { supervisor.stop(); - if(monitorServer != null) { + if (monitorServer != null) { monitorServer.stop(); } } @@ -99,37 +98,37 @@ public class Application { private void stopAllComponents() { if (this.materializedConfiguration != null) { logger.info("Shutting down configuration: {}", this.materializedConfiguration); - for (Entry<String, SourceRunner> entry : this.materializedConfiguration - .getSourceRunners().entrySet()) { - try{ + for (Entry<String, SourceRunner> entry : + this.materializedConfiguration.getSourceRunners().entrySet()) { + try { logger.info("Stopping Source " + entry.getKey()); supervisor.unsupervise(entry.getValue()); - } catch (Exception e){ + } catch (Exception e) { logger.error("Error while stopping {}", entry.getValue(), e); } } for (Entry<String, SinkRunner> entry : - this.materializedConfiguration.getSinkRunners().entrySet()) { - try{ + this.materializedConfiguration.getSinkRunners().entrySet()) { + try { logger.info("Stopping Sink " + entry.getKey()); supervisor.unsupervise(entry.getValue()); - } catch (Exception e){ + } catch (Exception e) { logger.error("Error while stopping {}", entry.getValue(), e); } } for (Entry<String, Channel> entry : - this.materializedConfiguration.getChannels().entrySet()) { - try{ + this.materializedConfiguration.getChannels().entrySet()) { + try { logger.info("Stopping Channel " + entry.getKey()); supervisor.unsupervise(entry.getValue()); - } catch (Exception e){ + } catch (Exception e) { logger.error("Error while stopping {}", entry.getValue(), e); } } } - 
if(monitorServer != null) { + if (monitorServer != null) { monitorServer.stop(); } } @@ -140,12 +139,12 @@ public class Application { this.materializedConfiguration = materializedConfiguration; for (Entry<String, Channel> entry : - materializedConfiguration.getChannels().entrySet()) { - try{ + materializedConfiguration.getChannels().entrySet()) { + try { logger.info("Starting Channel " + entry.getKey()); supervisor.supervise(entry.getValue(), new SupervisorPolicy.AlwaysRestartPolicy(), LifecycleState.START); - } catch (Exception e){ + } catch (Exception e) { logger.error("Error while starting {}", entry.getValue(), e); } } @@ -153,9 +152,9 @@ public class Application { /* * Wait for all channels to start. */ - for(Channel ch: materializedConfiguration.getChannels().values()){ - while(ch.getLifecycleState() != LifecycleState.START - && !supervisor.isComponentInErrorState(ch)){ + for (Channel ch : materializedConfiguration.getChannels().values()) { + while (ch.getLifecycleState() != LifecycleState.START + && !supervisor.isComponentInErrorState(ch)) { try { logger.info("Waiting for channel: " + ch.getName() + " to start. Sleeping for 500 ms"); @@ -167,23 +166,22 @@ public class Application { } } - for (Entry<String, SinkRunner> entry : materializedConfiguration.getSinkRunners() - .entrySet()) { - try{ + for (Entry<String, SinkRunner> entry : materializedConfiguration.getSinkRunners().entrySet()) { + try { logger.info("Starting Sink " + entry.getKey()); supervisor.supervise(entry.getValue(), - new SupervisorPolicy.AlwaysRestartPolicy(), LifecycleState.START); + new SupervisorPolicy.AlwaysRestartPolicy(), LifecycleState.START); } catch (Exception e) { logger.error("Error while starting {}", entry.getValue(), e); } } - for (Entry<String, SourceRunner> entry : materializedConfiguration - .getSourceRunners().entrySet()) { - try{ + for (Entry<String, SourceRunner> entry : + materializedConfiguration.getSourceRunners().entrySet()) { + try { logger.info("Starting Source " + entry.getKey()); supervisor.supervise(entry.getValue(), - new SupervisorPolicy.AlwaysRestartPolicy(), LifecycleState.START); + new SupervisorPolicy.AlwaysRestartPolicy(), LifecycleState.START); } catch (Exception e) { logger.error("Error while starting {}", entry.getValue(), e); } @@ -203,7 +201,7 @@ public class Application { try { //Is it a known type? klass = MonitoringType.valueOf( - monitorType.toUpperCase(Locale.ENGLISH)).getMonitorClass(); + monitorType.toUpperCase(Locale.ENGLISH)).getMonitorClass(); } catch (Exception e) { //Not a known type, use FQCN klass = (Class<? extends MonitorService>) Class.forName(monitorType); @@ -213,7 +211,7 @@ public class Application { for (String key : keys) { if (key.startsWith(CONF_MONITOR_PREFIX)) { context.put(key.substring(CONF_MONITOR_PREFIX.length()), - systemProps.getProperty(key)); + systemProps.getProperty(key)); } } monitorServer.configure(context); @@ -221,7 +219,7 @@ public class Application { } } catch (Exception e) { logger.warn("Error starting monitoring. 
" - + "Monitoring might not be available.", e); + + "Monitoring might not be available.", e); } } @@ -285,18 +283,17 @@ public class Application { EventBus eventBus = new EventBus(agentName + "-event-bus"); List<LifecycleAware> components = Lists.newArrayList(); PollingZooKeeperConfigurationProvider zookeeperConfigurationProvider = - new PollingZooKeeperConfigurationProvider( - agentName, zkConnectionStr, baseZkPath, eventBus); + new PollingZooKeeperConfigurationProvider( + agentName, zkConnectionStr, baseZkPath, eventBus); components.add(zookeeperConfigurationProvider); application = new Application(components); eventBus.register(application); } else { StaticZooKeeperConfigurationProvider zookeeperConfigurationProvider = - new StaticZooKeeperConfigurationProvider( - agentName, zkConnectionStr, baseZkPath); + new StaticZooKeeperConfigurationProvider( + agentName, zkConnectionStr, baseZkPath); application = new Application(); - application.handleConfigurationEvent(zookeeperConfigurationProvider - .getConfiguration()); + application.handleConfigurationEvent(zookeeperConfigurationProvider.getConfiguration()); } } else { File configurationFile = new File(commandLine.getOptionValue('f')); @@ -308,16 +305,16 @@ public class Application { if (!configurationFile.exists()) { // If command line invocation, then need to fail fast if (System.getProperty(Constants.SYSPROP_CALLED_FROM_SERVICE) == - null) { + null) { String path = configurationFile.getPath(); try { path = configurationFile.getCanonicalPath(); } catch (IOException ex) { logger.error("Failed to read canonical path for file: " + path, - ex); + ex); } throw new ParseException( - "The specified configuration file does not exist: " + path); + "The specified configuration file does not exist: " + path); } } List<LifecycleAware> components = Lists.newArrayList(); @@ -325,18 +322,16 @@ public class Application { if (reload) { EventBus eventBus = new EventBus(agentName + "-event-bus"); PollingPropertiesFileConfigurationProvider configurationProvider = - new PollingPropertiesFileConfigurationProvider( - agentName, configurationFile, eventBus, 30); + new PollingPropertiesFileConfigurationProvider( + agentName, configurationFile, eventBus, 30); components.add(configurationProvider); application = new Application(components); eventBus.register(application); } else { PropertiesFileConfigurationProvider configurationProvider = - new PropertiesFileConfigurationProvider( - agentName, configurationFile); + new PropertiesFileConfigurationProvider(agentName, configurationFile); application = new Application(); - application.handleConfigurationEvent(configurationProvider - .getConfiguration()); + application.handleConfigurationEvent(configurationProvider.getConfiguration()); } } application.start(); @@ -350,8 +345,7 @@ public class Application { }); } catch (Exception e) { - logger.error("A fatal error occurred while running. Exception follows.", - e); + logger.error("A fatal error occurred while running. 
Exception follows.", e); } } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-node/src/main/java/org/apache/flume/node/ConfigurationProvider.java ---------------------------------------------------------------------- diff --git a/flume-ng-node/src/main/java/org/apache/flume/node/ConfigurationProvider.java b/flume-ng-node/src/main/java/org/apache/flume/node/ConfigurationProvider.java index 6a27898..9528cb7 100644 --- a/flume-ng-node/src/main/java/org/apache/flume/node/ConfigurationProvider.java +++ b/flume-ng-node/src/main/java/org/apache/flume/node/ConfigurationProvider.java @@ -19,11 +19,6 @@ package org.apache.flume.node; - public interface ConfigurationProvider { - - - public MaterializedConfiguration getConfiguration(); - - + MaterializedConfiguration getConfiguration(); } http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-node/src/main/java/org/apache/flume/node/PollingPropertiesFileConfigurationProvider.java ---------------------------------------------------------------------- diff --git a/flume-ng-node/src/main/java/org/apache/flume/node/PollingPropertiesFileConfigurationProvider.java b/flume-ng-node/src/main/java/org/apache/flume/node/PollingPropertiesFileConfigurationProvider.java index 857c8a5..91a09f0 100644 --- a/flume-ng-node/src/main/java/org/apache/flume/node/PollingPropertiesFileConfigurationProvider.java +++ b/flume-ng-node/src/main/java/org/apache/flume/node/PollingPropertiesFileConfigurationProvider.java @@ -32,11 +32,12 @@ import com.google.common.base.Preconditions; import com.google.common.eventbus.EventBus; import com.google.common.util.concurrent.ThreadFactoryBuilder; -public class PollingPropertiesFileConfigurationProvider extends - PropertiesFileConfigurationProvider implements LifecycleAware { +public class PollingPropertiesFileConfigurationProvider + extends PropertiesFileConfigurationProvider + implements LifecycleAware { - private static final Logger LOGGER = LoggerFactory - .getLogger(PollingPropertiesFileConfigurationProvider.class); + private static final Logger LOGGER = + LoggerFactory.getLogger(PollingPropertiesFileConfigurationProvider.class); private final EventBus eventBus; private final File file; @@ -83,8 +84,8 @@ public class PollingPropertiesFileConfigurationProvider extends LOGGER.info("Configuration provider stopping"); executorService.shutdown(); - try{ - while(!executorService.awaitTermination(500, TimeUnit.MILLISECONDS)) { + try { + while (!executorService.awaitTermination(500, TimeUnit.MILLISECONDS)) { LOGGER.debug("Waiting for file watcher to terminate"); } } catch (InterruptedException e) { http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-node/src/main/java/org/apache/flume/node/SimpleMaterializedConfiguration.java ---------------------------------------------------------------------- diff --git a/flume-ng-node/src/main/java/org/apache/flume/node/SimpleMaterializedConfiguration.java b/flume-ng-node/src/main/java/org/apache/flume/node/SimpleMaterializedConfiguration.java index 536dcc4..a652390 100644 --- a/flume-ng-node/src/main/java/org/apache/flume/node/SimpleMaterializedConfiguration.java +++ b/flume-ng-node/src/main/java/org/apache/flume/node/SimpleMaterializedConfiguration.java @@ -56,7 +56,7 @@ public class SimpleMaterializedConfiguration implements MaterializedConfiguratio } @Override - public void addChannel(String name, Channel channel){ + public void addChannel(String name, Channel channel) { channels.put(name, channel); } 
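Before the SDK-side changes, a quick orientation note: Application.java (reformatted above) is wired to its configuration providers through a Guava EventBus — the polling provider posts a freshly materialized configuration and the application reacts in its @Subscribe handler, as seen in eventBus.register(application) and handleConfigurationEvent(...) above. Below is a self-contained sketch of that wiring pattern using a generic payload class; the names are hypothetical and it requires Guava on the classpath, it is not Flume's actual code.

import com.google.common.eventbus.EventBus;
import com.google.common.eventbus.Subscribe;

// Sketch of the EventBus wiring used by Application: a provider posts each new
// configuration object and the registered listener handles it.
public class EventBusWiringSketch {

  static class ConfigurationEvent {
    final String payload;
    ConfigurationEvent(String payload) { this.payload = payload; }
  }

  static class Agent {
    @Subscribe
    public void handleConfigurationEvent(ConfigurationEvent event) {
      // In Flume this is where running components are stopped and restarted
      // against the newly materialized configuration.
      System.out.println("Applying configuration: " + event.payload);
    }
  }

  public static void main(String[] args) {
    EventBus eventBus = new EventBus("agent-event-bus");
    eventBus.register(new Agent());
    // A polling provider would post whenever the properties file changes.
    eventBus.post(new ConfigurationEvent("agent.sources = r1"));
  }
}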
http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-sdk/src/main/java/org/apache/flume/api/AbstractRpcClient.java ---------------------------------------------------------------------- diff --git a/flume-ng-sdk/src/main/java/org/apache/flume/api/AbstractRpcClient.java b/flume-ng-sdk/src/main/java/org/apache/flume/api/AbstractRpcClient.java index 5cc292a..f20462b 100644 --- a/flume-ng-sdk/src/main/java/org/apache/flume/api/AbstractRpcClient.java +++ b/flume-ng-sdk/src/main/java/org/apache/flume/api/AbstractRpcClient.java @@ -35,7 +35,7 @@ public abstract class AbstractRpcClient implements RpcClient { RpcClientConfigurationConstants.DEFAULT_REQUEST_TIMEOUT_MILLIS; @Override - public int getBatchSize(){ + public int getBatchSize() { return batchSize; } @Override http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-sdk/src/main/java/org/apache/flume/api/FailoverRpcClient.java ---------------------------------------------------------------------- diff --git a/flume-ng-sdk/src/main/java/org/apache/flume/api/FailoverRpcClient.java b/flume-ng-sdk/src/main/java/org/apache/flume/api/FailoverRpcClient.java index db6905a..9d82acb 100644 --- a/flume-ng-sdk/src/main/java/org/apache/flume/api/FailoverRpcClient.java +++ b/flume-ng-sdk/src/main/java/org/apache/flume/api/FailoverRpcClient.java @@ -70,7 +70,7 @@ public class FailoverRpcClient extends AbstractRpcClient implements RpcClient { //since shared data structures are created here. private synchronized void configureHosts(Properties properties) throws FlumeException { - if(isActive){ + if (isActive) { logger.error("This client was already configured, " + "cannot reconfigure."); throw new FlumeException("This client was already configured, " + @@ -79,7 +79,7 @@ public class FailoverRpcClient extends AbstractRpcClient implements RpcClient { hosts = HostInfo.getHostInfoList(properties); String tries = properties.getProperty( RpcClientConfigurationConstants.CONFIG_MAX_ATTEMPTS); - if (tries == null || tries.isEmpty()){ + if (tries == null || tries.isEmpty()) { maxTries = hosts.size(); } else { try { @@ -269,7 +269,7 @@ public class FailoverRpcClient extends AbstractRpcClient implements RpcClient { continue; } } - for(int count = 0; count <= lastCheckedhost; count++) { + for (int count = 0; count <= lastCheckedhost; count++) { HostInfo hostInfo = hosts.get(count); try { setDefaultProperties(hostInfo, props); http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-sdk/src/main/java/org/apache/flume/api/HostInfo.java ---------------------------------------------------------------------- diff --git a/flume-ng-sdk/src/main/java/org/apache/flume/api/HostInfo.java b/flume-ng-sdk/src/main/java/org/apache/flume/api/HostInfo.java index 8a81208..53d99a2 100644 --- a/flume-ng-sdk/src/main/java/org/apache/flume/api/HostInfo.java +++ b/flume-ng-sdk/src/main/java/org/apache/flume/api/HostInfo.java @@ -76,7 +76,7 @@ public class HostInfo { // Ignore that host if value is not there if (hostAndPortStr != null) { String[] hostAndPort = hostAndPortStr.split(":"); - if (hostAndPort.length != 2){ + if (hostAndPort.length != 2) { LOGGER.error("Invalid host address" + hostAndPortStr); throw new FlumeException("Invalid host address" + hostAndPortStr); } http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-sdk/src/main/java/org/apache/flume/api/LoadBalancingRpcClient.java ---------------------------------------------------------------------- diff --git 
a/flume-ng-sdk/src/main/java/org/apache/flume/api/LoadBalancingRpcClient.java b/flume-ng-sdk/src/main/java/org/apache/flume/api/LoadBalancingRpcClient.java index e5fcc36..d3ccf74 100644 --- a/flume-ng-sdk/src/main/java/org/apache/flume/api/LoadBalancingRpcClient.java +++ b/flume-ng-sdk/src/main/java/org/apache/flume/api/LoadBalancingRpcClient.java @@ -161,7 +161,7 @@ public class LoadBalancingRpcClient extends AbstractRpcClient { RpcClientConfigurationConstants.CONFIG_MAX_BACKOFF); long maxBackoff = 0; - if(maxBackoffStr != null) { + if (maxBackoffStr != null) { maxBackoff = Long.parseLong(maxBackoffStr); } @@ -240,12 +240,13 @@ public class LoadBalancingRpcClient extends AbstractRpcClient { private OrderSelector<HostInfo> selector; - RoundRobinHostSelector(boolean backoff, long maxBackoff){ + RoundRobinHostSelector(boolean backoff, long maxBackoff) { selector = new RoundRobinOrderSelector<HostInfo>(backoff); - if(maxBackoff != 0){ + if (maxBackoff != 0) { selector.setMaxTimeOut(maxBackoff); } } + @Override public synchronized Iterator<HostInfo> createHostIterator() { return selector.createIterator(); @@ -256,7 +257,7 @@ public class LoadBalancingRpcClient extends AbstractRpcClient { selector.setObjects(hosts); } - public synchronized void informFailure(HostInfo failedHost){ + public synchronized void informFailure(HostInfo failedHost) { selector.informFailure(failedHost); } } http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-sdk/src/main/java/org/apache/flume/api/NettyAvroRpcClient.java ---------------------------------------------------------------------- diff --git a/flume-ng-sdk/src/main/java/org/apache/flume/api/NettyAvroRpcClient.java b/flume-ng-sdk/src/main/java/org/apache/flume/api/NettyAvroRpcClient.java index 3661672..21a9553 100644 --- a/flume-ng-sdk/src/main/java/org/apache/flume/api/NettyAvroRpcClient.java +++ b/flume-ng-sdk/src/main/java/org/apache/flume/api/NettyAvroRpcClient.java @@ -79,8 +79,7 @@ import org.slf4j.LoggerFactory; * The connections are intended to be opened before clients are given access so * that the object cannot ever be in an inconsistent when exposed to users. */ -public class NettyAvroRpcClient extends AbstractRpcClient -implements RpcClient { +public class NettyAvroRpcClient extends AbstractRpcClient implements RpcClient { private ExecutorService callTimeoutPool; private final ReentrantLock stateLock = new ReentrantLock(); @@ -135,11 +134,11 @@ implements RpcClient { try { ExecutorService bossExecutor = - Executors.newCachedThreadPool(new TransceiverThreadFactory( - "Avro " + NettyTransceiver.class.getSimpleName() + " Boss")); + Executors.newCachedThreadPool(new TransceiverThreadFactory( + "Avro " + NettyTransceiver.class.getSimpleName() + " Boss")); ExecutorService workerExecutor = - Executors.newCachedThreadPool(new TransceiverThreadFactory( - "Avro " + NettyTransceiver.class.getSimpleName() + " I/O Worker")); + Executors.newCachedThreadPool(new TransceiverThreadFactory( + "Avro " + NettyTransceiver.class.getSimpleName() + " I/O Worker")); if (enableDeflateCompression || enableSsl) { if (maxIoWorkers >= 1) { @@ -468,7 +467,7 @@ implements RpcClient { } - /** + /** * <p> * Configure the actual client using the properties. * <tt>properties</tt> should have at least 2 params: @@ -479,13 +478,13 @@ implements RpcClient { * <tt>batch-size</tt> = <i>batchSize</i> * @param properties The properties to instantiate the client with. 
* @return - */ + */ @Override public synchronized void configure(Properties properties) throws FlumeException { stateLock.lock(); - try{ - if(connState == ConnState.READY || connState == ConnState.DEAD){ + try { + if (connState == ConnState.READY || connState == ConnState.DEAD) { throw new FlumeException("This client was already configured, " + "cannot reconfigure."); } @@ -529,12 +528,12 @@ implements RpcClient { } String host = properties.getProperty( - RpcClientConfigurationConstants.CONFIG_HOSTS_PREFIX+hosts[0]); + RpcClientConfigurationConstants.CONFIG_HOSTS_PREFIX + hosts[0]); if (host == null || host.isEmpty()) { throw new FlumeException("Host not found: " + hosts[0]); } String[] hostAndPort = host.split(":"); - if (hostAndPort.length != 2){ + if (hostAndPort.length != 2) { throw new FlumeException("Invalid hostname: " + hosts[0]); } Integer port = null; @@ -583,10 +582,12 @@ implements RpcClient { } } - String enableCompressionStr = properties.getProperty(RpcClientConfigurationConstants.CONFIG_COMPRESSION_TYPE); + String enableCompressionStr = + properties.getProperty(RpcClientConfigurationConstants.CONFIG_COMPRESSION_TYPE); if (enableCompressionStr != null && enableCompressionStr.equalsIgnoreCase("deflate")) { this.enableDeflateCompression = true; - String compressionLvlStr = properties.getProperty(RpcClientConfigurationConstants.CONFIG_COMPRESSION_LEVEL); + String compressionLvlStr = + properties.getProperty(RpcClientConfigurationConstants.CONFIG_COMPRESSION_LEVEL); compressionLevel = RpcClientConfigurationConstants.DEFAULT_COMPRESSION_LEVEL; if (compressionLvlStr != null) { try { @@ -608,7 +609,7 @@ implements RpcClient { truststoreType = properties.getProperty( RpcClientConfigurationConstants.CONFIG_TRUSTSTORE_TYPE, "JKS"); String excludeProtocolsStr = properties.getProperty( - RpcClientConfigurationConstants.CONFIG_EXCLUDE_PROTOCOLS); + RpcClientConfigurationConstants.CONFIG_EXCLUDE_PROTOCOLS); if (excludeProtocolsStr == null) { excludeProtocols.add("SSLv3"); } else { @@ -618,14 +619,13 @@ implements RpcClient { } } - String maxIoWorkersStr = properties.getProperty( - RpcClientConfigurationConstants.MAX_IO_WORKERS); + String maxIoWorkersStr = properties.getProperty(RpcClientConfigurationConstants.MAX_IO_WORKERS); if (!StringUtils.isEmpty(maxIoWorkersStr)) { try { maxIoWorkers = Integer.parseInt(maxIoWorkersStr); } catch (NumberFormatException ex) { - logger.warn ("Invalid maxIOWorkers:" + maxIoWorkersStr + " Using " + - "default maxIOWorkers."); + logger.warn("Invalid maxIOWorkers:" + maxIoWorkersStr + " Using " + + "default maxIOWorkers."); maxIoWorkers = -1; } } http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-sdk/src/main/java/org/apache/flume/api/RpcClientConfigurationConstants.java ---------------------------------------------------------------------- diff --git a/flume-ng-sdk/src/main/java/org/apache/flume/api/RpcClientConfigurationConstants.java b/flume-ng-sdk/src/main/java/org/apache/flume/api/RpcClientConfigurationConstants.java index 343e07b..d83cf19 100644 --- a/flume-ng-sdk/src/main/java/org/apache/flume/api/RpcClientConfigurationConstants.java +++ b/flume-ng-sdk/src/main/java/org/apache/flume/api/RpcClientConfigurationConstants.java @@ -66,18 +66,18 @@ public final class RpcClientConfigurationConstants { /** * Default batch size. */ - public final static Integer DEFAULT_BATCH_SIZE = 100; + public static final Integer DEFAULT_BATCH_SIZE = 100; /** * Default connection, handshake, and initial request timeout in milliseconds. 
*/ - public final static long DEFAULT_CONNECT_TIMEOUT_MILLIS = + public static final long DEFAULT_CONNECT_TIMEOUT_MILLIS = TimeUnit.MILLISECONDS.convert(20, TimeUnit.SECONDS); /** * Default request timeout in milliseconds. */ - public final static long DEFAULT_REQUEST_TIMEOUT_MILLIS = + public static final long DEFAULT_REQUEST_TIMEOUT_MILLIS = TimeUnit.MILLISECONDS.convert(20, TimeUnit.SECONDS); /** http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-sdk/src/main/java/org/apache/flume/api/RpcClientFactory.java ---------------------------------------------------------------------- diff --git a/flume-ng-sdk/src/main/java/org/apache/flume/api/RpcClientFactory.java b/flume-ng-sdk/src/main/java/org/apache/flume/api/RpcClientFactory.java index 11bc94c..5cb3332 100644 --- a/flume-ng-sdk/src/main/java/org/apache/flume/api/RpcClientFactory.java +++ b/flume-ng-sdk/src/main/java/org/apache/flume/api/RpcClientFactory.java @@ -18,6 +18,8 @@ */ package org.apache.flume.api; +import org.apache.flume.FlumeException; + import java.io.File; import java.io.FileNotFoundException; import java.io.FileReader; @@ -25,7 +27,6 @@ import java.io.IOException; import java.io.Reader; import java.util.Locale; import java.util.Properties; -import org.apache.flume.FlumeException; /** * Factory class to construct Flume {@link RPCClient} implementations. @@ -63,12 +64,12 @@ public class RpcClientFactory { try { String clientClassType = type; ClientType clientType = null; - try{ + try { clientType = ClientType.valueOf(type.toUpperCase(Locale.ENGLISH)); - } catch (IllegalArgumentException e){ + } catch (IllegalArgumentException e) { clientType = ClientType.OTHER; } - if (!clientType.equals(ClientType.OTHER)){ + if (!clientType.equals(ClientType.OTHER)) { clientClassType = clientType.getClientClassName(); } clazz = @@ -181,8 +182,7 @@ public class RpcClientFactory { * @return an {@linkplain RpcClient} which uses thrift configured with the * given parameters. 
*/ - public static RpcClient getThriftInstance(String hostname, Integer port, - Integer batchSize) { + public static RpcClient getThriftInstance(String hostname, Integer port, Integer batchSize) { if (hostname == null) { throw new NullPointerException("hostname must not be null"); } @@ -196,7 +196,7 @@ public class RpcClientFactory { Properties props = new Properties(); props.setProperty(RpcClientConfigurationConstants.CONFIG_HOSTS, "h1"); props.setProperty(RpcClientConfigurationConstants.CONFIG_HOSTS_PREFIX + "h1", - hostname + ":" + port.intValue()); + hostname + ":" + port.intValue()); props.setProperty(RpcClientConfigurationConstants.CONFIG_BATCH_SIZE, batchSize.toString()); ThriftRpcClient client = new ThriftRpcClient(); client.configure(props); @@ -227,7 +227,7 @@ public class RpcClientFactory { */ public static RpcClient getThriftInstance(Properties props) { props.setProperty(RpcClientConfigurationConstants.CONFIG_CLIENT_TYPE, - ClientType.THRIFT.clientClassName); + ClientType.THRIFT.clientClassName); return getInstance(props); } http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-sdk/src/main/java/org/apache/flume/api/ThriftRpcClient.java ---------------------------------------------------------------------- diff --git a/flume-ng-sdk/src/main/java/org/apache/flume/api/ThriftRpcClient.java b/flume-ng-sdk/src/main/java/org/apache/flume/api/ThriftRpcClient.java index 857948f..1d21d5f 100644 --- a/flume-ng-sdk/src/main/java/org/apache/flume/api/ThriftRpcClient.java +++ b/flume-ng-sdk/src/main/java/org/apache/flume/api/ThriftRpcClient.java @@ -32,16 +32,15 @@ import org.apache.thrift.transport.TTransport; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import javax.net.ssl.KeyManagerFactory; -import javax.net.ssl.TrustManagerFactory; import javax.net.ssl.SSLContext; import javax.net.ssl.SSLSocket; import javax.net.ssl.SSLSocketFactory; - +import javax.net.ssl.TrustManagerFactory; import java.io.FileInputStream; import java.nio.ByteBuffer; import java.security.KeyStore; import java.util.ArrayList; +import java.util.Arrays; import java.util.HashSet; import java.util.Iterator; import java.util.LinkedList; @@ -50,7 +49,6 @@ import java.util.Properties; import java.util.Queue; import java.util.Random; import java.util.Set; -import java.util.Arrays; import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; @@ -65,8 +63,7 @@ import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; public class ThriftRpcClient extends AbstractRpcClient { - private static final Logger LOGGER = - LoggerFactory.getLogger(ThriftRpcClient.class); + private static final Logger LOGGER = LoggerFactory.getLogger(ThriftRpcClient.class); /** * Config param for the thrift protocol to use. @@ -104,8 +101,7 @@ public class ThriftRpcClient extends AbstractRpcClient { @Override public Thread newThread(Runnable r) { Thread t = new Thread(r); - t.setName("Flume Thrift RPC thread - " + String.valueOf( - threadCounter.incrementAndGet())); + t.setName("Flume Thrift RPC thread - " + String.valueOf(threadCounter.incrementAndGet())); return t; } }); @@ -126,11 +122,11 @@ public class ThriftRpcClient extends AbstractRpcClient { try { if (!isActive()) { throw new EventDeliveryException("Client was closed due to error. 
" + - "Please create a new client"); + "Please create a new client"); } client = connectionManager.checkout(); final ThriftFlumeEvent thriftEvent = new ThriftFlumeEvent(event - .getHeaders(), ByteBuffer.wrap(event.getBody())); + .getHeaders(), ByteBuffer.wrap(event.getBody())); doAppend(client, thriftEvent).get(requestTimeout, TimeUnit.MILLISECONDS); } catch (Throwable e) { if (e instanceof ExecutionException) { @@ -169,22 +165,22 @@ public class ThriftRpcClient extends AbstractRpcClient { try { if (!isActive()) { throw new EventDeliveryException("Client was closed " + - "due to error or is not yet configured."); + "due to error or is not yet configured."); } client = connectionManager.checkout(); final List<ThriftFlumeEvent> thriftFlumeEvents = new ArrayList - <ThriftFlumeEvent>(); + <ThriftFlumeEvent>(); Iterator<Event> eventsIter = events.iterator(); while (eventsIter.hasNext()) { thriftFlumeEvents.clear(); for (int i = 0; i < batchSize && eventsIter.hasNext(); i++) { Event event = eventsIter.next(); thriftFlumeEvents.add(new ThriftFlumeEvent(event.getHeaders(), - ByteBuffer.wrap(event.getBody()))); + ByteBuffer.wrap(event.getBody()))); } if (!thriftFlumeEvents.isEmpty()) { doAppendBatch(client, thriftFlumeEvents).get(requestTimeout, - TimeUnit.MILLISECONDS); + TimeUnit.MILLISECONDS); } } } catch (Throwable e) { @@ -216,7 +212,7 @@ public class ThriftRpcClient extends AbstractRpcClient { } private Future<Void> doAppend(final ClientWrapper client, - final ThriftFlumeEvent e) throws Exception { + final ThriftFlumeEvent e) throws Exception { return callTimeoutPool.submit(new Callable<Void>() { @Override @@ -224,7 +220,7 @@ public class ThriftRpcClient extends AbstractRpcClient { Status status = client.client.append(e); if (status != Status.OK) { throw new EventDeliveryException("Failed to deliver events. Server " + - "returned status : " + status.name()); + "returned status : " + status.name()); } return null; } @@ -232,7 +228,7 @@ public class ThriftRpcClient extends AbstractRpcClient { } private Future<Void> doAppendBatch(final ClientWrapper client, - final List<ThriftFlumeEvent> e) throws Exception { + final List<ThriftFlumeEvent> e) throws Exception { return callTimeoutPool.submit(new Callable<Void>() { @Override @@ -240,7 +236,7 @@ public class ThriftRpcClient extends AbstractRpcClient { Status status = client.client.appendBatch(e); if (status != Status.OK) { throw new EventDeliveryException("Failed to deliver events. 
Server " + - "returned status : " + status.name()); + "returned status : " + status.name()); } return null; } @@ -265,11 +261,11 @@ public class ThriftRpcClient extends AbstractRpcClient { connState = State.DEAD; connectionManager.closeAll(); callTimeoutPool.shutdown(); - if(!callTimeoutPool.awaitTermination(5, TimeUnit.SECONDS)) { + if (!callTimeoutPool.awaitTermination(5, TimeUnit.SECONDS)) { callTimeoutPool.shutdownNow(); } } catch (Throwable ex) { - if(ex instanceof Error) { + if (ex instanceof Error) { throw (Error) ex; } else if (ex instanceof RuntimeException) { throw (RuntimeException) ex; @@ -284,7 +280,7 @@ public class ThriftRpcClient extends AbstractRpcClient { protected void configure(Properties properties) throws FlumeException { if (isActive()) { throw new FlumeException("Attempting to re-configured an already " + - "configured client!"); + "configured client!"); } stateLock.lock(); try { @@ -304,40 +300,40 @@ public class ThriftRpcClient extends AbstractRpcClient { protocol = COMPACT_PROTOCOL; } batchSize = Integer.parseInt(properties.getProperty( - RpcClientConfigurationConstants.CONFIG_BATCH_SIZE, - RpcClientConfigurationConstants.DEFAULT_BATCH_SIZE.toString())); + RpcClientConfigurationConstants.CONFIG_BATCH_SIZE, + RpcClientConfigurationConstants.DEFAULT_BATCH_SIZE.toString())); requestTimeout = Long.parseLong(properties.getProperty( - RpcClientConfigurationConstants.CONFIG_REQUEST_TIMEOUT, - String.valueOf( - RpcClientConfigurationConstants.DEFAULT_REQUEST_TIMEOUT_MILLIS))); + RpcClientConfigurationConstants.CONFIG_REQUEST_TIMEOUT, + String.valueOf( + RpcClientConfigurationConstants.DEFAULT_REQUEST_TIMEOUT_MILLIS))); if (requestTimeout < 1000) { LOGGER.warn("Request timeout specified less than 1s. " + - "Using default value instead."); + "Using default value instead."); requestTimeout = - RpcClientConfigurationConstants.DEFAULT_REQUEST_TIMEOUT_MILLIS; + RpcClientConfigurationConstants.DEFAULT_REQUEST_TIMEOUT_MILLIS; } int connectionPoolSize = Integer.parseInt(properties.getProperty( - RpcClientConfigurationConstants.CONFIG_CONNECTION_POOL_SIZE, - String.valueOf(RpcClientConfigurationConstants - .DEFAULT_CONNECTION_POOL_SIZE))); - if(connectionPoolSize < 1) { + RpcClientConfigurationConstants.CONFIG_CONNECTION_POOL_SIZE, + String.valueOf(RpcClientConfigurationConstants + .DEFAULT_CONNECTION_POOL_SIZE))); + if (connectionPoolSize < 1) { LOGGER.warn("Connection Pool Size specified is less than 1. 
" + - "Using default value instead."); + "Using default value instead."); connectionPoolSize = RpcClientConfigurationConstants - .DEFAULT_CONNECTION_POOL_SIZE; + .DEFAULT_CONNECTION_POOL_SIZE; } enableSsl = Boolean.parseBoolean(properties.getProperty( - RpcClientConfigurationConstants.CONFIG_SSL)); - if(enableSsl) { + RpcClientConfigurationConstants.CONFIG_SSL)); + if (enableSsl) { truststore = properties.getProperty( - RpcClientConfigurationConstants.CONFIG_TRUSTSTORE); + RpcClientConfigurationConstants.CONFIG_TRUSTSTORE); truststorePassword = properties.getProperty( - RpcClientConfigurationConstants.CONFIG_TRUSTSTORE_PASSWORD); + RpcClientConfigurationConstants.CONFIG_TRUSTSTORE_PASSWORD); truststoreType = properties.getProperty( - RpcClientConfigurationConstants.CONFIG_TRUSTSTORE_TYPE, "JKS"); + RpcClientConfigurationConstants.CONFIG_TRUSTSTORE_TYPE, "JKS"); String excludeProtocolsStr = properties.getProperty( - RpcClientConfigurationConstants.CONFIG_EXCLUDE_PROTOCOLS); + RpcClientConfigurationConstants.CONFIG_EXCLUDE_PROTOCOLS); if (excludeProtocolsStr == null) { excludeProtocols.add("SSLv3"); } else { @@ -353,7 +349,7 @@ public class ThriftRpcClient extends AbstractRpcClient { } catch (Throwable ex) { //Failed to configure, kill the client. connState = State.DEAD; - if(ex instanceof Error) { + if (ex instanceof Error) { throw (Error) ex; } else if (ex instanceof RuntimeException) { throw (RuntimeException) ex; @@ -381,40 +377,37 @@ public class ThriftRpcClient extends AbstractRpcClient { public final TTransport transport; private final int hashCode; - public ClientWrapper() throws Exception{ + public ClientWrapper() throws Exception { TSocket tsocket; - if(enableSsl) { + if (enableSsl) { // JDK6's factory doesn't appear to pass the protocol onto the Socket // properly so we have to do some magic to make sure that happens. // Not an issue in JDK7 Lifted from thrift-0.9.1 to make the SSLContext SSLContext sslContext = createSSLContext(truststore, truststorePassword, - truststoreType); + truststoreType); // Create the factory from it SSLSocketFactory sslSockFactory = sslContext.getSocketFactory(); // Create the TSocket from that tsocket = createSSLSocket( - sslSockFactory, hostname, port, 120000, excludeProtocols); + sslSockFactory, hostname, port, 120000, excludeProtocols); } else { tsocket = new TSocket(hostname, port); } - - transport = getTransport(tsocket); + transport = getTransport(tsocket); // The transport is already open for SSL as part of TSSLTransportFactory.getClientSocket - if(!transport.isOpen()) { + if (!transport.isOpen()) { transport.open(); } if (protocol.equals(BINARY_PROTOCOL)) { LOGGER.info("Using TBinaryProtocol"); - client = new ThriftSourceProtocol.Client(new TBinaryProtocol - (transport)); + client = new ThriftSourceProtocol.Client(new TBinaryProtocol(transport)); } else { LOGGER.info("Using TCompactProtocol"); - client = new ThriftSourceProtocol.Client(new TCompactProtocol - (transport)); + client = new ThriftSourceProtocol.Client(new TCompactProtocol(transport)); } // Not a great hash code, but since this class is immutable and there // is at most one instance of the components of this class, @@ -423,12 +416,12 @@ public class ThriftRpcClient extends AbstractRpcClient { } public boolean equals(Object o) { - if(o == null) { + if (o == null) { return false; } // Since there is only one wrapper with any given client, // direct comparison is good enough. 
- if(this == o) { + if (this == o) { return true; } return false; @@ -507,10 +500,8 @@ public class ThriftRpcClient extends AbstractRpcClient { c.transport.close(); currentPoolSize--; } - /* - * Be cruel and close even the checked out clients. The threads writing - * using these will now get an exception. - */ + // Be cruel and close even the checked out clients. The threads writing + // using these will now get an exception. for (ClientWrapper c : checkedOutClients) { c.transport.close(); currentPoolSize--; @@ -522,12 +513,14 @@ public class ThriftRpcClient extends AbstractRpcClient { } /** - * Lifted from ACCUMULO-3318 - Lifted from TSSLTransportFactory in Thrift-0.9.1. The method to create a client socket with an SSLContextFactory object is not visibile to us. Have to use - * SslConnectionParams instead of TSSLTransportParameters because no getters exist on TSSLTransportParameters. - * + * Lifted from ACCUMULO-3318 - Lifted from TSSLTransportFactory in Thrift-0.9.1. + * The method to create a client socket with an SSLContextFactory object is not visible to us. + * Have to use * SslConnectionParams instead of TSSLTransportParameters because no getters exist + * on TSSLTransportParameters. */ private static SSLContext createSSLContext(String truststore, - String truststorePassword, String truststoreType) throws FlumeException { + String truststorePassword, + String truststoreType) throws FlumeException { SSLContext ctx; try { ctx = SSLContext.getInstance("TLS"); @@ -550,7 +543,8 @@ public class ThriftRpcClient extends AbstractRpcClient { } private static TSocket createSSLSocket(SSLSocketFactory factory, String host, - int port, int timeout, List<String> excludeProtocols) throws FlumeException { + int port, int timeout, List<String> excludeProtocols) + throws FlumeException { try { SSLSocket socket = (SSLSocket) factory.createSocket(host, port); socket.setSoTimeout(timeout); http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-sdk/src/main/java/org/apache/flume/event/EventBuilder.java ---------------------------------------------------------------------- diff --git a/flume-ng-sdk/src/main/java/org/apache/flume/event/EventBuilder.java b/flume-ng-sdk/src/main/java/org/apache/flume/event/EventBuilder.java index a5e01fc..c19925a 100644 --- a/flume-ng-sdk/src/main/java/org/apache/flume/event/EventBuilder.java +++ b/flume-ng-sdk/src/main/java/org/apache/flume/event/EventBuilder.java @@ -37,7 +37,7 @@ public class EventBuilder { public static Event withBody(byte[] body, Map<String, String> headers) { Event event = new SimpleEvent(); - if(body == null) { + if (body == null) { body = new byte[0]; } event.setBody(body); http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-sdk/src/main/java/org/apache/flume/event/JSONEvent.java ---------------------------------------------------------------------- diff --git a/flume-ng-sdk/src/main/java/org/apache/flume/event/JSONEvent.java b/flume-ng-sdk/src/main/java/org/apache/flume/event/JSONEvent.java index 0ec1678..9ee90ae 100644 --- a/flume-ng-sdk/src/main/java/org/apache/flume/event/JSONEvent.java +++ b/flume-ng-sdk/src/main/java/org/apache/flume/event/JSONEvent.java @@ -26,7 +26,7 @@ import org.apache.flume.FlumeException; /** * */ -public class JSONEvent implements Event{ +public class JSONEvent implements Event { private Map<String, String> headers; private String body; private transient String charset = "UTF-8"; @@ -43,7 +43,7 @@ public class JSONEvent implements Event{ @Override public byte[] getBody() { - if(body != null) { 
+ if (body != null) { try { return body.getBytes(charset); } catch (UnsupportedEncodingException ex) { @@ -57,7 +57,7 @@ public class JSONEvent implements Event{ @Override public void setBody(byte[] body) { - if(body != null) { + if (body != null) { this.body = new String(body); } else { this.body = ""; http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-sdk/src/main/java/org/apache/flume/event/SimpleEvent.java ---------------------------------------------------------------------- diff --git a/flume-ng-sdk/src/main/java/org/apache/flume/event/SimpleEvent.java b/flume-ng-sdk/src/main/java/org/apache/flume/event/SimpleEvent.java index a7ac36f..61f848d 100644 --- a/flume-ng-sdk/src/main/java/org/apache/flume/event/SimpleEvent.java +++ b/flume-ng-sdk/src/main/java/org/apache/flume/event/SimpleEvent.java @@ -51,7 +51,7 @@ public class SimpleEvent implements Event { @Override public void setBody(byte[] body) { - if(body == null){ + if (body == null) { body = new byte[0]; } this.body = body; http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-sdk/src/main/java/org/apache/flume/util/OrderSelector.java ---------------------------------------------------------------------- diff --git a/flume-ng-sdk/src/main/java/org/apache/flume/util/OrderSelector.java b/flume-ng-sdk/src/main/java/org/apache/flume/util/OrderSelector.java index fd9e81f..806a553 100644 --- a/flume-ng-sdk/src/main/java/org/apache/flume/util/OrderSelector.java +++ b/flume-ng-sdk/src/main/java/org/apache/flume/util/OrderSelector.java @@ -40,9 +40,8 @@ import java.util.concurrent.TimeUnit; public abstract class OrderSelector<T> { private static final int EXP_BACKOFF_COUNTER_LIMIT = 16; - private static final long CONSIDER_SEQUENTIAL_RANGE = TimeUnit.HOURS - .toMillis(1); - private static final long MAX_TIMEOUT = 30000l; + private static final long CONSIDER_SEQUENTIAL_RANGE = TimeUnit.HOURS.toMillis(1); + private static final long MAX_TIMEOUT = 30000L; private final Map<T, FailureState> stateMap = new LinkedHashMap<T, FailureState>(); private long maxTimeout = MAX_TIMEOUT; http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-sinks/flume-dataset-sink/src/main/java/org/apache/flume/sink/kite/DatasetSink.java ---------------------------------------------------------------------- diff --git a/flume-ng-sinks/flume-dataset-sink/src/main/java/org/apache/flume/sink/kite/DatasetSink.java b/flume-ng-sinks/flume-dataset-sink/src/main/java/org/apache/flume/sink/kite/DatasetSink.java index a9f42b8..e3b57c3 100644 --- a/flume-ng-sinks/flume-dataset-sink/src/main/java/org/apache/flume/sink/kite/DatasetSink.java +++ b/flume-ng-sinks/flume-dataset-sink/src/main/java/org/apache/flume/sink/kite/DatasetSink.java @@ -104,12 +104,12 @@ public class DatasetSink extends AbstractSink implements Configurable { /** * The last time the writer rolled. */ - private long lastRolledMillis = 0l; + private long lastRolledMillis = 0L; /** * The raw number of bytes parsed. */ - private long bytesParsed = 0l; + private long bytesParsed = 0L; /** * A class for parsing Kite entities from Flume Events. 
@@ -225,7 +225,7 @@ public class DatasetSink extends AbstractSink implements Configurable { */ @VisibleForTesting void roll() { - this.lastRolledMillis = 0l; + this.lastRolledMillis = 0L; } @VisibleForTesting @@ -434,7 +434,7 @@ public class DatasetSink extends AbstractSink implements Configurable { // Reset the last rolled time and the metrics this.lastRolledMillis = System.currentTimeMillis(); - this.bytesParsed = 0l; + this.bytesParsed = 0L; } catch (DatasetNotFoundException ex) { throw new EventDeliveryException("Dataset " + datasetUri + " not found." + " The dataset must be created before Flume can write to it.", ex); @@ -558,7 +558,7 @@ public class DatasetSink extends AbstractSink implements Configurable { try { // If the transaction wasn't committed before we got the exception, we // need to rollback. - transaction.rollback(); + transaction.rollback(); } catch (RuntimeException ex) { LOG.error("Transaction rollback failed: " + ex.getLocalizedMessage()); LOG.debug("Exception follows.", ex); @@ -567,7 +567,7 @@ public class DatasetSink extends AbstractSink implements Configurable { this.transaction = null; } } -} + } /** * Get the name of the dataset from the URI http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-sinks/flume-dataset-sink/src/main/java/org/apache/flume/sink/kite/NonRecoverableEventException.java ---------------------------------------------------------------------- diff --git a/flume-ng-sinks/flume-dataset-sink/src/main/java/org/apache/flume/sink/kite/NonRecoverableEventException.java b/flume-ng-sinks/flume-dataset-sink/src/main/java/org/apache/flume/sink/kite/NonRecoverableEventException.java index 8f6c0ae..4373429 100644 --- a/flume-ng-sinks/flume-dataset-sink/src/main/java/org/apache/flume/sink/kite/NonRecoverableEventException.java +++ b/flume-ng-sinks/flume-dataset-sink/src/main/java/org/apache/flume/sink/kite/NonRecoverableEventException.java @@ -50,5 +50,4 @@ public class NonRecoverableEventException extends Exception { super(t); } - } http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-sinks/flume-dataset-sink/src/main/java/org/apache/flume/sink/kite/parser/EntityParserFactory.java ---------------------------------------------------------------------- diff --git a/flume-ng-sinks/flume-dataset-sink/src/main/java/org/apache/flume/sink/kite/parser/EntityParserFactory.java b/flume-ng-sinks/flume-dataset-sink/src/main/java/org/apache/flume/sink/kite/parser/EntityParserFactory.java index cfb7349..3720ff3 100644 --- a/flume-ng-sinks/flume-dataset-sink/src/main/java/org/apache/flume/sink/kite/parser/EntityParserFactory.java +++ b/flume-ng-sinks/flume-dataset-sink/src/main/java/org/apache/flume/sink/kite/parser/EntityParserFactory.java @@ -25,7 +25,6 @@ import org.apache.flume.Context; import static org.apache.flume.sink.kite.DatasetSinkConstants.*; - public class EntityParserFactory { public EntityParser<GenericRecord> newParser(Schema datasetSchema, Context config) { http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-sinks/flume-dataset-sink/src/main/java/org/apache/flume/sink/kite/policy/FailurePolicyFactory.java ---------------------------------------------------------------------- diff --git a/flume-ng-sinks/flume-dataset-sink/src/main/java/org/apache/flume/sink/kite/policy/FailurePolicyFactory.java b/flume-ng-sinks/flume-dataset-sink/src/main/java/org/apache/flume/sink/kite/policy/FailurePolicyFactory.java index a8b2008..d3b1fe8 100644 --- 
a/flume-ng-sinks/flume-dataset-sink/src/main/java/org/apache/flume/sink/kite/policy/FailurePolicyFactory.java +++ b/flume-ng-sinks/flume-dataset-sink/src/main/java/org/apache/flume/sink/kite/policy/FailurePolicyFactory.java @@ -23,7 +23,6 @@ import org.apache.flume.Context; import static org.apache.flume.sink.kite.DatasetSinkConstants.*; - public class FailurePolicyFactory { public FailurePolicy newPolicy(Context config) { http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/AbstractHDFSWriter.java ---------------------------------------------------------------------- diff --git a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/AbstractHDFSWriter.java b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/AbstractHDFSWriter.java index e367e12..2fe309f 100644 --- a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/AbstractHDFSWriter.java +++ b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/AbstractHDFSWriter.java @@ -50,7 +50,7 @@ public abstract class AbstractHDFSWriter implements HDFSWriter { private Integer numberOfCloseRetries = null; private long timeBetweenCloseRetries = Long.MAX_VALUE; - final static Object [] NO_ARGS = new Object []{}; + static final Object[] NO_ARGS = new Object[]{}; @Override public void configure(Context context) { @@ -63,11 +63,12 @@ public abstract class AbstractHDFSWriter implements HDFSWriter { if (numberOfCloseRetries > 1) { try { - timeBetweenCloseRetries = context.getLong("hdfs.callTimeout", 10000l); + timeBetweenCloseRetries = context.getLong("hdfs.callTimeout", 10000L); } catch (NumberFormatException e) { - logger.warn("hdfs.callTimeout can not be parsed to a long: " + context.getLong("hdfs.callTimeout")); + logger.warn("hdfs.callTimeout can not be parsed to a long: " + + context.getLong("hdfs.callTimeout")); } - timeBetweenCloseRetries = Math.max(timeBetweenCloseRetries/numberOfCloseRetries, 1000); + timeBetweenCloseRetries = Math.max(timeBetweenCloseRetries / numberOfCloseRetries, 1000); } } @@ -232,7 +233,7 @@ public abstract class AbstractHDFSWriter implements HDFSWriter { private Method reflectHflushOrSync(FSDataOutputStream os) { Method m = null; - if(os != null) { + if (os != null) { Class<?> fsDataOutputStreamClass = os.getClass(); try { m = fsDataOutputStreamClass.getMethod("hflush"); @@ -242,7 +243,7 @@ public abstract class AbstractHDFSWriter implements HDFSWriter { m = fsDataOutputStreamClass.getMethod("sync"); } catch (Exception ex1) { String msg = "Neither hflush not sync were found. 
That seems to be " + - "a problem!"; + "a problem!"; logger.error(msg); throw new FlumeException(msg, ex1); } @@ -266,7 +267,7 @@ public abstract class AbstractHDFSWriter implements HDFSWriter { String msg = "Error while trying to hflushOrSync!"; logger.error(msg); Throwable cause = e.getCause(); - if(cause != null && cause instanceof IOException) { + if (cause != null && cause instanceof IOException) { throw (IOException)cause; } throw new FlumeException(msg, e); http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/BucketClosedException.java ---------------------------------------------------------------------- diff --git a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/BucketClosedException.java b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/BucketClosedException.java index 1aca58f..1d8a9e4 100644 --- a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/BucketClosedException.java +++ b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/BucketClosedException.java @@ -20,7 +20,7 @@ package org.apache.flume.sink.hdfs; import org.apache.flume.FlumeException; -public class BucketClosedException extends FlumeException{ +public class BucketClosedException extends FlumeException { private static final long serialVersionUID = -4216667125119540357L; http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/BucketWriter.java ---------------------------------------------------------------------- diff --git a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/BucketWriter.java b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/BucketWriter.java index 6b97de6..b096410 100644 --- a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/BucketWriter.java +++ b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/BucketWriter.java @@ -18,22 +18,8 @@ package org.apache.flume.sink.hdfs; -import java.io.IOException; -import java.lang.reflect.Method; -import java.security.PrivilegedExceptionAction; -import java.util.concurrent.Callable; -import java.util.concurrent.CancellationException; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Future; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ScheduledFuture; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicLong; - import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Throwables; import org.apache.flume.Clock; import org.apache.flume.Context; import org.apache.flume.Event; @@ -49,7 +35,20 @@ import org.apache.hadoop.io.compress.CompressionCodec; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.google.common.base.Throwables; +import java.io.IOException; +import java.lang.reflect.Method; +import java.security.PrivilegedExceptionAction; +import java.util.concurrent.Callable; +import java.util.concurrent.CancellationException; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; 
+import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; /** * Internal API intended for HDFSSink use. @@ -117,14 +116,14 @@ class BucketWriter { AtomicInteger renameTries = new AtomicInteger(0); BucketWriter(long rollInterval, long rollSize, long rollCount, long batchSize, - Context context, String filePath, String fileName, String inUsePrefix, - String inUseSuffix, String fileSuffix, CompressionCodec codeC, - CompressionType compType, HDFSWriter writer, - ScheduledExecutorService timedRollerPool, PrivilegedExecutor proxyUser, - SinkCounter sinkCounter, int idleTimeout, WriterCallback onCloseCallback, - String onCloseCallbackPath, long callTimeout, - ExecutorService callTimeoutPool, long retryInterval, - int maxCloseTries) { + Context context, String filePath, String fileName, String inUsePrefix, + String inUseSuffix, String fileSuffix, CompressionCodec codeC, + CompressionType compType, HDFSWriter writer, + ScheduledExecutorService timedRollerPool, PrivilegedExecutor proxyUser, + SinkCounter sinkCounter, int idleTimeout, WriterCallback onCloseCallback, + String onCloseCallbackPath, long callTimeout, + ExecutorService callTimeoutPool, long retryInterval, + int maxCloseTries) { this.rollInterval = rollInterval; this.rollSize = rollSize; this.rollCount = rollCount; @@ -181,17 +180,15 @@ class BucketWriter { Path.class); } catch (Exception e) { LOG.warn("isFileClosed is not available in the " + - "version of HDFS being used. Flume will not " + - "attempt to close files if the close fails on " + - "the first attempt",e); + "version of HDFS being used. Flume will not " + + "attempt to close files if the close fails on " + + "the first attempt",e); return null; } } - private Boolean isFileClosed(FileSystem fs, - Path tmpFilePath) throws Exception { - return (Boolean)(isClosedMethod.invoke(fs, - tmpFilePath)); + private Boolean isFileClosed(FileSystem fs, Path tmpFilePath) throws Exception { + return (Boolean)(isClosedMethod.invoke(fs, tmpFilePath)); } /** @@ -239,17 +236,15 @@ class BucketWriter { // Need to get reference to FS using above config before underlying // writer does in order to avoid shutdown hook & // IllegalStateExceptions - if(!mockFsInjected) { - fileSystem = new Path(bucketPath).getFileSystem( - config); + if (!mockFsInjected) { + fileSystem = new Path(bucketPath).getFileSystem(config); } writer.open(bucketPath); } else { // need to get reference to FS before writer does to // avoid shutdown hook - if(!mockFsInjected) { - fileSystem = new Path(bucketPath).getFileSystem( - config); + if (!mockFsInjected) { + fileSystem = new Path(bucketPath).getFileSystem(config); } writer.open(bucketPath, codeC, compType); } @@ -278,7 +273,7 @@ class BucketWriter { try { // Roll the file and remove reference from sfWriters map. close(true); - } catch(Throwable t) { + } catch (Throwable t) { LOG.error("Unexpected error", t); } return null; @@ -327,7 +322,7 @@ class BucketWriter { public Void call() throws Exception { if (renameTries >= maxRenameTries) { LOG.warn("Unsuccessfully attempted to rename " + path + " " + - maxRenameTries + " times. File may still be open."); + maxRenameTries + " times. File may still be open."); return null; } renameTries++; @@ -335,16 +330,15 @@ class BucketWriter { renameBucket(path, finalPath, fs); } catch (Exception e) { LOG.warn("Renaming file: " + path + " failed. 
Will " + - "retry again in " + retryInterval + " seconds.", e); - timedRollerPool.schedule(this, retryInterval, - TimeUnit.SECONDS); + "retry again in " + retryInterval + " seconds.", e); + timedRollerPool.schedule(this, retryInterval, TimeUnit.SECONDS); return null; } return null; } }; - } + /** * Close the file handle and rename the temp file to the permanent filename. * Safe to call multiple times. Logs HDFSWriter.close() exceptions. @@ -352,7 +346,7 @@ class BucketWriter { * @throws InterruptedException */ public synchronized void close(boolean callCloseCallback) - throws IOException, InterruptedException { + throws IOException, InterruptedException { checkAndThrowInterruptedException(); try { flush(); @@ -367,9 +361,8 @@ class BucketWriter { callWithTimeout(closeCallRunner); sinkCounter.incrementConnectionClosedCount(); } catch (IOException e) { - LOG.warn( - "failed to close() HDFSWriter for file (" + bucketPath + - "). Exception follows.", e); + LOG.warn("failed to close() HDFSWriter for file (" + bucketPath + + "). Exception follows.", e); sinkCounter.incrementConnectionFailedCount(); failedToClose = true; } @@ -393,15 +386,12 @@ class BucketWriter { // could block or throw IOException try { renameBucket(bucketPath, targetPath, fileSystem); - } catch(Exception e) { - LOG.warn( - "failed to rename() file (" + bucketPath + - "). Exception follows.", e); + } catch (Exception e) { + LOG.warn("failed to rename() file (" + bucketPath + + "). Exception follows.", e); sinkCounter.incrementConnectionFailedCount(); - final Callable<Void> scheduledRename = - createScheduledRenameCallable(); - timedRollerPool.schedule(scheduledRename, retryInterval, - TimeUnit.SECONDS); + final Callable<Void> scheduledRename = createScheduledRenameCallable(); + timedRollerPool.schedule(scheduledRename, retryInterval, TimeUnit.SECONDS); } } if (callCloseCallback) { @@ -420,14 +410,14 @@ class BucketWriter { if (!isBatchComplete()) { doFlush(); - if(idleTimeout > 0) { + if (idleTimeout > 0) { // if the future exists and couldn't be cancelled, that would mean it has already run // or been cancelled - if(idleFuture == null || idleFuture.cancel(false)) { + if (idleFuture == null || idleFuture.cancel(false)) { Callable<Void> idleAction = new Callable<Void>() { public Void call() throws Exception { LOG.info("Closing idle bucketWriter {} at {}", bucketPath, - System.currentTimeMillis()); + System.currentTimeMillis()); if (isOpen) { close(true); } @@ -443,10 +433,10 @@ class BucketWriter { private void runCloseAction() { try { - if(onCloseCallback != null) { + if (onCloseCallback != null) { onCloseCallback.run(onCloseCallbackPath); } - } catch(Throwable t) { + } catch (Throwable t) { LOG.error("Unexpected error", t); } } @@ -483,19 +473,19 @@ class BucketWriter { checkAndThrowInterruptedException(); // If idleFuture is not null, cancel it before we move forward to avoid a // close call in the middle of the append. - if(idleFuture != null) { + if (idleFuture != null) { idleFuture.cancel(false); // There is still a small race condition - if the idleFuture is already // running, interrupting it can cause HDFS close operation to throw - // so we cannot interrupt it while running. If the future could not be // cancelled, it is already running - wait for it to finish before // attempting to write. - if(!idleFuture.isDone()) { + if (!idleFuture.isDone()) { try { idleFuture.get(callTimeout, TimeUnit.MILLISECONDS); } catch (TimeoutException ex) { LOG.warn("Timeout while trying to cancel closing of idle file. 
Idle" + - " file close may have failed", ex); + " file close may have failed", ex); } catch (Exception ex) { LOG.warn("Error while trying to cancel closing of idle file. ", ex); } @@ -612,10 +602,9 @@ class BucketWriter { // this method can get called from the scheduled thread so the // file gets closed later - so an implicit reference to this // bucket writer would still be alive in the Callable instance. - private void renameBucket(String bucketPath, - String targetPath, final FileSystem fs) throws IOException, - InterruptedException { - if(bucketPath.equals(targetPath)) { + private void renameBucket(String bucketPath, String targetPath, final FileSystem fs) + throws IOException, InterruptedException { + if (bucketPath.equals(targetPath)) { return; } @@ -646,7 +635,7 @@ class BucketWriter { } void setClock(Clock clock) { - this.clock = clock; + this.clock = clock; } /** @@ -669,7 +658,7 @@ class BucketWriter { * cancel the callable and throw an IOException */ private <T> T callWithTimeout(final CallRunner<T> callRunner) - throws IOException, InterruptedException { + throws IOException, InterruptedException { Future<T> future = callTimeoutPool.submit(new Callable<T>() { @Override public T call() throws Exception { http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSCompressedDataStream.java ---------------------------------------------------------------------- diff --git a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSCompressedDataStream.java b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSCompressedDataStream.java index f128795..80b7cb4 100644 --- a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSCompressedDataStream.java +++ b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSCompressedDataStream.java @@ -78,8 +78,8 @@ public class HDFSCompressedDataStream extends AbstractHDFSWriter { Configuration conf = new Configuration(); Path dstPath = new Path(filePath); FileSystem hdfs = dstPath.getFileSystem(conf); - if(useRawLocalFileSystem) { - if(hdfs instanceof LocalFileSystem) { + if (useRawLocalFileSystem) { + if (hdfs instanceof LocalFileSystem) { hdfs = ((LocalFileSystem)hdfs).getRaw(); } else { logger.warn("useRawLocalFileSystem is set to true but file system " + @@ -87,14 +87,13 @@ public class HDFSCompressedDataStream extends AbstractHDFSWriter { } } boolean appending = false; - if (conf.getBoolean("hdfs.append.support", false) == true && hdfs.isFile - (dstPath)) { + if (conf.getBoolean("hdfs.append.support", false) == true && hdfs.isFile(dstPath)) { fsOut = hdfs.append(dstPath); appending = true; } else { fsOut = hdfs.create(dstPath); } - if(compressor == null) { + if (compressor == null) { compressor = CodecPool.getCompressor(codec, conf); } cmpOut = codec.createOutputStream(fsOut, compressor); http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSDataStream.java ---------------------------------------------------------------------- diff --git a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSDataStream.java b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSDataStream.java index 7054bfc..c4ad919 100644 --- a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSDataStream.java +++ 
b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSDataStream.java @@ -37,8 +37,7 @@ import org.slf4j.LoggerFactory; public class HDFSDataStream extends AbstractHDFSWriter { - private static final Logger logger = - LoggerFactory.getLogger(HDFSDataStream.class); + private static final Logger logger = LoggerFactory.getLogger(HDFSDataStream.class); private FSDataOutputStream outStream; private String serializerType; @@ -60,16 +59,13 @@ public class HDFSDataStream extends AbstractHDFSWriter { } @VisibleForTesting - protected FileSystem getDfs(Configuration conf, - Path dstPath) throws IOException{ - return dstPath.getFileSystem(conf); + protected FileSystem getDfs(Configuration conf, Path dstPath) throws IOException { + return dstPath.getFileSystem(conf); } - protected void doOpen(Configuration conf, - Path dstPath, FileSystem hdfs) throws - IOException { - if(useRawLocalFileSystem) { - if(hdfs instanceof LocalFileSystem) { + protected void doOpen(Configuration conf, Path dstPath, FileSystem hdfs) throws IOException { + if (useRawLocalFileSystem) { + if (hdfs instanceof LocalFileSystem) { hdfs = ((LocalFileSystem)hdfs).getRaw(); } else { logger.warn("useRawLocalFileSystem is set to true but file system " + @@ -78,8 +74,7 @@ public class HDFSDataStream extends AbstractHDFSWriter { } boolean appending = false; - if (conf.getBoolean("hdfs.append.support", false) == true && hdfs.isFile - (dstPath)) { + if (conf.getBoolean("hdfs.append.support", false) == true && hdfs.isFile(dstPath)) { outStream = hdfs.append(dstPath); appending = true; } else {
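
The two hunks above (HDFSCompressedDataStream.open() and HDFSDataStream.doOpen()) only reflow the append-vs-create decision, but the pattern is easy to miss inside the diff. Below is a minimal standalone sketch of that decision; it is not part of this commit, the class and method names are illustrative only, and it assumes a Hadoop client library on the classpath.

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;

// Illustrative sketch only -- not the committed Flume code.
public class OpenForAppendSketch {

  // Open dstPath for append when the cluster advertises append support and the
  // file already exists; otherwise create a new file. Optionally bypass the
  // checksummed local file system, mirroring the useRawLocalFileSystem handling above.
  static FSDataOutputStream open(Configuration conf, Path dstPath, boolean useRawLocal)
      throws IOException {
    FileSystem fs = dstPath.getFileSystem(conf);
    if (useRawLocal && fs instanceof LocalFileSystem) {
      fs = ((LocalFileSystem) fs).getRaw();
    }
    if (conf.getBoolean("hdfs.append.support", false) && fs.isFile(dstPath)) {
      return fs.append(dstPath);  // resume writing to the existing file
    }
    return fs.create(dstPath);    // start a new file
  }
}

Note that the checkstyle-driven changes in those hunks are purely cosmetic: the redundant "== true" comparison stays, and the wrapped "hdfs.isFile (dstPath)" call is simply joined onto one line, with no behavioural difference.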

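For context on the ThriftRpcClient changes earlier in this commit: the reworded Javadoc explains that TSSLTransportFactory's client-socket factory method is not visible, so the client builds its own SSLContext from the truststore and then filters the enabled protocols. The sketch below illustrates that general approach under the assumption of a JKS-style truststore; the helper names are invented for the example and this is not the committed implementation.

import java.io.FileInputStream;
import java.security.KeyStore;
import java.util.ArrayList;
import java.util.List;

import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLSocket;
import javax.net.ssl.TrustManagerFactory;

// Illustrative sketch only -- helper names are invented, not Flume API.
public class SslClientSketch {

  // Build an SSLContext that trusts the certificates in the given truststore.
  static SSLContext createSslContext(String truststore, String password, String type)
      throws Exception {
    KeyStore ts = KeyStore.getInstance(type != null ? type : KeyStore.getDefaultType());
    try (FileInputStream in = new FileInputStream(truststore)) {
      ts.load(in, password != null ? password.toCharArray() : null);
    }
    TrustManagerFactory tmf =
        TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
    tmf.init(ts);
    SSLContext ctx = SSLContext.getInstance("TLS");
    ctx.init(null, tmf.getTrustManagers(), null);
    return ctx;
  }

  // Connect and drop any protocols the caller wants excluded (e.g. "SSLv3").
  static SSLSocket connect(SSLContext ctx, String host, int port, int timeoutMillis,
                           List<String> excludeProtocols) throws Exception {
    SSLSocket socket = (SSLSocket) ctx.getSocketFactory().createSocket(host, port);
    socket.setSoTimeout(timeoutMillis);
    List<String> enabled = new ArrayList<String>();
    for (String protocol : socket.getEnabledProtocols()) {
      if (!excludeProtocols.contains(protocol)) {
        enabled.add(protocol);
      }
    }
    socket.setEnabledProtocols(enabled.toArray(new String[0]));
    return socket;
  }
}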
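Similarly, the AbstractHDFSWriter hunks touch the reflective hflush()/sync() lookup without changing its logic: hflush() is preferred, and the older sync() is used when hflush() does not exist in the running Hadoop version. A compact sketch of that fallback, again illustrative rather than the committed code, is:

import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;

import org.apache.hadoop.fs.FSDataOutputStream;

// Illustrative sketch only -- not the committed Flume code.
public class HflushOrSyncSketch {

  // Look up hflush() if the running Hadoop version provides it, otherwise sync().
  static Method resolveFlushMethod(FSDataOutputStream os) {
    try {
      return os.getClass().getMethod("hflush");
    } catch (NoSuchMethodException e) {
      try {
        return os.getClass().getMethod("sync");
      } catch (NoSuchMethodException e2) {
        throw new IllegalStateException("Neither hflush nor sync is available", e2);
      }
    }
  }

  static void flush(FSDataOutputStream os) throws IOException {
    try {
      resolveFlushMethod(os).invoke(os);
    } catch (InvocationTargetException e) {
      Throwable cause = e.getCause();
      if (cause instanceof IOException) {
        throw (IOException) cause;  // surface the original I/O failure as-is
      }
      throw new IllegalStateException("hflush/sync failed", cause);
    } catch (IllegalAccessException e) {
      throw new IllegalStateException(e);
    }
  }
}

Reflection is used here because the writer must run against Hadoop versions both with and without hflush(); an InvocationTargetException wrapping an IOException is unwrapped so callers still see the underlying I/O error, which matches the intent of the catch block reformatted in the hunk above.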