This is an automated email from the ASF dual-hosted git repository.
ifesdjeen pushed a commit to branch cep-15-accord
in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/cep-15-accord by this push:
new b0ca509e7a Accord Journal Determinism: PreAccept replay stability
b0ca509e7a is described below
commit b0ca509e7add760d187fcc5a9908d93d7c4fd6ec
Author: Alex Petrov <[email protected]>
AuthorDate: Wed May 29 19:16:26 2024 +0200
Accord Journal Determinism: PreAccept replay stability
Patch by Alex Petrov; reviewed by Aleksey Yeschenko for CASSANDRA-19664
---
modules/accord | 2 +-
.../apache/cassandra/journal/RecordPointer.java | 66 +++++++
.../service/accord/AccordCommandStore.java | 4 +-
.../cassandra/service/accord/AccordJournal.java | 206 +++++++++------------
.../service/accord/AccordSafeCommandStore.java | 49 ++++-
5 files changed, 197 insertions(+), 130 deletions(-)
diff --git a/modules/accord b/modules/accord
index 4e8bcae81f..84e89bd91c 160000
--- a/modules/accord
+++ b/modules/accord
@@ -1 +1 @@
-Subproject commit 4e8bcae81f9751b9d732fd5056bce31c97ad58f3
+Subproject commit 84e89bd91cf1b058fbf314b750336a1ec1096b18
diff --git a/src/java/org/apache/cassandra/journal/RecordPointer.java b/src/java/org/apache/cassandra/journal/RecordPointer.java
new file mode 100644
index 0000000000..2b3e8ea6b8
--- /dev/null
+++ b/src/java/org/apache/cassandra/journal/RecordPointer.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.journal;
+
+import com.google.common.primitives.Ints;
+import com.google.common.primitives.Longs;
+
+// TODO: make this available in the accord table as an ID
+public class RecordPointer implements Comparable<RecordPointer>
+{
+ public final long segment; // unique segment id
+ public final int position; // record start position within the segment
+
+ public RecordPointer(long segment, int position)
+ {
+ this.segment = segment;
+ this.position = position;
+ }
+
+ @Override
+ public boolean equals(Object other)
+ {
+ if (this == other)
+ return true;
+ if (!(other instanceof RecordPointer))
+ return false;
+ RecordPointer that = (RecordPointer) other;
+ return this.segment == that.segment
+ && this.position == that.position;
+ }
+
+ @Override
+ public int hashCode()
+ {
+ return Long.hashCode(segment) + position * 31;
+ }
+
+ @Override
+ public String toString()
+ {
+ return "(" + segment + ", " + position + ')';
+ }
+
+ @Override
+ public int compareTo(RecordPointer that)
+ {
+ int cmp = Longs.compare(this.segment, that.segment);
+ return cmp != 0 ? cmp : Ints.compare(this.position, that.position);
+ }
+}
\ No newline at end of file
diff --git a/src/java/org/apache/cassandra/service/accord/AccordCommandStore.java b/src/java/org/apache/cassandra/service/accord/AccordCommandStore.java
index 2a67ba656d..c846038fd8 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordCommandStore.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordCommandStore.java
@@ -488,7 +488,9 @@ public class AccordCommandStore extends CommandStore
implements CacheSize
timestampsForKeys.values().forEach(AccordSafeState::preExecute);
if (commandsForRanges != null)
commandsForRanges.preExecute();
- current = new AccordSafeCommandStore(preLoadContext, commands,
timestampsForKeys, commandsForKeys, commandsForRanges, this);
+
+ current = AccordSafeCommandStore.create(preLoadContext, commands,
timestampsForKeys, commandsForKeys, commandsForRanges, this);
+
return current;
}
diff --git a/src/java/org/apache/cassandra/service/accord/AccordJournal.java b/src/java/org/apache/cassandra/service/accord/AccordJournal.java
index ce90b26747..0c31afbb4c 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordJournal.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordJournal.java
@@ -40,22 +40,16 @@ import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ListMultimap;
import com.google.common.collect.Multimap;
import com.google.common.primitives.Ints;
-import com.google.common.primitives.Longs;
-
-import accord.messages.ApplyThenWaitUntilApplied;
-import org.agrona.collections.Long2ObjectHashMap;
-import org.agrona.collections.LongArrayList;
-import org.agrona.collections.ObjectHashSet;
-import org.cliffc.high_scale_lib.NonBlockingHashMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import accord.local.Node.Id;
import accord.local.Node;
+import accord.local.Node.Id;
import accord.local.SerializerSupport;
import accord.messages.AbstractEpochRequest;
import accord.messages.Accept;
import accord.messages.Apply;
+import accord.messages.ApplyThenWaitUntilApplied;
import accord.messages.BeginRecovery;
import accord.messages.Commit;
import accord.messages.LocalRequest;
@@ -63,6 +57,7 @@ import accord.messages.Message;
import accord.messages.MessageType;
import accord.messages.PreAccept;
import accord.messages.Propagate;
+import accord.messages.ReplyContext;
import accord.messages.Request;
import accord.messages.TxnRequest;
import accord.primitives.Ballot;
@@ -70,6 +65,9 @@ import accord.primitives.Timestamp;
import accord.primitives.TxnId;
import accord.utils.Invariants;
import accord.utils.MapReduceConsume;
+import org.agrona.collections.Long2ObjectHashMap;
+import org.agrona.collections.LongArrayList;
+import org.agrona.collections.ObjectHashSet;
import org.apache.cassandra.concurrent.Interruptible;
import org.apache.cassandra.concurrent.ManyToOneConcurrentLinkedQueue;
import org.apache.cassandra.concurrent.SequentialExecutorPlus;
@@ -83,13 +81,14 @@ import org.apache.cassandra.journal.AsyncCallbacks;
import org.apache.cassandra.journal.Journal;
import org.apache.cassandra.journal.KeySupport;
import org.apache.cassandra.journal.Params;
+import org.apache.cassandra.journal.RecordPointer;
import org.apache.cassandra.journal.ValueSerializer;
import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.net.MessagingService;
-import org.apache.cassandra.service.accord.interop.AccordInteropApply;
-import org.apache.cassandra.service.accord.interop.AccordInteropCommit;
import org.apache.cassandra.net.ResponseContext;
import org.apache.cassandra.net.Verb;
+import org.apache.cassandra.service.accord.interop.AccordInteropApply;
+import org.apache.cassandra.service.accord.interop.AccordInteropCommit;
import org.apache.cassandra.service.accord.serializers.AcceptSerializers;
import org.apache.cassandra.service.accord.serializers.ApplySerializers;
import
org.apache.cassandra.service.accord.serializers.BeginInvalidationSerializers;
@@ -104,6 +103,8 @@ import
org.apache.cassandra.service.accord.serializers.SetDurableSerializers;
import org.apache.cassandra.utils.ByteArrayUtil;
import org.apache.cassandra.utils.ExecutorUtils;
import org.apache.cassandra.utils.concurrent.Semaphore;
+import org.apache.cassandra.utils.vint.VIntCoding;
+import org.cliffc.high_scale_lib.NonBlockingHashMap;
import org.jctools.queues.SpscLinkedQueue;
import static accord.messages.MessageType.ACCEPT_INVALIDATE_REQ;
@@ -120,18 +121,18 @@ import static
accord.messages.MessageType.INFORM_DURABLE_REQ;
import static accord.messages.MessageType.INFORM_OF_TXN_REQ;
import static accord.messages.MessageType.PRE_ACCEPT_REQ;
import static accord.messages.MessageType.PROPAGATE_APPLY_MSG;
-import static accord.messages.MessageType.PROPAGATE_STABLE_MSG;
import static accord.messages.MessageType.PROPAGATE_OTHER_MSG;
import static accord.messages.MessageType.PROPAGATE_PRE_ACCEPT_MSG;
+import static accord.messages.MessageType.PROPAGATE_STABLE_MSG;
import static accord.messages.MessageType.SET_GLOBALLY_DURABLE_REQ;
import static accord.messages.MessageType.SET_SHARD_DURABLE_REQ;
+import static accord.messages.MessageType.STABLE_FAST_PATH_REQ;
+import static accord.messages.MessageType.STABLE_MAXIMAL_REQ;
+import static accord.messages.MessageType.STABLE_SLOW_PATH_REQ;
import static
org.apache.cassandra.concurrent.ExecutorFactory.Global.executorFactory;
import static
org.apache.cassandra.concurrent.InfiniteLoopExecutor.Daemon.NON_DAEMON;
import static
org.apache.cassandra.concurrent.InfiniteLoopExecutor.Interrupts.SYNCHRONIZED;
import static
org.apache.cassandra.concurrent.InfiniteLoopExecutor.SimulatorSafe.SAFE;
-import static accord.messages.MessageType.STABLE_FAST_PATH_REQ;
-import static accord.messages.MessageType.STABLE_MAXIMAL_REQ;
-import static accord.messages.MessageType.STABLE_SLOW_PATH_REQ;
import static org.apache.cassandra.concurrent.Interruptible.State.NORMAL;
import static org.apache.cassandra.db.TypeSizes.BYTE_SIZE;
import static org.apache.cassandra.db.TypeSizes.INT_SIZE;
@@ -165,7 +166,7 @@ public class AccordJournal implements IJournal, Shutdownable
* A cache of deserialized journal records we keep to avoid fetching them
from log when free memory allows it.
* TODO (expected, performance): cap memory used for cached records
*/
- private final NonBlockingHashMap<Pointer, Object> cachedRecords = new
NonBlockingHashMap<>();
+ private final NonBlockingHashMap<RecordPointer, Object> cachedRecords =
new NonBlockingHashMap<>();
Node node;
@@ -292,88 +293,7 @@ public class AccordJournal implements IJournal,
Shutdownable
return null;
}
- private static class Pointer implements Comparable<Pointer>
- {
- final long segment; // unique segment id
- final int position; // record start position within the segment
-
- Pointer(long segment, int position)
- {
- this.segment = segment;
- this.position = position;
- }
-
- @Override
- public boolean equals(Object other)
- {
- if (this == other)
- return true;
- if (!(other instanceof Pointer))
- return false;
- Pointer that = (Pointer) other;
- return this.segment == that.segment
- && this.position == that.position;
- }
-
- @Override
- public int hashCode()
- {
- return Long.hashCode(segment) + position * 31;
- }
-
- @Override
- public String toString()
- {
- return "(" + segment + ", " + position + ')';
- }
-
- @Override
- public int compareTo(Pointer that)
- {
- int cmp = Longs.compare(this.segment, that.segment);
- return cmp != 0 ? cmp : Ints.compare(this.position, that.position);
- }
-
- int serializedSize()
- {
- return computeUnsignedVIntSize(segment) +
computeUnsignedVIntSize(position);
- }
-
- void serialize(DataOutputPlus out) throws IOException
- {
- out.writeUnsignedVInt(segment);
- out.writeUnsignedVInt32(position);
- }
-
- static Pointer deserialize(DataInputPlus in) throws IOException
- {
- long segment = in.readUnsignedVInt();
- int position = in.readUnsignedVInt32();
- return new Pointer(segment, position);
- }
-
- static final IVersionedSerializer<Pointer> SERIALIZER = new
IVersionedSerializer<>()
- {
- @Override
- public void serialize(Pointer p, DataOutputPlus out, int version)
throws IOException
- {
- p.serialize(out);
- }
-
- @Override
- public Pointer deserialize(DataInputPlus in, int version) throws
IOException
- {
- return Pointer.deserialize(in);
- }
-
- @Override
- public long serializedSize(Pointer p, int version)
- {
- return Ints.checkedCast(p.serializedSize());
- }
- };
- }
-
+ // TODO (alexp): tests for objects that go through AccordJournal
private class JournalCallbacks implements AsyncCallbacks<Key, Object>
{
/**
@@ -382,7 +302,7 @@ public class AccordJournal implements IJournal, Shutdownable
@Override
public void onWrite(long segment, int position, int size, Key key,
Object value, Object writeContext)
{
- Pointer pointer = new Pointer(segment, position);
+ RecordPointer pointer = new RecordPointer(segment, position);
cachedRecords.put(pointer, value);
/*
@@ -451,29 +371,40 @@ public class AccordJournal implements IJournal,
Shutdownable
* Context necessary to process log records
*/
- private static class RequestContext
+ static class RequestContext implements ReplyContext
{
final long waitForEpoch;
- final Pointer pointer;
+ final RecordPointer pointer;
+ private long preAcceptTimeout;
- RequestContext(long waitForEpoch, Pointer pointer)
+ RequestContext(long waitForEpoch, RecordPointer pointer)
{
this.waitForEpoch = waitForEpoch;
this.pointer = pointer;
}
+
+ void preAcceptTimeout(long preAcceptTimeout)
+ {
+ this.preAcceptTimeout = preAcceptTimeout;
+ }
+
+ public long preAcceptTimeout()
+ {
+ return preAcceptTimeout;
+ }
}
private static class LocalRequestContext extends RequestContext
{
private final BiConsumer<?, Throwable> callback;
- LocalRequestContext(long waitForEpoch, BiConsumer<?, Throwable>
callback, Pointer pointer)
+ LocalRequestContext(long waitForEpoch, BiConsumer<?, Throwable>
callback, RecordPointer pointer)
{
super(waitForEpoch, pointer);
this.callback = callback;
}
- static LocalRequestContext create(LocalRequest<?> request, Pointer
pointer)
+ static LocalRequestContext create(LocalRequest<?> request,
RecordPointer pointer)
{
return new LocalRequestContext(request.waitForEpoch(),
request.callback(), pointer);
}
@@ -489,7 +420,7 @@ public class AccordJournal implements IJournal, Shutdownable
private final Verb verb;
private final long expiresAtNanos;
- RemoteRequestContext(long waitForEpoch, long id, InetAddressAndPort
from, Verb verb, long expiresAtNanos, Pointer pointer)
+ RemoteRequestContext(long waitForEpoch, long id, InetAddressAndPort
from, Verb verb, long expiresAtNanos, RecordPointer pointer)
{
super(waitForEpoch, pointer);
this.id = id;
@@ -498,7 +429,7 @@ public class AccordJournal implements IJournal, Shutdownable
this.expiresAtNanos = expiresAtNanos;
}
- static RemoteRequestContext create(long waitForEpoch, ResponseContext
context, Pointer pointer)
+ static RemoteRequestContext create(long waitForEpoch, ResponseContext
context, RecordPointer pointer)
{
return new RemoteRequestContext(waitForEpoch, context.id(),
context.from(), context.verb(), context.expiresAtNanos(), pointer);
}
@@ -1155,9 +1086,9 @@ public class AccordJournal implements IJournal,
Shutdownable
if (requests != null)
{
- ArrayList<Pointer> pointers = new ArrayList<>(requests.size());
+ ArrayList<RecordPointer> pointers = new
ArrayList<>(requests.size());
for (RequestContext req : requests) pointers.add(req.pointer);
- FrameRecord frame = new FrameRecord(node.uniqueNow(),
pointers);
+ FrameRecord frame = new FrameRecord(node.uniqueNow(),
pointers, node.agent().preAcceptTimeout());
FrameContext context = new FrameContext(requests);
appendAuxiliaryRecord(frame, context);
}
@@ -1178,20 +1109,20 @@ public class AccordJournal implements IJournal,
Shutdownable
private final ArrayList<PendingFrame> pendingFrames = new
ArrayList<>();
/* furthest flushed journal segment + position */
- private volatile Pointer flushedUntil = null;
+ private volatile RecordPointer flushedUntil = null;
private volatile SequentialExecutorPlus executor;
/* invoked from FrameGenerator thread via appendAuxiliaryRecord() call
*/
- void onWrite(Pointer start, int size, FrameContext context)
+ void onWrite(RecordPointer start, int size, FrameContext context)
{
- newFrames.add(new PendingFrame(start, new Pointer(start.segment,
start.position + size), context));
+ newFrames.add(new PendingFrame(start, new
RecordPointer(start.segment, start.position + size), context));
}
/* invoked only from Journal Flusher thread (single) */
void onFlush(long segment, int position)
{
- flushedUntil = new Pointer(segment, position);
+ flushedUntil = new RecordPointer(segment, position);
executor.submit(this);
}
@@ -1231,7 +1162,7 @@ public class AccordJournal implements IJournal,
Shutdownable
pendingFrames.sort((f1, f2) -> f2.start.compareTo(f1.start));
}
- Pointer flushedUntil = this.flushedUntil;
+ RecordPointer flushedUntil = this.flushedUntil;
for (int i = pendingFrames.size() - 1; i >= 0; i--)
{
PendingFrame frame = pendingFrames.get(i);
@@ -1246,13 +1177,15 @@ public class AccordJournal implements IJournal,
Shutdownable
{
Invariants.checkState(frame.pointers.size() ==
context.requestContexts.size());
for (int i = 0; i < frame.pointers.size(); i++)
- applyRequest(frame.pointers.get(i),
context.requestContexts.get(i));
+ applyRequest(frame.pointers.get(i),
context.requestContexts.get(i), frame.preAcceptTimeoutMicros);
}
- private void applyRequest(Pointer pointer, RequestContext context)
+ private void applyRequest(RecordPointer pointer, RequestContext
context, long preAcceptTimeout)
{
Request request = (Request) cachedRecords.remove(pointer);
Type type = Type.fromMessageType(request.type());
+ if (type == Type.PRE_ACCEPT || type == Type.BEGIN_RECOVER)
+ context.preAcceptTimeout(preAcceptTimeout);
if (type.isRemoteRequest())
{
@@ -1264,6 +1197,7 @@ public class AccordJournal implements IJournal,
Shutdownable
{
Invariants.checkState(type.isLocalRequest());
LocalRequestContext ctx = (LocalRequestContext) context;
+ // TODO (expected): Make Propagate PreAccept receive
preAcceptTimeout and timestamps
//noinspection unchecked,rawtypes
((LocalRequest) request).process(node, ctx.callback);
}
@@ -1276,11 +1210,11 @@ public class AccordJournal implements IJournal,
Shutdownable
*/
private final class PendingFrame
{
- final Pointer start;
- final Pointer end;
+ final RecordPointer start;
+ final RecordPointer end;
final FrameContext context;
- PendingFrame(Pointer start, Pointer end, FrameContext context)
+ PendingFrame(RecordPointer start, RecordPointer end, FrameContext
context)
{
this.start = start;
this.end = end;
@@ -1311,14 +1245,40 @@ public class AccordJournal implements IJournal,
Shutdownable
abstract Type type();
}
+ public static final IVersionedSerializer<RecordPointer>
RECORD_POINTER_SERIALIZER = new IVersionedSerializer<>()
+ {
+ @Override
+ public void serialize(RecordPointer p, DataOutputPlus out, int
version) throws IOException
+ {
+ out.writeUnsignedVInt(p.segment);
+ out.writeUnsignedVInt32(p.position);
+ }
+
+ @Override
+ public RecordPointer deserialize(DataInputPlus in, int version) throws
IOException
+ {
+ long segment = in.readUnsignedVInt();
+ int position = in.readUnsignedVInt32();
+ return new RecordPointer(segment, position);
+ }
+
+ @Override
+ public long serializedSize(RecordPointer p, int version)
+ {
+ return computeUnsignedVIntSize(p.segment) +
computeUnsignedVIntSize(p.position);
+ }
+ };
+
private static final class FrameRecord extends AuxiliaryRecord
{
- final List<Pointer> pointers;
+ final List<RecordPointer> pointers;
+ final long preAcceptTimeoutMicros;
- FrameRecord(Timestamp timestamp, List<Pointer> pointers)
+ FrameRecord(Timestamp timestamp, List<RecordPointer> pointers, long
preAcceptTimeoutMicros)
{
super(timestamp);
this.pointers = pointers;
+ this.preAcceptTimeoutMicros = preAcceptTimeoutMicros;
}
@Override
@@ -1332,19 +1292,21 @@ public class AccordJournal implements IJournal,
Shutdownable
@Override
public int serializedSize(Key key, FrameRecord frame, int
userVersion)
{
- return Ints.checkedCast(serializedListSize(frame.pointers,
userVersion, Pointer.SERIALIZER));
+ return Ints.checkedCast(serializedListSize(frame.pointers,
userVersion, RECORD_POINTER_SERIALIZER)) +
+ computeUnsignedVIntSize(frame.preAcceptTimeoutMicros);
}
@Override
public void serialize(Key key, FrameRecord frame, DataOutputPlus
out, int userVersion) throws IOException
{
- serializeList(frame.pointers, out, userVersion,
Pointer.SERIALIZER);
+ serializeList(frame.pointers, out, userVersion,
RECORD_POINTER_SERIALIZER);
+ VIntCoding.writeUnsignedVInt(frame.preAcceptTimeoutMicros,
out);
}
@Override
public FrameRecord deserialize(Key key, DataInputPlus in, int
userVersion) throws IOException
{
- return new FrameRecord(key.timestamp, deserializeList(in,
userVersion, Pointer.SERIALIZER));
+ return new FrameRecord(key.timestamp, deserializeList(in,
userVersion, RECORD_POINTER_SERIALIZER), VIntCoding.readUnsignedVInt(in));
}
};
}
diff --git a/src/java/org/apache/cassandra/service/accord/AccordSafeCommandStore.java b/src/java/org/apache/cassandra/service/accord/AccordSafeCommandStore.java
index 7e8e9f43ac..32a6973b07 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordSafeCommandStore.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordSafeCommandStore.java
@@ -33,6 +33,10 @@ import accord.impl.CommandsSummary;
import accord.local.CommandStores.RangesForEpoch;
import accord.local.NodeTimeService;
import accord.local.PreLoadContext;
+import accord.messages.BeginRecovery;
+
+import accord.messages.PreAccept;
+import accord.messages.TxnRequest;
import accord.primitives.AbstractKeys;
import accord.primitives.AbstractRanges;
import accord.primitives.Deps;
@@ -45,6 +49,7 @@ import accord.primitives.TxnId;
public class AccordSafeCommandStore extends
AbstractSafeCommandStore<AccordSafeCommand, AccordSafeTimestampsForKey,
AccordSafeCommandsForKey>
{
+ private final long preAcceptTimeout;
private final Map<TxnId, AccordSafeCommand> commands;
private final NavigableMap<Key, AccordSafeCommandsForKey> commandsForKeys;
private final NavigableMap<Key, AccordSafeTimestampsForKey>
timestampsForKeys;
@@ -52,14 +57,16 @@ public class AccordSafeCommandStore extends
AbstractSafeCommandStore<AccordSafeC
private final AccordCommandStore commandStore;
private final RangesForEpoch ranges;
- public AccordSafeCommandStore(PreLoadContext context,
- Map<TxnId, AccordSafeCommand> commands,
- NavigableMap<Key,
AccordSafeTimestampsForKey> timestampsForKey,
- NavigableMap<Key, AccordSafeCommandsForKey>
commandsForKey,
- @Nullable AccordSafeCommandsForRanges
commandsForRanges,
- AccordCommandStore commandStore)
+ private AccordSafeCommandStore(PreLoadContext context,
+ long preAcceptTimeout,
+ Map<TxnId, AccordSafeCommand> commands,
+ NavigableMap<Key,
AccordSafeTimestampsForKey> timestampsForKey,
+ NavigableMap<Key, AccordSafeCommandsForKey>
commandsForKey,
+ @Nullable AccordSafeCommandsForRanges
commandsForRanges,
+ AccordCommandStore commandStore)
{
super(context);
+ this.preAcceptTimeout = preAcceptTimeout;
this.commands = commands;
this.timestampsForKeys = timestampsForKey;
this.commandsForKeys = commandsForKey;
@@ -68,6 +75,26 @@ public class AccordSafeCommandStore extends
AbstractSafeCommandStore<AccordSafeC
this.ranges = commandStore.updateRangesForEpoch();
}
+ public static AccordSafeCommandStore create(PreLoadContext preLoadContext,
+ Map<TxnId, AccordSafeCommand>
commands,
+ NavigableMap<Key,
AccordSafeTimestampsForKey> timestampsForKey,
+ NavigableMap<Key,
AccordSafeCommandsForKey> commandsForKey,
+ @Nullable
AccordSafeCommandsForRanges commandsForRanges,
+ AccordCommandStore
commandStore)
+ {
+ long preAcceptTimeoutMicros = -1;
+ if ((preLoadContext instanceof PreAccept || preLoadContext instanceof
BeginRecovery))
+ {
+ TxnRequest<?> preAccept = (TxnRequest<?>) preLoadContext;
+ AccordJournal.RequestContext context =
(AccordJournal.RequestContext) preAccept.replyContext();
+ // TODO (required): SimulatedDepsTest and some other tests aren't
calling preProcess, hence do not set context
+ if (context != null)
+ preAcceptTimeoutMicros = context.preAcceptTimeout();
+ }
+
+ return new AccordSafeCommandStore(preLoadContext,
preAcceptTimeoutMicros, commands, timestampsForKey, commandsForKey,
commandsForRanges, commandStore);
+ }
+
@Override
protected AccordSafeCommand getCommandInternal(TxnId txnId)
{
@@ -155,9 +182,19 @@ public class AccordSafeCommandStore extends
AbstractSafeCommandStore<AccordSafeC
@Override
public NodeTimeService time()
{
+ // TODO: safe command store should not have arbitrary time
return commandStore.time();
}
+ @Override
+ public long preAcceptTimeout()
+ {
+ if (preAcceptTimeout == -1)
+ return super.preAcceptTimeout();
+
+ return preAcceptTimeout;
+ }
+
@Override
public RangesForEpoch ranges()
{
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]