http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/service/paxos/PaxosState.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/paxos/PaxosState.java b/src/java/org/apache/cassandra/service/paxos/PaxosState.java index ee1ba6a..cf7f3d3 100644 --- a/src/java/org/apache/cassandra/service/paxos/PaxosState.java +++ b/src/java/org/apache/cassandra/service/paxos/PaxosState.java @@ -27,7 +27,7 @@ import com.google.common.base.Throwables; import com.google.common.util.concurrent.Striped; import com.google.common.util.concurrent.Uninterruptibles; -import org.apache.cassandra.config.CFMetaData; +import org.apache.cassandra.schema.TableMetadata; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.db.*; import org.apache.cassandra.tracing.Tracing; @@ -41,7 +41,7 @@ public class PaxosState private final Commit accepted; private final Commit mostRecentCommit; - public PaxosState(DecoratedKey key, CFMetaData metadata) + public PaxosState(DecoratedKey key, TableMetadata metadata) { this(Commit.emptyCommit(key, metadata), Commit.emptyCommit(key, metadata), Commit.emptyCommit(key, metadata)); } @@ -92,7 +92,7 @@ public class PaxosState } finally { - Keyspace.open(toPrepare.update.metadata().ksName).getColumnFamilyStore(toPrepare.update.metadata().cfId).metric.casPrepare.addNano(System.nanoTime() - start); + Keyspace.open(toPrepare.update.metadata().keyspace).getColumnFamilyStore(toPrepare.update.metadata().id).metric.casPrepare.addNano(System.nanoTime() - start); } } @@ -127,7 +127,7 @@ public class PaxosState } finally { - Keyspace.open(proposal.update.metadata().ksName).getColumnFamilyStore(proposal.update.metadata().cfId).metric.casPropose.addNano(System.nanoTime() - start); + Keyspace.open(proposal.update.metadata().keyspace).getColumnFamilyStore(proposal.update.metadata().id).metric.casPropose.addNano(System.nanoTime() - start); } } @@ -143,7 +143,7 @@ public class PaxosState // erase the in-progress update. // The table may have been truncated since the proposal was initiated. In that case, we // don't want to perform the mutation and potentially resurrect truncated data - if (UUIDGen.unixTimestamp(proposal.ballot) >= SystemKeyspace.getTruncatedAt(proposal.update.metadata().cfId)) + if (UUIDGen.unixTimestamp(proposal.ballot) >= SystemKeyspace.getTruncatedAt(proposal.update.metadata().id)) { Tracing.trace("Committing proposal {}", proposal); Mutation mutation = proposal.makeMutation(); @@ -158,7 +158,7 @@ public class PaxosState } finally { - Keyspace.open(proposal.update.metadata().ksName).getColumnFamilyStore(proposal.update.metadata().cfId).metric.casCommit.addNano(System.nanoTime() - start); + Keyspace.open(proposal.update.metadata().keyspace).getColumnFamilyStore(proposal.update.metadata().id).metric.casCommit.addNano(System.nanoTime() - start); } } }
http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/service/paxos/PrepareCallback.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/paxos/PrepareCallback.java b/src/java/org/apache/cassandra/service/paxos/PrepareCallback.java index 5915eab..381c498 100644 --- a/src/java/org/apache/cassandra/service/paxos/PrepareCallback.java +++ b/src/java/org/apache/cassandra/service/paxos/PrepareCallback.java @@ -28,12 +28,13 @@ import java.util.concurrent.ConcurrentHashMap; import com.google.common.base.Predicate; import com.google.common.collect.Iterables; + +import org.apache.cassandra.schema.TableMetadata; import org.apache.cassandra.db.ConsistencyLevel; import org.apache.cassandra.db.DecoratedKey; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.apache.cassandra.config.CFMetaData; import org.apache.cassandra.db.SystemKeyspace; import org.apache.cassandra.net.MessageIn; import org.apache.cassandra.utils.UUIDGen; @@ -49,7 +50,7 @@ public class PrepareCallback extends AbstractPaxosCallback<PrepareResponse> private final Map<InetAddress, Commit> commitsByReplica = new ConcurrentHashMap<InetAddress, Commit>(); - public PrepareCallback(DecoratedKey key, CFMetaData metadata, int targets, ConsistencyLevel consistency, long queryStartNanoTime) + public PrepareCallback(DecoratedKey key, TableMetadata metadata, int targets, ConsistencyLevel consistency, long queryStartNanoTime) { super(targets, consistency, queryStartNanoTime); // need to inject the right key in the empty commit so comparing with empty commits in the reply works as expected @@ -89,7 +90,7 @@ public class PrepareCallback extends AbstractPaxosCallback<PrepareResponse> latch.countDown(); } - public Iterable<InetAddress> replicasMissingMostRecentCommit(CFMetaData metadata, int nowInSec) + public Iterable<InetAddress> replicasMissingMostRecentCommit(TableMetadata metadata, int nowInSec) { // In general, we need every replicas that have answered to the prepare (a quorum) to agree on the MRC (see // coment in StorageProxy.beginAndRepairPaxos(), but basically we need to make sure at least a quorum of nodes http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/streaming/StreamReader.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/streaming/StreamReader.java b/src/java/org/apache/cassandra/streaming/StreamReader.java index fab9372..0eee6f0 100644 --- a/src/java/org/apache/cassandra/streaming/StreamReader.java +++ b/src/java/org/apache/cassandra/streaming/StreamReader.java @@ -31,8 +31,8 @@ import org.slf4j.LoggerFactory; import com.ning.compress.lzf.LZFInputStream; -import org.apache.cassandra.config.CFMetaData; -import org.apache.cassandra.config.Schema; +import org.apache.cassandra.schema.TableId; +import org.apache.cassandra.schema.TableMetadata; import org.apache.cassandra.db.*; import org.apache.cassandra.db.rows.*; import org.apache.cassandra.io.sstable.SSTableMultiWriter; @@ -42,22 +42,19 @@ import org.apache.cassandra.io.sstable.format.SSTableFormat; import org.apache.cassandra.io.sstable.format.Version; import org.apache.cassandra.io.util.RewindableDataInputStreamPlus; import org.apache.cassandra.io.util.DataInputPlus; -import org.apache.cassandra.net.MessagingService; import org.apache.cassandra.streaming.messages.FileMessageHeader; import org.apache.cassandra.utils.ByteBufferUtil; import org.apache.cassandra.io.util.TrackedInputStream; import org.apache.cassandra.utils.FBUtilities; import org.apache.cassandra.utils.Pair; -import static org.apache.cassandra.utils.Throwables.extractIOExceptionCause; - /** * StreamReader reads from stream and writes to SSTable. */ public class StreamReader { private static final Logger logger = LoggerFactory.getLogger(StreamReader.class); - protected final UUID cfId; + protected final TableId tableId; protected final long estimatedKeys; protected final Collection<Pair<Long, Long>> sections; protected final StreamSession session; @@ -71,7 +68,7 @@ public class StreamReader public StreamReader(FileMessageHeader header, StreamSession session) { this.session = session; - this.cfId = header.cfId; + this.tableId = header.tableId; this.estimatedKeys = header.estimatedKeys; this.sections = header.sections; this.inputVersion = header.version; @@ -92,15 +89,11 @@ public class StreamReader { long totalSize = totalSize(); - Pair<String, String> kscf = Schema.instance.getCF(cfId); - ColumnFamilyStore cfs = null; - if (kscf != null) - cfs = Keyspace.open(kscf.left).getColumnFamilyStore(kscf.right); - - if (kscf == null || cfs == null) + ColumnFamilyStore cfs = ColumnFamilyStore.getIfExists(tableId); + if (cfs == null) { // schema was dropped during streaming - throw new IOException("CF " + cfId + " was dropped during streaming"); + throw new IOException("CF " + tableId + " was dropped during streaming"); } logger.debug("[Stream #{}] Start receiving file #{} from {}, repairedAt = {}, size = {}, ks = '{}', table = '{}'.", @@ -108,7 +101,7 @@ public class StreamReader cfs.getColumnFamilyName()); TrackedInputStream in = new TrackedInputStream(new LZFInputStream(Channels.newInputStream(channel))); - StreamDeserializer deserializer = new StreamDeserializer(cfs.metadata, in, inputVersion, getHeader(cfs.metadata), + StreamDeserializer deserializer = new StreamDeserializer(cfs.metadata(), in, inputVersion, getHeader(cfs.metadata()), totalSize, session.planId()); SSTableMultiWriter writer = null; try @@ -142,7 +135,7 @@ public class StreamReader } } - protected SerializationHeader getHeader(CFMetaData metadata) + protected SerializationHeader getHeader(TableMetadata metadata) { return header != null? header.toHeader(metadata) : null; //pre-3.0 sstable have no SerializationHeader } @@ -153,7 +146,7 @@ public class StreamReader if (localDir == null) throw new IOException(String.format("Insufficient disk space to store %s", FBUtilities.prettyPrintMemory(totalSize))); - RangeAwareSSTableWriter writer = new RangeAwareSSTableWriter(cfs, estimatedKeys, repairedAt, format, sstableLevel, totalSize, session.getTransaction(cfId), getHeader(cfs.metadata)); + RangeAwareSSTableWriter writer = new RangeAwareSSTableWriter(cfs, estimatedKeys, repairedAt, format, sstableLevel, totalSize, session.getTransaction(tableId), getHeader(cfs.metadata())); StreamHook.instance.reportIncomingFile(cfs, writer, session, fileSeqNum); return writer; } @@ -181,7 +174,7 @@ public class StreamReader public static final String BUFFER_FILE_PREFIX = "buf"; public static final String BUFFER_FILE_SUFFIX = "dat"; - private final CFMetaData metadata; + private final TableMetadata metadata; private final DataInputPlus in; private final SerializationHeader header; private final SerializationHelper helper; @@ -192,7 +185,7 @@ public class StreamReader private Row staticRow; private IOException exception; - public StreamDeserializer(CFMetaData metadata, InputStream in, Version version, SerializationHeader header, + public StreamDeserializer(TableMetadata metadata, InputStream in, Version version, SerializationHeader header, long totalSize, UUID sessionId) throws IOException { this.metadata = metadata; @@ -203,22 +196,22 @@ public class StreamReader public StreamDeserializer newPartition() throws IOException { - key = metadata.decorateKey(ByteBufferUtil.readWithShortLength(in)); + key = metadata.partitioner.decorateKey(ByteBufferUtil.readWithShortLength(in)); partitionLevelDeletion = DeletionTime.serializer.deserialize(in); iterator = SSTableSimpleIterator.create(metadata, in, header, helper, partitionLevelDeletion); staticRow = iterator.readStaticRow(); return this; } - public CFMetaData metadata() + public TableMetadata metadata() { return metadata; } - public PartitionColumns columns() + public RegularAndStaticColumns columns() { // We don't know which columns we'll get so assume it can be all of them - return metadata.partitionColumns(); + return metadata.regularAndStaticColumns(); } public boolean isReverseOrder() @@ -308,13 +301,13 @@ public class StreamReader } } - private static File getTempBufferFile(CFMetaData metadata, long totalSize, UUID sessionId) throws IOException + private static File getTempBufferFile(TableMetadata metadata, long totalSize, UUID sessionId) throws IOException { - ColumnFamilyStore cfs = Keyspace.open(metadata.ksName).getColumnFamilyStore(metadata.cfName); + ColumnFamilyStore cfs = Keyspace.open(metadata.keyspace).getColumnFamilyStore(metadata.name); if (cfs == null) { // schema was dropped during streaming - throw new RuntimeException(String.format("CF %s.%s was dropped during streaming", metadata.ksName, metadata.cfName)); + throw new RuntimeException(String.format("Table %s was dropped during streaming", metadata.toString())); } long maxSize = Math.min(MAX_SPILL_FILE_SIZE, totalSize); http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java b/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java index 6c60b74..d0c4d50 100644 --- a/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java +++ b/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java @@ -21,7 +21,6 @@ import java.util.ArrayList; import java.util.Collection; import java.util.List; import java.util.Set; -import java.util.UUID; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -30,7 +29,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.cassandra.concurrent.NamedThreadFactory; -import org.apache.cassandra.config.Schema; import org.apache.cassandra.db.ColumnFamilyStore; import org.apache.cassandra.db.Keyspace; import org.apache.cassandra.db.Mutation; @@ -45,8 +43,8 @@ import org.apache.cassandra.dht.Token; import org.apache.cassandra.io.sstable.ISSTableScanner; import org.apache.cassandra.io.sstable.SSTableMultiWriter; import org.apache.cassandra.io.sstable.format.SSTableReader; +import org.apache.cassandra.schema.TableId; import org.apache.cassandra.utils.JVMStabilityInspector; -import org.apache.cassandra.utils.Pair; import org.apache.cassandra.utils.Throwables; import org.apache.cassandra.utils.concurrent.Refs; @@ -75,9 +73,9 @@ public class StreamReceiveTask extends StreamTask private int remoteSSTablesReceived = 0; - public StreamReceiveTask(StreamSession session, UUID cfId, int totalFiles, long totalSize) + public StreamReceiveTask(StreamSession session, TableId tableId, int totalFiles, long totalSize) { - super(session, cfId); + super(session, tableId); this.totalFiles = totalFiles; this.totalSize = totalSize; // this is an "offline" transaction, as we currently manually expose the sstables once done; @@ -102,7 +100,7 @@ public class StreamReceiveTask extends StreamTask } remoteSSTablesReceived++; - assert cfId.equals(sstable.getCfId()); + assert tableId.equals(sstable.getTableId()); Collection<SSTableReader> finished = null; try @@ -136,7 +134,7 @@ public class StreamReceiveTask extends StreamTask public synchronized LifecycleTransaction getTransaction() { if (done) - throw new RuntimeException(String.format("Stream receive task %s of cf %s already finished.", session.planId(), cfId)); + throw new RuntimeException(String.format("Stream receive task %s of cf %s already finished.", session.planId(), tableId)); return txn; } @@ -156,8 +154,8 @@ public class StreamReceiveTask extends StreamTask ColumnFamilyStore cfs = null; try { - Pair<String, String> kscf = Schema.instance.getCF(task.cfId); - if (kscf == null) + cfs = ColumnFamilyStore.getIfExists(task.tableId); + if (cfs == null) { // schema was dropped during streaming task.sstables.clear(); @@ -165,9 +163,8 @@ public class StreamReceiveTask extends StreamTask task.session.taskCompleted(task); return; } - cfs = Keyspace.open(kscf.left).getColumnFamilyStore(kscf.right); - hasViews = !Iterables.isEmpty(View.findAll(kscf.left, kscf.right)); - hasCDC = cfs.metadata.params.cdc; + hasViews = !Iterables.isEmpty(View.findAll(cfs.metadata.keyspace, cfs.getTableName())); + hasCDC = cfs.metadata().params.cdc; Collection<SSTableReader> readers = task.sstables; @@ -193,7 +190,7 @@ public class StreamReceiveTask extends StreamTask { try (UnfilteredRowIterator rowIterator = scanner.next()) { - Mutation m = new Mutation(PartitionUpdate.fromIterator(rowIterator, ColumnFilter.all(cfs.metadata))); + Mutation m = new Mutation(PartitionUpdate.fromIterator(rowIterator, ColumnFilter.all(cfs.metadata()))); // MV *can* be applied unsafe if there's no CDC on the CFS as we flush below // before transaction is done. @@ -215,7 +212,7 @@ public class StreamReceiveTask extends StreamTask cfs.indexManager.buildAllIndexesBlocking(readers); //invalidate row and counter cache - if (cfs.isRowCacheEnabled() || cfs.metadata.isCounter()) + if (cfs.isRowCacheEnabled() || cfs.metadata().isCounter()) { List<Bounds<Token>> boundsToInvalidate = new ArrayList<>(readers.size()); readers.forEach(sstable -> boundsToInvalidate.add(new Bounds<Token>(sstable.first.getToken(), sstable.last.getToken()))); @@ -230,7 +227,7 @@ public class StreamReceiveTask extends StreamTask cfs.keyspace.getName(), cfs.getTableName()); } - if (cfs.metadata.isCounter()) + if (cfs.metadata().isCounter()) { int invalidatedKeys = cfs.invalidateCounterCache(nonOverlappingBounds); if (invalidatedKeys > 0) http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/streaming/StreamSession.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/streaming/StreamSession.java b/src/java/org/apache/cassandra/streaming/StreamSession.java index 736d30f..faa05d1 100644 --- a/src/java/org/apache/cassandra/streaming/StreamSession.java +++ b/src/java/org/apache/cassandra/streaming/StreamSession.java @@ -45,6 +45,7 @@ import org.apache.cassandra.dht.Range; import org.apache.cassandra.dht.Token; import org.apache.cassandra.gms.*; import org.apache.cassandra.metrics.StreamingMetrics; +import org.apache.cassandra.schema.TableId; import org.apache.cassandra.service.ActiveRepairService; import org.apache.cassandra.streaming.messages.*; import org.apache.cassandra.utils.CassandraVersion; @@ -147,9 +148,9 @@ public class StreamSession implements IEndpointStateChangeSubscriber protected final Set<StreamRequest> requests = Sets.newConcurrentHashSet(); // streaming tasks are created and managed per ColumnFamily ID @VisibleForTesting - protected final ConcurrentHashMap<UUID, StreamTransferTask> transfers = new ConcurrentHashMap<>(); + protected final ConcurrentHashMap<TableId, StreamTransferTask> transfers = new ConcurrentHashMap<>(); // data receivers, filled after receiving prepare message - private final Map<UUID, StreamReceiveTask> receivers = new ConcurrentHashMap<>(); + private final Map<TableId, StreamReceiveTask> receivers = new ConcurrentHashMap<>(); private final StreamingMetrics metrics; /* can be null when session is created in remote */ private final StreamConnectionFactory factory; @@ -223,10 +224,10 @@ public class StreamSession implements IEndpointStateChangeSubscriber } - public LifecycleTransaction getTransaction(UUID cfId) + public LifecycleTransaction getTransaction(TableId tableId) { - assert receivers.containsKey(cfId); - return receivers.get(cfId).getTransaction(); + assert receivers.containsKey(tableId); + return receivers.get(tableId).getTransaction(); } private boolean isKeepAliveSupported() @@ -424,13 +425,13 @@ public class StreamSession implements IEndpointStateChangeSubscriber continue; } - UUID cfId = details.ref.get().metadata.cfId; - StreamTransferTask task = transfers.get(cfId); + TableId tableId = details.ref.get().metadata().id; + StreamTransferTask task = transfers.get(tableId); if (task == null) { //guarantee atomicity - StreamTransferTask newTask = new StreamTransferTask(this, cfId); - task = transfers.putIfAbsent(cfId, newTask); + StreamTransferTask newTask = new StreamTransferTask(this, tableId); + task = transfers.putIfAbsent(tableId, newTask); if (task == null) task = newTask; } @@ -525,7 +526,7 @@ public class StreamSession implements IEndpointStateChangeSubscriber case RECEIVED: ReceivedMessage received = (ReceivedMessage) message; - received(received.cfId, received.sequenceNumber); + received(received.tableId, received.sequenceNumber); break; case COMPLETE: @@ -634,7 +635,7 @@ public class StreamSession implements IEndpointStateChangeSubscriber StreamingMetrics.totalOutgoingBytes.inc(headerSize); metrics.outgoingBytes.inc(headerSize); // schedule timeout for receiving ACK - StreamTransferTask task = transfers.get(header.cfId); + StreamTransferTask task = transfers.get(header.tableId); if (task != null) { task.scheduleTimeout(header.sequenceNumber, 12, TimeUnit.HOURS); @@ -652,8 +653,8 @@ public class StreamSession implements IEndpointStateChangeSubscriber StreamingMetrics.totalIncomingBytes.inc(headerSize); metrics.incomingBytes.inc(headerSize); // send back file received message - handler.sendMessage(new ReceivedMessage(message.header.cfId, message.header.sequenceNumber)); - receivers.get(message.header.cfId).received(message.sstable); + handler.sendMessage(new ReceivedMessage(message.header.tableId, message.header.sequenceNumber)); + receivers.get(message.header.tableId).received(message.sstable); } public void progress(String filename, ProgressInfo.Direction direction, long bytes, long total) @@ -662,9 +663,9 @@ public class StreamSession implements IEndpointStateChangeSubscriber streamResult.handleProgress(progress); } - public void received(UUID cfId, int sequenceNumber) + public void received(TableId tableId, int sequenceNumber) { - transfers.get(cfId).complete(sequenceNumber); + transfers.get(tableId).complete(sequenceNumber); } /** @@ -723,13 +724,13 @@ public class StreamSession implements IEndpointStateChangeSubscriber public synchronized void taskCompleted(StreamReceiveTask completedTask) { - receivers.remove(completedTask.cfId); + receivers.remove(completedTask.tableId); maybeCompleted(); } public synchronized void taskCompleted(StreamTransferTask completedTask) { - transfers.remove(completedTask.cfId); + transfers.remove(completedTask.tableId); maybeCompleted(); } @@ -793,7 +794,7 @@ public class StreamSession implements IEndpointStateChangeSubscriber { failIfFinished(); if (summary.files > 0) - receivers.put(summary.cfId, new StreamReceiveTask(this, summary.cfId, summary.files, summary.totalSize)); + receivers.put(summary.tableId, new StreamReceiveTask(this, summary.tableId, summary.files, summary.totalSize)); } private void startStreamingFiles() http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/streaming/StreamSummary.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/streaming/StreamSummary.java b/src/java/org/apache/cassandra/streaming/StreamSummary.java index c427283..0d94f57 100644 --- a/src/java/org/apache/cassandra/streaming/StreamSummary.java +++ b/src/java/org/apache/cassandra/streaming/StreamSummary.java @@ -28,6 +28,7 @@ import org.apache.cassandra.io.IVersionedSerializer; import org.apache.cassandra.io.util.DataInputPlus; import org.apache.cassandra.io.util.DataOutputPlus; import org.apache.cassandra.net.MessagingService; +import org.apache.cassandra.schema.TableId; import org.apache.cassandra.utils.UUIDSerializer; /** @@ -37,7 +38,7 @@ public class StreamSummary implements Serializable { public static final IVersionedSerializer<StreamSummary> serializer = new StreamSummarySerializer(); - public final UUID cfId; + public final TableId tableId; /** * Number of files to transfer. Can be 0 if nothing to transfer for some streaming request. @@ -45,9 +46,9 @@ public class StreamSummary implements Serializable public final int files; public final long totalSize; - public StreamSummary(UUID cfId, int files, long totalSize) + public StreamSummary(TableId tableId, int files, long totalSize) { - this.cfId = cfId; + this.tableId = tableId; this.files = files; this.totalSize = totalSize; } @@ -58,20 +59,20 @@ public class StreamSummary implements Serializable if (this == o) return true; if (o == null || getClass() != o.getClass()) return false; StreamSummary summary = (StreamSummary) o; - return files == summary.files && totalSize == summary.totalSize && cfId.equals(summary.cfId); + return files == summary.files && totalSize == summary.totalSize && tableId.equals(summary.tableId); } @Override public int hashCode() { - return Objects.hashCode(cfId, files, totalSize); + return Objects.hashCode(tableId, files, totalSize); } @Override public String toString() { final StringBuilder sb = new StringBuilder("StreamSummary{"); - sb.append("path=").append(cfId); + sb.append("path=").append(tableId); sb.append(", files=").append(files); sb.append(", totalSize=").append(totalSize); sb.append('}'); @@ -80,25 +81,24 @@ public class StreamSummary implements Serializable public static class StreamSummarySerializer implements IVersionedSerializer<StreamSummary> { - // arbitrary version is fine for UUIDSerializer for now... public void serialize(StreamSummary summary, DataOutputPlus out, int version) throws IOException { - UUIDSerializer.serializer.serialize(summary.cfId, out, MessagingService.current_version); + summary.tableId.serialize(out); out.writeInt(summary.files); out.writeLong(summary.totalSize); } public StreamSummary deserialize(DataInputPlus in, int version) throws IOException { - UUID cfId = UUIDSerializer.serializer.deserialize(in, MessagingService.current_version); + TableId tableId = TableId.deserialize(in); int files = in.readInt(); long totalSize = in.readLong(); - return new StreamSummary(cfId, files, totalSize); + return new StreamSummary(tableId, files, totalSize); } public long serializedSize(StreamSummary summary, int version) { - long size = UUIDSerializer.serializer.serializedSize(summary.cfId, MessagingService.current_version); + long size = summary.tableId.serializedSize(); size += TypeSizes.sizeof(summary.files); size += TypeSizes.sizeof(summary.totalSize); return size; http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/streaming/StreamTask.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/streaming/StreamTask.java b/src/java/org/apache/cassandra/streaming/StreamTask.java index ac72cff..1e22c34 100644 --- a/src/java/org/apache/cassandra/streaming/StreamTask.java +++ b/src/java/org/apache/cassandra/streaming/StreamTask.java @@ -17,7 +17,7 @@ */ package org.apache.cassandra.streaming; -import java.util.UUID; +import org.apache.cassandra.schema.TableId; /** * StreamTask is an abstraction of the streaming task performed over specific ColumnFamily. @@ -27,12 +27,12 @@ public abstract class StreamTask /** StreamSession that this task belongs */ protected final StreamSession session; - protected final UUID cfId; + protected final TableId tableId; - protected StreamTask(StreamSession session, UUID cfId) + protected StreamTask(StreamSession session, TableId tableId) { this.session = session; - this.cfId = cfId; + this.tableId = tableId; } /** @@ -56,6 +56,6 @@ public abstract class StreamTask */ public StreamSummary getSummary() { - return new StreamSummary(cfId, getTotalNumberOfFiles(), getTotalSize()); + return new StreamSummary(tableId, getTotalNumberOfFiles(), getTotalSize()); } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/streaming/StreamTransferTask.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/streaming/StreamTransferTask.java b/src/java/org/apache/cassandra/streaming/StreamTransferTask.java index 4f313c3..aa3251b 100644 --- a/src/java/org/apache/cassandra/streaming/StreamTransferTask.java +++ b/src/java/org/apache/cassandra/streaming/StreamTransferTask.java @@ -27,6 +27,7 @@ import com.google.common.base.Throwables; import org.apache.cassandra.concurrent.NamedThreadFactory; import org.apache.cassandra.io.sstable.format.SSTableReader; +import org.apache.cassandra.schema.TableId; import org.apache.cassandra.streaming.messages.OutgoingFileMessage; import org.apache.cassandra.utils.Pair; import org.apache.cassandra.utils.concurrent.Ref; @@ -47,14 +48,14 @@ public class StreamTransferTask extends StreamTask private long totalSize; - public StreamTransferTask(StreamSession session, UUID cfId) + public StreamTransferTask(StreamSession session, TableId tableId) { - super(session, cfId); + super(session, tableId); } public synchronized void addTransferFile(Ref<SSTableReader> ref, long estimatedKeys, List<Pair<Long, Long>> sections, long repairedAt) { - assert ref.get() != null && cfId.equals(ref.get().metadata.cfId); + assert ref.get() != null && tableId.equals(ref.get().metadata().id); OutgoingFileMessage message = new OutgoingFileMessage(ref, sequenceNumber.getAndIncrement(), estimatedKeys, sections, repairedAt, session.keepSSTableLevel()); message = StreamHook.instance.reportOutgoingFile(session, ref.get(), message); files.put(message.header.sequenceNumber, message); http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java b/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java index 2044d4d..6ac607f 100644 --- a/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java +++ b/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java @@ -25,9 +25,7 @@ import com.google.common.base.Throwables; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.apache.cassandra.config.Schema; import org.apache.cassandra.db.ColumnFamilyStore; -import org.apache.cassandra.db.Keyspace; import org.apache.cassandra.io.compress.CompressionMetadata; import org.apache.cassandra.io.sstable.SSTableMultiWriter; import org.apache.cassandra.io.util.TrackedInputStream; @@ -66,15 +64,12 @@ public class CompressedStreamReader extends StreamReader { long totalSize = totalSize(); - Pair<String, String> kscf = Schema.instance.getCF(cfId); - ColumnFamilyStore cfs = null; - if (kscf != null) - cfs = Keyspace.open(kscf.left).getColumnFamilyStore(kscf.right); + ColumnFamilyStore cfs = ColumnFamilyStore.getIfExists(tableId); - if (kscf == null || cfs == null) + if (cfs == null) { // schema was dropped during streaming - throw new IOException("CF " + cfId + " was dropped during streaming"); + throw new IOException("CF " + tableId + " was dropped during streaming"); } logger.debug("[Stream #{}] Start receiving file #{} from {}, repairedAt = {}, size = {}, ks = '{}', table = '{}'.", @@ -85,7 +80,7 @@ public class CompressedStreamReader extends StreamReader ChecksumType.CRC32, cfs::getCrcCheckChance); TrackedInputStream in = new TrackedInputStream(cis); - StreamDeserializer deserializer = new StreamDeserializer(cfs.metadata, in, inputVersion, getHeader(cfs.metadata), + StreamDeserializer deserializer = new StreamDeserializer(cfs.metadata(), in, inputVersion, getHeader(cfs.metadata()), totalSize, session.planId()); SSTableMultiWriter writer = null; try http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/streaming/management/StreamSummaryCompositeData.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/streaming/management/StreamSummaryCompositeData.java b/src/java/org/apache/cassandra/streaming/management/StreamSummaryCompositeData.java index 9ef23ab..a1f2496 100644 --- a/src/java/org/apache/cassandra/streaming/management/StreamSummaryCompositeData.java +++ b/src/java/org/apache/cassandra/streaming/management/StreamSummaryCompositeData.java @@ -19,18 +19,18 @@ package org.apache.cassandra.streaming.management; import java.util.HashMap; import java.util.Map; -import java.util.UUID; import javax.management.openmbean.*; import com.google.common.base.Throwables; +import org.apache.cassandra.schema.TableId; import org.apache.cassandra.streaming.StreamSummary; /** */ public class StreamSummaryCompositeData { - private static final String[] ITEM_NAMES = new String[]{"cfId", + private static final String[] ITEM_NAMES = new String[]{"tableId", "files", "totalSize"}; private static final String[] ITEM_DESCS = new String[]{"ColumnFamilu ID", @@ -60,7 +60,7 @@ public class StreamSummaryCompositeData public static CompositeData toCompositeData(StreamSummary streamSummary) { Map<String, Object> valueMap = new HashMap<>(); - valueMap.put(ITEM_NAMES[0], streamSummary.cfId.toString()); + valueMap.put(ITEM_NAMES[0], streamSummary.tableId.toString()); valueMap.put(ITEM_NAMES[1], streamSummary.files); valueMap.put(ITEM_NAMES[2], streamSummary.totalSize); try @@ -76,7 +76,7 @@ public class StreamSummaryCompositeData public static StreamSummary fromCompositeData(CompositeData cd) { Object[] values = cd.getAll(ITEM_NAMES); - return new StreamSummary(UUID.fromString((String) values[0]), + return new StreamSummary(TableId.fromString((String) values[0]), (int) values[1], (long) values[2]); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/streaming/messages/FileMessageHeader.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/streaming/messages/FileMessageHeader.java b/src/java/org/apache/cassandra/streaming/messages/FileMessageHeader.java index b0639ea..a37420b 100644 --- a/src/java/org/apache/cassandra/streaming/messages/FileMessageHeader.java +++ b/src/java/org/apache/cassandra/streaming/messages/FileMessageHeader.java @@ -20,7 +20,6 @@ package org.apache.cassandra.streaming.messages; import java.io.IOException; import java.util.ArrayList; import java.util.List; -import java.util.UUID; import org.apache.cassandra.db.SerializationHeader; import org.apache.cassandra.db.TypeSizes; @@ -30,9 +29,9 @@ import org.apache.cassandra.io.util.DataInputPlus; import org.apache.cassandra.io.util.DataOutputPlus; import org.apache.cassandra.io.sstable.format.Version; import org.apache.cassandra.net.MessagingService; +import org.apache.cassandra.schema.TableId; import org.apache.cassandra.streaming.compress.CompressionInfo; import org.apache.cassandra.utils.Pair; -import org.apache.cassandra.utils.UUIDSerializer; /** * StreamingFileHeader is appended before sending actual data to describe what it's sending. @@ -41,7 +40,7 @@ public class FileMessageHeader { public static FileMessageHeaderSerializer serializer = new FileMessageHeaderSerializer(); - public final UUID cfId; + public final TableId tableId; public final int sequenceNumber; /** SSTable version */ public final Version version; @@ -64,7 +63,7 @@ public class FileMessageHeader /* cached size value */ private transient final long size; - public FileMessageHeader(UUID cfId, + public FileMessageHeader(TableId tableId, int sequenceNumber, Version version, SSTableFormat.Type format, @@ -75,7 +74,7 @@ public class FileMessageHeader int sstableLevel, SerializationHeader.Component header) { - this.cfId = cfId; + this.tableId = tableId; this.sequenceNumber = sequenceNumber; this.version = version; this.format = format; @@ -89,7 +88,7 @@ public class FileMessageHeader this.size = calculateSize(); } - public FileMessageHeader(UUID cfId, + public FileMessageHeader(TableId tableId, int sequenceNumber, Version version, SSTableFormat.Type format, @@ -100,7 +99,7 @@ public class FileMessageHeader int sstableLevel, SerializationHeader.Component header) { - this.cfId = cfId; + this.tableId = tableId; this.sequenceNumber = sequenceNumber; this.version = version; this.format = format; @@ -152,7 +151,7 @@ public class FileMessageHeader public String toString() { final StringBuilder sb = new StringBuilder("Header ("); - sb.append("cfId: ").append(cfId); + sb.append("tableId: ").append(tableId); sb.append(", #").append(sequenceNumber); sb.append(", version: ").append(version); sb.append(", format: ").append(format); @@ -171,13 +170,13 @@ public class FileMessageHeader if (this == o) return true; if (o == null || getClass() != o.getClass()) return false; FileMessageHeader that = (FileMessageHeader) o; - return sequenceNumber == that.sequenceNumber && cfId.equals(that.cfId); + return sequenceNumber == that.sequenceNumber && tableId.equals(that.tableId); } @Override public int hashCode() { - int result = cfId.hashCode(); + int result = tableId.hashCode(); result = 31 * result + sequenceNumber; return result; } @@ -186,7 +185,7 @@ public class FileMessageHeader { public CompressionInfo serialize(FileMessageHeader header, DataOutputPlus out, int version) throws IOException { - UUIDSerializer.serializer.serialize(header.cfId, out, version); + header.tableId.serialize(out); out.writeInt(header.sequenceNumber); out.writeUTF(header.version.toString()); out.writeUTF(header.format.name); @@ -212,7 +211,7 @@ public class FileMessageHeader public FileMessageHeader deserialize(DataInputPlus in, int version) throws IOException { - UUID cfId = UUIDSerializer.serializer.deserialize(in, MessagingService.current_version); + TableId tableId = TableId.deserialize(in); int sequenceNumber = in.readInt(); Version sstableVersion = SSTableFormat.Type.current().info.getVersion(in.readUTF()); SSTableFormat.Type format = SSTableFormat.Type.validate(in.readUTF()); @@ -227,12 +226,12 @@ public class FileMessageHeader int sstableLevel = in.readInt(); SerializationHeader.Component header = SerializationHeader.serializer.deserialize(sstableVersion, in); - return new FileMessageHeader(cfId, sequenceNumber, sstableVersion, format, estimatedKeys, sections, compressionInfo, repairedAt, sstableLevel, header); + return new FileMessageHeader(tableId, sequenceNumber, sstableVersion, format, estimatedKeys, sections, compressionInfo, repairedAt, sstableLevel, header); } public long serializedSize(FileMessageHeader header, int version) { - long size = UUIDSerializer.serializer.serializedSize(header.cfId, version); + long size = header.tableId.serializedSize(); size += TypeSizes.sizeof(header.sequenceNumber); size += TypeSizes.sizeof(header.version.toString()); size += TypeSizes.sizeof(header.format.name); http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/streaming/messages/OutgoingFileMessage.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/streaming/messages/OutgoingFileMessage.java b/src/java/org/apache/cassandra/streaming/messages/OutgoingFileMessage.java index 6723d17..fba9ec4 100644 --- a/src/java/org/apache/cassandra/streaming/messages/OutgoingFileMessage.java +++ b/src/java/org/apache/cassandra/streaming/messages/OutgoingFileMessage.java @@ -72,7 +72,7 @@ public class OutgoingFileMessage extends StreamMessage SSTableReader sstable = ref.get(); filename = sstable.getFilename(); - this.header = new FileMessageHeader(sstable.metadata.cfId, + this.header = new FileMessageHeader(sstable.metadata().id, sequenceNumber, sstable.descriptor.version, sstable.descriptor.formatType, @@ -81,7 +81,7 @@ public class OutgoingFileMessage extends StreamMessage sstable.compression ? sstable.getCompressionMetadata() : null, repairedAt, keepSSTableLevel ? sstable.getSSTableLevel() : 0, - sstable.header == null ? null : sstable.header.toComponent()); + sstable.header.toComponent()); } public synchronized void serialize(DataOutputStreamPlus out, int version, StreamSession session) throws IOException http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/streaming/messages/ReceivedMessage.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/streaming/messages/ReceivedMessage.java b/src/java/org/apache/cassandra/streaming/messages/ReceivedMessage.java index 251b9c8..55dd7e6 100644 --- a/src/java/org/apache/cassandra/streaming/messages/ReceivedMessage.java +++ b/src/java/org/apache/cassandra/streaming/messages/ReceivedMessage.java @@ -20,14 +20,12 @@ package org.apache.cassandra.streaming.messages; import java.io.*; import java.nio.channels.Channels; import java.nio.channels.ReadableByteChannel; -import java.util.UUID; import org.apache.cassandra.io.util.DataInputPlus; import org.apache.cassandra.io.util.DataInputPlus.DataInputStreamPlus; import org.apache.cassandra.io.util.DataOutputStreamPlus; -import org.apache.cassandra.net.MessagingService; +import org.apache.cassandra.schema.TableId; import org.apache.cassandra.streaming.StreamSession; -import org.apache.cassandra.utils.UUIDSerializer; public class ReceivedMessage extends StreamMessage { @@ -37,23 +35,23 @@ public class ReceivedMessage extends StreamMessage public ReceivedMessage deserialize(ReadableByteChannel in, int version, StreamSession session) throws IOException { DataInputPlus input = new DataInputStreamPlus(Channels.newInputStream(in)); - return new ReceivedMessage(UUIDSerializer.serializer.deserialize(input, MessagingService.current_version), input.readInt()); + return new ReceivedMessage(TableId.deserialize(input), input.readInt()); } public void serialize(ReceivedMessage message, DataOutputStreamPlus out, int version, StreamSession session) throws IOException { - UUIDSerializer.serializer.serialize(message.cfId, out, MessagingService.current_version); + message.tableId.serialize(out); out.writeInt(message.sequenceNumber); } }; - public final UUID cfId; + public final TableId tableId; public final int sequenceNumber; - public ReceivedMessage(UUID cfId, int sequenceNumber) + public ReceivedMessage(TableId tableId, int sequenceNumber) { super(Type.RECEIVED); - this.cfId = cfId; + this.tableId = tableId; this.sequenceNumber = sequenceNumber; } @@ -61,7 +59,7 @@ public class ReceivedMessage extends StreamMessage public String toString() { final StringBuilder sb = new StringBuilder("Received ("); - sb.append(cfId).append(", #").append(sequenceNumber).append(')'); + sb.append(tableId).append(", #").append(sequenceNumber).append(')'); return sb.toString(); } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/tools/JsonTransformer.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/tools/JsonTransformer.java b/src/java/org/apache/cassandra/tools/JsonTransformer.java index dde732a..1d05103 100644 --- a/src/java/org/apache/cassandra/tools/JsonTransformer.java +++ b/src/java/org/apache/cassandra/tools/JsonTransformer.java @@ -30,8 +30,9 @@ import java.util.List; import java.util.concurrent.TimeUnit; import java.util.stream.Stream; -import org.apache.cassandra.config.CFMetaData; -import org.apache.cassandra.config.ColumnDefinition; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import org.apache.cassandra.db.*; import org.apache.cassandra.db.marshal.AbstractType; import org.apache.cassandra.db.marshal.CollectionType; @@ -46,14 +47,14 @@ import org.apache.cassandra.db.rows.Row; import org.apache.cassandra.db.rows.Unfiltered; import org.apache.cassandra.db.rows.UnfilteredRowIterator; import org.apache.cassandra.io.sstable.ISSTableScanner; +import org.apache.cassandra.schema.ColumnMetadata; +import org.apache.cassandra.schema.TableMetadata; import org.apache.cassandra.utils.ByteBufferUtil; import org.codehaus.jackson.JsonFactory; import org.codehaus.jackson.JsonGenerator; import org.codehaus.jackson.impl.Indenter; -import org.codehaus.jackson.util.DefaultPrettyPrinter; import org.codehaus.jackson.util.DefaultPrettyPrinter.NopIndenter; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import org.codehaus.jackson.util.DefaultPrettyPrinter; public final class JsonTransformer { @@ -68,7 +69,7 @@ public final class JsonTransformer private final CompactIndenter arrayIndenter = new CompactIndenter(); - private final CFMetaData metadata; + private final TableMetadata metadata; private final ISSTableScanner currentScanner; @@ -76,7 +77,7 @@ public final class JsonTransformer private long currentPosition = 0; - private JsonTransformer(JsonGenerator json, ISSTableScanner currentScanner, boolean rawTime, CFMetaData metadata) + private JsonTransformer(JsonGenerator json, ISSTableScanner currentScanner, boolean rawTime, TableMetadata metadata) { this.json = json; this.metadata = metadata; @@ -89,7 +90,7 @@ public final class JsonTransformer json.setPrettyPrinter(prettyPrinter); } - public static void toJson(ISSTableScanner currentScanner, Stream<UnfilteredRowIterator> partitions, boolean rawTime, CFMetaData metadata, OutputStream out) + public static void toJson(ISSTableScanner currentScanner, Stream<UnfilteredRowIterator> partitions, boolean rawTime, TableMetadata metadata, OutputStream out) throws IOException { try (JsonGenerator json = jsonFactory.createJsonGenerator(new OutputStreamWriter(out, StandardCharsets.UTF_8))) @@ -101,7 +102,7 @@ public final class JsonTransformer } } - public static void keysToJson(ISSTableScanner currentScanner, Stream<DecoratedKey> keys, boolean rawTime, CFMetaData metadata, OutputStream out) throws IOException + public static void keysToJson(ISSTableScanner currentScanner, Stream<DecoratedKey> keys, boolean rawTime, TableMetadata metadata, OutputStream out) throws IOException { try (JsonGenerator json = jsonFactory.createJsonGenerator(new OutputStreamWriter(out, StandardCharsets.UTF_8))) { @@ -119,7 +120,7 @@ public final class JsonTransformer private void serializePartitionKey(DecoratedKey key) { - AbstractType<?> keyValidator = metadata.getKeyValidator(); + AbstractType<?> keyValidator = metadata.partitionKeyType; objectIndenter.setCompact(true); try { @@ -223,7 +224,7 @@ public final class JsonTransformer } catch (IOException e) { - String key = metadata.getKeyValidator().getString(partition.partitionKey().getKey()); + String key = metadata.partitionKeyType.getString(partition.partitionKey().getKey()); logger.error("Fatal error parsing partition: {}", key, e); } } @@ -334,10 +335,10 @@ public final class JsonTransformer objectIndenter.setCompact(true); json.writeStartArray(); arrayIndenter.setCompact(true); - List<ColumnDefinition> clusteringColumns = metadata.clusteringColumns(); + List<ColumnMetadata> clusteringColumns = metadata.clusteringColumns(); for (int i = 0; i < clusteringColumns.size(); i++) { - ColumnDefinition column = clusteringColumns.get(i); + ColumnMetadata column = clusteringColumns.get(i); if (i >= clustering.size()) { json.writeString("*"); http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/tools/SSTableExpiredBlockers.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/tools/SSTableExpiredBlockers.java b/src/java/org/apache/cassandra/tools/SSTableExpiredBlockers.java index 1f407cb..56c57d9 100644 --- a/src/java/org/apache/cassandra/tools/SSTableExpiredBlockers.java +++ b/src/java/org/apache/cassandra/tools/SSTableExpiredBlockers.java @@ -17,7 +17,6 @@ */ package org.apache.cassandra.tools; -import java.io.IOException; import java.io.PrintStream; import java.util.Collections; import java.util.HashSet; @@ -27,8 +26,8 @@ import java.util.Set; import com.google.common.collect.ArrayListMultimap; import com.google.common.collect.Multimap; -import org.apache.cassandra.config.CFMetaData; -import org.apache.cassandra.config.Schema; +import org.apache.cassandra.schema.TableMetadata; +import org.apache.cassandra.schema.Schema; import org.apache.cassandra.db.ColumnFamilyStore; import org.apache.cassandra.db.Directories; import org.apache.cassandra.db.Keyspace; @@ -46,7 +45,7 @@ import org.apache.cassandra.io.sstable.format.SSTableReader; */ public class SSTableExpiredBlockers { - public static void main(String[] args) throws IOException + public static void main(String[] args) { PrintStream out = System.out; if (args.length < 2) @@ -61,11 +60,7 @@ public class SSTableExpiredBlockers String columnfamily = args[args.length - 1]; Schema.instance.loadFromDisk(false); - CFMetaData metadata = Schema.instance.getCFMetaData(keyspace, columnfamily); - if (metadata == null) - throw new IllegalArgumentException(String.format("Unknown keyspace/table %s.%s", - keyspace, - columnfamily)); + TableMetadata metadata = Schema.instance.validateTable(keyspace, columnfamily); Keyspace ks = Keyspace.openWithoutSSTables(keyspace); ColumnFamilyStore cfs = ks.getColumnFamilyStore(columnfamily); http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/tools/SSTableExport.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/tools/SSTableExport.java b/src/java/org/apache/cassandra/tools/SSTableExport.java index 52d5ecf..913ee1f 100644 --- a/src/java/org/apache/cassandra/tools/SSTableExport.java +++ b/src/java/org/apache/cassandra/tools/SSTableExport.java @@ -27,7 +27,7 @@ import java.util.stream.StreamSupport; import org.apache.commons.cli.*; -import org.apache.cassandra.config.CFMetaData; +import org.apache.cassandra.schema.TableMetadata; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.cql3.ColumnIdentifier; import org.apache.cassandra.db.DecoratedKey; @@ -43,6 +43,7 @@ import org.apache.cassandra.io.sstable.KeyIterator; import org.apache.cassandra.io.sstable.format.SSTableReader; import org.apache.cassandra.io.sstable.metadata.MetadataComponent; import org.apache.cassandra.io.sstable.metadata.MetadataType; +import org.apache.cassandra.schema.TableMetadataRef; import org.apache.cassandra.utils.FBUtilities; /** @@ -88,10 +89,10 @@ public class SSTableExport * Construct table schema from info stored in SSTable's Stats.db * * @param desc SSTable's descriptor - * @return Restored CFMetaData + * @return Restored TableMetadata * @throws IOException when Stats.db cannot be read */ - public static CFMetaData metadataFromSSTable(Descriptor desc) throws IOException + public static TableMetadata metadataFromSSTable(Descriptor desc) throws IOException { if (!desc.version.isCompatible()) throw new IOException("Cannot process old and unsupported SSTable version."); @@ -101,7 +102,7 @@ public class SSTableExport SerializationHeader.Component header = (SerializationHeader.Component) sstableMetadata.get(MetadataType.HEADER); IPartitioner partitioner = FBUtilities.newPartitioner(desc); - CFMetaData.Builder builder = CFMetaData.Builder.create("keyspace", "table").withPartitioner(partitioner); + TableMetadata.Builder builder = TableMetadata.builder("keyspace", "table").partitioner(partitioner); header.getStaticColumns().entrySet().stream() .forEach(entry -> { ColumnIdentifier ident = ColumnIdentifier.getInterned(UTF8Type.instance.getString(entry.getKey()), true); @@ -112,7 +113,7 @@ public class SSTableExport ColumnIdentifier ident = ColumnIdentifier.getInterned(UTF8Type.instance.getString(entry.getKey()), true); builder.addRegularColumn(ident, entry.getValue()); }); - builder.addPartitionKey("PartitionKey", header.getKeyType()); + builder.addPartitionKeyColumn("PartitionKey", header.getKeyType()); for (int i = 0; i < header.getClusteringTypes().size(); i++) { builder.addClusteringColumn("clustering" + (i > 0 ? i : ""), header.getClusteringTypes().get(i)); @@ -170,7 +171,7 @@ public class SSTableExport Descriptor desc = Descriptor.fromFilename(ssTableFileName); try { - CFMetaData metadata = metadataFromSSTable(desc); + TableMetadata metadata = metadataFromSSTable(desc); if (cmd.hasOption(ENUMERATE_KEYS_OPTION)) { try (KeyIterator iter = new KeyIterator(desc, metadata)) @@ -183,14 +184,14 @@ public class SSTableExport } else { - SSTableReader sstable = SSTableReader.openNoValidation(desc, metadata); + SSTableReader sstable = SSTableReader.openNoValidation(desc, TableMetadataRef.forOfflineTools(metadata)); IPartitioner partitioner = sstable.getPartitioner(); final ISSTableScanner currentScanner; if ((keys != null) && (keys.length > 0)) { List<AbstractBounds<PartitionPosition>> bounds = Arrays.stream(keys) .filter(key -> !excludes.contains(key)) - .map(metadata.getKeyValidator()::fromString) + .map(metadata.partitionKeyType::fromString) .map(partitioner::decorateKey) .sorted() .map(DecoratedKey::getToken) @@ -202,7 +203,7 @@ public class SSTableExport currentScanner = sstable.getScanner(); } Stream<UnfilteredRowIterator> partitions = iterToStream(currentScanner).filter(i -> - excludes.isEmpty() || !excludes.contains(metadata.getKeyValidator().getString(i.partitionKey().getKey())) + excludes.isEmpty() || !excludes.contains(metadata.partitionKeyType.getString(i.partitionKey().getKey())) ); if (cmd.hasOption(DEBUG_OUTPUT_OPTION)) { @@ -213,19 +214,19 @@ public class SSTableExport if (!partition.partitionLevelDeletion().isLive()) { - System.out.println("[" + metadata.getKeyValidator().getString(partition.partitionKey().getKey()) + "]@" + + System.out.println("[" + metadata.partitionKeyType.getString(partition.partitionKey().getKey()) + "]@" + position.get() + " " + partition.partitionLevelDeletion()); } if (!partition.staticRow().isEmpty()) { - System.out.println("[" + metadata.getKeyValidator().getString(partition.partitionKey().getKey()) + "]@" + + System.out.println("[" + metadata.partitionKeyType.getString(partition.partitionKey().getKey()) + "]@" + position.get() + " " + partition.staticRow().toString(metadata, true)); } partition.forEachRemaining(row -> { System.out.println( - "[" + metadata.getKeyValidator().getString(partition.partitionKey().getKey()) + "]@" - + position.get() + " " + row.toString(metadata, false, true)); + "[" + metadata.partitionKeyType.getString(partition.partitionKey().getKey()) + "]@" + + position.get() + " " + row.toString(metadata, false, true)); position.set(currentScanner.getCurrentPosition()); }); }); http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/tools/SSTableLevelResetter.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/tools/SSTableLevelResetter.java b/src/java/org/apache/cassandra/tools/SSTableLevelResetter.java index 915edf1..3a66ef9 100644 --- a/src/java/org/apache/cassandra/tools/SSTableLevelResetter.java +++ b/src/java/org/apache/cassandra/tools/SSTableLevelResetter.java @@ -21,7 +21,7 @@ import java.io.PrintStream; import java.util.Map; import java.util.Set; -import org.apache.cassandra.config.Schema; +import org.apache.cassandra.schema.Schema; import org.apache.cassandra.db.ColumnFamilyStore; import org.apache.cassandra.db.Directories; import org.apache.cassandra.db.Keyspace; @@ -69,7 +69,7 @@ public class SSTableLevelResetter String keyspaceName = args[1]; String columnfamily = args[2]; // validate columnfamily - if (Schema.instance.getCFMetaData(keyspaceName, columnfamily) == null) + if (Schema.instance.getTableMetadataRef(keyspaceName, columnfamily) == null) { System.err.println("ColumnFamily not found: " + keyspaceName + "/" + columnfamily); System.exit(1); http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/tools/SSTableOfflineRelevel.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/tools/SSTableOfflineRelevel.java b/src/java/org/apache/cassandra/tools/SSTableOfflineRelevel.java index 9f0395b..1116575 100644 --- a/src/java/org/apache/cassandra/tools/SSTableOfflineRelevel.java +++ b/src/java/org/apache/cassandra/tools/SSTableOfflineRelevel.java @@ -34,7 +34,7 @@ import com.google.common.collect.HashMultimap; import com.google.common.collect.Multimap; import com.google.common.collect.SetMultimap; -import org.apache.cassandra.config.Schema; +import org.apache.cassandra.schema.Schema; import org.apache.cassandra.db.ColumnFamilyStore; import org.apache.cassandra.db.DecoratedKey; import org.apache.cassandra.db.Directories; @@ -92,7 +92,7 @@ public class SSTableOfflineRelevel String columnfamily = args[args.length - 1]; Schema.instance.loadFromDisk(false); - if (Schema.instance.getCFMetaData(keyspace, columnfamily) == null) + if (Schema.instance.getTableMetadataRef(keyspace, columnfamily) == null) throw new IllegalArgumentException(String.format("Unknown keyspace/columnFamily %s.%s", keyspace, columnfamily)); http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/tools/StandaloneSSTableUtil.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/tools/StandaloneSSTableUtil.java b/src/java/org/apache/cassandra/tools/StandaloneSSTableUtil.java index 2e8ee0b..adfe7e0 100644 --- a/src/java/org/apache/cassandra/tools/StandaloneSSTableUtil.java +++ b/src/java/org/apache/cassandra/tools/StandaloneSSTableUtil.java @@ -18,9 +18,8 @@ */ package org.apache.cassandra.tools; -import org.apache.cassandra.config.CFMetaData; -import org.apache.cassandra.config.DatabaseDescriptor; -import org.apache.cassandra.config.Schema; +import org.apache.cassandra.schema.TableMetadata; +import org.apache.cassandra.schema.Schema; import org.apache.cassandra.db.ColumnFamilyStore; import org.apache.cassandra.db.Directories; import org.apache.cassandra.db.lifecycle.LifecycleTransaction; @@ -52,7 +51,7 @@ public class StandaloneSSTableUtil Util.initDatabaseDescriptor(); Schema.instance.loadFromDisk(false); - CFMetaData metadata = Schema.instance.getCFMetaData(options.keyspaceName, options.cfName); + TableMetadata metadata = Schema.instance.getTableMetadata(options.keyspaceName, options.cfName); if (metadata == null) throw new IllegalArgumentException(String.format("Unknown keyspace/table %s.%s", options.keyspaceName, @@ -82,7 +81,7 @@ public class StandaloneSSTableUtil } } - private static void listFiles(Options options, CFMetaData metadata, OutputHandler handler) throws IOException + private static void listFiles(Options options, TableMetadata metadata, OutputHandler handler) throws IOException { Directories directories = new Directories(metadata, ColumnFamilyStore.getInitialDirectories()); http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/tools/StandaloneScrubber.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/tools/StandaloneScrubber.java b/src/java/org/apache/cassandra/tools/StandaloneScrubber.java index 54b340e..f7f48c8 100644 --- a/src/java/org/apache/cassandra/tools/StandaloneScrubber.java +++ b/src/java/org/apache/cassandra/tools/StandaloneScrubber.java @@ -28,7 +28,7 @@ import com.google.common.collect.Iterables; import com.google.common.collect.Lists; import org.apache.commons.cli.*; -import org.apache.cassandra.config.Schema; +import org.apache.cassandra.schema.Schema; import org.apache.cassandra.db.ColumnFamilyStore; import org.apache.cassandra.db.Directories; import org.apache.cassandra.db.Keyspace; @@ -61,7 +61,7 @@ public class StandaloneScrubber // load keyspace descriptions. Schema.instance.loadFromDisk(false); - if (Schema.instance.getKSMetaData(options.keyspaceName) == null) + if (Schema.instance.getKeyspaceMetadata(options.keyspaceName) == null) throw new IllegalArgumentException(String.format("Unknown keyspace %s", options.keyspaceName)); // Do not load sstables since they might be broken http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/tools/StandaloneSplitter.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/tools/StandaloneSplitter.java b/src/java/org/apache/cassandra/tools/StandaloneSplitter.java index aaaa9db..c5be02e 100644 --- a/src/java/org/apache/cassandra/tools/StandaloneSplitter.java +++ b/src/java/org/apache/cassandra/tools/StandaloneSplitter.java @@ -22,7 +22,7 @@ import java.io.File; import java.util.*; import java.util.concurrent.TimeUnit; -import org.apache.cassandra.config.Schema; +import org.apache.cassandra.schema.Schema; import org.apache.cassandra.io.sstable.format.SSTableReader; import org.apache.commons.cli.*; http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/tools/StandaloneUpgrader.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/tools/StandaloneUpgrader.java b/src/java/org/apache/cassandra/tools/StandaloneUpgrader.java index e55b3a8..ed25e42 100644 --- a/src/java/org/apache/cassandra/tools/StandaloneUpgrader.java +++ b/src/java/org/apache/cassandra/tools/StandaloneUpgrader.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file @@ -22,7 +22,6 @@ import java.util.concurrent.TimeUnit; import org.apache.commons.cli.*; -import org.apache.cassandra.config.Schema; import org.apache.cassandra.db.ColumnFamilyStore; import org.apache.cassandra.db.Directories; import org.apache.cassandra.db.Keyspace; @@ -33,6 +32,7 @@ import org.apache.cassandra.db.lifecycle.LifecycleTransaction; import org.apache.cassandra.io.sstable.*; import org.apache.cassandra.io.sstable.format.SSTableFormat; import org.apache.cassandra.io.sstable.format.SSTableReader; +import org.apache.cassandra.schema.Schema; import org.apache.cassandra.utils.JVMStabilityInspector; import org.apache.cassandra.utils.OutputHandler; @@ -55,7 +55,7 @@ public class StandaloneUpgrader // load keyspace descriptions. Schema.instance.loadFromDisk(false); - if (Schema.instance.getCFMetaData(options.keyspace, options.cf) == null) + if (Schema.instance.getTableMetadataRef(options.keyspace, options.cf) == null) throw new IllegalArgumentException(String.format("Unknown keyspace/table %s.%s", options.keyspace, options.cf)); http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/tools/StandaloneVerifier.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/tools/StandaloneVerifier.java b/src/java/org/apache/cassandra/tools/StandaloneVerifier.java index ee55dd5..40dfbf7 100644 --- a/src/java/org/apache/cassandra/tools/StandaloneVerifier.java +++ b/src/java/org/apache/cassandra/tools/StandaloneVerifier.java @@ -18,7 +18,7 @@ */ package org.apache.cassandra.tools; -import org.apache.cassandra.config.Schema; +import org.apache.cassandra.schema.Schema; import org.apache.cassandra.db.ColumnFamilyStore; import org.apache.cassandra.db.Directories; import org.apache.cassandra.db.Keyspace; @@ -56,7 +56,7 @@ public class StandaloneVerifier boolean hasFailed = false; - if (Schema.instance.getCFMetaData(options.keyspaceName, options.cfName) == null) + if (Schema.instance.getTableMetadataRef(options.keyspaceName, options.cfName) == null) throw new IllegalArgumentException(String.format("Unknown keyspace/table %s.%s", options.keyspaceName, options.cfName)); http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/tools/nodetool/Cleanup.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/tools/nodetool/Cleanup.java b/src/java/org/apache/cassandra/tools/nodetool/Cleanup.java index c964b2f..4102846 100644 --- a/src/java/org/apache/cassandra/tools/nodetool/Cleanup.java +++ b/src/java/org/apache/cassandra/tools/nodetool/Cleanup.java @@ -24,7 +24,7 @@ import java.util.ArrayList; import java.util.List; import io.airlift.command.Option; -import org.apache.cassandra.config.SchemaConstants; +import org.apache.cassandra.schema.SchemaConstants; import org.apache.cassandra.tools.NodeProbe; import org.apache.cassandra.tools.NodeTool.NodeToolCmd; http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/tools/nodetool/Repair.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/tools/nodetool/Repair.java b/src/java/org/apache/cassandra/tools/nodetool/Repair.java index 350601a..48f929f 100644 --- a/src/java/org/apache/cassandra/tools/nodetool/Repair.java +++ b/src/java/org/apache/cassandra/tools/nodetool/Repair.java @@ -31,7 +31,7 @@ import java.util.Set; import com.google.common.collect.Sets; -import org.apache.cassandra.config.SchemaConstants; +import org.apache.cassandra.schema.SchemaConstants; import org.apache.cassandra.repair.RepairParallelism; import org.apache.cassandra.repair.messages.RepairOption; import org.apache.cassandra.tools.NodeProbe; http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/tracing/TraceKeyspace.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/tracing/TraceKeyspace.java b/src/java/org/apache/cassandra/tracing/TraceKeyspace.java index ac8b4f7..20c992c 100644 --- a/src/java/org/apache/cassandra/tracing/TraceKeyspace.java +++ b/src/java/org/apache/cassandra/tracing/TraceKeyspace.java @@ -20,18 +20,23 @@ package org.apache.cassandra.tracing; import java.net.InetAddress; import java.nio.ByteBuffer; import java.util.*; +import java.util.concurrent.TimeUnit; -import org.apache.cassandra.config.CFMetaData; -import org.apache.cassandra.config.SchemaConstants; +import org.apache.cassandra.cql3.statements.CreateTableStatement; import org.apache.cassandra.db.Mutation; -import org.apache.cassandra.db.rows.Row; import org.apache.cassandra.db.partitions.PartitionUpdate; +import org.apache.cassandra.db.rows.Row; import org.apache.cassandra.schema.KeyspaceMetadata; import org.apache.cassandra.schema.KeyspaceParams; +import org.apache.cassandra.schema.SchemaConstants; +import org.apache.cassandra.schema.TableId; +import org.apache.cassandra.schema.TableMetadata; import org.apache.cassandra.schema.Tables; import org.apache.cassandra.utils.FBUtilities; import org.apache.cassandra.utils.UUIDGen; +import static java.lang.String.format; + public final class TraceKeyspace { private TraceKeyspace() @@ -41,36 +46,41 @@ public final class TraceKeyspace public static final String SESSIONS = "sessions"; public static final String EVENTS = "events"; - private static final CFMetaData Sessions = - compile(SESSIONS, - "tracing sessions", - "CREATE TABLE %s (" - + "session_id uuid," - + "command text," - + "client inet," - + "coordinator inet," - + "duration int," - + "parameters map<text, text>," - + "request text," - + "started_at timestamp," - + "PRIMARY KEY ((session_id)))"); - - private static final CFMetaData Events = - compile(EVENTS, - "tracing events", - "CREATE TABLE %s (" - + "session_id uuid," - + "event_id timeuuid," - + "activity text," - + "source inet," - + "source_elapsed int," - + "thread text," - + "PRIMARY KEY ((session_id), event_id))"); - - private static CFMetaData compile(String name, String description, String schema) + private static final TableMetadata Sessions = + parse(SESSIONS, + "tracing sessions", + "CREATE TABLE %s (" + + "session_id uuid," + + "command text," + + "client inet," + + "coordinator inet," + + "duration int," + + "parameters map<text, text>," + + "request text," + + "started_at timestamp," + + "PRIMARY KEY ((session_id)))"); + + private static final TableMetadata Events = + parse(EVENTS, + "tracing events", + "CREATE TABLE %s (" + + "session_id uuid," + + "event_id timeuuid," + + "activity text," + + "source inet," + + "source_elapsed int," + + "thread text," + + "PRIMARY KEY ((session_id), event_id))"); + + private static TableMetadata parse(String table, String description, String cql) { - return CFMetaData.compile(String.format(schema, name), SchemaConstants.TRACE_KEYSPACE_NAME) - .comment(description); + return CreateTableStatement.parse(format(cql, table), SchemaConstants.TRACE_KEYSPACE_NAME) + .id(TableId.forSystemTable(SchemaConstants.TRACE_KEYSPACE_NAME, table)) + .dcLocalReadRepairChance(0.0) + .gcGraceSeconds(0) + .memtableFlushPeriod((int) TimeUnit.HOURS.toMillis(1)) + .comment(description) + .build(); } public static KeyspaceMetadata metadata() http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/transport/Server.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/transport/Server.java b/src/java/org/apache/cassandra/transport/Server.java index 1eeecac..f38d83d 100644 --- a/src/java/org/apache/cassandra/transport/Server.java +++ b/src/java/org/apache/cassandra/transport/Server.java @@ -49,6 +49,8 @@ import io.netty.util.internal.logging.Slf4JLoggerFactory; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.config.EncryptionOptions; import org.apache.cassandra.db.marshal.AbstractType; +import org.apache.cassandra.schema.Schema; +import org.apache.cassandra.schema.SchemaChangeListener; import org.apache.cassandra.security.SSLFactory; import org.apache.cassandra.service.*; import org.apache.cassandra.transport.messages.EventMessage; @@ -100,7 +102,7 @@ public class Server implements CassandraDaemon.Server eventExecutorGroup = builder.eventExecutorGroup; EventNotifier notifier = new EventNotifier(this); StorageService.instance.register(notifier); - MigrationManager.instance.register(notifier); + Schema.instance.registerListener(notifier); } public void stop() @@ -448,7 +450,7 @@ public class Server implements CassandraDaemon.Server } } - private static class EventNotifier extends MigrationListener implements IEndpointLifecycleSubscriber + private static class EventNotifier extends SchemaChangeListener implements IEndpointLifecycleSubscriber { private final Server server; @@ -584,12 +586,12 @@ public class Server implements CassandraDaemon.Server send(new Event.SchemaChange(Event.SchemaChange.Change.CREATED, ksName)); } - public void onCreateColumnFamily(String ksName, String cfName) + public void onCreateTable(String ksName, String cfName) { send(new Event.SchemaChange(Event.SchemaChange.Change.CREATED, Event.SchemaChange.Target.TABLE, ksName, cfName)); } - public void onCreateUserType(String ksName, String typeName) + public void onCreateType(String ksName, String typeName) { send(new Event.SchemaChange(Event.SchemaChange.Change.CREATED, Event.SchemaChange.Target.TYPE, ksName, typeName)); } @@ -606,28 +608,28 @@ public class Server implements CassandraDaemon.Server ksName, aggregateName, AbstractType.asCQLTypeStringList(argTypes))); } - public void onUpdateKeyspace(String ksName) + public void onAlterKeyspace(String ksName) { send(new Event.SchemaChange(Event.SchemaChange.Change.UPDATED, ksName)); } - public void onUpdateColumnFamily(String ksName, String cfName, boolean affectsStatements) + public void onAlterTable(String ksName, String cfName, boolean affectsStatements) { send(new Event.SchemaChange(Event.SchemaChange.Change.UPDATED, Event.SchemaChange.Target.TABLE, ksName, cfName)); } - public void onUpdateUserType(String ksName, String typeName) + public void onAlterType(String ksName, String typeName) { send(new Event.SchemaChange(Event.SchemaChange.Change.UPDATED, Event.SchemaChange.Target.TYPE, ksName, typeName)); } - public void onUpdateFunction(String ksName, String functionName, List<AbstractType<?>> argTypes) + public void onAlterFunction(String ksName, String functionName, List<AbstractType<?>> argTypes) { send(new Event.SchemaChange(Event.SchemaChange.Change.UPDATED, Event.SchemaChange.Target.FUNCTION, ksName, functionName, AbstractType.asCQLTypeStringList(argTypes))); } - public void onUpdateAggregate(String ksName, String aggregateName, List<AbstractType<?>> argTypes) + public void onAlterAggregate(String ksName, String aggregateName, List<AbstractType<?>> argTypes) { send(new Event.SchemaChange(Event.SchemaChange.Change.UPDATED, Event.SchemaChange.Target.AGGREGATE, ksName, aggregateName, AbstractType.asCQLTypeStringList(argTypes))); @@ -638,12 +640,12 @@ public class Server implements CassandraDaemon.Server send(new Event.SchemaChange(Event.SchemaChange.Change.DROPPED, ksName)); } - public void onDropColumnFamily(String ksName, String cfName) + public void onDropTable(String ksName, String cfName) { send(new Event.SchemaChange(Event.SchemaChange.Change.DROPPED, Event.SchemaChange.Target.TABLE, ksName, cfName)); } - public void onDropUserType(String ksName, String typeName) + public void onDropType(String ksName, String typeName) { send(new Event.SchemaChange(Event.SchemaChange.Change.DROPPED, Event.SchemaChange.Target.TYPE, ksName, typeName)); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/triggers/TriggerExecutor.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/triggers/TriggerExecutor.java b/src/java/org/apache/cassandra/triggers/TriggerExecutor.java index 703e69a..906b342 100644 --- a/src/java/org/apache/cassandra/triggers/TriggerExecutor.java +++ b/src/java/org/apache/cassandra/triggers/TriggerExecutor.java @@ -33,6 +33,7 @@ import org.apache.cassandra.db.*; import org.apache.cassandra.db.partitions.PartitionUpdate; import org.apache.cassandra.exceptions.CassandraException; import org.apache.cassandra.exceptions.InvalidRequestException; +import org.apache.cassandra.schema.TableId; import org.apache.cassandra.schema.TriggerMetadata; import org.apache.cassandra.schema.Triggers; import org.apache.cassandra.utils.FBUtilities; @@ -86,7 +87,7 @@ public class TriggerExecutor if (intermediate == null || intermediate.isEmpty()) return updates; - return PartitionUpdate.merge(validateForSinglePartition(updates.metadata().cfId, updates.partitionKey(), intermediate)); + return PartitionUpdate.merge(validateForSinglePartition(updates.metadata().id, updates.partitionKey(), intermediate)); } /** @@ -157,9 +158,9 @@ public class TriggerExecutor return merged; } - private List<PartitionUpdate> validateForSinglePartition(UUID cfId, - DecoratedKey key, - Collection<Mutation> tmutations) + private List<PartitionUpdate> validateForSinglePartition(TableId tableId, + DecoratedKey key, + Collection<Mutation> tmutations) throws InvalidRequestException { validate(tmutations); @@ -169,7 +170,7 @@ public class TriggerExecutor List<PartitionUpdate> updates = Lists.newArrayList(Iterables.getOnlyElement(tmutations).getPartitionUpdates()); if (updates.size() > 1) throw new InvalidRequestException("The updates generated by triggers are not all for the same partition"); - validateSamePartition(cfId, key, Iterables.getOnlyElement(updates)); + validateSamePartition(tableId, key, Iterables.getOnlyElement(updates)); return updates; } @@ -178,20 +179,20 @@ public class TriggerExecutor { for (PartitionUpdate update : mutation.getPartitionUpdates()) { - validateSamePartition(cfId, key, update); + validateSamePartition(tableId, key, update); updates.add(update); } } return updates; } - private void validateSamePartition(UUID cfId, DecoratedKey key, PartitionUpdate update) + private void validateSamePartition(TableId tableId, DecoratedKey key, PartitionUpdate update) throws InvalidRequestException { if (!key.equals(update.partitionKey())) throw new InvalidRequestException("Partition key of additional mutation does not match primary update key"); - if (!cfId.equals(update.metadata().cfId)) + if (!tableId.equals(update.metadata().id)) throw new InvalidRequestException("table of additional mutation does not match primary update table"); } @@ -211,7 +212,7 @@ public class TriggerExecutor */ private List<Mutation> executeInternal(PartitionUpdate update) { - Triggers triggers = update.metadata().getTriggers(); + Triggers triggers = update.metadata().triggers; if (triggers.isEmpty()) return null; List<Mutation> tmutations = Lists.newLinkedList(); @@ -238,7 +239,7 @@ public class TriggerExecutor } catch (Exception ex) { - throw new RuntimeException(String.format("Exception while executing trigger on table with ID: %s", update.metadata().cfId), ex); + throw new RuntimeException(String.format("Exception while executing trigger on table with ID: %s", update.metadata().id), ex); } finally {
