Repository: cassandra
Updated Branches:
  refs/heads/trunk beaa2b1b9 -> 0cb5e8032


Added new task to Index which runs before joining

Patch by Sergio Bossa; reviewed by Sam Tunnicliffe for CASSANDRA-12039


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/0cb5e803
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/0cb5e803
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/0cb5e803

Branch: refs/heads/trunk
Commit: 0cb5e8032a2e12831064ab6a9600c235c599a33d
Parents: beaa2b1
Author: Sergio Bossa <sergio.bo...@gmail.com>
Authored: Fri Jun 24 18:09:11 2016 +0100
Committer: Sam Tunnicliffe <s...@beobal.com>
Committed: Tue Sep 20 18:56:45 2016 +0100

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 NEWS.txt                                        |  2 +
 src/java/org/apache/cassandra/index/Index.java  | 11 ++++
 .../cassandra/index/SecondaryIndexManager.java  | 11 ++++
 .../cassandra/service/StorageService.java       | 32 +++++++++---
 .../unit/org/apache/cassandra/SchemaLoader.java | 32 ++++++++++++
 .../org/apache/cassandra/index/StubIndex.java   |  9 ++++
 .../cassandra/service/JoinTokenRingTest.java    | 54 ++++++++++++++++++++
 8 files changed, 145 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/0cb5e803/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 74a2372..e847f8b 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 3.10
+ * Add a blocking task to Index, run before joining the ring (CASSANDRA-12039)
  * Fix NPE when using CQLSSTableWriter (CASSANDRA-12667)
  * Support optional backpressure strategies at the coordinator (CASSANDRA-9318)
  * Make randompartitioner work with new vnode allocation (CASSANDRA-12647)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0cb5e803/NEWS.txt
----------------------------------------------------------------------
diff --git a/NEWS.txt b/NEWS.txt
index 1b15f7d..708e839 100644
--- a/NEWS.txt
+++ b/NEWS.txt
@@ -18,6 +18,8 @@ using the provided 'sstableupgrade' tool.
 
 New features
 ------------
+   - An Index implementation may now provide a task which runs prior to joining
+     the ring. See CASSANDRA-12039
    - Filtering on partition key columns is now also supported for queries 
without
      secondary indexes.
    - A slow query log has been added: slow queries will be logged at DEBUG 
level.

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0cb5e803/src/java/org/apache/cassandra/index/Index.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/index/Index.java 
b/src/java/org/apache/cassandra/index/Index.java
index 4ffef1e..e254555 100644
--- a/src/java/org/apache/cassandra/index/Index.java
+++ b/src/java/org/apache/cassandra/index/Index.java
@@ -253,6 +253,17 @@ public interface Index
     public Callable<?> getTruncateTask(long truncatedAt);
 
     /**
+     * Return a task to be executed before the node enters NORMAL state and 
finally joins the ring.
+     *
+     * @param hadBootstrap If the node had bootstrap before joining.
+     * @return task to be executed by the index manager before joining the 
ring.
+     */
+    default public Callable<?> getPreJoinTask(boolean hadBootstrap)
+    {
+        return null;
+    }
+
+    /**
      * Return true if this index can be built or rebuilt when the index 
manager determines it is necessary. Returning
      * false enables the index implementation (or some other component) to 
control if and when SSTable data is
      * incorporated into the index.

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0cb5e803/src/java/org/apache/cassandra/index/SecondaryIndexManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/index/SecondaryIndexManager.java 
b/src/java/org/apache/cassandra/index/SecondaryIndexManager.java
index e06cab0..6e36511 100644
--- a/src/java/org/apache/cassandra/index/SecondaryIndexManager.java
+++ b/src/java/org/apache/cassandra/index/SecondaryIndexManager.java
@@ -493,6 +493,17 @@ public class SecondaryIndexManager implements IndexRegistry
     }
 
     /**
+     * Performs a blocking execution of pre-join tasks of all indexes
+     */
+    public void executePreJoinTasksBlocking(boolean hadBootstrap)
+    {
+        logger.info("Executing pre-join{} tasks for: {}", hadBootstrap ? " 
post-bootstrap" : "", this.baseCfs);
+        executeAllBlocking(indexes.values().stream(), (index) -> {
+            return index.getPreJoinTask(hadBootstrap);
+        });
+    }
+
+    /**
      * @return all indexes which are marked as built and ready to use
      */
     public List<String> getBuiltIndexNames()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0cb5e803/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageService.java 
b/src/java/org/apache/cassandra/service/StorageService.java
index b33ac0e..24b10ea 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -29,6 +29,8 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.regex.MatchResult;
 import java.util.regex.Pattern;
+import java.util.stream.StreamSupport;
+
 import javax.annotation.Nullable;
 import javax.management.*;
 import javax.management.openmbean.TabularData;
@@ -38,7 +40,9 @@ import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Predicate;
 import com.google.common.collect.*;
 import com.google.common.util.concurrent.*;
+
 import org.apache.commons.lang3.StringUtils;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -858,7 +862,8 @@ public class StorageService extends 
NotificationBroadcasterSupport implements IE
         }
 
         boolean dataAvailable = true; // make this to false when bootstrap 
