Updated Branches: refs/heads/trunk fbe63ab3d -> dfc9faf28
Fix streaming retry patch by yukim; reviewed by slebresne for CASSANDRA-5775 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/dfc9faf2 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/dfc9faf2 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/dfc9faf2 Branch: refs/heads/trunk Commit: dfc9faf28bccdd0e0da0681698ca5223968369d1 Parents: fbe63ab Author: Yuki Morishita <[email protected]> Authored: Mon Jul 22 08:50:58 2013 -0500 Committer: Yuki Morishita <[email protected]> Committed: Mon Jul 22 08:51:56 2013 -0500 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../cassandra/streaming/StreamReader.java | 31 ++++++++--- .../cassandra/streaming/StreamSession.java | 19 +++++-- .../cassandra/streaming/StreamTransferTask.java | 4 +- .../compress/CompressedStreamReader.java | 13 ++--- .../streaming/messages/ReceivedMessage.java | 57 ++++++++++++++++++++ .../streaming/messages/StreamMessage.java | 7 +-- 7 files changed, 107 insertions(+), 25 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/dfc9faf2/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 5e2f062..f7beb5e 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -17,6 +17,7 @@ * Update deletion timestamp in Commit#updatesWithPaxosTime (CASSANDRA-5787) * Thrift cas() method crashes if input columns are not sorted (CASSANDRA-5786) * Order columns names correctly when querying for CAS (CASSANDRA-5788) + * Fix streaming retry (CASSANDRA-5775) 2.0.0-beta1 http://git-wip-us.apache.org/repos/asf/cassandra/blob/dfc9faf2/src/java/org/apache/cassandra/streaming/StreamReader.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/streaming/StreamReader.java b/src/java/org/apache/cassandra/streaming/StreamReader.java index 5c19eb1..862f5a2 100644 --- a/src/java/org/apache/cassandra/streaming/StreamReader.java +++ b/src/java/org/apache/cassandra/streaming/StreamReader.java @@ -20,6 +20,7 @@ package org.apache.cassandra.streaming; import java.io.DataInput; import java.io.DataInputStream; import java.io.IOException; +import java.io.InputStream; import java.nio.channels.Channels; import java.nio.channels.ReadableByteChannel; import java.util.Collection; @@ -76,17 +77,12 @@ public class StreamReader Pair<String, String> kscf = Schema.instance.getCF(cfId); ColumnFamilyStore cfs = Keyspace.open(kscf.left).getColumnFamilyStore(kscf.right); - Directories.DataDirectory localDir = cfs.directories.getLocationCapableOfSize(totalSize); - if (localDir == null) - throw new IOException("Insufficient disk space to store " + totalSize + " bytes"); - desc = Descriptor.fromFilename(cfs.getTempSSTablePath(cfs.directories.getLocationForDisk(localDir))); - SSTableWriter writer = new SSTableWriter(desc.filenameFor(Component.DATA), estimatedKeys); + SSTableWriter writer = createWriter(cfs, totalSize); + DataInputStream dis = new DataInputStream(new LZFInputStream(Channels.newInputStream(channel))); + BytesReadTracker in = new BytesReadTracker(dis); try { - DataInputStream dis = new DataInputStream(new LZFInputStream(Channels.newInputStream(channel))); - BytesReadTracker in = new BytesReadTracker(dis); - while (in.getBytesRead() < totalSize) { writeRow(writer, in, cfs); @@ -98,6 +94,7 @@ public class StreamReader catch (Throwable e) { writer.abort(); + drain(dis, in.getBytesRead()); if (e instanceof IOException) throw (IOException) e; else @@ -105,6 +102,24 @@ public class StreamReader } } + protected SSTableWriter createWriter(ColumnFamilyStore cfs, long totalSize) throws IOException + { + Directories.DataDirectory localDir = cfs.directories.getLocationCapableOfSize(totalSize); + if (localDir == null) + throw new IOException("Insufficient disk space to store " + totalSize + " bytes"); + desc = Descriptor.fromFilename(cfs.getTempSSTablePath(cfs.directories.getLocationForDisk(localDir))); + + return new SSTableWriter(desc.filenameFor(Component.DATA), estimatedKeys); + } + + protected void drain(InputStream dis, long bytesRead) throws IOException + { + long toSkip = totalSize() - bytesRead; + toSkip = toSkip - dis.skip(toSkip); + while (toSkip > 0) + toSkip = toSkip - dis.skip(toSkip); + } + protected long totalSize() { long size = 0; http://git-wip-us.apache.org/repos/asf/cassandra/blob/dfc9faf2/src/java/org/apache/cassandra/streaming/StreamSession.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/streaming/StreamSession.java b/src/java/org/apache/cassandra/streaming/StreamSession.java index aeb4419..2c4b47d 100644 --- a/src/java/org/apache/cassandra/streaming/StreamSession.java +++ b/src/java/org/apache/cassandra/streaming/StreamSession.java @@ -19,7 +19,6 @@ package org.apache.cassandra.streaming; import java.io.IOException; import java.net.InetAddress; -import java.net.Socket; import java.util.*; import java.util.concurrent.Future; @@ -359,7 +358,12 @@ public class StreamSession implements IEndpointStateChangeSubscriber, IFailureDe break; case FILE: - received((FileMessage) message); + receive((FileMessage) message); + break; + + case RECEIVED: + ReceivedMessage received = (ReceivedMessage) message; + received(received.cfId, received.sequenceNumber); break; case RETRY: @@ -455,7 +459,6 @@ public class StreamSession implements IEndpointStateChangeSubscriber, IFailureDe { StreamingMetrics.totalOutgoingBytes.inc(header.size()); metrics.outgoingBytes.inc(header.size()); - transfers.get(header.cfId).complete(header.sequenceNumber); } /** @@ -463,10 +466,12 @@ public class StreamSession implements IEndpointStateChangeSubscriber, IFailureDe * * @param message received file */ - public void received(FileMessage message) + public void receive(FileMessage message) { StreamingMetrics.totalIncomingBytes.inc(message.header.size()); metrics.incomingBytes.inc(message.header.size()); + // send back file received message + handler.sendMessage(new ReceivedMessage(message.header.cfId, message.header.sequenceNumber)); receivers.get(message.header.cfId).received(message.sstable); } @@ -476,6 +481,11 @@ public class StreamSession implements IEndpointStateChangeSubscriber, IFailureDe streamResult.handleProgress(progress); } + public void received(UUID cfId, int sequenceNumber) + { + transfers.get(cfId).complete(sequenceNumber); + } + /** * Call back on receiving {@code StreamMessage.Type.RETRY} message. * @@ -513,6 +523,7 @@ public class StreamSession implements IEndpointStateChangeSubscriber, IFailureDe public void doRetry(FileMessageHeader header, Throwable e) { + logger.warn("retrying for following error", e); // retry retries++; if (retries > DatabaseDescriptor.getMaxStreamingRetries()) http://git-wip-us.apache.org/repos/asf/cassandra/blob/dfc9faf2/src/java/org/apache/cassandra/streaming/StreamTransferTask.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/streaming/StreamTransferTask.java b/src/java/org/apache/cassandra/streaming/StreamTransferTask.java index 956692d..61ad058 100644 --- a/src/java/org/apache/cassandra/streaming/StreamTransferTask.java +++ b/src/java/org/apache/cassandra/streaming/StreamTransferTask.java @@ -49,9 +49,9 @@ public class StreamTransferTask extends StreamTask } /** - * Complete sending file. + * Received ACK for file at {@code sequenceNumber}. * - * @param sequenceNumber sequence number of completed file transfer + * @param sequenceNumber sequence number of file */ public void complete(int sequenceNumber) { http://git-wip-us.apache.org/repos/asf/cassandra/blob/dfc9faf2/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java b/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java index 1e8308f..da290c3 100644 --- a/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java +++ b/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java @@ -61,20 +61,16 @@ public class CompressedStreamReader extends StreamReader public SSTableReader read(ReadableByteChannel channel) throws IOException { long totalSize = totalSize(); - CompressedInputStream cis = new CompressedInputStream(Channels.newInputStream(channel), compressionInfo); Pair<String, String> kscf = Schema.instance.getCF(cfId); ColumnFamilyStore cfs = Keyspace.open(kscf.left).getColumnFamilyStore(kscf.right); - Directories.DataDirectory localDir = cfs.directories.getLocationCapableOfSize(totalSize); - if (localDir == null) - throw new IOException("Insufficient disk space to store " + totalSize + " bytes"); - desc = Descriptor.fromFilename(cfs.getTempSSTablePath(cfs.directories.getLocationForDisk(localDir))); - SSTableWriter writer = new SSTableWriter(desc.filenameFor(Component.DATA), estimatedKeys); + SSTableWriter writer = createWriter(cfs, totalSize); + + CompressedInputStream cis = new CompressedInputStream(Channels.newInputStream(channel), compressionInfo); + BytesReadTracker in = new BytesReadTracker(new DataInputStream(cis)); try { - BytesReadTracker in = new BytesReadTracker(new DataInputStream(cis)); - for (Pair<Long, Long> section : sections) { long length = section.right - section.left; @@ -93,6 +89,7 @@ public class CompressedStreamReader extends StreamReader catch (Throwable e) { writer.abort(); + drain(cis, in.getBytesRead()); if (e instanceof IOException) throw (IOException) e; else http://git-wip-us.apache.org/repos/asf/cassandra/blob/dfc9faf2/src/java/org/apache/cassandra/streaming/messages/ReceivedMessage.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/streaming/messages/ReceivedMessage.java b/src/java/org/apache/cassandra/streaming/messages/ReceivedMessage.java new file mode 100644 index 0000000..daf8bf1 --- /dev/null +++ b/src/java/org/apache/cassandra/streaming/messages/ReceivedMessage.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.streaming.messages; + +import java.io.*; +import java.nio.channels.Channels; +import java.nio.channels.ReadableByteChannel; +import java.nio.channels.WritableByteChannel; +import java.util.UUID; + +import org.apache.cassandra.net.MessagingService; +import org.apache.cassandra.streaming.StreamSession; +import org.apache.cassandra.utils.UUIDSerializer; + +public class ReceivedMessage extends StreamMessage +{ + public static Serializer<ReceivedMessage> serializer = new Serializer<ReceivedMessage>() + { + public ReceivedMessage deserialize(ReadableByteChannel in, int version, StreamSession session) throws IOException + { + DataInput input = new DataInputStream(Channels.newInputStream(in)); + return new ReceivedMessage(UUIDSerializer.serializer.deserialize(input, MessagingService.current_version), input.readInt()); + } + + public void serialize(ReceivedMessage message, WritableByteChannel out, int version, StreamSession session) throws IOException + { + DataOutput output = new DataOutputStream(Channels.newOutputStream(out)); + UUIDSerializer.serializer.serialize(message.cfId, output, MessagingService.current_version); + output.writeInt(message.sequenceNumber); + } + }; + + public final UUID cfId; + public final int sequenceNumber; + + public ReceivedMessage(UUID cfId, int sequenceNumber) + { + super(Type.RECEIVED); + this.cfId = cfId; + this.sequenceNumber = sequenceNumber; + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/dfc9faf2/src/java/org/apache/cassandra/streaming/messages/StreamMessage.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/streaming/messages/StreamMessage.java b/src/java/org/apache/cassandra/streaming/messages/StreamMessage.java index f737675..11e9955 100644 --- a/src/java/org/apache/cassandra/streaming/messages/StreamMessage.java +++ b/src/java/org/apache/cassandra/streaming/messages/StreamMessage.java @@ -65,9 +65,10 @@ public abstract class StreamMessage { PREPARE(1, 5, PrepareMessage.serializer), FILE(2, 0, FileMessage.serializer), - RETRY(3, 1, RetryMessage.serializer), - COMPLETE(4, 4, CompleteMessage.serializer), - SESSION_FAILED(5, 5, SessionFailedMessage.serializer); + RECEIVED(3, 1, ReceivedMessage.serializer), + RETRY(4, 1, RetryMessage.serializer), + COMPLETE(5, 4, CompleteMessage.serializer), + SESSION_FAILED(6, 5, SessionFailedMessage.serializer); public static Type get(byte type) {
