This is an automated email from the ASF dual-hosted git repository.

ifesdjeen pushed a commit to branch cep-15-accord
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/cep-15-accord by this push:
     new b0ca509e7a Accord Journal Determinism: PreAccept replay stability
b0ca509e7a is described below

commit b0ca509e7add760d187fcc5a9908d93d7c4fd6ec
Author: Alex Petrov <[email protected]>
AuthorDate: Wed May 29 19:16:26 2024 +0200

    Accord Journal Determinism: PreAccept replay stability
    
    Patch by Alex Petrov; reviewed by Aleksey Yeschenko for CASSANDRA-19664
---
 modules/accord                                     |   2 +-
 .../apache/cassandra/journal/RecordPointer.java    |  66 +++++++
 .../service/accord/AccordCommandStore.java         |   4 +-
 .../cassandra/service/accord/AccordJournal.java    | 206 +++++++++------------
 .../service/accord/AccordSafeCommandStore.java     |  49 ++++-
 5 files changed, 197 insertions(+), 130 deletions(-)

diff --git a/modules/accord b/modules/accord
index 4e8bcae81f..84e89bd91c 160000
--- a/modules/accord
+++ b/modules/accord
@@ -1 +1 @@
-Subproject commit 4e8bcae81f9751b9d732fd5056bce31c97ad58f3
+Subproject commit 84e89bd91cf1b058fbf314b750336a1ec1096b18
diff --git a/src/java/org/apache/cassandra/journal/RecordPointer.java b/src/java/org/apache/cassandra/journal/RecordPointer.java
new file mode 100644
index 0000000000..2b3e8ea6b8
--- /dev/null
+++ b/src/java/org/apache/cassandra/journal/RecordPointer.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.journal;
+
+import com.google.common.primitives.Ints;
+import com.google.common.primitives.Longs;
+
+// TODO: make this available in the accord table as an ID
+public class RecordPointer implements Comparable<RecordPointer>
+{
+    public final long segment; // unique segment id
+    public final int position; // record start position within the segment
+
+    public RecordPointer(long segment, int position)
+    {
+        this.segment = segment;
+        this.position = position;
+    }
+
+    @Override
+    public boolean equals(Object other)
+    {
+        if (this == other)
+            return true;
+        if (!(other instanceof RecordPointer))
+            return false;
+        RecordPointer that = (RecordPointer) other;
+        return this.segment == that.segment
+               && this.position == that.position;
+    }
+
+    @Override
+    public int hashCode()
+    {
+        return Long.hashCode(segment) + position * 31;
+    }
+
+    @Override
+    public String toString()
+    {
+        return "(" + segment + ", " + position + ')';
+    }
+
+    @Override
+    public int compareTo(RecordPointer that)
+    {
+        int cmp = Longs.compare(this.segment, that.segment);
+        return cmp != 0 ? cmp : Ints.compare(this.position, that.position);
+    }
+}
\ No newline at end of file
diff --git a/src/java/org/apache/cassandra/service/accord/AccordCommandStore.java b/src/java/org/apache/cassandra/service/accord/AccordCommandStore.java
index 2a67ba656d..c846038fd8 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordCommandStore.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordCommandStore.java
@@ -488,7 +488,9 @@ public class AccordCommandStore extends CommandStore implements CacheSize
         timestampsForKeys.values().forEach(AccordSafeState::preExecute);
         if (commandsForRanges != null)
             commandsForRanges.preExecute();
-        current = new AccordSafeCommandStore(preLoadContext, commands, timestampsForKeys, commandsForKeys, commandsForRanges, this);
+
+        current = AccordSafeCommandStore.create(preLoadContext, commands, timestampsForKeys, commandsForKeys, commandsForRanges, this);
+
         return current;
     }
 
diff --git a/src/java/org/apache/cassandra/service/accord/AccordJournal.java b/src/java/org/apache/cassandra/service/accord/AccordJournal.java
index ce90b26747..0c31afbb4c 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordJournal.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordJournal.java
@@ -40,22 +40,16 @@ import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ListMultimap;
 import com.google.common.collect.Multimap;
 import com.google.common.primitives.Ints;
