Repository: cassandra Updated Branches: refs/heads/trunk beaa2b1b9 -> 0cb5e8032
Added new task to Index which runs before joining. Patch by Sergio Bossa; reviewed by Sam Tunnicliffe for CASSANDRA-12039. Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/0cb5e803 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/0cb5e803 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/0cb5e803 Branch: refs/heads/trunk Commit: 0cb5e8032a2e12831064ab6a9600c235c599a33d Parents: beaa2b1 Author: Sergio Bossa <[email protected]> Authored: Fri Jun 24 18:09:11 2016 +0100 Committer: Sam Tunnicliffe <[email protected]> Committed: Tue Sep 20 18:56:45 2016 +0100 ---------------------------------------------------------------------- CHANGES.txt | 1 + NEWS.txt | 2 + src/java/org/apache/cassandra/index/Index.java | 11 ++++ .../cassandra/index/SecondaryIndexManager.java | 11 ++++ .../cassandra/service/StorageService.java | 32 +++++++++--- .../unit/org/apache/cassandra/SchemaLoader.java | 32 ++++++++++++ .../org/apache/cassandra/index/StubIndex.java | 9 ++++ .../cassandra/service/JoinTokenRingTest.java | 54 ++++++++++++++++++++ 8 files changed, 145 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/0cb5e803/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 74a2372..e847f8b 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 3.10 + * Add a blocking task to Index, run before joining the ring (CASSANDRA-12039) * Fix NPE when using CQLSSTableWriter (CASSANDRA-12667) * Support optional backpressure strategies at the coordinator (CASSANDRA-9318) * Make randompartitioner work with new vnode allocation (CASSANDRA-12647) http://git-wip-us.apache.org/repos/asf/cassandra/blob/0cb5e803/NEWS.txt ---------------------------------------------------------------------- diff --git a/NEWS.txt b/NEWS.txt index 
1b15f7d..708e839 100644 --- a/NEWS.txt +++ b/NEWS.txt @@ -18,6 +18,8 @@ using the provided 'sstableupgrade' tool. New features ------------ + - An Index implementation may now provide a task which runs prior to joining + the ring. See CASSANDRA-12039 - Filtering on partition key columns is now also supported for queries without secondary indexes. - A slow query log has been added: slow queries will be logged at DEBUG level. http://git-wip-us.apache.org/repos/asf/cassandra/blob/0cb5e803/src/java/org/apache/cassandra/index/Index.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/index/Index.java b/src/java/org/apache/cassandra/index/Index.java index 4ffef1e..e254555 100644 --- a/src/java/org/apache/cassandra/index/Index.java +++ b/src/java/org/apache/cassandra/index/Index.java @@ -253,6 +253,17 @@ public interface Index public Callable<?> getTruncateTask(long truncatedAt); /** + * Return a task to be executed before the node enters NORMAL state and finally joins the ring. + * + * @param hadBootstrap If the node had bootstrap before joining. + * @return task to be executed by the index manager before joining the ring. + */ + default public Callable<?> getPreJoinTask(boolean hadBootstrap) + { + return null; + } + + /** * Return true if this index can be built or rebuilt when the index manager determines it is necessary. Returning * false enables the index implementation (or some other component) to control if and when SSTable data is * incorporated into the index. 
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0cb5e803/src/java/org/apache/cassandra/index/SecondaryIndexManager.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/index/SecondaryIndexManager.java b/src/java/org/apache/cassandra/index/SecondaryIndexManager.java index e06cab0..6e36511 100644 --- a/src/java/org/apache/cassandra/index/SecondaryIndexManager.java +++ b/src/java/org/apache/cassandra/index/SecondaryIndexManager.java @@ -493,6 +493,17 @@ public class SecondaryIndexManager implements IndexRegistry } /** + * Performs a blocking execution of pre-join tasks of all indexes + */ + public void executePreJoinTasksBlocking(boolean hadBootstrap) + { + logger.info("Executing pre-join{} tasks for: {}", hadBootstrap ? " post-bootstrap" : "", this.baseCfs); + executeAllBlocking(indexes.values().stream(), (index) -> { + return index.getPreJoinTask(hadBootstrap); + }); + } + + /** * @return all indexes which are marked as built and ready to use */ public List<String> getBuiltIndexNames() http://git-wip-us.apache.org/repos/asf/cassandra/blob/0cb5e803/src/java/org/apache/cassandra/service/StorageService.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java index b33ac0e..24b10ea 100644 --- a/src/java/org/apache/cassandra/service/StorageService.java +++ b/src/java/org/apache/cassandra/service/StorageService.java @@ -29,6 +29,8 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.regex.MatchResult; import java.util.regex.Pattern; +import java.util.stream.StreamSupport; + import javax.annotation.Nullable; import javax.management.*; import javax.management.openmbean.TabularData; @@ -38,7 +40,9 @@ import com.google.common.annotations.VisibleForTesting; import 
com.google.common.base.Predicate; import com.google.common.collect.*; import com.google.common.util.concurrent.*; + import org.apache.commons.lang3.StringUtils; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -858,7 +862,8 @@ public class StorageService extends NotificationBroadcasterSupport implements IE } boolean dataAvailable = true; // make this to false when bootstrap streaming failed - if (shouldBootstrap()) + boolean bootstrap = shouldBootstrap(); + if (bootstrap) { if (SystemKeyspace.bootstrapInProgress()) logger.warn("Detected previous bootstrap failure; retrying"); @@ -1000,8 +1005,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE { if (dataAvailable) { - finishJoiningRing(); - + finishJoiningRing(bootstrap); // remove the existing info about the replaced node. if (!current.isEmpty()) { @@ -1034,7 +1038,13 @@ public class StorageService extends NotificationBroadcasterSupport implements IE Gossiper.instance.addLocalApplicationState(ApplicationState.RACK, StorageService.instance.valueFactory.rack(rack)); } - public synchronized void joinRing() throws IOException + public void joinRing() throws IOException + { + SystemKeyspace.BootstrapState state = SystemKeyspace.getBootstrapState(); + joinRing(state.equals(SystemKeyspace.BootstrapState.IN_PROGRESS)); + } + + private synchronized void joinRing(boolean resumedBootstrap) throws IOException { if (!joined) { @@ -1051,15 +1061,23 @@ public class StorageService extends NotificationBroadcasterSupport implements IE else if (isSurveyMode) { logger.info("Leaving write survey mode and joining ring at operator request"); - finishJoiningRing(); + finishJoiningRing(resumedBootstrap); isSurveyMode = false; } } - private void finishJoiningRing() + private void executePreJoinTasks(boolean bootstrap) + { + StreamSupport.stream(ColumnFamilyStore.all().spliterator(), false) + .filter(cfs -> Schema.instance.getUserKeyspaces().contains(cfs.keyspace.getName())) + .forEach(cfs -> 
cfs.indexManager.executePreJoinTasksBlocking(bootstrap)); + } + + private void finishJoiningRing(boolean didBootstrap) { // start participating in the ring. SystemKeyspace.setBootstrapState(SystemKeyspace.BootstrapState.COMPLETED); + executePreJoinTasks(didBootstrap); setTokens(bootstrapTokens); assert tokenMetadata.sortedTokens().size() > 0; @@ -1506,7 +1524,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE try { progressSupport.progress("bootstrap", ProgressEvent.createNotification("Joining ring...")); - joinRing(); + joinRing(true); } catch (IOException ignore) { http://git-wip-us.apache.org/repos/asf/cassandra/blob/0cb5e803/test/unit/org/apache/cassandra/SchemaLoader.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/SchemaLoader.java b/test/unit/org/apache/cassandra/SchemaLoader.java index c178ee0..d9c322f 100644 --- a/test/unit/org/apache/cassandra/SchemaLoader.java +++ b/test/unit/org/apache/cassandra/SchemaLoader.java @@ -88,6 +88,7 @@ public class SchemaLoader String ks4 = testName + "Keyspace4"; String ks5 = testName + "Keyspace5"; String ks6 = testName + "Keyspace6"; + String ks7 = testName + "Keyspace7"; String ks_kcs = testName + "KeyCacheSpace"; String ks_rcs = testName + "RowCacheSpace"; String ks_ccs = testName + "CounterCacheSpace"; @@ -191,11 +192,17 @@ public class SchemaLoader schema.add(KeyspaceMetadata.create(ks5, KeyspaceParams.simple(2), Tables.of(standardCFMD(ks5, "Standard1")))); + // Keyspace 6 schema.add(KeyspaceMetadata.create(ks6, KeyspaceParams.simple(1), Tables.of(keysIndexCFMD(ks6, "Indexed1", true)))); + // Keyspace 7 + schema.add(KeyspaceMetadata.create(ks7, + KeyspaceParams.simple(1), + Tables.of(customIndexCFMD(ks7, "Indexed1")))); + // KeyCacheSpace schema.add(KeyspaceMetadata.create(ks_kcs, KeyspaceParams.simple(1), @@ -455,6 +462,7 @@ public class SchemaLoader return cfm.compression(getCompressionParameters()); } + public 
static CFMetaData keysIndexCFMD(String ksName, String cfName, boolean withIndex) throws ConfigurationException { CFMetaData cfm = CFMetaData.Builder.createDense(ksName, cfName, false, false) @@ -480,6 +488,30 @@ public class SchemaLoader return cfm.compression(getCompressionParameters()); } + public static CFMetaData customIndexCFMD(String ksName, String cfName) throws ConfigurationException + { + CFMetaData cfm = CFMetaData.Builder.createDense(ksName, cfName, false, false) + .addPartitionKey("key", AsciiType.instance) + .addClusteringColumn("c1", AsciiType.instance) + .addRegularColumn("value", LongType.instance) + .build(); + + cfm.indexes( + cfm.getIndexes() + .with(IndexMetadata.fromIndexTargets(cfm, + Collections.singletonList( + new IndexTarget(new ColumnIdentifier("value", true), + IndexTarget.Type.VALUES)), + "value_index", + IndexMetadata.Kind.CUSTOM, + Collections.singletonMap( + IndexTarget.CUSTOM_INDEX_OPTION_NAME, + StubIndex.class.getName())))); + + + return cfm.compression(getCompressionParameters()); + } + public static CFMetaData jdbcCFMD(String ksName, String cfName, AbstractType comp) { return CFMetaData.Builder.create(ksName, cfName).addPartitionKey("key", BytesType.instance) http://git-wip-us.apache.org/repos/asf/cassandra/blob/0cb5e803/test/unit/org/apache/cassandra/index/StubIndex.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/index/StubIndex.java b/test/unit/org/apache/cassandra/index/StubIndex.java index 0b7b32f..92efee5 100644 --- a/test/unit/org/apache/cassandra/index/StubIndex.java +++ b/test/unit/org/apache/cassandra/index/StubIndex.java @@ -51,6 +51,7 @@ public class StubIndex implements Index public List<Row> rowsInserted = new ArrayList<>(); public List<Row> rowsDeleted = new ArrayList<>(); public List<Pair<Row,Row>> rowsUpdated = new ArrayList<>(); + public volatile boolean preJoinInvocation; private IndexMetadata indexMetadata; private ColumnFamilyStore 
baseCfs; @@ -171,6 +172,14 @@ public class StubIndex implements Index return null; } + public Callable<?> getPreJoinTask(boolean hadBootstrap) + { + return () -> { + preJoinInvocation = true; + return null; + }; + } + public Callable<?> getInvalidateTask() { return null; http://git-wip-us.apache.org/repos/asf/cassandra/blob/0cb5e803/test/unit/org/apache/cassandra/service/JoinTokenRingTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/service/JoinTokenRingTest.java b/test/unit/org/apache/cassandra/service/JoinTokenRingTest.java new file mode 100644 index 0000000..866910e --- /dev/null +++ b/test/unit/org/apache/cassandra/service/JoinTokenRingTest.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.cassandra.service; + +import java.io.IOException; + +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Test; + +import org.apache.cassandra.SchemaLoader; +import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.db.ColumnFamilyStore; +import org.apache.cassandra.exceptions.ConfigurationException; +import org.apache.cassandra.index.SecondaryIndexManager; +import org.apache.cassandra.index.StubIndex; + +public class JoinTokenRingTest +{ + @BeforeClass + public static void setup() throws ConfigurationException + { + DatabaseDescriptor.daemonInitialization(); + SchemaLoader.startGossiper(); + SchemaLoader.prepareServer(); + SchemaLoader.schemaDefinition("JoinTokenRingTest"); + } + + @Test + public void testIndexPreJoinInvocation() throws IOException + { + StorageService ss = StorageService.instance; + ss.joinRing(); + + SecondaryIndexManager indexManager = ColumnFamilyStore.getIfExists("JoinTokenRingTestKeyspace7", "Indexed1").indexManager; + StubIndex stub = (StubIndex) indexManager.getIndexByName("value_index"); + Assert.assertTrue(stub.preJoinInvocation); + } +}
