http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-core/src/main/java/org/apache/flume/source/ExecSourceConfigurationConstants.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/main/java/org/apache/flume/source/ExecSourceConfigurationConstants.java b/flume-ng-core/src/main/java/org/apache/flume/source/ExecSourceConfigurationConstants.java index 957ec7f..ef1d51c 100644 --- a/flume-ng-core/src/main/java/org/apache/flume/source/ExecSourceConfigurationConstants.java +++ b/flume-ng-core/src/main/java/org/apache/flume/source/ExecSourceConfigurationConstants.java @@ -18,7 +18,6 @@ */ package org.apache.flume.source; - public class ExecSourceConfigurationConstants { /** @@ -50,7 +49,7 @@ public class ExecSourceConfigurationConstants { * to data is pushed downstream: : default 3000 ms */ public static final String CONFIG_BATCH_TIME_OUT = "batchTimeout"; - public static final long DEFAULT_BATCH_TIME_OUT = 3000l; + public static final long DEFAULT_BATCH_TIME_OUT = 3000L; /** * Charset for reading input
http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-core/src/main/java/org/apache/flume/source/MultiportSyslogTCPSource.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/main/java/org/apache/flume/source/MultiportSyslogTCPSource.java b/flume-ng-core/src/main/java/org/apache/flume/source/MultiportSyslogTCPSource.java index 87f0db1..b9f2438 100644 --- a/flume-ng-core/src/main/java/org/apache/flume/source/MultiportSyslogTCPSource.java +++ b/flume-ng-core/src/main/java/org/apache/flume/source/MultiportSyslogTCPSource.java @@ -113,7 +113,7 @@ public class MultiportSyslogTCPSource extends AbstractSource implements portCharsets.clear(); { ImmutableMap<String, String> portCharsetCfg = context.getSubProperties( - SyslogSourceConfigurationConstants.CONFIG_PORT_CHARSET_PREFIX); + SyslogSourceConfigurationConstants.CONFIG_PORT_CHARSET_PREFIX); for (Map.Entry<String, String> entry : portCharsetCfg.entrySet()) { String portStr = entry.getKey(); String charsetStr = entry.getValue(); @@ -386,7 +386,7 @@ public class MultiportSyslogTCPSource extends AbstractSource implements */ static class LineSplitter { - private final static byte NEWLINE = '\n'; + private static final byte NEWLINE = '\n'; private final int maxLineLength; public LineSplitter(int maxLineLength) { http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-core/src/main/java/org/apache/flume/source/NetcatSourceConfigurationConstants.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/main/java/org/apache/flume/source/NetcatSourceConfigurationConstants.java b/flume-ng-core/src/main/java/org/apache/flume/source/NetcatSourceConfigurationConstants.java index 1720d5f..f3efddb 100644 --- a/flume-ng-core/src/main/java/org/apache/flume/source/NetcatSourceConfigurationConstants.java +++ b/flume-ng-core/src/main/java/org/apache/flume/source/NetcatSourceConfigurationConstants.java @@ -42,9 +42,9 @@ public class NetcatSourceConfigurationConstants { public static final String CONFIG_MAX_LINE_LENGTH = "max-line-length"; public static final int DEFAULT_MAX_LINE_LENGTH = 512; - /** - * Encoding for the netcat source - */ + /** + * Encoding for the netcat source + */ public static final String CONFIG_SOURCE_ENCODING = "encoding"; public static final String DEFAULT_ENCODING = "utf-8"; } http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-core/src/main/java/org/apache/flume/source/PollableSourceRunner.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/main/java/org/apache/flume/source/PollableSourceRunner.java b/flume-ng-core/src/main/java/org/apache/flume/source/PollableSourceRunner.java index ea37703..7357793 100644 --- a/flume-ng-core/src/main/java/org/apache/flume/source/PollableSourceRunner.java +++ b/flume-ng-core/src/main/java/org/apache/flume/source/PollableSourceRunner.java @@ -48,8 +48,7 @@ import org.slf4j.LoggerFactory; */ public class PollableSourceRunner extends SourceRunner { - private static final Logger logger = LoggerFactory - .getLogger(PollableSourceRunner.class); + private static final Logger logger = LoggerFactory.getLogger(PollableSourceRunner.class); private AtomicBoolean shouldStop; @@ -94,10 +93,7 @@ public class PollableSourceRunner extends SourceRunner { runnerThread.interrupt(); runnerThread.join(); } catch (InterruptedException e) { - logger - .warn( - "Interrupted while waiting for polling runner to stop. Please report this.", - e); + logger.warn("Interrupted while waiting for polling runner to stop. Please report this.", e); Thread.currentThread().interrupt(); } http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-core/src/main/java/org/apache/flume/source/SequenceGeneratorSource.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/main/java/org/apache/flume/source/SequenceGeneratorSource.java b/flume-ng-core/src/main/java/org/apache/flume/source/SequenceGeneratorSource.java index 1214635..9f831bd 100644 --- a/flume-ng-core/src/main/java/org/apache/flume/source/SequenceGeneratorSource.java +++ b/flume-ng-core/src/main/java/org/apache/flume/source/SequenceGeneratorSource.java @@ -71,7 +71,7 @@ public class SequenceGeneratorSource extends AbstractPollableSource implements int i = 0; try { if (batchSize <= 1) { - if(eventsSent < totalEvents) { + if (eventsSent < totalEvents) { getChannelProcessor().processEvent( EventBuilder.withBody(String.valueOf(sequence++).getBytes())); sourceCounter.incrementEventAcceptedCount(); @@ -82,7 +82,7 @@ public class SequenceGeneratorSource extends AbstractPollableSource implements } else { batchArrayList.clear(); for (i = 0; i < batchSize; i++) { - if(eventsSent < totalEvents){ + if (eventsSent < totalEvents) { batchArrayList.add(i, EventBuilder.withBody(String .valueOf(sequence++).getBytes())); eventsSent++; @@ -90,7 +90,7 @@ public class SequenceGeneratorSource extends AbstractPollableSource implements status = Status.BACKOFF; } } - if(!batchArrayList.isEmpty()) { + if (!batchArrayList.isEmpty()) { getChannelProcessor().processEventBatch(batchArrayList); sourceCounter.incrementAppendBatchAcceptedCount(); sourceCounter.addToEventAcceptedCount(batchArrayList.size()); http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-core/src/main/java/org/apache/flume/source/SpoolDirectorySource.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/main/java/org/apache/flume/source/SpoolDirectorySource.java b/flume-ng-core/src/main/java/org/apache/flume/source/SpoolDirectorySource.java index 3af3e53..d88cc1d 100644 --- a/flume-ng-core/src/main/java/org/apache/flume/source/SpoolDirectorySource.java +++ b/flume-ng-core/src/main/java/org/apache/flume/source/SpoolDirectorySource.java @@ -20,7 +20,11 @@ package org.apache.flume.source; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import com.google.common.base.Throwables; -import org.apache.flume.*; +import org.apache.flume.ChannelException; +import org.apache.flume.Context; +import org.apache.flume.Event; +import org.apache.flume.EventDrivenSource; +import org.apache.flume.FlumeException; import org.apache.flume.client.avro.ReliableSpoolingFileEventReader; import org.apache.flume.conf.Configurable; import org.apache.flume.instrumentation.SourceCounter; @@ -39,11 +43,10 @@ import java.util.concurrent.TimeUnit; import static org.apache.flume.source.SpoolDirectorySourceConfigurationConstants.*; -public class SpoolDirectorySource extends AbstractSource implements -Configurable, EventDrivenSource { +public class SpoolDirectorySource extends AbstractSource + implements Configurable, EventDrivenSource { - private static final Logger logger = LoggerFactory - .getLogger(SpoolDirectorySource.class); + private static final Logger logger = LoggerFactory.getLogger(SpoolDirectorySource.class); /* Config options */ private String completedSuffix; @@ -124,8 +127,7 @@ Configurable, EventDrivenSource { super.stop(); sourceCounter.stop(); - logger.info("SpoolDir source {} stopped. Metrics: {}", getName(), - sourceCounter); + logger.info("SpoolDir source {} stopped. Metrics: {}", getName(), sourceCounter); } @Override @@ -247,8 +249,8 @@ Configurable, EventDrivenSource { reader.commit(); } catch (ChannelException ex) { logger.warn("The channel is full, and cannot write data now. The " + - "source will try again after " + String.valueOf(backoffInterval) + - " milliseconds"); + "source will try again after " + String.valueOf(backoffInterval) + + " milliseconds"); hitChannelException = true; if (backoff) { TimeUnit.MILLISECONDS.sleep(backoffInterval); http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-core/src/main/java/org/apache/flume/source/SpoolDirectorySourceConfigurationConstants.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/main/java/org/apache/flume/source/SpoolDirectorySourceConfigurationConstants.java b/flume-ng-core/src/main/java/org/apache/flume/source/SpoolDirectorySourceConfigurationConstants.java index 32b7837..5859aa2 100644 --- a/flume-ng-core/src/main/java/org/apache/flume/source/SpoolDirectorySourceConfigurationConstants.java +++ b/flume-ng-core/src/main/java/org/apache/flume/source/SpoolDirectorySourceConfigurationConstants.java @@ -91,6 +91,7 @@ public class SpoolDirectorySourceConfigurationConstants { public enum ConsumeOrder { OLDEST, YOUNGEST, RANDOM } + public static final String CONSUME_ORDER = "consumeOrder"; public static final ConsumeOrder DEFAULT_CONSUME_ORDER = ConsumeOrder.OLDEST; http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-core/src/main/java/org/apache/flume/source/StressSource.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/main/java/org/apache/flume/source/StressSource.java b/flume-ng-core/src/main/java/org/apache/flume/source/StressSource.java index 9aa1477..aa95294 100644 --- a/flume-ng-core/src/main/java/org/apache/flume/source/StressSource.java +++ b/flume-ng-core/src/main/java/org/apache/flume/source/StressSource.java @@ -53,11 +53,9 @@ import org.slf4j.LoggerFactory; * * See {@link StressSource#configure(Context)} for configuration options. */ -public class StressSource extends AbstractPollableSource implements - Configurable { +public class StressSource extends AbstractPollableSource implements Configurable { - private static final Logger logger = LoggerFactory - .getLogger(StressSource.class); + private static final Logger logger = LoggerFactory.getLogger(StressSource.class); private CounterGroup counterGroup; private byte[] buffer; @@ -102,8 +100,7 @@ public class StressSource extends AbstractPollableSource implements //Create event objects in case of batch test eventBatchList = new ArrayList<Event>(); - for (int i = 0; i < batchSize; i++) - { + for (int i = 0; i < batchSize; i++) { eventBatchList.add(EventBuilder.withBody(buffer)); } } else { http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-core/src/main/java/org/apache/flume/source/SyslogParser.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/main/java/org/apache/flume/source/SyslogParser.java b/flume-ng-core/src/main/java/org/apache/flume/source/SyslogParser.java index b57ffac..de727f6 100644 --- a/flume-ng-core/src/main/java/org/apache/flume/source/SyslogParser.java +++ b/flume-ng-core/src/main/java/org/apache/flume/source/SyslogParser.java @@ -115,7 +115,7 @@ public class SyslogParser { // remember version string String version = null; if (msgLen > curPos + 2 && "1 ".equals(msg.substring(curPos, curPos + 2))) { - version = msg.substring(curPos, curPos+1); + version = msg.substring(curPos, curPos + 1); headers.put(SyslogUtils.SYSLOG_VERSION, version); curPos += 2; } @@ -313,18 +313,19 @@ public class SyslogParser { try { date = rfc3164Format.parseDateTime(ts); } catch (IllegalArgumentException e) { - logger.debug("rfc3164 date parse failed on ("+ts+"): invalid format", e); + logger.debug("rfc3164 date parse failed on (" + ts + "): invalid format", e); return 0; } // rfc3164 dates are really dumb. /* - * Some code to try and add some smarts to the year insertion as without a year in the message we - * need to make some educated guessing. + * Some code to try and add some smarts to the year insertion as without a year in the message + * we need to make some educated guessing. * First set the "fixed" to be the timestamp with the current year. * If the "fixed" time is more than one month in the future then roll it back a year. * If the "fixed" time is more than eleven months in the past then roll it forward a year. - * This gives us a 12 month rolling window (11 months in the past, 1 month in the future) of timestamps. + * This gives us a 12 month rolling window (11 months in the past, 1 month in the future) of + * timestamps. */ if (date != null) { @@ -332,7 +333,7 @@ public class SyslogParser { // flume clock is ahead or there is some latency, and the year rolled if (fixed.isAfter(now) && fixed.minusMonths(1).isAfter(now)) { - fixed = date.minusYears(1); + fixed = date.minusYears(1); // flume clock is behind and the year rolled } else if (fixed.isBefore(now) && fixed.plusMonths(1).isBefore(now)) { fixed = date.plusYears(1); http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-core/src/main/java/org/apache/flume/source/SyslogTcpSource.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/main/java/org/apache/flume/source/SyslogTcpSource.java b/flume-ng-core/src/main/java/org/apache/flume/source/SyslogTcpSource.java index bd87151..185c00c 100644 --- a/flume-ng-core/src/main/java/org/apache/flume/source/SyslogTcpSource.java +++ b/flume-ng-core/src/main/java/org/apache/flume/source/SyslogTcpSource.java @@ -48,11 +48,9 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class SyslogTcpSource extends AbstractSource -implements EventDrivenSource, Configurable { + implements EventDrivenSource, Configurable { + private static final Logger logger = LoggerFactory.getLogger(SyslogTcpSource.class); - - private static final Logger logger = LoggerFactory - .getLogger(SyslogTcpSource.class); private int port; private String host = null; private Channel nettyChannel; @@ -65,7 +63,7 @@ implements EventDrivenSource, Configurable { private SyslogUtils syslogUtils = new SyslogUtils(); - public void setEventSize(int eventSize){ + public void setEventSize(int eventSize) { syslogUtils.setEventSize(eventSize); } http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-core/src/main/java/org/apache/flume/source/SyslogUDPSource.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/main/java/org/apache/flume/source/SyslogUDPSource.java b/flume-ng-core/src/main/java/org/apache/flume/source/SyslogUDPSource.java index 47993dd..175bebb 100644 --- a/flume-ng-core/src/main/java/org/apache/flume/source/SyslogUDPSource.java +++ b/flume-ng-core/src/main/java/org/apache/flume/source/SyslogUDPSource.java @@ -35,14 +35,21 @@ import org.apache.flume.conf.Configurable; import org.apache.flume.conf.Configurables; import org.jboss.netty.bootstrap.ConnectionlessBootstrap; import org.jboss.netty.buffer.ChannelBuffer; -import org.jboss.netty.channel.*; +import org.jboss.netty.channel.AdaptiveReceiveBufferSizePredictorFactory; +import org.jboss.netty.channel.Channel; +import org.jboss.netty.channel.ChannelHandlerContext; +import org.jboss.netty.channel.ChannelPipeline; +import org.jboss.netty.channel.ChannelPipelineFactory; +import org.jboss.netty.channel.Channels; +import org.jboss.netty.channel.MessageEvent; +import org.jboss.netty.channel.SimpleChannelHandler; import org.jboss.netty.channel.socket.oio.OioDatagramChannelFactory; import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class SyslogUDPSource extends AbstractSource - implements EventDrivenSource, Configurable { + implements EventDrivenSource, Configurable { private int port; private int maxsize = 1 << 16; // 64k is max allowable in RFC 5426 @@ -51,8 +58,7 @@ public class SyslogUDPSource extends AbstractSource private Map<String, String> formaterProp; private Set<String> keepFields; - private static final Logger logger = LoggerFactory - .getLogger(SyslogUDPSource.class); + private static final Logger logger = LoggerFactory.getLogger(SyslogUDPSource.class); private CounterGroup counterGroup = new CounterGroup(); @@ -96,20 +102,20 @@ public class SyslogUDPSource extends AbstractSource @Override public void start() { // setup Netty server - ConnectionlessBootstrap serverBootstrap = new ConnectionlessBootstrap - (new OioDatagramChannelFactory(Executors.newCachedThreadPool())); + ConnectionlessBootstrap serverBootstrap = new ConnectionlessBootstrap( + new OioDatagramChannelFactory(Executors.newCachedThreadPool())); final syslogHandler handler = new syslogHandler(); handler.setFormater(formaterProp); handler.setKeepFields(keepFields); serverBootstrap.setOption("receiveBufferSizePredictorFactory", - new AdaptiveReceiveBufferSizePredictorFactory(DEFAULT_MIN_SIZE, - DEFAULT_INITIAL_SIZE, maxsize)); + new AdaptiveReceiveBufferSizePredictorFactory(DEFAULT_MIN_SIZE, + DEFAULT_INITIAL_SIZE, maxsize)); serverBootstrap.setPipelineFactory(new ChannelPipelineFactory() { @Override public ChannelPipeline getPipeline() { - return Channels.pipeline(handler); + return Channels.pipeline(handler); } - }); + }); if (host == null) { nettyChannel = serverBootstrap.bind(new InetSocketAddress(port)); http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-core/src/main/java/org/apache/flume/source/SyslogUtils.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/main/java/org/apache/flume/source/SyslogUtils.java b/flume-ng-core/src/main/java/org/apache/flume/source/SyslogUtils.java index 4866183..43a10e1 100644 --- a/flume-ng-core/src/main/java/org/apache/flume/source/SyslogUtils.java +++ b/flume-ng-core/src/main/java/org/apache/flume/source/SyslogUtils.java @@ -36,7 +36,6 @@ import java.util.Calendar; import java.util.Date; import java.util.HashMap; import java.util.HashSet; -import java.util.List; import java.util.Locale; import java.util.Map; import java.util.Set; @@ -47,38 +46,38 @@ import java.util.regex.Pattern; @InterfaceAudience.Private @InterfaceStability.Evolving public class SyslogUtils { - final public static String SYSLOG_TIMESTAMP_FORMAT_RFC5424_2 = "yyyy-MM-dd'T'HH:mm:ss.SZ"; - final public static String SYSLOG_TIMESTAMP_FORMAT_RFC5424_1 = "yyyy-MM-dd'T'HH:mm:ss.S"; - final public static String SYSLOG_TIMESTAMP_FORMAT_RFC5424_3 = "yyyy-MM-dd'T'HH:mm:ssZ"; - final public static String SYSLOG_TIMESTAMP_FORMAT_RFC5424_4 = "yyyy-MM-dd'T'HH:mm:ss"; - final public static String SYSLOG_TIMESTAMP_FORMAT_RFC3164_1 = "yyyyMMM d HH:mm:ss"; + public static final String SYSLOG_TIMESTAMP_FORMAT_RFC5424_2 = "yyyy-MM-dd'T'HH:mm:ss.SZ"; + public static final String SYSLOG_TIMESTAMP_FORMAT_RFC5424_1 = "yyyy-MM-dd'T'HH:mm:ss.S"; + public static final String SYSLOG_TIMESTAMP_FORMAT_RFC5424_3 = "yyyy-MM-dd'T'HH:mm:ssZ"; + public static final String SYSLOG_TIMESTAMP_FORMAT_RFC5424_4 = "yyyy-MM-dd'T'HH:mm:ss"; + public static final String SYSLOG_TIMESTAMP_FORMAT_RFC3164_1 = "yyyyMMM d HH:mm:ss"; - final public static String SYSLOG_MSG_RFC5424_0 = + public static final String SYSLOG_MSG_RFC5424_0 = "(?:\\<(\\d{1,3})\\>)" + // priority - "(?:(\\d?)\\s?)" + // version - /* yyyy-MM-dd'T'HH:mm:ss.SZ or yyyy-MM-dd'T'HH:mm:ss.S+hh:mm or - (null stamp) */ - "(?:" + - "(\\d{4}[-]\\d{2}[-]\\d{2}[T]\\d{2}[:]\\d{2}[:]\\d{2}" + - "(?:\\.\\d{1,6})?(?:[+-]\\d{2}[:]\\d{2}|Z)?)|-)" + // stamp - "\\s" + // separator - "(?:([\\w][\\w\\d\\.@\\-]*)|-)" + // host name or - (null) - "\\s" + // separator - "(.*)$"; // body - - final public static String SYSLOG_MSG_RFC3164_0 = + "(?:(\\d?)\\s?)" + // version + /* yyyy-MM-dd'T'HH:mm:ss.SZ or yyyy-MM-dd'T'HH:mm:ss.S+hh:mm or - (null stamp) */ + "(?:" + + "(\\d{4}[-]\\d{2}[-]\\d{2}[T]\\d{2}[:]\\d{2}[:]\\d{2}" + + "(?:\\.\\d{1,6})?(?:[+-]\\d{2}[:]\\d{2}|Z)?)|-)" + // stamp + "\\s" + // separator + "(?:([\\w][\\w\\d\\.@\\-]*)|-)" + // host name or - (null) + "\\s" + // separator + "(.*)$"; // body + + public static final String SYSLOG_MSG_RFC3164_0 = "(?:\\<(\\d{1,3})\\>)" + - "(?:(\\d)?\\s?)" + // version - // stamp MMM d HH:mm:ss, single digit date has two spaces - "([A-Z][a-z][a-z]\\s{1,2}\\d{1,2}\\s\\d{2}[:]\\d{2}[:]\\d{2})" + - "\\s" + // separator - "([\\w][\\w\\d\\.@-]*)" + // host - "\\s(.*)$"; // body - - final public static int SYSLOG_PRIORITY_POS = 1; - final public static int SYSLOG_VERSION_POS = 2; - final public static int SYSLOG_TIMESTAMP_POS = 3; - final public static int SYSLOG_HOSTNAME_POS = 4; - final public static int SYSLOG_BODY_POS = 5; + "(?:(\\d)?\\s?)" + // version + // stamp MMM d HH:mm:ss, single digit date has two spaces + "([A-Z][a-z][a-z]\\s{1,2}\\d{1,2}\\s\\d{2}[:]\\d{2}[:]\\d{2})" + + "\\s" + // separator + "([\\w][\\w\\d\\.@-]*)" + // host + "\\s(.*)$"; // body + + public static final int SYSLOG_PRIORITY_POS = 1; + public static final int SYSLOG_VERSION_POS = 2; + public static final int SYSLOG_TIMESTAMP_POS = 3; + public static final int SYSLOG_HOSTNAME_POS = 4; + public static final int SYSLOG_BODY_POS = 5; private Mode m = Mode.START; private StringBuilder prio = new StringBuilder(); @@ -86,13 +85,13 @@ public class SyslogUtils { private static final Logger logger = LoggerFactory .getLogger(SyslogUtils.class); - final public static String SYSLOG_FACILITY = "Facility"; - final public static String SYSLOG_SEVERITY = "Severity"; - final public static String SYSLOG_PRIORITY = "Priority"; - final public static String SYSLOG_VERSION = "Version"; - final public static String EVENT_STATUS = "flume.syslog.status"; - final public static Integer MIN_SIZE = 10; - final public static Integer DEFAULT_SIZE = 2500; + public static final String SYSLOG_FACILITY = "Facility"; + public static final String SYSLOG_SEVERITY = "Severity"; + public static final String SYSLOG_PRIORITY = "Priority"; + public static final String SYSLOG_VERSION = "Version"; + public static final String EVENT_STATUS = "flume.syslog.status"; + public static final Integer MIN_SIZE = 10; + public static final Integer DEFAULT_SIZE = 2500; private final boolean isUdp; private boolean isBadEvent; private boolean isIncompleteEvent; @@ -106,6 +105,7 @@ public class SyslogUtils { public ArrayList<SimpleDateFormat> dateFormat = new ArrayList<SimpleDateFormat>(); public boolean addYear; } + private ArrayList<SyslogFormatter> formats = new ArrayList<SyslogFormatter>(); private String priority = null; @@ -115,10 +115,10 @@ public class SyslogUtils { private String msgBody = null; private static final String[] DEFAULT_FIELDS_TO_KEEP = { - SyslogSourceConfigurationConstants.CONFIG_KEEP_FIELDS_PRIORITY, - SyslogSourceConfigurationConstants.CONFIG_KEEP_FIELDS_VERSION, - SyslogSourceConfigurationConstants.CONFIG_KEEP_FIELDS_TIMESTAMP, - SyslogSourceConfigurationConstants.CONFIG_KEEP_FIELDS_HOSTNAME + SyslogSourceConfigurationConstants.CONFIG_KEEP_FIELDS_PRIORITY, + SyslogSourceConfigurationConstants.CONFIG_KEEP_FIELDS_VERSION, + SyslogSourceConfigurationConstants.CONFIG_KEEP_FIELDS_TIMESTAMP, + SyslogSourceConfigurationConstants.CONFIG_KEEP_FIELDS_HOSTNAME }; public static final String KEEP_FIELDS_ALL = "--all--"; @@ -211,22 +211,19 @@ public class SyslogUtils { return; } SyslogFormatter fmt1 = new SyslogFormatter(); - fmt1.regexPattern = Pattern.compile( formatProp.get( - SyslogSourceConfigurationConstants.CONFIG_REGEX) ); - if (formatProp.containsKey( - SyslogSourceConfigurationConstants.CONFIG_SEARCH)) { - fmt1.searchPattern.add(formatProp.get( - SyslogSourceConfigurationConstants.CONFIG_SEARCH)); + fmt1.regexPattern = Pattern.compile( + formatProp.get(SyslogSourceConfigurationConstants.CONFIG_REGEX)); + if (formatProp.containsKey(SyslogSourceConfigurationConstants.CONFIG_SEARCH)) { + fmt1.searchPattern.add( + formatProp.get(SyslogSourceConfigurationConstants.CONFIG_SEARCH)); } - if (formatProp.containsKey( - SyslogSourceConfigurationConstants.CONFIG_REPLACE)) { - fmt1.replacePattern.add(formatProp.get( - SyslogSourceConfigurationConstants.CONFIG_REPLACE)); + if (formatProp.containsKey(SyslogSourceConfigurationConstants.CONFIG_REPLACE)) { + fmt1.replacePattern.add( + formatProp.get(SyslogSourceConfigurationConstants.CONFIG_REPLACE)); } - if (formatProp.containsKey( - SyslogSourceConfigurationConstants.CONFIG_DATEFORMAT)) { - fmt1.dateFormat.add(new SimpleDateFormat(formatProp.get( - SyslogSourceConfigurationConstants.CONFIG_DATEFORMAT))); + if (formatProp.containsKey(SyslogSourceConfigurationConstants.CONFIG_DATEFORMAT)) { + fmt1.dateFormat.add(new SimpleDateFormat( + formatProp.get(SyslogSourceConfigurationConstants.CONFIG_DATEFORMAT))); } formats.add(0, fmt1); } @@ -266,20 +263,22 @@ public class SyslogUtils { enum Mode { START, PRIO, DATA - }; + } - public enum SyslogStatus{ + ; + + public enum SyslogStatus { OTHER("Unknown"), INVALID("Invalid"), INCOMPLETE("Incomplete"); private final String syslogStatus; - private SyslogStatus(String status){ + private SyslogStatus(String status) { syslogStatus = status; } - public String getSyslogStatus(){ + public String getSyslogStatus() { return this.syslogStatus; } } @@ -292,14 +291,14 @@ public class SyslogUtils { int sev = 0; int facility = 0; - if(!isBadEvent){ + if (!isBadEvent) { pri = Integer.parseInt(prio.toString()); sev = pri % 8; facility = pri / 8; formatHeaders(); } - Map <String, String> headers = new HashMap<String, String>(); + Map<String, String> headers = new HashMap<String, String>(); headers.put(SYSLOG_FACILITY, String.valueOf(facility)); headers.put(SYSLOG_SEVERITY, String.valueOf(sev)); if ((priority != null) && (priority.length() > 0)) { @@ -314,10 +313,10 @@ public class SyslogUtils { if ((hostName != null) && (hostName.length() > 0)) { headers.put("host", hostName); } - if(isBadEvent){ + if (isBadEvent) { logger.warn("Event created from Invalid Syslog data."); headers.put(EVENT_STATUS, SyslogStatus.INVALID.getSyslogStatus()); - } else if(isIncompleteEvent){ + } else if (isIncompleteEvent) { logger.warn("Event size larger than specified event size: {}. You should " + "consider increasing your event size.", maxSize); headers.put(EVENT_STATUS, SyslogStatus.INCOMPLETE.getSyslogStatus()); @@ -345,22 +344,22 @@ public class SyslogUtils { String eventStr = baos.toString(); String timeStampString = null; - for(int p=0; p < formats.size(); p++) { + for (int p = 0; p < formats.size(); p++) { SyslogFormatter fmt = formats.get(p); Pattern pattern = fmt.regexPattern; Matcher matcher = pattern.matcher(eventStr); - if (! matcher.matches()) { + if (!matcher.matches()) { continue; } MatchResult res = matcher.toMatchResult(); - for (int grp=1; grp <= res.groupCount(); grp++) { + for (int grp = 1; grp <= res.groupCount(); grp++) { String value = res.group(grp); if (grp == SYSLOG_TIMESTAMP_POS) { timeStampString = value; // apply available format replacements to timestamp if (value != null) { - for (int sp=0; sp < fmt.searchPattern.size(); sp++) { + for (int sp = 0; sp < fmt.searchPattern.size(); sp++) { value = value.replaceAll(fmt.searchPattern.get(sp), fmt.replacePattern.get(sp)); } // Add year to timestamp if needed @@ -373,14 +372,16 @@ public class SyslogUtils { Date parsedDate = fmt.dateFormat.get(dt).parse(value); /* * Some code to try and add some smarts to the year insertion. - * Original code just added the current year which was okay-ish, but around January 1st becomes - * pretty naïve. - * The current year is added above. This code, if the year has been added does the following: + * Original code just added the current year which was okay-ish, but around + * January 1st becomes pretty naïve. + * The current year is added above. This code, if the year has been added does + * the following: * 1. Compute what the computed time, but one month in the past would be. * 2. Compute what the computed time, but eleven months in the future would be. - * If the computed time is more than one month in the future then roll it back a year. - * If the computed time is more than eleven months in the past then roll it forward a year. - * This gives us a 12 month rolling window (11 months in the past, 1 month in the future) of timestamps. + * If the computed time is more than one month in the future then roll it back a + * year. If the computed time is more than eleven months in the past then roll it + * forward a year. This gives us a 12 month rolling window (11 months in the past, + * 1 month in the future) of timestamps. */ if (fmt.addYear) { Calendar cal = Calendar.getInstance(); @@ -393,13 +394,15 @@ public class SyslogUtils { calPlusElevenMonths.setTime(parsedDate); calPlusElevenMonths.add(Calendar.MONTH, +11); - if (cal.getTimeInMillis() > System.currentTimeMillis() && calMinusOneMonth.getTimeInMillis() > System.currentTimeMillis()) { + if (cal.getTimeInMillis() > System.currentTimeMillis() && + calMinusOneMonth.getTimeInMillis() > System.currentTimeMillis()) { //Need to roll back a year Calendar c1 = Calendar.getInstance(); c1.setTime(parsedDate); c1.add(Calendar.YEAR, -1); parsedDate = c1.getTime(); - } else if (cal.getTimeInMillis() < System.currentTimeMillis() && calPlusElevenMonths.getTimeInMillis() < System.currentTimeMillis() ) { + } else if (cal.getTimeInMillis() < System.currentTimeMillis() && + calPlusElevenMonths.getTimeInMillis() < System.currentTimeMillis()) { //Need to roll forward a year Calendar c1 = Calendar.getInstance(); c1.setTime(parsedDate); @@ -422,14 +425,15 @@ public class SyslogUtils { } else if (grp == SYSLOG_VERSION_POS) { version = value; } else if (grp == SYSLOG_BODY_POS) { - msgBody = addFieldsToBody(keepFields, value, priority, version, timeStampString, hostName); + msgBody = addFieldsToBody(keepFields, value, priority, version, + timeStampString, hostName); } } break; // we successfully parsed the message using this pattern } } - private void reset(){ + private void reset() { baos.reset(); m = Mode.START; prio.delete(0, prio.length()); @@ -441,7 +445,7 @@ public class SyslogUtils { } // extract relevant syslog data needed for building Flume event - public Event extractEvent(ChannelBuffer in){ + public Event extractEvent(ChannelBuffer in) { /* for protocol debugging ByteBuffer bb = in.toByteBuffer(); @@ -459,61 +463,61 @@ public class SyslogUtils { while (!doneReading && in.readable()) { b = in.readByte(); switch (m) { - case START: - if (b == '<') { - baos.write(b); - m = Mode.PRIO; - } else if(b == '\n'){ - //If the character is \n, it was because the last event was exactly - //as long as the maximum size allowed and - //the only remaining character was the delimiter - '\n', or - //multiple delimiters were sent in a row. - //Just ignore it, and move forward, don't change the mode. - //This is a no-op, just ignore it. - logger.debug("Delimiter found while in START mode, ignoring.."); - - } else { - isBadEvent = true; - baos.write(b); - //Bad event, just dump everything as if it is data. - m = Mode.DATA; - } - break; - case PRIO: - baos.write(b); - if (b == '>') { - if (prio.length() == 0) { - isBadEvent = true; - } - m = Mode.DATA; - } else { - char ch = (char) b; - prio.append(ch); - // Priority is max 3 digits per both RFC 3164 and 5424 - // With this check there is basically no danger of - // boas.size() exceeding this.maxSize before getting to the - // DATA state where this is actually checked - if (!Character.isDigit(ch) || prio.length() > 3) { + case START: + if (b == '<') { + baos.write(b); + m = Mode.PRIO; + } else if (b == '\n') { + //If the character is \n, it was because the last event was exactly + //as long as the maximum size allowed and + //the only remaining character was the delimiter - '\n', or + //multiple delimiters were sent in a row. + //Just ignore it, and move forward, don't change the mode. + //This is a no-op, just ignore it. + logger.debug("Delimiter found while in START mode, ignoring.."); + + } else { isBadEvent = true; - //If we hit a bad priority, just write as if everything is data. + baos.write(b); + //Bad event, just dump everything as if it is data. m = Mode.DATA; } - } - break; - case DATA: - // TCP syslog entries are separated by '\n' - if (b == '\n') { - e = buildEvent(); - doneReading = true; - } else { + break; + case PRIO: baos.write(b); - } - if(baos.size() == this.maxSize && !doneReading) { - isIncompleteEvent = true; - e = buildEvent(); - doneReading = true; - } - break; + if (b == '>') { + if (prio.length() == 0) { + isBadEvent = true; + } + m = Mode.DATA; + } else { + char ch = (char) b; + prio.append(ch); + // Priority is max 3 digits per both RFC 3164 and 5424 + // With this check there is basically no danger of + // boas.size() exceeding this.maxSize before getting to the + // DATA state where this is actually checked + if (!Character.isDigit(ch) || prio.length() > 3) { + isBadEvent = true; + //If we hit a bad priority, just write as if everything is data. + m = Mode.DATA; + } + } + break; + case DATA: + // TCP syslog entries are separated by '\n' + if (b == '\n') { + e = buildEvent(); + doneReading = true; + } else { + baos.write(b); + } + if (baos.size() == this.maxSize && !doneReading) { + isIncompleteEvent = true; + e = buildEvent(); + doneReading = true; + } + break; } } http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-core/src/main/java/org/apache/flume/source/ThriftSource.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/main/java/org/apache/flume/source/ThriftSource.java b/flume-ng-core/src/main/java/org/apache/flume/source/ThriftSource.java index 7df5ddb..6a25e64 100644 --- a/flume-ng-core/src/main/java/org/apache/flume/source/ThriftSource.java +++ b/flume-ng-core/src/main/java/org/apache/flume/source/ThriftSource.java @@ -75,11 +75,10 @@ import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; import java.security.PrivilegedAction; -public class ThriftSource extends AbstractSource implements Configurable, - EventDrivenSource { +public class ThriftSource extends AbstractSource implements Configurable, EventDrivenSource { + + public static final Logger logger = LoggerFactory.getLogger(ThriftSource.class); - public static final Logger logger = LoggerFactory.getLogger(ThriftSource - .class); /** * Config param for the maximum number of threads this source should use to * handle incoming data. @@ -131,17 +130,17 @@ public class ThriftSource extends AbstractSource implements Configurable, logger.info("Configuring thrift source."); port = context.getInteger(CONFIG_PORT); Preconditions.checkNotNull(port, "Port must be specified for Thrift " + - "Source."); + "Source."); bindAddress = context.getString(CONFIG_BIND); Preconditions.checkNotNull(bindAddress, "Bind address must be specified " + - "for Thrift Source."); + "for Thrift Source."); try { maxThreads = context.getInteger(CONFIG_THREADS, 0); maxThreads = (maxThreads <= 0) ? Integer.MAX_VALUE : maxThreads; } catch (NumberFormatException e) { logger.warn("Thrift source\'s \"threads\" property must specify an " + - "integer value: " + context.getString(CONFIG_THREADS)); + "integer value: " + context.getString(CONFIG_THREADS)); } if (sourceCounter == null) { @@ -190,8 +189,8 @@ public class ThriftSource extends AbstractSource implements Configurable, String keytab = context.getString(AGENT_KEYTAB); enableKerberos = context.getBoolean(KERBEROS_KEY, false); this.flumeAuth = FlumeAuthenticationUtil.getAuthenticator(principal, keytab); - if(enableKerberos) { - if(!flumeAuth.isAuthenticated()) { + if (enableKerberos) { + if (!flumeAuth.isAuthenticated()) { throw new FlumeException("Authentication failed in Kerberos mode for " + "principal " + principal + " keytab " + keytab); } @@ -221,29 +220,27 @@ public class ThriftSource extends AbstractSource implements Configurable, servingExecutor.submit(new Runnable() { @Override public void run() { - flumeAuth.execute( - new PrivilegedAction<Object>() { - @Override - public Object run() { - server.serve(); - return null; - } + flumeAuth.execute(new PrivilegedAction<Object>() { + @Override + public Object run() { + server.serve(); + return null; } - ); + }); } }); long timeAfterStart = System.currentTimeMillis(); - while(!server.isServing()) { + while (!server.isServing()) { try { - if(System.currentTimeMillis() - timeAfterStart >=10000) { + if (System.currentTimeMillis() - timeAfterStart >= 10000) { throw new FlumeException("Thrift server failed to start!"); } TimeUnit.MILLISECONDS.sleep(1000); } catch (InterruptedException e) { Thread.currentThread().interrupt(); throw new FlumeException("Interrupted while waiting for Thrift server" + - " to start.", e); + " to start.", e); } } sourceCounter.start(); @@ -287,8 +284,7 @@ public class ThriftSource extends AbstractSource implements Configurable, private TServerTransport getTServerTransport() { try { - return new TServerSocket(new InetSocketAddress - (bindAddress, port)); + return new TServerSocket(new InetSocketAddress(bindAddress, port)); } catch (Throwable throwable) { throw new FlumeException("Cannot start Thrift source.", throwable); } @@ -305,7 +301,7 @@ public class ThriftSource extends AbstractSource implements Configurable, } private TServer getTThreadedSelectorServer() { - if(enableSsl || enableKerberos) { + if (enableSsl || enableKerberos) { return null; } Class<?> serverClass; @@ -345,7 +341,7 @@ public class ThriftSource extends AbstractSource implements Configurable, * */ server = (TServer) serverClass.getConstructor(argsClass).newInstance(args); - } catch(ClassNotFoundException e) { + } catch (ClassNotFoundException e) { return null; } catch (Throwable ex) { throw new FlumeException("Cannot start Thrift Source.", ex); @@ -371,7 +367,7 @@ public class ThriftSource extends AbstractSource implements Configurable, args.protocolFactory(getProtocolFactory()); //populate the transportFactory - if(enableKerberos) { + if (enableKerberos) { args.transportFactory(getSASLTransportFactory()); } else { args.transportFactory(new TFastFramedTransport.Factory()); @@ -402,7 +398,7 @@ public class ThriftSource extends AbstractSource implements Configurable, @Override public void stop() { - if(server != null && server.isServing()) { + if (server != null && server.isServing()) { server.stop(); } if (servingExecutor != null) { @@ -424,8 +420,7 @@ public class ThriftSource extends AbstractSource implements Configurable, @Override public Status append(ThriftFlumeEvent event) throws TException { - Event flumeEvent = EventBuilder.withBody(event.getBody(), - event.getHeaders()); + Event flumeEvent = EventBuilder.withBody(event.getBody(), event.getHeaders()); sourceCounter.incrementAppendReceivedCount(); sourceCounter.incrementEventReceivedCount(); @@ -434,7 +429,7 @@ public class ThriftSource extends AbstractSource implements Configurable, getChannelProcessor().processEvent(flumeEvent); } catch (ChannelException ex) { logger.warn("Thrift source " + getName() + " could not append events " + - "to the channel.", ex); + "to the channel.", ex); return Status.FAILED; } sourceCounter.incrementAppendAcceptedCount(); @@ -448,16 +443,14 @@ public class ThriftSource extends AbstractSource implements Configurable, sourceCounter.addToEventReceivedCount(events.size()); List<Event> flumeEvents = Lists.newArrayList(); - for(ThriftFlumeEvent event : events) { - flumeEvents.add(EventBuilder.withBody(event.getBody(), - event.getHeaders())); + for (ThriftFlumeEvent event : events) { + flumeEvents.add(EventBuilder.withBody(event.getBody(), event.getHeaders())); } try { getChannelProcessor().processEventBatch(flumeEvents); } catch (ChannelException ex) { - logger.warn("Thrift source %s could not append events to the " + - "channel.", getName()); + logger.warn("Thrift source %s could not append events to the channel.", getName()); return Status.FAILED; } http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-core/src/main/java/org/apache/flume/source/http/BLOBHandler.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/main/java/org/apache/flume/source/http/BLOBHandler.java b/flume-ng-core/src/main/java/org/apache/flume/source/http/BLOBHandler.java index a816363..e24d4c6 100644 --- a/flume-ng-core/src/main/java/org/apache/flume/source/http/BLOBHandler.java +++ b/flume-ng-core/src/main/java/org/apache/flume/source/http/BLOBHandler.java @@ -77,7 +77,7 @@ public class BLOBHandler implements HTTPSourceHandler { } ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); - try{ + try { IOUtils.copy(inputStream, outputStream); LOG.debug("Building an Event with stream of size -- {}", outputStream.size()); Event event = EventBuilder.withBody(outputStream.toByteArray(), headers); @@ -85,8 +85,7 @@ public class BLOBHandler implements HTTPSourceHandler { List<Event> eventList = new ArrayList<Event>(); eventList.add(event); return eventList; - } - finally { + } finally { outputStream.close(); inputStream.close(); } @@ -94,7 +93,8 @@ public class BLOBHandler implements HTTPSourceHandler { @Override public void configure(Context context) { - this.commaSeparatedHeaders = context.getString(MANDATORY_PARAMETERS, DEFAULT_MANDATORY_PARAMETERS); + this.commaSeparatedHeaders = context.getString(MANDATORY_PARAMETERS, + DEFAULT_MANDATORY_PARAMETERS); this.mandatoryHeaders = commaSeparatedHeaders.split(PARAMETER_SEPARATOR); } http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-core/src/main/java/org/apache/flume/source/http/HTTPSource.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/main/java/org/apache/flume/source/http/HTTPSource.java b/flume-ng-core/src/main/java/org/apache/flume/source/http/HTTPSource.java index b520b03..38bdfda 100644 --- a/flume-ng-core/src/main/java/org/apache/flume/source/http/HTTPSource.java +++ b/flume-ng-core/src/main/java/org/apache/flume/source/http/HTTPSource.java @@ -41,7 +41,12 @@ import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; import java.io.IOException; import java.net.ServerSocket; -import java.util.*; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; /** * A source which accepts Flume Events by HTTP POST and GET. GET should be used @@ -104,27 +109,28 @@ public class HTTPSource extends AbstractSource implements port = context.getInteger(HTTPSourceConfigurationConstants.CONFIG_PORT); host = context.getString(HTTPSourceConfigurationConstants.CONFIG_BIND, - HTTPSourceConfigurationConstants.DEFAULT_BIND); + HTTPSourceConfigurationConstants.DEFAULT_BIND); Preconditions.checkState(host != null && !host.isEmpty(), "HTTPSource hostname specified is empty"); Preconditions.checkNotNull(port, "HTTPSource requires a port number to be" - + " specified"); + + " specified"); String handlerClassName = context.getString( HTTPSourceConfigurationConstants.CONFIG_HANDLER, HTTPSourceConfigurationConstants.DEFAULT_HANDLER).trim(); - if(sslEnabled) { + if (sslEnabled) { LOG.debug("SSL configuration enabled"); keyStorePath = context.getString(HTTPSourceConfigurationConstants.SSL_KEYSTORE); Preconditions.checkArgument(keyStorePath != null && !keyStorePath.isEmpty(), - "Keystore is required for SSL Conifguration" ); - keyStorePassword = context.getString(HTTPSourceConfigurationConstants.SSL_KEYSTORE_PASSWORD); + "Keystore is required for SSL Conifguration" ); + keyStorePassword = + context.getString(HTTPSourceConfigurationConstants.SSL_KEYSTORE_PASSWORD); Preconditions.checkArgument(keyStorePassword != null, - "Keystore password is required for SSL Configuration"); - String excludeProtocolsStr = context.getString(HTTPSourceConfigurationConstants - .EXCLUDE_PROTOCOLS); + "Keystore password is required for SSL Configuration"); + String excludeProtocolsStr = + context.getString(HTTPSourceConfigurationConstants.EXCLUDE_PROTOCOLS); if (excludeProtocolsStr == null) { excludedProtocols.add("SSLv3"); } else { @@ -166,9 +172,9 @@ public class HTTPSource extends AbstractSource implements private void checkHostAndPort() { Preconditions.checkState(host != null && !host.isEmpty(), - "HTTPSource hostname specified is empty"); + "HTTPSource hostname specified is empty"); Preconditions.checkNotNull(port, "HTTPSource requires a port number to be" - + " specified"); + + " specified"); } @Override @@ -199,8 +205,7 @@ public class HTTPSource extends AbstractSource implements connectors[0].setPort(port); srv.setConnectors(connectors); try { - org.mortbay.jetty.servlet.Context root = - new org.mortbay.jetty.servlet.Context( + org.mortbay.jetty.servlet.Context root = new org.mortbay.jetty.servlet.Context( srv, "/", org.mortbay.jetty.servlet.Context.SESSIONS); root.addServlet(new ServletHolder(new FlumeHTTPServlet()), "/"); HTTPServerConstraintUtil.enforceConstraints(root); @@ -285,26 +290,23 @@ public class HTTPSource extends AbstractSource implements } private static class HTTPSourceSocketConnector extends SslSocketConnector { - private final List<String> excludedProtocols; + HTTPSourceSocketConnector(List<String> excludedProtocols) { this.excludedProtocols = excludedProtocols; } @Override - public ServerSocket newServerSocket(String host, int port, - int backlog) throws IOException { - SSLServerSocket socket = (SSLServerSocket)super.newServerSocket(host, - port, backlog); + public ServerSocket newServerSocket(String host, int port, int backlog) throws IOException { + SSLServerSocket socket = (SSLServerSocket)super.newServerSocket(host, port, backlog); String[] protocols = socket.getEnabledProtocols(); List<String> newProtocols = new ArrayList<String>(protocols.length); - for(String protocol: protocols) { + for (String protocol: protocols) { if (!excludedProtocols.contains(protocol)) { newProtocols.add(protocol); } } - socket.setEnabledProtocols( - newProtocols.toArray(new String[newProtocols.size()])); + socket.setEnabledProtocols(newProtocols.toArray(new String[newProtocols.size()])); return socket; } } http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-core/src/main/java/org/apache/flume/source/http/JSONHandler.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/main/java/org/apache/flume/source/http/JSONHandler.java b/flume-ng-core/src/main/java/org/apache/flume/source/http/JSONHandler.java index 197f66a..c99eb18 100644 --- a/flume-ng-core/src/main/java/org/apache/flume/source/http/JSONHandler.java +++ b/flume-ng-core/src/main/java/org/apache/flume/source/http/JSONHandler.java @@ -35,13 +35,11 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** - * * JSONHandler for HTTPSource that accepts an array of events. * * This handler throws exception if the deserialization fails because of bad * format or any other reason. * - * * Each event must be encoded as a map with two key-value pairs. <p> 1. headers * - the key for this key-value pair is "headers". The value for this key is * another map, which represent the event headers. These headers are inserted @@ -69,17 +67,15 @@ import org.slf4j.LoggerFactory; * {@linkplain Gson#toJson(java.lang.Object, java.lang.reflect.Type) } * method. The type token to pass as the 2nd argument of this method * for list of events can be created by: <p> - * - * Type type = new TypeToken<List<JSONEvent>>() {}.getType(); <p> - * + * {@code + * Type type = new TypeToken<List<JSONEvent>>() {}.getType(); + * } */ public class JSONHandler implements HTTPSourceHandler { private static final Logger LOG = LoggerFactory.getLogger(JSONHandler.class); - private final Type listType = - new TypeToken<List<JSONEvent>>() { - }.getType(); + private final Type listType = new TypeToken<List<JSONEvent>>() {}.getType(); private final Gson gson; public JSONHandler() { @@ -131,7 +127,7 @@ public class JSONHandler implements HTTPSourceHandler { private List<Event> getSimpleEvents(List<Event> events) { List<Event> newEvents = new ArrayList<Event>(events.size()); - for(Event e:events) { + for (Event e:events) { newEvents.add(EventBuilder.withBody(e.getBody(), e.getHeaders())); } return newEvents; http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-core/src/main/java/org/apache/flume/tools/DirectMemoryUtils.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/main/java/org/apache/flume/tools/DirectMemoryUtils.java b/flume-ng-core/src/main/java/org/apache/flume/tools/DirectMemoryUtils.java index 8c2db2c..dfa2229 100644 --- a/flume-ng-core/src/main/java/org/apache/flume/tools/DirectMemoryUtils.java +++ b/flume-ng-core/src/main/java/org/apache/flume/tools/DirectMemoryUtils.java @@ -54,7 +54,7 @@ public class DirectMemoryUtils { ByteBuffer result = ByteBuffer.allocateDirect(size); allocated.addAndGet(size); return result; - } catch(OutOfMemoryError error) { + } catch (OutOfMemoryError error) { LOG.error("Error allocating " + size + ", you likely want" + " to increase " + MAX_DIRECT_MEMORY_PARAM, error); throw error; @@ -88,11 +88,9 @@ public class DirectMemoryUtils { if (memSize.contains("k")) { multiplier = 1024; - } - else if (memSize.contains("m")) { + } else if (memSize.contains("m")) { multiplier = 1048576; - } - else if (memSize.contains("g")) { + } else if (memSize.contains("g")) { multiplier = 1073741824; } memSize = memSize.replaceAll("[^\\d]", ""); @@ -107,7 +105,7 @@ public class DirectMemoryUtils { Class<?> VM = Class.forName("sun.misc.VM"); Method maxDirectMemory = VM.getDeclaredMethod("maxDirectMemory", (Class<?>)null); Object result = maxDirectMemory.invoke(null, (Object[])null); - if(result != null && result instanceof Long) { + if (result != null && result instanceof Long) { return (Long)result; } } catch (Exception e) { http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-core/src/main/java/org/apache/flume/tools/GetJavaProperty.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/main/java/org/apache/flume/tools/GetJavaProperty.java b/flume-ng-core/src/main/java/org/apache/flume/tools/GetJavaProperty.java index 5d0ea74..bc073bb 100644 --- a/flume-ng-core/src/main/java/org/apache/flume/tools/GetJavaProperty.java +++ b/flume-ng-core/src/main/java/org/apache/flume/tools/GetJavaProperty.java @@ -21,7 +21,7 @@ package org.apache.flume.tools; * A generic way for querying Java properties. */ public class GetJavaProperty { - public static void main(String args[]) { + public static void main(String[] args) { if (args.length == 0) { for (Object prop : System.getProperties().keySet()) { System.out.println(prop + "=" + System.getProperty((String)prop, "")); http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-core/src/main/java/org/apache/flume/tools/TimestampRoundDownUtil.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/main/java/org/apache/flume/tools/TimestampRoundDownUtil.java b/flume-ng-core/src/main/java/org/apache/flume/tools/TimestampRoundDownUtil.java index 3a59953..daa9606 100644 --- a/flume-ng-core/src/main/java/org/apache/flume/tools/TimestampRoundDownUtil.java +++ b/flume-ng-core/src/main/java/org/apache/flume/tools/TimestampRoundDownUtil.java @@ -35,7 +35,7 @@ public class TimestampRoundDownUtil { */ public static long roundDownTimeStampSeconds(long timestamp, int roundDownSec) throws IllegalStateException { - Preconditions.checkArgument(roundDownSec > 0 && roundDownSec <=60, + Preconditions.checkArgument(roundDownSec > 0 && roundDownSec <= 60, "RoundDownSec must be > 0 and <=60"); Calendar cal = roundDownField(timestamp, Calendar.SECOND, roundDownSec); cal.set(Calendar.MILLISECOND, 0); @@ -53,7 +53,7 @@ public class TimestampRoundDownUtil { */ public static long roundDownTimeStampMinutes(long timestamp, int roundDownMins) throws IllegalStateException { - Preconditions.checkArgument(roundDownMins > 0 && roundDownMins <=60, + Preconditions.checkArgument(roundDownMins > 0 && roundDownMins <= 60, "RoundDown must be > 0 and <=60"); Calendar cal = roundDownField(timestamp, Calendar.MINUTE, roundDownMins); cal.set(Calendar.SECOND, 0); @@ -73,7 +73,7 @@ public class TimestampRoundDownUtil { */ public static long roundDownTimeStampHours(long timestamp, int roundDownHours) throws IllegalStateException { - Preconditions.checkArgument(roundDownHours > 0 && roundDownHours <=24, + Preconditions.checkArgument(roundDownHours > 0 && roundDownHours <= 24, "RoundDown must be > 0 and <=24"); Calendar cal = roundDownField(timestamp, Calendar.HOUR_OF_DAY, roundDownHours); @@ -83,8 +83,7 @@ public class TimestampRoundDownUtil { return cal.getTimeInMillis(); } - private static Calendar roundDownField( - long timestamp, int field, int roundDown){ + private static Calendar roundDownField(long timestamp, int field, int roundDown) { Preconditions.checkArgument(timestamp > 0, "Timestamp must be positive"); Calendar cal = Calendar.getInstance(); cal.setTimeInMillis(timestamp); http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-core/src/main/java/org/apache/flume/tools/VersionInfo.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/main/java/org/apache/flume/tools/VersionInfo.java b/flume-ng-core/src/main/java/org/apache/flume/tools/VersionInfo.java index c12cf8d..95aa29d 100644 --- a/flume-ng-core/src/main/java/org/apache/flume/tools/VersionInfo.java +++ b/flume-ng-core/src/main/java/org/apache/flume/tools/VersionInfo.java @@ -54,9 +54,9 @@ public class VersionInfo { * @return the revision number, eg. "100755" */ public static String getRevision() { - if(version != null - && version.revision() != null - && !version.revision().isEmpty()){ + if (version != null + && version.revision() != null + && !version.revision().isEmpty()) { return version.revision(); } return "Unknown"; @@ -105,12 +105,12 @@ public class VersionInfo { * Returns the build version info which includes version, * revision, user, date and source checksum */ - public static String getBuildVersion(){ + public static String getBuildVersion() { return VersionInfo.getVersion() + - " from " + VersionInfo.getRevision() + - " by " + VersionInfo.getUser() + - " on " + VersionInfo.getDate() + - " source checksum " + VersionInfo.getSrcChecksum(); + " from " + VersionInfo.getRevision() + + " by " + VersionInfo.getUser() + + " on " + VersionInfo.getDate() + + " source checksum " + VersionInfo.getSrcChecksum(); } public static void main(String[] args) { http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-embedded-agent/src/main/java/org/apache/flume/agent/embedded/EmbeddedAgent.java ---------------------------------------------------------------------- diff --git a/flume-ng-embedded-agent/src/main/java/org/apache/flume/agent/embedded/EmbeddedAgent.java b/flume-ng-embedded-agent/src/main/java/org/apache/flume/agent/embedded/EmbeddedAgent.java index 32c9f18..ad3e138 100644 --- a/flume-ng-embedded-agent/src/main/java/org/apache/flume/agent/embedded/EmbeddedAgent.java +++ b/flume-ng-embedded-agent/src/main/java/org/apache/flume/agent/embedded/EmbeddedAgent.java @@ -76,9 +76,11 @@ public class EmbeddedAgent { supervisor = new LifecycleSupervisor(); } + public EmbeddedAgent(String name) { this(new MaterializedConfigurationProvider(), name); } + /** * Configures the embedded agent. Can only be called after the object * is created or after the stop() method is called. @@ -89,12 +91,13 @@ public class EmbeddedAgent { */ public void configure(Map<String, String> properties) throws FlumeException { - if(state == State.STARTED) { + if (state == State.STARTED) { throw new IllegalStateException("Cannot be configured while started"); } doConfigure(properties); state = State.STOPPED; } + /** * Started the agent. Can only be called after a successful call to * configure(). @@ -105,9 +108,9 @@ public class EmbeddedAgent { */ public void start() throws FlumeException { - if(state == State.STARTED) { + if (state == State.STARTED) { throw new IllegalStateException("Cannot be started while started"); - } else if(state == State.NEW) { + } else if (state == State.NEW) { throw new IllegalStateException("Cannot be started before being " + "configured"); } @@ -115,15 +118,15 @@ public class EmbeddedAgent { // as doStart() accesses sourceRunner.getSource() Source source = Preconditions.checkNotNull(sourceRunner.getSource(), "Source runner returned null source"); - if(source instanceof EmbeddedSource) { + if (source instanceof EmbeddedSource) { embeddedSource = (EmbeddedSource)source; } else { - throw new IllegalStateException("Unknown source type: " + source. - getClass().getName()); + throw new IllegalStateException("Unknown source type: " + source.getClass().getName()); } doStart(); state = State.STARTED; } + /** * Stops the agent. Can only be called after a successful call to start(). * After a call to stop(), the agent can be re-configured with the @@ -134,7 +137,7 @@ public class EmbeddedAgent { */ public void stop() throws FlumeException { - if(state != State.STARTED) { + if (state != State.STARTED) { throw new IllegalStateException("Cannot be stopped unless started"); } supervisor.stop(); @@ -146,9 +149,9 @@ public class EmbeddedAgent { properties = EmbeddedAgentConfiguration.configure(name, properties); - if(LOGGER.isDebugEnabled()) { + if (LOGGER.isDebugEnabled()) { LOGGER.debug("Agent configuration values"); - for(String key : new TreeSet<String>(properties.keySet())) { + for (String key : new TreeSet<String>(properties.keySet())) { LOGGER.debug(key + " = " + properties.get(key)); } } @@ -156,17 +159,17 @@ public class EmbeddedAgent { MaterializedConfiguration conf = configurationProvider.get(name, properties); Map<String, SourceRunner> sources = conf.getSourceRunners(); - if(sources.size() != 1) { + if (sources.size() != 1) { throw new FlumeException("Expected one source and got " + sources.size()); } Map<String, Channel> channels = conf.getChannels(); - if(channels.size() != 1) { + if (channels.size() != 1) { throw new FlumeException("Expected one channel and got " + channels.size()); } Map<String, SinkRunner> sinks = conf.getSinkRunners(); - if(sinks.size() != 1) { + if (sinks.size() != 1) { throw new FlumeException("Expected one sink group and got " + sinks.size()); } @@ -174,6 +177,7 @@ public class EmbeddedAgent { this.channel = channels.values().iterator().next(); this.sinkRunner = sinks.values().iterator().next(); } + /** * Adds event to the channel owned by the agent. Note however, that the * event is not copied and as such, the byte array and headers cannot @@ -182,7 +186,7 @@ public class EmbeddedAgent { * @throws EventDeliveryException if unable to add event to channel */ public void put(Event event) throws EventDeliveryException { - if(state != State.STARTED) { + if (state != State.STARTED) { throw new IllegalStateException("Cannot put events unless started"); } try { @@ -192,6 +196,7 @@ public class EmbeddedAgent { ": Unable to process event: " + ex.getMessage(), ex); } } + /** * Adds events to the channel owned by the agent. Note however, that the * event is not copied and as such, the byte array and headers cannot @@ -200,7 +205,7 @@ public class EmbeddedAgent { * @throws EventDeliveryException if unable to add event to channel */ public void putAll(List<Event> events) throws EventDeliveryException { - if(state != State.STARTED) { + if (state != State.STARTED) { throw new IllegalStateException("Cannot put events unless started"); } try { @@ -226,7 +231,7 @@ public class EmbeddedAgent { new SupervisorPolicy.AlwaysRestartPolicy(), LifecycleState.START); error = false; } finally { - if(error) { + if (error) { stopLogError(sourceRunner); stopLogError(channel); stopLogError(sinkRunner); @@ -234,9 +239,10 @@ public class EmbeddedAgent { } } } + private void stopLogError(LifecycleAware lifeCycleAware) { try { - if(LifecycleState.START.equals(lifeCycleAware.getLifecycleState())) { + if (LifecycleState.START.equals(lifeCycleAware.getLifecycleState())) { lifeCycleAware.stop(); } } catch (Exception e) { http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-embedded-agent/src/main/java/org/apache/flume/agent/embedded/EmbeddedAgentConfiguration.java ---------------------------------------------------------------------- diff --git a/flume-ng-embedded-agent/src/main/java/org/apache/flume/agent/embedded/EmbeddedAgentConfiguration.java b/flume-ng-embedded-agent/src/main/java/org/apache/flume/agent/embedded/EmbeddedAgentConfiguration.java index 4a49fa0..05d2d04 100644 --- a/flume-ng-embedded-agent/src/main/java/org/apache/flume/agent/embedded/EmbeddedAgentConfiguration.java +++ b/flume-ng-embedded-agent/src/main/java/org/apache/flume/agent/embedded/EmbeddedAgentConfiguration.java @@ -136,8 +136,8 @@ public class EmbeddedAgentConfiguration { * Load balancing sink processor. See Flume User Guide for configuration * information. */ - public static final String SINK_PROCESSOR_TYPE_LOAD_BALANCE = SinkProcessorType.LOAD_BALANCE.name(); - + public static final String SINK_PROCESSOR_TYPE_LOAD_BALANCE = + SinkProcessorType.LOAD_BALANCE.name(); private static final String[] ALLOWED_SOURCES = { SOURCE_TYPE_EMBEDDED_ALIAS, @@ -165,22 +165,21 @@ public class EmbeddedAgentConfiguration { private static void validate(String name, Map<String, String> properties) throws FlumeException { - if(properties.containsKey(SOURCE_TYPE)) { + if (properties.containsKey(SOURCE_TYPE)) { checkAllowed(ALLOWED_SOURCES, properties.get(SOURCE_TYPE)); } checkRequired(properties, CHANNEL_TYPE); checkAllowed(ALLOWED_CHANNELS, properties.get(CHANNEL_TYPE)); checkRequired(properties, SINKS); String sinkNames = properties.get(SINKS); - for(String sink : sinkNames.split("\\s+")) { - if(DISALLOWED_SINK_NAMES.contains(sink.toLowerCase(Locale.ENGLISH))) { + for (String sink : sinkNames.split("\\s+")) { + if (DISALLOWED_SINK_NAMES.contains(sink.toLowerCase(Locale.ENGLISH))) { throw new FlumeException("Sink name " + sink + " is one of the" + " disallowed sink names: " + DISALLOWED_SINK_NAMES); } String key = join(sink, TYPE); checkRequired(properties, key); checkAllowed(ALLOWED_SINKS, properties.get(key)); - } checkRequired(properties, SINK_PROCESSOR_TYPE); checkAllowed(ALLOWED_SINK_PROCESSORS, properties.get(SINK_PROCESSOR_TYPE)); @@ -201,8 +200,8 @@ public class EmbeddedAgentConfiguration { // we are going to modify the properties as we parse the config properties = new HashMap<String, String>(properties); - if(!properties.containsKey(SOURCE_TYPE) || SOURCE_TYPE_EMBEDDED_ALIAS. - equalsIgnoreCase(properties.get(SOURCE_TYPE))) { + if (!properties.containsKey(SOURCE_TYPE) || + SOURCE_TYPE_EMBEDDED_ALIAS.equalsIgnoreCase(properties.get(SOURCE_TYPE))) { properties.put(SOURCE_TYPE, SOURCE_TYPE_EMBEDDED); } String sinkNames = properties.remove(SINKS); @@ -220,9 +219,6 @@ public class EmbeddedAgentConfiguration { // user supplied config -> agent configuration Map<String, String> result = Maps.newHashMap(); - // properties will be modified during iteration so we need a - // copy of the keys - Set<String> userProvidedKeys; /* * First we are going to setup all the root level pointers. I.E * point the agent at the components, sink group at sinks, and @@ -247,15 +243,19 @@ public class EmbeddedAgentConfiguration { result.put(join(name, BasicConfigurationConstants.CONFIG_SOURCES, sourceName, BasicConfigurationConstants.CONFIG_CHANNELS), channelName); + + // Properties will be modified during iteration so we need a + // copy of the keys. + Set<String> userProvidedKeys = new HashSet<String>(properties.keySet()); + /* * Second process the sink configuration and point the sinks * at the channel. */ - userProvidedKeys = new HashSet<String>(properties.keySet()); - for(String sink : sinkNames.split("\\s+")) { - for(String key : userProvidedKeys) { + for (String sink : sinkNames.split("\\s+")) { + for (String key : userProvidedKeys) { String value = properties.get(key); - if(key.startsWith(sink + SEPERATOR)) { + if (key.startsWith(sink + SEPERATOR)) { properties.remove(key); result.put(join(name, BasicConfigurationConstants.CONFIG_SINKS, key), value); @@ -271,19 +271,19 @@ public class EmbeddedAgentConfiguration { * correctly and then passing them on to the agent. */ userProvidedKeys = new HashSet<String>(properties.keySet()); - for(String key : userProvidedKeys) { + for (String key : userProvidedKeys) { String value = properties.get(key); - if(key.startsWith(SOURCE_PREFIX)) { + if (key.startsWith(SOURCE_PREFIX)) { // users use `source' but agent needs the actual source name key = key.replaceFirst(SOURCE, sourceName); result.put(join(name, BasicConfigurationConstants.CONFIG_SOURCES, key), value); - } else if(key.startsWith(CHANNEL_PREFIX)) { + } else if (key.startsWith(CHANNEL_PREFIX)) { // users use `channel' but agent needs the actual channel name key = key.replaceFirst(CHANNEL, channelName); result.put(join(name, BasicConfigurationConstants.CONFIG_CHANNELS, key), value); - } else if(key.startsWith(SINK_PROCESSOR_PREFIX)) { + } else if (key.startsWith(SINK_PROCESSOR_PREFIX)) { // agent.sinkgroups.sinkgroup.processor.* result.put(join(name, BasicConfigurationConstants.CONFIG_SINKGROUPS, sinkGroupName, key), value); @@ -297,20 +297,19 @@ public class EmbeddedAgentConfiguration { private static void checkAllowed(String[] allowedTypes, String type) { boolean isAllowed = false; type = type.trim(); - for(String allowedType : allowedTypes) { - if(allowedType.equalsIgnoreCase(type)) { + for (String allowedType : allowedTypes) { + if (allowedType.equalsIgnoreCase(type)) { isAllowed = true; break; } } - if(!isAllowed) { + if (!isAllowed) { throw new FlumeException("Component type of " + type + " is not in " + "allowed types of " + Arrays.toString(allowedTypes)); } } - private static void checkRequired(Map<String, String> properties, - String name) { - if(!properties.containsKey(name)) { + private static void checkRequired(Map<String, String> properties, String name) { + if (!properties.containsKey(name)) { throw new FlumeException("Required parameter not found " + name); } } @@ -319,7 +318,5 @@ public class EmbeddedAgentConfiguration { return JOINER.join(parts); } - private EmbeddedAgentConfiguration() { - - } + private EmbeddedAgentConfiguration() {} } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-embedded-agent/src/main/java/org/apache/flume/agent/embedded/EmbeddedSource.java ---------------------------------------------------------------------- diff --git a/flume-ng-embedded-agent/src/main/java/org/apache/flume/agent/embedded/EmbeddedSource.java b/flume-ng-embedded-agent/src/main/java/org/apache/flume/agent/embedded/EmbeddedSource.java index 53389d2..71a88ec 100644 --- a/flume-ng-embedded-agent/src/main/java/org/apache/flume/agent/embedded/EmbeddedSource.java +++ b/flume-ng-embedded-agent/src/main/java/org/apache/flume/agent/embedded/EmbeddedSource.java @@ -35,16 +35,16 @@ import org.apache.flume.source.AbstractSource; */ @InterfaceAudience.Private @InterfaceStability.Unstable -public class EmbeddedSource extends AbstractSource - implements EventDrivenSource, Configurable { +public class EmbeddedSource extends AbstractSource implements EventDrivenSource, Configurable { @Override public void configure(Context context) { - } + public void put(Event event) throws ChannelException { getChannelProcessor().processEvent(event); } + public void putAll(List<Event> events) throws ChannelException { getChannelProcessor().processEventBatch(events); } http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-legacy-sources/flume-thrift-source/src/main/java/org/apache/flume/source/thriftLegacy/ThriftLegacySource.java ---------------------------------------------------------------------- diff --git a/flume-ng-legacy-sources/flume-thrift-source/src/main/java/org/apache/flume/source/thriftLegacy/ThriftLegacySource.java b/flume-ng-legacy-sources/flume-thrift-source/src/main/java/org/apache/flume/source/thriftLegacy/ThriftLegacySource.java index 47913dc..7140f07 100644 --- a/flume-ng-legacy-sources/flume-thrift-source/src/main/java/org/apache/flume/source/thriftLegacy/ThriftLegacySource.java +++ b/flume-ng-legacy-sources/flume-thrift-source/src/main/java/org/apache/flume/source/thriftLegacy/ThriftLegacySource.java @@ -27,6 +27,8 @@ import java.util.HashMap; import java.util.Map; import java.util.Map.Entry; +import com.cloudera.flume.handlers.thrift.ThriftFlumeEvent; +import com.cloudera.flume.handlers.thrift.ThriftFlumeEventServer; import org.apache.flume.ChannelException; import org.apache.flume.Context; import org.apache.flume.CounterGroup; @@ -45,8 +47,6 @@ import org.apache.thrift.transport.TServerSocket; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.cloudera.flume.handlers.thrift.*; - public class ThriftLegacySource extends AbstractSource implements EventDrivenSource, Configurable { @@ -86,7 +86,7 @@ public class ThriftLegacySource extends AbstractSource implements headers.put(NANOS, Long.toString(evt.getNanos())); for (Entry<String, ByteBuffer> entry: evt.getFields().entrySet()) { headers.put(entry.getKey().toString(), - UTF_8.decode(entry.getValue()).toString()); + UTF_8.decode(entry.getValue()).toString()); } headers.put(OG_EVENT, "yes"); @@ -139,8 +139,8 @@ public class ThriftLegacySource extends AbstractSource implements serverTransport = new TServerSocket(bindAddr); ThriftFlumeEventServer.Processor processor = new ThriftFlumeEventServer.Processor(new ThriftFlumeEventServerImpl()); - server = new TThreadPoolServer(new TThreadPoolServer. - Args(serverTransport).processor(processor)); + server = new TThreadPoolServer( + new TThreadPoolServer.Args(serverTransport).processor(processor)); } catch (TTransportException e) { throw new FlumeException("Failed starting source", e); }