-import com.google.common.primitives.Longs;
-
-import accord.messages.ApplyThenWaitUntilApplied;
-import org.agrona.collections.Long2ObjectHashMap;
-import org.agrona.collections.LongArrayList;
-import org.agrona.collections.ObjectHashSet;
-import org.cliffc.high_scale_lib.NonBlockingHashMap;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import accord.local.Node.Id;
 import accord.local.Node;
+import accord.local.Node.Id;
 import accord.local.SerializerSupport;
 import accord.messages.AbstractEpochRequest;
 import accord.messages.Accept;
 import accord.messages.Apply;
+import accord.messages.ApplyThenWaitUntilApplied;
 import accord.messages.BeginRecovery;
 import accord.messages.Commit;
 import accord.messages.LocalRequest;
@@ -63,6 +57,7 @@ import accord.messages.Message;
 import accord.messages.MessageType;
 import accord.messages.PreAccept;
 import accord.messages.Propagate;
+import accord.messages.ReplyContext;
 import accord.messages.Request;
 import accord.messages.TxnRequest;
 import accord.primitives.Ballot;
@@ -70,6 +65,9 @@ import accord.primitives.Timestamp;
 import accord.primitives.TxnId;
 import accord.utils.Invariants;
 import accord.utils.MapReduceConsume;
+import org.agrona.collections.Long2ObjectHashMap;
+import org.agrona.collections.LongArrayList;
+import org.agrona.collections.ObjectHashSet;
 import org.apache.cassandra.concurrent.Interruptible;
 import org.apache.cassandra.concurrent.ManyToOneConcurrentLinkedQueue;
 import org.apache.cassandra.concurrent.SequentialExecutorPlus;
@@ -83,13 +81,14 @@ import org.apache.cassandra.journal.AsyncCallbacks;
 import org.apache.cassandra.journal.Journal;
 import org.apache.cassandra.journal.KeySupport;
 import org.apache.cassandra.journal.Params;
+import org.apache.cassandra.journal.RecordPointer;
 import org.apache.cassandra.journal.ValueSerializer;
 import org.apache.cassandra.locator.InetAddressAndPort;
 import org.apache.cassandra.net.MessagingService;
-import org.apache.cassandra.service.accord.interop.AccordInteropApply;
-import org.apache.cassandra.service.accord.interop.AccordInteropCommit;
 import org.apache.cassandra.net.ResponseContext;
 import org.apache.cassandra.net.Verb;
+import org.apache.cassandra.service.accord.interop.AccordInteropApply;
+import org.apache.cassandra.service.accord.interop.AccordInteropCommit;
 import org.apache.cassandra.service.accord.serializers.AcceptSerializers;
 import org.apache.cassandra.service.accord.serializers.ApplySerializers;
 import 
org.apache.cassandra.service.accord.serializers.BeginInvalidationSerializers;
@@ -104,6 +103,8 @@ import 
org.apache.cassandra.service.accord.serializers.SetDurableSerializers;
 import org.apache.cassandra.utils.ByteArrayUtil;
 import org.apache.cassandra.utils.ExecutorUtils;
 import org.apache.cassandra.utils.concurrent.Semaphore;
+import org.apache.cassandra.utils.vint.VIntCoding;
+import org.cliffc.high_scale_lib.NonBlockingHashMap;
 import org.jctools.queues.SpscLinkedQueue;
 
 import static accord.messages.MessageType.ACCEPT_INVALIDATE_REQ;
@@ -120,18 +121,18 @@ import static 
accord.messages.MessageType.INFORM_DURABLE_REQ;
 import static accord.messages.MessageType.INFORM_OF_TXN_REQ;
 import static accord.messages.MessageType.PRE_ACCEPT_REQ;
 import static accord.messages.MessageType.PROPAGATE_APPLY_MSG;
