FLUME-2937. Integrate checkstyle for non-test classes. Based on the Google checkstyle file with modifications.
The changes here do not change the generated Java bytecode (after stripping line numbers). They are syntax / whitespace ONLY. Code review: https://reviews.apache.org/r/49403/ Reviewed by Hari. Project: http://git-wip-us.apache.org/repos/asf/flume/repo Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/2252fb19 Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/2252fb19 Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/2252fb19 Branch: refs/heads/trunk Commit: 2252fb1938a4fd578f88c64eb444c74777c46212 Parents: 2fe3938 Author: Mike Percy <[email protected]> Authored: Sun Jun 26 02:57:37 2016 -0700 Committer: Mike Percy <[email protected]> Committed: Wed Jun 29 19:16:57 2016 -0700 ---------------------------------------------------------------------- flume-checkstyle/pom.xml | 36 ++ .../resources/flume/checkstyle-suppressions.xml | 43 +++ .../src/main/resources/flume/checkstyle.xml | 177 ++++++++++ .../flume/api/SecureRpcClientFactory.java | 4 +- .../apache/flume/api/SecureThriftRpcClient.java | 41 ++- .../flume/auth/FlumeAuthenticationUtil.java | 6 +- .../flume/auth/KerberosAuthenticator.java | 29 +- .../org/apache/flume/auth/KerberosUser.java | 4 +- .../apache/flume/auth/SimpleAuthenticator.java | 4 +- .../java/org/apache/flume/auth/UGIExecutor.java | 8 +- .../channel/file/BadCheckpointException.java | 3 +- .../flume/channel/file/CheckpointRebuilder.java | 110 +++---- .../org/apache/flume/channel/file/Commit.java | 16 +- .../channel/file/CorruptEventException.java | 3 +- .../file/EventQueueBackingStoreFactory.java | 71 ++-- .../file/EventQueueBackingStoreFile.java | 138 ++++---- .../file/EventQueueBackingStoreFileV2.java | 2 +- .../file/EventQueueBackingStoreFileV3.java | 78 ++--- .../apache/flume/channel/file/EventUtils.java | 2 +- .../apache/flume/channel/file/FileChannel.java | 156 ++++----- .../channel/file/FileChannelConfiguration.java | 6 +- .../apache/flume/channel/file/FlumeEvent.java | 26 +- .../flume/channel/file/FlumeEventPointer.java | 
10 +- .../flume/channel/file/FlumeEventQueue.java | 145 ++++---- .../java/org/apache/flume/channel/file/Log.java | 220 +++++++------ .../org/apache/flume/channel/file/LogFile.java | 177 +++++----- .../flume/channel/file/LogFileFactory.java | 73 +++-- .../file/LogFileRetryableIOException.java | 3 + .../apache/flume/channel/file/LogFileV2.java | 19 +- .../apache/flume/channel/file/LogFileV3.java | 113 ++++--- .../apache/flume/channel/file/LogRecord.java | 22 +- .../org/apache/flume/channel/file/LogUtils.java | 2 +- .../org/apache/flume/channel/file/Pair.java | 4 +- .../java/org/apache/flume/channel/file/Put.java | 13 +- .../flume/channel/file/ReplayHandler.java | 41 ++- .../org/apache/flume/channel/file/Rollback.java | 9 +- .../flume/channel/file/Serialization.java | 86 +++-- .../org/apache/flume/channel/file/Take.java | 10 +- .../channel/file/TransactionEventRecord.java | 27 +- .../flume/channel/file/TransactionIDOracle.java | 4 +- .../flume/channel/file/WritableUtils.java | 8 +- .../flume/channel/file/WriteOrderOracle.java | 4 +- .../encryption/AESCTRNoPaddingProvider.java | 16 +- .../channel/file/encryption/CipherProvider.java | 17 +- .../file/encryption/CipherProviderFactory.java | 4 +- .../file/encryption/CipherProviderType.java | 1 - .../encryption/DecryptionFailureException.java | 1 - .../file/encryption/JCEFileKeyProvider.java | 10 +- .../file/encryption/KeyProviderType.java | 1 - .../flume/channel/file/proto/ProtosFactory.java | 11 +- .../channel/jdbc/ConfigurationConstants.java | 1 - .../apache/flume/channel/jdbc/JdbcChannel.java | 1 + .../flume/channel/jdbc/JdbcChannelProvider.java | 2 +- .../channel/jdbc/impl/DerbySchemaHandler.java | 24 +- .../jdbc/impl/JdbcChannelProviderImpl.java | 22 +- .../channel/jdbc/impl/JdbcTransactionImpl.java | 1 - .../flume/channel/jdbc/impl/SchemaHandler.java | 4 - .../channel/jdbc/impl/SchemaHandlerFactory.java | 22 +- .../flume/channel/kafka/KafkaChannel.java | 87 ++--- .../kafka/KafkaChannelConfiguration.java | 15 +- 
.../flume/channel/SpillableMemoryChannel.java | 311 +++++++++--------- .../LoadBalancingLog4jAppender.java | 18 +- .../clients/log4jappender/Log4jAppender.java | 26 +- .../clients/log4jappender/Log4jAvroHeaders.java | 13 +- .../src/main/java/org/apache/flume/Context.java | 10 +- .../flume/conf/BasicConfigurationConstants.java | 1 - .../flume/conf/ComponentConfiguration.java | 11 +- .../conf/ComponentConfigurationFactory.java | 9 +- .../apache/flume/conf/FlumeConfiguration.java | 142 ++++---- .../flume/conf/FlumeConfigurationError.java | 2 +- .../flume/conf/sink/SinkConfiguration.java | 4 +- .../flume/conf/sink/SinkGroupConfiguration.java | 3 +- .../flume/conf/source/SourceConfiguration.java | 4 +- .../java/org/apache/flume/ChannelFactory.java | 9 +- .../java/org/apache/flume/ChannelSelector.java | 1 - .../src/main/java/org/apache/flume/Clock.java | 4 +- .../main/java/org/apache/flume/SinkFactory.java | 8 +- .../main/java/org/apache/flume/SinkRunner.java | 6 +- .../java/org/apache/flume/SourceFactory.java | 7 +- .../java/org/apache/flume/SourceRunner.java | 2 +- .../main/java/org/apache/flume/SystemClock.java | 6 +- .../main/java/org/apache/flume/Transaction.java | 11 +- .../apache/flume/annotations/Disposable.java | 2 + .../flume/annotations/InterfaceStability.java | 1 + .../apache/flume/annotations/Recyclable.java | 2 + .../apache/flume/channel/AbstractChannel.java | 6 +- .../flume/channel/AbstractChannelSelector.java | 2 +- .../channel/BasicTransactionSemantics.java | 1 + .../flume/channel/ChannelSelectorFactory.java | 2 +- .../flume/channel/DefaultChannelFactory.java | 3 +- .../org/apache/flume/channel/MemoryChannel.java | 98 +++--- .../channel/MultiplexingChannelSelector.java | 8 +- .../flume/channel/PseudoTxnMemoryChannel.java | 6 +- .../channel/ReplicatingChannelSelector.java | 6 +- .../apache/flume/client/avro/AvroCLIClient.java | 17 +- .../avro/ReliableSpoolingFileEventReader.java | 62 ++-- .../org/apache/flume/event/EventHelper.java | 10 +- 
.../flume/formatter/output/BucketPath.java | 327 +++++++++---------- .../formatter/output/DefaultPathManager.java | 2 +- .../flume/formatter/output/PathManager.java | 32 +- .../formatter/output/PathManagerFactory.java | 89 ++--- .../flume/formatter/output/PathManagerType.java | 20 +- .../formatter/output/RollTimePathManager.java | 56 ++-- .../flume/instrumentation/ChannelCounter.java | 10 +- .../flume/instrumentation/GangliaServer.java | 6 - .../instrumentation/MonitoredCounterGroup.java | 21 +- .../flume/instrumentation/MonitoringType.java | 2 +- .../flume/instrumentation/SinkCounter.java | 2 - .../flume/instrumentation/SourceCounter.java | 17 +- .../instrumentation/http/HTTPMetricsServer.java | 8 +- .../kafka/KafkaChannelCounter.java | 6 +- .../flume/instrumentation/util/JMXPollUtil.java | 18 +- .../flume/interceptor/HostInterceptor.java | 3 +- .../interceptor/RegexExtractorInterceptor.java | 11 +- .../interceptor/RegexFilteringInterceptor.java | 13 +- .../SearchAndReplaceInterceptor.java | 6 +- .../flume/interceptor/StaticInterceptor.java | 13 +- .../flume/interceptor/TimestampInterceptor.java | 1 + .../apache/flume/lifecycle/LifecycleAware.java | 58 ++-- .../flume/lifecycle/LifecycleSupervisor.java | 31 +- .../serialization/AvroEventDeserializer.java | 5 +- .../serialization/BodyTextEventSerializer.java | 3 +- .../HeaderAndBodyTextEventSerializer.java | 2 +- .../flume/serialization/LineDeserializer.java | 3 +- .../ResettableFileInputStream.java | 28 +- .../org/apache/flume/sink/AbstractRpcSink.java | 51 ++- .../org/apache/flume/sink/AbstractSink.java | 5 +- .../flume/sink/AbstractSinkProcessor.java | 4 +- .../apache/flume/sink/AbstractSinkSelector.java | 2 +- .../apache/flume/sink/DefaultSinkFactory.java | 3 +- .../apache/flume/sink/DefaultSinkProcessor.java | 3 +- .../flume/sink/FailoverSinkProcessor.java | 31 +- .../flume/sink/LoadBalancingSinkProcessor.java | 34 +- .../java/org/apache/flume/sink/LoggerSink.java | 5 +- 
.../java/org/apache/flume/sink/NullSink.java | 2 +- .../org/apache/flume/sink/RollingFileSink.java | 12 +- .../apache/flume/sink/SinkProcessorFactory.java | 5 +- .../java/org/apache/flume/sink/ThriftSink.java | 12 +- .../flume/source/AbstractEventDrivenSource.java | 6 +- .../flume/source/AbstractPollableSource.java | 10 +- .../org/apache/flume/source/AbstractSource.java | 4 +- .../org/apache/flume/source/AvroSource.java | 102 +++--- .../flume/source/BasicSourceSemantics.java | 9 +- .../flume/source/DefaultSourceFactory.java | 5 +- .../org/apache/flume/source/ExecSource.java | 50 +-- .../ExecSourceConfigurationConstants.java | 3 +- .../flume/source/MultiportSyslogTCPSource.java | 4 +- .../NetcatSourceConfigurationConstants.java | 6 +- .../flume/source/PollableSourceRunner.java | 8 +- .../flume/source/SequenceGeneratorSource.java | 6 +- .../flume/source/SpoolDirectorySource.java | 20 +- ...olDirectorySourceConfigurationConstants.java | 1 + .../org/apache/flume/source/StressSource.java | 9 +- .../org/apache/flume/source/SyslogParser.java | 13 +- .../apache/flume/source/SyslogTcpSource.java | 8 +- .../apache/flume/source/SyslogUDPSource.java | 26 +- .../org/apache/flume/source/SyslogUtils.java | 262 +++++++-------- .../org/apache/flume/source/ThriftSource.java | 61 ++-- .../apache/flume/source/http/BLOBHandler.java | 8 +- .../apache/flume/source/http/HTTPSource.java | 44 +-- .../apache/flume/source/http/JSONHandler.java | 14 +- .../apache/flume/tools/DirectMemoryUtils.java | 10 +- .../org/apache/flume/tools/GetJavaProperty.java | 2 +- .../flume/tools/TimestampRoundDownUtil.java | 9 +- .../org/apache/flume/tools/VersionInfo.java | 16 +- .../flume/agent/embedded/EmbeddedAgent.java | 38 ++- .../embedded/EmbeddedAgentConfiguration.java | 53 ++- .../flume/agent/embedded/EmbeddedSource.java | 6 +- .../source/thriftLegacy/ThriftLegacySource.java | 10 +- .../node/AbstractConfigurationProvider.java | 121 ++++--- .../java/org/apache/flume/node/Application.java | 114 +++---- 
.../flume/node/ConfigurationProvider.java | 7 +- ...lingPropertiesFileConfigurationProvider.java | 13 +- .../node/SimpleMaterializedConfiguration.java | 2 +- .../org/apache/flume/api/AbstractRpcClient.java | 2 +- .../org/apache/flume/api/FailoverRpcClient.java | 6 +- .../java/org/apache/flume/api/HostInfo.java | 2 +- .../flume/api/LoadBalancingRpcClient.java | 9 +- .../apache/flume/api/NettyAvroRpcClient.java | 38 +-- .../api/RpcClientConfigurationConstants.java | 6 +- .../org/apache/flume/api/RpcClientFactory.java | 16 +- .../org/apache/flume/api/ThriftRpcClient.java | 120 ++++--- .../org/apache/flume/event/EventBuilder.java | 2 +- .../java/org/apache/flume/event/JSONEvent.java | 6 +- .../org/apache/flume/event/SimpleEvent.java | 2 +- .../org/apache/flume/util/OrderSelector.java | 5 +- .../org/apache/flume/sink/kite/DatasetSink.java | 12 +- .../sink/kite/NonRecoverableEventException.java | 1 - .../sink/kite/parser/EntityParserFactory.java | 1 - .../sink/kite/policy/FailurePolicyFactory.java | 1 - .../flume/sink/hdfs/AbstractHDFSWriter.java | 15 +- .../flume/sink/hdfs/BucketClosedException.java | 2 +- .../apache/flume/sink/hdfs/BucketWriter.java | 127 ++++--- .../sink/hdfs/HDFSCompressedDataStream.java | 9 +- .../apache/flume/sink/hdfs/HDFSDataStream.java | 19 +- .../apache/flume/sink/hdfs/HDFSEventSink.java | 76 ++--- .../flume/sink/hdfs/HDFSSequenceFile.java | 7 +- .../apache/flume/sink/hdfs/KerberosUser.java | 4 +- .../sink/hdfs/SequenceFileSerializerType.java | 3 +- .../sink/hive/HiveDelimitedTextSerializer.java | 16 +- .../flume/sink/hive/HiveEventSerializer.java | 1 - .../org/apache/flume/sink/hive/HiveSink.java | 30 +- .../org/apache/flume/sink/hive/HiveWriter.java | 141 ++++---- .../java/org/apache/flume/sink/irc/IRCSink.java | 4 +- ...ElasticSearchIndexRequestBuilderFactory.java | 5 +- ...ElasticSearchIndexRequestBuilderFactory.java | 2 +- .../sink/elasticsearch/ElasticSearchSink.java | 1 + ...entSerializerIndexRequestBuilderFactory.java | 2 +- 
.../client/ElasticSearchClientFactory.java | 6 +- .../client/ElasticSearchRestClient.java | 23 +- .../client/ElasticSearchTransportClient.java | 6 +- .../elasticsearch/client/RoundRobinList.java | 2 +- .../apache/flume/sink/hbase/AsyncHBaseSink.java | 254 +++++++------- .../sink/hbase/AsyncHbaseEventSerializer.java | 5 +- .../org/apache/flume/sink/hbase/HBaseSink.java | 117 +++---- .../flume/sink/hbase/HbaseEventSerializer.java | 10 +- .../sink/hbase/RegexHbaseEventSerializer.java | 50 +-- .../hbase/SimpleAsyncHbaseEventSerializer.java | 33 +- .../sink/hbase/SimpleHbaseEventSerializer.java | 73 ++--- .../flume/sink/hbase/SimpleRowKeyGenerator.java | 22 +- .../org/apache/flume/sink/kafka/KafkaSink.java | 45 +-- .../flume/sink/kafka/KafkaSinkConstants.java | 15 +- .../sink/solr/morphline/BlobDeserializer.java | 3 +- .../flume/sink/solr/morphline/BlobHandler.java | 3 +- .../solr/morphline/MorphlineHandlerImpl.java | 6 +- .../solr/morphline/MorphlineInterceptor.java | 5 +- .../sink/solr/morphline/MorphlineSink.java | 8 +- .../source/jms/DefaultJMSMessageConverter.java | 28 +- .../flume/source/jms/InitialContextFactory.java | 1 - .../flume/source/jms/JMSMessageConsumer.java | 84 +++-- .../source/jms/JMSMessageConsumerFactory.java | 10 +- .../org/apache/flume/source/jms/JMSSource.java | 124 ++++--- .../apache/flume/source/kafka/KafkaSource.java | 55 ++-- .../source/kafka/KafkaSourceConstants.java | 9 +- .../taildir/ReliableTaildirEventReader.java | 52 +-- .../apache/flume/source/taildir/TailFile.java | 88 +++-- .../flume/source/taildir/TaildirMatcher.java | 106 +++--- .../flume/source/taildir/TaildirSource.java | 18 +- .../TaildirSourceConfigurationConstants.java | 4 +- .../TestRpcClientCommunicationFailure.java | 60 ++-- .../flume/tools/FileChannelIntegrityTool.java | 85 +++-- .../org/apache/flume/tools/FlumeToolType.java | 2 +- .../org/apache/flume/tools/FlumeToolsMain.java | 19 +- pom.xml | 66 ++++ 244 files changed, 3823 insertions(+), 3436 deletions(-) 
---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-checkstyle/pom.xml ---------------------------------------------------------------------- diff --git a/flume-checkstyle/pom.xml b/flume-checkstyle/pom.xml new file mode 100644 index 0000000..31db3c0 --- /dev/null +++ b/flume-checkstyle/pom.xml @@ -0,0 +1,36 @@ +<!-- + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. 
+--> + +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + + <!-- + <parent> + <artifactId>flume-parent</artifactId> + <groupId>org.apache.flume</groupId> + <version>1.7.0-SNAPSHOT</version> + </parent> + --> + + <groupId>org.apache.flume</groupId> + <artifactId>flume-checkstyle</artifactId> + <name>Flume checkstyle project</name> + <version>1.7.0-SNAPSHOT</version> +</project> http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-checkstyle/src/main/resources/flume/checkstyle-suppressions.xml ---------------------------------------------------------------------- diff --git a/flume-checkstyle/src/main/resources/flume/checkstyle-suppressions.xml b/flume-checkstyle/src/main/resources/flume/checkstyle-suppressions.xml new file mode 100644 index 0000000..49c8834 --- /dev/null +++ b/flume-checkstyle/src/main/resources/flume/checkstyle-suppressions.xml @@ -0,0 +1,43 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!DOCTYPE suppressions PUBLIC + "-//Puppy Crawl//DTD Suppressions 1.0//EN" + "http://www.puppycrawl.com/dtds/suppressions_1_0.dtd"> +<suppressions> + + <!-- Suppress all style checks for generated code --> + <suppress checks=".*" + files="generated-sources|com/cloudera/flume/handlers/thrift|org/apache/flume/thrift/|org/apache/flume/source/scribe|ProtosFactory.java"/> + + <!-- The "legacy" sources have a weird camelCaps package name --> + <suppress checks="PackageName" + files="org/apache/flume/source/avroLegacy|org/apache/flume/source/thriftLegacy"/> + + <!-- TODO: Rearrange methods in below classes to keep overloaded methods adjacent --> + <suppress checks="OverloadMethodsDeclarationOrder" + 
files="channel/file|RpcClientFactory\.java|BucketPath\.java|SinkGroup\.java|DefaultSinkProcessor\.java|RegexExtractorInterceptorMillisSerializer\.java|SimpleAsyncHbaseEventSerializer\.java|hdfs/BucketWriter\.java"/> + + <!-- TODO: Fix inner class names to follow standard convention --> + <suppress checks="TypeName" + files="SyslogUDPSource\.java|SyslogTcpSource\.java|TaildirSource\.java"/> + + <!-- TODO: Add default cases to switch statements --> + <suppress checks="MissingSwitchDefault" + files="SyslogUtils\.java|ReliableTaildirEventReader\.java"/> + + <!-- TODO: Avoid empty catch blocks --> + <suppress checks="EmptyCatchBlock" + files="channel/file/LogFile\.java"/> + + <!-- TODO: Avoid empty if blocks --> + <suppress checks="EmptyBlockCheck" + files="ElasticSearchClientFactory\.java"/> + + <!-- TODO: Fix line length issues --> + <suppress checks="LineLengthCheck" + files="channel/MemoryChannel\.java|ReliableSpoolingFileEventReader\.java"/> + + <!-- TODO: Move helper classes to their own files --> + <suppress checks="OneTopLevelClass" + files="KafkaSource\.java|KafkaChannel\.java|KafkaSink\.java"/> + +</suppressions> http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-checkstyle/src/main/resources/flume/checkstyle.xml ---------------------------------------------------------------------- diff --git a/flume-checkstyle/src/main/resources/flume/checkstyle.xml b/flume-checkstyle/src/main/resources/flume/checkstyle.xml new file mode 100644 index 0000000..e8913f0 --- /dev/null +++ b/flume-checkstyle/src/main/resources/flume/checkstyle.xml @@ -0,0 +1,177 @@ +<?xml version="1.0"?> +<!DOCTYPE module PUBLIC + "-//Puppy Crawl//DTD Check Configuration 1.3//EN" + "http://www.puppycrawl.com/dtds/configuration_1_3.dtd"> + +<!-- + Checkstyle configuration that checks the Google coding conventions from Google Java Style + that can be found at https://google.github.io/styleguide/javaguide.html. + + Checkstyle is very configurable. 
Be sure to read the documentation at + http://checkstyle.sf.net (or in your downloaded distribution). + + To completely disable a check, just comment it out or delete it from the file. + + Authors: Max Vetrenko, Ruslan Diachenko, Roman Ivanov. + --> + +<module name = "Checker"> + <property name="charset" value="UTF-8"/> + + <property name="severity" value="warning"/> + + <property name="fileExtensions" value="java, properties, xml"/> + <!-- Checks for whitespace --> + <!-- See http://checkstyle.sf.net/config_whitespace.html --> + <module name="FileTabCharacter"> + <property name="eachLine" value="true"/> + </module> + + <module name="TreeWalker"> + <module name="OuterTypeFilename"/> + <module name="IllegalTokenText"> + <property name="tokens" value="STRING_LITERAL, CHAR_LITERAL"/> + <property name="format" value="\\u00(08|09|0(a|A)|0(c|C)|0(d|D)|22|27|5(C|c))|\\(0(10|11|12|14|15|42|47)|134)"/> + <property name="message" value="Avoid using corresponding octal or Unicode escape."/> + </module> + <module name="AvoidEscapedUnicodeCharacters"> + <property name="allowEscapesForControlCharacters" value="true"/> + <property name="allowByTailComment" value="true"/> + <property name="allowNonPrintableEscapes" value="true"/> + </module> + <module name="LineLength"> + <property name="max" value="100"/> + <property name="ignorePattern" value="^package.*|^import.*|a href|href|http://|https://|ftp://"/> + </module> + <module name="AvoidStarImport"> + <property name="allowStaticMemberImports" value="true"/> + </module> + <module name="OneTopLevelClass"/> + <module name="NoLineWrap"/> + <module name="EmptyBlock"> + <property name="option" value="TEXT"/> + <property name="tokens" value="LITERAL_TRY, LITERAL_FINALLY, LITERAL_IF, LITERAL_ELSE, LITERAL_SWITCH"/> + </module> + <module name="NeedBraces"> + <property name="allowSingleLineStatement" value="true"/> + </module> + <module name="LeftCurly"> + <property name="maxLineLength" value="100"/> + </module> + <module 
name="RightCurly"/> + <module name="RightCurly"> + <property name="option" value="alone"/> + <property name="tokens" value="CLASS_DEF, METHOD_DEF, CTOR_DEF, LITERAL_FOR, LITERAL_WHILE, LITERAL_DO, STATIC_INIT, INSTANCE_INIT"/> + </module> + <module name="WhitespaceAround"> + <property name="allowEmptyConstructors" value="true"/> + <property name="allowEmptyMethods" value="true"/> + <property name="allowEmptyTypes" value="true"/> + <property name="allowEmptyLoops" value="true"/> + <message key="ws.notFollowed" + value="WhitespaceAround: ''{0}'' is not followed by whitespace. Empty blocks may only be represented as '{}' when not part of a multi-block statement (4.1.3)"/> + <message key="ws.notPreceded" + value="WhitespaceAround: ''{0}'' is not preceded with whitespace."/> + </module> + <module name="OneStatementPerLine"/> + <module name="ArrayTypeStyle"/> + <module name="MissingSwitchDefault"/> + <module name="FallThrough"/> + <module name="UpperEll"/> + <module name="ModifierOrder"/> + <module name="EmptyLineSeparator"> + <property name="allowNoEmptyLineBetweenFields" value="true"/> + <property name="allowMultipleEmptyLines" value="false"/> + <property name="allowMultipleEmptyLinesInsideClassMembers" value="false"/> + <property name="tokens" value="IMPORT, CLASS_DEF, INTERFACE_DEF, ENUM_DEF, STATIC_INIT, INSTANCE_INIT, CTOR_DEF, VARIABLE_DEF"/> + </module> + <module name="SeparatorWrap"> + <property name="tokens" value="DOT"/> + <property name="option" value="nl"/> + </module> + <module name="SeparatorWrap"> + <property name="tokens" value="COMMA"/> + <property name="option" value="EOL"/> + </module> + <module name="PackageName"> + <property name="format" value="^[a-z]+(\.[a-z][a-z0-9]*)*$"/> + <message key="name.invalidPattern" + value="Package name ''{0}'' must match pattern ''{1}''."/> + </module> + <module name="TypeName"> + <message key="name.invalidPattern" + value="Type name ''{0}'' must match pattern ''{1}''."/> + </module> + <module 
name="ClassTypeParameterName"> + <property name="format" value="(^[A-Z][0-9]?)$|([A-Z][a-zA-Z0-9]*[T]$)"/> + <message key="name.invalidPattern" + value="Class type name ''{0}'' must match pattern ''{1}''."/> + </module> + <module name="MethodTypeParameterName"> + <property name="format" value="(^[A-Z][0-9]?)$|([A-Z][a-zA-Z0-9]*[T]$)"/> + <message key="name.invalidPattern" + value="Method type name ''{0}'' must match pattern ''{1}''."/> + </module> + <module name="InterfaceTypeParameterName"> + <property name="format" value="(^[A-Z][0-9]?)$|([A-Z][a-zA-Z0-9]*[T]$)"/> + <message key="name.invalidPattern" + value="Interface type name ''{0}'' must match pattern ''{1}''."/> + </module> + <module name="NoFinalizer"/> + <module name="GenericWhitespace"> + <message key="ws.followed" + value="GenericWhitespace ''{0}'' is followed by whitespace."/> + <message key="ws.preceded" + value="GenericWhitespace ''{0}'' is preceded with whitespace."/> + <message key="ws.illegalFollow" + value="GenericWhitespace ''{0}'' should followed by whitespace."/> + <message key="ws.notPreceded" + value="GenericWhitespace ''{0}'' is not preceded with whitespace."/> + </module> + <module name="Indentation"> + <property name="basicOffset" value="2"/> + <property name="braceAdjustment" value="0"/> + <property name="caseIndent" value="2"/> + <property name="throwsIndent" value="4"/> + <property name="lineWrappingIndentation" value="4"/> + <property name="arrayInitIndent" value="2"/> + </module> + <module name="OverloadMethodsDeclarationOrder"/> + <module name="MethodParamPad"/> + <module name="AnnotationLocation"> + <property name="tokens" value="CLASS_DEF, INTERFACE_DEF, ENUM_DEF, METHOD_DEF, CTOR_DEF"/> + </module> + <module name="AnnotationLocation"> + <property name="tokens" value="VARIABLE_DEF"/> + <property name="allowSamelineMultipleAnnotations" value="true"/> + </module> + <module name="AtclauseOrder"> + <property name="tagOrder" value="@param, @return, @throws, @deprecated"/> + <property 
name="target" value="CLASS_DEF, INTERFACE_DEF, ENUM_DEF, METHOD_DEF, CTOR_DEF, VARIABLE_DEF"/> + </module> + <module name="JavadocMethod"> + <property name="scope" value="public"/> + <property name="allowMissingJavadoc" value="true"/> + <property name="allowMissingParamTags" value="true"/> + <property name="allowMissingThrowsTags" value="true"/> + <property name="allowMissingReturnTag" value="true"/> + <property name="minLineCount" value="0"/> + <property name="allowedAnnotations" value="Override, Test"/> + <property name="allowThrowsTagsForSubclasses" value="true"/> + </module> + <module name="MethodName"> + <property name="format" value="^[a-z][a-z0-9][a-zA-Z0-9_]*$"/> + <message key="name.invalidPattern" + value="Method name ''{0}'' must match pattern ''{1}''."/> + </module> + <module name="SingleLineJavadoc"> + <property name="ignoreInlineTags" value="false"/> + </module> + <module name="EmptyCatchBlock"> + <property name="exceptionVariableName" value="expected"/> + </module> + <module name="CommentsIndentation"> + <property name="tokens" value="BLOCK_COMMENT_BEGIN"/> + </module> + </module> +</module> http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-auth/src/main/java/org/apache/flume/api/SecureRpcClientFactory.java ---------------------------------------------------------------------- diff --git a/flume-ng-auth/src/main/java/org/apache/flume/api/SecureRpcClientFactory.java b/flume-ng-auth/src/main/java/org/apache/flume/api/SecureRpcClientFactory.java index c976458..35356cd 100644 --- a/flume-ng-auth/src/main/java/org/apache/flume/api/SecureRpcClientFactory.java +++ b/flume-ng-auth/src/main/java/org/apache/flume/api/SecureRpcClientFactory.java @@ -26,8 +26,8 @@ import java.util.Properties; public class SecureRpcClientFactory { /** - * Return a secure {@linkplain org.apache.flume.api.RpcClient} that uses Thrift for communicating with - * the next hop. 
+ * Return a secure {@linkplain org.apache.flume.api.RpcClient} that uses Thrift for communicating + * with the next hop. * @param props * @return - An {@linkplain org.apache.flume.api.RpcClient} which uses thrift configured with the * given parameters. http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-auth/src/main/java/org/apache/flume/api/SecureThriftRpcClient.java ---------------------------------------------------------------------- diff --git a/flume-ng-auth/src/main/java/org/apache/flume/api/SecureThriftRpcClient.java b/flume-ng-auth/src/main/java/org/apache/flume/api/SecureThriftRpcClient.java index f31582c..395bc1f 100644 --- a/flume-ng-auth/src/main/java/org/apache/flume/api/SecureThriftRpcClient.java +++ b/flume-ng-auth/src/main/java/org/apache/flume/api/SecureThriftRpcClient.java @@ -22,7 +22,10 @@ import org.apache.flume.FlumeException; import org.apache.flume.auth.FlumeAuthenticationUtil; import org.apache.flume.auth.FlumeAuthenticator; import org.apache.flume.auth.PrivilegedExecutor; -import org.apache.thrift.transport.*; +import org.apache.thrift.transport.TSaslClientTransport; +import org.apache.thrift.transport.TSocket; +import org.apache.thrift.transport.TTransport; +import org.apache.thrift.transport.TTransportException; import javax.security.auth.callback.CallbackHandler; import javax.security.sasl.Sasl; @@ -52,9 +55,9 @@ public class SecureThriftRpcClient extends ThriftRpcClient { String clientPrincipal = properties.getProperty(CLIENT_PRINCIPAL); String keytab = properties.getProperty(CLIENT_KEYTAB); this.privilegedExecutor = FlumeAuthenticationUtil.getAuthenticator(clientPrincipal, keytab); - if(!privilegedExecutor.isAuthenticated()) { + if (!privilegedExecutor.isAuthenticated()) { throw new FlumeException("Authentication failed in Kerberos mode for " + - "principal " + clientPrincipal + " keytab " + keytab); + "principal " + clientPrincipal + " keytab " + keytab); } } @@ -78,31 +81,33 @@ public class SecureThriftRpcClient 
extends ThriftRpcClient { */ public static class UgiSaslClientTransport extends TSaslClientTransport { PrivilegedExecutor privilegedExecutor; + public UgiSaslClientTransport(String mechanism, String authorizationId, String protocol, String serverName, Map<String, String> props, - CallbackHandler cbh, TTransport transport, PrivilegedExecutor privilegedExecutor) throws IOException { - super(mechanism, authorizationId, protocol, serverName, props, cbh, - transport); + CallbackHandler cbh, TTransport transport, PrivilegedExecutor privilegedExecutor) + throws IOException { + super(mechanism, authorizationId, protocol, serverName, props, cbh, transport); this.privilegedExecutor = privilegedExecutor; } - // open the SASL transport with using the current UserGroupInformation - // This is needed to get the current login context stored + /** + * Open the SASL transport with using the current UserGroupInformation. + * This is needed to get the current login context stored + */ @Override public void open() throws FlumeException { try { this.privilegedExecutor.execute( - new PrivilegedExceptionAction<Void>() { - public Void run() throws FlumeException { - // this is a workaround to using UgiSaslClientTransport.super.open() - // which results in IllegalAccessError - callSuperClassOpen(); - return null; - } - }); + new PrivilegedExceptionAction<Void>() { + public Void run() throws FlumeException { + // this is a workaround to using UgiSaslClientTransport.super.open() + // which results in IllegalAccessError + callSuperClassOpen(); + return null; + } + }); } catch (InterruptedException e) { - throw new FlumeException( - "Interrupted while opening underlying transport", e); + throw new FlumeException("Interrupted while opening underlying transport", e); } catch (Exception e) { throw new FlumeException("Failed to open SASL transport", e); } http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-auth/src/main/java/org/apache/flume/auth/FlumeAuthenticationUtil.java 
---------------------------------------------------------------------- diff --git a/flume-ng-auth/src/main/java/org/apache/flume/auth/FlumeAuthenticationUtil.java b/flume-ng-auth/src/main/java/org/apache/flume/auth/FlumeAuthenticationUtil.java index 5627652..87cef31 100644 --- a/flume-ng-auth/src/main/java/org/apache/flume/auth/FlumeAuthenticationUtil.java +++ b/flume-ng-auth/src/main/java/org/apache/flume/auth/FlumeAuthenticationUtil.java @@ -50,10 +50,10 @@ public class FlumeAuthenticationUtil { * * @throws org.apache.flume.auth.SecurityException */ - public synchronized static FlumeAuthenticator getAuthenticator( + public static synchronized FlumeAuthenticator getAuthenticator( String principal, String keytab) throws SecurityException { - if(principal == null && keytab == null) { + if (principal == null && keytab == null) { return SimpleAuthenticator.getSimpleAuthenticator(); } @@ -62,7 +62,7 @@ public class FlumeAuthenticationUtil { Preconditions.checkArgument(keytab != null, "Keytab can not be null when Principal is provided"); - if(kerbAuthenticator == null) { + if (kerbAuthenticator == null) { kerbAuthenticator = new KerberosAuthenticator(); } kerbAuthenticator.authenticate(principal, keytab); http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-auth/src/main/java/org/apache/flume/auth/KerberosAuthenticator.java ---------------------------------------------------------------------- diff --git a/flume-ng-auth/src/main/java/org/apache/flume/auth/KerberosAuthenticator.java b/flume-ng-auth/src/main/java/org/apache/flume/auth/KerberosAuthenticator.java index 4a0e0f4..45091f5 100644 --- a/flume-ng-auth/src/main/java/org/apache/flume/auth/KerberosAuthenticator.java +++ b/flume-ng-auth/src/main/java/org/apache/flume/auth/KerberosAuthenticator.java @@ -18,6 +18,7 @@ package org.apache.flume.auth; import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; import org.slf4j.Logger; import org.slf4j.LoggerFactory; 
@@ -36,7 +37,6 @@ import org.apache.hadoop.security.SecurityUtil; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod; -import com.google.common.base.Preconditions; import static org.apache.hadoop.fs.CommonConfigurationKeys.HADOOP_SECURITY_AUTHENTICATION; /** @@ -66,10 +66,10 @@ class KerberosAuthenticator implements FlumeAuthenticator { @Override public synchronized PrivilegedExecutor proxyAs(String proxyUserName) { - if(proxyUserName == null || proxyUserName.isEmpty()) { + if (proxyUserName == null || proxyUserName.isEmpty()) { return this; } - if(proxyCache.get(proxyUserName) == null) { + if (proxyCache.get(proxyUserName) == null) { UserGroupInformation proxyUgi; proxyUgi = UserGroupInformation.createProxyUser(proxyUserName, ugi); printUGI(proxyUgi); @@ -131,13 +131,13 @@ class KerberosAuthenticator implements FlumeAuthenticator { KerberosUser newUser = new KerberosUser(resolvedPrincipal, keytab); Preconditions.checkState(prevUser == null || prevUser.equals(newUser), - "Cannot use multiple kerberos principals in the same agent. " + - " Must restart agent to use new principal or keytab. " + - "Previous = %s, New = %s", prevUser, newUser); + "Cannot use multiple kerberos principals in the same agent. " + + " Must restart agent to use new principal or keytab. 
" + + "Previous = %s, New = %s", prevUser, newUser); // enable the kerberos mode of UGI, before doing anything else - if(!UserGroupInformation.isSecurityEnabled()) { + if (!UserGroupInformation.isSecurityEnabled()) { Configuration conf = new Configuration(false); conf.set(HADOOP_SECURITY_AUTHENTICATION, "kerberos"); UserGroupInformation.setConfiguration(conf); @@ -147,7 +147,7 @@ class KerberosAuthenticator implements FlumeAuthenticator { UserGroupInformation curUser = null; try { curUser = UserGroupInformation.getLoginUser(); - if(curUser != null && !curUser.hasKerberosCredentials()) { + if (curUser != null && !curUser.hasKerberosCredentials()) { curUser = null; } } catch (IOException e) { @@ -166,8 +166,8 @@ class KerberosAuthenticator implements FlumeAuthenticator { if (curUser != null && curUser.getUserName().equals(ugi.getUserName())) { LOG.debug("Using existing principal login: {}", ugi); } else { - LOG.info("Attempting kerberos Re-login as principal ({}) " - , new Object[] { ugi.getUserName() } ); + LOG.info("Attempting kerberos Re-login as principal ({}) ", + new Object[] { ugi.getUserName() } ); ugi.reloginFromKeytab(); } } else { @@ -192,9 +192,10 @@ class KerberosAuthenticator implements FlumeAuthenticator { // dump login information AuthenticationMethod authMethod = ugi.getAuthenticationMethod(); LOG.info("\n{} \nUser: {} \nAuth method: {} \nKeytab: {} \n", - new Object[]{ authMethod.equals(AuthenticationMethod.PROXY) ? - "Proxy as: " : "Logged as: ", ugi.getUserName(), authMethod, - ugi.isFromKeytab() } + new Object[] { + authMethod.equals(AuthenticationMethod.PROXY) ? 
"Proxy as: " : "Logged as: ", + ugi.getUserName(), authMethod, ugi.isFromKeytab() + } ); } } @@ -224,7 +225,7 @@ class KerberosAuthenticator implements FlumeAuthenticator { @VisibleForTesting String getUserName() { - if(ugi != null) { + if (ugi != null) { return ugi.getUserName(); } else { return null; http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-auth/src/main/java/org/apache/flume/auth/KerberosUser.java ---------------------------------------------------------------------- diff --git a/flume-ng-auth/src/main/java/org/apache/flume/auth/KerberosUser.java b/flume-ng-auth/src/main/java/org/apache/flume/auth/KerberosUser.java index dd37721..22852de 100644 --- a/flume-ng-auth/src/main/java/org/apache/flume/auth/KerberosUser.java +++ b/flume-ng-auth/src/main/java/org/apache/flume/auth/KerberosUser.java @@ -46,7 +46,9 @@ public class KerberosUser { return false; } final KerberosUser other = (KerberosUser) obj; - if ((this.principal == null) ? (other.principal != null) : !this.principal.equals(other.principal)) { + if ((this.principal == null) ? + (other.principal != null) : + !this.principal.equals(other.principal)) { return false; } if ((this.keyTab == null) ? 
(other.keyTab != null) : !this.keyTab.equals(other.keyTab)) { http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-auth/src/main/java/org/apache/flume/auth/SimpleAuthenticator.java ---------------------------------------------------------------------- diff --git a/flume-ng-auth/src/main/java/org/apache/flume/auth/SimpleAuthenticator.java b/flume-ng-auth/src/main/java/org/apache/flume/auth/SimpleAuthenticator.java index f7b5bea..d2791a1 100644 --- a/flume-ng-auth/src/main/java/org/apache/flume/auth/SimpleAuthenticator.java +++ b/flume-ng-auth/src/main/java/org/apache/flume/auth/SimpleAuthenticator.java @@ -57,10 +57,10 @@ class SimpleAuthenticator implements FlumeAuthenticator { @Override public synchronized PrivilegedExecutor proxyAs(String proxyUserName) { - if(proxyUserName == null || proxyUserName.isEmpty()) { + if (proxyUserName == null || proxyUserName.isEmpty()) { return this; } - if(proxyCache.get(proxyUserName) == null) { + if (proxyCache.get(proxyUserName) == null) { UserGroupInformation proxyUgi; try { proxyUgi = UserGroupInformation.createProxyUser(proxyUserName, http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-auth/src/main/java/org/apache/flume/auth/UGIExecutor.java ---------------------------------------------------------------------- diff --git a/flume-ng-auth/src/main/java/org/apache/flume/auth/UGIExecutor.java b/flume-ng-auth/src/main/java/org/apache/flume/auth/UGIExecutor.java index cd62b91..a6ebd86 100644 --- a/flume-ng-auth/src/main/java/org/apache/flume/auth/UGIExecutor.java +++ b/flume-ng-auth/src/main/java/org/apache/flume/auth/UGIExecutor.java @@ -55,7 +55,7 @@ class UGIExecutor implements PrivilegedExecutor { private void ensureValidAuth() { reloginUGI(ugi); - if(ugi.getAuthenticationMethod().equals(AuthenticationMethod.PROXY)) { + if (ugi.getAuthenticationMethod().equals(AuthenticationMethod.PROXY)) { reloginUGI(ugi.getRealUser()); } } @@ -70,9 +70,9 @@ class UGIExecutor implements PrivilegedExecutor { */ 
private void reloginUGI(UserGroupInformation ugi) { try { - if(ugi.hasKerberosCredentials()) { + if (ugi.hasKerberosCredentials()) { long now = System.currentTimeMillis(); - if(now - lastReloginAttempt < MIN_TIME_BEFORE_RELOGIN) { + if (now - lastReloginAttempt < MIN_TIME_BEFORE_RELOGIN) { return; } lastReloginAttempt = now; @@ -86,7 +86,7 @@ class UGIExecutor implements PrivilegedExecutor { @VisibleForTesting String getUserName() { - if(ugi != null) { + if (ugi != null) { return ugi.getUserName(); } else { return null; http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/BadCheckpointException.java ---------------------------------------------------------------------- diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/BadCheckpointException.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/BadCheckpointException.java index 588506a..b75c29e 100644 --- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/BadCheckpointException.java +++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/BadCheckpointException.java @@ -24,12 +24,13 @@ import org.apache.flume.FlumeException; * Exception thrown when the checkpoint directory contains invalid data, * probably due to the channel stopping while the checkpoint was written. 
*/ -public class BadCheckpointException extends FlumeException{ +public class BadCheckpointException extends FlumeException { private static final long serialVersionUID = -5038652693746472779L; public BadCheckpointException(String msg) { super(msg); } + public BadCheckpointException(String msg, Throwable t) { super(msg, t); } http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/CheckpointRebuilder.java ---------------------------------------------------------------------- diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/CheckpointRebuilder.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/CheckpointRebuilder.java index b961ae2..a0ecdeb 100644 --- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/CheckpointRebuilder.java +++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/CheckpointRebuilder.java @@ -23,12 +23,6 @@ import com.google.common.collect.HashMultimap; import com.google.common.collect.Lists; import com.google.common.collect.SetMultimap; import com.google.common.collect.Sets; - -import java.io.EOFException; -import java.io.File; -import java.io.IOException; -import java.util.List; -import java.util.Set; import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.CommandLineParser; import org.apache.commons.cli.GnuParser; @@ -37,26 +31,28 @@ import org.apache.commons.cli.Options; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.EOFException; +import java.io.File; +import java.io.IOException; +import java.util.List; +import java.util.Set; + public class CheckpointRebuilder { private final List<File> logFiles; private final FlumeEventQueue queue; - private final Set<ComparableFlumeEventPointer> committedPuts = - Sets.newHashSet(); - private final Set<ComparableFlumeEventPointer> pendingTakes 
= - Sets.newHashSet(); + private final Set<ComparableFlumeEventPointer> committedPuts = Sets.newHashSet(); + private final Set<ComparableFlumeEventPointer> pendingTakes = Sets.newHashSet(); private final SetMultimap<Long, ComparableFlumeEventPointer> uncommittedPuts = - HashMultimap.create(); + HashMultimap.create(); private final SetMultimap<Long, ComparableFlumeEventPointer> - uncommittedTakes = HashMultimap.create(); + uncommittedTakes = HashMultimap.create(); private final boolean fsyncPerTransaction; - private static Logger LOG = - LoggerFactory.getLogger(CheckpointRebuilder.class); + private static Logger LOG = LoggerFactory.getLogger(CheckpointRebuilder.class); - public CheckpointRebuilder(List<File> logFiles, - FlumeEventQueue queue, boolean fsyncPerTransaction) throws - IOException { + public CheckpointRebuilder(List<File> logFiles, FlumeEventQueue queue, + boolean fsyncPerTransaction) throws IOException { this.logFiles = logFiles; this.queue = queue; this.fsyncPerTransaction = fsyncPerTransaction; @@ -68,8 +64,8 @@ public class CheckpointRebuilder { for (File logFile : logFiles) { try { logReaders.add(LogFileFactory.getSequentialReader(logFile, null, - fsyncPerTransaction)); - } catch(EOFException e) { + fsyncPerTransaction)); + } catch (EOFException e) { LOG.warn("Ignoring " + logFile + " due to EOF", e); } } @@ -84,27 +80,24 @@ public class CheckpointRebuilder { TransactionEventRecord record = entry.getEvent(); long trans = record.getTransactionID(); long writeOrderID = record.getLogWriteOrderID(); - transactionIDSeed = Math.max(trans, transactionIDSeed); - writeOrderIDSeed = Math.max(writeOrderID, writeOrderIDSeed); + transactionIDSeed = Math.max(trans, transactionIDSeed); + writeOrderIDSeed = Math.max(writeOrderID, writeOrderIDSeed); if (record.getRecordType() == TransactionEventRecord.Type.PUT.get()) { uncommittedPuts.put(record.getTransactionID(), - new ComparableFlumeEventPointer( + new ComparableFlumeEventPointer( new FlumeEventPointer(fileID, 
offset), record.getLogWriteOrderID())); - } else if (record.getRecordType() - == TransactionEventRecord.Type.TAKE.get()) { + } else if (record.getRecordType() == TransactionEventRecord.Type.TAKE.get()) { Take take = (Take) record; uncommittedTakes.put(record.getTransactionID(), - new ComparableFlumeEventPointer( + new ComparableFlumeEventPointer( new FlumeEventPointer(take.getFileID(), take.getOffset()), record.getLogWriteOrderID())); - } else if (record.getRecordType() - == TransactionEventRecord.Type.COMMIT.get()) { + } else if (record.getRecordType() == TransactionEventRecord.Type.COMMIT.get()) { Commit commit = (Commit) record; - if (commit.getType() - == TransactionEventRecord.Type.PUT.get()) { + if (commit.getType() == TransactionEventRecord.Type.PUT.get()) { Set<ComparableFlumeEventPointer> puts = - uncommittedPuts.get(record.getTransactionID()); + uncommittedPuts.get(record.getTransactionID()); if (puts != null) { for (ComparableFlumeEventPointer put : puts) { if (!pendingTakes.remove(put)) { @@ -114,7 +107,7 @@ public class CheckpointRebuilder { } } else { Set<ComparableFlumeEventPointer> takes = - uncommittedTakes.get(record.getTransactionID()); + uncommittedTakes.get(record.getTransactionID()); if (takes != null) { for (ComparableFlumeEventPointer take : takes) { if (!committedPuts.remove(take)) { @@ -123,8 +116,7 @@ public class CheckpointRebuilder { } } } - } else if (record.getRecordType() - == TransactionEventRecord.Type.ROLLBACK.get()) { + } else if (record.getRecordType() == TransactionEventRecord.Type.ROLLBACK.get()) { if (uncommittedPuts.containsKey(record.getTransactionID())) { uncommittedPuts.removeAll(record.getTransactionID()); } else { @@ -134,18 +126,16 @@ public class CheckpointRebuilder { } } } catch (Exception e) { - LOG.warn("Error while generating checkpoint " - + "using fast generation logic", e); + LOG.warn("Error while generating checkpoint using fast generation logic", e); return false; } finally { - 
TransactionIDOracle.setSeed(transactionIDSeed); - WriteOrderOracle.setSeed(writeOrderIDSeed); + TransactionIDOracle.setSeed(transactionIDSeed); + WriteOrderOracle.setSeed(writeOrderIDSeed); for (LogFile.SequentialReader reader : logReaders) { reader.close(); } } - Set<ComparableFlumeEventPointer> sortedPuts = - Sets.newTreeSet(committedPuts); + Set<ComparableFlumeEventPointer> sortedPuts = Sets.newTreeSet(committedPuts); int count = 0; for (ComparableFlumeEventPointer put : sortedPuts) { queue.addTail(put.pointer); @@ -159,9 +149,9 @@ public class CheckpointRebuilder { long checkpointLogOrderID = 0; List<LogFile.MetaDataWriter> metaDataWriters = Lists.newArrayList(); for (File logFile : logFiles) { - String name = logFile.getName(); - metaDataWriters.add(LogFileFactory.getMetaDataWriter(logFile, - Integer.parseInt(name.substring(name.lastIndexOf('-') + 1)))); + String name = logFile.getName(); + metaDataWriters.add(LogFileFactory.getMetaDataWriter(logFile, + Integer.parseInt(name.substring(name.lastIndexOf('-') + 1)))); } try { if (queue.checkpoint(true)) { @@ -171,8 +161,7 @@ public class CheckpointRebuilder { } } } catch (Exception e) { - LOG.warn("Error while generating checkpoint " - + "using fast generation logic", e); + LOG.warn("Error while generating checkpoint using fast generation logic", e); } finally { for (LogFile.MetaDataWriter metaDataWriter : metaDataWriters) { metaDataWriter.close(); @@ -181,14 +170,14 @@ public class CheckpointRebuilder { } private final class ComparableFlumeEventPointer - implements Comparable<ComparableFlumeEventPointer> { + implements Comparable<ComparableFlumeEventPointer> { private final FlumeEventPointer pointer; private final long orderID; - public ComparableFlumeEventPointer(FlumeEventPointer pointer, long orderID){ + public ComparableFlumeEventPointer(FlumeEventPointer pointer, long orderID) { Preconditions.checkNotNull(pointer, "FlumeEventPointer cannot be" - + "null while creating a ComparableFlumeEventPointer"); + + 
"null while creating a ComparableFlumeEventPointer"); this.pointer = pointer; this.orderID = orderID; } @@ -204,22 +193,22 @@ public class CheckpointRebuilder { } @Override - public int hashCode(){ + public int hashCode() { return pointer.hashCode(); } @Override - public boolean equals(Object o){ - if(this == o){ + public boolean equals(Object o) { + if (this == o) { return true; } - if(o == null){ + if (o == null) { return false; } - if(o.getClass() != this.getClass()){ + if (o.getClass() != this.getClass()) { return false; } - return pointer.equals(((ComparableFlumeEventPointer)o).pointer); + return pointer.equals(((ComparableFlumeEventPointer) o).pointer); } } @@ -245,20 +234,19 @@ public class CheckpointRebuilder { } int capacity = Integer.parseInt(cli.getOptionValue("t")); File checkpointFile = new File(checkpointDir, "checkpoint"); - if(checkpointFile.exists()) { + if (checkpointFile.exists()) { LOG.error("Cannot execute fast replay", - new IllegalStateException("Checkpoint exists" + checkpointFile)); + new IllegalStateException("Checkpoint exists" + checkpointFile)); } else { EventQueueBackingStore backingStore = EventQueueBackingStoreFactory.get(checkpointFile, capacity, "channel"); FlumeEventQueue queue = new FlumeEventQueue(backingStore, - new File(checkpointDir, "inflighttakes"), - new File(checkpointDir, "inflightputs"), - new File(checkpointDir, Log.QUEUE_SET)); - CheckpointRebuilder rebuilder = new CheckpointRebuilder(logFiles, - queue, true); - if(rebuilder.rebuild()) { + new File(checkpointDir, "inflighttakes"), + new File(checkpointDir, "inflightputs"), + new File(checkpointDir, Log.QUEUE_SET)); + CheckpointRebuilder rebuilder = new CheckpointRebuilder(logFiles, queue, true); + if (rebuilder.rebuild()) { rebuilder.writeCheckpoint(); } else { LOG.error("Could not rebuild the checkpoint due to errors."); 
http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Commit.java ---------------------------------------------------------------------- diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Commit.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Commit.java index 3663244..8fd53cc 100644 --- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Commit.java +++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Commit.java @@ -36,44 +36,52 @@ class Commit extends TransactionEventRecord { * Type of Commit Take|Put */ private short type; + Commit(Long transactionID, Long logWriteOrderID) { super(transactionID, logWriteOrderID); } + Commit(Long transactionID, Long logWriteOrderID, short type) { this(transactionID, logWriteOrderID); this.type = type; } + @Override public void readFields(DataInput in) throws IOException { super.readFields(in); type = in.readShort(); } + @Override void writeProtos(OutputStream out) throws IOException { - ProtosFactory.Commit.Builder commitBuilder = - ProtosFactory.Commit.newBuilder(); + ProtosFactory.Commit.Builder commitBuilder = ProtosFactory.Commit.newBuilder(); commitBuilder.setType(type); commitBuilder.build().writeDelimitedTo(out); } + @Override void readProtos(InputStream in) throws IOException { - ProtosFactory.Commit commit = Preconditions.checkNotNull(ProtosFactory. 
- Commit.parseDelimitedFrom(in), "Commit cannot be null"); + ProtosFactory.Commit commit = + Preconditions.checkNotNull(ProtosFactory.Commit.parseDelimitedFrom(in), + "Commit cannot be null"); type = (short) commit.getType(); } short getType() { return type; } + @Override public void write(DataOutput out) throws IOException { super.write(out); out.writeShort(type); } + @Override short getRecordType() { return Type.COMMIT.get(); } + @Override public String toString() { StringBuilder builder = new StringBuilder(); http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/CorruptEventException.java ---------------------------------------------------------------------- diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/CorruptEventException.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/CorruptEventException.java index 691d291..5438f2e 100644 --- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/CorruptEventException.java +++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/CorruptEventException.java @@ -18,10 +18,9 @@ */ package org.apache.flume.channel.file; - public class CorruptEventException extends Exception { - private static final long serialVersionUID = -2986946303540798416L; + public CorruptEventException() { super(); } http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/EventQueueBackingStoreFactory.java ---------------------------------------------------------------------- diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/EventQueueBackingStoreFactory.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/EventQueueBackingStoreFactory.java index 456df34..dcd6f98 
100644 --- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/EventQueueBackingStoreFactory.java +++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/EventQueueBackingStoreFactory.java @@ -18,92 +18,91 @@ */ package org.apache.flume.channel.file; -import java.io.File; -import java.io.IOException; -import java.io.RandomAccessFile; - +import com.google.common.io.Files; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.google.common.io.Files; +import java.io.File; +import java.io.IOException; +import java.io.RandomAccessFile; class EventQueueBackingStoreFactory { - private static final Logger LOG = LoggerFactory - .getLogger(EventQueueBackingStoreFactory.class); + private static final Logger LOG = LoggerFactory.getLogger(EventQueueBackingStoreFactory.class); + private EventQueueBackingStoreFactory() {} + static EventQueueBackingStore get(File checkpointFile, int capacity, - String name) throws Exception { + String name) throws Exception { return get(checkpointFile, capacity, name, true); } static EventQueueBackingStore get(File checkpointFile, int capacity, - String name, boolean upgrade) throws Exception { + String name, boolean upgrade) throws Exception { return get(checkpointFile, null, capacity, name, upgrade, false, false); } - static EventQueueBackingStore get(File checkpointFile, - File backupCheckpointDir, int capacity,String name, - boolean upgrade, boolean shouldBackup, boolean compressBackup) - throws Exception { + + static EventQueueBackingStore get(File checkpointFile, File backupCheckpointDir, + int capacity, String name, boolean upgrade, + boolean shouldBackup, boolean compressBackup) throws Exception { File metaDataFile = Serialization.getMetaDataFile(checkpointFile); RandomAccessFile checkpointFileHandle = null; try { boolean checkpointExists = checkpointFile.exists(); boolean metaDataExists = metaDataFile.exists(); - if(metaDataExists) { + if (metaDataExists) { // if 
we have a metadata file but no checkpoint file, we have a problem // delete everything in the checkpoint directory and force // a full replay. - if(!checkpointExists || checkpointFile.length() == 0) { + if (!checkpointExists || checkpointFile.length() == 0) { LOG.warn("MetaData file for checkpoint " - + " exists but checkpoint does not. Checkpoint = " + checkpointFile - + ", metaDataFile = " + metaDataFile); + + " exists but checkpoint does not. Checkpoint = " + checkpointFile + + ", metaDataFile = " + metaDataFile); throw new BadCheckpointException( - "The last checkpoint was not completed correctly, " + - "since Checkpoint file does not exist while metadata " + - "file does."); + "The last checkpoint was not completed correctly, " + + "since Checkpoint file does not exist while metadata " + + "file does."); } } // brand new, use v3 - if(!checkpointExists) { - if(!checkpointFile.createNewFile()) { + if (!checkpointExists) { + if (!checkpointFile.createNewFile()) { throw new IOException("Cannot create " + checkpointFile); } return new EventQueueBackingStoreFileV3(checkpointFile, capacity, name, backupCheckpointDir, shouldBackup, compressBackup); } // v3 due to meta file, version will be checked by backing store - if(metaDataExists) { + if (metaDataExists) { return new EventQueueBackingStoreFileV3(checkpointFile, capacity, - name, backupCheckpointDir, shouldBackup, compressBackup); + name, backupCheckpointDir, shouldBackup, compressBackup); } checkpointFileHandle = new RandomAccessFile(checkpointFile, "r"); - int version = (int)checkpointFileHandle.readLong(); - if(Serialization.VERSION_2 == version) { - if(upgrade) { + int version = (int) checkpointFileHandle.readLong(); + if (Serialization.VERSION_2 == version) { + if (upgrade) { return upgrade(checkpointFile, capacity, name, backupCheckpointDir, - shouldBackup, compressBackup); + shouldBackup, compressBackup); } return new EventQueueBackingStoreFileV2(checkpointFile, capacity, name); } LOG.error("Found version " 
+ Integer.toHexString(version) + " in " + checkpointFile); throw new BadCheckpointException("Checkpoint file exists with " + - Serialization.VERSION_3 + " but no metadata file found."); + Serialization.VERSION_3 + " but no metadata file found."); } finally { - if(checkpointFileHandle != null) { + if (checkpointFileHandle != null) { try { checkpointFileHandle.close(); - } catch(IOException e) { + } catch (IOException e) { LOG.warn("Unable to close " + checkpointFile, e); } } } } - private static EventQueueBackingStore upgrade(File checkpointFile, - int capacity, String name, File backupCheckpointDir, - boolean shouldBackup, boolean compressBackup) - throws Exception { + private static EventQueueBackingStore upgrade(File checkpointFile, int capacity, String name, + File backupCheckpointDir, boolean shouldBackup, + boolean compressBackup) throws Exception { LOG.info("Attempting upgrade of " + checkpointFile + " for " + name); EventQueueBackingStoreFileV2 backingStoreV2 = new EventQueueBackingStoreFileV2(checkpointFile, capacity, name); @@ -115,7 +114,7 @@ class EventQueueBackingStoreFactory { EventQueueBackingStoreFileV3.upgrade(backingStoreV2, checkpointFile, metaDataFile); return new EventQueueBackingStoreFileV3(checkpointFile, capacity, name, - backupCheckpointDir, shouldBackup, compressBackup); + backupCheckpointDir, shouldBackup, compressBackup); } } http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/EventQueueBackingStoreFile.java ---------------------------------------------------------------------- diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/EventQueueBackingStoreFile.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/EventQueueBackingStoreFile.java index 2b0987b..73f1d4c 100644 --- 
a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/EventQueueBackingStoreFile.java +++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/EventQueueBackingStoreFile.java @@ -18,6 +18,15 @@ */ package org.apache.flume.channel.file; +import com.google.common.base.Preconditions; +import com.google.common.base.Throwables; +import com.google.common.collect.ImmutableSortedSet; +import com.google.common.collect.Maps; +import com.google.common.collect.SetMultimap; +import com.google.common.util.concurrent.ThreadFactoryBuilder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import java.io.File; import java.io.IOException; import java.io.RandomAccessFile; @@ -34,21 +43,9 @@ import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; -import com.google.common.base.Throwables; -import com.google.common.util.concurrent.ThreadFactoryBuilder; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.google.common.base.Preconditions; -import com.google.common.collect.ImmutableSortedSet; -import com.google.common.collect.Maps; -import com.google.common.collect.SetMultimap; - - abstract class EventQueueBackingStoreFile extends EventQueueBackingStore { - private static final Logger LOG = LoggerFactory - .getLogger(EventQueueBackingStoreFile.class); - private static final int MAX_ALLOC_BUFFER_SIZE = 2*1024*1024; // 2MB + private static final Logger LOG = LoggerFactory.getLogger(EventQueueBackingStoreFile.class); + private static final int MAX_ALLOC_BUFFER_SIZE = 2 * 1024 * 1024; // 2MB protected static final int HEADER_SIZE = 1029; protected static final int INDEX_VERSION = 0; protected static final int INDEX_WRITE_ORDER_ID = 1; @@ -71,15 +68,15 @@ abstract class EventQueueBackingStoreFile extends EventQueueBackingStore { private final ExecutorService checkpointBackUpExecutor; protected EventQueueBackingStoreFile(int capacity, 
String name, - File checkpointFile) throws IOException, + File checkpointFile) throws IOException, BadCheckpointException { this(capacity, name, checkpointFile, null, false, false); } protected EventQueueBackingStoreFile(int capacity, String name, - File checkpointFile, File checkpointBackupDir, - boolean backupCheckpoint, boolean compressBackup) - throws IOException, BadCheckpointException { + File checkpointFile, File checkpointBackupDir, + boolean backupCheckpoint, boolean compressBackup) + throws IOException, BadCheckpointException { super(capacity, name); this.checkpointFile = checkpointFile; this.shouldBackup = backupCheckpoint; @@ -87,7 +84,7 @@ abstract class EventQueueBackingStoreFile extends EventQueueBackingStore { this.backupDir = checkpointBackupDir; checkpointFileHandle = new RandomAccessFile(checkpointFile, "rw"); long totalBytes = (capacity + HEADER_SIZE) * Serialization.SIZE_OF_LONG; - if(checkpointFileHandle.length() == 0) { + if (checkpointFileHandle.length() == 0) { allocate(checkpointFile, totalBytes); checkpointFileHandle.seek(INDEX_VERSION * Serialization.SIZE_OF_LONG); checkpointFileHandle.writeLong(getVersion()); @@ -95,7 +92,7 @@ abstract class EventQueueBackingStoreFile extends EventQueueBackingStore { LOG.info("Preallocated " + checkpointFile + " to " + checkpointFileHandle.length() + " for capacity " + capacity); } - if(checkpointFile.length() != totalBytes) { + if (checkpointFile.length() != totalBytes) { String msg = "Configured capacity is " + capacity + " but the " + " checkpoint file capacity is " + ((checkpointFile.length() / Serialization.SIZE_OF_LONG) - HEADER_SIZE) @@ -108,20 +105,20 @@ abstract class EventQueueBackingStoreFile extends EventQueueBackingStore { elementsBuffer = mappedBuffer.asLongBuffer(); long version = elementsBuffer.get(INDEX_VERSION); - if(version != (long) getVersion()) { + if (version != (long) getVersion()) { throw new BadCheckpointException("Invalid version: " + version + " " + - name + ", expected " + 
getVersion()); + name + ", expected " + getVersion()); } long checkpointComplete = elementsBuffer.get(INDEX_CHECKPOINT_MARKER); - if(checkpointComplete != (long) CHECKPOINT_COMPLETE) { + if (checkpointComplete != (long) CHECKPOINT_COMPLETE) { throw new BadCheckpointException("Checkpoint was not completed correctly," - + " probably because the agent stopped while the channel was" - + " checkpointing."); + + " probably because the agent stopped while the channel was" + + " checkpointing."); } if (shouldBackup) { checkpointBackUpExecutor = Executors.newSingleThreadExecutor( - new ThreadFactoryBuilder().setNameFormat( - getName() + " - CheckpointBackUpThread").build()); + new ThreadFactoryBuilder().setNameFormat( + getName() + " - CheckpointBackUpThread").build()); } else { checkpointBackUpExecutor = null; } @@ -142,13 +139,13 @@ abstract class EventQueueBackingStoreFile extends EventQueueBackingStore { * @param backupDirectory - the directory to which the backup files should be * copied. * @throws IOException - if the copy failed, or if there is not enough disk - * space to copy the checkpoint files over. + * space to copy the checkpoint files over. */ protected void backupCheckpoint(File backupDirectory) throws IOException { int availablePermits = backupCompletedSema.drainPermits(); Preconditions.checkState(availablePermits == 0, - "Expected no permits to be available in the backup semaphore, " + - "but " + availablePermits + " permits were available."); + "Expected no permits to be available in the backup semaphore, " + + "but " + availablePermits + " permits were available."); if (slowdownBackup) { try { TimeUnit.SECONDS.sleep(10); @@ -160,45 +157,45 @@ abstract class EventQueueBackingStoreFile extends EventQueueBackingStore { if (backupExists(backupDirectory)) { if (!backupFile.delete()) { throw new IOException("Error while doing backup of checkpoint. 
Could " + - "not remove" + backupFile.toString() + "."); + "not remove" + backupFile.toString() + "."); } } Serialization.deleteAllFiles(backupDirectory, Log.EXCLUDES); File checkpointDir = checkpointFile.getParentFile(); File[] checkpointFiles = checkpointDir.listFiles(); Preconditions.checkNotNull(checkpointFiles, "Could not retrieve files " + - "from the checkpoint directory. Cannot complete backup of the " + - "checkpoint."); + "from the checkpoint directory. Cannot complete backup of the " + + "checkpoint."); for (File origFile : checkpointFiles) { - if(Log.EXCLUDES.contains(origFile.getName())) { + if (Log.EXCLUDES.contains(origFile.getName())) { continue; } if (compressBackup && origFile.equals(checkpointFile)) { Serialization.compressFile(origFile, new File(backupDirectory, - origFile.getName() + COMPRESSED_FILE_EXTENSION)); + origFile.getName() + COMPRESSED_FILE_EXTENSION)); } else { Serialization.copyFile(origFile, new File(backupDirectory, - origFile.getName())); + origFile.getName())); } } Preconditions.checkState(!backupFile.exists(), "The backup file exists " + - "while it is not supposed to. Are multiple channels configured to use " + - "this directory: " + backupDirectory.toString() + " as backup?"); + "while it is not supposed to. Are multiple channels configured to use " + + "this directory: " + backupDirectory.toString() + " as backup?"); if (!backupFile.createNewFile()) { LOG.error("Could not create backup file. Backup of checkpoint will " + - "not be used during replay even if checkpoint is bad."); + "not be used during replay even if checkpoint is bad."); } } /** * Restore the checkpoint, if it is found to be bad. + * * @return true - if the previous backup was successfully completed and * restore was successfully completed. 
* @throws IOException - If restore failed due to IOException - * */ public static boolean restoreBackup(File checkpointDir, File backupDir) - throws IOException { + throws IOException { if (!backupExists(backupDir)) { return false; } @@ -210,14 +207,14 @@ abstract class EventQueueBackingStoreFile extends EventQueueBackingStore { for (File backupFile : backupFiles) { String fileName = backupFile.getName(); if (!fileName.equals(BACKUP_COMPLETE_FILENAME) && - !fileName.equals(Log.FILE_LOCK)) { - if (fileName.endsWith(COMPRESSED_FILE_EXTENSION)){ + !fileName.equals(Log.FILE_LOCK)) { + if (fileName.endsWith(COMPRESSED_FILE_EXTENSION)) { Serialization.decompressFile( - backupFile, new File(checkpointDir, - fileName.substring(0, fileName.lastIndexOf(".")))); + backupFile, new File(checkpointDir, + fileName.substring(0, fileName.lastIndexOf(".")))); } else { Serialization.copyFile(backupFile, new File(checkpointDir, - fileName)); + fileName)); } } } @@ -233,14 +230,14 @@ abstract class EventQueueBackingStoreFile extends EventQueueBackingStore { if (shouldBackup) { int permits = backupCompletedSema.drainPermits(); Preconditions.checkState(permits <= 1, "Expected only one or less " + - "permits to checkpoint, but got " + String.valueOf(permits) + - " permits"); - if(permits < 1) { + "permits to checkpoint, but got " + String.valueOf(permits) + + " permits"); + if (permits < 1) { // Force the checkpoint to not happen by throwing an exception. throw new IOException("Previous backup of checkpoint files is still " + - "in progress. Will attempt to checkpoint only at the end of the " + - "next checkpoint interval. Try increasing the checkpoint interval " + - "if this error happens often."); + "in progress. Will attempt to checkpoint only at the end of the " + + "next checkpoint interval. 
Try increasing the checkpoint interval " + + "if this error happens often."); } } // Start checkpoint @@ -249,12 +246,12 @@ abstract class EventQueueBackingStoreFile extends EventQueueBackingStore { } @Override - void checkpoint() throws IOException { + void checkpoint() throws IOException { setLogWriteOrderID(WriteOrderOracle.next()); LOG.info("Updating checkpoint metadata: logWriteOrderID: " + getLogWriteOrderID() + ", queueSize: " + getSize() + ", queueHead: " - + getHead()); + + getHead()); elementsBuffer.put(INDEX_WRITE_ORDER_ID, getLogWriteOrderID()); try { writeCheckpointMetaData(); @@ -286,8 +283,8 @@ abstract class EventQueueBackingStoreFile extends EventQueueBackingStore { */ private void startBackupThread() { Preconditions.checkNotNull(checkpointBackUpExecutor, - "Expected the checkpoint backup exector to be non-null, " + - "but it is null. Checkpoint will not be backed up."); + "Expected the checkpoint backup exector to be non-null, " + + "but it is null. Checkpoint will not be backed up."); LOG.info("Attempting to back up checkpoint."); checkpointBackUpExecutor.submit(new Runnable() { @@ -317,16 +314,14 @@ abstract class EventQueueBackingStoreFile extends EventQueueBackingStore { } catch (IOException e) { LOG.info("Error closing " + checkpointFile, e); } - if(checkpointBackUpExecutor != null && !checkpointBackUpExecutor - .isShutdown()) { + if (checkpointBackUpExecutor != null && !checkpointBackUpExecutor.isShutdown()) { checkpointBackUpExecutor.shutdown(); try { // Wait till the executor dies. 
- while (!checkpointBackUpExecutor.awaitTermination(1, - TimeUnit.SECONDS)); + while (!checkpointBackUpExecutor.awaitTermination(1, TimeUnit.SECONDS)) {} } catch (InterruptedException ex) { LOG.warn("Interrupted while waiting for checkpoint backup to " + - "complete"); + "complete"); } } } @@ -362,18 +357,19 @@ abstract class EventQueueBackingStoreFile extends EventQueueBackingStore { @Override protected void incrementFileID(int fileID) { AtomicInteger counter = logFileIDReferenceCounts.get(fileID); - if(counter == null) { + if (counter == null) { counter = new AtomicInteger(0); logFileIDReferenceCounts.put(fileID, counter); } counter.incrementAndGet(); } + @Override protected void decrementFileID(int fileID) { AtomicInteger counter = logFileIDReferenceCounts.get(fileID); Preconditions.checkState(counter != null, "null counter "); int count = counter.decrementAndGet(); - if(count == 0) { + if (count == 0) { logFileIDReferenceCounts.remove(fileID); } } @@ -391,7 +387,7 @@ abstract class EventQueueBackingStoreFile extends EventQueueBackingStore { * totalBytes <= MAX_ALLOC_BUFFER_SIZE, so this can be cast to int * without a problem. */ - checkpointFile.write(new byte[(int)totalBytes]); + checkpointFile.write(new byte[(int) totalBytes]); } else { byte[] initBuffer = new byte[MAX_ALLOC_BUFFER_SIZE]; long remainingBytes = totalBytes; @@ -404,7 +400,7 @@ abstract class EventQueueBackingStoreFile extends EventQueueBackingStore { * so casting to int is fine. 
*/ if (remainingBytes > 0) { - checkpointFile.write(initBuffer, 0, (int)remainingBytes); + checkpointFile.write(initBuffer, 0, (int) remainingBytes); } } success = true; @@ -412,7 +408,7 @@ abstract class EventQueueBackingStoreFile extends EventQueueBackingStore { try { checkpointFile.close(); } catch (IOException e) { - if(success) { + if (success) { throw e; } } @@ -436,9 +432,9 @@ abstract class EventQueueBackingStoreFile extends EventQueueBackingStore { } int capacity = (int) ((file.length() - (HEADER_SIZE * 8L)) / 8L); EventQueueBackingStoreFile backingStore = (EventQueueBackingStoreFile) - EventQueueBackingStoreFactory.get(file,capacity, "debug", false); + EventQueueBackingStoreFactory.get(file, capacity, "debug", false); System.out.println("File Reference Counts" - + backingStore.logFileIDReferenceCounts); + + backingStore.logFileIDReferenceCounts); System.out.println("Queue Capacity " + backingStore.getCapacity()); System.out.println("Queue Size " + backingStore.getSize()); System.out.println("Queue Head " + backingStore.getHead()); @@ -447,7 +443,7 @@ abstract class EventQueueBackingStoreFile extends EventQueueBackingStore { int fileID = (int) (value >>> 32); int offset = (int) value; System.out.println(index + ":" + Long.toHexString(value) + " fileID = " - + fileID + ", offset = " + offset); + + fileID + ", offset = " + offset); } FlumeEventQueue queue = new FlumeEventQueue(backingStore, inflightTakesFile, inflightPutsFile, @@ -462,7 +458,7 @@ abstract class EventQueueBackingStoreFile extends EventQueueBackingStore { int fileID = (int) (value >>> 32); int offset = (int) value; System.out.println(Long.toHexString(value) + " fileID = " - + fileID + ", offset = " + offset); + + fileID + ", offset = " + offset); } } SetMultimap<Long, Long> takeMap = queue.deserializeInflightTakes(); @@ -474,7 +470,7 @@ abstract class EventQueueBackingStoreFile extends EventQueueBackingStore { int fileID = (int) (value >>> 32); int offset = (int) value; 
System.out.println(Long.toHexString(value) + " fileID = " - + fileID + ", offset = " + offset); + + fileID + ", offset = " + offset); } } } http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/EventQueueBackingStoreFileV2.java ---------------------------------------------------------------------- diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/EventQueueBackingStoreFileV2.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/EventQueueBackingStoreFileV2.java index abd2ea3..71183aa 100644 --- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/EventQueueBackingStoreFileV2.java +++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/EventQueueBackingStoreFileV2.java @@ -28,7 +28,6 @@ import com.google.common.base.Preconditions; final class EventQueueBackingStoreFileV2 extends EventQueueBackingStoreFile { - private static final int INDEX_SIZE = 2; private static final int INDEX_HEAD = 3; private static final int INDEX_ACTIVE_LOG = 5; @@ -55,6 +54,7 @@ final class EventQueueBackingStoreFileV2 extends EventQueueBackingStoreFile { } } } + @Override protected int getVersion() { return Serialization.VERSION_2;
