Author: jbellis
Date: Fri Aug 5 15:43:58 2011
New Revision: 1154274
URL: http://svn.apache.org/viewvc?rev=1154274&view=rev
Log:
fix tracker getting out of sync with underlying data source
patch by jbellis; reviewed by slebresne for CASSANDRA-2901
Modified:
cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java
cassandra/trunk/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java
cassandra/trunk/src/java/org/apache/cassandra/utils/BytesReadTracker.java
cassandra/trunk/test/unit/org/apache/cassandra/utils/BytesReadTrackerTest.java
Modified:
cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java?rev=1154274&r1=1154273&r2=1154274&view=diff
==============================================================================
---
cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java
(original)
+++
cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java
Fri Aug 5 15:43:58 2011
@@ -21,11 +21,7 @@ package org.apache.cassandra.io.sstable;
*/
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.EOFException;
-import java.io.IOError;
-import java.io.IOException;
+import java.io.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -44,7 +40,6 @@ public class SSTableIdentityIterator imp
private static final Logger logger =
LoggerFactory.getLogger(SSTableIdentityIterator.class);
private final DecoratedKey key;
- private final long finishedAt;
private final DataInput input;
private final long dataStart;
public final long dataSize;
@@ -110,7 +105,6 @@ public class SSTableIdentityIterator imp
this.expireBefore = (int)(System.currentTimeMillis() / 1000);
this.fromRemote = fromRemote;
this.validateColumns = checkData;
- finishedAt = dataStart + dataSize;
try
{
@@ -118,6 +112,9 @@ public class SSTableIdentityIterator imp
{
RandomAccessReader file = (RandomAccessReader) input;
file.seek(this.dataStart);
+ if (dataStart + dataSize > file.length())
+ throw new IOException(String.format("dataSize of %s
starting at %s would be larger than file %s length %s",
+ dataSize, dataStart, file.getPath(),
file.length()));
if (checkData)
{
try
@@ -141,6 +138,7 @@ public class SSTableIdentityIterator imp
logger.debug("Invalid row summary in {}; will rebuild
it", sstable);
}
file.seek(this.dataStart);
+ inputWithTracker.reset(0);
}
}
@@ -150,11 +148,7 @@ public class SSTableIdentityIterator imp
ColumnFamily.serializer().deserializeFromSSTableNoColumns(columnFamily,
inputWithTracker);
columnCount = inputWithTracker.readInt();
- if (input instanceof RandomAccessReader)
- {
- RandomAccessReader file = (RandomAccessReader) input;
- columnPosition = file.getFilePointer();
- }
+ columnPosition = dataStart + inputWithTracker.getBytesRead();
}
catch (IOException e)
{
@@ -174,15 +168,7 @@ public class SSTableIdentityIterator imp
public boolean hasNext()
{
- if (input instanceof RandomAccessReader)
- {
- RandomAccessReader file = (RandomAccessReader) input;
- return file.getFilePointer() < finishedAt;
- }
- else
- {
- return inputWithTracker.getBytesRead() < dataSize;
- }
+ return inputWithTracker.getBytesRead() < dataSize;
}
public IColumn next()
@@ -230,36 +216,21 @@ public class SSTableIdentityIterator imp
public void echoData(DataOutput out) throws IOException
{
- // only effective when input is from file
- if (input instanceof RandomAccessReader)
- {
- RandomAccessReader file = (RandomAccessReader) input;
- file.seek(dataStart);
- while (file.getFilePointer() < finishedAt)
- {
- out.write(file.readByte());
- }
- }
- else
- {
+ if (!(input instanceof RandomAccessReader))
throw new UnsupportedOperationException();
- }
+
+ ((RandomAccessReader) input).seek(dataStart);
+ inputWithTracker.reset(0);
+ while (inputWithTracker.getBytesRead() < dataSize)
+ out.write(inputWithTracker.readByte());
}
public ColumnFamily getColumnFamilyWithColumns() throws IOException
{
+ assert inputWithTracker.getBytesRead() == headerSize();
ColumnFamily cf = columnFamily.cloneMeShallow();
- if (input instanceof RandomAccessReader)
- {
- RandomAccessReader file = (RandomAccessReader) input;
- file.seek(columnPosition - 4); // seek to before column count int
- ColumnFamily.serializer().deserializeColumns(inputWithTracker, cf,
false, fromRemote);
- }
- else
- {
- // since we already read column count, just pass that value and
continue deserialization
- ColumnFamily.serializer().deserializeColumns(inputWithTracker, cf,
columnCount, false, fromRemote);
- }
+ // since we already read column count, just pass that value and
continue deserialization
+ ColumnFamily.serializer().deserializeColumns(inputWithTracker, cf,
columnCount, false, fromRemote);
if (validateColumns)
{
try
@@ -274,6 +245,11 @@ public class SSTableIdentityIterator imp
return cf;
}
+ private long headerSize()
+ {
+ return columnPosition - dataStart;
+ }
+
public int compareTo(SSTableIdentityIterator o)
{
return key.compareTo(o.key);
@@ -281,23 +257,18 @@ public class SSTableIdentityIterator imp
public void reset()
{
- // only effective when input is from file
- if (input instanceof RandomAccessReader)
+ if (!(input instanceof RandomAccessReader))
+ throw new UnsupportedOperationException();
+
+ RandomAccessReader file = (RandomAccessReader) input;
+ try
{
- RandomAccessReader file = (RandomAccessReader) input;
- try
- {
- file.seek(columnPosition);
- }
- catch (IOException e)
- {
- throw new IOError(e);
- }
- inputWithTracker.reset();
+ file.seek(columnPosition);
}
- else
+ catch (IOException e)
{
- throw new UnsupportedOperationException();
+ throw new IOError(e);
}
+ inputWithTracker.reset(headerSize());
}
}
Modified:
cassandra/trunk/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java?rev=1154274&r1=1154273&r2=1154274&view=diff
==============================================================================
---
cassandra/trunk/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java
(original)
+++
cassandra/trunk/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java
Fri Aug 5 15:43:58 2011
@@ -117,7 +117,7 @@ public class IncomingStreamReader
long bytesRead = 0;
while (bytesRead < length)
{
- in.reset();
+ in.reset(0);
key =
SSTableReader.decodeKey(StorageService.getPartitioner(), localFile.desc,
ByteBufferUtil.readWithShortLength(in));
long dataSize = SSTableReader.readRowSize(in,
localFile.desc);
ColumnFamily cf = null;
Modified:
cassandra/trunk/src/java/org/apache/cassandra/utils/BytesReadTracker.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/utils/BytesReadTracker.java?rev=1154274&r1=1154273&r2=1154274&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/utils/BytesReadTracker.java
(original)
+++ cassandra/trunk/src/java/org/apache/cassandra/utils/BytesReadTracker.java
Fri Aug 5 15:43:58 2011
@@ -40,13 +40,13 @@ public class BytesReadTracker implements
{
return bytesRead;
}
-
+
/**
- * reset counter to 0
+ * reset counter to @param count
*/
- public void reset()
+ public void reset(long count)
{
- bytesRead = 0;
+ bytesRead = count;
}
public boolean readBoolean() throws IOException
Modified:
cassandra/trunk/test/unit/org/apache/cassandra/utils/BytesReadTrackerTest.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/utils/BytesReadTrackerTest.java?rev=1154274&r1=1154273&r2=1154274&view=diff
==============================================================================
---
cassandra/trunk/test/unit/org/apache/cassandra/utils/BytesReadTrackerTest.java
(original)
+++
cassandra/trunk/test/unit/org/apache/cassandra/utils/BytesReadTrackerTest.java
Fri Aug 5 15:43:58 2011
@@ -115,7 +115,7 @@ public class BytesReadTrackerTest
dis.close();
}
- tracker.reset();
+ tracker.reset(0);
assertEquals(0, tracker.getBytesRead());
}
@@ -152,6 +152,8 @@ public class BytesReadTrackerTest
int s = tracker.readUnsignedShort();
assertEquals(1, s);
assertEquals(3, tracker.getBytesRead());
+
+ assertEquals(testData.length, tracker.getBytesRead());
}
finally
{
@@ -185,6 +187,8 @@ public class BytesReadTrackerTest
tracker.readFully(out);
assertEquals("890", new String(out));
assertEquals(10, tracker.getBytesRead());
+
+ assertEquals(testData.length, tracker.getBytesRead());
}
finally
{