-import static accord.messages.MessageType.PROPAGATE_STABLE_MSG;
 import static accord.messages.MessageType.PROPAGATE_OTHER_MSG;
 import static accord.messages.MessageType.PROPAGATE_PRE_ACCEPT_MSG;
+import static accord.messages.MessageType.PROPAGATE_STABLE_MSG;
 import static accord.messages.MessageType.SET_GLOBALLY_DURABLE_REQ;
 import static accord.messages.MessageType.SET_SHARD_DURABLE_REQ;
+import static accord.messages.MessageType.STABLE_FAST_PATH_REQ;
+import static accord.messages.MessageType.STABLE_MAXIMAL_REQ;
+import static accord.messages.MessageType.STABLE_SLOW_PATH_REQ;
 import static 
org.apache.cassandra.concurrent.ExecutorFactory.Global.executorFactory;
 import static 
org.apache.cassandra.concurrent.InfiniteLoopExecutor.Daemon.NON_DAEMON;
 import static 
org.apache.cassandra.concurrent.InfiniteLoopExecutor.Interrupts.SYNCHRONIZED;
 import static 
org.apache.cassandra.concurrent.InfiniteLoopExecutor.SimulatorSafe.SAFE;
-import static accord.messages.MessageType.STABLE_FAST_PATH_REQ;
-import static accord.messages.MessageType.STABLE_MAXIMAL_REQ;
-import static accord.messages.MessageType.STABLE_SLOW_PATH_REQ;
 import static org.apache.cassandra.concurrent.Interruptible.State.NORMAL;
 import static org.apache.cassandra.db.TypeSizes.BYTE_SIZE;
 import static org.apache.cassandra.db.TypeSizes.INT_SIZE;
@@ -165,7 +166,7 @@ public class AccordJournal implements IJournal, Shutdownable
      * A cache of deserialized journal records we keep to avoid fetching them 
from log when free memory allows it.
      * TODO (expected, performance): cap memory used for cached records
      */
-    private final NonBlockingHashMap<Pointer, Object> cachedRecords = new 
NonBlockingHashMap<>();
+    private final NonBlockingHashMap<RecordPointer, Object> cachedRecords = 
new NonBlockingHashMap<>();
 
     Node node;
 
@@ -292,88 +293,7 @@ public class AccordJournal implements IJournal, 
Shutdownable
         return null;
     }
 