streaming failed
-        if (shouldBootstrap())
+        boolean bootstrap = shouldBootstrap();
+        if (bootstrap)
         {
             if (SystemKeyspace.bootstrapInProgress())
                 logger.warn("Detected previous bootstrap failure; retrying");
@@ -1000,8 +1005,7 @@ public class StorageService extends 
NotificationBroadcasterSupport implements IE
         {
             if (dataAvailable)
             {
-                finishJoiningRing();
-
+                finishJoiningRing(bootstrap);
                 // remove the existing info about the replaced node.
                 if (!current.isEmpty())
                 {
@@ -1034,7 +1038,13 @@ public class StorageService extends 
NotificationBroadcasterSupport implements IE
         Gossiper.instance.addLocalApplicationState(ApplicationState.RACK, 
StorageService.instance.valueFactory.rack(rack));
     }
 
-    public synchronized void joinRing() throws IOException
+    public void joinRing() throws IOException
+    {
+        SystemKeyspace.BootstrapState state = 
SystemKeyspace.getBootstrapState();
+        joinRing(state.equals(SystemKeyspace.BootstrapState.IN_PROGRESS));
+    }
+
+    private synchronized void joinRing(boolean resumedBootstrap) throws 
IOException
     {
         if (!joined)
         {
@@ -1051,15 +1061,23 @@ public class StorageService extends 
NotificationBroadcasterSupport implements IE
         else if (isSurveyMode)
         {
             logger.info("Leaving write survey mode and joining ring at 
operator request");
-            finishJoiningRing();
+            finishJoiningRing(resumedBootstrap);
             isSurveyMode = false;
         }
     }
 
-    private void finishJoiningRing()
+    private void executePreJoinTasks(boolean bootstrap)
+    {
+        StreamSupport.stream(ColumnFamilyStore.all().spliterator(), false)
+                .filter(cfs -> 
Schema.instance.getUserKeyspaces().contains(cfs.keyspace.getName()))
+                .forEach(cfs -> 
cfs.indexManager.executePreJoinTasksBlocking(bootstrap));
+    }
+
+    private void finishJoiningRing(boolean didBootstrap)
     {
         // start participating in the ring.
         
SystemKeyspace.setBootstrapState(SystemKeyspace.BootstrapState.COMPLETED);
+        executePreJoinTasks(didBootstrap);
         setTokens(bootstrapTokens);
 
         assert tokenMetadata.sortedTokens().size() > 0;
@@ -1506,7 +1524,7 @@ public class StorageService extends 
NotificationBroadcasterSupport implements IE
                     try
                     {
                         progressSupport.progress("bootstrap", 
ProgressEvent.createNotification("Joining ring..."));
-                        joinRing();
+                        joinRing(true);
                     }
                     catch (IOException ignore)
                     {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0cb5e803/test/unit/org/apache/cassandra/SchemaLoader.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/SchemaLoader.java 
b/test/unit/org/apache/cassandra/SchemaLoader.java
index c178ee0..d9c322f 100644
--- a/test/unit/org/apache/cassandra/SchemaLoader.java
+++ b/test/unit/org/apache/cassandra/SchemaLoader.java
@@ -88,6 +88,7 @@ public class SchemaLoader
         String ks4 = testName + "Keyspace4";
         String ks5 = testName + "Keyspace5";
         String ks6 = testName + "Keyspace6";
+        String ks7 = testName + "Keyspace7";
         String ks_kcs = testName + "KeyCacheSpace";
         String ks_rcs = testName + "RowCacheSpace";
         String ks_ccs = testName + "CounterCacheSpace";
@@ -191,11 +192,17 @@ public class SchemaLoader
         schema.add(KeyspaceMetadata.create(ks5,
                 KeyspaceParams.simple(2),
                 Tables.of(standardCFMD(ks5, "Standard1"))));
+
         // Keyspace 6
         schema.add(KeyspaceMetadata.create(ks6,
                 KeyspaceParams.simple(1),
                 Tables.of(keysIndexCFMD(ks6, "Indexed1", true))));
 
+        // Keyspace 7
+        schema.add(KeyspaceMetadata.create(ks7,
+                KeyspaceParams.simple(1),
+                Tables.of(customIndexCFMD(ks7, "Indexed1"))));
+
         // KeyCacheSpace
         schema.add(KeyspaceMetadata.create(ks_kcs,
                 KeyspaceParams.simple(1),
@@ -455,6 +462,7 @@ public class SchemaLoader
 
         return cfm.compression(getCompressionParameters());
     }
+
     public static CFMetaData keysIndexCFMD(String ksName, String cfName, 
boolean withIndex) throws ConfigurationException
     {
         CFMetaData cfm = CFMetaData.Builder.createDense(ksName, cfName, false, 
false)
@@ -480,6 +488,30 @@ public class SchemaLoader
         return cfm.compression(getCompressionParameters());
     }
 
+    public static CFMetaData customIndexCFMD(String ksName, String cfName) 
throws ConfigurationException
+    {
+        CFMetaData cfm = CFMetaData.Builder.createDense(ksName, cfName, false, 
false)
+                                           .addPartitionKey("key", 
AsciiType.instance)
+                                           .addClusteringColumn("c1", 
AsciiType.instance)
+                                           .addRegularColumn("value", 
LongType.instance)
+                                           .build();
+
+            cfm.indexes(
+                cfm.getIndexes()
+                .with(IndexMetadata.fromIndexTargets(cfm,
+                                                     Collections.singletonList(
+                                                             new 
IndexTarget(new ColumnIdentifier("value", true),
+                                                                             
IndexTarget.Type.VALUES)),
+                                                     "value_index",
+                                                     IndexMetadata.Kind.CUSTOM,
+                                                     Collections.singletonMap(
+                                                             
IndexTarget.CUSTOM_INDEX_OPTION_NAME,
+                                                             
StubIndex.class.getName()))));
+
+
+        return cfm.compression(getCompressionParameters());
+    }
+
     public static CFMetaData jdbcCFMD(String ksName, String cfName, 
AbstractType comp)
     {
         return CFMetaData.Builder.create(ksName, 
cfName).addPartitionKey("key", BytesType.instance)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0cb5e803/test/unit/org/apache/cassandra/index/StubIndex.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/index/StubIndex.java 
b/test/unit/org/apache/cassandra/index/StubIndex.java
index 0b7b32f..92efee5 100644
--- a/test/unit/org/apache/cassandra/index/StubIndex.java
+++ b/test/unit/org/apache/cassandra/index/StubIndex.java
@@ -51,6 +51,7 @@ public class StubIndex implements Index
     public List<Row> rowsInserted = new ArrayList<>();
     public List<Row> rowsDeleted = new ArrayList<>();
     public List<Pair<Row,Row>> rowsUpdated = new ArrayList<>();
+    public volatile boolean preJoinInvocation;
     private IndexMetadata indexMetadata;
     private ColumnFamilyStore baseCfs;
 
@@ -171,6 +172,14 @@ public class StubIndex implements Index
         return null;
     }
 
+    public Callable<?> getPreJoinTask(boolean hadBootstrap)
+    {
+        return () -> {
+            preJoinInvocation = true;
+            return null;
+        };
+    }
+
     public Callable<?> getInvalidateTask()
     {
         return null;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0cb5e803/test/unit/org/apache/cassandra/service/JoinTokenRingTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/service/JoinTokenRingTest.java 
b/test/unit/org/apache/cassandra/service/JoinTokenRingTest.java
new file mode 100644
index 0000000..866910e
--- /dev/null
+++ b/test/unit/org/apache/cassandra/service/JoinTokenRingTest.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.service;
+
+import java.io.IOException;
+
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import org.apache.cassandra.SchemaLoader;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.exceptions.ConfigurationException;
+import org.apache.cassandra.index.SecondaryIndexManager;
+import org.apache.cassandra.index.StubIndex;
+
+public class JoinTokenRingTest
+{
+    @BeforeClass
+    public static void setup() throws ConfigurationException
+    {
+        DatabaseDescriptor.daemonInitialization();
+        SchemaLoader.startGossiper();
+        SchemaLoader.prepareServer();
+        SchemaLoader.schemaDefinition("JoinTokenRingTest");
+    }
+
+    @Test
+    public void testIndexPreJoinInvocation() throws IOException
+    {
+        StorageService ss = StorageService.instance;
+        ss.joinRing();
+
+        SecondaryIndexManager indexManager = 
ColumnFamilyStore.getIfExists("JoinTokenRingTestKeyspace7", 
"Indexed1").indexManager;
+        StubIndex stub = (StubIndex) 
indexManager.getIndexByName("value_index");
+        Assert.assertTrue(stub.preJoinInvocation);
+    }
+}

Reply via email to