Merge branch 'cassandra-2.2' into cassandra-3.0

Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/695065e2
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/695065e2
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/695065e2

Branch: refs/heads/trunk
Commit: 695065e27a16c30019f34fc4c626a1841616d037
Parents: 45d0176 be6e6ea
Author: Stefania Alborghetti <stefania.alborghe...@datastax.com>
Authored: Fri Oct 7 16:51:10 2016 +0800
Committer: Stefania Alborghetti <stefania.alborghe...@datastax.com>
Committed: Fri Oct 7 16:52:01 2016 +0800

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../DebuggableScheduledThreadPoolExecutor.java  |   2 +-
 .../concurrent/ScheduledExecutors.java          |   4 -
 .../db/compaction/CompactionManager.java        | 127 +++++++++++--------
 .../db/lifecycle/LifecycleTransaction.java      |  14 +-
 .../io/sstable/format/SSTableReader.java        |  15 ++-
 .../apache/cassandra/net/MessagingService.java  |   3 +-
 .../cassandra/service/StorageService.java       |  12 +-
 .../org/apache/cassandra/utils/ExpiringMap.java |   4 +-
 9 files changed, 116 insertions(+), 66 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/695065e2/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index 827a208,54425fa..894113a
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,19 -1,10 +1,20 @@@
 -2.2.9
 +3.0.10
 + * Make stress use client mode to avoid checking commit log size on startup 
(CASSANDRA-12478)
 + * Fix exceptions with new vnode allocation (CASSANDRA-12715)
 + * Unify drain and shutdown processes (CASSANDRA-12509)
 + * Fix NPE in ComponentOfSlice.isEQ() (CASSANDRA-12706)
 + * Fix failure in LogTransactionTest (CASSANDRA-12632)
 + * Fix potentially incomplete non-frozen UDT values when querying with the
 +   full primary key specified (CASSANDRA-12605)
 + * Skip writing MV mutations to commitlog on mutation.applyUnsafe() 
(CASSANDRA-11670)
 + * Establish consistent distinction between non-existing partition and NULL 
value for LWTs on static columns (CASSANDRA-12060)
 + * Extend ColumnIdentifier.internedInstances key to include the type that 
generated the byte buffer (CASSANDRA-12516)
 + * Backport CASSANDRA-10756 (race condition in NativeTransportService 
shutdown) (CASSANDRA-12472)
 + * If CF has no clustering columns, any row cache is full partition cache 
(CASSANDRA-12499)
 +Merged from 2.2:
+  * Fix leak errors and execution rejected exceptions when draining 
(CASSANDRA-12457)
   * Fix merkle tree depth calculation (CASSANDRA-12580)
   * Make Collections deserialization more robust (CASSANDRA-12618)
 - 
 - 
 -2.2.8
   * Fix exceptions when enabling gossip on nodes that haven't joined the ring 
(CASSANDRA-12253)
   * Fix authentication problem when invoking clqsh copy from a SOURCE command 
(CASSANDRA-12642)
   * Decrement pending range calculator jobs counter in finally block

http://git-wip-us.apache.org/repos/asf/cassandra/blob/695065e2/src/java/org/apache/cassandra/concurrent/DebuggableScheduledThreadPoolExecutor.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/695065e2/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/compaction/CompactionManager.java
index 4d1757e,626bd27..478b896
--- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
@@@ -171,17 -164,15 +171,15 @@@ public class CompactionManager implemen
          logger.trace("Scheduling a background task check for {}.{} with {}",
                       cfs.keyspace.getName(),
                       cfs.name,
 -                     cfs.getCompactionStrategy().getName());
 +                     cfs.getCompactionStrategyManager().getName());
-         List<Future<?>> futures = new ArrayList<>();
-         // we must schedule it at least once, otherwise compaction will stop 
for a CF until next flush
-         if (executor.isShutdown())
+ 
+         List<Future<?>> futures = new ArrayList<>(1);
+         Future<?> fut = executor.submitIfRunning(new 
BackgroundCompactionCandidate(cfs), "background task");
+         if (!fut.isCancelled())
          {
-             logger.info("Executor has shut down, not submitting background 
task");
-             return Collections.emptyList();
+             compactingCF.add(cfs);
+             futures.add(fut);
          }
-         compactingCF.add(cfs);
-         futures.add(executor.submit(new BackgroundCompactionCandidate(cfs)));
- 
          return futures;
      }
  