-    private static class Pointer implements Comparable<Pointer>
-    {
-        final long segment; // unique segment id
-        final int position; // record start position within the segment
-
-        Pointer(long segment, int position)
-        {
-            this.segment = segment;
-            this.position = position;
-        }
-
-        @Override
-        public boolean equals(Object other)
-        {
-            if (this == other)
-                return true;
-            if (!(other instanceof Pointer))
-                return false;
-            Pointer that = (Pointer) other;
-            return this.segment == that.segment
-                && this.position == that.position;
-        }
-
-        @Override
-        public int hashCode()
-        {
-            return Long.hashCode(segment) + position * 31;
-        }
-
-        @Override
-        public String toString()
-        {
-            return "(" + segment + ", " + position + ')';
-        }
-
-        @Override
-        public int compareTo(Pointer that)
-        {
-            int cmp = Longs.compare(this.segment, that.segment);
-            return cmp != 0 ? cmp : Ints.compare(this.position, that.position);
-        }
-
-        int serializedSize()
-        {
-            return computeUnsignedVIntSize(segment) + 
computeUnsignedVIntSize(position);
-        }
-
-        void serialize(DataOutputPlus out) throws IOException
-        {
-            out.writeUnsignedVInt(segment);
-            out.writeUnsignedVInt32(position);
-        }
-
-        static Pointer deserialize(DataInputPlus in) throws IOException
-        {
-            long segment = in.readUnsignedVInt();
-            int position = in.readUnsignedVInt32();
-            return new Pointer(segment, position);
-        }
-
-        static final IVersionedSerializer<Pointer> SERIALIZER = new 
IVersionedSerializer<>()
-        {
-            @Override
-            public void serialize(Pointer p, DataOutputPlus out, int version) 
throws IOException
-            {
-                p.serialize(out);
-            }
-
-            @Override
-            public Pointer deserialize(DataInputPlus in, int version) throws 
IOException
-            {
-                return Pointer.deserialize(in);
-            }
-
-            @Override
-            public long serializedSize(Pointer p, int version)
-            {
-                return Ints.checkedCast(p.serializedSize());
-            }
-        };
-    }
-
+    // TODO (alexp): tests for objects that go through AccordJournal
     private class JournalCallbacks implements AsyncCallbacks<Key, Object>
     {
         /**
@@ -382,7 +302,7 @@ public class AccordJournal implements IJournal, Shutdownable
         @Override
         public void onWrite(long segment, int position, int size, Key key, 
Object value, Object writeContext)
         {
-            Pointer pointer = new Pointer(segment, position);
+            RecordPointer pointer = new RecordPointer(segment, position);
             cachedRecords.put(pointer, value);
 
             /*
@@ -451,29 +371,40 @@ public class AccordJournal implements IJournal, 
Shutdownable
      * Context necessary to process log records
      */
 
-    private static class RequestContext
+    static class RequestContext implements ReplyContext
     {
         final long waitForEpoch;
-        final Pointer pointer;
+        final RecordPointer pointer;
+        private long preAcceptTimeout;
 
-        RequestContext(long waitForEpoch, Pointer pointer)
+        RequestContext(long waitForEpoch, RecordPointer pointer)
         {
             this.waitForEpoch = waitForEpoch;
             this.pointer = pointer;
         }
+
+        void preAcceptTimeout(long preAcceptTimeout)
+        {
+            this.preAcceptTimeout = preAcceptTimeout;
+        }
+
+        public long preAcceptTimeout()
+        {
+            return preAcceptTimeout;
+        }
     }
 
     private static class LocalRequestContext extends RequestContext
     {
         private final BiConsumer<?, Throwable> callback;
 
-        LocalRequestContext(long waitForEpoch, BiConsumer<?, Throwable> 
callback, Pointer pointer)
+        LocalRequestContext(long waitForEpoch, BiConsumer<?, Throwable> 
callback, RecordPointer pointer)
         {
             super(waitForEpoch, pointer);
             this.callback = callback;
         }
 
-        static LocalRequestContext create(LocalRequest<?> request, Pointer 
pointer)
+        static LocalRequestContext create(LocalRequest<?> request, 
RecordPointer pointer)
         {
             return new LocalRequestContext(request.waitForEpoch(), 
request.callback(), pointer);
         }
@@ -489,7 +420,7 @@ public class AccordJournal implements IJournal, Shutdownable
         private final Verb verb;
         private final long expiresAtNanos;
 
-        RemoteRequestContext(long waitForEpoch, long id, InetAddressAndPort 
from, Verb verb, long expiresAtNanos, Pointer pointer)
+        RemoteRequestContext(long waitForEpoch, long id, InetAddressAndPort 
from, Verb verb, long expiresAtNanos, RecordPointer pointer)
         {
             super(waitForEpoch, pointer);
             this.id = id;
@@ -498,7 +429,7 @@ public class AccordJournal implements IJournal, Shutdownable
             this.expiresAtNanos = expiresAtNanos;
         }
 
-        static RemoteRequestContext create(long waitForEpoch, ResponseContext 
context, Pointer pointer)
+        static RemoteRequestContext create(long waitForEpoch, ResponseContext 
context, RecordPointer pointer)
         {
             return new RemoteRequestContext(waitForEpoch, context.id(), 
context.from(), context.verb(), context.expiresAtNanos(), pointer);
         }
@@ -1155,9 +1086,9 @@ public class AccordJournal implements IJournal, 
Shutdownable
 
             if (requests != null)
             {
-                ArrayList<Pointer> pointers = new ArrayList<>(requests.size());
+                ArrayList<RecordPointer> pointers = new 
ArrayList<>(requests.size());
                 for (RequestContext req : requests) pointers.add(req.pointer);
-                FrameRecord frame = new FrameRecord(node.uniqueNow(), 
pointers);
+                FrameRecord frame = new FrameRecord(node.uniqueNow(), 
pointers, node.agent().preAcceptTimeout());
                 FrameContext context = new FrameContext(requests);
                 appendAuxiliaryRecord(frame, context);
             }
@@ -1178,20 +1109,20 @@ public class AccordJournal implements IJournal, 
Shutdownable
         private final ArrayList<PendingFrame> pendingFrames = new 
ArrayList<>();
 
         /* furthest flushed journal segment + position */
-        private volatile Pointer flushedUntil = null;
+        private volatile RecordPointer flushedUntil = null;
 
         private volatile SequentialExecutorPlus executor;
 
         /* invoked from FrameGenerator thread via appendAuxiliaryRecord() call 
*/
-        void onWrite(Pointer start, int size, FrameContext context)
+        void onWrite(RecordPointer start, int size, FrameContext context)
         {
-            newFrames.add(new PendingFrame(start, new Pointer(start.segment, 
start.position + size), context));
+            newFrames.add(new PendingFrame(start, new 
RecordPointer(start.segment, start.position + size), context));
         }
 
         /* invoked only from Journal Flusher thread (single) */
         void onFlush(long segment, int position)
         {
-            flushedUntil = new Pointer(segment, position);
+            flushedUntil = new RecordPointer(segment, position);
             executor.submit(this);
         }
 
@@ -1231,7 +1162,7 @@ public class AccordJournal implements IJournal, 
Shutdownable
                 pendingFrames.sort((f1, f2) -> f2.start.compareTo(f1.start));
             }
 
-            Pointer flushedUntil = this.flushedUntil;
+            RecordPointer flushedUntil = this.flushedUntil;
             for (int i = pendingFrames.size() - 1; i >= 0; i--)
             {
                 PendingFrame frame = pendingFrames.get(i);
@@ -1246,13 +1177,15 @@ public class AccordJournal implements IJournal, 
Shutdownable
         {
             Invariants.checkState(frame.pointers.size() == 
context.requestContexts.size());
             for (int i = 0; i < frame.pointers.size(); i++)
-                applyRequest(frame.pointers.get(i), 
context.requestContexts.get(i));
+                applyRequest(frame.pointers.get(i), 
context.requestContexts.get(i), frame.preAcceptTimeoutMicros);
         }
 
-        private void applyRequest(Pointer pointer, RequestContext context)
+        private void applyRequest(RecordPointer pointer, RequestContext 
context, long preAcceptTimeout)
         {
             Request request = (Request) cachedRecords.remove(pointer);
             Type type = Type.fromMessageType(request.type());
+            if (type == Type.PRE_ACCEPT || type == Type.BEGIN_RECOVER)
+                context.preAcceptTimeout(preAcceptTimeout);
 
             if (type.isRemoteRequest())
             {
@@ -1264,6 +1197,7 @@ public class AccordJournal implements IJournal, 
Shutdownable
             {
                 Invariants.checkState(type.isLocalRequest());
                 LocalRequestContext ctx = (LocalRequestContext) context;
+                // TODO (expected): Make Propagate PreAccept receive 
preAcceptTimeout and timestamps
                 //noinspection unchecked,rawtypes
                 ((LocalRequest) request).process(node, ctx.callback);
             }
@@ -1276,11 +1210,11 @@ public class AccordJournal implements IJournal, 
Shutdownable
          */
         private final class PendingFrame
         {
-            final Pointer start;
-            final Pointer end;
+            final RecordPointer start;
+            final RecordPointer end;
             final FrameContext context;
 
-            PendingFrame(Pointer start, Pointer end, FrameContext context)
+            PendingFrame(RecordPointer start, RecordPointer end, FrameContext 
context)
             {
                 this.start = start;
                 this.end = end;
@@ -1311,14 +1245,40 @@ public class AccordJournal implements IJournal, 
Shutdownable
         abstract Type type();
     }
 
+    public static final IVersionedSerializer<RecordPointer> 
RECORD_POINTER_SERIALIZER = new IVersionedSerializer<>()
+    {
+        @Override
+        public void serialize(RecordPointer p, DataOutputPlus out, int 
version) throws IOException
+        {
+            out.writeUnsignedVInt(p.segment);
+            out.writeUnsignedVInt32(p.position);
+        }
+
+        @Override
+        public RecordPointer deserialize(DataInputPlus in, int version) throws 
IOException
+        {
+            long segment = in.readUnsignedVInt();
+            int position = in.readUnsignedVInt32();
+            return new RecordPointer(segment, position);
+        }
+
+        @Override
+        public long serializedSize(RecordPointer p, int version)
+        {
+            return computeUnsignedVIntSize(p.segment) + 
computeUnsignedVIntSize(p.position);
+        }
+    };
+
     private static final class FrameRecord extends AuxiliaryRecord
     {
-        final List<Pointer> pointers;
+        final List<RecordPointer> pointers;
+        final long preAcceptTimeoutMicros;
 
-        FrameRecord(Timestamp timestamp, List<Pointer> pointers)
+        FrameRecord(Timestamp timestamp, List<RecordPointer> pointers, long 
preAcceptTimeoutMicros)
         {
             super(timestamp);
             this.pointers = pointers;
+            this.preAcceptTimeoutMicros = preAcceptTimeoutMicros;
         }
 
         @Override
@@ -1332,19 +1292,21 @@ public class AccordJournal implements IJournal, 
Shutdownable
             @Override
             public int serializedSize(Key key, FrameRecord frame, int 
userVersion)
             {
-                return Ints.checkedCast(serializedListSize(frame.pointers, 
userVersion, Pointer.SERIALIZER));
+                return Ints.checkedCast(serializedListSize(frame.pointers, 
userVersion, RECORD_POINTER_SERIALIZER)) +
+                       computeUnsignedVIntSize(frame.preAcceptTimeoutMicros);
             }
 
             @Override
             public void serialize(Key key, FrameRecord frame, DataOutputPlus 
out, int userVersion) throws IOException
             {
-                serializeList(frame.pointers, out, userVersion, 
Pointer.SERIALIZER);
+                serializeList(frame.pointers, out, userVersion, 
RECORD_POINTER_SERIALIZER);
+                VIntCoding.writeUnsignedVInt(frame.preAcceptTimeoutMicros, 
out);
             }
 
             @Override
             public FrameRecord deserialize(Key key, DataInputPlus in, int 
userVersion) throws IOException
             {
-                return new FrameRecord(key.timestamp, deserializeList(in, 
userVersion, Pointer.SERIALIZER));
+                return new FrameRecord(key.timestamp, deserializeList(in, 
userVersion, RECORD_POINTER_SERIALIZER), VIntCoding.readUnsignedVInt(in));
             }
         };
     }
diff --git a/src/java/org/apache/cassandra/service/accord/AccordSafeCommandStore.java b/src/java/org/apache/cassandra/service/accord/AccordSafeCommandStore.java
index 7e8e9f43ac..32a6973b07 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordSafeCommandStore.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordSafeCommandStore.java
@@ -33,6 +33,10 @@ import accord.impl.CommandsSummary;
 import accord.local.CommandStores.RangesForEpoch;
 import accord.local.NodeTimeService;
 import accord.local.PreLoadContext;
+import accord.messages.BeginRecovery;
+
+import accord.messages.PreAccept;
+import accord.messages.TxnRequest;
 import accord.primitives.AbstractKeys;
 import accord.primitives.AbstractRanges;
 import accord.primitives.Deps;
@@ -45,6 +49,7 @@ import accord.primitives.TxnId;
 
 public class AccordSafeCommandStore extends 
AbstractSafeCommandStore<AccordSafeCommand, AccordSafeTimestampsForKey, 
AccordSafeCommandsForKey>
 {
+    private final long preAcceptTimeout;
     private final Map<TxnId, AccordSafeCommand> commands;
     private final NavigableMap<Key, AccordSafeCommandsForKey> commandsForKeys;
     private final NavigableMap<Key, AccordSafeTimestampsForKey> 
timestampsForKeys;
@@ -52,14 +57,16 @@ public class AccordSafeCommandStore extends 
AbstractSafeCommandStore<AccordSafeC
     private final AccordCommandStore commandStore;
     private final RangesForEpoch ranges;
 
-    public AccordSafeCommandStore(PreLoadContext context,
-                                  Map<TxnId, AccordSafeCommand> commands,
-                                  NavigableMap<Key, 
AccordSafeTimestampsForKey> timestampsForKey,
-                                  NavigableMap<Key, AccordSafeCommandsForKey> 
commandsForKey,
-                                  @Nullable AccordSafeCommandsForRanges 
commandsForRanges,
-                                  AccordCommandStore commandStore)
+    private AccordSafeCommandStore(PreLoadContext context,
+                                   long preAcceptTimeout,
+                                   Map<TxnId, AccordSafeCommand> commands,
+                                   NavigableMap<Key, 
AccordSafeTimestampsForKey> timestampsForKey,
+                                   NavigableMap<Key, AccordSafeCommandsForKey> 
commandsForKey,
+                                   @Nullable AccordSafeCommandsForRanges 
commandsForRanges,
+                                   AccordCommandStore commandStore)
     {
         super(context);
+        this.preAcceptTimeout = preAcceptTimeout;
         this.commands = commands;
         this.timestampsForKeys = timestampsForKey;
         this.commandsForKeys = commandsForKey;
@@ -68,6 +75,26 @@ public class AccordSafeCommandStore extends 
AbstractSafeCommandStore<AccordSafeC
         this.ranges = commandStore.updateRangesForEpoch();
     }
 
+    public static AccordSafeCommandStore create(PreLoadContext preLoadContext,
+                                                Map<TxnId, AccordSafeCommand> 
commands,
+                                                NavigableMap<Key, 
AccordSafeTimestampsForKey> timestampsForKey,
+                                                NavigableMap<Key, 
AccordSafeCommandsForKey> commandsForKey,
+                                                @Nullable 
AccordSafeCommandsForRanges commandsForRanges,
+                                                AccordCommandStore 
commandStore)
+    {
+        long preAcceptTimeoutMicros = -1;
+        if ((preLoadContext instanceof PreAccept || preLoadContext instanceof 
BeginRecovery))
+        {
+            TxnRequest<?> preAccept = (TxnRequest<?>) preLoadContext;
+            AccordJournal.RequestContext context = 
(AccordJournal.RequestContext) preAccept.replyContext();
+            // TODO (required): SimulatedDepsTest and some other tests aren't 
calling preProcess, hence do not set context
+            if (context != null)
+                preAcceptTimeoutMicros = context.preAcceptTimeout();
+        }
+
+        return new AccordSafeCommandStore(preLoadContext, 
preAcceptTimeoutMicros, commands, timestampsForKey, commandsForKey, 
commandsForRanges, commandStore);
+    }
+
     @Override
     protected AccordSafeCommand getCommandInternal(TxnId txnId)
     {
@@ -155,9 +182,19 @@ public class AccordSafeCommandStore extends 
AbstractSafeCommandStore<AccordSafeC
     @Override
     public NodeTimeService time()
     {
+        // TODO: safe command store should not have arbitrary time
         return commandStore.time();
     }
 
+    @Override
+    public long preAcceptTimeout()
+    {
+        if (preAcceptTimeout == -1)
+            return super.preAcceptTimeout();
+
+        return preAcceptTimeout;
+    }
+
     @Override
     public RangesForEpoch ranges()
     {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to