Merge branch 'cassandra-3.0' into cassandra-3.11

Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/2f268eda
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/2f268eda
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/2f268eda

Branch: refs/heads/cassandra-3.11
Commit: 2f268eda3d44f8b14b71b7f4b3f4c25e2dfb2c11
Parents: b207f2e 6f90e55
Author: Branimir Lambov <branimir.lam...@datastax.com>
Authored: Tue Dec 6 12:11:15 2016 +0200
Committer: Branimir Lambov <branimir.lam...@datastax.com>
Committed: Tue Dec 6 12:12:19 2016 +0200

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../apache/cassandra/db/ColumnFamilyStore.java  | 41 ++++++++------------
 .../apache/cassandra/index/CustomIndexTest.java | 37 ++++++++++++++++++
 3 files changed, 55 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/2f268eda/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index c5d2da2,5242adf..6da6b4f
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,113 -1,5 +1,114 @@@
 -3.0.11
 +3.10
 + * Use correct bounds for all-data range when filtering (CASSANDRA-12666)
 + * Remove timing window in test case (CASSANDRA-12875)
 + * Resolve unit testing without JCE security libraries installed (CASSANDRA-12945)
 + * Fix inconsistencies in cassandra-stress load balancing policy (CASSANDRA-12919)
 + * Fix validation of non-frozen UDT cells (CASSANDRA-12916)
 + * Don't shut down socket input/output on StreamSession (CASSANDRA-12903)
 + * Fix Murmur3PartitionerTest (CASSANDRA-12858)
 + * Move cqlsh syntax rules into separate module and allow easier customization (CASSANDRA-12897)
 + * Fix CommitLogSegmentManagerTest (CASSANDRA-12283)
 + * Fix cassandra-stress truncate option (CASSANDRA-12695)
 + * Fix crossNode value when receiving messages (CASSANDRA-12791)
 + * Don't load MX4J beans twice (CASSANDRA-12869)
 + * Extend native protocol request flags, add versions to SUPPORTED, and introduce ProtocolVersion enum (CASSANDRA-12838)
 + * Set JOINING mode when running pre-join tasks (CASSANDRA-12836)
 + * remove net.mintern.primitive library due to license issue (CASSANDRA-12845)
 + * Properly format IPv6 addresses when logging JMX service URL (CASSANDRA-12454)
 + * Optimize the vnode allocation for single replica per DC (CASSANDRA-12777)
 + * Use non-token restrictions for bounds when token restrictions are overridden (CASSANDRA-12419)
 + * Fix CQLSH auto completion for PER PARTITION LIMIT (CASSANDRA-12803)
 + * Use different build directories for Eclipse and Ant (CASSANDRA-12466)
 + * Avoid potential AttributeError in cqlsh due to no table metadata (CASSANDRA-12815)
 + * Fix RandomReplicationAwareTokenAllocatorTest.testExistingCluster (CASSANDRA-12812)
 + * Upgrade commons-codec to 1.9 (CASSANDRA-12790)
 + * Make the fanout size for LeveledCompactionStrategy to be configurable (CASSANDRA-11550)
 + * Add duration data type (CASSANDRA-11873)
 + * Fix timeout in ReplicationAwareTokenAllocatorTest (CASSANDRA-12784)
 + * Improve sum aggregate functions (CASSANDRA-12417)
 + * Make cassandra.yaml docs for batch_size_*_threshold_in_kb reflect changes in CASSANDRA-10876 (CASSANDRA-12761)
 + * cqlsh fails to format collections when using aliases (CASSANDRA-11534)
 + * Check for hash conflicts in prepared statements (CASSANDRA-12733)
 + * Exit query parsing upon first error (CASSANDRA-12598)
 + * Fix cassandra-stress to use single seed in UUID generation (CASSANDRA-12729)
 + * CQLSSTableWriter does not allow Update statement (CASSANDRA-12450)
 + * Config class uses boxed types but DD exposes primitive types (CASSANDRA-12199)
 + * Add pre- and post-shutdown hooks to Storage Service (CASSANDRA-12461)
 + * Add hint delivery metrics (CASSANDRA-12693)
 + * Remove IndexInfo cache from FileIndexInfoRetriever (CASSANDRA-12731)
 + * ColumnIndex does not reuse buffer (CASSANDRA-12502)
 + * cdc column addition still breaks schema migration tasks (CASSANDRA-12697)
 + * Upgrade metrics-reporter dependencies (CASSANDRA-12089)
 + * Tune compaction thread count via nodetool (CASSANDRA-12248)
 + * Add +=/-= shortcut syntax for update queries (CASSANDRA-12232)
 + * Include repair session IDs in repair start message (CASSANDRA-12532)
 + * Add a blocking task to Index, run before joining the ring (CASSANDRA-12039)
 + * Fix NPE when using CQLSSTableWriter (CASSANDRA-12667)
 + * Support optional backpressure strategies at the coordinator (CASSANDRA-9318)
 + * Make randompartitioner work with new vnode allocation (CASSANDRA-12647)
 + * Fix cassandra-stress graphing (CASSANDRA-12237)
 + * Allow filtering on partition key columns for queries without secondary indexes (CASSANDRA-11031)
 + * Fix Cassandra Stress reporting thread model and precision (CASSANDRA-12585)
 + * Add JMH benchmarks.jar (CASSANDRA-12586)
 + * Cleanup uses of AlterTableStatementColumn (CASSANDRA-12567)
 + * Add keep-alive to streaming (CASSANDRA-11841)
 + * Tracing payload is passed through newSession(..) (CASSANDRA-11706)
 + * avoid deleting non existing sstable files and improve related log messages (CASSANDRA-12261)
 + * json/yaml output format for nodetool compactionhistory (CASSANDRA-12486)
 + * Retry all internode messages once after a connection is
 +   closed and reopened (CASSANDRA-12192)
 + * Add support to rebuild from targeted replica (CASSANDRA-9875)
 + * Add sequence distribution type to cassandra stress (CASSANDRA-12490)
 + * "SELECT * FROM foo LIMIT ;" does not error out (CASSANDRA-12154)
 + * Define executeLocally() at the ReadQuery Level (CASSANDRA-12474)
 + * Extend read/write failure messages with a map of replica addresses
 +   to error codes in the v5 native protocol (CASSANDRA-12311)
 + * Fix rebuild of SASI indexes with existing index files (CASSANDRA-12374)
 + * Let DatabaseDescriptor not implicitly startup services (CASSANDRA-9054, 12550)
 + * Fix clustering indexes in presence of static columns in SASI (CASSANDRA-12378)
 + * Fix queries on columns with reversed type on SASI indexes (CASSANDRA-12223)
 + * Added slow query log (CASSANDRA-12403)
 + * Count full coordinated request against timeout (CASSANDRA-12256)
 + * Allow TTL with null value on insert and update (CASSANDRA-12216)
 + * Make decommission operation resumable (CASSANDRA-12008)
 + * Add support to one-way targeted repair (CASSANDRA-9876)
 + * Remove clientutil jar (CASSANDRA-11635)
 + * Fix compaction throughput throttle (CASSANDRA-12366, CASSANDRA-12717)
 + * Delay releasing Memtable memory on flush until PostFlush has finished running (CASSANDRA-12358)
 + * Cassandra stress should dump all setting on startup (CASSANDRA-11914)
 + * Make it possible to compact a given token range (CASSANDRA-10643)
 + * Allow updating DynamicEndpointSnitch properties via JMX (CASSANDRA-12179)
 + * Collect metrics on queries by consistency level (CASSANDRA-7384)
 + * Add support for GROUP BY to SELECT statement (CASSANDRA-10707)
 + * Deprecate memtable_cleanup_threshold and update default for memtable_flush_writers (CASSANDRA-12228)
 + * Upgrade to OHC 0.4.4 (CASSANDRA-12133)
 + * Add version command to cassandra-stress (CASSANDRA-12258)
 + * Create compaction-stress tool (CASSANDRA-11844)
 + * Garbage-collecting compaction operation and schema option (CASSANDRA-7019)
 + * Add beta protocol flag for v5 native protocol (CASSANDRA-12142)
 + * Support filtering on non-PRIMARY KEY columns in the CREATE
 +   MATERIALIZED VIEW statement's WHERE clause (CASSANDRA-10368)
 + * Unify STDOUT and SYSTEMLOG logback format (CASSANDRA-12004)
 + * COPY FROM should raise error for non-existing input files (CASSANDRA-12174)
 + * Faster write path (CASSANDRA-12269)
 + * Option to leave omitted columns in INSERT JSON unset (CASSANDRA-11424)
 + * Support json/yaml output in nodetool tpstats (CASSANDRA-12035)
 + * Expose metrics for successful/failed authentication attempts (CASSANDRA-10635)
 + * Prepend snapshot name with "truncated" or "dropped" when a snapshot
 +   is taken before truncating or dropping a table (CASSANDRA-12178)
 + * Optimize RestrictionSet (CASSANDRA-12153)
 + * cqlsh does not automatically downgrade CQL version (CASSANDRA-12150)
 + * Omit (de)serialization of state variable in UDAs (CASSANDRA-9613)
 + * Create a system table to expose prepared statements (CASSANDRA-8831)
 + * Reuse DataOutputBuffer from ColumnIndex (CASSANDRA-11970)
 + * Remove DatabaseDescriptor dependency from SegmentedFile (CASSANDRA-11580)
 + * Add supplied username to authentication error messages (CASSANDRA-12076)
 + * Remove pre-startup check for open JMX port (CASSANDRA-12074)
 + * Remove compaction Severity from DynamicEndpointSnitch (CASSANDRA-11738)
 + * Restore resumable hints delivery (CASSANDRA-11960)
 + * Properly report LWT contention (CASSANDRA-12626)
 +Merged from 3.0:
+  * Make sure sstables only get committed when it's safe to discard commit log records (CASSANDRA-12956)
   * Reject default_time_to_live option when creating or altering MVs (CASSANDRA-12868)
   * Nodetool should use a more sane max heap size (CASSANDRA-12739)
   * LocalToken ensures token values are cloned on heap (CASSANDRA-12651)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2f268eda/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index f46e6f7,113e10d..881fb00
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@@ -963,37 -920,49 +963,19 @@@ public class ColumnFamilyStore implemen
      * Both synchronises custom secondary indexes and provides ordering guarantees for futures on switchMemtable/flush
      * etc, which expect to be able to wait until the flush (and all prior flushes) requested have completed.
       */
 -    private final class PostFlush implements Callable<ReplayPosition>
 +    private final class PostFlush implements Callable<CommitLogPosition>
      {
--        final boolean flushSecondaryIndexes;
--        final OpOrder.Barrier writeBarrier;
 -        final CountDownLatch memtablesFlushLatch = new CountDownLatch(1);
 -        final CountDownLatch secondaryIndexFlushLatch = new CountDownLatch(1);
 -        volatile Throwable flushFailure = null;
 +        final CountDownLatch latch = new CountDownLatch(1);
-         volatile Throwable flushFailure = null;
          final List<Memtable> memtables;
++        volatile Throwable flushFailure = null;
  
-         private PostFlush(boolean flushSecondaryIndexes,
-                           OpOrder.Barrier writeBarrier,
 -        private PostFlush(boolean flushSecondaryIndexes, OpOrder.Barrier writeBarrier,
--                          List<Memtable> memtables)
++        private PostFlush(List<Memtable> memtables)
          {
--            this.writeBarrier = writeBarrier;
--            this.flushSecondaryIndexes = flushSecondaryIndexes;
              this.memtables = memtables;
          }
  
 -        public ReplayPosition call()
 +        public CommitLogPosition call()
          {
--            writeBarrier.await();
--
--            /**
--             * we can flush 2is as soon as the barrier completes, as they will be consistent with (or ahead of) the
--             * flushed memtables and CL position, which is as good as we can guarantee.
--             * TODO: SecondaryIndex should support setBarrier(), so custom implementations can co-ordinate exactly
--             * with CL as we do with memtables/CFS-backed SecondaryIndexes.
--             */
- 
-             if (flushSecondaryIndexes)
-                 indexManager.flushAllNonCFSBackedIndexesBlocking();
 -            try
 -            {
 -                if (flushSecondaryIndexes)
 -                {
 -                    indexManager.flushAllNonCFSBackedIndexesBlocking();
 -                }
 -            }
 -            catch (Throwable e)
 -            {
 -                flushFailure = merge(flushFailure, e);
 -            }
 -            finally
 -            {
 -                secondaryIndexFlushLatch.countDown();
 -            }
--
              try
              {
                 // we wait on the latch for the commitLogUpperBound to be set, and so that waiters
@@@ -1075,10 -1043,9 +1057,10 @@@
  
             // we then issue the barrier; this lets us wait for all operations started prior to the barrier to complete;
             // since this happens after wiring up the commitLogUpperBound, we also know all operations with earlier
 -            // replay positions have also completed, i.e. the memtables are done and ready to flush
 +            // commit log segment positions have also completed, i.e. the memtables are done and ready to flush
              writeBarrier.issue();
--            postFlush = new PostFlush(!truncate, writeBarrier, memtables);
++            postFlush = new PostFlush(memtables);
 +            postFlushTask = ListenableFutureTask.create(postFlush);
          }
  
          public void run()
@@@ -1096,19 -1063,21 +1078,21 @@@
  
              try
              {
--                for (Memtable memtable : memtables)
--                    flushMemtable(memtable);
++                // Flush "data" memtable with non-cf 2i first;
++                flushMemtable(memtables.get(0), true);
++                for (int i = 1; i < memtables.size(); i++)
++                    flushMemtable(memtables.get(i), false);
              }
 -            catch (Throwable e)
 +            catch (Throwable t)
              {
 -                JVMStabilityInspector.inspectThrowable(e);
 -                // If we weren't killed, try to continue work but do not allow CommitLog to be discarded.
 -                postFlush.flushFailure = e;
 +                JVMStabilityInspector.inspectThrowable(t);
 +                postFlush.flushFailure = t;
              }
 -
              // signal the post-flush we've done our work
 -            postFlush.memtablesFlushLatch.countDown();
 +            postFlush.latch.countDown();
          }
  
--        public Collection<SSTableReader> flushMemtable(Memtable memtable)
++        public Collection<SSTableReader> flushMemtable(Memtable memtable, boolean flushNonCf2i)
          {
              if (memtable.isClean() || truncate)
              {
@@@ -1117,93 -1086,28 +1101,102 @@@
                  return Collections.emptyList();
              }
  
 -            Collection<SSTableReader> readers = Collections.emptyList();
 -            try (SSTableTxnWriter writer = memtable.flush())
 +            List<Future<SSTableMultiWriter>> futures = new ArrayList<>();
 +            long totalBytesOnDisk = 0;
 +            long maxBytesOnDisk = 0;
 +            long minBytesOnDisk = Long.MAX_VALUE;
 +            List<SSTableReader> sstables = new ArrayList<>();
 +            try (LifecycleTransaction txn = LifecycleTransaction.offline(OperationType.FLUSH))
              {
 +                List<Memtable.FlushRunnable> flushRunnables = null;
 +                List<SSTableMultiWriter> flushResults = null;
 +
                  try
                  {
 -                    postFlush.secondaryIndexFlushLatch.await();
 +                    // flush the memtable
 +                    flushRunnables = memtable.flushRunnables(txn);
 +
 +                    for (int i = 0; i < flushRunnables.size(); i++)
 +                        futures.add(perDiskflushExecutors[i].submit(flushRunnables.get(i)));
 +
++                    /**
++                     * we can flush 2is as soon as the barrier completes, as they will be consistent with (or ahead of) the
++                     * flushed memtables and CL position, which is as good as we can guarantee.
++                     * TODO: SecondaryIndex should support setBarrier(), so custom implementations can co-ordinate exactly
++                     * with CL as we do with memtables/CFS-backed SecondaryIndexes.
++                     */
++                    if (flushNonCf2i)
++                        indexManager.flushAllNonCFSBackedIndexesBlocking();
++
 +                    flushResults = Lists.newArrayList(FBUtilities.waitOnFutures(futures));
                  }
 -                catch (InterruptedException e)
 +                catch (Throwable t)
                  {
 -                    postFlush.flushFailure = merge(postFlush.flushFailure, e);
 +                    t = memtable.abortRunnables(flushRunnables, t);
 +                    t = txn.abort(t);
 +                    throw Throwables.propagate(t);
                  }
  
 -                if (postFlush.flushFailure == null && writer.getFilePointer() > 0)
 -                    // sstables should contain non-repaired data.
 -                    readers = writer.finish(true);
 -                else
 -                    maybeFail(writer.abort(postFlush.flushFailure));
 -            }
 +                try
 +                {
 +                    Iterator<SSTableMultiWriter> writerIterator = flushResults.iterator();
 +                    while (writerIterator.hasNext())
 +                    {
 +                        @SuppressWarnings("resource")
 +                        SSTableMultiWriter writer = writerIterator.next();
 +                        if (writer.getFilePointer() > 0)
 +                        {
 +                            writer.setOpenResult(true).prepareToCommit();
 +                        }
 +                        else
 +                        {
 +                            maybeFail(writer.abort(null));
 +                            writerIterator.remove();
 +                        }
 +                    }
 +                }
 +                catch (Throwable t)
 +                {
 +                    for (SSTableMultiWriter writer : flushResults)
 +                        t = writer.abort(t);
 +                    t = txn.abort(t);
 +                    Throwables.propagate(t);
 +                }
 +
 +                txn.prepareToCommit();
 +
 +                Throwable accumulate = null;
 +                for (SSTableMultiWriter writer : flushResults)
 +                    accumulate = writer.commit(accumulate);
  
 -            memtable.cfs.replaceFlushed(memtable, readers);
 +                maybeFail(txn.commit(accumulate));
 +
 +                for (SSTableMultiWriter writer : flushResults)
 +                {
 +                    Collection<SSTableReader> flushedSSTables = writer.finished();
 +                    for (SSTableReader sstable : flushedSSTables)
 +                    {
 +                        if (sstable != null)
 +                        {
 +                            sstables.add(sstable);
 +                            long size = sstable.bytesOnDisk();
 +                            totalBytesOnDisk += size;
 +                            maxBytesOnDisk = Math.max(maxBytesOnDisk, size);
 +                            minBytesOnDisk = Math.min(minBytesOnDisk, size);
 +                        }
 +                    }
 +                }
 +            }
 +            memtable.cfs.replaceFlushed(memtable, sstables);
              reclaim(memtable);
 -            return readers;
 +            memtable.cfs.compactionStrategyManager.compactionLogger.flush(sstables);
 +            logger.debug("Flushed to {} ({} sstables, {}), biggest {}, smallest {}",
 +                         sstables,
 +                         sstables.size(),
 +                         FBUtilities.prettyPrintMemory(totalBytesOnDisk),
 +                         FBUtilities.prettyPrintMemory(maxBytesOnDisk),
 +                         FBUtilities.prettyPrintMemory(minBytesOnDisk));
 +            return sstables;
          }
  
          private void reclaim(final Memtable memtable)

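For readers following the ColumnFamilyStore.java hunks above: the merge moves the non-CFS-backed secondary index flush out of PostFlush and into the flush of the first memtable, so a failing 2i flush is recorded before PostFlush releases the commit log. Below is a minimal, self-contained sketch of that ordering, assuming nothing beyond the JDK; FlushOrderingSketch, FakeMemtable, and the constant returned as the commit-log position are illustrative stand-ins, not Cassandra classes.

import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;

// Illustrative model only (hypothetical names), not the Cassandra implementation.
class FlushOrderingSketch
{
    static class FakeMemtable
    {
        final String name;
        FakeMemtable(String name) { this.name = name; }
        void flush() { System.out.println("flushed " + name); }
    }

    static class PostFlush implements Callable<Long>
    {
        final CountDownLatch latch = new CountDownLatch(1);
        volatile Throwable flushFailure = null;

        public Long call() throws Exception
        {
            latch.await();                       // wait for the flush task to finish
            if (flushFailure != null)            // do not allow the commit log to be discarded
                throw new RuntimeException("flush failed", flushFailure);
            return 42L;                          // stand-in for the commit log position
        }
    }

    static void flushMemtable(FakeMemtable memtable, boolean flushNonCf2i)
    {
        memtable.flush();
        if (flushNonCf2i)                        // 2i flush now happens with the first memtable
            System.out.println("flushed non-CFS-backed secondary indexes");
    }

    public static void main(String[] args) throws Exception
    {
        List<FakeMemtable> memtables = List.of(new FakeMemtable("data"), new FakeMemtable("index"));
        PostFlush postFlush = new PostFlush();

        Runnable flush = () -> {
            try
            {
                flushMemtable(memtables.get(0), true);   // first memtable plus non-CFS 2i
                for (int i = 1; i < memtables.size(); i++)
                    flushMemtable(memtables.get(i), false);
            }
            catch (Throwable t)
            {
                postFlush.flushFailure = t;              // record the failure, never swallow it
            }
            postFlush.latch.countDown();                 // always signal PostFlush
        };

        new Thread(flush).start();
        System.out.println("commit log position: " + postFlush.call());
    }
}

In this sketch, if flushMemtable throws (as the BrokenCustom2I test below simulates), flushFailure is set before the latch is counted down, so PostFlush.call() surfaces the error instead of returning a position; that mirrors how the patch keeps sstables from being committed while it is unsafe to discard commit log records.
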
http://git-wip-us.apache.org/repos/asf/cassandra/blob/2f268eda/test/unit/org/apache/cassandra/index/CustomIndexTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/index/CustomIndexTest.java
index 8e1385e,b8e4185..4a43210
--- a/test/unit/org/apache/cassandra/index/CustomIndexTest.java
+++ b/test/unit/org/apache/cassandra/index/CustomIndexTest.java
@@@ -624,6 -624,6 +624,43 @@@ public class CustomIndexTest extends CQ
          assertEquals("bar", 
IndexWithOverloadedValidateOptions.options.get("foo"));
      }
  
++    @Test
++    public void testFailing2iFlush() throws Throwable
++    {
++        createTable("CREATE TABLE %s (pk int PRIMARY KEY, value int)");
++        createIndex("CREATE CUSTOM INDEX IF NOT EXISTS ON %s(value) USING 'org.apache.cassandra.index.CustomIndexTest$BrokenCustom2I'");
++
++        for (int i = 0; i < 10; i++)
++            execute("INSERT INTO %s (pk, value) VALUES (?, ?)", i, i);
++
++        try
++        {
++            getCurrentColumnFamilyStore().forceBlockingFlush();
++            fail("Exception should have been propagated");
++        }
++        catch (Throwable t)
++        {
++            assertTrue(t.getMessage().contains("Broken2I"));
++        }
++
++        // SSTables remain uncommitted.
++        assertEquals(1, getCurrentColumnFamilyStore().getDirectories().getDirectoryForNewSSTables().listFiles().length);
++    }
++
++    // Used for index creation above
++    public static class BrokenCustom2I extends StubIndex
++    {
++        public BrokenCustom2I(ColumnFamilyStore baseCfs, IndexMetadata metadata)
++        {
++            super(baseCfs, metadata);
++        }
++
++        public Callable<?> getBlockingFlushTask()
++        {
++            throw new RuntimeException("Broken2I");
++        }
++    }
++
     private void testCreateIndex(String indexName, String... targetColumnNames) throws Throwable
      {
         createIndex(String.format("CREATE CUSTOM INDEX %s ON %%s(%s) USING '%s'",
