Author: jbellis
Date: Tue Dec 29 06:13:04 2009
New Revision: 894312
URL: http://svn.apache.org/viewvc?rev=894312&view=rev
Log:
Replace DataInputBuffer with ByteArrayInputStream/DataInputStream.
Patch by Todd Blose; reviewed by jbellis for CASSANDRA-656.
Removed:
incubator/cassandra/trunk/src/java/org/apache/cassandra/io/DataInputBuffer.java
incubator/cassandra/trunk/test/unit/org/apache/cassandra/io/DataInputBufferTest.java
Modified:
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/BinaryVerbHandler.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CommitLog.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CommitLogHeader.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RangeCommand.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RangeReply.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RangeSliceCommand.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RangeSliceReply.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ReadRepairVerbHandler.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ReadVerbHandler.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RowMutationVerbHandler.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/io/IndexHelper.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/io/StreamRequestVerbHandler.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/io/Streaming.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/net/io/FastSerializer.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/AntiEntropyService.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/ConsistencyManager.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/ReadResponseResolver.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java
incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/ColumnFamilyTest.java
incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/ReadMessageTest.java
incubator/cassandra/trunk/test/unit/org/apache/cassandra/gms/GossipDigestTest.java
incubator/cassandra/trunk/test/unit/org/apache/cassandra/utils/FilterTest.java
Modified:
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/BinaryVerbHandler.java
URL:
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/BinaryVerbHandler.java?rev=894312&r1=894311&r2=894312&view=diff
==============================================================================
---
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/BinaryVerbHandler.java
(original)
+++
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/BinaryVerbHandler.java
Tue Dec 29 06:13:04 2009
@@ -18,9 +18,11 @@
package org.apache.cassandra.db;
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
+
import org.apache.cassandra.net.IVerbHandler;
import org.apache.cassandra.net.Message;
-import org.apache.cassandra.io.DataInputBuffer;
import org.apache.log4j.Logger;
@@ -31,12 +33,11 @@
public void doVerb(Message message)
{
byte[] bytes = message.getMessageBody();
- DataInputBuffer buffer = new DataInputBuffer();
- buffer.reset(bytes, bytes.length);
+ ByteArrayInputStream buffer = new ByteArrayInputStream(bytes);
try
{
- RowMutationMessage rmMsg =
RowMutationMessage.serializer().deserialize(buffer);
+ RowMutationMessage rmMsg =
RowMutationMessage.serializer().deserialize(new DataInputStream(buffer));
RowMutation rm = rmMsg.getRowMutation();
rm.applyBinary();
}
Modified:
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CommitLog.java
URL:
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CommitLog.java?rev=894312&r1=894311&r2=894312&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CommitLog.java
(original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CommitLog.java
Tue Dec 29 06:13:04 2009
@@ -20,7 +20,6 @@
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.io.BufferedRandomAccessFile;
-import org.apache.cassandra.io.DataInputBuffer;
import org.apache.cassandra.io.DataOutputBuffer;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.FileUtils;
@@ -285,8 +284,6 @@
Set<Table> tablesRecovered = new HashSet<Table>();
assert
StageManager.getStage(StageManager.mutationStage_).getCompletedTasks() == 0;
int rows = 0;
-
- DataInputBuffer bufIn = new DataInputBuffer();
for (File file : clogs)
{
int bufferSize = (int)Math.min(file.length(), 32 * 1024 * 1024);
@@ -320,7 +317,8 @@
// last CL entry didn't get completely written. that's ok.
break;
}
- bufIn.reset(bytes, bytes.length);
+
+ ByteArrayInputStream bufIn = new ByteArrayInputStream(bytes);
Checksum checksum = new CRC32();
checksum.update(bytes, 0, bytes.length);
if (claimedCRC32 != checksum.getValue())
@@ -331,7 +329,7 @@
}
/* deserialize the commit log entry */
- final RowMutation rm =
RowMutation.serializer().deserialize(bufIn);
+ final RowMutation rm =
RowMutation.serializer().deserialize(new DataInputStream(bufIn));
if (logger_.isDebugEnabled())
logger_.debug(String.format("replaying mutation for %s.%s:
%s",
rm.getTable(),
Modified:
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CommitLogHeader.java
URL:
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CommitLogHeader.java?rev=894312&r1=894311&r2=894312&view=diff
==============================================================================
---
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CommitLogHeader.java
(original)
+++
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CommitLogHeader.java
Tue Dec 29 06:13:04 2009
@@ -23,9 +23,7 @@
import java.util.Arrays;
import org.apache.cassandra.io.ICompactSerializer;
-import org.apache.cassandra.io.DataInputBuffer;
import org.apache.cassandra.utils.BitSetSerializer;
-import org.apache.cassandra.config.DatabaseDescriptor;
class CommitLogHeader
{
Modified:
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RangeCommand.java
URL:
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RangeCommand.java?rev=894312&r1=894311&r2=894312&view=diff
==============================================================================
---
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RangeCommand.java
(original)
+++
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RangeCommand.java
Tue Dec 29 06:13:04 2009
@@ -18,6 +18,7 @@
*/
package org.apache.cassandra.db;
+import java.io.ByteArrayInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.DataInputStream;
@@ -25,7 +26,6 @@
import org.apache.cassandra.io.ICompactSerializer;
import org.apache.cassandra.io.DataOutputBuffer;
-import org.apache.cassandra.io.DataInputBuffer;
import org.apache.cassandra.net.Message;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.utils.FBUtilities;
@@ -63,9 +63,8 @@
public static RangeCommand read(Message message) throws IOException
{
byte[] bytes = message.getMessageBody();
- DataInputBuffer dib = new DataInputBuffer();
- dib.reset(bytes, bytes.length);
- return serializer.deserialize(new DataInputStream(dib));
+ ByteArrayInputStream bis = new ByteArrayInputStream(bytes);
+ return serializer.deserialize(new DataInputStream(bis));
}
public String toString()
Modified:
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RangeReply.java
URL:
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RangeReply.java?rev=894312&r1=894311&r2=894312&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RangeReply.java
(original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RangeReply.java
Tue Dec 29 06:13:04 2009
@@ -19,11 +19,12 @@
package org.apache.cassandra.db;
import java.util.*;
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
import java.io.IOException;
import org.apache.commons.lang.StringUtils;
-import org.apache.cassandra.io.DataInputBuffer;
import org.apache.cassandra.io.DataOutputBuffer;
import org.apache.cassandra.net.Message;
import org.apache.cassandra.utils.FBUtilities;
@@ -63,17 +64,17 @@
public static RangeReply read(byte[] body) throws IOException
{
- DataInputBuffer bufIn = new DataInputBuffer();
- boolean rangeCompletedLocally;
- bufIn.reset(body, body.length);
- rangeCompletedLocally = bufIn.readBoolean();
+ ByteArrayInputStream bufIn = new ByteArrayInputStream(body);
+ boolean rangeCompletedLocally;
+ DataInputStream dis = new DataInputStream(bufIn);
+ rangeCompletedLocally = dis.readBoolean();
List<String> keys = new ArrayList<String>();
- while (bufIn.getPosition() < body.length)
+ while (dis.available() > 0)
{
- keys.add(bufIn.readUTF());
+ keys.add(dis.readUTF());
}
-
+
return new RangeReply(keys, rangeCompletedLocally);
}
}
Modified:
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RangeSliceCommand.java
URL:
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RangeSliceCommand.java?rev=894312&r1=894311&r2=894312&view=diff
==============================================================================
---
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RangeSliceCommand.java
(original)
+++
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RangeSliceCommand.java
Tue Dec 29 06:13:04 2009
@@ -37,7 +37,6 @@
package org.apache.cassandra.db;
import org.apache.cassandra.concurrent.StageManager;
-import org.apache.cassandra.io.DataInputBuffer;
import org.apache.cassandra.io.DataOutputBuffer;
import org.apache.cassandra.io.ICompactSerializer;
import org.apache.cassandra.net.Message;
@@ -50,6 +49,7 @@
import org.apache.thrift.TSerializer;
import org.apache.thrift.protocol.TBinaryProtocol;
+import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
@@ -105,9 +105,8 @@
public static RangeSliceCommand read(Message message) throws IOException
{
byte[] bytes = message.getMessageBody();
- DataInputBuffer dib = new DataInputBuffer();
- dib.reset(bytes, bytes.length);
- return serializer.deserialize(new DataInputStream(dib));
+ ByteArrayInputStream bis = new ByteArrayInputStream(bytes);
+ return serializer.deserialize(new DataInputStream(bis));
}
}
Modified:
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RangeSliceReply.java
URL:
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RangeSliceReply.java?rev=894312&r1=894311&r2=894312&view=diff
==============================================================================
---
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RangeSliceReply.java
(original)
+++
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RangeSliceReply.java
Tue Dec 29 06:13:04 2009
@@ -20,13 +20,14 @@
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.marshal.AbstractType;
-import org.apache.cassandra.io.DataInputBuffer;
import org.apache.cassandra.io.DataOutputBuffer;
import org.apache.cassandra.net.Message;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.commons.lang.StringUtils;
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
import java.io.IOException;
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
@@ -67,14 +68,14 @@
public static RangeSliceReply read(byte[] body) throws IOException
{
- DataInputBuffer bufIn = new DataInputBuffer();
- bufIn.reset(body, body.length);
- boolean completed = bufIn.readBoolean();
- int rowCount = bufIn.readInt();
+ ByteArrayInputStream bufIn = new ByteArrayInputStream(body);
+ DataInputStream dis = new DataInputStream(bufIn);
+ boolean completed = dis.readBoolean();
+ int rowCount = dis.readInt();
List<Row> rows = new ArrayList<Row>(rowCount);
for (int i = 0; i < rowCount; i++)
{
- rows.add(Row.serializer().deserialize(bufIn));
+ rows.add(Row.serializer().deserialize(dis));
}
return new RangeSliceReply(rows, completed);
}
Modified:
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ReadRepairVerbHandler.java
URL:
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ReadRepairVerbHandler.java?rev=894312&r1=894311&r2=894312&view=diff
==============================================================================
---
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ReadRepairVerbHandler.java
(original)
+++
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ReadRepairVerbHandler.java
Tue Dec 29 06:13:04 2009
@@ -20,7 +20,6 @@
import java.io.*;
-import org.apache.cassandra.io.DataInputBuffer;
import org.apache.cassandra.net.IVerbHandler;
import org.apache.cassandra.net.Message;
@@ -33,12 +32,11 @@
public void doVerb(Message message)
{
byte[] body = message.getMessageBody();
- DataInputBuffer buffer = new DataInputBuffer();
- buffer.reset(body, body.length);
+ ByteArrayInputStream buffer = new ByteArrayInputStream(body);
try
{
- RowMutationMessage rmMsg =
RowMutationMessage.serializer().deserialize(buffer);
+ RowMutationMessage rmMsg =
RowMutationMessage.serializer().deserialize(new DataInputStream(buffer));
RowMutation rm = rmMsg.getRowMutation();
rm.apply();
}
Modified:
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ReadVerbHandler.java
URL:
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ReadVerbHandler.java?rev=894312&r1=894311&r2=894312&view=diff
==============================================================================
---
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ReadVerbHandler.java
(original)
+++
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ReadVerbHandler.java
Tue Dec 29 06:13:04 2009
@@ -18,11 +18,12 @@
package org.apache.cassandra.db;
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
import java.io.IOException;
import java.util.List;
import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.io.DataInputBuffer;
import org.apache.cassandra.io.DataOutputBuffer;
import java.net.InetAddress;
import org.apache.cassandra.net.IVerbHandler;
@@ -37,7 +38,7 @@
{
protected static class ReadContext
{
- protected DataInputBuffer bufIn_ = new DataInputBuffer();
+ protected ByteArrayInputStream bufIn_;
protected DataOutputBuffer bufOut_ = new DataOutputBuffer();
}
@@ -65,7 +66,7 @@
readCtx = new ReadContext();
tls_.set(readCtx);
}
- readCtx.bufIn_.reset(body, body.length);
+ readCtx.bufIn_ = new ByteArrayInputStream(body);
try
{
@@ -74,7 +75,7 @@
/* Don't service reads! */
throw new RuntimeException("Cannot service reads while
bootstrapping!");
}
- ReadCommand command =
ReadCommand.serializer().deserialize(readCtx.bufIn_);
+ ReadCommand command = ReadCommand.serializer().deserialize(new
DataInputStream(readCtx.bufIn_));
Table table = Table.open(command.table);
Row row = command.getRow(table);
ReadResponse readResponse;
Modified:
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RowMutationVerbHandler.java
URL:
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RowMutationVerbHandler.java?rev=894312&r1=894311&r2=894312&view=diff
==============================================================================
---
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RowMutationVerbHandler.java
(original)
+++
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RowMutationVerbHandler.java
Tue Dec 29 06:13:04 2009
@@ -20,7 +20,6 @@
import java.io.*;
-import org.apache.cassandra.io.DataInputBuffer;
import java.net.InetAddress;
import org.apache.cassandra.net.IVerbHandler;
import org.apache.cassandra.net.Message;
@@ -36,12 +35,11 @@
public void doVerb(Message message)
{
byte[] bytes = message.getMessageBody();
- DataInputBuffer buffer = new DataInputBuffer();
- buffer.reset(bytes, bytes.length);
+ ByteArrayInputStream buffer = new ByteArrayInputStream(bytes);
try
{
- RowMutation rm = RowMutation.serializer().deserialize(buffer);
+ RowMutation rm = RowMutation.serializer().deserialize(new
DataInputStream(buffer));
if (logger_.isDebugEnabled())
logger_.debug("Applying " + rm);
Modified:
incubator/cassandra/trunk/src/java/org/apache/cassandra/io/IndexHelper.java
URL:
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/io/IndexHelper.java?rev=894312&r1=894311&r2=894312&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/io/IndexHelper.java
(original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/io/IndexHelper.java
Tue Dec 29 06:13:04 2009
@@ -92,9 +92,9 @@
int size = file.readInt();
byte[] bytes = new byte[size];
file.readFully(bytes);
- DataInputBuffer bufIn = new DataInputBuffer();
- bufIn.reset(bytes, bytes.length);
- return BloomFilter.serializer().deserialize(bufIn);
+
+ ByteArrayInputStream bufIn = new ByteArrayInputStream(bytes);
+ return BloomFilter.serializer().deserialize(new
DataInputStream(bufIn));
}
/**
Modified:
incubator/cassandra/trunk/src/java/org/apache/cassandra/io/StreamRequestVerbHandler.java
URL:
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/io/StreamRequestVerbHandler.java?rev=894312&r1=894311&r2=894312&view=diff
==============================================================================
---
incubator/cassandra/trunk/src/java/org/apache/cassandra/io/StreamRequestVerbHandler.java
(original)
+++
incubator/cassandra/trunk/src/java/org/apache/cassandra/io/StreamRequestVerbHandler.java
Tue Dec 29 06:13:04 2009
@@ -18,6 +18,8 @@
package org.apache.cassandra.io;
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
import java.io.File;
import java.io.IOException;
import java.io.IOError;
@@ -53,11 +55,10 @@
logger_.debug("Received a StreamRequestMessage from " +
message.getFrom());
byte[] body = message.getMessageBody();
- DataInputBuffer bufIn = new DataInputBuffer();
- bufIn.reset(body, body.length);
+ ByteArrayInputStream bufIn = new ByteArrayInputStream(body);
try
{
- StreamRequestMessage streamRequestMessage =
StreamRequestMessage.serializer().deserialize(bufIn);
+ StreamRequestMessage streamRequestMessage =
StreamRequestMessage.serializer().deserialize(new DataInputStream(bufIn));
StreamRequestMetadata[] streamRequestMetadata =
streamRequestMessage.streamRequestMetadata_;
for (StreamRequestMetadata srm : streamRequestMetadata)
Modified:
incubator/cassandra/trunk/src/java/org/apache/cassandra/io/Streaming.java
URL:
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/io/Streaming.java?rev=894312&r1=894311&r2=894312&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/io/Streaming.java
(original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/io/Streaming.java
Tue Dec 29 06:13:04 2009
@@ -23,6 +23,8 @@
import java.net.InetAddress;
import java.util.*;
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
import java.io.IOException;
import java.io.File;
import java.io.IOError;
@@ -147,12 +149,11 @@
public void doVerb(Message message)
{
byte[] body = message.getMessageBody();
- DataInputBuffer bufIn = new DataInputBuffer();
- bufIn.reset(body, body.length);
+ ByteArrayInputStream bufIn = new ByteArrayInputStream(body);
try
{
- StreamInitiateMessage biMsg =
StreamInitiateMessage.serializer().deserialize(bufIn);
+ StreamInitiateMessage biMsg =
StreamInitiateMessage.serializer().deserialize(new DataInputStream(bufIn));
StreamContextManager.StreamContext[] streamContexts =
biMsg.getStreamContext();
if (streamContexts.length == 0 &&
StorageService.instance().isBootstrapMode())
@@ -311,12 +312,11 @@
public void doVerb(Message message)
{
byte[] body = message.getMessageBody();
- DataInputBuffer bufIn = new DataInputBuffer();
- bufIn.reset(body, body.length);
+ ByteArrayInputStream bufIn = new ByteArrayInputStream(body);
try
{
- StreamContextManager.StreamStatusMessage streamStatusMessage =
StreamContextManager.StreamStatusMessage.serializer().deserialize(bufIn);
+ StreamContextManager.StreamStatusMessage streamStatusMessage =
StreamContextManager.StreamStatusMessage.serializer().deserialize(new
DataInputStream(bufIn));
StreamContextManager.StreamStatus streamStatus =
streamStatusMessage.getStreamStatus();
switch (streamStatus.getAction())
Modified:
incubator/cassandra/trunk/src/java/org/apache/cassandra/net/io/FastSerializer.java
URL:
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/net/io/FastSerializer.java?rev=894312&r1=894311&r2=894312&view=diff
==============================================================================
---
incubator/cassandra/trunk/src/java/org/apache/cassandra/net/io/FastSerializer.java
(original)
+++
incubator/cassandra/trunk/src/java/org/apache/cassandra/net/io/FastSerializer.java
Tue Dec 29 06:13:04 2009
@@ -18,9 +18,10 @@
package org.apache.cassandra.net.io;
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
import java.io.IOException;
-import org.apache.cassandra.io.DataInputBuffer;
import org.apache.cassandra.io.DataOutputBuffer;
import org.apache.cassandra.net.Message;
@@ -35,8 +36,7 @@
public Message deserialize(byte[] bytes) throws IOException
{
- DataInputBuffer bufIn = new DataInputBuffer();
- bufIn.reset(bytes, bytes.length);
- return Message.serializer().deserialize(bufIn);
+ ByteArrayInputStream bufIn = new ByteArrayInputStream(bytes);
+ return Message.serializer().deserialize(new DataInputStream(bufIn));
}
}
Modified:
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/AntiEntropyService.java
URL:
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/AntiEntropyService.java?rev=894312&r1=894311&r2=894312&view=diff
==============================================================================
---
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/AntiEntropyService.java
(original)
+++
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/AntiEntropyService.java
Tue Dec 29 06:13:04 2009
@@ -33,7 +33,6 @@
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.io.CompactionIterator.CompactedRow;
-import org.apache.cassandra.io.DataInputBuffer;
import org.apache.cassandra.io.ICompactSerializer;
import org.apache.cassandra.io.SSTable;
import org.apache.cassandra.io.SSTableReader;
@@ -707,12 +706,11 @@
public void doVerb(Message message)
{
byte[] bytes = message.getMessageBody();
- DataInputBuffer buffer = new DataInputBuffer();
- buffer.reset(bytes, bytes.length);
-
+
+ ByteArrayInputStream buffer = new ByteArrayInputStream(bytes);
try
{
- CFPair request = this.deserialize(buffer);
+ CFPair request = this.deserialize(new DataInputStream(buffer));
// trigger readonly-compaction
logger.debug("Queueing readonly compaction for request from "
+ message.getFrom() + " for " + request);
@@ -775,13 +773,12 @@
public void doVerb(Message message)
{
byte[] bytes = message.getMessageBody();
- DataInputBuffer buffer = new DataInputBuffer();
- buffer.reset(bytes, bytes.length);
+ ByteArrayInputStream buffer = new ByteArrayInputStream(bytes);
try
{
// deserialize the remote tree, and register it
- Validator rvalidator = this.deserialize(buffer);
+ Validator rvalidator = this.deserialize(new
DataInputStream(buffer));
AntiEntropyService.instance().rendezvous(rvalidator.cf,
message.getFrom(), rvalidator.tree);
}
catch (IOException e)
Modified:
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/ConsistencyManager.java
URL:
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/ConsistencyManager.java?rev=894312&r1=894311&r2=894312&view=diff
==============================================================================
---
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/ConsistencyManager.java
(original)
+++
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/ConsistencyManager.java
Tue Dec 29 06:13:04 2009
@@ -18,6 +18,8 @@
package org.apache.cassandra.service;
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
@@ -27,7 +29,6 @@
import org.apache.cassandra.db.ReadResponse;
import org.apache.cassandra.db.Row;
import org.apache.cassandra.db.ColumnFamily;
-import org.apache.cassandra.io.DataInputBuffer;
import java.net.InetAddress;
import org.apache.cassandra.net.IAsyncCallback;
import org.apache.cassandra.net.Message;
@@ -53,17 +54,16 @@
if (responses_.size() == ConsistencyManager.this.replicas_.size())
handleDigestResponses();
}
-
- private void handleDigestResponses()
- {
- DataInputBuffer bufIn = new DataInputBuffer();
+
+ private void handleDigestResponses()
+ {
for (Message response : responses_)
{
try
{
byte[] body = response.getMessageBody();
- bufIn.reset(body, body.length);
- ReadResponse result =
ReadResponse.serializer().deserialize(bufIn);
+ ByteArrayInputStream bufIn = new
ByteArrayInputStream(body);
+ ReadResponse result =
ReadResponse.serializer().deserialize(new DataInputStream(bufIn));
byte[] digest = result.digest();
if (!Arrays.equals(ColumnFamily.digest(row_.cf), digest))
{
@@ -77,8 +77,8 @@
}
}
}
-
- private void doReadRepair() throws IOException
+
+ private void doReadRepair() throws IOException
{
replicas_.add(FBUtilities.getLocalAddress());
IResponseResolver<Row> readResponseResolver = new
ReadResponseResolver(table_, replicas_.size());
Modified:
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/ReadResponseResolver.java
URL:
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/ReadResponseResolver.java?rev=894312&r1=894311&r2=894312&view=diff
==============================================================================
---
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/ReadResponseResolver.java
(original)
+++
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/ReadResponseResolver.java
Tue Dec 29 06:13:04 2009
@@ -18,6 +18,8 @@
package org.apache.cassandra.service;
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
@@ -28,7 +30,6 @@
import org.apache.cassandra.db.Row;
import org.apache.cassandra.db.RowMutation;
import org.apache.cassandra.db.RowMutationMessage;
-import org.apache.cassandra.io.DataInputBuffer;
import java.net.InetAddress;
import org.apache.cassandra.net.Message;
import org.apache.cassandra.utils.FBUtilities;
@@ -77,12 +78,11 @@
* query exists then we need to compare the digest with
* the digest of the data that is received.
*/
- DataInputBuffer bufIn = new DataInputBuffer();
for (Message response : responses)
{
byte[] body = response.getMessageBody();
- bufIn.reset(body, body.length);
- ReadResponse result = ReadResponse.serializer().deserialize(bufIn);
+ ByteArrayInputStream bufIn = new ByteArrayInputStream(body);
+ ReadResponse result = ReadResponse.serializer().deserialize(new
DataInputStream(bufIn));
if (result.isDigestQuery())
{
digest = result.digest();
@@ -168,11 +168,10 @@
for (Message response : responses)
{
byte[] body = response.getMessageBody();
- DataInputBuffer bufIn = new DataInputBuffer();
- bufIn.reset(body, body.length);
+ ByteArrayInputStream bufIn = new ByteArrayInputStream(body);
try
{
- ReadResponse result =
ReadResponse.serializer().deserialize(bufIn);
+ ReadResponse result =
ReadResponse.serializer().deserialize(new DataInputStream(bufIn));
if (!result.isDigestQuery())
{
isDataPresent = true;
Modified:
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java
URL:
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java?rev=894312&r1=894311&r2=894312&view=diff
==============================================================================
---
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java
(original)
+++
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java
Tue Dec 29 06:13:04 2009
@@ -17,6 +17,8 @@
*/
package org.apache.cassandra.service;
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
import java.io.IOError;
import java.io.IOException;
import java.util.*;
@@ -31,7 +33,6 @@
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.*;
-import org.apache.cassandra.io.DataInputBuffer;
import java.net.InetAddress;
import org.apache.cassandra.net.IAsyncResult;
import org.apache.cassandra.net.Message;
@@ -346,9 +347,8 @@
{
byte[] body;
body = iar.get(DatabaseDescriptor.getRpcTimeout(),
TimeUnit.MILLISECONDS);
- DataInputBuffer bufIn = new DataInputBuffer();
- bufIn.reset(body, body.length);
- ReadResponse response =
ReadResponse.serializer().deserialize(bufIn);
+ ByteArrayInputStream bufIn = new ByteArrayInputStream(body);
+ ReadResponse response = ReadResponse.serializer().deserialize(new
DataInputStream(bufIn));
if (response.row() != null)
rows.add(response.row());
}
Modified:
incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/ColumnFamilyTest.java
URL:
http://svn.apache.org/viewvc/incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/ColumnFamilyTest.java?rev=894312&r1=894311&r2=894312&view=diff
==============================================================================
---
incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/ColumnFamilyTest.java
(original)
+++
incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/ColumnFamilyTest.java
Tue Dec 29 06:13:04 2009
@@ -18,13 +18,14 @@
*/
package org.apache.cassandra.db;
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
import java.io.IOException;
import java.util.Arrays;
import java.util.TreeMap;
import org.junit.Test;
-import org.apache.cassandra.io.DataInputBuffer;
import org.apache.cassandra.io.DataOutputBuffer;
import org.apache.cassandra.db.filter.QueryPath;
import static org.apache.cassandra.Util.column;
@@ -43,9 +44,8 @@
DataOutputBuffer bufOut = new DataOutputBuffer();
ColumnFamily.serializer().serialize(cf, bufOut);
- DataInputBuffer bufIn = new DataInputBuffer();
- bufIn.reset(bufOut.getData(), bufOut.getLength());
- cf = ColumnFamily.serializer().deserialize(bufIn);
+ ByteArrayInputStream bufIn = new
ByteArrayInputStream(bufOut.getData(), 0, bufOut.getLength());
+ cf = ColumnFamily.serializer().deserialize(new DataInputStream(bufIn));
assert cf != null;
assert cf.name().equals("Standard1");
assert cf.getSortedColumns().size() == 1;
@@ -72,9 +72,8 @@
ColumnFamily.serializer().serialize(cf, bufOut);
// verify
- DataInputBuffer bufIn = new DataInputBuffer();
- bufIn.reset(bufOut.getData(), bufOut.getLength());
- cf = ColumnFamily.serializer().deserialize(bufIn);
+ ByteArrayInputStream bufIn = new
ByteArrayInputStream(bufOut.getData(), 0, bufOut.getLength());
+ cf = ColumnFamily.serializer().deserialize(new DataInputStream(bufIn));
for (String cName : map.navigableKeySet())
{
assert new
String(cf.getColumn(cName.getBytes()).value()).equals(map.get(cName));
Modified:
incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/ReadMessageTest.java
URL:
http://svn.apache.org/viewvc/incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/ReadMessageTest.java?rev=894312&r1=894311&r2=894312&view=diff
==============================================================================
---
incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/ReadMessageTest.java
(original)
+++
incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/ReadMessageTest.java
Tue Dec 29 06:13:04 2009
@@ -20,6 +20,8 @@
import static org.junit.Assert.*;
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
@@ -28,7 +30,6 @@
import org.junit.Assert;
import org.junit.Test;
-import org.apache.cassandra.io.DataInputBuffer;
import org.apache.cassandra.io.DataOutputBuffer;
import org.apache.cassandra.db.filter.QueryPath;
import org.apache.cassandra.db.marshal.AsciiType;
@@ -61,11 +62,11 @@
{
ReadCommandSerializer rms = ReadCommand.serializer();
DataOutputBuffer dos = new DataOutputBuffer();
- DataInputBuffer dis = new DataInputBuffer();
+ ByteArrayInputStream bis;
rms.serialize(rm, dos);
- dis.reset(dos.getData(), dos.getLength());
- return rms.deserialize(dis);
+ bis = new ByteArrayInputStream(dos.getData(), 0, dos.getLength());
+ return rms.deserialize(new DataInputStream(bis));
}
@Test
Modified:
incubator/cassandra/trunk/test/unit/org/apache/cassandra/gms/GossipDigestTest.java
URL:
http://svn.apache.org/viewvc/incubator/cassandra/trunk/test/unit/org/apache/cassandra/gms/GossipDigestTest.java?rev=894312&r1=894311&r2=894312&view=diff
==============================================================================
---
incubator/cassandra/trunk/test/unit/org/apache/cassandra/gms/GossipDigestTest.java
(original)
+++
incubator/cassandra/trunk/test/unit/org/apache/cassandra/gms/GossipDigestTest.java
Tue Dec 29 06:13:04 2009
@@ -23,9 +23,10 @@
import static org.junit.Assert.*;
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
import java.io.IOException;
-import org.apache.cassandra.io.DataInputBuffer;
import org.apache.cassandra.io.DataOutputBuffer;
import java.net.InetAddress;
import org.junit.Test;
@@ -49,9 +50,8 @@
DataOutputBuffer output = new DataOutputBuffer();
GossipDigest.serializer().serialize(expected, output);
- DataInputBuffer input = new DataInputBuffer();
- input.reset(output.getData(), output.getLength());
- GossipDigest actual = GossipDigest.serializer().deserialize(input);
+ ByteArrayInputStream input = new
ByteArrayInputStream(output.getData(), 0, output.getLength());
+ GossipDigest actual = GossipDigest.serializer().deserialize(new
DataInputStream(input));
assertEquals(0, expected.compareTo(actual));
}
Modified:
incubator/cassandra/trunk/test/unit/org/apache/cassandra/utils/FilterTest.java
URL:
http://svn.apache.org/viewvc/incubator/cassandra/trunk/test/unit/org/apache/cassandra/utils/FilterTest.java?rev=894312&r1=894311&r2=894312&view=diff
==============================================================================
---
incubator/cassandra/trunk/test/unit/org/apache/cassandra/utils/FilterTest.java
(original)
+++
incubator/cassandra/trunk/test/unit/org/apache/cassandra/utils/FilterTest.java
Tue Dec 29 06:13:04 2009
@@ -18,6 +18,8 @@
*/
package org.apache.cassandra.utils;
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
import java.io.IOException;
import java.util.HashSet;
import java.util.Iterator;
@@ -25,7 +27,6 @@
import org.junit.Test;
-import org.apache.cassandra.io.DataInputBuffer;
import org.apache.cassandra.io.DataOutputBuffer;
public class FilterTest
@@ -102,9 +103,8 @@
DataOutputBuffer out = new DataOutputBuffer();
f.getSerializer().serialize(f, out);
- DataInputBuffer in = new DataInputBuffer();
- in.reset(out.getData(), out.getLength());
- Filter f2 = f.getSerializer().deserialize(in);
+ ByteArrayInputStream in = new ByteArrayInputStream(out.getData(), 0,
out.getLength());
+ Filter f2 = f.getSerializer().deserialize(new DataInputStream(in));
assert f2.isPresent("a");
assert !f2.isPresent("b");