Merge branch 'cassandra-2.0' into cassandra-2.1

Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/73952075
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/73952075
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/73952075

Branch: refs/heads/cassandra-2.1
Commit: 73952075253c535b35a42269edc86133a5dd9f6d
Parents: 94c826e 878d616
Author: Yuki Morishita <yu...@apache.org>
Authored: Tue Jul 28 16:33:04 2015 -0500
Committer: Yuki Morishita <yu...@apache.org>
Committed: Tue Jul 28 16:33:04 2015 -0500

----------------------------------------------------------------------
 CHANGES.txt                                     |  2 ++
 .../apache/cassandra/db/ColumnFamilyStore.java  |  3 ++-
 .../cassandra/io/sstable/SSTableReader.java     | 28 ++++++++++++--------
 3 files changed, 21 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/73952075/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index 1ce95d6,5ce2cc7..c4bb21c
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -15,25 -4,6 +15,26 @@@ Merged from 2.0
   * Complete CASSANDRA-8448 fix (CASSANDRA-9519)
   * Don't include auth credentials in debug log (CASSANDRA-9682)
   * Can't transition from write survey to normal mode (CASSANDRA-9740)
 + * Scrub (recover) sstables even when -Index.db is missing, (CASSANDRA-9591)
 + * Fix growing pending background compaction (CASSANDRA-9662)
++ * Don't track hotness when opening from snapshot for validation (CASSANDRA-9382)
 +
 +
 +2.1.8
 + * (cqlsh) Fix bad check for CQL compatibility when DESCRIBE'ing
 +   COMPACT STORAGE tables with no clustering columns
 + * Warn when an extra-large partition is compacted (CASSANDRA-9643)
 + * Eliminate strong self-reference chains in sstable ref tidiers (CASSANDRA-9656)
 + * Ensure StreamSession uses canonical sstable reader instances (CASSANDRA-9700)
 + * Ensure memtable book keeping is not corrupted in the event we shrink usage (CASSANDRA-9681)
 + * Update internal python driver for cqlsh (CASSANDRA-9064)
 + * Fix IndexOutOfBoundsException when inserting tuple with too many
 +   elements using the string literal notation (CASSANDRA-9559)
 + * Allow JMX over SSL directly from nodetool (CASSANDRA-9090)
 + * Fix incorrect result for IN queries where column not found (CASSANDRA-9540)
 + * Enable describe on indices (CASSANDRA-7814)
 + * ColumnFamilyStore.selectAndReference may block during compaction (CASSANDRA-9637)
 +Merged from 2.0:
   * Avoid NPE in AuthSuccess#decode (CASSANDRA-9727)
   * Add listen_address to system.local (CASSANDRA-9603)
   * Bug fixes to resultset metadata construction (CASSANDRA-9636)
@@@ -929,112 -480,10 +930,113 @@@ Merged from 1.2
   * Fix bug with some IN queries missig results (CASSANDRA-7105)
   * Fix availability validation for LOCAL_ONE CL (CASSANDRA-7319)
   * Hint streaming can cause decommission to fail (CASSANDRA-7219)
 - * RepairTask didn't send a correct message on IllegalArgumentException (CASSANDRA-7336)
  
  
 -2.0.7
 +2.1.0-beta2
 + * Increase default CL space to 8GB (CASSANDRA-7031)
 + * Add range tombstones to read repair digests (CASSANDRA-6863)
 + * Fix BTree.clear for large updates (CASSANDRA-6943)
 + * Fail write instead of logging a warning when unable to append to CL
 +   (CASSANDRA-6764)
 + * Eliminate possibility of CL segment appearing twice in active list 
 +   (CASSANDRA-6557)
 + * Apply DONTNEED fadvise to commitlog segments (CASSANDRA-6759)
 + * Switch CRC component to Adler and include it for compressed sstables 
 +   (CASSANDRA-4165)
 + * Allow cassandra-stress to set compaction strategy options (CASSANDRA-6451)
 + * Add broadcast_rpc_address option to cassandra.yaml (CASSANDRA-5899)
 + * Auto reload GossipingPropertyFileSnitch config (CASSANDRA-5897)
 + * Fix overflow of memtable_total_space_in_mb (CASSANDRA-6573)
 + * Fix ABTC NPE and apply update function correctly (CASSANDRA-6692)
 + * Allow nodetool to use a file or prompt for password (CASSANDRA-6660)
 + * Fix AIOOBE when concurrently accessing ABSC (CASSANDRA-6742)
 + * Fix assertion error in ALTER TYPE RENAME (CASSANDRA-6705)
 + * Scrub should not always clear out repaired status (CASSANDRA-5351)
 + * Improve handling of range tombstone for wide partitions (CASSANDRA-6446)
 + * Fix ClassCastException for compact table with composites (CASSANDRA-6738)
 + * Fix potentially repairing with wrong nodes (CASSANDRA-6808)
 + * Change caching option syntax (CASSANDRA-6745)
 + * Fix stress to do proper counter reads (CASSANDRA-6835)
 + * Fix help message for stress counter_write (CASSANDRA-6824)
 + * Fix stress smart Thrift client to pick servers correctly (CASSANDRA-6848)
 + * Add logging levels (minimal, normal or verbose) to stress tool (CASSANDRA-6849)
 + * Fix race condition in Batch CLE (CASSANDRA-6860)
 + * Improve cleanup/scrub/upgradesstables failure handling (CASSANDRA-6774)
 + * ByteBuffer write() methods for serializing sstables (CASSANDRA-6781)
 + * Proper compare function for CollectionType (CASSANDRA-6783)
 + * Update native server to Netty 4 (CASSANDRA-6236)
 + * Fix off-by-one error in stress (CASSANDRA-6883)
 + * Make OpOrder AutoCloseable (CASSANDRA-6901)
 + * Remove sync repair JMX interface (CASSANDRA-6900)
 + * Add multiple memory allocation options for memtables (CASSANDRA-6689, 6694)
 + * Remove adjusted op rate from stress output (CASSANDRA-6921)
 + * Add optimized CF.hasColumns() implementations (CASSANDRA-6941)
 + * Serialize batchlog mutations with the version of the target node
 +   (CASSANDRA-6931)
 + * Optimize CounterColumn#reconcile() (CASSANDRA-6953)
 + * Properly remove 1.2 sstable support in 2.1 (CASSANDRA-6869)
 + * Lock counter cells, not partitions (CASSANDRA-6880)
 + * Track presence of legacy counter shards in sstables (CASSANDRA-6888)
 + * Ensure safe resource cleanup when replacing sstables (CASSANDRA-6912)
 + * Add failure handler to async callback (CASSANDRA-6747)
 + * Fix AE when closing SSTable without releasing reference (CASSANDRA-7000)
 + * Clean up IndexInfo on keyspace/table drops (CASSANDRA-6924)
 + * Only snapshot relative SSTables when sequential repair (CASSANDRA-7024)
 + * Require nodetool rebuild_index to specify index names (CASSANDRA-7038)
 + * fix cassandra stress errors on reads with native protocol (CASSANDRA-7033)
 + * Use OpOrder to guard sstable references for reads (CASSANDRA-6919)
 + * Preemptive opening of compaction result (CASSANDRA-6916)
 + * Multi-threaded scrub/cleanup/upgradesstables (CASSANDRA-5547)
 + * Optimize cellname comparison (CASSANDRA-6934)
 + * Native protocol v3 (CASSANDRA-6855)
 + * Optimize Cell liveness checks and clean up Cell (CASSANDRA-7119)
 + * Support consistent range movements (CASSANDRA-2434)
++ * Display min timestamp in sstablemetadata viewer (CASSANDRA-6767)
 +Merged from 2.0:
 + * Avoid race-prone second "scrub" of system keyspace (CASSANDRA-6797)
 + * Pool CqlRecordWriter clients by inetaddress rather than Range
 +   (CASSANDRA-6665)
 + * Fix compaction_history timestamps (CASSANDRA-6784)
 + * Compare scores of full replica ordering in DES (CASSANDRA-6683)
 + * fix CME in SessionInfo updateProgress affecting netstats (CASSANDRA-6577)
 + * Allow repairing between specific replicas (CASSANDRA-6440)
 + * Allow per-dc enabling of hints (CASSANDRA-6157)
 + * Add compatibility for Hadoop 0.2.x (CASSANDRA-5201)
 + * Fix EstimatedHistogram races (CASSANDRA-6682)
 + * Failure detector correctly converts initial value to nanos (CASSANDRA-6658)
 + * Add nodetool taketoken to relocate vnodes (CASSANDRA-4445)
 + * Expose bulk loading progress over JMX (CASSANDRA-4757)
 + * Correctly handle null with IF conditions and TTL (CASSANDRA-6623)
 + * Account for range/row tombstones in tombstone drop
 +   time histogram (CASSANDRA-6522)
 + * Stop CommitLogSegment.close() from calling sync() (CASSANDRA-6652)
 + * Make commitlog failure handling configurable (CASSANDRA-6364)
 + * Avoid overlaps in LCS (CASSANDRA-6688)
 + * Improve support for paginating over composites (CASSANDRA-4851)
 + * Fix count(*) queries in a mixed cluster (CASSANDRA-6707)
 + * Improve repair tasks(snapshot, differencing) concurrency (CASSANDRA-6566)
 + * Fix replaying pre-2.0 commit logs (CASSANDRA-6714)
 + * Add static columns to CQL3 (CASSANDRA-6561)
 + * Optimize single partition batch statements (CASSANDRA-6737)
 + * Disallow post-query re-ordering when paging (CASSANDRA-6722)
 + * Fix potential paging bug with deleted columns (CASSANDRA-6748)
 + * Fix NPE on BulkLoader caused by losing StreamEvent (CASSANDRA-6636)
 + * Fix truncating compression metadata (CASSANDRA-6791)
 + * Add CMSClassUnloadingEnabled JVM option (CASSANDRA-6541)
 + * Catch memtable flush exceptions during shutdown (CASSANDRA-6735)
 + * Fix upgradesstables NPE for non-CF-based indexes (CASSANDRA-6645)
 + * Fix UPDATE updating PRIMARY KEY columns implicitly (CASSANDRA-6782)
 + * Fix IllegalArgumentException when updating from 1.2 with SuperColumns
 +   (CASSANDRA-6733)
 + * FBUtilities.singleton() should use the CF comparator (CASSANDRA-6778)
 + * Fix CQLSStableWriter.addRow(Map<String, Object>) (CASSANDRA-6526)
 + * Fix HSHA server introducing corrupt data (CASSANDRA-6285)
 + * Fix CAS conditions for COMPACT STORAGE tables (CASSANDRA-6813)
 + * Starting threads in OutboundTcpConnectionPool constructor causes race conditions (CASSANDRA-7177)
 + * Allow overriding cassandra-rackdc.properties file (CASSANDRA-7072)
 + * Set JMX RMI port to 7199 (CASSANDRA-7087)
 + * Use LOCAL_QUORUM for data reads at LOCAL_SERIAL (CASSANDRA-6939)
 + * Log a warning for large batches (CASSANDRA-6487)
   * Put nodes in hibernate when join_ring is false (CASSANDRA-6961)
   * Avoid early loading of non-system keyspaces before compaction-leftovers 
     cleanup at startup (CASSANDRA-6913)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/73952075/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index 20e74dc,c125cf0..ad66f8e
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@@ -2353,9 -1890,10 +2353,10 @@@ public class ColumnFamilyStore implemen
                  {
                      if (logger.isDebugEnabled())
                          logger.debug("using snapshot sstable {}", entries.getKey());
-                     sstable = SSTableReader.open(entries.getKey(), entries.getValue(), metadata, partitioner);
+                     // open without tracking hotness
+                     sstable = SSTableReader.open(entries.getKey(), entries.getValue(), metadata, partitioner, true, false);
                      // This is technically not necessary since it's a snapshot but makes things easier
 -                    sstable.acquireReference();
 +                    refs.tryRef(sstable);
                  }
                  else if (logger.isDebugEnabled())
                  {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/73952075/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/io/sstable/SSTableReader.java
index 92c9b55,39d46e9..32eb1b9
--- a/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
@@@ -164,30 -66,15 +164,35 @@@ public class SSTableReader extends SSTa
      private static final Logger logger = LoggerFactory.getLogger(SSTableReader.class);
  
      private static final ScheduledThreadPoolExecutor syncExecutor = new ScheduledThreadPoolExecutor(1);
+     static
+     {
+         // Immediately remove readMeter sync task when cancelled.
+         syncExecutor.setRemoveOnCancelPolicy(true);
+     }
      private static final RateLimiter meterSyncThrottle = RateLimiter.create(100.0);
  
 +    public static final Comparator<SSTableReader> maxTimestampComparator = new Comparator<SSTableReader>()
 +    {
 +        public int compare(SSTableReader o1, SSTableReader o2)
 +        {
 +            long ts1 = o1.getMaxTimestamp();
 +            long ts2 = o2.getMaxTimestamp();
 +            return (ts1 > ts2 ? -1 : (ts1 == ts2 ? 0 : 1));
 +        }
 +    };
 +
 +    public static final Comparator<SSTableReader> sstableComparator = new Comparator<SSTableReader>()
 +    {
 +        public int compare(SSTableReader o1, SSTableReader o2)
 +        {
 +            return o1.first.compareTo(o2.first);
 +        }
 +    };
 +
 +    public static final Ordering<SSTableReader> sstableOrdering = Ordering.from(sstableComparator);
 +
      /**
 -     * maxDataAge is a timestamp in local server time (e.g. System.currentTimeMilli) which represents an uppper bound
 +     * maxDataAge is a timestamp in local server time (e.g. System.currentTimeMilli) which represents an upper bound
       * to the newest piece of data stored in the sstable. In other words, this sstable does not contain items created
       * later than maxDataAge.
       *
@@@ -377,50 -155,14 +382,50 @@@
          return open(desc, componentsFor(desc), metadata, p);
      }
  
 +    public static SSTableReader open(Descriptor descriptor, Set<Component> components, CFMetaData metadata, IPartitioner partitioner) throws IOException
 +    {
-         return open(descriptor, components, metadata, partitioner, true);
++        return open(descriptor, components, metadata, partitioner, true, true);
 +    }
 +
 +    // use only for offline or "Standalone" operations
      public static SSTableReader openNoValidation(Descriptor descriptor, Set<Component> components, CFMetaData metadata) throws IOException
      {
-         return open(descriptor, components, metadata, StorageService.getPartitioner(), false);
+         return open(descriptor, components, metadata, StorageService.getPartitioner(), false, false); // do not track hotness
      }
  
 +    /**
 +     * Open SSTable reader to be used in batch mode(such as sstableloader).
 +     *
 +     * @param descriptor
 +     * @param components
 +     * @param metadata
 +     * @param partitioner
 +     * @return opened SSTableReader
 +     * @throws IOException
 +     */
      public static SSTableReader openForBatch(Descriptor descriptor, Set<Component> components, CFMetaData metadata, IPartitioner partitioner) throws IOException
      {
 -        SSTableMetadata sstableMetadata = openMetadata(descriptor, components, partitioner, true);
 +        // Minimum components without which we can't do anything
 +        assert components.contains(Component.DATA) : "Data component is missing for sstable" + descriptor;
 +        assert components.contains(Component.PRIMARY_INDEX) : "Primary index component is missing for sstable " + descriptor;
 +
 +        Map<MetadataType, MetadataComponent> sstableMetadata = descriptor.getMetadataSerializer().deserialize(descriptor,
 +                                                                                                               EnumSet.of(MetadataType.VALIDATION, MetadataType.STATS));
 +        ValidationMetadata validationMetadata = (ValidationMetadata) sstableMetadata.get(MetadataType.VALIDATION);
 +        StatsMetadata statsMetadata = (StatsMetadata) sstableMetadata.get(MetadataType.STATS);
 +
 +        // Check if sstable is created using same partitioner.
 +        // Partitioner can be null, which indicates older version of sstable or no stats available.
 +        // In that case, we skip the check.
 +        String partitionerName = partitioner.getClass().getCanonicalName();
 +        if (validationMetadata != null && !partitionerName.equals(validationMetadata.partitioner))
 +        {
 +            logger.error(String.format("Cannot open %s; partitioner %s does not match system partitioner %s.  Note that the default partitioner starting with Cassandra 1.2 is Murmur3Partitioner, so you will need to edit that to match your old partitioner if upgrading.",
 +                                              descriptor, validationMetadata.partitioner, partitionerName));
 +            System.exit(1);
 +        }
 +
 +        logger.info("Opening {} ({} bytes)", descriptor, new File(descriptor.filenameFor(Component.DATA)).length());
          SSTableReader sstable = new SSTableReader(descriptor,
                                                    components,
                                                    metadata,
@@@ -432,73 -174,83 +437,74 @@@
          // special implementation of load to use non-pooled SegmentedFile builders
          SegmentedFile.Builder ibuilder = new BufferedSegmentedFile.Builder();
          SegmentedFile.Builder dbuilder = sstable.compression
 -                                       ? new CompressedSegmentedFile.Builder()
 +                                       ? new CompressedSegmentedFile.Builder(null)
                                         : new BufferedSegmentedFile.Builder();
 -        if (!loadSummary(sstable, ibuilder, dbuilder, sstable.metadata))
 -            sstable.buildSummary(false, ibuilder, dbuilder, false);
 +        if (!sstable.loadSummary(ibuilder, dbuilder))
 +            sstable.buildSummary(false, ibuilder, dbuilder, false, Downsampling.BASE_SAMPLING_LEVEL);
          sstable.ifile = ibuilder.complete(sstable.descriptor.filenameFor(Component.PRIMARY_INDEX));
          sstable.dfile = dbuilder.complete(sstable.descriptor.filenameFor(Component.DATA));
 -
          sstable.bf = FilterFactory.AlwaysPresent;
-         sstable.setup(true);
++        sstable.setup(false);
          return sstable;
      }
  
-     private static SSTableReader open(Descriptor descriptor,
 -    public static SSTableReader open(Descriptor descriptor, Set<Component> components, CFMetaData metadata, IPartitioner partitioner) throws IOException
 -    {
 -        // Don't track read rates for tables in the system keyspace and don't bother trying to load or persist
 -        // the read meter when in client mode
 -        boolean trackHotness = !(Keyspace.SYSTEM_KS.equals(descriptor.ksname) || Config.isClientMode());
 -        return open(descriptor, components, metadata, partitioner, true, trackHotness);
 -    }
 -
+     public static SSTableReader open(Descriptor descriptor,
                                        Set<Component> components,
                                        CFMetaData metadata,
                                        IPartitioner partitioner,
-                                       boolean validate) throws IOException
+                                       boolean validate,
+                                       boolean trackHotness) throws IOException
      {
 -        long start = System.nanoTime();
 -        SSTableMetadata sstableMetadata = openMetadata(descriptor, components, partitioner, validate);
 -
 -        SSTableReader sstable = new SSTableReader(descriptor,
 -                                                  components,
 -                                                  metadata,
 -                                                  partitioner,
 -                                                  System.currentTimeMillis(),
 -                                                  sstableMetadata,
 -                                                  trackHotness);
 -
 -        sstable.load();
 -
 -        if (validate)
 -            sstable.validate();
 -
 -        logger.debug("INDEX LOAD TIME for {}: {} ms.", descriptor, TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start));
 -
 -        if (sstable.getKeyCache() != null)
 -            logger.debug("key cache contains {}/{} keys", sstable.getKeyCache().size(), sstable.getKeyCache().getCapacity());
 -
 -        return sstable;
 -    }
 -
 -    private static SSTableMetadata openMetadata(Descriptor descriptor,
 -                                                Set<Component> components,
 -                                                IPartitioner partitioner,
 -                                                boolean primaryIndexRequired) throws IOException
 -    {
 -        assert partitioner != null;
          // Minimum components without which we can't do anything
          assert components.contains(Component.DATA) : "Data component is missing for sstable" + descriptor;
 +        assert !validate || components.contains(Component.PRIMARY_INDEX) : "Primary index component is missing for sstable " + descriptor;
  
 -        assert !primaryIndexRequired || components.contains(Component.PRIMARY_INDEX)
 -                : "Primary index component is missing for sstable " + descriptor;
 -
 -        logger.info("Opening {} ({} bytes)", descriptor, new File(descriptor.filenameFor(COMPONENT_DATA)).length());
 -
 -        SSTableMetadata sstableMetadata = SSTableMetadata.serializer.deserialize(descriptor).left;
 +        Map<MetadataType, MetadataComponent> sstableMetadata = descriptor.getMetadataSerializer().deserialize(descriptor,
 +                                                                                                              EnumSet.of(MetadataType.VALIDATION, MetadataType.STATS));
 +        ValidationMetadata validationMetadata = (ValidationMetadata) sstableMetadata.get(MetadataType.VALIDATION);
 +        StatsMetadata statsMetadata = (StatsMetadata) sstableMetadata.get(MetadataType.STATS);
  
          // Check if sstable is created using same partitioner.
          // Partitioner can be null, which indicates older version of sstable or no stats available.
          // In that case, we skip the check.
          String partitionerName = partitioner.getClass().getCanonicalName();
 -        if (sstableMetadata.partitioner != null && !partitionerName.equals(sstableMetadata.partitioner))
 +        if (validationMetadata != null && !partitionerName.equals(validationMetadata.partitioner))
          {
              logger.error(String.format("Cannot open %s; partitioner %s does not match system partitioner %s.  Note that the default partitioner starting with Cassandra 1.2 is Murmur3Partitioner, so you will need to edit that to match your old partitioner if upgrading.",
 -                                       descriptor, sstableMetadata.partitioner, partitionerName));
 +                                              descriptor, validationMetadata.partitioner, partitionerName));
              System.exit(1);
          }
 -        return sstableMetadata;
 +
 +        logger.info("Opening {} ({} bytes)", descriptor, new File(descriptor.filenameFor(Component.DATA)).length());
 +        SSTableReader sstable = new SSTableReader(descriptor,
 +                                                  components,
 +                                                  metadata,
 +                                                  partitioner,
 +                                                  System.currentTimeMillis(),
 +                                                  statsMetadata,
 +                                                  OpenReason.NORMAL);
 +
 +        try
 +        {
 +            // load index and filter
 +            long start = System.nanoTime();
 +            sstable.load(validationMetadata);
 +            logger.debug("INDEX LOAD TIME for {}: {} ms.", descriptor, TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start));
 +
-             sstable.setup(!validate);
++            sstable.setup(trackHotness);
 +            if (validate)
 +                sstable.validate();
 +
 +            if (sstable.getKeyCache() != null)
 +                logger.debug("key cache contains {}/{} keys", sstable.getKeyCache().size(), sstable.getKeyCache().getCapacity());
 +
 +            return sstable;
 +        }
 +        catch (Throwable t)
 +        {
 +            sstable.selfRef().release();
 +            throw t;
 +        }
      }
  
      public static void logOpenException(Descriptor descriptor, IOException e)
@@@ -624,43 -388,35 +630,43 @@@
          this.dfile = dfile;
          this.indexSummary = indexSummary;
          this.bf = bloomFilter;
-         this.setup(false);
++        this.setup(true);
      }
  
 -    /**
 -     * Clean up all opened resources.
 -     *
 -     * @throws IOException
 -     */
 -    public void close() throws IOException
 +    public static long getTotalBytes(Iterable<SSTableReader> sstables)
      {
 -        if (readMeterSyncFuture != null)
 -            readMeterSyncFuture.cancel(true);
 +        long sum = 0;
 +        for (SSTableReader sstable : sstables)
 +            sum += sstable.onDiskLength();
 +        return sum;
 +    }
  
 -        // Force finalizing mmapping if necessary
 +    public static long getTotalUncompressedBytes(Iterable<SSTableReader> sstables)
 +    {
 +        long sum = 0;
 +        for (SSTableReader sstable : sstables)
 +            sum += sstable.uncompressedLength();
  
 -        if (null != ifile)
 -            ifile.cleanup();
 +        return sum;
 +    }
  
 -        dfile.cleanup();
 -        // close the BF so it can be opened later.
 -        if (null != bf)
 -            bf.close();
 +    public boolean equals(Object that)
 +    {
 +        return that instanceof SSTableReader && ((SSTableReader) that).descriptor.equals(this.descriptor);
 +    }
  
 -        if (null != indexSummary)
 -            indexSummary.close();
 +    public int hashCode()
 +    {
 +        return this.descriptor.hashCode();
      }
  
 -    public void setTrackedBy(DataTracker tracker)
 +    public String getFilename()
 +    {
 +        return dfile.path;
 +    }
 +
 +    public void setupKeyCache()
      {
 -        deletingTask.setTracker(tracker);
          // under normal operation we can do this at any time, but SSTR is 
also used outside C* proper,
          // e.g. by BulkLoader, which does not initialize the cache.  As a 
kludge, we set up the cache
          // here when we know we're being wired into the rest of the server 
infrastructure.
@@@ -2090,155 -1484,73 +2096,155 @@@
      }
  
      /**
 -     * @param sstables
 -     * @return true if all desired references were acquired.  Otherwise, it will unreference any partial acquisition, and return false.
 +     * Increment the total row read count and read rate for this SSTable.  This should not be incremented for range
 +     * slice queries, row cache hits, or non-query reads, like compaction.
       */
 -    public static boolean acquireReferences(Iterable<SSTableReader> sstables)
 +    public void incrementReadCount()
      {
 -        SSTableReader failed = null;
 -        for (SSTableReader sstable : sstables)
 +        if (readMeter != null)
 +            readMeter.mark();
 +    }
 +
 +    public static class SizeComparator implements Comparator<SSTableReader>
 +    {
 +        public int compare(SSTableReader o1, SSTableReader o2)
          {
 -            if (!sstable.acquireReference())
 -            {
 -                failed = sstable;
 -                break;
 -            }
 +            return Longs.compare(o1.onDiskLength(), o2.onDiskLength());
          }
 +    }
  
 -        if (failed == null)
 -            return true;
 +    public Ref<SSTableReader> tryRef()
 +    {
 +        return selfRef.tryRef();
 +    }
  
 -        for (SSTableReader sstable : sstables)
 -        {
 -            if (sstable == failed)
 -                break;
 -            sstable.releaseReference();
 -        }
 -        return false;
 +    public Ref<SSTableReader> selfRef()
 +    {
 +        return selfRef;
      }
  
 -    public static void releaseReferences(Iterable<SSTableReader> sstables)
 +    public Ref<SSTableReader> ref()
      {
 -        for (SSTableReader sstable : sstables)
 -        {
 -            sstable.releaseReference();
 -        }
 +        return selfRef.ref();
      }
  
-     void setup(boolean isOffline)
 -    private void dropPageCache()
++    void setup(boolean trackHotness)
      {
-         tidy.setup(this, isOffline);
 -        dropPageCache(dfile.path);
 -        if (null != ifile)
 -            dropPageCache(ifile.path);
++        tidy.setup(this, trackHotness);
 +        this.readMeter = tidy.global.readMeter;
      }
  
 -    private void dropPageCache(String filePath)
 +    @VisibleForTesting
 +    public void overrideReadMeter(RestorableMeter readMeter)
      {
 -        RandomAccessFile file = null;
 +        this.readMeter = tidy.global.readMeter = readMeter;
 +    }
  
 -        try
 +    /**
 +     * One instance per SSTableReader we create. This references the type-shared tidy, which in turn references
 +     * the globally shared tidy, i.e.
 +     *
 +     * InstanceTidier => DescriptorTypeTitdy => GlobalTidy
 +     *
 +     * We can create many InstanceTidiers (one for every time we reopen an sstable with MOVED_START for example), but there can only be
 +     * two DescriptorTypeTidy (FINAL and TEMPLINK) and only one GlobalTidy for one single logical sstable.
 +     *
 +     * When the InstanceTidier cleansup, it releases its reference to its DescriptorTypeTidy; when all InstanceTidiers
 +     * for that type have run, the DescriptorTypeTidy cleansup. DescriptorTypeTidy behaves in the same way towards GlobalTidy.
 +     *
 +     * For ease, we stash a direct reference to both our type-shared and global tidier
 +     */
 +    private static final class InstanceTidier implements Tidy
 +    {
 +        private final Descriptor descriptor;
 +        private final CFMetaData metadata;
 +        private IFilter bf;
 +        private IndexSummary summary;
 +
 +        private SegmentedFile dfile;
 +        private SegmentedFile ifile;
 +        private Runnable runOnClose;
 +        private boolean isReplaced = false;
 +
 +        // a reference to our shared per-Descriptor.Type tidy instance, that
 +        // we will release when we are ourselves released
 +        private Ref<DescriptorTypeTidy> typeRef;
 +
 +        // a convenience stashing of the shared per-descriptor-type tidy instance itself
 +        // and the per-logical-sstable globally shared state that it is linked to
 +        private DescriptorTypeTidy type;
 +        private GlobalTidy global;
 +
 +        private boolean setup;
 +
-         void setup(SSTableReader reader, boolean isOffline)
++        void setup(SSTableReader reader, boolean trackHotness)
          {
 -            file = new RandomAccessFile(filePath, "r");
 +            this.setup = true;
 +            this.bf = reader.bf;
 +            this.summary = reader.indexSummary;
 +            this.dfile = reader.dfile;
 +            this.ifile = reader.ifile;
 +            // get a new reference to the shared descriptor-type tidy
 +            this.typeRef = DescriptorTypeTidy.get(reader);
 +            this.type = typeRef.get();
 +            this.global = type.globalRef.get();
-             if (!isOffline)
++            if (trackHotness)
 +                global.ensureReadMeter();
 +        }
  
 -            int fd = CLibrary.getfd(file.getFD());
 +        InstanceTidier(Descriptor descriptor, CFMetaData metadata)
 +        {
 +            this.descriptor = descriptor;
 +            this.metadata = metadata;
 +        }
  
 -            if (fd > 0)
 -            {
 -                if (logger.isDebugEnabled())
 -                    logger.debug(String.format("Dropping page cache of file %s.", filePath));
 +        public void tidy()
 +        {
 +            // don't try to cleanup if the sstablereader was never fully constructed
 +            if (!setup)
 +                return;
  
 -                CLibrary.trySkipCache(fd, 0, 0);
 +            final ColumnFamilyStore cfs = Schema.instance.getColumnFamilyStoreInstance(metadata.cfId);
 +            final OpOrder.Barrier barrier;
 +            if (cfs != null)
 +            {
 +                barrier = cfs.readOrdering.newBarrier();
 +                barrier.issue();
              }
 +            else
 +                barrier = null;
 +
 +            ScheduledExecutors.nonPeriodicTasks.execute(new Runnable()
 +            {
 +                public void run()
 +                {
 +                    if (barrier != null)
 +                        barrier.await();
 +                    if (bf != null)
 +                        bf.close();
 +                    if (summary != null)
 +                        summary.close();
 +                    if (runOnClose != null)
 +                        runOnClose.run();
 +                    if (dfile != null)
 +                        dfile.close();
 +                    if (ifile != null)
 +                        ifile.close();
 +                    typeRef.release();
 +                }
 +            });
          }
 -        catch (IOException e)
 +
 +        public String name()
          {
 -            // we don't care if cache cleanup fails
 +            return descriptor.toString();
          }
 -        finally
 +
 +        void releaseSummary()
          {
 -            FileUtils.closeQuietly(file);
 +            summary.close();
 +            assert summary.isCleanedUp();
 +            summary = null;
          }
      }
  

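Illustrative note (not part of the commit): the merge above replaces the
setup(boolean isOffline) switch with an explicit trackHotness flag, so that
callers such as the snapshot path in ColumnFamilyStore can open a reader with
validation on but without creating a read meter. The sketch below is a minimal,
self-contained model of that idea only. HotnessSketch, GlobalState and Reader
are hypothetical stand-ins invented for this note, and AtomicLong is used in
place of Cassandra's RestorableMeter; none of this is the actual Cassandra
implementation.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical stand-in types for illustration; these are not Cassandra classes.
public class HotnessSketch
{
    // Plays the role of the shared per-sstable state that may own a read meter.
    static final class GlobalState
    {
        AtomicLong readMeter; // stays null unless hotness tracking is requested

        void ensureReadMeter()
        {
            if (readMeter == null)
                readMeter = new AtomicLong();
        }
    }

    static final class Reader
    {
        final GlobalState global = new GlobalState();
        AtomicLong readMeter;

        // Same shape as setup(boolean trackHotness) in the diff: the meter is
        // only created when the caller asks for hotness tracking.
        void setup(boolean trackHotness)
        {
            if (trackHotness)
                global.ensureReadMeter();
            readMeter = global.readMeter;
        }

        // Same shape as incrementReadCount(): a no-op when no meter exists.
        void incrementReadCount()
        {
            if (readMeter != null)
                readMeter.incrementAndGet();
        }

        boolean tracksHotness()
        {
            return readMeter != null;
        }

        long reads()
        {
            return readMeter == null ? 0 : readMeter.get();
        }
    }

    // The validate flag is accepted only to show that it is now independent
    // of trackHotness; validation itself is out of scope for this sketch.
    static Reader open(boolean validate, boolean trackHotness)
    {
        Reader reader = new Reader();
        reader.setup(trackHotness);
        return reader;
    }

    public static void main(String[] args)
    {
        Reader live = open(true, true);      // normal open: validate and track
        Reader snapshot = open(true, false); // snapshot open: validate, do not track
        live.incrementReadCount();
        snapshot.incrementReadCount();
        System.out.println("live tracks hotness: " + live.tracksHotness());
        System.out.println("snapshot tracks hotness: " + snapshot.tracksHotness());
    }
}
```

The point of keeping the two flags separate is visible in the snapshot case:
before this change the tracking decision was derived from !validate, so a
validated open always tracked hotness.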