Author: jbellis
Date: Wed Sep 22 03:42:09 2010
New Revision: 999744

URL: http://svn.apache.org/viewvc?rev=999744&view=rev
Log:
bug fixes:
- don't omit index writes that happen during the index-build process (the index CFS is now registered before the build starts)
- fix circular Table.open during index creation by passing the Table object, instead of its String name, to ColumnFamilyStore
- avoid writing empty index sstables (index memtables are only flushed when they are not clean)

patch by jbellis; reviewed by gdusbabek for CASSANDRA-1415


Modified:
    cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
    cassandra/trunk/src/java/org/apache/cassandra/db/CompactionManager.java
    cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java
    cassandra/trunk/src/java/org/apache/cassandra/db/SystemTable.java
    cassandra/trunk/src/java/org/apache/cassandra/db/Table.java

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java?rev=999744&r1=999743&r2=999744&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java 
(original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java Wed 
Sep 22 03:42:09 2010
@@ -33,7 +33,6 @@ import javax.management.ObjectName;
 import com.google.common.collect.Iterables;
 import org.apache.commons.collections.IteratorUtils;
 import org.apache.commons.lang.ArrayUtils;
-import org.apache.commons.lang.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -101,7 +100,7 @@ public class ColumnFamilyStore implement
     
     private Set<Memtable> memtablesPendingFlush = new 
ConcurrentSkipListSet<Memtable>();
 
-    public final String table;
+    public final Table table;
     public final String columnFamily;
     public final IPartitioner partitioner;
     private final String mbeanName;
@@ -131,7 +130,7 @@ public class ColumnFamilyStore implement
     private int minCompactionThreshold;
     private int maxCompactionThreshold;
 
-    private ColumnFamilyStore(String table, String columnFamilyName, 
IPartitioner partitioner, int generation, CFMetaData metadata)
+    private ColumnFamilyStore(Table table, String columnFamilyName, 
IPartitioner partitioner, int generation, CFMetaData metadata)
     {
         assert metadata != null : "null metadata for " + table + ":" + 
columnFamilyName;
         this.table = table;
@@ -149,7 +148,7 @@ public class ColumnFamilyStore implement
         
         // scan for sstables corresponding to this cf and load them
         List<SSTableReader> sstables = new ArrayList<SSTableReader>();
-        for (Map.Entry<Descriptor,Set<Component>> sstableFiles : files(table, 
columnFamilyName, false).entrySet())
+        for (Map.Entry<Descriptor,Set<Component>> sstableFiles : 
files(table.name, columnFamilyName, false).entrySet())
         {
             SSTableReader sstable;
             try
@@ -163,11 +162,11 @@ public class ColumnFamilyStore implement
             }
             sstables.add(sstable);
         }
-        ssTables = new SSTableTracker(table, columnFamilyName);
+        ssTables = new SSTableTracker(table.name, columnFamilyName);
         ssTables.add(sstables);
 
         // create the private ColumnFamilyStores for the secondary column 
indexes
-        indexedColumns = new TreeMap<byte[], 
ColumnFamilyStore>(getComparator());
+        indexedColumns = new ConcurrentSkipListMap<byte[], 
ColumnFamilyStore>(getComparator());
         for (ColumnDefinition info : metadata.column_metadata.values())
         {
             if (info.index_type != null)
@@ -196,32 +195,39 @@ public class ColumnFamilyStore implement
         AbstractType columnComparator = (rowPartitioner instanceof 
OrderPreservingPartitioner || rowPartitioner instanceof ByteOrderedPartitioner)
                                         ? BytesType.instance
                                         : new 
LocalByPartionerType(StorageService.getPartitioner());
-        final CFMetaData indexedCfMetadata = 
CFMetaData.newIndexMetadata(table, columnFamily, info, columnComparator);
+        final CFMetaData indexedCfMetadata = 
CFMetaData.newIndexMetadata(table.name, columnFamily, info, columnComparator);
         ColumnFamilyStore indexedCfs = 
ColumnFamilyStore.createColumnFamilyStore(table,
                                                                                
  indexedCfMetadata.cfName,
                                                                                
  new LocalPartitioner(metadata.column_metadata.get(info.name).validator),
                                                                                
  indexedCfMetadata);
-        if (!SystemTable.isIndexBuilt(table, indexedCfMetadata.cfName))
+        // record that the column is supposed to be indexed, before we start 
building it
+        // (so we don't omit indexing writes that happen during build process)
+        indexedColumns.put(info.name, indexedCfs);
+        if (!SystemTable.isIndexBuilt(table.name, indexedCfMetadata.cfName))
         {
             logger.info("Creating index {}.{}", table, 
indexedCfMetadata.cfName);
-            Runnable runnable = new WrappedRunnable()
+            try
             {
-                public void runMayThrow() throws IOException
-                {
-                    buildSecondaryIndexes(getSSTables(), 
FBUtilities.getSingleColumnSet(info.name));
-                    logger.info("Index {} complete", indexedCfMetadata.cfName);
-                    SystemTable.setIndexBuilt(table, indexedCfMetadata.cfName);
-                }
-            };
-            forceFlush(runnable);
+                forceBlockingFlush();
+            }
+            catch (ExecutionException e)
+            {
+                throw new RuntimeException(e);
+            }
+            catch (InterruptedException e)
+            {
+                throw new AssertionError(e);
+            }
+            buildSecondaryIndexes(getSSTables(), 
FBUtilities.getSingleColumnSet(info.name));
+            logger.info("Index {} complete", indexedCfMetadata.cfName);
+            SystemTable.setIndexBuilt(table.name, indexedCfMetadata.cfName);
         }
-        indexedColumns.put(info.name, indexedCfs);
     }
 
     public void buildSecondaryIndexes(Collection<SSTableReader> sstables, 
SortedSet<byte[]> columns)
     {
         logger.debug("Submitting index build to compactionmanager");
-        Table.IndexBuilder builder = 
Table.open(table).createIndexBuilder(this, columns, new 
ReducingKeyIterator(sstables));
+        Table.IndexBuilder builder = table.createIndexBuilder(this, columns, 
new ReducingKeyIterator(sstables));
         Future future = CompactionManager.instance.submitIndexBuild(this, 
builder);
         try
         {
@@ -305,16 +311,16 @@ public class ColumnFamilyStore implement
         return count > 0 ? (int) (sum / count) : 0;
     }
 
-    public static ColumnFamilyStore createColumnFamilyStore(String table, 
String columnFamily)
+    public static ColumnFamilyStore createColumnFamilyStore(Table table, 
String columnFamily)
     {
-        return createColumnFamilyStore(table, columnFamily, 
StorageService.getPartitioner(), DatabaseDescriptor.getCFMetaData(table, 
columnFamily));
+        return createColumnFamilyStore(table, columnFamily, 
StorageService.getPartitioner(), DatabaseDescriptor.getCFMetaData(table.name, 
columnFamily));
     }
 
-    public static synchronized ColumnFamilyStore 
createColumnFamilyStore(String table, String columnFamily, IPartitioner 
partitioner, CFMetaData metadata)
+    public static synchronized ColumnFamilyStore createColumnFamilyStore(Table 
table, String columnFamily, IPartitioner partitioner, CFMetaData metadata)
     {
         // get the max generation number, to prevent generation conflicts
         List<Integer> generations = new ArrayList<Integer>();
-        for (Descriptor desc : files(table, columnFamily, true).keySet())
+        for (Descriptor desc : files(table.name, columnFamily, true).keySet())
             generations.add(desc.generation);
         Collections.sort(generations);
         int value = (generations.size() > 0) ? 
(generations.get(generations.size() - 1)) : 0;
@@ -411,7 +417,7 @@ public class ColumnFamilyStore implement
     public String getFlushPath()
     {
         long guessedSize = 2 * DatabaseDescriptor.getMemtableThroughput() * 
1024*1024; // 2* adds room for keys, column indexes
-        String location = 
DatabaseDescriptor.getDataFileLocationForTable(table, guessedSize);
+        String location = 
DatabaseDescriptor.getDataFileLocationForTable(table.name, guessedSize);
         if (location == null)
             throw new RuntimeException("Insufficient disk space to flush");
         return getTempSSTablePath(location);
@@ -420,7 +426,7 @@ public class ColumnFamilyStore implement
     public String getTempSSTablePath(String directory)
     {
         Descriptor desc = new Descriptor(new File(directory),
-                                         table,
+                                         table.name,
                                          columnFamily,
                                          fileIndexGenerator.incrementAndGet(),
                                          true);
@@ -428,7 +434,7 @@ public class ColumnFamilyStore implement
     }
 
     /** flush the given memtable and swap in a new one for its CFS, if it 
hasn't been frozen already.  threadsafe. */
-    Future<?> maybeSwitchMemtable(Memtable oldMemtable, final boolean 
writeCommitLog, final Runnable afterFlush)
+    Future<?> maybeSwitchMemtable(Memtable oldMemtable, final boolean 
writeCommitLog)
     {
         /**
          *  If we can get the writelock, that means no new updates can come in 
and 
@@ -446,9 +452,16 @@ public class ColumnFamilyStore implement
             final CommitLogSegment.CommitLogContext ctx = writeCommitLog ? 
CommitLog.instance().getContext() : null;
             logger.info("switching in a fresh Memtable for " + columnFamily + 
" at " + ctx);
 
-            // submit the memtable for any indexed sub-cfses, and our own
-            final CountDownLatch latch = new CountDownLatch(1 + 
indexedColumns.size());
-            for (ColumnFamilyStore cfs : 
Iterables.concat(indexedColumns.values(), Collections.singleton(this)))
+            // submit the memtable for any indexed sub-cfses, and our own.
+            List<ColumnFamilyStore> icc = new 
ArrayList<ColumnFamilyStore>(indexedColumns.size());
+            icc.add(this);
+            for (ColumnFamilyStore indexCfs : indexedColumns.values())
+            {
+                if (!indexCfs.memtable.isClean())
+                    icc.add(indexCfs);
+            }
+            final CountDownLatch latch = new CountDownLatch(icc.size());
+            for (ColumnFamilyStore cfs : icc)
             {
                 submitFlush(cfs.memtable, latch);
                 cfs.memtable = new Memtable(cfs, cfs.partitioner);
@@ -467,8 +480,6 @@ public class ColumnFamilyStore implement
                         // if we're not writing to the commit log, we are 
replaying the log, so marking
                         // the log header with "you can discard anything 
written before the context" is not valid
                         
CommitLog.instance().discardCompletedSegments(metadata.cfId, ctx);
-                        if (afterFlush != null)
-                            afterFlush.run();
                     }
                 }
             });
@@ -498,15 +509,10 @@ public class ColumnFamilyStore implement
 
     public Future<?> forceFlush()
     {
-        return forceFlush(null);
-    }
-
-    public Future<?> forceFlush(Runnable afterFlush)
-    {
         if (memtable.isClean())
             return null;
 
-        return maybeSwitchMemtable(memtable, true, afterFlush);
+        return maybeSwitchMemtable(memtable, true);
     }
 
     public void forceBlockingFlush() throws ExecutionException, 
InterruptedException
@@ -721,11 +727,6 @@ public class ColumnFamilyStore implement
         CompactionManager.instance.submitCleanup(ColumnFamilyStore.this);
     }
 
-    public Table getTable()
-    {
-        return Table.open(table);
-    }
-
     void markCompacted(Collection<SSTableReader> sstables)
     {
         ssTables.markCompacted(sstables);
@@ -1342,7 +1343,7 @@ public class ColumnFamilyStore implement
             {
                 // mkdir
                 File dataDirectory = 
ssTable.getDescriptor().directory.getParentFile();
-                String snapshotDirectoryPath = 
Table.getSnapshotPath(dataDirectory.getAbsolutePath(), table, snapshotName);
+                String snapshotDirectoryPath = 
Table.getSnapshotPath(dataDirectory.getAbsolutePath(), table.name, 
snapshotName);
                 FileUtils.createDirectory(snapshotDirectoryPath);
 
                 // hard links
@@ -1531,7 +1532,7 @@ public class ColumnFamilyStore implement
         // complete as much of the job as possible.  Don't let errors long the 
way prevent as much renaming as possible
         // from happening.
         IOException mostRecentProblem = null;
-        for (File existing : DefsTable.getFiles(table, columnFamily))
+        for (File existing : DefsTable.getFiles(table.name, columnFamily))
         {
             try
             {
@@ -1552,11 +1553,6 @@ public class ColumnFamilyStore implement
         }
     }
 
-    public static Future<?> submitPostFlush(Runnable runnable)
-    {
-        return postFlushExecutor.submit(runnable);
-    }
-
     public long getBloomFilterFalsePositives()
     {
         long count = 0L;

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/CompactionManager.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/CompactionManager.java?rev=999744&r1=999743&r2=999744&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/CompactionManager.java 
(original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/CompactionManager.java Wed 
Sep 22 03:42:09 2010
@@ -220,7 +220,7 @@ public class CompactionManager implement
     {
         // The collection of sstables passed may be empty (but not null); even 
if
         // it is not empty, it may compact down to nothing if all rows are 
deleted.
-        Table table = cfs.getTable();
+        Table table = cfs.table;
         if (DatabaseDescriptor.isSnapshotBeforeCompaction())
             table.snapshot("compact-" + cfs.columnFamily);
         logger.info("Compacting [" + StringUtils.join(sstables, ",") + "]");
@@ -324,7 +324,7 @@ public class CompactionManager implement
     private List<SSTableReader> doAntiCompaction(ColumnFamilyStore cfs, 
Collection<SSTableReader> sstables, Collection<Range> ranges, InetAddress 
target)
             throws IOException
     {
-        Table table = cfs.getTable();
+        Table table = cfs.table;
         logger.info("AntiCompacting [" + StringUtils.join(sstables, ",") + 
"]");
         // Calculate the expected compacted filesize
         long expectedRangeFileSize = 
cfs.getExpectedCompactedFileSize(sstables) / 2;
@@ -396,7 +396,7 @@ public class CompactionManager implement
     private void doCleanupCompaction(ColumnFamilyStore cfs) throws IOException
     {
         Collection<SSTableReader> originalSSTables = cfs.getSSTables();
-        List<SSTableReader> sstables = doAntiCompaction(cfs, originalSSTables, 
StorageService.instance.getLocalRanges(cfs.getTable().name), null);
+        List<SSTableReader> sstables = doAntiCompaction(cfs, originalSSTables, 
StorageService.instance.getLocalRanges(cfs.table.name), null);
         if (!sstables.isEmpty())
         {
             cfs.replaceCompactedSSTables(originalSSTables, sstables);

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java?rev=999744&r1=999743&r2=999744&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java Wed Sep 22 
03:42:09 2010
@@ -194,7 +194,7 @@ public class Memtable implements Compara
 
     public String getTableName()
     {
-        return cfs.getTable().name;
+        return cfs.table.name;
     }
 
     /**

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/SystemTable.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/SystemTable.java?rev=999744&r1=999743&r2=999744&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/SystemTable.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/SystemTable.java Wed Sep 
22 03:42:09 2010
@@ -347,13 +347,20 @@ public class SystemTable
         return cfs.getColumnFamily(filter) != null;
     }
 
-    public static void setIndexBuilt(String table, String indexName) throws 
IOException
+    public static void setIndexBuilt(String table, String indexName)
     {
         ColumnFamily cf = ColumnFamily.create(Table.SYSTEM_TABLE, INDEX_CF);
         cf.addColumn(new Column(indexName.getBytes(UTF_8), 
ArrayUtils.EMPTY_BYTE_ARRAY, new TimestampClock(System.currentTimeMillis())));
         RowMutation rm = new RowMutation(Table.SYSTEM_TABLE, 
table.getBytes(UTF_8));
         rm.add(cf);
-        rm.apply();
+        try
+        {
+            rm.apply();
+        }
+        catch (IOException e)
+        {
+            throw new IOError(e);
+        }
     }
 
     public static class StorageMetadata

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/Table.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/Table.java?rev=999744&r1=999743&r2=999744&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/Table.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/Table.java Wed Sep 22 
03:42:09 2010
@@ -241,7 +241,10 @@ public class Table
         }
 
         for (CFMetaData cfm : new 
ArrayList<CFMetaData>(DatabaseDescriptor.getTableDefinition(table).cfMetaData().values()))
+        {
+            logger.debug("Initializing {}.{}", name, cfm.cfName);
             initCf(cfm.cfId, cfm.cfName);
+        }
 
         // check 10x as often as the lifetime, so we can exceed lifetime by 
10% at most
         int checkMs = DatabaseDescriptor.getMemtableLifetimeMS() / 10;
@@ -292,7 +295,7 @@ public class Table
     {
         assert !columnFamilyStores.containsKey(cfId) : String.format("tried to 
init %s as %s, but already used by %s",
                                                                      cfName, 
cfId, columnFamilyStores.get(cfId));
-        columnFamilyStores.put(cfId, 
ColumnFamilyStore.createColumnFamilyStore(name, cfName));
+        columnFamilyStores.put(cfId, 
ColumnFamilyStore.createColumnFamilyStore(this, cfName));
     }
     
     public void reloadCf(Integer cfId) throws IOException
@@ -386,7 +389,7 @@ public class Table
 
         // flush memtables that got filled up.  usually mTF will be empty and 
this will be a no-op
         for (Map.Entry<ColumnFamilyStore, Memtable> entry : 
memtablesToFlush.entrySet())
-            entry.getKey().maybeSwitchMemtable(entry.getValue(), 
writeCommitLog, null);
+            entry.getKey().maybeSwitchMemtable(entry.getValue(), 
writeCommitLog);
     }
 
     private static void ignoreObsoleteMutations(ColumnFamily cf, 
AbstractReconciler reconciler, SortedSet<byte[]> mutatedIndexedColumns, 
ColumnFamily oldIndexedColumns)
@@ -485,7 +488,7 @@ public class Table
                 }
 
                 for (Map.Entry<ColumnFamilyStore, Memtable> entry : 
memtablesToFlush.entrySet())
-                    entry.getKey().maybeSwitchMemtable(entry.getValue(), 
false, null);
+                    entry.getKey().maybeSwitchMemtable(entry.getValue(), 
false);
             }
 
             try


Reply via email to