Merge branch 'cassandra-3.0' into cassandra-3.5
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/af9b9cd5
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/af9b9cd5
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/af9b9cd5

Branch: refs/heads/cassandra-3.5
Commit: af9b9cd5ae248746475ada5434d08370148bbce0
Parents: 4111e4f bd4cab2
Author: Josh McKenzie <josh.mcken...@datastax.com>
Authored: Fri Apr 1 11:51:13 2016 -0400
Committer: Josh McKenzie <josh.mcken...@datastax.com>
Committed: Fri Apr 1 11:52:45 2016 -0400

----------------------------------------------------------------------
 CHANGES.txt | 1 +
 .../apache/cassandra/db/ColumnFamilyStore.java | 134 ++++++++++------
 .../org/apache/cassandra/db/Directories.java | 16 +-
 src/java/org/apache/cassandra/db/Memtable.java | 29 +++-
 .../db/commitlog/CommitLogSegmentManager.java | 4 +-
 .../cassandra/io/util/DiskAwareRunnable.java | 54 -------
 .../apache/cassandra/cql3/OutOfSpaceTest.java | 157 +++++++++++++++++++
 7 files changed, 276 insertions(+), 119 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/af9b9cd5/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index b542a67,b9376bc..73d9ab5
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -109,9 -75,9 +109,10 @@@ Merged from 2.2
   * (cqlsh) Support utf-8/cp65001 encoding on Windows (CASSANDRA-11030)
   * Fix paging on DISTINCT queries repeats result when first row in partition changes (CASSANDRA-10010)
 + * (cqlsh) Support timezone conversion using pytz (CASSANDRA-10397)
   * cqlsh: change default encoding to UTF-8 (CASSANDRA-11124)
  Merged from 2.1:
