http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSEventSink.java ---------------------------------------------------------------------- diff --git a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSEventSink.java b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSEventSink.java index 9a48841..741f01e 100644 --- a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSEventSink.java +++ b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSEventSink.java @@ -63,8 +63,7 @@ public class HDFSEventSink extends AbstractSink implements Configurable { public void run(String filePath); } - private static final Logger LOG = LoggerFactory - .getLogger(HDFSEventSink.class); + private static final Logger LOG = LoggerFactory.getLogger(HDFSEventSink.class); private static String DIRECTORY_DELIMITER = System.getProperty("file.separator"); @@ -98,7 +97,6 @@ public class HDFSEventSink extends AbstractSink implements Configurable { private static final int defaultThreadPoolSize = 10; private static final int defaultRollTimerPoolSize = 1; - private final HDFSWriterFactory writerFactory; private WriterLinkedHashMap sfWriters; @@ -217,23 +215,21 @@ public class HDFSEventSink extends AbstractSink implements Configurable { String kerbKeytab = context.getString("hdfs.kerberosKeytab"); String proxyUser = context.getString("hdfs.proxyUser"); tryCount = context.getInteger("hdfs.closeTries", defaultTryCount); - if(tryCount <= 0) { + if (tryCount <= 0) { LOG.warn("Retry count value : " + tryCount + " is not " + - "valid. The sink will try to close the file until the file " + - "is eventually closed."); + "valid. The sink will try to close the file until the file " + + "is eventually closed."); tryCount = defaultTryCount; } - retryInterval = context.getLong("hdfs.retryInterval", - defaultRetryInterval); - if(retryInterval <= 0) { + retryInterval = context.getLong("hdfs.retryInterval", defaultRetryInterval); + if (retryInterval <= 0) { LOG.warn("Retry Interval value: " + retryInterval + " is not " + - "valid. If the first close of a file fails, " + - "it may remain open and will not be renamed."); + "valid. If the first close of a file fails, " + + "it may remain open and will not be renamed."); tryCount = 1; } - Preconditions.checkArgument(batchSize > 0, - "batchSize must be greater than 0"); + Preconditions.checkArgument(batchSize > 0, "batchSize must be greater than 0"); if (codecName == null) { codeC = null; compType = CompressionType.NONE; @@ -245,14 +241,13 @@ public class HDFSEventSink extends AbstractSink implements Configurable { // Do not allow user to set fileType DataStream with codeC together // To prevent output file with compress extension (like .snappy) - if(fileType.equalsIgnoreCase(HDFSWriterFactory.DataStreamType) - && codecName != null) { + if (fileType.equalsIgnoreCase(HDFSWriterFactory.DataStreamType) && codecName != null) { throw new IllegalArgumentException("fileType: " + fileType + " which does NOT support compressed output. Please don't set codeC" + " or change the fileType if compressed output is desired."); } - if(fileType.equalsIgnoreCase(HDFSWriterFactory.CompStreamType)) { + if (fileType.equalsIgnoreCase(HDFSWriterFactory.CompStreamType)) { Preconditions.checkNotNull(codeC, "It's essential to set compress codec" + " when fileType is: " + fileType); } @@ -261,18 +256,15 @@ public class HDFSEventSink extends AbstractSink implements Configurable { this.privExecutor = FlumeAuthenticationUtil.getAuthenticator( kerbConfPrincipal, kerbKeytab).proxyAs(proxyUser); - - - needRounding = context.getBoolean("hdfs.round", false); - if(needRounding) { + if (needRounding) { String unit = context.getString("hdfs.roundUnit", "second"); if (unit.equalsIgnoreCase("hour")) { this.roundUnit = Calendar.HOUR_OF_DAY; } else if (unit.equalsIgnoreCase("minute")) { this.roundUnit = Calendar.MINUTE; - } else if (unit.equalsIgnoreCase("second")){ + } else if (unit.equalsIgnoreCase("second")) { this.roundUnit = Calendar.SECOND; } else { LOG.warn("Rounding unit is not valid, please set one of" + @@ -280,11 +272,11 @@ public class HDFSEventSink extends AbstractSink implements Configurable { needRounding = false; } this.roundValue = context.getInteger("hdfs.roundValue", 1); - if(roundUnit == Calendar.SECOND || roundUnit == Calendar.MINUTE){ + if (roundUnit == Calendar.SECOND || roundUnit == Calendar.MINUTE) { Preconditions.checkArgument(roundValue > 0 && roundValue <= 60, "Round value" + "must be > 0 and <= 60"); - } else if (roundUnit == Calendar.HOUR_OF_DAY){ + } else if (roundUnit == Calendar.HOUR_OF_DAY) { Preconditions.checkArgument(roundValue > 0 && roundValue <= 24, "Round value" + "must be > 0 and <= 24"); @@ -292,7 +284,7 @@ public class HDFSEventSink extends AbstractSink implements Configurable { } this.useLocalTime = context.getBoolean("hdfs.useLocalTimeStamp", false); - if(useLocalTime) { + if (useLocalTime) { clock = new SystemClock(); } @@ -301,16 +293,13 @@ public class HDFSEventSink extends AbstractSink implements Configurable { } } - private static boolean codecMatches(Class<? extends CompressionCodec> cls, - String codecName) { + private static boolean codecMatches(Class<? extends CompressionCodec> cls, String codecName) { String simpleName = cls.getSimpleName(); - if (cls.getName().equals(codecName) - || simpleName.equalsIgnoreCase(codecName)) { + if (cls.getName().equals(codecName) || simpleName.equalsIgnoreCase(codecName)) { return true; } if (simpleName.endsWith("Codec")) { - String prefix = simpleName.substring(0, - simpleName.length() - "Codec".length()); + String prefix = simpleName.substring(0, simpleName.length() - "Codec".length()); if (prefix.equalsIgnoreCase(codecName)) { return true; } @@ -321,8 +310,7 @@ public class HDFSEventSink extends AbstractSink implements Configurable { @VisibleForTesting static CompressionCodec getCodec(String codecName) { Configuration conf = new Configuration(); - List<Class<? extends CompressionCodec>> codecs = CompressionCodecFactory - .getCodecClasses(conf); + List<Class<? extends CompressionCodec>> codecs = CompressionCodecFactory.getCodecClasses(conf); // Wish we could base this on DefaultCodec but appears not all codec's // extend DefaultCodec(Lzo) CompressionCodec codec = null; @@ -380,7 +368,7 @@ public class HDFSEventSink extends AbstractSink implements Configurable { String realPath = BucketPath.escapeString(filePath, event.getHeaders(), timeZone, needRounding, roundUnit, roundValue, useLocalTime); String realName = BucketPath.escapeString(fileName, event.getHeaders(), - timeZone, needRounding, roundUnit, roundValue, useLocalTime); + timeZone, needRounding, roundUnit, roundValue, useLocalTime); String lookupPath = realPath + DIRECTORY_DELIMITER + realName; BucketWriter bucketWriter; @@ -418,7 +406,7 @@ public class HDFSEventSink extends AbstractSink implements Configurable { bucketWriter.append(event); } catch (BucketClosedException ex) { LOG.info("Bucket was closed while trying to append, " + - "reinitializing bucket and writing event."); + "reinitializing bucket and writing event."); hdfsWriter = writerFactory.getWriter(fileType); bucketWriter = initializeBucketWriter(realPath, realName, lookupPath, hdfsWriter, closeCallback); @@ -468,16 +456,16 @@ public class HDFSEventSink extends AbstractSink implements Configurable { } private BucketWriter initializeBucketWriter(String realPath, - String realName, String lookupPath, HDFSWriter hdfsWriter, - WriterCallback closeCallback) { + String realName, String lookupPath, HDFSWriter hdfsWriter, + WriterCallback closeCallback) { BucketWriter bucketWriter = new BucketWriter(rollInterval, - rollSize, rollCount, - batchSize, context, realPath, realName, inUsePrefix, inUseSuffix, - suffix, codeC, compType, hdfsWriter, timedRollerPool, - privExecutor, sinkCounter, idleTimeout, closeCallback, - lookupPath, callTimeout, callTimeoutPool, retryInterval, - tryCount); - if(mockFs != null) { + rollSize, rollCount, + batchSize, context, realPath, realName, inUsePrefix, inUseSuffix, + suffix, codeC, compType, hdfsWriter, timedRollerPool, + privExecutor, sinkCounter, idleTimeout, closeCallback, + lookupPath, callTimeout, callTimeoutPool, retryInterval, + tryCount); + if (mockFs != null) { bucketWriter.setFileSystem(mockFs); bucketWriter.setMockStream(mockWriter); } @@ -504,7 +492,7 @@ public class HDFSEventSink extends AbstractSink implements Configurable { } // shut down all our thread pools - ExecutorService toShutdown[] = {callTimeoutPool, timedRollerPool}; + ExecutorService[] toShutdown = { callTimeoutPool, timedRollerPool }; for (ExecutorService execService : toShutdown) { execService.shutdown(); try {
http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSSequenceFile.java ---------------------------------------------------------------------- diff --git a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSSequenceFile.java b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSSequenceFile.java index a261cce..ba8b30d 100644 --- a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSSequenceFile.java +++ b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSSequenceFile.java @@ -81,16 +81,15 @@ public class HDFSSequenceFile extends AbstractHDFSWriter { protected void open(Path dstPath, CompressionCodec codeC, CompressionType compType, Configuration conf, FileSystem hdfs) throws IOException { - if(useRawLocalFileSystem) { - if(hdfs instanceof LocalFileSystem) { + if (useRawLocalFileSystem) { + if (hdfs instanceof LocalFileSystem) { hdfs = ((LocalFileSystem)hdfs).getRaw(); } else { logger.warn("useRawLocalFileSystem is set to true but file system " + "is not of type LocalFileSystem: " + hdfs.getClass().getName()); } } - if (conf.getBoolean("hdfs.append.support", false) == true && hdfs.isFile - (dstPath)) { + if (conf.getBoolean("hdfs.append.support", false) == true && hdfs.isFile(dstPath)) { outStream = hdfs.append(dstPath); } else { outStream = hdfs.create(dstPath); http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/KerberosUser.java ---------------------------------------------------------------------- diff --git a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/KerberosUser.java b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/KerberosUser.java index 516988e..43297e2 100644 --- a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/KerberosUser.java +++ b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/KerberosUser.java @@ -46,7 +46,9 @@ public class KerberosUser { return false; } final KerberosUser other = (KerberosUser) obj; - if ((this.principal == null) ? (other.principal != null) : !this.principal.equals(other.principal)) { + if ((this.principal == null) ? + (other.principal != null) : + !this.principal.equals(other.principal)) { return false; } if ((this.keyTab == null) ? (other.keyTab != null) : !this.keyTab.equals(other.keyTab)) { http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/SequenceFileSerializerType.java ---------------------------------------------------------------------- diff --git a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/SequenceFileSerializerType.java b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/SequenceFileSerializerType.java index 4351488..2ad7689 100644 --- a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/SequenceFileSerializerType.java +++ b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/SequenceFileSerializerType.java @@ -25,8 +25,7 @@ public enum SequenceFileSerializerType { private final Class<? extends SequenceFileSerializer.Builder> builderClass; - SequenceFileSerializerType( - Class<? extends SequenceFileSerializer.Builder> builderClass) { + SequenceFileSerializerType(Class<? extends SequenceFileSerializer.Builder> builderClass) { this.builderClass = builderClass; } http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-sinks/flume-hive-sink/src/main/java/org/apache/flume/sink/hive/HiveDelimitedTextSerializer.java ---------------------------------------------------------------------- diff --git a/flume-ng-sinks/flume-hive-sink/src/main/java/org/apache/flume/sink/hive/HiveDelimitedTextSerializer.java b/flume-ng-sinks/flume-hive-sink/src/main/java/org/apache/flume/sink/hive/HiveDelimitedTextSerializer.java index 4d70aaa..59520e7 100644 --- a/flume-ng-sinks/flume-hive-sink/src/main/java/org/apache/flume/sink/hive/HiveDelimitedTextSerializer.java +++ b/flume-ng-sinks/flume-hive-sink/src/main/java/org/apache/flume/sink/hive/HiveDelimitedTextSerializer.java @@ -18,7 +18,6 @@ package org.apache.flume.sink.hive; - import org.apache.flume.Context; import org.apache.flume.Event; import org.apache.hive.hcatalog.streaming.DelimitedInputWriter; @@ -60,12 +59,11 @@ public class HiveDelimitedTextSerializer implements HiveEventSerializer { @Override public RecordWriter createRecordWriter(HiveEndPoint endPoint) - throws StreamingException, IOException, ClassNotFoundException { + throws StreamingException, IOException, ClassNotFoundException { if (serdeSeparator == null) { return new DelimitedInputWriter(fieldToColMapping, delimiter, endPoint); } - return new DelimitedInputWriter(fieldToColMapping, delimiter, endPoint, null - , serdeSeparator); + return new DelimitedInputWriter(fieldToColMapping, delimiter, endPoint, null, serdeSeparator); } @Override @@ -90,8 +88,8 @@ public class HiveDelimitedTextSerializer implements HiveEventSerializer { return null; } if (delimiter.charAt(0) == '"' && - delimiter.charAt(delimiter.length()-1) == '"') { - return delimiter.substring(1,delimiter.length()-1); + delimiter.charAt(delimiter.length() - 1) == '"') { + return delimiter.substring(1,delimiter.length() - 1); } return delimiter; } @@ -105,9 +103,9 @@ public class HiveDelimitedTextSerializer implements HiveEventSerializer { return separatorStr.charAt(0); } if (separatorStr.length() == 3 && - separatorStr.charAt(2) == '\'' && - separatorStr.charAt(separatorStr.length()-1) == '\'') { - return separatorStr.charAt(1); + separatorStr.charAt(2) == '\'' && + separatorStr.charAt(separatorStr.length() - 1) == '\'') { + return separatorStr.charAt(1); } throw new IllegalArgumentException("serializer.serdeSeparator spec is invalid " + http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-sinks/flume-hive-sink/src/main/java/org/apache/flume/sink/hive/HiveEventSerializer.java ---------------------------------------------------------------------- diff --git a/flume-ng-sinks/flume-hive-sink/src/main/java/org/apache/flume/sink/hive/HiveEventSerializer.java b/flume-ng-sinks/flume-hive-sink/src/main/java/org/apache/flume/sink/hive/HiveEventSerializer.java index 386484c..7ed2c82 100644 --- a/flume-ng-sinks/flume-hive-sink/src/main/java/org/apache/flume/sink/hive/HiveEventSerializer.java +++ b/flume-ng-sinks/flume-hive-sink/src/main/java/org/apache/flume/sink/hive/HiveEventSerializer.java @@ -18,7 +18,6 @@ package org.apache.flume.sink.hive; - import org.apache.flume.Event; import org.apache.flume.conf.Configurable; import org.apache.hive.hcatalog.streaming.HiveEndPoint; http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-sinks/flume-hive-sink/src/main/java/org/apache/flume/sink/hive/HiveSink.java ---------------------------------------------------------------------- diff --git a/flume-ng-sinks/flume-hive-sink/src/main/java/org/apache/flume/sink/hive/HiveSink.java b/flume-ng-sinks/flume-hive-sink/src/main/java/org/apache/flume/sink/hive/HiveSink.java index d93bca3..cc5cdca 100644 --- a/flume-ng-sinks/flume-hive-sink/src/main/java/org/apache/flume/sink/hive/HiveSink.java +++ b/flume-ng-sinks/flume-hive-sink/src/main/java/org/apache/flume/sink/hive/HiveSink.java @@ -32,7 +32,7 @@ import org.apache.flume.conf.Configurable; import org.apache.flume.formatter.output.BucketPath; import org.apache.flume.instrumentation.SinkCounter; import org.apache.flume.sink.AbstractSink; -import org.apache.hive.hcatalog.streaming.*; +import org.apache.hive.hcatalog.streaming.HiveEndPoint; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -52,8 +52,7 @@ import java.util.concurrent.atomic.AtomicBoolean; public class HiveSink extends AbstractSink implements Configurable { - private static final Logger LOG = LoggerFactory - .getLogger(HiveSink.class); + private static final Logger LOG = LoggerFactory.getLogger(HiveSink.class); private static final int DEFAULT_MAXOPENCONNECTIONS = 500; private static final int DEFAULT_TXNSPERBATCH = 100; @@ -62,7 +61,6 @@ public class HiveSink extends AbstractSink implements Configurable { private static final int DEFAULT_IDLETIMEOUT = 0; private static final int DEFAULT_HEARTBEATINTERVAL = 240; // seconds - private Map<HiveEndPoint, HiveWriter> allWriters; private SinkCounter sinkCounter; @@ -162,7 +160,8 @@ public class HiveSink extends AbstractSink implements Configurable { + DEFAULT_HEARTBEATINTERVAL); heartBeatInterval = DEFAULT_HEARTBEATINTERVAL; } - maxOpenConnections = context.getInteger(Config.MAX_OPEN_CONNECTIONS, DEFAULT_MAXOPENCONNECTIONS); + maxOpenConnections = context.getInteger(Config.MAX_OPEN_CONNECTIONS, + DEFAULT_MAXOPENCONNECTIONS); autoCreatePartitions = context.getBoolean("autoCreatePartitions", true); // Timestamp processing @@ -177,7 +176,7 @@ public class HiveSink extends AbstractSink implements Configurable { this.roundUnit = Calendar.HOUR_OF_DAY; } else if (unit.equalsIgnoreCase(Config.MINUTE)) { this.roundUnit = Calendar.MINUTE; - } else if (unit.equalsIgnoreCase(Config.SECOND)){ + } else if (unit.equalsIgnoreCase(Config.SECOND)) { this.roundUnit = Calendar.SECOND; } else { LOG.warn(getName() + ". Rounding unit is not valid, please set one of " + @@ -185,10 +184,10 @@ public class HiveSink extends AbstractSink implements Configurable { needRounding = false; } this.roundValue = context.getInteger(Config.ROUND_VALUE, 1); - if (roundUnit == Calendar.SECOND || roundUnit == Calendar.MINUTE){ + if (roundUnit == Calendar.SECOND || roundUnit == Calendar.MINUTE) { Preconditions.checkArgument(roundValue > 0 && roundValue <= 60, "Round value must be > 0 and <= 60"); - } else if (roundUnit == Calendar.HOUR_OF_DAY){ + } else if (roundUnit == Calendar.HOUR_OF_DAY) { Preconditions.checkArgument(roundValue > 0 && roundValue <= 24, "Round value must be > 0 and <= 24"); } @@ -215,8 +214,8 @@ public class HiveSink extends AbstractSink implements Configurable { return sinkCounter; } private HiveEventSerializer createSerializer(String serializerName) { - if(serializerName.compareToIgnoreCase(HiveDelimitedTextSerializer.ALIAS) == 0 || - serializerName.compareTo(HiveDelimitedTextSerializer.class.getName()) == 0) { + if (serializerName.compareToIgnoreCase(HiveDelimitedTextSerializer.ALIAS) == 0 || + serializerName.compareTo(HiveDelimitedTextSerializer.class.getName()) == 0) { return new HiveDelimitedTextSerializer(); } else if (serializerName.compareToIgnoreCase(HiveJsonSerializer.ALIAS) == 0 || serializerName.compareTo(HiveJsonSerializer.class.getName()) == 0) { @@ -345,7 +344,7 @@ public class HiveSink extends AbstractSink implements Configurable { callTimeout, callTimeoutPool, proxyUser, serializer, sinkCounter); sinkCounter.incrementConnectionCreatedCount(); - if (allWriters.size() > maxOpenConnections){ + if (allWriters.size() > maxOpenConnections) { int retired = closeIdleWriters(); if (retired == 0) { closeEldestWriter(); @@ -353,8 +352,7 @@ public class HiveSink extends AbstractSink implements Configurable { } allWriters.put(endPoint, writer); activeWriters.put(endPoint, writer); - } - else { + } else { if (activeWriters.get(endPoint) == null) { activeWriters.put(endPoint,writer); } @@ -425,7 +423,7 @@ public class HiveSink extends AbstractSink implements Configurable { } } //2) Retire them - for(HiveEndPoint ep : retirees) { + for (HiveEndPoint ep : retirees) { sinkCounter.incrementConnectionClosedCount(); LOG.info(getName() + ": Closing idle Writer to Hive end point : {}", ep); allWriters.remove(ep).close(); @@ -440,7 +438,7 @@ public class HiveSink extends AbstractSink implements Configurable { private void closeAllWriters() throws InterruptedException { //1) Retire writers for (Entry<HiveEndPoint,HiveWriter> entry : allWriters.entrySet()) { - entry.getValue().close(); + entry.getValue().close(); } //2) Clear cache @@ -453,7 +451,7 @@ public class HiveSink extends AbstractSink implements Configurable { */ private void abortAllWriters() throws InterruptedException { for (Entry<HiveEndPoint,HiveWriter> entry : allWriters.entrySet()) { - entry.getValue().abort(); + entry.getValue().abort(); } } http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-sinks/flume-hive-sink/src/main/java/org/apache/flume/sink/hive/HiveWriter.java ---------------------------------------------------------------------- diff --git a/flume-ng-sinks/flume-hive-sink/src/main/java/org/apache/flume/sink/hive/HiveWriter.java b/flume-ng-sinks/flume-hive-sink/src/main/java/org/apache/flume/sink/hive/HiveWriter.java index ec30c98..7106696 100644 --- a/flume-ng-sinks/flume-hive-sink/src/main/java/org/apache/flume/sink/hive/HiveWriter.java +++ b/flume-ng-sinks/flume-hive-sink/src/main/java/org/apache/flume/sink/hive/HiveWriter.java @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -18,6 +18,18 @@ package org.apache.flume.sink.hive; +import org.apache.flume.Event; +import org.apache.flume.instrumentation.SinkCounter; +import org.apache.hive.hcatalog.streaming.HiveEndPoint; +import org.apache.hive.hcatalog.streaming.RecordWriter; +import org.apache.hive.hcatalog.streaming.SerializationError; +import org.apache.hive.hcatalog.streaming.StreamingConnection; +import org.apache.hive.hcatalog.streaming.StreamingException; +import org.apache.hive.hcatalog.streaming.StreamingIOFailure; +import org.apache.hive.hcatalog.streaming.TransactionBatch; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import java.io.IOException; import java.util.ArrayList; import java.util.concurrent.Callable; @@ -27,24 +39,12 @@ import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; -import org.apache.hive.hcatalog.streaming.*; - -import org.apache.flume.Event; - -import org.apache.flume.instrumentation.SinkCounter; - - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - - /** * Internal API intended for HiveSink use. */ class HiveWriter { - private static final Logger LOG = LoggerFactory - .getLogger(HiveWriter.class); + private static final Logger LOG = LoggerFactory.getLogger(HiveWriter.class); private final HiveEndPoint endPoint; private HiveEventSerializer serializer; @@ -76,7 +76,7 @@ class HiveWriter { boolean autoCreatePartitions, long callTimeout, ExecutorService callTimeoutPool, String hiveUser, HiveEventSerializer serializer, SinkCounter sinkCounter) - throws ConnectException, InterruptedException { + throws ConnectException, InterruptedException { try { this.autoCreatePartitions = autoCreatePartitions; this.sinkCounter = sinkCounter; @@ -130,13 +130,13 @@ class HiveWriter { * @throws InterruptedException */ public synchronized void write(final Event event) - throws WriteException, InterruptedException { + throws WriteException, InterruptedException { if (closed) { throw new IllegalStateException("Writer closed. Cannot write to : " + endPoint); } batch.add(event); - if(batch.size()== writeBatchSz) { + if (batch.size() == writeBatchSz) { // write the event writeEventBatchToSerializer(); } @@ -147,7 +147,7 @@ class HiveWriter { } private void writeEventBatchToSerializer() - throws InterruptedException, WriteException { + throws InterruptedException, WriteException { try { timedCall(new CallRunner1<Void>() { @Override @@ -180,15 +180,15 @@ class HiveWriter { * new TxnBatch if current Txn batch is exhausted */ public void flush(boolean rollToNext) - throws CommitException, TxnBatchException, TxnFailure, InterruptedException, - WriteException { - if(!batch.isEmpty()) { + throws CommitException, TxnBatchException, TxnFailure, InterruptedException, + WriteException { + if (!batch.isEmpty()) { writeEventBatchToSerializer(); batch.clear(); } //0 Heart beat on TxnBatch - if(hearbeatNeeded) { + if (hearbeatNeeded) { hearbeatNeeded = false; heartBeat(); } @@ -197,16 +197,16 @@ class HiveWriter { try { //1 commit txn & close batch if needed commitTxn(); - if(txnBatch.remainingTransactions() == 0) { + if (txnBatch.remainingTransactions() == 0) { closeTxnBatch(); txnBatch = null; - if(rollToNext) { + if (rollToNext) { txnBatch = nextTxnBatch(recordWriter); } } //2 roll to next Txn - if(rollToNext) { + if (rollToNext) { LOG.debug("Switching to next Txn for {}", endPoint); txnBatch.beginNextTransaction(); // does not block } @@ -219,7 +219,7 @@ class HiveWriter { * Aborts the current Txn * @throws InterruptedException */ - public void abort() throws InterruptedException { + public void abort() throws InterruptedException { batch.clear(); abortTxn(); } @@ -227,7 +227,7 @@ class HiveWriter { /** Queues up a heartbeat request on the current and remaining txns using the * heartbeatThdPool and returns immediately */ - public void heartBeat() throws InterruptedException { + public void heartBeat() throws InterruptedException { // 1) schedule the heartbeat on one thread in pool try { timedCall(new CallRunner1<Void>() { @@ -261,43 +261,43 @@ class HiveWriter { private void abortRemainingTxns() throws InterruptedException { - try { - if ( !isClosed(txnBatch.getCurrentTransactionState()) ) { - abortCurrTxnHelper(); - } + try { + if (!isClosed(txnBatch.getCurrentTransactionState())) { + abortCurrTxnHelper(); + } - // recursively abort remaining txns - if(txnBatch.remainingTransactions()>0) { - timedCall( - new CallRunner1<Void>() { - @Override - public Void call() throws StreamingException, InterruptedException { - txnBatch.beginNextTransaction(); - return null; - } - }); - abortRemainingTxns(); - } - } catch (StreamingException e) { - LOG.warn("Error when aborting remaining transactions in batch " + txnBatch, e); - return; - } catch (TimeoutException e) { - LOG.warn("Timed out when aborting remaining transactions in batch " + txnBatch, e); - return; + // recursively abort remaining txns + if (txnBatch.remainingTransactions() > 0) { + timedCall( + new CallRunner1<Void>() { + @Override + public Void call() throws StreamingException, InterruptedException { + txnBatch.beginNextTransaction(); + return null; + } + }); + abortRemainingTxns(); } + } catch (StreamingException e) { + LOG.warn("Error when aborting remaining transactions in batch " + txnBatch, e); + return; + } catch (TimeoutException e) { + LOG.warn("Timed out when aborting remaining transactions in batch " + txnBatch, e); + return; + } } private void abortCurrTxnHelper() throws TimeoutException, InterruptedException { try { timedCall( - new CallRunner1<Void>() { - @Override - public Void call() throws StreamingException, InterruptedException { - txnBatch.abort(); - LOG.info("Aborted txn " + txnBatch.getCurrentTxnId()); - return null; - } - } + new CallRunner1<Void>() { + @Override + public Void call() throws StreamingException, InterruptedException { + txnBatch.abort(); + LOG.info("Aborted txn " + txnBatch.getCurrentTxnId()); + return null; + } + } ); } catch (StreamingException e) { LOG.warn("Unable to abort transaction " + txnBatch.getCurrentTxnId(), e); @@ -306,10 +306,12 @@ class HiveWriter { } private boolean isClosed(TransactionBatch.TxnState txnState) { - if(txnState == TransactionBatch.TxnState.COMMITTED) + if (txnState == TransactionBatch.TxnState.COMMITTED) { return true; - if(txnState == TransactionBatch.TxnState.ABORTED) + } + if (txnState == TransactionBatch.TxnState.ABORTED) { return true; + } return false; } @@ -360,7 +362,8 @@ class HiveWriter { } catch (InterruptedException e) { throw e; } catch (TimeoutException e) { - LOG.warn("Timeout while aborting Txn " + txnBatch.getCurrentTxnId() + " on EndPoint: " + endPoint, e); + LOG.warn("Timeout while aborting Txn " + txnBatch.getCurrentTxnId() + + " on EndPoint: " + endPoint, e); } catch (Exception e) { LOG.warn("Error aborting Txn " + txnBatch.getCurrentTxnId() + " on EndPoint: " + endPoint, e); // Suppressing exceptions as we don't care for errors on abort @@ -368,9 +371,9 @@ class HiveWriter { } private StreamingConnection newConnection(final String proxyUser) - throws InterruptedException, ConnectException { + throws InterruptedException, ConnectException { try { - return timedCall(new CallRunner1<StreamingConnection>() { + return timedCall(new CallRunner1<StreamingConnection>() { @Override public StreamingConnection call() throws InterruptedException, StreamingException { return endPoint.newConnection(autoCreatePartitions); // could block @@ -382,7 +385,7 @@ class HiveWriter { } private TransactionBatch nextTxnBatch(final RecordWriter recordWriter) - throws InterruptedException, TxnBatchException { + throws InterruptedException, TxnBatchException { LOG.debug("Fetching new Txn Batch for {}", endPoint); TransactionBatch batch = null; try { @@ -418,7 +421,7 @@ class HiveWriter { } private <T> T timedCall(final CallRunner1<T> callRunner) - throws TimeoutException, InterruptedException, StreamingException { + throws TimeoutException, InterruptedException, StreamingException { Future<T> future = callTimeoutPool.submit(new Callable<T>() { @Override public T call() throws StreamingException, InterruptedException, Failure { @@ -439,7 +442,7 @@ class HiveWriter { } catch (ExecutionException e1) { sinkCounter.incrementConnectionFailedCount(); Throwable cause = e1.getCause(); - if (cause instanceof IOException ) { + if (cause instanceof IOException) { throw new StreamingException("I/O Failure", (IOException) cause); } else if (cause instanceof StreamingException) { throw (StreamingException) cause; @@ -468,12 +471,10 @@ class HiveWriter { T call() throws Exception; } - private interface CallRunner1<T> { T call() throws StreamingException, InterruptedException, Failure; } - public static class Failure extends Exception { public Failure(String msg, Throwable cause) { super(msg, cause); @@ -488,7 +489,7 @@ class HiveWriter { public static class CommitException extends Failure { public CommitException(HiveEndPoint endPoint, Long txnID, Throwable cause) { - super("Commit of Txn " + txnID + " failed on EndPoint: " + endPoint, cause); + super("Commit of Txn " + txnID + " failed on EndPoint: " + endPoint, cause); } } http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-sinks/flume-irc-sink/src/main/java/org/apache/flume/sink/irc/IRCSink.java ---------------------------------------------------------------------- diff --git a/flume-ng-sinks/flume-irc-sink/src/main/java/org/apache/flume/sink/irc/IRCSink.java b/flume-ng-sinks/flume-irc-sink/src/main/java/org/apache/flume/sink/irc/IRCSink.java index 40657b4..52bbfc8 100644 --- a/flume-ng-sinks/flume-irc-sink/src/main/java/org/apache/flume/sink/irc/IRCSink.java +++ b/flume-ng-sinks/flume-irc-sink/src/main/java/org/apache/flume/sink/irc/IRCSink.java @@ -60,7 +60,7 @@ public class IRCSink extends AbstractSink implements Configurable { private CounterGroup counterGroup; - static public class IRCConnectionListener implements IRCEventListener { + public static class IRCConnectionListener implements IRCEventListener { public void onRegistered() { } @@ -214,7 +214,7 @@ public class IRCSink extends AbstractSink implements Configurable { if (splitLines) { String[] lines = body.split(splitChars); - for(String line: lines) { + for (String line: lines) { connection.doPrivmsg(IRC_CHANNEL_PREFIX + this.chan, line); } } else { http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/AbstractElasticSearchIndexRequestBuilderFactory.java ---------------------------------------------------------------------- diff --git a/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/AbstractElasticSearchIndexRequestBuilderFactory.java b/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/AbstractElasticSearchIndexRequestBuilderFactory.java index 9996142..754155c 100644 --- a/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/AbstractElasticSearchIndexRequestBuilderFactory.java +++ b/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/AbstractElasticSearchIndexRequestBuilderFactory.java @@ -49,8 +49,7 @@ public abstract class AbstractElasticSearchIndexRequestBuilderFactory * Constructor for subclasses * @param fastDateFormat {@link FastDateFormat} to use for index names */ - protected AbstractElasticSearchIndexRequestBuilderFactory( - FastDateFormat fastDateFormat) { + protected AbstractElasticSearchIndexRequestBuilderFactory(FastDateFormat fastDateFormat) { this.fastDateFormat = fastDateFormat; } @@ -94,11 +93,11 @@ public abstract class AbstractElasticSearchIndexRequestBuilderFactory /** * Gets the name of the index to use for an index request - * @return index name of the form 'indexPrefix-formattedTimestamp' * @param indexPrefix * Prefix of index name to use -- as configured on the sink * @param timestamp * timestamp (millis) to format / use + * @return index name of the form 'indexPrefix-formattedTimestamp' */ protected String getIndexName(String indexPrefix, long timestamp) { return new StringBuilder(indexPrefix).append('-') http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/ElasticSearchIndexRequestBuilderFactory.java ---------------------------------------------------------------------- diff --git a/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/ElasticSearchIndexRequestBuilderFactory.java b/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/ElasticSearchIndexRequestBuilderFactory.java index 1ca227a..f76308c 100644 --- a/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/ElasticSearchIndexRequestBuilderFactory.java +++ b/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/ElasticSearchIndexRequestBuilderFactory.java @@ -40,7 +40,6 @@ public interface ElasticSearchIndexRequestBuilderFactory extends Configurable, TimeZone.getTimeZone("Etc/UTC")); /** - * @return prepared ElasticSearch {@link IndexRequestBuilder} instance * @param client * ElasticSearch {@link Client} to prepare index from * @param indexPrefix @@ -49,6 +48,7 @@ public interface ElasticSearchIndexRequestBuilderFactory extends Configurable, * Index type to use -- as configured on the sink * @param event * Flume event to serialize and add to index request + * @return prepared ElasticSearch {@link IndexRequestBuilder} instance * @throws IOException * If an error occurs e.g. during serialization */ http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/ElasticSearchSink.java ---------------------------------------------------------------------- diff --git a/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/ElasticSearchSink.java b/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/ElasticSearchSink.java index 1d9dfce..ebafb9f 100644 --- a/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/ElasticSearchSink.java +++ b/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/ElasticSearchSink.java @@ -54,6 +54,7 @@ import com.google.common.base.Throwables; import java.util.concurrent.TimeUnit; import java.util.regex.Matcher; import java.util.regex.Pattern; + import static org.apache.flume.sink.elasticsearch.ElasticSearchSinkConstants.CLIENT_PREFIX; import static org.apache.flume.sink.elasticsearch.ElasticSearchSinkConstants.CLIENT_TYPE; import static org.apache.flume.sink.elasticsearch.ElasticSearchSinkConstants.DEFAULT_CLIENT_TYPE; http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/EventSerializerIndexRequestBuilderFactory.java ---------------------------------------------------------------------- diff --git a/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/EventSerializerIndexRequestBuilderFactory.java b/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/EventSerializerIndexRequestBuilderFactory.java index c71b2e5..d6cca50 100644 --- a/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/EventSerializerIndexRequestBuilderFactory.java +++ b/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/EventSerializerIndexRequestBuilderFactory.java @@ -33,7 +33,7 @@ import org.elasticsearch.common.io.BytesStream; * {@link ElasticSearchEventSerializer} instance configured on the sink. */ public class EventSerializerIndexRequestBuilderFactory - extends AbstractElasticSearchIndexRequestBuilderFactory { + extends AbstractElasticSearchIndexRequestBuilderFactory { protected final ElasticSearchEventSerializer serializer; http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/client/ElasticSearchClientFactory.java ---------------------------------------------------------------------- diff --git a/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/client/ElasticSearchClientFactory.java b/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/client/ElasticSearchClientFactory.java index 873157a..cb34394 100644 --- a/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/client/ElasticSearchClientFactory.java +++ b/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/client/ElasticSearchClientFactory.java @@ -63,8 +63,10 @@ public class ElasticSearchClientFactory { * * @return Local elastic search instance client */ - public ElasticSearchClient getLocalClient(String clientType, ElasticSearchEventSerializer serializer, - ElasticSearchIndexRequestBuilderFactory indexBuilder) throws NoSuchClientTypeException { + public ElasticSearchClient getLocalClient(String clientType, + ElasticSearchEventSerializer serializer, + ElasticSearchIndexRequestBuilderFactory indexBuilder) + throws NoSuchClientTypeException { if (clientType.equalsIgnoreCase(TransportClient) && serializer != null) { return new ElasticSearchTransportClient(serializer); } else if (clientType.equalsIgnoreCase(TransportClient) && indexBuilder != null) { http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/client/ElasticSearchRestClient.java ---------------------------------------------------------------------- diff --git a/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/client/ElasticSearchRestClient.java b/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/client/ElasticSearchRestClient.java index 0d1c37f..e51efe2 100644 --- a/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/client/ElasticSearchRestClient.java +++ b/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/client/ElasticSearchRestClient.java @@ -25,12 +25,6 @@ import org.apache.flume.Event; import org.apache.flume.EventDeliveryException; import org.apache.flume.sink.elasticsearch.ElasticSearchEventSerializer; import org.apache.flume.sink.elasticsearch.IndexNameBuilder; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.Arrays; -import java.util.HashMap; -import java.util.Map; import org.apache.http.HttpResponse; import org.apache.http.HttpStatus; import org.apache.http.client.HttpClient; @@ -39,6 +33,12 @@ import org.apache.http.entity.StringEntity; import org.apache.http.impl.client.DefaultHttpClient; import org.apache.http.util.EntityUtils; import org.elasticsearch.common.bytes.BytesReference; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Arrays; +import java.util.HashMap; +import java.util.Map; /** * Rest ElasticSearch client which is responsible for sending bulks of events to @@ -92,7 +92,8 @@ public class ElasticSearchRestClient implements ElasticSearchClient { } @Override - public void addEvent(Event event, IndexNameBuilder indexNameBuilder, String indexType, long ttlMs) throws Exception { + public void addEvent(Event event, IndexNameBuilder indexNameBuilder, String indexType, + long ttlMs) throws Exception { BytesReference content = serializer.getContentBuilder(event).bytes(); Map<String, Map<String, String>> parameters = new HashMap<String, Map<String, String>>(); Map<String, String> indexParameters = new HashMap<String, String>(); @@ -104,7 +105,7 @@ public class ElasticSearchRestClient implements ElasticSearchClient { parameters.put(INDEX_OPERATION_NAME, indexParameters); Gson gson = new Gson(); - synchronized(bulkBuilder) { + synchronized (bulkBuilder) { bulkBuilder.append(gson.toJson(parameters)); bulkBuilder.append("\n"); bulkBuilder.append(content.toBytesArray().toUtf8()); @@ -131,8 +132,10 @@ public class ElasticSearchRestClient implements ElasticSearchClient { response = httpClient.execute(httpRequest); statusCode = response.getStatusLine().getStatusCode(); logger.info("Status code from elasticsearch: " + statusCode); - if (response.getEntity() != null) - logger.debug("Status message from elasticsearch: " + EntityUtils.toString(response.getEntity(), "UTF-8")); + if (response.getEntity() != null) { + logger.debug("Status message from elasticsearch: " + + EntityUtils.toString(response.getEntity(), "UTF-8")); + } } if (statusCode != HttpStatus.SC_OK) { http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/client/ElasticSearchTransportClient.java ---------------------------------------------------------------------- diff --git a/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/client/ElasticSearchTransportClient.java b/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/client/ElasticSearchTransportClient.java index d44c8ad..2cf365e 100644 --- a/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/client/ElasticSearchTransportClient.java +++ b/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/client/ElasticSearchTransportClient.java @@ -122,12 +122,10 @@ public class ElasticSearchTransportClient implements ElasticSearchClient { /** * Used for testing - * - * @param client ElasticSearch Client - * @param serializer Event Serializer */ public ElasticSearchTransportClient(Client client, - ElasticSearchIndexRequestBuilderFactory requestBuilderFactory) throws IOException { + ElasticSearchIndexRequestBuilderFactory requestBuilderFactory) + throws IOException { this.client = client; requestBuilderFactory.createIndexRequest(client, null, null, null); } http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/client/RoundRobinList.java ---------------------------------------------------------------------- diff --git a/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/client/RoundRobinList.java b/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/client/RoundRobinList.java index dbad8d8..4cbbe91 100644 --- a/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/client/RoundRobinList.java +++ b/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/client/RoundRobinList.java @@ -29,7 +29,7 @@ public class RoundRobinList<T> { iterator = this.elements.iterator(); } - synchronized public T get() { + public synchronized T get() { if (iterator.hasNext()) { return iterator.next(); } else { http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/AsyncHBaseSink.java ---------------------------------------------------------------------- diff --git a/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/AsyncHBaseSink.java b/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/AsyncHBaseSink.java index 28f0de1..280d0b0 100644 --- a/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/AsyncHBaseSink.java +++ b/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/AsyncHBaseSink.java @@ -18,27 +18,23 @@ */ package org.apache.flume.sink.hbase; -import java.util.Arrays; -import java.util.Collection; -import java.util.Comparator; -import java.util.List; -import java.util.Map; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.atomic.AtomicBoolean; - import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Charsets; +import com.google.common.base.Preconditions; +import com.google.common.base.Throwables; import com.google.common.collect.Maps; import com.google.common.primitives.UnsignedBytes; import com.google.common.util.concurrent.ThreadFactoryBuilder; +import com.stumbleupon.async.Callback; import org.apache.flume.Channel; +import org.apache.flume.ChannelException; import org.apache.flume.Context; import org.apache.flume.Event; import org.apache.flume.EventDeliveryException; import org.apache.flume.FlumeException; import org.apache.flume.Transaction; import org.apache.flume.conf.Configurable; +import org.apache.flume.instrumentation.SinkCounter; import org.apache.flume.sink.AbstractSink; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseConfiguration; @@ -47,68 +43,68 @@ import org.apache.hadoop.hbase.zookeeper.ZKConfig; import org.hbase.async.AtomicIncrementRequest; import org.hbase.async.HBaseClient; import org.hbase.async.PutRequest; -import org.jboss.netty.channel.socket.nio - .NioClientSocketChannelFactory; +import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.google.common.base.Charsets; -import com.google.common.base.Preconditions; -import com.google.common.base.Throwables; -import com.stumbleupon.async.Callback; +import java.util.Arrays; +import java.util.Collection; +import java.util.Comparator; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; -import org.apache.flume.ChannelException; -import org.apache.flume.instrumentation.SinkCounter; /** -* -* A simple sink which reads events from a channel and writes them to HBase. -* This Sink uses an aysnchronous API internally and is likely to -* perform better. -* The Hbase configution is picked up from the first <tt>hbase-site.xml</tt> -* encountered in the classpath. This sink supports batch reading of -* events from the channel, and writing them to Hbase, to minimize the number -* of flushes on the hbase tables. To use this sink, it has to be configured -* with certain mandatory parameters:<p> -* -* <tt>table: </tt> The name of the table in Hbase to write to. <p> -* <tt>columnFamily: </tt> The column family in Hbase to write to.<p> -* Other optional parameters are:<p> -* <tt>serializer:</tt> A class implementing -* {@link AsyncHbaseEventSerializer}. -* An instance of -* this class will be used to serialize events which are written to hbase.<p> -* <tt>serializer.*:</tt> Passed in the <code>configure()</code> method to -* serializer -* as an object of {@link org.apache.flume.Context}.<p> -* <tt>batchSize: </tt>This is the batch size used by the client. This is the -* maximum number of events the sink will commit per transaction. The default -* batch size is 100 events. -* <p> -* <tt>timeout: </tt> The length of time in milliseconds the sink waits for -* callbacks from hbase for all events in a transaction. -* If no timeout is specified, the sink will wait forever.<p> -* -* <strong>Note: </strong> Hbase does not guarantee atomic commits on multiple -* rows. So if a subset of events in a batch are written to disk by Hbase and -* Hbase fails, the flume transaction is rolled back, causing flume to write -* all the events in the transaction all over again, which will cause -* duplicates. The serializer is expected to take care of the handling of -* duplicates etc. HBase also does not support batch increments, so if -* multiple increments are returned by the serializer, then HBase failure -* will cause them to be re-written, when HBase comes back up. -*/ + * A simple sink which reads events from a channel and writes them to HBase. + * This Sink uses an aysnchronous API internally and is likely to + * perform better. + * The Hbase configution is picked up from the first <tt>hbase-site.xml</tt> + * encountered in the classpath. This sink supports batch reading of + * events from the channel, and writing them to Hbase, to minimize the number + * of flushes on the hbase tables. To use this sink, it has to be configured + * with certain mandatory parameters:<p> + * <p> + * <tt>table: </tt> The name of the table in Hbase to write to. <p> + * <tt>columnFamily: </tt> The column family in Hbase to write to.<p> + * Other optional parameters are:<p> + * <tt>serializer:</tt> A class implementing + * {@link AsyncHbaseEventSerializer}. + * An instance of + * this class will be used to serialize events which are written to hbase.<p> + * <tt>serializer.*:</tt> Passed in the <code>configure()</code> method to + * serializer + * as an object of {@link org.apache.flume.Context}.<p> + * <tt>batchSize: </tt>This is the batch size used by the client. This is the + * maximum number of events the sink will commit per transaction. The default + * batch size is 100 events. + * <p> + * <tt>timeout: </tt> The length of time in milliseconds the sink waits for + * callbacks from hbase for all events in a transaction. + * If no timeout is specified, the sink will wait forever.<p> + * <p> + * <strong>Note: </strong> Hbase does not guarantee atomic commits on multiple + * rows. So if a subset of events in a batch are written to disk by Hbase and + * Hbase fails, the flume transaction is rolled back, causing flume to write + * all the events in the transaction all over again, which will cause + * duplicates. The serializer is expected to take care of the handling of + * duplicates etc. HBase also does not support batch increments, so if + * multiple increments are returned by the serializer, then HBase failure + * will cause them to be re-written, when HBase comes back up. + */ public class AsyncHBaseSink extends AbstractSink implements Configurable { private String tableName; private byte[] columnFamily; private long batchSize; - private static final Logger logger = - LoggerFactory.getLogger(AsyncHBaseSink.class); + private static final Logger logger = LoggerFactory.getLogger(AsyncHBaseSink.class); private AsyncHbaseEventSerializer serializer; private String eventSerializerType; private Context serializerContext; @@ -138,10 +134,9 @@ public class AsyncHBaseSink extends AbstractSink implements Configurable { // Does not need to be thread-safe. Always called only from the sink's // process method. - private final Comparator<byte[]> COMPARATOR = UnsignedBytes - .lexicographicalComparator(); + private final Comparator<byte[]> COMPARATOR = UnsignedBytes.lexicographicalComparator(); - public AsyncHBaseSink(){ + public AsyncHBaseSink() { this(null); } @@ -151,7 +146,7 @@ public class AsyncHBaseSink extends AbstractSink implements Configurable { @VisibleForTesting AsyncHBaseSink(Configuration conf, boolean isTimeoutTest, - boolean isCoalesceTest) { + boolean isCoalesceTest) { this.conf = conf; this.isTimeoutTest = isTimeoutTest; this.isCoalesceTest = isCoalesceTest; @@ -189,17 +184,17 @@ public class AsyncHBaseSink extends AbstractSink implements Configurable { * locks and conditions. */ Callback<Object, Object> putSuccessCallback = - new SuccessCallback<Object, Object>( + new SuccessCallback<Object, Object>( lock, callbacksReceived, condition); Callback<Object, Exception> putFailureCallback = - new FailureCallback<Object, Exception>( + new FailureCallback<Object, Exception>( lock, callbacksReceived, txnFail, condition); Callback<Long, Long> incrementSuccessCallback = - new SuccessCallback<Long, Long>( + new SuccessCallback<Long, Long>( lock, callbacksReceived, condition); Callback<Long, Exception> incrementFailureCallback = - new FailureCallback<Long, Exception>( + new FailureCallback<Long, Exception>( lock, callbacksReceived, txnFail, condition); Status status = Status.READY; @@ -235,9 +230,9 @@ public class AsyncHBaseSink extends AbstractSink implements Configurable { for (AtomicIncrementRequest increment : increments) { if (batchIncrements) { CellIdentifier identifier = new CellIdentifier(increment.key(), - increment.qualifier()); + increment.qualifier()); AtomicIncrementRequest request - = incrementBuffer.get(identifier); + = incrementBuffer.get(identifier); if (request == null) { incrementBuffer.put(identifier, increment); } else { @@ -245,7 +240,7 @@ public class AsyncHBaseSink extends AbstractSink implements Configurable { } } else { client.atomicIncrement(increment).addCallbacks( - incrementSuccessCallback, incrementFailureCallback); + incrementSuccessCallback, incrementFailureCallback); } } } @@ -254,7 +249,7 @@ public class AsyncHBaseSink extends AbstractSink implements Configurable { Collection<AtomicIncrementRequest> increments = incrementBuffer.values(); for (AtomicIncrementRequest increment : increments) { client.atomicIncrement(increment).addCallbacks( - incrementSuccessCallback, incrementFailureCallback); + incrementSuccessCallback, incrementFailureCallback); } callbacksExpected.addAndGet(increments.size()); } @@ -273,14 +268,14 @@ public class AsyncHBaseSink extends AbstractSink implements Configurable { long timeRemaining; try { while ((callbacksReceived.get() < callbacksExpected.get()) - && !txnFail.get()) { + && !txnFail.get()) { timeRemaining = timeout - (System.nanoTime() - startTime); timeRemaining = (timeRemaining >= 0) ? timeRemaining : 0; try { if (!condition.await(timeRemaining, TimeUnit.NANOSECONDS)) { txnFail.set(true); logger.warn("HBase callbacks timed out. " - + "Transaction will be rolled back."); + + "Transaction will be rolled back."); } } catch (Exception ex) { logger.error("Exception while waiting for callbacks from HBase."); @@ -348,35 +343,35 @@ public class AsyncHBaseSink extends AbstractSink implements Configurable { Preconditions.checkNotNull(cf, "Column family cannot be empty, please specify in configuration file"); //Check foe event serializer, if null set event serializer type - if(eventSerializerType == null || eventSerializerType.isEmpty()) { + if (eventSerializerType == null || eventSerializerType.isEmpty()) { eventSerializerType = "org.apache.flume.sink.hbase.SimpleAsyncHbaseEventSerializer"; logger.info("No serializer defined, Will use default"); } serializerContext.putAll(context.getSubProperties( - HBaseSinkConfigurationConstants.CONFIG_SERIALIZER_PREFIX)); + HBaseSinkConfigurationConstants.CONFIG_SERIALIZER_PREFIX)); columnFamily = cf.getBytes(Charsets.UTF_8); try { @SuppressWarnings("unchecked") Class<? extends AsyncHbaseEventSerializer> clazz = - (Class<? extends AsyncHbaseEventSerializer>) - Class.forName(eventSerializerType); + (Class<? extends AsyncHbaseEventSerializer>) + Class.forName(eventSerializerType); serializer = clazz.newInstance(); serializer.configure(serializerContext); serializer.initialize(tableName.getBytes(Charsets.UTF_8), columnFamily); } catch (Exception e) { - logger.error("Could not instantiate event serializer." , e); + logger.error("Could not instantiate event serializer.", e); Throwables.propagate(e); } - if(sinkCounter == null) { + if (sinkCounter == null) { sinkCounter = new SinkCounter(this.getName()); } timeout = context.getLong(HBaseSinkConfigurationConstants.CONFIG_TIMEOUT, - HBaseSinkConfigurationConstants.DEFAULT_TIMEOUT); - if(timeout <= 0){ + HBaseSinkConfigurationConstants.DEFAULT_TIMEOUT); + if (timeout <= 0) { logger.warn("Timeout should be positive for Hbase sink. " - + "Sink will not timeout."); + + "Sink will not timeout."); timeout = HBaseSinkConfigurationConstants.DEFAULT_TIMEOUT; } //Convert to nanos. @@ -384,7 +379,7 @@ public class AsyncHBaseSink extends AbstractSink implements Configurable { zkQuorum = context.getString( HBaseSinkConfigurationConstants.ZK_QUORUM, "").trim(); - if(!zkQuorum.isEmpty()) { + if (!zkQuorum.isEmpty()) { zkBaseDir = context.getString( HBaseSinkConfigurationConstants.ZK_ZNODE_PARENT, HBaseSinkConfigurationConstants.DEFAULT_ZK_ZNODE_PARENT); @@ -394,32 +389,33 @@ public class AsyncHBaseSink extends AbstractSink implements Configurable { } zkQuorum = ZKConfig.getZKQuorumServersString(conf); zkBaseDir = conf.get(HConstants.ZOOKEEPER_ZNODE_PARENT, - HConstants.DEFAULT_ZOOKEEPER_ZNODE_PARENT); + HConstants.DEFAULT_ZOOKEEPER_ZNODE_PARENT); } Preconditions.checkState(zkQuorum != null && !zkQuorum.isEmpty(), "The Zookeeper quorum cannot be null and should be specified."); enableWal = context.getBoolean(HBaseSinkConfigurationConstants - .CONFIG_ENABLE_WAL, HBaseSinkConfigurationConstants.DEFAULT_ENABLE_WAL); + .CONFIG_ENABLE_WAL, HBaseSinkConfigurationConstants.DEFAULT_ENABLE_WAL); logger.info("The write to WAL option is set to: " + String.valueOf(enableWal)); - if(!enableWal) { + if (!enableWal) { logger.warn("AsyncHBaseSink's enableWal configuration is set to false. " + - "All writes to HBase will have WAL disabled, and any data in the " + - "memstore of this region in the Region Server could be lost!"); + "All writes to HBase will have WAL disabled, and any data in the " + + "memstore of this region in the Region Server could be lost!"); } batchIncrements = context.getBoolean( - HBaseSinkConfigurationConstants.CONFIG_COALESCE_INCREMENTS, - HBaseSinkConfigurationConstants.DEFAULT_COALESCE_INCREMENTS); + HBaseSinkConfigurationConstants.CONFIG_COALESCE_INCREMENTS, + HBaseSinkConfigurationConstants.DEFAULT_COALESCE_INCREMENTS); - if(batchIncrements) { + if (batchIncrements) { incrementBuffer = Maps.newHashMap(); logger.info("Increment coalescing is enabled. Increments will be " + - "buffered."); + "buffered."); } - maxConsecutiveFails = context.getInteger(HBaseSinkConfigurationConstants.CONFIG_MAX_CONSECUTIVE_FAILS, - HBaseSinkConfigurationConstants.DEFAULT_MAX_CONSECUTIVE_FAILS); + maxConsecutiveFails = + context.getInteger(HBaseSinkConfigurationConstants.CONFIG_MAX_CONSECUTIVE_FAILS, + HBaseSinkConfigurationConstants.DEFAULT_MAX_CONSECUTIVE_FAILS); } @@ -432,10 +428,11 @@ public class AsyncHBaseSink extends AbstractSink implements Configurable { boolean isConfNull() { return conf == null; } + @Override - public void start(){ + public void start() { Preconditions.checkArgument(client == null, "Please call stop " - + "before calling start on an old instance."); + + "before calling start on an old instance."); sinkCounter.start(); sinkCounter.incrementConnectionCreatedCount(); client = initHBaseClient(); @@ -446,31 +443,31 @@ public class AsyncHBaseSink extends AbstractSink implements Configurable { logger.info("Initializing HBase Client"); sinkCallbackPool = Executors.newCachedThreadPool(new ThreadFactoryBuilder() - .setNameFormat(this.getName() + " HBase Call Pool").build()); + .setNameFormat(this.getName() + " HBase Call Pool").build()); logger.info("Callback pool created"); client = new HBaseClient(zkQuorum, zkBaseDir, - new NioClientSocketChannelFactory(sinkCallbackPool, sinkCallbackPool)); + new NioClientSocketChannelFactory(sinkCallbackPool, sinkCallbackPool)); final CountDownLatch latch = new CountDownLatch(1); final AtomicBoolean fail = new AtomicBoolean(false); client.ensureTableFamilyExists( - tableName.getBytes(Charsets.UTF_8), columnFamily).addCallbacks( - new Callback<Object, Object>() { - @Override - public Object call(Object arg) throws Exception { - latch.countDown(); - logger.info("table found"); - return null; - } - }, - new Callback<Object, Object>() { - @Override - public Object call(Object arg) throws Exception { - fail.set(true); - latch.countDown(); - return null; - } - }); + tableName.getBytes(Charsets.UTF_8), columnFamily).addCallbacks( + new Callback<Object, Object>() { + @Override + public Object call(Object arg) throws Exception { + latch.countDown(); + logger.info("table found"); + return null; + } + }, + new Callback<Object, Object>() { + @Override + public Object call(Object arg) throws Exception { + fail.set(true); + latch.countDown(); + return null; + } + }); try { logger.info("waiting on callback"); @@ -481,14 +478,14 @@ public class AsyncHBaseSink extends AbstractSink implements Configurable { throw new FlumeException( "Interrupted while waiting for Hbase Callbacks", e); } - if(fail.get()){ + if (fail.get()) { sinkCounter.incrementConnectionFailedCount(); if (client != null) { shutdownHBaseClient(); } throw new FlumeException( "Could not start sink. " + - "Table or column family does not exist in Hbase."); + "Table or column family does not exist in Hbase."); } else { open = true; } @@ -497,7 +494,7 @@ public class AsyncHBaseSink extends AbstractSink implements Configurable { } @Override - public void stop(){ + public void stop() { serializer.cleanUp(); if (client != null) { shutdownHBaseClient(); @@ -514,7 +511,7 @@ public class AsyncHBaseSink extends AbstractSink implements Configurable { } } catch (InterruptedException e) { logger.error("Interrupted while waiting for asynchbase sink pool to " + - "die", e); + "die", e); if (sinkCallbackPool != null) { sinkCallbackPool.shutdownNow(); } @@ -546,7 +543,7 @@ public class AsyncHBaseSink extends AbstractSink implements Configurable { }); if (!waiter.await(timeout, TimeUnit.NANOSECONDS)) { logger.error("HBase connection could not be closed within timeout! HBase cluster might " + - "be down!"); + "be down!"); } } catch (Exception ex) { logger.warn("Error while attempting to close connections to HBase"); @@ -569,7 +566,7 @@ public class AsyncHBaseSink extends AbstractSink implements Configurable { } catch (Throwable e) { logger.error("Failed to commit transaction." + "Transaction rolled back.", e); - if(e instanceof Error || e instanceof RuntimeException){ + if (e instanceof Error || e instanceof RuntimeException) { logger.error("Failed to commit transaction." + "Transaction rolled back.", e); Throwables.propagate(e); @@ -583,14 +580,15 @@ public class AsyncHBaseSink extends AbstractSink implements Configurable { txn.close(); } } - private class SuccessCallback<R,T> implements Callback<R,T> { + + private class SuccessCallback<R, T> implements Callback<R, T> { private Lock lock; private AtomicInteger callbacksReceived; private Condition condition; private final boolean isTimeoutTesting; public SuccessCallback(Lock lck, AtomicInteger callbacksReceived, - Condition condition) { + Condition condition) { lock = lck; this.callbacksReceived = callbacksReceived; this.condition = condition; @@ -614,7 +612,7 @@ public class AsyncHBaseSink extends AbstractSink implements Configurable { private void doCall() throws Exception { callbacksReceived.incrementAndGet(); lock.lock(); - try{ + try { condition.signal(); } finally { lock.unlock(); @@ -622,14 +620,15 @@ public class AsyncHBaseSink extends AbstractSink implements Configurable { } } - private class FailureCallback<R,T extends Exception> implements Callback<R,T> { + private class FailureCallback<R, T extends Exception> implements Callback<R, T> { private Lock lock; private AtomicInteger callbacksReceived; private AtomicBoolean txnFail; private Condition condition; private final boolean isTimeoutTesting; + public FailureCallback(Lock lck, AtomicInteger callbacksReceived, - AtomicBoolean txnFail, Condition condition){ + AtomicBoolean txnFail, Condition condition) { this.lock = lck; this.callbacksReceived = callbacksReceived; this.txnFail = txnFail; @@ -665,7 +664,7 @@ public class AsyncHBaseSink extends AbstractSink implements Configurable { } private void checkIfChannelExceptionAndThrow(Throwable e) - throws EventDeliveryException { + throws EventDeliveryException { if (e instanceof ChannelException) { throw new EventDeliveryException("Error in processing transaction.", e); } else if (e instanceof Error || e instanceof RuntimeException) { @@ -678,13 +677,14 @@ public class AsyncHBaseSink extends AbstractSink implements Configurable { private final byte[] row; private final byte[] column; private final int hashCode; + // Since the sink operates only on one table and one cf, // we use the data from the owning sink public CellIdentifier(byte[] row, byte[] column) { this.row = row; this.column = column; this.hashCode = - (Arrays.hashCode(row) * 31) * (Arrays.hashCode(column) * 31); + (Arrays.hashCode(row) * 31) * (Arrays.hashCode(column) * 31); } @Override @@ -701,7 +701,7 @@ public class AsyncHBaseSink extends AbstractSink implements Configurable { return false; } else { return (COMPARATOR.compare(row, o.row) == 0 - && COMPARATOR.compare(column, o.column) == 0); + && COMPARATOR.compare(column, o.column) == 0); } } } http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/AsyncHbaseEventSerializer.java ---------------------------------------------------------------------- diff --git a/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/AsyncHbaseEventSerializer.java b/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/AsyncHbaseEventSerializer.java index 9ae6c28..481fce8 100644 --- a/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/AsyncHbaseEventSerializer.java +++ b/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/AsyncHbaseEventSerializer.java @@ -34,8 +34,7 @@ import org.hbase.async.PutRequest; * of this interface is expected by the {@linkplain AsyncHBaseSink} to serialize * the events. */ -public interface AsyncHbaseEventSerializer extends Configurable, -ConfigurableComponent { +public interface AsyncHbaseEventSerializer extends Configurable, ConfigurableComponent { /** * Initialize the event serializer. @@ -47,7 +46,7 @@ ConfigurableComponent { public void initialize(byte[] table, byte[] cf); /** - * @param Event to be written to HBase. + * @param event Event to be written to HBase */ public void setEvent(Event event);
