http://git-wip-us.apache.org/repos/asf/cassandra/blob/53a177a9/src/java/org/apache/cassandra/db/WriteResponse.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/WriteResponse.java b/src/java/org/apache/cassandra/db/WriteResponse.java index 824368e..0dddaab 100644 --- a/src/java/org/apache/cassandra/db/WriteResponse.java +++ b/src/java/org/apache/cassandra/db/WriteResponse.java @@ -28,16 +28,22 @@ import org.apache.cassandra.net.MessagingService; /* * This empty response is sent by a replica to inform the coordinator that the write succeeded */ -public class WriteResponse +public final class WriteResponse { - public static final WriteResponseSerializer serializer = new WriteResponseSerializer(); + public static final Serializer serializer = new Serializer(); - public MessageOut<WriteResponse> createMessage() + private static final WriteResponse instance = new WriteResponse(); + + private WriteResponse() + { + } + + public static MessageOut<WriteResponse> createMessage() { - return new MessageOut<WriteResponse>(MessagingService.Verb.REQUEST_RESPONSE, this, serializer); + return new MessageOut<>(MessagingService.Verb.REQUEST_RESPONSE, instance, serializer); } - public static class WriteResponseSerializer implements IVersionedSerializer<WriteResponse> + public static class Serializer implements IVersionedSerializer<WriteResponse> { public void serialize(WriteResponse wm, DataOutputPlus out, int version) throws IOException { @@ -45,7 +51,7 @@ public class WriteResponse public WriteResponse deserialize(DataInputPlus in, int version) throws IOException { - return new WriteResponse(); + return instance; } public long serializedSize(WriteResponse response, int version)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/53a177a9/src/java/org/apache/cassandra/hints/EncodedHintMessage.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/hints/EncodedHintMessage.java b/src/java/org/apache/cassandra/hints/EncodedHintMessage.java index 2797495..56727fc 100644 --- a/src/java/org/apache/cassandra/hints/EncodedHintMessage.java +++ b/src/java/org/apache/cassandra/hints/EncodedHintMessage.java @@ -65,8 +65,8 @@ final class EncodedHintMessage if (version != message.version) throw new IllegalArgumentException("serializedSize() called with non-matching version " + version); - int size = (int) UUIDSerializer.serializer.serializedSize(message.hostId, version); - size += TypeSizes.sizeof(message.hint.remaining()); + long size = UUIDSerializer.serializer.serializedSize(message.hostId, version); + size += TypeSizes.sizeofVInt(message.hint.remaining()); size += message.hint.remaining(); return size; } @@ -77,7 +77,7 @@ final class EncodedHintMessage throw new IllegalArgumentException("serialize() called with non-matching version " + version); UUIDSerializer.serializer.serialize(message.hostId, out, version); - out.writeInt(message.hint.remaining()); + out.writeVInt(message.hint.remaining()); out.write(message.hint); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/53a177a9/src/java/org/apache/cassandra/hints/Hint.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/hints/Hint.java b/src/java/org/apache/cassandra/hints/Hint.java index d8f85c5..c88c494 100644 --- a/src/java/org/apache/cassandra/hints/Hint.java +++ b/src/java/org/apache/cassandra/hints/Hint.java @@ -26,6 +26,9 @@ import org.apache.cassandra.io.IVersionedSerializer; import org.apache.cassandra.io.util.DataInputPlus; import org.apache.cassandra.io.util.DataOutputPlus; +import static org.apache.cassandra.db.TypeSizes.sizeof; +import static org.apache.cassandra.db.TypeSizes.sizeofVInt; + /** * Encapsulates the hinted mutation, its creation time, and the gc grace seconds param for each table involved. * @@ -107,8 +110,8 @@ public final class Hint { public long serializedSize(Hint hint, int version) { - long size = TypeSizes.sizeof(hint.creationTime); - size += TypeSizes.sizeof(hint.gcgs); + long size = sizeof(hint.creationTime); + size += sizeofVInt(hint.gcgs); size += Mutation.serializer.serializedSize(hint.mutation, version); return size; } @@ -116,14 +119,14 @@ public final class Hint public void serialize(Hint hint, DataOutputPlus out, int version) throws IOException { out.writeLong(hint.creationTime); - out.writeInt(hint.gcgs); + out.writeVInt(hint.gcgs); Mutation.serializer.serialize(hint.mutation, out, version); } public Hint deserialize(DataInputPlus in, int version) throws IOException { long creationTime = in.readLong(); - int gcgs = in.readInt(); + int gcgs = (int) in.readVInt(); return new Hint(Mutation.serializer.deserialize(in, version), creationTime, gcgs); } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/53a177a9/src/java/org/apache/cassandra/hints/HintMessage.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/hints/HintMessage.java b/src/java/org/apache/cassandra/hints/HintMessage.java index 89baa89..6296a8c 100644 --- a/src/java/org/apache/cassandra/hints/HintMessage.java +++ b/src/java/org/apache/cassandra/hints/HintMessage.java @@ -24,6 +24,8 @@ import java.util.UUID; import javax.annotation.Nullable; +import com.google.common.primitives.Ints; + import org.apache.cassandra.db.TypeSizes; import org.apache.cassandra.db.UnknownColumnFamilyException; import org.apache.cassandra.io.IVersionedSerializer; @@ -81,10 +83,10 @@ public final class HintMessage { public long serializedSize(HintMessage message, int version) { - int size = (int) UUIDSerializer.serializer.serializedSize(message.hostId, version); + long size = UUIDSerializer.serializer.serializedSize(message.hostId, version); - int hintSize = (int) Hint.serializer.serializedSize(message.hint, version); - size += TypeSizes.sizeof(hintSize); + long hintSize = Hint.serializer.serializedSize(message.hint, version); + size += TypeSizes.sizeofVInt(hintSize); size += hintSize; return size; @@ -100,7 +102,7 @@ public final class HintMessage * We are serializing the hint size so that the receiver of the message could gracefully handle * deserialize failure when a table had been dropped, by simply skipping the unread bytes. */ - out.writeInt((int) Hint.serializer.serializedSize(message.hint, version)); + out.writeVInt(Hint.serializer.serializedSize(message.hint, version)); Hint.serializer.serialize(message.hint, out, version); } @@ -114,7 +116,7 @@ public final class HintMessage { UUID hostId = UUIDSerializer.serializer.deserialize(in, version); - int hintSize = in.readInt(); + long hintSize = in.readVInt(); BytesReadTracker countingIn = new BytesReadTracker(in); try { @@ -122,7 +124,7 @@ public final class HintMessage } catch (UnknownColumnFamilyException e) { - in.skipBytes(hintSize - (int) countingIn.getBytesRead()); + in.skipBytes(Ints.checkedCast(hintSize - countingIn.getBytesRead())); return new HintMessage(hostId, e.cfId); } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/53a177a9/src/java/org/apache/cassandra/hints/LegacyHintsMigrator.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/hints/LegacyHintsMigrator.java b/src/java/org/apache/cassandra/hints/LegacyHintsMigrator.java index 196f184..b0095ed 100644 --- a/src/java/org/apache/cassandra/hints/LegacyHintsMigrator.java +++ b/src/java/org/apache/cassandra/hints/LegacyHintsMigrator.java @@ -92,7 +92,7 @@ public final class LegacyHintsMigrator compactLegacyHints(); // paginate over legacy hints and write them to the new storage - logger.info("Migrating legacy hints to the new storage"); + logger.info("Writing legacy hints to the new storage"); migrateLegacyHints(); // truncate the legacy hints table http://git-wip-us.apache.org/repos/asf/cassandra/blob/53a177a9/src/java/org/apache/cassandra/net/MessagingService.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/net/MessagingService.java b/src/java/org/apache/cassandra/net/MessagingService.java index e59cd58..15199fe 100644 --- a/src/java/org/apache/cassandra/net/MessagingService.java +++ b/src/java/org/apache/cassandra/net/MessagingService.java @@ -47,6 +47,7 @@ import org.apache.cassandra.concurrent.TracingAwareExecutorService; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.config.EncryptionOptions.ServerEncryptionOptions; import org.apache.cassandra.db.*; +import org.apache.cassandra.batchlog.Batch; import org.apache.cassandra.dht.AbstractBounds; import org.apache.cassandra.dht.BootStrapper; import org.apache.cassandra.dht.IPartitioner; @@ -103,8 +104,8 @@ public final class MessagingService implements MessagingServiceMBean READ_REPAIR, READ, REQUEST_RESPONSE, // client-initiated reads and writes - @Deprecated STREAM_INITIATE, - @Deprecated STREAM_INITIATE_DONE, + BATCH_STORE, // was @Deprecated STREAM_INITIATE, + BATCH_REMOVE, // was @Deprecated STREAM_INITIATE_DONE, @Deprecated STREAM_REPLY, @Deprecated STREAM_REQUEST, RANGE_SLICE, @@ -135,7 +136,6 @@ public final class MessagingService implements MessagingServiceMBean PAXOS_PROPOSE, PAXOS_COMMIT, @Deprecated PAGED_RANGE, - BATCHLOG_MUTATION, // remember to add new verbs at the end, since we serialize by ordinal UNUSED_1, UNUSED_2, @@ -149,13 +149,14 @@ public final class MessagingService implements MessagingServiceMBean {{ put(Verb.MUTATION, Stage.MUTATION); put(Verb.COUNTER_MUTATION, Stage.COUNTER_MUTATION); - put(Verb.BATCHLOG_MUTATION, Stage.BATCHLOG_MUTATION); put(Verb.READ_REPAIR, Stage.MUTATION); put(Verb.HINT, Stage.MUTATION); put(Verb.TRUNCATE, Stage.MUTATION); put(Verb.PAXOS_PREPARE, Stage.MUTATION); put(Verb.PAXOS_PROPOSE, Stage.MUTATION); put(Verb.PAXOS_COMMIT, Stage.MUTATION); + put(Verb.BATCH_STORE, Stage.MUTATION); + put(Verb.BATCH_REMOVE, Stage.MUTATION); put(Verb.READ, Stage.READ); put(Verb.RANGE_SLICE, Stage.READ); @@ -209,7 +210,6 @@ public final class MessagingService implements MessagingServiceMBean put(Verb.INTERNAL_RESPONSE, CallbackDeterminedSerializer.instance); put(Verb.MUTATION, Mutation.serializer); - put(Verb.BATCHLOG_MUTATION, Mutation.serializer); put(Verb.READ_REPAIR, Mutation.serializer); put(Verb.READ, ReadCommand.serializer); put(Verb.RANGE_SLICE, ReadCommand.rangeSliceSerializer); @@ -229,6 +229,8 @@ public final class MessagingService implements MessagingServiceMBean put(Verb.PAXOS_PROPOSE, Commit.serializer); put(Verb.PAXOS_COMMIT, Commit.serializer); put(Verb.HINT, HintMessage.serializer); + put(Verb.BATCH_STORE, Batch.serializer); + put(Verb.BATCH_REMOVE, UUIDSerializer.serializer); }}; /** @@ -238,7 +240,6 @@ public final class MessagingService implements MessagingServiceMBean {{ put(Verb.MUTATION, WriteResponse.serializer); put(Verb.HINT, HintResponse.serializer); - put(Verb.BATCHLOG_MUTATION, WriteResponse.serializer); put(Verb.READ_REPAIR, WriteResponse.serializer); put(Verb.COUNTER_MUTATION, WriteResponse.serializer); put(Verb.RANGE_SLICE, ReadResponse.rangeSliceSerializer); @@ -254,6 +255,9 @@ public final class MessagingService implements MessagingServiceMBean put(Verb.PAXOS_PREPARE, PrepareResponse.serializer); put(Verb.PAXOS_PROPOSE, BooleanSerializer.serializer); + + put(Verb.BATCH_STORE, WriteResponse.serializer); + put(Verb.BATCH_REMOVE, WriteResponse.serializer); }}; /* This records all the results mapped by message Id */ @@ -286,7 +290,7 @@ public final class MessagingService implements MessagingServiceMBean /* Lookup table for registering message handlers based on the verb. */ private final Map<Verb, IVerbHandler> verbHandlers; - private final ConcurrentMap<InetAddress, OutboundTcpConnectionPool> connectionManagers = new NonBlockingHashMap<InetAddress, OutboundTcpConnectionPool>(); + private final ConcurrentMap<InetAddress, OutboundTcpConnectionPool> connectionManagers = new NonBlockingHashMap<>(); private static final Logger logger = LoggerFactory.getLogger(MessagingService.class); private static final int LOG_DROPPED_INTERVAL_IN_MS = 5000; @@ -301,14 +305,15 @@ public final class MessagingService implements MessagingServiceMBean */ public static final EnumSet<Verb> DROPPABLE_VERBS = EnumSet.of(Verb._TRACE, Verb.MUTATION, - Verb.BATCHLOG_MUTATION, //FIXME: should this be droppable?? Verb.COUNTER_MUTATION, Verb.HINT, Verb.READ_REPAIR, Verb.READ, Verb.RANGE_SLICE, Verb.PAGED_RANGE, - Verb.REQUEST_RESPONSE); + Verb.REQUEST_RESPONSE, + Verb.BATCH_STORE, + Verb.BATCH_REMOVE); private static final class DroppedMessages @@ -372,7 +377,7 @@ public final class MessagingService implements MessagingServiceMBean droppedMessagesMap.put(verb, new DroppedMessages(verb)); listenGate = new SimpleCondition(); - verbHandlers = new EnumMap<Verb, IVerbHandler>(Verb.class); + verbHandlers = new EnumMap<>(Verb.class); if (!testOnly) { Runnable logDropped = new Runnable() @@ -630,7 +635,6 @@ public final class MessagingService implements MessagingServiceMBean boolean allowHints) { assert message.verb == Verb.MUTATION - || message.verb == Verb.BATCHLOG_MUTATION || message.verb == Verb.COUNTER_MUTATION || message.verb == Verb.PAXOS_COMMIT; int messageId = nextId(); http://git-wip-us.apache.org/repos/asf/cassandra/blob/53a177a9/src/java/org/apache/cassandra/service/CassandraDaemon.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/CassandraDaemon.java b/src/java/org/apache/cassandra/service/CassandraDaemon.java index cf1e021..c8b9677 100644 --- a/src/java/org/apache/cassandra/service/CassandraDaemon.java +++ b/src/java/org/apache/cassandra/service/CassandraDaemon.java @@ -27,7 +27,6 @@ import java.rmi.registry.LocateRegistry; import java.rmi.server.RMIServerSocketFactory; import java.util.Collections; import java.util.Map; -import java.util.UUID; import java.util.concurrent.TimeUnit; import javax.management.MBeanServer; import javax.management.ObjectName; @@ -41,7 +40,6 @@ import com.codahale.metrics.Meter; import com.codahale.metrics.MetricRegistryListener; import com.codahale.metrics.SharedMetricRegistries; import com.google.common.annotations.VisibleForTesting; -import com.google.common.collect.Iterables; import com.google.common.util.concurrent.Uninterruptibles; import org.slf4j.Logger; @@ -52,6 +50,7 @@ import org.apache.cassandra.config.CFMetaData; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.config.Schema; import org.apache.cassandra.db.*; +import org.apache.cassandra.batchlog.LegacyBatchlogMigrator; import org.apache.cassandra.db.commitlog.CommitLog; import org.apache.cassandra.exceptions.ConfigurationException; import org.apache.cassandra.exceptions.StartupException; @@ -280,6 +279,9 @@ public class CassandraDaemon // migrate any legacy (pre-3.0) hints from system.hints table into the new store new LegacyHintsMigrator(DatabaseDescriptor.getHintsDirectory(), DatabaseDescriptor.getMaxHintsFileSize()).migrate(); + // migrate any legacy (pre-3.0) batch entries from system.batchlog to system.batches (new table format) + LegacyBatchlogMigrator.migrate(); + // enable auto compaction for (Keyspace keyspace : Keyspace.all()) { http://git-wip-us.apache.org/repos/asf/cassandra/blob/53a177a9/src/java/org/apache/cassandra/service/StorageProxy.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java index 4952959..59f1c1c 100644 --- a/src/java/org/apache/cassandra/service/StorageProxy.java +++ b/src/java/org/apache/cassandra/service/StorageProxy.java @@ -24,12 +24,9 @@ import java.nio.ByteBuffer; import java.util.*; import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicInteger; -import java.util.stream.Collectors; -import javax.annotation.Nullable; import javax.management.MBeanServer; import javax.management.ObjectName; -import com.google.common.base.Function; import com.google.common.base.Predicate; import com.google.common.cache.CacheLoader; import com.google.common.collect.*; @@ -44,14 +41,19 @@ import org.apache.cassandra.config.CFMetaData; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.config.Schema; import org.apache.cassandra.db.*; +import org.apache.cassandra.db.HintedHandOffManager; +import org.apache.cassandra.batchlog.*; +import org.apache.cassandra.batchlog.LegacyBatchlogMigrator; import org.apache.cassandra.db.filter.DataLimits; import org.apache.cassandra.db.filter.TombstoneOverwhelmingException; -import org.apache.cassandra.db.marshal.UUIDType; import org.apache.cassandra.db.partitions.*; import org.apache.cassandra.db.rows.RowIterator; import org.apache.cassandra.db.view.MaterializedViewManager; import org.apache.cassandra.db.view.MaterializedViewUtils; -import org.apache.cassandra.dht.*; +import org.apache.cassandra.dht.AbstractBounds; +import org.apache.cassandra.dht.Bounds; +import org.apache.cassandra.dht.RingPosition; +import org.apache.cassandra.dht.Token; import org.apache.cassandra.exceptions.*; import org.apache.cassandra.gms.FailureDetector; import org.apache.cassandra.gms.Gossiper; @@ -67,8 +69,8 @@ import org.apache.cassandra.service.paxos.PrepareCallback; import org.apache.cassandra.service.paxos.ProposeCallback; import org.apache.cassandra.tracing.Tracing; import org.apache.cassandra.triggers.TriggerExecutor; -import org.apache.cassandra.utils.AbstractIterator; import org.apache.cassandra.utils.*; +import org.apache.cassandra.utils.AbstractIterator; public class StorageProxy implements StorageProxyMBean { @@ -687,31 +689,19 @@ public class StorageProxy implements StorageProxyMBean WriteType.BATCH, cleanup); - //When local node is the endpoint and there are no pending nodes we can + // When local node is the endpoint and there are no pending nodes we can // Just apply the mutation locally. - if (pairedEndpoint.equals(FBUtilities.getBroadcastAddress()) && - wrapper.handler.pendingEndpoints.isEmpty()) - { - if (writeCommitLog) - mutation.apply(); - else - mutation.applyUnsafe(); - } + if (pairedEndpoint.equals(FBUtilities.getBroadcastAddress()) && wrapper.handler.pendingEndpoints.isEmpty()) + mutation.apply(writeCommitLog); else - { wrappers.add(wrapper); - } } if (!wrappers.isEmpty()) { - Mutation blMutation = BatchlogManager.getBatchlogMutationFor(Lists.transform(wrappers, w -> w.mutation), batchUUID, MessagingService.current_version); - - //Apply to local batchlog memtable in this thread - if (writeCommitLog) - blMutation.apply(); - else - blMutation.applyUnsafe(); + // Apply to local batchlog memtable in this thread + BatchlogManager.store(Batch.createLocal(batchUUID, FBUtilities.timestampMicros(), Lists.transform(wrappers, w -> w.mutation)), + writeCommitLog); // now actually perform the writes and wait for them to complete asyncWriteBatchedMutations(wrappers, localDataCenter, Stage.MATERIALIZED_VIEW_MUTATION); @@ -781,16 +771,10 @@ public class StorageProxy implements StorageProxyMBean batchConsistencyLevel = consistency_level; } - final Collection<InetAddress> batchlogEndpoints = getBatchlogEndpoints(localDataCenter, batchConsistencyLevel); + final BatchlogEndpoints batchlogEndpoints = getBatchlogEndpoints(localDataCenter, batchConsistencyLevel); final UUID batchUUID = UUIDGen.getTimeUUID(); BatchlogResponseHandler.BatchlogCleanup cleanup = new BatchlogResponseHandler.BatchlogCleanup(mutations.size(), - new BatchlogResponseHandler.BatchlogCleanupCallback() - { - public void invoke() - { - asyncRemoveFromBatchlog(batchlogEndpoints, batchUUID); - } - }); + () -> asyncRemoveFromBatchlog(batchlogEndpoints, batchUUID)); // add a handler for each mutation - includes checking availability, but doesn't initiate any writes, yet for (Mutation mutation : mutations) @@ -840,75 +824,64 @@ public class StorageProxy implements StorageProxyMBean return replica.equals(FBUtilities.getBroadcastAddress()); } + private static void syncWriteToBatchlog(Collection<Mutation> mutations, BatchlogEndpoints endpoints, UUID uuid) + throws WriteTimeoutException, WriteFailureException + { + WriteResponseHandler<?> handler = new WriteResponseHandler<>(endpoints.all, + Collections.<InetAddress>emptyList(), + endpoints.all.size() == 1 ? ConsistencyLevel.ONE : ConsistencyLevel.TWO, + Keyspace.open(SystemKeyspace.NAME), + null, + WriteType.BATCH_LOG); + + Batch batch = Batch.createLocal(uuid, FBUtilities.timestampMicros(), mutations); + + if (!endpoints.current.isEmpty()) + syncWriteToBatchlog(handler, batch, endpoints.current); + + if (!endpoints.legacy.isEmpty()) + LegacyBatchlogMigrator.syncWriteToBatchlog(handler, batch, endpoints.legacy); + + handler.get(); + } - private static void syncWriteToBatchlog(Collection<Mutation> mutations, Collection<InetAddress> endpoints, UUID uuid) + private static void syncWriteToBatchlog(WriteResponseHandler<?> handler, Batch batch, Collection<InetAddress> endpoints) throws WriteTimeoutException, WriteFailureException { - AbstractWriteResponseHandler<IMutation> handler = new WriteResponseHandler<>(endpoints, - Collections.<InetAddress>emptyList(), - ConsistencyLevel.ONE, - Keyspace.open(SystemKeyspace.NAME), - null, - WriteType.BATCH_LOG); + MessageOut<Batch> message = new MessageOut<>(MessagingService.Verb.BATCH_STORE, batch, Batch.serializer); - MessageOut<Mutation> message = BatchlogManager.getBatchlogMutationFor(mutations, uuid, MessagingService.current_version) - .createMessage(MessagingService.Verb.BATCHLOG_MUTATION); for (InetAddress target : endpoints) { - int targetVersion = MessagingService.instance().getVersion(target); + logger.debug("Sending batchlog store request {} to {} for {} mutations", batch.id, target, batch.size()); + if (canDoLocalRequest(target)) - { - insertLocal(Stage.BATCHLOG_MUTATION, message.payload, handler); - } - else if (targetVersion < MessagingService.VERSION_30) - { - MessagingService.instance().sendRR(BatchlogManager.getBatchlogMutationFor(mutations, uuid, targetVersion) - .createMessage(MessagingService.Verb.MUTATION), - target, - handler, - false); - } + performLocally(Stage.MUTATION, () -> BatchlogManager.store(batch), handler); else - { - MessagingService.instance().sendRR(message, target, handler, false); - } + MessagingService.instance().sendRR(message, target, handler); } + } - handler.get(); + private static void asyncRemoveFromBatchlog(BatchlogEndpoints endpoints, UUID uuid) + { + if (!endpoints.current.isEmpty()) + asyncRemoveFromBatchlog(endpoints.current, uuid); + + if (!endpoints.legacy.isEmpty()) + LegacyBatchlogMigrator.asyncRemoveFromBatchlog(endpoints.legacy, uuid); } private static void asyncRemoveFromBatchlog(Collection<InetAddress> endpoints, UUID uuid) { - AbstractWriteResponseHandler<IMutation> handler = new WriteResponseHandler<>(endpoints, - Collections.<InetAddress>emptyList(), - ConsistencyLevel.ANY, - Keyspace.open(SystemKeyspace.NAME), - null, - WriteType.SIMPLE); - Mutation mutation = new Mutation( - PartitionUpdate.fullPartitionDelete(SystemKeyspace.Batches, - UUIDType.instance.decompose(uuid), - FBUtilities.timestampMicros(), - FBUtilities.nowInSeconds())); - MessageOut<Mutation> message = mutation.createMessage(MessagingService.Verb.BATCHLOG_MUTATION); + MessageOut<UUID> message = new MessageOut<>(MessagingService.Verb.BATCH_REMOVE, uuid, UUIDSerializer.serializer); for (InetAddress target : endpoints) { - int targetVersion = MessagingService.instance().getVersion(target); + if (logger.isDebugEnabled()) + logger.debug("Sending batchlog remove request {} to {}", uuid, target); + if (canDoLocalRequest(target)) - { - insertLocal(Stage.BATCHLOG_MUTATION, message.payload, handler); - } - else if (targetVersion < MessagingService.VERSION_30) - { - MessagingService.instance().sendRR(mutation.createMessage(MessagingService.Verb.MUTATION), - target, - handler, - false); - } + performLocally(Stage.MUTATION, () -> BatchlogManager.remove(uuid)); else - { - MessagingService.instance().sendRR(message, target, handler, false); - } + MessagingService.instance().sendOneWay(message, target); } } @@ -1034,13 +1007,38 @@ public class StorageProxy implements StorageProxyMBean } /* + * A class to filter batchlog endpoints into legacy endpoints (version < 3.0) or not. + */ + private static final class BatchlogEndpoints + { + public final Collection<InetAddress> all; + public final Collection<InetAddress> current; + public final Collection<InetAddress> legacy; + + BatchlogEndpoints(Collection<InetAddress> endpoints) + { + all = endpoints; + current = new ArrayList<>(2); + legacy = new ArrayList<>(2); + + for (InetAddress ep : endpoints) + { + if (MessagingService.instance().getVersion(ep) >= MessagingService.VERSION_30) + current.add(ep); + else + legacy.add(ep); + } + } + } + + /* * Replicas are picked manually: * - replicas should be alive according to the failure detector * - replicas should be in the local datacenter * - choose min(2, number of qualifying candiates above) * - allow the local node to be the only replica only if it's a single-node DC */ - private static Collection<InetAddress> getBatchlogEndpoints(String localDataCenter, ConsistencyLevel consistencyLevel) + private static BatchlogEndpoints getBatchlogEndpoints(String localDataCenter, ConsistencyLevel consistencyLevel) throws UnavailableException { TokenMetadata.Topology topology = StorageService.instance.getTokenMetadata().cachedOnlyTokenMap().getTopology(); @@ -1051,12 +1049,12 @@ public class StorageProxy implements StorageProxyMBean if (chosenEndpoints.isEmpty()) { if (consistencyLevel == ConsistencyLevel.ANY) - return Collections.singleton(FBUtilities.getBroadcastAddress()); + return new BatchlogEndpoints(Collections.singleton(FBUtilities.getBroadcastAddress())); throw new UnavailableException(ConsistencyLevel.ONE, 1, 0); } - return chosenEndpoints; + return new BatchlogEndpoints(chosenEndpoints); } /** @@ -1109,7 +1107,8 @@ public class StorageProxy implements StorageProxyMBean if (canDoLocalRequest(destination)) { insertLocal = true; - } else + } + else { // belongs on a different server if (message == null) @@ -1120,14 +1119,15 @@ public class StorageProxy implements StorageProxyMBean if (localDataCenter.equals(dc)) { MessagingService.instance().sendRR(message, destination, responseHandler, true); - } else + } + else { Collection<InetAddress> messages = (dcGroups != null) ? dcGroups.get(dc) : null; if (messages == null) { - messages = new ArrayList<InetAddress>(3); // most DCs will have <= 3 replicas + messages = new ArrayList<>(3); // most DCs will have <= 3 replicas if (dcGroups == null) - dcGroups = new HashMap<String, Collection<InetAddress>>(); + dcGroups = new HashMap<>(); dcGroups.put(dc, messages); } messages.add(destination); @@ -1149,7 +1149,7 @@ public class StorageProxy implements StorageProxyMBean submitHint(mutation, endpointsToHint, responseHandler); if (insertLocal) - insertLocal(stage, mutation, responseHandler); + performLocally(stage, mutation::apply, responseHandler); if (dcGroups != null) { @@ -1198,7 +1198,7 @@ public class StorageProxy implements StorageProxyMBean } } - private static void insertLocal(Stage stage, final Mutation mutation, final AbstractWriteResponseHandler<IMutation> responseHandler) + private static void performLocally(Stage stage, final Runnable runnable) { StageManager.getStage(stage).maybeExecuteImmediately(new LocalMutationRunnable() { @@ -1206,14 +1206,32 @@ public class StorageProxy implements StorageProxyMBean { try { - mutation.apply(); - responseHandler.response(null); + runnable.run(); + } + catch (Exception ex) + { + logger.error("Failed to apply mutation locally : {}", ex); + } + } + }); + } + + private static void performLocally(Stage stage, final Runnable runnable, final IAsyncCallbackWithFailure<?> handler) + { + StageManager.getStage(stage).maybeExecuteImmediately(new LocalMutationRunnable() + { + public void runMayThrow() + { + try + { + runnable.run(); + handler.response(null); } catch (Exception ex) { if (!(ex instanceof WriteTimeoutException)) logger.error("Failed to apply mutation locally : {}", ex); - responseHandler.onFailure(FBUtilities.getBroadcastAddress()); + handler.onFailure(FBUtilities.getBroadcastAddress()); } } }); http://git-wip-us.apache.org/repos/asf/cassandra/blob/53a177a9/src/java/org/apache/cassandra/service/StorageService.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java index a6c2f8b..13dc29c7 100644 --- a/src/java/org/apache/cassandra/service/StorageService.java +++ b/src/java/org/apache/cassandra/service/StorageService.java @@ -55,6 +55,9 @@ import org.apache.cassandra.concurrent.Stage; import org.apache.cassandra.concurrent.StageManager; import org.apache.cassandra.config.*; import org.apache.cassandra.db.*; +import org.apache.cassandra.batchlog.BatchStoreVerbHandler; +import org.apache.cassandra.batchlog.BatchRemoveVerbHandler; +import org.apache.cassandra.batchlog.BatchlogManager; import org.apache.cassandra.db.commitlog.CommitLog; import org.apache.cassandra.db.compaction.CompactionManager; import org.apache.cassandra.db.lifecycle.TransactionLog; @@ -282,7 +285,6 @@ public class StorageService extends NotificationBroadcasterSupport implements IE /* register the verb handlers */ MessagingService.instance().registerVerbHandlers(MessagingService.Verb.MUTATION, new MutationVerbHandler()); - MessagingService.instance().registerVerbHandlers(MessagingService.Verb.BATCHLOG_MUTATION, new MutationVerbHandler()); MessagingService.instance().registerVerbHandlers(MessagingService.Verb.READ_REPAIR, new ReadRepairVerbHandler()); MessagingService.instance().registerVerbHandlers(MessagingService.Verb.READ, new ReadCommandVerbHandler()); MessagingService.instance().registerVerbHandlers(MessagingService.Verb.RANGE_SLICE, new RangeSliceVerbHandler()); @@ -311,6 +313,9 @@ public class StorageService extends NotificationBroadcasterSupport implements IE MessagingService.instance().registerVerbHandlers(MessagingService.Verb.SNAPSHOT, new SnapshotVerbHandler()); MessagingService.instance().registerVerbHandlers(MessagingService.Verb.ECHO, new EchoVerbHandler()); + + MessagingService.instance().registerVerbHandlers(MessagingService.Verb.BATCH_STORE, new BatchStoreVerbHandler()); + MessagingService.instance().registerVerbHandlers(MessagingService.Verb.BATCH_REMOVE, new BatchRemoveVerbHandler()); } public void registerDaemon(CassandraDaemon daemon) @@ -620,12 +625,10 @@ public class StorageService extends NotificationBroadcasterSupport implements IE { inShutdownHook = true; ExecutorService materializedViewMutationStage = StageManager.getStage(Stage.MATERIALIZED_VIEW_MUTATION); - ExecutorService batchlogMutationStage = StageManager.getStage(Stage.BATCHLOG_MUTATION); ExecutorService counterMutationStage = StageManager.getStage(Stage.COUNTER_MUTATION); ExecutorService mutationStage = StageManager.getStage(Stage.MUTATION); if (mutationStage.isShutdown() && counterMutationStage.isShutdown() - && batchlogMutationStage.isShutdown() && materializedViewMutationStage.isShutdown()) return; // drained already @@ -638,12 +641,10 @@ public class StorageService extends NotificationBroadcasterSupport implements IE // before mutation stage, so we can get all the hints saved before shutting down MessagingService.instance().shutdown(); materializedViewMutationStage.shutdown(); - batchlogMutationStage.shutdown(); HintsService.instance.pauseDispatch(); counterMutationStage.shutdown(); mutationStage.shutdown(); materializedViewMutationStage.awaitTermination(3600, TimeUnit.SECONDS); - batchlogMutationStage.awaitTermination(3600, TimeUnit.SECONDS); counterMutationStage.awaitTermination(3600, TimeUnit.SECONDS); mutationStage.awaitTermination(3600, TimeUnit.SECONDS); StorageProxy.instance.verifyNoHintsInProgress(); @@ -3846,17 +3847,15 @@ public class StorageService extends NotificationBroadcasterSupport implements IE { inShutdownHook = true; - BatchlogManager.shutdown(); + BatchlogManager.instance.shutdown(); HintsService.instance.pauseDispatch(); ExecutorService counterMutationStage = StageManager.getStage(Stage.COUNTER_MUTATION); - ExecutorService batchlogMutationStage = StageManager.getStage(Stage.BATCHLOG_MUTATION); ExecutorService materializedViewMutationStage = StageManager.getStage(Stage.MATERIALIZED_VIEW_MUTATION); ExecutorService mutationStage = StageManager.getStage(Stage.MUTATION); if (mutationStage.isTerminated() && counterMutationStage.isTerminated() - && batchlogMutationStage.isTerminated() && materializedViewMutationStage.isTerminated()) { logger.warn("Cannot drain node (did it already happen?)"); @@ -3872,11 +3871,9 @@ public class StorageService extends NotificationBroadcasterSupport implements IE setMode(Mode.DRAINING, "clearing mutation stage", false); materializedViewMutationStage.shutdown(); - batchlogMutationStage.shutdown(); counterMutationStage.shutdown(); mutationStage.shutdown(); materializedViewMutationStage.awaitTermination(3600, TimeUnit.SECONDS); - batchlogMutationStage.awaitTermination(3600, TimeUnit.SECONDS); counterMutationStage.awaitTermination(3600, TimeUnit.SECONDS); mutationStage.awaitTermination(3600, TimeUnit.SECONDS); @@ -3913,6 +3910,8 @@ public class StorageService extends NotificationBroadcasterSupport implements IE } FBUtilities.waitOnFutures(flushes); + BatchlogManager.instance.shutdown(); + HintsService.instance.shutdownBlocking(); // whilst we've flushed all the CFs, which will have recycled all completed segments, we want to ensure http://git-wip-us.apache.org/repos/asf/cassandra/blob/53a177a9/src/java/org/apache/cassandra/service/paxos/CommitVerbHandler.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/paxos/CommitVerbHandler.java b/src/java/org/apache/cassandra/service/paxos/CommitVerbHandler.java index 213023e..a702a4d 100644 --- a/src/java/org/apache/cassandra/service/paxos/CommitVerbHandler.java +++ b/src/java/org/apache/cassandra/service/paxos/CommitVerbHandler.java @@ -1,4 +1,3 @@ -package org.apache.cassandra.service.paxos; /* * * Licensed to the Apache Software Foundation (ASF) under one @@ -19,7 +18,7 @@ package org.apache.cassandra.service.paxos; * under the License. * */ - +package org.apache.cassandra.service.paxos; import org.apache.cassandra.db.WriteResponse; import org.apache.cassandra.net.IVerbHandler; @@ -33,8 +32,7 @@ public class CommitVerbHandler implements IVerbHandler<Commit> { PaxosState.commit(message.payload); - WriteResponse response = new WriteResponse(); Tracing.trace("Enqueuing acknowledge to {}", message.from); - MessagingService.instance().sendReply(response.createMessage(), id, message.from); + MessagingService.instance().sendReply(WriteResponse.createMessage(), id, message.from); } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/53a177a9/src/java/org/apache/cassandra/tools/NodeProbe.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/tools/NodeProbe.java b/src/java/org/apache/cassandra/tools/NodeProbe.java index 6909ea4..5f77097 100644 --- a/src/java/org/apache/cassandra/tools/NodeProbe.java +++ b/src/java/org/apache/cassandra/tools/NodeProbe.java @@ -54,8 +54,8 @@ import javax.management.remote.JMXServiceURL; import javax.rmi.ssl.SslRMIClientSocketFactory; import org.apache.cassandra.concurrent.Stage; -import org.apache.cassandra.db.BatchlogManager; -import org.apache.cassandra.db.BatchlogManagerMBean; +import org.apache.cassandra.batchlog.BatchlogManager; +import org.apache.cassandra.batchlog.BatchlogManagerMBean; import org.apache.cassandra.db.ColumnFamilyStoreMBean; import org.apache.cassandra.db.HintedHandOffManagerMBean; import org.apache.cassandra.db.compaction.CompactionManager; http://git-wip-us.apache.org/repos/asf/cassandra/blob/53a177a9/test/long/org/apache/cassandra/cql3/MaterializedViewLongTest.java ---------------------------------------------------------------------- diff --git a/test/long/org/apache/cassandra/cql3/MaterializedViewLongTest.java b/test/long/org/apache/cassandra/cql3/MaterializedViewLongTest.java index 9738103..b833e60 100644 --- a/test/long/org/apache/cassandra/cql3/MaterializedViewLongTest.java +++ b/test/long/org/apache/cassandra/cql3/MaterializedViewLongTest.java @@ -35,7 +35,7 @@ import com.datastax.driver.core.exceptions.WriteTimeoutException; import org.apache.cassandra.concurrent.SEPExecutor; import org.apache.cassandra.concurrent.Stage; import org.apache.cassandra.concurrent.StageManager; -import org.apache.cassandra.db.BatchlogManager; +import org.apache.cassandra.batchlog.BatchlogManager; import org.apache.cassandra.utils.WrappedRunnable; public class MaterializedViewLongTest extends CQLTester http://git-wip-us.apache.org/repos/asf/cassandra/blob/53a177a9/test/unit/org/apache/cassandra/batchlog/BatchTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/batchlog/BatchTest.java b/test/unit/org/apache/cassandra/batchlog/BatchTest.java new file mode 100644 index 0000000..b7a4100 --- /dev/null +++ b/test/unit/org/apache/cassandra/batchlog/BatchTest.java @@ -0,0 +1,153 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.batchlog; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.UUID; + +import org.junit.BeforeClass; +import org.junit.Test; + +import org.apache.cassandra.SchemaLoader; +import org.apache.cassandra.config.CFMetaData; +import org.apache.cassandra.db.Keyspace; +import org.apache.cassandra.db.Mutation; +import org.apache.cassandra.db.RowUpdateBuilder; +import org.apache.cassandra.db.marshal.BytesType; +import org.apache.cassandra.exceptions.ConfigurationException; +import org.apache.cassandra.io.util.DataInputBuffer; +import org.apache.cassandra.io.util.DataInputPlus; +import org.apache.cassandra.io.util.DataOutputBuffer; +import org.apache.cassandra.net.MessagingService; +import org.apache.cassandra.schema.KeyspaceParams; +import org.apache.cassandra.utils.FBUtilities; +import org.apache.cassandra.utils.UUIDGen; + +import static org.apache.cassandra.utils.ByteBufferUtil.bytes; +import static org.junit.Assert.assertEquals; + +public class BatchTest +{ + private static final String KEYSPACE = "BatchRequestTest"; + private static final String CF_STANDARD = "Standard"; + + @BeforeClass + public static void defineSchema() throws ConfigurationException + { + SchemaLoader.prepareServer(); + SchemaLoader.createKeyspace(KEYSPACE, + KeyspaceParams.simple(1), + SchemaLoader.standardCFMD(KEYSPACE, CF_STANDARD, 1, BytesType.instance)); + } + + @Test + public void testSerialization() throws IOException + { + CFMetaData cfm = Keyspace.open(KEYSPACE).getColumnFamilyStore(CF_STANDARD).metadata; + + long now = FBUtilities.timestampMicros(); + int version = MessagingService.current_version; + UUID uuid = UUIDGen.getTimeUUID(); + + List<Mutation> mutations = new ArrayList<>(10); + for (int i = 0; i < 10; i++) + { + mutations.add(new RowUpdateBuilder(cfm, FBUtilities.timestampMicros(), bytes(i)) + .clustering("name" + i) + .add("val", "val" + i) + .build()); + } + + Batch batch1 = Batch.createLocal(uuid, now, mutations); + assertEquals(uuid, batch1.id); + assertEquals(now, batch1.creationTime); + assertEquals(mutations, batch1.decodedMutations); + + DataOutputBuffer out = new DataOutputBuffer(); + Batch.serializer.serialize(batch1, out, version); + + assertEquals(out.getLength(), Batch.serializer.serializedSize(batch1, version)); + + DataInputPlus dis = new DataInputBuffer(out.getData()); + Batch batch2 = Batch.serializer.deserialize(dis, version); + + assertEquals(batch1.id, batch2.id); + assertEquals(batch1.creationTime, batch2.creationTime); + assertEquals(batch1.decodedMutations.size(), batch2.encodedMutations.size()); + + Iterator<Mutation> it1 = batch1.decodedMutations.iterator(); + Iterator<ByteBuffer> it2 = batch2.encodedMutations.iterator(); + while (it1.hasNext()) + { + try (DataInputBuffer in = new DataInputBuffer(it2.next().array())) + { + assertEquals(it1.next().toString(), Mutation.serializer.deserialize(in, version).toString()); + } + } + } + + /** + * This is just to test decodeMutations() when deserializing, + * since Batch will never be serialized at a version 2.2. + * @throws IOException + */ + @Test + public void testSerializationNonCurrentVersion() throws IOException + { + CFMetaData cfm = Keyspace.open(KEYSPACE).getColumnFamilyStore(CF_STANDARD).metadata; + + long now = FBUtilities.timestampMicros(); + int version = MessagingService.VERSION_22; + UUID uuid = UUIDGen.getTimeUUID(); + + List<Mutation> mutations = new ArrayList<>(10); + for (int i = 0; i < 10; i++) + { + mutations.add(new RowUpdateBuilder(cfm, FBUtilities.timestampMicros(), bytes(i)) + .clustering("name" + i) + .add("val", "val" + i) + .build()); + } + + Batch batch1 = Batch.createLocal(uuid, now, mutations); + assertEquals(uuid, batch1.id); + assertEquals(now, batch1.creationTime); + assertEquals(mutations, batch1.decodedMutations); + + DataOutputBuffer out = new DataOutputBuffer(); + Batch.serializer.serialize(batch1, out, version); + + assertEquals(out.getLength(), Batch.serializer.serializedSize(batch1, version)); + + DataInputPlus dis = new DataInputBuffer(out.getData()); + Batch batch2 = Batch.serializer.deserialize(dis, version); + + assertEquals(batch1.id, batch2.id); + assertEquals(batch1.creationTime, batch2.creationTime); + assertEquals(batch1.decodedMutations.size(), batch2.decodedMutations.size()); + + Iterator<Mutation> it1 = batch1.decodedMutations.iterator(); + Iterator<Mutation> it2 = batch2.decodedMutations.iterator(); + while (it1.hasNext()) + assertEquals(it1.next().toString(), it2.next().toString()); + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/53a177a9/test/unit/org/apache/cassandra/batchlog/BatchlogEndpointFilterTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/batchlog/BatchlogEndpointFilterTest.java b/test/unit/org/apache/cassandra/batchlog/BatchlogEndpointFilterTest.java new file mode 100644 index 0000000..23aeaaa --- /dev/null +++ b/test/unit/org/apache/cassandra/batchlog/BatchlogEndpointFilterTest.java @@ -0,0 +1,115 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.batchlog; + +import java.net.InetAddress; +import java.net.UnknownHostException; +import java.util.Collection; +import java.util.HashSet; + +import com.google.common.collect.ImmutableMultimap; +import com.google.common.collect.Multimap; +import org.junit.Test; +import org.junit.matchers.JUnitMatchers; + +import static org.hamcrest.CoreMatchers.is; +import static org.junit.Assert.assertThat; + +public class BatchlogEndpointFilterTest +{ + private static final String LOCAL = "local"; + + @Test + public void shouldSelect2hostsFromNonLocalRacks() throws UnknownHostException + { + Multimap<String, InetAddress> endpoints = ImmutableMultimap.<String, InetAddress> builder() + .put(LOCAL, InetAddress.getByName("0")) + .put(LOCAL, InetAddress.getByName("00")) + .put("1", InetAddress.getByName("1")) + .put("1", InetAddress.getByName("11")) + .put("2", InetAddress.getByName("2")) + .put("2", InetAddress.getByName("22")) + .build(); + Collection<InetAddress> result = new TestEndpointFilter(LOCAL, endpoints).filter(); + assertThat(result.size(), is(2)); + assertThat(result, JUnitMatchers.hasItem(InetAddress.getByName("11"))); + assertThat(result, JUnitMatchers.hasItem(InetAddress.getByName("22"))); + } + + @Test + public void shouldSelectHostFromLocal() throws UnknownHostException + { + Multimap<String, InetAddress> endpoints = ImmutableMultimap.<String, InetAddress> builder() + .put(LOCAL, InetAddress.getByName("0")) + .put(LOCAL, InetAddress.getByName("00")) + .put("1", InetAddress.getByName("1")) + .build(); + Collection<InetAddress> result = new TestEndpointFilter(LOCAL, endpoints).filter(); + assertThat(result.size(), is(2)); + assertThat(result, JUnitMatchers.hasItem(InetAddress.getByName("1"))); + assertThat(result, JUnitMatchers.hasItem(InetAddress.getByName("0"))); + } + + @Test + public void shouldReturnAsIsIfNoEnoughEndpoints() throws UnknownHostException + { + Multimap<String, InetAddress> endpoints = ImmutableMultimap.<String, InetAddress> builder() + .put(LOCAL, InetAddress.getByName("0")) + .build(); + Collection<InetAddress> result = new TestEndpointFilter(LOCAL, endpoints).filter(); + assertThat(result.size(), is(1)); + assertThat(result, JUnitMatchers.hasItem(InetAddress.getByName("0"))); + } + + @Test + public void shouldSelectTwoRandomHostsFromSingleOtherRack() throws UnknownHostException + { + Multimap<String, InetAddress> endpoints = ImmutableMultimap.<String, InetAddress> builder() + .put(LOCAL, InetAddress.getByName("0")) + .put(LOCAL, InetAddress.getByName("00")) + .put("1", InetAddress.getByName("1")) + .put("1", InetAddress.getByName("11")) + .put("1", InetAddress.getByName("111")) + .build(); + Collection<InetAddress> result = new TestEndpointFilter(LOCAL, endpoints).filter(); + // result should contain random two distinct values + assertThat(new HashSet<>(result).size(), is(2)); + } + + private static class TestEndpointFilter extends BatchlogManager.EndpointFilter + { + TestEndpointFilter(String localRack, Multimap<String, InetAddress> endpoints) + { + super(localRack, endpoints); + } + + @Override + protected boolean isValid(InetAddress input) + { + // We will use always alive non-localhost endpoints + return true; + } + + @Override + protected int getRandomInt(int bound) + { + // We don't need random behavior here + return bound - 1; + } + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/53a177a9/test/unit/org/apache/cassandra/batchlog/BatchlogManagerTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/batchlog/BatchlogManagerTest.java b/test/unit/org/apache/cassandra/batchlog/BatchlogManagerTest.java new file mode 100644 index 0000000..dfb17c3 --- /dev/null +++ b/test/unit/org/apache/cassandra/batchlog/BatchlogManagerTest.java @@ -0,0 +1,460 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.batchlog; + +import java.io.IOException; +import java.net.InetAddress; +import java.util.*; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; + +import com.google.common.collect.Lists; +import org.junit.*; + +import org.apache.cassandra.SchemaLoader; +import org.apache.cassandra.Util; +import org.apache.cassandra.Util.PartitionerSwitcher; +import org.apache.cassandra.config.CFMetaData; +import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.config.Schema; +import org.apache.cassandra.cql3.UntypedResultSet; +import org.apache.cassandra.db.ColumnFamilyStore; +import org.apache.cassandra.db.DecoratedKey; +import org.apache.cassandra.db.Keyspace; +import org.apache.cassandra.db.Mutation; +import org.apache.cassandra.db.RowUpdateBuilder; +import org.apache.cassandra.db.SystemKeyspace; +import org.apache.cassandra.db.commitlog.ReplayPosition; +import org.apache.cassandra.db.marshal.BytesType; +import org.apache.cassandra.db.partitions.ImmutableBTreePartition; +import org.apache.cassandra.db.partitions.PartitionUpdate; +import org.apache.cassandra.db.rows.Row; +import org.apache.cassandra.dht.Murmur3Partitioner; +import org.apache.cassandra.exceptions.ConfigurationException; +import org.apache.cassandra.locator.TokenMetadata; +import org.apache.cassandra.net.MessagingService; +import org.apache.cassandra.service.StorageService; +import org.apache.cassandra.schema.KeyspaceParams; +import org.apache.cassandra.utils.ByteBufferUtil; +import org.apache.cassandra.utils.FBUtilities; +import org.apache.cassandra.utils.UUIDGen; + +import static org.apache.cassandra.cql3.QueryProcessor.executeInternal; +import static org.junit.Assert.*; + +public class BatchlogManagerTest +{ + private static final String KEYSPACE1 = "BatchlogManagerTest1"; + private static final String CF_STANDARD1 = "Standard1"; + private static final String CF_STANDARD2 = "Standard2"; + private static final String CF_STANDARD3 = "Standard3"; + private static final String CF_STANDARD4 = "Standard4"; + private static final String CF_STANDARD5 = "Standard5"; + + static PartitionerSwitcher sw; + + @BeforeClass + public static void defineSchema() throws ConfigurationException + { + sw = Util.switchPartitioner(Murmur3Partitioner.instance); + SchemaLoader.prepareServer(); + SchemaLoader.createKeyspace(KEYSPACE1, + KeyspaceParams.simple(1), + SchemaLoader.standardCFMD(KEYSPACE1, CF_STANDARD1, 1, BytesType.instance), + SchemaLoader.standardCFMD(KEYSPACE1, CF_STANDARD2, 1, BytesType.instance), + SchemaLoader.standardCFMD(KEYSPACE1, CF_STANDARD3, 1, BytesType.instance), + SchemaLoader.standardCFMD(KEYSPACE1, CF_STANDARD4, 1, BytesType.instance), + SchemaLoader.standardCFMD(KEYSPACE1, CF_STANDARD5, 1, BytesType.instance)); + } + + @AfterClass + public static void cleanup() + { + sw.close(); + } + + @Before + @SuppressWarnings("deprecation") + public void setUp() throws Exception + { + TokenMetadata metadata = StorageService.instance.getTokenMetadata(); + InetAddress localhost = InetAddress.getByName("127.0.0.1"); + metadata.updateNormalToken(Util.token("A"), localhost); + metadata.updateHostId(UUIDGen.getTimeUUID(), localhost); + Keyspace.open(SystemKeyspace.NAME).getColumnFamilyStore(SystemKeyspace.BATCHES).truncateBlocking(); + Keyspace.open(SystemKeyspace.NAME).getColumnFamilyStore(SystemKeyspace.LEGACY_BATCHLOG).truncateBlocking(); + } + + @Test + public void testDelete() + { + ColumnFamilyStore cfs = Keyspace.open(KEYSPACE1).getColumnFamilyStore(CF_STANDARD1); + CFMetaData cfm = cfs.metadata; + new RowUpdateBuilder(cfm, FBUtilities.timestampMicros(), ByteBufferUtil.bytes("1234")) + .clustering("c") + .add("val", "val" + 1234) + .build() + .applyUnsafe(); + + DecoratedKey dk = cfs.decorateKey(ByteBufferUtil.bytes("1234")); + ImmutableBTreePartition results = Util.getOnlyPartitionUnfiltered(Util.cmd(cfs, dk).build()); + Iterator<Row> iter = results.iterator(); + assert iter.hasNext(); + + Mutation mutation = new Mutation(PartitionUpdate.fullPartitionDelete(cfm, + dk, + FBUtilities.timestampMicros(), + FBUtilities.nowInSeconds())); + mutation.applyUnsafe(); + + Util.assertEmpty(Util.cmd(cfs, dk).build()); + } + + @Test + public void testReplay() throws Exception + { + testReplay(false); + } + + @Test + public void testLegacyReplay() throws Exception + { + testReplay(true); + } + + @SuppressWarnings("deprecation") + private static void testReplay(boolean legacy) throws Exception + { + long initialAllBatches = BatchlogManager.instance.countAllBatches(); + long initialReplayedBatches = BatchlogManager.instance.getTotalBatchesReplayed(); + + CFMetaData cfm = Keyspace.open(KEYSPACE1).getColumnFamilyStore(CF_STANDARD1).metadata; + + // Generate 1000 mutations (100 batches of 10 mutations each) and put them all into the batchlog. + // Half batches (50) ready to be replayed, half not. + for (int i = 0; i < 100; i++) + { + List<Mutation> mutations = new ArrayList<>(10); + for (int j = 0; j < 10; j++) + { + mutations.add(new RowUpdateBuilder(cfm, FBUtilities.timestampMicros(), ByteBufferUtil.bytes(i)) + .clustering("name" + j) + .add("val", "val" + j) + .build()); + } + + long timestamp = i < 50 + ? (System.currentTimeMillis() - BatchlogManager.getBatchlogTimeout()) + : (System.currentTimeMillis() + BatchlogManager.getBatchlogTimeout()); + + if (legacy) + LegacyBatchlogMigrator.store(Batch.createLocal(UUIDGen.getTimeUUID(timestamp, i), timestamp * 1000, mutations), MessagingService.current_version); + else + BatchlogManager.store(Batch.createLocal(UUIDGen.getTimeUUID(timestamp, i), timestamp * 1000, mutations)); + } + + if (legacy) + { + Keyspace.open(SystemKeyspace.NAME).getColumnFamilyStore(SystemKeyspace.LEGACY_BATCHLOG).forceBlockingFlush(); + LegacyBatchlogMigrator.migrate(); + } + + // Flush the batchlog to disk (see CASSANDRA-6822). + Keyspace.open(SystemKeyspace.NAME).getColumnFamilyStore(SystemKeyspace.BATCHES).forceBlockingFlush(); + + assertEquals(100, BatchlogManager.instance.countAllBatches() - initialAllBatches); + assertEquals(0, BatchlogManager.instance.getTotalBatchesReplayed() - initialReplayedBatches); + + // Force batchlog replay and wait for it to complete. + BatchlogManager.instance.startBatchlogReplay().get(); + + // Ensure that the first half, and only the first half, got replayed. + assertEquals(50, BatchlogManager.instance.countAllBatches() - initialAllBatches); + assertEquals(50, BatchlogManager.instance.getTotalBatchesReplayed() - initialReplayedBatches); + + for (int i = 0; i < 100; i++) + { + String query = String.format("SELECT * FROM \"%s\".\"%s\" WHERE key = intAsBlob(%d)", KEYSPACE1, CF_STANDARD1, i); + UntypedResultSet result = executeInternal(query); + assertNotNull(result); + if (i < 50) + { + Iterator<UntypedResultSet.Row> it = result.iterator(); + assertNotNull(it); + for (int j = 0; j < 10; j++) + { + assertTrue(it.hasNext()); + UntypedResultSet.Row row = it.next(); + + assertEquals(ByteBufferUtil.bytes(i), row.getBytes("key")); + assertEquals("name" + j, row.getString("name")); + assertEquals("val" + j, row.getString("val")); + } + + assertFalse(it.hasNext()); + } + else + { + assertTrue(result.isEmpty()); + } + } + + // Ensure that no stray mutations got somehow applied. + UntypedResultSet result = executeInternal(String.format("SELECT count(*) FROM \"%s\".\"%s\"", KEYSPACE1, CF_STANDARD1)); + assertNotNull(result); + assertEquals(500, result.one().getLong("count")); + } + + @Test + public void testTruncatedReplay() throws InterruptedException, ExecutionException + { + CFMetaData cf2 = Schema.instance.getCFMetaData(KEYSPACE1, CF_STANDARD2); + CFMetaData cf3 = Schema.instance.getCFMetaData(KEYSPACE1, CF_STANDARD3); + // Generate 2000 mutations (1000 batchlog entries) and put them all into the batchlog. + // Each batchlog entry with a mutation for Standard2 and Standard3. + // In the middle of the process, 'truncate' Standard2. + for (int i = 0; i < 1000; i++) + { + Mutation mutation1 = new RowUpdateBuilder(cf2, FBUtilities.timestampMicros(), ByteBufferUtil.bytes(i)) + .clustering("name" + i) + .add("val", "val" + i) + .build(); + Mutation mutation2 = new RowUpdateBuilder(cf3, FBUtilities.timestampMicros(), ByteBufferUtil.bytes(i)) + .clustering("name" + i) + .add("val", "val" + i) + .build(); + + List<Mutation> mutations = Lists.newArrayList(mutation1, mutation2); + + // Make sure it's ready to be replayed, so adjust the timestamp. + long timestamp = System.currentTimeMillis() - BatchlogManager.getBatchlogTimeout(); + + if (i == 500) + SystemKeyspace.saveTruncationRecord(Keyspace.open(KEYSPACE1).getColumnFamilyStore(CF_STANDARD2), + timestamp, + ReplayPosition.NONE); + + // Adjust the timestamp (slightly) to make the test deterministic. + if (i >= 500) + timestamp++; + else + timestamp--; + + BatchlogManager.store(Batch.createLocal(UUIDGen.getTimeUUID(timestamp, i), FBUtilities.timestampMicros(), mutations)); + } + + // Flush the batchlog to disk (see CASSANDRA-6822). + Keyspace.open(SystemKeyspace.NAME).getColumnFamilyStore(SystemKeyspace.BATCHES).forceBlockingFlush(); + + // Force batchlog replay and wait for it to complete. + BatchlogManager.instance.startBatchlogReplay().get(); + + // We should see half of Standard2-targeted mutations written after the replay and all of Standard3 mutations applied. + for (int i = 0; i < 1000; i++) + { + UntypedResultSet result = executeInternal(String.format("SELECT * FROM \"%s\".\"%s\" WHERE key = intAsBlob(%d)", KEYSPACE1, CF_STANDARD2,i)); + assertNotNull(result); + if (i >= 500) + { + assertEquals(ByteBufferUtil.bytes(i), result.one().getBytes("key")); + assertEquals("name" + i, result.one().getString("name")); + assertEquals("val" + i, result.one().getString("val")); + } + else + { + assertTrue(result.isEmpty()); + } + } + + for (int i = 0; i < 1000; i++) + { + UntypedResultSet result = executeInternal(String.format("SELECT * FROM \"%s\".\"%s\" WHERE key = intAsBlob(%d)", KEYSPACE1, CF_STANDARD3, i)); + assertNotNull(result); + assertEquals(ByteBufferUtil.bytes(i), result.one().getBytes("key")); + assertEquals("name" + i, result.one().getString("name")); + assertEquals("val" + i, result.one().getString("val")); + } + } + + @Test + @SuppressWarnings("deprecation") + public void testConversion() throws Exception + { + long initialAllBatches = BatchlogManager.instance.countAllBatches(); + long initialReplayedBatches = BatchlogManager.instance.getTotalBatchesReplayed(); + CFMetaData cfm = Schema.instance.getCFMetaData(KEYSPACE1, CF_STANDARD4); + + // Generate 1400 version 2.0 mutations and put them all into the batchlog. + // Half ready to be replayed, half not. + for (int i = 0; i < 1400; i++) + { + Mutation mutation = new RowUpdateBuilder(cfm, FBUtilities.timestampMicros(), ByteBufferUtil.bytes(i)) + .clustering("name" + i) + .add("val", "val" + i) + .build(); + + long timestamp = i < 700 + ? (System.currentTimeMillis() - BatchlogManager.getBatchlogTimeout()) + : (System.currentTimeMillis() + BatchlogManager.getBatchlogTimeout()); + + + Mutation batchMutation = LegacyBatchlogMigrator.getStoreMutation(Batch.createLocal(UUIDGen.getTimeUUID(timestamp, i), + TimeUnit.MILLISECONDS.toMicros(timestamp), + Collections.singleton(mutation)), + MessagingService.VERSION_20); + assertTrue(LegacyBatchlogMigrator.isLegacyBatchlogMutation(batchMutation)); + LegacyBatchlogMigrator.handleLegacyMutation(batchMutation); + } + + // Mix in 100 current version mutations, 50 ready for replay. + for (int i = 1400; i < 1500; i++) + { + Mutation mutation = new RowUpdateBuilder(cfm, FBUtilities.timestampMicros(), ByteBufferUtil.bytes(i)) + .clustering("name" + i) + .add("val", "val" + i) + .build(); + + long timestamp = i < 1450 + ? (System.currentTimeMillis() - BatchlogManager.getBatchlogTimeout()) + : (System.currentTimeMillis() + BatchlogManager.getBatchlogTimeout()); + + + BatchlogManager.store(Batch.createLocal(UUIDGen.getTimeUUID(timestamp, i), + FBUtilities.timestampMicros(), + Collections.singleton(mutation))); + } + + // Flush the batchlog to disk (see CASSANDRA-6822). + Keyspace.open(SystemKeyspace.NAME).getColumnFamilyStore(SystemKeyspace.BATCHES).forceBlockingFlush(); + + assertEquals(1500, BatchlogManager.instance.countAllBatches() - initialAllBatches); + assertEquals(0, BatchlogManager.instance.getTotalBatchesReplayed() - initialReplayedBatches); + + UntypedResultSet result = executeInternal(String.format("SELECT count(*) FROM \"%s\".\"%s\"", SystemKeyspace.NAME, SystemKeyspace.LEGACY_BATCHLOG)); + assertNotNull(result); + assertEquals("Count in blog legacy", 0, result.one().getLong("count")); + result = executeInternal(String.format("SELECT count(*) FROM \"%s\".\"%s\"", SystemKeyspace.NAME, SystemKeyspace.BATCHES)); + assertNotNull(result); + assertEquals("Count in blog", 1500, result.one().getLong("count")); + + // Force batchlog replay and wait for it to complete. + BatchlogManager.instance.performInitialReplay(); + + // Ensure that the first half, and only the first half, got replayed. + assertEquals(750, BatchlogManager.instance.countAllBatches() - initialAllBatches); + assertEquals(750, BatchlogManager.instance.getTotalBatchesReplayed() - initialReplayedBatches); + + for (int i = 0; i < 1500; i++) + { + result = executeInternal(String.format("SELECT * FROM \"%s\".\"%s\" WHERE key = intAsBlob(%d)", KEYSPACE1, CF_STANDARD4, i)); + assertNotNull(result); + if (i < 700 || i >= 1400 && i < 1450) + { + assertEquals(ByteBufferUtil.bytes(i), result.one().getBytes("key")); + assertEquals("name" + i, result.one().getString("name")); + assertEquals("val" + i, result.one().getString("val")); + } + else + { + assertTrue("Present at " + i, result.isEmpty()); + } + } + + // Ensure that no stray mutations got somehow applied. + result = executeInternal(String.format("SELECT count(*) FROM \"%s\".\"%s\"", KEYSPACE1, CF_STANDARD4)); + assertNotNull(result); + assertEquals(750, result.one().getLong("count")); + + // Ensure batchlog is left as expected. + result = executeInternal(String.format("SELECT count(*) FROM \"%s\".\"%s\"", SystemKeyspace.NAME, SystemKeyspace.BATCHES)); + assertNotNull(result); + assertEquals("Count in blog after initial replay", 750, result.one().getLong("count")); + result = executeInternal(String.format("SELECT count(*) FROM \"%s\".\"%s\"", SystemKeyspace.NAME, SystemKeyspace.LEGACY_BATCHLOG)); + assertNotNull(result); + assertEquals("Count in blog legacy after initial replay ", 0, result.one().getLong("count")); + } + + @Test + public void testAddBatch() throws IOException + { + long initialAllBatches = BatchlogManager.instance.countAllBatches(); + CFMetaData cfm = Keyspace.open(KEYSPACE1).getColumnFamilyStore(CF_STANDARD5).metadata; + + long timestamp = (System.currentTimeMillis() - DatabaseDescriptor.getWriteRpcTimeout() * 2) * 1000; + UUID uuid = UUIDGen.getTimeUUID(); + + // Add a batch with 10 mutations + List<Mutation> mutations = new ArrayList<>(10); + for (int j = 0; j < 10; j++) + { + mutations.add(new RowUpdateBuilder(cfm, FBUtilities.timestampMicros(), ByteBufferUtil.bytes(j)) + .clustering("name" + j) + .add("val", "val" + j) + .build()); + } + + + BatchlogManager.store(Batch.createLocal(uuid, timestamp, mutations)); + Assert.assertEquals(initialAllBatches + 1, BatchlogManager.instance.countAllBatches()); + + String query = String.format("SELECT count(*) FROM %s.%s where id = %s", + SystemKeyspace.NAME, + SystemKeyspace.BATCHES, + uuid); + UntypedResultSet result = executeInternal(query); + assertNotNull(result); + assertEquals(1L, result.one().getLong("count")); + } + + @Test + public void testRemoveBatch() + { + long initialAllBatches = BatchlogManager.instance.countAllBatches(); + CFMetaData cfm = Keyspace.open(KEYSPACE1).getColumnFamilyStore(CF_STANDARD5).metadata; + + long timestamp = (System.currentTimeMillis() - DatabaseDescriptor.getWriteRpcTimeout() * 2) * 1000; + UUID uuid = UUIDGen.getTimeUUID(); + + // Add a batch with 10 mutations + List<Mutation> mutations = new ArrayList<>(10); + for (int j = 0; j < 10; j++) + { + mutations.add(new RowUpdateBuilder(cfm, FBUtilities.timestampMicros(), ByteBufferUtil.bytes(j)) + .clustering("name" + j) + .add("val", "val" + j) + .build()); + } + + // Store the batch + BatchlogManager.store(Batch.createLocal(uuid, timestamp, mutations)); + Assert.assertEquals(initialAllBatches + 1, BatchlogManager.instance.countAllBatches()); + + // Remove the batch + BatchlogManager.remove(uuid); + + assertEquals(initialAllBatches, BatchlogManager.instance.countAllBatches()); + + String query = String.format("SELECT count(*) FROM %s.%s where id = %s", + SystemKeyspace.NAME, + SystemKeyspace.BATCHES, + uuid); + UntypedResultSet result = executeInternal(query); + assertNotNull(result); + assertEquals(0L, result.one().getLong("count")); + } +}
