Author: jbellis
Date: Tue Jun 22 04:07:46 2010
New Revision: 956769

URL: http://svn.apache.org/viewvc?rev=956769&view=rev
Log:
fix race condition in SSTable*Iterator.
patch by Sylvain Lebresne; reviewed by jbellis for CASSANDRA-1130

Added:
    cassandra/trunk/src/java/org/apache/cassandra/io/util/FileMark.java
      - copied, changed from r956250, cassandra/trunk/src/java/org/apache/cassandra/io/util/FileDataInput.java
    cassandra/trunk/test/unit/org/apache/cassandra/db/RowIterationTest.java
Modified:
    cassandra/trunk/src/java/org/apache/cassandra/db/filter/SSTableNamesIterator.java
    cassandra/trunk/src/java/org/apache/cassandra/db/filter/SSTableSliceIterator.java
    cassandra/trunk/src/java/org/apache/cassandra/io/sstable/IndexHelper.java
    cassandra/trunk/src/java/org/apache/cassandra/io/util/BufferedRandomAccessFile.java
    cassandra/trunk/src/java/org/apache/cassandra/io/util/FileDataInput.java
    cassandra/trunk/src/java/org/apache/cassandra/io/util/MappedFileDataInput.java

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/filter/SSTableNamesIterator.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/filter/SSTableNamesIterator.java?rev=956769&r1=956768&r2=956769&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/filter/SSTableNamesIterator.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/filter/SSTableNamesIterator.java Tue Jun 22 04:07:46 2010
@@ -35,6 +35,7 @@ import org.apache.cassandra.db.marshal.A
 import org.apache.cassandra.io.sstable.IndexHelper;
 import org.apache.cassandra.io.sstable.SSTableReader;
 import org.apache.cassandra.io.util.FileDataInput;
+import org.apache.cassandra.io.util.FileMark;
 import org.apache.cassandra.utils.BloomFilter;
 import org.apache.cassandra.utils.FBUtilities;
 
@@ -112,14 +113,14 @@ public class SSTableNamesIterator extend
                 ranges.add(indexInfo);
             }
 
