Improve batchlog write path patch by Stefania Alborghetti; reviewed by Aleksey Yeschenko for CASSANDRA-9673
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/53a177a9 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/53a177a9 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/53a177a9 Branch: refs/heads/trunk Commit: 53a177a9150586e56408f25c959f75110a2997e7 Parents: 5f02f20 Author: Stefania Alborghetti <[email protected]> Authored: Fri Jul 10 17:03:06 2015 +0800 Committer: Aleksey Yeschenko <[email protected]> Committed: Wed Sep 2 08:43:42 2015 +0100 ---------------------------------------------------------------------- CHANGES.txt | 1 + conf/cassandra.yaml | 1 - .../org/apache/cassandra/batchlog/Batch.java | 155 +++++ .../batchlog/BatchRemoveVerbHandler.java | 31 + .../batchlog/BatchStoreVerbHandler.java | 32 + .../cassandra/batchlog/BatchlogManager.java | 554 +++++++++++++++++ .../batchlog/BatchlogManagerMBean.java | 38 ++ .../batchlog/LegacyBatchlogMigrator.java | 196 ++++++ .../org/apache/cassandra/concurrent/Stage.java | 2 - .../cassandra/concurrent/StageManager.java | 1 - .../org/apache/cassandra/config/Config.java | 1 - .../cassandra/config/DatabaseDescriptor.java | 7 +- .../cql3/statements/BatchStatement.java | 5 - .../apache/cassandra/db/BatchlogManager.java | 596 ------------------- .../cassandra/db/BatchlogManagerMBean.java | 38 -- .../db/CounterMutationVerbHandler.java | 2 +- src/java/org/apache/cassandra/db/Mutation.java | 7 +- .../cassandra/db/MutationVerbHandler.java | 43 +- .../cassandra/db/ReadRepairVerbHandler.java | 3 +- .../org/apache/cassandra/db/SystemKeyspace.java | 2 +- .../org/apache/cassandra/db/WriteResponse.java | 18 +- .../cassandra/hints/EncodedHintMessage.java | 6 +- src/java/org/apache/cassandra/hints/Hint.java | 11 +- .../org/apache/cassandra/hints/HintMessage.java | 14 +- .../cassandra/hints/LegacyHintsMigrator.java | 2 +- .../apache/cassandra/net/MessagingService.java | 26 +- .../cassandra/service/CassandraDaemon.java | 6 +- 
.../apache/cassandra/service/StorageProxy.java | 208 ++++--- .../cassandra/service/StorageService.java | 19 +- .../service/paxos/CommitVerbHandler.java | 6 +- .../org/apache/cassandra/tools/NodeProbe.java | 4 +- .../cql3/MaterializedViewLongTest.java | 2 +- .../apache/cassandra/batchlog/BatchTest.java | 153 +++++ .../batchlog/BatchlogEndpointFilterTest.java | 115 ++++ .../cassandra/batchlog/BatchlogManagerTest.java | 460 ++++++++++++++ .../cassandra/db/BatchlogManagerTest.java | 394 ------------ .../service/BatchlogEndpointFilterTest.java | 117 ---- 37 files changed, 1945 insertions(+), 1331 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/53a177a9/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index fe8f453..751b75d 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 3.0.0-beta2 + * Improve batchlog write path (CASSANDRA-9673) * Re-apply MaterializedView updates on commitlog replay (CASSANDRA-10164) * Require AbstractType.isByteOrderComparable declaration in constructor (CASSANDRA-9901) * Avoid digest mismatch on upgrade to 3.0 (CASSANDRA-9554) http://git-wip-us.apache.org/repos/asf/cassandra/blob/53a177a9/conf/cassandra.yaml ---------------------------------------------------------------------- diff --git a/conf/cassandra.yaml b/conf/cassandra.yaml index 16108bd..0f8b829 100644 --- a/conf/cassandra.yaml +++ b/conf/cassandra.yaml @@ -355,7 +355,6 @@ seed_provider: concurrent_reads: 32 concurrent_writes: 32 concurrent_counter_writes: 32 -concurrent_batchlog_writes: 32 # For materialized view writes, as there is a read involved, so this should # be limited by the less of concurrent reads or concurrent writes.
http://git-wip-us.apache.org/repos/asf/cassandra/blob/53a177a9/src/java/org/apache/cassandra/batchlog/Batch.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/batchlog/Batch.java b/src/java/org/apache/cassandra/batchlog/Batch.java new file mode 100644 index 0000000..caa2682 --- /dev/null +++ b/src/java/org/apache/cassandra/batchlog/Batch.java @@ -0,0 +1,155 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.cassandra.batchlog; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.*; + +import org.apache.cassandra.db.Mutation; +import org.apache.cassandra.io.IVersionedSerializer; +import org.apache.cassandra.io.util.DataInputPlus; +import org.apache.cassandra.io.util.DataOutputPlus; +import org.apache.cassandra.net.MessagingService; +import org.apache.cassandra.utils.ByteBufferUtil; +import org.apache.cassandra.utils.UUIDSerializer; + +import static org.apache.cassandra.db.TypeSizes.sizeof; +import static org.apache.cassandra.db.TypeSizes.sizeofVInt; + +public final class Batch +{ + public static final Serializer serializer = new Serializer(); + + public final UUID id; + public final long creationTime; // time of batch creation (in microseconds) + + // one of these will always be empty + final Collection<Mutation> decodedMutations; + final Collection<ByteBuffer> encodedMutations; + + private Batch(UUID id, long creationTime, Collection<Mutation> decodedMutations, Collection<ByteBuffer> encodedMutations) + { + this.id = id; + this.creationTime = creationTime; + + this.decodedMutations = decodedMutations; + this.encodedMutations = encodedMutations; + } + + /** + * Creates a 'local' batch - with all enclosed mutations in decoded form (as Mutation instances) + */ + public static Batch createLocal(UUID id, long creationTime, Collection<Mutation> mutations) + { + return new Batch(id, creationTime, mutations, Collections.emptyList()); + } + + /** + * Creates a 'remote' batch - with all enclosed mutations in encoded form (as ByteBuffer instances) + * + * The mutations will always be encoded using the current messaging version. + */ + public static Batch createRemote(UUID id, long creationTime, Collection<ByteBuffer> mutations) + { + return new Batch(id, creationTime, Collections.<Mutation>emptyList(), mutations); + } + + /** + * Count of the mutations in the batch.
+ */ + public int size() + { + return decodedMutations.size() + encodedMutations.size(); + } + + static final class Serializer implements IVersionedSerializer<Batch> + { + public long serializedSize(Batch batch, int version) + { + assert batch.encodedMutations.isEmpty() : "attempted to serialize a 'remote' batch"; + + long size = UUIDSerializer.serializer.serializedSize(batch.id, version); + size += sizeof(batch.creationTime); + + size += sizeofVInt(batch.decodedMutations.size()); + for (Mutation mutation : batch.decodedMutations) + { + int mutationSize = (int) Mutation.serializer.serializedSize(mutation, version); + size += sizeofVInt(mutationSize); + size += mutationSize; + } + + return size; + } + + public void serialize(Batch batch, DataOutputPlus out, int version) throws IOException + { + assert batch.encodedMutations.isEmpty() : "attempted to serialize a 'remote' batch"; + + UUIDSerializer.serializer.serialize(batch.id, out, version); + out.writeLong(batch.creationTime); + + out.writeVInt(batch.decodedMutations.size()); + for (Mutation mutation : batch.decodedMutations) + { + out.writeVInt(Mutation.serializer.serializedSize(mutation, version)); // length prefix: lets a current-version reader keep the mutation encoded (see readEncodedMutations) + Mutation.serializer.serialize(mutation, out, version); + } + } + + public Batch deserialize(DataInputPlus in, int version) throws IOException + { + UUID id = UUIDSerializer.serializer.deserialize(in, version); + long creationTime = in.readLong(); + + /* + * If version doesn't match the current one, we cannot just read the encoded mutations verbatim, + * so we decode them instead, to deal with compatibility. + */ + return version == MessagingService.current_version + ?
createRemote(id, creationTime, readEncodedMutations(in)) + : createLocal(id, creationTime, decodeMutations(in, version)); + } + + private static Collection<ByteBuffer> readEncodedMutations(DataInputPlus in) throws IOException + { + int count = (int) in.readVInt(); + + ArrayList<ByteBuffer> mutations = new ArrayList<>(count); + for (int i = 0; i < count; i++) + mutations.add(ByteBufferUtil.readWithVIntLength(in)); + + return mutations; + } + + private static Collection<Mutation> decodeMutations(DataInputPlus in, int version) throws IOException + { + int count = (int) in.readVInt(); + + ArrayList<Mutation> mutations = new ArrayList<>(count); + for (int i = 0; i < count; i++) + { + in.readVInt(); // skip mutation size + mutations.add(Mutation.serializer.deserialize(in, version)); + } + + return mutations; + } + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/53a177a9/src/java/org/apache/cassandra/batchlog/BatchRemoveVerbHandler.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/batchlog/BatchRemoveVerbHandler.java b/src/java/org/apache/cassandra/batchlog/BatchRemoveVerbHandler.java new file mode 100644 index 0000000..3c3fcec --- /dev/null +++ b/src/java/org/apache/cassandra/batchlog/BatchRemoveVerbHandler.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.batchlog; + +import java.util.UUID; + +import org.apache.cassandra.net.IVerbHandler; +import org.apache.cassandra.net.MessageIn; + +public final class BatchRemoveVerbHandler implements IVerbHandler<UUID> +{ + public void doVerb(MessageIn<UUID> message, int id) + { + BatchlogManager.remove(message.payload); // payload is the id of the batch to delete; no reply is sent back + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/53a177a9/src/java/org/apache/cassandra/batchlog/BatchStoreVerbHandler.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/batchlog/BatchStoreVerbHandler.java b/src/java/org/apache/cassandra/batchlog/BatchStoreVerbHandler.java new file mode 100644 index 0000000..4bc878c --- /dev/null +++ b/src/java/org/apache/cassandra/batchlog/BatchStoreVerbHandler.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.batchlog; + +import org.apache.cassandra.db.WriteResponse; +import org.apache.cassandra.net.IVerbHandler; +import org.apache.cassandra.net.MessageIn; +import org.apache.cassandra.net.MessagingService; + +public final class BatchStoreVerbHandler implements IVerbHandler<Batch> +{ + public void doVerb(MessageIn<Batch> message, int id) + { + BatchlogManager.store(message.payload); // persist the batch into the local batchlog first + MessagingService.instance().sendReply(WriteResponse.createMessage(), id, message.from); // then acknowledge the store to the sender + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/53a177a9/src/java/org/apache/cassandra/batchlog/BatchlogManager.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/batchlog/BatchlogManager.java b/src/java/org/apache/cassandra/batchlog/BatchlogManager.java new file mode 100644 index 0000000..934ebaa --- /dev/null +++ b/src/java/org/apache/cassandra/batchlog/BatchlogManager.java @@ -0,0 +1,554 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ +package org.apache.cassandra.batchlog; + +import java.io.IOException; +import java.lang.management.ManagementFactory; +import java.net.InetAddress; +import java.nio.ByteBuffer; +import java.util.*; +import java.util.concurrent.*; + +import javax.management.MBeanServer; +import javax.management.ObjectName; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.*; +import com.google.common.util.concurrent.RateLimiter; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.cassandra.concurrent.DebuggableScheduledThreadPoolExecutor; +import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.cql3.UntypedResultSet; +import org.apache.cassandra.db.*; +import org.apache.cassandra.db.marshal.BytesType; +import org.apache.cassandra.db.marshal.UUIDType; +import org.apache.cassandra.db.partitions.PartitionUpdate; +import org.apache.cassandra.dht.Token; +import org.apache.cassandra.exceptions.WriteFailureException; +import org.apache.cassandra.exceptions.WriteTimeoutException; +import org.apache.cassandra.gms.FailureDetector; +import org.apache.cassandra.hints.Hint; +import org.apache.cassandra.hints.HintsService; +import org.apache.cassandra.io.util.DataInputBuffer; +import org.apache.cassandra.io.util.DataOutputBuffer; +import org.apache.cassandra.net.MessageIn; +import org.apache.cassandra.net.MessageOut; +import org.apache.cassandra.net.MessagingService; +import org.apache.cassandra.service.StorageService; +import org.apache.cassandra.service.WriteResponseHandler; +import org.apache.cassandra.utils.FBUtilities; +import org.apache.cassandra.utils.UUIDGen; + +import static com.google.common.collect.Iterables.transform; +import static org.apache.cassandra.cql3.QueryProcessor.executeInternal; +import static org.apache.cassandra.cql3.QueryProcessor.executeInternalWithPaging; + +public class BatchlogManager implements BatchlogManagerMBean +{ + public static final String MBEAN_NAME =
"org.apache.cassandra.db:type=BatchlogManager"; + private static final long REPLAY_INTERVAL = 10 * 1000; // milliseconds + static final int DEFAULT_PAGE_SIZE = 128; + + private static final Logger logger = LoggerFactory.getLogger(BatchlogManager.class); + public static final BatchlogManager instance = new BatchlogManager(); + + private volatile long totalBatchesReplayed = 0; // no concurrency protection necessary as only written by replay thread. + private volatile UUID lastReplayedUuid = UUIDGen.minTimeUUID(0); + + // Single-thread executor service for scheduling and serializing log replay. + private final ScheduledExecutorService batchlogTasks = new DebuggableScheduledThreadPoolExecutor("BatchlogTasks"); + + public void start() + { + MBeanServer mbs = ManagementFactory.getPlatformMBeanServer(); + try + { + mbs.registerMBean(this, new ObjectName(MBEAN_NAME)); + } + catch (Exception e) + { + throw new RuntimeException(e); + } + + batchlogTasks.scheduleWithFixedDelay(this::replayFailedBatches, + StorageService.RING_DELAY, + REPLAY_INTERVAL, + TimeUnit.MILLISECONDS); + } + + public void shutdown() throws InterruptedException + { + batchlogTasks.shutdown(); + batchlogTasks.awaitTermination(60, TimeUnit.SECONDS); + } + + public static void remove(UUID id) + { + new Mutation(PartitionUpdate.fullPartitionDelete(SystemKeyspace.Batches, + UUIDType.instance.decompose(id), + FBUtilities.timestampMicros(), + FBUtilities.nowInSeconds())) + .apply(); + } + + public static void store(Batch batch) + { + store(batch, true); + } + + public static void store(Batch batch, boolean durableWrites) + { + RowUpdateBuilder builder = + new RowUpdateBuilder(SystemKeyspace.Batches, batch.creationTime, batch.id) + .clustering() + .add("version", MessagingService.current_version); + + for (ByteBuffer mutation : batch.encodedMutations) + builder.addListEntry("mutations", mutation); + + for (Mutation mutation : batch.decodedMutations) + { + try (DataOutputBuffer buffer = new DataOutputBuffer())
+ { + Mutation.serializer.serialize(mutation, buffer, MessagingService.current_version); + builder.addListEntry("mutations", buffer.buffer()); + } + catch (IOException e) + { + // shouldn't happen + throw new AssertionError(e); + } + } + + builder.build().apply(durableWrites); + } + + @VisibleForTesting + public int countAllBatches() + { + String query = String.format("SELECT count(*) FROM %s.%s", SystemKeyspace.NAME, SystemKeyspace.BATCHES); + UntypedResultSet results = executeInternal(query); + if (results == null || results.isEmpty()) + return 0; + + return (int) results.one().getLong("count"); + } + + public long getTotalBatchesReplayed() + { + return totalBatchesReplayed; + } + + public void forceBatchlogReplay() throws Exception + { + startBatchlogReplay().get(); + } + + public Future<?> startBatchlogReplay() + { + // If a replay is already in progress this request will be executed after it completes. + return batchlogTasks.submit(this::replayFailedBatches); + } + + void performInitialReplay() throws InterruptedException, ExecutionException + { + // Invokes initial replay. Used for testing only. + batchlogTasks.submit(this::replayFailedBatches).get(); + } + + private void replayFailedBatches() + { + logger.debug("Started replayFailedBatches"); + + // rate limit is in bytes per second. Uses Double.MAX_VALUE if disabled (set to 0 in cassandra.yaml). + // max rate is scaled by the number of nodes in the cluster (same as for HHOM - see CASSANDRA-5272); the node count is clamped to 1 so an empty ring cannot cause a division by zero. + int throttleInKB = DatabaseDescriptor.getBatchlogReplayThrottleInKB() / Math.max(1, StorageService.instance.getTokenMetadata().getAllEndpoints().size()); + RateLimiter rateLimiter = RateLimiter.create(throttleInKB == 0 ?
Double.MAX_VALUE : throttleInKB * 1024); + + UUID limitUuid = UUIDGen.maxTimeUUID(System.currentTimeMillis() - getBatchlogTimeout()); + ColumnFamilyStore store = Keyspace.open(SystemKeyspace.NAME).getColumnFamilyStore(SystemKeyspace.BATCHES); + int pageSize = calculatePageSize(store); + // There cannot be any live content where token(id) <= token(lastReplayedUuid) as every processed batch is + // deleted, but the tombstoned content may still be present in the tables. To avoid walking over it we specify + // token(id) > token(lastReplayedUuid) as part of the query. + String query = String.format("SELECT id, mutations, version FROM %s.%s WHERE token(id) > token(?) AND token(id) <= token(?)", + SystemKeyspace.NAME, + SystemKeyspace.BATCHES); + UntypedResultSet batches = executeInternalWithPaging(query, pageSize, lastReplayedUuid, limitUuid); + processBatchlogEntries(batches, pageSize, rateLimiter); + lastReplayedUuid = limitUuid; + logger.debug("Finished replayFailedBatches"); + } + + // read fewer rows (batches) per page if they are very large (aims for roughly 4 MiB per page) + static int calculatePageSize(ColumnFamilyStore store) + { + double averageRowSize = store.getMeanPartitionSize(); + if (averageRowSize <= 0) + return DEFAULT_PAGE_SIZE; + + return (int) Math.max(1, Math.min(DEFAULT_PAGE_SIZE, 4 * 1024 * 1024 / averageRowSize)); + } + + private void processBatchlogEntries(UntypedResultSet batches, int pageSize, RateLimiter rateLimiter) + { + int positionInPage = 0; + ArrayList<ReplayingBatch> unfinishedBatches = new ArrayList<>(pageSize); + + Set<InetAddress> hintedNodes = new HashSet<>(); + Set<UUID> replayedBatches = new HashSet<>(); + + // Sending out batches for replay without waiting for them, so that one stuck batch doesn't affect others + for (UntypedResultSet.Row row : batches) + { + UUID id = row.getUUID("id"); + int version = row.getInt("version"); + try + { + ReplayingBatch batch = new ReplayingBatch(id, version, row.getList("mutations", BytesType.instance)); + if
(batch.replay(rateLimiter, hintedNodes) > 0) + { + unfinishedBatches.add(batch); + } + else + { + remove(id); // no write mutations were sent (either expired or all CFs involved truncated). + ++totalBatchesReplayed; + } + } + catch (IOException e) + { + logger.warn("Skipped batch replay of {} due to {}", id, e); + remove(id); + } + + if (++positionInPage == pageSize) + { + // We have reached the end of a page. To avoid keeping more than a page of mutations in memory, + // finish processing the page before requesting the next row. + finishAndClearBatches(unfinishedBatches, hintedNodes, replayedBatches); + positionInPage = 0; + } + } + + finishAndClearBatches(unfinishedBatches, hintedNodes, replayedBatches); + + // to preserve batch guarantees, we must ensure that hints (if any) have made it to disk, before deleting the batches + HintsService.instance.flushAndFsyncBlockingly(transform(hintedNodes, StorageService.instance::getHostIdForEndpoint)); + + // once all generated hints are fsynced, actually delete the batches + replayedBatches.forEach(BatchlogManager::remove); + } + + private void finishAndClearBatches(ArrayList<ReplayingBatch> batches, Set<InetAddress> hintedNodes, Set<UUID> replayedBatches) + { + // schedule hints for timed out deliveries + for (ReplayingBatch batch : batches) + { + batch.finish(hintedNodes); + replayedBatches.add(batch.id); + } + + totalBatchesReplayed += batches.size(); + batches.clear(); + } + + public static long getBatchlogTimeout() + { + return DatabaseDescriptor.getWriteRpcTimeout() * 2; // enough time for the actual write + BM removal mutation + } + + private static class ReplayingBatch + { + private final UUID id; + private final long writtenAt; + private final List<Mutation> mutations; + private final int replayedBytes; + + private List<ReplayWriteResponseHandler<Mutation>> replayHandlers; + + ReplayingBatch(UUID id, int version, List<ByteBuffer> serializedMutations) throws IOException + { + this.id = id; + this.writtenAt =
UUIDGen.unixTimestamp(id); + this.mutations = new ArrayList<>(serializedMutations.size()); + this.replayedBytes = addMutations(version, serializedMutations); + } + + public int replay(RateLimiter rateLimiter, Set<InetAddress> hintedNodes) throws IOException + { + logger.debug("Replaying batch {}", id); + + if (mutations.isEmpty()) + return 0; + + int gcgs = gcgs(mutations); + if (TimeUnit.MILLISECONDS.toSeconds(writtenAt) + gcgs <= FBUtilities.nowInSeconds()) + return 0; + + replayHandlers = sendReplays(mutations, writtenAt, hintedNodes); + + rateLimiter.acquire(replayedBytes); // acquire afterwards, to not mess up ttl calculation. + + return replayHandlers.size(); // one (possibly null) entry per mutation, so the batch is only deleted after hints are fsynced + } + + public void finish(Set<InetAddress> hintedNodes) + { + for (int i = 0; i < replayHandlers.size(); i++) + { + ReplayWriteResponseHandler<Mutation> handler = replayHandlers.get(i); + try + { + if (handler != null) handler.get(); // null: no live remote replica, the mutation was only applied locally and/or hinted + } + catch (WriteTimeoutException|WriteFailureException e) + { + logger.debug("Failed replaying a batched mutation to a node, will write a hint"); + logger.debug("Failure was : {}", e.getMessage()); + // writing hints for the rest to hints, starting from i + writeHintsForUndeliveredEndpoints(i, hintedNodes); + return; + } + } + } + + private int addMutations(int version, List<ByteBuffer> serializedMutations) throws IOException + { + int ret = 0; + for (ByteBuffer serializedMutation : serializedMutations) + { + ret += serializedMutation.remaining(); + try (DataInputBuffer in = new DataInputBuffer(serializedMutation, true)) + { + addMutation(Mutation.serializer.deserialize(in, version)); + } + } + + return ret; + } + + // Remove CFs that have been truncated since. writtenAt and SystemKeyspace#getTruncatedAt() both return millis. + // We don't abort the replay entirely b/c this can be considered a success (truncated is same as delivered then + // truncated).
+ private void addMutation(Mutation mutation) + { + for (UUID cfId : mutation.getColumnFamilyIds()) + if (writtenAt <= SystemKeyspace.getTruncatedAt(cfId)) + mutation = mutation.without(cfId); + + if (!mutation.isEmpty()) + mutations.add(mutation); + } + + private void writeHintsForUndeliveredEndpoints(int startFrom, Set<InetAddress> hintedNodes) + { + int gcgs = gcgs(mutations); + + // expired + if (TimeUnit.MILLISECONDS.toSeconds(writtenAt) + gcgs <= FBUtilities.nowInSeconds()) + return; + + for (int i = startFrom; i < replayHandlers.size(); i++) + { + ReplayWriteResponseHandler<Mutation> handler = replayHandlers.get(i); + Mutation undeliveredMutation = mutations.get(i); // valid because sendReplays keeps handlers index-aligned with mutations + + if (handler != null) + { + hintedNodes.addAll(handler.undelivered); + HintsService.instance.write(transform(handler.undelivered, StorageService.instance::getHostIdForEndpoint), + Hint.create(undeliveredMutation, writtenAt)); + } + } + } + + private static List<ReplayWriteResponseHandler<Mutation>> sendReplays(List<Mutation> mutations, + long writtenAt, + Set<InetAddress> hintedNodes) + { + List<ReplayWriteResponseHandler<Mutation>> handlers = new ArrayList<>(mutations.size()); + for (Mutation mutation : mutations) + { + ReplayWriteResponseHandler<Mutation> handler = sendSingleReplayMutation(mutation, writtenAt, hintedNodes); + // keep null entries so that handlers.get(i) always belongs to mutations.get(i) + handlers.add(handler); + } + return handlers; + } + + /** + * We try to deliver the mutations to the replicas ourselves if they are alive and only resort to writing hints + * when a replica is down or a write request times out.
+ * + * @return direct delivery handler to wait on, or null if no live remote replicas were found + */ + private static ReplayWriteResponseHandler<Mutation> sendSingleReplayMutation(final Mutation mutation, + long writtenAt, + Set<InetAddress> hintedNodes) + { + Set<InetAddress> liveEndpoints = new HashSet<>(); + String ks = mutation.getKeyspaceName(); + Token tk = mutation.key().getToken(); + + for (InetAddress endpoint : Iterables.concat(StorageService.instance.getNaturalEndpoints(ks, tk), + StorageService.instance.getTokenMetadata().pendingEndpointsFor(tk, ks))) + { + if (endpoint.equals(FBUtilities.getBroadcastAddress())) + { + mutation.apply(); + } + else if (FailureDetector.instance.isAlive(endpoint)) + { + liveEndpoints.add(endpoint); // will try delivering directly instead of writing a hint. + } + else + { + hintedNodes.add(endpoint); + HintsService.instance.write(StorageService.instance.getHostIdForEndpoint(endpoint), + Hint.create(mutation, writtenAt)); + } + } + + if (liveEndpoints.isEmpty()) + return null; + + ReplayWriteResponseHandler<Mutation> handler = new ReplayWriteResponseHandler<>(liveEndpoints); + MessageOut<Mutation> message = mutation.createMessage(); + for (InetAddress endpoint : liveEndpoints) + MessagingService.instance().sendRR(message, endpoint, handler, false); + return handler; + } + + private static int gcgs(Collection<Mutation> mutations) + { + int gcgs = Integer.MAX_VALUE; + for (Mutation mutation : mutations) + gcgs = Math.min(gcgs, mutation.smallestGCGS()); + return gcgs; + } + + /** + * A wrapper of WriteResponseHandler that stores the addresses of the endpoints from + * which we did not receive a successful reply.
+ */ + private static class ReplayWriteResponseHandler<T> extends WriteResponseHandler<T> + { + private final Set<InetAddress> undelivered = Collections.newSetFromMap(new ConcurrentHashMap<>()); + + ReplayWriteResponseHandler(Collection<InetAddress> writeEndpoints) + { + super(writeEndpoints, Collections.<InetAddress>emptySet(), null, null, null, WriteType.UNLOGGED_BATCH); + undelivered.addAll(writeEndpoints); + } + + @Override + protected int totalBlockFor() + { + return this.naturalEndpoints.size(); + } + + @Override + public void response(MessageIn<T> m) + { + boolean removed = undelivered.remove(m == null ? FBUtilities.getBroadcastAddress() : m.from); + assert removed; + super.response(m); + } + } + } + + public static class EndpointFilter + { + private final String localRack; + private final Multimap<String, InetAddress> endpoints; + + public EndpointFilter(String localRack, Multimap<String, InetAddress> endpoints) + { + this.localRack = localRack; + this.endpoints = endpoints; + } + + /** + * @return list of candidates for batchlog hosting. If possible these will be two nodes from different racks.
+ */ + public Collection<InetAddress> filter() + { + // special case for single-node data centers + if (endpoints.values().size() == 1) + return endpoints.values(); + + // strip out dead endpoints and localhost + ListMultimap<String, InetAddress> validated = ArrayListMultimap.create(); + for (Map.Entry<String, InetAddress> entry : endpoints.entries()) + if (isValid(entry.getValue())) + validated.put(entry.getKey(), entry.getValue()); + + if (validated.size() <= 2) + return validated.values(); + + if (validated.size() - validated.get(localRack).size() >= 2) + { + // we have enough endpoints in other racks + validated.removeAll(localRack); + } + + if (validated.keySet().size() == 1) + { + // we have only 1 `other` rack + Collection<InetAddress> otherRack = Iterables.getOnlyElement(validated.asMap().values()); + return Lists.newArrayList(Iterables.limit(otherRack, 2)); + } + + // randomize which racks we pick from if more than 2 remaining + Collection<String> racks; + if (validated.keySet().size() == 2) + { + racks = validated.keySet(); + } + else + { + racks = Lists.newArrayList(validated.keySet()); + Collections.shuffle((List<String>) racks); + } + + // grab a random member of up to two racks + List<InetAddress> result = new ArrayList<>(2); + for (String rack : Iterables.limit(racks, 2)) + { + List<InetAddress> rackMembers = validated.get(rack); + result.add(rackMembers.get(getRandomInt(rackMembers.size()))); + } + + return result; + } + + @VisibleForTesting + protected boolean isValid(InetAddress input) + { + return !input.equals(FBUtilities.getBroadcastAddress()) && FailureDetector.instance.isAlive(input); + } + + @VisibleForTesting + protected int getRandomInt(int bound) + { + return ThreadLocalRandom.current().nextInt(bound); + } + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/53a177a9/src/java/org/apache/cassandra/batchlog/BatchlogManagerMBean.java ---------------------------------------------------------------------- diff --git
a/src/java/org/apache/cassandra/batchlog/BatchlogManagerMBean.java b/src/java/org/apache/cassandra/batchlog/BatchlogManagerMBean.java new file mode 100644 index 0000000..4dcc9f2 --- /dev/null +++ b/src/java/org/apache/cassandra/batchlog/BatchlogManagerMBean.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.batchlog; + +public interface BatchlogManagerMBean +{ + /** + * Counts all batches currently in the batchlog. + * + * @return total batch count + */ + public int countAllBatches(); + + /** + * @return total count of batches replayed since node start + */ + public long getTotalBatchesReplayed(); + + /** + * Forces batchlog replay. Returns immediately if replay is already in progress. 
+ */ + public void forceBatchlogReplay() throws Exception; +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/53a177a9/src/java/org/apache/cassandra/batchlog/LegacyBatchlogMigrator.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/batchlog/LegacyBatchlogMigrator.java b/src/java/org/apache/cassandra/batchlog/LegacyBatchlogMigrator.java new file mode 100644 index 0000000..13ff81a --- /dev/null +++ b/src/java/org/apache/cassandra/batchlog/LegacyBatchlogMigrator.java @@ -0,0 +1,196 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.cassandra.batchlog; + +import java.io.IOException; +import java.net.InetAddress; +import java.nio.ByteBuffer; +import java.util.*; +import java.util.concurrent.TimeUnit; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.cassandra.cql3.QueryProcessor; +import org.apache.cassandra.cql3.UntypedResultSet; +import org.apache.cassandra.db.*; +import org.apache.cassandra.db.marshal.UUIDType; +import org.apache.cassandra.db.partitions.PartitionUpdate; +import org.apache.cassandra.exceptions.WriteFailureException; +import org.apache.cassandra.exceptions.WriteTimeoutException; +import org.apache.cassandra.io.util.DataInputBuffer; +import org.apache.cassandra.io.util.DataOutputBuffer; +import org.apache.cassandra.net.MessagingService; +import org.apache.cassandra.service.AbstractWriteResponseHandler; +import org.apache.cassandra.service.WriteResponseHandler; +import org.apache.cassandra.utils.FBUtilities; +import org.apache.cassandra.utils.UUIDGen; + +public final class LegacyBatchlogMigrator +{ + private static final Logger logger = LoggerFactory.getLogger(LegacyBatchlogMigrator.class); + + private LegacyBatchlogMigrator() + { + // static class + } + + @SuppressWarnings("deprecation") + public static void migrate() + { + ColumnFamilyStore store = Keyspace.open(SystemKeyspace.NAME).getColumnFamilyStore(SystemKeyspace.LEGACY_BATCHLOG); + + // nothing to migrate + if (store.isEmpty()) + return; + + logger.info("Migrating legacy batchlog to new storage"); + + int convertedBatches = 0; + String query = String.format("SELECT id, data, written_at, version FROM %s.%s", + SystemKeyspace.NAME, + SystemKeyspace.LEGACY_BATCHLOG); + + int pageSize = BatchlogManager.calculatePageSize(store); + + UntypedResultSet rows = QueryProcessor.executeInternalWithPaging(query, pageSize); + for (UntypedResultSet.Row row : rows) + { + if (apply(row, convertedBatches)) + convertedBatches++; + } + + if (convertedBatches > 0) + 
Keyspace.openAndGetStore(SystemKeyspace.LegacyBatchlog).truncateBlocking(); + } + + @SuppressWarnings("deprecation") + public static boolean isLegacyBatchlogMutation(Mutation mutation) + { + return mutation.getKeyspaceName().equals(SystemKeyspace.NAME) + && mutation.getPartitionUpdate(SystemKeyspace.LegacyBatchlog.cfId) != null; + } + + @SuppressWarnings("deprecation") + public static void handleLegacyMutation(Mutation mutation) + { + PartitionUpdate update = mutation.getPartitionUpdate(SystemKeyspace.LegacyBatchlog.cfId); + logger.debug("Applying legacy batchlog mutation {}", update); + update.forEach(row -> apply(UntypedResultSet.Row.fromInternalRow(update.metadata(), update.partitionKey(), row), -1)); + } + + private static boolean apply(UntypedResultSet.Row row, long counter) + { + UUID id = row.getUUID("id"); + long timestamp = id.version() == 1 ? UUIDGen.unixTimestamp(id) : row.getLong("written_at"); + int version = row.has("version") ? row.getInt("version") : MessagingService.VERSION_12; + + if (id.version() != 1) + id = UUIDGen.getTimeUUID(timestamp, counter); + + logger.debug("Converting mutation at {}", timestamp); + + try (DataInputBuffer in = new DataInputBuffer(row.getBytes("data"), false)) + { + int numMutations = in.readInt(); + List<Mutation> mutations = new ArrayList<>(numMutations); + for (int i = 0; i < numMutations; i++) + mutations.add(Mutation.serializer.deserialize(in, version)); + + BatchlogManager.store(Batch.createLocal(id, TimeUnit.MILLISECONDS.toMicros(timestamp), mutations)); + return true; + } + catch (Throwable t) + { + logger.error("Failed to convert mutation {} at timestamp {}", id, timestamp, t); + return false; + } + } + + public static void syncWriteToBatchlog(WriteResponseHandler<?> handler, Batch batch, Collection<InetAddress> endpoints) + throws WriteTimeoutException, WriteFailureException + { + for (InetAddress target : endpoints) + { + logger.debug("Sending legacy batchlog store request {} to {} for {} mutations", batch.id, 
target, batch.size()); + + int targetVersion = MessagingService.instance().getVersion(target); + MessagingService.instance().sendRR(getStoreMutation(batch, targetVersion).createMessage(MessagingService.Verb.MUTATION), + target, + handler, + false); + } + } + + public static void asyncRemoveFromBatchlog(Collection<InetAddress> endpoints, UUID uuid) + { + AbstractWriteResponseHandler<IMutation> handler = new WriteResponseHandler<>(endpoints, + Collections.<InetAddress>emptyList(), + ConsistencyLevel.ANY, + Keyspace.open(SystemKeyspace.NAME), + null, + WriteType.SIMPLE); + Mutation mutation = getRemoveMutation(uuid); + + for (InetAddress target : endpoints) + { + logger.debug("Sending legacy batchlog remove request {} to {}", uuid, target); + MessagingService.instance().sendRR(mutation.createMessage(MessagingService.Verb.MUTATION), target, handler, false); + } + } + + static void store(Batch batch, int version) + { + getStoreMutation(batch, version).apply(); + } + + @SuppressWarnings("deprecation") + static Mutation getStoreMutation(Batch batch, int version) + { + return new RowUpdateBuilder(SystemKeyspace.LegacyBatchlog, batch.creationTime, batch.id) + .clustering() + .add("written_at", new Date(batch.creationTime / 1000)) + .add("data", getSerializedMutations(version, batch.decodedMutations)) + .add("version", version) + .build(); + } + + @SuppressWarnings("deprecation") + private static Mutation getRemoveMutation(UUID uuid) + { + return new Mutation(PartitionUpdate.fullPartitionDelete(SystemKeyspace.LegacyBatchlog, + UUIDType.instance.decompose(uuid), + FBUtilities.timestampMicros(), + FBUtilities.nowInSeconds())); + } + + private static ByteBuffer getSerializedMutations(int version, Collection<Mutation> mutations) + { + try (DataOutputBuffer buf = new DataOutputBuffer()) + { + buf.writeInt(mutations.size()); + for (Mutation mutation : mutations) + Mutation.serializer.serialize(mutation, buf, version); + return buf.buffer(); + } + catch (IOException e) + { + throw 
new RuntimeException(e); + } + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/53a177a9/src/java/org/apache/cassandra/concurrent/Stage.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/concurrent/Stage.java b/src/java/org/apache/cassandra/concurrent/Stage.java index e91c515..a57587c 100644 --- a/src/java/org/apache/cassandra/concurrent/Stage.java +++ b/src/java/org/apache/cassandra/concurrent/Stage.java @@ -27,7 +27,6 @@ public enum Stage READ, MUTATION, COUNTER_MUTATION, - BATCHLOG_MUTATION, MATERIALIZED_VIEW_MUTATION, GOSSIP, REQUEST_RESPONSE, @@ -62,7 +61,6 @@ public enum Stage return "internal"; case MUTATION: case COUNTER_MUTATION: - case BATCHLOG_MUTATION: case MATERIALIZED_VIEW_MUTATION: case READ: case REQUEST_RESPONSE: http://git-wip-us.apache.org/repos/asf/cassandra/blob/53a177a9/src/java/org/apache/cassandra/concurrent/StageManager.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/concurrent/StageManager.java b/src/java/org/apache/cassandra/concurrent/StageManager.java index ca83829..ee1fbe5 100644 --- a/src/java/org/apache/cassandra/concurrent/StageManager.java +++ b/src/java/org/apache/cassandra/concurrent/StageManager.java @@ -47,7 +47,6 @@ public class StageManager { stages.put(Stage.MUTATION, multiThreadedLowSignalStage(Stage.MUTATION, getConcurrentWriters())); stages.put(Stage.COUNTER_MUTATION, multiThreadedLowSignalStage(Stage.COUNTER_MUTATION, getConcurrentCounterWriters())); - stages.put(Stage.BATCHLOG_MUTATION, multiThreadedLowSignalStage(Stage.BATCHLOG_MUTATION, getConcurrentBatchlogWriters())); stages.put(Stage.MATERIALIZED_VIEW_MUTATION, multiThreadedLowSignalStage(Stage.MATERIALIZED_VIEW_MUTATION, getConcurrentMaterializedViewWriters())); stages.put(Stage.READ, multiThreadedLowSignalStage(Stage.READ, getConcurrentReaders())); stages.put(Stage.REQUEST_RESPONSE, 
multiThreadedLowSignalStage(Stage.REQUEST_RESPONSE, FBUtilities.getAvailableProcessors())); http://git-wip-us.apache.org/repos/asf/cassandra/blob/53a177a9/src/java/org/apache/cassandra/config/Config.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/config/Config.java b/src/java/org/apache/cassandra/config/Config.java index 9d55fc8..22b09d3 100644 --- a/src/java/org/apache/cassandra/config/Config.java +++ b/src/java/org/apache/cassandra/config/Config.java @@ -93,7 +93,6 @@ public class Config public Integer concurrent_reads = 32; public Integer concurrent_writes = 32; public Integer concurrent_counter_writes = 32; - public Integer concurrent_batchlog_writes = 32; public Integer concurrent_materialized_view_writes = 32; @Deprecated http://git-wip-us.apache.org/repos/asf/cassandra/blob/53a177a9/src/java/org/apache/cassandra/config/DatabaseDescriptor.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java index 4e13911..31a4e9d 100644 --- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java +++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java @@ -1080,8 +1080,9 @@ public class DatabaseDescriptor case PAXOS_COMMIT: case PAXOS_PREPARE: case PAXOS_PROPOSE: - case BATCHLOG_MUTATION: case HINT: + case BATCH_STORE: + case BATCH_REMOVE: return getWriteRpcTimeout(); case COUNTER_MUTATION: return getCounterWriteRpcTimeout(); @@ -1128,10 +1129,6 @@ public class DatabaseDescriptor return conf.concurrent_counter_writes; } - public static int getConcurrentBatchlogWriters() - { - return conf.concurrent_batchlog_writes; - } public static int getConcurrentMaterializedViewWriters() { return conf.concurrent_materialized_view_writes; 
http://git-wip-us.apache.org/repos/asf/cassandra/blob/53a177a9/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java b/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java index 5de4b6c..c8482b3 100644 --- a/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java +++ b/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java @@ -524,11 +524,6 @@ public class BatchStatement implements CQLStatement } } - public interface BatchVariables - { - public List<ByteBuffer> getVariablesForStatement(int statementInBatch); - } - public String toString() { return String.format("BatchStatement(type=%s, statements=%s)", type, statements); http://git-wip-us.apache.org/repos/asf/cassandra/blob/53a177a9/src/java/org/apache/cassandra/db/BatchlogManager.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/BatchlogManager.java b/src/java/org/apache/cassandra/db/BatchlogManager.java deleted file mode 100644 index de85925..0000000 --- a/src/java/org/apache/cassandra/db/BatchlogManager.java +++ /dev/null @@ -1,596 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.cassandra.db; - -import java.io.IOException; -import java.lang.management.ManagementFactory; -import java.net.InetAddress; -import java.nio.ByteBuffer; -import java.util.*; -import java.util.concurrent.*; - -import javax.management.MBeanServer; -import javax.management.ObjectName; -import com.google.common.annotations.VisibleForTesting; -import com.google.common.collect.*; -import com.google.common.util.concurrent.RateLimiter; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import org.apache.cassandra.concurrent.DebuggableScheduledThreadPoolExecutor; -import org.apache.cassandra.config.DatabaseDescriptor; -import org.apache.cassandra.cql3.UntypedResultSet; -import org.apache.cassandra.db.partitions.PartitionUpdate; -import org.apache.cassandra.db.marshal.UUIDType; -import org.apache.cassandra.dht.Token; -import org.apache.cassandra.exceptions.WriteFailureException; -import org.apache.cassandra.exceptions.WriteTimeoutException; -import org.apache.cassandra.gms.FailureDetector; -import org.apache.cassandra.hints.Hint; -import org.apache.cassandra.hints.HintsService; -import org.apache.cassandra.io.util.DataInputBuffer; -import org.apache.cassandra.io.util.DataInputPlus; -import org.apache.cassandra.io.util.DataOutputBuffer; -import org.apache.cassandra.net.MessageIn; -import org.apache.cassandra.net.MessageOut; -import org.apache.cassandra.net.MessagingService; -import org.apache.cassandra.service.StorageService; -import org.apache.cassandra.service.WriteResponseHandler; -import org.apache.cassandra.utils.FBUtilities; -import org.apache.cassandra.utils.UUIDGen; - -import static com.google.common.collect.Iterables.transform; -import static org.apache.cassandra.cql3.QueryProcessor.executeInternal; -import static org.apache.cassandra.cql3.QueryProcessor.executeInternalWithPaging; - -public class BatchlogManager implements 
BatchlogManagerMBean -{ - public static final String MBEAN_NAME = "org.apache.cassandra.db:type=BatchlogManager"; - private static final long REPLAY_INTERVAL = 10 * 1000; // milliseconds - private static final int DEFAULT_PAGE_SIZE = 128; - - private static final Logger logger = LoggerFactory.getLogger(BatchlogManager.class); - public static final BatchlogManager instance = new BatchlogManager(); - - private volatile long totalBatchesReplayed = 0; // no concurrency protection necessary as only written by replay thread. - private volatile UUID lastReplayedUuid = UUIDGen.minTimeUUID(0); - - // Single-thread executor service for scheduling and serializing log replay. - private static final ScheduledExecutorService batchlogTasks = new DebuggableScheduledThreadPoolExecutor("BatchlogTasks"); - - public void start() - { - MBeanServer mbs = ManagementFactory.getPlatformMBeanServer(); - try - { - mbs.registerMBean(this, new ObjectName(MBEAN_NAME)); - } - catch (Exception e) - { - throw new RuntimeException(e); - } - - batchlogTasks.schedule(this::replayInitially, StorageService.RING_DELAY, TimeUnit.MILLISECONDS); - - batchlogTasks.scheduleWithFixedDelay(this::replayAllFailedBatches, - StorageService.RING_DELAY + REPLAY_INTERVAL, - REPLAY_INTERVAL, - TimeUnit.MILLISECONDS); - } - - private void replayInitially() - { - // Initial run must take care of non-time-uuid batches as written by Version 1.2. 
- convertOldBatchEntries(); - - replayAllFailedBatches(); - } - - public static void shutdown() throws InterruptedException - { - batchlogTasks.shutdown(); - batchlogTasks.awaitTermination(60, TimeUnit.SECONDS); - } - - public int countAllBatches() - { - String query = String.format("SELECT count(*) FROM %s.%s", SystemKeyspace.NAME, SystemKeyspace.BATCHES); - UntypedResultSet results = executeInternal(query); - if (results.isEmpty()) - return 0; - return (int) results.one().getLong("count"); - } - - public long getTotalBatchesReplayed() - { - return totalBatchesReplayed; - } - - public void forceBatchlogReplay() throws Exception - { - startBatchlogReplay().get(); - } - - public Future<?> startBatchlogReplay() - { - // If a replay is already in progress this request will be executed after it completes. - return batchlogTasks.submit(this::replayAllFailedBatches); - } - - void performInitialReplay() throws InterruptedException, ExecutionException - { - // Invokes initial replay. Used for testing only. - batchlogTasks.submit(this::replayInitially).get(); - } - - public static Mutation getBatchlogMutationFor(Collection<Mutation> mutations, UUID uuid, int version) - { - return new RowUpdateBuilder(SystemKeyspace.Batches, FBUtilities.timestampMicros(), uuid) - .clustering() - .add("data", serializeMutations(mutations, version)) - .add("version", version) - .build(); - } - - @VisibleForTesting - static ByteBuffer serializeMutations(Collection<Mutation> mutations, int version) - { - try (DataOutputBuffer buf = new DataOutputBuffer()) - { - buf.writeInt(mutations.size()); - for (Mutation mutation : mutations) - Mutation.serializer.serialize(mutation, buf, version); - return buf.buffer(); - } - catch (IOException e) - { - throw new AssertionError(); // cannot happen. - } - } - - private void replayAllFailedBatches() - { - logger.debug("Started replayAllFailedBatches"); - - // rate limit is in bytes per second. Uses Double.MAX_VALUE if disabled (set to 0 in cassandra.yaml). 
- // max rate is scaled by the number of nodes in the cluster (same as for HHOM - see CASSANDRA-5272). - int throttleInKB = DatabaseDescriptor.getBatchlogReplayThrottleInKB() / StorageService.instance.getTokenMetadata().getAllEndpoints().size(); - RateLimiter rateLimiter = RateLimiter.create(throttleInKB == 0 ? Double.MAX_VALUE : throttleInKB * 1024); - - UUID limitUuid = UUIDGen.maxTimeUUID(System.currentTimeMillis() - getBatchlogTimeout()); - int pageSize = calculatePageSize(); - // There cannot be any live content where token(id) <= token(lastReplayedUuid) as every processed batch is - // deleted, but the tombstoned content may still be present in the tables. To avoid walking over it we specify - // token(id) > token(lastReplayedUuid) as part of the query. - String query = String.format("SELECT id, data, version FROM %s.%s WHERE token(id) > token(?) AND token(id) <= token(?)", - SystemKeyspace.NAME, - SystemKeyspace.BATCHES); - UntypedResultSet batches = executeInternalWithPaging(query, pageSize, lastReplayedUuid, limitUuid); - processBatchlogEntries(batches, pageSize, rateLimiter); - lastReplayedUuid = limitUuid; - logger.debug("Finished replayAllFailedBatches"); - } - - // read less rows (batches) per page if they are very large - private static int calculatePageSize() - { - ColumnFamilyStore store = Keyspace.open(SystemKeyspace.NAME).getColumnFamilyStore(SystemKeyspace.BATCHES); - double averageRowSize = store.getMeanPartitionSize(); - if (averageRowSize <= 0) - return DEFAULT_PAGE_SIZE; - - return (int) Math.max(1, Math.min(DEFAULT_PAGE_SIZE, 4 * 1024 * 1024 / averageRowSize)); - } - - private static void deleteBatch(UUID id) - { - Mutation mutation = new Mutation( - PartitionUpdate.fullPartitionDelete(SystemKeyspace.Batches, - UUIDType.instance.decompose(id), - FBUtilities.timestampMicros(), - FBUtilities.nowInSeconds())); - mutation.apply(); - } - - private void processBatchlogEntries(UntypedResultSet batches, int pageSize, RateLimiter rateLimiter) - { - 
int positionInPage = 0; - ArrayList<Batch> unfinishedBatches = new ArrayList<>(pageSize); - - Set<InetAddress> hintedNodes = new HashSet<>(); - Set<UUID> replayedBatches = new HashSet<>(); - - // Sending out batches for replay without waiting for them, so that one stuck batch doesn't affect others - for (UntypedResultSet.Row row : batches) - { - UUID id = row.getUUID("id"); - int version = row.getInt("version"); - Batch batch = new Batch(id, row.getBytes("data"), version); - try - { - if (batch.replay(rateLimiter, hintedNodes) > 0) - { - unfinishedBatches.add(batch); - } - else - { - deleteBatch(id); // no write mutations were sent (either expired or all CFs involved truncated). - ++totalBatchesReplayed; - } - } - catch (IOException e) - { - logger.warn("Skipped batch replay of {} due to {}", id, e); - deleteBatch(id); - } - - if (++positionInPage == pageSize) - { - // We have reached the end of a batch. To avoid keeping more than a page of mutations in memory, - // finish processing the page before requesting the next row. 
- finishAndClearBatches(unfinishedBatches, hintedNodes, replayedBatches); - positionInPage = 0; - } - } - - finishAndClearBatches(unfinishedBatches, hintedNodes, replayedBatches); - - // to preserve batch guarantees, we must ensure that hints (if any) have made it to disk, before deleting the batches - HintsService.instance.flushAndFsyncBlockingly(transform(hintedNodes, StorageService.instance::getHostIdForEndpoint)); - - // once all generated hints are fsynced, actually delete the batches - replayedBatches.forEach(BatchlogManager::deleteBatch); - } - - private void finishAndClearBatches(ArrayList<Batch> batches, Set<InetAddress> hintedNodes, Set<UUID> replayedBatches) - { - // schedule hints for timed out deliveries - for (Batch batch : batches) - { - batch.finish(hintedNodes); - replayedBatches.add(batch.id); - } - - totalBatchesReplayed += batches.size(); - batches.clear(); - } - - public static long getBatchlogTimeout() - { - return DatabaseDescriptor.getWriteRpcTimeout() * 2; // enough time for the actual write + BM removal mutation - } - - private static class Batch - { - private final UUID id; - private final long writtenAt; - private final ByteBuffer data; - private final int version; - - private List<ReplayWriteResponseHandler<Mutation>> replayHandlers; - - Batch(UUID id, ByteBuffer data, int version) - { - this.id = id; - this.writtenAt = UUIDGen.unixTimestamp(id); - this.data = data; - this.version = version; - } - - public int replay(RateLimiter rateLimiter, Set<InetAddress> hintedNodes) throws IOException - { - logger.debug("Replaying batch {}", id); - - List<Mutation> mutations = replayingMutations(); - - if (mutations.isEmpty()) - return 0; - - int gcgs = gcgs(mutations); - if (TimeUnit.MILLISECONDS.toSeconds(writtenAt) + gcgs <= FBUtilities.nowInSeconds()) - return 0; - - replayHandlers = sendReplays(mutations, writtenAt, hintedNodes); - - rateLimiter.acquire(data.remaining()); // acquire afterwards, to not mess up ttl calculation. 
- - return replayHandlers.size(); - } - - public void finish(Set<InetAddress> hintedNodes) - { - for (int i = 0; i < replayHandlers.size(); i++) - { - ReplayWriteResponseHandler<Mutation> handler = replayHandlers.get(i); - try - { - handler.get(); - } - catch (WriteTimeoutException|WriteFailureException e) - { - logger.debug("Failed replaying a batched mutation to a node, will write a hint"); - logger.debug("Failure was : {}", e.getMessage()); - // writing hints for the rest to hints, starting from i - writeHintsForUndeliveredEndpoints(i, hintedNodes); - return; - } - } - } - - private List<Mutation> replayingMutations() throws IOException - { - DataInputPlus in = new DataInputBuffer(data, true); - int size = in.readInt(); - List<Mutation> mutations = new ArrayList<>(size); - for (int i = 0; i < size; i++) - { - Mutation mutation = Mutation.serializer.deserialize(in, version); - - // Remove CFs that have been truncated since. writtenAt and SystemTable#getTruncatedAt() both return millis. - // We don't abort the replay entirely b/c this can be considered a success (truncated is same as delivered then - // truncated. - for (UUID cfId : mutation.getColumnFamilyIds()) - if (writtenAt <= SystemKeyspace.getTruncatedAt(cfId)) - mutation = mutation.without(cfId); - - if (!mutation.isEmpty()) - mutations.add(mutation); - } - return mutations; - } - - private void writeHintsForUndeliveredEndpoints(int startFrom, Set<InetAddress> hintedNodes) - { - try - { - // Here we deserialize mutations 2nd time from byte buffer. 
- // but this is ok, because timeout on batch direct delivery is rare - // (it can happen only several seconds until node is marked dead) - // so trading some cpu to keep less objects - List<Mutation> replayingMutations = replayingMutations(); - for (int i = startFrom; i < replayHandlers.size(); i++) - { - Mutation undeliveredMutation = replayingMutations.get(i); - int gcgs = gcgs(replayingMutations); - ReplayWriteResponseHandler<Mutation> handler = replayHandlers.get(i); - - if (TimeUnit.MILLISECONDS.toSeconds(writtenAt) + gcgs > FBUtilities.nowInSeconds() && handler != null) - { - hintedNodes.addAll(handler.undelivered); - HintsService.instance.write(transform(handler.undelivered, StorageService.instance::getHostIdForEndpoint), - Hint.create(undeliveredMutation, writtenAt)); - } - } - } - catch (IOException e) - { - logger.error("Cannot schedule hints for undelivered batch", e); - } - } - - private static List<ReplayWriteResponseHandler<Mutation>> sendReplays(List<Mutation> mutations, - long writtenAt, - Set<InetAddress> hintedNodes) - { - List<ReplayWriteResponseHandler<Mutation>> handlers = new ArrayList<>(mutations.size()); - for (Mutation mutation : mutations) - { - ReplayWriteResponseHandler<Mutation> handler = sendSingleReplayMutation(mutation, writtenAt, hintedNodes); - if (handler != null) - handlers.add(handler); - } - return handlers; - } - - /** - * We try to deliver the mutations to the replicas ourselves if they are alive and only resort to writing hints - * when a replica is down or a write request times out. 
- * - * @return direct delivery handler to wait on or null, if no live nodes found - */ - private static ReplayWriteResponseHandler<Mutation> sendSingleReplayMutation(final Mutation mutation, - long writtenAt, - Set<InetAddress> hintedNodes) - { - Set<InetAddress> liveEndpoints = new HashSet<>(); - String ks = mutation.getKeyspaceName(); - Token tk = mutation.key().getToken(); - - for (InetAddress endpoint : Iterables.concat(StorageService.instance.getNaturalEndpoints(ks, tk), - StorageService.instance.getTokenMetadata().pendingEndpointsFor(tk, ks))) - { - if (endpoint.equals(FBUtilities.getBroadcastAddress())) - { - mutation.apply(); - } - else if (FailureDetector.instance.isAlive(endpoint)) - { - liveEndpoints.add(endpoint); // will try delivering directly instead of writing a hint. - } - else - { - hintedNodes.add(endpoint); - HintsService.instance.write(StorageService.instance.getHostIdForEndpoint(endpoint), - Hint.create(mutation, writtenAt)); - } - } - - if (liveEndpoints.isEmpty()) - return null; - - ReplayWriteResponseHandler<Mutation> handler = new ReplayWriteResponseHandler<>(liveEndpoints); - MessageOut<Mutation> message = mutation.createMessage(); - for (InetAddress endpoint : liveEndpoints) - MessagingService.instance().sendRR(message, endpoint, handler, false); - return handler; - } - - private static int gcgs(Collection<Mutation> mutations) - { - int gcgs = Integer.MAX_VALUE; - for (Mutation mutation : mutations) - gcgs = Math.min(gcgs, mutation.smallestGCGS()); - return gcgs; - } - - /** - * A wrapper of WriteResponseHandler that stores the addresses of the endpoints from - * which we did not receive a successful reply. 
- */ - private static class ReplayWriteResponseHandler<T> extends WriteResponseHandler<T> - { - private final Set<InetAddress> undelivered = Collections.newSetFromMap(new ConcurrentHashMap<>()); - - ReplayWriteResponseHandler(Collection<InetAddress> writeEndpoints) - { - super(writeEndpoints, Collections.<InetAddress>emptySet(), null, null, null, WriteType.UNLOGGED_BATCH); - undelivered.addAll(writeEndpoints); - } - - @Override - protected int totalBlockFor() - { - return this.naturalEndpoints.size(); - } - - @Override - public void response(MessageIn<T> m) - { - boolean removed = undelivered.remove(m == null ? FBUtilities.getBroadcastAddress() : m.from); - assert removed; - super.response(m); - } - } - } - - @SuppressWarnings("deprecation") - private static void convertOldBatchEntries() - { - logger.debug("Started convertOldBatchEntries"); - - String query = String.format("SELECT id, data, written_at, version FROM %s.%s", - SystemKeyspace.NAME, - SystemKeyspace.LEGACY_BATCHLOG); - UntypedResultSet batches = executeInternalWithPaging(query, DEFAULT_PAGE_SIZE); - int convertedBatches = 0; - for (UntypedResultSet.Row row : batches) - { - UUID id = row.getUUID("id"); - long timestamp = row.getLong("written_at"); - int version = row.has("version") ? 
row.getInt("version") : MessagingService.VERSION_12; - logger.debug("Converting mutation at " + timestamp); - - UUID newId = id; - if (id.version() != 1 || timestamp != UUIDGen.unixTimestamp(id)) - newId = UUIDGen.getTimeUUID(timestamp, convertedBatches); - ++convertedBatches; - - Mutation addRow = new RowUpdateBuilder(SystemKeyspace.Batches, - FBUtilities.timestampMicros(), - newId) - .clustering() - .add("data", row.getBytes("data")) - .add("version", version) - .build(); - - addRow.apply(); - } - if (convertedBatches > 0) - Keyspace.openAndGetStore(SystemKeyspace.LegacyBatchlog).truncateBlocking(); - // cleanup will be called after replay - logger.debug("Finished convertOldBatchEntries"); - } - - public static class EndpointFilter - { - private final String localRack; - private final Multimap<String, InetAddress> endpoints; - - public EndpointFilter(String localRack, Multimap<String, InetAddress> endpoints) - { - this.localRack = localRack; - this.endpoints = endpoints; - } - - /** - * @return list of candidates for batchlog hosting. If possible these will be two nodes from different racks. 
- */ - public Collection<InetAddress> filter() - { - // special case for single-node data centers - if (endpoints.values().size() == 1) - return endpoints.values(); - - // strip out dead endpoints and localhost - ListMultimap<String, InetAddress> validated = ArrayListMultimap.create(); - for (Map.Entry<String, InetAddress> entry : endpoints.entries()) - if (isValid(entry.getValue())) - validated.put(entry.getKey(), entry.getValue()); - - if (validated.size() <= 2) - return validated.values(); - - if (validated.size() - validated.get(localRack).size() >= 2) - { - // we have enough endpoints in other racks - validated.removeAll(localRack); - } - - if (validated.keySet().size() == 1) - { - // we have only 1 `other` rack - Collection<InetAddress> otherRack = Iterables.getOnlyElement(validated.asMap().values()); - return Lists.newArrayList(Iterables.limit(otherRack, 2)); - } - - // randomize which racks we pick from if more than 2 remaining - Collection<String> racks; - if (validated.keySet().size() == 2) - { - racks = validated.keySet(); - } - else - { - racks = Lists.newArrayList(validated.keySet()); - Collections.shuffle((List<String>) racks); - } - - // grab a random member of up to two racks - List<InetAddress> result = new ArrayList<>(2); - for (String rack : Iterables.limit(racks, 2)) - { - List<InetAddress> rackMembers = validated.get(rack); - result.add(rackMembers.get(getRandomInt(rackMembers.size()))); - } - - return result; - } - - @VisibleForTesting - protected boolean isValid(InetAddress input) - { - return !input.equals(FBUtilities.getBroadcastAddress()) && FailureDetector.instance.isAlive(input); - } - - @VisibleForTesting - protected int getRandomInt(int bound) - { - return ThreadLocalRandom.current().nextInt(bound); - } - } -} http://git-wip-us.apache.org/repos/asf/cassandra/blob/53a177a9/src/java/org/apache/cassandra/db/BatchlogManagerMBean.java ---------------------------------------------------------------------- diff --git 
a/src/java/org/apache/cassandra/db/BatchlogManagerMBean.java b/src/java/org/apache/cassandra/db/BatchlogManagerMBean.java deleted file mode 100644 index a688117..0000000 --- a/src/java/org/apache/cassandra/db/BatchlogManagerMBean.java +++ /dev/null @@ -1,38 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.cassandra.db; - -public interface BatchlogManagerMBean -{ - /** - * Counts all batches currently in the batchlog. - * - * @return total batch count - */ - public int countAllBatches(); - - /** - * @return total count of batches replayed since node start - */ - public long getTotalBatchesReplayed(); - - /** - * Forces batchlog replay. Returns immediately if replay is already in progress. 
- */ - public void forceBatchlogReplay() throws Exception; -} http://git-wip-us.apache.org/repos/asf/cassandra/blob/53a177a9/src/java/org/apache/cassandra/db/CounterMutationVerbHandler.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/CounterMutationVerbHandler.java b/src/java/org/apache/cassandra/db/CounterMutationVerbHandler.java index d9ee38a..e349bfc 100644 --- a/src/java/org/apache/cassandra/db/CounterMutationVerbHandler.java +++ b/src/java/org/apache/cassandra/db/CounterMutationVerbHandler.java @@ -49,7 +49,7 @@ public class CounterMutationVerbHandler implements IVerbHandler<CounterMutation> { public void run() { - MessagingService.instance().sendReply(new WriteResponse().createMessage(), id, message.from); + MessagingService.instance().sendReply(WriteResponse.createMessage(), id, message.from); } }); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/53a177a9/src/java/org/apache/cassandra/db/Mutation.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/Mutation.java b/src/java/org/apache/cassandra/db/Mutation.java index 6e78b0e..da7d13d 100644 --- a/src/java/org/apache/cassandra/db/Mutation.java +++ b/src/java/org/apache/cassandra/db/Mutation.java @@ -58,7 +58,7 @@ public class Mutation implements IMutation public final long createdAt = System.currentTimeMillis(); public Mutation(String keyspaceName, DecoratedKey key) { - this(keyspaceName, key, new HashMap<UUID, PartitionUpdate>()); + this(keyspaceName, key, new HashMap<>()); } public Mutation(PartitionUpdate update) @@ -201,6 +201,11 @@ public class Mutation implements IMutation ks.apply(this, ks.getMetadata().params.durableWrites); } + public void apply(boolean durableWrites) + { + Keyspace.open(keyspaceName).apply(this, durableWrites); + } + public void applyUnsafe() { Keyspace.open(keyspaceName).apply(this, false); 
http://git-wip-us.apache.org/repos/asf/cassandra/blob/53a177a9/src/java/org/apache/cassandra/db/MutationVerbHandler.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/MutationVerbHandler.java b/src/java/org/apache/cassandra/db/MutationVerbHandler.java index 640e45f..d4670a2 100644 --- a/src/java/org/apache/cassandra/db/MutationVerbHandler.java +++ b/src/java/org/apache/cassandra/db/MutationVerbHandler.java @@ -18,10 +18,10 @@ package org.apache.cassandra.db; import java.io.DataInputStream; -import java.io.IOError; import java.io.IOException; import java.net.InetAddress; +import org.apache.cassandra.batchlog.LegacyBatchlogMigrator; import org.apache.cassandra.exceptions.WriteTimeoutException; import org.apache.cassandra.io.util.FastByteArrayInputStream; import org.apache.cassandra.net.*; @@ -29,31 +29,32 @@ import org.apache.cassandra.tracing.Tracing; public class MutationVerbHandler implements IVerbHandler<Mutation> { - private static final boolean TEST_FAIL_WRITES = System.getProperty("cassandra.test.fail_writes", "false").equalsIgnoreCase("true"); - public void doVerb(MessageIn<Mutation> message, int id) throws IOException { - // Check if there were any forwarding headers in this message - byte[] from = message.parameters.get(Mutation.FORWARD_FROM); - InetAddress replyTo; - if (from == null) - { - replyTo = message.from; - byte[] forwardBytes = message.parameters.get(Mutation.FORWARD_TO); - if (forwardBytes != null) - forwardToLocalNodes(message.payload, message.verb, forwardBytes, message.from); - } - else - { - replyTo = InetAddress.getByAddress(from); - } + // Check if there were any forwarding headers in this message + byte[] from = message.parameters.get(Mutation.FORWARD_FROM); + InetAddress replyTo; + if (from == null) + { + replyTo = message.from; + byte[] forwardBytes = message.parameters.get(Mutation.FORWARD_TO); + if (forwardBytes != null) + forwardToLocalNodes(message.payload, 
message.verb, forwardBytes, message.from); + } + else + { + replyTo = InetAddress.getByAddress(from); + } try { - message.payload.apply(); - WriteResponse response = new WriteResponse(); + if (message.version < MessagingService.VERSION_30 && LegacyBatchlogMigrator.isLegacyBatchlogMutation(message.payload)) + LegacyBatchlogMigrator.handleLegacyMutation(message.payload); + else + message.payload.apply(); + Tracing.trace("Enqueuing response to {}", replyTo); - MessagingService.instance().sendReply(response.createMessage(), id, replyTo); + MessagingService.instance().sendReply(WriteResponse.createMessage(), id, replyTo); } catch (WriteTimeoutException wto) { @@ -65,7 +66,7 @@ public class MutationVerbHandler implements IVerbHandler<Mutation> * Older version (< 1.0) will not send this message at all, hence we don't * need to check the version of the data. */ - private void forwardToLocalNodes(Mutation mutation, MessagingService.Verb verb, byte[] forwardBytes, InetAddress from) throws IOException + private static void forwardToLocalNodes(Mutation mutation, MessagingService.Verb verb, byte[] forwardBytes, InetAddress from) throws IOException { try (DataInputStream in = new DataInputStream(new FastByteArrayInputStream(forwardBytes))) { http://git-wip-us.apache.org/repos/asf/cassandra/blob/53a177a9/src/java/org/apache/cassandra/db/ReadRepairVerbHandler.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/ReadRepairVerbHandler.java b/src/java/org/apache/cassandra/db/ReadRepairVerbHandler.java index 849ac70..2e499e7 100644 --- a/src/java/org/apache/cassandra/db/ReadRepairVerbHandler.java +++ b/src/java/org/apache/cassandra/db/ReadRepairVerbHandler.java @@ -26,7 +26,6 @@ public class ReadRepairVerbHandler implements IVerbHandler<Mutation> public void doVerb(MessageIn<Mutation> message, int id) { message.payload.apply(); - WriteResponse response = new WriteResponse(); - 
MessagingService.instance().sendReply(response.createMessage(), id, message.from); + MessagingService.instance().sendReply(WriteResponse.createMessage(), id, message.from); } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/53a177a9/src/java/org/apache/cassandra/db/SystemKeyspace.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/SystemKeyspace.java b/src/java/org/apache/cassandra/db/SystemKeyspace.java index fb9eb48..cf8e14d 100644 --- a/src/java/org/apache/cassandra/db/SystemKeyspace.java +++ b/src/java/org/apache/cassandra/db/SystemKeyspace.java @@ -113,7 +113,7 @@ public final class SystemKeyspace "batches awaiting replay", "CREATE TABLE %s (" + "id timeuuid," - + "data blob," + + "mutations list<blob>," + "version int," + "PRIMARY KEY ((id)))") .copy(new LocalPartitioner(TimeUUIDType.instance))
