Modified: incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/streaming/StreamInitiateVerbHandler.java URL: http://svn.apache.org/viewvc/incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/streaming/StreamInitiateVerbHandler.java?rev=915463&r1=915462&r2=915463&view=diff ============================================================================== --- incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/streaming/StreamInitiateVerbHandler.java (original) +++ incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/streaming/StreamInitiateVerbHandler.java Tue Feb 23 18:13:07 2010 @@ -1,145 +1,166 @@ -package org.apache.cassandra.streaming; - -import java.io.*; -import java.net.InetAddress; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Map; -import java.util.Set; - -import org.apache.log4j.Logger; - -import org.apache.cassandra.config.DatabaseDescriptor; -import org.apache.cassandra.db.ColumnFamilyStore; -import org.apache.cassandra.db.Table; -import org.apache.cassandra.streaming.StreamInitiateMessage; -import org.apache.cassandra.net.IVerbHandler; -import org.apache.cassandra.net.Message; -import org.apache.cassandra.net.MessagingService; -import org.apache.cassandra.streaming.StreamInManager; -import org.apache.cassandra.service.StorageService; -import org.apache.cassandra.utils.FBUtilities; - -public class StreamInitiateVerbHandler implements IVerbHandler -{ - private static Logger logger = Logger.getLogger(StreamInitiateVerbHandler.class); - - /* - * Here we handle the StreamInitiateMessage. Here we get the - * array of StreamContexts. We get file names for the column - * families associated with the files and replace them with the - * file names as obtained from the column family store on the - * receiving end. - */ - public void doVerb(Message message) - { - byte[] body = message.getMessageBody(); - ByteArrayInputStream bufIn = new ByteArrayInputStream(body); - if (logger.isDebugEnabled()) - logger.debug(String.format("StreamInitiateVerbeHandler.doVerb %s %s %s", message.getVerb(), message.getMessageId(), message.getMessageType())); - - try - { - StreamInitiateMessage biMsg = StreamInitiateMessage.serializer().deserialize(new DataInputStream(bufIn)); - PendingFile[] pendingFiles = biMsg.getStreamContext(); - - if (pendingFiles.length == 0) - { - if (logger.isDebugEnabled()) - logger.debug("no data needed from " + message.getFrom()); - if (StorageService.instance.isBootstrapMode()) - StorageService.instance.removeBootstrapSource(message.getFrom(), new String(message.getHeader(StreamOut.TABLE_NAME))); - return; - } - - Map<String, String> fileNames = getNewNames(pendingFiles); - Map<String, String> pathNames = new HashMap<String, String>(); - for (String ssName : fileNames.keySet()) - pathNames.put(ssName, DatabaseDescriptor.getNextAvailableDataLocation()); - /* - * For each of stream context's in the incoming message - * generate the new file names and store the new file names - * in the StreamContextManager. 
- */ - for (PendingFile pendingFile : pendingFiles) - { - CompletedFileStatus streamStatus = new CompletedFileStatus(pendingFile.getTargetFile(), pendingFile.getExpectedBytes() ); - String file = getNewFileNameFromOldContextAndNames(fileNames, pathNames, pendingFile); - - if (logger.isDebugEnabled()) - logger.debug("Received Data from : " + message.getFrom() + " " + pendingFile.getTargetFile() + " " + file); - pendingFile.setTargetFile(file); - addStreamContext(message.getFrom(), pendingFile, streamStatus); - } - - StreamInManager.registerStreamCompletionHandler(message.getFrom(), new StreamCompletionHandler()); - if (logger.isDebugEnabled()) - logger.debug("Sending a stream initiate done message ..."); - Message doneMessage = new Message(FBUtilities.getLocalAddress(), "", StorageService.Verb.STREAM_INITIATE_DONE, new byte[0] ); - MessagingService.instance.sendOneWay(doneMessage, message.getFrom()); - } - catch (IOException ex) - { - throw new IOError(ex); - } - } - - public String getNewFileNameFromOldContextAndNames(Map<String, String> fileNames, - Map<String, String> pathNames, - PendingFile pendingFile) - { - File sourceFile = new File( pendingFile.getTargetFile() ); - String[] piece = FBUtilities.strip(sourceFile.getName(), "-"); - String cfName = piece[0]; - String ssTableNum = piece[1]; - String typeOfFile = piece[2]; - - String newFileNameExpanded = fileNames.get(pendingFile.getTable() + "-" + cfName + "-" + ssTableNum); - String path = pathNames.get(pendingFile.getTable() + "-" + cfName + "-" + ssTableNum); - //Drop type (Data.db) from new FileName - String newFileName = newFileNameExpanded.replace("Data.db", typeOfFile); - return path + File.separator + pendingFile.getTable() + File.separator + newFileName; - } - - // todo: this method needs to be private, or package at the very least for easy unit testing. - public Map<String, String> getNewNames(PendingFile[] pendingFiles) throws IOException - { - /* - * Mapping for each file with unique CF-i ---> new file name. For eg. - * for a file with name <CF>-<i>-Data.db there is a corresponding - * <CF>-<i>-Index.db. We maintain a mapping from <CF>-<i> to a newly - * generated file name. 
- */ - Map<String, String> fileNames = new HashMap<String, String>(); - /* Get the distinct entries from StreamContexts i.e have one entry per Data/Index/Filter file set */ - Set<String> distinctEntries = new HashSet<String>(); - for ( PendingFile pendingFile : pendingFiles) - { - String[] pieces = FBUtilities.strip(new File(pendingFile.getTargetFile()).getName(), "-"); - distinctEntries.add(pendingFile.getTable() + "-" + pieces[0] + "-" + pieces[1] ); - } - - /* Generate unique file names per entry */ - for ( String distinctEntry : distinctEntries ) - { - String tableName; - String[] pieces = FBUtilities.strip(distinctEntry, "-"); - tableName = pieces[0]; - Table table = Table.open( tableName ); - - ColumnFamilyStore cfStore = table.getColumnFamilyStore(pieces[1]); - if (logger.isDebugEnabled()) - logger.debug("Generating file name for " + distinctEntry + " ..."); - fileNames.put(distinctEntry, cfStore.getTempSSTableFileName()); - } - - return fileNames; - } - - private void addStreamContext(InetAddress host, PendingFile pendingFile, CompletedFileStatus streamStatus) - { - if (logger.isDebugEnabled()) - logger.debug("Adding stream context " + pendingFile + " for " + host + " ..."); - StreamInManager.addStreamContext(host, pendingFile, streamStatus); - } -} +package org.apache.cassandra.streaming; +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + + +import java.io.*; +import java.net.InetAddress; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + +import org.apache.log4j.Logger; + +import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.db.ColumnFamilyStore; +import org.apache.cassandra.db.Table; +import org.apache.cassandra.streaming.StreamInitiateMessage; +import org.apache.cassandra.net.IVerbHandler; +import org.apache.cassandra.net.Message; +import org.apache.cassandra.net.MessagingService; +import org.apache.cassandra.streaming.StreamInManager; +import org.apache.cassandra.service.StorageService; +import org.apache.cassandra.utils.FBUtilities; + +public class StreamInitiateVerbHandler implements IVerbHandler +{ + private static Logger logger = Logger.getLogger(StreamInitiateVerbHandler.class); + + /* + * Here we handle the StreamInitiateMessage. Here we get the + * array of StreamContexts. We get file names for the column + * families associated with the files and replace them with the + * file names as obtained from the column family store on the + * receiving end. 
+ */ + public void doVerb(Message message) + { + byte[] body = message.getMessageBody(); + ByteArrayInputStream bufIn = new ByteArrayInputStream(body); + if (logger.isDebugEnabled()) + logger.debug(String.format("StreamInitiateVerbeHandler.doVerb %s %s %s", message.getVerb(), message.getMessageId(), message.getMessageType())); + + try + { + StreamInitiateMessage biMsg = StreamInitiateMessage.serializer().deserialize(new DataInputStream(bufIn)); + PendingFile[] pendingFiles = biMsg.getStreamContext(); + + if (pendingFiles.length == 0) + { + if (logger.isDebugEnabled()) + logger.debug("no data needed from " + message.getFrom()); + if (StorageService.instance.isBootstrapMode()) + StorageService.instance.removeBootstrapSource(message.getFrom(), new String(message.getHeader(StreamOut.TABLE_NAME))); + return; + } + + Map<String, String> fileNames = getNewNames(pendingFiles); + Map<String, String> pathNames = new HashMap<String, String>(); + for (String ssName : fileNames.keySet()) + pathNames.put(ssName, DatabaseDescriptor.getNextAvailableDataLocation()); + /* + * For each of stream context's in the incoming message + * generate the new file names and store the new file names + * in the StreamContextManager. + */ + for (PendingFile pendingFile : pendingFiles) + { + CompletedFileStatus streamStatus = new CompletedFileStatus(pendingFile.getTargetFile(), pendingFile.getExpectedBytes() ); + String file = getNewFileNameFromOldContextAndNames(fileNames, pathNames, pendingFile); + + if (logger.isDebugEnabled()) + logger.debug("Received Data from : " + message.getFrom() + " " + pendingFile.getTargetFile() + " " + file); + pendingFile.setTargetFile(file); + addStreamContext(message.getFrom(), pendingFile, streamStatus); + } + + StreamInManager.registerStreamCompletionHandler(message.getFrom(), new StreamCompletionHandler()); + if (logger.isDebugEnabled()) + logger.debug("Sending a stream initiate done message ..."); + Message doneMessage = new Message(FBUtilities.getLocalAddress(), "", StorageService.Verb.STREAM_INITIATE_DONE, new byte[0] ); + MessagingService.instance.sendOneWay(doneMessage, message.getFrom()); + } + catch (IOException ex) + { + throw new IOError(ex); + } + } + + public String getNewFileNameFromOldContextAndNames(Map<String, String> fileNames, + Map<String, String> pathNames, + PendingFile pendingFile) + { + File sourceFile = new File( pendingFile.getTargetFile() ); + String[] piece = FBUtilities.strip(sourceFile.getName(), "-"); + String cfName = piece[0]; + String ssTableNum = piece[1]; + String typeOfFile = piece[2]; + + String newFileNameExpanded = fileNames.get(pendingFile.getTable() + "-" + cfName + "-" + ssTableNum); + String path = pathNames.get(pendingFile.getTable() + "-" + cfName + "-" + ssTableNum); + //Drop type (Data.db) from new FileName + String newFileName = newFileNameExpanded.replace("Data.db", typeOfFile); + return path + File.separator + pendingFile.getTable() + File.separator + newFileName; + } + + // todo: this method needs to be private, or package at the very least for easy unit testing. + public Map<String, String> getNewNames(PendingFile[] pendingFiles) throws IOException + { + /* + * Mapping for each file with unique CF-i ---> new file name. For eg. + * for a file with name <CF>-<i>-Data.db there is a corresponding + * <CF>-<i>-Index.db. We maintain a mapping from <CF>-<i> to a newly + * generated file name. 
+ */ + Map<String, String> fileNames = new HashMap<String, String>(); + /* Get the distinct entries from StreamContexts i.e have one entry per Data/Index/Filter file set */ + Set<String> distinctEntries = new HashSet<String>(); + for ( PendingFile pendingFile : pendingFiles) + { + String[] pieces = FBUtilities.strip(new File(pendingFile.getTargetFile()).getName(), "-"); + distinctEntries.add(pendingFile.getTable() + "-" + pieces[0] + "-" + pieces[1] ); + } + + /* Generate unique file names per entry */ + for ( String distinctEntry : distinctEntries ) + { + String tableName; + String[] pieces = FBUtilities.strip(distinctEntry, "-"); + tableName = pieces[0]; + Table table = Table.open( tableName ); + + ColumnFamilyStore cfStore = table.getColumnFamilyStore(pieces[1]); + if (logger.isDebugEnabled()) + logger.debug("Generating file name for " + distinctEntry + " ..."); + fileNames.put(distinctEntry, cfStore.getTempSSTableFileName()); + } + + return fileNames; + } + + private void addStreamContext(InetAddress host, PendingFile pendingFile, CompletedFileStatus streamStatus) + { + if (logger.isDebugEnabled()) + logger.debug("Adding stream context " + pendingFile + " for " + host + " ..."); + StreamInManager.addStreamContext(host, pendingFile, streamStatus); + } +}
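The renaming scheme in the handler above (getNewNames plus getNewFileNameFromOldContextAndNames) groups incoming files by <Table>-<CF>-<generation>, generates one fresh Data.db name per group, and derives the matching Index.db/Filter.db names by swapping the type suffix back in. Below is a minimal standalone sketch of that mapping, using plain strings and a hypothetical temp-name format in place of ColumnFamilyStore.getTempSSTableFileName(); table, CF and generation values are made up for illustration.

import java.io.File;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;

// Illustrative sketch only, not the handler itself: mirrors the grouping and
// suffix-swapping done by getNewNames() and getNewFileNameFromOldContextAndNames().
public class RenameSketch
{
    public static void main(String[] args)
    {
        String table = "Keyspace1";
        String[] incoming = { "Standard1-5-Data.db", "Standard1-5-Index.db", "Standard1-5-Filter.db" };

        // One fresh Data.db name per <Table>-<CF>-<generation> group, as getNewNames() does.
        Map<String, String> newNames = new HashMap<String, String>();
        Set<String> groups = new LinkedHashSet<String>();
        for (String name : incoming)
        {
            String[] pieces = name.split("-");
            groups.add(table + "-" + pieces[0] + "-" + pieces[1]);
        }
        int tmpGeneration = 7; // hypothetical; the real name comes from getTempSSTableFileName()
        for (String group : groups)
            newNames.put(group, group.split("-")[1] + "-tmp-" + tmpGeneration + "-Data.db");

        // Each component file reuses its group's new name with its own type suffix,
        // as getNewFileNameFromOldContextAndNames() does.
        for (String name : incoming)
        {
            String[] pieces = name.split("-");
            String type = pieces[2]; // Data.db, Index.db or Filter.db
            String key = table + "-" + pieces[0] + "-" + pieces[1];
            String renamed = newNames.get(key).replace("Data.db", type);
            System.out.println(name + " -> " + table + File.separator + renamed);
        }
    }
}

Running this prints lines such as "Standard1-5-Index.db -> Keyspace1/Standard1-tmp-7-Index.db"; the real handler additionally prefixes the data directory chosen via DatabaseDescriptor.getNextAvailableDataLocation().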
Modified: incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/streaming/StreamRequestMessage.java URL: http://svn.apache.org/viewvc/incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/streaming/StreamRequestMessage.java?rev=915463&r1=915462&r2=915463&view=diff ============================================================================== --- incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/streaming/StreamRequestMessage.java (original) +++ incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/streaming/StreamRequestMessage.java Tue Feb 23 18:13:07 2010 @@ -1,76 +1,97 @@ -package org.apache.cassandra.streaming; - -import java.io.*; - -import org.apache.cassandra.concurrent.StageManager; -import org.apache.cassandra.io.ICompactSerializer; -import org.apache.cassandra.net.Message; -import org.apache.cassandra.service.StorageService; -import org.apache.cassandra.utils.FBUtilities; - -/** -* This class encapsulates the message that needs to be sent to nodes -* that handoff data. The message contains information about ranges -* that need to be transferred and the target node. -*/ -class StreamRequestMessage -{ - private static ICompactSerializer<StreamRequestMessage> serializer_; - static - { - serializer_ = new StreamRequestMessageSerializer(); - } - - protected static ICompactSerializer<StreamRequestMessage> serializer() - { - return serializer_; - } - - protected static Message makeStreamRequestMessage(StreamRequestMessage streamRequestMessage) - { - ByteArrayOutputStream bos = new ByteArrayOutputStream(); - DataOutputStream dos = new DataOutputStream(bos); - try - { - StreamRequestMessage.serializer().serialize(streamRequestMessage, dos); - } - catch (IOException e) - { - throw new IOError(e); - } - return new Message(FBUtilities.getLocalAddress(), StageManager.STREAM_STAGE, StorageService.Verb.STREAM_REQUEST, bos.toByteArray() ); - } - - protected StreamRequestMetadata[] streamRequestMetadata_ = new StreamRequestMetadata[0]; - - // TODO only actually ever need one BM, not an array - StreamRequestMessage(StreamRequestMetadata... streamRequestMetadata) - { - assert streamRequestMetadata != null; - streamRequestMetadata_ = streamRequestMetadata; - } - - private static class StreamRequestMessageSerializer implements ICompactSerializer<StreamRequestMessage> - { - public void serialize(StreamRequestMessage streamRequestMessage, DataOutputStream dos) throws IOException - { - StreamRequestMetadata[] streamRequestMetadata = streamRequestMessage.streamRequestMetadata_; - dos.writeInt(streamRequestMetadata.length); - for (StreamRequestMetadata bsmd : streamRequestMetadata) - { - StreamRequestMetadata.serializer().serialize(bsmd, dos); - } - } - - public StreamRequestMessage deserialize(DataInputStream dis) throws IOException - { - int size = dis.readInt(); - StreamRequestMetadata[] streamRequestMetadata = new StreamRequestMetadata[size]; - for (int i = 0; i < size; ++i) - { - streamRequestMetadata[i] = StreamRequestMetadata.serializer().deserialize(dis); - } - return new StreamRequestMessage(streamRequestMetadata); - } - } -} +package org.apache.cassandra.streaming; +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + + +import java.io.*; + +import org.apache.cassandra.concurrent.StageManager; +import org.apache.cassandra.io.ICompactSerializer; +import org.apache.cassandra.net.Message; +import org.apache.cassandra.service.StorageService; +import org.apache.cassandra.utils.FBUtilities; + +/** +* This class encapsulates the message that needs to be sent to nodes +* that handoff data. The message contains information about ranges +* that need to be transferred and the target node. +*/ +class StreamRequestMessage +{ + private static ICompactSerializer<StreamRequestMessage> serializer_; + static + { + serializer_ = new StreamRequestMessageSerializer(); + } + + protected static ICompactSerializer<StreamRequestMessage> serializer() + { + return serializer_; + } + + protected static Message makeStreamRequestMessage(StreamRequestMessage streamRequestMessage) + { + ByteArrayOutputStream bos = new ByteArrayOutputStream(); + DataOutputStream dos = new DataOutputStream(bos); + try + { + StreamRequestMessage.serializer().serialize(streamRequestMessage, dos); + } + catch (IOException e) + { + throw new IOError(e); + } + return new Message(FBUtilities.getLocalAddress(), StageManager.STREAM_STAGE, StorageService.Verb.STREAM_REQUEST, bos.toByteArray() ); + } + + protected StreamRequestMetadata[] streamRequestMetadata_ = new StreamRequestMetadata[0]; + + // TODO only actually ever need one BM, not an array + StreamRequestMessage(StreamRequestMetadata... 
streamRequestMetadata) + { + assert streamRequestMetadata != null; + streamRequestMetadata_ = streamRequestMetadata; + } + + private static class StreamRequestMessageSerializer implements ICompactSerializer<StreamRequestMessage> + { + public void serialize(StreamRequestMessage streamRequestMessage, DataOutputStream dos) throws IOException + { + StreamRequestMetadata[] streamRequestMetadata = streamRequestMessage.streamRequestMetadata_; + dos.writeInt(streamRequestMetadata.length); + for (StreamRequestMetadata bsmd : streamRequestMetadata) + { + StreamRequestMetadata.serializer().serialize(bsmd, dos); + } + } + + public StreamRequestMessage deserialize(DataInputStream dis) throws IOException + { + int size = dis.readInt(); + StreamRequestMetadata[] streamRequestMetadata = new StreamRequestMetadata[size]; + for (int i = 0; i < size; ++i) + { + streamRequestMetadata[i] = StreamRequestMetadata.serializer().deserialize(dis); + } + return new StreamRequestMessage(streamRequestMetadata); + } + } +} Modified: incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/streaming/StreamRequestMetadata.java URL: http://svn.apache.org/viewvc/incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/streaming/StreamRequestMetadata.java?rev=915463&r1=915462&r2=915463&view=diff ============================================================================== --- incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/streaming/StreamRequestMetadata.java (original) +++ incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/streaming/StreamRequestMetadata.java Tue Feb 23 18:13:07 2010 @@ -1,4 +1,25 @@ package org.apache.cassandra.streaming; +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + import java.io.DataInputStream; import java.io.DataOutputStream; Modified: incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/thrift/ThriftGlue.java URL: http://svn.apache.org/viewvc/incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/thrift/ThriftGlue.java?rev=915463&r1=915462&r2=915463&view=diff ============================================================================== --- incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/thrift/ThriftGlue.java (original) +++ incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/thrift/ThriftGlue.java Tue Feb 23 18:13:07 2010 @@ -1,4 +1,25 @@ package org.apache.cassandra.thrift; +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + import java.util.List; Modified: incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/tools/NodeCmd.java URL: http://svn.apache.org/viewvc/incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/tools/NodeCmd.java?rev=915463&r1=915462&r2=915463&view=diff ============================================================================== --- incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/tools/NodeCmd.java (original) +++ incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/tools/NodeCmd.java Tue Feb 23 18:13:07 2010 @@ -1,4 +1,25 @@ package org.apache.cassandra.tools; +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + import java.io.IOException; import java.io.PrintStream; Modified: incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/utils/LatencyTracker.java URL: http://svn.apache.org/viewvc/incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/utils/LatencyTracker.java?rev=915463&r1=915462&r2=915463&view=diff ============================================================================== --- incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/utils/LatencyTracker.java (original) +++ incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/utils/LatencyTracker.java Tue Feb 23 18:13:07 2010 @@ -1,4 +1,25 @@ package org.apache.cassandra.utils; +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + import java.util.concurrent.atomic.AtomicLong; Modified: incubator/cassandra/branches/cassandra-0.6/test/unit/org/apache/cassandra/dht/BoundsTest.java URL: http://svn.apache.org/viewvc/incubator/cassandra/branches/cassandra-0.6/test/unit/org/apache/cassandra/dht/BoundsTest.java?rev=915463&r1=915462&r2=915463&view=diff ============================================================================== --- incubator/cassandra/branches/cassandra-0.6/test/unit/org/apache/cassandra/dht/BoundsTest.java (original) +++ incubator/cassandra/branches/cassandra-0.6/test/unit/org/apache/cassandra/dht/BoundsTest.java Tue Feb 23 18:13:07 2010 @@ -1,73 +1,94 @@ -package org.apache.cassandra.dht; - -import java.util.*; - -import junit.framework.TestCase; -import org.apache.cassandra.utils.FBUtilities; - -public class BoundsTest extends TestCase -{ - public void testRestrictTo() throws Exception - { - IPartitioner p = new OrderPreservingPartitioner(); - Token min = p.getMinimumToken(); - Range wraps = new Range(new StringToken("m"), new StringToken("e")); - Range normal = new Range(wraps.right, wraps.left); - Bounds all = new Bounds(min, min, p); - Bounds almostAll = new Bounds(new StringToken("a"), min, p); - - Set<AbstractBounds> S; - Set<AbstractBounds> S2; - - S = all.restrictTo(wraps); - assert S.equals(new HashSet<AbstractBounds>(Arrays.asList(wraps))); - - S = almostAll.restrictTo(wraps); - S2 = new HashSet<AbstractBounds>(Arrays.asList(new Bounds(new StringToken("a"), new StringToken("e"), p), - new Range(new StringToken("m"), min))); - assert S.equals(S2); - - S = all.restrictTo(normal); - assert S.equals(new HashSet<AbstractBounds>(Arrays.asList(normal))); - } - - public void testNoIntersectionWrapped() - { - IPartitioner p = new OrderPreservingPartitioner(); - Range node = new Range(new StringToken("z"), new StringToken("a")); - Bounds bounds; - - bounds = new Bounds(new StringToken("m"), new StringToken("n"), p); - assert bounds.restrictTo(node).equals(Collections.<AbstractBounds>emptySet()); - - bounds = new Bounds(new StringToken("b"), node.left, p); - assert bounds.restrictTo(node).equals(Collections.<AbstractBounds>emptySet()); - } - - public void testSmallBoundsFullRange() - { - IPartitioner p = new OrderPreservingPartitioner(); - Range node; - Bounds bounds = new Bounds(new StringToken("b"), new StringToken("c"), p); - - node = new Range(new StringToken("d"), new StringToken("d")); - assert bounds.restrictTo(node).equals(new HashSet(Arrays.asList(bounds))); - } - - public void testNoIntersectionUnwrapped() - { - IPartitioner p = new OrderPreservingPartitioner(); - Token min = p.getMinimumToken(); - Range node = new Range(new StringToken("m"), new StringToken("n")); - Bounds bounds; - - bounds = new Bounds(new StringToken("z"), min, p); - assert bounds.restrictTo(node).equals(Collections.<AbstractBounds>emptySet()); - - bounds = new Bounds(new StringToken("a"), node.left, p); - assert bounds.restrictTo(node).equals(Collections.<AbstractBounds>emptySet()); - - bounds = new Bounds(min, new StringToken("b"), p); - assert bounds.restrictTo(node).equals(Collections.<AbstractBounds>emptySet()); - } -} +package org.apache.cassandra.dht; +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + + +import java.util.*; + +import junit.framework.TestCase; +import org.apache.cassandra.utils.FBUtilities; + +public class BoundsTest extends TestCase +{ + public void testRestrictTo() throws Exception + { + IPartitioner p = new OrderPreservingPartitioner(); + Token min = p.getMinimumToken(); + Range wraps = new Range(new StringToken("m"), new StringToken("e")); + Range normal = new Range(wraps.right, wraps.left); + Bounds all = new Bounds(min, min, p); + Bounds almostAll = new Bounds(new StringToken("a"), min, p); + + Set<AbstractBounds> S; + Set<AbstractBounds> S2; + + S = all.restrictTo(wraps); + assert S.equals(new HashSet<AbstractBounds>(Arrays.asList(wraps))); + + S = almostAll.restrictTo(wraps); + S2 = new HashSet<AbstractBounds>(Arrays.asList(new Bounds(new StringToken("a"), new StringToken("e"), p), + new Range(new StringToken("m"), min))); + assert S.equals(S2); + + S = all.restrictTo(normal); + assert S.equals(new HashSet<AbstractBounds>(Arrays.asList(normal))); + } + + public void testNoIntersectionWrapped() + { + IPartitioner p = new OrderPreservingPartitioner(); + Range node = new Range(new StringToken("z"), new StringToken("a")); + Bounds bounds; + + bounds = new Bounds(new StringToken("m"), new StringToken("n"), p); + assert bounds.restrictTo(node).equals(Collections.<AbstractBounds>emptySet()); + + bounds = new Bounds(new StringToken("b"), node.left, p); + assert bounds.restrictTo(node).equals(Collections.<AbstractBounds>emptySet()); + } + + public void testSmallBoundsFullRange() + { + IPartitioner p = new OrderPreservingPartitioner(); + Range node; + Bounds bounds = new Bounds(new StringToken("b"), new StringToken("c"), p); + + node = new Range(new StringToken("d"), new StringToken("d")); + assert bounds.restrictTo(node).equals(new HashSet(Arrays.asList(bounds))); + } + + public void testNoIntersectionUnwrapped() + { + IPartitioner p = new OrderPreservingPartitioner(); + Token min = p.getMinimumToken(); + Range node = new Range(new StringToken("m"), new StringToken("n")); + Bounds bounds; + + bounds = new Bounds(new StringToken("z"), min, p); + assert bounds.restrictTo(node).equals(Collections.<AbstractBounds>emptySet()); + + bounds = new Bounds(new StringToken("a"), node.left, p); + assert bounds.restrictTo(node).equals(Collections.<AbstractBounds>emptySet()); + + bounds = new Bounds(min, new StringToken("b"), p); + assert bounds.restrictTo(node).equals(Collections.<AbstractBounds>emptySet()); + } +} Modified: incubator/cassandra/branches/cassandra-0.6/test/unit/org/apache/cassandra/dht/RangeIntersectionTest.java URL: http://svn.apache.org/viewvc/incubator/cassandra/branches/cassandra-0.6/test/unit/org/apache/cassandra/dht/RangeIntersectionTest.java?rev=915463&r1=915462&r2=915463&view=diff ============================================================================== --- incubator/cassandra/branches/cassandra-0.6/test/unit/org/apache/cassandra/dht/RangeIntersectionTest.java 
(original) +++ incubator/cassandra/branches/cassandra-0.6/test/unit/org/apache/cassandra/dht/RangeIntersectionTest.java Tue Feb 23 18:13:07 2010 @@ -1,131 +1,152 @@ -package org.apache.cassandra.dht; - -import java.util.Arrays; -import java.util.List; -import java.util.Set; - -import org.apache.commons.lang.StringUtils; -import org.junit.Test; - -public class RangeIntersectionTest -{ - static void assertIntersection(Range one, Range two, Range ... ranges) - { - Set<Range> correct = Range.rangeSet(ranges); - Set<Range> result1 = one.intersectionWith(two); - assert result1.equals(correct) : String.format("%s != %s", - StringUtils.join(result1, ","), - StringUtils.join(correct, ",")); - Set<Range> result2 = two.intersectionWith(one); - assert result2.equals(correct) : String.format("%s != %s", - StringUtils.join(result2, ","), - StringUtils.join(correct, ",")); - } - - private void assertNoIntersection(Range wraps1, Range nowrap3) - { - assertIntersection(wraps1, nowrap3); - } - - @Test - public void testIntersectionWithAll() - { - Range all0 = new Range(new BigIntegerToken("0"), new BigIntegerToken("0")); - Range all10 = new Range(new BigIntegerToken("10"), new BigIntegerToken("10")); - Range all100 = new Range(new BigIntegerToken("100"), new BigIntegerToken("100")); - Range all1000 = new Range(new BigIntegerToken("1000"), new BigIntegerToken("1000")); - Range wraps = new Range(new BigIntegerToken("100"), new BigIntegerToken("10")); - - assertIntersection(all0, wraps, wraps); - assertIntersection(all10, wraps, wraps); - assertIntersection(all100, wraps, wraps); - assertIntersection(all1000, wraps, wraps); - } - - @Test - public void testIntersectionContains() - { - Range wraps1 = new Range(new BigIntegerToken("100"), new BigIntegerToken("10")); - Range wraps2 = new Range(new BigIntegerToken("90"), new BigIntegerToken("20")); - Range wraps3 = new Range(new BigIntegerToken("90"), new BigIntegerToken("0")); - Range nowrap1 = new Range(new BigIntegerToken("100"), new BigIntegerToken("110")); - Range nowrap2 = new Range(new BigIntegerToken("0"), new BigIntegerToken("10")); - Range nowrap3 = new Range(new BigIntegerToken("0"), new BigIntegerToken("9")); - - assertIntersection(wraps1, wraps2, wraps1); - assertIntersection(wraps3, wraps2, wraps3); - - assertIntersection(wraps1, nowrap1, nowrap1); - assertIntersection(wraps1, nowrap2, nowrap2); - assertIntersection(nowrap2, nowrap3, nowrap3); - - assertIntersection(wraps1, wraps1, wraps1); - assertIntersection(nowrap1, nowrap1, nowrap1); - assertIntersection(nowrap2, nowrap2, nowrap2); - assertIntersection(wraps3, wraps3, wraps3); - } - - @Test - public void testNoIntersection() - { - Range wraps1 = new Range(new BigIntegerToken("100"), new BigIntegerToken("10")); - Range wraps2 = new Range(new BigIntegerToken("100"), new BigIntegerToken("0")); - Range nowrap1 = new Range(new BigIntegerToken("0"), new BigIntegerToken("100")); - Range nowrap2 = new Range(new BigIntegerToken("100"), new BigIntegerToken("200")); - Range nowrap3 = new Range(new BigIntegerToken("10"), new BigIntegerToken("100")); - - assertNoIntersection(wraps1, nowrap3); - assertNoIntersection(wraps2, nowrap1); - assertNoIntersection(nowrap1, nowrap2); - } - - @Test - public void testIntersectionOneWraps() - { - Range wraps1 = new Range(new BigIntegerToken("100"), new BigIntegerToken("10")); - Range wraps2 = new Range(new BigIntegerToken("100"), new BigIntegerToken("0")); - Range nowrap1 = new Range(new BigIntegerToken("0"), new BigIntegerToken("200")); - Range nowrap2 = new Range(new 
BigIntegerToken("0"), new BigIntegerToken("100")); - - assertIntersection(wraps1, - nowrap1, - new Range(new BigIntegerToken("0"), new BigIntegerToken("10")), - new Range(new BigIntegerToken("100"), new BigIntegerToken("200"))); - assertIntersection(wraps2, - nowrap1, - new Range(new BigIntegerToken("100"), new BigIntegerToken("200"))); - assertIntersection(wraps1, - nowrap2, - new Range(new BigIntegerToken("0"), new BigIntegerToken("10"))); - } - - @Test - public void testIntersectionTwoWraps() - { - Range wraps1 = new Range(new BigIntegerToken("100"), new BigIntegerToken("20")); - Range wraps2 = new Range(new BigIntegerToken("120"), new BigIntegerToken("90")); - Range wraps3 = new Range(new BigIntegerToken("120"), new BigIntegerToken("110")); - Range wraps4 = new Range(new BigIntegerToken("10"), new BigIntegerToken("0")); - Range wraps5 = new Range(new BigIntegerToken("10"), new BigIntegerToken("1")); - Range wraps6 = new Range(new BigIntegerToken("30"), new BigIntegerToken("10")); - - assertIntersection(wraps1, - wraps2, - new Range(new BigIntegerToken("120"), new BigIntegerToken("20"))); - assertIntersection(wraps1, - wraps3, - new Range(new BigIntegerToken("120"), new BigIntegerToken("20")), - new Range(new BigIntegerToken("100"), new BigIntegerToken("110"))); - assertIntersection(wraps1, - wraps4, - new Range(new BigIntegerToken("10"), new BigIntegerToken("20")), - new Range(new BigIntegerToken("100"), new BigIntegerToken("0"))); - assertIntersection(wraps1, - wraps5, - new Range(new BigIntegerToken("10"), new BigIntegerToken("20")), - new Range(new BigIntegerToken("100"), new BigIntegerToken("1"))); - assertIntersection(wraps1, - wraps6, - new Range(new BigIntegerToken("100"), new BigIntegerToken("10"))); - } -} +package org.apache.cassandra.dht; +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + + +import java.util.Arrays; +import java.util.List; +import java.util.Set; + +import org.apache.commons.lang.StringUtils; +import org.junit.Test; + +public class RangeIntersectionTest +{ + static void assertIntersection(Range one, Range two, Range ... 
ranges) + { + Set<Range> correct = Range.rangeSet(ranges); + Set<Range> result1 = one.intersectionWith(two); + assert result1.equals(correct) : String.format("%s != %s", + StringUtils.join(result1, ","), + StringUtils.join(correct, ",")); + Set<Range> result2 = two.intersectionWith(one); + assert result2.equals(correct) : String.format("%s != %s", + StringUtils.join(result2, ","), + StringUtils.join(correct, ",")); + } + + private void assertNoIntersection(Range wraps1, Range nowrap3) + { + assertIntersection(wraps1, nowrap3); + } + + @Test + public void testIntersectionWithAll() + { + Range all0 = new Range(new BigIntegerToken("0"), new BigIntegerToken("0")); + Range all10 = new Range(new BigIntegerToken("10"), new BigIntegerToken("10")); + Range all100 = new Range(new BigIntegerToken("100"), new BigIntegerToken("100")); + Range all1000 = new Range(new BigIntegerToken("1000"), new BigIntegerToken("1000")); + Range wraps = new Range(new BigIntegerToken("100"), new BigIntegerToken("10")); + + assertIntersection(all0, wraps, wraps); + assertIntersection(all10, wraps, wraps); + assertIntersection(all100, wraps, wraps); + assertIntersection(all1000, wraps, wraps); + } + + @Test + public void testIntersectionContains() + { + Range wraps1 = new Range(new BigIntegerToken("100"), new BigIntegerToken("10")); + Range wraps2 = new Range(new BigIntegerToken("90"), new BigIntegerToken("20")); + Range wraps3 = new Range(new BigIntegerToken("90"), new BigIntegerToken("0")); + Range nowrap1 = new Range(new BigIntegerToken("100"), new BigIntegerToken("110")); + Range nowrap2 = new Range(new BigIntegerToken("0"), new BigIntegerToken("10")); + Range nowrap3 = new Range(new BigIntegerToken("0"), new BigIntegerToken("9")); + + assertIntersection(wraps1, wraps2, wraps1); + assertIntersection(wraps3, wraps2, wraps3); + + assertIntersection(wraps1, nowrap1, nowrap1); + assertIntersection(wraps1, nowrap2, nowrap2); + assertIntersection(nowrap2, nowrap3, nowrap3); + + assertIntersection(wraps1, wraps1, wraps1); + assertIntersection(nowrap1, nowrap1, nowrap1); + assertIntersection(nowrap2, nowrap2, nowrap2); + assertIntersection(wraps3, wraps3, wraps3); + } + + @Test + public void testNoIntersection() + { + Range wraps1 = new Range(new BigIntegerToken("100"), new BigIntegerToken("10")); + Range wraps2 = new Range(new BigIntegerToken("100"), new BigIntegerToken("0")); + Range nowrap1 = new Range(new BigIntegerToken("0"), new BigIntegerToken("100")); + Range nowrap2 = new Range(new BigIntegerToken("100"), new BigIntegerToken("200")); + Range nowrap3 = new Range(new BigIntegerToken("10"), new BigIntegerToken("100")); + + assertNoIntersection(wraps1, nowrap3); + assertNoIntersection(wraps2, nowrap1); + assertNoIntersection(nowrap1, nowrap2); + } + + @Test + public void testIntersectionOneWraps() + { + Range wraps1 = new Range(new BigIntegerToken("100"), new BigIntegerToken("10")); + Range wraps2 = new Range(new BigIntegerToken("100"), new BigIntegerToken("0")); + Range nowrap1 = new Range(new BigIntegerToken("0"), new BigIntegerToken("200")); + Range nowrap2 = new Range(new BigIntegerToken("0"), new BigIntegerToken("100")); + + assertIntersection(wraps1, + nowrap1, + new Range(new BigIntegerToken("0"), new BigIntegerToken("10")), + new Range(new BigIntegerToken("100"), new BigIntegerToken("200"))); + assertIntersection(wraps2, + nowrap1, + new Range(new BigIntegerToken("100"), new BigIntegerToken("200"))); + assertIntersection(wraps1, + nowrap2, + new Range(new BigIntegerToken("0"), new BigIntegerToken("10"))); + } + + 
@Test + public void testIntersectionTwoWraps() + { + Range wraps1 = new Range(new BigIntegerToken("100"), new BigIntegerToken("20")); + Range wraps2 = new Range(new BigIntegerToken("120"), new BigIntegerToken("90")); + Range wraps3 = new Range(new BigIntegerToken("120"), new BigIntegerToken("110")); + Range wraps4 = new Range(new BigIntegerToken("10"), new BigIntegerToken("0")); + Range wraps5 = new Range(new BigIntegerToken("10"), new BigIntegerToken("1")); + Range wraps6 = new Range(new BigIntegerToken("30"), new BigIntegerToken("10")); + + assertIntersection(wraps1, + wraps2, + new Range(new BigIntegerToken("120"), new BigIntegerToken("20"))); + assertIntersection(wraps1, + wraps3, + new Range(new BigIntegerToken("120"), new BigIntegerToken("20")), + new Range(new BigIntegerToken("100"), new BigIntegerToken("110"))); + assertIntersection(wraps1, + wraps4, + new Range(new BigIntegerToken("10"), new BigIntegerToken("20")), + new Range(new BigIntegerToken("100"), new BigIntegerToken("0"))); + assertIntersection(wraps1, + wraps5, + new Range(new BigIntegerToken("10"), new BigIntegerToken("20")), + new Range(new BigIntegerToken("100"), new BigIntegerToken("1"))); + assertIntersection(wraps1, + wraps6, + new Range(new BigIntegerToken("100"), new BigIntegerToken("10"))); + } +}
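In these tests a Range whose left token is greater than or equal to its right token wraps around the ring, which is why intersecting a wrapping range with an ordinary one can yield two pieces (testIntersectionOneWraps). The following is a small self-contained sketch of that interval arithmetic, assuming long tokens and (left, right] semantics rather than the project's Range/Token classes, with simplified handling of the minimum token and the full-ring left == right case.

import java.util.ArrayList;
import java.util.List;

// Standalone sketch of the arithmetic exercised by testIntersectionOneWraps;
// not Cassandra's Range implementation.
public class WrapIntersectSketch
{
    // Unwrap a possibly-wrapping (left, right] range into one or two plain segments.
    static List<long[]> unwrap(long left, long right)
    {
        List<long[]> out = new ArrayList<long[]>();
        if (left < right)
        {
            out.add(new long[]{ left, right });
        }
        else
        {
            out.add(new long[]{ left, Long.MAX_VALUE });  // part above left
            out.add(new long[]{ Long.MIN_VALUE, right }); // part up to right
        }
        return out;
    }

    // Intersect two ranges by intersecting their unwrapped segments pairwise.
    static List<long[]> intersect(long[] a, long[] b)
    {
        List<long[]> result = new ArrayList<long[]>();
        for (long[] s : unwrap(a[0], a[1]))
            for (long[] t : unwrap(b[0], b[1]))
            {
                long left = Math.max(s[0], t[0]);
                long right = Math.min(s[1], t[1]);
                if (left < right)
                    result.add(new long[]{ left, right });
            }
        return result;
    }

    public static void main(String[] args)
    {
        // wraps1 = (100, 10], nowrap1 = (0, 200] from testIntersectionOneWraps.
        for (long[] piece : intersect(new long[]{ 100, 10 }, new long[]{ 0, 200 }))
            System.out.println("(" + piece[0] + ", " + piece[1] + "]");
    }
}

For this pair it prints (100, 200] and (0, 10], the same two pieces the test above expects.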