http://git-wip-us.apache.org/repos/asf/cassandra/blob/695065e2/src/java/org/apache/cassandra/db/lifecycle/LifecycleTransaction.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/lifecycle/LifecycleTransaction.java
index 91515aa,a95c4a8..582c9d8
--- a/src/java/org/apache/cassandra/db/lifecycle/LifecycleTransaction.java
+++ b/src/java/org/apache/cassandra/db/lifecycle/LifecycleTransaction.java
@@@ -200,14 -155,10 +206,16 @@@ public class LifecycleTransaction exten
      public Throwable doCommit(Throwable accumulate)
      {
          assert staged.isEmpty() : "must be no actions introduced between 
prepareToCommit and a commit";
-         logger.trace("Committing update:{}, obsolete:{}", staged.update, 
staged.obsolete);
+ 
+         if (logger.isTraceEnabled())
+             logger.trace("Committing transaction over {} staged: {}, logged: 
{}", originals, staged, logged);
  
 +        // accumulate must be null if we have been used correctly, so fail 
immediately if it is not
 +        maybeFail(accumulate);
 +
 +        // transaction log commit failure means we must abort; safe commit is 
not possible
 +        maybeFail(log.commit(null));
 +
          // this is now the point of no return; we cannot safely rollback, so 
we ignore exceptions until we're done
          // we restore state by obsoleting our obsolete files, releasing our 
references to them, and updating our size
          // and notification status for the obsolete and new files
@@@ -226,12 -175,10 +234,12 @@@
      public Throwable doAbort(Throwable accumulate)
      {
          if (logger.isTraceEnabled())
-             logger.trace("Aborting transaction over {}, with ({},{}) logged 
and ({},{}) staged", originals, logged.update, logged.obsolete, staged.update, 
staged.obsolete);
+             logger.trace("Aborting transaction over {} staged: {}, logged: 
{}", originals, staged, logged);
  
 +        accumulate = abortObsoletion(obsoletions, accumulate);
 +
          if (logged.isEmpty() && staged.isEmpty())
 -            return accumulate;
 +            return log.abort(accumulate);
  
          // mark obsolete all readers that are not versions of those present 
in the original set
          Iterable<SSTableReader> obsolete = 
filterOut(concatUniq(staged.update, logged.update), originals);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/695065e2/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
index 9f2663e,fddf058..9f31af1
--- a/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
@@@ -2049,12 -2028,16 +2049,12 @@@ public abstract class SSTableReader ext
          private Runnable runOnClose;
          private boolean isReplaced = false;
  
 -        // a reference to our shared per-Descriptor.Type tidy instance, that
 +        // a reference to our shared tidy instance, that
          // we will release when we are ourselves released
 -        private Ref<DescriptorTypeTidy> typeRef;
 -
 -        // a convenience stashing of the shared per-descriptor-type tidy 
instance itself
 -        // and the per-logical-sstable globally shared state that it is 
linked to
 -        private DescriptorTypeTidy type;
 +        private Ref<GlobalTidy> globalRef;
          private GlobalTidy global;
  
-         private boolean setup;
+         private volatile boolean setup;
  
          void setup(SSTableReader reader, boolean trackHotness)
          {
@@@ -2108,7 -2102,10 +2118,10 @@@
                          dfile.close();
                      if (ifile != null)
                          ifile.close();
 -                    typeRef.release();
 +                    globalRef.release();
+ 
+                     if (logger.isTraceEnabled())
+                         logger.trace("Async instance tidier for {}, 
completed", descriptor);
                  }
              });
          }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/695065e2/src/java/org/apache/cassandra/net/MessagingService.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/695065e2/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/service/StorageService.java
index 0be5d92,db86294..24ad73d
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@@ -4013,143 -3933,88 +4013,149 @@@ public class StorageService extends Not
       */
      public synchronized void drain() throws IOException, 
InterruptedException, ExecutionException
      {
 -        inShutdownHook = true;
 +        drain(false);
 +    }
  
 +    protected synchronized void drain(boolean isFinalShutdown) throws 
IOException, InterruptedException, ExecutionException
 +    {
          ExecutorService counterMutationStage = 
StageManager.getStage(Stage.COUNTER_MUTATION);
 +        ExecutorService viewMutationStage = 
StageManager.getStage(Stage.VIEW_MUTATION);
          ExecutorService mutationStage = StageManager.getStage(Stage.MUTATION);
 -        if (mutationStage.isTerminated() && 
counterMutationStage.isTerminated())
 +
 +        if (mutationStage.isTerminated()
 +            && counterMutationStage.isTerminated()
 +            && viewMutationStage.isTerminated())
          {
 -            logger.warn("Cannot drain node (did it already happen?)");
 +            if (!isFinalShutdown)
 +                logger.warn("Cannot drain node (did it already happen?)");
              return;
          }
 -        setMode(Mode.DRAINING, "starting drain process", true);
 -        shutdownClientServers();
 -        ScheduledExecutors.optionalTasks.shutdown();
 -        Gossiper.instance.stop();
  
 -        setMode(Mode.DRAINING, "shutting down MessageService", false);
 -        MessagingService.instance().shutdown();
 +        assert !isShutdown;
 +        isShutdown = true;
  
 -        setMode(Mode.DRAINING, "clearing mutation stage", false);
 -        counterMutationStage.shutdown();
 -        mutationStage.shutdown();
 -        counterMutationStage.awaitTermination(3600, TimeUnit.SECONDS);
 -        mutationStage.awaitTermination(3600, TimeUnit.SECONDS);
 +        try
 +        {
 +            setMode(Mode.DRAINING, "starting drain process", 
!isFinalShutdown);
  
 -        StorageProxy.instance.verifyNoHintsInProgress();
 +            BatchlogManager.instance.shutdown();
 +            HintsService.instance.pauseDispatch();
  
 -        setMode(Mode.DRAINING, "flushing column families", false);
 -        // count CFs first, since forceFlush could block for the flushWriter 
to get a queue slot empty
 -        totalCFs = 0;
 -        for (Keyspace keyspace : Keyspace.nonSystem())
 -            totalCFs += keyspace.getColumnFamilyStores().size();
 -        remainingCFs = totalCFs;
 -        // flush
 -        List<Future<?>> flushes = new ArrayList<>();
 -        for (Keyspace keyspace : Keyspace.nonSystem())
 -        {
 -            for (ColumnFamilyStore cfs : keyspace.getColumnFamilyStores())
 -                flushes.add(cfs.forceFlush());
 -        }
 -        // wait for the flushes.
 -        // TODO this is a godawful way to track progress, since they flush in 
parallel.  a long one could
 -        // thus make several short ones "instant" if we wait for them later.
 -        for (Future f : flushes)
 -        {
 -            FBUtilities.waitOnFuture(f);
 -            remainingCFs--;
 -        }
 +            if (daemon != null)
 +                shutdownClientServers();
 +            ScheduledExecutors.optionalTasks.shutdown();
 +            Gossiper.instance.stop();
  
 -        BatchlogManager.shutdown();
 +            if (!isFinalShutdown)
 +                setMode(Mode.DRAINING, "shutting down MessageService", false);
 +
 +            // In-progress writes originating here could generate hints to be 
written, so shut down MessagingService
 +            // before mutation stage, so we can get all the hints saved 
before shutting down
 +            MessagingService.instance().shutdown();
 +
 +            if (!isFinalShutdown)
 +                setMode(Mode.DRAINING, "clearing mutation stage", false);
 +            viewMutationStage.shutdown();
 +            counterMutationStage.shutdown();
 +            mutationStage.shutdown();
 +            viewMutationStage.awaitTermination(3600, TimeUnit.SECONDS);
 +            counterMutationStage.awaitTermination(3600, TimeUnit.SECONDS);
 +            mutationStage.awaitTermination(3600, TimeUnit.SECONDS);
 +
 +            StorageProxy.instance.verifyNoHintsInProgress();
 +
 +            if (!isFinalShutdown)
 +                setMode(Mode.DRAINING, "flushing column families", false);
 +
 +            // disable autocompaction - we don't want to start any new 
compactions while we are draining
 +            for (Keyspace keyspace : Keyspace.all())
 +                for (ColumnFamilyStore cfs : keyspace.getColumnFamilyStores())
 +                    cfs.disableAutoCompaction();
 +
 +            // count CFs first, since forceFlush could block for the 
flushWriter to get a queue slot empty
 +            totalCFs = 0;
 +            for (Keyspace keyspace : Keyspace.nonSystem())
 +                totalCFs += keyspace.getColumnFamilyStores().size();
 +            remainingCFs = totalCFs;
 +            // flush
 +            List<Future<?>> flushes = new ArrayList<>();
 +            for (Keyspace keyspace : Keyspace.nonSystem())
 +            {
 +                for (ColumnFamilyStore cfs : keyspace.getColumnFamilyStores())
 +                    flushes.add(cfs.forceFlush());
 +            }
 +            // wait for the flushes.
 +            // TODO this is a godawful way to track progress, since they 
flush in parallel.  a long one could
 +            // thus make several short ones "instant" if we wait for them 
later.
 +            for (Future f : flushes)
 +            {
 +                try
 +                {
 +                    FBUtilities.waitOnFuture(f);
 +                }
 +                catch (Throwable t)
 +                {
 +                    JVMStabilityInspector.inspectThrowable(t);
 +                    // don't let this stop us from shutting down the 
commitlog and other thread pools
 +                    logger.warn("Caught exception while waiting for memtable 
flushes during shutdown hook", t);
 +                }
  
 -        // Interrupt on going compaction and shutdown to prevent further 
compaction
 -        CompactionManager.instance.forceShutdown();
 +                remainingCFs--;
 +            }
-             // flush the system ones after all the rest are done, just in 
case flushing modifies any system state
-             // like CASSANDRA-5151. don't bother with progress tracking since 
system data is tiny.
+ 
 -        // Flush the system tables after all other tables are flushed, just 
in case flushing modifies any system state
 -        // like CASSANDRA-5151. Don't bother with progress tracking since 
system data is tiny.
 -        // Flush system tables after stopping the batchlog manager and 
compactions since they both modify
 -        // system tables (for example compactions can obsolete sstables and 
the tidiers in SSTableReader update
 -        // system tables, see SSTableReader.GlobalTidy)
 -        flushes.clear();
 -        for (Keyspace keyspace : Keyspace.system())
 -        {
 -            for (ColumnFamilyStore cfs : keyspace.getColumnFamilyStores())
 -                flushes.add(cfs.forceFlush());
 -        }
 -        FBUtilities.waitOnFutures(flushes);
++            // Interrupt ongoing compactions and shutdown CM to prevent 
further compactions.
++            CompactionManager.instance.forceShutdown();
++            // Flush the system tables after all other tables are flushed, 
just in case flushing modifies any system state
++            // like CASSANDRA-5151. Don't bother with progress tracking since 
system data is tiny.
++            // Flush system tables after stopping compactions since they 
modify
++            // system tables (for example compactions can obsolete sstables 
and the tidiers in SSTableReader update
++            // system tables, see SSTableReader.GlobalTidy)
 +            flushes.clear();
 +            for (Keyspace keyspace : Keyspace.system())
 +            {
 +                for (ColumnFamilyStore cfs : keyspace.getColumnFamilyStores())
 +                    flushes.add(cfs.forceFlush());
 +            }
 +            FBUtilities.waitOnFutures(flushes);
 +
 +            HintsService.instance.shutdownBlocking();
  
 -        // whilst we've flushed all the CFs, which will have recycled all 
completed segments, we want to ensure
 -        // there are no segments to replay, so we force the recycling of any 
remaining (should be at most one)
 -        CommitLog.instance.forceRecycleAllSegments();
 +            // Interrupt ongoing compactions and shutdown CM to prevent 
further compactions.
 +            CompactionManager.instance.forceShutdown();
  
 -        CommitLog.instance.shutdownBlocking();
 +            // whilst we've flushed all the CFs, which will have recycled all 
completed segments, we want to ensure
 +            // there are no segments to replay, so we force the recycling of 
any remaining (should be at most one)
 +            CommitLog.instance.forceRecycleAllSegments();
  
 -        // wait for miscellaneous tasks like sstable and commitlog segment 
deletion
 -        ScheduledExecutors.nonPeriodicTasks.shutdown();
 -        if (!ScheduledExecutors.nonPeriodicTasks.awaitTermination(1, 
TimeUnit.MINUTES))
 -            logger.warn("Failed to wait for non periodic tasks to shutdown");
 +            CommitLog.instance.shutdownBlocking();
  
 -        ColumnFamilyStore.shutdownPostFlushExecutor();
 +            // wait for miscellaneous tasks like sstable and commitlog 
segment deletion
 +            ScheduledExecutors.nonPeriodicTasks.shutdown();
 +            if (!ScheduledExecutors.nonPeriodicTasks.awaitTermination(1, 
TimeUnit.MINUTES))
-                 logger.warn("Miscellaneous task executor still busy after one 
minute; proceeding with shutdown");
++                logger.warn("Failed to wait for non periodic tasks to 
shutdown");
  
 -        setMode(Mode.DRAINED, true);
 +            ColumnFamilyStore.shutdownPostFlushExecutor();
 +            setMode(Mode.DRAINED, !isFinalShutdown);
 +        }
 +        catch (Throwable t)
 +        {
 +            logger.error("Caught an exception while draining ", t);
 +        }
 +    }
 +
 +    /**
 +     * Some services are shutdown during draining and we should not attempt 
to start them again.
 +     *
 +     * @param service - the name of the service we are trying to start.
 +     * @throws IllegalStateException - an exception that nodetool is able to 
convert into a message to display to the user
 +     */
 +    synchronized void checkServiceAllowedToStart(String service)
 +    {
 +        if (isDraining()) // when draining isShutdown is also true, so we 
check first to return a more accurate message
 +            throw new IllegalStateException(String.format("Unable to start %s 
because the node is draining.", service));
 +
 +        if (isShutdown()) // do not rely on operationMode in case it gets 
changed to decomissioned or other
 +            throw new IllegalStateException(String.format("Unable to start %s 
because the node was drained.", service));
      }
  
      // Never ever do this at home. Used by tests.

Reply via email to