Merge branch 'cassandra-2.2' into cassandra-3.0
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/05bacc75 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/05bacc75 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/05bacc75 Branch: refs/heads/trunk Commit: 05bacc756e983e2850af7fbd265951983e66f4a0 Parents: 70ee4ed dbefa85 Author: Carl Yeksigian <c...@apache.org> Authored: Wed Jun 15 10:52:55 2016 -0400 Committer: Carl Yeksigian <c...@apache.org> Committed: Wed Jun 15 10:52:55 2016 -0400 ---------------------------------------------------------------------- .../cql3/selection/SelectionColumnMapping.java | 20 ++++++++++++++++++ .../cql3/selection/SelectionColumns.java | 20 ++++++++++++++++++ .../db/lifecycle/LogAwareFileLister.java | 20 ++++++++++++++++++ .../apache/cassandra/db/lifecycle/LogFile.java | 20 ++++++++++++++++++ .../cassandra/db/lifecycle/LogRecord.java | 20 ++++++++++++++++++ .../db/lifecycle/SSTableIntervalTree.java | 20 ++++++++++++++++++ .../cassandra/db/lifecycle/SSTableSet.java | 22 +++++++++++++++++++- .../cassandra/db/transform/BaseIterator.java | 20 ++++++++++++++++++ .../cassandra/db/transform/BasePartitions.java | 20 ++++++++++++++++++ .../apache/cassandra/db/transform/BaseRows.java | 20 ++++++++++++++++++ .../apache/cassandra/db/transform/Filter.java | 20 ++++++++++++++++++ .../db/transform/FilteredPartitions.java | 20 ++++++++++++++++++ .../cassandra/db/transform/FilteredRows.java | 20 ++++++++++++++++++ .../cassandra/db/transform/MoreContents.java | 20 ++++++++++++++++++ .../cassandra/db/transform/MorePartitions.java | 20 ++++++++++++++++++ .../apache/cassandra/db/transform/MoreRows.java | 20 ++++++++++++++++++ .../apache/cassandra/db/transform/Stack.java | 20 ++++++++++++++++++ .../db/transform/StoppingTransformation.java | 20 ++++++++++++++++++ .../cassandra/db/transform/Transformation.java | 20 ++++++++++++++++++ .../db/transform/UnfilteredPartitions.java | 20 ++++++++++++++++++ 
.../cassandra/db/transform/UnfilteredRows.java | 20 ++++++++++++++++++ src/java/org/apache/cassandra/index/Index.java | 20 ++++++++++++++++++ .../apache/cassandra/index/IndexRegistry.java | 20 ++++++++++++++++++ .../index/internal/CassandraIndex.java | 20 ++++++++++++++++++ .../index/internal/CassandraIndexSearcher.java | 20 ++++++++++++++++++ .../cassandra/index/internal/IndexEntry.java | 20 ++++++++++++++++++ .../index/internal/keys/KeysIndex.java | 20 ++++++++++++++++++ .../cassandra/locator/PendingRangeMaps.java | 20 ++++++++++++++++++ .../cassandra/repair/RepairParallelism.java | 20 ++++++++++++++++++ .../apache/cassandra/tools/JsonTransformer.java | 22 +++++++++++++++++++- .../apache/cassandra/utils/OverlapIterator.java | 22 +++++++++++++++++++- .../utils/RMIServerSocketFactoryImpl.java | 20 ++++++++++++++++++ .../org/apache/cassandra/utils/SyncUtil.java | 20 ++++++++++++++++++ .../apache/cassandra/utils/concurrent/Ref.java | 20 ++++++++++++++++++ .../apache/cassandra/utils/concurrent/Refs.java | 20 ++++++++++++++++++ .../io/compress/CompressorPerformance.java | 20 ++++++++++++++++++ .../test/microbench/PendingRangesBench.java | 20 ++++++++++++++++++ .../cassandra/cql3/IndexQueryPagingTest.java | 20 ++++++++++++++++++ .../selection/SelectionColumnMappingTest.java | 20 ++++++++++++++++++ .../validation/operations/SelectLimitTest.java | 20 ++++++++++++++++++ .../SelectOrderedPartitionerTest.java | 20 ++++++++++++++++++ .../db/SinglePartitionSliceCommandTest.java | 20 ++++++++++++++++++ .../commitlog/CommitLogSegmentManagerTest.java | 22 +++++++++++++++++++- .../rows/RowAndDeletionMergeIteratorTest.java | 20 ++++++++++++++++++ .../gms/ArrayBackedBoundedStatsTest.java | 20 ++++++++++++++++++ .../apache/cassandra/index/CustomIndexTest.java | 20 ++++++++++++++++++ .../index/internal/CustomCassandraIndex.java | 20 ++++++++++++++++++ .../io/util/BufferedDataOutputStreamTest.java | 20 ++++++++++++++++++ .../io/util/NIODataInputStreamTest.java | 20 ++++++++++++++++++ 
.../io/util/RandomAccessReaderTest.java | 20 ++++++++++++++++++ .../cassandra/locator/PendingRangeMapsTest.java | 20 ++++++++++++++++++ .../cassandra/net/MessagingServiceTest.java | 20 ++++++++++++++++++ .../service/RMIServerSocketFactoryImplTest.java | 20 ++++++++++++++++++ .../apache/cassandra/utils/TopKSamplerTest.java | 20 ++++++++++++++++++ 54 files changed, 1084 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/05bacc75/src/java/org/apache/cassandra/cql3/selection/SelectionColumnMapping.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/05bacc75/src/java/org/apache/cassandra/db/lifecycle/LogAwareFileLister.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/db/lifecycle/LogAwareFileLister.java index 4d3d46d,0000000..e9072c4 mode 100644,000000..100644 --- a/src/java/org/apache/cassandra/db/lifecycle/LogAwareFileLister.java +++ b/src/java/org/apache/cassandra/db/lifecycle/LogAwareFileLister.java @@@ -1,183 -1,0 +1,203 @@@ ++/* ++ * ++ * Licensed to the Apache Software Foundation (ASF) under one ++ * or more contributor license agreements. See the NOTICE file ++ * distributed with this work for additional information ++ * regarding copyright ownership. The ASF licenses this file ++ * to you under the Apache License, Version 2.0 (the ++ * "License"); you may not use this file except in compliance ++ * with the License. You may obtain a copy of the License at ++ * ++ * http://www.apache.org/licenses/LICENSE-2.0 ++ * ++ * Unless required by applicable law or agreed to in writing, ++ * software distributed under the License is distributed on an ++ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY ++ * KIND, either express or implied. 
See the License for the ++ * specific language governing permissions and limitations ++ * under the License. ++ * ++ */ +package org.apache.cassandra.db.lifecycle; + +import java.io.File; +import java.io.IOException; +import java.nio.file.DirectoryStream; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.*; +import java.util.function.BiFunction; +import java.util.stream.Collectors; +import java.util.stream.StreamSupport; + +import com.google.common.annotations.VisibleForTesting; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.cassandra.db.Directories; + +import static org.apache.cassandra.db.Directories.*; + +/** + * A class for listing files in a folder. + */ +final class LogAwareFileLister +{ + private static final Logger logger = LoggerFactory.getLogger(LogAwareFileLister.class); + + // The folder to scan + private final Path folder; + + // The filter determines which files the client wants returned + private final BiFunction<File, FileType, Boolean> filter; //file, file type + + // The behavior when we fail to list files + private final OnTxnErr onTxnErr; + + // The unfiltered result + NavigableMap<File, Directories.FileType> files = new TreeMap<>(); + + @VisibleForTesting + LogAwareFileLister(Path folder, BiFunction<File, FileType, Boolean> filter, OnTxnErr onTxnErr) + { + this.folder = folder; + this.filter = filter; + this.onTxnErr = onTxnErr; + } + + public List<File> list() + { + try + { + return innerList(); + } + catch (Throwable t) + { + throw new RuntimeException(String.format("Failed to list files in %s", folder), t); + } + } + + List<File> innerList() throws Throwable + { + list(Files.newDirectoryStream(folder)) + .stream() + .filter((f) -> !LogFile.isLogFile(f)) + .forEach((f) -> files.put(f, FileType.FINAL)); + + // Since many file systems are not atomic, we cannot be sure we have listed a consistent disk state + // (Linux would permit this, but for simplicity we keep our behaviour the same 
across platforms) + // so we must be careful to list txn log files AFTER every other file since these files are deleted last, + // after all other files are removed + list(Files.newDirectoryStream(folder, '*' + LogFile.EXT)) + .stream() + .filter(LogFile::isLogFile) + .forEach(this::classifyFiles); + + // Finally we apply the user filter before returning our result + return files.entrySet().stream() + .filter((e) -> filter.apply(e.getKey(), e.getValue())) + .map(Map.Entry::getKey) + .collect(Collectors.toList()); + } + + static List<File> list(DirectoryStream<Path> stream) throws IOException + { + try + { + return StreamSupport.stream(stream.spliterator(), false) + .map(Path::toFile) + .filter((f) -> !f.isDirectory()) + .collect(Collectors.toList()); + } + finally + { + stream.close(); + } + } + + /** + * We read txn log files, if we fail we throw only if the user has specified + * OnTxnErr.THROW, else we log an error and apply the txn log anyway + */ + void classifyFiles(File txnFile) + { + LogFile txn = LogFile.make(txnFile); + readTxnLog(txn); + classifyFiles(txn); + files.put(txnFile, FileType.TXN_LOG); + } + + void readTxnLog(LogFile txn) + { + if (!txn.verify() && onTxnErr == OnTxnErr.THROW) + throw new LogTransaction.CorruptTransactionLogException("Some records failed verification. 
See earlier in log for details.", txn); + } + + void classifyFiles(LogFile txnFile) + { + Map<LogRecord, Set<File>> oldFiles = txnFile.getFilesOfType(folder, files.navigableKeySet(), LogRecord.Type.REMOVE); + Map<LogRecord, Set<File>> newFiles = txnFile.getFilesOfType(folder, files.navigableKeySet(), LogRecord.Type.ADD); + + if (txnFile.completed()) + { // last record present, filter regardless of disk status + setTemporary(txnFile, oldFiles.values(), newFiles.values()); + return; + } + + if (allFilesPresent(oldFiles)) + { // all old files present, transaction is in progress, this will filter as aborted + setTemporary(txnFile, oldFiles.values(), newFiles.values()); + return; + } + + // some old files are missing, we expect the txn file to either also be missing or completed, so check + // disk state again to resolve any previous races on non-atomic directory listing platforms + + // if txn file also gone, then do nothing (all temporary should be gone, we could remove them if any) + if (!txnFile.exists()) + return; + + // otherwise read the file again to see if it is completed now + readTxnLog(txnFile); + + if (txnFile.completed()) + { // if after re-reading the txn is completed then filter accordingly + setTemporary(txnFile, oldFiles.values(), newFiles.values()); + return; + } + + logger.error("Failed to classify files in {}\n" + + "Some old files are missing but the txn log is still there and not completed\n" + + "Files in folder:\n{}\nTxn: {}\n{}", + folder, + files.isEmpty() + ? 
"\t-" + : String.join("\n", files.keySet().stream().map(f -> String.format("\t%s", f)).collect(Collectors.toList())), + txnFile.toString(), + String.join("\n", txnFile.getRecords().stream().map(r -> String.format("\t%s", r)).collect(Collectors.toList()))); + + // some old files are missing and yet the txn is still there and not completed + // something must be wrong (see comment at the top of LogTransaction requiring txn to be + // completed before obsoleting or aborting sstables) + throw new RuntimeException(String.format("Failed to list directory files in %s, inconsistent disk state for transaction %s", + folder, + txnFile)); + } + + /** See if all files are present */ + private static boolean allFilesPresent(Map<LogRecord, Set<File>> oldFiles) + { + return !oldFiles.entrySet().stream() + .filter((e) -> e.getKey().numFiles > e.getValue().size()) + .findFirst().isPresent(); + } + + private void setTemporary(LogFile txnFile, Collection<Set<File>> oldFiles, Collection<Set<File>> newFiles) + { + Collection<Set<File>> temporary = txnFile.committed() ? oldFiles : newFiles; + temporary.stream() + .flatMap(Set::stream) + .forEach((f) -> this.files.put(f, FileType.TEMPORARY)); + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/05bacc75/src/java/org/apache/cassandra/db/lifecycle/LogFile.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/db/lifecycle/LogFile.java index 4c3e550,0000000..6d0c835 mode 100644,000000..100644 --- a/src/java/org/apache/cassandra/db/lifecycle/LogFile.java +++ b/src/java/org/apache/cassandra/db/lifecycle/LogFile.java @@@ -1,397 -1,0 +1,417 @@@ ++/* ++ * ++ * Licensed to the Apache Software Foundation (ASF) under one ++ * or more contributor license agreements. See the NOTICE file ++ * distributed with this work for additional information ++ * regarding copyright ownership. 
The ASF licenses this file ++ * to you under the Apache License, Version 2.0 (the ++ * "License"); you may not use this file except in compliance ++ * with the License. You may obtain a copy of the License at ++ * ++ * http://www.apache.org/licenses/LICENSE-2.0 ++ * ++ * Unless required by applicable law or agreed to in writing, ++ * software distributed under the License is distributed on an ++ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY ++ * KIND, either express or implied. See the License for the ++ * specific language governing permissions and limitations ++ * under the License. ++ * ++ */ +package org.apache.cassandra.db.lifecycle; + +import java.io.File; +import java.nio.file.Path; +import java.util.*; +import java.util.regex.Matcher; +import java.util.regex.Pattern; +import java.util.stream.Collectors; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.Iterables; + +import org.apache.commons.lang3.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.cassandra.db.compaction.OperationType; +import org.apache.cassandra.db.lifecycle.LogRecord.Type; +import org.apache.cassandra.io.sstable.SSTable; +import org.apache.cassandra.io.sstable.format.big.BigFormat; +import org.apache.cassandra.utils.Throwables; + +import static org.apache.cassandra.utils.Throwables.merge; + +/** + * A transaction log file. We store transaction records into a log file, which is + * copied into multiple identical replicas on different disks, @see LogFileReplica. + * + * This class supports the transactional logic of LogTransaction and the removing + * of unfinished leftovers when a transaction is completed, or aborted, or when + * we clean up on start-up. 
+ * + * @see LogTransaction + */ +final class LogFile +{ + private static final Logger logger = LoggerFactory.getLogger(LogFile.class); + + static String EXT = ".log"; + static char SEP = '_'; + // cc_txn_opname_id.log (where cc is one of the sstable versions defined in BigVersion) + static Pattern FILE_REGEX = Pattern.compile(String.format("^(.{2})_txn_(.*)_(.*)%s$", EXT)); + + // A set of physical files on disk, each file is an identical replica + private final LogReplicaSet replicas = new LogReplicaSet(); + + // The transaction records, this set must be ORDER PRESERVING + private final LinkedHashSet<LogRecord> records = new LinkedHashSet<>(); + + // The type of the transaction + private final OperationType type; + + // The unique id of the transaction + private final UUID id; + + static LogFile make(File logReplica) + { + return make(logReplica.getName(), Collections.singletonList(logReplica)); + } + + static LogFile make(String fileName, List<File> logReplicas) + { + Matcher matcher = LogFile.FILE_REGEX.matcher(fileName); + boolean matched = matcher.matches(); + assert matched && matcher.groupCount() == 3; + + // For now we don't need this but it is there in case we need to change + // file format later on, the version is the sstable version as defined in BigFormat + //String version = matcher.group(1); + + OperationType operationType = OperationType.fromFileName(matcher.group(2)); + UUID id = UUID.fromString(matcher.group(3)); + + return new LogFile(operationType, id, logReplicas); + } + + Throwable syncFolder(Throwable accumulate) + { + return replicas.syncFolder(accumulate); + } + + OperationType type() + { + return type; + } + + UUID id() + { + return id; + } + + Throwable removeUnfinishedLeftovers(Throwable accumulate) + { + try + { + deleteFilesForRecordsOfType(committed() ? 
Type.REMOVE : Type.ADD); + + // we sync the parent folders between contents and log deletion + // to ensure there is a happens before edge between them + Throwables.maybeFail(syncFolder(accumulate)); + + accumulate = replicas.delete(accumulate); + } + catch (Throwable t) + { + accumulate = merge(accumulate, t); + } + + return accumulate; + } + + static boolean isLogFile(File file) + { + return LogFile.FILE_REGEX.matcher(file.getName()).matches(); + } + + LogFile(OperationType type, UUID id, List<File> replicas) + { + this(type, id); + this.replicas.addReplicas(replicas); + } + + LogFile(OperationType type, UUID id) + { + this.type = type; + this.id = id; + } + + boolean verify() + { + records.clear(); + if (!replicas.readRecords(records)) + { + logger.error("Failed to read records from {}", replicas); + return false; + } + + records.forEach(LogFile::verifyRecord); + + Optional<LogRecord> firstInvalid = records.stream().filter(LogRecord::isInvalidOrPartial).findFirst(); + if (!firstInvalid.isPresent()) + return true; + + LogRecord failedOn = firstInvalid.get(); + if (getLastRecord() != failedOn) + { + logError(failedOn); + return false; + } + + records.stream().filter((r) -> r != failedOn).forEach(LogFile::verifyRecordWithCorruptedLastRecord); + if (records.stream() + .filter((r) -> r != failedOn) + .filter(LogRecord::isInvalid) + .map(LogFile::logError) + .findFirst().isPresent()) + { + logError(failedOn); + return false; + } + + // if only the last record is corrupt and all other records have matching files on disk, @see verifyRecord, + // then we simply exited whilst serializing the last record and we carry on + logger.warn(String.format("Last record of transaction %s is corrupt or incomplete [%s], " + + "but all previous records match state on disk; continuing", + id, + failedOn.error())); + return true; + } + + static LogRecord logError(LogRecord record) + { + logger.error("{}", record.error()); + return record; + } + + static void verifyRecord(LogRecord 
record) + { + if (record.checksum != record.computeChecksum()) + { + record.setError(String.format("Invalid checksum for sstable [%s], record [%s]: [%d] should have been [%d]", + record.fileName(), + record, + record.checksum, + record.computeChecksum())); + return; + } + + if (record.type != Type.REMOVE) + return; + + // Paranoid sanity checks: we create another record by looking at the files as they are + // on disk right now and make sure the information still matches. We don't want to delete + // files by mistake if the user has copied them from backup and forgot to remove a txn log + // file that obsoleted the very same files. So we check the latest update time and make sure + // it matches. Because we delete files from oldest to newest, the latest update time should + // always match. + record.status.onDiskRecord = record.withExistingFiles(); + if (record.updateTime != record.status.onDiskRecord.updateTime && record.status.onDiskRecord.numFiles > 0) + { + record.setError(String.format("Unexpected files detected for sstable [%s], " + + "record [%s]: last update time [%tT] should have been [%tT]", + record.fileName(), + record, + record.status.onDiskRecord.updateTime, + record.updateTime)); + + } + } + + static void verifyRecordWithCorruptedLastRecord(LogRecord record) + { + if (record.type == Type.REMOVE && record.status.onDiskRecord.numFiles < record.numFiles) + { // if we found a corruption in the last record, then we continue only + // if the number of files matches exactly for all previous records. + record.setError(String.format("Incomplete fileset detected for sstable [%s], record [%s]: " + + "number of files [%d] should have been [%d]. 
Treating as unrecoverable " + + "due to corruption of the final record.", + record.fileName(), + record.raw, + record.status.onDiskRecord.numFiles, + record.numFiles)); + } + } + + void commit() + { + assert !completed() : "Already completed!"; + addRecord(LogRecord.makeCommit(System.currentTimeMillis())); + } + + void abort() + { + assert !completed() : "Already completed!"; + addRecord(LogRecord.makeAbort(System.currentTimeMillis())); + } + + private boolean isLastRecordValidWithType(Type type) + { + LogRecord lastRecord = getLastRecord(); + return lastRecord != null && + lastRecord.type == type && + lastRecord.isValid(); + } + + boolean committed() + { + return isLastRecordValidWithType(Type.COMMIT); + } + + boolean aborted() + { + return isLastRecordValidWithType(Type.ABORT); + } + + boolean completed() + { + return committed() || aborted(); + } + + void add(Type type, SSTable table) + { + if (!addRecord(makeRecord(type, table))) + throw new IllegalStateException(); + } + + private LogRecord makeRecord(Type type, SSTable table) + { + assert type == Type.ADD || type == Type.REMOVE; + + File folder = table.descriptor.directory; + replicas.maybeCreateReplica(folder, getFileName(folder), records); + return LogRecord.make(type, table); + } + + private boolean addRecord(LogRecord record) + { + if (records.contains(record)) + return false; + + replicas.append(record); + + return records.add(record); + } + + void remove(Type type, SSTable table) + { + LogRecord record = makeRecord(type, table); + assert records.contains(record) : String.format("[%s] is not tracked by %s", record, id); + + deleteRecordFiles(record); + records.remove(record); + } + + boolean contains(Type type, SSTable table) + { + return records.contains(makeRecord(type, table)); + } + + void deleteFilesForRecordsOfType(Type type) + { + records.stream() + .filter(type::matches) + .forEach(LogFile::deleteRecordFiles); + records.clear(); + } + + private static void deleteRecordFiles(LogRecord record) + { 
+ List<File> files = record.getExistingFiles(); + + // we sort the files in ascending update time order so that the last update time + // stays the same even if we only partially delete files, see comment in isInvalid() + files.sort((f1, f2) -> Long.compare(f1.lastModified(), f2.lastModified())); + + files.forEach(LogTransaction::delete); + } + + /** + * Extract from the files passed in all those that are of the given type. + * + * Scan all records and select those that are of the given type, valid, and + * located in the same folder. For each such record extract from the files passed in + * those that belong to this record. + * + * @return a map linking each mapped record to its files, where the files were passed in as parameters. + */ + Map<LogRecord, Set<File>> getFilesOfType(Path folder, NavigableSet<File> files, Type type) + { + Map<LogRecord, Set<File>> ret = new HashMap<>(); + + records.stream() + .filter(type::matches) + .filter(LogRecord::isValid) + .filter(r -> r.isInFolder(folder)) + .forEach((r) -> ret.put(r, getRecordFiles(files, r))); + + return ret; + } + + LogRecord getLastRecord() + { + return Iterables.getLast(records, null); + } + + private static Set<File> getRecordFiles(NavigableSet<File> files, LogRecord record) + { + String fileName = record.fileName(); + return files.stream().filter(f -> f.getName().startsWith(fileName)).collect(Collectors.toSet()); + } + + boolean exists() + { + return replicas.exists(); + } + + void close() + { + replicas.close(); + } + + @Override + public String toString() + { + return replicas.toString(); + } + + @VisibleForTesting + List<File> getFiles() + { + return replicas.getFiles(); + } + + @VisibleForTesting + List<String> getFilePaths() + { + return replicas.getFilePaths(); + } + + private String getFileName(File folder) + { + String fileName = StringUtils.join(BigFormat.latestVersion, + LogFile.SEP, + "txn", + LogFile.SEP, + type.fileName, + LogFile.SEP, + id.toString(), + LogFile.EXT); + return
StringUtils.join(folder, File.separator, fileName); + } + + Collection<LogRecord> getRecords() + { + return records; + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/05bacc75/src/java/org/apache/cassandra/db/lifecycle/LogRecord.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/db/lifecycle/LogRecord.java index 9b7d59e,0000000..d7eb774 mode 100644,000000..100644 --- a/src/java/org/apache/cassandra/db/lifecycle/LogRecord.java +++ b/src/java/org/apache/cassandra/db/lifecycle/LogRecord.java @@@ -1,309 -1,0 +1,329 @@@ ++/* ++ * ++ * Licensed to the Apache Software Foundation (ASF) under one ++ * or more contributor license agreements. See the NOTICE file ++ * distributed with this work for additional information ++ * regarding copyright ownership. The ASF licenses this file ++ * to you under the Apache License, Version 2.0 (the ++ * "License"); you may not use this file except in compliance ++ * with the License. You may obtain a copy of the License at ++ * ++ * http://www.apache.org/licenses/LICENSE-2.0 ++ * ++ * Unless required by applicable law or agreed to in writing, ++ * software distributed under the License is distributed on an ++ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY ++ * KIND, either express or implied. See the License for the ++ * specific language governing permissions and limitations ++ * under the License. ++ * ++ */ +package org.apache.cassandra.db.lifecycle; + +import java.io.File; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.*; +import java.util.regex.Matcher; +import java.util.regex.Pattern; +import java.util.zip.CRC32; + +import org.apache.cassandra.io.sstable.SSTable; +import org.apache.cassandra.io.util.FileUtils; +import org.apache.cassandra.utils.FBUtilities; + +/** + * A decoded line in a transaction log file replica. + * + * @see LogReplica and LogFile. 
+ */ +final class LogRecord +{ + public enum Type + { + UNKNOWN, // a record that cannot be parsed + ADD, // new files to be retained on commit + REMOVE, // old files to be retained on abort + COMMIT, // commit flag + ABORT; // abort flag + + public static Type fromPrefix(String prefix) + { + return valueOf(prefix.toUpperCase()); + } + + public boolean hasFile() + { + return this == Type.ADD || this == Type.REMOVE; + } + + public boolean matches(LogRecord record) + { + return this == record.type; + } + + public boolean isFinal() { return this == Type.COMMIT || this == Type.ABORT; } + } + + /** + * The status of a record after it has been verified, any parsing errors + * are also stored here. + */ + public final static class Status + { + // if there are any errors, they end up here + Optional<String> error = Optional.empty(); + + // if the record was only partially matched across files this is true + boolean partial = false; + + // if the status of this record on disk is required (e.g. existing files), it is + // stored here for caching + LogRecord onDiskRecord; + + void setError(String error) + { + if (!this.error.isPresent()) + this.error = Optional.of(error); + } + + boolean hasError() + { + return error.isPresent(); + } + } + + // the type of record, see Type + public final Type type; + // for sstable records, the absolute path of the table desc + public final Optional<String> absolutePath; + // for sstable records, the last update time of all files (may not be available for NEW records) + public final long updateTime; + // for sstable records, the total number of files (may not be accurate for NEW records) + public final int numFiles; + // the raw string as written or read from a file + public final String raw; + // the checksum of this record, written at the end of the record string + public final long checksum; + // the status of this record, @see Status class + public final Status status; + + // (add|remove|commit|abort):[*,*,*][checksum] + static Pattern
REGEX = Pattern.compile("^(add|remove|commit|abort):\\[([^,]*),?([^,]*),?([^,]*)\\]\\[(\\d*)\\]$", Pattern.CASE_INSENSITIVE); + + public static LogRecord make(String line) + { + try + { + Matcher matcher = REGEX.matcher(line); + if (!matcher.matches()) + return new LogRecord(Type.UNKNOWN, null, 0, 0, 0, line) + .setError(String.format("Failed to parse [%s]", line)); + + Type type = Type.fromPrefix(matcher.group(1)); + return new LogRecord(type, + matcher.group(2), + Long.valueOf(matcher.group(3)), + Integer.valueOf(matcher.group(4)), + Long.valueOf(matcher.group(5)), line); + } + catch (Throwable t) + { + return new LogRecord(Type.UNKNOWN, null, 0, 0, 0, line).setError(t); + } + } + + public static LogRecord makeCommit(long updateTime) + { + return new LogRecord(Type.COMMIT, updateTime); + } + + public static LogRecord makeAbort(long updateTime) + { + return new LogRecord(Type.ABORT, updateTime); + } + + public static LogRecord make(Type type, SSTable table) + { + String absoluteTablePath = FileUtils.getCanonicalPath(table.descriptor.baseFilename()); + return make(type, getExistingFiles(absoluteTablePath), table.getAllFilePaths().size(), absoluteTablePath); + } + + public LogRecord withExistingFiles() + { + return make(type, getExistingFiles(), 0, absolutePath.get()); + } + + public static LogRecord make(Type type, List<File> files, int minFiles, String absolutePath) + { + long lastModified = files.stream().map(File::lastModified).reduce(0L, Long::max); + return new LogRecord(type, absolutePath, lastModified, Math.max(minFiles, files.size())); + } + + private LogRecord(Type type, long updateTime) + { + this(type, null, updateTime, 0, 0, null); + } + + private LogRecord(Type type, + String absolutePath, + long updateTime, + int numFiles) + { + this(type, absolutePath, updateTime, numFiles, 0, null); + } + + private LogRecord(Type type, + String absolutePath, + long updateTime, + int numFiles, + long checksum, + String raw) + { + assert !type.hasFile() || 
absolutePath != null : "Expected file path for file records"; + + this.type = type; + this.absolutePath = type.hasFile() ? Optional.of(absolutePath) : Optional.<String>empty(); + this.updateTime = type == Type.REMOVE ? updateTime : 0; + this.numFiles = type.hasFile() ? numFiles : 0; + this.status = new Status(); + if (raw == null) + { + assert checksum == 0; + this.checksum = computeChecksum(); + this.raw = format(); + } + else + { + this.checksum = checksum; + this.raw = raw; + } + } + + LogRecord setError(Throwable t) + { + return setError(t.getMessage()); + } + + LogRecord setError(String error) + { + status.setError(error); + return this; + } + + String error() + { + return status.error.orElse(""); + } + + void setPartial() + { + status.partial = true; + } + + boolean partial() + { + return status.partial; + } + + boolean isValid() + { + return !status.hasError() && type != Type.UNKNOWN; + } + + boolean isInvalid() + { + return !isValid(); + } + + boolean isInvalidOrPartial() + { + return isInvalid() || partial(); + } + + private String format() + { + return String.format("%s:[%s,%d,%d][%d]", + type.toString(), + absolutePath(), + updateTime, + numFiles, + checksum); + } + + public List<File> getExistingFiles() + { + assert absolutePath.isPresent() : "Expected a path in order to get existing files"; + return getExistingFiles(absolutePath.get()); + } + + public static List<File> getExistingFiles(String absoluteFilePath) + { + Path path = Paths.get(absoluteFilePath); + File[] files = path.getParent().toFile().listFiles((dir, name) -> name.startsWith(path.getFileName().toString())); + // files may be null if the directory does not exist yet, e.g. when tracking new files + return files == null ? Collections.emptyList() : Arrays.asList(files); + } + + public boolean isFinal() + { + return type.isFinal(); + } + + String fileName() + { + return absolutePath.isPresent() ? 
Paths.get(absolutePath.get()).getFileName().toString() : ""; + } + + boolean isInFolder(Path folder) + { + return absolutePath.isPresent() + ? FileUtils.isContained(folder.toFile(), Paths.get(absolutePath.get()).toFile()) + : false; + } + + String absolutePath() + { + return absolutePath.isPresent() ? absolutePath.get() : ""; + } + + @Override + public int hashCode() + { + // see comment in equals + return Objects.hash(type, absolutePath, numFiles, updateTime); + } + + @Override + public boolean equals(Object obj) + { + if (!(obj instanceof LogRecord)) + return false; + + final LogRecord other = (LogRecord)obj; + + // we exclude on purpose checksum, error and full file path + // since records must match across log file replicas on different disks + return type == other.type && + absolutePath.equals(other.absolutePath) && + numFiles == other.numFiles && + updateTime == other.updateTime; + } + + @Override + public String toString() + { + return raw; + } + + long computeChecksum() + { + CRC32 crc32 = new CRC32(); + crc32.update((absolutePath()).getBytes(FileUtils.CHARSET)); + crc32.update(type.toString().getBytes(FileUtils.CHARSET)); + FBUtilities.updateChecksumInt(crc32, (int) updateTime); + FBUtilities.updateChecksumInt(crc32, (int) (updateTime >>> 32)); + FBUtilities.updateChecksumInt(crc32, numFiles); + return crc32.getValue() & (Long.MAX_VALUE); + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/05bacc75/src/java/org/apache/cassandra/db/lifecycle/SSTableIntervalTree.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/05bacc75/src/java/org/apache/cassandra/db/lifecycle/SSTableSet.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/db/lifecycle/SSTableSet.java index 6cc26d6,0000000..07a3b2b mode 100644,000000..100644 --- a/src/java/org/apache/cassandra/db/lifecycle/SSTableSet.java +++ 
b/src/java/org/apache/cassandra/db/lifecycle/SSTableSet.java @@@ -1,12 -1,0 +1,32 @@@ ++/* ++ * ++ * Licensed to the Apache Software Foundation (ASF) under one ++ * or more contributor license agreements. See the NOTICE file ++ * distributed with this work for additional information ++ * regarding copyright ownership. The ASF licenses this file ++ * to you under the Apache License, Version 2.0 (the ++ * "License"); you may not use this file except in compliance ++ * with the License. You may obtain a copy of the License at ++ * ++ * http://www.apache.org/licenses/LICENSE-2.0 ++ * ++ * Unless required by applicable law or agreed to in writing, ++ * software distributed under the License is distributed on an ++ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY ++ * KIND, either express or implied. See the License for the ++ * specific language governing permissions and limitations ++ * under the License. ++ * ++ */ +package org.apache.cassandra.db.lifecycle; + +public enum SSTableSet +{ + // returns the "canonical" version of any current sstable, i.e. if an sstable is being replaced and is only partially + // visible to reads, this sstable will be returned as its original entirety, and its replacement will not be returned + // (even if it completely replaces it) + CANONICAL, + // returns the live versions of all sstables, i.e. 
including partially written sstables + LIVE, + NONCOMPACTING - } ++} http://git-wip-us.apache.org/repos/asf/cassandra/blob/05bacc75/src/java/org/apache/cassandra/db/transform/BaseIterator.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/db/transform/BaseIterator.java index 9b95dfa,0000000..dd928eb mode 100644,000000..100644 --- a/src/java/org/apache/cassandra/db/transform/BaseIterator.java +++ b/src/java/org/apache/cassandra/db/transform/BaseIterator.java @@@ -1,129 -1,0 +1,149 @@@ ++/* ++ * ++ * Licensed to the Apache Software Foundation (ASF) under one ++ * or more contributor license agreements. See the NOTICE file ++ * distributed with this work for additional information ++ * regarding copyright ownership. The ASF licenses this file ++ * to you under the Apache License, Version 2.0 (the ++ * "License"); you may not use this file except in compliance ++ * with the License. You may obtain a copy of the License at ++ * ++ * http://www.apache.org/licenses/LICENSE-2.0 ++ * ++ * Unless required by applicable law or agreed to in writing, ++ * software distributed under the License is distributed on an ++ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY ++ * KIND, either express or implied. See the License for the ++ * specific language governing permissions and limitations ++ * under the License. ++ * ++ */ +package org.apache.cassandra.db.transform; + +import java.util.Iterator; +import java.util.NoSuchElementException; + +import net.nicoulaj.compilecommand.annotations.DontInline; +import org.apache.cassandra.utils.CloseableIterator; + +import static org.apache.cassandra.utils.Throwables.maybeFail; +import static org.apache.cassandra.utils.Throwables.merge; + +abstract class BaseIterator<V, I extends CloseableIterator<? 
extends V>, O extends V> extends Stack implements AutoCloseable, Iterator<O> +{ + I input; + V next; + Stop stop; // applies at the end of the current next() + + static class Stop + { + // TODO: consider moving "next" into here, so that a stop() when signalled outside of a function call (e.g. in attach) + // can take effect immediately; this doesn't seem to be necessary at the moment, but it might cause least surprise in future + boolean isSignalled; + } + + // responsibility for initialising next lies with the subclass + BaseIterator(BaseIterator<? extends V, ? extends I, ?> copyFrom) + { + super(copyFrom); + this.input = copyFrom.input; + this.next = copyFrom.next; + this.stop = copyFrom.stop; + } + + BaseIterator(I input) + { + this.input = input; + this.stop = new Stop(); + } + + /** + * run the corresponding runOnClose method for the first length transformations. + * + * used in hasMoreContents to close the methods preceding the MoreContents + */ + protected abstract Throwable runOnClose(int length); + + /** + * apply the relevant method from the transformation to the value. 
+ * + * used in hasMoreContents to apply the functions that follow the MoreContents + */ + protected abstract V applyOne(V value, Transformation transformation); + + public final void close() + { + Throwable fail = runOnClose(length); + if (next instanceof AutoCloseable) + { + try { ((AutoCloseable) next).close(); } + catch (Throwable t) { fail = merge(fail, t); } + } + try { input.close(); } + catch (Throwable t) { fail = merge(fail, t); } + maybeFail(fail); + } + + public final O next() + { + if (next == null && !hasNext()) + throw new NoSuchElementException(); + + O next = (O) this.next; + this.next = null; + return next; + } + + // may set next != null if the next contents are a transforming iterator that already has data to return, + // in which case we immediately have more contents to yield + protected final boolean hasMoreContents() + { + return moreContents.length > 0 && tryGetMoreContents(); + } + + @DontInline + private boolean tryGetMoreContents() + { + for (int i = 0 ; i < moreContents.length ; i++) + { + MoreContentsHolder holder = moreContents[i]; + MoreContents provider = holder.moreContents; + I newContents = (I) provider.moreContents(); + if (newContents == null) + continue; + + input.close(); + input = newContents; + Stack prefix = EMPTY; + if (newContents instanceof BaseIterator) + { + // we're refilling with transformed contents, so swap in its internals directly + // TODO: ensure that top-level data is consistent. i.e. staticRow, partitionlevelDeletion etc are same? 
+ BaseIterator abstr = (BaseIterator) newContents; + prefix = abstr; + input = (I) abstr.input; + next = apply((V) abstr.next, holder.length); // must apply all remaining functions to the next, if any + } + + // since we're truncating our transformation stack to only those occurring after the extend transformation + // we have to run any prior runOnClose methods + maybeFail(runOnClose(holder.length)); + refill(prefix, holder, i); + + if (next != null || input.hasNext()) + return true; + + i = -1; + } + return false; + } + + // apply the functions [from..length) + private V apply(V next, int from) + { + while (next != null & from < length) + next = applyOne(next, stack[from++]); + return next; + } +} + http://git-wip-us.apache.org/repos/asf/cassandra/blob/05bacc75/src/java/org/apache/cassandra/db/transform/BasePartitions.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/db/transform/BasePartitions.java index e795760,0000000..026a39d mode 100644,000000..100644 --- a/src/java/org/apache/cassandra/db/transform/BasePartitions.java +++ b/src/java/org/apache/cassandra/db/transform/BasePartitions.java @@@ -1,100 -1,0 +1,120 @@@ ++/* ++ * ++ * Licensed to the Apache Software Foundation (ASF) under one ++ * or more contributor license agreements. See the NOTICE file ++ * distributed with this work for additional information ++ * regarding copyright ownership. The ASF licenses this file ++ * to you under the Apache License, Version 2.0 (the ++ * "License"); you may not use this file except in compliance ++ * with the License. You may obtain a copy of the License at ++ * ++ * http://www.apache.org/licenses/LICENSE-2.0 ++ * ++ * Unless required by applicable law or agreed to in writing, ++ * software distributed under the License is distributed on an ++ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY ++ * KIND, either express or implied. 
See the License for the ++ * specific language governing permissions and limitations ++ * under the License. ++ * ++ */ +package org.apache.cassandra.db.transform; + +import java.util.Collections; + +import org.apache.cassandra.db.partitions.BasePartitionIterator; +import org.apache.cassandra.db.rows.BaseRowIterator; +import org.apache.cassandra.utils.Throwables; + +import static org.apache.cassandra.utils.Throwables.merge; + +public abstract class BasePartitions<R extends BaseRowIterator<?>, I extends BasePartitionIterator<? extends BaseRowIterator<?>>> +extends BaseIterator<BaseRowIterator<?>, I, R> +implements BasePartitionIterator<R> +{ + + public BasePartitions(I input) + { + super(input); + } + + BasePartitions(BasePartitions<?, ? extends I> copyFrom) + { + super(copyFrom); + } + + + // ********************************* + + + protected BaseRowIterator<?> applyOne(BaseRowIterator<?> value, Transformation transformation) + { + return value == null ? null : transformation.applyToPartition(value); + } + + void add(Transformation transformation) + { + transformation.attachTo(this); + super.add(transformation); + next = applyOne(next, transformation); + } + + protected Throwable runOnClose(int length) + { + Throwable fail = null; + Transformation[] fs = stack; + for (int i = 0 ; i < length ; i++) + { + try + { + fs[i].onClose(); + } + catch (Throwable t) + { + fail = merge(fail, t); + } + } + return fail; + } + + public final boolean hasNext() + { + BaseRowIterator<?> next = null; + try + { + + Stop stop = this.stop; + while (this.next == null) + { + Transformation[] fs = stack; + int len = length; + + while (!stop.isSignalled && input.hasNext()) + { + next = input.next(); + for (int i = 0 ; next != null & i < len ; i++) + next = fs[i].applyToPartition(next); + + if (next != null) + { + this.next = next; + return true; + } + } + + if (stop.isSignalled || !hasMoreContents()) + return false; + } + return true; + + } + catch (Throwable t) + { + if (next != null) + 
Throwables.close(t, Collections.singleton(next)); + throw t; + } + } + +} + http://git-wip-us.apache.org/repos/asf/cassandra/blob/05bacc75/src/java/org/apache/cassandra/db/transform/BaseRows.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/db/transform/BaseRows.java index 78526e8,0000000..b0e642b mode 100644,000000..100644 --- a/src/java/org/apache/cassandra/db/transform/BaseRows.java +++ b/src/java/org/apache/cassandra/db/transform/BaseRows.java @@@ -1,139 -1,0 +1,159 @@@ ++/* ++ * ++ * Licensed to the Apache Software Foundation (ASF) under one ++ * or more contributor license agreements. See the NOTICE file ++ * distributed with this work for additional information ++ * regarding copyright ownership. The ASF licenses this file ++ * to you under the Apache License, Version 2.0 (the ++ * "License"); you may not use this file except in compliance ++ * with the License. You may obtain a copy of the License at ++ * ++ * http://www.apache.org/licenses/LICENSE-2.0 ++ * ++ * Unless required by applicable law or agreed to in writing, ++ * software distributed under the License is distributed on an ++ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY ++ * KIND, either express or implied. See the License for the ++ * specific language governing permissions and limitations ++ * under the License. ++ * ++*/ +package org.apache.cassandra.db.transform; + +import org.apache.cassandra.config.CFMetaData; +import org.apache.cassandra.db.DecoratedKey; +import org.apache.cassandra.db.PartitionColumns; +import org.apache.cassandra.db.rows.*; + +import static org.apache.cassandra.utils.Throwables.merge; + +public abstract class BaseRows<R extends Unfiltered, I extends BaseRowIterator<? 
extends Unfiltered>> +extends BaseIterator<Unfiltered, I, R> +implements BaseRowIterator<R> +{ + + private Row staticRow; + + public BaseRows(I input) + { + super(input); + staticRow = input.staticRow(); + } + + // swap parameter order to avoid casting errors + BaseRows(BaseRows<?, ? extends I> copyFrom) + { + super(copyFrom); + staticRow = copyFrom.staticRow; + } + + public CFMetaData metadata() + { + return input.metadata(); + } + + public boolean isReverseOrder() + { + return input.isReverseOrder(); + } + + public PartitionColumns columns() + { + return input.columns(); + } + + public DecoratedKey partitionKey() + { + return input.partitionKey(); + } + + public Row staticRow() + { + return staticRow; + } + + + // ************************** + + + @Override + protected Throwable runOnClose(int length) + { + Throwable fail = null; + Transformation[] fs = stack; + for (int i = 0 ; i < length ; i++) + { + try + { + fs[i].onPartitionClose(); + } + catch (Throwable t) + { + fail = merge(fail, t); + } + } + return fail; + } + + @Override + void add(Transformation transformation) + { + transformation.attachTo(this); + super.add(transformation); + + // transform any existing data + staticRow = transformation.applyToStatic(staticRow); + next = applyOne(next, transformation); + } + + @Override + protected Unfiltered applyOne(Unfiltered value, Transformation transformation) + { + return value == null + ? null + : value instanceof Row + ? 
transformation.applyToRow((Row) value) + : transformation.applyToMarker((RangeTombstoneMarker) value); + } + + @Override + public final boolean hasNext() + { + Stop stop = this.stop; + while (this.next == null) + { + Transformation[] fs = stack; + int len = length; + + while (!stop.isSignalled && input.hasNext()) + { + Unfiltered next = input.next(); + + if (next.isRow()) + { + Row row = (Row) next; + for (int i = 0 ; row != null && i < len ; i++) + row = fs[i].applyToRow(row); + next = row; + } + else + { + RangeTombstoneMarker rtm = (RangeTombstoneMarker) next; + for (int i = 0 ; rtm != null && i < len ; i++) + rtm = fs[i].applyToMarker(rtm); + next = rtm; + } + + if (next != null) + { + this.next = next; + return true; + } + } + + if (stop.isSignalled || !hasMoreContents()) + return false; + } + return true; + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/05bacc75/src/java/org/apache/cassandra/db/transform/Filter.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/db/transform/Filter.java index 3bf831f,0000000..138d3c8 mode 100644,000000..100644 --- a/src/java/org/apache/cassandra/db/transform/Filter.java +++ b/src/java/org/apache/cassandra/db/transform/Filter.java @@@ -1,56 -1,0 +1,76 @@@ ++/* ++ * ++ * Licensed to the Apache Software Foundation (ASF) under one ++ * or more contributor license agreements. See the NOTICE file ++ * distributed with this work for additional information ++ * regarding copyright ownership. The ASF licenses this file ++ * to you under the Apache License, Version 2.0 (the ++ * "License"); you may not use this file except in compliance ++ * with the License. 
You may obtain a copy of the License at ++ * ++ * http://www.apache.org/licenses/LICENSE-2.0 ++ * ++ * Unless required by applicable law or agreed to in writing, ++ * software distributed under the License is distributed on an ++ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY ++ * KIND, either express or implied. See the License for the ++ * specific language governing permissions and limitations ++ * under the License. ++ * ++ */ +package org.apache.cassandra.db.transform; + +import org.apache.cassandra.db.DeletionPurger; +import org.apache.cassandra.db.rows.*; + +final class Filter extends Transformation +{ + private final boolean filterEmpty; // generally maps to !isForThrift, but also false for direct row filtration + private final int nowInSec; + public Filter(boolean filterEmpty, int nowInSec) + { + this.filterEmpty = filterEmpty; + this.nowInSec = nowInSec; + } + + public RowIterator applyToPartition(BaseRowIterator iterator) + { + RowIterator filtered = iterator instanceof UnfilteredRows + ? new FilteredRows(this, (UnfilteredRows) iterator) + : new FilteredRows((UnfilteredRowIterator) iterator, this); + + if (filterEmpty && closeIfEmpty(filtered)) + return null; + + return filtered; + } + + public Row applyToStatic(Row row) + { + if (row.isEmpty()) + return Rows.EMPTY_STATIC_ROW; + + row = row.purge(DeletionPurger.PURGE_ALL, nowInSec); + return row == null ? 
Rows.EMPTY_STATIC_ROW : row; + } + + public Row applyToRow(Row row) + { + return row.purge(DeletionPurger.PURGE_ALL, nowInSec); + } + + public RangeTombstoneMarker applyToMarker(RangeTombstoneMarker marker) + { + return null; + } + + private static boolean closeIfEmpty(BaseRowIterator<?> iter) + { + if (iter.isEmpty()) + { + iter.close(); + return true; + } + return false; + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/05bacc75/src/java/org/apache/cassandra/db/transform/FilteredPartitions.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/db/transform/FilteredPartitions.java index 5a802dc,0000000..09e36b4 mode 100644,000000..100644 --- a/src/java/org/apache/cassandra/db/transform/FilteredPartitions.java +++ b/src/java/org/apache/cassandra/db/transform/FilteredPartitions.java @@@ -1,40 -1,0 +1,60 @@@ ++/* ++ * ++ * Licensed to the Apache Software Foundation (ASF) under one ++ * or more contributor license agreements. See the NOTICE file ++ * distributed with this work for additional information ++ * regarding copyright ownership. The ASF licenses this file ++ * to you under the Apache License, Version 2.0 (the ++ * "License"); you may not use this file except in compliance ++ * with the License. You may obtain a copy of the License at ++ * ++ * http://www.apache.org/licenses/LICENSE-2.0 ++ * ++ * Unless required by applicable law or agreed to in writing, ++ * software distributed under the License is distributed on an ++ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY ++ * KIND, either express or implied. See the License for the ++ * specific language governing permissions and limitations ++ * under the License. 
++ * ++ */ +package org.apache.cassandra.db.transform; + +import org.apache.cassandra.db.partitions.BasePartitionIterator; +import org.apache.cassandra.db.partitions.PartitionIterator; +import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator; +import org.apache.cassandra.db.rows.RowIterator; + +public final class FilteredPartitions extends BasePartitions<RowIterator, BasePartitionIterator<?>> implements PartitionIterator +{ + // wrap basic iterator for transformation + FilteredPartitions(PartitionIterator input) + { + super(input); + } + + // wrap basic unfiltered iterator for transformation, applying filter as first transformation + FilteredPartitions(UnfilteredPartitionIterator input, Filter filter) + { + super(input); + add(filter); + } + + // copy from an UnfilteredPartitions, applying a filter to convert it + FilteredPartitions(Filter filter, UnfilteredPartitions copyFrom) + { + super(copyFrom); + add(filter); + } + + /** + * Filter any RangeTombstoneMarker from the iterator's iterators, transforming it into a PartitionIterator. 
+ */ + public static PartitionIterator filter(UnfilteredPartitionIterator iterator, int nowInSecs) + { + Filter filter = new Filter(!iterator.isForThrift(), nowInSecs); + if (iterator instanceof UnfilteredPartitions) + return new FilteredPartitions(filter, (UnfilteredPartitions) iterator); + return new FilteredPartitions(iterator, filter); + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/05bacc75/src/java/org/apache/cassandra/db/transform/FilteredRows.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/db/transform/FilteredRows.java index b21b451,0000000..818d3bb mode 100644,000000..100644 --- a/src/java/org/apache/cassandra/db/transform/FilteredRows.java +++ b/src/java/org/apache/cassandra/db/transform/FilteredRows.java @@@ -1,40 -1,0 +1,60 @@@ ++/* ++ * ++ * Licensed to the Apache Software Foundation (ASF) under one ++ * or more contributor license agreements. See the NOTICE file ++ * distributed with this work for additional information ++ * regarding copyright ownership. The ASF licenses this file ++ * to you under the Apache License, Version 2.0 (the ++ * "License"); you may not use this file except in compliance ++ * with the License. You may obtain a copy of the License at ++ * ++ * http://www.apache.org/licenses/LICENSE-2.0 ++ * ++ * Unless required by applicable law or agreed to in writing, ++ * software distributed under the License is distributed on an ++ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY ++ * KIND, either express or implied. See the License for the ++ * specific language governing permissions and limitations ++ * under the License. 
++ * ++ */ +package org.apache.cassandra.db.transform; + +import org.apache.cassandra.db.rows.BaseRowIterator; +import org.apache.cassandra.db.rows.Row; +import org.apache.cassandra.db.rows.RowIterator; +import org.apache.cassandra.db.rows.UnfilteredRowIterator; + +public final class FilteredRows extends BaseRows<Row, BaseRowIterator<?>> implements RowIterator +{ + FilteredRows(RowIterator input) + { + super(input); + } + + FilteredRows(UnfilteredRowIterator input, Filter filter) + { + super(input); + add(filter); + } + + FilteredRows(Filter filter, UnfilteredRows input) + { + super(input); + add(filter); + } + + @Override + public boolean isEmpty() + { + return staticRow().isEmpty() && !hasNext(); + } + + /** + * Filter any RangeTombstoneMarker from the iterator, transforming it into a RowIterator. + */ + public static RowIterator filter(UnfilteredRowIterator iterator, int nowInSecs) + { + return new Filter(false, nowInSecs).applyToPartition(iterator); + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/05bacc75/src/java/org/apache/cassandra/db/transform/MoreContents.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/db/transform/MoreContents.java index 7e392ca,0000000..5277b07 mode 100644,000000..100644 --- a/src/java/org/apache/cassandra/db/transform/MoreContents.java +++ b/src/java/org/apache/cassandra/db/transform/MoreContents.java @@@ -1,8 -1,0 +1,28 @@@ ++/* ++ * ++ * Licensed to the Apache Software Foundation (ASF) under one ++ * or more contributor license agreements. See the NOTICE file ++ * distributed with this work for additional information ++ * regarding copyright ownership. The ASF licenses this file ++ * to you under the Apache License, Version 2.0 (the ++ * "License"); you may not use this file except in compliance ++ * with the License. 
You may obtain a copy of the License at ++ * ++ * http://www.apache.org/licenses/LICENSE-2.0 ++ * ++ * Unless required by applicable law or agreed to in writing, ++ * software distributed under the License is distributed on an ++ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY ++ * KIND, either express or implied. See the License for the ++ * specific language governing permissions and limitations ++ * under the License. ++ * ++ */ +package org.apache.cassandra.db.transform; + +// a shared internal interface, that is hidden to provide type-safety to the user +interface MoreContents<I> +{ + public abstract I moreContents(); +} + http://git-wip-us.apache.org/repos/asf/cassandra/blob/05bacc75/src/java/org/apache/cassandra/db/transform/MorePartitions.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/db/transform/MorePartitions.java index 5cfcc4c,0000000..898eb7d mode 100644,000000..100644 --- a/src/java/org/apache/cassandra/db/transform/MorePartitions.java +++ b/src/java/org/apache/cassandra/db/transform/MorePartitions.java @@@ -1,35 -1,0 +1,55 @@@ ++/* ++ * ++ * Licensed to the Apache Software Foundation (ASF) under one ++ * or more contributor license agreements. See the NOTICE file ++ * distributed with this work for additional information ++ * regarding copyright ownership. The ASF licenses this file ++ * to you under the Apache License, Version 2.0 (the ++ * "License"); you may not use this file except in compliance ++ * with the License. You may obtain a copy of the License at ++ * ++ * http://www.apache.org/licenses/LICENSE-2.0 ++ * ++ * Unless required by applicable law or agreed to in writing, ++ * software distributed under the License is distributed on an ++ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY ++ * KIND, either express or implied. See the License for the ++ * specific language governing permissions and limitations ++ * under the License. 
++ * ++ */ +package org.apache.cassandra.db.transform; + +import org.apache.cassandra.db.partitions.BasePartitionIterator; +import org.apache.cassandra.db.partitions.PartitionIterator; +import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator; + +import static org.apache.cassandra.db.transform.Transformation.add; +import static org.apache.cassandra.db.transform.Transformation.mutable; + +/** + * An interface for providing new partitions for a partitions iterator. + * + * The new contents are produced as a normal arbitrary PartitionIterator or UnfilteredPartitionIterator (as appropriate) + * + * The transforming iterator invokes this method when any current source is exhausted, then then inserts the + * new contents as the new source. + * + * If the new source is itself a product of any transformations, the two transforming iterators are merged + * so that control flow always occurs at the outermost point + */ +public interface MorePartitions<I extends BasePartitionIterator<?>> extends MoreContents<I> +{ + + public static UnfilteredPartitionIterator extend(UnfilteredPartitionIterator iterator, MorePartitions<? super UnfilteredPartitionIterator> more) + { + return add(mutable(iterator), more); + } + + public static PartitionIterator extend(PartitionIterator iterator, MorePartitions<? super PartitionIterator> more) + { + return add(mutable(iterator), more); + } + +} + http://git-wip-us.apache.org/repos/asf/cassandra/blob/05bacc75/src/java/org/apache/cassandra/db/transform/MoreRows.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/db/transform/MoreRows.java index f406a49,0000000..786e215 mode 100644,000000..100644 --- a/src/java/org/apache/cassandra/db/transform/MoreRows.java +++ b/src/java/org/apache/cassandra/db/transform/MoreRows.java @@@ -1,36 -1,0 +1,56 @@@ ++/* ++ * ++ * Licensed to the Apache Software Foundation (ASF) under one ++ * or more contributor license agreements. 
See the NOTICE file ++ * distributed with this work for additional information ++ * regarding copyright ownership. The ASF licenses this file ++ * to you under the Apache License, Version 2.0 (the ++ * "License"); you may not use this file except in compliance ++ * with the License. You may obtain a copy of the License at ++ * ++ * http://www.apache.org/licenses/LICENSE-2.0 ++ * ++ * Unless required by applicable law or agreed to in writing, ++ * software distributed under the License is distributed on an ++ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY ++ * KIND, either express or implied. See the License for the ++ * specific language governing permissions and limitations ++ * under the License. ++ * ++ */ +package org.apache.cassandra.db.transform; + +import org.apache.cassandra.db.rows.BaseRowIterator; +import org.apache.cassandra.db.rows.RowIterator; +import org.apache.cassandra.db.rows.UnfilteredRowIterator; + +import static org.apache.cassandra.db.transform.Transformation.add; +import static org.apache.cassandra.db.transform.Transformation.mutable; + +/** + * An interface for providing new row contents for a partition. + * + * The new contents are produced as a normal arbitrary RowIterator or UnfilteredRowIterator (as appropriate), + * with matching staticRow, partitionKey and partitionLevelDeletion. + * + * The transforming iterator invokes this method when any current source is exhausted, then then inserts the + * new contents as the new source. + * + * If the new source is itself a product of any transformations, the two transforming iterators are merged + * so that control flow always occurs at the outermost point + */ +public interface MoreRows<I extends BaseRowIterator<?>> extends MoreContents<I> +{ + + public static UnfilteredRowIterator extend(UnfilteredRowIterator iterator, MoreRows<? super UnfilteredRowIterator> more) + { + return add(mutable(iterator), more); + } + + public static RowIterator extend(RowIterator iterator, MoreRows<? 
super RowIterator> more) + { + return add(mutable(iterator), more); + } + +} + http://git-wip-us.apache.org/repos/asf/cassandra/blob/05bacc75/src/java/org/apache/cassandra/db/transform/Stack.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/db/transform/Stack.java index aac1679,0000000..f680ec9 mode 100644,000000..100644 --- a/src/java/org/apache/cassandra/db/transform/Stack.java +++ b/src/java/org/apache/cassandra/db/transform/Stack.java @@@ -1,81 -1,0 +1,101 @@@ ++/* ++ * ++ * Licensed to the Apache Software Foundation (ASF) under one ++ * or more contributor license agreements. See the NOTICE file ++ * distributed with this work for additional information ++ * regarding copyright ownership. The ASF licenses this file ++ * to you under the Apache License, Version 2.0 (the ++ * "License"); you may not use this file except in compliance ++ * with the License. You may obtain a copy of the License at ++ * ++ * http://www.apache.org/licenses/LICENSE-2.0 ++ * ++ * Unless required by applicable law or agreed to in writing, ++ * software distributed under the License is distributed on an ++ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY ++ * KIND, either express or implied. See the License for the ++ * specific language governing permissions and limitations ++ * under the License. 
++ * ++ */ +package org.apache.cassandra.db.transform; + +import java.util.Arrays; + +class Stack +{ + static final Stack EMPTY = new Stack(); + + Transformation[] stack; + int length; // number of used stack entries + MoreContentsHolder[] moreContents; // stack of more contents providers (if any; usually zero or one) + + // an internal placeholder for a MoreContents, storing the associated stack length at time it was applied + static class MoreContentsHolder + { + final MoreContents moreContents; + int length; + private MoreContentsHolder(MoreContents moreContents, int length) + { + this.moreContents = moreContents; + this.length = length; + } + } + + Stack() + { + stack = new Transformation[0]; + moreContents = new MoreContentsHolder[0]; + } + + Stack(Stack copy) + { + stack = copy.stack; + length = copy.length; + moreContents = copy.moreContents; + } + + void add(Transformation add) + { + if (length == stack.length) + stack = resize(stack); + stack[length++] = add; + } + + void add(MoreContents more) + { + this.moreContents = Arrays.copyOf(moreContents, moreContents.length + 1); + this.moreContents[moreContents.length - 1] = new MoreContentsHolder(more, length); + } + + private static <E> E[] resize(E[] array) + { + int newLen = array.length == 0 ? 
5 : array.length * 2; + return Arrays.copyOf(array, newLen); + } + + // reinitialise the transformations after a moreContents applies + void refill(Stack prefix, MoreContentsHolder holder, int index) + { + // drop the transformations that were present when the MoreContents was attached, + // and prefix any transformations in the new contents (if it's a transformer) + moreContents = splice(prefix.moreContents, prefix.moreContents.length, moreContents, index, moreContents.length); + stack = splice(prefix.stack, prefix.length, stack, holder.length, length); + length += prefix.length - holder.length; + holder.length = prefix.length; + } + + private static <E> E[] splice(E[] prefix, int prefixCount, E[] keep, int keepFrom, int keepTo) + { + int keepCount = keepTo - keepFrom; + int newCount = prefixCount + keepCount; + if (newCount > keep.length) + keep = Arrays.copyOf(keep, newCount); + if (keepFrom != prefixCount) + System.arraycopy(keep, keepFrom, keep, prefixCount, keepCount); + if (prefixCount != 0) + System.arraycopy(prefix, 0, keep, 0, prefixCount); + return keep; + } +} + http://git-wip-us.apache.org/repos/asf/cassandra/blob/05bacc75/src/java/org/apache/cassandra/db/transform/StoppingTransformation.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/db/transform/StoppingTransformation.java index f3afdc0,0000000..534091e mode 100644,000000..100644 --- a/src/java/org/apache/cassandra/db/transform/StoppingTransformation.java +++ b/src/java/org/apache/cassandra/db/transform/StoppingTransformation.java @@@ -1,60 -1,0 +1,80 @@@ ++/* ++ * ++ * Licensed to the Apache Software Foundation (ASF) under one ++ * or more contributor license agreements. See the NOTICE file ++ * distributed with this work for additional information ++ * regarding copyright ownership. 
The ASF licenses this file ++ * to you under the Apache License, Version 2.0 (the ++ * "License"); you may not use this file except in compliance ++ * with the License. You may obtain a copy of the License at ++ * ++ * http://www.apache.org/licenses/LICENSE-2.0 ++ * ++ * Unless required by applicable law or agreed to in writing, ++ * software distributed under the License is distributed on an ++ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY ++ * KIND, either express or implied. See the License for the ++ * specific language governing permissions and limitations ++ * under the License. ++ * ++ */ +package org.apache.cassandra.db.transform; + +import net.nicoulaj.compilecommand.annotations.DontInline; +import org.apache.cassandra.db.rows.BaseRowIterator; + +// A Transformation that can stop an iterator earlier than its natural exhaustion +public abstract class StoppingTransformation<I extends BaseRowIterator<?>> extends Transformation<I> +{ + private BaseIterator.Stop stop; + private BaseIterator.Stop stopInPartition; + + /** + * If invoked by a subclass, any partitions iterator this transformation has been applied to will terminate + * after any currently-processing item is returned, as will any row/unfiltered iterator + */ + @DontInline + protected void stop() + { + if (stop != null) + stop.isSignalled = true; + stopInPartition(); + } + + /** + * If invoked by a subclass, any rows/unfiltered iterator this transformation has been applied to will terminate + * after any currently-processing item is returned + */ + @DontInline + protected void stopInPartition() + { + if (stopInPartition != null) + stopInPartition.isSignalled = true; + } + + @Override + protected void attachTo(BasePartitions partitions) + { + assert this.stop == null; + this.stop = partitions.stop; + } + + @Override + protected void attachTo(BaseRows rows) + { + assert this.stopInPartition == null; + this.stopInPartition = rows.stop; + } + + @Override + protected void onClose() + { + stop = 
null; + } + + @Override + protected void onPartitionClose() + { + stopInPartition = null; + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/05bacc75/src/java/org/apache/cassandra/db/transform/Transformation.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/db/transform/Transformation.java index 29e2e15,0000000..6a31ece mode 100644,000000..100644 --- a/src/java/org/apache/cassandra/db/transform/Transformation.java +++ b/src/java/org/apache/cassandra/db/transform/Transformation.java @@@ -1,145 -1,0 +1,165 @@@ ++/* ++ * ++ * Licensed to the Apache Software Foundation (ASF) under one ++ * or more contributor license agreements. See the NOTICE file ++ * distributed with this work for additional information ++ * regarding copyright ownership. The ASF licenses this file ++ * to you under the Apache License, Version 2.0 (the ++ * "License"); you may not use this file except in compliance ++ * with the License. You may obtain a copy of the License at ++ * ++ * http://www.apache.org/licenses/LICENSE-2.0 ++ * ++ * Unless required by applicable law or agreed to in writing, ++ * software distributed under the License is distributed on an ++ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY ++ * KIND, either express or implied. See the License for the ++ * specific language governing permissions and limitations ++ * under the License. ++ * ++ */ +package org.apache.cassandra.db.transform; + +import org.apache.cassandra.db.DeletionTime; +import org.apache.cassandra.db.partitions.PartitionIterator; +import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator; +import org.apache.cassandra.db.rows.*; + +/** + * We have a single common superclass for all Transformations to make implementation efficient. + * we have a shared stack for all transformations, and can share the same transformation across partition and row + * iterators, reducing garbage. 
Internal code is also simplified by always having a basic no-op implementation to invoke. + * + * Only the necessary methods need be overridden. Early termination is provided by invoking the method's stop or stopInPartition + * methods, rather than having their own abstract method to invoke, as this is both more efficient and simpler to reason about. + */ +public abstract class Transformation<I extends BaseRowIterator<?>> +{ + // internal methods for StoppableTransformation only + void attachTo(BasePartitions partitions) { } + void attachTo(BaseRows rows) { } + + /** + * Run on the close of any (logical) partitions iterator this function was applied to + * + * We stipulate logical, because if applied to a transformed iterator the lifetime of the iterator + * object may be longer than the lifetime of the "logical" iterator it was applied to; if the iterator + * is refilled with MoreContents, for instance, the iterator may outlive this function + */ + protected void onClose() { } + + /** + * Run on the close of any (logical) rows iterator this function was applied to + * + * We stipulate logical, because if applied to a transformed iterator the lifetime of the iterator + * object may be longer than the lifetime of the "logical" iterator it was applied to; if the iterator + * is refilled with MoreContents, for instance, the iterator may outlive this function + */ + protected void onPartitionClose() { } + + /** + * Applied to any rows iterator (partition) we encounter in a partitions iterator + */ + protected I applyToPartition(I partition) + { + return partition; + } + + /** + * Applied to any row we encounter in a rows iterator + */ + protected Row applyToRow(Row row) + { + return row; + } + + /** + * Applied to any RTM we encounter in a rows/unfiltered iterator + */ + protected RangeTombstoneMarker applyToMarker(RangeTombstoneMarker marker) + { + return marker; + } + + /** + * Applied to the static row of any rows iterator. 
+ * + * NOTE that this is only applied to the first iterator in any sequence of iterators filled by a MoreContents; + * the static data for such iterators is all expected to be equal + */ + protected Row applyToStatic(Row row) + { + return row; + } + + /** + * Applied to the partition-level deletion of any rows iterator. + * + * NOTE that this is only applied to the first iterator in any sequence of iterators filled by a MoreContents; + * the static data for such iterators is all expected to be equal + */ + protected DeletionTime applyToDeletion(DeletionTime deletionTime) + { + return deletionTime; + } + + + //****************************************************** + // Static Application Methods + //****************************************************** + + + public static UnfilteredPartitionIterator apply(UnfilteredPartitionIterator iterator, Transformation<? super UnfilteredRowIterator> transformation) + { + return add(mutable(iterator), transformation); + } + public static PartitionIterator apply(PartitionIterator iterator, Transformation<? super RowIterator> transformation) + { + return add(mutable(iterator), transformation); + } + public static UnfilteredRowIterator apply(UnfilteredRowIterator iterator, Transformation<?> transformation) + { + return add(mutable(iterator), transformation); + } + public static RowIterator apply(RowIterator iterator, Transformation<?> transformation) + { + return add(mutable(iterator), transformation); + } + + static UnfilteredPartitions mutable(UnfilteredPartitionIterator iterator) + { + return iterator instanceof UnfilteredPartitions + ? (UnfilteredPartitions) iterator + : new UnfilteredPartitions(iterator); + } + static FilteredPartitions mutable(PartitionIterator iterator) + { + return iterator instanceof FilteredPartitions + ? (FilteredPartitions) iterator + : new FilteredPartitions(iterator); + } + static UnfilteredRows mutable(UnfilteredRowIterator iterator) + { + return iterator instanceof UnfilteredRows + ? 
(UnfilteredRows) iterator + : new UnfilteredRows(iterator); + } + static FilteredRows mutable(RowIterator iterator) + { + return iterator instanceof FilteredRows + ? (FilteredRows) iterator + : new FilteredRows(iterator); + } + + static <E extends BaseIterator> E add(E to, Transformation add) + { + to.add(add); + return to; + } + static <E extends BaseIterator> E add(E to, MoreContents add) + { + to.add(add); + return to; + } + +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/05bacc75/src/java/org/apache/cassandra/db/transform/UnfilteredPartitions.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/db/transform/UnfilteredPartitions.java index 4e40545,0000000..bad14ad mode 100644,000000..100644 --- a/src/java/org/apache/cassandra/db/transform/UnfilteredPartitions.java +++ b/src/java/org/apache/cassandra/db/transform/UnfilteredPartitions.java @@@ -1,27 -1,0 +1,47 @@@ ++/* ++ * ++ * Licensed to the Apache Software Foundation (ASF) under one ++ * or more contributor license agreements. See the NOTICE file ++ * distributed with this work for additional information ++ * regarding copyright ownership. The ASF licenses this file ++ * to you under the Apache License, Version 2.0 (the ++ * "License"); you may not use this file except in compliance ++ * with the License. You may obtain a copy of the License at ++ * ++ * http://www.apache.org/licenses/LICENSE-2.0 ++ * ++ * Unless required by applicable law or agreed to in writing, ++ * software distributed under the License is distributed on an ++ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY ++ * KIND, either express or implied. See the License for the ++ * specific language governing permissions and limitations ++ * under the License. 
++ * ++ */ +package org.apache.cassandra.db.transform; + +import org.apache.cassandra.config.CFMetaData; +import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator; +import org.apache.cassandra.db.rows.UnfilteredRowIterator; + +final class UnfilteredPartitions extends BasePartitions<UnfilteredRowIterator, UnfilteredPartitionIterator> implements UnfilteredPartitionIterator +{ + final boolean isForThrift; + + // wrap an iterator for transformation + public UnfilteredPartitions(UnfilteredPartitionIterator input) + { + super(input); + this.isForThrift = input.isForThrift(); + } + + public boolean isForThrift() + { + return isForThrift; + } + + public CFMetaData metadata() + { + return input.metadata(); + } +}