-            file.mark();
+            FileMark mark = file.mark();
             for (IndexHelper.IndexInfo indexInfo : ranges)
             {
-                file.reset();
+                file.reset(mark);
                 long curOffsert = file.skipBytes((int) indexInfo.offset);
                 assert curOffsert == indexInfo.offset;
                 // TODO only completely deserialize columns we are interested in
-                while (file.bytesPastMark() < indexInfo.offset + indexInfo.width)
+                while (file.bytesPastMark(mark) < indexInfo.offset + indexInfo.width)
                 {
                     final IColumn column = cf.getColumnSerializer().deserialize(file);
                     // we check vs the original Set, not the filtered List, for efficiency

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/filter/SSTableSliceIterator.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/filter/SSTableSliceIterator.java?rev=956769&r1=956768&r2=956769&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/filter/SSTableSliceIterator.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/filter/SSTableSliceIterator.java Tue Jun 22 04:07:46 2010
@@ -33,6 +33,7 @@ import org.apache.cassandra.db.marshal.A
 import org.apache.cassandra.io.sstable.IndexHelper;
 import org.apache.cassandra.io.sstable.SSTableReader;
 import org.apache.cassandra.io.util.FileDataInput;
+import org.apache.cassandra.io.util.FileMark;
 
 import com.google.common.base.Predicate;
 import com.google.common.collect.AbstractIterator;
@@ -172,6 +173,7 @@ class SSTableSliceIterator extends Abstr
 
         private int curRangeIndex;
         private Deque<IColumn> blockColumns = new ArrayDeque<IColumn>();
+        private final FileMark mark;
 
         public ColumnGroupReader(SSTableReader ssTable, FileDataInput input)
         {
@@ -188,7 +190,7 @@ class SSTableSliceIterator extends Abstr
             {
                 throw new IOError(e);
             }
-            file.mark();
+            this.mark = file.mark();
             curRangeIndex = IndexHelper.indexFor(startColumn, indexes, comparator, reversed);
             if (reversed && curRangeIndex == indexes.size())
                 curRangeIndex--;
@@ -241,10 +243,10 @@ class SSTableSliceIterator extends Abstr
 
             boolean outOfBounds = false;
 
-            file.reset();
+            file.reset(mark);
             long curOffset = file.skipBytes((int) curColPosition.offset); 
             assert curOffset == curColPosition.offset;
-            while (file.bytesPastMark() < curColPosition.offset + curColPosition.width && !outOfBounds)
+            while (file.bytesPastMark(mark) < curColPosition.offset + curColPosition.width && !outOfBounds)
             {
                 IColumn column = emptyColumnFamily.getColumnSerializer().deserialize(file);
                 if (reversed)

Modified: cassandra/trunk/src/java/org/apache/cassandra/io/sstable/IndexHelper.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/sstable/IndexHelper.java?rev=956769&r1=956768&r2=956769&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/io/sstable/IndexHelper.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/io/sstable/IndexHelper.java Tue Jun 22 04:07:46 2010
@@ -25,6 +25,7 @@ import org.apache.cassandra.db.marshal.A
 import org.apache.cassandra.utils.BloomFilter;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.io.util.FileDataInput;
+import org.apache.cassandra.io.util.FileMark;
 
 /**
  * Provides helper to serialize, deserialize and use column indexes.
@@ -70,12 +71,12 @@ public class IndexHelper
         ArrayList<IndexInfo> indexList = new ArrayList<IndexInfo>();
 
                int columnIndexSize = in.readInt();
-        in.mark();
-        while (in.bytesPastMark() < columnIndexSize)
+        FileMark mark = in.mark();
+        while (in.bytesPastMark(mark) < columnIndexSize)
         {
             indexList.add(IndexInfo.deserialize(in));
         }
-        assert in.bytesPastMark() == columnIndexSize;
+        assert in.bytesPastMark(mark) == columnIndexSize;
 
         return indexList;
        }

Modified: cassandra/trunk/src/java/org/apache/cassandra/io/util/BufferedRandomAccessFile.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/util/BufferedRandomAccessFile.java?rev=956769&r1=956768&r2=956769&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/io/util/BufferedRandomAccessFile.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/io/util/BufferedRandomAccessFile.java Tue Jun 22 04:07:46 2010
@@ -56,7 +56,6 @@ public class BufferedRandomAccessFile ex
     private long maxHi_; // this.lo + this.buff.length
     private boolean hitEOF_; // buffer contains last file block?
     private long diskPos_; // disk position
-    private long markedPointer;
     private long fileLength = -1; // cache for file size
 
     /*
@@ -408,22 +407,34 @@ public class BufferedRandomAccessFile ex
         return length() - getFilePointer();
     }
 
-    public void mark()
+    public FileMark mark()
     {
-        markedPointer = getFilePointer();
+        return new BufferedRandomAccessFileMark(getFilePointer());
     }
 
-    public void reset() throws IOException
+    public void reset(FileMark mark) throws IOException
     {
-        seek(markedPointer);
+        assert mark instanceof BufferedRandomAccessFileMark;
+        seek(((BufferedRandomAccessFileMark) mark).pointer);
     }
 
-    public int bytesPastMark()
+    public int bytesPastMark(FileMark mark)
     {
-        long bytes = getFilePointer() - markedPointer;
+        assert mark instanceof BufferedRandomAccessFileMark;
+        long bytes = getFilePointer() - ((BufferedRandomAccessFileMark) mark).pointer;
         assert bytes >= 0;
         if (bytes > Integer.MAX_VALUE)
             throw new UnsupportedOperationException("Overflow: " + bytes);
         return (int) bytes;
     }
+
+    private static class BufferedRandomAccessFileMark implements FileMark
+    {
+        long pointer;
+
+        BufferedRandomAccessFileMark(long pointer)
+        {
+            this.pointer = pointer;
+        }
+    }
 }

Modified: cassandra/trunk/src/java/org/apache/cassandra/io/util/FileDataInput.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/util/FileDataInput.java?rev=956769&r1=956768&r2=956769&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/io/util/FileDataInput.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/io/util/FileDataInput.java Tue Jun 22 04:07:46 2010
@@ -33,9 +33,9 @@ public interface FileDataInput extends D
 
     public long bytesRemaining() throws IOException;
 
-    public void mark();
+    public FileMark mark();
 
-    public void reset() throws IOException;
+    public void reset(FileMark mark) throws IOException;
 
-    public int bytesPastMark();
+    public int bytesPastMark(FileMark mark);
 }

Copied: cassandra/trunk/src/java/org/apache/cassandra/io/util/FileMark.java (from r956250, cassandra/trunk/src/java/org/apache/cassandra/io/util/FileDataInput.java)
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/util/FileMark.java?p2=cassandra/trunk/src/java/org/apache/cassandra/io/util/FileMark.java&p1=cassandra/trunk/src/java/org/apache/cassandra/io/util/FileDataInput.java&r1=956250&r2=956769&rev=956769&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/io/util/FileDataInput.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/io/util/FileMark.java Tue Jun 22 04:07:46 2010
@@ -20,22 +20,4 @@ package org.apache.cassandra.io.util;
  * 
  */
 
-
-import java.io.DataInput;
-import java.io.IOException;
-import java.io.Closeable;
-
-public interface FileDataInput extends DataInput, Closeable
-{
-    public String getPath();
-
-    public boolean isEOF() throws IOException;
-
-    public long bytesRemaining() throws IOException;
-
-    public void mark();
-
-    public void reset() throws IOException;
-
-    public int bytesPastMark();
-}
+public interface FileMark {}

Modified: cassandra/trunk/src/java/org/apache/cassandra/io/util/MappedFileDataInput.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/util/MappedFileDataInput.java?rev=956769&r1=956768&r2=956769&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/io/util/MappedFileDataInput.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/io/util/MappedFileDataInput.java Tue Jun 22 04:07:46 2010
@@ -29,7 +29,6 @@ public class MappedFileDataInput extends
     private final MappedByteBuffer buffer;
     private final String filename;
     private int position;
-    private int markedPosition;
 
     public MappedFileDataInput(MappedByteBuffer buffer, String filename, int position)
     {
@@ -48,30 +47,26 @@ public class MappedFileDataInput extends
     @Override
     public boolean markSupported()
     {
-        return true;
+        return false;
     }
 
     @Override
-    public void mark(int ignored)
+    public void reset(FileMark mark) throws IOException
     {
-        markedPosition = position;
+        assert mark instanceof MappedFileDataInputMark;
+        seekInternal(((MappedFileDataInputMark) mark).position);
     }
 
-    @Override
-    public void reset() throws IOException
+    public FileMark mark()
     {
-        seekInternal(markedPosition);
+        return new MappedFileDataInputMark(position);
     }
 
-    public void mark()
+    public int bytesPastMark(FileMark mark)
     {
-        mark(-1);
-    }
-
-    public int bytesPastMark()
-    {
-        assert position >= markedPosition;
-        return position - markedPosition;
+        assert mark instanceof MappedFileDataInputMark;
+        assert position >= ((MappedFileDataInputMark) mark).position;
+        return position - ((MappedFileDataInputMark) mark).position;
     }
 
     public boolean isEOF() throws IOException
@@ -423,4 +418,14 @@ public class MappedFileDataInput extends
     public final String readUTF() throws IOException {
         return DataInputStream.readUTF(this);
     }
+
+    private static class MappedFileDataInputMark implements FileMark
+    {
+        int position;
+
+        MappedFileDataInputMark(int position)
+        {
+            this.position = position;
+        }
+    }
 }

Added: cassandra/trunk/test/unit/org/apache/cassandra/db/RowIterationTest.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/db/RowIterationTest.java?rev=956769&view=auto
==============================================================================
--- cassandra/trunk/test/unit/org/apache/cassandra/db/RowIterationTest.java (added)
+++ cassandra/trunk/test/unit/org/apache/cassandra/db/RowIterationTest.java Tue Jun 22 04:07:46 2010
@@ -0,0 +1,62 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*    http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+package org.apache.cassandra.db;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.ArrayList;
+import java.util.Set;
+import java.util.HashSet;
+
+import org.apache.cassandra.Util;
+
+import org.junit.Test;
+
+import org.apache.cassandra.io.sstable.SSTableReader;
+import org.apache.cassandra.CleanupHelper;
+import org.apache.cassandra.db.filter.QueryPath;
+import org.apache.cassandra.utils.FBUtilities;
+import static junit.framework.Assert.assertEquals;
+
+public class RowIterationTest extends CleanupHelper
+{
+    public static final String TABLE1 = "Keyspace2";
+    public static final InetAddress LOCAL = FBUtilities.getLocalAddress();
+
+    @Test
+    public void testRowIteration() throws IOException, ExecutionException, InterruptedException
+    {
+        Table table = Table.open(TABLE1);
+        ColumnFamilyStore store = table.getColumnFamilyStore("Super3");
+
+        final int ROWS_PER_SSTABLE = 10;
+        Set<DecoratedKey> inserted = new HashSet<DecoratedKey>();
+        for (int i = 0; i < ROWS_PER_SSTABLE; i++) {
+            DecoratedKey key = Util.dk(String.valueOf(i));
+            RowMutation rm = new RowMutation(TABLE1, key.key);
+            rm.add(new QueryPath("Super3", "sc".getBytes(), String.valueOf(i).getBytes()), new byte[ROWS_PER_SSTABLE * 10 - i * 2], new TimestampClock(i));
+            rm.apply();
+            inserted.add(key);
+        }
+        store.forceBlockingFlush();
+        assertEquals(inserted.toString(), inserted.size(), Util.getRangeSlice(store).size());
+    }
+}


Reply via email to