+  * Fix out-of-space error treatment in memtable flushing (CASSANDRA-11448).
* Don't do defragmentation if reading from repaired sstables (CASSANDRA-10342) * Fix streaming_socket_timeout_in_ms not enforced (CASSANDRA-11286) * Avoid dropping message too quickly due to missing unit conversion (CASSANDRA-11302) http://git-wip-us.apache.org/repos/asf/cassandra/blob/af9b9cd5/src/java/org/apache/cassandra/db/ColumnFamilyStore.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/db/ColumnFamilyStore.java index f193c4d,aa12b80..6e01b34 --- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java +++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java @@@ -910,6 -893,7 +910,7 @@@ public class ColumnFamilyStore implemen final OpOrder.Barrier writeBarrier; final CountDownLatch latch = new CountDownLatch(1); final ReplayPosition lastReplayPosition; - volatile FSWriteError flushFailure = null; ++ volatile Throwable flushFailure = null; private PostFlush(boolean flushSecondaryIndexes, OpOrder.Barrier writeBarrier, ReplayPosition lastReplayPosition) { @@@ -951,6 -936,9 +953,9 @@@ } metric.pendingFlushes.dec(); + + if (flushFailure != null) - throw flushFailure; ++ throw Throwables.propagate(flushFailure); } } @@@ -1050,81 -1038,26 +1055,106 @@@ metric.memtableSwitchCount.inc(); - for (Memtable memtable : memtables) + try { - List<Future<SSTableMultiWriter>> futures = new ArrayList<>(); - long totalBytesOnDisk = 0; - long maxBytesOnDisk = 0; - long minBytesOnDisk = Long.MAX_VALUE; - List<SSTableReader> sstables = new ArrayList<>(); - try (LifecycleTransaction txn = LifecycleTransaction.offline(OperationType.FLUSH)) + for (Memtable memtable : memtables) + { - // flush the memtable - MoreExecutors.sameThreadExecutor().execute(memtable.flushRunnable()); - reclaim(memtable); ++ flushMemtable(memtable); + } + } - catch (FSWriteError e) ++ catch (Throwable t) + { - JVMStabilityInspector.inspectThrowable(e); - // If we weren't killed, try to continue work but do not allow CommitLog to be 
discarded. - postFlush.flushFailure = e; ++ JVMStabilityInspector.inspectThrowable(t); ++ postFlush.flushFailure = t; + } - + // signal the post-flush we've done our work + postFlush.latch.countDown(); + } + ++ public void flushMemtable(Memtable memtable) ++ { ++ List<Future<SSTableMultiWriter>> futures = new ArrayList<>(); ++ long totalBytesOnDisk = 0; ++ long maxBytesOnDisk = 0; ++ long minBytesOnDisk = Long.MAX_VALUE; ++ List<SSTableReader> sstables = new ArrayList<>(); ++ try (LifecycleTransaction txn = LifecycleTransaction.offline(OperationType.FLUSH)) ++ { ++ List<Memtable.FlushRunnable> flushRunnables = null; ++ List<SSTableMultiWriter> flushResults = null; ++ ++ try + { + // flush the memtable - List<Memtable.FlushRunnable> flushRunnables = memtable.flushRunnables(txn); ++ flushRunnables = memtable.flushRunnables(txn); + + for (int i = 0; i < flushRunnables.size(); i++) + futures.add(perDiskflushExecutors[i].submit(flushRunnables.get(i))); + - List<SSTableMultiWriter> flushResults = Lists.newArrayList(FBUtilities.waitOnFutures(futures)); ++ flushResults = Lists.newArrayList(FBUtilities.waitOnFutures(futures)); ++ } ++ catch (Throwable t) ++ { ++ t = memtable.abortRunnables(flushRunnables, t); ++ t = txn.abort(t); ++ throw Throwables.propagate(t); ++ } + - try ++ try ++ { ++ Iterator<SSTableMultiWriter> writerIterator = flushResults.iterator(); ++ while (writerIterator.hasNext()) + { - Iterator<SSTableMultiWriter> writerIterator = flushResults.iterator(); - while (writerIterator.hasNext()) ++ @SuppressWarnings("resource") ++ SSTableMultiWriter writer = writerIterator.next(); ++ if (writer.getFilePointer() > 0) + { - @SuppressWarnings("resource") - SSTableMultiWriter writer = writerIterator.next(); - if (writer.getFilePointer() > 0) - { - writer.setOpenResult(true).prepareToCommit(); - } - else - { - maybeFail(writer.abort(null)); - writerIterator.remove(); - } ++ writer.setOpenResult(true).prepareToCommit(); ++ } ++ else ++ { ++ 
maybeFail(writer.abort(null)); ++ writerIterator.remove(); + } + } - catch (Throwable t) - { - for (SSTableMultiWriter writer : flushResults) - t = writer.abort(t); - t = txn.abort(t); - Throwables.propagate(t); - } ++ } ++ catch (Throwable t) ++ { ++ for (SSTableMultiWriter writer : flushResults) ++ t = writer.abort(t); ++ t = txn.abort(t); ++ Throwables.propagate(t); ++ } + - txn.prepareToCommit(); ++ txn.prepareToCommit(); + - Throwable accumulate = null; - for (SSTableMultiWriter writer : flushResults) - accumulate = writer.commit(accumulate); ++ Throwable accumulate = null; ++ for (SSTableMultiWriter writer : flushResults) ++ accumulate = writer.commit(accumulate); + - maybeFail(txn.commit(accumulate)); ++ maybeFail(txn.commit(accumulate)); + - for (SSTableMultiWriter writer : flushResults) ++ for (SSTableMultiWriter writer : flushResults) ++ { ++ Collection<SSTableReader> flushedSSTables = writer.finished(); ++ for (SSTableReader sstable : flushedSSTables) + { - Collection<SSTableReader> flushedSSTables = writer.finished(); - for (SSTableReader sstable : flushedSSTables) ++ if (sstable != null) + { - if (sstable != null) - { - sstables.add(sstable); - long size = sstable.bytesOnDisk(); - totalBytesOnDisk += size; - maxBytesOnDisk = Math.max(maxBytesOnDisk, size); - minBytesOnDisk = Math.min(minBytesOnDisk, size); - } ++ sstables.add(sstable); ++ long size = sstable.bytesOnDisk(); ++ totalBytesOnDisk += size; ++ maxBytesOnDisk = Math.max(maxBytesOnDisk, size); ++ minBytesOnDisk = Math.min(minBytesOnDisk, size); + } + } + } - memtable.cfs.replaceFlushed(memtable, sstables); - reclaim(memtable); - logger.debug("Flushed to {} ({} sstables, {} bytes), biggest {} bytes, smallest {} bytes", sstables, sstables.size(), totalBytesOnDisk, maxBytesOnDisk, minBytesOnDisk); + } - // signal the post-flush we've done our work - postFlush.latch.countDown(); ++ memtable.cfs.replaceFlushed(memtable, sstables); ++ reclaim(memtable); ++ logger.debug("Flushed to {} ({} sstables, 
{} bytes), biggest {} bytes, smallest {} bytes", sstables, sstables.size(), totalBytesOnDisk, maxBytesOnDisk, minBytesOnDisk); + } + private void reclaim(final Memtable memtable) { // issue a read barrier for reclaiming the memory, and offload the wait to another thread http://git-wip-us.apache.org/repos/asf/cassandra/blob/af9b9cd5/src/java/org/apache/cassandra/db/Directories.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/db/Directories.java index 73c6007,5f23baa..5bd2cf0 --- a/src/java/org/apache/cassandra/db/Directories.java +++ b/src/java/org/apache/cassandra/db/Directories.java @@@ -320,7 -306,7 +320,7 @@@ public class Directorie * which may return any non-blacklisted directory - even a data directory that has no usable space. * Do not use this method in production code. * -- * @throws IOError if all directories are blacklisted. ++ * @throws FSWriteError if all directories are blacklisted. */ public File getDirectoryForNewSSTables() { @@@ -330,11 -316,11 +330,14 @@@ /** * Returns a non-blacklisted data directory that _currently_ has {@code writeSize} bytes as usable space. * -- * @throws IOError if all directories are blacklisted. ++ * @throws FSWriteError if all directories are blacklisted. */ public File getWriteableLocationAsFile(long writeSize) { -- return getLocationForDisk(getWriteableLocation(writeSize)); ++ File location = getLocationForDisk(getWriteableLocation(writeSize)); ++ if (location == null) ++ throw new FSWriteError(new IOException("No configured data directory contains enough space to write " + writeSize + " bytes"), ""); ++ return location; } /** @@@ -366,9 -352,9 +369,10 @@@ } /** -- * Returns a non-blacklisted data directory that _currently_ has {@code writeSize} bytes as usable space. ++ * Returns a non-blacklisted data directory that _currently_ has {@code writeSize} bytes as usable space, null if ++ * there is not enough space left in all directories. 
* -- * @throws IOError if all directories are blacklisted. ++ * @throws FSWriteError if all directories are blacklisted. */ public DataDirectory getWriteableLocation(long writeSize) { http://git-wip-us.apache.org/repos/asf/cassandra/blob/af9b9cd5/src/java/org/apache/cassandra/db/Memtable.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/db/Memtable.java index 244c7b6,5d5f7bf..bc054b4 --- a/src/java/org/apache/cassandra/db/Memtable.java +++ b/src/java/org/apache/cassandra/db/Memtable.java @@@ -24,6 -25,6 +24,7 @@@ import java.util.concurrent.atomic.Atom import java.util.concurrent.atomic.AtomicReference; import com.google.common.annotations.VisibleForTesting; ++import com.google.common.base.Throwables; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@@ -44,10 -44,11 +45,11 @@@ import org.apache.cassandra.db.rows.Unf import org.apache.cassandra.dht.*; import org.apache.cassandra.dht.Murmur3Partitioner.LongToken; import org.apache.cassandra.index.transactions.UpdateTransaction; ++import org.apache.cassandra.io.FSWriteError; import org.apache.cassandra.io.sstable.Descriptor; +import org.apache.cassandra.io.sstable.SSTableMultiWriter; import org.apache.cassandra.io.sstable.SSTableTxnWriter; -import org.apache.cassandra.io.sstable.format.SSTableReader; import org.apache.cassandra.io.sstable.metadata.MetadataCollector; -import org.apache.cassandra.io.util.DiskAwareRunnable; import org.apache.cassandra.service.ActiveRepairService; import org.apache.cassandra.utils.ByteBufferUtil; import org.apache.cassandra.utils.FBUtilities; @@@ -255,32 -254,9 +257,47 @@@ public class Memtable implements Compar return partitions.size(); } - public FlushRunnable flushRunnable() + public List<FlushRunnable> flushRunnables(LifecycleTransaction txn) { - return new FlushRunnable(lastReplayPosition.get()); + List<Range<Token>> localRanges = Range.sort(StorageService.instance.getLocalRanges(cfs.keyspace.getName())); + + 
if (!cfs.getPartitioner().splitter().isPresent() || localRanges.isEmpty()) + return Collections.singletonList(new FlushRunnable(lastReplayPosition.get(), txn)); + + return createFlushRunnables(localRanges, txn); + } + + private List<FlushRunnable> createFlushRunnables(List<Range<Token>> localRanges, LifecycleTransaction txn) + { + assert cfs.getPartitioner().splitter().isPresent(); + + Directories.DataDirectory[] locations = cfs.getDirectories().getWriteableLocations(); + List<PartitionPosition> boundaries = StorageService.getDiskBoundaries(localRanges, cfs.getPartitioner(), locations); + List<FlushRunnable> runnables = new ArrayList<>(boundaries.size()); + PartitionPosition rangeStart = cfs.getPartitioner().getMinimumToken().minKeyBound(); + ReplayPosition context = lastReplayPosition.get(); - for (int i = 0; i < boundaries.size(); i++) ++ try + { - PartitionPosition t = boundaries.get(i); - runnables.add(new FlushRunnable(context, rangeStart, t, locations[i], txn)); - rangeStart = t; ++ for (int i = 0; i < boundaries.size(); i++) ++ { ++ PartitionPosition t = boundaries.get(i); ++ runnables.add(new FlushRunnable(context, rangeStart, t, locations[i], txn)); ++ rangeStart = t; ++ } ++ return runnables; ++ } ++ catch (Throwable e) ++ { ++ throw Throwables.propagate(abortRunnables(runnables, e)); + } - return runnables; ++ } ++ ++ public Throwable abortRunnables(List<FlushRunnable> runnables, Throwable t) ++ { ++ if (runnables != null) ++ for (FlushRunnable runnable : runnables) ++ t = runnable.writer.abort(t); ++ return t; } public String toString() @@@ -387,12 -345,21 +404,12 @@@ * 1.2); // bloom filter and row index overhead this.isBatchLogTable = cfs.name.equals(SystemKeyspace.BATCHES) && cfs.keyspace.getName().equals(SystemKeyspace.NAME); - } - public long getExpectedWriteSize() - { - return estimatedSize; - } + if (flushLocation == null) - writer = createFlushWriter(txn, 
cfs.getSSTablePath(getDirectories().getLocationForDisk(getDirectories().getWriteableLocation(estimatedSize))), columnsCollector.get(), statsCollector.get()); ++ writer = createFlushWriter(txn, cfs.getSSTablePath(getDirectories().getWriteableLocationAsFile(estimatedSize)), columnsCollector.get(), statsCollector.get()); + else + writer = createFlushWriter(txn, cfs.getSSTablePath(getDirectories().getLocationForDisk(flushLocation)), columnsCollector.get(), statsCollector.get()); - protected void runMayThrow() throws Exception - { - long writeSize = getExpectedWriteSize(); - Directories.DataDirectory dataDirectory = getWriteDirectory(writeSize); - File sstableDirectory = cfs.getDirectories().getLocationForDisk(dataDirectory); - assert sstableDirectory != null : "Flush task is not bound to any disk"; - Collection<SSTableReader> sstables = writeSortedContents(context, sstableDirectory); - cfs.replaceFlushed(Memtable.this, sstables); } protected Directories getDirectories() http://git-wip-us.apache.org/repos/asf/cassandra/blob/af9b9cd5/src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentManager.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/af9b9cd5/src/java/org/apache/cassandra/io/util/DiskAwareRunnable.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/io/util/DiskAwareRunnable.java index a083218,1a15d6f..0000000 deleted file mode 100644,100644 --- a/src/java/org/apache/cassandra/io/util/DiskAwareRunnable.java +++ /dev/null @@@ -1,54 -1,42 +1,0 @@@ --/* -- * Licensed to the Apache Software Foundation (ASF) under one -- * or more contributor license agreements. See the NOTICE file -- * distributed with this work for additional information -- * regarding copyright ownership. 
The ASF licenses this file -- * to you under the Apache License, Version 2.0 (the -- * "License"); you may not use this file except in compliance -- * with the License. You may obtain a copy of the License at -- * -- * http://www.apache.org/licenses/LICENSE-2.0 -- * -- * Unless required by applicable law or agreed to in writing, software -- * distributed under the License is distributed on an "AS IS" BASIS, -- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -- * See the License for the specific language governing permissions and -- * limitations under the License. -- */ --package org.apache.cassandra.io.util; - -import java.io.IOException; -- --import org.apache.cassandra.db.Directories; -import org.apache.cassandra.io.FSWriteError; --import org.apache.cassandra.utils.WrappedRunnable; -- --public abstract class DiskAwareRunnable extends WrappedRunnable --{ -- protected Directories.DataDirectory getWriteDirectory(long writeSize) -- { - Directories.DataDirectory directory; - directory = getDirectory(); - - if (directory == null) // ok panic - write anywhere - directory = getDirectories().getWriteableLocation(writeSize); - - Directories.DataDirectory directory = getDirectories().getWriteableLocation(writeSize); -- if (directory == null) - throw new RuntimeException("Insufficient disk space to write " + writeSize + " bytes"); - throw new FSWriteError(new IOException("Insufficient disk space to write " + writeSize + " bytes"), ""); -- -- return directory; -- } -- -- /** -- * Get sstable directories for the CF. -- * @return Directories instance for the CF. -- */ -- protected abstract Directories getDirectories(); - protected abstract Directories.DataDirectory getDirectory(); - - /** - * Called if no disk is available with free space for the full write size. - * @return true if the scope of the task was successfully reduced. - */ - public boolean reduceScopeForLimitedSpace() - { - return false; - } --}