Merge branch cassandra-3.0 into cassandra-3.11
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/689484b2 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/689484b2 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/689484b2 Branch: refs/heads/cassandra-3.11 Commit: 689484b25832dda7086730b496ba1eca1bf10b5d Parents: 8ea0afa 9b8692c Author: Benjamin Lerer <[email protected]> Authored: Tue Mar 28 16:17:37 2017 +0200 Committer: Benjamin Lerer <[email protected]> Committed: Tue Mar 28 16:20:10 2017 +0200 ---------------------------------------------------------------------- CHANGES.txt | 1 + NEWS.txt | 4 +- .../db/commitlog/CommitLogSegment.java | 4 +- .../db/commitlog/MemoryMappedSegment.java | 4 +- .../cassandra/db/lifecycle/LogReplica.java | 10 +- .../cassandra/hints/ChecksummedDataInput.java | 4 +- .../apache/cassandra/hints/HintsCatalog.java | 6 +- .../org/apache/cassandra/hints/HintsReader.java | 2 +- .../org/apache/cassandra/hints/HintsWriter.java | 6 +- .../cassandra/io/sstable/SSTableRewriter.java | 4 +- .../io/sstable/format/SSTableReader.java | 4 +- .../apache/cassandra/io/util/ChannelProxy.java | 4 +- .../apache/cassandra/io/util/FileHandle.java | 4 +- .../cassandra/service/CassandraDaemon.java | 6 +- .../apache/cassandra/service/StartupChecks.java | 12 +- .../org/apache/cassandra/utils/CLibrary.java | 399 ------------------ .../apache/cassandra/utils/CLibraryDarwin.java | 131 ------ .../apache/cassandra/utils/CLibraryLinux.java | 131 ------ .../apache/cassandra/utils/CLibraryWindows.java | 103 ----- .../apache/cassandra/utils/CLibraryWrapper.java | 40 -- .../org/apache/cassandra/utils/HeapUtils.java | 2 +- .../apache/cassandra/utils/NativeLibrary.java | 403 +++++++++++++++++++ .../cassandra/utils/NativeLibraryDarwin.java | 129 ++++++ .../cassandra/utils/NativeLibraryLinux.java | 129 ++++++ .../cassandra/utils/NativeLibraryWindows.java | 124 ++++++ .../cassandra/utils/NativeLibraryWrapper.java | 44 ++ 
.../org/apache/cassandra/utils/SyncUtil.java | 6 +- .../org/apache/cassandra/utils/UUIDGen.java | 2 +- .../apache/cassandra/utils/WindowsTimer.java | 4 + .../apache/cassandra/utils/CLibraryTest.java | 44 -- .../cassandra/utils/NativeLibraryTest.java | 44 ++ 31 files changed, 921 insertions(+), 889 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/689484b2/CHANGES.txt ---------------------------------------------------------------------- diff --cc CHANGES.txt index c7cc26e,ca79a01..c9e93c4 --- a/CHANGES.txt +++ b/CHANGES.txt @@@ -1,23 -1,5 +1,24 @@@ -3.0.13 +3.11.0 + * cdc column addition strikes again (CASSANDRA-13382) + * Fix static column indexes (CASSANDRA-13277) + * DataOutputBuffer.asNewBuffer broken (CASSANDRA-13298) + * unittest CipherFactoryTest failed on MacOS (CASSANDRA-13370) + * Forbid SELECT restrictions and CREATE INDEX over non-frozen UDT columns (CASSANDRA-13247) + * Default logging we ship will incorrectly print "?:?" for "%F:%L" pattern (CASSANDRA-13317) + * Possible AssertionError in UnfilteredRowIteratorWithLowerBound (CASSANDRA-13366) + * Support unaligned memory access for AArch64 (CASSANDRA-13326) + * Improve SASI range iterator efficiency on intersection with an empty range (CASSANDRA-12915). 
+ * Fix equality comparisons of columns using the duration type (CASSANDRA-13174) + * Obfuscate password in stress-graphs (CASSANDRA-12233) + * Move to FastThreadLocalThread and FastThreadLocal (CASSANDRA-13034) + * nodetool stopdaemon errors out (CASSANDRA-13030) + * Tables in system_distributed should not use gcgs of 0 (CASSANDRA-12954) + * Fix primary index calculation for SASI (CASSANDRA-12910) + * More fixes to the TokenAllocator (CASSANDRA-12990) + * NoReplicationTokenAllocator should work with zero replication factor (CASSANDRA-12983) + * Address message coalescing regression (CASSANDRA-12676) +Merged from 3.0: + * Use the Kernel32 library to retrieve the PID on Windows and fix startup checks (CASSANDRA-13333) * Fix code to not exchange schema across major versions (CASSANDRA-13274) * Dropping column results in "corrupt" SSTable (CASSANDRA-13337) * Bugs handling range tombstones in the sstable iterators (CASSANDRA-13340) http://git-wip-us.apache.org/repos/asf/cassandra/blob/689484b2/NEWS.txt ---------------------------------------------------------------------- diff --cc NEWS.txt index 4c2e217,b6faef4..1c14748 --- a/NEWS.txt +++ b/NEWS.txt @@@ -13,14 -13,20 +13,16 @@@ restore snapshots created with the prev 'sstableloader' tool. You can upgrade the file format of your snapshots using the provided 'sstableupgrade' tool. - -3.0.13 +3.11.0 ====== Upgrading --------- + - The NativeAccessMBean isAvailable method will only return true if the + native library has been successfully linked. Previously it was returning + true if JNA could be found but was not taking into account link failures. - -3.0.12 -====== - -Upgrading ---------- + - Primary ranges in the system.size_estimates table are now based on the keyspace + replication settings and adjacent ranges are no longer merged (CASSANDRA-9639). - In 2.1, the default for otc_coalescing_strategy was 'DISABLED'. In 2.2 and 3.0, it was changed to 'TIMEHORIZON', but that value was shown to be a performance regression. 
The default for 3.11.0 and newer has http://git-wip-us.apache.org/repos/asf/cassandra/blob/689484b2/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/689484b2/src/java/org/apache/cassandra/db/commitlog/MemoryMappedSegment.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/689484b2/src/java/org/apache/cassandra/db/lifecycle/LogReplica.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/db/lifecycle/LogReplica.java index fd65f5b,44400d1..a8c39dc --- a/src/java/org/apache/cassandra/db/lifecycle/LogReplica.java +++ b/src/java/org/apache/cassandra/db/lifecycle/LogReplica.java @@@ -19,12 -19,9 +19,12 @@@ package org.apache.cassandra.db.lifecycle; import java.io.File; +import java.util.HashMap; +import java.util.List; +import java.util.Map; import org.apache.cassandra.io.util.FileUtils; - import org.apache.cassandra.utils.CLibrary; + import org.apache.cassandra.utils.NativeLibrary; /** * Because a column family may have sstables on different disks and disks can @@@ -40,23 -37,22 +40,23 @@@ final class LogReplica implements AutoCloseable { private final File file; - private int folderDescriptor; + private int directoryDescriptor; + private final Map<String, String> errors = new HashMap<>(); - static LogReplica create(File folder, String fileName) + static LogReplica create(File directory, String fileName) { - return new LogReplica(new File(fileName), CLibrary.tryOpenDirectory(directory.getPath())); - return new LogReplica(new File(fileName), NativeLibrary.tryOpenDirectory(folder.getPath())); ++ return new LogReplica(new File(fileName), NativeLibrary.tryOpenDirectory(directory.getPath())); } static LogReplica open(File file) { - return new LogReplica(file, 
CLibrary.tryOpenDirectory(file.getParentFile().getPath())); + return new LogReplica(file, NativeLibrary.tryOpenDirectory(file.getParentFile().getPath())); } - LogReplica(File file, int folderDescriptor) + LogReplica(File file, int directoryDescriptor) { this.file = file; - this.folderDescriptor = folderDescriptor; + this.directoryDescriptor = directoryDescriptor; } File file() @@@ -85,15 -66,15 +85,15 @@@ FileUtils.appendAndSync(file, record.toString()); // If the file did not exist before appending the first - // line, then sync the folder as well since now it must exist + // line, then sync the directory as well since now it must exist if (!existed) - syncFolder(); + syncDirectory(); } - void syncFolder() + void syncDirectory() { - if (folderDescriptor >= 0) - NativeLibrary.trySync(folderDescriptor); + if (directoryDescriptor >= 0) - CLibrary.trySync(directoryDescriptor); ++ NativeLibrary.trySync(directoryDescriptor); } void delete() @@@ -109,10 -90,10 +109,10 @@@ public void close() { - if (folderDescriptor >= 0) + if (directoryDescriptor >= 0) { - CLibrary.tryCloseFD(directoryDescriptor); - NativeLibrary.tryCloseFD(folderDescriptor); - folderDescriptor = -1; ++ NativeLibrary.tryCloseFD(directoryDescriptor); + directoryDescriptor = -1; } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/689484b2/src/java/org/apache/cassandra/hints/ChecksummedDataInput.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/hints/ChecksummedDataInput.java index 0db95af,095d7f4..d32faaf --- a/src/java/org/apache/cassandra/hints/ChecksummedDataInput.java +++ b/src/java/org/apache/cassandra/hints/ChecksummedDataInput.java @@@ -22,15 -22,12 +22,15 @@@ import java.io.IOException import java.nio.ByteBuffer; import java.util.zip.CRC32; -import org.apache.cassandra.io.util.ChannelProxy; -import org.apache.cassandra.io.util.DataPosition; -import org.apache.cassandra.io.util.RandomAccessReader; +import 
com.google.common.base.Preconditions; + +import org.apache.cassandra.io.compress.BufferType; +import org.apache.cassandra.io.util.*; - import org.apache.cassandra.utils.CLibrary; ++import org.apache.cassandra.utils.NativeLibrary; +import org.apache.cassandra.utils.memory.BufferPool; /** - * A {@link RandomAccessReader} wrapper that calctulates the CRC in place. + * A {@link RandomAccessReader} wrapper that calculates the CRC in place. * * Useful for {@link org.apache.cassandra.hints.HintsReader}, for example, where we must verify the CRC, yet don't want * to allocate an extra byte array just that purpose. The CRC can be embedded in the input stream and checked via checkCrc(). @@@ -206,18 -141,6 +206,18 @@@ public class ChecksummedDataInput exten crcPosition = buffer.position(); } + protected void readBuffer() + { + buffer.clear(); + while ((channel.read(buffer, bufferOffset)) == 0) {} + buffer.flip(); + } + + public void tryUncacheRead() + { - CLibrary.trySkipCache(getChannel().getFileDescriptor(), 0, getSourcePosition(), getPath()); ++ NativeLibrary.trySkipCache(getChannel().getFileDescriptor(), 0, getSourcePosition(), getPath()); + } + private void updateCrc() { if (crcPosition == buffer.position() || crcUpdateDisabled) http://git-wip-us.apache.org/repos/asf/cassandra/blob/689484b2/src/java/org/apache/cassandra/hints/HintsReader.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/689484b2/src/java/org/apache/cassandra/hints/HintsWriter.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/hints/HintsWriter.java index dca915a,31a440d..48b8c7c --- a/src/java/org/apache/cassandra/hints/HintsWriter.java +++ b/src/java/org/apache/cassandra/hints/HintsWriter.java @@@ -33,7 -33,8 +33,7 @@@ import org.apache.cassandra.config.Data import org.apache.cassandra.io.FSWriteError; import org.apache.cassandra.io.util.DataOutputBuffer; import 
org.apache.cassandra.io.util.DataOutputBufferFixed; - import org.apache.cassandra.utils.CLibrary; -import org.apache.cassandra.net.MessagingService; + import org.apache.cassandra.utils.NativeLibrary; import org.apache.cassandra.utils.SyncUtil; import org.apache.cassandra.utils.Throwables; http://git-wip-us.apache.org/repos/asf/cassandra/blob/689484b2/src/java/org/apache/cassandra/io/sstable/SSTableRewriter.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/689484b2/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/689484b2/src/java/org/apache/cassandra/io/util/ChannelProxy.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/689484b2/src/java/org/apache/cassandra/io/util/FileHandle.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/io/util/FileHandle.java index c2e4a6c,0000000..9e03d3b mode 100644,000000..100644 --- a/src/java/org/apache/cassandra/io/util/FileHandle.java +++ b/src/java/org/apache/cassandra/io/util/FileHandle.java @@@ -1,441 -1,0 +1,441 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.io.util; + +import java.util.Objects; +import java.util.Optional; + +import com.google.common.util.concurrent.RateLimiter; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.cassandra.cache.ChunkCache; +import org.apache.cassandra.io.compress.BufferType; +import org.apache.cassandra.io.compress.CompressionMetadata; - import org.apache.cassandra.utils.CLibrary; ++import org.apache.cassandra.utils.NativeLibrary; +import org.apache.cassandra.utils.concurrent.Ref; +import org.apache.cassandra.utils.concurrent.RefCounted; +import org.apache.cassandra.utils.concurrent.SharedCloseableImpl; + +import static org.apache.cassandra.utils.Throwables.maybeFail; + +/** + * {@link FileHandle} provides access to a file for reading, including the ones written by various {@link SequentialWriter} + * instances, and it is typically used by {@link org.apache.cassandra.io.sstable.format.SSTableReader}. + * + * Use {@link FileHandle.Builder} to create an instance, and call {@link #createReader()} (and its variants) to + * access the readers for the underlying file. + * + * You can use {@link Builder#complete()} several times during its lifecycle with different {@code overrideLength}(i.e. early opening file). + * For that reason, the builder keeps a reference to the file channel and makes a copy for each {@link Builder#complete()} call. + * Therefore, it is important to close the {@link Builder} when it is no longer needed, as well as any {@link FileHandle} + * instances. 
+ */ +public class FileHandle extends SharedCloseableImpl +{ + private static final Logger logger = LoggerFactory.getLogger(FileHandle.class); + + public final ChannelProxy channel; + + public final long onDiskLength; + + /* + * Rebufferer factory to use when constructing RandomAccessReaders + */ + private final RebuffererFactory rebuffererFactory; + + /* + * Optional CompressionMetadata when dealing with compressed file + */ + private final Optional<CompressionMetadata> compressionMetadata; + + private FileHandle(Cleanup cleanup, + ChannelProxy channel, + RebuffererFactory rebuffererFactory, + CompressionMetadata compressionMetadata, + long onDiskLength) + { + super(cleanup); + this.rebuffererFactory = rebuffererFactory; + this.channel = channel; + this.compressionMetadata = Optional.ofNullable(compressionMetadata); + this.onDiskLength = onDiskLength; + } + + private FileHandle(FileHandle copy) + { + super(copy); + channel = copy.channel; + rebuffererFactory = copy.rebuffererFactory; + compressionMetadata = copy.compressionMetadata; + onDiskLength = copy.onDiskLength; + } + + /** + * @return Path to the file this factory is referencing + */ + public String path() + { + return channel.filePath(); + } + + public long dataLength() + { + return compressionMetadata.map(c -> c.dataLength).orElseGet(rebuffererFactory::fileLength); + } + + public RebuffererFactory rebuffererFactory() + { + return rebuffererFactory; + } + + public Optional<CompressionMetadata> compressionMetadata() + { + return compressionMetadata; + } + + @Override + public void addTo(Ref.IdentityCollection identities) + { + super.addTo(identities); + compressionMetadata.ifPresent(metadata -> metadata.addTo(identities)); + } + + @Override + public FileHandle sharedCopy() + { + return new FileHandle(this); + } + + /** + * Create {@link RandomAccessReader} with configured method of reading content of the file. 
+ * + * @return RandomAccessReader for the file + */ + public RandomAccessReader createReader() + { + return createReader(null); + } + + /** + * Create {@link RandomAccessReader} with configured method of reading content of the file. + * Reading from file will be rate limited by given {@link RateLimiter}. + * + * @param limiter RateLimiter to use for rate limiting read + * @return RandomAccessReader for the file + */ + public RandomAccessReader createReader(RateLimiter limiter) + { + return new RandomAccessReader(instantiateRebufferer(limiter)); + } + + public FileDataInput createReader(long position) + { + RandomAccessReader reader = createReader(); + reader.seek(position); + return reader; + } + + /** + * Drop page cache from start to given {@code before}. + * + * @param before uncompressed position from start of the file to be dropped from cache. if 0, to end of file. + */ + public void dropPageCache(long before) + { + long position = compressionMetadata.map(metadata -> { + if (before >= metadata.dataLength) + return 0L; + else + return metadata.chunkFor(before).offset; + }).orElse(before); - CLibrary.trySkipCache(channel.getFileDescriptor(), 0, position, path()); ++ NativeLibrary.trySkipCache(channel.getFileDescriptor(), 0, position, path()); + } + + private Rebufferer instantiateRebufferer(RateLimiter limiter) + { + Rebufferer rebufferer = rebuffererFactory.instantiateRebufferer(); + + if (limiter != null) + rebufferer = new LimitingRebufferer(rebufferer, limiter, DiskOptimizationStrategy.MAX_BUFFER_SIZE); + return rebufferer; + } + + /** + * Perform clean up of all resources held by {@link FileHandle}. 
+ */ + private static class Cleanup implements RefCounted.Tidy + { + final ChannelProxy channel; + final RebuffererFactory rebufferer; + final CompressionMetadata compressionMetadata; + final Optional<ChunkCache> chunkCache; + + private Cleanup(ChannelProxy channel, + RebuffererFactory rebufferer, + CompressionMetadata compressionMetadata, + ChunkCache chunkCache) + { + this.channel = channel; + this.rebufferer = rebufferer; + this.compressionMetadata = compressionMetadata; + this.chunkCache = Optional.ofNullable(chunkCache); + } + + public String name() + { + return channel.filePath(); + } + + public void tidy() + { + chunkCache.ifPresent(cache -> cache.invalidateFile(name())); + try + { + if (compressionMetadata != null) + { + compressionMetadata.close(); + } + } + finally + { + try + { + channel.close(); + } + finally + { + rebufferer.close(); + } + } + } + } + + /** + * Configures how the file will be read (compressed, mmapped, use cache etc.) + */ + public static class Builder implements AutoCloseable + { + private final String path; + + private ChannelProxy channel; + private CompressionMetadata compressionMetadata; + private MmappedRegions regions; + private ChunkCache chunkCache; + private int bufferSize = RandomAccessReader.DEFAULT_BUFFER_SIZE; + private BufferType bufferType = BufferType.OFF_HEAP; + + private boolean mmapped = false; + private boolean compressed = false; + + public Builder(String path) + { + this.path = path; + } + + public Builder(ChannelProxy channel) + { + this.channel = channel; + this.path = channel.filePath(); + } + + public Builder compressed(boolean compressed) + { + this.compressed = compressed; + return this; + } + + /** + * Set {@link ChunkCache} to use. 
+ * + * @param chunkCache ChunkCache object to use for caching + * @return this object + */ + public Builder withChunkCache(ChunkCache chunkCache) + { + this.chunkCache = chunkCache; + return this; + } + + /** + * Provide {@link CompressionMetadata} to use when reading compressed file. + * + * @param metadata CompressionMetadata to use + * @return this object + */ + public Builder withCompressionMetadata(CompressionMetadata metadata) + { + this.compressed = Objects.nonNull(metadata); + this.compressionMetadata = metadata; + return this; + } + + /** + * Set whether to use mmap for reading + * + * @param mmapped true if using mmap + * @return this instance + */ + public Builder mmapped(boolean mmapped) + { + this.mmapped = mmapped; + return this; + } + + /** + * Set the buffer size to use (if appropriate). + * + * @param bufferSize Buffer size in bytes + * @return this instance + */ + public Builder bufferSize(int bufferSize) + { + this.bufferSize = bufferSize; + return this; + } + + /** + * Set the buffer type (on heap or off heap) to use (if appropriate). + * + * @param bufferType Buffer type to use + * @return this instance + */ + public Builder bufferType(BufferType bufferType) + { + this.bufferType = bufferType; + return this; + } + + /** + * Complete building {@link FileHandle} without overriding file length. + * + * @see #complete(long) + */ + public FileHandle complete() + { + return complete(-1L); + } + + /** + * Complete building {@link FileHandle} with the given length, which overrides the file length. + * + * @param overrideLength Override file length (in bytes) so that read cannot go further than this value. + * If the value is less than or equal to 0, then the value is ignored. 
+ * @return Built file + */ + @SuppressWarnings("resource") + public FileHandle complete(long overrideLength) + { + if (channel == null) + { + channel = new ChannelProxy(path); + } + + ChannelProxy channelCopy = channel.sharedCopy(); + try + { + if (compressed && compressionMetadata == null) + compressionMetadata = CompressionMetadata.create(channelCopy.filePath()); + + long length = overrideLength > 0 ? overrideLength : compressed ? compressionMetadata.compressedFileLength : channelCopy.size(); + + RebuffererFactory rebuffererFactory; + if (mmapped) + { + if (compressed) + { + regions = MmappedRegions.map(channelCopy, compressionMetadata); + rebuffererFactory = maybeCached(new CompressedChunkReader.Mmap(channelCopy, compressionMetadata, + regions)); + } + else + { + updateRegions(channelCopy, length); + rebuffererFactory = new MmapRebufferer(channelCopy, length, regions.sharedCopy()); + } + } + else + { + regions = null; + if (compressed) + { + rebuffererFactory = maybeCached(new CompressedChunkReader.Standard(channelCopy, compressionMetadata)); + } + else + { + rebuffererFactory = maybeCached(new SimpleChunkReader(channelCopy, length, bufferType, bufferSize)); + } + } + Cleanup cleanup = new Cleanup(channelCopy, rebuffererFactory, compressionMetadata, chunkCache); + return new FileHandle(cleanup, channelCopy, rebuffererFactory, compressionMetadata, length); + } + catch (Throwable t) + { + channelCopy.close(); + throw t; + } + } + + public Throwable close(Throwable accumulate) + { + if (!compressed && regions != null) + accumulate = regions.close(accumulate); + if (channel != null) + return channel.close(accumulate); + + return accumulate; + } + + public void close() + { + maybeFail(close(null)); + } + + private RebuffererFactory maybeCached(ChunkReader reader) + { + if (chunkCache != null && chunkCache.capacity() > 0) + return chunkCache.wrap(reader); + return reader; + } + + private void updateRegions(ChannelProxy channel, long length) + { + if (regions != null 
&& !regions.isValid(channel)) + { + Throwable err = regions.close(null); + if (err != null) + logger.error("Failed to close mapped regions", err); + + regions = null; + } + + if (regions == null) + regions = MmappedRegions.map(channel, length); + else + regions.extend(length); + } + } + + @Override + public String toString() + { + return getClass().getSimpleName() + "(path='" + path() + '\'' + + ", length=" + rebuffererFactory.fileLength() + + ')'; + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/689484b2/src/java/org/apache/cassandra/service/CassandraDaemon.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/689484b2/src/java/org/apache/cassandra/service/StartupChecks.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/service/StartupChecks.java index 2774931,19b6620..89fa12c --- a/src/java/org/apache/cassandra/service/StartupChecks.java +++ b/src/java/org/apache/cassandra/service/StartupChecks.java @@@ -42,9 -39,7 +42,9 @@@ import org.apache.cassandra.exceptions. import org.apache.cassandra.exceptions.StartupException; import org.apache.cassandra.io.sstable.Descriptor; import org.apache.cassandra.io.util.FileUtils; - import org.apache.cassandra.utils.CLibrary; -import org.apache.cassandra.utils.*; ++import org.apache.cassandra.utils.NativeLibrary; +import org.apache.cassandra.utils.FBUtilities; +import org.apache.cassandra.utils.SigarLibrary; /** * Verifies that the system and environment is in a fit state to be started. 
@@@ -78,9 -73,8 +78,9 @@@ public class StartupCheck private final List<StartupCheck> DEFAULT_TESTS = ImmutableList.of(checkJemalloc, checkValidLaunchDate, checkJMXPorts, + checkJMXProperties, inspectJvmOptions, - checkJnaInitialization, + checkNativeLibraryInitialization, initSigarLibrary, checkMaxMapCount, checkDataDirs, @@@ -207,9 -188,9 +207,9 @@@ { public void execute() throws StartupException { - // Fail-fast if JNA is not available or failing to initialize properly - if (!CLibrary.jnaAvailable()) - throw new StartupException(StartupException.ERR_WRONG_MACHINE_STATE, "JNA failing to initialize properly. "); + // Fail-fast if the native library could not be linked. + if (!NativeLibrary.isAvailable()) - throw new StartupException(3, "The native library could not be initialized properly. "); ++ throw new StartupException(StartupException.ERR_WRONG_MACHINE_STATE, "The native library could not be initialized properly. "); } }; http://git-wip-us.apache.org/repos/asf/cassandra/blob/689484b2/src/java/org/apache/cassandra/utils/HeapUtils.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/689484b2/src/java/org/apache/cassandra/utils/NativeLibrary.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/utils/NativeLibrary.java index 0000000,f96859e..e461127 mode 000000,100644..100644 --- a/src/java/org/apache/cassandra/utils/NativeLibrary.java +++ b/src/java/org/apache/cassandra/utils/NativeLibrary.java @@@ -1,0 -1,400 +1,403 @@@ + /* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ package org.apache.cassandra.utils;
+ 
+ import java.io.File;
+ import java.io.FileDescriptor;
+ import java.io.FileInputStream;
+ import java.io.IOException;
+ import java.lang.reflect.Field;
+ import java.nio.channels.FileChannel;
+ import java.util.concurrent.TimeUnit;
+ 
+ import org.slf4j.Logger;
+ import org.slf4j.LoggerFactory;
+ 
+ import com.sun.jna.LastErrorException;
++import sun.nio.ch.FileChannelImpl;
+ 
+ import static org.apache.cassandra.utils.NativeLibrary.OSType.LINUX;
+ import static org.apache.cassandra.utils.NativeLibrary.OSType.MAC;
+ import static org.apache.cassandra.utils.NativeLibrary.OSType.WINDOWS;
+ import static org.apache.cassandra.utils.NativeLibrary.OSType.AIX;
+ 
+ public final class NativeLibrary
+ {
+     private static final Logger logger = LoggerFactory.getLogger(NativeLibrary.class);
+ 
+     public enum OSType
+     {
+         LINUX,
+         MAC,
+         WINDOWS,
+         AIX,
+         OTHER;
+     }
+ 
+     private static final OSType osType;
+ 
+     private static final int MCL_CURRENT;
+     private static final int MCL_FUTURE;
+ 
+     private static final int ENOMEM = 12;
+ 
+     private static final int F_GETFL = 3; /* get file status flags */
+     private static final int F_SETFL = 4; /* set file status flags */
+     private static final int F_NOCACHE = 48; /* Mac OS X specific flag, turns cache on/off */
+     private static final int O_DIRECT = 040000; /* fcntl.h */
+     private static final int O_RDONLY = 00000000; /* fcntl.h */
+ 
+     private static final int POSIX_FADV_NORMAL = 0; /* fadvise.h */
+     private static final int POSIX_FADV_RANDOM = 1; /* fadvise.h */
+     private static final int POSIX_FADV_SEQUENTIAL = 2; /* fadvise.h */
+     private static final int POSIX_FADV_WILLNEED = 3; /* fadvise.h */
+     private static final int POSIX_FADV_DONTNEED = 4; /* fadvise.h */
+     private static final int POSIX_FADV_NOREUSE = 5; /* fadvise.h */
+ 
+     private static final NativeLibraryWrapper wrappedLibrary;
+     private static boolean jnaLockable = false;
+ 
++    private static final Field FILE_DESCRIPTOR_FD_FIELD;
++    private static final Field FILE_CHANNEL_FD_FIELD;
++
+     static
+     {
++        FILE_DESCRIPTOR_FD_FIELD = FBUtilities.getProtectedField(FileDescriptor.class, "fd");
++        FILE_CHANNEL_FD_FIELD = FBUtilities.getProtectedField(FileChannelImpl.class, "fd");
++
 -        // detect the OS type the JVM is running on and then set the CLibraryWrapper
 -        // instance to a compatable implementation of CLibraryWrapper for that OS type
++        // detect the OS type the JVM is running on and then set the NativeLibraryWrapper
++        // instance to a compatible implementation of NativeLibraryWrapper for that OS type
+         osType = getOsType();
+         switch (osType)
+         {
+             case MAC: wrappedLibrary = new NativeLibraryDarwin(); break;
+             case WINDOWS: wrappedLibrary = new NativeLibraryWindows(); break;
+             case LINUX:
+             case AIX:
+             case OTHER:
+             default: wrappedLibrary = new NativeLibraryLinux();
+         }
+ 
+         if (System.getProperty("os.arch").toLowerCase().contains("ppc"))
+         {
+             if (osType == LINUX)
+             {
+                 MCL_CURRENT = 0x2000;
+                 MCL_FUTURE = 0x4000;
+             }
+             else if (osType == AIX)
+             {
+                 MCL_CURRENT = 0x100;
+                 MCL_FUTURE = 0x200;
+             }
+             else
+             {
+                 MCL_CURRENT = 1;
+                 MCL_FUTURE = 2;
+             }
+         }
+         else
+         {
+             MCL_CURRENT = 1;
+             MCL_FUTURE = 2;
+         }
+     }
+ 
+     private NativeLibrary() {}
+ 
+     /**
+      * @return the detected OSType of the Operating System running the JVM using crude string matching
+      */
+     private static OSType getOsType()
+     {
+         String osName = System.getProperty("os.name").toLowerCase();
+         if (osName.contains("mac"))
+             return MAC;
+         else if (osName.contains("windows"))
+             return WINDOWS;
+         else if (osName.contains("aix"))
+             return AIX;
+         else
+             // fall back to the Linux impl for all unknown OS types until otherwise implicitly supported as needed
+             return LINUX;
+     }
+ 
+     private static int errno(RuntimeException e)
+     {
+         assert e instanceof LastErrorException;
+         try
+         {
+             return ((LastErrorException) e).getErrorCode();
+         }
+         catch (NoSuchMethodError x)
+         {
+             logger.warn("Obsolete version of JNA present; unable to read errno. Upgrade to JNA 3.2.7 or later");
+             return 0;
+         }
+     }
+ 
+     /**
+      * Checks if the library has been successfully linked.
+      * @return {@code true} if the library has been successfully linked, {@code false} otherwise.
+      */
+     public static boolean isAvailable()
+     {
+         return wrappedLibrary.isAvailable();
+     }
+ 
+     public static boolean jnaMemoryLockable()
+     {
+         return jnaLockable;
+     }
+ 
+     public static void tryMlockall()
+     {
+         try
+         {
+             wrappedLibrary.callMlockall(MCL_CURRENT);
+             jnaLockable = true;
+             logger.info("JNA mlockall successful");
+         }
+         catch (UnsatisfiedLinkError e)
+         {
 -            // this will have already been logged by CLibrary, no need to repeat it
++            // this will have already been logged by the NativeLibraryWrapper implementation, no need to repeat it
+         }
+         catch (RuntimeException e)
+         {
+             if (!(e instanceof LastErrorException))
+                 throw e;
+ 
+             if (errno(e) == ENOMEM && osType == LINUX)
+             {
+                 logger.warn("Unable to lock JVM memory (ENOMEM)."
+                         + " This can result in part of the JVM being swapped out, especially with mmapped I/O enabled."
+                         + " Increase RLIMIT_MEMLOCK or run Cassandra as root.");
+             }
+             else if (osType != MAC)
+             {
+                 // OS X allows mlockall to be called, but always returns an error
+                 logger.warn("Unknown mlockall error {}", errno(e));
+             }
+         }
+     }
+ 
+     public static void trySkipCache(String path, long offset, long len)
+     {
+         File f = new File(path);
+         if (!f.exists())
+             return;
+ 
+         try (FileInputStream fis = new FileInputStream(f))
+         {
+             trySkipCache(getfd(fis.getChannel()), offset, len, path);
+         }
+         catch (IOException e)
+         {
+             logger.warn("Could not skip cache", e);
+         }
+     }
+ 
+     public static void trySkipCache(int fd, long offset, long len, String path)
+     {
+         if (len == 0)
+             trySkipCache(fd, 0, 0, path);
+ 
+         while (len > 0)
+         {
+             int sublen = (int) Math.min(Integer.MAX_VALUE, len);
+             trySkipCache(fd, offset, sublen, path);
+             len -= sublen;
 -            offset -= sublen;
++            offset += sublen; // advance to the start of the next chunk
+         }
+     }
+ 
+     public static void trySkipCache(int fd, long offset, int len, String path)
+     {
+         if (fd < 0)
+             return;
+ 
+         try
+         {
+             if (osType == LINUX)
+             {
+                 int result = wrappedLibrary.callPosixFadvise(fd, offset, len, POSIX_FADV_DONTNEED);
+                 if (result != 0)
+                     NoSpamLogger.log(
+                             logger,
+                             NoSpamLogger.Level.WARN,
+                             10,
+                             TimeUnit.MINUTES,
+                             "Failed trySkipCache on file: {} Error: " + wrappedLibrary.callStrerror(result).getString(0),
+                             path);
+             }
+         }
+         catch (UnsatisfiedLinkError e)
+         {
+             // if JNA is unavailable just skipping Direct I/O
+             // instance of this class will act like normal RandomAccessFile
+         }
+         catch (RuntimeException e)
+         {
+             if (!(e instanceof LastErrorException))
+                 throw e;
+ 
 -            logger.warn(String.format("posix_fadvise(%d, %d) failed, errno (%d).", fd, offset, errno(e)));
++            logger.warn("posix_fadvise({}, {}) failed, errno ({}).", fd, offset, errno(e));
+         }
+     }
+ 
+     public static int tryFcntl(int fd, int command, int flags)
+     {
+         // fcntl return value may or may not be useful, depending on the command
+         int result = -1;
+ 
+         try
+         {
+             result = wrappedLibrary.callFcntl(fd, command, flags);
+         }
+         catch (UnsatisfiedLinkError e)
+         {
+             // if JNA is unavailable just skipping
+         }
+         catch (RuntimeException e)
+         {
+             if (!(e instanceof LastErrorException))
+                 throw e;
+ 
 -            logger.warn(String.format("fcntl(%d, %d, %d) failed, errno (%d).", fd, command, flags, errno(e)));
++            logger.warn("fcntl({}, {}, {}) failed, errno ({}).", fd, command, flags, errno(e));
+         }
+ 
+         return result;
+     }
+ 
+     public static int tryOpenDirectory(String path)
+     {
+         int fd = -1;
+ 
+         try
+         {
+             return wrappedLibrary.callOpen(path, O_RDONLY);
+         }
+         catch (UnsatisfiedLinkError e)
+         {
+             // JNA is unavailable just skipping Direct I/O
+         }
+         catch (RuntimeException e)
+         {
+             if (!(e instanceof LastErrorException))
+                 throw e;
+ 
 -            logger.warn(String.format("open(%s, O_RDONLY) failed, errno (%d).", path, errno(e)));
++            logger.warn("open({}, O_RDONLY) failed, errno ({}).", path, errno(e));
+         }
+ 
+         return fd;
+     }
+ 
+     public static void trySync(int fd)
+     {
+         if (fd == -1)
+             return;
+ 
+         try
+         {
+             wrappedLibrary.callFsync(fd);
+         }
+         catch (UnsatisfiedLinkError e)
+         {
+             // JNA is unavailable just skipping Direct I/O
+         }
+         catch (RuntimeException e)
+         {
+             if (!(e instanceof LastErrorException))
+                 throw e;
+ 
 -            logger.warn("fsync({}) failed, errorno ({}) {}", fd, errno(e), e);
++            logger.warn("fsync({}) failed, errno ({}) {}", fd, errno(e), e);
+         }
+     }
+ 
+     public static void tryCloseFD(int fd)
+     {
+         if (fd == -1)
+             return;
+ 
+         try
+         {
+             wrappedLibrary.callClose(fd);
+         }
+         catch (UnsatisfiedLinkError e)
+         {
+             // JNA is unavailable just skipping Direct I/O
+         }
+         catch (RuntimeException e)
+         {
+             if (!(e instanceof LastErrorException))
+                 throw e;
+ 
 -            logger.warn(String.format("close(%d) failed, errno (%d).", fd, errno(e)));
++            logger.warn("close({}) failed, errno ({}).", fd, errno(e));
+         }
+     }
+ 
+     public static int getfd(FileChannel channel)
+     {
 -        Field field = FBUtilities.getProtectedField(channel.getClass(), "fd");
 -
+         try
+         {
 -            return getfd((FileDescriptor)field.get(channel));
++            return getfd((FileDescriptor)FILE_CHANNEL_FD_FIELD.get(channel));
+         }
+         catch (IllegalArgumentException|IllegalAccessException e)
+         {
+             logger.warn("Unable to read fd field from FileChannel");
+         }
+         return -1;
+     }
+ 
+     /**
+      * Get system file descriptor from FileDescriptor object.
 -     * @param descriptor - FileDescriptor objec to get fd from
 -     * @return file descriptor, -1 or error
++     * @param descriptor - FileDescriptor object to get fd from
++     * @return file descriptor, or -1 on error
+      */
+     public static int getfd(FileDescriptor descriptor)
+     {
 -        Field field = FBUtilities.getProtectedField(descriptor.getClass(), "fd");
 -
+         try
+         {
 -            return field.getInt(descriptor);
++            return FILE_DESCRIPTOR_FD_FIELD.getInt(descriptor);
+         }
+         catch (Exception e)
+         {
+             JVMStabilityInspector.inspectThrowable(e);
+             logger.warn("Unable to read fd field from FileDescriptor");
+         }
+ 
+         return -1;
+     }
+ 
+     /**
+      * @return the PID of the JVM or -1 if we failed to get the PID
+      */
+     public static long getProcessID()
+     {
+         try
+         {
+             return wrappedLibrary.callGetpid();
+         }
 -        catch (Exception e)
++        catch (Exception | UnsatisfiedLinkError e) // wrappers report an unlinked library via UnsatisfiedLinkError
+         {
+             logger.info("Failed to get PID from JNA", e);
+         }
+ 
+         return -1;
+     }
+ }
http://git-wip-us.apache.org/repos/asf/cassandra/blob/689484b2/src/java/org/apache/cassandra/utils/NativeLibraryWindows.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/utils/NativeLibraryWindows.java
index 0000000,0868c7a..e6e823c
mode 000000,100644..100644
--- a/src/java/org/apache/cassandra/utils/NativeLibraryWindows.java
+++ b/src/java/org/apache/cassandra/utils/NativeLibraryWindows.java
@@@ -1,0 -1,124 +1,124 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one
+  * or more contributor license agreements. See the NOTICE file
+  * distributed with this work for additional information
+  * regarding copyright ownership. The ASF licenses this file
+  * to you under the Apache License, Version 2.0 (the
+  * "License"); you may not use this file except in compliance
+  * with the License.
You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ 
+ package org.apache.cassandra.utils;
+ 
+ import org.slf4j.Logger;
+ import org.slf4j.LoggerFactory;
+ 
+ import com.sun.jna.LastErrorException;
+ import com.sun.jna.Native;
+ import com.sun.jna.Pointer;
+ 
+ /**
+  * A {@code NativeLibraryWrapper} implementation for Windows.
 - * <p> This implementation only offer support for the {@code callGetpid} method
++ * <p> This implementation only offers support for the {@code callGetpid} method
+  * using the Windows/Kernel32 library.</p>
+  *
+  * @see org.apache.cassandra.utils.NativeLibraryWrapper
+  * @see NativeLibrary
+  */
+ public class NativeLibraryWindows implements NativeLibraryWrapper
+ {
 -    private static boolean available;
 -
+     private static final Logger logger = LoggerFactory.getLogger(NativeLibraryWindows.class);
+ 
++    private static boolean available;
++
+     static
+     {
+         try
+         {
+             Native.register("kernel32");
+             available = true;
+         }
+         catch (NoClassDefFoundError e)
+         {
+             logger.warn("JNA not found. Native methods will be disabled.");
+         }
+         catch (UnsatisfiedLinkError e)
+         {
+             logger.error("Failed to link the Windows/Kernel32 library against JNA. Native methods will be unavailable.", e);
+         }
+         catch (NoSuchMethodError e)
+         {
+             logger.warn("Obsolete version of JNA present; unable to register Windows/Kernel32 library. Upgrade to JNA 3.2.7 or later");
+         }
+     }
+ 
+     /**
+      * Retrieves the process identifier of the calling process (<a href='https://msdn.microsoft.com/en-us/library/windows/desktop/ms683180(v=vs.85).aspx'>GetCurrentProcessId function</a>).
+      *
 -     * @return the process identifier of the calling process
 -     */
 -    private static native long GetCurrentProcessId() throws LastErrorException;
++     * @return the process identifier of the calling process, as the raw 32-bit unsigned {@code DWORD} bits
++     */
++    private static native int GetCurrentProcessId() throws LastErrorException;
+ 
+     public int callMlockall(int flags) throws UnsatisfiedLinkError, RuntimeException
+     {
+         throw new UnsatisfiedLinkError();
+     }
+ 
+     public int callMunlockall() throws UnsatisfiedLinkError, RuntimeException
+     {
+         throw new UnsatisfiedLinkError();
+     }
+ 
+     public int callFcntl(int fd, int command, long flags) throws UnsatisfiedLinkError, RuntimeException
+     {
+         throw new UnsatisfiedLinkError();
+     }
+ 
+     public int callPosixFadvise(int fd, long offset, int len, int flag) throws UnsatisfiedLinkError, RuntimeException
+     {
+         throw new UnsatisfiedLinkError();
+     }
+ 
+     public int callOpen(String path, int flags) throws UnsatisfiedLinkError, RuntimeException
+     {
+         throw new UnsatisfiedLinkError();
+     }
+ 
+     public int callFsync(int fd) throws UnsatisfiedLinkError, RuntimeException
+     {
+         throw new UnsatisfiedLinkError();
+     }
+ 
+     public int callClose(int fd) throws UnsatisfiedLinkError, RuntimeException
+     {
+         throw new UnsatisfiedLinkError();
+     }
+ 
+     public Pointer callStrerror(int errnum) throws UnsatisfiedLinkError, RuntimeException
+     {
+         throw new UnsatisfiedLinkError();
+     }
+ 
+     /**
+      * @return the PID of the JVM running
 -     * @throws UnsatisfiedLinkError if we fail to link against Sigar
 -     * @throws RuntimeException if another unexpected error is thrown by Sigar
++     * @throws UnsatisfiedLinkError if we fail to link against the Windows/Kernel32 library through JNA
++     * @throws RuntimeException if another unexpected error is thrown by the native call
+      */
+     public long callGetpid() throws UnsatisfiedLinkError, RuntimeException
+     {
 -        return GetCurrentProcessId();
++        return GetCurrentProcessId() & 0xFFFFFFFFL; // widen the unsigned DWORD without sign extension
+     }
+ 
+     public boolean isAvailable()
+     {
+         return available;
+     }
+ }
http://git-wip-us.apache.org/repos/asf/cassandra/blob/689484b2/src/java/org/apache/cassandra/utils/UUIDGen.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/689484b2/src/java/org/apache/cassandra/utils/WindowsTimer.java
----------------------------------------------------------------------
