http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/service/paxos/PrepareResponse.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/paxos/PrepareResponse.java b/src/java/org/apache/cassandra/service/paxos/PrepareResponse.java
index e766e34..bf07402 100644
--- a/src/java/org/apache/cassandra/service/paxos/PrepareResponse.java
+++ b/src/java/org/apache/cassandra/service/paxos/PrepareResponse.java
@@ -23,14 +23,14 @@ package org.apache.cassandra.service.paxos;
 
 import java.io.DataInput;
 import java.io.IOException;
-import java.nio.ByteBuffer;
+import java.util.UUID;
 
-import org.apache.cassandra.db.ArrayBackedSortedColumns;
-import org.apache.cassandra.db.ColumnFamily;
-import org.apache.cassandra.db.ColumnSerializer;
+import org.apache.cassandra.db.TypeSizes;
+import org.apache.cassandra.db.rows.SerializationHelper;
+import org.apache.cassandra.db.partitions.PartitionUpdate;
 import org.apache.cassandra.io.IVersionedSerializer;
 import org.apache.cassandra.io.util.DataOutputPlus;
-import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.utils.UUIDSerializer;
 
 public class PrepareResponse
@@ -49,7 +49,7 @@ public class PrepareResponse
 
     public PrepareResponse(boolean promised, Commit inProgressCommit, Commit mostRecentCommit)
     {
-        assert inProgressCommit.key == mostRecentCommit.key;
+        assert inProgressCommit.update.partitionKey().equals(mostRecentCommit.update.partitionKey());
         assert inProgressCommit.update.metadata() == mostRecentCommit.update.metadata();
 
         this.promised = promised;
@@ -68,38 +68,53 @@ public class PrepareResponse
         public void serialize(PrepareResponse response, DataOutputPlus out, int version) throws IOException
         {
             out.writeBoolean(response.promised);
-            ByteBufferUtil.writeWithShortLength(response.inProgressCommit.key, out);
-            UUIDSerializer.serializer.serialize(response.inProgressCommit.ballot, out, version);
-            ColumnFamily.serializer.serialize(response.inProgressCommit.update, out, version);
-            UUIDSerializer.serializer.serialize(response.mostRecentCommit.ballot, out, version);
-            ColumnFamily.serializer.serialize(response.mostRecentCommit.update, out, version);
+            Commit.serializer.serialize(response.inProgressCommit, out, version);
+
+            if (version < MessagingService.VERSION_30)
+            {
+                UUIDSerializer.serializer.serialize(response.mostRecentCommit.ballot, out, version);
+                PartitionUpdate.serializer.serialize(response.mostRecentCommit.update, out, version);
+            }
+            else
+            {
+                Commit.serializer.serialize(response.mostRecentCommit, out, version);
+            }
         }
 
         public PrepareResponse deserialize(DataInput in, int version) throws IOException
         {
             boolean success = in.readBoolean();
-            ByteBuffer key = ByteBufferUtil.readWithShortLength(in);
-            return new PrepareResponse(success,
-                                       new Commit(key,
-                                                  UUIDSerializer.serializer.deserialize(in, version),
-                                                  ColumnFamily.serializer.deserialize(in,
-                                                                                      ArrayBackedSortedColumns.factory,
-                                                                                      ColumnSerializer.Flag.LOCAL, version)),
-                                       new Commit(key,
-                                                  UUIDSerializer.serializer.deserialize(in, version),
-                                                  ColumnFamily.serializer.deserialize(in,
-                                                                                      ArrayBackedSortedColumns.factory,
-                                                                                      ColumnSerializer.Flag.LOCAL, version)));
+            Commit inProgress = Commit.serializer.deserialize(in, version);
+            Commit mostRecent;
+            if (version < MessagingService.VERSION_30)
+            {
+                UUID ballot = UUIDSerializer.serializer.deserialize(in, version);
+                PartitionUpdate update = PartitionUpdate.serializer.deserialize(in, version, SerializationHelper.Flag.LOCAL, inProgress.update.partitionKey());
+                mostRecent = new Commit(ballot, update);
+            }
+            else
+            {
+                mostRecent = Commit.serializer.deserialize(in, version);
+            }
+            return new PrepareResponse(success, inProgress, mostRecent);
         }
 
         public long serializedSize(PrepareResponse response, int version)
         {
-            return 1
-                 + 2 + response.inProgressCommit.key.remaining()
-                 + UUIDSerializer.serializer.serializedSize(response.inProgressCommit.ballot, version)
-                 + ColumnFamily.serializer.serializedSize(response.inProgressCommit.update, version)
-                 + UUIDSerializer.serializer.serializedSize(response.mostRecentCommit.ballot, version)
-                 + ColumnFamily.serializer.serializedSize(response.mostRecentCommit.update, version);
+            TypeSizes sizes = TypeSizes.NATIVE;
+            long size = sizes.sizeof(response.promised)
+                      + Commit.serializer.serializedSize(response.inProgressCommit, version);
+
+            if (version < MessagingService.VERSION_30)
+            {
+                size += UUIDSerializer.serializer.serializedSize(response.mostRecentCommit.ballot, version);
+                size += PartitionUpdate.serializer.serializedSize(response.mostRecentCommit.update, version, sizes);
+            }
+            else
+            {
+                size += Commit.serializer.serializedSize(response.mostRecentCommit, version);
+            }
+            return size;
        }
    }
}
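For anyone tracking the wire-format change above: serialize(), deserialize() and serializedSize() all branch on MessagingService.VERSION_30, so a 3.0 node keeps emitting the pre-3.0 layout to older peers while using the new Commit serializer between 3.0 nodes. Below is a minimal, self-contained sketch of that versioned-serializer idiom; Payload, PayloadSerializer and the version constant are illustrative stand-ins, not Cassandra classes.

    import java.io.*;

    // Toy message with a legacy (pre-"3.0") and a current wire layout.
    class Payload
    {
        final long ballot;     // stand-in for the UUID ballot
        final byte[] update;   // stand-in for the partition update bytes

        Payload(long ballot, byte[] update) { this.ballot = ballot; this.update = update; }
    }

    class PayloadSerializer
    {
        static final int VERSION_30 = 10; // illustrative constant, not MessagingService's value

        public void serialize(Payload p, DataOutput out, int version) throws IOException
        {
            if (version < VERSION_30)
            {
                // legacy layout: ballot first, then length-prefixed update
                out.writeLong(p.ballot);
                out.writeInt(p.update.length);
                out.write(p.update);
            }
            else
            {
                // current layout: length-prefixed update first, then ballot
                out.writeInt(p.update.length);
                out.write(p.update);
                out.writeLong(p.ballot);
            }
        }

        public Payload deserialize(DataInput in, int version) throws IOException
        {
            if (version < VERSION_30)
            {
                long ballot = in.readLong();
                byte[] update = new byte[in.readInt()];
                in.readFully(update);
                return new Payload(ballot, update);
            }
            byte[] update = new byte[in.readInt()];
            in.readFully(update);
            return new Payload(in.readLong(), update);
        }
    }

The key property, which the diff above preserves, is that both endpoints agree on the layout purely from the negotiated version number; no format byte is sent.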
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/streaming/StreamReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamReader.java b/src/java/org/apache/cassandra/streaming/StreamReader.java
index 1a3980d..66eb220 100644
--- a/src/java/org/apache/cassandra/streaming/StreamReader.java
+++ b/src/java/org/apache/cassandra/streaming/StreamReader.java
@@ -18,31 +18,35 @@ package org.apache.cassandra.streaming;
 
 import java.io.*;
+import java.nio.ByteBuffer;
 import java.nio.channels.Channels;
 import java.nio.channels.ReadableByteChannel;
 import java.util.Collection;
 import java.util.UUID;
 
 import com.google.common.base.Throwables;
-import org.apache.cassandra.io.sstable.format.SSTableFormat;
-import org.apache.cassandra.io.sstable.format.SSTableWriter;
-import org.apache.cassandra.io.sstable.format.Version;
+import com.google.common.collect.UnmodifiableIterator;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.ning.compress.lzf.LZFInputStream;
 
+import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.config.Schema;
-import org.apache.cassandra.db.ColumnFamilyStore;
-import org.apache.cassandra.db.DecoratedKey;
-import org.apache.cassandra.db.Directories;
-import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.rows.*;
+import org.apache.cassandra.db.context.CounterContext;
 import org.apache.cassandra.io.sstable.Descriptor;
+import org.apache.cassandra.io.sstable.SSTableSimpleIterator;
+import org.apache.cassandra.io.sstable.format.SSTableFormat;
+import org.apache.cassandra.io.sstable.format.SSTableWriter;
+import org.apache.cassandra.io.sstable.format.Version;
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.streaming.messages.FileMessageHeader;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.BytesReadTracker;
+import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.Pair;
 
@@ -60,6 +64,7 @@ public class StreamReader
     protected final long repairedAt;
     protected final SSTableFormat.Type format;
     protected final int sstableLevel;
+    protected final SerializationHeader.Component header;
 
     protected Descriptor desc;
 
@@ -69,10 +74,11 @@
         this.cfId = header.cfId;
         this.estimatedKeys = header.estimatedKeys;
         this.sections = header.sections;
-        this.inputVersion = header.format.info.getVersion(header.version);
+        this.inputVersion = header.version;
         this.repairedAt = header.repairedAt;
         this.format = header.format;
         this.sstableLevel = header.sstableLevel;
+        this.header = header.header;
     }
 
     /**
@@ -98,17 +104,18 @@
 
         DataInputStream dis = new DataInputStream(new LZFInputStream(Channels.newInputStream(channel)));
         BytesReadTracker in = new BytesReadTracker(dis);
+        StreamDeserializer deserializer = new StreamDeserializer(cfs.metadata, in, inputVersion, header.toHeader(cfs.metadata));
        try
        {
            while (in.getBytesRead() < totalSize)
            {
-                writeRow(writer, in, cfs);
-
+                writePartition(deserializer, writer, cfs);
                // TODO move this to BytesReadTracker
                session.progress(desc, ProgressInfo.Direction.IN, in.getBytesRead(), totalSize);
            }
            return writer;
-        } catch (Throwable e)
+        }
+        catch (Throwable e)
        {
            writer.abort();
            drain(dis, in.getBytesRead());
@@ -126,7 +133,7 @@
            throw new IOException("Insufficient disk space to store " + totalSize + " bytes");
        desc = Descriptor.fromFilename(cfs.getTempSSTablePath(cfs.directories.getLocationForDisk(localDir), format));
 
-        return SSTableWriter.create(desc, estimatedKeys, repairedAt, sstableLevel);
+        return SSTableWriter.create(desc, estimatedKeys, repairedAt, sstableLevel, header.toHeader(cfs.metadata));
    }
 
    protected void drain(InputStream dis, long bytesRead) throws IOException
@@ -156,10 +163,141 @@
        return size;
    }
 
-    protected void writeRow(SSTableWriter writer, DataInput in, ColumnFamilyStore cfs) throws IOException
+    protected void writePartition(StreamDeserializer deserializer, SSTableWriter writer, ColumnFamilyStore cfs) throws IOException
    {
-        DecoratedKey key = StorageService.getPartitioner().decorateKey(ByteBufferUtil.readWithShortLength(in));
-        writer.appendFromStream(key, cfs.metadata, in, inputVersion);
-        cfs.invalidateCachedRow(key);
+        DecoratedKey key = deserializer.newPartition();
+        writer.append(deserializer);
+        deserializer.checkForExceptions();
+        cfs.invalidateCachedPartition(key);
+    }
+
+    public static class StreamDeserializer extends UnmodifiableIterator<Unfiltered> implements UnfilteredRowIterator
+    {
+        private final CFMetaData metadata;
+        private final DataInput in;
+        private final SerializationHeader header;
+        private final SerializationHelper helper;
+
+        private DecoratedKey key;
+        private DeletionTime partitionLevelDeletion;
+        private SSTableSimpleIterator iterator;
+        private Row staticRow;
+        private IOException exception;
+
+        private final CounterFilteredRow counterRow;
+
+        public StreamDeserializer(CFMetaData metadata, DataInput in, Version version, SerializationHeader header)
+        {
+            assert version.storeRows() : "We don't allow streaming from pre-3.0 nodes";
+            this.metadata = metadata;
+            this.in = in;
+            this.helper = new SerializationHelper(version.correspondingMessagingVersion(), SerializationHelper.Flag.PRESERVE_SIZE);
+            this.header = header;
+            this.counterRow = metadata.isCounter() ? new CounterFilteredRow() : null;
+        }
+
+        public DecoratedKey newPartition() throws IOException
+        {
+            key = StorageService.getPartitioner().decorateKey(ByteBufferUtil.readWithShortLength(in));
+            partitionLevelDeletion = DeletionTime.serializer.deserialize(in);
+            iterator = SSTableSimpleIterator.create(metadata, in, header, helper, partitionLevelDeletion);
+            staticRow = iterator.readStaticRow();
+            return key;
+        }
+
+        public CFMetaData metadata()
+        {
+            return metadata;
+        }
+
+        public PartitionColumns columns()
+        {
+            // We don't know which columns we'll get so assume it can be all of them
+            return metadata.partitionColumns();
+        }
+
+        public boolean isReverseOrder()
+        {
+            return false;
+        }
+
+        public DecoratedKey partitionKey()
+        {
+            return key;
+        }
+
+        public DeletionTime partitionLevelDeletion()
+        {
+            return partitionLevelDeletion;
+        }
+
+        public Row staticRow()
+        {
+            return staticRow;
+        }
+
+        public RowStats stats()
+        {
+            return header.stats();
+        }
+
+        public boolean hasNext()
+        {
+            try
+            {
+                return iterator.hasNext();
+            }
+            catch (IOError e)
+            {
+                if (e.getCause() != null && e.getCause() instanceof IOException)
+                {
+                    exception = (IOException)e.getCause();
+                    return false;
+                }
+                throw e;
+            }
+        }
+
+        public Unfiltered next()
+        {
+            // Note that in practice we know that IOException will be thrown by hasNext(), because that's
+            // where the actual reading happens, so we don't bother catching RuntimeException here (contrarily
+            // to what we do in hasNext)
+            Unfiltered unfiltered = iterator.next();
+            return metadata.isCounter() && unfiltered.kind() == Unfiltered.Kind.ROW
+                 ? maybeMarkLocalToBeCleared((Row) unfiltered)
+                 : unfiltered;
+        }
+
+        private Row maybeMarkLocalToBeCleared(Row row)
+        {
+            return metadata.isCounter()
+                 ? counterRow.setTo(row)
+                 : row;
+        }
+
+        public void checkForExceptions() throws IOException
+        {
+            if (exception != null)
+                throw exception;
+        }
+
+        public void close()
+        {
+        }
+    }
+
+    private static class CounterFilteredRow extends WrappingRow
+    {
+        protected Cell filterCell(Cell cell)
+        {
+            if (!cell.isCounterCell())
+                return cell;
+
+            ByteBuffer marked = CounterContext.instance().markLocalToBeCleared(cell.value());
+            return marked == cell.value()
+                 ? cell
+                 : Cells.create(cell.column(), true, marked, cell.livenessInfo(), cell.path());
+        }
    }
}
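The StreamDeserializer above illustrates a recurring constraint: Iterator.hasNext() cannot throw a checked IOException, so the underlying reader surfaces failures as IOError, hasNext() unwraps and stashes the cause, and the caller rethrows it via checkForExceptions() once the writer has consumed the iterator. A small self-contained sketch of the same idiom, using an illustrative LineIterator rather than Cassandra's classes:

    import java.io.*;
    import java.util.Iterator;

    class LineIterator implements Iterator<String>
    {
        private final BufferedReader reader;
        private String next;
        private IOException exception;

        LineIterator(BufferedReader reader) { this.reader = reader; }

        public boolean hasNext()
        {
            if (next != null)
                return true;
            try
            {
                next = reader.readLine(); // all actual I/O happens here, as in StreamDeserializer
                return next != null;
            }
            catch (IOException e)
            {
                exception = e; // stash it; iteration simply ends early
                return false;
            }
        }

        public String next()
        {
            String result = next;
            next = null;
            return result;
        }

        // mirror of checkForExceptions(): the caller rethrows once iteration is done
        void checkForExceptions() throws IOException
        {
            if (exception != null)
                throw exception;
        }
    }

The consumer loops normally, then calls checkForExceptions() afterwards, which is exactly the shape of writePartition() above.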
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/streaming/StreamSession.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamSession.java b/src/java/org/apache/cassandra/streaming/StreamSession.java
index 44522db..d27c4e2 100644
--- a/src/java/org/apache/cassandra/streaming/StreamSession.java
+++ b/src/java/org/apache/cassandra/streaming/StreamSession.java
@@ -24,8 +24,6 @@
 import java.util.*;
 import java.util.concurrent.*;
 import java.util.concurrent.atomic.AtomicBoolean;
 
-import javax.annotation.Nullable;
-
 import com.google.common.base.Function;
 import com.google.common.collect.*;
 
@@ -37,7 +35,7 @@ import org.slf4j.LoggerFactory;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.db.Keyspace;
-import org.apache.cassandra.db.RowPosition;
+import org.apache.cassandra.db.PartitionPosition;
 import org.apache.cassandra.dht.AbstractBounds;
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
@@ -316,7 +314,7 @@ public class StreamSession implements IEndpointStateChangeSubscriber
    {
        for (ColumnFamilyStore cfStore : stores)
        {
-            final List<AbstractBounds<RowPosition>> rowBoundsList = new ArrayList<>(ranges.size());
+            final List<AbstractBounds<PartitionPosition>> rowBoundsList = new ArrayList<>(ranges.size());
            for (Range<Token> range : ranges)
                rowBoundsList.add(Range.makeRowRange(range));
            refs.addAll(cfStore.selectAndReference(new Function<View, List<SSTableReader>>()
@@ -327,7 +325,7 @@
                Set<SSTableReader> sstables = Sets.newHashSet();
                if (filteredSSTables != null)
                {
-                    for (AbstractBounds<RowPosition> rowBounds : rowBoundsList)
+                    for (AbstractBounds<PartitionPosition> rowBounds : rowBoundsList)
                    {
                        // sstableInBounds may contain early opened sstables
                        for (SSTableReader sstable : view.sstablesInBounds(rowBounds))

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java b/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
index 1936a94..47832f0 100644
--- a/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
+++ b/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
@@ -79,6 +79,7 @@ public class CompressedStreamReader extends StreamReader
 
        CompressedInputStream cis = new CompressedInputStream(Channels.newInputStream(channel), compressionInfo);
        BytesReadTracker in = new BytesReadTracker(new DataInputStream(cis));
+        StreamDeserializer deserializer = new StreamDeserializer(cfs.metadata, in, inputVersion, header.toHeader(cfs.metadata));
        try
        {
            for (Pair<Long, Long> section : sections)
@@ -92,8 +93,7 @@
 
                while (in.getBytesRead() < sectionLength)
                {
-                    writeRow(writer, in, cfs);
-
+                    writePartition(deserializer, writer, cfs);
                    // when compressed, report total bytes of compressed chunks read since remoteFile.size is the sum of chunks transferred
                    session.progress(desc, ProgressInfo.Direction.IN, cis.getTotalCompressedBytesRead(), totalSize);
                }
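Both the plain and compressed readers drive their loops and progress reporting off a byte-counting wrapper (BytesReadTracker) rather than counting partitions. A minimal counting stream in the same spirit — illustrative only, not Cassandra's actual BytesReadTracker:

    import java.io.FilterInputStream;
    import java.io.IOException;
    import java.io.InputStream;

    // Counts bytes as they are consumed, so the caller can report progress
    // and know when a stream section has been fully read.
    class CountingInputStream extends FilterInputStream
    {
        private long bytesRead;

        CountingInputStream(InputStream in) { super(in); }

        @Override
        public int read() throws IOException
        {
            int b = super.read();
            if (b >= 0)
                bytesRead++;
            return b;
        }

        @Override
        public int read(byte[] buf, int off, int len) throws IOException
        {
            int n = super.read(buf, off, len);
            if (n > 0)
                bytesRead += n;
            return n;
        }

        long getBytesRead() { return bytesRead; }
    }

The read loop then takes exactly the shape used above: while (in.getBytesRead() < sectionLength) { readNextPartition(); reportProgress(); }.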
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/streaming/messages/FileMessageHeader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/messages/FileMessageHeader.java b/src/java/org/apache/cassandra/streaming/messages/FileMessageHeader.java
index e9c99fe..b8e7979 100644
--- a/src/java/org/apache/cassandra/streaming/messages/FileMessageHeader.java
+++ b/src/java/org/apache/cassandra/streaming/messages/FileMessageHeader.java
@@ -23,11 +23,15 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.UUID;
 
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.config.Schema;
+import org.apache.cassandra.db.SerializationHeader;
 import org.apache.cassandra.db.TypeSizes;
 import org.apache.cassandra.io.IVersionedSerializer;
 import org.apache.cassandra.io.compress.CompressionMetadata;
 import org.apache.cassandra.io.sstable.format.SSTableFormat;
 import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.io.sstable.format.Version;
 import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.streaming.compress.CompressionInfo;
 import org.apache.cassandra.utils.Pair;
@@ -43,7 +47,7 @@ public class FileMessageHeader
    public final UUID cfId;
    public final int sequenceNumber;
    /** SSTable version */
-    public final String version;
+    public final Version version;
 
    /** SSTable format **/
    public final SSTableFormat.Type format;
@@ -52,16 +56,18 @@
    public final CompressionInfo compressionInfo;
    public final long repairedAt;
    public final int sstableLevel;
+    public final SerializationHeader.Component header;
 
    public FileMessageHeader(UUID cfId,
                             int sequenceNumber,
-                             String version,
+                             Version version,
                             SSTableFormat.Type format,
                             long estimatedKeys,
                             List<Pair<Long, Long>> sections,
                             CompressionInfo compressionInfo,
                             long repairedAt,
-                             int sstableLevel)
+                             int sstableLevel,
+                             SerializationHeader.Component header)
    {
        this.cfId = cfId;
        this.sequenceNumber = sequenceNumber;
@@ -72,6 +78,7 @@
        this.compressionInfo = compressionInfo;
        this.repairedAt = repairedAt;
        this.sstableLevel = sstableLevel;
+        this.header = header;
    }
 
    /**
@@ -134,7 +141,7 @@
        {
            UUIDSerializer.serializer.serialize(header.cfId, out, version);
            out.writeInt(header.sequenceNumber);
-            out.writeUTF(header.version);
+            out.writeUTF(header.version.toString());
 
            //We can't stream to a node that doesn't understand a new sstable format
            if (version < StreamMessage.VERSION_22 && header.format != SSTableFormat.Type.LEGACY && header.format != SSTableFormat.Type.BIG)
@@ -153,13 +160,16 @@
            CompressionInfo.serializer.serialize(header.compressionInfo, out, version);
            out.writeLong(header.repairedAt);
            out.writeInt(header.sstableLevel);
+
+            if (version >= StreamMessage.VERSION_30)
+                SerializationHeader.serializer.serialize(header.header, out);
        }
 
        public FileMessageHeader deserialize(DataInput in, int version) throws IOException
        {
            UUID cfId = UUIDSerializer.serializer.deserialize(in, MessagingService.current_version);
            int sequenceNumber = in.readInt();
-            String sstableVersion = in.readUTF();
+            Version sstableVersion = DatabaseDescriptor.getSSTableFormat().info.getVersion(in.readUTF());
 
            SSTableFormat.Type format = SSTableFormat.Type.LEGACY;
            if (version >= StreamMessage.VERSION_22)
@@ -173,14 +183,18 @@
            CompressionInfo compressionInfo = CompressionInfo.serializer.deserialize(in, MessagingService.current_version);
            long repairedAt = in.readLong();
            int sstableLevel = in.readInt();
-            return new FileMessageHeader(cfId, sequenceNumber, sstableVersion, format, estimatedKeys, sections, compressionInfo, repairedAt, sstableLevel);
+            SerializationHeader.Component header = version >= StreamMessage.VERSION_30
+                                                 ? SerializationHeader.serializer.deserialize(sstableVersion, in)
+                                                 : null;
+
+            return new FileMessageHeader(cfId, sequenceNumber, sstableVersion, format, estimatedKeys, sections, compressionInfo, repairedAt, sstableLevel, header);
        }
 
        public long serializedSize(FileMessageHeader header, int version)
        {
            long size = UUIDSerializer.serializer.serializedSize(header.cfId, version);
            size += TypeSizes.NATIVE.sizeof(header.sequenceNumber);
-            size += TypeSizes.NATIVE.sizeof(header.version);
+            size += TypeSizes.NATIVE.sizeof(header.version.toString());
 
            if (version >= StreamMessage.VERSION_22)
                size += TypeSizes.NATIVE.sizeof(header.format.name);
@@ -195,6 +209,10 @@
            }
            size += CompressionInfo.serializer.serializedSize(header.compressionInfo, version);
            size += TypeSizes.NATIVE.sizeof(header.sstableLevel);
+
+            if (version >= StreamMessage.VERSION_30)
+                size += SerializationHeader.serializer.serializedSize(header.header);
+
            return size;
        }
    }
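Whenever serialize() gains a version-gated field, serializedSize() must gain the matching conditional, or stream buffers and progress totals end up mis-sized. A quick, self-contained harness for keeping the two in lockstep — a toy serializer, not the one above; run with -ea to enable asserts:

    import java.io.ByteArrayOutputStream;
    import java.io.DataOutputStream;
    import java.io.IOException;

    public class SizeCheck
    {
        // toy versioned serializer: writes an int always, a long only from version 4 up
        static void serialize(DataOutputStream out, int version) throws IOException
        {
            out.writeInt(42);
            if (version >= 4)
                out.writeLong(7L);
        }

        static long serializedSize(int version)
        {
            long size = 4;     // the int
            if (version >= 4)
                size += 8;     // the version-gated long
            return size;
        }

        public static void main(String[] args) throws IOException
        {
            // round-trip every supported version and compare actual vs declared size
            for (int version = 2; version <= 4; version++)
            {
                ByteArrayOutputStream bytes = new ByteArrayOutputStream();
                serialize(new DataOutputStream(bytes), version);
                assert bytes.size() == serializedSize(version)
                     : "size mismatch at version " + version;
            }
        }
    }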
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/streaming/messages/OutgoingFileMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/messages/OutgoingFileMessage.java b/src/java/org/apache/cassandra/streaming/messages/OutgoingFileMessage.java
index 5b34bd8..82e6620 100644
--- a/src/java/org/apache/cassandra/streaming/messages/OutgoingFileMessage.java
+++ b/src/java/org/apache/cassandra/streaming/messages/OutgoingFileMessage.java
@@ -70,13 +70,14 @@ public class OutgoingFileMessage extends StreamMessage
        }
 
        this.header = new FileMessageHeader(sstable.metadata.cfId,
                                            sequenceNumber,
-                                            sstable.descriptor.version.toString(),
+                                            sstable.descriptor.version,
                                            sstable.descriptor.formatType,
                                            estimatedKeys,
                                            sections,
                                            compressionInfo,
                                            repairedAt,
-                                            keepSSTableLevel ? sstable.getSSTableLevel() : 0);
+                                            keepSSTableLevel ? sstable.getSSTableLevel() : 0,
+                                            sstable.header == null ? null : sstable.header.toComponent());
    }
 
    public synchronized void serialize(DataOutputStreamPlus out, int version, StreamSession session) throws IOException
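Carrying sstable.descriptor.version through as a typed Version instead of a String is what lets receivers ask capability questions — e.g. the assert version.storeRows() in StreamReader — instead of comparing raw strings at every call site. A rough sketch of that idea, using a hypothetical, much-simplified version type rather than Cassandra's:

    // Hypothetical stand-in for a typed sstable version.
    final class Version
    {
        private final String name; // e.g. "ka", "la", "ma"

        private Version(String name) { this.name = name; }

        static Version fromString(String name)
        {
            // validate once, up front, instead of at every string comparison later
            if (!name.matches("[a-z]{2}"))
                throw new IllegalArgumentException("bad sstable version: " + name);
            return new Version(name);
        }

        // capability predicate; in this toy scheme "la" and later store rows
        boolean storeRows() { return name.compareTo("la") >= 0; }

        @Override
        public String toString() { return name; }
    }

On the wire the version still travels as a string (writeUTF in FileMessageHeader above); it is re-parsed into a Version immediately on deserialize, so invalid values fail fast at the protocol boundary.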
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/streaming/messages/StreamMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/messages/StreamMessage.java b/src/java/org/apache/cassandra/streaming/messages/StreamMessage.java
index d4e8a81..3db2dbf 100644
--- a/src/java/org/apache/cassandra/streaming/messages/StreamMessage.java
+++ b/src/java/org/apache/cassandra/streaming/messages/StreamMessage.java
@@ -34,7 +34,8 @@ public abstract class StreamMessage
    /** Streaming protocol version */
    public static final int VERSION_20 = 2;
    public static final int VERSION_22 = 3;
-    public static final int CURRENT_VERSION = VERSION_22;
+    public static final int VERSION_30 = 4;
+    public static final int CURRENT_VERSION = VERSION_30;
 
    public static void serialize(StreamMessage message, DataOutputStreamPlus out, int version, StreamSession session) throws IOException
    {
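Bumping CURRENT_VERSION to VERSION_30 advertises the new streaming protocol, but each session still formats messages for whatever version is actually in use with the peer — hence all the version >= StreamMessage.VERSION_30 checks in the serializers above. A sketch of that gating pattern; the constants mirror the diff, while negotiate() is purely hypothetical and not how Cassandra names it:

    public abstract class StreamMessageSketch
    {
        public static final int VERSION_20 = 2;
        public static final int VERSION_22 = 3;
        public static final int VERSION_30 = 4;
        public static final int CURRENT_VERSION = VERSION_30;

        // hypothetical: a session talks at the lower of the two nodes' versions,
        // so a 3.0 sender paired with a 2.2 receiver emits the VERSION_22 layout
        static int negotiate(int ours, int theirs)
        {
            return Math.min(ours, theirs);
        }
    }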
