http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/HBaseSink.java ---------------------------------------------------------------------- diff --git a/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/HBaseSink.java b/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/HBaseSink.java index e659ada..4c8b52b 100644 --- a/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/HBaseSink.java +++ b/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/HBaseSink.java @@ -18,15 +18,10 @@ */ package org.apache.flume.sink.hbase; -import java.io.IOException; -import java.lang.reflect.InvocationTargetException; -import java.lang.reflect.Method; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; -import java.util.NavigableMap; - import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Charsets; +import com.google.common.base.Preconditions; +import com.google.common.base.Throwables; import com.google.common.collect.Lists; import com.google.common.collect.Maps; import org.apache.flume.Channel; @@ -52,14 +47,16 @@ import org.apache.hadoop.hbase.util.Bytes; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.google.common.base.Charsets; -import com.google.common.base.Preconditions; -import com.google.common.base.Throwables; +import java.io.IOException; +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; import java.security.PrivilegedExceptionAction; - +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.NavigableMap; /** - * * A simple sink which reads events from a channel and writes them to HBase. * The Hbase configuration is picked up from the first <tt>hbase-site.xml</tt> * encountered in the classpath. This sink supports batch reading of @@ -73,7 +70,7 @@ import java.security.PrivilegedExceptionAction; * batch size, whichever comes first.<p> * Other optional parameters are:<p> * <tt>serializer:</tt> A class implementing {@link HbaseEventSerializer}. - * An instance of + * An instance of * this class will be used to write out events to hbase.<p> * <tt>serializer.*:</tt> Passed in the configure() method to serializer * as an object of {@link org.apache.flume.Context}.<p> @@ -81,7 +78,7 @@ import java.security.PrivilegedExceptionAction; * maximum number of events the sink will commit per transaction. The default * batch size is 100 events. * <p> - * + * <p> * <strong>Note: </strong> While this sink flushes all events in a transaction * to HBase in one shot, Hbase does not guarantee atomic commits on multiple * rows. So if a subset of events in a batch are written to disk by Hbase and @@ -113,11 +110,11 @@ public class HBaseSink extends AbstractSink implements Configurable { // Internal hooks used for unit testing. private DebugIncrementsCallback debugIncrCallback = null; - public HBaseSink(){ + public HBaseSink() { this(HBaseConfiguration.create()); } - public HBaseSink(Configuration conf){ + public HBaseSink(Configuration conf) { this.config = conf; } @@ -129,15 +126,16 @@ public class HBaseSink extends AbstractSink implements Configurable { } @Override - public void start(){ + public void start() { Preconditions.checkArgument(table == null, "Please call stop " + "before calling start on an old instance."); try { - privilegedExecutor = FlumeAuthenticationUtil.getAuthenticator(kerberosPrincipal, kerberosKeytab); + privilegedExecutor = + FlumeAuthenticationUtil.getAuthenticator(kerberosPrincipal, kerberosKeytab); } catch (Exception ex) { sinkCounter.incrementConnectionFailedCount(); throw new FlumeException("Failed to login to HBase using " - + "provided credentials.", ex); + + "provided credentials.", ex); } try { table = privilegedExecutor.execute(new PrivilegedExceptionAction<HTable>() { @@ -165,16 +163,16 @@ public class HBaseSink extends AbstractSink implements Configurable { } })) { throw new IOException("Table " + tableName - + " has no such column family " + Bytes.toString(columnFamily)); + + " has no such column family " + Bytes.toString(columnFamily)); } } catch (Exception e) { //Get getTableDescriptor also throws IOException, so catch the IOException //thrown above or by the getTableDescriptor() call. sinkCounter.incrementConnectionFailedCount(); throw new FlumeException("Error getting column family from HBase." - + "Please verify that the table " + tableName + " and Column Family, " - + Bytes.toString(columnFamily) + " exists in HBase, and the" - + " current user has permissions to access that table.", e); + + "Please verify that the table " + tableName + " and Column Family, " + + Bytes.toString(columnFamily) + " exists in HBase, and the" + + " current user has permissions to access that table.", e); } super.start(); @@ -183,7 +181,7 @@ public class HBaseSink extends AbstractSink implements Configurable { } @Override - public void stop(){ + public void stop() { try { if (table != null) { table.close(); @@ -198,7 +196,7 @@ public class HBaseSink extends AbstractSink implements Configurable { @SuppressWarnings("unchecked") @Override - public void configure(Context context){ + public void configure(Context context) { tableName = context.getString(HBaseSinkConfigurationConstants.CONFIG_TABLE); String cf = context.getString( HBaseSinkConfigurationConstants.CONFIG_COLUMN_FAMILY); @@ -213,48 +211,48 @@ public class HBaseSink extends AbstractSink implements Configurable { Preconditions.checkNotNull(cf, "Column family cannot be empty, please specify in configuration file"); //Check foe event serializer, if null set event serializer type - if(eventSerializerType == null || eventSerializerType.isEmpty()) { + if (eventSerializerType == null || eventSerializerType.isEmpty()) { eventSerializerType = "org.apache.flume.sink.hbase.SimpleHbaseEventSerializer"; logger.info("No serializer defined, Will use default"); } serializerContext.putAll(context.getSubProperties( - HBaseSinkConfigurationConstants.CONFIG_SERIALIZER_PREFIX)); + HBaseSinkConfigurationConstants.CONFIG_SERIALIZER_PREFIX)); columnFamily = cf.getBytes(Charsets.UTF_8); try { Class<? extends HbaseEventSerializer> clazz = (Class<? extends HbaseEventSerializer>) - Class.forName(eventSerializerType); + Class.forName(eventSerializerType); serializer = clazz.newInstance(); serializer.configure(serializerContext); } catch (Exception e) { - logger.error("Could not instantiate event serializer." , e); + logger.error("Could not instantiate event serializer.", e); Throwables.propagate(e); } kerberosKeytab = context.getString(HBaseSinkConfigurationConstants.CONFIG_KEYTAB); kerberosPrincipal = context.getString(HBaseSinkConfigurationConstants.CONFIG_PRINCIPAL); enableWal = context.getBoolean(HBaseSinkConfigurationConstants - .CONFIG_ENABLE_WAL, HBaseSinkConfigurationConstants.DEFAULT_ENABLE_WAL); + .CONFIG_ENABLE_WAL, HBaseSinkConfigurationConstants.DEFAULT_ENABLE_WAL); logger.info("The write to WAL option is set to: " + String.valueOf(enableWal)); - if(!enableWal) { + if (!enableWal) { logger.warn("HBase Sink's enableWal configuration is set to false. All " + - "writes to HBase will have WAL disabled, and any data in the " + - "memstore of this region in the Region Server could be lost!"); + "writes to HBase will have WAL disabled, and any data in the " + + "memstore of this region in the Region Server could be lost!"); } batchIncrements = context.getBoolean( - HBaseSinkConfigurationConstants.CONFIG_COALESCE_INCREMENTS, - HBaseSinkConfigurationConstants.DEFAULT_COALESCE_INCREMENTS); + HBaseSinkConfigurationConstants.CONFIG_COALESCE_INCREMENTS, + HBaseSinkConfigurationConstants.DEFAULT_COALESCE_INCREMENTS); if (batchIncrements) { logger.info("Increment coalescing is enabled. Increments will be " + - "buffered."); + "buffered."); refGetFamilyMap = reflectLookupGetFamilyMap(); } String zkQuorum = context.getString(HBaseSinkConfigurationConstants - .ZK_QUORUM); + .ZK_QUORUM); Integer port = null; /** * HBase allows multiple nodes in the quorum, but all need to use the @@ -267,10 +265,10 @@ public class HBaseSink extends AbstractSink implements Configurable { logger.info("Using ZK Quorum: " + zkQuorum); String[] zkHosts = zkQuorum.split(","); int length = zkHosts.length; - for(int i = 0; i < length; i++) { + for (int i = 0; i < length; i++) { String[] zkHostAndPort = zkHosts[i].split(":"); zkBuilder.append(zkHostAndPort[0].trim()); - if(i != length-1) { + if (i != length - 1) { zkBuilder.append(","); } else { zkQuorum = zkBuilder.toString(); @@ -282,18 +280,18 @@ public class HBaseSink extends AbstractSink implements Configurable { port = Integer.parseInt(zkHostAndPort[1].trim()); } else if (!port.equals(Integer.parseInt(zkHostAndPort[1].trim()))) { throw new FlumeException("All Zookeeper nodes in the quorum must " + - "use the same client port."); + "use the same client port."); } } - if(port == null) { + if (port == null) { port = HConstants.DEFAULT_ZOOKEPER_CLIENT_PORT; } this.config.set(HConstants.ZOOKEEPER_QUORUM, zkQuorum); this.config.setInt(HConstants.ZOOKEEPER_CLIENT_PORT, port); } String hbaseZnode = context.getString( - HBaseSinkConfigurationConstants.ZK_ZNODE_PARENT); - if(hbaseZnode != null && !hbaseZnode.isEmpty()) { + HBaseSinkConfigurationConstants.ZK_ZNODE_PARENT); + if (hbaseZnode != null && !hbaseZnode.isEmpty()) { this.config.set(HConstants.ZOOKEEPER_ZNODE_PARENT, hbaseZnode); } sinkCounter = new SinkCounter(this.getName()); @@ -314,7 +312,7 @@ public class HBaseSink extends AbstractSink implements Configurable { txn.begin(); if (serializer instanceof BatchAware) { - ((BatchAware)serializer).onBatchStart(); + ((BatchAware) serializer).onBatchStart(); } long i = 0; @@ -342,15 +340,15 @@ public class HBaseSink extends AbstractSink implements Configurable { putEventsAndCommit(actions, incs, txn); } catch (Throwable e) { - try{ + try { txn.rollback(); } catch (Exception e2) { logger.error("Exception in rollback. Rollback might not have been " + - "successful." , e2); + "successful.", e2); } logger.error("Failed to commit transaction." + "Transaction rolled back.", e); - if(e instanceof Error || e instanceof RuntimeException){ + if (e instanceof Error || e instanceof RuntimeException) { logger.error("Failed to commit transaction." + "Transaction rolled back.", e); Throwables.propagate(e); @@ -367,7 +365,7 @@ public class HBaseSink extends AbstractSink implements Configurable { } private void putEventsAndCommit(final List<Row> actions, - final List<Increment> incs, Transaction txn) throws Exception { + final List<Increment> incs, Transaction txn) throws Exception { privilegedExecutor.execute(new PrivilegedExceptionAction<Void>() { @Override @@ -421,7 +419,7 @@ public class HBaseSink extends AbstractSink implements Configurable { @VisibleForTesting static Method reflectLookupGetFamilyMap() { Method m = null; - String[] methodNames = { "getFamilyMapOfLongs", "getFamilyMap" }; + String[] methodNames = {"getFamilyMapOfLongs", "getFamilyMap"}; for (String methodName : methodNames) { try { m = Increment.class.getMethod(methodName); @@ -447,7 +445,7 @@ public class HBaseSink extends AbstractSink implements Configurable { @SuppressWarnings("unchecked") private Map<byte[], NavigableMap<byte[], Long>> getFamilyMap(Increment inc) { Preconditions.checkNotNull(refGetFamilyMap, - "Increment.getFamilymap() not found"); + "Increment.getFamilymap() not found"); Preconditions.checkNotNull(inc, "Increment required"); Map<byte[], NavigableMap<byte[], Long>> familyMap = null; try { @@ -466,6 +464,7 @@ public class HBaseSink extends AbstractSink implements Configurable { /** * Perform "compression" on the given set of increments so that Flume sends * the minimum possible number of RPC operations to HBase per batch. + * * @param incs Input: Increment objects to coalesce. * @return List of new Increment objects after coalescing the unique counts. */ @@ -478,7 +477,7 @@ public class HBaseSink extends AbstractSink implements Configurable { for (Increment inc : incs) { byte[] row = inc.getRow(); Map<byte[], NavigableMap<byte[], Long>> families = getFamilyMap(inc); - for (Map.Entry<byte[], NavigableMap<byte[],Long>> familyEntry : families.entrySet()) { + for (Map.Entry<byte[], NavigableMap<byte[], Long>> familyEntry : families.entrySet()) { byte[] family = familyEntry.getKey(); NavigableMap<byte[], Long> qualifiers = familyEntry.getValue(); for (Map.Entry<byte[], Long> qualifierEntry : qualifiers.entrySet()) { @@ -491,9 +490,10 @@ public class HBaseSink extends AbstractSink implements Configurable { // Reconstruct list of Increments per unique row/family/qualifier. List<Increment> coalesced = Lists.newLinkedList(); - for (Map.Entry<byte[], Map<byte[],NavigableMap<byte[], Long>>> rowEntry : counters.entrySet()) { + for (Map.Entry<byte[], Map<byte[], NavigableMap<byte[], Long>>> rowEntry : + counters.entrySet()) { byte[] row = rowEntry.getKey(); - Map <byte[], NavigableMap<byte[], Long>> families = rowEntry.getValue(); + Map<byte[], NavigableMap<byte[], Long>> families = rowEntry.getValue(); Increment inc = new Increment(row); for (Map.Entry<byte[], NavigableMap<byte[], Long>> familyEntry : families.entrySet()) { byte[] family = familyEntry.getKey(); @@ -513,11 +513,12 @@ public class HBaseSink extends AbstractSink implements Configurable { /** * Helper function for {@link #coalesceIncrements} to increment a counter * value in the passed data structure. - * @param counters Nested data structure containing the counters. - * @param row Row key to increment. - * @param family Column family to increment. + * + * @param counters Nested data structure containing the counters. + * @param row Row key to increment. + * @param family Column family to increment. * @param qualifier Column qualifier to increment. - * @param count Amount to increment by. + * @param count Amount to increment by. */ private void incrementCounter( Map<byte[], Map<byte[], NavigableMap<byte[], Long>>> counters,
http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/HbaseEventSerializer.java ---------------------------------------------------------------------- diff --git a/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/HbaseEventSerializer.java b/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/HbaseEventSerializer.java index 2c0f0e6..d4e3f84 100644 --- a/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/HbaseEventSerializer.java +++ b/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/HbaseEventSerializer.java @@ -32,13 +32,12 @@ import org.apache.hadoop.hbase.client.Row; * params required should be taken through this. Only the column family is * passed in. The columns should exist in the table and column family * specified in the configuration for the HbaseSink. - * */ -public interface HbaseEventSerializer extends Configurable, - ConfigurableComponent { +public interface HbaseEventSerializer extends Configurable, ConfigurableComponent { /** * Initialize the event serializer. - * @param Event to be written to HBase. + * @param event Event to be written to HBase + * @param columnFamily Column family to write to */ public void initialize(Event event, byte[] columnFamily); @@ -54,10 +53,9 @@ public interface HbaseEventSerializer extends Configurable, public List<Row> getActions(); public List<Increment> getIncrements(); + /* * Clean up any state. This will be called when the sink is being stopped. */ public void close(); - - } http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/RegexHbaseEventSerializer.java ---------------------------------------------------------------------- diff --git a/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/RegexHbaseEventSerializer.java b/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/RegexHbaseEventSerializer.java index 7d2b8b7..8342d67 100644 --- a/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/RegexHbaseEventSerializer.java +++ b/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/RegexHbaseEventSerializer.java @@ -18,14 +18,8 @@ */ package org.apache.flume.sink.hbase; -import java.nio.charset.Charset; -import java.util.Calendar; -import java.util.List; -import java.util.Map; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.regex.Matcher; -import java.util.regex.Pattern; - +import com.google.common.base.Charsets; +import com.google.common.collect.Lists; import org.apache.commons.lang.RandomStringUtils; import org.apache.flume.Context; import org.apache.flume.Event; @@ -35,20 +29,25 @@ import org.apache.hadoop.hbase.client.Increment; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Row; -import com.google.common.base.Charsets; -import com.google.common.collect.Lists; +import java.nio.charset.Charset; +import java.util.Calendar; +import java.util.List; +import java.util.Map; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.regex.Matcher; +import java.util.regex.Pattern; /** - * An {@link HbaseEventSerializer} which parses columns based on a supplied - * regular expression and column name list. - * + * An {@link HbaseEventSerializer} which parses columns based on a supplied + * regular expression and column name list. + * <p> * Note that if the regular expression does not return the correct number of * groups for a particular event, or it does not correctly match an event, * the event is silently dropped. - * + * <p> * Row keys for each event consist of a timestamp concatenated with an * identifier which enforces uniqueness of keys across flume agents. - * + * <p> * See static constant variables for configuration options. */ public class RegexHbaseEventSerializer implements HbaseEventSerializer { @@ -108,21 +107,21 @@ public class RegexHbaseEventSerializer implements HbaseEventSerializer { String colNameStr = context.getString(COL_NAME_CONFIG, COLUMN_NAME_DEFAULT); String[] columnNames = colNameStr.split(","); - for (String s: columnNames) { + for (String s : columnNames) { colNames.add(s.getBytes(charset)); } //Rowkey is optional, default is -1 rowKeyIndex = context.getInteger(ROW_KEY_INDEX_CONFIG, -1); //if row key is being used, make sure it is specified correct - if(rowKeyIndex >=0){ - if(rowKeyIndex >= columnNames.length) { + if (rowKeyIndex >= 0) { + if (rowKeyIndex >= columnNames.length) { throw new IllegalArgumentException(ROW_KEY_INDEX_CONFIG + " must be " + - "less than num columns " + columnNames.length); + "less than num columns " + columnNames.length); } - if(!ROW_KEY_NAME.equalsIgnoreCase(columnNames[rowKeyIndex])) { + if (!ROW_KEY_NAME.equalsIgnoreCase(columnNames[rowKeyIndex])) { throw new IllegalArgumentException("Column at " + rowKeyIndex + " must be " - + ROW_KEY_NAME + " and is " + columnNames[rowKeyIndex]); + + ROW_KEY_NAME + " and is " + columnNames[rowKeyIndex]); } } } @@ -181,15 +180,15 @@ public class RegexHbaseEventSerializer implements HbaseEventSerializer { } try { - if(rowKeyIndex < 0){ + if (rowKeyIndex < 0) { rowKey = getRowKey(); - }else{ + } else { rowKey = m.group(rowKeyIndex + 1).getBytes(Charsets.UTF_8); } Put put = new Put(rowKey); for (int i = 0; i < colNames.size(); i++) { - if(i != rowKeyIndex) { + if (i != rowKeyIndex) { put.add(cf, colNames.get(i), m.group(i + 1).getBytes(Charsets.UTF_8)); } } @@ -211,5 +210,6 @@ public class RegexHbaseEventSerializer implements HbaseEventSerializer { } @Override - public void close() { } + public void close() { + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/SimpleAsyncHbaseEventSerializer.java ---------------------------------------------------------------------- diff --git a/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/SimpleAsyncHbaseEventSerializer.java b/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/SimpleAsyncHbaseEventSerializer.java index 96095d1..3f442e8 100644 --- a/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/SimpleAsyncHbaseEventSerializer.java +++ b/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/SimpleAsyncHbaseEventSerializer.java @@ -18,18 +18,17 @@ */ package org.apache.flume.sink.hbase; -import java.util.ArrayList; -import java.util.List; - +import com.google.common.base.Charsets; import org.apache.flume.Context; import org.apache.flume.Event; import org.apache.flume.FlumeException; -import org.hbase.async.AtomicIncrementRequest; -import org.hbase.async.PutRequest; import org.apache.flume.conf.ComponentConfiguration; import org.apache.flume.sink.hbase.SimpleHbaseEventSerializer.KeyType; +import org.hbase.async.AtomicIncrementRequest; +import org.hbase.async.PutRequest; -import com.google.common.base.Charsets; +import java.util.ArrayList; +import java.util.List; /** * A simple serializer to be used with the AsyncHBaseSink @@ -69,7 +68,7 @@ public class SimpleAsyncHbaseEventSerializer implements AsyncHbaseEventSerialize @Override public List<PutRequest> getActions() { List<PutRequest> actions = new ArrayList<PutRequest>(); - if(payloadColumn != null){ + if (payloadColumn != null) { byte[] rowKey; try { switch (keyType) { @@ -89,17 +88,16 @@ public class SimpleAsyncHbaseEventSerializer implements AsyncHbaseEventSerialize PutRequest putRequest = new PutRequest(table, rowKey, cf, payloadColumn, payload); actions.add(putRequest); - } catch (Exception e){ + } catch (Exception e) { throw new FlumeException("Could not get row key!", e); } } return actions; } - public List<AtomicIncrementRequest> getIncrements(){ - List<AtomicIncrementRequest> actions = new - ArrayList<AtomicIncrementRequest>(); - if(incrementColumn != null) { + public List<AtomicIncrementRequest> getIncrements() { + List<AtomicIncrementRequest> actions = new ArrayList<AtomicIncrementRequest>(); + if (incrementColumn != null) { AtomicIncrementRequest inc = new AtomicIncrementRequest(table, incrementRow, cf, incrementColumn); actions.add(inc); @@ -119,23 +117,22 @@ public class SimpleAsyncHbaseEventSerializer implements AsyncHbaseEventSerialize String iCol = context.getString("incrementColumn", "iCol"); rowPrefix = context.getString("rowPrefix", "default"); String suffix = context.getString("suffix", "uuid"); - if(pCol != null && !pCol.isEmpty()) { - if(suffix.equals("timestamp")){ + if (pCol != null && !pCol.isEmpty()) { + if (suffix.equals("timestamp")) { keyType = KeyType.TS; } else if (suffix.equals("random")) { keyType = KeyType.RANDOM; - } else if(suffix.equals("nano")){ + } else if (suffix.equals("nano")) { keyType = KeyType.TSNANO; } else { keyType = KeyType.UUID; } payloadColumn = pCol.getBytes(Charsets.UTF_8); } - if(iCol != null && !iCol.isEmpty()) { + if (iCol != null && !iCol.isEmpty()) { incrementColumn = iCol.getBytes(Charsets.UTF_8); } - incrementRow = - context.getString("incrementRow", "incRow").getBytes(Charsets.UTF_8); + incrementRow = context.getString("incrementRow", "incRow").getBytes(Charsets.UTF_8); } @Override http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/SimpleHbaseEventSerializer.java ---------------------------------------------------------------------- diff --git a/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/SimpleHbaseEventSerializer.java b/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/SimpleHbaseEventSerializer.java index 758252b..dc89fd7 100644 --- a/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/SimpleHbaseEventSerializer.java +++ b/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/SimpleHbaseEventSerializer.java @@ -19,9 +19,7 @@ package org.apache.flume.sink.hbase; -import java.util.LinkedList; -import java.util.List; - +import com.google.common.base.Charsets; import org.apache.flume.Context; import org.apache.flume.Event; import org.apache.flume.FlumeException; @@ -30,19 +28,18 @@ import org.apache.hadoop.hbase.client.Increment; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Row; -import com.google.common.base.Charsets; +import java.util.LinkedList; +import java.util.List; /** * A simple serializer that returns puts from an event, by writing the event * body into it. The headers are discarded. It also updates a row in hbase * which acts as an event counter. - * - * Takes optional parameters:<p> + * <p>Takes optional parameters:<p> * <tt>rowPrefix:</tt> The prefix to be used. Default: <i>default</i><p> * <tt>incrementRow</tt> The row to increment. Default: <i>incRow</i><p> * <tt>suffix:</tt> <i>uuid/random/timestamp.</i>Default: <i>uuid</i><p> - * - * Mandatory parameters: <p> + * <p>Mandatory parameters: <p> * <tt>cf:</tt>Column family.<p> * Components that have no defaults and will not be used if null: * <tt>payloadColumn:</tt> Which column to put payload in. If it is null, @@ -59,8 +56,7 @@ public class SimpleHbaseEventSerializer implements HbaseEventSerializer { private KeyType keyType; private byte[] payload; - public SimpleHbaseEventSerializer(){ - + public SimpleHbaseEventSerializer() { } @Override @@ -70,21 +66,21 @@ public class SimpleHbaseEventSerializer implements HbaseEventSerializer { context.getString("incrementRow", "incRow").getBytes(Charsets.UTF_8); String suffix = context.getString("suffix", "uuid"); - String payloadColumn = context.getString("payloadColumn","pCol"); - String incColumn = context.getString("incrementColumn","iCol"); - if(payloadColumn != null && !payloadColumn.isEmpty()) { - if(suffix.equals("timestamp")){ + String payloadColumn = context.getString("payloadColumn", "pCol"); + String incColumn = context.getString("incrementColumn", "iCol"); + if (payloadColumn != null && !payloadColumn.isEmpty()) { + if (suffix.equals("timestamp")) { keyType = KeyType.TS; } else if (suffix.equals("random")) { keyType = KeyType.RANDOM; - } else if(suffix.equals("nano")){ + } else if (suffix.equals("nano")) { keyType = KeyType.TSNANO; } else { keyType = KeyType.UUID; } plCol = payloadColumn.getBytes(Charsets.UTF_8); } - if(incColumn != null && !incColumn.isEmpty()) { + if (incColumn != null && !incColumn.isEmpty()) { incCol = incColumn.getBytes(Charsets.UTF_8); } } @@ -102,14 +98,14 @@ public class SimpleHbaseEventSerializer implements HbaseEventSerializer { @Override public List<Row> getActions() throws FlumeException { List<Row> actions = new LinkedList<Row>(); - if(plCol != null){ + if (plCol != null) { byte[] rowKey; try { if (keyType == KeyType.TS) { rowKey = SimpleRowKeyGenerator.getTimestampKey(rowPrefix); - } else if(keyType == KeyType.RANDOM) { + } else if (keyType == KeyType.RANDOM) { rowKey = SimpleRowKeyGenerator.getRandomKey(rowPrefix); - } else if(keyType == KeyType.TSNANO) { + } else if (keyType == KeyType.TSNANO) { rowKey = SimpleRowKeyGenerator.getNanoTimestampKey(rowPrefix); } else { rowKey = SimpleRowKeyGenerator.getUUIDKey(rowPrefix); @@ -117,33 +113,34 @@ public class SimpleHbaseEventSerializer implements HbaseEventSerializer { Put put = new Put(rowKey); put.add(cf, plCol, payload); actions.add(put); - } catch (Exception e){ + } catch (Exception e) { throw new FlumeException("Could not get row key!", e); } } return actions; } - @Override - public List<Increment> getIncrements(){ - List<Increment> incs = new LinkedList<Increment>(); - if(incCol != null) { - Increment inc = new Increment(incrementRow); - inc.addColumn(cf, incCol, 1); - incs.add(inc); - } - return incs; - } - @Override - public void close() { + @Override + public List<Increment> getIncrements() { + List<Increment> incs = new LinkedList<Increment>(); + if (incCol != null) { + Increment inc = new Increment(incrementRow); + inc.addColumn(cf, incCol, 1); + incs.add(inc); } + return incs; + } - public enum KeyType{ - UUID, - RANDOM, - TS, - TSNANO; - } + @Override + public void close() { + } + public enum KeyType { + UUID, + RANDOM, + TS, + TSNANO; } + +} http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/SimpleRowKeyGenerator.java ---------------------------------------------------------------------- diff --git a/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/SimpleRowKeyGenerator.java b/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/SimpleRowKeyGenerator.java index b25eb6a..2d654f2 100644 --- a/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/SimpleRowKeyGenerator.java +++ b/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/SimpleRowKeyGenerator.java @@ -25,28 +25,22 @@ import java.util.UUID; /** * Utility class for users to generate their own keys. Any key can be used, * this is just a utility that provides a set of simple keys. - * - * */ public class SimpleRowKeyGenerator { - public static byte[] getUUIDKey(String prefix) - throws UnsupportedEncodingException{ + public static byte[] getUUIDKey(String prefix) throws UnsupportedEncodingException { return (prefix + UUID.randomUUID().toString()).getBytes("UTF8"); } - public static byte[] getRandomKey(String prefix) - throws UnsupportedEncodingException{ + public static byte[] getRandomKey(String prefix) throws UnsupportedEncodingException { return (prefix + String.valueOf(new Random().nextLong())).getBytes("UTF8"); } - public static byte[] getTimestampKey(String prefix) - throws UnsupportedEncodingException { - return (prefix + String.valueOf( - System.currentTimeMillis())).getBytes("UTF8"); + + public static byte[] getTimestampKey(String prefix) throws UnsupportedEncodingException { + return (prefix + String.valueOf(System.currentTimeMillis())).getBytes("UTF8"); } - public static byte[] getNanoTimestampKey(String prefix) - throws UnsupportedEncodingException{ - return (prefix + String.valueOf( - System.nanoTime())).getBytes("UTF8"); + + public static byte[] getNanoTimestampKey(String prefix) throws UnsupportedEncodingException { + return (prefix + String.valueOf(System.nanoTime())).getBytes("UTF8"); } } http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSink.java ---------------------------------------------------------------------- diff --git a/flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSink.java b/flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSink.java index 7bef7f3..9453546 100644 --- a/flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSink.java +++ b/flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSink.java @@ -157,7 +157,7 @@ public class KafkaSink extends AbstractSink implements Configurable { if (event == null) { // no events available in channel - if(processedEvents == 0) { + if (processedEvents == 0) { result = Status.BACKOFF; counter.incrementBatchEmptyCount(); } else { @@ -177,7 +177,7 @@ public class KafkaSink extends AbstractSink implements Configurable { if (logger.isDebugEnabled()) { logger.debug("{Event} " + eventTopic + " : " + eventKey + " : " - + new String(eventBody, "UTF-8")); + + new String(eventBody, "UTF-8")); logger.debug("event #{}", processedEvents); } @@ -185,8 +185,10 @@ public class KafkaSink extends AbstractSink implements Configurable { long startTime = System.currentTimeMillis(); try { - kafkaFutures.add(producer.send(new ProducerRecord<String, byte[]> (eventTopic, eventKey, serializeEvent(event, useAvroEventFormat)), - new SinkCallback(startTime))); + kafkaFutures.add(producer.send( + new ProducerRecord<String, byte[]>(eventTopic, eventKey, + serializeEvent(event, useAvroEventFormat)), + new SinkCallback(startTime))); } catch (IOException ex) { throw new EventDeliveryException("Could not serialize event", ex); } @@ -197,11 +199,11 @@ public class KafkaSink extends AbstractSink implements Configurable { // publish batch and commit. if (processedEvents > 0) { - for (Future<RecordMetadata> future : kafkaFutures) { - future.get(); - } + for (Future<RecordMetadata> future : kafkaFutures) { + future.get(); + } long endTime = System.nanoTime(); - counter.addToKafkaEventSendTimer((endTime-batchStartTime)/(1000*1000)); + counter.addToKafkaEventSendTimer((endTime - batchStartTime) / (1000 * 1000)); counter.addToEventDrainSuccessCount(Long.valueOf(kafkaFutures.size())); } @@ -270,8 +272,7 @@ public class KafkaSink extends AbstractSink implements Configurable { if (topicStr == null || topicStr.isEmpty()) { topicStr = DEFAULT_TOPIC; logger.warn("Topic was not specified. Using {} as the topic.", topicStr); - } - else { + } else { logger.info("Using the static topic {}. This may be overridden by event headers", topicStr); } @@ -283,7 +284,8 @@ public class KafkaSink extends AbstractSink implements Configurable { logger.debug("Using batch size: {}", batchSize); } - useAvroEventFormat = context.getBoolean(KafkaSinkConstants.AVRO_EVENT, KafkaSinkConstants.DEFAULT_AVRO_EVENT); + useAvroEventFormat = context.getBoolean(KafkaSinkConstants.AVRO_EVENT, + KafkaSinkConstants.DEFAULT_AVRO_EVENT); if (logger.isDebugEnabled()) { logger.debug(KafkaSinkConstants.AVRO_EVENT + " set to: {}", useAvroEventFormat); @@ -322,7 +324,8 @@ public class KafkaSink extends AbstractSink implements Configurable { throw new ConfigurationException("Bootstrap Servers must be specified"); } else { ctx.put(BOOTSTRAP_SERVERS_CONFIG, brokerList); - logger.warn("{} is deprecated. Please use the parameter {}", BROKER_LIST_FLUME_KEY, BOOTSTRAP_SERVERS_CONFIG); + logger.warn("{} is deprecated. Please use the parameter {}", + BROKER_LIST_FLUME_KEY, BOOTSTRAP_SERVERS_CONFIG); } } @@ -348,21 +351,18 @@ public class KafkaSink extends AbstractSink implements Configurable { if (ctx.containsKey(KEY_SERIALIZER_KEY )) { logger.warn("{} is deprecated. Flume now uses the latest Kafka producer which implements " + - "a different interface for serializers. Please use the parameter {}", - KEY_SERIALIZER_KEY,KAFKA_PRODUCER_PREFIX + ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG); + "a different interface for serializers. Please use the parameter {}", + KEY_SERIALIZER_KEY,KAFKA_PRODUCER_PREFIX + ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG); } if (ctx.containsKey(MESSAGE_SERIALIZER_KEY)) { logger.warn("{} is deprecated. Flume now uses the latest Kafka producer which implements " + - "a different interface for serializers. Please use the parameter {}", - MESSAGE_SERIALIZER_KEY,KAFKA_PRODUCER_PREFIX + ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG); + "a different interface for serializers. Please use the parameter {}", + MESSAGE_SERIALIZER_KEY, + KAFKA_PRODUCER_PREFIX + ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG); } - - - - - } + private void setProducerProps(Context context, String bootStrapServers) { kafkaProps.put(ProducerConfig.ACKS_CONFIG, DEFAULT_ACKS); //Defaults overridden based on config @@ -387,7 +387,8 @@ public class KafkaSink extends AbstractSink implements Configurable { writer = Optional.of(new SpecificDatumWriter<AvroFlumeEvent>(AvroFlumeEvent.class)); } tempOutStream.get().reset(); - AvroFlumeEvent e = new AvroFlumeEvent(toCharSeqMap(event.getHeaders()), ByteBuffer.wrap(event.getBody())); + AvroFlumeEvent e = new AvroFlumeEvent(toCharSeqMap(event.getHeaders()), + ByteBuffer.wrap(event.getBody())); encoder = EncoderFactory.get().directBinaryEncoder(tempOutStream.get(), encoder); writer.get().write(e, encoder); encoder.flush(); http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSinkConstants.java ---------------------------------------------------------------------- diff --git a/flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSinkConstants.java b/flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSinkConstants.java index 6b64bc1..1bf380c 100644 --- a/flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSinkConstants.java +++ b/flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSinkConstants.java @@ -29,7 +29,8 @@ public class KafkaSinkConstants { public static final String TOPIC_CONFIG = KAFKA_PREFIX + "topic"; public static final String BATCH_SIZE = "flumeBatchSize"; - public static final String BOOTSTRAP_SERVERS_CONFIG = KAFKA_PREFIX + CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG; + public static final String BOOTSTRAP_SERVERS_CONFIG = + KAFKA_PREFIX + CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG; public static final String KEY_HEADER = "key"; public static final String TOPIC_HEADER = "topic"; @@ -37,25 +38,23 @@ public class KafkaSinkConstants { public static final String AVRO_EVENT = "useFlumeEventFormat"; public static final boolean DEFAULT_AVRO_EVENT = false; - public static final String DEFAULT_KEY_SERIALIZER = "org.apache.kafka.common.serialization.StringSerializer"; - public static final String DEFAULT_VALUE_SERIAIZER = "org.apache.kafka.common.serialization.ByteArraySerializer"; + public static final String DEFAULT_KEY_SERIALIZER = + "org.apache.kafka.common.serialization.StringSerializer"; + public static final String DEFAULT_VALUE_SERIAIZER = + "org.apache.kafka.common.serialization.ByteArraySerializer"; public static final int DEFAULT_BATCH_SIZE = 100; public static final String DEFAULT_TOPIC = "default-flume-topic"; public static final String DEFAULT_ACKS = "1"; - /* Old Properties */ - /* Properties */ + /* Properties */ public static final String OLD_BATCH_SIZE = "batchSize"; public static final String MESSAGE_SERIALIZER_KEY = "serializer.class"; public static final String KEY_SERIALIZER_KEY = "key.serializer.class"; public static final String BROKER_LIST_FLUME_KEY = "brokerList"; public static final String REQUIRED_ACKS_FLUME_KEY = "requiredAcks"; - - - } http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-sinks/flume-ng-morphline-solr-sink/src/main/java/org/apache/flume/sink/solr/morphline/BlobDeserializer.java ---------------------------------------------------------------------- diff --git a/flume-ng-sinks/flume-ng-morphline-solr-sink/src/main/java/org/apache/flume/sink/solr/morphline/BlobDeserializer.java b/flume-ng-sinks/flume-ng-morphline-solr-sink/src/main/java/org/apache/flume/sink/solr/morphline/BlobDeserializer.java index 12bdc40..095f889 100644 --- a/flume-ng-sinks/flume-ng-morphline-solr-sink/src/main/java/org/apache/flume/sink/solr/morphline/BlobDeserializer.java +++ b/flume-ng-sinks/flume-ng-morphline-solr-sink/src/main/java/org/apache/flume/sink/solr/morphline/BlobDeserializer.java @@ -84,7 +84,8 @@ public class BlobDeserializer implements EventDeserializer { blob.write(buf, 0, n); blobLength += n; if (blobLength >= maxBlobLength) { - LOGGER.warn("File length exceeds maxBlobLength ({}), truncating BLOB event!", maxBlobLength); + LOGGER.warn("File length exceeds maxBlobLength ({}), truncating BLOB event!", + maxBlobLength); break; } } http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-sinks/flume-ng-morphline-solr-sink/src/main/java/org/apache/flume/sink/solr/morphline/BlobHandler.java ---------------------------------------------------------------------- diff --git a/flume-ng-sinks/flume-ng-morphline-solr-sink/src/main/java/org/apache/flume/sink/solr/morphline/BlobHandler.java b/flume-ng-sinks/flume-ng-morphline-solr-sink/src/main/java/org/apache/flume/sink/solr/morphline/BlobHandler.java index e84dec1..ca7614a 100644 --- a/flume-ng-sinks/flume-ng-morphline-solr-sink/src/main/java/org/apache/flume/sink/solr/morphline/BlobHandler.java +++ b/flume-ng-sinks/flume-ng-morphline-solr-sink/src/main/java/org/apache/flume/sink/solr/morphline/BlobHandler.java @@ -87,7 +87,8 @@ public class BlobHandler implements HTTPSourceHandler { blob.write(buf, 0, n); blobLength += n; if (blobLength >= maxBlobLength) { - LOGGER.warn("Request length exceeds maxBlobLength ({}), truncating BLOB event!", maxBlobLength); + LOGGER.warn("Request length exceeds maxBlobLength ({}), truncating BLOB event!", + maxBlobLength); break; } } http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-sinks/flume-ng-morphline-solr-sink/src/main/java/org/apache/flume/sink/solr/morphline/MorphlineHandlerImpl.java ---------------------------------------------------------------------- diff --git a/flume-ng-sinks/flume-ng-morphline-solr-sink/src/main/java/org/apache/flume/sink/solr/morphline/MorphlineHandlerImpl.java b/flume-ng-sinks/flume-ng-morphline-solr-sink/src/main/java/org/apache/flume/sink/solr/morphline/MorphlineHandlerImpl.java index d3154af..d877814 100644 --- a/flume-ng-sinks/flume-ng-morphline-solr-sink/src/main/java/org/apache/flume/sink/solr/morphline/MorphlineHandlerImpl.java +++ b/flume-ng-sinks/flume-ng-morphline-solr-sink/src/main/java/org/apache/flume/sink/solr/morphline/MorphlineHandlerImpl.java @@ -97,8 +97,10 @@ public class MorphlineHandlerImpl implements MorphlineHandler { .build(); } - Config override = ConfigFactory.parseMap(context.getSubProperties(MORPHLINE_VARIABLE_PARAM + ".")); - morphline = new Compiler().compile(new File(morphlineFile), morphlineId, morphlineContext, finalChild, override); + Config override = ConfigFactory.parseMap( + context.getSubProperties(MORPHLINE_VARIABLE_PARAM + ".")); + morphline = new Compiler().compile( + new File(morphlineFile), morphlineId, morphlineContext, finalChild, override); this.mappingTimer = morphlineContext.getMetricRegistry().timer( MetricRegistry.name("morphline.app", Metrics.ELAPSED_TIME)); http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-sinks/flume-ng-morphline-solr-sink/src/main/java/org/apache/flume/sink/solr/morphline/MorphlineInterceptor.java ---------------------------------------------------------------------- diff --git a/flume-ng-sinks/flume-ng-morphline-solr-sink/src/main/java/org/apache/flume/sink/solr/morphline/MorphlineInterceptor.java b/flume-ng-sinks/flume-ng-morphline-solr-sink/src/main/java/org/apache/flume/sink/solr/morphline/MorphlineInterceptor.java index ef8f716..3b94133 100644 --- a/flume-ng-sinks/flume-ng-morphline-solr-sink/src/main/java/org/apache/flume/sink/solr/morphline/MorphlineInterceptor.java +++ b/flume-ng-sinks/flume-ng-morphline-solr-sink/src/main/java/org/apache/flume/sink/solr/morphline/MorphlineInterceptor.java @@ -47,12 +47,13 @@ import com.google.common.io.ByteStreams; public class MorphlineInterceptor implements Interceptor { private final Context context; - private final Queue<LocalMorphlineInterceptor> pool = new ConcurrentLinkedQueue<LocalMorphlineInterceptor>(); + private final Queue<LocalMorphlineInterceptor> pool = new ConcurrentLinkedQueue<>(); protected MorphlineInterceptor(Context context) { Preconditions.checkNotNull(context); this.context = context; - returnToPool(new LocalMorphlineInterceptor(context)); // fail fast on morphline compilation exception + // fail fast on morphline compilation exception + returnToPool(new LocalMorphlineInterceptor(context)); } @Override http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-sinks/flume-ng-morphline-solr-sink/src/main/java/org/apache/flume/sink/solr/morphline/MorphlineSink.java ---------------------------------------------------------------------- diff --git a/flume-ng-sinks/flume-ng-morphline-solr-sink/src/main/java/org/apache/flume/sink/solr/morphline/MorphlineSink.java b/flume-ng-sinks/flume-ng-morphline-solr-sink/src/main/java/org/apache/flume/sink/solr/morphline/MorphlineSink.java index 9c4dc25..f7a73f3 100644 --- a/flume-ng-sinks/flume-ng-morphline-solr-sink/src/main/java/org/apache/flume/sink/solr/morphline/MorphlineSink.java +++ b/flume-ng-sinks/flume-ng-morphline-solr-sink/src/main/java/org/apache/flume/sink/solr/morphline/MorphlineSink.java @@ -160,15 +160,15 @@ public class MorphlineSink extends AbstractSink implements Configurable { return numEventsTaken == 0 ? Status.BACKOFF : Status.READY; } catch (Throwable t) { // Ooops - need to rollback and back off - LOGGER.error("Morphline Sink " + getName() + ": Unable to process event from channel " + myChannel.getName() - + ". Exception follows.", t); + LOGGER.error("Morphline Sink " + getName() + ": Unable to process event from channel " + + myChannel.getName() + ". Exception follows.", t); try { if (!isMorphlineTransactionCommitted) { handler.rollbackTransaction(); } } catch (Throwable t2) { - LOGGER.error("Morphline Sink " + getName() + ": Unable to rollback morphline transaction. " + - "Exception follows.", t2); + LOGGER.error("Morphline Sink " + getName() + + ": Unable to rollback morphline transaction. Exception follows.", t2); } finally { try { txn.rollback(); http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-sources/flume-jms-source/src/main/java/org/apache/flume/source/jms/DefaultJMSMessageConverter.java ---------------------------------------------------------------------- diff --git a/flume-ng-sources/flume-jms-source/src/main/java/org/apache/flume/source/jms/DefaultJMSMessageConverter.java b/flume-ng-sources/flume-jms-source/src/main/java/org/apache/flume/source/jms/DefaultJMSMessageConverter.java index 6b327ce..acb5118 100644 --- a/flume-ng-sources/flume-jms-source/src/main/java/org/apache/flume/source/jms/DefaultJMSMessageConverter.java +++ b/flume-ng-sources/flume-jms-source/src/main/java/org/apache/flume/source/jms/DefaultJMSMessageConverter.java @@ -57,15 +57,17 @@ import org.apache.flume.event.SimpleEvent; public class DefaultJMSMessageConverter implements JMSMessageConverter { private final Charset charset; + private DefaultJMSMessageConverter(String charset) { this.charset = Charset.forName(charset); } + public static class Builder implements JMSMessageConverter.Builder { @Override public JMSMessageConverter build(Context context) { - return new DefaultJMSMessageConverter(context. - getString(JMSSourceConfiguration.CONVERTER_CHARSET, - JMSSourceConfiguration.CONVERTER_CHARSET_DEFAULT).trim()); + return new DefaultJMSMessageConverter(context.getString( + JMSSourceConfiguration.CONVERTER_CHARSET, + JMSSourceConfiguration.CONVERTER_CHARSET_DEFAULT).trim()); } } @@ -75,52 +77,52 @@ public class DefaultJMSMessageConverter implements JMSMessageConverter { Map<String, String> headers = event.getHeaders(); @SuppressWarnings("rawtypes") Enumeration propertyNames = message.getPropertyNames(); - while(propertyNames.hasMoreElements()) { + while (propertyNames.hasMoreElements()) { String name = propertyNames.nextElement().toString(); String value = message.getStringProperty(name); headers.put(name, value); } - if(message instanceof BytesMessage) { + if (message instanceof BytesMessage) { BytesMessage bytesMessage = (BytesMessage)message; long length = bytesMessage.getBodyLength(); - if(length > 0L) { + if (length > 0L) { if (length > Integer.MAX_VALUE) { throw new JMSException("Unable to process message " + "of size " + length); } byte[] body = new byte[(int)length]; int count = bytesMessage.readBytes(body); - if(count != length) { + if (count != length) { throw new JMSException("Unable to read full message. " + "Read " + count + " of total " + length); } event.setBody(body); } - } else if(message instanceof TextMessage) { + } else if (message instanceof TextMessage) { TextMessage textMessage = (TextMessage)message; event.setBody(textMessage.getText().getBytes(charset)); - } else if(message instanceof ObjectMessage) { + } else if (message instanceof ObjectMessage) { ObjectMessage objectMessage = (ObjectMessage)message; Object object = objectMessage.getObject(); - if(object != null) { + if (object != null) { ByteArrayOutputStream bos = new ByteArrayOutputStream(); ObjectOutput out = null; try { out = new ObjectOutputStream(bos); out.writeObject(object); event.setBody(bos.toByteArray()); - } catch(IOException e) { + } catch (IOException e) { throw new FlumeException("Error serializing object", e); } finally { try { - if(out != null) { + if (out != null) { out.close(); } } catch (IOException e) { throw new FlumeException("Error closing ObjectOutputStream", e); } try { - if(bos != null) { + if (bos != null) { bos.close(); } } catch (IOException e) { http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-sources/flume-jms-source/src/main/java/org/apache/flume/source/jms/InitialContextFactory.java ---------------------------------------------------------------------- diff --git a/flume-ng-sources/flume-jms-source/src/main/java/org/apache/flume/source/jms/InitialContextFactory.java b/flume-ng-sources/flume-jms-source/src/main/java/org/apache/flume/source/jms/InitialContextFactory.java index 2f0220a..8874dd1 100644 --- a/flume-ng-sources/flume-jms-source/src/main/java/org/apache/flume/source/jms/InitialContextFactory.java +++ b/flume-ng-sources/flume-jms-source/src/main/java/org/apache/flume/source/jms/InitialContextFactory.java @@ -22,7 +22,6 @@ import java.util.Properties; import javax.naming.InitialContext; import javax.naming.NamingException; - public class InitialContextFactory { public InitialContext create(Properties properties) throws NamingException { http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-sources/flume-jms-source/src/main/java/org/apache/flume/source/jms/JMSMessageConsumer.java ---------------------------------------------------------------------- diff --git a/flume-ng-sources/flume-jms-source/src/main/java/org/apache/flume/source/jms/JMSMessageConsumer.java b/flume-ng-sources/flume-jms-source/src/main/java/org/apache/flume/source/jms/JMSMessageConsumer.java index 7a9461b..6b3a1cf 100644 --- a/flume-ng-sources/flume-jms-source/src/main/java/org/apache/flume/source/jms/JMSMessageConsumer.java +++ b/flume-ng-sources/flume-jms-source/src/main/java/org/apache/flume/source/jms/JMSMessageConsumer.java @@ -18,8 +18,12 @@ */ package org.apache.flume.source.jms; -import java.util.ArrayList; -import java.util.List; +import com.google.common.base.Optional; +import com.google.common.base.Preconditions; +import org.apache.flume.Event; +import org.apache.flume.FlumeException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import javax.jms.Connection; import javax.jms.ConnectionFactory; @@ -30,14 +34,8 @@ import javax.jms.MessageConsumer; import javax.jms.Session; import javax.naming.InitialContext; import javax.naming.NamingException; - -import org.apache.flume.Event; -import org.apache.flume.FlumeException; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.google.common.base.Optional; -import com.google.common.base.Preconditions; +import java.util.ArrayList; +import java.util.List; class JMSMessageConsumer { private static final Logger logger = LoggerFactory @@ -52,11 +50,11 @@ class JMSMessageConsumer { private final Destination destination; private final MessageConsumer messageConsumer; - JMSMessageConsumer(InitialContext initialContext, ConnectionFactory connectionFactory, String destinationName, - JMSDestinationLocator destinationLocator, JMSDestinationType destinationType, - String messageSelector, int batchSize, long pollTimeout, - JMSMessageConverter messageConverter, - Optional<String> userName, Optional<String> password) { + JMSMessageConsumer(InitialContext initialContext, ConnectionFactory connectionFactory, + String destinationName, JMSDestinationLocator destinationLocator, + JMSDestinationType destinationType, String messageSelector, int batchSize, + long pollTimeout, JMSMessageConverter messageConverter, + Optional<String> userName, Optional<String> password) { this.batchSize = batchSize; this.pollTimeout = pollTimeout; this.messageConverter = messageConverter; @@ -65,7 +63,7 @@ class JMSMessageConsumer { Preconditions.checkArgument(pollTimeout >= 0, "Poll timeout cannot be " + "negative"); try { - if(userName.isPresent()) { + if (userName.isPresent()) { connection = connectionFactory.createConnection(userName.get(), password.get()); } else { @@ -82,37 +80,37 @@ class JMSMessageConsumer { throw new FlumeException("Could not create session", e); } - try { - if (destinationLocator.equals(JMSDestinationLocator.CDI)) { - switch (destinationType) { - case QUEUE: - destination = session.createQueue(destinationName); - break; - case TOPIC: - destination = session.createTopic(destinationName); - break; - default: - throw new IllegalStateException(String.valueOf(destinationType)); + try { + if (destinationLocator.equals(JMSDestinationLocator.CDI)) { + switch (destinationType) { + case QUEUE: + destination = session.createQueue(destinationName); + break; + case TOPIC: + destination = session.createTopic(destinationName); + break; + default: + throw new IllegalStateException(String.valueOf(destinationType)); + } + } else { + destination = (Destination) initialContext.lookup(destinationName); } - } else { - destination = (Destination) initialContext.lookup(destinationName); + } catch (JMSException e) { + throw new FlumeException("Could not create destination " + destinationName, e); + } catch (NamingException e) { + throw new FlumeException("Could not find destination " + destinationName, e); } - } catch (JMSException e) { - throw new FlumeException("Could not create destination " + destinationName, e); - } catch (NamingException e) { - throw new FlumeException("Could not find destination " + destinationName, e); - } - try { + try { messageConsumer = session.createConsumer(destination, - messageSelector.isEmpty() ? null: messageSelector); + messageSelector.isEmpty() ? null : messageSelector); } catch (JMSException e) { throw new FlumeException("Could not create consumer", e); } String startupMsg = String.format("Connected to '%s' of type '%s' with " + - "user '%s', batch size '%d', selector '%s' ", destinationName, + "user '%s', batch size '%d', selector '%s' ", destinationName, destinationType, userName.isPresent() ? userName.get() : "null", - batchSize, messageSelector.isEmpty() ? null : messageSelector); + batchSize, messageSelector.isEmpty() ? null : messageSelector); logger.info(startupMsg); } @@ -120,23 +118,23 @@ class JMSMessageConsumer { List<Event> result = new ArrayList<Event>(batchSize); Message message; message = messageConsumer.receive(pollTimeout); - if(message != null) { + if (message != null) { result.addAll(messageConverter.convert(message)); int max = batchSize - 1; for (int i = 0; i < max; i++) { message = messageConsumer.receiveNoWait(); - if(message == null) { + if (message == null) { break; } result.addAll(messageConverter.convert(message)); } } - if(logger.isDebugEnabled()) { - logger.debug(String.format("Took batch of %s from %s", result.size(), - destination)); + if (logger.isDebugEnabled()) { + logger.debug(String.format("Took batch of %s from %s", result.size(), destination)); } return result; } + void commit() { try { session.commit(); http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-sources/flume-jms-source/src/main/java/org/apache/flume/source/jms/JMSMessageConsumerFactory.java ---------------------------------------------------------------------- diff --git a/flume-ng-sources/flume-jms-source/src/main/java/org/apache/flume/source/jms/JMSMessageConsumerFactory.java b/flume-ng-sources/flume-jms-source/src/main/java/org/apache/flume/source/jms/JMSMessageConsumerFactory.java index af74bf4..9747a31 100644 --- a/flume-ng-sources/flume-jms-source/src/main/java/org/apache/flume/source/jms/JMSMessageConsumerFactory.java +++ b/flume-ng-sources/flume-jms-source/src/main/java/org/apache/flume/source/jms/JMSMessageConsumerFactory.java @@ -22,15 +22,15 @@ import javax.naming.InitialContext; import com.google.common.base.Optional; - public class JMSMessageConsumerFactory { JMSMessageConsumer create(InitialContext initialContext, ConnectionFactory connectionFactory, - String destinationName, JMSDestinationType destinationType, JMSDestinationLocator destinationLocator, - String messageSelector, int batchSize, long pollTimeout, JMSMessageConverter messageConverter, - Optional<String> userName, Optional<String> password) { + String destinationName, JMSDestinationType destinationType, + JMSDestinationLocator destinationLocator, String messageSelector, int batchSize, + long pollTimeout, JMSMessageConverter messageConverter, + Optional<String> userName, Optional<String> password) { return new JMSMessageConsumer(initialContext, connectionFactory, destinationName, - destinationLocator, destinationType, messageSelector, batchSize, pollTimeout, + destinationLocator, destinationType, messageSelector, batchSize, pollTimeout, messageConverter, userName, password); } } http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-sources/flume-jms-source/src/main/java/org/apache/flume/source/jms/JMSSource.java ---------------------------------------------------------------------- diff --git a/flume-ng-sources/flume-jms-source/src/main/java/org/apache/flume/source/jms/JMSSource.java b/flume-ng-sources/flume-jms-source/src/main/java/org/apache/flume/source/jms/JMSSource.java index c1cc9cf..7631827 100644 --- a/flume-ng-sources/flume-jms-source/src/main/java/org/apache/flume/source/jms/JMSSource.java +++ b/flume-ng-sources/flume-jms-source/src/main/java/org/apache/flume/source/jms/JMSSource.java @@ -76,40 +76,39 @@ public class JMSSource extends AbstractPollableSource { private int jmsExceptionCounter; private InitialContext initialContext; - public JMSSource() { this(new JMSMessageConsumerFactory(), new InitialContextFactory()); } + @VisibleForTesting - public JMSSource(JMSMessageConsumerFactory consumerFactory, InitialContextFactory initialContextFactory) { + public JMSSource(JMSMessageConsumerFactory consumerFactory, + InitialContextFactory initialContextFactory) { super(); this.consumerFactory = consumerFactory; this.initialContextFactory = initialContextFactory; - } @Override protected void doConfigure(Context context) throws FlumeException { sourceCounter = new SourceCounter(getName()); - initialContextFactoryName = context.getString(JMSSourceConfiguration. - INITIAL_CONTEXT_FACTORY, "").trim(); + initialContextFactoryName = context.getString( + JMSSourceConfiguration.INITIAL_CONTEXT_FACTORY, "").trim(); - providerUrl = context.getString(JMSSourceConfiguration.PROVIDER_URL, "") - .trim(); + providerUrl = context.getString(JMSSourceConfiguration.PROVIDER_URL, "").trim(); - destinationName = context.getString(JMSSourceConfiguration. - DESTINATION_NAME, "").trim(); + destinationName = context.getString(JMSSourceConfiguration.DESTINATION_NAME, "").trim(); - String destinationTypeName = context.getString(JMSSourceConfiguration. - DESTINATION_TYPE, "").trim().toUpperCase(Locale.ENGLISH); + String destinationTypeName = context.getString( + JMSSourceConfiguration.DESTINATION_TYPE, "").trim().toUpperCase(Locale.ENGLISH); - String destinationLocatorName = context.getString(JMSSourceConfiguration. - DESTINATION_LOCATOR, JMSSourceConfiguration.DESTINATION_LOCATOR_DEFAULT) - .trim().toUpperCase(Locale.ENGLISH); + String destinationLocatorName = context.getString( + JMSSourceConfiguration.DESTINATION_LOCATOR, + JMSSourceConfiguration.DESTINATION_LOCATOR_DEFAULT) + .trim().toUpperCase(Locale.ENGLISH); - messageSelector = context.getString(JMSSourceConfiguration. - MESSAGE_SELECTOR, "").trim(); + messageSelector = context.getString( + JMSSourceConfiguration.MESSAGE_SELECTOR, "").trim(); batchSize = context.getInteger(JMSSourceConfiguration.BATCH_SIZE, JMSSourceConfiguration.BATCH_SIZE_DEFAULT); @@ -117,16 +116,14 @@ public class JMSSource extends AbstractPollableSource { errorThreshold = context.getInteger(JMSSourceConfiguration.ERROR_THRESHOLD, JMSSourceConfiguration.ERROR_THRESHOLD_DEFAULT); - userName = Optional.fromNullable(context.getString(JMSSourceConfiguration. - USERNAME)); + userName = Optional.fromNullable(context.getString(JMSSourceConfiguration.USERNAME)); pollTimeout = context.getLong(JMSSourceConfiguration.POLL_TIMEOUT, JMSSourceConfiguration.POLL_TIMEOUT_DEFAULT); - String passwordFile = context.getString(JMSSourceConfiguration. - PASSWORD_FILE, "").trim(); + String passwordFile = context.getString(JMSSourceConfiguration.PASSWORD_FILE, "").trim(); - if(passwordFile.isEmpty()) { + if (passwordFile.isEmpty()) { password = Optional.of(""); } else { try { @@ -140,45 +137,38 @@ public class JMSSource extends AbstractPollableSource { String converterClassName = context.getString( JMSSourceConfiguration.CONVERTER_TYPE, - JMSSourceConfiguration.CONVERTER_TYPE_DEFAULT) - .trim(); - if(JMSSourceConfiguration.CONVERTER_TYPE_DEFAULT. - equalsIgnoreCase(converterClassName)) { + JMSSourceConfiguration.CONVERTER_TYPE_DEFAULT).trim(); + if (JMSSourceConfiguration.CONVERTER_TYPE_DEFAULT.equalsIgnoreCase(converterClassName)) { converterClassName = DefaultJMSMessageConverter.Builder.class.getName(); } - Context converterContext = new Context(context. - getSubProperties(JMSSourceConfiguration.CONVERTER + ".")); + Context converterContext = new Context(context.getSubProperties( + JMSSourceConfiguration.CONVERTER + ".")); try { @SuppressWarnings("rawtypes") Class clazz = Class.forName(converterClassName); boolean isBuilder = JMSMessageConverter.Builder.class .isAssignableFrom(clazz); - if(isBuilder) { - JMSMessageConverter.Builder builder = (JMSMessageConverter.Builder) - clazz.newInstance(); + if (isBuilder) { + JMSMessageConverter.Builder builder = (JMSMessageConverter.Builder)clazz.newInstance(); converter = builder.build(converterContext); } else { - Preconditions.checkState(JMSMessageConverter.class. - isAssignableFrom(clazz), String. - format("Class %s is not a subclass of JMSMessageConverter", - clazz.getName())); + Preconditions.checkState(JMSMessageConverter.class.isAssignableFrom(clazz), + String.format("Class %s is not a subclass of JMSMessageConverter", clazz.getName())); converter = (JMSMessageConverter)clazz.newInstance(); - boolean configured = Configurables.configure(converter, - converterContext); - if(logger.isDebugEnabled()) { - logger.debug(String. - format("Attempted configuration of %s, result = %s", - converterClassName, String.valueOf(configured))); + boolean configured = Configurables.configure(converter, converterContext); + if (logger.isDebugEnabled()) { + logger.debug(String.format("Attempted configuration of %s, result = %s", + converterClassName, String.valueOf(configured))); } } - } catch(Exception e) { + } catch (Exception e) { throw new FlumeException(String.format( "Unable to create instance of converter %s", converterClassName), e); } - String connectionFactoryName = context.getString(JMSSourceConfiguration. - CONNECTION_FACTORY, JMSSourceConfiguration.CONNECTION_FACTORY_DEFAULT) - .trim(); + String connectionFactoryName = context.getString( + JMSSourceConfiguration.CONNECTION_FACTORY, + JMSSourceConfiguration.CONNECTION_FACTORY_DEFAULT).trim(); assertNotEmpty(initialContextFactoryName, String.format( "Initial Context Factory is empty. This is specified by %s", @@ -210,8 +200,7 @@ public class JMSSource extends AbstractPollableSource { "invalid.", destinationLocatorName), e); } - Preconditions.checkArgument(batchSize > 0, "Batch size must be greater " + - "than 0"); + Preconditions.checkArgument(batchSize > 0, "Batch size must be greater than 0"); try { Properties contextProperties = new Properties(); @@ -223,12 +212,12 @@ public class JMSSource extends AbstractPollableSource { // Provide properties for connecting via JNDI if (this.userName.isPresent()) { - contextProperties.setProperty( - javax.naming.Context.SECURITY_PRINCIPAL, this.userName.get()); + contextProperties.setProperty(javax.naming.Context.SECURITY_PRINCIPAL, + this.userName.get()); } if (this.password.isPresent()) { - contextProperties.setProperty( - javax.naming.Context.SECURITY_CREDENTIALS, this.password.get()); + contextProperties.setProperty(javax.naming.Context.SECURITY_CREDENTIALS, + this.password.get()); } initialContext = initialContextFactory.create(contextProperties); @@ -239,28 +228,26 @@ public class JMSSource extends AbstractPollableSource { } try { - connectionFactory = (ConnectionFactory) initialContext. - lookup(connectionFactoryName); + connectionFactory = (ConnectionFactory) initialContext.lookup(connectionFactoryName); } catch (NamingException e) { throw new FlumeException("Could not lookup ConnectionFactory", e); } } private void assertNotEmpty(String arg, String msg) { - Preconditions.checkArgument(!arg.isEmpty(), - msg); + Preconditions.checkArgument(!arg.isEmpty(), msg); } @Override protected synchronized Status doProcess() throws EventDeliveryException { boolean error = true; try { - if(consumer == null) { + if (consumer == null) { consumer = createConsumer(); } List<Event> events = consumer.take(); int size = events.size(); - if(size == 0) { + if (size == 0) { error = false; return Status.BACKOFF; } @@ -275,28 +262,28 @@ public class JMSSource extends AbstractPollableSource { logger.warn("Error appending event to channel. " + "Channel might be full. Consider increasing the channel " + "capacity or make sure the sinks perform faster.", channelException); - } catch(JMSException jmsException) { + } catch (JMSException jmsException) { logger.warn("JMSException consuming events", jmsException); - if(++jmsExceptionCounter > errorThreshold) { - if(consumer != null) { + if (++jmsExceptionCounter > errorThreshold) { + if (consumer != null) { logger.warn("Exceeded JMSException threshold, closing consumer"); consumer.rollback(); consumer.close(); consumer = null; } } - } catch(Throwable throwable) { + } catch (Throwable throwable) { logger.error("Unexpected error processing events", throwable); - if(throwable instanceof Error) { + if (throwable instanceof Error) { throw (Error) throwable; } } finally { - if(error) { - if(consumer != null) { + if (error) { + if (consumer != null) { consumer.rollback(); } } else { - if(consumer != null) { + if (consumer != null) { consumer.commit(); jmsExceptionCounter = 0; } @@ -304,6 +291,7 @@ public class JMSSource extends AbstractPollableSource { } return Status.BACKOFF; } + @Override protected synchronized void doStart() { try { @@ -317,18 +305,18 @@ public class JMSSource extends AbstractPollableSource { @Override protected synchronized void doStop() { - if(consumer != null) { + if (consumer != null) { consumer.close(); consumer = null; } sourceCounter.stop(); } + private JMSMessageConsumer createConsumer() throws JMSException { logger.info("Creating new consumer for " + destinationName); JMSMessageConsumer consumer = consumerFactory.create(initialContext, - connectionFactory, destinationName, destinationType, destinationLocator, - messageSelector, batchSize, - pollTimeout, converter, userName, password); + connectionFactory, destinationName, destinationType, destinationLocator, + messageSelector, batchSize, pollTimeout, converter, userName, password); jmsExceptionCounter = 0; return consumer; } http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSource.java ---------------------------------------------------------------------- diff --git a/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSource.java b/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSource.java index 84fef52..90e4715 100644 --- a/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSource.java +++ b/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSource.java @@ -112,18 +112,24 @@ public class KafkaSource extends AbstractPollableSource */ public abstract class Subscriber<T> { public abstract void subscribe(KafkaConsumer<?, ?> consumer, SourceRebalanceListener listener); - public T get() {return null;} + + public T get() { + return null; + } } private class TopicListSubscriber extends Subscriber<List<String>> { private List<String> topicList; + public TopicListSubscriber(String commaSeparatedTopics) { this.topicList = Arrays.asList(commaSeparatedTopics.split("^\\s+|\\s*,\\s*|\\s+$")); } + @Override public void subscribe(KafkaConsumer<?, ?> consumer, SourceRebalanceListener listener) { consumer.subscribe(topicList, listener); } + @Override public List<String> get() { return topicList; @@ -132,13 +138,16 @@ public class KafkaSource extends AbstractPollableSource private class PatternSubscriber extends Subscriber<Pattern> { private Pattern pattern; + public PatternSubscriber(String regex) { this.pattern = Pattern.compile(regex); } + @Override public void subscribe(KafkaConsumer<?, ?> consumer, SourceRebalanceListener listener) { consumer.subscribe(pattern, listener); } + @Override public Pattern get() { return pattern; @@ -232,10 +241,11 @@ public class KafkaSource extends AbstractPollableSource } if (log.isDebugEnabled()) { - log.debug("Topic: {} Partition: {} Message: {}", new String[]{ - message.topic(), - String.valueOf(message.partition()), - new String(eventBody)}); + log.debug("Topic: {} Partition: {} Message: {}", new String[] { + message.topic(), + String.valueOf(message.partition()), + new String(eventBody) + }); } event = EventBuilder.withBody(eventBody, headers); @@ -305,21 +315,21 @@ public class KafkaSource extends AbstractPollableSource if (topicProperty != null && !topicProperty.isEmpty()) { // create subscriber that uses pattern-based subscription subscriber = new PatternSubscriber(topicProperty); - } else - if((topicProperty = context.getString(KafkaSourceConstants.TOPICS)) != null && !topicProperty.isEmpty()) { + } else if ((topicProperty = context.getString(KafkaSourceConstants.TOPICS)) != null && + !topicProperty.isEmpty()) { // create subscriber that uses topic list subscription subscriber = new TopicListSubscriber(topicProperty); - } else - if (subscriber == null) { + } else if (subscriber == null) { throw new ConfigurationException("At least one Kafka topic must be specified."); } batchUpperLimit = context.getInteger(KafkaSourceConstants.BATCH_SIZE, - KafkaSourceConstants.DEFAULT_BATCH_SIZE); + KafkaSourceConstants.DEFAULT_BATCH_SIZE); maxBatchDurationMillis = context.getInteger(KafkaSourceConstants.BATCH_DURATION_MS, - KafkaSourceConstants.DEFAULT_BATCH_DURATION); + KafkaSourceConstants.DEFAULT_BATCH_DURATION); - useAvroEventFormat = context.getBoolean(KafkaSourceConstants.AVRO_EVENT, KafkaSourceConstants.DEFAULT_AVRO_EVENT); + useAvroEventFormat = context.getBoolean(KafkaSourceConstants.AVRO_EVENT, + KafkaSourceConstants.DEFAULT_AVRO_EVENT); if (log.isDebugEnabled()) { log.debug(KafkaSourceConstants.AVRO_EVENT + " set to: {}", useAvroEventFormat); @@ -337,7 +347,6 @@ public class KafkaSource extends AbstractPollableSource } } - // We can remove this once the properties are officially deprecated private void translateOldProperties(Context ctx) { // topic @@ -358,16 +367,18 @@ public class KafkaSource extends AbstractPollableSource } } - private void setConsumerProps(Context ctx, String bootStrapServers) { - String groupId = ctx.getString(KafkaSourceConstants.KAFKA_CONSUMER_PREFIX + ConsumerConfig.GROUP_ID_CONFIG); + String groupId = ctx.getString( + KafkaSourceConstants.KAFKA_CONSUMER_PREFIX + ConsumerConfig.GROUP_ID_CONFIG); if ((groupId == null || groupId.isEmpty()) && - kafkaProps.getProperty(ConsumerConfig.GROUP_ID_CONFIG) == null) { - groupId = KafkaSourceConstants.DEFAULT_GROUP_ID; - log.info("Group ID was not specified. Using " + groupId + " as the group id."); + kafkaProps.getProperty(ConsumerConfig.GROUP_ID_CONFIG) == null) { + groupId = KafkaSourceConstants.DEFAULT_GROUP_ID; + log.info("Group ID was not specified. Using " + groupId + " as the group id."); } - kafkaProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, KafkaSourceConstants.DEFAULT_KEY_DESERIALIZER); - kafkaProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaSourceConstants.DEFAULT_VALUE_DESERIALIZER); + kafkaProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, + KafkaSourceConstants.DEFAULT_KEY_DESERIALIZER); + kafkaProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, + KafkaSourceConstants.DEFAULT_VALUE_DESERIALIZER); //Defaults overridden based on config kafkaProps.putAll(ctx.getSubProperties(KafkaSourceConstants.KAFKA_CONSUMER_PREFIX)); //These always take precedence over config @@ -375,7 +386,8 @@ public class KafkaSource extends AbstractPollableSource if (groupId != null) { kafkaProps.put(ConsumerConfig.GROUP_ID_CONFIG, groupId); } - kafkaProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, KafkaSourceConstants.DEFAULT_AUTO_COMMIT); + kafkaProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, + KafkaSourceConstants.DEFAULT_AUTO_COMMIT); log.info(kafkaProps.toString()); } @@ -426,7 +438,6 @@ public class KafkaSource extends AbstractPollableSource } } - class SourceRebalanceListener implements ConsumerRebalanceListener { private static final Logger log = LoggerFactory.getLogger(SourceRebalanceListener.class); private AtomicBoolean rebalanceFlag; http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSourceConstants.java ---------------------------------------------------------------------- diff --git a/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSourceConstants.java b/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSourceConstants.java index 9f20f61..1f255f9 100644 --- a/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSourceConstants.java +++ b/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSourceConstants.java @@ -22,9 +22,12 @@ public class KafkaSourceConstants { public static final String KAFKA_PREFIX = "kafka."; public static final String KAFKA_CONSUMER_PREFIX = KAFKA_PREFIX + "consumer."; - public static final String DEFAULT_KEY_DESERIALIZER = "org.apache.kafka.common.serialization.StringDeserializer"; - public static final String DEFAULT_VALUE_DESERIALIZER = "org.apache.kafka.common.serialization.ByteArrayDeserializer"; - public static final String BOOTSTRAP_SERVERS = KAFKA_PREFIX + CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG; + public static final String DEFAULT_KEY_DESERIALIZER = + "org.apache.kafka.common.serialization.StringDeserializer"; + public static final String DEFAULT_VALUE_DESERIALIZER = + "org.apache.kafka.common.serialization.ByteArrayDeserializer"; + public static final String BOOTSTRAP_SERVERS = + KAFKA_PREFIX + CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG; public static final String TOPICS = KAFKA_PREFIX + "topics"; public static final String TOPICS_REGEX = TOPICS + "." + "regex"; public static final String DEFAULT_AUTO_COMMIT = "false"; http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/ReliableTaildirEventReader.java ---------------------------------------------------------------------- diff --git a/flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/ReliableTaildirEventReader.java b/flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/ReliableTaildirEventReader.java index 8128df4..1409f25 100644 --- a/flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/ReliableTaildirEventReader.java +++ b/flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/ReliableTaildirEventReader.java @@ -19,6 +19,20 @@ package org.apache.flume.source.taildir; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import com.google.common.collect.Table; +import com.google.gson.stream.JsonReader; +import org.apache.flume.Event; +import org.apache.flume.FlumeException; +import org.apache.flume.annotations.InterfaceAudience; +import org.apache.flume.annotations.InterfaceStability; +import org.apache.flume.client.avro.ReliableEventReader; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import java.io.File; import java.io.FileNotFoundException; import java.io.FileReader; @@ -29,21 +43,6 @@ import java.util.List; import java.util.Map; import java.util.Map.Entry; -import org.apache.flume.Event; -import org.apache.flume.FlumeException; -import org.apache.flume.annotations.InterfaceAudience; -import org.apache.flume.annotations.InterfaceStability; -import org.apache.flume.client.avro.ReliableEventReader; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Preconditions; -import com.google.common.collect.Lists; -import com.google.common.collect.Maps; -import com.google.common.collect.Table; -import com.google.gson.stream.JsonReader; - @InterfaceAudience.Private @InterfaceStability.Evolving public class ReliableTaildirEventReader implements ReliableEventReader { @@ -111,15 +110,15 @@ public class ReliableTaildirEventReader implements ReliableEventReader { jr.beginObject(); while (jr.hasNext()) { switch (jr.nextName()) { - case "inode": - inode = jr.nextLong(); - break; - case "pos": - pos = jr.nextLong(); - break; - case "file": - path = jr.nextString(); - break; + case "inode": + inode = jr.nextLong(); + break; + case "pos": + pos = jr.nextLong(); + break; + case "file": + path = jr.nextString(); + break; } } jr.endObject(); @@ -238,7 +237,7 @@ public class ReliableTaildirEventReader implements ReliableEventReader { if (tf == null || !tf.getPath().equals(f.getAbsolutePath())) { long startPos = skipToEnd ? f.length() : 0; tf = openFile(f, headers, inode, startPos); - } else{ + } else { boolean updated = tf.getLastUpdated() < f.lastModified(); if (updated) { if (tf.getRaf() == null) { @@ -320,7 +319,8 @@ public class ReliableTaildirEventReader implements ReliableEventReader { } public ReliableTaildirEventReader build() throws IOException { - return new ReliableTaildirEventReader(filePaths, headerTable, positionFilePath, skipToEnd, addByteOffset, cachePatternMatching); + return new ReliableTaildirEventReader(filePaths, headerTable, positionFilePath, skipToEnd, + addByteOffset, cachePatternMatching); } }
