Author: jbellis
Date: Tue Jun 22 04:07:46 2010
New Revision: 956769
URL: http://svn.apache.org/viewvc?rev=956769&view=rev
Log:
fix race condition in SSTable*Iterator.
patch by Sylvain Lebresne; reviewed by jbellis for CASSANDRA-1130
Added:
cassandra/trunk/src/java/org/apache/cassandra/io/util/FileMark.java
- copied, changed from r956250,
cassandra/trunk/src/java/org/apache/cassandra/io/util/FileDataInput.java
cassandra/trunk/test/unit/org/apache/cassandra/db/RowIterationTest.java
Modified:
cassandra/trunk/src/java/org/apache/cassandra/db/filter/SSTableNamesIterator.java
cassandra/trunk/src/java/org/apache/cassandra/db/filter/SSTableSliceIterator.java
cassandra/trunk/src/java/org/apache/cassandra/io/sstable/IndexHelper.java
cassandra/trunk/src/java/org/apache/cassandra/io/util/BufferedRandomAccessFile.java
cassandra/trunk/src/java/org/apache/cassandra/io/util/FileDataInput.java
cassandra/trunk/src/java/org/apache/cassandra/io/util/MappedFileDataInput.java
Modified:
cassandra/trunk/src/java/org/apache/cassandra/db/filter/SSTableNamesIterator.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/filter/SSTableNamesIterator.java?rev=956769&r1=956768&r2=956769&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/filter/SSTableNamesIterator.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/filter/SSTableNamesIterator.java Tue Jun 22 04:07:46 2010
@@ -35,6 +35,7 @@ import org.apache.cassandra.db.marshal.A
import org.apache.cassandra.io.sstable.IndexHelper;
import org.apache.cassandra.io.sstable.SSTableReader;
import org.apache.cassandra.io.util.FileDataInput;
+import org.apache.cassandra.io.util.FileMark;
import org.apache.cassandra.utils.BloomFilter;
import org.apache.cassandra.utils.FBUtilities;
@@ -112,14 +113,14 @@ public class SSTableNamesIterator extend
ranges.add(indexInfo);
}
- file.mark();
+ FileMark mark = file.mark();
for (IndexHelper.IndexInfo indexInfo : ranges)
{
- file.reset();
+ file.reset(mark);
long curOffsert = file.skipBytes((int) indexInfo.offset);
assert curOffsert == indexInfo.offset;
// TODO only completely deserialize columns we are interested in
- while (file.bytesPastMark() < indexInfo.offset +
indexInfo.width)
+ while (file.bytesPastMark(mark) < indexInfo.offset +
indexInfo.width)
{
final IColumn column =
cf.getColumnSerializer().deserialize(file);
// we check vs the original Set, not the filtered List, for efficiency
Modified:
cassandra/trunk/src/java/org/apache/cassandra/db/filter/SSTableSliceIterator.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/filter/SSTableSliceIterator.java?rev=956769&r1=956768&r2=956769&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/filter/SSTableSliceIterator.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/filter/SSTableSliceIterator.java Tue Jun 22 04:07:46 2010
@@ -33,6 +33,7 @@ import org.apache.cassandra.db.marshal.A
import org.apache.cassandra.io.sstable.IndexHelper;
import org.apache.cassandra.io.sstable.SSTableReader;
import org.apache.cassandra.io.util.FileDataInput;
+import org.apache.cassandra.io.util.FileMark;
import com.google.common.base.Predicate;
import com.google.common.collect.AbstractIterator;
@@ -172,6 +173,7 @@ class SSTableSliceIterator extends Abstr
private int curRangeIndex;
private Deque<IColumn> blockColumns = new ArrayDeque<IColumn>();
+ private final FileMark mark;
public ColumnGroupReader(SSTableReader ssTable, FileDataInput input)
{
@@ -188,7 +190,7 @@ class SSTableSliceIterator extends Abstr
{
throw new IOError(e);
}
- file.mark();
+ this.mark = file.mark();
curRangeIndex = IndexHelper.indexFor(startColumn, indexes,
comparator, reversed);
if (reversed && curRangeIndex == indexes.size())
curRangeIndex--;
@@ -241,10 +243,10 @@ class SSTableSliceIterator extends Abstr
boolean outOfBounds = false;
- file.reset();
+ file.reset(mark);
long curOffset = file.skipBytes((int) curColPosition.offset);
assert curOffset == curColPosition.offset;
- while (file.bytesPastMark() < curColPosition.offset +
curColPosition.width && !outOfBounds)
+ while (file.bytesPastMark(mark) < curColPosition.offset +
curColPosition.width && !outOfBounds)
{
IColumn column =
emptyColumnFamily.getColumnSerializer().deserialize(file);
if (reversed)
Modified:
cassandra/trunk/src/java/org/apache/cassandra/io/sstable/IndexHelper.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/sstable/IndexHelper.java?rev=956769&r1=956768&r2=956769&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/io/sstable/IndexHelper.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/io/sstable/IndexHelper.java Tue Jun 22 04:07:46 2010
@@ -25,6 +25,7 @@ import org.apache.cassandra.db.marshal.A
import org.apache.cassandra.utils.BloomFilter;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.io.util.FileDataInput;
+import org.apache.cassandra.io.util.FileMark;
/**
* Provides helper to serialize, deserialize and use column indexes.
@@ -70,12 +71,12 @@ public class IndexHelper
ArrayList<IndexInfo> indexList = new ArrayList<IndexInfo>();
int columnIndexSize = in.readInt();
- in.mark();
- while (in.bytesPastMark() < columnIndexSize)
+ FileMark mark = in.mark();
+ while (in.bytesPastMark(mark) < columnIndexSize)
{
indexList.add(IndexInfo.deserialize(in));
}
- assert in.bytesPastMark() == columnIndexSize;
+ assert in.bytesPastMark(mark) == columnIndexSize;
return indexList;
}
Modified:
cassandra/trunk/src/java/org/apache/cassandra/io/util/BufferedRandomAccessFile.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/util/BufferedRandomAccessFile.java?rev=956769&r1=956768&r2=956769&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/io/util/BufferedRandomAccessFile.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/io/util/BufferedRandomAccessFile.java Tue Jun 22 04:07:46 2010
@@ -56,7 +56,6 @@ public class BufferedRandomAccessFile ex
private long maxHi_; // this.lo + this.buff.length
private boolean hitEOF_; // buffer contains last file block?
private long diskPos_; // disk position
- private long markedPointer;
private long fileLength = -1; // cache for file size
/*
@@ -408,22 +407,34 @@ public class BufferedRandomAccessFile ex
return length() - getFilePointer();
}
- public void mark()
+ public FileMark mark()
{
- markedPointer = getFilePointer();
+ return new BufferedRandomAccessFileMark(getFilePointer());
}
- public void reset() throws IOException
+ public void reset(FileMark mark) throws IOException
{
- seek(markedPointer);
+ assert mark instanceof BufferedRandomAccessFileMark;
+ seek(((BufferedRandomAccessFileMark) mark).pointer);
}
- public int bytesPastMark()
+ public int bytesPastMark(FileMark mark)
{
- long bytes = getFilePointer() - markedPointer;
+ assert mark instanceof BufferedRandomAccessFileMark;
+ long bytes = getFilePointer() - ((BufferedRandomAccessFileMark)
mark).pointer;
assert bytes >= 0;
if (bytes > Integer.MAX_VALUE)
throw new UnsupportedOperationException("Overflow: " + bytes);
return (int) bytes;
}
+
+ private static class BufferedRandomAccessFileMark implements FileMark
+ {
+ long pointer;
+
+ BufferedRandomAccessFileMark(long pointer)
+ {
+ this.pointer = pointer;
+ }
+ }
}
Modified:
cassandra/trunk/src/java/org/apache/cassandra/io/util/FileDataInput.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/util/FileDataInput.java?rev=956769&r1=956768&r2=956769&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/io/util/FileDataInput.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/io/util/FileDataInput.java Tue Jun 22 04:07:46 2010
@@ -33,9 +33,9 @@ public interface FileDataInput extends D
public long bytesRemaining() throws IOException;
- public void mark();
+ public FileMark mark();
- public void reset() throws IOException;
+ public void reset(FileMark mark) throws IOException;
- public int bytesPastMark();
+ public int bytesPastMark(FileMark mark);
}
Copied: cassandra/trunk/src/java/org/apache/cassandra/io/util/FileMark.java
(from r956250,
cassandra/trunk/src/java/org/apache/cassandra/io/util/FileDataInput.java)
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/util/FileMark.java?p2=cassandra/trunk/src/java/org/apache/cassandra/io/util/FileMark.java&p1=cassandra/trunk/src/java/org/apache/cassandra/io/util/FileDataInput.java&r1=956250&r2=956769&rev=956769&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/io/util/FileDataInput.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/io/util/FileMark.java Tue Jun 22 04:07:46 2010
@@ -20,22 +20,4 @@ package org.apache.cassandra.io.util;
*
*/
-
-import java.io.DataInput;
-import java.io.IOException;
-import java.io.Closeable;
-
-public interface FileDataInput extends DataInput, Closeable
-{
- public String getPath();
-
- public boolean isEOF() throws IOException;
-
- public long bytesRemaining() throws IOException;
-
- public void mark();
-
- public void reset() throws IOException;
-
- public int bytesPastMark();
-}
+public interface FileMark {}
Modified:
cassandra/trunk/src/java/org/apache/cassandra/io/util/MappedFileDataInput.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/util/MappedFileDataInput.java?rev=956769&r1=956768&r2=956769&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/io/util/MappedFileDataInput.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/io/util/MappedFileDataInput.java Tue Jun 22 04:07:46 2010
@@ -29,7 +29,6 @@ public class MappedFileDataInput extends
private final MappedByteBuffer buffer;
private final String filename;
private int position;
- private int markedPosition;
public MappedFileDataInput(MappedByteBuffer buffer, String filename, int
position)
{
@@ -48,30 +47,26 @@ public class MappedFileDataInput extends
@Override
public boolean markSupported()
{
- return true;
+ return false;
}
@Override
- public void mark(int ignored)
+ public void reset(FileMark mark) throws IOException
{
- markedPosition = position;
+ assert mark instanceof MappedFileDataInputMark;
+ seekInternal(((MappedFileDataInputMark) mark).position);
}
- @Override
- public void reset() throws IOException
+ public FileMark mark()
{
- seekInternal(markedPosition);
+ return new MappedFileDataInputMark(position);
}
- public void mark()
+ public int bytesPastMark(FileMark mark)
{
- mark(-1);
- }
-
- public int bytesPastMark()
- {
- assert position >= markedPosition;
- return position - markedPosition;
+ assert mark instanceof MappedFileDataInputMark;
+ assert position >= ((MappedFileDataInputMark) mark).position;
+ return position - ((MappedFileDataInputMark) mark).position;
}
public boolean isEOF() throws IOException
@@ -423,4 +418,14 @@ public class MappedFileDataInput extends
public final String readUTF() throws IOException {
return DataInputStream.readUTF(this);
}
+
+ private static class MappedFileDataInputMark implements FileMark
+ {
+ int position;
+
+ MappedFileDataInputMark(int position)
+ {
+ this.position = position;
+ }
+ }
}
Added: cassandra/trunk/test/unit/org/apache/cassandra/db/RowIterationTest.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/db/RowIterationTest.java?rev=956769&view=auto
==============================================================================
--- cassandra/trunk/test/unit/org/apache/cassandra/db/RowIterationTest.java (added)
+++ cassandra/trunk/test/unit/org/apache/cassandra/db/RowIterationTest.java Tue Jun 22 04:07:46 2010
@@ -0,0 +1,62 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+package org.apache.cassandra.db;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.ArrayList;
+import java.util.Set;
+import java.util.HashSet;
+
+import org.apache.cassandra.Util;
+
+import org.junit.Test;
+
+import org.apache.cassandra.io.sstable.SSTableReader;
+import org.apache.cassandra.CleanupHelper;
+import org.apache.cassandra.db.filter.QueryPath;
+import org.apache.cassandra.utils.FBUtilities;
+import static junit.framework.Assert.assertEquals;
+
+public class RowIterationTest extends CleanupHelper
+{
+ public static final String TABLE1 = "Keyspace2";
+ public static final InetAddress LOCAL = FBUtilities.getLocalAddress();
+
+ @Test
+ public void testRowIteration() throws IOException, ExecutionException,
InterruptedException
+ {
+ Table table = Table.open(TABLE1);
+ ColumnFamilyStore store = table.getColumnFamilyStore("Super3");
+
+ final int ROWS_PER_SSTABLE = 10;
+ Set<DecoratedKey> inserted = new HashSet<DecoratedKey>();
+ for (int i = 0; i < ROWS_PER_SSTABLE; i++) {
+ DecoratedKey key = Util.dk(String.valueOf(i));
+ RowMutation rm = new RowMutation(TABLE1, key.key);
+ rm.add(new QueryPath("Super3", "sc".getBytes(),
String.valueOf(i).getBytes()), new byte[ROWS_PER_SSTABLE * 10 - i * 2], new
TimestampClock(i));
+ rm.apply();
+ inserted.add(key);
+ }
+ store.forceBlockingFlush();
+ assertEquals(inserted.toString(), inserted.size(),
Util.getRangeSlice(store).size());
+ }
+}