Author: jbellis
Date: Wed Sep 22 03:41:18 2010
New Revision: 999739
URL: http://svn.apache.org/viewvc?rev=999739&view=rev
Log:
Table.rebuildIndex
patch by jbellis; reviewed by gdusbabek for CASSANDRA-1415
Added:
cassandra/trunk/src/java/org/apache/cassandra/io/ICompactionInfo.java
cassandra/trunk/src/java/org/apache/cassandra/io/sstable/KeyIterator.java
Modified:
cassandra/trunk/src/java/org/apache/cassandra/db/CompactionManager.java
cassandra/trunk/src/java/org/apache/cassandra/db/Table.java
cassandra/trunk/src/java/org/apache/cassandra/io/CompactionIterator.java
cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTable.java
cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
cassandra/trunk/src/java/org/apache/cassandra/tools/SSTableExport.java
Modified:
cassandra/trunk/src/java/org/apache/cassandra/db/CompactionManager.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/CompactionManager.java?rev=999739&r1=999738&r2=999739&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/CompactionManager.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/CompactionManager.java Wed Sep 22 03:41:18 2010
@@ -495,6 +495,19 @@ public class CompactionManager implement
return tablePairs;
}
+ public Future submitIndexBuild(final ColumnFamilyStore cfs, final
KeyIterator iter)
+ {
+ Runnable runnable = new Runnable()
+ {
+ public void run()
+ {
+ executor.beginCompaction(cfs, iter);
+ Table.open(cfs.table).rebuildIndex(cfs, iter);
+ }
+ };
+ return executor.submit(runnable);
+ }
+
private static class AntiCompactionIterator extends CompactionIterator
{
private Set<SSTableScanner> scanners;
@@ -566,7 +579,7 @@ public class CompactionManager implement
private static class CompactionExecutor extends
DebuggableThreadPoolExecutor
{
private volatile ColumnFamilyStore cfs;
- private volatile CompactionIterator ci;
+ private volatile ICompactionInfo ci;
public CompactionExecutor()
{
@@ -581,7 +594,7 @@ public class CompactionManager implement
ci = null;
}
- void beginCompaction(ColumnFamilyStore cfs, CompactionIterator ci)
+ void beginCompaction(ColumnFamilyStore cfs, ICompactionInfo ci)
{
this.cfs = cfs;
this.ci = ci;
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/Table.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/Table.java?rev=999739&r1=999738&r2=999739&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/Table.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/Table.java Wed Sep 22 03:41:18 2010
@@ -30,10 +30,10 @@ import com.google.common.base.Function;
import com.google.common.collect.Iterables;
import org.apache.cassandra.config.*;
+import org.apache.cassandra.db.clock.AbstractReconciler;
import org.apache.cassandra.db.commitlog.CommitLog;
import org.apache.cassandra.dht.LocalToken;
-import org.apache.cassandra.io.sstable.Component;
-import org.apache.cassandra.io.sstable.Descriptor;
+import org.apache.cassandra.io.sstable.KeyIterator;
import org.apache.cassandra.io.sstable.SSTableDeletingReference;
import org.apache.cassandra.io.sstable.SSTableReader;
import org.apache.cassandra.io.util.FileUtils;
@@ -42,6 +42,7 @@ import org.apache.commons.lang.ArrayUtil
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.db.filter.*;
+import org.apache.cassandra.thrift.ColumnParent;
import org.apache.cassandra.utils.FBUtilities;
import org.cliffc.high_scale_lib.NonBlockingHashMap;
@@ -365,51 +366,11 @@ public class Table
{
synchronized (indexLockFor(mutation.key()))
{
- // read old indexed values
- QueryFilter filter = QueryFilter.getNamesFilter(key,
new QueryPath(cfs.getColumnFamilyName()), mutatedIndexedColumns);
- ColumnFamily oldIndexedColumns =
cfs.getColumnFamily(filter);
-
- // ignore obsolete column updates
- if (oldIndexedColumns != null)
- {
- for (IColumn oldColumn : oldIndexedColumns)
- {
- if (cfs.metadata.reconciler.reconcile((Column)
oldColumn, (Column) cf.getColumn(oldColumn.name())).equals(oldColumn))
- {
- cf.remove(oldColumn.name());
-
mutatedIndexedColumns.remove(oldColumn.name());
- oldIndexedColumns.remove(oldColumn.name());
- }
- }
- }
+ ColumnFamily oldIndexedColumns =
readCurrentIndexedColumns(key, cfs, mutatedIndexedColumns);
+ ignoreObsoleteMutations(cf, cfs.metadata.reconciler,
mutatedIndexedColumns, oldIndexedColumns);
- // apply the mutation
applyCF(cfs, key, cf, memtablesToFlush);
-
- // add new index entries
- for (byte[] columnName : mutatedIndexedColumns)
- {
- IColumn column = cf.getColumn(columnName);
- DecoratedKey<LocalToken> valueKey =
cfs.getIndexKeyFor(columnName, column.value());
- ColumnFamily cfi =
cfs.newIndexedColumnFamily(columnName);
- cfi.addColumn(new Column(mutation.key(),
ArrayUtils.EMPTY_BYTE_ARRAY, column.clock()));
-
applyCF(cfs.getIndexedColumnFamilyStore(columnName), valueKey, cfi,
memtablesToFlush);
- }
-
- // remove the old index entries
- if (oldIndexedColumns != null)
- {
- int localDeletionTime =
(int)(System.currentTimeMillis() / 1000);
- for (Map.Entry<byte[], IColumn> entry :
oldIndexedColumns.getColumnsMap().entrySet())
- {
- byte[] columnName = entry.getKey();
- IColumn column = entry.getValue();
- DecoratedKey<LocalToken> valueKey =
cfs.getIndexKeyFor(columnName, column.value());
- ColumnFamily cfi =
cfs.newIndexedColumnFamily(columnName);
- cfi.deleteColumn(mutation.key(),
localDeletionTime, column.clock());
-
applyCF(cfs.getIndexedColumnFamilyStore(columnName), valueKey, cfi,
memtablesToFlush);
- }
- }
+ applyIndexUpdates(mutation.key(), memtablesToFlush,
cf, cfs, mutatedIndexedColumns, oldIndexedColumns);
}
}
@@ -428,24 +389,84 @@ public class Table
entry.getKey().maybeSwitchMemtable(entry.getValue(),
writeCommitLog);
}
- public void applyIndexedCF(ColumnFamilyStore indexedCfs, DecoratedKey
rowKey, DecoratedKey indexedKey, ColumnFamily indexedColumnFamily)
+ private static void ignoreObsoleteMutations(ColumnFamily cf,
AbstractReconciler reconciler, SortedSet<byte[]> mutatedIndexedColumns,
ColumnFamily oldIndexedColumns)
{
- Memtable memtableToFlush;
- flusherLock.readLock().lock();
- try
+ if (oldIndexedColumns == null)
+ return;
+
+ for (IColumn oldColumn : oldIndexedColumns)
{
- synchronized (indexLockFor(rowKey.key))
+ if (reconciler.reconcile((Column) oldColumn, (Column)
cf.getColumn(oldColumn.name())).equals(oldColumn))
{
- memtableToFlush = indexedCfs.apply(indexedKey,
indexedColumnFamily);
+ cf.remove(oldColumn.name());
+ mutatedIndexedColumns.remove(oldColumn.name());
+ oldIndexedColumns.remove(oldColumn.name());
}
}
- finally
+ }
+
+ private static ColumnFamily readCurrentIndexedColumns(DecoratedKey key,
ColumnFamilyStore cfs, SortedSet<byte[]> mutatedIndexedColumns)
+ {
+ QueryFilter filter = QueryFilter.getNamesFilter(key, new
QueryPath(cfs.getColumnFamilyName()), mutatedIndexedColumns);
+ return cfs.getColumnFamily(filter);
+ }
+
+ private static void applyIndexUpdates(byte[] key,
+ HashMap<ColumnFamilyStore, Memtable>
memtablesToFlush,
+ ColumnFamily cf,
+ ColumnFamilyStore cfs,
+ SortedSet<byte[]>
mutatedIndexedColumns,
+ ColumnFamily oldIndexedColumns)
+ {
+ // add new index entries
+ for (byte[] columnName : mutatedIndexedColumns)
{
- flusherLock.readLock().unlock();
+ IColumn column = cf.getColumn(columnName);
+ DecoratedKey<LocalToken> valueKey = cfs.getIndexKeyFor(columnName,
column.value());
+ ColumnFamily cfi = cfs.newIndexedColumnFamily(columnName);
+ cfi.addColumn(new Column(key, ArrayUtils.EMPTY_BYTE_ARRAY,
column.clock()));
+ applyCF(cfs.getIndexedColumnFamilyStore(columnName), valueKey,
cfi, memtablesToFlush);
}
- if (memtableToFlush != null)
- indexedCfs.maybeSwitchMemtable(memtableToFlush, false);
+ // remove the old index entries
+ if (oldIndexedColumns != null)
+ {
+ int localDeletionTime = (int) (System.currentTimeMillis() / 1000);
+ for (Map.Entry<byte[], IColumn> entry :
oldIndexedColumns.getColumnsMap().entrySet())
+ {
+ byte[] columnName = entry.getKey();
+ IColumn column = entry.getValue();
+ DecoratedKey<LocalToken> valueKey =
cfs.getIndexKeyFor(columnName, column.value());
+ ColumnFamily cfi = cfs.newIndexedColumnFamily(columnName);
+ cfi.deleteColumn(key, localDeletionTime, column.clock());
+ applyCF(cfs.getIndexedColumnFamilyStore(columnName), valueKey,
cfi, memtablesToFlush);
+ }
+ }
+ }
+
+ public void rebuildIndex(ColumnFamilyStore cfs, KeyIterator iter)
+ {
+ while (iter.hasNext())
+ {
+ DecoratedKey key = iter.next();
+ HashMap<ColumnFamilyStore,Memtable> memtablesToFlush = new
HashMap<ColumnFamilyStore, Memtable>(2);
+ flusherLock.readLock().lock();
+ try
+ {
+ synchronized (indexLockFor(key.key))
+ {
+ ColumnFamily cf = readCurrentIndexedColumns(key, cfs,
cfs.getIndexedColumns());
+ applyIndexUpdates(key.key, memtablesToFlush, cf, cfs,
cf.getColumnNames(), null);
+ }
+ }
+ finally
+ {
+ flusherLock.readLock().unlock();
+ }
+
+ for (Map.Entry<ColumnFamilyStore, Memtable> entry :
memtablesToFlush.entrySet())
+ entry.getKey().maybeSwitchMemtable(entry.getValue(), false);
+ }
}
private Object indexLockFor(byte[] key)
Modified:
cassandra/trunk/src/java/org/apache/cassandra/io/CompactionIterator.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/CompactionIterator.java?rev=999739&r1=999738&r2=999739&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/io/CompactionIterator.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/io/CompactionIterator.java Wed Sep 22 03:41:18 2010
@@ -28,6 +28,7 @@ import java.util.ArrayList;
import java.util.Iterator;
import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.io.sstable.SSTableIdentityIterator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -35,11 +36,11 @@ import org.apache.commons.collections.it
import org.apache.cassandra.utils.ReducingIterator;
import org.apache.cassandra.utils.FBUtilities;
-import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.io.sstable.SSTableReader;
import org.apache.cassandra.io.sstable.SSTableScanner;
-public class CompactionIterator extends
ReducingIterator<SSTableIdentityIterator, AbstractCompactedRow> implements
Closeable
+public class CompactionIterator extends
ReducingIterator<SSTableIdentityIterator, AbstractCompactedRow>
+implements Closeable, ICompactionInfo
{
private static Logger logger =
LoggerFactory.getLogger(CompactionIterator.class);
Added: cassandra/trunk/src/java/org/apache/cassandra/io/ICompactionInfo.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/ICompactionInfo.java?rev=999739&view=auto
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/io/ICompactionInfo.java (added)
+++ cassandra/trunk/src/java/org/apache/cassandra/io/ICompactionInfo.java Wed Sep 22 03:41:18 2010
@@ -0,0 +1,8 @@
+package org.apache.cassandra.io;
+
+public interface ICompactionInfo
+{
+ public long getTotalBytes();
+
+ public long getBytesRead();
+}
Added: cassandra/trunk/src/java/org/apache/cassandra/io/sstable/KeyIterator.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/sstable/KeyIterator.java?rev=999739&view=auto
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/io/sstable/KeyIterator.java (added)
+++ cassandra/trunk/src/java/org/apache/cassandra/io/sstable/KeyIterator.java Wed Sep 22 03:41:18 2010
@@ -0,0 +1,64 @@
+package org.apache.cassandra.io.sstable;
+
+import java.io.Closeable;
+import java.io.File;
+import java.io.IOError;
+import java.io.IOException;
+
+import com.google.common.collect.AbstractIterator;
+
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.io.ICompactionInfo;
+import org.apache.cassandra.io.util.BufferedRandomAccessFile;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.utils.FBUtilities;
+
+public class KeyIterator extends AbstractIterator<DecoratedKey> implements
ICompactionInfo, Closeable
+{
+ private final BufferedRandomAccessFile in;
+ private final Descriptor desc;
+
+ public KeyIterator(Descriptor desc) throws IOException
+ {
+ this.desc = desc;
+ in = new BufferedRandomAccessFile(new
File(desc.filenameFor(SSTable.COMPONENT_INDEX)), "r");
+ }
+
+ protected DecoratedKey computeNext()
+ {
+ try
+ {
+ if (in.isEOF())
+ return endOfData();
+ DecoratedKey key =
SSTableReader.decodeKey(StorageService.getPartitioner(), desc,
FBUtilities.readShortByteArray(in));
+ in.readLong(); // skip data position
+ return key;
+ }
+ catch (IOException e)
+ {
+ throw new IOError(e);
+ }
+ }
+
+ public void close() throws IOException
+ {
+ in.close();
+ }
+
+ public long getBytesRead()
+ {
+ return in.getFilePointer();
+ }
+
+ public long getTotalBytes()
+ {
+ try
+ {
+ return in.length();
+ }
+ catch (IOException e)
+ {
+ throw new IOError(e);
+ }
+ }
+}
Modified: cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTable.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTable.java?rev=999739&r1=999738&r2=999739&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTable.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTable.java Wed Sep 22 03:41:18 2010
@@ -23,7 +23,8 @@ import java.io.File;
import java.io.FilenameFilter;
import java.io.IOError;
import java.io.IOException;
-import java.util.*;
+import java.util.HashSet;
+import java.util.Set;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Modified:
cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java?rev=999739&r1=999738&r2=999739&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java Wed Sep 22 03:41:18 2010
@@ -22,8 +22,8 @@ package org.apache.cassandra.io.sstable;
import java.io.*;
import java.util.Set;
import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
-import org.apache.commons.lang.ArrayUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -225,7 +225,7 @@ public class SSTableWriter extends SSTab
return;
ColumnFamilyStore cfs =
Table.open(desc.ksname).getColumnFamilyStore(desc.cfname);
- Set<byte[]> indexedColumns = cfs.getIndexedColumns();
+
// remove existing files
ifile.delete();
ffile.delete();
@@ -255,54 +255,11 @@ public class SSTableWriter extends SSTab
{
key = SSTableReader.decodeKey(StorageService.getPartitioner(),
desc, FBUtilities.readShortByteArray(dfile));
long dataSize = SSTableReader.readRowSize(dfile, desc);
- if (!indexedColumns.isEmpty())
- {
- // skip bloom filter and column index
- dfile.readFully(new byte[dfile.readInt()]);
- dfile.readFully(new byte[dfile.readInt()]);
-
- // index the column data
- ColumnFamily cf = ColumnFamily.create(desc.ksname,
desc.cfname);
-
ColumnFamily.serializer().deserializeFromSSTableNoColumns(cf, dfile);
- int columns = dfile.readInt();
- for (int i = 0; i < columns; i++)
- {
- IColumn iColumn =
cf.getColumnSerializer().deserialize(dfile);
- if (indexedColumns.contains(iColumn.name()))
- {
- DecoratedKey valueKey =
cfs.getIndexKeyFor(iColumn.name(), iColumn.value());
- ColumnFamily indexedCf =
cfs.newIndexedColumnFamily(iColumn.name());
- indexedCf.addColumn(new Column(key.key,
ArrayUtils.EMPTY_BYTE_ARRAY, iColumn.clock()));
- logger.debug("adding indexed column row mutation
for key {}", valueKey);
-
Table.open(desc.ksname).applyIndexedCF(cfs.getIndexedColumnFamilyStore(iColumn.name()),
- key,
- valueKey,
- indexedCf);
- }
- }
- }
-
iwriter.afterAppend(key, dataPosition);
dataPosition = dfile.getFilePointer() + dataSize;
dfile.seek(dataPosition);
rows++;
}
-
- for (byte[] column : cfs.getIndexedColumns())
- {
- try
- {
-
cfs.getIndexedColumnFamilyStore(column).forceBlockingFlush();
- }
- catch (ExecutionException e)
- {
- throw new RuntimeException(e);
- }
- catch (InterruptedException e)
- {
- throw new AssertionError(e);
- }
- }
}
finally
{
@@ -313,7 +270,26 @@ public class SSTableWriter extends SSTab
}
catch (IOException e)
{
- logger.error("Failed to close data or index file during
recovery of " + desc, e);
+ throw new IOError(e);
+ }
+ }
+
+ if (!cfs.getIndexedColumns().isEmpty())
+ {
+ Future future = CompactionManager.instance.submitIndexBuild(cfs,
new KeyIterator(desc));
+ try
+ {
+ future.get();
+ for (byte[] column : cfs.getIndexedColumns())
+
cfs.getIndexedColumnFamilyStore(column).forceBlockingFlush();
+ }
+ catch (InterruptedException e)
+ {
+ throw new AssertionError(e);
+ }
+ catch (ExecutionException e)
+ {
+ throw new RuntimeException(e);
}
}
Modified: cassandra/trunk/src/java/org/apache/cassandra/tools/SSTableExport.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/tools/SSTableExport.java?rev=999739&r1=999738&r2=999739&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/tools/SSTableExport.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/tools/SSTableExport.java Wed Sep 22 03:41:18 2010
@@ -23,27 +23,20 @@ import java.io.IOException;
import java.io.PrintStream;
import java.util.*;
-import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.commons.cli.*;
+
import org.apache.cassandra.config.ConfigurationException;
+import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.ColumnFamily;
import org.apache.cassandra.db.DecoratedKey;
import org.apache.cassandra.db.IColumn;
import org.apache.cassandra.db.TimestampClock;
import org.apache.cassandra.db.marshal.AbstractType;
import org.apache.cassandra.dht.IPartitioner;
-import org.apache.cassandra.io.sstable.Component;
-import org.apache.cassandra.io.sstable.Descriptor;
-import org.apache.cassandra.io.sstable.SSTable;
-import org.apache.cassandra.io.sstable.SSTableIdentityIterator;
-import org.apache.cassandra.io.sstable.SSTableReader;
-import org.apache.cassandra.io.sstable.SSTableScanner;
-import org.apache.cassandra.io.util.BufferedRandomAccessFile;
-import org.apache.cassandra.service.StorageService;
-import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.io.sstable.*;
import static org.apache.cassandra.utils.FBUtilities.bytesToHex;
import static org.apache.cassandra.utils.FBUtilities.hexToBytes;
-import org.apache.commons.cli.*;
/**
* Export SSTables to JSON format.
@@ -158,18 +151,14 @@ public class SSTableExport
public static void enumeratekeys(String ssTableFile, PrintStream outs)
throws IOException
{
- IPartitioner partitioner = StorageService.getPartitioner();
Descriptor desc = Descriptor.fromFilename(ssTableFile);
- BufferedRandomAccessFile input = new
BufferedRandomAccessFile(desc.filenameFor(Component.PRIMARY_INDEX), "r");
- while (!input.isEOF())
+ KeyIterator iter = new KeyIterator(desc);
+ while (iter.hasNext())
{
- DecoratedKey decoratedKey = SSTableReader.decodeKey(partitioner,
- desc,
-
FBUtilities.readShortByteArray(input));
- long dataPosition = input.readLong();
- outs.println(bytesToHex(decoratedKey.key));
+ DecoratedKey key = iter.next();
+ outs.println(bytesToHex(key.key));
}
-
+ iter.close();
outs.flush();
}