http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-core/src/main/java/org/apache/flume/instrumentation/ChannelCounter.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/main/java/org/apache/flume/instrumentation/ChannelCounter.java b/flume-ng-core/src/main/java/org/apache/flume/instrumentation/ChannelCounter.java index 977ad6c..c4fe6c6 100644 --- a/flume-ng-core/src/main/java/org/apache/flume/instrumentation/ChannelCounter.java +++ b/flume-ng-core/src/main/java/org/apache/flume/instrumentation/ChannelCounter.java @@ -100,20 +100,20 @@ public class ChannelCounter extends MonitoredCounterGroup implements return addAndGet(COUNTER_EVENT_TAKE_SUCCESS, delta); } - public void setChannelCapacity(long capacity){ + public void setChannelCapacity(long capacity) { set(COUNTER_CHANNEL_CAPACITY, capacity); } @Override - public long getChannelCapacity(){ + public long getChannelCapacity() { return get(COUNTER_CHANNEL_CAPACITY); } @Override - public double getChannelFillPercentage(){ + public double getChannelFillPercentage() { long capacity = getChannelCapacity(); - if(capacity != 0L) { - return ((getChannelSize()/(double)capacity) * 100); + if (capacity != 0L) { + return (getChannelSize() / (double)capacity) * 100; } return Double.MAX_VALUE; }
http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-core/src/main/java/org/apache/flume/instrumentation/GangliaServer.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/main/java/org/apache/flume/instrumentation/GangliaServer.java b/flume-ng-core/src/main/java/org/apache/flume/instrumentation/GangliaServer.java index 7d4be55..bd9cd88 100644 --- a/flume-ng-core/src/main/java/org/apache/flume/instrumentation/GangliaServer.java +++ b/flume-ng-core/src/main/java/org/apache/flume/instrumentation/GangliaServer.java @@ -85,12 +85,6 @@ public class GangliaServer implements MonitorService { public final String CONF_ISGANGLIA3 = "isGanglia3"; private static final String GANGLIA_CONTEXT = "flume."; - /** - * - * @param hosts List of hosts to send the metrics to. All of them have to be - * running the version of ganglia specified by the configuration. - * @throws FlumeException - */ public GangliaServer() throws FlumeException { collectorRunnable = new GangliaCollector(); } http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-core/src/main/java/org/apache/flume/instrumentation/MonitoredCounterGroup.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/main/java/org/apache/flume/instrumentation/MonitoredCounterGroup.java b/flume-ng-core/src/main/java/org/apache/flume/instrumentation/MonitoredCounterGroup.java index 44e26e4..633513a 100644 --- a/flume-ng-core/src/main/java/org/apache/flume/instrumentation/MonitoredCounterGroup.java +++ b/flume-ng-core/src/main/java/org/apache/flume/instrumentation/MonitoredCounterGroup.java @@ -59,7 +59,6 @@ public abstract class MonitoredCounterGroup { private AtomicLong stopTime; private volatile boolean registered = false; - protected MonitoredCounterGroup(Type type, String name, String... attrs) { this.type = type; this.name = name; @@ -154,15 +153,15 @@ public abstract class MonitoredCounterGroup { // Print out the startTime for this component logger.info("Shutdown Metric for type: " + type + ", " - + "name: " + name + ". " - + typePrefix + "." + COUNTER_GROUP_START_TIME - + " == " + startTime); + + "name: " + name + ". " + + typePrefix + "." + COUNTER_GROUP_START_TIME + + " == " + startTime); // Print out the stopTime for this component logger.info("Shutdown Metric for type: " + type + ", " - + "name: " + name + ". " - + typePrefix + "." + COUNTER_GROUP_STOP_TIME - + " == " + stopTime); + + "name: " + name + ". " + + typePrefix + "." + COUNTER_GROUP_STOP_TIME + + " == " + stopTime); // Retrieve and sort counter group map keys final List<String> mapKeys = new ArrayList<String>(counterMap.keySet()); @@ -176,8 +175,8 @@ public abstract class MonitoredCounterGroup { final long counterMapValue = get(counterMapKey); logger.info("Shutdown Metric for type: " + type + ", " - + "name: " + name + ". " - + counterMapKey + " == " + counterMapValue); + + "name: " + name + ". " + + counterMapKey + " == " + counterMapValue); } } @@ -276,9 +275,9 @@ public abstract class MonitoredCounterGroup { INTERCEPTOR, SERIALIZER, OTHER - }; + } - public String getType(){ + public String getType() { return type.name(); } } http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-core/src/main/java/org/apache/flume/instrumentation/MonitoringType.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/main/java/org/apache/flume/instrumentation/MonitoringType.java b/flume-ng-core/src/main/java/org/apache/flume/instrumentation/MonitoringType.java index 443335c..4e1a28c 100644 --- a/flume-ng-core/src/main/java/org/apache/flume/instrumentation/MonitoringType.java +++ b/flume-ng-core/src/main/java/org/apache/flume/instrumentation/MonitoringType.java @@ -33,7 +33,7 @@ public enum MonitoringType { this.monitoringClass = klass; } - public Class<? extends MonitorService> getMonitorClass(){ + public Class<? extends MonitorService> getMonitorClass() { return this.monitoringClass; } } http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-core/src/main/java/org/apache/flume/instrumentation/SinkCounter.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/main/java/org/apache/flume/instrumentation/SinkCounter.java b/flume-ng-core/src/main/java/org/apache/flume/instrumentation/SinkCounter.java index 54f4a4c..534adc8 100644 --- a/flume-ng-core/src/main/java/org/apache/flume/instrumentation/SinkCounter.java +++ b/flume-ng-core/src/main/java/org/apache/flume/instrumentation/SinkCounter.java @@ -53,7 +53,6 @@ public class SinkCounter extends MonitoredCounterGroup implements COUNTER_EVENT_DRAIN_ATTEMPT, COUNTER_EVENT_DRAIN_SUCCESS }; - public SinkCounter(String name) { super(MonitoredCounterGroup.Type.SINK, name, ATTRIBUTES); } @@ -63,7 +62,6 @@ public class SinkCounter extends MonitoredCounterGroup implements (String[]) ArrayUtils.addAll(attributes,ATTRIBUTES)); } - @Override public long getConnectionCreatedCount() { return get(COUNTER_CONNECTION_CREATED); http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-core/src/main/java/org/apache/flume/instrumentation/SourceCounter.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/main/java/org/apache/flume/instrumentation/SourceCounter.java b/flume-ng-core/src/main/java/org/apache/flume/instrumentation/SourceCounter.java index 02ef6ed..f96694e 100644 --- a/flume-ng-core/src/main/java/org/apache/flume/instrumentation/SourceCounter.java +++ b/flume-ng-core/src/main/java/org/apache/flume/instrumentation/SourceCounter.java @@ -41,15 +41,12 @@ public class SourceCounter extends MonitoredCounterGroup implements private static final String COUNTER_OPEN_CONNECTION_COUNT = "src.open-connection.count"; - - private static final String[] ATTRIBUTES = - { - COUNTER_EVENTS_RECEIVED, COUNTER_EVENTS_ACCEPTED, - COUNTER_APPEND_RECEIVED, COUNTER_APPEND_ACCEPTED, - COUNTER_APPEND_BATCH_RECEIVED, COUNTER_APPEND_BATCH_ACCEPTED, - COUNTER_OPEN_CONNECTION_COUNT - }; - + private static final String[] ATTRIBUTES = { + COUNTER_EVENTS_RECEIVED, COUNTER_EVENTS_ACCEPTED, + COUNTER_APPEND_RECEIVED, COUNTER_APPEND_ACCEPTED, + COUNTER_APPEND_BATCH_RECEIVED, COUNTER_APPEND_BATCH_ACCEPTED, + COUNTER_OPEN_CONNECTION_COUNT + }; public SourceCounter(String name) { super(MonitoredCounterGroup.Type.SOURCE, name, ATTRIBUTES); @@ -126,7 +123,7 @@ public class SourceCounter extends MonitoredCounterGroup implements return get(COUNTER_OPEN_CONNECTION_COUNT); } - public void setOpenConnectionCount(long openConnectionCount){ + public void setOpenConnectionCount(long openConnectionCount) { set(COUNTER_OPEN_CONNECTION_COUNT, openConnectionCount); } } http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-core/src/main/java/org/apache/flume/instrumentation/http/HTTPMetricsServer.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/main/java/org/apache/flume/instrumentation/http/HTTPMetricsServer.java b/flume-ng-core/src/main/java/org/apache/flume/instrumentation/http/HTTPMetricsServer.java index 7c0afb0..921a1f7 100644 --- a/flume-ng-core/src/main/java/org/apache/flume/instrumentation/http/HTTPMetricsServer.java +++ b/flume-ng-core/src/main/java/org/apache/flume/instrumentation/http/HTTPMetricsServer.java @@ -94,9 +94,7 @@ public class HTTPMetricsServer implements MonitorService { private class HTTPMetricsHandler extends AbstractHandler { - Type mapType = - new TypeToken<Map<String, Map<String, String>>>() { - }.getType(); + Type mapType = new TypeToken<Map<String, Map<String, String>>>() {}.getType(); Gson gson = new Gson(); @Override @@ -108,8 +106,8 @@ public class HTTPMetricsServer implements MonitorService { //If we want to use any other url for something else, we should make sure //that for metrics only /metrics is used to prevent backward //compatibility issues. - if(request.getMethod().equalsIgnoreCase("TRACE") || request.getMethod() - .equalsIgnoreCase("OPTIONS")) { + if (request.getMethod().equalsIgnoreCase("TRACE") || + request.getMethod().equalsIgnoreCase("OPTIONS")) { response.sendError(HttpServletResponse.SC_FORBIDDEN); response.flushBuffer(); ((Request) request).setHandled(true); http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-core/src/main/java/org/apache/flume/instrumentation/kafka/KafkaChannelCounter.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/main/java/org/apache/flume/instrumentation/kafka/KafkaChannelCounter.java b/flume-ng-core/src/main/java/org/apache/flume/instrumentation/kafka/KafkaChannelCounter.java index 6e142cf..28d3c8c 100644 --- a/flume-ng-core/src/main/java/org/apache/flume/instrumentation/kafka/KafkaChannelCounter.java +++ b/flume-ng-core/src/main/java/org/apache/flume/instrumentation/kafka/KafkaChannelCounter.java @@ -23,7 +23,6 @@ import org.apache.flume.instrumentation.ChannelCounter; public class KafkaChannelCounter extends ChannelCounter implements KafkaChannelCounterMBean { - private static final String TIMER_KAFKA_EVENT_GET = "channel.kafka.event.get.time"; @@ -36,13 +35,10 @@ public class KafkaChannelCounter extends ChannelCounter private static final String COUNT_ROLLBACK = "channel.rollback.count"; - private static String[] ATTRIBUTES = { - TIMER_KAFKA_COMMIT,TIMER_KAFKA_EVENT_SEND,TIMER_KAFKA_EVENT_GET, - COUNT_ROLLBACK + TIMER_KAFKA_COMMIT,TIMER_KAFKA_EVENT_SEND,TIMER_KAFKA_EVENT_GET, COUNT_ROLLBACK }; - public KafkaChannelCounter(String name) { super(name,ATTRIBUTES); } http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-core/src/main/java/org/apache/flume/instrumentation/util/JMXPollUtil.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/main/java/org/apache/flume/instrumentation/util/JMXPollUtil.java b/flume-ng-core/src/main/java/org/apache/flume/instrumentation/util/JMXPollUtil.java index cbd6c35..a779496 100644 --- a/flume-ng-core/src/main/java/org/apache/flume/instrumentation/util/JMXPollUtil.java +++ b/flume-ng-core/src/main/java/org/apache/flume/instrumentation/util/JMXPollUtil.java @@ -37,8 +37,7 @@ import org.slf4j.LoggerFactory; public class JMXPollUtil { private static Logger LOG = LoggerFactory.getLogger(JMXPollUtil.class); - private static MBeanServer mbeanServer = ManagementFactory. - getPlatformMBeanServer(); + private static MBeanServer mbeanServer = ManagementFactory.getPlatformMBeanServer(); public static Map<String, Map<String, String>> getAllMBeans() { Map<String, Map<String, String>> mbeanMap = Maps.newHashMap(); @@ -54,23 +53,20 @@ public class JMXPollUtil { if (!obj.getObjectName().toString().startsWith("org.apache.flume")) { continue; } - MBeanAttributeInfo[] attrs = mbeanServer. - getMBeanInfo(obj.getObjectName()).getAttributes(); - String strAtts[] = new String[attrs.length]; + MBeanAttributeInfo[] attrs = mbeanServer.getMBeanInfo(obj.getObjectName()).getAttributes(); + String[] strAtts = new String[attrs.length]; for (int i = 0; i < strAtts.length; i++) { strAtts[i] = attrs[i].getName(); } - AttributeList attrList = mbeanServer.getAttributes( - obj.getObjectName(), strAtts); + AttributeList attrList = mbeanServer.getAttributes(obj.getObjectName(), strAtts); String component = obj.getObjectName().toString().substring( - obj.getObjectName().toString().indexOf('=') + 1); + obj.getObjectName().toString().indexOf('=') + 1); Map<String, String> attrMap = Maps.newHashMap(); - for (Object attr : attrList) { Attribute localAttr = (Attribute) attr; - if(localAttr.getName().equalsIgnoreCase("type")){ - component = localAttr.getValue()+ "." + component; + if (localAttr.getName().equalsIgnoreCase("type")) { + component = localAttr.getValue() + "." + component; } attrMap.put(localAttr.getName(), localAttr.getValue().toString()); } http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-core/src/main/java/org/apache/flume/interceptor/HostInterceptor.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/main/java/org/apache/flume/interceptor/HostInterceptor.java b/flume-ng-core/src/main/java/org/apache/flume/interceptor/HostInterceptor.java index 2693123..a2ad018 100644 --- a/flume-ng-core/src/main/java/org/apache/flume/interceptor/HostInterceptor.java +++ b/flume-ng-core/src/main/java/org/apache/flume/interceptor/HostInterceptor.java @@ -88,7 +88,6 @@ public class HostInterceptor implements Interceptor { logger.warn("Could not get local host address. Exception follows.", e); } - } @Override @@ -106,7 +105,7 @@ public class HostInterceptor implements Interceptor { if (preserveExisting && headers.containsKey(header)) { return event; } - if(host != null) { + if (host != null) { headers.put(header, host); } http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-core/src/main/java/org/apache/flume/interceptor/RegexExtractorInterceptor.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/main/java/org/apache/flume/interceptor/RegexExtractorInterceptor.java b/flume-ng-core/src/main/java/org/apache/flume/interceptor/RegexExtractorInterceptor.java index 67cfc43..7fda90d 100644 --- a/flume-ng-core/src/main/java/org/apache/flume/interceptor/RegexExtractorInterceptor.java +++ b/flume-ng-core/src/main/java/org/apache/flume/interceptor/RegexExtractorInterceptor.java @@ -61,10 +61,10 @@ import com.google.common.collect.Lists; * agent.sources.r1.interceptors.i1.serializers = s1 s2 * agent.sources.r1.interceptors.i1.serializers.s1.type = com.blah.SomeSerializer * agent.sources.r1.interceptors.i1.serializers.s1.name = warning - * agent.sources.r1.interceptors.i1.serializers.s2.type = org.apache.flume.interceptor.RegexExtractorInterceptorTimestampSerializer + * agent.sources.r1.interceptors.i1.serializers.s2.type = + * org.apache.flume.interceptor.RegexExtractorInterceptorTimestampSerializer * agent.sources.r1.interceptors.i1.serializers.s2.name = error * agent.sources.r1.interceptors.i1.serializers.s2.dateFormat = yyyy-MM-dd - * </code> * </p> * <pre> * Example 1: @@ -167,7 +167,8 @@ public class RegexExtractorInterceptor implements Interceptor { private Pattern regex; private List<NameAndSerializer> serializerList; - private final RegexExtractorInterceptorSerializer defaultSerializer = new RegexExtractorInterceptorPassThroughSerializer(); + private final RegexExtractorInterceptorSerializer defaultSerializer = + new RegexExtractorInterceptorPassThroughSerializer(); @Override public void configure(Context context) { @@ -191,7 +192,7 @@ public class RegexExtractorInterceptor implements Interceptor { new Context(context.getSubProperties(SERIALIZERS + ".")); serializerList = Lists.newArrayListWithCapacity(serializerNames.length); - for(String serializerName : serializerNames) { + for (String serializerName : serializerNames) { Context serializerContext = new Context( serializerContexts.getSubProperties(serializerName + ".")); String type = serializerContext.getString("type", "DEFAULT"); @@ -199,7 +200,7 @@ public class RegexExtractorInterceptor implements Interceptor { Preconditions.checkArgument(!StringUtils.isEmpty(name), "Supplied name cannot be empty."); - if("DEFAULT".equals(type)) { + if ("DEFAULT".equals(type)) { serializerList.add(new NameAndSerializer(name, defaultSerializer)); } else { serializerList.add(new NameAndSerializer(name, getCustomSerializer( http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-core/src/main/java/org/apache/flume/interceptor/RegexFilteringInterceptor.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/main/java/org/apache/flume/interceptor/RegexFilteringInterceptor.java b/flume-ng-core/src/main/java/org/apache/flume/interceptor/RegexFilteringInterceptor.java index 8a3b6ce..d8327d4 100644 --- a/flume-ng-core/src/main/java/org/apache/flume/interceptor/RegexFilteringInterceptor.java +++ b/flume-ng-core/src/main/java/org/apache/flume/interceptor/RegexFilteringInterceptor.java @@ -103,16 +103,13 @@ public class RegexFilteringInterceptor implements Interceptor { if (!excludeEvents) { if (regex.matcher(new String(event.getBody())).find()) { return event; - } - else { + } else { return null; } - } - else { + } else { if (regex.matcher(new String(event.getBody())).find()) { return null; - } - else { + } else { return event; } } @@ -129,7 +126,9 @@ public class RegexFilteringInterceptor implements Interceptor { List<Event> out = Lists.newArrayList(); for (Event event : events) { Event outEvent = intercept(event); - if (outEvent != null) { out.add(outEvent); } + if (outEvent != null) { + out.add(outEvent); + } } return out; } http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-core/src/main/java/org/apache/flume/interceptor/SearchAndReplaceInterceptor.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/main/java/org/apache/flume/interceptor/SearchAndReplaceInterceptor.java b/flume-ng-core/src/main/java/org/apache/flume/interceptor/SearchAndReplaceInterceptor.java index b8588cd..c4ec43b 100644 --- a/flume-ng-core/src/main/java/org/apache/flume/interceptor/SearchAndReplaceInterceptor.java +++ b/flume-ng-core/src/main/java/org/apache/flume/interceptor/SearchAndReplaceInterceptor.java @@ -56,8 +56,8 @@ public class SearchAndReplaceInterceptor implements Interceptor { private final Charset charset; private SearchAndReplaceInterceptor(Pattern searchPattern, - String replaceString, - Charset charset) { + String replaceString, + Charset charset) { this.searchPattern = searchPattern; this.replaceString = replaceString; this.charset = charset; @@ -107,7 +107,7 @@ public class SearchAndReplaceInterceptor implements Interceptor { replaceString = context.getString(REPLACE_STRING_KEY); // Empty replacement String value or if the property itself is not present // assign empty string as replacement - if(replaceString == null) { + if (replaceString == null) { replaceString = ""; } http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-core/src/main/java/org/apache/flume/interceptor/StaticInterceptor.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/main/java/org/apache/flume/interceptor/StaticInterceptor.java b/flume-ng-core/src/main/java/org/apache/flume/interceptor/StaticInterceptor.java index 97df467..d2eb523 100644 --- a/flume-ng-core/src/main/java/org/apache/flume/interceptor/StaticInterceptor.java +++ b/flume-ng-core/src/main/java/org/apache/flume/interceptor/StaticInterceptor.java @@ -26,8 +26,6 @@ import org.apache.flume.Event; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import static org.apache.flume.interceptor.StaticInterceptor.Constants.*; - /** * Interceptor class that appends a static, pre-configured header to all events. * @@ -57,8 +55,7 @@ import static org.apache.flume.interceptor.StaticInterceptor.Constants.*; */ public class StaticInterceptor implements Interceptor { - private static final Logger logger = LoggerFactory - .getLogger(StaticInterceptor.class); + private static final Logger logger = LoggerFactory.getLogger(StaticInterceptor.class); private final boolean preserveExisting; private final String key; @@ -123,9 +120,9 @@ public class StaticInterceptor implements Interceptor { @Override public void configure(Context context) { - preserveExisting = context.getBoolean(PRESERVE, PRESERVE_DEFAULT); - key = context.getString(KEY, KEY_DEFAULT); - value = context.getString(VALUE, VALUE_DEFAULT); + preserveExisting = context.getBoolean(Constants.PRESERVE, Constants.PRESERVE_DEFAULT); + key = context.getString(Constants.KEY, Constants.KEY_DEFAULT); + value = context.getString(Constants.VALUE, Constants.VALUE_DEFAULT); } @Override @@ -136,11 +133,9 @@ public class StaticInterceptor implements Interceptor { return new StaticInterceptor(preserveExisting, key, value); } - } public static class Constants { - public static final String KEY = "key"; public static final String KEY_DEFAULT = "key"; http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-core/src/main/java/org/apache/flume/interceptor/TimestampInterceptor.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/main/java/org/apache/flume/interceptor/TimestampInterceptor.java b/flume-ng-core/src/main/java/org/apache/flume/interceptor/TimestampInterceptor.java index 9d942f6..50c3695 100644 --- a/flume-ng-core/src/main/java/org/apache/flume/interceptor/TimestampInterceptor.java +++ b/flume-ng-core/src/main/java/org/apache/flume/interceptor/TimestampInterceptor.java @@ -22,6 +22,7 @@ import java.util.List; import java.util.Map; import org.apache.flume.Context; import org.apache.flume.Event; + import static org.apache.flume.interceptor.TimestampInterceptor.Constants.*; /** http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-core/src/main/java/org/apache/flume/lifecycle/LifecycleAware.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/main/java/org/apache/flume/lifecycle/LifecycleAware.java b/flume-ng-core/src/main/java/org/apache/flume/lifecycle/LifecycleAware.java index d7fe7ac..f42fd86 100644 --- a/flume-ng-core/src/main/java/org/apache/flume/lifecycle/LifecycleAware.java +++ b/flume-ng-core/src/main/java/org/apache/flume/lifecycle/LifecycleAware.java @@ -44,44 +44,42 @@ import org.apache.flume.annotations.InterfaceStability; * <p> * Example usage * </p> - * <code> - * public class MyService implements LifecycleAware { + * <pre> + * {@code + * public class MyService implements LifecycleAware { * - * private LifecycleState lifecycleState; + * private LifecycleState lifecycleState; * - * public MyService() { - * lifecycleState = LifecycleState.IDLE; - * } + * public MyService() { + * lifecycleState = LifecycleState.IDLE; + * } * - * @Override - * public void start(Context context) throws LifecycleException, - * InterruptedException { + * @Override + * public void start(Context context) throws LifecycleException, InterruptedException { + * // ...your code does something. + * lifecycleState = LifecycleState.START; + * } * - * ...your code does something. + * @Override + * public void stop(Context context) throws LifecycleException, InterruptedException { * - * lifecycleState = LifecycleState.START; - * } + * try { + * // ...you stop services here. + * } catch (SomethingException) { + * lifecycleState = LifecycleState.ERROR; + * } * - * @Override - * public void stop(Context context) throws LifecycleException, - * InterruptedException { + * lifecycleState = LifecycleState.STOP; + * } * - * try { - * ...you stop services here. - * } catch (SomethingException) { - * lifecycleState = LifecycleState.ERROR; - * } + * @Override + * public LifecycleState getLifecycleState() { + * return lifecycleState; + * } * - * lifecycleState = LifecycleState.STOP; - * } - * - * @Override - * public LifecycleState getLifecycleState() { - * return lifecycleState; - * } - * - * } - * </code> + * } + * } + * </pre> */ @InterfaceAudience.Public @InterfaceStability.Stable http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-core/src/main/java/org/apache/flume/lifecycle/LifecycleSupervisor.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/main/java/org/apache/flume/lifecycle/LifecycleSupervisor.java b/flume-ng-core/src/main/java/org/apache/flume/lifecycle/LifecycleSupervisor.java index 59d780a..773d671 100644 --- a/flume-ng-core/src/main/java/org/apache/flume/lifecycle/LifecycleSupervisor.java +++ b/flume-ng-core/src/main/java/org/apache/flume/lifecycle/LifecycleSupervisor.java @@ -36,8 +36,7 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder; public class LifecycleSupervisor implements LifecycleAware { - private static final Logger logger = LoggerFactory - .getLogger(LifecycleSupervisor.class); + private static final Logger logger = LoggerFactory.getLogger(LifecycleSupervisor.class); private Map<LifecycleAware, Supervisoree> supervisedProcesses; private Map<LifecycleAware, ScheduledFuture<?>> monitorFutures; @@ -81,15 +80,15 @@ public class LifecycleSupervisor implements LifecycleAware { if (monitorService != null) { monitorService.shutdown(); - try{ + try { monitorService.awaitTermination(10, TimeUnit.SECONDS); } catch (InterruptedException e) { logger.error("Interrupted while waiting for monitor service to stop"); } - if(!monitorService.isTerminated()) { + if (!monitorService.isTerminated()) { monitorService.shutdownNow(); try { - while(!monitorService.isTerminated()) { + while (!monitorService.isTerminated()) { monitorService.awaitTermination(10, TimeUnit.SECONDS); } } catch (InterruptedException e) { @@ -98,8 +97,7 @@ public class LifecycleSupervisor implements LifecycleAware { } } - for (final Entry<LifecycleAware, Supervisoree> entry : supervisedProcesses - .entrySet()) { + for (final Entry<LifecycleAware, Supervisoree> entry : supervisedProcesses.entrySet()) { if (entry.getKey().getLifecycleState().equals(LifecycleState.START)) { entry.getValue().status.desiredState = LifecycleState.STOP; @@ -122,9 +120,9 @@ public class LifecycleSupervisor implements LifecycleAware { public synchronized void supervise(LifecycleAware lifecycleAware, SupervisorPolicy policy, LifecycleState desiredState) { - if(this.monitorService.isShutdown() + if (this.monitorService.isShutdown() || this.monitorService.isTerminated() - || this.monitorService.isTerminating()){ + || this.monitorService.isTerminating()) { throw new FlumeException("Supervise called on " + lifecycleAware + " " + "after shutdown has been initiated. " + lifecycleAware + " will not" + " be started"); @@ -165,8 +163,8 @@ public class LifecycleSupervisor implements LifecycleAware { logger.debug("Unsupervising service:{}", lifecycleAware); synchronized (lifecycleAware) { - Supervisoree supervisoree = supervisedProcesses.get(lifecycleAware); - supervisoree.status.discard = true; + Supervisoree supervisoree = supervisedProcesses.get(lifecycleAware); + supervisoree.status.discard = true; this.setDesiredState(lifecycleAware, LifecycleState.STOP); logger.info("Stopping component: {}", lifecycleAware); lifecycleAware.stop(); @@ -199,7 +197,7 @@ public class LifecycleSupervisor implements LifecycleAware { return lifecycleState; } - public synchronized boolean isComponentInErrorState(LifecycleAware component){ + public synchronized boolean isComponentInErrorState(LifecycleAware component) { return supervisedProcesses.get(component).status.error; } @@ -301,18 +299,18 @@ public class LifecycleSupervisor implements LifecycleAware { } } } - } catch(Throwable t) { + } catch (Throwable t) { logger.error("Unexpected error", t); } logger.debug("Status check complete"); } } - private class Purger implements Runnable{ + private class Purger implements Runnable { @Override public void run() { - if(needToPurge){ + if (needToPurge) { monitorService.purge(); needToPurge = false; } @@ -338,7 +336,7 @@ public class LifecycleSupervisor implements LifecycleAware { } - public static abstract class SupervisorPolicy { + public abstract static class SupervisorPolicy { abstract boolean isValid(LifecycleAware object, Status status); @@ -372,5 +370,4 @@ public class LifecycleSupervisor implements LifecycleAware { } - } http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-core/src/main/java/org/apache/flume/serialization/AvroEventDeserializer.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/main/java/org/apache/flume/serialization/AvroEventDeserializer.java b/flume-ng-core/src/main/java/org/apache/flume/serialization/AvroEventDeserializer.java index 5faf449..c648958 100644 --- a/flume-ng-core/src/main/java/org/apache/flume/serialization/AvroEventDeserializer.java +++ b/flume-ng-core/src/main/java/org/apache/flume/serialization/AvroEventDeserializer.java @@ -52,8 +52,7 @@ import java.util.Locale; */ public class AvroEventDeserializer implements EventDeserializer { - private static final Logger logger = LoggerFactory.getLogger - (AvroEventDeserializer.class); + private static final Logger logger = LoggerFactory.getLogger(AvroEventDeserializer.class); private final AvroSchemaType schemaType; private final ResettableInputStream ris; @@ -180,8 +179,8 @@ public class AvroEventDeserializer implements EventDeserializer { } private static class SeekableResettableInputBridge implements SeekableInput { - ResettableInputStream ris; + public SeekableResettableInputBridge(ResettableInputStream ris) { this.ris = ris; } http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-core/src/main/java/org/apache/flume/serialization/BodyTextEventSerializer.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/main/java/org/apache/flume/serialization/BodyTextEventSerializer.java b/flume-ng-core/src/main/java/org/apache/flume/serialization/BodyTextEventSerializer.java index d09f02d..4d7a525 100644 --- a/flume-ng-core/src/main/java/org/apache/flume/serialization/BodyTextEventSerializer.java +++ b/flume-ng-core/src/main/java/org/apache/flume/serialization/BodyTextEventSerializer.java @@ -31,8 +31,7 @@ import org.slf4j.LoggerFactory; */ public class BodyTextEventSerializer implements EventSerializer { - private final static Logger logger = - LoggerFactory.getLogger(BodyTextEventSerializer.class); + private static final Logger logger = LoggerFactory.getLogger(BodyTextEventSerializer.class); // for legacy reasons, by default, append a newline to each event written out private final String APPEND_NEWLINE = "appendNewline"; http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-core/src/main/java/org/apache/flume/serialization/HeaderAndBodyTextEventSerializer.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/main/java/org/apache/flume/serialization/HeaderAndBodyTextEventSerializer.java b/flume-ng-core/src/main/java/org/apache/flume/serialization/HeaderAndBodyTextEventSerializer.java index 9c6003c..8e3621d 100644 --- a/flume-ng-core/src/main/java/org/apache/flume/serialization/HeaderAndBodyTextEventSerializer.java +++ b/flume-ng-core/src/main/java/org/apache/flume/serialization/HeaderAndBodyTextEventSerializer.java @@ -31,7 +31,7 @@ import org.slf4j.LoggerFactory; */ public class HeaderAndBodyTextEventSerializer implements EventSerializer { - private final static Logger logger = + private static final Logger logger = LoggerFactory.getLogger(HeaderAndBodyTextEventSerializer.class); // for legacy reasons, by default, append a newline to each event written out http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-core/src/main/java/org/apache/flume/serialization/LineDeserializer.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/main/java/org/apache/flume/serialization/LineDeserializer.java b/flume-ng-core/src/main/java/org/apache/flume/serialization/LineDeserializer.java index 7c87235..8f23685 100644 --- a/flume-ng-core/src/main/java/org/apache/flume/serialization/LineDeserializer.java +++ b/flume-ng-core/src/main/java/org/apache/flume/serialization/LineDeserializer.java @@ -38,8 +38,7 @@ import java.util.List; @InterfaceStability.Evolving public class LineDeserializer implements EventDeserializer { - private static final Logger logger = LoggerFactory.getLogger - (LineDeserializer.class); + private static final Logger logger = LoggerFactory.getLogger(LineDeserializer.class); private final ResettableInputStream in; private final Charset outputCharset; http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-core/src/main/java/org/apache/flume/serialization/ResettableFileInputStream.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/main/java/org/apache/flume/serialization/ResettableFileInputStream.java b/flume-ng-core/src/main/java/org/apache/flume/serialization/ResettableFileInputStream.java index 618913e..7d6d95c 100644 --- a/flume-ng-core/src/main/java/org/apache/flume/serialization/ResettableFileInputStream.java +++ b/flume-ng-core/src/main/java/org/apache/flume/serialization/ResettableFileInputStream.java @@ -64,12 +64,13 @@ import java.nio.charset.CodingErrorAction; * <p>Thus the behaviour of mark and reset is as follows:</p> * * <ol> - * <li>If {@link #mark()} is called after a high surrogate pair has been returned by {@link #readChar()}, - * the marked position will be that of the character <em>following</em> the low surrogate, - * <em>not</em> that of the low surrogate itself.</li> - * <li>If {@link #reset()} is called after a high surrogate pair has been returned by {@link #readChar()}, - * the low surrogate is always returned by the next call to {@link #readChar()}, - * <em>before</em> the stream is actually reset to the last marked position.</li> + * <li>If {@link #mark()} is called after a high surrogate pair has been returned by + * {@link #readChar()}, the marked position will be that of the character <em>following</em> + * the low surrogate, <em>not</em> that of the low surrogate itself.</li> + * <li>If {@link #reset()} is called after a high surrogate pair has been returned by + * {@link #readChar()}, the low surrogate is always returned by the next call to + * {@link #readChar()}, <em>before</em> the stream is actually reset to the last marked + * position.</li> * </ol> * * <p>This ensures that no dangling high surrogate could ever be read as long as @@ -181,13 +182,13 @@ public class ResettableFileInputStream extends ResettableInputStream this.decoder = charset.newDecoder(); this.position = 0; this.syncPosition = 0; - if(charset.name().startsWith("UTF-8")) { + if (charset.name().startsWith("UTF-8")) { // some JDKs wrongly report 3 bytes max this.maxCharWidth = 4; - } else if(charset.name().startsWith("UTF-16")) { + } else if (charset.name().startsWith("UTF-16")) { // UTF_16BE and UTF_16LE wrongly report 2 bytes max this.maxCharWidth = 4; - } else if(charset.name().startsWith("UTF-32")) { + } else if (charset.name().startsWith("UTF-32")) { // UTF_32BE and UTF_32LE wrongly report 4 bytes max this.maxCharWidth = 8; } else { @@ -254,7 +255,7 @@ public class ResettableFileInputStream extends ResettableInputStream // Check whether we are in the middle of a surrogate pair, // in which case, return the last (low surrogate) char of the pair. - if(hasLowSurrogate) { + if (hasLowSurrogate) { hasLowSurrogate = false; return lowSurrogate; } @@ -296,7 +297,7 @@ public class ResettableFileInputStream extends ResettableInputStream // Found nothing, but the byte buffer has not been entirely consumed. // This situation denotes the presence of a surrogate pair // that can only be decoded if we have a 2-char buffer. - if(buf.hasRemaining()) { + if (buf.hasRemaining()) { charBuf.clear(); // increase the limit to 2 charBuf.limit(2); @@ -312,9 +313,10 @@ public class ResettableFileInputStream extends ResettableInputStream // save second (low surrogate) char for later consumption lowSurrogate = charBuf.get(); // Check if we really have a surrogate pair - if( ! Character.isHighSurrogate(highSurrogate) || ! Character.isLowSurrogate(lowSurrogate)) { + if (!Character.isHighSurrogate(highSurrogate) || !Character.isLowSurrogate(lowSurrogate)) { // This should only happen in case of bad sequences (dangling surrogate, etc.) - logger.warn("Decoded a pair of chars, but it does not seem to be a surrogate pair: {} {}", (int)highSurrogate, (int)lowSurrogate); + logger.warn("Decoded a pair of chars, but it does not seem to be a surrogate pair: {} {}", + (int)highSurrogate, (int)lowSurrogate); } hasLowSurrogate = true; // consider the pair as a single unit and increment position normally http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-core/src/main/java/org/apache/flume/sink/AbstractRpcSink.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/main/java/org/apache/flume/sink/AbstractRpcSink.java b/flume-ng-core/src/main/java/org/apache/flume/sink/AbstractRpcSink.java index 5146834..f6024aa 100644 --- a/flume-ng-core/src/main/java/org/apache/flume/sink/AbstractRpcSink.java +++ b/flume-ng-core/src/main/java/org/apache/flume/sink/AbstractRpcSink.java @@ -115,14 +115,16 @@ import java.util.concurrent.locks.ReentrantLock; * </tr> * <tr> * <td><tt>compression-type</tt></td> - * <td>Select compression type. Default is "none" and the only compression type available is "deflate"</td> + * <td>Select compression type. Default is "none" and the only compression type available + * is "deflate"</td> * <td>compression type</td> * <td>none</td> * </tr> * <tr> * <td><tt>compression-level</tt></td> - * <td>In the case compression type is "deflate" this value can be between 0-9. 0 being no compression and - * 1-9 is compression. The higher the number the better the compression. 6 is the default.</td> + * <td>In the case compression type is "deflate" this value can be between 0-9. + * 0 being no compression and 1-9 is compression. + * The higher the number the better the compression. 6 is the default.</td> * <td>compression level</td> * <td>6</td> * </tr> @@ -139,11 +141,9 @@ import java.util.concurrent.locks.ReentrantLock; * This method will be called whenever this sink needs to create a new * connection to the source. */ -public abstract class AbstractRpcSink extends AbstractSink - implements Configurable { +public abstract class AbstractRpcSink extends AbstractSink implements Configurable { - private static final Logger logger = LoggerFactory.getLogger - (AbstractRpcSink.class); + private static final Logger logger = LoggerFactory.getLogger(AbstractRpcSink.class); private String hostname; private Integer port; private RpcClient client; @@ -152,9 +152,9 @@ public abstract class AbstractRpcSink extends AbstractSink private int cxnResetInterval; private AtomicBoolean resetConnectionFlag; private final int DEFAULT_CXN_RESET_INTERVAL = 0; - private final ScheduledExecutorService cxnResetExecutor = Executors - .newSingleThreadScheduledExecutor(new ThreadFactoryBuilder() - .setNameFormat("Rpc Sink Reset Thread").build()); + private final ScheduledExecutorService cxnResetExecutor = + Executors.newSingleThreadScheduledExecutor( + new ThreadFactoryBuilder().setNameFormat("Rpc Sink Reset Thread").build()); @Override public void configure(Context context) { @@ -179,10 +179,9 @@ public abstract class AbstractRpcSink extends AbstractSink } cxnResetInterval = context.getInteger("reset-connection-interval", DEFAULT_CXN_RESET_INTERVAL); - if(cxnResetInterval == DEFAULT_CXN_RESET_INTERVAL) { - logger.info("Connection reset is set to " + String.valueOf - (DEFAULT_CXN_RESET_INTERVAL) +". Will not reset connection to next " + - "hop"); + if (cxnResetInterval == DEFAULT_CXN_RESET_INTERVAL) { + logger.info("Connection reset is set to " + String.valueOf(DEFAULT_CXN_RESET_INTERVAL) + + ". Will not reset connection to next hop"); } } @@ -210,7 +209,7 @@ public abstract class AbstractRpcSink extends AbstractSink resetConnectionFlag = new AtomicBoolean(false); client = initializeRpcClient(clientProps); Preconditions.checkNotNull(client, "Rpc Client could not be " + - "initialized. " + getName() + " could not be started"); + "initialized. " + getName() + " could not be started"); sinkCounter.incrementConnectionCreatedCount(); if (cxnResetInterval > 0) { cxnResetExecutor.schedule(new Runnable() { @@ -228,20 +227,19 @@ public abstract class AbstractRpcSink extends AbstractSink throw new FlumeException(ex); } } - logger.debug("Rpc sink {}: Created RpcClient: {}", getName(), client); + logger.debug("Rpc sink {}: Created RpcClient: {}", getName(), client); } } private void resetConnection() { - try { - destroyConnection(); - createConnection(); - } catch (Throwable throwable) { - //Don't rethrow, else this runnable won't get scheduled again. - logger.error("Error while trying to expire connection", - throwable); - } + try { + destroyConnection(); + createConnection(); + } catch (Throwable throwable) { + // Don't rethrow, else this runnable won't get scheduled again. + logger.error("Error while trying to expire connection", throwable); + } } private void destroyConnection() { @@ -314,8 +312,7 @@ public abstract class AbstractRpcSink extends AbstractSink cxnResetExecutor.shutdownNow(); } } catch (Exception ex) { - logger.error("Interrupted while waiting for connection reset executor " + - "to shut down"); + logger.error("Interrupted while waiting for connection reset executor to shut down"); } sinkCounter.stop(); super.stop(); @@ -335,7 +332,7 @@ public abstract class AbstractRpcSink extends AbstractSink Channel channel = getChannel(); Transaction transaction = channel.getTransaction(); - if(resetConnectionFlag.get()) { + if (resetConnectionFlag.get()) { resetConnection(); // if the time to reset is long and the timeout is short // this may cancel the next reset request http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-core/src/main/java/org/apache/flume/sink/AbstractSink.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/main/java/org/apache/flume/sink/AbstractSink.java b/flume-ng-core/src/main/java/org/apache/flume/sink/AbstractSink.java index 1112643..585044b 100644 --- a/flume-ng-core/src/main/java/org/apache/flume/sink/AbstractSink.java +++ b/flume-ng-core/src/main/java/org/apache/flume/sink/AbstractSink.java @@ -30,7 +30,7 @@ import com.google.common.base.Preconditions; @InterfaceAudience.Public @InterfaceStability.Stable -abstract public class AbstractSink implements Sink, LifecycleAware { +public abstract class AbstractSink implements Sink, LifecycleAware { private Channel channel; private String name; @@ -78,7 +78,8 @@ abstract public class AbstractSink implements Sink, LifecycleAware { return name; } + @Override public String toString() { - return this.getClass().getName() + "{name:" + name + ", channel:" + channel.getName() + "}"; + return this.getClass().getName() + "{name:" + name + ", channel:" + channel.getName() + "}"; } } http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-core/src/main/java/org/apache/flume/sink/AbstractSinkProcessor.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/main/java/org/apache/flume/sink/AbstractSinkProcessor.java b/flume-ng-core/src/main/java/org/apache/flume/sink/AbstractSinkProcessor.java index 1c30592..3de653a 100644 --- a/flume-ng-core/src/main/java/org/apache/flume/sink/AbstractSinkProcessor.java +++ b/flume-ng-core/src/main/java/org/apache/flume/sink/AbstractSinkProcessor.java @@ -37,7 +37,7 @@ public abstract class AbstractSinkProcessor implements SinkProcessor { @Override public void start() { - for(Sink s : sinkList) { + for (Sink s : sinkList) { s.start(); } @@ -46,7 +46,7 @@ public abstract class AbstractSinkProcessor implements SinkProcessor { @Override public void stop() { - for(Sink s : sinkList) { + for (Sink s : sinkList) { s.stop(); } state = LifecycleState.STOP; http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-core/src/main/java/org/apache/flume/sink/AbstractSinkSelector.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/main/java/org/apache/flume/sink/AbstractSinkSelector.java b/flume-ng-core/src/main/java/org/apache/flume/sink/AbstractSinkSelector.java index 9ddeef4..f236a8a 100644 --- a/flume-ng-core/src/main/java/org/apache/flume/sink/AbstractSinkSelector.java +++ b/flume-ng-core/src/main/java/org/apache/flume/sink/AbstractSinkSelector.java @@ -38,7 +38,7 @@ public abstract class AbstractSinkSelector implements SinkSelector { @Override public void configure(Context context) { Long timeOut = context.getLong("maxTimeOut"); - if(timeOut != null){ + if (timeOut != null) { maxTimeOut = timeOut; } } http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-core/src/main/java/org/apache/flume/sink/DefaultSinkFactory.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/main/java/org/apache/flume/sink/DefaultSinkFactory.java b/flume-ng-core/src/main/java/org/apache/flume/sink/DefaultSinkFactory.java index 6a5be92..6f405fb 100644 --- a/flume-ng-core/src/main/java/org/apache/flume/sink/DefaultSinkFactory.java +++ b/flume-ng-core/src/main/java/org/apache/flume/sink/DefaultSinkFactory.java @@ -53,8 +53,7 @@ public class DefaultSinkFactory implements SinkFactory { @SuppressWarnings("unchecked") @Override - public Class<? extends Sink> getClass(String type) - throws FlumeException { + public Class<? extends Sink> getClass(String type) throws FlumeException { String sinkClassName = type; SinkType sinkType = SinkType.OTHER; try { http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-core/src/main/java/org/apache/flume/sink/DefaultSinkProcessor.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/main/java/org/apache/flume/sink/DefaultSinkProcessor.java b/flume-ng-core/src/main/java/org/apache/flume/sink/DefaultSinkProcessor.java index 00a362b..2da9264 100644 --- a/flume-ng-core/src/main/java/org/apache/flume/sink/DefaultSinkProcessor.java +++ b/flume-ng-core/src/main/java/org/apache/flume/sink/DefaultSinkProcessor.java @@ -35,8 +35,7 @@ import com.google.common.base.Preconditions; * results without any additional handling. Suitable for all sinks that aren't * assigned to a group. */ -public class DefaultSinkProcessor implements SinkProcessor, -ConfigurableComponent { +public class DefaultSinkProcessor implements SinkProcessor, ConfigurableComponent { private Sink sink; private LifecycleState lifecycleState; http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-core/src/main/java/org/apache/flume/sink/FailoverSinkProcessor.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/main/java/org/apache/flume/sink/FailoverSinkProcessor.java b/flume-ng-core/src/main/java/org/apache/flume/sink/FailoverSinkProcessor.java index 3bd52f2..69541e6 100644 --- a/flume-ng-core/src/main/java/org/apache/flume/sink/FailoverSinkProcessor.java +++ b/flume-ng-core/src/main/java/org/apache/flume/sink/FailoverSinkProcessor.java @@ -70,12 +70,14 @@ public class FailoverSinkProcessor extends AbstractSinkProcessor { private Integer priority; private Sink sink; private Integer sequentialFailures; + public FailedSink(Integer priority, Sink sink, int seqFailures) { this.sink = sink; this.priority = priority; this.sequentialFailures = seqFailures; adjustRefresh(); } + @Override public int compareTo(FailedSink arg0) { return refresh.compareTo(arg0.refresh); @@ -88,24 +90,25 @@ public class FailoverSinkProcessor extends AbstractSinkProcessor { public Sink getSink() { return sink; } + public Integer getPriority() { return priority; } + public void incFails() { sequentialFailures++; adjustRefresh(); - logger.debug("Sink {} failed again, new refresh is at {}, " + - "current time {}", new Object[] { - sink.getName(), refresh, System.currentTimeMillis()}); + logger.debug("Sink {} failed again, new refresh is at {}, current time {}", + new Object[] { sink.getName(), refresh, System.currentTimeMillis() }); } + private void adjustRefresh() { refresh = System.currentTimeMillis() - + Math.min(maxPenalty, (1 << sequentialFailures) * FAILURE_PENALTY); + + Math.min(maxPenalty, (1 << sequentialFailures) * FAILURE_PENALTY); } } - private static final Logger logger = LoggerFactory - .getLogger(FailoverSinkProcessor.class); + private static final Logger logger = LoggerFactory.getLogger(FailoverSinkProcessor.class); private static final String PRIORITY_PREFIX = "priority."; private static final String MAX_PENALTY_PREFIX = "maxpenalty"; @@ -121,15 +124,15 @@ public class FailoverSinkProcessor extends AbstractSinkProcessor { failedSinks = new PriorityQueue<FailedSink>(); Integer nextPrio = 0; String maxPenaltyStr = context.getString(MAX_PENALTY_PREFIX); - if(maxPenaltyStr == null) { + if (maxPenaltyStr == null) { maxPenalty = DEFAULT_MAX_PENALTY; } else { try { maxPenalty = Integer.parseInt(maxPenaltyStr); } catch (NumberFormatException e) { logger.warn("{} is not a valid value for {}", - new Object[] { maxPenaltyStr, MAX_PENALTY_PREFIX }); - maxPenalty = DEFAULT_MAX_PENALTY; + new Object[] { maxPenaltyStr, MAX_PENALTY_PREFIX }); + maxPenalty = DEFAULT_MAX_PENALTY; } } for (Entry<String, Sink> entry : sinks.entrySet()) { @@ -140,7 +143,7 @@ public class FailoverSinkProcessor extends AbstractSinkProcessor { } catch (Exception e) { priority = --nextPrio; } - if(!liveSinks.containsKey(priority)) { + if (!liveSinks.containsKey(priority)) { liveSinks.put(priority, sinks.get(entry.getKey())); } else { logger.warn("Sink {} not added to FailverSinkProcessor as priority" + @@ -155,7 +158,7 @@ public class FailoverSinkProcessor extends AbstractSinkProcessor { public Status process() throws EventDeliveryException { // Retry any failed sinks that have gone through their "cooldown" period Long now = System.currentTimeMillis(); - while(!failedSinks.isEmpty() && failedSinks.peek().getRefresh() < now) { + while (!failedSinks.isEmpty() && failedSinks.peek().getRefresh() < now) { FailedSink cur = failedSinks.poll(); Status s; try { @@ -177,7 +180,7 @@ public class FailoverSinkProcessor extends AbstractSinkProcessor { } Status ret = null; - while(activeSink != null) { + while (activeSink != null) { try { ret = activeSink.process(); return ret; @@ -196,8 +199,8 @@ public class FailoverSinkProcessor extends AbstractSinkProcessor { Integer key = liveSinks.lastKey(); failedSinks.add(new FailedSink(key, activeSink, 1)); liveSinks.remove(key); - if(liveSinks.isEmpty()) return null; - if(liveSinks.lastKey() != null) { + if (liveSinks.isEmpty()) return null; + if (liveSinks.lastKey() != null) { return liveSinks.get(liveSinks.lastKey()); } else { return null; http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-core/src/main/java/org/apache/flume/sink/LoadBalancingSinkProcessor.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/main/java/org/apache/flume/sink/LoadBalancingSinkProcessor.java b/flume-ng-core/src/main/java/org/apache/flume/sink/LoadBalancingSinkProcessor.java index 2d85756..ac0781e 100644 --- a/flume-ng-core/src/main/java/org/apache/flume/sink/LoadBalancingSinkProcessor.java +++ b/flume-ng-core/src/main/java/org/apache/flume/sink/LoadBalancingSinkProcessor.java @@ -56,12 +56,14 @@ import org.apache.flume.util.RoundRobinOrderSelector; * * <p> * Sample configuration: - * <pre> - * host1.sinkgroups.group1.sinks = sink1 sink2 - * host1.sinkgroups.group1.processor.type = load_balance - * host1.sinkgroups.group1.processor.selector = <selector type> - * host1.sinkgroups.group1.processor.selector.selector_property = <value> - * </pre> + * <pre> + * {@code + * host1.sinkgroups.group1.sinks = sink1 sink2 + * host1.sinkgroups.group1.processor.type = load_balance + * host1.sinkgroups.group1.processor.selector = <selector type> + * host1.sinkgroups.group1.processor.selector.selector_property = <value> + * } + * </pre> * * The value of processor.selector could be either <tt>round_robin</tt> for * round-robin scheme of load-balancing or <tt>random</tt> for random @@ -200,26 +202,28 @@ public class LoadBalancingSinkProcessor extends AbstractSinkProcessor { } /** - * A sink selector that implements the round-robin sink selection policy. - * This implementation is not MT safe. + * <p>A sink selector that implements the round-robin sink selection policy. + * This implementation is not MT safe.</p> + * + * <p>Unfortunately both implementations need to override the base implementation + * in AbstractSinkSelector class, because any custom sink selectors + * will break if this stuff is moved to that class.</p> */ - //Unfortunately both implementations need to override the base implementation - //in AbstractSinkSelector class, because any custom sink selectors - //will break if this stuff is moved to that class. private static class RoundRobinSinkSelector extends AbstractSinkSelector { - private OrderSelector<Sink> selector; - RoundRobinSinkSelector(boolean backoff){ + + RoundRobinSinkSelector(boolean backoff) { selector = new RoundRobinOrderSelector<Sink>(backoff); } @Override - public void configure(Context context){ + public void configure(Context context) { super.configure(context); if (maxTimeOut != 0) { selector.setMaxTimeOut(maxTimeOut); } } + @Override public Iterator<Sink> createSinkIterator() { return selector.createIterator(); @@ -245,7 +249,7 @@ public class LoadBalancingSinkProcessor extends AbstractSinkProcessor { private OrderSelector<Sink> selector; - RandomOrderSinkSelector(boolean backoff){ + RandomOrderSinkSelector(boolean backoff) { selector = new RandomOrderSelector<Sink>(backoff); } http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-core/src/main/java/org/apache/flume/sink/LoggerSink.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/main/java/org/apache/flume/sink/LoggerSink.java b/flume-ng-core/src/main/java/org/apache/flume/sink/LoggerSink.java index 9cf9bc2..d219be7 100644 --- a/flume-ng-core/src/main/java/org/apache/flume/sink/LoggerSink.java +++ b/flume-ng-core/src/main/java/org/apache/flume/sink/LoggerSink.java @@ -71,8 +71,9 @@ public class LoggerSink extends AbstractSink implements Configurable { try { maxBytesToLog = Integer.parseInt(strMaxBytes); } catch (NumberFormatException e) { - logger.warn(String.format("Unable to convert %s to integer, using default value(%d) for maxByteToDump", - strMaxBytes, DEFAULT_MAX_BYTE_DUMP)); + logger.warn(String.format( + "Unable to convert %s to integer, using default value(%d) for maxByteToDump", + strMaxBytes, DEFAULT_MAX_BYTE_DUMP)); maxBytesToLog = DEFAULT_MAX_BYTE_DUMP; } } http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-core/src/main/java/org/apache/flume/sink/NullSink.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/main/java/org/apache/flume/sink/NullSink.java b/flume-ng-core/src/main/java/org/apache/flume/sink/NullSink.java index cada6ec..eb00e15 100644 --- a/flume-ng-core/src/main/java/org/apache/flume/sink/NullSink.java +++ b/flume-ng-core/src/main/java/org/apache/flume/sink/NullSink.java @@ -92,7 +92,7 @@ public class NullSink extends AbstractSink implements Configurable { if (++eventCounter % logEveryNEvents == 0) { logger.info("Null sink {} successful processed {} events.", getName(), eventCounter); } - if(event == null) { + if (event == null) { status = Status.BACKOFF; break; } http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-core/src/main/java/org/apache/flume/sink/RollingFileSink.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/main/java/org/apache/flume/sink/RollingFileSink.java b/flume-ng-core/src/main/java/org/apache/flume/sink/RollingFileSink.java index b97d404..ee4b947 100644 --- a/flume-ng-core/src/main/java/org/apache/flume/sink/RollingFileSink.java +++ b/flume-ng-core/src/main/java/org/apache/flume/sink/RollingFileSink.java @@ -112,7 +112,7 @@ public class RollingFileSink extends AbstractSink implements Configurable { super.start(); pathController.setBaseDirectory(directory); - if(rollInterval > 0){ + if (rollInterval > 0) { rollService = Executors.newScheduledThreadPool( 1, @@ -136,7 +136,7 @@ public class RollingFileSink extends AbstractSink implements Configurable { } }, rollInterval, rollInterval, TimeUnit.SECONDS); - } else{ + } else { logger.info("RollInterval is not valid, file rolling will not happen."); } logger.info("RollingFileSink {} started.", getName()); @@ -251,17 +251,15 @@ public class RollingFileSink extends AbstractSink implements Configurable { serializer = null; } } - if(rollInterval > 0){ + if (rollInterval > 0) { rollService.shutdown(); while (!rollService.isTerminated()) { try { rollService.awaitTermination(1, TimeUnit.SECONDS); } catch (InterruptedException e) { - logger - .debug( - "Interrupted while waiting for roll service to stop. " + - "Please report this.", e); + logger.debug("Interrupted while waiting for roll service to stop. " + + "Please report this.", e); } } } http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-core/src/main/java/org/apache/flume/sink/SinkProcessorFactory.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/main/java/org/apache/flume/sink/SinkProcessorFactory.java b/flume-ng-core/src/main/java/org/apache/flume/sink/SinkProcessorFactory.java index dcdcad2..084072f 100644 --- a/flume-ng-core/src/main/java/org/apache/flume/sink/SinkProcessorFactory.java +++ b/flume-ng-core/src/main/java/org/apache/flume/sink/SinkProcessorFactory.java @@ -55,8 +55,7 @@ public class SinkProcessorFactory { * processor configuration */ @SuppressWarnings("unchecked") - public static SinkProcessor getProcessor(Context context, - List<Sink> sinks) { + public static SinkProcessor getProcessor(Context context, List<Sink> sinks) { Preconditions.checkNotNull(context); Preconditions.checkNotNull(sinks); Preconditions.checkArgument(!sinks.isEmpty()); @@ -71,7 +70,7 @@ public class SinkProcessorFactory { logger.warn("Sink Processor type {} is a custom type", typeStr); } - if(!type.equals(SinkProcessorType.OTHER)) { + if (!type.equals(SinkProcessorType.OTHER)) { processorClassName = type.getSinkProcessorClassName(); } http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-core/src/main/java/org/apache/flume/sink/ThriftSink.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/main/java/org/apache/flume/sink/ThriftSink.java b/flume-ng-core/src/main/java/org/apache/flume/sink/ThriftSink.java index 32021d3..bcab731 100644 --- a/flume-ng-core/src/main/java/org/apache/flume/sink/ThriftSink.java +++ b/flume-ng-core/src/main/java/org/apache/flume/sink/ThriftSink.java @@ -106,15 +106,15 @@ public class ThriftSink extends AbstractRpcSink { protected RpcClient initializeRpcClient(Properties props) { // Only one thread is enough, since only one sink thread processes // transactions at any given time. Each sink owns its own Rpc client. - props.setProperty(RpcClientConfigurationConstants - .CONFIG_CONNECTION_POOL_SIZE, String.valueOf(1)); - boolean enableKerberos = Boolean.parseBoolean(props.getProperty( - RpcClientConfigurationConstants.KERBEROS_KEY, "false")); - if(enableKerberos) { + props.setProperty(RpcClientConfigurationConstants.CONFIG_CONNECTION_POOL_SIZE, + String.valueOf(1)); + boolean enableKerberos = Boolean.parseBoolean( + props.getProperty(RpcClientConfigurationConstants.KERBEROS_KEY, "false")); + if (enableKerberos) { return SecureRpcClientFactory.getThriftInstance(props); } else { props.setProperty(RpcClientConfigurationConstants.CONFIG_CLIENT_TYPE, - RpcClientFactory.ClientType.THRIFT.name()); + RpcClientFactory.ClientType.THRIFT.name()); return RpcClientFactory.getInstance(props); } } http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-core/src/main/java/org/apache/flume/source/AbstractEventDrivenSource.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/main/java/org/apache/flume/source/AbstractEventDrivenSource.java b/flume-ng-core/src/main/java/org/apache/flume/source/AbstractEventDrivenSource.java index 89bd357..08f9b84 100644 --- a/flume-ng-core/src/main/java/org/apache/flume/source/AbstractEventDrivenSource.java +++ b/flume-ng-core/src/main/java/org/apache/flume/source/AbstractEventDrivenSource.java @@ -29,9 +29,9 @@ import org.apache.flume.annotations.InterfaceStability; */ @InterfaceAudience.Public @InterfaceStability.Evolving -public abstract class AbstractEventDrivenSource extends BasicSourceSemantics - implements EventDrivenSource { - +public abstract class AbstractEventDrivenSource + extends BasicSourceSemantics + implements EventDrivenSource { public AbstractEventDrivenSource() { super(); } http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-core/src/main/java/org/apache/flume/source/AbstractPollableSource.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/main/java/org/apache/flume/source/AbstractPollableSource.java b/flume-ng-core/src/main/java/org/apache/flume/source/AbstractPollableSource.java index 33e1acc..97f6c99 100644 --- a/flume-ng-core/src/main/java/org/apache/flume/source/AbstractPollableSource.java +++ b/flume-ng-core/src/main/java/org/apache/flume/source/AbstractPollableSource.java @@ -37,8 +37,8 @@ import org.apache.flume.annotations.InterfaceStability; */ @InterfaceAudience.Public @InterfaceStability.Evolving -public abstract class AbstractPollableSource extends BasicSourceSemantics - implements PollableSource { +public abstract class AbstractPollableSource + extends BasicSourceSemantics implements PollableSource { long backoffSleepIncrement = PollableSourceConstants.DEFAULT_BACKOFF_SLEEP_INCREMENT; long maxBackoffSleep = PollableSourceConstants.DEFAULT_MAX_BACKOFF_SLEEP; @@ -46,14 +46,16 @@ public abstract class AbstractPollableSource extends BasicSourceSemantics public AbstractPollableSource() { super(); } + public Status process() throws EventDeliveryException { Exception exception = getStartException(); if (exception != null) { throw new FlumeException("Source had error configuring or starting", exception); } - if(!isStarted()) { - throw new EventDeliveryException("Source is not started. It is in '" + getLifecycleState() + "' state"); + if (!isStarted()) { + throw new EventDeliveryException("Source is not started. It is in '" + + getLifecycleState() + "' state"); } return doProcess(); } http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-core/src/main/java/org/apache/flume/source/AbstractSource.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/main/java/org/apache/flume/source/AbstractSource.java b/flume-ng-core/src/main/java/org/apache/flume/source/AbstractSource.java index 0855de3..88ef665 100644 --- a/flume-ng-core/src/main/java/org/apache/flume/source/AbstractSource.java +++ b/flume-ng-core/src/main/java/org/apache/flume/source/AbstractSource.java @@ -29,7 +29,7 @@ import com.google.common.base.Preconditions; @InterfaceAudience.Public @InterfaceStability.Stable -abstract public class AbstractSource implements Source { +public abstract class AbstractSource implements Source { private ChannelProcessor channelProcessor; private String name; @@ -79,6 +79,6 @@ abstract public class AbstractSource implements Source { } public String toString() { - return this.getClass().getName() + "{name:" + name + ",state:" + lifecycleState +"}"; + return this.getClass().getName() + "{name:" + name + ",state:" + lifecycleState + "}"; } } http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-core/src/main/java/org/apache/flume/source/AvroSource.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/main/java/org/apache/flume/source/AvroSource.java b/flume-ng-core/src/main/java/org/apache/flume/source/AvroSource.java index 6eb6a0a..8b9b956 100644 --- a/flume-ng-core/src/main/java/org/apache/flume/source/AvroSource.java +++ b/flume-ng-core/src/main/java/org/apache/flume/source/AvroSource.java @@ -21,18 +21,6 @@ package org.apache.flume.source; import com.google.common.base.Preconditions; import com.google.common.base.Throwables; -import java.io.FileInputStream; -import java.net.InetSocketAddress; -import java.security.KeyStore; -import java.security.Security; -import java.util.*; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; -import javax.net.ssl.KeyManagerFactory; -import javax.net.ssl.SSLContext; -import javax.net.ssl.SSLEngine; - import com.google.common.util.concurrent.ThreadFactoryBuilder; import org.apache.avro.ipc.NettyServer; import org.apache.avro.ipc.NettyTransceiver; @@ -53,10 +41,10 @@ import org.apache.flume.instrumentation.SourceCounter; import org.apache.flume.source.avro.AvroFlumeEvent; import org.apache.flume.source.avro.AvroSourceProtocol; import org.apache.flume.source.avro.Status; -import org.jboss.netty.channel.ChannelPipelineFactory; import org.jboss.netty.channel.ChannelPipeline; -import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory; +import org.jboss.netty.channel.ChannelPipelineFactory; import org.jboss.netty.channel.Channels; +import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory; import org.jboss.netty.handler.codec.compression.ZlibDecoder; import org.jboss.netty.handler.codec.compression.ZlibEncoder; import org.jboss.netty.handler.ipfilter.IpFilterRule; @@ -66,6 +54,23 @@ import org.jboss.netty.handler.ssl.SslHandler; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import javax.net.ssl.KeyManagerFactory; +import javax.net.ssl.SSLContext; +import javax.net.ssl.SSLEngine; +import java.io.FileInputStream; +import java.net.InetSocketAddress; +import java.security.KeyStore; +import java.security.Security; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + /** * <p> * A {@link Source} implementation that receives Avro events from clients that @@ -205,7 +210,7 @@ public class AvroSource extends AbstractSource implements EventDrivenSource, if (enableIpFilter) { patternRuleConfigDefinition = context.getString(IP_FILTER_RULES_KEY); if (patternRuleConfigDefinition == null || - patternRuleConfigDefinition.trim().isEmpty()) { + patternRuleConfigDefinition.trim().isEmpty()) { throw new FlumeException( "ipFilter is configured with true but ipFilterRules is not defined:" + " "); @@ -241,7 +246,7 @@ public class AvroSource extends AbstractSource implements EventDrivenSource, sourceCounter.start(); super.start(); final NettyServer srv = (NettyServer)server; - connectionCountUpdater.scheduleWithFixedDelay(new Runnable(){ + connectionCountUpdater.scheduleWithFixedDelay(new Runnable() { @Override public void run() { @@ -256,22 +261,17 @@ public class AvroSource extends AbstractSource implements EventDrivenSource, private NioServerSocketChannelFactory initSocketChannelFactory() { NioServerSocketChannelFactory socketChannelFactory; if (maxThreads <= 0) { - socketChannelFactory = new NioServerSocketChannelFactory - (Executors.newCachedThreadPool(new ThreadFactoryBuilder(). - setNameFormat("Avro " + NettyTransceiver.class.getSimpleName() - + " Boss-%d").build()), - Executors.newCachedThreadPool(new ThreadFactoryBuilder(). - setNameFormat("Avro " + NettyTransceiver.class.getSimpleName() - + " I/O Worker-%d").build())); + socketChannelFactory = new NioServerSocketChannelFactory( + Executors.newCachedThreadPool(new ThreadFactoryBuilder().setNameFormat( + "Avro " + NettyTransceiver.class.getSimpleName() + " Boss-%d").build()), + Executors.newCachedThreadPool(new ThreadFactoryBuilder().setNameFormat( + "Avro " + NettyTransceiver.class.getSimpleName() + " I/O Worker-%d").build())); } else { socketChannelFactory = new NioServerSocketChannelFactory( - Executors.newCachedThreadPool(new ThreadFactoryBuilder(). - setNameFormat( - "Avro " + NettyTransceiver.class.getSimpleName() - + " Boss-%d").build()), - Executors.newFixedThreadPool(maxThreads, new ThreadFactoryBuilder(). - setNameFormat("Avro " + NettyTransceiver.class.getSimpleName() + - " I/O Worker-%d").build())); + Executors.newCachedThreadPool(new ThreadFactoryBuilder().setNameFormat( + "Avro " + NettyTransceiver.class.getSimpleName() + " Boss-%d").build()), + Executors.newFixedThreadPool(maxThreads, new ThreadFactoryBuilder().setNameFormat( + "Avro " + NettyTransceiver.class.getSimpleName() + " I/O Worker-%d").build())); } return socketChannelFactory; } @@ -309,7 +309,7 @@ public class AvroSource extends AbstractSource implements EventDrivenSource, } sourceCounter.stop(); connectionCountUpdater.shutdown(); - while(!connectionCountUpdater.isTerminated()){ + while (!connectionCountUpdater.isTerminated()) { try { Thread.sleep(100); } catch (InterruptedException ex) { @@ -399,44 +399,40 @@ public class AvroSource extends AbstractSource implements EventDrivenSource, return Status.OK; } - private PatternRule generateRule( - String patternRuleDefinition) throws FlumeException { + private PatternRule generateRule(String patternRuleDefinition) throws FlumeException { patternRuleDefinition = patternRuleDefinition.trim(); //first validate the format int firstColonIndex = patternRuleDefinition.indexOf(":"); if (firstColonIndex == -1) { throw new FlumeException( - "Invalid ipFilter patternRule '" + patternRuleDefinition + + "Invalid ipFilter patternRule '" + patternRuleDefinition + "' should look like <'allow' or 'deny'>:<'ip' or " + "'name'>:<pattern>"); } else { - String ruleAccessFlag = patternRuleDefinition.substring(0, - firstColonIndex); - int secondColonIndex = patternRuleDefinition.indexOf(":", - firstColonIndex + 1); - if ((!ruleAccessFlag.equals("allow") && - !ruleAccessFlag.equals("deny")) || secondColonIndex == -1) { + String ruleAccessFlag = patternRuleDefinition.substring(0, firstColonIndex); + int secondColonIndex = patternRuleDefinition.indexOf(":", firstColonIndex + 1); + if ((!ruleAccessFlag.equals("allow") && !ruleAccessFlag.equals("deny")) || + secondColonIndex == -1) { throw new FlumeException( - "Invalid ipFilter patternRule '" + patternRuleDefinition + + "Invalid ipFilter patternRule '" + patternRuleDefinition + "' should look like <'allow' or 'deny'>:<'ip' or " + "'name'>:<pattern>"); } String patternTypeFlag = patternRuleDefinition.substring( - firstColonIndex + 1, secondColonIndex); - if ((!patternTypeFlag.equals("ip") && - !patternTypeFlag.equals("name"))) { + firstColonIndex + 1, secondColonIndex); + if ((!patternTypeFlag.equals("ip") && !patternTypeFlag.equals("name"))) { throw new FlumeException( - "Invalid ipFilter patternRule '" + patternRuleDefinition + + "Invalid ipFilter patternRule '" + patternRuleDefinition + "' should look like <'allow' or 'deny'>:<'ip' or " + "'name'>:<pattern>"); } boolean isAllow = ruleAccessFlag.equals("allow"); String patternRuleString = (patternTypeFlag.equals("ip") ? "i" : "n") - + ":" + patternRuleDefinition.substring(secondColonIndex + 1); + + ":" + patternRuleDefinition.substring(secondColonIndex + 1); logger.info("Adding ipFilter PatternRule: " - + (isAllow ? "Allow" : "deny") + " " + patternRuleString); + + (isAllow ? "Allow" : "deny") + " " + patternRuleString); return new PatternRule(isAllow, patternRuleString); } } @@ -458,9 +454,9 @@ public class AvroSource extends AbstractSource implements EventDrivenSource, private String patternRuleConfigDefinition; public AdvancedChannelPipelineFactory(boolean enableCompression, - boolean enableSsl, String keystore, String keystorePassword, - String keystoreType, boolean enableIpFilter, - String patternRuleConfigDefinition) { + boolean enableSsl, String keystore, String keystorePassword, + String keystoreType, boolean enableIpFilter, + String patternRuleConfigDefinition) { this.enableCompression = enableCompression; this.enableSsl = enableSsl; this.keystore = keystore; @@ -505,7 +501,6 @@ public class AvroSource extends AbstractSource implements EventDrivenSource, pipeline.addFirst("inflater", new ZlibDecoder()); } - if (enableSsl) { SSLEngine sslEngine = createServerSSLContext().createSSLEngine(); sslEngine.setUseClientMode(false); @@ -527,11 +522,10 @@ public class AvroSource extends AbstractSource implements EventDrivenSource, if (enableIpFilter) { logger.info("Setting up ipFilter with the following rule definition: " + - patternRuleConfigDefinition); + patternRuleConfigDefinition); IpFilterRuleHandler ipFilterHandler = new IpFilterRuleHandler(); ipFilterHandler.addAll(rules); - logger.info( - "Adding ipFilter with " + ipFilterHandler.size() + " rules"); + logger.info("Adding ipFilter with " + ipFilterHandler.size() + " rules"); pipeline.addFirst("ipFilter", ipFilterHandler); } http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-core/src/main/java/org/apache/flume/source/BasicSourceSemantics.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/main/java/org/apache/flume/source/BasicSourceSemantics.java b/flume-ng-core/src/main/java/org/apache/flume/source/BasicSourceSemantics.java index d2672b5..931e1e4 100644 --- a/flume-ng-core/src/main/java/org/apache/flume/source/BasicSourceSemantics.java +++ b/flume-ng-core/src/main/java/org/apache/flume/source/BasicSourceSemantics.java @@ -44,8 +44,7 @@ import com.google.common.base.Throwables; @InterfaceAudience.Public @InterfaceStability.Evolving public abstract class BasicSourceSemantics implements Source, Configurable { - private static final Logger logger = LoggerFactory - .getLogger(BasicSourceSemantics.class); + private static final Logger logger = LoggerFactory.getLogger(BasicSourceSemantics.class); private Exception exception; private ChannelProcessor channelProcessor; private String name; @@ -54,9 +53,10 @@ public abstract class BasicSourceSemantics implements Source, Configurable { public BasicSourceSemantics() { lifecycleState = LifecycleState.IDLE; } + @Override public synchronized void configure(Context context) { - if(isStarted()) { + if (isStarted()) { throw new IllegalStateException("Configure called when started"); } else { try { @@ -126,8 +126,7 @@ public abstract class BasicSourceSemantics implements Source, Configurable { } public String toString() { - return this.getClass().getName() + "{name:" + name + ",state:" - + lifecycleState +"}"; + return this.getClass().getName() + "{name:" + name + ",state:" + lifecycleState + "}"; } protected boolean isStarted() { http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-core/src/main/java/org/apache/flume/source/DefaultSourceFactory.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/main/java/org/apache/flume/source/DefaultSourceFactory.java b/flume-ng-core/src/main/java/org/apache/flume/source/DefaultSourceFactory.java index bb9d3f1..f2da332 100644 --- a/flume-ng-core/src/main/java/org/apache/flume/source/DefaultSourceFactory.java +++ b/flume-ng-core/src/main/java/org/apache/flume/source/DefaultSourceFactory.java @@ -46,13 +46,12 @@ public class DefaultSourceFactory implements SourceFactory { return source; } catch (Exception ex) { throw new FlumeException("Unable to create source: " + name - +", type: " + type + ", class: " + sourceClass.getName(), ex); + + ", type: " + type + ", class: " + sourceClass.getName(), ex); } } @SuppressWarnings("unchecked") @Override - public Class<? extends Source> getClass(String type) - throws FlumeException { + public Class<? extends Source> getClass(String type) throws FlumeException { String sourceClassName = type; SourceType srcType = SourceType.OTHER; try { http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-core/src/main/java/org/apache/flume/source/ExecSource.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/main/java/org/apache/flume/source/ExecSource.java b/flume-ng-core/src/main/java/org/apache/flume/source/ExecSource.java index 18e662c..52ea808 100644 --- a/flume-ng-core/src/main/java/org/apache/flume/source/ExecSource.java +++ b/flume-ng-core/src/main/java/org/apache/flume/source/ExecSource.java @@ -133,7 +133,8 @@ import java.nio.charset.Charset; * </tr> * <tr> * <td><tt>batchTimeout</tt></td> - * <td>Amount of time (in milliseconds) to wait, if the buffer size was not reached, before data is pushed downstream.</td> + * <td>Amount of time (in milliseconds) to wait, if the buffer size was not reached, + * before data is pushed downstream.</td> * <td>long</td> * <td>3000</td> * </tr> @@ -145,11 +146,9 @@ import java.nio.charset.Charset; * TODO * </p> */ -public class ExecSource extends AbstractSource implements EventDrivenSource, -Configurable { +public class ExecSource extends AbstractSource implements EventDrivenSource, Configurable { - private static final Logger logger = LoggerFactory - .getLogger(ExecSource.class); + private static final Logger logger = LoggerFactory.getLogger(ExecSource.class); private String shell; private String command; @@ -190,7 +189,7 @@ Configurable { @Override public void stop() { logger.info("Stopping exec source with command:{}", command); - if(runner != null) { + if (runner != null) { runner.setRestart(false); runner.kill(); } @@ -298,7 +297,7 @@ Configurable { "timedFlushExecService" + Thread.currentThread().getId() + "-%d").build()); try { - if(shell != null) { + if (shell != null) { String[] commandArgs = formulateShellCommand(shell, command); process = Runtime.getRuntime().exec(commandArgs); } else { @@ -320,14 +319,14 @@ Configurable { public void run() { try { synchronized (eventList) { - if(!eventList.isEmpty() && timeout()) { + if (!eventList.isEmpty() && timeout()) { flushEventBatch(eventList); } } } catch (Exception e) { logger.error("Exception occured when processing event batch", e); - if(e instanceof InterruptedException) { - Thread.currentThread().interrupt(); + if (e instanceof InterruptedException) { + Thread.currentThread().interrupt(); } } } @@ -338,20 +337,20 @@ Configurable { synchronized (eventList) { sourceCounter.incrementEventReceivedCount(); eventList.add(EventBuilder.withBody(line.getBytes(charset))); - if(eventList.size() >= bufferCount || timeout()) { + if (eventList.size() >= bufferCount || timeout()) { flushEventBatch(eventList); } } } synchronized (eventList) { - if(!eventList.isEmpty()) { - flushEventBatch(eventList); - } + if (!eventList.isEmpty()) { + flushEventBatch(eventList); + } } } catch (Exception e) { logger.error("Failed while running command: " + command, e); - if(e instanceof InterruptedException) { + if (e instanceof InterruptedException) { Thread.currentThread().interrupt(); } } finally { @@ -364,7 +363,7 @@ Configurable { } exitCode = String.valueOf(kill()); } - if(restart) { + if (restart) { logger.info("Restarting in {}ms, exit code {}", restartThrottle, exitCode); try { @@ -375,17 +374,17 @@ Configurable { } else { logger.info("Command [" + command + "] exited with " + exitCode); } - } while(restart); + } while (restart); } - private void flushEventBatch(List<Event> eventList){ + private void flushEventBatch(List<Event> eventList) { channelProcessor.processEventBatch(eventList); sourceCounter.addToEventAcceptedCount(eventList.size()); eventList.clear(); lastPushToChannel = systemClock.currentTimeMillis(); } - private boolean timeout(){ + private boolean timeout() { return (systemClock.currentTimeMillis() - lastPushToChannel) >= batchTimeout; } @@ -398,7 +397,7 @@ Configurable { } public int kill() { - if(process != null) { + if (process != null) { synchronized (process) { process.destroy(); @@ -407,7 +406,7 @@ Configurable { // Stop the Thread that flushes periodically if (future != null) { - future.cancel(true); + future.cancel(true); } if (timedFlushService != null) { @@ -417,7 +416,7 @@ Configurable { timedFlushService.awaitTermination(500, TimeUnit.MILLISECONDS); } catch (InterruptedException e) { logger.debug("Interrupted while waiting for exec executor service " - + "to stop. Just exiting."); + + "to stop. Just exiting."); Thread.currentThread().interrupt(); } } @@ -435,6 +434,7 @@ Configurable { this.restart = restart; } } + private static class StderrReader extends Thread { private BufferedReader input; private boolean logStderr; @@ -449,8 +449,8 @@ Configurable { try { int i = 0; String line = null; - while((line = input.readLine()) != null) { - if(logStderr) { + while ((line = input.readLine()) != null) { + if (logStderr) { // There is no need to read 'line' with a charset // as we do not to propagate it. // It is in UTF-16 and would be printed in UTF-8 format. @@ -461,7 +461,7 @@ Configurable { logger.info("StderrLogger exiting", e); } finally { try { - if(input != null) { + if (input != null) { input.close(); } } catch (IOException ex) {
