Add Static Analysis to warn on unsafe use of AutoCloseable instances

Patch by tjake and carlyeks, reviewed by benedict for CASSANDRA-9431
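For context on what the new analysis catches: the Eclipse compiler (ECJ) resource checks flag any AutoCloseable that can leave a method without being closed, which is exactly the pattern the bulk of this patch rewrites into try-with-resources. A minimal illustration (hypothetical readVersion helpers, not code from this patch):

    import java.io.DataInputStream;
    import java.io.FileInputStream;
    import java.io.IOException;

    public class ResourceExample
    {
        // Flagged: if readInt() throws, the stream is never closed.
        static int readVersionUnsafe(String path) throws IOException
        {
            DataInputStream in = new DataInputStream(new FileInputStream(path));
            int version = in.readInt();
            in.close();
            return version;
        }

        // Clean: try-with-resources closes the stream on every exit path.
        static int readVersionSafe(String path) throws IOException
        {
            try (DataInputStream in = new DataInputStream(new FileInputStream(path)))
            {
                return in.readInt();
            }
        }
    }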
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/7aafe053
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/7aafe053
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/7aafe053

Branch: refs/heads/trunk
Commit: 7aafe053e7ffffc3b2e4ac1b2a444749df3dbbaa
Parents: 6fe6c99
Author: T Jake Luciani <[email protected]>
Authored: Wed May 20 10:23:18 2015 -0400
Committer: T Jake Luciani <[email protected]>
Committed: Wed May 27 17:53:26 2015 -0400

----------------------------------------------------------------------
 CHANGES.txt                                      |   1 +
 build.xml                                        |  44 +++-
 eclipse_compiler.properties                      |  88 ++++++++
 .../apache/cassandra/cache/AutoSavingCache.java  |   1 +
 .../cassandra/cache/SerializingCache.java        |   5 +
 .../apache/cassandra/db/BatchlogManager.java     |   7 +-
 .../org/apache/cassandra/db/ColumnFamily.java    |   8 +-
 .../apache/cassandra/db/ColumnFamilyStore.java   |  31 +--
 src/java/org/apache/cassandra/db/Memtable.java   |  30 +--
 .../cassandra/db/MutationVerbHandler.java        |  24 +-
 .../apache/cassandra/db/RangeSliceReply.java     |   5 +-
 .../cassandra/db/SizeEstimatesRecorder.java      |  17 +-
 .../org/apache/cassandra/db/SystemKeyspace.java  |   8 +-
 .../cassandra/db/commitlog/CommitLog.java        |   8 +-
 .../db/commitlog/CommitLogArchiver.java          |  10 +-
 .../db/commitlog/CommitLogReplayer.java          |  16 +-
 .../db/commitlog/CommitLogSegment.java           |   1 +
 .../compaction/AbstractCompactionStrategy.java   |   1 +
 .../db/compaction/CompactionManager.java         | 222 +++++++++++--------
 .../cassandra/db/compaction/CompactionTask.java  |  71 +++---
 .../DateTieredCompactionStrategy.java            |   3 +
 .../db/compaction/LazilyCompactedRow.java        |  15 +-
 .../compaction/LeveledCompactionStrategy.java    |   2 +
 .../cassandra/db/compaction/Scrubber.java        |   3 +
 .../SizeTieredCompactionStrategy.java            |   3 +
 .../cassandra/db/compaction/Upgrader.java        |  12 +-
 .../compaction/WrappingCompactionStrategy.java   |   1 +
 .../writers/DefaultCompactionWriter.java         |   2 +
 .../writers/MajorLeveledCompactionWriter.java    |   3 +
 .../writers/MaxSSTableSizeWriter.java            |   3 +
 .../SplittingSizeTieredCompactionWriter.java     |   3 +
 .../cassandra/db/marshal/CompositeType.java      |   3 +-
 .../apache/cassandra/gms/FailureDetector.java    |   8 +-
 .../hadoop/AbstractColumnFamilyInputFormat.java  |  15 +-
 .../hadoop/ColumnFamilyInputFormat.java          |   1 +
 .../hadoop/ColumnFamilyOutputFormat.java         |   1 +
 .../hadoop/ColumnFamilyRecordReader.java         |   1 +
 .../hadoop/ColumnFamilyRecordWriter.java         |   1 +
 .../apache/cassandra/hadoop/ConfigHelper.java    |   1 +
 .../cassandra/hadoop/cql3/CqlConfigHelper.java   |  11 +-
 .../cassandra/hadoop/cql3/CqlRecordWriter.java   |  48 ++--
 .../cassandra/hadoop/pig/CqlNativeStorage.java   |  21 +-
 .../io/compress/CompressionMetadata.java         |  65 +++---
 .../cassandra/io/sstable/CQLSSTableWriter.java   |   1 +
 .../cassandra/io/sstable/IndexSummary.java       |  14 +-
 .../io/sstable/IndexSummaryBuilder.java          |   1 +
 .../io/sstable/IndexSummaryManager.java          |   1 +
 .../apache/cassandra/io/sstable/SSTable.java     |   8 +-
 .../cassandra/io/sstable/SSTableLoader.java      |   5 +-
 .../io/sstable/format/SSTableReader.java         |  62 +++---
 .../io/sstable/format/big/BigTableReader.java    |  22 +-
 .../io/sstable/format/big/BigTableWriter.java    |   8 +-
 .../format/big/SSTableNamesIterator.java         |   2 +
 .../io/util/DataIntegrityMetadata.java           |   8 +-
 .../cassandra/io/util/DataOutputBuffer.java      |   2 +-
 .../cassandra/io/util/PoolingSegmentedFile.java  |   1 +
 .../cassandra/io/util/RandomAccessReader.java    |   4 +
 .../cassandra/io/util/SafeMemoryWriter.java      |   2 +-
 .../cassandra/locator/CloudstackSnitch.java      |  13 +-
 .../cassandra/locator/PropertyFileSnitch.java    |   8 +-
 .../apache/cassandra/net/MessagingService.java   |   5 +
 .../cassandra/net/OutboundTcpConnection.java     |   1 +
 .../apache/cassandra/security/SSLFactory.java    |   1 +
 .../cassandra/service/ActiveRepairService.java   |   2 +
 .../cassandra/service/FileCacheService.java      |   1 +
 .../apache/cassandra/service/StorageProxy.java   |   3 +-
 .../cassandra/service/pager/PagingState.java     |   3 +-
 .../cassandra/streaming/ConnectionHandler.java   |   5 +
 .../cassandra/streaming/StreamReader.java        |   1 +
 .../cassandra/streaming/StreamWriter.java        |  25 +--
 .../compress/CompressedStreamReader.java         |   1 +
 .../compress/CompressedStreamWriter.java         |  18 +-
 .../streaming/messages/IncomingFileMessage.java  |   1 +
 .../streaming/messages/StreamInitMessage.java    |   8 +-
 .../thrift/CustomTNonBlockingServer.java         |   2 +
 .../thrift/CustomTThreadPoolServer.java          |  15 +-
 .../cassandra/thrift/SSLTransportFactory.java    |   1 +
 .../thrift/TCustomNonblockingServerSocket.java   |   1 +
 .../cassandra/thrift/TCustomServerSocket.java    |   1 +
 .../thrift/TFramedTransportFactory.java          |   1 +
 .../cassandra/thrift/THsHaDisruptorServer.java   |   1 +
 .../apache/cassandra/tools/SSTableExport.java    |  15 +-
 .../apache/cassandra/tools/SSTableImport.java    |  97 ++++----
 .../cassandra/tools/StandaloneScrubber.java      |   7 +-
 .../cassandra/tools/StandaloneVerifier.java      |   8 +-
 .../cassandra/utils/BloomFilterSerializer.java   |   1 +
 .../org/apache/cassandra/utils/FBUtilities.java  |  14 +-
 .../apache/cassandra/utils/FilterFactory.java    |   1 +
 .../utils/NativeSSTableLoaderClient.java         |   4 +-
 .../apache/cassandra/utils/concurrent/Ref.java   |   2 +-
 .../apache/cassandra/utils/concurrent/Refs.java  |   6 +
 .../cassandra/utils/obs/OffHeapBitSet.java       |   1 +
 92 files changed, 706 insertions(+), 533 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7aafe053/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index a5b3220..ad2845f 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 2.2
+ * Static Analysis to warn on unsafe use of Autocloseable instances (CASSANDRA-9431)
  * Update commitlog archiving examples now that commitlog segments are not recycled (CASSANDRA-9350)
  * Extend Transactional API to sstable lifecycle management (CASSANDRA-8568)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7aafe053/build.xml
----------------------------------------------------------------------
diff --git a/build.xml b/build.xml
index cbedf20..bb401ee 100644
--- a/build.xml
+++ b/build.xml
@@ -68,8 +68,8 @@
     <property name="dist.dir" value="${build.dir}/dist"/>
     <property name="tmp.dir" value="${java.io.tmpdir}"/>

-    <property name="source.version" value="1.7"/>
-    <property name="target.version" value="1.7"/>
+    <property name="source.version" value="1.7"/>
+    <property name="target.version" value="1.7"/>

     <condition property="version" value="${base.version}">
       <isset property="release"/>
@@ -114,6 +114,8 @@
     <property name="jacoco.execfile" value="${jacoco.export.dir}/jacoco.exec" />
     <property name="jacoco.version" value="0.7.1.201405082137"/>

+    <property name="ecj.version" value="4.4.2"/>
+
     <condition property="maven-ant-tasks.jar.exists">
       <available file="${build.dir}/maven-ant-tasks-${maven-ant-tasks.version}.jar" />
     </condition>
@@ -1047,7 +1049,7 @@
       </tar>
   </target>

-  <target name="release" depends="artifacts,rat-init"
+  <target name="release" depends="eclipse-warnings,artifacts,rat-init"
           description="Create and QC release artifacts">
artifacts"> <checksum forceOverwrite="yes" todir="${build.dir}" fileext=".md5" algorithm="MD5"> @@ -1406,7 +1408,7 @@ </target> <target name="test-all" - depends="test,long-test,test-compression,pig-test,test-clientutil-jar" + depends="eclipse-warnings,test,long-test,test-compression,pig-test,test-clientutil-jar" description="Run all tests except for those under test-burn" /> <!-- Use JaCoCo ant extension without needing externally saved lib --> @@ -1894,6 +1896,40 @@ <delete dir="build/eclipse-classes" /> </target> + + <target name="eclipse-warnings" depends="build" description="Run eclipse compiler code analysis"> + <property name="ecj.log.dir" value="${build.dir}/ecj" /> + <property name="ecj.warnings.file" value="${ecj.log.dir}/eclipse_compiler_checks.txt"/> + <delete dir="${ecj.log.dir}" /> + <mkdir dir="${ecj.log.dir}" /> + + <property name="ecj.properties" value="${basedir}/eclipse_compiler.properties" /> + + <echo message="Running Eclipse Code Analysis. Output logged to ${ecj.warnings.file}" /> + + <java + jar="${build.dir.lib}/jars/ecj-${ecj.version}.jar" + fork="true" + failonerror="true" + maxmemory="512m"> + <arg value="-source"/> + <arg value="${source.version}" /> + <arg value="-target"/> + <arg value="${target.version}" /> + <arg value="-d" /> + <arg value="none" /> + <arg value="-proc:none" /> + <arg value="-log" /> + <arg value="${ecj.warnings.file}" /> + <arg value="-properties" /> + <arg value="${ecj.properties}" /> + <arg value="-cp" /> + <arg value="${toString:cassandra.classpath}" /> + <arg value="${build.src.java}" /> + </java> + </target> + + <!-- Publish artifacts to Maven repositories --> <target name="mvn-install" depends="maven-declare-dependencies,artifacts,jar,sources-jar,javadoc-jar" http://git-wip-us.apache.org/repos/asf/cassandra/blob/7aafe053/eclipse_compiler.properties ---------------------------------------------------------------------- diff --git a/eclipse_compiler.properties b/eclipse_compiler.properties new file mode 100644 index 0000000..e1f2802 --- /dev/null +++ b/eclipse_compiler.properties @@ -0,0 +1,88 @@ +# These options come from +# http://grepcode.com/file/repo1.maven.org/maven2/org.eclipse.jdt.core.compiler/ecj/4.2.1/org/eclipse/jdt/internal/compiler/impl/CompilerOptions.java#CompilerOptions + +#Look for important errors +# +# Autoclosables not in try-with-references +org.eclipse.jdt.core.compiler.problem.explicitlyClosedAutoCloseable=error +org.eclipse.jdt.core.compiler.problem.potentiallyUnclosedCloseable=error +org.eclipse.jdt.core.compiler.problem.unclosedCloseable=ignore +#Ignore and disable all other checks too keep the logs clean + + +org.eclipse.jdt.core.compiler.problem.annotationSuperInterface=ignore +org.eclipse.jdt.core.compiler.problem.autoboxing=ignore +org.eclipse.jdt.core.compiler.problem.comparingIdentical=ignore +org.eclipse.jdt.core.compiler.problem.deadCode=ignore +org.eclipse.jdt.core.compiler.problem.deprecation=ignore +org.eclipse.jdt.core.compiler.problem.deprecationInDeprecatedCode=disabled +org.eclipse.jdt.core.compiler.problem.deprecationWhenOverridingDeprecatedMethod=disabled +org.eclipse.jdt.core.compiler.problem.discouragedReference=ignore +org.eclipse.jdt.core.compiler.problem.emptyStatement=ignore +org.eclipse.jdt.core.compiler.problem.fallthroughCase=ignore +org.eclipse.jdt.core.compiler.problem.fatalOptionalError=disabled +org.eclipse.jdt.core.compiler.problem.fieldHiding=ignore +org.eclipse.jdt.core.compiler.problem.finalParameterBound=ignore 
+org.eclipse.jdt.core.compiler.problem.finallyBlockNotCompletingNormally=ignore +org.eclipse.jdt.core.compiler.problem.forbiddenReference=ignore +org.eclipse.jdt.core.compiler.problem.hiddenCatchBlock=ignore +org.eclipse.jdt.core.compiler.problem.includeNullInfoFromAsserts=disabled +org.eclipse.jdt.core.compiler.problem.incompatibleNonInheritedInterfaceMethod=ignore +org.eclipse.jdt.core.compiler.problem.incompleteEnumSwitch=ignore +org.eclipse.jdt.core.compiler.problem.indirectStaticAccess=ignore +org.eclipse.jdt.core.compiler.problem.localVariableHiding=ignore +org.eclipse.jdt.core.compiler.problem.methodWithConstructorName=ignore +org.eclipse.jdt.core.compiler.problem.missingDefaultCase=ignore +org.eclipse.jdt.core.compiler.problem.missingDeprecatedAnnotation=ignore +org.eclipse.jdt.core.compiler.problem.missingEnumCaseDespiteDefault=disabled +org.eclipse.jdt.core.compiler.problem.missingHashCodeMethod=ignore +org.eclipse.jdt.core.compiler.problem.missingOverrideAnnotation=ignore +org.eclipse.jdt.core.compiler.problem.missingOverrideAnnotationForInterfaceMethodImplementation=disabled +org.eclipse.jdt.core.compiler.problem.missingSerialVersion=ignore +org.eclipse.jdt.core.compiler.problem.missingSynchronizedOnInheritedMethod=ignore +org.eclipse.jdt.core.compiler.problem.noEffectAssignment=ignore +org.eclipse.jdt.core.compiler.problem.noImplicitStringConversion=ignore +org.eclipse.jdt.core.compiler.problem.nonExternalizedStringLiteral=ignore +org.eclipse.jdt.core.compiler.problem.nullAnnotationInferenceConflict=error +org.eclipse.jdt.core.compiler.problem.nullReference=ignore +org.eclipse.jdt.core.compiler.problem.nullSpecViolation=error +org.eclipse.jdt.core.compiler.problem.nullUncheckedConversion=ignore +org.eclipse.jdt.core.compiler.problem.overridingPackageDefaultMethod=ignore +org.eclipse.jdt.core.compiler.problem.parameterAssignment=ignore +org.eclipse.jdt.core.compiler.problem.possibleAccidentalBooleanAssignment=ignore +org.eclipse.jdt.core.compiler.problem.potentialNullReference=ignore +org.eclipse.jdt.core.compiler.problem.rawTypeReference=ignore +org.eclipse.jdt.core.compiler.problem.redundantNullAnnotation=ignore +org.eclipse.jdt.core.compiler.problem.redundantNullCheck=ignore +org.eclipse.jdt.core.compiler.problem.redundantSpecificationOfTypeArguments=ignore +org.eclipse.jdt.core.compiler.problem.redundantSuperinterface=ignore +org.eclipse.jdt.core.compiler.problem.reportMethodCanBePotentiallyStatic=ignore +org.eclipse.jdt.core.compiler.problem.reportMethodCanBeStatic=ignore +org.eclipse.jdt.core.compiler.problem.specialParameterHidingField=disabled +org.eclipse.jdt.core.compiler.problem.staticAccessReceiver=ignore +org.eclipse.jdt.core.compiler.problem.suppressOptionalErrors=enabled +org.eclipse.jdt.core.compiler.problem.suppressWarnings=enabled +org.eclipse.jdt.core.compiler.problem.syntheticAccessEmulation=ignore +org.eclipse.jdt.core.compiler.problem.typeParameterHiding=ignore +org.eclipse.jdt.core.compiler.problem.unavoidableGenericTypeProblems=disabled +org.eclipse.jdt.core.compiler.problem.uncheckedTypeOperation=ignore +org.eclipse.jdt.core.compiler.problem.undocumentedEmptyBlock=ignore +org.eclipse.jdt.core.compiler.problem.unhandledWarningToken=ignore +org.eclipse.jdt.core.compiler.problem.unnecessaryElse=ignore +org.eclipse.jdt.core.compiler.problem.unnecessaryTypeCheck=ignore +org.eclipse.jdt.core.compiler.problem.unqualifiedFieldAccess=ignore +org.eclipse.jdt.core.compiler.problem.unusedDeclaredThrownException=ignore 
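Roughly, the three resource options enabled above distinguish how a leak can happen: as configured, the analysis errors on resources that are closed only on some paths or closed by hand outside try-with-resources, while the plain never-closed case is left at ignore. A sketch of the pattern each option targets, under my reading of the ECJ option names (hypothetical helpers, not from the patch):

    import java.io.FileInputStream;
    import java.io.IOException;

    public class EcjResourceChecks
    {
        // potentiallyUnclosedCloseable=error: closed on the normal path,
        // but an exception from read() would leak the stream.
        static int potentiallyUnclosed(String path) throws IOException
        {
            FileInputStream in = new FileInputStream(path);
            int b = in.read();
            in.close();
            return b;
        }

        // explicitlyClosedAutoCloseable=error: closed on every path, but
        // managed by hand instead of try-with-resources.
        static int explicitlyClosed(String path) throws IOException
        {
            FileInputStream in = new FileInputStream(path);
            try
            {
                return in.read();
            }
            finally
            {
                in.close();
            }
        }

        // unclosedCloseable=ignore: never closed at all; not reported
        // under the configuration above.
        static void unclosed(String path) throws IOException
        {
            FileInputStream in = new FileInputStream(path);
            in.read();
        }
    }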
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7aafe053/src/java/org/apache/cassandra/cache/AutoSavingCache.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cache/AutoSavingCache.java b/src/java/org/apache/cassandra/cache/AutoSavingCache.java
index b381224..a204a18 100644
--- a/src/java/org/apache/cassandra/cache/AutoSavingCache.java
+++ b/src/java/org/apache/cassandra/cache/AutoSavingCache.java
@@ -227,6 +227,7 @@ public class AutoSavingCache<K extends CacheKey, V> extends InstrumentingCache<K
             return info.forProgress(keysWritten, Math.max(keysWritten, keysEstimate));
         }

+        @SuppressWarnings("resource")
         public void saveCache()
         {
             logger.debug("Deleting old {} files.", cacheType);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7aafe053/src/java/org/apache/cassandra/cache/SerializingCache.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cache/SerializingCache.java b/src/java/org/apache/cassandra/cache/SerializingCache.java
index 911b500..0e38922 100644
--- a/src/java/org/apache/cassandra/cache/SerializingCache.java
+++ b/src/java/org/apache/cassandra/cache/SerializingCache.java
@@ -155,6 +155,7 @@ public class SerializingCache<K, V> implements ICache<K, V>
         map.clear();
     }

+    @SuppressWarnings("resource")
     public V get(K key)
     {
         RefCountedMemory mem = map.get(key);
@@ -172,6 +173,7 @@ public class SerializingCache<K, V> implements ICache<K, V>
         }
     }

+    @SuppressWarnings("resource")
    public void put(K key, V value)
    {
        RefCountedMemory mem = serialize(value);
@@ -193,6 +195,7 @@ public class SerializingCache<K, V> implements ICache<K, V>
            old.unreference();
    }

+    @SuppressWarnings("resource")
    public boolean putIfAbsent(K key, V value)
    {
        RefCountedMemory mem = serialize(value);
@@ -216,6 +219,7 @@ public class SerializingCache<K, V> implements ICache<K, V>
        return old == null;
    }

+    @SuppressWarnings("resource")
    public boolean replace(K key, V oldToReplace, V value)
    {
        // if there is no old value in our map, we fail
@@ -259,6 +263,7 @@ public class SerializingCache<K, V> implements ICache<K, V>

    public void remove(K key)
    {
+        @SuppressWarnings("resource")
        RefCountedMemory mem = map.remove(key);
        if (mem != null)
            mem.unreference();
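Many of the @SuppressWarnings("resource") annotations in this patch follow the same reasoning as SerializingCache above: a reference-counted value legitimately outlives the method that obtains it, so there is nothing to close locally and the warning is suppressed rather than "fixed". A self-contained sketch of that ownership-transfer shape (hypothetical Handle and HandleCache types, loosely modeled on RefCountedMemory):

    import java.util.concurrent.atomic.AtomicInteger;

    final class Handle implements AutoCloseable
    {
        private final AtomicInteger refs = new AtomicInteger(1);

        Handle acquire()
        {
            refs.incrementAndGet();
            return this;
        }

        public void close()
        {
            if (refs.decrementAndGet() == 0)
                free();
        }

        private void free() { /* release off-heap memory, etc. */ }
    }

    final class HandleCache
    {
        private volatile Handle current = new Handle();

        // The caller takes over one reference and must close() it eventually;
        // the compiler cannot see that, so the warning is suppressed, not fixed.
        @SuppressWarnings("resource")
        Handle get()
        {
            return current.acquire();
        }
    }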
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7aafe053/src/java/org/apache/cassandra/db/BatchlogManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/BatchlogManager.java b/src/java/org/apache/cassandra/db/BatchlogManager.java
index f5137fd..dd84ac8 100644
--- a/src/java/org/apache/cassandra/db/BatchlogManager.java
+++ b/src/java/org/apache/cassandra/db/BatchlogManager.java
@@ -148,20 +148,17 @@ public class BatchlogManager implements BatchlogManagerMBean

     private static ByteBuffer serializeMutations(Collection<Mutation> mutations, int version)
     {
-        DataOutputBuffer buf = new DataOutputBuffer();
-
-        try
+        try (DataOutputBuffer buf = new DataOutputBuffer())
         {
             buf.writeInt(mutations.size());
             for (Mutation mutation : mutations)
                 Mutation.serializer.serialize(mutation, buf, version);
+            return buf.buffer();
         }
         catch (IOException e)
         {
             throw new AssertionError(); // cannot happen.
         }
-
-        return buf.buffer();
     }

     private void replayAllFailedBatches() throws ExecutionException, InterruptedException

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7aafe053/src/java/org/apache/cassandra/db/ColumnFamily.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnFamily.java b/src/java/org/apache/cassandra/db/ColumnFamily.java
index 006ced7..a7243a2 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamily.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamily.java
@@ -521,9 +521,11 @@ public abstract class ColumnFamily implements Iterable<Cell>, IRowCacheEntry

     public ByteBuffer toBytes()
     {
-        DataOutputBuffer out = new DataOutputBuffer();
-        serializer.serialize(this, out, MessagingService.current_version);
-        return ByteBuffer.wrap(out.getData(), 0, out.getLength());
+        try (DataOutputBuffer out = new DataOutputBuffer())
+        {
+            serializer.serialize(this, out, MessagingService.current_version);
+            return ByteBuffer.wrap(out.getData(), 0, out.getLength());
+        }
     }
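The BatchlogManager and ColumnFamily changes above are the recurring conversion pattern in this patch: open the buffer in the try-with-resources header and return from inside the try block, so the buffer is released on every exit path. The same shape with JDK streams standing in for Cassandra's DataOutputBuffer (hypothetical serialize helper):

    import java.io.ByteArrayOutputStream;
    import java.io.DataOutputStream;
    import java.io.IOException;
    import java.nio.ByteBuffer;
    import java.util.Collection;

    final class SerializeExample
    {
        // The streams are opened in the try-with-resources header and the
        // result is returned from inside the try, so every path closes them.
        static ByteBuffer serialize(Collection<Integer> values)
        {
            try (ByteArrayOutputStream bytes = new ByteArrayOutputStream();
                 DataOutputStream out = new DataOutputStream(bytes))
            {
                out.writeInt(values.size());
                for (int v : values)
                    out.writeInt(v);
                return ByteBuffer.wrap(bytes.toByteArray());
            }
            catch (IOException e)
            {
                throw new AssertionError(e); // cannot happen with in-memory streams
            }
        }
    }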
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7aafe053/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index b41ac75..63ffb16 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@ -1795,6 +1795,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
         return repairedSSTables;
     }

+    @SuppressWarnings("resource")
     public RefViewFragment selectAndReference(Function<View, List<SSTableReader>> filter)
     {
         while (true)
@@ -1966,6 +1967,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
      *
      * @param range The range of keys and columns within those keys to fetch
      */
+    @SuppressWarnings("resource")
     private AbstractScanIterator getSequentialIterator(final DataRange range, long now)
     {
         assert !(range.keyRange() instanceof Range) || !((Range<?>)range.keyRange()).isWrapAround() || range.keyRange().right.isMinimum() : range.keyRange();
@@ -1980,24 +1982,27 @@
         {
             protected Row computeNext()
             {
-                // pull a row out of the iterator
-                if (!iterator.hasNext())
-                    return endOfData();
+                while (true)
+                {
+                    // pull a row out of the iterator
+                    if (!iterator.hasNext())
+                        return endOfData();

-                Row current = iterator.next();
-                DecoratedKey key = current.key;
+                    Row current = iterator.next();
+                    DecoratedKey key = current.key;

-                if (!range.stopKey().isMinimum() && range.stopKey().compareTo(key) < 0)
-                    return endOfData();
+                    if (!range.stopKey().isMinimum() && range.stopKey().compareTo(key) < 0)
+                        return endOfData();

-                // skipping outside of assigned range
-                if (!range.contains(key))
-                    return computeNext();
+                    // skipping outside of assigned range
+                    if (!range.contains(key))
+                        continue;

-                if (logger.isTraceEnabled())
-                    logger.trace("scanned {}", metadata.getKeyValidator().getString(key.getKey()));
+                    if (logger.isTraceEnabled())
+                        logger.trace("scanned {}", metadata.getKeyValidator().getString(key.getKey()));

-                return current;
+                    return current;
+                }
             }

             public void close() throws IOException

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7aafe053/src/java/org/apache/cassandra/db/Memtable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Memtable.java b/src/java/org/apache/cassandra/db/Memtable.java
index 55b0bfe..ccf92be 100644
--- a/src/java/org/apache/cassandra/db/Memtable.java
+++ b/src/java/org/apache/cassandra/db/Memtable.java
@@ -423,19 +423,21 @@ public class Memtable implements Comparable<Memtable>
     private static int estimateRowOverhead(final int count)
     {
         // calculate row overhead
-        final OpOrder.Group group = new OpOrder().start();
-        int rowOverhead;
-        MemtableAllocator allocator = MEMORY_POOL.newAllocator();
-        ConcurrentNavigableMap<RowPosition, Object> rows = new ConcurrentSkipListMap<>();
-        final Object val = new Object();
-        for (int i = 0 ; i < count ; i++)
-            rows.put(allocator.clone(new BufferDecoratedKey(new LongToken(i), ByteBufferUtil.EMPTY_BYTE_BUFFER), group), val);
-        double avgSize = ObjectSizes.measureDeep(rows) / (double) count;
-        rowOverhead = (int) ((avgSize - Math.floor(avgSize)) < 0.05 ? Math.floor(avgSize) : Math.ceil(avgSize));
-        rowOverhead -= ObjectSizes.measureDeep(new LongToken(0));
-        rowOverhead += AtomicBTreeColumns.EMPTY_SIZE;
-        allocator.setDiscarding();
-        allocator.setDiscarded();
-        return rowOverhead;
+        try (final OpOrder.Group group = new OpOrder().start())
+        {
+            int rowOverhead;
+            MemtableAllocator allocator = MEMORY_POOL.newAllocator();
+            ConcurrentNavigableMap<RowPosition, Object> rows = new ConcurrentSkipListMap<>();
+            final Object val = new Object();
+            for (int i = 0; i < count; i++)
+                rows.put(allocator.clone(new BufferDecoratedKey(new LongToken(i), ByteBufferUtil.EMPTY_BYTE_BUFFER), group), val);
+            double avgSize = ObjectSizes.measureDeep(rows) / (double) count;
+            rowOverhead = (int) ((avgSize - Math.floor(avgSize)) < 0.05 ? Math.floor(avgSize) : Math.ceil(avgSize));
+            rowOverhead -= ObjectSizes.measureDeep(new LongToken(0));
+            rowOverhead += AtomicBTreeColumns.EMPTY_SIZE;
+            allocator.setDiscarding();
+            allocator.setDiscarded();
+            return rowOverhead;
+        }
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7aafe053/src/java/org/apache/cassandra/db/MutationVerbHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/MutationVerbHandler.java b/src/java/org/apache/cassandra/db/MutationVerbHandler.java
index 92bfdb5..3baa93e 100644
--- a/src/java/org/apache/cassandra/db/MutationVerbHandler.java
+++ b/src/java/org/apache/cassandra/db/MutationVerbHandler.java
@@ -59,18 +59,20 @@ public class MutationVerbHandler implements IVerbHandler<Mutation>
      */
     private void forwardToLocalNodes(Mutation mutation, MessagingService.Verb verb, byte[] forwardBytes, InetAddress from) throws IOException
     {
-        DataInputStream in = new DataInputStream(new FastByteArrayInputStream(forwardBytes));
-        int size = in.readInt();
-
-        // tell the recipients who to send their ack to
-        MessageOut<Mutation> message = new MessageOut<>(verb, mutation, Mutation.serializer).withParameter(Mutation.FORWARD_FROM, from.getAddress());
-        // Send a message to each of the addresses on our Forward List
-        for (int i = 0; i < size; i++)
+        try (DataInputStream in = new DataInputStream(new FastByteArrayInputStream(forwardBytes)))
         {
-            InetAddress address = CompactEndpointSerializationHelper.deserialize(in);
-            int id = in.readInt();
-            Tracing.trace("Enqueuing forwarded write to {}", address);
-            MessagingService.instance().sendOneWay(message, id, address);
+            int size = in.readInt();
+
+            // tell the recipients who to send their ack to
+            MessageOut<Mutation> message = new MessageOut<>(verb, mutation, Mutation.serializer).withParameter(Mutation.FORWARD_FROM, from.getAddress());
+            // Send a message to each of the addresses on our Forward List
+            for (int i = 0; i < size; i++)
+            {
+                InetAddress address = CompactEndpointSerializationHelper.deserialize(in);
+                int id = in.readInt();
+                Tracing.trace("Enqueuing forwarded write to {}", address);
+                MessagingService.instance().sendOneWay(message, id, address);
+            }
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7aafe053/src/java/org/apache/cassandra/db/RangeSliceReply.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/RangeSliceReply.java b/src/java/org/apache/cassandra/db/RangeSliceReply.java
index 5964ea8..ed1f523 100644
--- a/src/java/org/apache/cassandra/db/RangeSliceReply.java
+++ b/src/java/org/apache/cassandra/db/RangeSliceReply.java
@@ -57,7 +57,10 @@ public class RangeSliceReply

     public static RangeSliceReply read(byte[] body, int version) throws IOException
     {
-        return serializer.deserialize(new DataInputStream(new FastByteArrayInputStream(body)), version);
+        try (DataInputStream dis = new DataInputStream(new FastByteArrayInputStream(body)))
+        {
+            return serializer.deserialize(dis, version);
+        }
     }

     private static class RangeSliceReplySerializer implements IVersionedSerializer<RangeSliceReply>
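Note the Memtable change above: try-with-resources is not just for I/O. OpOrder.Group is AutoCloseable, so ending the operation group becomes part of the block structure instead of an easily-forgotten explicit call. A generic, self-contained sketch of the same idea with a hypothetical Scope guard:

    import java.util.concurrent.atomic.AtomicInteger;

    final class Scope implements AutoCloseable
    {
        static final AtomicInteger ACTIVE = new AtomicInteger();

        static Scope start()
        {
            ACTIVE.incrementAndGet();
            return new Scope();
        }

        public void close()
        {
            ACTIVE.decrementAndGet();
        }

        static void doWork()
        {
            try (Scope scope = Scope.start())
            {
                // work happens inside the scope; close() runs even on exceptions
            }
        }
    }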
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7aafe053/src/java/org/apache/cassandra/db/SizeEstimatesRecorder.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/SizeEstimatesRecorder.java b/src/java/org/apache/cassandra/db/SizeEstimatesRecorder.java
index b0c114a..c68109c 100644
--- a/src/java/org/apache/cassandra/db/SizeEstimatesRecorder.java
+++ b/src/java/org/apache/cassandra/db/SizeEstimatesRecorder.java
@@ -82,6 +82,7 @@ public class SizeEstimatesRecorder extends MigrationListener implements Runnable
         }
     }

+    @SuppressWarnings("resource")
     private void recordSizeEstimates(ColumnFamilyStore table, Collection<Range<Token>> localRanges)
     {
         // for each local primary range, estimate (crudely) mean partition size and partitions count.
@@ -90,22 +91,24 @@ public class SizeEstimatesRecorder extends MigrationListener implements Runnable
         {
             // filter sstables that have partitions in this range.
             Refs<SSTableReader> refs = null;
-            while (refs == null)
-            {
-                ColumnFamilyStore.ViewFragment view = table.select(table.viewFilter(Range.makeRowRange(range)));
-                refs = Refs.tryRef(view.sstables);
-            }
-
             long partitionsCount, meanPartitionSize;
+
             try
             {
+                while (refs == null)
+                {
+                    ColumnFamilyStore.ViewFragment view = table.select(table.viewFilter(Range.makeRowRange(range)));
+                    refs = Refs.tryRef(view.sstables);
+                }
+
                 // calculate the estimates.
                 partitionsCount = estimatePartitionsCount(refs, range);
                 meanPartitionSize = estimateMeanPartitionSize(refs);
             }
             finally
             {
-                refs.release();
+                if (refs != null)
+                    refs.release();
             }

             estimates.put(range, Pair.create(partitionsCount, meanPartitionSize));
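The SizeEstimatesRecorder fix above moves the acquisition loop inside the try so a failure between acquiring and using the refs cannot leak them, with a null-guarded release in the finally block, since an exception may escape before anything was acquired. The shape of that fix reduced to a sketch (hypothetical Refs stand-in):

    final class RefsExample
    {
        static final class Refs implements AutoCloseable
        {
            // In the real code this can return null when it races with a
            // concurrent view change, which is why callers loop.
            static Refs tryRef()
            {
                return new Refs();
            }

            public void close() { /* release the references */ }
        }

        static void useRefs()
        {
            Refs refs = null;
            try
            {
                while (refs == null)
                    refs = Refs.tryRef(); // the loop body itself may throw
                // ... use refs ...
            }
            finally
            {
                if (refs != null)         // guard: refs may never have been acquired
                    refs.close();
            }
        }
    }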
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7aafe053/src/java/org/apache/cassandra/db/SystemKeyspace.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/SystemKeyspace.java b/src/java/org/apache/cassandra/db/SystemKeyspace.java
index 67a3162..9956728 100644
--- a/src/java/org/apache/cassandra/db/SystemKeyspace.java
+++ b/src/java/org/apache/cassandra/db/SystemKeyspace.java
@@ -437,17 +437,16 @@ public final class SystemKeyspace

     private static Map<UUID, ByteBuffer> truncationAsMapEntry(ColumnFamilyStore cfs, long truncatedAt, ReplayPosition position)
     {
-        DataOutputBuffer out = new DataOutputBuffer();
-        try
+        try (DataOutputBuffer out = new DataOutputBuffer())
         {
             ReplayPosition.serializer.serialize(position, out);
             out.writeLong(truncatedAt);
+            return Collections.singletonMap(cfs.metadata.cfId, ByteBuffer.wrap(out.getData(), 0, out.getLength()));
         }
         catch (IOException e)
         {
             throw new RuntimeException(e);
         }
-        return Collections.singletonMap(cfs.metadata.cfId, ByteBuffer.wrap(out.getData(), 0, out.getLength()));
     }

     public static ReplayPosition getTruncatedPosition(UUID cfId)
@@ -1116,9 +1115,8 @@ public final class SystemKeyspace

     private static ByteBuffer rangeToBytes(Range<Token> range)
     {
-        try
+        try (DataOutputBuffer out = new DataOutputBuffer())
         {
-            DataOutputBuffer out = new DataOutputBuffer();
             Range.tokenSerializer.serialize(range, out, MessagingService.VERSION_22);
             return out.buffer();
         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7aafe053/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLog.java b/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
index a8dda28..a81145d 100644
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
@@ -257,12 +257,10 @@ public class CommitLog implements CommitLogMBean
         }

         Allocation alloc = allocator.allocate(mutation, (int) totalSize);
-        try
+        ICRC32 checksum = CRC32Factory.instance.create();
+        final ByteBuffer buffer = alloc.getBuffer();
+        try (BufferedDataOutputStreamPlus dos = new DataOutputBufferFixed(buffer))
         {
-            ICRC32 checksum = CRC32Factory.instance.create();
-            final ByteBuffer buffer = alloc.getBuffer();
-            BufferedDataOutputStreamPlus dos = new DataOutputBufferFixed(buffer);
-
             // checksummed length
             dos.writeInt((int) size);
             checksum.update(buffer, buffer.position() - 4, 4);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7aafe053/src/java/org/apache/cassandra/db/commitlog/CommitLogArchiver.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogArchiver.java b/src/java/org/apache/cassandra/db/commitlog/CommitLogArchiver.java
index 27abae3..02072de 100644
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLogArchiver.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogArchiver.java
@@ -63,11 +63,8 @@ public class CommitLogArchiver
     public CommitLogArchiver()
     {
         Properties commitlog_commands = new Properties();
-        InputStream stream = null;
-        try
+        try (InputStream stream = getClass().getClassLoader().getResourceAsStream("commitlog_archiving.properties"))
         {
-            stream = getClass().getClassLoader().getResourceAsStream("commitlog_archiving.properties");
-
             if (stream == null)
             {
                 logger.debug("No commitlog_archiving properties found; archive + pitr will be disabled");
@@ -113,10 +110,7 @@ public class CommitLogArchiver
         {
             throw new RuntimeException("Unable to load commitlog_archiving.properties", e);
         }
-        finally
-        {
-            FileUtils.closeQuietly(stream);
-        }
+
     }

     public void maybeArchive(final CommitLogSegment segment)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7aafe053/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java b/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
index 6f9039d..a59e70e 100644
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
@@ -268,6 +268,7 @@ public class CommitLogReplayer
         }
     }

+    @SuppressWarnings("resource")
     public void recover(File file) throws IOException
     {
         CommitLogDescriptor desc = CommitLogDescriptor.fromFileName(file.getName());
@@ -340,6 +341,7 @@ public class CommitLogReplayer

                 FileDataInput sectionReader = reader;
                 if (compressor != null)
+                {
                     try
                     {
                         int start = (int) reader.getFilePointer();
@@ -363,6 +365,7 @@ public class CommitLogReplayer
                         logger.error("Unexpected exception decompressing section {}", e);
                         continue;
                     }
+                }

                 if (!replaySyncSection(sectionReader, replayEnd, desc))
                     break;
@@ -469,9 +472,9 @@ public class CommitLogReplayer

     void replayMutation(byte[] inputBuffer, int size, final long entryLocation, final CommitLogDescriptor desc) throws IOException
     {
-        FastByteArrayInputStream bufIn = new FastByteArrayInputStream(inputBuffer, 0, size);
+
         final Mutation mutation;
-        try
+        try (FastByteArrayInputStream bufIn = new FastByteArrayInputStream(inputBuffer, 0, size))
         {
             mutation = Mutation.serializer.deserialize(new DataInputStream(bufIn),
                                                        desc.getMessagingVersion(),
@@ -499,15 +502,12 @@ public class CommitLogReplayer
         {
             JVMStabilityInspector.inspectThrowable(t);
             File f = File.createTempFile("mutation", "dat");
-            DataOutputStream out = new DataOutputStream(new FileOutputStream(f));
-            try
+
+            try (DataOutputStream out = new DataOutputStream(new FileOutputStream(f)))
             {
                 out.write(inputBuffer, 0, size);
             }
-            finally
-            {
-                out.close();
-            }
+
             String st = String.format("Unexpected error deserializing mutation; saved to %s and ignored. This may be caused by replaying a mutation against a table with the same name but incompatible schema. Exception follows: ", f.getAbsolutePath());
             logger.error(st, t);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7aafe053/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java b/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java
index d04690d..ee160c3 100644
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java
@@ -160,6 +160,7 @@ public abstract class CommitLogSegment
      * Allocate space in this buffer for the provided mutation, and return the allocated Allocation object.
      * Returns null if there is not enough space in this segment, and a new segment is needed.
      */
+    @SuppressWarnings("resource") //we pass the op order around
     Allocation allocate(Mutation mutation, int size)
     {
         final OpOrder.Group opGroup = appendOrder.start();
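CommitLogSegment.allocate above shows the other legitimate use of @SuppressWarnings("resource"): the OpOrder.Group it starts is deliberately handed to the returned Allocation, which closes it later, so no try-with-resources is possible at the allocation site. A sketch of that hand-off (hypothetical Guard, Allocation, and Segment types):

    final class Guard implements AutoCloseable
    {
        public void close() { /* end the guarded operation */ }
    }

    final class Allocation implements AutoCloseable
    {
        private final Guard guard;

        Allocation(Guard guard)
        {
            this.guard = guard;
        }

        public void close()
        {
            guard.close(); // the allocation owns the guard and releases it
        }
    }

    final class Segment
    {
        @SuppressWarnings("resource") // the Guard is closed by Allocation.close(), not here
        Allocation allocate()
        {
            return new Allocation(new Guard());
        }
    }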
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7aafe053/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java b/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
index 38107c0..97e7041 100644
--- a/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
+++ b/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
@@ -268,6 +268,7 @@ public abstract class AbstractCompactionStrategy
      * allow for a more memory efficient solution if we know the sstable don't overlap (see
      * LeveledCompactionStrategy for instance).
      */
+    @SuppressWarnings("resource")
     public ScannerList getScanners(Collection<SSTableReader> sstables, Range<Token> range)
     {
         RateLimiter limiter = CompactionManager.instance.getRateLimiter();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7aafe053/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
index 2079325..ffed554 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
@@ -42,6 +42,7 @@ import javax.management.openmbean.OpenDataException;
 import javax.management.openmbean.TabularData;

 import com.google.common.base.Predicate;
+import com.google.common.base.Throwables;
 import com.google.common.collect.*;
 import com.google.common.util.concurrent.*;
 import org.slf4j.Logger;
@@ -70,9 +71,11 @@ import org.apache.cassandra.metrics.CompactionMetrics;
 import org.apache.cassandra.repair.Validator;
 import org.apache.cassandra.service.ActiveRepairService;
 import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.utils.CloseableIterator;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.JVMStabilityInspector;
 import org.apache.cassandra.utils.MerkleTree;
+import org.apache.cassandra.utils.Pair;
 import org.apache.cassandra.utils.WrappedRunnable;
 import org.apache.cassandra.utils.UUIDGen;
 import org.apache.cassandra.utils.concurrent.OpOrder;
@@ -243,6 +246,7 @@ public class CompactionManager implements CompactionManagerMBean
         }
     }

+    @SuppressWarnings("resource")
     private AllSSTableOpStatus parallelAllSSTableOperation(final ColumnFamilyStore cfs, final OneSSTableOperation operation, OperationType operationType) throws ExecutionException, InterruptedException
     {
         try (LifecycleTransaction compacting = cfs.markAllCompacting(operationType);)
@@ -259,7 +263,8 @@ public class CompactionManager implements CompactionManagerMBean
             }

             Iterable<SSTableReader> sstables = operation.filterSSTables(compacting.originals());
-            List<Future<Object>> futures = new ArrayList<>();
+            List<Pair<LifecycleTransaction,Future<Object>>> futures = new ArrayList<>();
+
             for (final SSTableReader sstable : sstables)
             {
@@ -270,7 +275,7 @@ public class CompactionManager implements CompactionManagerMBean
                 }

                 final LifecycleTransaction txn = compacting.split(singleton(sstable));
-                futures.add(executor.submit(new Callable<Object>()
+                futures.add(Pair.create(txn,executor.submit(new Callable<Object>()
                 {
                     @Override
                     public Object call() throws Exception
@@ -278,13 +283,37 @@ public class CompactionManager implements CompactionManagerMBean
                         operation.execute(txn);
                         return this;
                     }
-                }));
+                })));
             }

             assert compacting.originals().isEmpty();

-            for (Future<Object> f : futures)
-                f.get();
+            //Collect all exceptions
+            Exception exception = null;
+
+            for (Pair<LifecycleTransaction, Future<Object>> f : futures)
+            {
+                try
+                {
+                    f.right.get();
+                }
+                catch (InterruptedException | ExecutionException e)
+                {
+                    if (exception == null)
+                        exception = new Exception();
+
+                    exception.addSuppressed(e);
+                }
+                finally
+                {
+                    f.left.close();
+                }
+            }
+
+            if (exception != null)
+                Throwables.propagate(exception);
+
             return AllSSTableOpStatus.SUCCESSFUL;
         }
     }
@@ -407,6 +436,7 @@ public class CompactionManager implements CompactionManagerMBean
     {
         Runnable runnable = new WrappedRunnable() {
             @Override
+            @SuppressWarnings("resource")
             public void runMayThrow() throws Exception
             {
                 LifecycleTransaction modifier = null;
@@ -427,6 +457,7 @@ public class CompactionManager implements CompactionManagerMBean
         if (executor.isShutdown())
         {
             logger.info("Compaction executor has shut down, not submitting anticompaction");
+            sstables.release();
             return Futures.immediateCancelledFuture();
         }
@@ -659,35 +690,35 @@ public class CompactionManager implements CompactionManagerMBean

     private void scrubOne(ColumnFamilyStore cfs, LifecycleTransaction modifier, boolean skipCorrupted, boolean checkData) throws IOException
     {
-        Scrubber scrubber = new Scrubber(cfs, modifier, skipCorrupted, false, checkData);
+        CompactionInfo.Holder scrubInfo = null;

-        CompactionInfo.Holder scrubInfo = scrubber.getScrubInfo();
-        metrics.beginCompaction(scrubInfo);
-        try
+        try (Scrubber scrubber = new Scrubber(cfs, modifier, skipCorrupted, false, checkData))
         {
+            scrubInfo = scrubber.getScrubInfo();
+            metrics.beginCompaction(scrubInfo);
             scrubber.scrub();
         }
         finally
         {
-            scrubber.close();
-            metrics.finishCompaction(scrubInfo);
+            if (scrubInfo != null)
+                metrics.finishCompaction(scrubInfo);
         }
     }

     private void verifyOne(ColumnFamilyStore cfs, SSTableReader sstable, boolean extendedVerify) throws IOException
     {
-        Verifier verifier = new Verifier(cfs, sstable, false);
+        CompactionInfo.Holder verifyInfo = null;

-        CompactionInfo.Holder verifyInfo = verifier.getVerifyInfo();
-        metrics.beginCompaction(verifyInfo);
-        try
+        try (Verifier verifier = new Verifier(cfs, sstable, false))
         {
+            verifyInfo = verifier.getVerifyInfo();
+            metrics.beginCompaction(verifyInfo);
             verifier.verify(extendedVerify);
         }
         finally
         {
-            verifier.close();
-            metrics.finishCompaction(verifyInfo);
+            if (verifyInfo != null)
+                metrics.finishCompaction(verifyInfo);
         }
     }
@@ -798,10 +829,11 @@ public class CompactionManager implements CompactionManagerMBean
                 if (ci.isStopRequested())
                     throw new CompactionInterruptedException(ci.getCompactionInfo());

-                SSTableIdentityIterator row = (SSTableIdentityIterator) scanner.next();
-                row = cleanupStrategy.cleanup(row);
+                @SuppressWarnings("resource")
+                SSTableIdentityIterator row = cleanupStrategy.cleanup((SSTableIdentityIterator) scanner.next());
                 if (row == null)
                     continue;
+                @SuppressWarnings("resource")
                 AbstractCompactedRow compactedRow = new LazilyCompactedRow(controller, Collections.singletonList(row));
                 if (writer.append(compactedRow) != null)
                     totalkeysWritten++;
@@ -991,35 +1023,40 @@ public class CompactionManager implements CompactionManagerMBean
             if (!cfs.isValid())
                 return;

-            Refs<SSTableReader> sstables = null;
-            try
-            {
+            String snapshotName = validator.desc.sessionId.toString();
+            boolean isSnapshotValidation = cfs.snapshotExists(snapshotName);

-                String snapshotName = validator.desc.sessionId.toString();
-                int gcBefore;
-                boolean isSnapshotValidation = cfs.snapshotExists(snapshotName);
-                if (isSnapshotValidation)
-                {
-                    // If there is a snapshot created for the session then read from there.
-                    // note that we populate the parent repair session when creating the snapshot, meaning the sstables in the snapshot are the ones we
-                    // are supposed to validate.
-                    sstables = cfs.getSnapshotSSTableReader(snapshotName);
+            int gcBefore;
+            if (isSnapshotValidation)
+            {
+                // If there is a snapshot created for the session then read from there.
+                // note that we populate the parent repair session when creating the snapshot, meaning the sstables in the snapshot are the ones we
+                // are supposed to validate.
+                try (Refs<SSTableReader> sstables = cfs.getSnapshotSSTableReader(snapshotName))
+                {
                     // Computing gcbefore based on the current time wouldn't be very good because we know each replica will execute
                     // this at a different time (that's the whole purpose of repair with snaphsot). So instead we take the creation
                     // time of the snapshot, which should give us roughtly the same time on each replica (roughtly being in that case
                     // 'as good as in the non-snapshot' case)
                     gcBefore = cfs.gcBefore(cfs.getSnapshotCreationTime(snapshotName));
+
+                    buildMerkleTree(cfs, sstables, validator, gcBefore);
+
+                    // review comment: should this be in a try/finally? it was previously
+                    cfs.clearSnapshot(snapshotName);
                 }
-                else
+            }
+            else
+            {
+                // flush first so everyone is validating data that is as similar as possible
+                StorageService.instance.forceKeyspaceFlush(cfs.keyspace.getName(), cfs.name);
+                ActiveRepairService.ParentRepairSession prs = ActiveRepairService.instance.getParentRepairSession(validator.desc.parentSessionId);
+                try (ColumnFamilyStore.RefViewFragment sstableCandidates = cfs.selectAndReference(prs.isIncremental ? ColumnFamilyStore.UNREPAIRED_SSTABLES : ColumnFamilyStore.CANONICAL_SSTABLES))
                 {
-                    // flush first so everyone is validating data that is as similar as possible
-                    StorageService.instance.forceKeyspaceFlush(cfs.keyspace.getName(), cfs.name);
-                    ActiveRepairService.ParentRepairSession prs = ActiveRepairService.instance.getParentRepairSession(validator.desc.parentSessionId);
-                    ColumnFamilyStore.RefViewFragment sstableCandidates = cfs.selectAndReference(prs.isIncremental ? ColumnFamilyStore.UNREPAIRED_SSTABLES : ColumnFamilyStore.CANONICAL_SSTABLES);
+                    Refs<SSTableReader> refs = sstableCandidates.refs;
                     Set<SSTableReader> sstablesToValidate = new HashSet<>();
-
                     for (SSTableReader sstable : sstableCandidates.sstables)
                     {
                         if (new Bounds<>(sstable.first.getToken(), sstable.last.getToken()).intersects(Collections.singletonList(validator.desc.range)))
@@ -1036,83 +1073,75 @@ public class CompactionManager implements CompactionManagerMBean
                         throw new RuntimeException("Cannot start multiple repair sessions over the same sstables");
                     }

-                    sstables = Refs.tryRef(sstablesToValidate);
-                    if (sstables == null)
-                    {
-                        logger.error("Could not reference sstables");
-                        throw new RuntimeException("Could not reference sstables");
-                    }
-                    sstableCandidates.release();
+                    refs.relaseAllExcept(sstablesToValidate);
                     prs.addSSTables(cfs.metadata.cfId, sstablesToValidate);

                     if (validator.gcBefore > 0)
                         gcBefore = validator.gcBefore;
                     else
                         gcBefore = getDefaultGcBefore(cfs);
-                }

-                // Create Merkle tree suitable to hold estimated partitions for given range.
-                // We blindly assume that partition is evenly distributed on all sstables for now.
-                long numPartitions = 0;
-                for (SSTableReader sstable : sstables)
-                {
-                    numPartitions += sstable.estimatedKeysForRanges(singleton(validator.desc.range));
+
+                    buildMerkleTree(cfs, refs, validator, gcBefore);
                 }
-                // determine tree depth from number of partitions, but cap at 20 to prevent large tree.
-                int depth = numPartitions > 0 ? (int) Math.min(Math.floor(Math.log(numPartitions)), 20) : 0;
-                MerkleTree tree = new MerkleTree(cfs.partitioner, validator.desc.range, MerkleTree.RECOMMENDED_DEPTH, (int) Math.pow(2, depth));
+            }
+        }

-                long start = System.nanoTime();
-                try (AbstractCompactionStrategy.ScannerList scanners = cfs.getCompactionStrategy().getScanners(sstables, validator.desc.range))
+    private void buildMerkleTree(ColumnFamilyStore cfs, Refs<SSTableReader> sstables, Validator validator, int gcBefore)
+    {
+        // Create Merkle tree suitable to hold estimated partitions for given range.
+        // We blindly assume that partition is evenly distributed on all sstables for now.
+        long numPartitions = 0;
+        for (SSTableReader sstable : sstables)
+        {
+            numPartitions += sstable.estimatedKeysForRanges(Collections.singleton(validator.desc.range));
+        }
+        // determine tree depth from number of partitions, but cap at 20 to prevent large tree.
+        int depth = numPartitions > 0 ? (int) Math.min(Math.floor(Math.log(numPartitions)), 20) : 0;
+        MerkleTree tree = new MerkleTree(cfs.partitioner, validator.desc.range, MerkleTree.RECOMMENDED_DEPTH, (int) Math.pow(2, depth));
+
+        long start = System.nanoTime();
+        try (AbstractCompactionStrategy.ScannerList scanners = cfs.getCompactionStrategy().getScanners(sstables, validator.desc.range))
+        {
+            CompactionIterable ci = new ValidationCompactionIterable(cfs, scanners.scanners, gcBefore);
+            metrics.beginCompaction(ci);
+            try (CloseableIterator<AbstractCompactedRow> iter = ci.iterator();)
             {
-                CompactionIterable ci = new ValidationCompactionIterable(cfs, scanners.scanners, gcBefore);
-                Iterator<AbstractCompactedRow> iter = ci.iterator();
-                metrics.beginCompaction(ci);
-                try
-                {
-                    // validate the CF as we iterate over it
-                    validator.prepare(cfs, tree);
-                    while (iter.hasNext())
-                    {
-                        if (ci.isStopRequested())
-                            throw new CompactionInterruptedException(ci.getCompactionInfo());
-                        AbstractCompactedRow row = iter.next();
-                        validator.add(row);
-                    }
-                    validator.complete();
-                }
-                finally
+                // validate the CF as we iterate over it
+                validator.prepare(cfs, tree);
+                while (iter.hasNext())
                 {
-                    if (isSnapshotValidation)
-                    {
-                        cfs.clearSnapshot(snapshotName);
-                    }
-
-                    metrics.finishCompaction(ci);
+                    if (ci.isStopRequested())
+                        throw new CompactionInterruptedException(ci.getCompactionInfo());
+                    @SuppressWarnings("resource")
+                    AbstractCompactedRow row = iter.next();
+                    validator.add(row);
                 }
+                validator.complete();
             }
-
-            if (logger.isDebugEnabled())
+            catch (Exception e)
+            {
+                Throwables.propagate(e);
+            }
+            finally
             {
-                // MT serialize may take time
-                long duration = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
-                logger.debug("Validation finished in {} msec, depth {} for {} keys, serialized size {} bytes for {}",
-                             duration,
-                             depth,
-                             numPartitions,
-                             MerkleTree.serializer.serializedSize(tree, 0),
-                             validator.desc);
+                metrics.finishCompaction(ci);
             }
         }
-        finally
+
+        if (logger.isDebugEnabled())
         {
-            if (sstables != null)
-                sstables.release();
+            // MT serialize may take time
+            long duration = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
+            logger.debug("Validation finished in {} msec, depth {} for {} keys, serialized size {} bytes for {}",
+                         duration,
+                         depth,
+                         numPartitions,
+                         MerkleTree.serializer.serializedSize(tree, 0),
+                         validator.desc);
         }
     }
-
     /**
      * Splits up an sstable into two new sstables. The first of the new tables will store repaired ranges, the second
      * will store the non-repaired ranges. Once anticompation is completed, the original sstable is marked as compacted
@@ -1187,12 +1216,14 @@ public class CompactionManager implements CompactionManagerMBean
             unRepairedSSTableWriter.switchWriter(CompactionManager.createWriterForAntiCompaction(cfs, destination, expectedBloomFilterSize, ActiveRepairService.UNREPAIRED_SSTABLE, sstableAsSet));

             CompactionIterable ci = new CompactionIterable(OperationType.ANTICOMPACTION, scanners.scanners, controller, DatabaseDescriptor.getSSTableFormat(), UUIDGen.getTimeUUID());
-            Iterator<AbstractCompactedRow> iter = ci.iterator();
             metrics.beginCompaction(ci);
             try
             {
+                @SuppressWarnings("resource")
+                CloseableIterator<AbstractCompactedRow> iter = ci.iterator();
                 while (iter.hasNext())
                 {
+                    @SuppressWarnings("resource")
                     AbstractCompactedRow row = iter.next();
                     // if current range from sstable is repaired, save it into the new repaired sstable
                     if (Range.isInRanges(row.key.getToken(), ranges))
@@ -1315,6 +1346,7 @@ public class CompactionManager implements CompactionManagerMBean

     private static class ValidationCompactionIterable extends CompactionIterable
     {
+        @SuppressWarnings("resource")
         public ValidationCompactionIterable(ColumnFamilyStore cfs, List<ISSTableScanner> scanners, int gcBefore)
         {
             super(OperationType.VALIDATION, scanners, new ValidationCompactionController(cfs, gcBefore), DatabaseDescriptor.getSSTableFormat(), UUIDGen.getTimeUUID());
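The reworked parallelAllSSTableOperation above also changes failure handling: instead of failing on the first Future and leaking the remaining transactions, it waits on every task, closes every transaction, and rethrows a single exception carrying the rest as suppressed. The same shape with JDK types only (hypothetical awaitAndClose helper):

    import java.io.Closeable;
    import java.io.IOException;
    import java.util.List;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.Future;

    final class ParallelOps
    {
        // Wait on every task and close every transaction, surfacing all
        // failures as suppressed exceptions on one thrown exception.
        static void awaitAndClose(List<Future<?>> futures, List<Closeable> txns) throws Exception
        {
            Exception exception = null;
            for (int i = 0; i < futures.size(); i++)
            {
                try
                {
                    futures.get(i).get();
                }
                catch (InterruptedException | ExecutionException e)
                {
                    if (exception == null)
                        exception = new Exception("parallel operation failed");
                    exception.addSuppressed(e);
                }
                try
                {
                    txns.get(i).close();
                }
                catch (IOException e)
                {
                    if (exception == null)
                        exception = new Exception("parallel operation failed");
                    exception.addSuppressed(e);
                }
            }
            if (exception != null)
                throw exception;
        }
    }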
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7aafe053/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
index e593ec0..7089016 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
@@ -17,9 +17,9 @@
  */
 package org.apache.cassandra.db.compaction;

+import java.util.Arrays;
 import java.util.Collection;
 import java.util.HashMap;
-import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -44,6 +44,7 @@ import org.apache.cassandra.db.SystemKeyspace;
 import org.apache.cassandra.db.compaction.CompactionManager.CompactionExecutorStatsCollector;
 import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
 import org.apache.cassandra.service.ActiveRepairService;
+import org.apache.cassandra.utils.CloseableIterator;
 import org.apache.cassandra.utils.UUIDGen;

 public class CompactionTask extends AbstractCompactionTask
@@ -159,45 +160,49 @@ public class CompactionTask extends AbstractCompactionTask
         try (AbstractCompactionStrategy.ScannerList scanners = strategy.getScanners(actuallyCompact))
         {
             ci = new CompactionIterable(compactionType, scanners.scanners, controller, sstableFormat, taskId);
-            Iterator<AbstractCompactedRow> iter = ci.iterator();
-            if (collector != null)
-                collector.beginCompaction(ci);
-            long lastCheckObsoletion = start;
-
-            if (!controller.cfs.getCompactionStrategy().isActive)
-                throw new CompactionInterruptedException(ci.getCompactionInfo());
-
-            try (CompactionAwareWriter writer = getCompactionAwareWriter(cfs, transaction, actuallyCompact))
+            try (CloseableIterator<AbstractCompactedRow> iter = ci.iterator())
             {
-                estimatedKeys = writer.estimatedKeys();
-                while (iter.hasNext())
-                {
-                    if (ci.isStopRequested())
-                        throw new CompactionInterruptedException(ci.getCompactionInfo());
+                if (collector != null)
+                    collector.beginCompaction(ci);
+                long lastCheckObsoletion = start;

-                    AbstractCompactedRow row = iter.next();
-                    if (writer.append(row))
-                        totalKeysWritten++;
+                if (!controller.cfs.getCompactionStrategy().isActive)
+                    throw new CompactionInterruptedException(ci.getCompactionInfo());

-                    if (System.nanoTime() - lastCheckObsoletion > TimeUnit.MINUTES.toNanos(1L))
+                try (CompactionAwareWriter writer = getCompactionAwareWriter(cfs, transaction, actuallyCompact))
+                {
+                    estimatedKeys = writer.estimatedKeys();
+                    while (iter.hasNext())
                     {
-                        controller.maybeRefreshOverlaps();
-                        lastCheckObsoletion = System.nanoTime();
+                        if (ci.isStopRequested())
+                            throw new CompactionInterruptedException(ci.getCompactionInfo());
+
+                        try (AbstractCompactedRow row = iter.next())
+                        {
+                            if (writer.append(row))
+                                totalKeysWritten++;
+
+                            if (System.nanoTime() - lastCheckObsoletion > TimeUnit.MINUTES.toNanos(1L))
+                            {
+                                controller.maybeRefreshOverlaps();
+                                lastCheckObsoletion = System.nanoTime();
+                            }
+                        }
                     }
-                }

-                // don't replace old sstables yet, as we need to mark the compaction finished in the system table
-                newSStables = writer.finish();
-            }
-            finally
-            {
-                // point of no return -- the new sstables are live on disk; next we'll start deleting the old ones
-                // (in replaceCompactedSSTables)
-                if (taskId != null)
-                    SystemKeyspace.finishCompaction(taskId);
+                    // don't replace old sstables yet, as we need to mark the compaction finished in the system table
+                    newSStables = writer.finish();
+                }
+                finally
+                {
+                    // point of no return -- the new sstables are live on disk; next we'll start deleting the old ones
+                    // (in replaceCompactedSSTables)
+                    if (taskId != null)
+                        SystemKeyspace.finishCompaction(taskId);

-                if (collector != null)
-                    collector.finishCompaction(ci);
+                    if (collector != null)
+                        collector.finishCompaction(ci);
+                }
             }
         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7aafe053/src/java/org/apache/cassandra/db/compaction/DateTieredCompactionStrategy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/DateTieredCompactionStrategy.java b/src/java/org/apache/cassandra/db/compaction/DateTieredCompactionStrategy.java
index 18d5f7b..43f998a 100644
--- a/src/java/org/apache/cassandra/db/compaction/DateTieredCompactionStrategy.java
+++ b/src/java/org/apache/cassandra/db/compaction/DateTieredCompactionStrategy.java
@@ -56,6 +56,7 @@ public class DateTieredCompactionStrategy extends AbstractCompactionStrategy
     }

     @Override
+    @SuppressWarnings("resource")
     public synchronized AbstractCompactionTask getNextBackgroundTask(int gcBefore)
     {
         if (!isEnabled())
@@ -366,6 +367,7 @@ public class DateTieredCompactionStrategy extends AbstractCompactionStrategy
     }

     @Override
+    @SuppressWarnings("resource")
     public synchronized Collection<AbstractCompactionTask> getMaximalTask(int gcBefore, boolean splitOutput)
     {
         LifecycleTransaction modifier = cfs.markAllCompacting(OperationType.COMPACTION);
@@ -376,6 +378,7 @@ public class DateTieredCompactionStrategy extends AbstractCompactionStrategy
     }

     @Override
+    @SuppressWarnings("resource")
     public synchronized AbstractCompactionTask getUserDefinedTask(Collection<SSTableReader> sstables, int gcBefore)
     {
         assert !sstables.isEmpty(); // checked for by CM.submitUserDefined
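The CompactionTask rewrite above treats each AbstractCompactedRow as a resource in its own right: the row is obtained in a try-with-resources header inside the loop, so it is closed as soon as it has been appended, even if the append throws. Reduced to a self-contained sketch (hypothetical Row type):

    import java.util.Iterator;

    final class RowLoop
    {
        static final class Row implements AutoCloseable
        {
            public void close() { /* release per-row buffers */ }
        }

        // Each element is closed as soon as it is consumed, matching the
        // shape of the rewritten CompactionTask loop.
        static int writeAll(Iterator<Row> iter)
        {
            int written = 0;
            while (iter.hasNext())
            {
                try (Row row = iter.next())
                {
                    written++; // stand-in for writer.append(row)
                }
            }
            return written;
        }
    }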
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7aafe053/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java b/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java
index c3d764e..9eb624e 100644
--- a/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java
+++ b/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java
@@ -39,6 +39,7 @@ import org.apache.cassandra.io.util.DataOutputPlus;
 import org.apache.cassandra.io.util.SequentialWriter;
 import org.apache.cassandra.utils.MergeIterator;
 import org.apache.cassandra.utils.StreamingHistogram;
+import org.apache.cassandra.utils.Throwables;
 
 /**
  * LazilyCompactedRow only computes the row bloom filter and column index in memory
@@ -157,13 +158,11 @@ public class LazilyCompactedRow extends AbstractCompactedRow
         // no special-case for rows.size == 1, we're actually skipping some bytes here so just
         // blindly updating everything wouldn't be correct
-        DataOutputBuffer out = new DataOutputBuffer();
-
-        // initialize indexBuilder for the benefit of its tombstoneTracker, used by our reducing iterator
-        indexBuilder = new ColumnIndex.Builder(emptyColumnFamily, key.getKey(), out);
-
-        try
+        try (DataOutputBuffer out = new DataOutputBuffer())
         {
+            // initialize indexBuilder for the benefit of its tombstoneTracker, used by our reducing iterator
+            indexBuilder = new ColumnIndex.Builder(emptyColumnFamily, key.getKey(), out);
+
             DeletionTime.serializer.serialize(emptyColumnFamily.deletionInfo().getTopLevelDeletion(), out);
 
             // do not update digest in case of missing or purged row level tombstones, see CASSANDRA-8979
@@ -192,6 +191,7 @@
     public void close()
     {
+        Throwable accumulate = null;
         for (OnDiskAtomIterator row : rows)
         {
             try
@@ -200,10 +200,11 @@
             }
             catch (IOException e)
             {
-                throw new RuntimeException(e);
+                accumulate = Throwables.merge(accumulate, e);
             }
         }
         closed = true;
+        Throwables.maybeFail(accumulate);
     }
 
     protected class Reducer extends MergeIterator.Reducer<OnDiskAtom, OnDiskAtom>
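LazilyCompactedRow.close() previously threw on the first failing iterator and leaked the rest; it now visits every iterator, merging failures and rethrowing once at the end via the Throwables utility added to the imports. Without that helper, the same shape can be sketched with standard addSuppressed:

import java.io.Closeable;
import java.io.IOException;
import java.util.List;

class CloseAll
{
    // Close every resource even if some fail: remember the first exception,
    // attach later ones as suppressed, and throw once at the end. This is
    // the same accumulate-then-maybeFail shape used in close() above.
    static void closeAll(List<Closeable> resources) throws IOException
    {
        IOException accumulate = null;
        for (Closeable c : resources)
        {
            try
            {
                c.close();
            }
            catch (IOException e)
            {
                if (accumulate == null)
                    accumulate = e;
                else
                    accumulate.addSuppressed(e);
            }
        }
        if (accumulate != null)
            throw accumulate;
    }
}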
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7aafe053/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java b/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java
index c434d31..9eb58ff 100644
--- a/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java
+++ b/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java
@@ -88,6 +88,7 @@ public class LeveledCompactionStrategy extends AbstractCompactionStrategy
      * the only difference between background and maximal in LCS is that maximal is still allowed
      * (by explicit user request) even when compaction is disabled.
      */
+    @SuppressWarnings("resource")
     public synchronized AbstractCompactionTask getNextBackgroundTask(int gcBefore)
     {
         if (!isEnabled())
@@ -126,6 +127,7 @@ public class LeveledCompactionStrategy extends AbstractCompactionStrategy
         }
     }
 
+    @SuppressWarnings("resource")
     public synchronized Collection<AbstractCompactionTask> getMaximalTask(int gcBefore, boolean splitOutput)
     {
         Iterable<SSTableReader> sstables = manifest.getAllSSTables();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7aafe053/src/java/org/apache/cassandra/db/compaction/Scrubber.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/Scrubber.java b/src/java/org/apache/cassandra/db/compaction/Scrubber.java
index 93b76bd..10952e7 100644
--- a/src/java/org/apache/cassandra/db/compaction/Scrubber.java
+++ b/src/java/org/apache/cassandra/db/compaction/Scrubber.java
@@ -86,6 +86,7 @@ public class Scrubber implements Closeable
         this(cfs, transaction, skipCorrupted, new OutputHandler.LogOutput(), isOffline, checkData);
     }
 
+    @SuppressWarnings("resource")
     public Scrubber(ColumnFamilyStore cfs, LifecycleTransaction transaction, boolean skipCorrupted, OutputHandler outputHandler, boolean isOffline, boolean checkData) throws IOException
     {
         this.cfs = cfs;
@@ -204,6 +205,7 @@ public class Scrubber implements Closeable
                     continue;
                 }
 
+                @SuppressWarnings("resource")
                 AbstractCompactedRow compactedRow = new LazilyCompactedRow(controller, Collections.singletonList(atoms));
                 if (writer.tryAppend(compactedRow) == null)
                     emptyRows++;
@@ -234,6 +236,7 @@ public class Scrubber implements Closeable
                     continue;
                 }
 
+                @SuppressWarnings("resource")
                 AbstractCompactedRow compactedRow = new LazilyCompactedRow(controller, Collections.singletonList(atoms));
                 if (writer.tryAppend(compactedRow) == null)
                     emptyRows++;
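In the strategy methods annotated above, the scanners or LifecycleTransaction created inside the method escape into the returned compaction task, which is responsible for closing them when it runs; the annotation documents that handoff rather than an actual leak. A sketch of the pattern (Txn and Task are hypothetical stand-ins, not classes from this patch):

class Txn implements AutoCloseable
{
    public void close() { /* commit or abort */ }
}

class Task
{
    private final Txn txn;

    Task(Txn txn) { this.txn = txn; } // the task takes ownership

    void run()
    {
        try (Txn t = txn) // closed when the task finishes, not where it was created
        {
            // perform the compaction work under the transaction
        }
    }
}

class Strategy
{
    @SuppressWarnings("resource") // the Txn deliberately outlives this method
    Task getNextBackgroundTask()
    {
        return new Task(new Txn());
    }
}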
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7aafe053/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java b/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java
index 94c3daf..74a9757 100644
--- a/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java
+++ b/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java
@@ -175,6 +175,7 @@ public class SizeTieredCompactionStrategy extends AbstractCompactionStrategy
         return sstr.getReadMeter() == null ? 0.0 : sstr.getReadMeter().twoHourRate() / sstr.estimatedKeys();
     }
 
+    @SuppressWarnings("resource")
     public synchronized AbstractCompactionTask getNextBackgroundTask(int gcBefore)
     {
         if (!isEnabled())
@@ -193,6 +194,7 @@ public class SizeTieredCompactionStrategy extends AbstractCompactionStrategy
         }
     }
 
+    @SuppressWarnings("resource")
    public Collection<AbstractCompactionTask> getMaximalTask(final int gcBefore, boolean splitOutput)
     {
         Iterable<SSTableReader> filteredSSTables = filterSuspectSSTables(sstables);
@@ -206,6 +208,7 @@ public class SizeTieredCompactionStrategy extends AbstractCompactionStrategy
         return Arrays.<AbstractCompactionTask>asList(new CompactionTask(cfs, txn, gcBefore, false));
     }
 
+    @SuppressWarnings("resource")
     public AbstractCompactionTask getUserDefinedTask(Collection<SSTableReader> sstables, final int gcBefore)
     {
         assert !sstables.isEmpty(); // checked for by CM.submitUserDefined

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7aafe053/src/java/org/apache/cassandra/db/compaction/Upgrader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/Upgrader.java b/src/java/org/apache/cassandra/db/compaction/Upgrader.java
index 6556a71..ca975b8 100644
--- a/src/java/org/apache/cassandra/db/compaction/Upgrader.java
+++ b/src/java/org/apache/cassandra/db/compaction/Upgrader.java
@@ -20,6 +20,8 @@ package org.apache.cassandra.db.compaction;
 import java.io.File;
 import java.util.*;
 
+import com.google.common.base.Throwables;
+
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.db.DecoratedKey;
@@ -28,6 +30,7 @@ import org.apache.cassandra.io.sstable.*;
 import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.apache.cassandra.io.sstable.format.SSTableWriter;
 import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
+import org.apache.cassandra.utils.CloseableIterator;
 import org.apache.cassandra.utils.OutputHandler;
 import org.apache.cassandra.utils.UUIDGen;
 
@@ -83,12 +86,13 @@ public class Upgrader
         outputHandler.output("Upgrading " + sstable);
 
         try (SSTableRewriter writer = new SSTableRewriter(cfs, transaction, CompactionTask.getMaxDataAge(transaction.originals()), true);
-             AbstractCompactionStrategy.ScannerList scanners = strategy.getScanners(transaction.originals()))
+             AbstractCompactionStrategy.ScannerList scanners = strategy.getScanners(transaction.originals());
+             CloseableIterator<AbstractCompactedRow> iter = new CompactionIterable(compactionType, scanners.scanners, controller, DatabaseDescriptor.getSSTableFormat(), UUIDGen.getTimeUUID()).iterator())
         {
-            Iterator<AbstractCompactedRow> iter = new CompactionIterable(compactionType, scanners.scanners, controller, DatabaseDescriptor.getSSTableFormat(), UUIDGen.getTimeUUID()).iterator();
             writer.switchWriter(createCompactionWriter(sstable.getSSTableMetadata().repairedAt));
             while (iter.hasNext())
             {
+                @SuppressWarnings("resource")
                 AbstractCompactedRow row = iter.next();
                 writer.append(row);
             }
@@ -96,6 +100,10 @@ public class Upgrader
             writer.finish();
             outputHandler.output("Upgrade of " + sstable + " complete.");
         }
+        catch (Exception e)
+        {
+            Throwables.propagate(e);
+        }
         finally
         {
             controller.close();
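Upgrader now declares all three resources, the rewriter, the scanner list, and the compaction iterator, in a single try header. Resources in a multi-resource try are closed in reverse declaration order, so the iterator is closed before the scanners it reads from. In miniature, with illustrative file paths rather than anything from this patch:

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.FileWriter;
import java.io.IOException;

class MultiResource
{
    static void copyFirstLine() throws IOException
    {
        // closed in reverse order: reader first, then writer
        try (FileWriter writer = new FileWriter("/tmp/out.txt");
             BufferedReader reader = new BufferedReader(new FileReader("/tmp/in.txt")))
        {
            String line = reader.readLine();
            if (line != null)
                writer.write(line);
        }
    }
}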
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7aafe053/src/java/org/apache/cassandra/db/compaction/WrappingCompactionStrategy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/WrappingCompactionStrategy.java b/src/java/org/apache/cassandra/db/compaction/WrappingCompactionStrategy.java
index c511bcd..adda0c9 100644
--- a/src/java/org/apache/cassandra/db/compaction/WrappingCompactionStrategy.java
+++ b/src/java/org/apache/cassandra/db/compaction/WrappingCompactionStrategy.java
@@ -344,6 +344,7 @@ public final class WrappingCompactionStrategy extends AbstractCompactionStrategy
     }
 
     @Override
+    @SuppressWarnings("resource")
     public synchronized ScannerList getScanners(Collection<SSTableReader> sstables, Range<Token> range)
     {
         List<SSTableReader> repairedSSTables = new ArrayList<>();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7aafe053/src/java/org/apache/cassandra/db/compaction/writers/DefaultCompactionWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/writers/DefaultCompactionWriter.java b/src/java/org/apache/cassandra/db/compaction/writers/DefaultCompactionWriter.java
index 0b31061..7d88458 100644
--- a/src/java/org/apache/cassandra/db/compaction/writers/DefaultCompactionWriter.java
+++ b/src/java/org/apache/cassandra/db/compaction/writers/DefaultCompactionWriter.java
@@ -41,12 +41,14 @@ public class DefaultCompactionWriter extends CompactionAwareWriter
 {
     protected static final Logger logger = LoggerFactory.getLogger(DefaultCompactionWriter.class);
 
+    @SuppressWarnings("resource")
     public DefaultCompactionWriter(ColumnFamilyStore cfs, LifecycleTransaction txn, Set<SSTableReader> nonExpiredSSTables, boolean offline, OperationType compactionType)
     {
         super(cfs, txn, nonExpiredSSTables, offline);
         logger.debug("Expected bloom filter size : {}", estimatedTotalKeys);
         long expectedWriteSize = cfs.getExpectedCompactedFileSize(nonExpiredSSTables, compactionType);
         File sstableDirectory = cfs.directories.getLocationForDisk(getWriteDirectory(expectedWriteSize));
+        @SuppressWarnings("resource")
         SSTableWriter writer = SSTableWriter.create(Descriptor.fromFilename(cfs.getTempSSTablePath(sstableDirectory)),
                                                     estimatedTotalKeys,
                                                     minRepairedAt,
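The compaction writers trip the resource check in a third way: each SSTableWriter is created locally but immediately handed to the long-lived sstableWriter, which closes it later, so the creation site is annotated. The general shape, with hypothetical Owner and Part classes standing in for the rewriter and its writers:

import java.util.ArrayList;
import java.util.List;

class Part implements AutoCloseable
{
    public void close() { /* flush and release */ }
}

class Owner implements AutoCloseable
{
    private final List<Part> parts = new ArrayList<>();

    @SuppressWarnings("resource") // the new Part is owned by this Owner from here on
    void switchPart()
    {
        parts.add(new Part()); // closed in close() below, not in this method
    }

    public void close()
    {
        for (Part p : parts)
            p.close();
    }
}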
                                                    keysPerSSTable,
                                                    minRepairedAt,
@@ -71,6 +73,7 @@ public class MajorLeveledCompactionWriter extends CompactionAwareWriter
     }
 
     @Override
+    @SuppressWarnings("resource")
     public boolean append(AbstractCompactedRow row)
     {
         long posBefore = sstableWriter.currentWriter().getOnDiskFilePointer();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7aafe053/src/java/org/apache/cassandra/db/compaction/writers/MaxSSTableSizeWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/writers/MaxSSTableSizeWriter.java b/src/java/org/apache/cassandra/db/compaction/writers/MaxSSTableSizeWriter.java
index 8903ff7..d30a612 100644
--- a/src/java/org/apache/cassandra/db/compaction/writers/MaxSSTableSizeWriter.java
+++ b/src/java/org/apache/cassandra/db/compaction/writers/MaxSSTableSizeWriter.java
@@ -39,6 +39,7 @@ public class MaxSSTableSizeWriter extends CompactionAwareWriter
     private final long estimatedSSTables;
     private final Set<SSTableReader> allSSTables;
 
+    @SuppressWarnings("resource")
     public MaxSSTableSizeWriter(ColumnFamilyStore cfs, LifecycleTransaction txn, Set<SSTableReader> nonExpiredSSTables, long maxSSTableSize, int level, boolean offline, OperationType compactionType)
     {
         super(cfs, txn, nonExpiredSSTables, offline);
@@ -50,6 +51,7 @@ public class MaxSSTableSizeWriter extends CompactionAwareWriter
         estimatedTotalKeys = SSTableReader.getApproximateKeyCount(nonExpiredSSTables);
         estimatedSSTables = Math.max(1, estimatedTotalKeys / maxSSTableSize);
         File sstableDirectory = cfs.directories.getLocationForDisk(getWriteDirectory(expectedWriteSize));
+        @SuppressWarnings("resource")
         SSTableWriter writer = SSTableWriter.create(Descriptor.fromFilename(cfs.getTempSSTablePath(sstableDirectory)),
                                                     estimatedTotalKeys / estimatedSSTables,
                                                     minRepairedAt,
@@ -66,6 +68,7 @@ public class MaxSSTableSizeWriter extends CompactionAwareWriter
         if (sstableWriter.currentWriter().getOnDiskFilePointer() > maxSSTableSize)
         {
             File sstableDirectory = cfs.directories.getLocationForDisk(getWriteDirectory(expectedWriteSize));
+            @SuppressWarnings("resource")
             SSTableWriter writer = SSTableWriter.create(Descriptor.fromFilename(cfs.getTempSSTablePath(sstableDirectory)),
                                                         estimatedTotalKeys / estimatedSSTables,
                                                         minRepairedAt,
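MaxSSTableSizeWriter also rolls to a fresh SSTableWriter whenever the file on disk grows past maxSSTableSize, so the same suppression recurs at each switch point. A self-contained rolling-writer sketch of that switching shape (paths and names illustrative only):

import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;

class RollingWriter implements AutoCloseable
{
    private final long maxBytes;
    private OutputStream current;
    private long written = 0;
    private int segment = 0;

    RollingWriter(long maxBytes) throws IOException
    {
        this.maxBytes = maxBytes;
        this.current = openNext();
    }

    @SuppressWarnings("resource") // the stream becomes the owned 'current'
    private OutputStream openNext() throws IOException
    {
        return new FileOutputStream("/tmp/segment-" + segment++ + ".dat");
    }

    void append(byte[] row) throws IOException
    {
        if (written > maxBytes)
        {
            current.close();      // finish the full segment
            current = openNext(); // switch writers, as the compaction writers do
            written = 0;
        }
        current.write(row);
        written += row.length;
    }

    public void close() throws IOException
    {
        current.close();
    }
}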
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7aafe053/src/java/org/apache/cassandra/db/compaction/writers/SplittingSizeTieredCompactionWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/writers/SplittingSizeTieredCompactionWriter.java b/src/java/org/apache/cassandra/db/compaction/writers/SplittingSizeTieredCompactionWriter.java
index 81ea6b1..9ff1325 100644
--- a/src/java/org/apache/cassandra/db/compaction/writers/SplittingSizeTieredCompactionWriter.java
+++ b/src/java/org/apache/cassandra/db/compaction/writers/SplittingSizeTieredCompactionWriter.java
@@ -56,6 +56,7 @@ public class SplittingSizeTieredCompactionWriter extends CompactionAwareWriter
         this(cfs, txn, nonExpiredSSTables, compactionType, DEFAULT_SMALLEST_SSTABLE_BYTES);
     }
 
+    @SuppressWarnings("resource")
     public SplittingSizeTieredCompactionWriter(ColumnFamilyStore cfs, LifecycleTransaction txn, Set<SSTableReader> nonExpiredSSTables, OperationType compactionType, long smallestSSTable)
     {
         super(cfs, txn, nonExpiredSSTables, false);
@@ -83,6 +84,7 @@ public class SplittingSizeTieredCompactionWriter extends CompactionAwareWriter
         File sstableDirectory = cfs.directories.getLocationForDisk(getWriteDirectory(Math.round(totalSize * ratios[currentRatioIndex])));
         long currentPartitionsToWrite = Math.round(estimatedTotalKeys * ratios[currentRatioIndex]);
         currentBytesToWrite = Math.round(totalSize * ratios[currentRatioIndex]);
+        @SuppressWarnings("resource")
         SSTableWriter writer = SSTableWriter.create(Descriptor.fromFilename(cfs.getTempSSTablePath(sstableDirectory)),
                                                     currentPartitionsToWrite,
                                                     minRepairedAt,
@@ -104,6 +106,7 @@ public class SplittingSizeTieredCompactionWriter extends CompactionAwareWriter
             currentBytesToWrite = Math.round(totalSize * ratios[currentRatioIndex]);
             long currentPartitionsToWrite = Math.round(ratios[currentRatioIndex] * estimatedTotalKeys);
             File sstableDirectory = cfs.directories.getLocationForDisk(getWriteDirectory(Math.round(totalSize * ratios[currentRatioIndex])));
+            @SuppressWarnings("resource")
             SSTableWriter writer = SSTableWriter.create(Descriptor.fromFilename(cfs.getTempSSTablePath(sstableDirectory)),
                                                         currentPartitionsToWrite,
                                                         minRepairedAt,

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7aafe053/src/java/org/apache/cassandra/db/marshal/CompositeType.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/marshal/CompositeType.java b/src/java/org/apache/cassandra/db/marshal/CompositeType.java
index 1bc772d..f3c041e 100644
--- a/src/java/org/apache/cassandra/db/marshal/CompositeType.java
+++ b/src/java/org/apache/cassandra/db/marshal/CompositeType.java
@@ -402,9 +402,8 @@ public class CompositeType extends AbstractCompositeType
 
         public ByteBuffer build()
         {
-            try
+            try (DataOutputBuffer out = new DataOutputBufferFixed(serializedSize))
             {
-                DataOutputBuffer out = new DataOutputBufferFixed(serializedSize);
                 if (isStatic)
                     out.writeShort(STATIC_MARKER);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7aafe053/src/java/org/apache/cassandra/gms/FailureDetector.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/gms/FailureDetector.java b/src/java/org/apache/cassandra/gms/FailureDetector.java
index b8c20d7..45867a3 100644
--- a/src/java/org/apache/cassandra/gms/FailureDetector.java
+++ b/src/java/org/apache/cassandra/gms/FailureDetector.java
@@ -177,20 +177,14 @@ public class FailureDetector implements IFailureDetector, FailureDetectorMBean
     {
         File file = FileUtils.createTempFile("failuredetector-", ".dat");
 
-        OutputStream os = null;
-        try
+        try (OutputStream os = new BufferedOutputStream(new FileOutputStream(file, true)))
         {
-            os = new BufferedOutputStream(new FileOutputStream(file, true));
             os.write(toString().getBytes());
         }
         catch (IOException e)
         {
             throw new FSWriteError(e, file);
         }
-        finally
-        {
-            FileUtils.closeQuietly(os);
-        }
     }
 
     public void setPhiConvictThreshold(double phi)
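The FailureDetector change is the simplest case in the patch: a stream formerly closed with FileUtils.closeQuietly in a finally block moves into a try-with-resources header, which also stops a failure in close() from masking the original write error. Standalone version of the same shape, with the payload parameterized:

import java.io.BufferedOutputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;

class DumpToFile
{
    static void dump(File file, String state) throws IOException
    {
        // the stream is closed automatically; an exception from write() is
        // propagated with any close() failure attached as suppressed
        try (OutputStream os = new BufferedOutputStream(new FileOutputStream(file, true)))
        {
            os.write(state.getBytes());
        }
    }
}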
