Merge branch 'cassandra-2.1' into trunk

Conflicts:
        src/java/org/apache/cassandra/db/compaction/CompactionManager.java
        src/java/org/apache/cassandra/db/compaction/Scrubber.java
        src/java/org/apache/cassandra/service/StorageService.java
        src/java/org/apache/cassandra/streaming/StreamReader.java


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/201a0551
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/201a0551
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/201a0551

Branch: refs/heads/trunk
Commit: 201a05511791c6ea9adad40c0bab4e1e4714d8ee
Parents: 8badc28 2291a60
Author: Yuki Morishita <[email protected]>
Authored: Wed Nov 19 17:31:34 2014 -0600
Committer: Yuki Morishita <[email protected]>
Committed: Wed Nov 19 17:31:34 2014 -0600

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../apache/cassandra/db/ColumnFamilyStore.java  |   5 -
 .../org/apache/cassandra/db/Directories.java    | 161 ++++++++++++-------
 .../db/compaction/CompactionManager.java        |  12 +-
 .../cassandra/db/compaction/Scrubber.java       |   6 +-
 .../cassandra/io/util/DiskAwareRunnable.java    |  14 +-
 .../cassandra/service/StorageService.java       |  20 ---
 .../cassandra/streaming/StreamReader.java       |   2 +-
 .../cassandra/streaming/StreamReceiveTask.java  |   8 +-
 .../apache/cassandra/db/DirectoriesTest.java    | 128 +++++++++++++++
 10 files changed, 252 insertions(+), 105 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/201a0551/CHANGES.txt
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/201a0551/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/201a0551/src/java/org/apache/cassandra/db/Directories.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/201a0551/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/compaction/CompactionManager.java
index 05b916f,61628ff..55311a0
--- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
@@@ -1046,78 -989,62 +1048,78 @@@ public class CompactionManager implemen
              if (!new File(sstable.getFilename()).exists())
              {
                  logger.info("Skipping anticompaction for {}, required sstable was compacted and is no longer available.", sstable);
 +                i.remove();
                  continue;
              }
 +            if (groupMaxDataAge < sstable.maxDataAge)
 +                groupMaxDataAge = sstable.maxDataAge;
 +        }
 +
 +     
 +        if (anticompactionGroup.size() == 0)
 +        {
 +            logger.info("No valid anticompactions for this group, All sstables were compacted and are no longer available");
 +            return 0;
 +        }
  
 -            logger.info("Anticompacting {}", sstable);
 -            Set<SSTableReader> sstableAsSet = new HashSet<>();
 -            sstableAsSet.add(sstable);
 +        logger.info("Anticompacting {}", anticompactionGroup);
 +        Set<SSTableReader> sstableAsSet = new HashSet<>(anticompactionGroup);
  
-         File destination = cfs.directories.getDirectoryForNewSSTables();
 -            File destination = 
cfs.directories.getWriteableLocationAsFile(cfs.getExpectedCompactedFileSize(sstableAsSet,
 OperationType.ANTICOMPACTION));
 -            SSTableRewriter repairedSSTableWriter = new SSTableRewriter(cfs, 
sstableAsSet, sstable.maxDataAge, false);
 -            SSTableRewriter unRepairedSSTableWriter = new 
