Merge branch 'cassandra-2.1' into trunk
Conflicts:
src/java/org/apache/cassandra/db/compaction/CompactionManager.java
src/java/org/apache/cassandra/db/compaction/Scrubber.java
src/java/org/apache/cassandra/service/StorageService.java
src/java/org/apache/cassandra/streaming/StreamReader.java
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/201a0551
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/201a0551
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/201a0551
Branch: refs/heads/trunk
Commit: 201a05511791c6ea9adad40c0bab4e1e4714d8ee
Parents: 8badc28 2291a60
Author: Yuki Morishita <[email protected]>
Authored: Wed Nov 19 17:31:34 2014 -0600
Committer: Yuki Morishita <[email protected]>
Committed: Wed Nov 19 17:31:34 2014 -0600
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../apache/cassandra/db/ColumnFamilyStore.java | 5 -
.../org/apache/cassandra/db/Directories.java | 161 ++++++++++++-------
.../db/compaction/CompactionManager.java | 12 +-
.../cassandra/db/compaction/Scrubber.java | 6 +-
.../cassandra/io/util/DiskAwareRunnable.java | 14 +-
.../cassandra/service/StorageService.java | 20 ---
.../cassandra/streaming/StreamReader.java | 2 +-
.../cassandra/streaming/StreamReceiveTask.java | 8 +-
.../apache/cassandra/db/DirectoriesTest.java | 128 +++++++++++++++
10 files changed, 252 insertions(+), 105 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/201a0551/CHANGES.txt
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/201a0551/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/201a0551/src/java/org/apache/cassandra/db/Directories.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/201a0551/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/compaction/CompactionManager.java
index 05b916f,61628ff..55311a0
--- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
@@@ -1046,78 -989,62 +1048,78 @@@ public class CompactionManager implemen
if (!new File(sstable.getFilename()).exists())
{
logger.info("Skipping anticompaction for {}, required sstable
was compacted and is no longer available.", sstable);
+ i.remove();
continue;
}
+ if (groupMaxDataAge < sstable.maxDataAge)
+ groupMaxDataAge = sstable.maxDataAge;
+ }
+
+
+ if (anticompactionGroup.size() == 0)
+ {
+ logger.info("No valid anticompactions for this group, All
sstables were compacted and are no longer available");
+ return 0;
+ }
- logger.info("Anticompacting {}", sstable);
- Set<SSTableReader> sstableAsSet = new HashSet<>();
- sstableAsSet.add(sstable);
+ logger.info("Anticompacting {}", anticompactionGroup);
+ Set<SSTableReader> sstableAsSet = new HashSet<>(anticompactionGroup);
- File destination = cfs.directories.getDirectoryForNewSSTables();
- File destination =
cfs.directories.getWriteableLocationAsFile(cfs.getExpectedCompactedFileSize(sstableAsSet,
OperationType.ANTICOMPACTION));
- SSTableRewriter repairedSSTableWriter = new SSTableRewriter(cfs,
sstableAsSet, sstable.maxDataAge, false);
- SSTableRewriter unRepairedSSTableWriter = new
SSTableRewriter(cfs, sstableAsSet, sstable.maxDataAge, false);
++ File destination =
cfs.directories.getWriteableLocationAsFile(cfs.getExpectedCompactedFileSize(sstableAsSet,
OperationType.ANTICOMPACTION));
+ SSTableRewriter repairedSSTableWriter = new SSTableRewriter(cfs,
sstableAsSet, groupMaxDataAge, false);
+ SSTableRewriter unRepairedSSTableWriter = new SSTableRewriter(cfs,
sstableAsSet, groupMaxDataAge, false);
- try (AbstractCompactionStrategy.ScannerList scanners =
cfs.getCompactionStrategy().getScanners(new
HashSet<>(Collections.singleton(sstable)));
- CompactionController controller = new
CompactionController(cfs, sstableAsSet, CFMetaData.DEFAULT_GC_GRACE_SECONDS))
- {
-
repairedSSTableWriter.switchWriter(CompactionManager.createWriter(cfs,
destination, expectedBloomFilterSize, repairedAt, sstable));
-
unRepairedSSTableWriter.switchWriter(CompactionManager.createWriter(cfs,
destination, expectedBloomFilterSize, ActiveRepairService.UNREPAIRED_SSTABLE,
sstable));
+ long repairedKeyCount = 0;
+ long unrepairedKeyCount = 0;
+ AbstractCompactionStrategy strategy = cfs.getCompactionStrategy();
+ try (AbstractCompactionStrategy.ScannerList scanners =
strategy.getScanners(anticompactionGroup);
+ CompactionController controller = new CompactionController(cfs,
sstableAsSet, CFMetaData.DEFAULT_GC_GRACE_SECONDS))
+ {
+ int expectedBloomFilterSize =
Math.max(cfs.metadata.getMinIndexInterval(),
(int)(SSTableReader.getApproximateKeyCount(anticompactionGroup)));
- CompactionIterable ci = new
CompactionIterable(OperationType.ANTICOMPACTION, scanners.scanners, controller);
- Iterator<AbstractCompactedRow> iter = ci.iterator();
- while(iter.hasNext())
+
repairedSSTableWriter.switchWriter(CompactionManager.createWriterForAntiCompaction(cfs,
destination, expectedBloomFilterSize, repairedAt, sstableAsSet));
+
unRepairedSSTableWriter.switchWriter(CompactionManager.createWriterForAntiCompaction(cfs,
destination, expectedBloomFilterSize, ActiveRepairService.UNREPAIRED_SSTABLE,
sstableAsSet));
+
+ CompactionIterable ci = new
CompactionIterable(OperationType.ANTICOMPACTION, scanners.scanners, controller,
DatabaseDescriptor.getSSTableFormat());
+ Iterator<AbstractCompactedRow> iter = ci.iterator();
+ while(iter.hasNext())
+ {
+ AbstractCompactedRow row = iter.next();
+ // if current range from sstable is repaired, save it into
the new repaired sstable
+ if (Range.isInRanges(row.key.getToken(), ranges))
{
- AbstractCompactedRow row = iter.next();
- // if current range from sstable is repaired, save it
into the new repaired sstable
- if (Range.isInRanges(row.key.getToken(), ranges))
- {
- repairedSSTableWriter.append(row);
- repairedKeyCount++;
- }
- // otherwise save into the new 'non-repaired' table
- else
- {
- unRepairedSSTableWriter.append(row);
- unrepairedKeyCount++;
- }
+ repairedSSTableWriter.append(row);
+ repairedKeyCount++;
+ }
+ // otherwise save into the new 'non-repaired' table
+ else
+ {
+ unRepairedSSTableWriter.append(row);
+ unrepairedKeyCount++;
}
- // we have the same readers being rewritten by both writers,
so we ask the first one NOT to close them
- // so that the second one can do so safely, without leaving
us with references < 0 or any other ugliness
- // add repaired table with a non-null timestamp field to be
saved in SSTableMetadata#repairedAt
-
anticompactedSSTables.addAll(repairedSSTableWriter.finish(repairedAt));
-
anticompactedSSTables.addAll(unRepairedSSTableWriter.finish(ActiveRepairService.UNREPAIRED_SSTABLE));
-
cfs.getDataTracker().markCompactedSSTablesReplaced(sstableAsSet,
anticompactedSSTables, OperationType.ANTICOMPACTION);
- }
- catch (Throwable e)
- {
- JVMStabilityInspector.inspectThrowable(e);
- logger.error("Error anticompacting " + sstable, e);
- repairedSSTableWriter.abort();
- unRepairedSSTableWriter.abort();
}
+ // we have the same readers being rewritten by both writers, so
we ask the first one NOT to close them
+ // so that the second one can do so safely, without leaving us
with references < 0 or any other ugliness
+ List<SSTableReader> anticompactedSSTables = new ArrayList<>();
+
anticompactedSSTables.addAll(repairedSSTableWriter.finish(repairedAt));
+
anticompactedSSTables.addAll(unRepairedSSTableWriter.finish(ActiveRepairService.UNREPAIRED_SSTABLE));
+ cfs.getDataTracker().markCompactedSSTablesReplaced(sstableAsSet,
anticompactedSSTables, OperationType.ANTICOMPACTION);
+
+ logger.debug("Repaired {} keys out of {} for {}/{} in {}",
repairedKeyCount,
+
repairedKeyCount + unrepairedKeyCount,
+
cfs.keyspace.getName(),
+
cfs.getColumnFamilyName(),
+
anticompactionGroup);
+ return anticompactedSSTables.size();
}
- String format = "Repaired {} keys of {} for {}/{}";
- logger.debug(format, repairedKeyCount, (repairedKeyCount +
unrepairedKeyCount), cfs.keyspace, cfs.getColumnFamilyName());
- String format2 = "Anticompaction completed successfully,
anticompacted from {} to {} sstable(s).";
- logger.info(format2, repairedSSTables.size(),
anticompactedSSTables.size());
-
- return anticompactedSSTables;
+ catch (Throwable e)
+ {
+ JVMStabilityInspector.inspectThrowable(e);
+ logger.error("Error anticompacting " + anticompactionGroup, e);
+ repairedSSTableWriter.abort();
+ unRepairedSSTableWriter.abort();
+ }
+ return 0;
}
/**
http://git-wip-us.apache.org/repos/asf/cassandra/blob/201a0551/src/java/org/apache/cassandra/db/compaction/Scrubber.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/compaction/Scrubber.java
index aad1bed,2f53ab9..e126239
--- a/src/java/org/apache/cassandra/db/compaction/Scrubber.java
+++ b/src/java/org/apache/cassandra/db/compaction/Scrubber.java
@@@ -83,11 -80,11 +83,12 @@@ public class Scrubber implements Closea
this.outputHandler = outputHandler;
this.skipCorrupted = skipCorrupted;
this.isOffline = isOffline;
-
+ this.rowIndexEntrySerializer =
sstable.descriptor.version.getSSTableFormat().getIndexSerializer(sstable.metadata);
+ List<SSTableReader> toScrub = Collections.singletonList(sstable);
+
// Calculate the expected compacted filesize
- this.destination = cfs.directories.getDirectoryForNewSSTables();
+ this.destination =
cfs.directories.getWriteableLocationAsFile(cfs.getExpectedCompactedFileSize(toScrub,
OperationType.SCRUB));
if (destination == null)
throw new IOException("disk full");
http://git-wip-us.apache.org/repos/asf/cassandra/blob/201a0551/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/service/StorageService.java
index e9e36f4,79cea8e..d5fa22c
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@@ -3512,26 -3532,11 +3512,6 @@@ public class StorageService extends Not
}
}
- public synchronized void requestGC()
- {
- if (hasUnreclaimedSpace())
- {
- logger.info("requesting GC to free disk space");
- System.gc();
- Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS);
- }
- }
-
- private boolean hasUnreclaimedSpace()
- {
- for (ColumnFamilyStore cfs : ColumnFamilyStore.all())
- {
- if (cfs.hasUnreclaimedSpace())
- return true;
- }
- return false;
- }
-
- public boolean isClientMode()
- {
- return isClientMode;
- }
-
public String getOperationMode()
{
return operationMode.toString();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/201a0551/src/java/org/apache/cassandra/streaming/StreamReader.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/streaming/StreamReader.java
index 8c4efcd,c96a925..6144afb
--- a/src/java/org/apache/cassandra/streaming/StreamReader.java
+++ b/src/java/org/apache/cassandra/streaming/StreamReader.java
@@@ -119,14 -113,14 +119,14 @@@ public class StreamReade
}
}
- protected SSTableWriter createWriter(ColumnFamilyStore cfs, long
totalSize, long repairedAt) throws IOException
+ protected SSTableWriter createWriter(ColumnFamilyStore cfs, long
totalSize, long repairedAt, SSTableFormat.Type format) throws IOException
{
- Directories.DataDirectory localDir =
cfs.directories.getWriteableLocation();
+ Directories.DataDirectory localDir =
cfs.directories.getWriteableLocation(totalSize);
if (localDir == null)
throw new IOException("Insufficient disk space to store " +
totalSize + " bytes");
- desc =
Descriptor.fromFilename(cfs.getTempSSTablePath(cfs.directories.getLocationForDisk(localDir)));
+ desc =
Descriptor.fromFilename(cfs.getTempSSTablePath(cfs.directories.getLocationForDisk(localDir),
format));
- return new SSTableWriter(desc.filenameFor(Component.DATA),
estimatedKeys, repairedAt);
+ return SSTableWriter.create(desc, estimatedKeys, repairedAt,
sstableLevel);
}
protected void drain(InputStream dis, long bytesRead) throws IOException
http://git-wip-us.apache.org/repos/asf/cassandra/blob/201a0551/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/201a0551/test/unit/org/apache/cassandra/db/DirectoriesTest.java
----------------------------------------------------------------------