Merge branch 'cassandra-3.0' into cassandra-3.11
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/66e21459 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/66e21459 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/66e21459 Branch: refs/heads/trunk Commit: 66e214592875e296bb540a966f1648f1106b2464 Parents: 1cafc3c 0fe82be Author: Yuki Morishita <[email protected]> Authored: Tue Dec 13 15:59:14 2016 -0800 Committer: Yuki Morishita <[email protected]> Committed: Tue Dec 13 15:59:14 2016 -0800 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../apache/cassandra/db/ColumnFamilyStore.java | 10 ++- .../cassandra/db/PartitionRangeReadCommand.java | 4 +- .../cassandra/db/compaction/CompactionTask.java | 81 ++++++++++---------- .../cassandra/db/lifecycle/LogTransaction.java | 3 +- .../apache/cassandra/db/lifecycle/Tracker.java | 34 ++++---- .../cassandra/index/SecondaryIndexManager.java | 4 +- .../db/lifecycle/LifecycleTransactionTest.java | 5 +- .../cassandra/db/lifecycle/TrackerTest.java | 6 +- .../org/apache/cassandra/tools/ToolsTester.java | 3 - 10 files changed, 85 insertions(+), 66 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/66e21459/CHANGES.txt ---------------------------------------------------------------------- diff --cc CHANGES.txt index 28ebf36,145afb9..f95dd81 --- a/CHANGES.txt +++ b/CHANGES.txt @@@ -168,12 -59,6 +168,13 @@@ Merged from 3.0 * Correct log message for statistics of offheap memtable flush (CASSANDRA-12776) * Explicitly set locale for string validation (CASSANDRA-12541,CASSANDRA-12542,CASSANDRA-12543,CASSANDRA-12545) Merged from 2.2: ++ * Temporarily fix bug that creates commit log when running offline tools (CASSANDRA-8616) + * Reduce granuality of OpOrder.Group during index build (CASSANDRA-12796) + * Test bind parameters and unset parameters in 
InsertUpdateIfConditionTest (CASSANDRA-12980) + * Use saved tokens when setting local tokens on StorageService.joinRing (CASSANDRA-12935) + * cqlsh: fix DESC TYPES errors (CASSANDRA-12914) + * Fix leak on skipped SSTables in sstableupgrade (CASSANDRA-12899) + * Avoid blocking gossip during pending range calculation (CASSANDRA-12281) * Fix purgeability of tombstones with max timestamp (CASSANDRA-12792) * Fail repair if participant dies during sync or anticompaction (CASSANDRA-12901) * cqlsh COPY: unprotected pk values before converting them if not using prepared statements (CASSANDRA-12863) http://git-wip-us.apache.org/repos/asf/cassandra/blob/66e21459/src/java/org/apache/cassandra/db/ColumnFamilyStore.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/db/ColumnFamilyStore.java index 881fb00,39ed804..a5f76bd4 --- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java +++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java @@@ -405,9 -388,13 +405,13 @@@ public class ColumnFamilyStore implemen logger.info("Initializing {}.{}", keyspace.getName(), name); - // scan for sstables corresponding to this cf and load them - data = new Tracker(this, loadSSTables); + // Create Memtable only on online + Memtable initialMemtable = null; + if (DatabaseDescriptor.isDaemonInitialized()) - initialMemtable = new Memtable(new AtomicReference<>(CommitLog.instance.getContext()), this); ++ initialMemtable = new Memtable(new AtomicReference<>(CommitLog.instance.getCurrentPosition()), this); + data = new Tracker(initialMemtable, loadSSTables); + // scan for sstables corresponding to this cf and load them if (data.loadsstables) { Directories.SSTableLister sstableFiles = directories.sstableLister(Directories.OnTxnErr.IGNORE).skipTemporary(true); @@@ -2118,7 -1957,7 +2122,7 @@@ { public Void call() { - cfs.data.reset(); - cfs.data.reset(new Memtable(new AtomicReference<>(ReplayPosition.NONE), cfs)); ++ cfs.data.reset(new 
Memtable(new AtomicReference<>(CommitLogPosition.NONE), cfs)); return null; } }, true, false); http://git-wip-us.apache.org/repos/asf/cassandra/blob/66e21459/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java index 50b568e,17adef0..045fc26 --- a/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java +++ b/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java @@@ -221,7 -199,8 +221,9 @@@ public class PartitionRangeReadCommand if (!sstable.isRepaired()) oldestUnrepairedTombstone = Math.min(oldestUnrepairedTombstone, sstable.getMinLocalDeletionTime()); } - return checkCacheFilter(UnfilteredPartitionIterators.mergeLazily(iterators, nowInSec()), cfs); ++ // iterators can be empty for offline tools + return iterators.isEmpty() ? EmptyIterators.unfilteredPartition(metadata(), isForThrift()) + : checkCacheFilter(UnfilteredPartitionIterators.mergeLazily(iterators, nowInSec()), cfs); } catch (RuntimeException | Error e) { http://git-wip-us.apache.org/repos/asf/cassandra/blob/66e21459/src/java/org/apache/cassandra/db/compaction/CompactionTask.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/db/compaction/CompactionTask.java index 2f90c7b,f0a1f47..a9d6c7c --- a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java +++ b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java @@@ -228,47 -213,29 +228,50 @@@ public class CompactionTask extends Abs } } -- // log a bunch of statistics about the result and save to system table compaction_history - - long durationInNano = System.nanoTime() - start; - long dTime = TimeUnit.NANOSECONDS.toMillis(durationInNano); - long startsize = inputSizeBytes; - long dTime = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start); - long startsize = 
SSTableReader.getTotalBytes(transaction.originals()); -- long endsize = SSTableReader.getTotalBytes(newSStables); -- double ratio = (double) endsize / (double) startsize; -- -- StringBuilder newSSTableNames = new StringBuilder(); -- for (SSTableReader reader : newSStables) -- newSSTableNames.append(reader.descriptor.baseFilename()).append(","); - - long totalSourceRows = 0; - for (int i = 0; i < mergedRowCounts.length; i++) - totalSourceRows += mergedRowCounts[i] * (i + 1); - - String mergeSummary = updateCompactionHistory(cfs.keyspace.getName(), cfs.getColumnFamilyName(), mergedRowCounts, startsize, endsize); - logger.debug(String.format("Compacted (%s) %d sstables to [%s] to level=%d. %s to %s (~%d%% of original) in %,dms. Read Throughput = %s, Write Throughput = %s, Row Throughput = ~%,d/s. %,d total partitions merged to %,d. Partition merge counts were {%s}", - taskId, - transaction.originals().size(), - newSSTableNames.toString(), - getLevel(), - FBUtilities.prettyPrintMemory(startsize), - FBUtilities.prettyPrintMemory(endsize), - (int) (ratio * 100), - dTime, - FBUtilities.prettyPrintMemoryPerSecond(startsize, durationInNano), - FBUtilities.prettyPrintMemoryPerSecond(endsize, durationInNano), - (int) totalSourceCQLRows / (TimeUnit.NANOSECONDS.toSeconds(durationInNano) + 1), - totalSourceRows, - totalKeysWritten, - mergeSummary)); - logger.trace("CF Total Bytes Compacted: {}", FBUtilities.prettyPrintMemory(CompactionTask.addToTotalBytesCompacted(endsize))); - logger.trace("Actual #keys: {}, Estimated #keys:{}, Err%: {}", totalKeysWritten, estimatedKeys, ((double)(totalKeysWritten - estimatedKeys)/totalKeysWritten)); - cfs.getCompactionStrategyManager().compactionLogger.compaction(startTime, transaction.originals(), System.currentTimeMillis(), newSStables); - - // update the metrics - cfs.metric.compactionBytesWritten.inc(endsize); -- - if (offline) + if (transaction.isOffline()) + { Refs.release(Refs.selfRefs(newSStables)); + } + else + { - double mbps = dTime 
> 0 ? (double) endsize / (1024 * 1024) / ((double) dTime / 1000) : 0; - Summary mergeSummary = updateCompactionHistory(cfs.keyspace.getName(), cfs.getColumnFamilyName(), mergedRowCounts, startsize, endsize); - logger.debug(String.format("Compacted (%s) %d sstables to [%s] to level=%d. %,d bytes to %,d (~%d%% of original) in %,dms = %fMB/s. %,d total partitions merged to %,d. Partition merge counts were {%s}", - taskId, transaction.originals().size(), newSSTableNames.toString(), getLevel(), startsize, endsize, (int) (ratio * 100), dTime, mbps, mergeSummary.totalSourceRows, totalKeysWritten, mergeSummary.partitionMerge)); - logger.trace(String.format("CF Total Bytes Compacted: %,d", CompactionTask.addToTotalBytesCompacted(endsize))); ++ // log a bunch of statistics about the result and save to system table compaction_history ++ ++ long durationInNano = System.nanoTime() - start; ++ long dTime = TimeUnit.NANOSECONDS.toMillis(durationInNano); ++ long startsize = inputSizeBytes; ++ long endsize = SSTableReader.getTotalBytes(newSStables); ++ double ratio = (double) endsize / (double) startsize; ++ ++ StringBuilder newSSTableNames = new StringBuilder(); ++ for (SSTableReader reader : newSStables) ++ newSSTableNames.append(reader.descriptor.baseFilename()).append(","); ++ long totalSourceRows = 0; ++ for (int i = 0; i < mergedRowCounts.length; i++) ++ totalSourceRows += mergedRowCounts[i] * (i + 1); ++ ++ String mergeSummary = updateCompactionHistory(cfs.keyspace.getName(), cfs.getTableName(), mergedRowCounts, startsize, endsize); ++ logger.debug(String.format("Compacted (%s) %d sstables to [%s] to level=%d. %s to %s (~%d%% of original) in %,dms. Read Throughput = %s, Write Throughput = %s, Row Throughput = ~%,d/s. %,d total partitions merged to %,d. 
Partition merge counts were {%s}", ++ taskId, ++ transaction.originals().size(), ++ newSSTableNames.toString(), ++ getLevel(), ++ FBUtilities.prettyPrintMemory(startsize), ++ FBUtilities.prettyPrintMemory(endsize), ++ (int) (ratio * 100), ++ dTime, ++ FBUtilities.prettyPrintMemoryPerSecond(startsize, durationInNano), ++ FBUtilities.prettyPrintMemoryPerSecond(endsize, durationInNano), ++ (int) totalSourceCQLRows / (TimeUnit.NANOSECONDS.toSeconds(durationInNano) + 1), ++ totalSourceRows, ++ totalKeysWritten, ++ mergeSummary)); ++ logger.trace("CF Total Bytes Compacted: {}", FBUtilities.prettyPrintMemory(CompactionTask.addToTotalBytesCompacted(endsize))); + logger.trace("Actual #keys: {}, Estimated #keys:{}, Err%: {}", totalKeysWritten, estimatedKeys, ((double)(totalKeysWritten - estimatedKeys)/totalKeysWritten)); ++ cfs.getCompactionStrategyManager().compactionLogger.compaction(startTime, transaction.originals(), System.currentTimeMillis(), newSStables); ++ ++ // update the metrics ++ cfs.metric.compactionBytesWritten.inc(endsize); + } } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/66e21459/src/java/org/apache/cassandra/db/lifecycle/LogTransaction.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/66e21459/src/java/org/apache/cassandra/db/lifecycle/Tracker.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/db/lifecycle/Tracker.java index f464e08,9feaa3e..e2fcb06 --- a/src/java/org/apache/cassandra/db/lifecycle/Tracker.java +++ b/src/java/org/apache/cassandra/db/lifecycle/Tracker.java @@@ -31,8 -31,7 +31,7 @@@ import com.google.common.collect.* import org.apache.cassandra.db.ColumnFamilyStore; import org.apache.cassandra.db.Directories; import org.apache.cassandra.db.Memtable; - import org.apache.cassandra.db.commitlog.CommitLog; -import org.apache.cassandra.db.commitlog.ReplayPosition; +import 
org.apache.cassandra.db.commitlog.CommitLogPosition; import org.slf4j.Logger; import org.slf4j.LoggerFactory; http://git-wip-us.apache.org/repos/asf/cassandra/blob/66e21459/src/java/org/apache/cassandra/index/SecondaryIndexManager.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/66e21459/test/unit/org/apache/cassandra/db/lifecycle/LifecycleTransactionTest.java ---------------------------------------------------------------------- diff --cc test/unit/org/apache/cassandra/db/lifecycle/LifecycleTransactionTest.java index 6435e3e,0d87cc9..4514b72 --- a/test/unit/org/apache/cassandra/db/lifecycle/LifecycleTransactionTest.java +++ b/test/unit/org/apache/cassandra/db/lifecycle/LifecycleTransactionTest.java @@@ -30,6 -31,8 +31,8 @@@ import junit.framework.Assert import org.apache.cassandra.MockSchema; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.db.ColumnFamilyStore; + import org.apache.cassandra.db.Memtable; -import org.apache.cassandra.db.commitlog.ReplayPosition; ++import org.apache.cassandra.db.commitlog.CommitLogPosition; import org.apache.cassandra.db.compaction.OperationType; import org.apache.cassandra.db.lifecycle.LifecycleTransaction.ReaderState; import org.apache.cassandra.db.lifecycle.LifecycleTransaction.ReaderState.Action; @@@ -268,7 -271,7 +271,7 @@@ public class LifecycleTransactionTest e private static Tracker tracker(ColumnFamilyStore cfs, List<SSTableReader> readers) { - Tracker tracker = new Tracker(cfs, false); - Tracker tracker = new Tracker(new Memtable(new AtomicReference<>(ReplayPosition.NONE), cfs), false); ++ Tracker tracker = new Tracker(new Memtable(new AtomicReference<>(CommitLogPosition.NONE), cfs), false); tracker.addInitialSSTables(readers); return tracker; } http://git-wip-us.apache.org/repos/asf/cassandra/blob/66e21459/test/unit/org/apache/cassandra/db/lifecycle/TrackerTest.java 
---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/66e21459/test/unit/org/apache/cassandra/tools/ToolsTester.java ---------------------------------------------------------------------- diff --cc test/unit/org/apache/cassandra/tools/ToolsTester.java index 97b19c9,0000000..ead4e31 mode 100644,000000..100644 --- a/test/unit/org/apache/cassandra/tools/ToolsTester.java +++ b/test/unit/org/apache/cassandra/tools/ToolsTester.java @@@ -1,296 -1,0 +1,293 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.cassandra.tools;
+
+import java.io.File;
+import java.io.IOException;
+import java.lang.management.ManagementFactory;
+import java.lang.management.ThreadInfo;
+import java.lang.management.ThreadMXBean;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.security.Permission;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+
+import org.apache.commons.io.FileUtils;
+import org.junit.BeforeClass;
+
+import org.slf4j.LoggerFactory;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/**
+ * Base unit test class for standalone tools.
+ *
+ * Provides helpers to run a tool's {@code main} method while capturing its exit code, to check
+ * which classes have been loaded, and to check which threads were started since
+ * {@link #setupTester()} took its snapshot.
+ */
+public abstract class ToolsTester
+{
+    /** Threads that were alive when {@link #setupTester()} ran; used as the baseline for thread checks. */
+    private static List<ThreadInfo> initialThreads;
+
+    /** Thread name patterns (full-match regular expressions) passed as the "expected" set by callers. */
+    static final String[] EXPECTED_THREADS_WITH_SCHEMA = {
-        "(NativePool|SlabPool|HeapPool)Cleaner",
-        "COMMIT-LOG-ALLOCATOR",
-        "COMMIT-LOG-WRITER",
+        "PerDiskMemtableFlushWriter_0:[1-9]",
+        "MemtablePostFlush:[1-9]",
+        "MemtableFlushWriter:[1-9]",
+        "MemtableReclaimMemory:[1-9]",
+    };
+    /** Thread name patterns (full-match regular expressions) passed as the "optional" set by callers. */
+    static final String[] OPTIONAL_THREADS_WITH_SCHEMA = {
+        "ScheduledTasks:[1-9]",
+        "OptionalTasks:[1-9]",
+        "Reference-Reaper:[1-9]",
+        "LocalPool-Cleaner:[1-9]",
+        "CacheCleanupExecutor:[1-9]",
+        "CompactionExecutor:[1-9]",
+        "ValidationExecutor:[1-9]",
+        "NonPeriodicTasks:[1-9]",
+        "Sampler:[1-9]",
+        "SecondaryIndexManagement:[1-9]",
+        "Strong-Reference-Leak-Detector:[1-9]",
+        "Background_Reporter:[1-9]",
+        "EXPIRING-MAP-REAPER:[1-9]",
+    };
+
+    /**
+     * Compares the threads alive now against the snapshot taken in {@link #setupTester()} and fails
+     * if a newly started thread matches neither pattern list, or if a pattern from
+     * {@code expectedThreadNames} matches none of the newly started threads.
+     *
+     * @param expectedThreadNames regular expressions (matched against the whole thread name) for
+     *                            threads that must have been started; {@code null} means none
+     * @param optionalThreadNames regular expressions (matched against the whole thread name) for
+     *                            threads that may have been started; {@code null} means none
+     */
+    public void assertNoUnexpectedThreadsStarted(String[] expectedThreadNames, String[] optionalThreadNames)
+    {
+        ThreadMXBean threads = ManagementFactory.getThreadMXBean();
+
+        // getThreadInfo(long[]) yields null entries for threads that are no longer alive, so skip those
+        Set<String> initial = initialThreads
+                              .stream()
+                              .filter(info -> info != null)
+                              .map(ThreadInfo::getThreadName)
+                              .collect(Collectors.toSet());
+
+        Set<String> current = Arrays.stream(threads.getThreadInfo(threads.getAllThreadIds()))
+                                    .filter(info -> info != null)
+                                    .map(ThreadInfo::getThreadName)
+                                    .collect(Collectors.toSet());
+
+        List<Pattern> expected = expectedThreadNames != null
+                                 ? Arrays.stream(expectedThreadNames).map(Pattern::compile).collect(Collectors.toList())
+                                 : Collections.emptyList();
+
+        List<Pattern> optional = optionalThreadNames != null
+                                 ? Arrays.stream(optionalThreadNames).map(Pattern::compile).collect(Collectors.toList())
+                                 : Collections.emptyList();
+
+        // only threads started after the snapshot are of interest
+        current.removeAll(initial);
+
+        // expected patterns that no new thread satisfies
+        List<Pattern> notPresent = expected.stream()
+                                           .filter(threadNamePattern -> current.stream().noneMatch(threadName -> threadNamePattern.matcher(threadName).matches()))
+                                           .collect(Collectors.toList());
+
+        // new threads that are neither expected nor optional, i.e. the unexpected ones
+        Set<String> remain = current.stream()
+                                    .filter(threadName -> expected.stream().noneMatch(pattern -> pattern.matcher(threadName).matches()))
+                                    .filter(threadName -> optional.stream().noneMatch(pattern -> pattern.matcher(threadName).matches()))
+                                    .collect(Collectors.toSet());
+
+        if (!remain.isEmpty())
+            System.err.println("Unexpected thread names: " + remain);
+        if (!notPresent.isEmpty())
+            System.err.println("Mandatory thread missing: " + notPresent);
+
+        assertTrue("Wrong thread status", remain.isEmpty() && notPresent.isEmpty());
+    }
+
+    /** Fails if {@code org.apache.cassandra.config.Schema} has been loaded. */
+    public void assertSchemaNotLoaded()
+    {
+        assertClassNotLoaded("org.apache.cassandra.config.Schema");
+    }
+
+    /** Fails unless {@code org.apache.cassandra.config.Schema} has been loaded. */
+    public void assertSchemaLoaded()
+    {
+        assertClassLoaded("org.apache.cassandra.config.Schema");
+    }
+
+    /** Fails if {@code org.apache.cassandra.db.Keyspace} has been loaded. */
+    public void assertKeyspaceNotLoaded()
+    {
+        assertClassNotLoaded("org.apache.cassandra.db.Keyspace");
+    }
+
+    /** Fails unless {@code org.apache.cassandra.db.Keyspace} has been loaded. */
+    public void assertKeyspaceLoaded()
+    {
+        assertClassLoaded("org.apache.cassandra.db.Keyspace");
+    }
+
+    /** Fails if {@code org.apache.cassandra.transport.Server} has been loaded. */
+    public void assertServerNotLoaded()
+    {
+        assertClassNotLoaded("org.apache.cassandra.transport.Server");
+    }
+
+    /** Fails if {@code org.apache.cassandra.db.SystemKeyspace} has been loaded. */
+    public void assertSystemKSNotLoaded()
+    {
+        assertClassNotLoaded("org.apache.cassandra.db.SystemKeyspace");
+    }
+
+    /** Fails if {@code org.apache.cassandra.db.commitlog.CommitLogSegmentManager} has been loaded. */
+    public void assertCLSMNotLoaded()
+    {
+        assertClassNotLoaded("org.apache.cassandra.db.commitlog.CommitLogSegmentManager");
+    }
+
+    /**
+     * Fails unless the named class has been loaded by the context class loader or one of its parents.
+     *
+     * @param clazz fully qualified class name
+     */
+    public void assertClassLoaded(String clazz)
+    {
+        assertClassLoadedStatus(clazz, true);
+    }
+
+    /**
+     * Fails if the named class has been loaded by the context class loader or one of its parents.
+     *
+     * @param clazz fully qualified class name
+     */
+    public void assertClassNotLoaded(String clazz)
+    {
+        assertClassLoadedStatus(clazz, false);
+    }
+
+    /**
+     * Walks the class loader chain starting at the current thread's context class loader and asks
+     * each loader, via reflection on {@code ClassLoader.findLoadedClass}, whether it has loaded
+     * {@code clazz}.
+     *
+     * @param clazz    fully qualified class name
+     * @param expected {@code true} to require that some loader in the chain has loaded the class,
+     *                 {@code false} to require that none has
+     */
+    private void assertClassLoadedStatus(String clazz, boolean expected)
+    {
+        for (ClassLoader cl = Thread.currentThread().getContextClassLoader(); cl != null; cl = cl.getParent())
+        {
+            try
+            {
+                // findLoadedClass is protected, hence the reflective access
+                Method mFindLoadedClass = ClassLoader.class.getDeclaredMethod("findLoadedClass", String.class);
+                mFindLoadedClass.setAccessible(true);
+                boolean loaded = mFindLoadedClass.invoke(cl, clazz) != null;
+
+                if (expected)
+                {
+                    // one loader having the class is enough
+                    if (loaded)
+                        return;
+                }
+                else
+                    assertFalse(clazz + " has been loaded", loaded);
+            }
+            catch (Exception e)
+            {
+                throw new RuntimeException(e);
+            }
+        }
+
+        if (expected)
+            fail(clazz + " has not been loaded");
+    }
+
+    /**
+     * Invokes the static {@code main(String[])} method of the given class and asserts its exit code.
+     * A security manager is installed for the duration of the call so that {@code System.exit}
+     * surfaces as a {@link SystemExitException} instead of terminating the JVM; a {@code main}
+     * that returns normally is treated as exit code 0.
+     *
+     * @param expectedExitCode exit code the tool is expected to finish with
+     * @param clazz            fully qualified name of the tool class
+     * @param args             command line arguments handed to {@code main}
+     */
+    public void runTool(int expectedExitCode, String clazz, String... args)
+    {
+        try
+        {
+            // install security manager to get informed about the exit-code
+            System.setSecurityManager(new SecurityManager()
+            {
+                public void checkExit(int status)
+                {
+                    throw new SystemExitException(status);
+                }
+
+                public void checkPermission(Permission perm)
+                {
+                }
+
+                public void checkPermission(Permission perm, Object context)
+                {
+                }
+            });
+
+            try
+            {
+                Class.forName(clazz).getDeclaredMethod("main", String[].class).invoke(null, (Object) args);
+            }
+            catch (InvocationTargetException e)
+            {
+                // unwrap so that SystemExitException (an Error) reaches the outer handler
+                Throwable cause = e.getCause();
+                if (cause instanceof Error)
+                    throw (Error) cause;
+                if (cause instanceof RuntimeException)
+                    throw (RuntimeException) cause;
+                throw e;
+            }
+
+            // main returned without calling System.exit
+            assertEquals("Unexpected exit code", expectedExitCode, 0);
+        }
+        catch (SystemExitException e)
+        {
+            assertEquals("Unexpected exit code", expectedExitCode, e.status);
+        }
+        catch (InvocationTargetException e)
+        {
+            throw new RuntimeException(e.getTargetException());
+        }
+        catch (Exception e)
+        {
+            throw new RuntimeException(e);
+        }
+        finally
+        {
+            // uninstall security manager
+            System.setSecurityManager(null);
+        }
+    }
+
+    /**
+     * Sets the {@code cassandra.partitioner} system property and records the threads that are
+     * alive before any test runs, as the baseline for
+     * {@link #assertNoUnexpectedThreadsStarted(String[], String[])}.
+     */
+    @BeforeClass
+    public static void setupTester()
+    {
+        System.setProperty("cassandra.partitioner", "org.apache.cassandra.dht.Murmur3Partitioner");
+
+        // may start an async appender
+        LoggerFactory.getLogger(ToolsTester.class);
+
+        ThreadMXBean threads = ManagementFactory.getThreadMXBean();
+        initialThreads = Arrays.asList(threads.getThreadInfo(threads.getAllThreadIds()));
+    }
+
+    /** Thrown by the security manager installed in {@code runTool} to carry the requested exit status. */
+    public static class SystemExitException extends Error
+    {
+        public final int status;
+
+        public SystemExitException(int status)
+        {
+            this.status = status;
+        }
+    }
+
+    /**
+     * Returns the absolute path of one {@code -Data.db} file found in the table directory
+     * resolved by {@link #sstableDir(String, String)}.
+     *
+     * @param ks keyspace directory name
+     * @param cf table directory name (or its prefix before a {@code '-'})
+     * @return absolute path of the first matching file returned by the directory listing
+     * @throws IOException if copying the sstables fails or no matching data file exists
+     */
+    public static String findOneSSTable(String ks, String cf) throws IOException
+    {
+        File cfDir = sstableDir(ks, cf);
+        File[] sstableFiles = cfDir.listFiles((file) -> file.isFile() && file.getName().endsWith("-Data.db"));
+        // listFiles returns null when the directory cannot be listed
+        if (sstableFiles == null || sstableFiles.length == 0)
+            throw new IOException("No sstable data file found in " + cfDir.getAbsolutePath());
+        return sstableFiles[0].getAbsolutePath();
+    }
+
+    /**
+     * Returns the absolute path of the table directory resolved by {@link #sstableDir(String, String)}.
+     *
+     * @param ks keyspace directory name
+     * @param cf table directory name (or its prefix before a {@code '-'})
+     * @throws IOException if copying the sstables fails or the directory cannot be found
+     */
+    public static String sstableDirName(String ks, String cf) throws IOException
+    {
+        return sstableDir(ks, cf).getAbsolutePath();
+    }
+
+    /**
+     * Copies the test sstables via {@link #copySSTables()} and returns the first directory below
+     * {@code <dataDir>/<ks>} whose name equals {@code cf} or starts with {@code cf + '-'}.
+     *
+     * @param ks keyspace directory name
+     * @param cf table directory name (or its prefix before a {@code '-'})
+     * @throws IOException if copying the sstables fails or no matching directory exists
+     */
+    public static File sstableDir(String ks, String cf) throws IOException
+    {
+        File dataDir = copySSTables();
+        File ksDir = new File(dataDir, ks);
+        File[] cfDirs = ksDir.listFiles((dir, name) -> cf.equals(name) || name.startsWith(cf + '-'));
+        // listFiles returns null when the keyspace directory cannot be listed
+        if (cfDirs == null || cfDirs.length == 0)
+            throw new IOException("No directory for table " + cf + " found in " + ksDir.getAbsolutePath());
+        return cfDirs[0];
+    }
+
+    /**
+     * Copies {@code test/data/legacy-sstables/ma/legacy_tables} to
+     * {@code build/test/cassandra/data/legacy_sstables}.
+     *
+     * @return the data directory {@code build/test/cassandra/data}
+     * @throws IOException if the copy fails
+     */
+    public static File copySSTables() throws IOException
+    {
+        File dataDir = new File("build/test/cassandra/data");
+        File srcDir = new File("test/data/legacy-sstables/ma");
+        FileUtils.copyDirectory(new File(srcDir, "legacy_tables"), new File(dataDir, "legacy_sstables"));
+        return dataDir;
+    }
+}
