Repository: cassandra Updated Branches: refs/heads/trunk 40aeaf0c1 -> d31ed0f51
Abstract write path for pluggable storage Patch by Blake Eggleston; Reviewed by Aleksey Yeschenko for CASSANDRA-14118 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/d31ed0f5 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/d31ed0f5 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/d31ed0f5 Branch: refs/heads/trunk Commit: d31ed0f51b23d8fce892695ecd82d4f843f31f4c Parents: 40aeaf0 Author: Blake Eggleston <[email protected]> Authored: Tue Mar 20 17:36:30 2018 -0700 Committer: Blake Eggleston <[email protected]> Committed: Sun Apr 15 17:39:58 2018 -0700 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../db/CassandraKeyspaceWriteHandler.java | 92 ++++++++++++++++++++ .../db/CassandraTableWriteHandler.java | 42 +++++++++ .../cassandra/db/CassandraWriteContext.java | 59 +++++++++++++ .../apache/cassandra/db/ColumnFamilyStore.java | 7 ++ src/java/org/apache/cassandra/db/Keyspace.java | 41 +++++---- .../cassandra/db/KeyspaceWriteHandler.java | 29 ++++++ .../cassandra/db/ReadExecutionController.java | 23 ++--- .../apache/cassandra/db/TableWriteHandler.java | 27 ++++++ .../org/apache/cassandra/db/WriteContext.java | 31 +++++++ src/java/org/apache/cassandra/index/Index.java | 4 +- .../cassandra/index/SecondaryIndexManager.java | 30 ++++--- .../index/internal/CassandraIndex.java | 32 +++---- .../internal/composites/CompositesSearcher.java | 12 +-- .../index/internal/keys/KeysSearcher.java | 7 +- .../apache/cassandra/index/sasi/SASIIndex.java | 4 +- .../apache/cassandra/index/CustomIndexTest.java | 5 +- .../org/apache/cassandra/index/StubIndex.java | 2 +- .../index/internal/CustomCassandraIndex.java | 32 +++---- 19 files changed, 387 insertions(+), 93 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/d31ed0f5/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 0935434..5605a77 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 4.0 + * Abstract write path for pluggable storage (CASSANDRA-14118) * nodetool describecluster should be more informative (CASSANDRA-13853) * Compaction performance improvements (CASSANDRA-14261) * Refactor Pair usage to avoid boxing ints/longs (CASSANDRA-14260) http://git-wip-us.apache.org/repos/asf/cassandra/blob/d31ed0f5/src/java/org/apache/cassandra/db/CassandraKeyspaceWriteHandler.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/CassandraKeyspaceWriteHandler.java b/src/java/org/apache/cassandra/db/CassandraKeyspaceWriteHandler.java new file mode 100644 index 0000000..1f1bcdb --- /dev/null +++ b/src/java/org/apache/cassandra/db/CassandraKeyspaceWriteHandler.java @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.db; + +import org.apache.cassandra.db.commitlog.CommitLog; +import org.apache.cassandra.db.commitlog.CommitLogPosition; +import org.apache.cassandra.exceptions.RequestExecutionException; +import org.apache.cassandra.tracing.Tracing; +import org.apache.cassandra.utils.concurrent.OpOrder; + +public class CassandraKeyspaceWriteHandler implements KeyspaceWriteHandler +{ + private final Keyspace keyspace; + + public CassandraKeyspaceWriteHandler(Keyspace keyspace) + { + this.keyspace = keyspace; + } + + @Override + public WriteContext beginWrite(Mutation mutation, boolean makeDurable) throws RequestExecutionException + { + OpOrder.Group group = null; + try + { + group = Keyspace.writeOrder.start(); + + // write the mutation to the commitlog and memtables + CommitLogPosition position = null; + if (makeDurable) + { + Tracing.trace("Appending to commitlog"); + position = CommitLog.instance.add(mutation); + } + return new CassandraWriteContext(group, position); + } + catch (Throwable t) + { + if (group != null) + { + group.close(); + } + throw t; + } + } + + private WriteContext createEmptyContext() + { + OpOrder.Group group = null; + try + { + group = Keyspace.writeOrder.start(); + return new CassandraWriteContext(group, null); + } + catch (Throwable t) + { + if (group != null) + { + group.close(); + } + throw t; + } + } + + @Override + public WriteContext createContextForIndexing() + { + return createEmptyContext(); + } + + @Override + public WriteContext createContextForRead() + { + return createEmptyContext(); + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/d31ed0f5/src/java/org/apache/cassandra/db/CassandraTableWriteHandler.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/CassandraTableWriteHandler.java b/src/java/org/apache/cassandra/db/CassandraTableWriteHandler.java new file mode 100644 index 0000000..146539c --- /dev/null +++ b/src/java/org/apache/cassandra/db/CassandraTableWriteHandler.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.db; + +import org.apache.cassandra.db.partitions.PartitionUpdate; +import org.apache.cassandra.index.transactions.UpdateTransaction; +import org.apache.cassandra.tracing.Tracing; + +public class CassandraTableWriteHandler implements TableWriteHandler +{ + private final ColumnFamilyStore cfs; + + public CassandraTableWriteHandler(ColumnFamilyStore cfs) + { + this.cfs = cfs; + } + + @Override + @SuppressWarnings("resource") + public void write(PartitionUpdate update, WriteContext context, UpdateTransaction updateTransaction) + { + CassandraWriteContext ctx = CassandraWriteContext.fromContext(context); + Tracing.trace("Adding to {} memtable", update.metadata().name); + cfs.apply(update, updateTransaction, ctx.getGroup(), ctx.getPosition()); + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/d31ed0f5/src/java/org/apache/cassandra/db/CassandraWriteContext.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/CassandraWriteContext.java b/src/java/org/apache/cassandra/db/CassandraWriteContext.java new file mode 100644 index 0000000..bac1351 --- /dev/null +++ b/src/java/org/apache/cassandra/db/CassandraWriteContext.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.db; + +import com.google.common.base.Preconditions; + +import org.apache.cassandra.db.commitlog.CommitLogPosition; +import org.apache.cassandra.utils.concurrent.OpOrder; + +public class CassandraWriteContext implements WriteContext +{ + private final OpOrder.Group opGroup; + private final CommitLogPosition position; + + public CassandraWriteContext(OpOrder.Group opGroup, CommitLogPosition position) + { + Preconditions.checkArgument(opGroup != null); + this.opGroup = opGroup; + this.position = position; + } + + public static CassandraWriteContext fromContext(WriteContext context) + { + Preconditions.checkArgument(context instanceof CassandraWriteContext); + return (CassandraWriteContext) context; + } + + public OpOrder.Group getGroup() + { + return opGroup; + } + + public CommitLogPosition getPosition() + { + return position; + } + + @Override + public void close() + { + opGroup.close(); + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/d31ed0f5/src/java/org/apache/cassandra/db/ColumnFamilyStore.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java index bfab6ea..50db418 100644 --- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java +++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java @@ -216,6 +216,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean public final TableMetrics metric; public volatile long sampleLatencyNanos; + private final CassandraTableWriteHandler writeHandler; private final CassandraStreamManager streamManager; private final TableRepairManager repairManager; @@ -450,6 +451,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean mbeanName = null; oldMBeanName= null; } + writeHandler = new CassandraTableWriteHandler(this); streamManager = new CassandraStreamManager(this); repairManager = new CassandraTableRepairManager(this); } @@ -466,6 +468,11 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean } } + public TableWriteHandler getWriteHandler() + { + return writeHandler; + } + public TableStreamManager getStreamManager() { return streamManager; http://git-wip-us.apache.org/repos/asf/cassandra/blob/d31ed0f5/src/java/org/apache/cassandra/db/Keyspace.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/Keyspace.java b/src/java/org/apache/cassandra/db/Keyspace.java index 42d43b2..651d156 100644 --- a/src/java/org/apache/cassandra/db/Keyspace.java +++ b/src/java/org/apache/cassandra/db/Keyspace.java @@ -32,8 +32,6 @@ import org.slf4j.LoggerFactory; import org.apache.cassandra.concurrent.Stage; import org.apache.cassandra.concurrent.StageManager; import org.apache.cassandra.config.*; -import org.apache.cassandra.db.commitlog.CommitLog; -import org.apache.cassandra.db.commitlog.CommitLogPosition; import org.apache.cassandra.db.compaction.CompactionManager; import org.apache.cassandra.db.lifecycle.SSTableSet; import org.apache.cassandra.db.partitions.PartitionUpdate; @@ -91,6 +89,7 @@ public class Keyspace private volatile AbstractReplicationStrategy replicationStrategy; public final ViewManager viewManager; + private final KeyspaceWriteHandler writeHandler; private volatile ReplicationParams replicationParams; private final KeyspaceRepairManager repairManager; @@ -338,7 +337,9 @@ public class Keyspace initCf(Schema.instance.getTableMetadataRef(cfm.id), loadSSTables); } this.viewManager.reload(false); + this.repairManager = new CassandraKeyspaceRepairManager(this); + this.writeHandler = new CassandraKeyspaceWriteHandler(this); } private Keyspace(KeyspaceMetadata metadata) @@ -348,6 +349,7 @@ public class Keyspace this.metric = new KeyspaceMetrics(this); this.viewManager = new ViewManager(this); this.repairManager = new CassandraKeyspaceRepairManager(this); + this.writeHandler = new CassandraKeyspaceWriteHandler(this); } public KeyspaceRepairManager getRepairManager() @@ -424,6 +426,11 @@ public class Keyspace } } + public KeyspaceWriteHandler getWriteHandler() + { + return writeHandler; + } + /** * adds a cf to internal structures, ends up creating disk files). */ @@ -479,16 +486,16 @@ public class Keyspace * * @param mutation the row to write. Must not be modified after calling apply, since commitlog append * may happen concurrently, depending on the CL Executor type. - * @param writeCommitLog false to disable commitlog append entirely + * @param makeDurable if true, don't return unless write has been made durable * @param updateIndexes false to disable index updates (used by CollationController "defragmenting") * @param isDroppable true if this should throw WriteTimeoutException if it does not acquire lock within write_request_timeout_in_ms */ public void apply(final Mutation mutation, - final boolean writeCommitLog, + final boolean makeDurable, boolean updateIndexes, boolean isDroppable) { - applyInternal(mutation, writeCommitLog, updateIndexes, isDroppable, false, null); + applyInternal(mutation, makeDurable, updateIndexes, isDroppable, false, null); } /** @@ -496,13 +503,13 @@ public class Keyspace * * @param mutation the row to write. Must not be modified after calling apply, since commitlog append * may happen concurrently, depending on the CL Executor type. - * @param writeCommitLog false to disable commitlog append entirely + * @param makeDurable if true, don't return unless write has been made durable * @param updateIndexes false to disable index updates (used by CollationController "defragmenting") * @param isDroppable true if this should throw WriteTimeoutException if it does not acquire lock within write_request_timeout_in_ms * @param isDeferrable true if caller is not waiting for future to complete, so that future may be deferred */ private CompletableFuture<?> applyInternal(final Mutation mutation, - final boolean writeCommitLog, + final boolean makeDurable, boolean updateIndexes, boolean isDroppable, boolean isDeferrable, @@ -564,7 +571,7 @@ public class Keyspace // we will re-apply ourself to the queue and try again later final CompletableFuture<?> mark = future; StageManager.getStage(Stage.MUTATION).execute(() -> - applyInternal(mutation, writeCommitLog, true, isDroppable, true, mark) + applyInternal(mutation, makeDurable, true, isDroppable, true, mark) ); return future; } @@ -604,16 +611,8 @@ public class Keyspace } } int nowInSec = FBUtilities.nowInSeconds(); - try (OpOrder.Group opGroup = writeOrder.start()) + try (WriteContext ctx = getWriteHandler().beginWrite(mutation, makeDurable)) { - // write the mutation to the commitlog and memtables - CommitLogPosition commitLogPosition = null; - if (writeCommitLog) - { - Tracing.trace("Appending to commitlog"); - commitLogPosition = CommitLog.instance.add(mutation); - } - for (PartitionUpdate upd : mutation.getPartitionUpdates()) { ColumnFamilyStore cfs = columnFamilyStores.get(upd.metadata().id); @@ -629,7 +628,7 @@ public class Keyspace try { Tracing.trace("Creating materialized view mutations from base table replica"); - viewManager.forTable(upd.metadata().id).pushViewReplicaUpdates(upd, writeCommitLog, baseComplete); + viewManager.forTable(upd.metadata().id).pushViewReplicaUpdates(upd, makeDurable, baseComplete); } catch (Throwable t) { @@ -640,11 +639,11 @@ public class Keyspace } } - Tracing.trace("Adding to {} memtable", upd.metadata().name); UpdateTransaction indexTransaction = updateIndexes - ? cfs.indexManager.newUpdateTransaction(upd, opGroup, nowInSec) + ? cfs.indexManager.newUpdateTransaction(upd, ctx, nowInSec) : UpdateTransaction.NO_OP; - cfs.apply(upd, indexTransaction, opGroup, commitLogPosition); + cfs.getWriteHandler().write(upd, ctx, indexTransaction); + if (requiresViewUpdate) baseComplete.set(System.currentTimeMillis()); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/d31ed0f5/src/java/org/apache/cassandra/db/KeyspaceWriteHandler.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/KeyspaceWriteHandler.java b/src/java/org/apache/cassandra/db/KeyspaceWriteHandler.java new file mode 100644 index 0000000..19cca72 --- /dev/null +++ b/src/java/org/apache/cassandra/db/KeyspaceWriteHandler.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.db; + +import org.apache.cassandra.exceptions.RequestExecutionException; + +public interface KeyspaceWriteHandler +{ + // mutation can be null if makeDurable is false + WriteContext beginWrite(Mutation mutation, boolean makeDurable) throws RequestExecutionException; + WriteContext createContextForIndexing(); + WriteContext createContextForRead(); +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/d31ed0f5/src/java/org/apache/cassandra/db/ReadExecutionController.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/ReadExecutionController.java b/src/java/org/apache/cassandra/db/ReadExecutionController.java index 9114212..537e3f8 100644 --- a/src/java/org/apache/cassandra/db/ReadExecutionController.java +++ b/src/java/org/apache/cassandra/db/ReadExecutionController.java @@ -29,9 +29,9 @@ public class ReadExecutionController implements AutoCloseable // For index reads private final ReadExecutionController indexController; - private final OpOrder.Group writeOp; + private final WriteContext writeContext; - private ReadExecutionController(OpOrder.Group baseOp, TableMetadata baseMetadata, ReadExecutionController indexController, OpOrder.Group writeOp) + private ReadExecutionController(OpOrder.Group baseOp, TableMetadata baseMetadata, ReadExecutionController indexController, WriteContext writeContext) { // We can have baseOp == null, but only when empty() is called, in which case the controller will never really be used // (which validForReadOn should ensure). But if it's not null, we should have the proper metadata too. @@ -39,7 +39,7 @@ public class ReadExecutionController implements AutoCloseable this.baseOp = baseOp; this.baseMetadata = baseMetadata; this.indexController = indexController; - this.writeOp = writeOp; + this.writeContext = writeContext; } public ReadExecutionController indexReadController() @@ -47,9 +47,9 @@ public class ReadExecutionController implements AutoCloseable return indexController; } - public OpOrder.Group writeOpOrderGroup() + public WriteContext getWriteContext() { - return writeOp; + return writeContext; } public boolean validForReadOn(ColumnFamilyStore cfs) @@ -83,7 +83,8 @@ public class ReadExecutionController implements AutoCloseable } else { - OpOrder.Group baseOp = null, writeOp = null; + OpOrder.Group baseOp = null; + WriteContext writeContext = null; ReadExecutionController indexController = null; // OpOrder.start() shouldn't fail, but better safe than sorry. try @@ -92,13 +93,13 @@ public class ReadExecutionController implements AutoCloseable indexController = new ReadExecutionController(indexCfs.readOrdering.start(), indexCfs.metadata(), null, null); // TODO: this should perhaps not open and maintain a writeOp for the full duration, but instead only *try* to delete stale entries, without blocking if there's no room // as it stands, we open a writeOp and keep it open for the duration to ensure that should this CF get flushed to make room we don't block the reclamation of any room being made - writeOp = Keyspace.writeOrder.start(); - return new ReadExecutionController(baseOp, baseCfs.metadata(), indexController, writeOp); + writeContext = baseCfs.keyspace.getWriteHandler().createContextForRead(); + return new ReadExecutionController(baseOp, baseCfs.metadata(), indexController, writeContext); } catch (RuntimeException e) { - // Note that must have writeOp == null since ReadOrderGroup ctor can't fail - assert writeOp == null; + // Note that must have writeContext == null since ReadOrderGroup ctor can't fail + assert writeContext == null; try { if (baseOp != null) @@ -142,7 +143,7 @@ public class ReadExecutionController implements AutoCloseable } finally { - writeOp.close(); + writeContext.close(); } } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/d31ed0f5/src/java/org/apache/cassandra/db/TableWriteHandler.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/TableWriteHandler.java b/src/java/org/apache/cassandra/db/TableWriteHandler.java new file mode 100644 index 0000000..4e47221 --- /dev/null +++ b/src/java/org/apache/cassandra/db/TableWriteHandler.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.db; + +import org.apache.cassandra.db.partitions.PartitionUpdate; +import org.apache.cassandra.index.transactions.UpdateTransaction; + +public interface TableWriteHandler +{ + void write(PartitionUpdate update, WriteContext context, UpdateTransaction updateTransaction); +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/d31ed0f5/src/java/org/apache/cassandra/db/WriteContext.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/WriteContext.java b/src/java/org/apache/cassandra/db/WriteContext.java new file mode 100644 index 0000000..102ab50 --- /dev/null +++ b/src/java/org/apache/cassandra/db/WriteContext.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.db; + +/** + * Issued by the keyspace write handler and used in the write path (as expected), as well as the read path + * and some async index building code. In the read and index paths, the write context is intended to be used + * as a marker for ordering operations. Reads can also end up performing writes in some cases, particularly + * when correcting secondary indexes. + */ +public interface WriteContext extends AutoCloseable +{ + @Override + void close(); +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/d31ed0f5/src/java/org/apache/cassandra/index/Index.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/index/Index.java b/src/java/org/apache/cassandra/index/Index.java index 1b4573d..adfe08c 100644 --- a/src/java/org/apache/cassandra/index/Index.java +++ b/src/java/org/apache/cassandra/index/Index.java @@ -377,7 +377,7 @@ public interface Index * This can be empty as an update might only contain partition, range and row deletions, but * the indexer is guaranteed to not get any cells for a column that is not part of {@code columns}. * @param nowInSec current time of the update operation - * @param opGroup operation group spanning the update operation + * @param ctx WriteContext spanning the update operation * @param transactionType indicates what kind of update is being performed on the base data * i.e. a write time insert/update/delete or the result of compaction * @return the newly created indexer or {@code null} if the index is not interested by the update @@ -387,7 +387,7 @@ public interface Index public Indexer indexerFor(DecoratedKey key, RegularAndStaticColumns columns, int nowInSec, - OpOrder.Group opGroup, + WriteContext ctx, IndexTransaction.Type transactionType); /** http://git-wip-us.apache.org/repos/asf/cassandra/blob/d31ed0f5/src/java/org/apache/cassandra/index/SecondaryIndexManager.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/index/SecondaryIndexManager.java b/src/java/org/apache/cassandra/index/SecondaryIndexManager.java index b60d811..70aebbd 100644 --- a/src/java/org/apache/cassandra/index/SecondaryIndexManager.java +++ b/src/java/org/apache/cassandra/index/SecondaryIndexManager.java @@ -164,10 +164,12 @@ public class SecondaryIndexManager implements IndexRegistry, INotificationConsum * The underlying column family containing the source data for these indexes */ public final ColumnFamilyStore baseCfs; + private final Keyspace keyspace; public SecondaryIndexManager(ColumnFamilyStore baseCfs) { this.baseCfs = baseCfs; + this.keyspace = baseCfs.keyspace; baseCfs.getTracker().subscribe(this); } @@ -842,7 +844,7 @@ public class SecondaryIndexManager implements IndexRegistry, INotificationConsum while (!pager.isExhausted()) { try (ReadExecutionController controller = cmd.executionController(); - OpOrder.Group writeGroup = Keyspace.writeOrder.start(); + WriteContext ctx = keyspace.getWriteHandler().createContextForIndexing(); UnfilteredPartitionIterator page = pager.fetchPageUnfiltered(baseCfs.metadata(), pageSize, controller)) { if (!page.hasNext()) @@ -854,7 +856,7 @@ public class SecondaryIndexManager implements IndexRegistry, INotificationConsum .map(index -> index.indexerFor(key, partition.columns(), nowInSec, - writeGroup, + ctx, IndexTransaction.Type.UPDATE)) .filter(Objects::nonNull) .collect(Collectors.toSet()); @@ -1115,7 +1117,7 @@ public class SecondaryIndexManager implements IndexRegistry, INotificationConsum /** * Transaction for updates on the write path. */ - public UpdateTransaction newUpdateTransaction(PartitionUpdate update, OpOrder.Group opGroup, int nowInSec) + public UpdateTransaction newUpdateTransaction(PartitionUpdate update, WriteContext ctx, int nowInSec) { if (!hasIndexes()) return UpdateTransaction.NO_OP; @@ -1124,7 +1126,7 @@ public class SecondaryIndexManager implements IndexRegistry, INotificationConsum .map(i -> i.indexerFor(update.partitionKey(), update.columns(), nowInSec, - opGroup, + ctx, IndexTransaction.Type.UPDATE)) .filter(Objects::nonNull) .toArray(Index.Indexer[]::new); @@ -1141,7 +1143,7 @@ public class SecondaryIndexManager implements IndexRegistry, INotificationConsum int nowInSec) { // the check for whether there are any registered indexes is already done in CompactionIterator - return new IndexGCTransaction(key, regularAndStaticColumns, versions, nowInSec, listIndexes()); + return new IndexGCTransaction(key, regularAndStaticColumns, keyspace, versions, nowInSec, listIndexes()); } /** @@ -1154,7 +1156,7 @@ public class SecondaryIndexManager implements IndexRegistry, INotificationConsum if (!hasIndexes()) return CleanupTransaction.NO_OP; - return new CleanupGCTransaction(key, regularAndStaticColumns, nowInSec, listIndexes()); + return new CleanupGCTransaction(key, regularAndStaticColumns, keyspace, nowInSec, listIndexes()); } /** @@ -1267,6 +1269,7 @@ public class SecondaryIndexManager implements IndexRegistry, INotificationConsum { private final DecoratedKey key; private final RegularAndStaticColumns columns; + private final Keyspace keyspace; private final int versions; private final int nowInSec; private final Collection<Index> indexes; @@ -1275,12 +1278,13 @@ public class SecondaryIndexManager implements IndexRegistry, INotificationConsum private IndexGCTransaction(DecoratedKey key, RegularAndStaticColumns columns, - int versions, + Keyspace keyspace, int versions, int nowInSec, Collection<Index> indexes) { this.key = key; this.columns = columns; + this.keyspace = keyspace; this.versions = versions; this.indexes = indexes; this.nowInSec = nowInSec; @@ -1342,11 +1346,11 @@ public class SecondaryIndexManager implements IndexRegistry, INotificationConsum if (rows == null) return; - try (OpOrder.Group opGroup = Keyspace.writeOrder.start()) + try (WriteContext ctx = keyspace.getWriteHandler().createContextForIndexing()) { for (Index index : indexes) { - Index.Indexer indexer = index.indexerFor(key, columns, nowInSec, opGroup, Type.COMPACTION); + Index.Indexer indexer = index.indexerFor(key, columns, nowInSec, ctx, Type.COMPACTION); if (indexer == null) continue; @@ -1370,6 +1374,7 @@ public class SecondaryIndexManager implements IndexRegistry, INotificationConsum { private final DecoratedKey key; private final RegularAndStaticColumns columns; + private final Keyspace keyspace; private final int nowInSec; private final Collection<Index> indexes; @@ -1378,11 +1383,12 @@ public class SecondaryIndexManager implements IndexRegistry, INotificationConsum private CleanupGCTransaction(DecoratedKey key, RegularAndStaticColumns columns, - int nowInSec, + Keyspace keyspace, int nowInSec, Collection<Index> indexes) { this.key = key; this.columns = columns; + this.keyspace = keyspace; this.indexes = indexes; this.nowInSec = nowInSec; } @@ -1406,11 +1412,11 @@ public class SecondaryIndexManager implements IndexRegistry, INotificationConsum if (row == null && partitionDelete == null) return; - try (OpOrder.Group opGroup = Keyspace.writeOrder.start()) + try (WriteContext ctx = keyspace.getWriteHandler().createContextForIndexing()) { for (Index index : indexes) { - Index.Indexer indexer = index.indexerFor(key, columns, nowInSec, opGroup, Type.CLEANUP); + Index.Indexer indexer = index.indexerFor(key, columns, nowInSec, ctx, Type.CLEANUP); if (indexer == null) continue; http://git-wip-us.apache.org/repos/asf/cassandra/blob/d31ed0f5/src/java/org/apache/cassandra/index/internal/CassandraIndex.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/index/internal/CassandraIndex.java b/src/java/org/apache/cassandra/index/internal/CassandraIndex.java index c7f3536..9a29c02 100644 --- a/src/java/org/apache/cassandra/index/internal/CassandraIndex.java +++ b/src/java/org/apache/cassandra/index/internal/CassandraIndex.java @@ -340,7 +340,7 @@ public abstract class CassandraIndex implements Index public Indexer indexerFor(final DecoratedKey key, final RegularAndStaticColumns columns, final int nowInSec, - final OpOrder.Group opGroup, + final WriteContext ctx, final IndexTransaction.Type transactionType) { /** @@ -445,7 +445,7 @@ public abstract class CassandraIndex implements Index clustering, cell, LivenessInfo.withExpirationTime(cell.timestamp(), cell.ttl(), cell.localDeletionTime()), - opGroup); + ctx); } private void removeCells(Clustering clustering, Iterable<Cell> cells) @@ -462,7 +462,7 @@ public abstract class CassandraIndex implements Index if (cell == null || !cell.isLive(nowInSec)) return; - delete(key.getKey(), clustering, cell, opGroup, nowInSec); + delete(key.getKey(), clustering, cell, ctx, nowInSec); } private void indexPrimaryKey(final Clustering clustering, @@ -470,10 +470,10 @@ public abstract class CassandraIndex implements Index final Row.Deletion deletion) { if (liveness.timestamp() != LivenessInfo.NO_TIMESTAMP) - insert(key.getKey(), clustering, null, liveness, opGroup); + insert(key.getKey(), clustering, null, liveness, ctx); if (!deletion.isLive()) - delete(key.getKey(), clustering, deletion.time(), opGroup); + delete(key.getKey(), clustering, deletion.time(), ctx); } private LivenessInfo getPrimaryKeyIndexLiveness(Row row) @@ -503,14 +503,14 @@ public abstract class CassandraIndex implements Index * @param indexKey the partition key in the index table * @param indexClustering the clustering in the index table * @param deletion deletion timestamp etc - * @param opGroup the operation under which to perform the deletion + * @param ctx the write context under which to perform the deletion */ public void deleteStaleEntry(DecoratedKey indexKey, Clustering indexClustering, DeletionTime deletion, - OpOrder.Group opGroup) + WriteContext ctx) { - doDelete(indexKey, indexClustering, deletion, opGroup); + doDelete(indexKey, indexClustering, deletion, ctx); logger.trace("Removed index entry for stale value {}", indexKey); } @@ -521,14 +521,14 @@ public abstract class CassandraIndex implements Index Clustering clustering, Cell cell, LivenessInfo info, - OpOrder.Group opGroup) + WriteContext ctx) { DecoratedKey valueKey = getIndexKeyFor(getIndexedValue(rowKey, clustering, cell)); Row row = BTreeRow.noCellLiveRow(buildIndexClustering(rowKey, clustering, cell), info); PartitionUpdate upd = partitionUpdate(valueKey, row); - indexCfs.apply(upd, UpdateTransaction.NO_OP, opGroup, null); + indexCfs.getWriteHandler().write(upd, ctx, UpdateTransaction.NO_OP); logger.trace("Inserted entry into index for value {}", valueKey); } @@ -538,7 +538,7 @@ public abstract class CassandraIndex implements Index private void delete(ByteBuffer rowKey, Clustering clustering, Cell cell, - OpOrder.Group opGroup, + WriteContext ctx, int nowInSec) { DecoratedKey valueKey = getIndexKeyFor(getIndexedValue(rowKey, @@ -547,7 +547,7 @@ public abstract class CassandraIndex implements Index doDelete(valueKey, buildIndexClustering(rowKey, clustering, cell), new DeletionTime(cell.timestamp(), nowInSec), - opGroup); + ctx); } /** @@ -556,7 +556,7 @@ public abstract class CassandraIndex implements Index private void delete(ByteBuffer rowKey, Clustering clustering, DeletionTime deletion, - OpOrder.Group opGroup) + WriteContext ctx) { DecoratedKey valueKey = getIndexKeyFor(getIndexedValue(rowKey, clustering, @@ -564,17 +564,17 @@ public abstract class CassandraIndex implements Index doDelete(valueKey, buildIndexClustering(rowKey, clustering, null), deletion, - opGroup); + ctx); } private void doDelete(DecoratedKey indexKey, Clustering indexClustering, DeletionTime deletion, - OpOrder.Group opGroup) + WriteContext ctx) { Row row = BTreeRow.emptyDeletedRow(indexClustering, Row.Deletion.regular(deletion)); PartitionUpdate upd = partitionUpdate(indexKey, row); - indexCfs.apply(upd, UpdateTransaction.NO_OP, opGroup, null); + indexCfs.getWriteHandler().write(upd, ctx, UpdateTransaction.NO_OP); logger.trace("Removed index entry for value {}", indexKey); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/d31ed0f5/src/java/org/apache/cassandra/index/internal/composites/CompositesSearcher.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/index/internal/composites/CompositesSearcher.java b/src/java/org/apache/cassandra/index/internal/composites/CompositesSearcher.java index f9ae705..9045b3b 100644 --- a/src/java/org/apache/cassandra/index/internal/composites/CompositesSearcher.java +++ b/src/java/org/apache/cassandra/index/internal/composites/CompositesSearcher.java @@ -170,7 +170,7 @@ public class CompositesSearcher extends CassandraIndexSearcher filterStaleEntries(dataCmd.queryMemtableAndDisk(index.baseCfs, executionController), indexKey.getKey(), entries, - executionController.writeOpOrderGroup(), + executionController.getWriteContext(), command.nowInSec()); if (dataIter.isEmpty()) @@ -198,20 +198,20 @@ public class CompositesSearcher extends CassandraIndexSearcher }; } - private void deleteAllEntries(final List<IndexEntry> entries, final OpOrder.Group writeOp, final int nowInSec) + private void deleteAllEntries(final List<IndexEntry> entries, final WriteContext ctx, final int nowInSec) { entries.forEach(entry -> index.deleteStaleEntry(entry.indexValue, entry.indexClustering, new DeletionTime(entry.timestamp, nowInSec), - writeOp)); + ctx)); } // We assume all rows in dataIter belong to the same partition. private UnfilteredRowIterator filterStaleEntries(UnfilteredRowIterator dataIter, final ByteBuffer indexValue, final List<IndexEntry> entries, - final OpOrder.Group writeOp, + final WriteContext ctx, final int nowInSec) { // collect stale index entries and delete them when we close this iterator @@ -245,7 +245,7 @@ public class CompositesSearcher extends CassandraIndexSearcher dataIter.partitionLevelDeletion(), dataIter.isReverseOrder()); } - deleteAllEntries(staleEntries, writeOp, nowInSec); + deleteAllEntries(staleEntries, ctx, nowInSec); } else { @@ -292,7 +292,7 @@ public class CompositesSearcher extends CassandraIndexSearcher @Override public void onPartitionClose() { - deleteAllEntries(staleEntries, writeOp, nowInSec); + deleteAllEntries(staleEntries, ctx, nowInSec); } } iteratorToReturn = Transformation.apply(dataIter, new Transform()); http://git-wip-us.apache.org/repos/asf/cassandra/blob/d31ed0f5/src/java/org/apache/cassandra/index/internal/keys/KeysSearcher.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/index/internal/keys/KeysSearcher.java b/src/java/org/apache/cassandra/index/internal/keys/KeysSearcher.java index 16dcd20..8b3a3d2 100644 --- a/src/java/org/apache/cassandra/index/internal/keys/KeysSearcher.java +++ b/src/java/org/apache/cassandra/index/internal/keys/KeysSearcher.java @@ -31,7 +31,6 @@ import org.apache.cassandra.db.rows.*; import org.apache.cassandra.index.internal.CassandraIndex; import org.apache.cassandra.index.internal.CassandraIndexSearcher; import org.apache.cassandra.schema.TableMetadata; -import org.apache.cassandra.utils.concurrent.OpOrder; public class KeysSearcher extends CassandraIndexSearcher { @@ -100,7 +99,7 @@ public class KeysSearcher extends CassandraIndexSearcher UnfilteredRowIterator dataIter = filterIfStale(dataCmd.queryMemtableAndDisk(index.baseCfs, executionController), hit, indexKey.getKey(), - executionController.writeOpOrderGroup(), + executionController.getWriteContext(), command.nowInSec()); if (dataIter != null) @@ -142,7 +141,7 @@ public class KeysSearcher extends CassandraIndexSearcher private UnfilteredRowIterator filterIfStale(UnfilteredRowIterator iterator, Row indexHit, ByteBuffer indexedValue, - OpOrder.Group writeOp, + WriteContext ctx, int nowInSec) { Row data = iterator.staticRow(); @@ -152,7 +151,7 @@ public class KeysSearcher extends CassandraIndexSearcher index.deleteStaleEntry(index.getIndexCfs().decorateKey(indexedValue), makeIndexClustering(iterator.partitionKey().getKey(), Clustering.EMPTY), new DeletionTime(indexHit.primaryKeyLivenessInfo().timestamp(), nowInSec), - writeOp); + ctx); iterator.close(); return null; } http://git-wip-us.apache.org/repos/asf/cassandra/blob/d31ed0f5/src/java/org/apache/cassandra/index/sasi/SASIIndex.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/index/sasi/SASIIndex.java b/src/java/org/apache/cassandra/index/sasi/SASIIndex.java index b055e82..50b35fd 100644 --- a/src/java/org/apache/cassandra/index/sasi/SASIIndex.java +++ b/src/java/org/apache/cassandra/index/sasi/SASIIndex.java @@ -242,7 +242,7 @@ public class SASIIndex implements Index, INotificationConsumer public void validate(PartitionUpdate update) throws InvalidRequestException {} - public Indexer indexerFor(DecoratedKey key, RegularAndStaticColumns columns, int nowInSec, OpOrder.Group opGroup, IndexTransaction.Type transactionType) + public Indexer indexerFor(DecoratedKey key, RegularAndStaticColumns columns, int nowInSec, WriteContext context, IndexTransaction.Type transactionType) { return new Indexer() { @@ -258,7 +258,7 @@ public class SASIIndex implements Index, INotificationConsumer public void insertRow(Row row) { if (isNewData()) - adjustMemtableSize(index.index(key, row), opGroup); + adjustMemtableSize(index.index(key, row), CassandraWriteContext.fromContext(context).getGroup()); } public void updateRow(Row oldRow, Row newRow) http://git-wip-us.apache.org/repos/asf/cassandra/blob/d31ed0f5/test/unit/org/apache/cassandra/index/CustomIndexTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/index/CustomIndexTest.java b/test/unit/org/apache/cassandra/index/CustomIndexTest.java index d14b50d..3036b1a 100644 --- a/test/unit/org/apache/cassandra/index/CustomIndexTest.java +++ b/test/unit/org/apache/cassandra/index/CustomIndexTest.java @@ -1048,13 +1048,14 @@ public class CustomIndexTest extends CQLTester public Indexer indexerFor(final DecoratedKey key, RegularAndStaticColumns columns, int nowInSec, - OpOrder.Group opGroup, + WriteContext ctx, IndexTransaction.Type transactionType) { + CassandraWriteContext cassandraWriteContext = (CassandraWriteContext) ctx; if (readOrderingAtStart == null) readOrderingAtStart = baseCfs.readOrdering.getCurrent(); - writeGroups.add(opGroup); + writeGroups.add(cassandraWriteContext.getGroup()); return new Indexer() { http://git-wip-us.apache.org/repos/asf/cassandra/blob/d31ed0f5/test/unit/org/apache/cassandra/index/StubIndex.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/index/StubIndex.java b/test/unit/org/apache/cassandra/index/StubIndex.java index 00c47d1..a351cce 100644 --- a/test/unit/org/apache/cassandra/index/StubIndex.java +++ b/test/unit/org/apache/cassandra/index/StubIndex.java @@ -100,7 +100,7 @@ public class StubIndex implements Index public Indexer indexerFor(final DecoratedKey key, RegularAndStaticColumns columns, int nowInSec, - OpOrder.Group opGroup, + WriteContext ctx, IndexTransaction.Type transactionType) { return new Indexer() http://git-wip-us.apache.org/repos/asf/cassandra/blob/d31ed0f5/test/unit/org/apache/cassandra/index/internal/CustomCassandraIndex.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/index/internal/CustomCassandraIndex.java b/test/unit/org/apache/cassandra/index/internal/CustomCassandraIndex.java index 028bbd0..f007c09 100644 --- a/test/unit/org/apache/cassandra/index/internal/CustomCassandraIndex.java +++ b/test/unit/org/apache/cassandra/index/internal/CustomCassandraIndex.java @@ -287,7 +287,7 @@ public class CustomCassandraIndex implements Index public Indexer indexerFor(final DecoratedKey key, final RegularAndStaticColumns columns, final int nowInSec, - final OpOrder.Group opGroup, + final WriteContext ctx, final IndexTransaction.Type transactionType) { if (!isPrimaryKeyIndex() && !columns.contains(indexedColumn)) @@ -377,7 +377,7 @@ public class CustomCassandraIndex implements Index clustering, cell, LivenessInfo.withExpirationTime(cell.timestamp(), cell.ttl(), cell.localDeletionTime()), - opGroup); + ctx); } private void removeCells(Clustering clustering, Iterable<Cell> cells) @@ -394,7 +394,7 @@ public class CustomCassandraIndex implements Index if (cell == null || !cell.isLive(nowInSec)) return; - delete(key.getKey(), clustering, cell, opGroup, nowInSec); + delete(key.getKey(), clustering, cell, ctx, nowInSec); } private void indexPrimaryKey(final Clustering clustering, @@ -402,10 +402,10 @@ public class CustomCassandraIndex implements Index final Row.Deletion deletion) { if (liveness.timestamp() != LivenessInfo.NO_TIMESTAMP) - insert(key.getKey(), clustering, null, liveness, opGroup); + insert(key.getKey(), clustering, null, liveness, ctx); if (!deletion.isLive()) - delete(key.getKey(), clustering, deletion.time(), opGroup); + delete(key.getKey(), clustering, deletion.time(), ctx); } private LivenessInfo getPrimaryKeyIndexLiveness(Row row) @@ -435,14 +435,14 @@ public class CustomCassandraIndex implements Index * @param indexKey the partition key in the index table * @param indexClustering the clustering in the index table * @param deletion deletion timestamp etc - * @param opGroup the operation under which to perform the deletion + * @param ctx the context under which to perform the deletion */ public void deleteStaleEntry(DecoratedKey indexKey, Clustering indexClustering, DeletionTime deletion, - OpOrder.Group opGroup) + WriteContext ctx) { - doDelete(indexKey, indexClustering, deletion, opGroup); + doDelete(indexKey, indexClustering, deletion, ctx); logger.debug("Removed index entry for stale value {}", indexKey); } @@ -453,14 +453,14 @@ public class CustomCassandraIndex implements Index Clustering clustering, Cell cell, LivenessInfo info, - OpOrder.Group opGroup) + WriteContext ctx) { DecoratedKey valueKey = getIndexKeyFor(getIndexedValue(rowKey, clustering, cell)); Row row = BTreeRow.noCellLiveRow(buildIndexClustering(rowKey, clustering, cell), info); PartitionUpdate upd = partitionUpdate(valueKey, row); - indexCfs.apply(upd, UpdateTransaction.NO_OP, opGroup, null); + indexCfs.getWriteHandler().write(upd, ctx, UpdateTransaction.NO_OP); logger.debug("Inserted entry into index for value {}", valueKey); } @@ -470,7 +470,7 @@ public class CustomCassandraIndex implements Index private void delete(ByteBuffer rowKey, Clustering clustering, Cell cell, - OpOrder.Group opGroup, + WriteContext ctx, int nowInSec) { DecoratedKey valueKey = getIndexKeyFor(getIndexedValue(rowKey, @@ -479,7 +479,7 @@ public class CustomCassandraIndex implements Index doDelete(valueKey, buildIndexClustering(rowKey, clustering, cell), new DeletionTime(cell.timestamp(), nowInSec), - opGroup); + ctx); } /** @@ -488,7 +488,7 @@ public class CustomCassandraIndex implements Index private void delete(ByteBuffer rowKey, Clustering clustering, DeletionTime deletion, - OpOrder.Group opGroup) + WriteContext ctx) { DecoratedKey valueKey = getIndexKeyFor(getIndexedValue(rowKey, clustering, @@ -496,17 +496,17 @@ public class CustomCassandraIndex implements Index doDelete(valueKey, buildIndexClustering(rowKey, clustering, null), deletion, - opGroup); + ctx); } private void doDelete(DecoratedKey indexKey, Clustering indexClustering, DeletionTime deletion, - OpOrder.Group opGroup) + WriteContext ctx) { Row row = BTreeRow.emptyDeletedRow(indexClustering, Row.Deletion.regular(deletion)); PartitionUpdate upd = partitionUpdate(indexKey, row); - indexCfs.apply(upd, UpdateTransaction.NO_OP, opGroup, null); + indexCfs.getWriteHandler().write(upd, ctx, UpdateTransaction.NO_OP); logger.debug("Removed index entry for value {}", indexKey); } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
