This is an automated email from the ASF dual-hosted git repository. samt pushed a commit to branch cassandra-3.0 in repository https://gitbox.apache.org/repos/asf/cassandra.git
commit 0a1e8d168bac0f562774814a808e313e1d2d6571 Merge: c092c46 c8c3c26 Author: Sam Tunnicliffe <s...@beobal.com> AuthorDate: Fri Jun 12 11:21:14 2020 +0100 Merge branch 'cassandra-2.2' into cassandra-3.0 CHANGES.txt | 1 + .../cassandra/cql3/functions/UDFunction.java | 22 ++++++++-------- src/java/org/apache/cassandra/db/Directories.java | 28 ++++++++++----------- ...Directories.java => DisallowedDirectories.java} | 29 +++++++++++----------- ...sMBean.java => DisallowedDirectoriesMBean.java} | 3 ++- .../db/compaction/AbstractCompactionStrategy.java | 6 ++--- .../cassandra/db/compaction/LeveledManifest.java | 2 +- .../cassandra/hints/HintsDispatchExecutor.java | 2 +- .../org/apache/cassandra/hints/HintsStore.java | 10 ++++---- .../cassandra/io/sstable/format/SSTableReader.java | 2 +- .../cassandra/service/DefaultFSErrorHandler.java | 6 ++--- test/unit/org/apache/cassandra/Util.java | 4 +-- .../org/apache/cassandra/db/DirectoriesTest.java | 2 +- ....java => CorruptedSSTablesCompactionsTest.java} | 17 +++++++------ 14 files changed, 69 insertions(+), 65 deletions(-) diff --cc CHANGES.txt index 3fdbb96,b10a057..d506dc8 --- a/CHANGES.txt +++ b/CHANGES.txt @@@ -1,19 -1,5 +1,20 @@@ -2.2.18 +3.0.21 + * Fix replica-side filtering returning stale data with CL > ONE (CASSANDRA-8272, CASSANDRA-8273) + * Fix duplicated row on 2.x upgrades when multi-rows range tombstones interact with collection ones (CASSANDRA-15805) + * Rely on snapshotted session infos on StreamResultFuture.maybeComplete to avoid race conditions (CASSANDRA-15667) + * EmptyType doesn't override writeValue so could attempt to write bytes when expected not to (CASSANDRA-15790) + * Fix index queries on partition key columns when some partitions contains only static data (CASSANDRA-13666) + * Avoid creating duplicate rows during major upgrades (CASSANDRA-15789) + * liveDiskSpaceUsed and totalDiskSpaceUsed get corrupted if IndexSummaryRedistribution gets interrupted (CASSANDRA-15674) + * Fix Debian init start/stop (CASSANDRA-15770) + * Fix infinite loop on index query paging in tables with clustering (CASSANDRA-14242) + * Fix chunk index overflow due to large sstable with small chunk length (CASSANDRA-15595) + * cqlsh return non-zero status when STDIN CQL fails (CASSANDRA-15623) + * Don't skip sstables in slice queries based only on local min/max/deletion timestamp (CASSANDRA-15690) + * Memtable memory allocations may deadlock (CASSANDRA-15367) + * Run evictFromMembership in GossipStage (CASSANDRA-15592) +Merged from 2.2: + * Fix nomenclature of allow and deny lists (CASSANDRA-15862) * Remove generated files from source artifact (CASSANDRA-15849) * Remove duplicated tools binaries from tarballs (CASSANDRA-15768) * Duplicate results with DISTINCT queries in mixed mode (CASSANDRA-15501) diff --cc src/java/org/apache/cassandra/cql3/functions/UDFunction.java index 7b69342,1e5cea6..27f9eb8 --- a/src/java/org/apache/cassandra/cql3/functions/UDFunction.java +++ b/src/java/org/apache/cassandra/cql3/functions/UDFunction.java @@@ -71,111 -49,10 +71,111 @@@ public abstract class UDFunction extend protected final String language; protected final String body; - protected final DataType[] argDataTypes; - protected final DataType returnDataType; + protected final TypeCodec<Object>[] argCodecs; + protected final TypeCodec<Object> returnCodec; protected final boolean calledOnNullInput; + // - // Access to classes is controlled via a whitelist and a blacklist. ++ // Access to classes is controlled via allow and disallow lists. + // + // When a class is requested (both during compilation and runtime), - // the whitelistedPatterns array is searched first, whether the ++ // the allowedPatterns array is searched first, whether the + // requested name matches one of the patterns. If not, nothing is + // returned from the class-loader - meaning ClassNotFoundException + // during runtime and "type could not resolved" during compilation. + // - // If a whitelisted pattern has been found, the blacklistedPatterns ++ // If an allowed pattern has been found, the disallowedPatterns + // array is searched for a match. If a match is found, class-loader + // rejects access. Otherwise the class/resource can be loaded. + // - private static final String[] whitelistedPatterns = ++ private static final String[] allowedPatterns = + { + "com/datastax/driver/core/", + "com/google/common/reflect/TypeToken", + "java/io/IOException.class", + "java/io/Serializable.class", + "java/lang/", + "java/math/", + "java/net/InetAddress.class", + "java/net/Inet4Address.class", + "java/net/Inet6Address.class", + "java/net/UnknownHostException.class", // req'd by InetAddress + "java/net/NetworkInterface.class", // req'd by InetAddress + "java/net/SocketException.class", // req'd by InetAddress + "java/nio/Buffer.class", + "java/nio/ByteBuffer.class", + "java/text/", + "java/time/", + "java/util/", + "org/apache/cassandra/cql3/functions/JavaUDF.class", + "org/apache/cassandra/exceptions/", + }; - // Only need to blacklist a pattern, if it would otherwise be allowed via whitelistedPatterns - private static final String[] blacklistedPatterns = ++ // Only need to disallow a pattern, if it would otherwise be allowed via allowedPatterns ++ private static final String[] disallowedPatterns = + { + "com/datastax/driver/core/Cluster.class", + "com/datastax/driver/core/Metrics.class", + "com/datastax/driver/core/NettyOptions.class", + "com/datastax/driver/core/Session.class", + "com/datastax/driver/core/Statement.class", + "com/datastax/driver/core/TimestampGenerator.class", // indirectly covers ServerSideTimestampGenerator + ThreadLocalMonotonicTimestampGenerator + "java/lang/Compiler.class", + "java/lang/InheritableThreadLocal.class", + "java/lang/Package.class", + "java/lang/Process.class", + "java/lang/ProcessBuilder.class", + "java/lang/ProcessEnvironment.class", + "java/lang/ProcessImpl.class", + "java/lang/Runnable.class", + "java/lang/Runtime.class", + "java/lang/Shutdown.class", + "java/lang/Thread.class", + "java/lang/ThreadGroup.class", + "java/lang/ThreadLocal.class", + "java/lang/instrument/", + "java/lang/invoke/", + "java/lang/management/", + "java/lang/ref/", + "java/lang/reflect/", + "java/util/ServiceLoader.class", + "java/util/Timer.class", + "java/util/concurrent/", + "java/util/function/", + "java/util/jar/", + "java/util/logging/", + "java/util/prefs/", + "java/util/spi/", + "java/util/stream/", + "java/util/zip/", + }; + + static boolean secureResource(String resource) + { + while (resource.startsWith("/")) + resource = resource.substring(1); + - for (String white : whitelistedPatterns) - if (resource.startsWith(white)) ++ for (String allowed : allowedPatterns) ++ if (resource.startsWith(allowed)) + { + - // resource is in whitelistedPatterns, let's see if it is not explicityl blacklisted - for (String black : blacklistedPatterns) - if (resource.startsWith(black)) ++ // resource is in allowedPatterns, let's see if it is not explicitly disallowed ++ for (String disallowed : disallowedPatterns) ++ if (resource.startsWith(disallowed)) + { + logger.trace("access denied: resource {}", resource); + return false; + } + + return true; + } + + logger.trace("access denied: resource {}", resource); + return false; + } + + // setup the UDF class loader with no parent class loader so that we have full control about what class/resource UDF uses + static final ClassLoader udfClassLoader = new UDFClassLoader(); + protected UDFunction(FunctionName name, List<ColumnIdentifier> argNames, List<AbstractType<?>> argTypes, diff --cc src/java/org/apache/cassandra/db/Directories.java index b104509,d1aa650..af3f63c --- a/src/java/org/apache/cassandra/db/Directories.java +++ b/src/java/org/apache/cassandra/db/Directories.java @@@ -331,37 -313,9 +331,37 @@@ public class Directorie } /** - * Returns a temporary subdirectory on non-blacklisted data directory ++ * Returns a temporary subdirectory on allowed data directory + * that _currently_ has {@code writeSize} bytes as usable space. + * This method does not create the temporary directory. + * - * @throws IOError if all directories are blacklisted. ++ * @throws IOError if all directories are disallowed. + */ + public File getTemporaryWriteableDirectoryAsFile(long writeSize) + { + File location = getLocationForDisk(getWriteableLocation(writeSize)); + if (location == null) + return null; + return new File(location, TMP_SUBDIR); + } + + public void removeTemporaryDirectories() + { + for (File dataDir : dataPaths) + { + File tmpDir = new File(dataDir, TMP_SUBDIR); + if (tmpDir.exists()) + { + logger.debug("Removing temporary directory {}", tmpDir); + FileUtils.deleteRecursive(tmpDir); + } + } + } + + /** - * Returns a non-blacklisted data directory that _currently_ has {@code writeSize} bytes as usable space. + * Returns an allowed data directory that _currently_ has {@code writeSize} bytes as usable space. * - * @throws IOError if all directories are blacklisted. + * @throws IOError if all directories are disallowed. */ public DataDirectory getWriteableLocation(long writeSize) { @@@ -369,13 -323,13 +369,13 @@@ long totalAvailable = 0L; - // pick directories with enough space and so that resulting sstable dirs aren't blacklisted for writes. + // pick directories with enough space and so that resulting sstable dirs aren't disallowed for writes. boolean tooBig = false; - for (DataDirectory dataDir : dataDirectories) + for (DataDirectory dataDir : paths) { - if (BlacklistedDirectories.isUnwritable(getLocationForDisk(dataDir))) + if (DisallowedDirectories.isUnwritable(getLocationForDisk(dataDir))) { - logger.trace("removing blacklisted candidate {}", dataDir.location); + logger.trace("removing disallowed candidate {}", dataDir.location); continue; } DataDirectoryCandidate candidate = new DataDirectoryCandidate(dataDir); @@@ -392,9 -346,9 +392,9 @@@ if (candidates.isEmpty()) if (tooBig) - throw new RuntimeException("Insufficient disk space to write " + writeSize + " bytes"); + throw new FSDiskFullWriteError(new IOException("Insufficient disk space to write " + writeSize + " bytes"), ""); else - throw new FSWriteError(new IOException("All configured data directories have been blacklisted as unwritable for erroring out"), ""); + throw new FSWriteError(new IOException("All configured data directories have been disallowed as unwritable for erroring out"), ""); // shortcut for single data directory systems if (candidates.size() == 1) @@@ -437,9 -391,9 +437,9 @@@ long writeSize = expectedTotalWriteSize / estimatedSSTables; long totalAvailable = 0L; - for (DataDirectory dataDir : dataDirectories) + for (DataDirectory dataDir : paths) { - if (BlacklistedDirectories.isUnwritable(getLocationForDisk(dataDir))) + if (DisallowedDirectories.isUnwritable(getLocationForDisk(dataDir))) continue; DataDirectoryCandidate candidate = new DataDirectoryCandidate(dataDir); // exclude directory if its total writeSize does not fit to data directory diff --cc src/java/org/apache/cassandra/db/DisallowedDirectories.java index afa726b,c0518e2..75b5e79 --- a/src/java/org/apache/cassandra/db/DisallowedDirectories.java +++ b/src/java/org/apache/cassandra/db/DisallowedDirectories.java @@@ -25,16 -25,14 +25,16 @@@ import java.util.Collections import java.util.Set; import java.util.concurrent.CopyOnWriteArraySet; +import com.google.common.annotations.VisibleForTesting; + - import org.apache.cassandra.utils.JVMStabilityInspector; import org.apache.cassandra.utils.MBeanWrapper; - public class BlacklistedDirectories implements BlacklistedDirectoriesMBean + public class DisallowedDirectories implements DisallowedDirectoriesMBean { - public static final String MBEAN_NAME = "org.apache.cassandra.db:type=BlacklistedDirectories"; - private static final Logger logger = LoggerFactory.getLogger(BlacklistedDirectories.class); - private static final BlacklistedDirectories instance = new BlacklistedDirectories(); + public static final String DEPRECATED_MBEAN_NAME = "org.apache.cassandra.db:type=BlacklistedDirectories"; + public static final String MBEAN_NAME = "org.apache.cassandra.db:type=DisallowedDirectories"; + private static final Logger logger = LoggerFactory.getLogger(DisallowedDirectories.class); + private static final DisallowedDirectories instance = new DisallowedDirectories(); private final Set<File> unreadableDirectories = new CopyOnWriteArraySet<File>(); private final Set<File> unwritableDirectories = new CopyOnWriteArraySet<File>(); @@@ -90,19 -89,8 +91,19 @@@ } /** + * Testing only! + * Clear the set of unwritable directories. + */ + @VisibleForTesting + public static void clearUnwritableUnsafe() + { + instance.unwritableDirectories.clear(); + } + + + /** - * Tells whether or not the directory is blacklisted for reads. - * @return whether or not the directory is blacklisted for reads. + * Tells whether or not the directory is disallowed for reads. + * @return whether or not the directory is disallowed for reads. */ public static boolean isUnreadable(File directory) { diff --cc src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java index 2348d19,f9ed780..7219504 --- a/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java +++ b/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java @@@ -247,27 -243,22 +247,27 @@@ public abstract class AbstractCompactio } /** - * Filters SSTables that are to be blacklisted from the given collection + * Filters SSTables that are to be excluded from the given collection * - * @param originalCandidates The collection to check for blacklisted SSTables - * @return list of the SSTables with blacklisted ones filtered out + * @param originalCandidates The collection to check for excluded SSTables + * @return list of the SSTables with excluded ones filtered out */ - public static Iterable<SSTableReader> filterSuspectSSTables(Iterable<SSTableReader> originalCandidates) + public static List<SSTableReader> filterSuspectSSTables(Iterable<SSTableReader> originalCandidates) { - return Iterables.filter(originalCandidates, new Predicate<SSTableReader>() + List<SSTableReader> filtered = new ArrayList<>(); + for (SSTableReader sstable : originalCandidates) { - public boolean apply(SSTableReader sstable) - { - return !sstable.isMarkedSuspect(); - } - }); + if (!sstable.isMarkedSuspect()) + filtered.add(sstable); + } + return filtered; } + + public ScannerList getScanners(Collection<SSTableReader> sstables, Range<Token> range) + { + return range == null ? getScanners(sstables, (Collection<Range<Token>>)null) : getScanners(sstables, Collections.singleton(range)); + } /** * Returns a list of KeyScanners given sstables and a range on which to scan. * The default implementation simply grab one SSTableScanner per-sstable, but overriding this method diff --cc src/java/org/apache/cassandra/hints/HintsDispatchExecutor.java index d6c28d4,0000000..c562dd0 mode 100644,000000..100644 --- a/src/java/org/apache/cassandra/hints/HintsDispatchExecutor.java +++ b/src/java/org/apache/cassandra/hints/HintsDispatchExecutor.java @@@ -1,309 -1,0 +1,309 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.hints; + +import java.io.File; +import java.net.InetAddress; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.*; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.BooleanSupplier; +import java.util.function.Function; +import java.util.function.Supplier; + +import com.google.common.util.concurrent.RateLimiter; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.cassandra.concurrent.JMXEnabledThreadPoolExecutor; +import org.apache.cassandra.concurrent.NamedThreadFactory; +import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.io.FSReadError; +import org.apache.cassandra.service.StorageService; + +/** + * A multi-threaded (by default) executor for dispatching hints. + * + * Most of dispatch is triggered by {@link HintsDispatchTrigger} running every ~10 seconds. + */ +final class HintsDispatchExecutor +{ + private static final Logger logger = LoggerFactory.getLogger(HintsDispatchExecutor.class); + + private final File hintsDirectory; + private final ExecutorService executor; + private final AtomicBoolean isPaused; + private final Function<InetAddress, Boolean> isAlive; + private final Map<UUID, Future> scheduledDispatches; + + HintsDispatchExecutor(File hintsDirectory, int maxThreads, AtomicBoolean isPaused, Function<InetAddress, Boolean> isAlive) + { + this.hintsDirectory = hintsDirectory; + this.isPaused = isPaused; + this.isAlive = isAlive; + + scheduledDispatches = new ConcurrentHashMap<>(); + executor = new JMXEnabledThreadPoolExecutor(maxThreads, 1, TimeUnit.MINUTES, + new LinkedBlockingQueue<>(), + new NamedThreadFactory("HintsDispatcher", Thread.MIN_PRIORITY), + "internal"); + } + + /* + * It's safe to terminate dispatch in process and to deschedule dispatch. + */ + void shutdownBlocking() + { + scheduledDispatches.clear(); + executor.shutdownNow(); + try + { + executor.awaitTermination(1, TimeUnit.MINUTES); + } + catch (InterruptedException e) + { + throw new AssertionError(e); + } + } + + boolean isScheduled(HintsStore store) + { + return scheduledDispatches.containsKey(store.hostId); + } + + Future dispatch(HintsStore store) + { + return dispatch(store, store.hostId); + } + + Future dispatch(HintsStore store, UUID hostId) + { + /* + * It is safe to perform dispatch for the same host id concurrently in two or more threads, + * however there is nothing to win from it - so we don't. + * + * Additionally, having just one dispatch task per host id ensures that we'll never violate our per-destination + * rate limit, without having to share a ratelimiter between threads. + * + * It also simplifies reasoning about dispatch sessions. + */ + return scheduledDispatches.computeIfAbsent(hostId, uuid -> executor.submit(new DispatchHintsTask(store, hostId))); + } + + Future transfer(HintsCatalog catalog, Supplier<UUID> hostIdSupplier) + { + return executor.submit(new TransferHintsTask(catalog, hostIdSupplier)); + } + + void completeDispatchBlockingly(HintsStore store) + { + Future future = scheduledDispatches.get(store.hostId); + try + { + if (future != null) + future.get(); + } + catch (ExecutionException | InterruptedException e) + { + throw new RuntimeException(e); + } + } + + void interruptDispatch(UUID hostId) + { + Future future = scheduledDispatches.remove(hostId); + + if (null != future) + future.cancel(true); + } + + private final class TransferHintsTask implements Runnable + { + private final HintsCatalog catalog; + + /* + * Supplies target hosts to stream to. Generally returns the one the DynamicSnitch thinks is closest. + * We use a supplier here to be able to get a new host if the current one dies during streaming. + */ + private final Supplier<UUID> hostIdSupplier; + + private TransferHintsTask(HintsCatalog catalog, Supplier<UUID> hostIdSupplier) + { + this.catalog = catalog; + this.hostIdSupplier = hostIdSupplier; + } + + @Override + public void run() + { + UUID hostId = hostIdSupplier.get(); + InetAddress address = StorageService.instance.getEndpointForHostId(hostId); + logger.info("Transferring all hints to {}: {}", address, hostId); + if (transfer(hostId)) + return; + + logger.warn("Failed to transfer all hints to {}: {}; will retry in {} seconds", address, hostId, 10); + + try + { + TimeUnit.SECONDS.sleep(10); + } + catch (InterruptedException e) + { + throw new RuntimeException(e); + } + + hostId = hostIdSupplier.get(); + logger.info("Transferring all hints to {}: {}", address, hostId); + if (!transfer(hostId)) + { + logger.error("Failed to transfer all hints to {}: {}", address, hostId); + throw new RuntimeException("Failed to transfer all hints to " + hostId); + } + } + + private boolean transfer(UUID hostId) + { + catalog.stores() + .map(store -> new DispatchHintsTask(store, hostId)) + .forEach(Runnable::run); + + return !catalog.hasFiles(); + } + } + + private final class DispatchHintsTask implements Runnable + { + private final HintsStore store; + private final UUID hostId; + private final RateLimiter rateLimiter; + + DispatchHintsTask(HintsStore store, UUID hostId) + { + this.store = store; + this.hostId = hostId; + + // rate limit is in bytes per second. Uses Double.MAX_VALUE if disabled (set to 0 in cassandra.yaml). + // max rate is scaled by the number of nodes in the cluster (CASSANDRA-5272). + // the goal is to bound maximum hints traffic going towards a particular node from the rest of the cluster, + // not total outgoing hints traffic from this node - this is why the rate limiter is not shared between + // all the dispatch tasks (as there will be at most one dispatch task for a particular host id at a time). + int nodesCount = Math.max(1, StorageService.instance.getTokenMetadata().getAllEndpoints().size() - 1); + int throttleInKB = DatabaseDescriptor.getHintedHandoffThrottleInKB() / nodesCount; + this.rateLimiter = RateLimiter.create(throttleInKB == 0 ? Double.MAX_VALUE : throttleInKB * 1024); + } + + public void run() + { + try + { + dispatch(); + } + finally + { + scheduledDispatches.remove(hostId); + } + } + + private void dispatch() + { + while (true) + { + if (isPaused.get()) + break; + + HintsDescriptor descriptor = store.poll(); + if (descriptor == null) + break; + + try + { + if (!dispatch(descriptor)) + break; + } + catch (FSReadError e) + { + logger.error("Failed to dispatch hints file {}: file is corrupted ({})", descriptor.fileName(), e); + store.cleanUp(descriptor); - store.blacklist(descriptor); ++ store.markCorrupted(descriptor); + throw e; + } + } + } + + /* + * Will return true if dispatch was successful, false if we hit a failure (destination node went down, for example). + */ + private boolean dispatch(HintsDescriptor descriptor) + { + logger.trace("Dispatching hints file {}", descriptor.fileName()); + + InetAddress address = StorageService.instance.getEndpointForHostId(hostId); + if (address != null) + return deliver(descriptor, address); + + // address == null means the target no longer exist; find new home for each hint entry. + convert(descriptor); + return true; + } + + private boolean deliver(HintsDescriptor descriptor, InetAddress address) + { + File file = new File(hintsDirectory, descriptor.fileName()); + InputPosition offset = store.getDispatchOffset(descriptor); + + BooleanSupplier shouldAbort = () -> !isAlive.apply(address) || isPaused.get(); + try (HintsDispatcher dispatcher = HintsDispatcher.create(file, rateLimiter, address, descriptor.hostId, shouldAbort)) + { + if (offset != null) + dispatcher.seek(offset); + + if (dispatcher.dispatch()) + { + store.delete(descriptor); + store.cleanUp(descriptor); + logger.info("Finished hinted handoff of file {} to endpoint {}: {}", descriptor.fileName(), address, hostId); + return true; + } + else + { + store.markDispatchOffset(descriptor, dispatcher.dispatchPosition()); + store.offerFirst(descriptor); + logger.info("Finished hinted handoff of file {} to endpoint {}: {}, partially", descriptor.fileName(), address, hostId); + return false; + } + } + } + + // for each hint in the hints file for a node that isn't part of the ring anymore, write RF hints for each replica + private void convert(HintsDescriptor descriptor) + { + File file = new File(hintsDirectory, descriptor.fileName()); + + try (HintsReader reader = HintsReader.open(file, rateLimiter)) + { + reader.forEach(page -> page.hintsIterator().forEachRemaining(HintsService.instance::writeForAllReplicas)); + store.delete(descriptor); + store.cleanUp(descriptor); + logger.info("Finished converting hints file {}", descriptor.fileName()); + } + } + } +} diff --cc src/java/org/apache/cassandra/hints/HintsStore.java index 032de5a,0000000..b08fc72 mode 100644,000000..100644 --- a/src/java/org/apache/cassandra/hints/HintsStore.java +++ b/src/java/org/apache/cassandra/hints/HintsStore.java @@@ -1,220 -1,0 +1,220 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.hints; + +import java.io.File; +import java.io.IOException; +import java.net.InetAddress; +import java.util.*; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentLinkedDeque; +import java.util.concurrent.ConcurrentLinkedQueue; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.ImmutableMap; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.cassandra.gms.FailureDetector; +import org.apache.cassandra.io.FSWriteError; +import org.apache.cassandra.service.StorageService; +import org.apache.cassandra.utils.SyncUtil; + +/** + * Encapsulates the state of a peer's hints: the queue of hints files for dispatch, and the current writer (if any). + * + * The queue for dispatch is multi-threading safe. + * + * The writer MUST only be accessed by {@link HintsWriteExecutor}. + */ +final class HintsStore +{ + private static final Logger logger = LoggerFactory.getLogger(HintsStore.class); + + public final UUID hostId; + private final File hintsDirectory; + private final ImmutableMap<String, Object> writerParams; + + private final Map<HintsDescriptor, InputPosition> dispatchPositions; + private final Deque<HintsDescriptor> dispatchDequeue; - private final Queue<HintsDescriptor> blacklistedFiles; ++ private final Queue<HintsDescriptor> corruptedFiles; + + // last timestamp used in a descriptor; make sure to not reuse the same timestamp for new descriptors. + private volatile long lastUsedTimestamp; + private volatile HintsWriter hintsWriter; + + private HintsStore(UUID hostId, File hintsDirectory, ImmutableMap<String, Object> writerParams, List<HintsDescriptor> descriptors) + { + this.hostId = hostId; + this.hintsDirectory = hintsDirectory; + this.writerParams = writerParams; + + dispatchPositions = new ConcurrentHashMap<>(); + dispatchDequeue = new ConcurrentLinkedDeque<>(descriptors); - blacklistedFiles = new ConcurrentLinkedQueue<>(); ++ corruptedFiles = new ConcurrentLinkedQueue<>(); + + //noinspection resource + lastUsedTimestamp = descriptors.stream().mapToLong(d -> d.timestamp).max().orElse(0L); + } + + static HintsStore create(UUID hostId, File hintsDirectory, ImmutableMap<String, Object> writerParams, List<HintsDescriptor> descriptors) + { + descriptors.sort((d1, d2) -> Long.compare(d1.timestamp, d2.timestamp)); + return new HintsStore(hostId, hintsDirectory, writerParams, descriptors); + } + + @VisibleForTesting + int getDispatchQueueSize() + { + return dispatchDequeue.size(); + } + + InetAddress address() + { + return StorageService.instance.getEndpointForHostId(hostId); + } + + boolean isLive() + { + InetAddress address = address(); + return address != null && FailureDetector.instance.isAlive(address); + } + + HintsDescriptor poll() + { + return dispatchDequeue.poll(); + } + + void offerFirst(HintsDescriptor descriptor) + { + dispatchDequeue.offerFirst(descriptor); + } + + void offerLast(HintsDescriptor descriptor) + { + dispatchDequeue.offerLast(descriptor); + } + + void deleteAllHints() + { + HintsDescriptor descriptor; + while ((descriptor = poll()) != null) + { + cleanUp(descriptor); + delete(descriptor); + } + - while ((descriptor = blacklistedFiles.poll()) != null) ++ while ((descriptor = corruptedFiles.poll()) != null) + { + cleanUp(descriptor); + delete(descriptor); + } + } + + void delete(HintsDescriptor descriptor) + { + File hintsFile = new File(hintsDirectory, descriptor.fileName()); + if (hintsFile.delete()) + logger.info("Deleted hint file {}", descriptor.fileName()); + else + logger.error("Failed to delete hint file {}", descriptor.fileName()); + + //noinspection ResultOfMethodCallIgnored + new File(hintsDirectory, descriptor.checksumFileName()).delete(); + } + + boolean hasFiles() + { + return !dispatchDequeue.isEmpty(); + } + + InputPosition getDispatchOffset(HintsDescriptor descriptor) + { + return dispatchPositions.get(descriptor); + } + + void markDispatchOffset(HintsDescriptor descriptor, InputPosition inputPosition) + { + dispatchPositions.put(descriptor, inputPosition); + } + + void cleanUp(HintsDescriptor descriptor) + { + dispatchPositions.remove(descriptor); + } + - void blacklist(HintsDescriptor descriptor) ++ void markCorrupted(HintsDescriptor descriptor) + { - blacklistedFiles.add(descriptor); ++ corruptedFiles.add(descriptor); + } + + /* + * Methods dealing with HintsWriter. + * + * All of these, with the exception of isWriting(), are for exclusively single-threaded use by HintsWriteExecutor. + */ + + boolean isWriting() + { + return hintsWriter != null; + } + + HintsWriter getOrOpenWriter() + { + if (hintsWriter == null) + hintsWriter = openWriter(); + return hintsWriter; + } + + HintsWriter getWriter() + { + return hintsWriter; + } + + private HintsWriter openWriter() + { + lastUsedTimestamp = Math.max(System.currentTimeMillis(), lastUsedTimestamp + 1); + HintsDescriptor descriptor = new HintsDescriptor(hostId, lastUsedTimestamp, writerParams); + + try + { + return HintsWriter.create(hintsDirectory, descriptor); + } + catch (IOException e) + { + throw new FSWriteError(e, descriptor.fileName()); + } + } + + void closeWriter() + { + if (hintsWriter != null) + { + hintsWriter.close(); + offerLast(hintsWriter.descriptor()); + hintsWriter = null; + SyncUtil.trySyncDir(hintsDirectory); + } + } + + void fsyncWriter() + { + if (hintsWriter != null) + hintsWriter.fsync(); + } +} diff --cc test/unit/org/apache/cassandra/Util.java index d758efe,f6b4771..a49440d --- a/test/unit/org/apache/cassandra/Util.java +++ b/test/unit/org/apache/cassandra/Util.java @@@ -539,113 -383,4 +539,113 @@@ public class Uti { thread.join(10000); } + + public static AssertionError runCatchingAssertionError(Runnable test) + { + try + { + test.run(); + return null; + } + catch (AssertionError e) + { + return e; + } + } + + /** + * Wrapper function used to run a test that can sometimes flake for uncontrollable reasons. + * + * If the given test fails on the first run, it is executed the given number of times again, expecting all secondary + * runs to succeed. If they do, the failure is understood as a flake and the test is treated as passing. + * + * Do not use this if the test is deterministic and its success is not influenced by external factors (such as time, + * selection of random seed, network failures, etc.). If the test can be made independent of such factors, it is + * probably preferable to do so rather than use this method. + * + * @param test The test to run. + * @param rerunsOnFailure How many times to re-run it if it fails. All reruns must pass. + * @param message Message to send to System.err on initial failure. + */ + public static void flakyTest(Runnable test, int rerunsOnFailure, String message) + { + AssertionError e = runCatchingAssertionError(test); + if (e == null) + return; // success + System.err.format("Test failed. %s%n" + + "Re-running %d times to verify it isn't failing more often than it should.%n" + + "Failure was: %s%n", message, rerunsOnFailure, e); + e.printStackTrace(); + + int rerunsFailed = 0; + for (int i = 0; i < rerunsOnFailure; ++i) + { + AssertionError t = runCatchingAssertionError(test); + if (t != null) + { + ++rerunsFailed; + e.addSuppressed(t); + } + } + if (rerunsFailed > 0) + { + System.err.format("Test failed in %d of the %d reruns.%n", rerunsFailed, rerunsOnFailure); + throw e; + } + + System.err.println("All reruns succeeded. Failure treated as flake."); + } + + // for use with Optional in tests, can be used as an argument to orElseThrow + public static Supplier<AssertionError> throwAssert(final String message) + { + return () -> new AssertionError(message); + } + + public static class UnfilteredSource extends AbstractUnfilteredRowIterator implements UnfilteredRowIterator + { + Iterator<Unfiltered> content; + + public UnfilteredSource(CFMetaData cfm, DecoratedKey partitionKey, Row staticRow, Iterator<Unfiltered> content) + { + super(cfm, + partitionKey, + DeletionTime.LIVE, + cfm.partitionColumns(), + staticRow != null ? staticRow : Rows.EMPTY_STATIC_ROW, + false, + EncodingStats.NO_STATS); + this.content = content; + } + + @Override + protected Unfiltered computeNext() + { + return content.hasNext() ? content.next() : endOfData(); + } + } + + public static UnfilteredPartitionIterator executeLocally(PartitionRangeReadCommand command, + ColumnFamilyStore cfs, + ReadOrderGroup orderGroup) + { + return command.queryStorage(cfs, orderGroup); + } + + public static Closeable markDirectoriesUnwriteable(ColumnFamilyStore cfs) + { + try + { + for ( ; ; ) + { + DataDirectory dir = cfs.getDirectories().getWriteableLocation(1); - BlacklistedDirectories.maybeMarkUnwritable(cfs.getDirectories().getLocationForDisk(dir)); ++ DisallowedDirectories.maybeMarkUnwritable(cfs.getDirectories().getLocationForDisk(dir)); + } + } + catch (IOError e) + { + // Expected -- marked all directories as unwritable + } - return () -> BlacklistedDirectories.clearUnwritableUnsafe(); ++ return () -> DisallowedDirectories.clearUnwritableUnsafe(); + } } diff --cc test/unit/org/apache/cassandra/db/compaction/CorruptedSSTablesCompactionsTest.java index 6378e09,5208401..9276737 --- a/test/unit/org/apache/cassandra/db/compaction/CorruptedSSTablesCompactionsTest.java +++ b/test/unit/org/apache/cassandra/db/compaction/CorruptedSSTablesCompactionsTest.java @@@ -29,34 -29,26 +29,34 @@@ import org.junit.AfterClass import org.junit.BeforeClass; import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; + import org.apache.cassandra.SchemaLoader; import org.apache.cassandra.Util; -import org.apache.cassandra.config.KSMetaData; +import org.apache.cassandra.config.*; import org.apache.cassandra.db.*; +import org.apache.cassandra.db.marshal.LongType; import org.apache.cassandra.exceptions.ConfigurationException; +import org.apache.cassandra.io.sstable.format.SSTableReader; import org.apache.cassandra.io.util.FileUtils; -import org.apache.cassandra.locator.SimpleStrategy; -import org.apache.cassandra.utils.ByteBufferUtil; +import org.apache.cassandra.schema.*; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; -import static org.apache.cassandra.Util.cellname; - public class BlacklistingCompactionsTest + public class CorruptedSSTablesCompactionsTest { - private static final Logger logger = LoggerFactory.getLogger(BlacklistingCompactionsTest.class); ++ private static final Logger logger = LoggerFactory.getLogger(CorruptedSSTablesCompactionsTest.class); + + private static Random random; + - private static final String KEYSPACE1 = "BlacklistingCompactionsTest"; + private static final String KEYSPACE1 = "CorruptedSSTablesCompactionsTest"; - private static final String CF_STANDARD1 = "Standard1"; - // seed hardcoded to one we know works: - private static final Random random = new Random(1); + private static final String STANDARD_STCS = "Standard_STCS"; + private static final String STANDARD_LCS = "Standard_LCS"; + private static int maxValueSize; @After public void leakDetect() throws InterruptedException @@@ -112,18 -81,18 +112,19 @@@ } @Test - public void testBlacklistingWithSizeTieredCompactionStrategy() throws Exception + public void testCorruptedSSTablesWithSizeTieredCompactionStrategy() throws Exception { - testBlacklisting(STANDARD_STCS); - testCorruptedSSTables(SizeTieredCompactionStrategy.class.getCanonicalName()); ++ testCorruptedSSTables(STANDARD_STCS); } @Test - public void testBlacklistingWithLeveledCompactionStrategy() throws Exception + public void testCorruptedSSTablesWithLeveledCompactionStrategy() throws Exception { - testBlacklisting(STANDARD_LCS); - testCorruptedSSTables(LeveledCompactionStrategy.class.getCanonicalName()); ++ testCorruptedSSTables(STANDARD_LCS); } - private void testBlacklisting(String tableName) throws Exception - public void testCorruptedSSTables(String compactionStrategy) throws Exception ++ ++ public void testCorruptedSSTables(String tableName) throws Exception { // this test does enough rows to force multiple block indexes to be used Keyspace keyspace = Keyspace.open(KEYSPACE1); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org