Author: jbellis
Date: Wed Dec 9 16:36:31 2009
New Revision: 888864
URL: http://svn.apache.org/viewvc?rev=888864&view=rev
Log:
Rename BootstrapMetadata and friends to StreamRequest*, as that is what they
essentially are. Move them to cassandra.io, where the rest of the streaming
code resides. Patch by Jaakko Laine; reviewed by jbellis for CASSANDRA-564
Added:
incubator/cassandra/trunk/src/java/org/apache/cassandra/io/StreamRequestMessage.java
(with props)
incubator/cassandra/trunk/src/java/org/apache/cassandra/io/StreamRequestMetadata.java
(contents, props changed)
- copied, changed from r888712,
incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootstrapMetadata.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/io/StreamRequestVerbHandler.java
(contents, props changed)
- copied, changed from r888712,
incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootstrapMetadataVerbHandler.java
Removed:
incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootstrapMetadata.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootstrapMetadataMessage.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootstrapMetadataVerbHandler.java
Modified:
incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootStrapper.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/io/Streaming.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
Modified:
incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootStrapper.java
URL:
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootStrapper.java?rev=888864&r1=888863&r2=888864&view=diff
==============================================================================
---
incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootStrapper.java
(original)
+++
incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootStrapper.java
Wed Dec 9 16:36:31 2009
@@ -33,21 +33,13 @@
import org.apache.cassandra.locator.TokenMetadata;
import org.apache.cassandra.locator.AbstractReplicationStrategy;
import org.apache.cassandra.net.*;
- import org.apache.cassandra.net.io.StreamContextManager;
- import org.apache.cassandra.net.io.IStreamComplete;
import org.apache.cassandra.service.StorageService;
- import org.apache.cassandra.service.StreamManager;
- import org.apache.cassandra.utils.LogUtil;
import org.apache.cassandra.utils.SimpleCondition;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.gms.FailureDetector;
import org.apache.cassandra.gms.IFailureDetector;
- import org.apache.cassandra.io.DataInputBuffer;
- import org.apache.cassandra.io.SSTableReader;
- import org.apache.cassandra.io.SSTableWriter;
- import org.apache.cassandra.db.ColumnFamilyStore;
- import org.apache.cassandra.db.Table;
+ import org.apache.cassandra.io.Streaming;
import com.google.common.collect.Multimap;
import com.google.common.collect.ArrayListMultimap;
@@ -56,7 +48,7 @@
* This class handles the bootstrapping responsibilities for the local
endpoint.
*
* - bootstrapTokenVerb asks the most-loaded node what Token to use to split
its Range in two.
- * - bootstrapMetadataVerb tells source nodes to send us the necessary Ranges
+ * - streamRequestVerb tells source nodes to send us the necessary Ranges
* - source nodes send streamInitiateVerb to us to say "get ready to receive
data" [if there is data to send]
* - when we have everything set up to receive the data, we send
streamInitiateDoneVerb back to the source nodes and they start streaming
* - when streaming is complete, we send streamFinishedVerb to the source so
it can clean up on its end
@@ -96,12 +88,10 @@
for (Map.Entry<InetAddress, Collection<Range>> entry :
getWorkMap(rangesWithSourceTarget).asMap().entrySet())
{
InetAddress source = entry.getKey();
- if (logger.isDebugEnabled())
- logger.debug("Sending BootstrapMetadataMessage to " +
source + " for " + StringUtils.join(entry.getValue(), ", "));
- BootstrapMetadata bsMetadata = new
BootstrapMetadata(address, entry.getValue());
- Message message =
BootstrapMetadataMessage.makeBootstrapMetadataMessage(new
BootstrapMetadataMessage(bsMetadata));
- MessagingService.instance().sendOneWay(message, source);
StorageService.instance().addBootstrapSource(source);
+ if (logger.isDebugEnabled())
+ logger.debug("Requesting from " + source + " ranges "
+ StringUtils.join(entry.getValue(), ", "));
+ Streaming.requestRanges(source, entry.getValue());
}
}
}).start();
Added:
incubator/cassandra/trunk/src/java/org/apache/cassandra/io/StreamRequestMessage.java
URL:
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/io/StreamRequestMessage.java?rev=888864&view=auto
==============================================================================
---
incubator/cassandra/trunk/src/java/org/apache/cassandra/io/StreamRequestMessage.java
(added)
+++
incubator/cassandra/trunk/src/java/org/apache/cassandra/io/StreamRequestMessage.java
Wed Dec 9 16:36:31 2009
@@ -0,0 +1,93 @@
+ /**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.io;
+
+import java.io.*;
+
+import org.apache.cassandra.net.Message;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.concurrent.StageManager;
+
+ /**
+ * Request message sent to the nodes that are to hand off data. It wraps an
+ * array of StreamRequestMetadata (target node plus the ranges to transfer);
+ * makeStreamRequestMessage() serializes an instance into a net Message.
+*/
+class StreamRequestMessage
+{
+ private static ICompactSerializer<StreamRequestMessage> serializer_; // shared instance, assigned once in the static block below
+ static
+ {
+ serializer_ = new StreamRequestMessageSerializer();
+ }
+
+ protected static ICompactSerializer<StreamRequestMessage> serializer()
+ {
+ return serializer_;
+ }
+
+ protected static Message makeStreamRequestMessage(StreamRequestMessage
streamRequestMessage)
+ {
+ ByteArrayOutputStream bos = new ByteArrayOutputStream();
+ DataOutputStream dos = new DataOutputStream(bos);
+ try
+ {
+ StreamRequestMessage.serializer().serialize(streamRequestMessage,
dos);
+ }
+ catch (IOException e) // rethrown below as the unchecked IOError
+ {
+ throw new IOError(e);
+ }
+ return new Message(FBUtilities.getLocalAddress(),
StageManager.streamStage_, StorageService.streamRequestVerbHandler_,
bos.toByteArray() ); // last argument = the bytes serialized above
+ }
+
+ protected StreamRequestMetadata[] streamRequestMetadata_ = new
StreamRequestMetadata[0]; // empty default; the constructor overwrites it
+
+ // TODO only actually ever need one StreamRequestMetadata, not an array
+ StreamRequestMessage(StreamRequestMetadata... streamRequestMetadata)
+ {
+ assert streamRequestMetadata != null; // checked only when assertions are enabled
+ streamRequestMetadata_ = streamRequestMetadata;
+ }
+}
+
+class StreamRequestMessageSerializer implements
ICompactSerializer<StreamRequestMessage> // layout: int entry count, then each StreamRequestMetadata in order
+{
+ public void serialize(StreamRequestMessage streamRequestMessage,
DataOutputStream dos) throws IOException
+ {
+ StreamRequestMetadata[] streamRequestMetadata =
streamRequestMessage.streamRequestMetadata_;
+ dos.writeInt(streamRequestMetadata.length); // length prefix, read back first by deserialize()
+ for (StreamRequestMetadata bsmd : streamRequestMetadata)
+ {
+ StreamRequestMetadata.serializer().serialize(bsmd, dos); // each entry is written by its own serializer
+ }
+ }
+
+ public StreamRequestMessage deserialize(DataInputStream dis) throws
IOException
+ {
+ int size = dis.readInt(); // entry count written by serialize()
+ StreamRequestMetadata[] streamRequestMetadata = new
StreamRequestMetadata[size];
+ for ( int i = 0; i < size; ++i ) // entries are read in the order they were written
+ {
+ streamRequestMetadata[i] =
StreamRequestMetadata.serializer().deserialize(dis);
+ }
+ return new StreamRequestMessage(streamRequestMetadata);
+ }
+}
Propchange:
incubator/cassandra/trunk/src/java/org/apache/cassandra/io/StreamRequestMessage.java
------------------------------------------------------------------------------
svn:eol-style = native
Copied:
incubator/cassandra/trunk/src/java/org/apache/cassandra/io/StreamRequestMetadata.java
(from r888712,
incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootstrapMetadata.java)
URL:
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/io/StreamRequestMetadata.java?p2=incubator/cassandra/trunk/src/java/org/apache/cassandra/io/StreamRequestMetadata.java&p1=incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootstrapMetadata.java&r1=888712&r2=888864&rev=888864&view=diff
==============================================================================
---
incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootstrapMetadata.java
(original)
+++
incubator/cassandra/trunk/src/java/org/apache/cassandra/io/StreamRequestMetadata.java
Wed Dec 9 16:36:31 2009
@@ -16,37 +16,34 @@
* limitations under the License.
*/
-package org.apache.cassandra.dht;
+package org.apache.cassandra.io;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
+import java.net.InetAddress;
import java.util.ArrayList;
import java.util.List;
import java.util.Collection;
-import org.apache.cassandra.io.ICompactSerializer;
import org.apache.cassandra.net.CompactEndPointSerializationHelper;
-import java.net.InetAddress;
-
-
+import org.apache.cassandra.dht.Range;
/**
- * This encapsulates information of the list of
- * ranges that a target node requires in order to
- * be bootstrapped. This will be bundled in a
- * BootstrapMetadataMessage and sent to nodes that
- * are going to handoff the data.
+ * This encapsulates information of the list of ranges that a target
+ * node requires to be transferred. This will be bundled in a
+ * StreamRequestMessage and sent to nodes that are going to hand off
+ * the data.
*/
-class BootstrapMetadata
+class StreamRequestMetadata
{
- private static ICompactSerializer<BootstrapMetadata> serializer_;
+ private static ICompactSerializer<StreamRequestMetadata> serializer_;
static
{
- serializer_ = new BootstrapMetadataSerializer();
+ serializer_ = new StreamRequestMetadataSerializer();
}
- protected static ICompactSerializer<BootstrapMetadata> serializer()
+ protected static ICompactSerializer<StreamRequestMetadata> serializer()
{
return serializer_;
}
@@ -54,7 +51,7 @@
protected InetAddress target_;
protected Collection<Range> ranges_;
- BootstrapMetadata(InetAddress target, Collection<Range> ranges)
+ StreamRequestMetadata(InetAddress target, Collection<Range> ranges)
{
target_ = target;
ranges_ = ranges;
@@ -74,19 +71,19 @@
}
}
-class BootstrapMetadataSerializer implements
ICompactSerializer<BootstrapMetadata>
+class StreamRequestMetadataSerializer implements
ICompactSerializer<StreamRequestMetadata>
{
- public void serialize(BootstrapMetadata bsMetadata, DataOutputStream dos)
throws IOException
+ public void serialize(StreamRequestMetadata srMetadata, DataOutputStream
dos) throws IOException
{
- CompactEndPointSerializationHelper.serialize(bsMetadata.target_, dos);
- dos.writeInt(bsMetadata.ranges_.size());
- for (Range range : bsMetadata.ranges_)
+ CompactEndPointSerializationHelper.serialize(srMetadata.target_, dos);
+ dos.writeInt(srMetadata.ranges_.size());
+ for (Range range : srMetadata.ranges_)
{
Range.serializer().serialize(range, dos);
}
}
- public BootstrapMetadata deserialize(DataInputStream dis) throws
IOException
+ public StreamRequestMetadata deserialize(DataInputStream dis) throws
IOException
{
InetAddress target =
CompactEndPointSerializationHelper.deserialize(dis);
int size = dis.readInt();
@@ -95,7 +92,7 @@
{
ranges.add(Range.serializer().deserialize(dis));
}
- return new BootstrapMetadata( target, ranges );
+ return new StreamRequestMetadata( target, ranges );
}
}
Propchange:
incubator/cassandra/trunk/src/java/org/apache/cassandra/io/StreamRequestMetadata.java
------------------------------------------------------------------------------
svn:eol-style = native
Copied:
incubator/cassandra/trunk/src/java/org/apache/cassandra/io/StreamRequestVerbHandler.java
(from r888712,
incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootstrapMetadataVerbHandler.java)
URL:
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/io/StreamRequestVerbHandler.java?p2=incubator/cassandra/trunk/src/java/org/apache/cassandra/io/StreamRequestVerbHandler.java&p1=incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootstrapMetadataVerbHandler.java&r1=888712&r2=888864&rev=888864&view=diff
==============================================================================
---
incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootstrapMetadataVerbHandler.java
(original)
+++
incubator/cassandra/trunk/src/java/org/apache/cassandra/io/StreamRequestVerbHandler.java
Wed Dec 9 16:36:31 2009
@@ -16,7 +16,7 @@
* limitations under the License.
*/
-package org.apache.cassandra.dht;
+package org.apache.cassandra.io;
import java.io.File;
import java.io.IOException;
@@ -27,9 +27,6 @@
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.Table;
-import org.apache.cassandra.io.DataInputBuffer;
-import org.apache.cassandra.io.SSTableReader;
-import org.apache.cassandra.io.Streaming;
import java.net.InetAddress;
import org.apache.cassandra.net.IVerbHandler;
@@ -43,34 +40,31 @@
import org.apache.commons.lang.StringUtils;
/**
- * This verb handler handles the BootstrapMetadataMessage that is sent
- * by the leader to the nodes that are responsible for handing off data.
+ * This verb handler handles the StreamRequestMessage that is sent by
+ * the node requesting range transfer.
*/
-public class BootstrapMetadataVerbHandler implements IVerbHandler
+public class StreamRequestVerbHandler implements IVerbHandler
{
- private static Logger logger_ =
Logger.getLogger(BootstrapMetadataVerbHandler.class);
+ private static Logger logger_ =
Logger.getLogger(StreamRequestVerbHandler.class);
public void doVerb(Message message)
{
if (logger_.isDebugEnabled())
- logger_.debug("Received a BootstrapMetadataMessage from " +
message.getFrom());
-
- /* Cannot bootstrap another node if I'm in bootstrap mode myself! */
- assert !StorageService.instance().isBootstrapMode();
+ logger_.debug("Received a StreamRequestMessage from " +
message.getFrom());
byte[] body = message.getMessageBody();
DataInputBuffer bufIn = new DataInputBuffer();
bufIn.reset(body, body.length);
try
{
- BootstrapMetadataMessage bsMetadataMessage =
BootstrapMetadataMessage.serializer().deserialize(bufIn);
- BootstrapMetadata[] bsMetadata = bsMetadataMessage.bsMetadata_;
+ StreamRequestMessage streamRequestMessage =
StreamRequestMessage.serializer().deserialize(bufIn);
+ StreamRequestMetadata[] streamRequestMetadata =
streamRequestMessage.streamRequestMetadata_;
- for (BootstrapMetadata bsmd : bsMetadata)
+ for (StreamRequestMetadata srm : streamRequestMetadata)
{
if (logger_.isDebugEnabled())
- logger_.debug(bsmd.toString());
- Streaming.transferRanges(bsmd.target_, bsmd.ranges_, null);
+ logger_.debug(srm.toString());
+ Streaming.transferRanges(srm.target_, srm.ranges_, null);
}
}
catch (IOException ex)
Propchange:
incubator/cassandra/trunk/src/java/org/apache/cassandra/io/StreamRequestVerbHandler.java
------------------------------------------------------------------------------
svn:eol-style = native
Modified:
incubator/cassandra/trunk/src/java/org/apache/cassandra/io/Streaming.java
URL:
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/io/Streaming.java?rev=888864&r1=888863&r2=888864&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/io/Streaming.java
(original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/io/Streaming.java
Wed Dec 9 16:36:31 2009
@@ -126,6 +126,16 @@
}
}
+ /**
+ * Request ranges to be transferred: sends source a one-way StreamRequestMessage naming the local address as target
+ */
+ public static void requestRanges(InetAddress source, Collection<Range>
ranges)
+ {
+ StreamRequestMetadata streamRequestMetadata = new
StreamRequestMetadata(FBUtilities.getLocalAddress(), ranges); // target of the transfer = this node
+ Message message = StreamRequestMessage.makeStreamRequestMessage(new
StreamRequestMessage(streamRequestMetadata));
+ MessagingService.instance().sendOneWay(message, source); // one-way send: no reply is handled in this method
+ }
+
public static class StreamInitiateVerbHandler implements IVerbHandler
{
/*
Modified:
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java
URL:
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java?rev=888864&r1=888863&r2=888864&view=diff
==============================================================================
---
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java
(original)
+++
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java
Wed Dec 9 16:36:31 2009
@@ -41,7 +41,6 @@
import org.apache.cassandra.locator.TokenMetadata;
import org.apache.cassandra.dht.IPartitioner;
import org.apache.cassandra.dht.Range;
-import org.apache.cassandra.dht.Token;
import org.apache.cassandra.gms.FailureDetector;
import org.apache.cassandra.concurrent.StageManager;
@@ -83,14 +82,6 @@
}
};
- private static Comparator<Row> rowComparator = new Comparator<Row>()
- {
- public int compare(Row r1, Row r2)
- {
- return keyComparator.compare(r1.key, r2.key);
- }
- };
-
/**
* Use this method to have this RowMutation applied
* across all replicas. This method will take care
@@ -308,7 +299,6 @@
List<Row> rows = new ArrayList<Row>();
List<IAsyncResult> iars = new ArrayList<IAsyncResult>();
- int commandIndex = 0;
for (ReadCommand command: commands)
{
@@ -337,7 +327,6 @@
ReadResponse response =
ReadResponse.serializer().deserialize(bufIn);
if (response.row() != null)
rows.add(response.row());
- commandIndex++;
}
return rows;
}
@@ -412,7 +401,6 @@
for (ReadCommand command: commands)
{
- // TODO: throw a thrift exception if we do not have N nodes
assert !command.isDigestQuery();
ReadCommand readMessageDigestOnly = command.copy();
readMessageDigestOnly.setDigestQuery(true);
@@ -707,7 +695,6 @@
List<InetAddress> endpoints =
StorageService.instance().getLiveNaturalEndpoints(command.key);
/* Remove the local storage endpoint from the list. */
endpoints.remove(FBUtilities.getLocalAddress());
- // TODO: throw a thrift exception if we do not have N nodes
if (logger.isDebugEnabled())
logger.debug("weakreadlocal reading " + command);
Modified:
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
URL:
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java?rev=888864&r1=888863&r2=888864&view=diff
==============================================================================
---
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
(original)
+++
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
Wed Dec 9 16:36:31 2009
@@ -42,6 +42,7 @@
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.io.SSTableReader;
import org.apache.cassandra.io.Streaming;
+import org.apache.cassandra.io.StreamRequestVerbHandler;
import org.apache.log4j.Logger;
import org.apache.log4j.Level;
@@ -76,7 +77,7 @@
public final static String streamInitiateDoneVerbHandler_ =
"BOOTSTRAP-INITIATE-DONE-VERB-HANDLER";
public final static String streamFinishedVerbHandler_ =
"BOOTSTRAP-TERMINATE-VERB-HANDLER";
public final static String dataFileVerbHandler_ = "DATA-FILE-VERB-HANDLER";
- public final static String bootstrapMetadataVerbHandler_ =
"BS-METADATA-VERB-HANDLER";
+ public final static String streamRequestVerbHandler_ =
"BS-METADATA-VERB-HANDLER";
public final static String rangeVerbHandler_ = "RANGE-VERB-HANDLER";
public final static String rangeSliceVerbHandler_ =
"RANGE-SLICE-VERB-HANDLER";
public final static String bootstrapTokenVerbHandler_ =
"SPLITS-VERB-HANDLER";
@@ -216,7 +217,7 @@
MessagingService.instance().registerVerbHandlers(rangeSliceVerbHandler_, new
RangeSliceVerbHandler());
// see BootStrapper for a summary of how the bootstrap verbs interact
MessagingService.instance().registerVerbHandlers(bootstrapTokenVerbHandler_,
new BootStrapper.BootstrapTokenVerbHandler());
-
MessagingService.instance().registerVerbHandlers(bootstrapMetadataVerbHandler_,
new BootstrapMetadataVerbHandler() );
+
MessagingService.instance().registerVerbHandlers(streamRequestVerbHandler_, new
StreamRequestVerbHandler() );
MessagingService.instance().registerVerbHandlers(streamInitiateVerbHandler_,
new Streaming.StreamInitiateVerbHandler());
MessagingService.instance().registerVerbHandlers(streamInitiateDoneVerbHandler_,
new Streaming.StreamInitiateDoneVerbHandler());
MessagingService.instance().registerVerbHandlers(streamFinishedVerbHandler_,
new Streaming.StreamFinishedVerbHandler());