SSTableRewriter(cfs, sstableAsSet, sstable.maxDataAge, false);
++        File destination = cfs.directories.getWriteableLocationAsFile(cfs.getExpectedCompactedFileSize(sstableAsSet, OperationType.ANTICOMPACTION));
 +        SSTableRewriter repairedSSTableWriter = new SSTableRewriter(cfs, 
sstableAsSet, groupMaxDataAge, false);
 +        SSTableRewriter unRepairedSSTableWriter = new SSTableRewriter(cfs, 
sstableAsSet, groupMaxDataAge, false);
  
 -            try (AbstractCompactionStrategy.ScannerList scanners = 
cfs.getCompactionStrategy().getScanners(new 
HashSet<>(Collections.singleton(sstable)));
 -                 CompactionController controller = new 
CompactionController(cfs, sstableAsSet, CFMetaData.DEFAULT_GC_GRACE_SECONDS))
 -            {
 -                
repairedSSTableWriter.switchWriter(CompactionManager.createWriter(cfs, 
destination, expectedBloomFilterSize, repairedAt, sstable));
 -                
unRepairedSSTableWriter.switchWriter(CompactionManager.createWriter(cfs, 
destination, expectedBloomFilterSize, ActiveRepairService.UNREPAIRED_SSTABLE, 
sstable));
 +        long repairedKeyCount = 0;
 +        long unrepairedKeyCount = 0;
 +        AbstractCompactionStrategy strategy = cfs.getCompactionStrategy();
 +        try (AbstractCompactionStrategy.ScannerList scanners = 
strategy.getScanners(anticompactionGroup);
 +             CompactionController controller = new CompactionController(cfs, 
sstableAsSet, CFMetaData.DEFAULT_GC_GRACE_SECONDS))
 +        {
 +            int expectedBloomFilterSize = 
Math.max(cfs.metadata.getMinIndexInterval(), 
(int)(SSTableReader.getApproximateKeyCount(anticompactionGroup)));
  
 -                CompactionIterable ci = new 
CompactionIterable(OperationType.ANTICOMPACTION, scanners.scanners, controller);
 -                Iterator<AbstractCompactedRow> iter = ci.iterator();
 -                while(iter.hasNext())
 +            
repairedSSTableWriter.switchWriter(CompactionManager.createWriterForAntiCompaction(cfs,
 destination, expectedBloomFilterSize, repairedAt, sstableAsSet));
 +            
unRepairedSSTableWriter.switchWriter(CompactionManager.createWriterForAntiCompaction(cfs,
 destination, expectedBloomFilterSize, ActiveRepairService.UNREPAIRED_SSTABLE, 
sstableAsSet));
 +
 +            CompactionIterable ci = new 
CompactionIterable(OperationType.ANTICOMPACTION, scanners.scanners, controller, 
DatabaseDescriptor.getSSTableFormat());
 +            Iterator<AbstractCompactedRow> iter = ci.iterator();
 +            while(iter.hasNext())
 +            {
 +                AbstractCompactedRow row = iter.next();
 +                // if current range from sstable is repaired, save it into the new repaired sstable
 +                if (Range.isInRanges(row.key.getToken(), ranges))
                  {
 -                    AbstractCompactedRow row = iter.next();
 -                    // if current range from sstable is repaired, save it 
into the new repaired sstable
 -                    if (Range.isInRanges(row.key.getToken(), ranges))
 -                    {
 -                        repairedSSTableWriter.append(row);
 -                        repairedKeyCount++;
 -                    }
 -                    // otherwise save into the new 'non-repaired' table
 -                    else
 -                    {
 -                        unRepairedSSTableWriter.append(row);
 -                        unrepairedKeyCount++;
 -                    }
 +                    repairedSSTableWriter.append(row);
 +                    repairedKeyCount++;
 +                }
 +                // otherwise save into the new 'non-repaired' table
 +                else
 +                {
 +                    unRepairedSSTableWriter.append(row);
 +                    unrepairedKeyCount++;
                  }
 -                // we have the same readers being rewritten by both writers, 
so we ask the first one NOT to close them
 -                // so that the second one can do so safely, without leaving 
us with references < 0 or any other ugliness
 -                // add repaired table with a non-null timestamp field to be 
saved in SSTableMetadata#repairedAt
 -                
anticompactedSSTables.addAll(repairedSSTableWriter.finish(repairedAt));
 -                
anticompactedSSTables.addAll(unRepairedSSTableWriter.finish(ActiveRepairService.UNREPAIRED_SSTABLE));
 -                
cfs.getDataTracker().markCompactedSSTablesReplaced(sstableAsSet, 
anticompactedSSTables, OperationType.ANTICOMPACTION);
 -            }
 -            catch (Throwable e)
 -            {
 -                JVMStabilityInspector.inspectThrowable(e);
 -                logger.error("Error anticompacting " + sstable, e);
 -                repairedSSTableWriter.abort();
 -                unRepairedSSTableWriter.abort();
              }
 +            // we have the same readers being rewritten by both writers, so we ask the first one NOT to close them
 +            // so that the second one can do so safely, without leaving us with references < 0 or any other ugliness
 +            List<SSTableReader> anticompactedSSTables = new ArrayList<>();
 +            
anticompactedSSTables.addAll(repairedSSTableWriter.finish(repairedAt));
 +            
anticompactedSSTables.addAll(unRepairedSSTableWriter.finish(ActiveRepairService.UNREPAIRED_SSTABLE));
 +            cfs.getDataTracker().markCompactedSSTablesReplaced(sstableAsSet, 
anticompactedSSTables, OperationType.ANTICOMPACTION);
 +
 +            logger.debug("Repaired {} keys out of {} for {}/{} in {}", 
repairedKeyCount,
 +                                                                       
repairedKeyCount + unrepairedKeyCount,
 +                                                                       
cfs.keyspace.getName(),
 +                                                                       
cfs.getColumnFamilyName(),
 +                                                                       
anticompactionGroup);
 +            return anticompactedSSTables.size();
          }
 -        String format = "Repaired {} keys of {} for {}/{}";
 -        logger.debug(format, repairedKeyCount, (repairedKeyCount + 
unrepairedKeyCount), cfs.keyspace, cfs.getColumnFamilyName());
 -        String format2 = "Anticompaction completed successfully, 
anticompacted from {} to {} sstable(s).";
 -        logger.info(format2, repairedSSTables.size(), 
anticompactedSSTables.size());
 -
 -        return anticompactedSSTables;
 +        catch (Throwable e)
 +        {
 +            JVMStabilityInspector.inspectThrowable(e);
 +            logger.error("Error anticompacting " + anticompactionGroup, e);
 +            repairedSSTableWriter.abort();
 +            unRepairedSSTableWriter.abort();
 +        }
 +        return 0;
      }
  
      /**

http://git-wip-us.apache.org/repos/asf/cassandra/blob/201a0551/src/java/org/apache/cassandra/db/compaction/Scrubber.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/compaction/Scrubber.java
index aad1bed,2f53ab9..e126239
--- a/src/java/org/apache/cassandra/db/compaction/Scrubber.java
+++ b/src/java/org/apache/cassandra/db/compaction/Scrubber.java
@@@ -83,11 -80,11 +83,12 @@@ public class Scrubber implements Closea
          this.outputHandler = outputHandler;
          this.skipCorrupted = skipCorrupted;
          this.isOffline = isOffline;
- 
 +        this.rowIndexEntrySerializer = 
sstable.descriptor.version.getSSTableFormat().getIndexSerializer(sstable.metadata);
  
+         List<SSTableReader> toScrub = Collections.singletonList(sstable);
+ 
          // Calculate the expected compacted filesize
-         this.destination = cfs.directories.getDirectoryForNewSSTables();
+         this.destination = cfs.directories.getWriteableLocationAsFile(cfs.getExpectedCompactedFileSize(toScrub, OperationType.SCRUB));
          if (destination == null)
              throw new IOException("disk full");
  

http://git-wip-us.apache.org/repos/asf/cassandra/blob/201a0551/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/service/StorageService.java
index e9e36f4,79cea8e..d5fa22c
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@@ -3512,26 -3532,11 +3512,6 @@@ public class StorageService extends Not
          }
      }
  
-     public synchronized void requestGC()
-     {
-         if (hasUnreclaimedSpace())
-         {
-             logger.info("requesting GC to free disk space");
-             System.gc();
-             Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS);
-         }
-     }
- 
-     private boolean hasUnreclaimedSpace()
-     {
-         for (ColumnFamilyStore cfs : ColumnFamilyStore.all())
-         {
-             if (cfs.hasUnreclaimedSpace())
-                 return true;
-         }
-         return false;
-     }
- 
 -    public boolean isClientMode()
 -    {
 -        return isClientMode;
 -    }
 -
      public String getOperationMode()
      {
          return operationMode.toString();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/201a0551/src/java/org/apache/cassandra/streaming/StreamReader.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/streaming/StreamReader.java
index 8c4efcd,c96a925..6144afb
--- a/src/java/org/apache/cassandra/streaming/StreamReader.java
+++ b/src/java/org/apache/cassandra/streaming/StreamReader.java
@@@ -119,14 -113,14 +119,14 @@@ public class StreamReade
          }
      }
  
 -    protected SSTableWriter createWriter(ColumnFamilyStore cfs, long 
totalSize, long repairedAt) throws IOException
 +    protected SSTableWriter createWriter(ColumnFamilyStore cfs, long 
totalSize, long repairedAt, SSTableFormat.Type format) throws IOException
      {
-         Directories.DataDirectory localDir = 
cfs.directories.getWriteableLocation();
+         Directories.DataDirectory localDir = cfs.directories.getWriteableLocation(totalSize);
          if (localDir == null)
              throw new IOException("Insufficient disk space to store " + totalSize + " bytes");
 -        desc = 
Descriptor.fromFilename(cfs.getTempSSTablePath(cfs.directories.getLocationForDisk(localDir)));
 +        desc = 
Descriptor.fromFilename(cfs.getTempSSTablePath(cfs.directories.getLocationForDisk(localDir),
 format));
  
 -        return new SSTableWriter(desc.filenameFor(Component.DATA), 
estimatedKeys, repairedAt);
 +        return SSTableWriter.create(desc, estimatedKeys, repairedAt, 
sstableLevel);
      }
  
      protected void drain(InputStream dis, long bytesRead) throws IOException

http://git-wip-us.apache.org/repos/asf/cassandra/blob/201a0551/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/201a0551/test/unit/org/apache/cassandra/db/DirectoriesTest.java
----------------------------------------------------------------------

Reply via email to