This is an automated email from the ASF dual-hosted git repository.
dcapwell pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra-accord.git
The following commit(s) were added to refs/heads/trunk by this push:
new 3d0ff07c CEP-15: (C*) Add notion of CommandsForRanges and make this
durable in C*
3d0ff07c is described below
commit 3d0ff07cd5c7db43390b85afa593e6f76471d886
Author: David Capwell <[email protected]>
AuthorDate: Thu May 25 13:18:56 2023 -0700
CEP-15: (C*) Add notion of CommandsForRanges and make this durable in C*
patch by David Capwell; reviewed by Ariel Weisberg for CASSANDRA-18519
---
.../java/accord/coordinate/RangeUnavailable.java | 38 ++++
.../java/accord/coordinate/ReadCoordinator.java | 6 +-
.../src/main/java/accord/coordinate/Recover.java | 2 +-
.../java/accord/impl/AbstractSafeCommandStore.java | 2 +-
.../main/java/accord/impl/CommandTimeseries.java | 229 +++++++++++++++++++++
.../java/accord/impl/CommandTimeseriesHolder.java | 29 +++
.../src/main/java/accord/impl/CommandsForKey.java | 185 +----------------
.../java/accord/impl/InMemoryCommandStore.java | 91 ++++----
.../main/java/accord/impl/SafeCommandsForKey.java | 3 +-
.../src/main/java/accord/local/Bootstrap.java | 4 +-
.../src/main/java/accord/local/CommandStore.java | 13 +-
.../src/main/java/accord/local/CommandStores.java | 27 ++-
.../main/java/accord/local/SafeCommandStore.java | 2 +
.../src/main/java/accord/local/SaveStatus.java | 12 ++
.../src/main/java/accord/primitives/Ranges.java | 20 ++
.../src/main/java/accord/primitives/Timestamp.java | 10 +
.../src/main/java/accord/topology/Topologies.java | 5 +
.../src/main/java/accord/topology/Topology.java | 53 +++--
.../main/java/accord/topology/TopologyManager.java | 40 ++--
.../main/java/accord/utils/async/AsyncChains.java | 2 +-
.../main/java/accord/utils/async/Observable.java | 159 ++++++++++++++
.../src/test/java/accord/impl/mock/EpochSync.java | 2 +-
.../test/java/accord/messages/PreAcceptTest.java | 4 +-
.../java/accord/topology/TopologyManagerTest.java | 4 +-
.../test/java/accord/topology/TopologyTest.java | 2 +-
.../src/test/java/accord/maelstrom/Runner.java | 9 +-
.../java/accord/maelstrom/SimpleRandomTest.java | 1 +
27 files changed, 683 insertions(+), 271 deletions(-)
diff --git a/accord-core/src/main/java/accord/coordinate/RangeUnavailable.java
b/accord-core/src/main/java/accord/coordinate/RangeUnavailable.java
new file mode 100644
index 00000000..b94e506c
--- /dev/null
+++ b/accord-core/src/main/java/accord/coordinate/RangeUnavailable.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package accord.coordinate;
+
+import accord.primitives.Ranges;
+import accord.primitives.TxnId;
+
+public class RangeUnavailable extends Exhausted
+{
+ public final Ranges unavailable;
+
+ public RangeUnavailable(Ranges unavailable, TxnId txnId)
+ {
+ super(txnId, null, buildMessage(unavailable));
+ this.unavailable = unavailable;
+ }
+
+ private static String buildMessage(Ranges unavailable)
+ {
+ return "The following ranges are unavailable to read: " + unavailable;
+ }
+}
diff --git a/accord-core/src/main/java/accord/coordinate/ReadCoordinator.java
b/accord-core/src/main/java/accord/coordinate/ReadCoordinator.java
index 66fa96c2..da744f7d 100644
--- a/accord-core/src/main/java/accord/coordinate/ReadCoordinator.java
+++ b/accord-core/src/main/java/accord/coordinate/ReadCoordinator.java
@@ -131,7 +131,11 @@ public abstract class ReadCoordinator<Reply extends
accord.messages.Reply> exten
break;
case ApprovePartial:
- handle(recordPartialReadSuccess(from, unavailable(reply)));
+ Ranges unavailable = unavailable(reply);
+ RequestStatus result = recordPartialReadSuccess(from,
unavailable);
+ if (result == RequestStatus.Failed && failure == null)
+ failure = new RangeUnavailable(unavailable, txnId);
+ handle(result);
break;
}
}
diff --git a/accord-core/src/main/java/accord/coordinate/Recover.java
b/accord-core/src/main/java/accord/coordinate/Recover.java
index 62ed788c..a0c68a0b 100644
--- a/accord-core/src/main/java/accord/coordinate/Recover.java
+++ b/accord-core/src/main/java/accord/coordinate/Recover.java
@@ -66,7 +66,7 @@ public class Recover implements Callback<RecoverReply>,
BiConsumer<Result, Throw
AwaitCommit(Node node, TxnId txnId, Unseekables<?, ?> unseekables)
{
- Topology topology =
node.topology().globalForEpoch(txnId.epoch()).forSelection(unseekables);
+ Topology topology =
node.topology().globalForEpoch(txnId.epoch()).forSelection(unseekables,
Topology.OnUnknown.REJECT);
this.tracker = new QuorumTracker(new
Topologies.Single(node.topology().sorter(), topology));
node.send(topology.nodes(), to -> new WaitOnCommit(to, topology,
txnId, unseekables), this);
}
diff --git
a/accord-core/src/main/java/accord/impl/AbstractSafeCommandStore.java
b/accord-core/src/main/java/accord/impl/AbstractSafeCommandStore.java
index cc7de494..36bc696b 100644
--- a/accord-core/src/main/java/accord/impl/AbstractSafeCommandStore.java
+++ b/accord-core/src/main/java/accord/impl/AbstractSafeCommandStore.java
@@ -26,7 +26,7 @@ import java.util.function.Consumer;
import java.util.function.Function;
import accord.api.VisibleForImplementation;
-import accord.impl.CommandsForKey.CommandLoader;
+import accord.impl.CommandTimeseries.CommandLoader;
import accord.local.Command;
import accord.local.CommonAttributes;
import accord.local.PreLoadContext;
diff --git a/accord-core/src/main/java/accord/impl/CommandTimeseries.java
b/accord-core/src/main/java/accord/impl/CommandTimeseries.java
new file mode 100644
index 00000000..69d4dbcd
--- /dev/null
+++ b/accord-core/src/main/java/accord/impl/CommandTimeseries.java
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package accord.impl;
+
+import java.util.List;
+import java.util.NavigableMap;
+import java.util.Objects;
+import java.util.TreeMap;
+import java.util.function.Predicate;
+import java.util.stream.Stream;
+import javax.annotation.Nullable;
+
+import com.google.common.collect.ImmutableSortedMap;
+
+import accord.api.Key;
+import accord.local.Command;
+import accord.local.SafeCommandStore;
+import accord.local.SaveStatus;
+import accord.local.Status;
+import accord.primitives.Seekable;
+import accord.primitives.Timestamp;
+import accord.primitives.TxnId;
+
+import static accord.local.SafeCommandStore.TestDep.ANY_DEPS;
+import static accord.local.SafeCommandStore.TestDep.WITH;
+import static accord.utils.Utils.ensureSortedImmutable;
+import static accord.utils.Utils.ensureSortedMutable;
+
+public class CommandTimeseries<D>
+{
+ public enum TestTimestamp
+ {BEFORE, AFTER}
+
+ private final Seekable keyOrRange;
+ protected final CommandLoader<D> loader;
+ public final ImmutableSortedMap<Timestamp, D> commands;
+
+ public CommandTimeseries(Update<D> builder)
+ {
+ this.keyOrRange = builder.keyOrRange;
+ this.loader = builder.loader;
+ this.commands = ensureSortedImmutable(builder.commands);
+ }
+
+ CommandTimeseries(Seekable keyOrRange, CommandLoader<D> loader,
ImmutableSortedMap<Timestamp, D> commands)
+ {
+ this.keyOrRange = keyOrRange;
+ this.loader = loader;
+ this.commands = commands;
+ }
+
+ public CommandTimeseries(Key keyOrRange, CommandLoader<D> loader)
+ {
+ this(keyOrRange, loader, ImmutableSortedMap.of());
+ }
+
+ @Override
+ public boolean equals(Object o)
+ {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+ CommandTimeseries<?> that = (CommandTimeseries<?>) o;
+ return keyOrRange.equals(that.keyOrRange) &&
loader.equals(that.loader) && commands.equals(that.commands);
+ }
+
+ @Override
+ public int hashCode()
+ {
+ int hash = 1;
+ hash = 31 * hash + Objects.hashCode(keyOrRange);
+ hash = 31 * hash + Objects.hashCode(loader);
+ hash = 31 * hash + Objects.hashCode(commands);
+ return hash;
+ }
+
+ public D get(Timestamp key)
+ {
+ return commands.get(key);
+ }
+
+ public boolean isEmpty()
+ {
+ return commands.isEmpty();
+ }
+
+ public Timestamp maxTimestamp()
+ {
+ return commands.isEmpty() ? Timestamp.NONE : commands.keySet().last();
+ }
+
+ /**
+ * All commands before/after (exclusive of) the given timestamp
+ * <p>
+ * Note that {@code testDep} applies only to commands that know at least
proposed deps; if specified, any
+ * commands that do not know any deps will be ignored.
+ * <p>
+ * TODO (expected, efficiency): TestDep should be asynchronous; data
should not be kept memory-resident as it is only used for recovery
+ */
+ public <T> T mapReduce(SafeCommandStore.TestKind testKind, TestTimestamp
testTimestamp, Timestamp timestamp,
+ SafeCommandStore.TestDep testDep, @Nullable TxnId
depId,
+ @Nullable Status minStatus, @Nullable Status
maxStatus,
+ SafeCommandStore.CommandFunction<T, T> map, T
initialValue, T terminalValue)
+ {
+
+ for (D data : (testTimestamp == TestTimestamp.BEFORE ?
commands.headMap(timestamp, false) : commands.tailMap(timestamp,
false)).values())
+ {
+ TxnId txnId = loader.txnId(data);
+ if (!testKind.test(txnId.rw())) continue;
+ SaveStatus status = loader.saveStatus(data);
+ if (minStatus != null && minStatus.compareTo(status.status) > 0)
+ continue;
+ if (maxStatus != null && maxStatus.compareTo(status.status) < 0)
+ continue;
+ List<TxnId> deps = loader.depsIds(data);
+ // If we don't have any dependencies, we treat a dependency filter
as a mismatch
+ if (testDep != ANY_DEPS &&
(!status.known.deps.hasProposedOrDecidedDeps() || (deps.contains(depId) !=
(testDep == WITH))))
+ continue;
+ Timestamp executeAt = loader.executeAt(data);
+ initialValue = map.apply(keyOrRange, txnId, executeAt,
initialValue);
+ if (initialValue.equals(terminalValue))
+ break;
+ }
+ return initialValue;
+ }
+
+ Stream<TxnId> between(Timestamp min, Timestamp max, Predicate<Status>
statusPredicate)
+ {
+ return commands.subMap(min, true, max, true).values().stream()
+ .filter(d ->
statusPredicate.test(loader.status(d))).map(loader::txnId);
+ }
+
+ public Stream<D> all()
+ {
+ return commands.values().stream();
+ }
+
+ Update<D> beginUpdate()
+ {
+ return new Update<>(this);
+ }
+
+ public CommandLoader<D> loader()
+ {
+ return loader;
+ }
+
+ public interface CommandLoader<D>
+ {
+ D saveForCFK(Command command);
+
+ TxnId txnId(D data);
+ Timestamp executeAt(D data);
+ SaveStatus saveStatus(D data);
+ List<TxnId> depsIds(D data);
+
+ default Status status(D data)
+ {
+ return saveStatus(data).status;
+ }
+
+ default Status.Known known(D data)
+ {
+ return saveStatus(data).known;
+ }
+ }
+
+ public static class Update<D>
+ {
+ private final Seekable keyOrRange;
+ protected CommandLoader<D> loader;
+ protected NavigableMap<Timestamp, D> commands;
+
+ public Update(Seekable keyOrRange, CommandLoader<D> loader)
+ {
+ this.keyOrRange = keyOrRange;
+ this.loader = loader;
+ this.commands = new TreeMap<>();
+ }
+
+ public Update(CommandTimeseries<D> timeseries)
+ {
+ this.keyOrRange = timeseries.keyOrRange;
+ this.loader = timeseries.loader;
+ this.commands = timeseries.commands;
+ }
+
+ public Update<D> add(Timestamp timestamp, Command command)
+ {
+ commands = ensureSortedMutable(commands);
+ commands.put(timestamp, loader.saveForCFK(command));
+ return this;
+ }
+
+ public Update<D> add(Timestamp timestamp, D value)
+ {
+ commands = ensureSortedMutable(commands);
+ commands.put(timestamp, value);
+ return this;
+ }
+
+ public Update<D> remove(Timestamp timestamp)
+ {
+ commands = ensureSortedMutable(commands);
+ commands.remove(timestamp);
+ return this;
+ }
+
+ public CommandTimeseries<D> build()
+ {
+ return new CommandTimeseries<>(this);
+ }
+ }
+}
diff --git a/accord-core/src/main/java/accord/impl/CommandTimeseriesHolder.java
b/accord-core/src/main/java/accord/impl/CommandTimeseriesHolder.java
new file mode 100644
index 00000000..f0a88e75
--- /dev/null
+++ b/accord-core/src/main/java/accord/impl/CommandTimeseriesHolder.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package accord.impl;
+
+import accord.primitives.Timestamp;
+
+public interface CommandTimeseriesHolder
+{
+ CommandTimeseries<?> byId();
+ CommandTimeseries<?> byExecuteAt();
+
+ Timestamp max();
+}
diff --git a/accord-core/src/main/java/accord/impl/CommandsForKey.java
b/accord-core/src/main/java/accord/impl/CommandsForKey.java
index a85ac114..97142d3a 100644
--- a/accord-core/src/main/java/accord/impl/CommandsForKey.java
+++ b/accord-core/src/main/java/accord/impl/CommandsForKey.java
@@ -19,24 +19,19 @@
package accord.impl;
import accord.api.Key;
+import accord.impl.CommandTimeseries.CommandLoader;
import accord.local.*;
import accord.primitives.*;
import com.google.common.collect.ImmutableSortedMap;
-import javax.annotation.Nullable;
import java.util.*;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
-import java.util.function.Predicate;
-import java.util.stream.Stream;
-import static accord.local.SafeCommandStore.TestDep.ANY_DEPS;
-import static accord.local.SafeCommandStore.TestDep.WITH;
import static accord.local.Status.PreAccepted;
import static accord.local.Status.PreCommitted;
-import static accord.utils.Utils.*;
-public class CommandsForKey
+public class CommandsForKey implements CommandTimeseriesHolder
{
public static class SerializerSupport
{
@@ -55,179 +50,6 @@ public class CommandsForKey
}
}
- public interface CommandLoader<D>
- {
- D saveForCFK(Command command);
-
- TxnId txnId(D data);
- Timestamp executeAt(D data);
- SaveStatus saveStatus(D data);
- List<TxnId> depsIds(D data);
-
- default Status status(D data)
- {
- return saveStatus(data).status;
- }
-
- default Status.Known known(D data)
- {
- return saveStatus(data).known;
- }
- }
-
- public static class CommandTimeseries<D>
- {
- public enum TestTimestamp {BEFORE, AFTER}
-
- private final Key key;
- protected final CommandLoader<D> loader;
- public final ImmutableSortedMap<Timestamp, D> commands;
-
- public CommandTimeseries(Update<D> builder)
- {
- this.key = builder.key;
- this.loader = builder.loader;
- this.commands = ensureSortedImmutable(builder.commands);
- }
-
- CommandTimeseries(Key key, CommandLoader<D> loader,
ImmutableSortedMap<Timestamp, D> commands)
- {
- this.key = key;
- this.loader = loader;
- this.commands = commands;
- }
-
- public CommandTimeseries(Key key, CommandLoader<D> loader)
- {
- this(key, loader, ImmutableSortedMap.of());
- }
-
- @Override
- public boolean equals(Object o)
- {
- if (this == o) return true;
- if (o == null || getClass() != o.getClass()) return false;
- CommandTimeseries<?> that = (CommandTimeseries<?>) o;
- return key.equals(that.key) && loader.equals(that.loader) &&
commands.equals(that.commands);
- }
-
- @Override
- public int hashCode()
- {
- int hash = 1;
- hash = 31 * hash + Objects.hashCode(key);
- hash = 31 * hash + Objects.hashCode(loader);
- hash = 31 * hash + Objects.hashCode(commands);
- return hash;
- }
-
- public D get(Timestamp key)
- {
- return commands.get(key);
- }
-
- public boolean isEmpty()
- {
- return commands.isEmpty();
- }
-
- /**
- * All commands before/after (exclusive of) the given timestamp
- * <p>
- * Note that {@code testDep} applies only to commands that know at
least proposed deps; if specified any
- * commands that do not know any deps will be ignored.
- * <p>
- * TODO (expected, efficiency): TestDep should be asynchronous; data
should not be kept memory-resident as only used for recovery
- */
- public <T> T mapReduce(SafeCommandStore.TestKind testKind,
TestTimestamp testTimestamp, Timestamp timestamp,
- SafeCommandStore.TestDep testDep, @Nullable
TxnId depId,
- @Nullable Status minStatus, @Nullable Status
maxStatus,
- SafeCommandStore.CommandFunction<T, T> map, T
initialValue, T terminalValue)
- {
-
- for (D data : (testTimestamp == TestTimestamp.BEFORE ?
commands.headMap(timestamp, false) : commands.tailMap(timestamp,
false)).values())
- {
- TxnId txnId = loader.txnId(data);
- Timestamp executeAt = loader.executeAt(data);
- SaveStatus status = loader.saveStatus(data);
- List<TxnId> deps = loader.depsIds(data);
- if (!testKind.test(txnId.rw())) continue;
- // If we don't have any dependencies, we treat a dependency
filter as a mismatch
- if (testDep != ANY_DEPS &&
(!status.known.deps.hasProposedOrDecidedDeps() || (deps.contains(depId) !=
(testDep == WITH))))
- continue;
- if (minStatus != null && minStatus.compareTo(status.status) >
0)
- continue;
- if (maxStatus != null && maxStatus.compareTo(status.status) <
0)
- continue;
- initialValue = map.apply(key, txnId, executeAt, initialValue);
- if (initialValue.equals(terminalValue))
- break;
- }
- return initialValue;
- }
-
- Stream<TxnId> between(Timestamp min, Timestamp max, Predicate<Status>
statusPredicate)
- {
- return commands.subMap(min, true, max, true).values().stream()
- .filter(d ->
statusPredicate.test(loader.status(d))).map(loader::txnId);
- }
-
- public Stream<D> all()
- {
- return commands.values().stream();
- }
-
- Update<D> beginUpdate()
- {
- return new Update<>(this);
- }
-
- public CommandLoader<D> loader()
- {
- return loader;
- }
-
- public static class Update<D>
- {
- private final Key key;
- protected CommandLoader<D> loader;
- protected NavigableMap<Timestamp, D> commands;
-
- public Update(Key key, CommandLoader<D> loader)
- {
- this.key = key;
- this.loader = loader;
- this.commands = new TreeMap<>();
- }
-
- public Update(CommandTimeseries<D> timeseries)
- {
- this.key = timeseries.key;
- this.loader = timeseries.loader;
- this.commands = timeseries.commands;
- }
-
- public CommandsForKey.CommandTimeseries.Update<D> add(Timestamp
timestamp, Command command)
- {
- commands = ensureSortedMutable(commands);
- commands.put(timestamp, loader.saveForCFK(command));
- return this;
- }
-
- public CommandsForKey.CommandTimeseries.Update<D> remove(Timestamp
timestamp)
- {
- commands = ensureSortedMutable(commands);
- commands.remove(timestamp);
- return this;
- }
-
- CommandTimeseries<D> build()
- {
- return new CommandTimeseries<>(this);
- }
- }
- }
-
public static class Listener implements
Command.DurableAndIdempotentListener
{
protected final Key listenerKey;
@@ -363,6 +185,7 @@ public class CommandsForKey
return key;
}
+ @Override
public Timestamp max()
{
return max;
@@ -383,11 +206,13 @@ public class CommandsForKey
return lastWriteTimestamp;
}
+ @Override
public CommandTimeseries<?> byId()
{
return byId;
}
+ @Override
public CommandTimeseries<?> byExecuteAt()
{
return byExecuteAt;
diff --git a/accord-core/src/main/java/accord/impl/InMemoryCommandStore.java
b/accord-core/src/main/java/accord/impl/InMemoryCommandStore.java
index aefdf4eb..a380bdec 100644
--- a/accord-core/src/main/java/accord/impl/InMemoryCommandStore.java
+++ b/accord-core/src/main/java/accord/impl/InMemoryCommandStore.java
@@ -43,6 +43,7 @@ import accord.api.Agent;
import accord.api.DataStore;
import accord.api.Key;
import accord.api.ProgressLog;
+import accord.impl.CommandTimeseries.CommandLoader;
import accord.local.Command;
import accord.local.CommandStore;
import accord.local.CommandStores.RangesForEpoch;
@@ -109,6 +110,16 @@ public abstract class InMemoryCommandStore extends
CommandStore
return agent;
}
+ TreeMap<TxnId, Ranges> historicalRangeCommands()
+ {
+ return historicalRangeCommands;
+ }
+
+ TreeMap<TxnId, RangeCommand> rangeCommands()
+ {
+ return rangeCommands;
+ }
+
public GlobalCommand ifPresent(TxnId txnId)
{
return commands.get(txnId);
@@ -333,38 +344,6 @@ public abstract class InMemoryCommandStore extends
CommandStore
}
}
- @Override
- protected void registerHistoricalTransactions(Deps deps)
- {
- Ranges allRanges = rangesForEpochHolder.get().all();
- deps.keyDeps.keys().forEach(allRanges, key -> {
- SafeCommandsForKey cfk = commandsForKey(key).createSafeReference();
- deps.keyDeps.forEach(key, txnId -> {
- // TODO (desired, efficiency): this can be made more efficient
by batching by epoch
- if
(rangesForEpochHolder.get().coordinates(txnId).contains(key))
- return; // already coordinates, no need to replicate
- if
(!rangesForEpochHolder.get().allBefore(txnId.epoch()).contains(key))
- return;
-
- cfk.registerNotWitnessed(txnId);
- });
-
- });
- deps.rangeDeps.forEachUniqueTxnId(allRanges, txnId -> {
-
- if (rangeCommands.containsKey(txnId))
- return;
-
- Ranges ranges = deps.rangeDeps.ranges(txnId);
- if
(rangesForEpochHolder.get().coordinates(txnId).intersects(ranges))
- return; // already coordinates, no need to replicate
- if
(!rangesForEpochHolder.get().allBefore(txnId.epoch()).intersects(ranges))
- return;
-
- historicalRangeCommands.merge(txnId, ranges.slice(allRanges),
Ranges::with);
- });
- }
-
protected InMemorySafeStore createSafeStore(PreLoadContext context,
RangesForEpoch ranges, Map<TxnId, InMemorySafeCommand> commands,
Map<RoutableKey, InMemorySafeCommandsForKey> commandsForKeys)
{
return new InMemorySafeStore(this, cfkLoader, ranges, context,
commands, commandsForKeys);
@@ -482,7 +461,7 @@ public abstract class InMemoryCommandStore extends
CommandStore
}
}
- class CFKLoader implements CommandsForKey.CommandLoader<TxnId>
+ class CFKLoader implements CommandLoader<TxnId>
{
private Command loadForCFK(TxnId data)
{
@@ -702,6 +681,41 @@ public abstract class InMemoryCommandStore extends
CommandStore
return timestamp;
}
+ @Override
+ public void registerHistoricalTransactions(Deps deps)
+ {
+ RangesForEpochHolder rangesForEpochHolder =
commandStore.rangesForEpochHolder();
+ Ranges allRanges = rangesForEpochHolder.get().all();
+ deps.keyDeps.keys().forEach(allRanges, key -> {
+ SafeCommandsForKey cfk = commandsForKey(key);
+ deps.keyDeps.forEach(key, txnId -> {
+ // TODO (desired, efficiency): this can be made more
efficient by batching by epoch
+ if
(rangesForEpochHolder.get().coordinates(txnId).contains(key))
+ return; // already coordinates, no need to replicate
+ if
(!rangesForEpochHolder.get().allBefore(txnId.epoch()).contains(key))
+ return;
+
+ cfk.registerNotWitnessed(txnId);
+ });
+
+ });
+ TreeMap<TxnId, RangeCommand> rangeCommands =
commandStore.rangeCommands();
+ TreeMap<TxnId, Ranges> historicalRangeCommands =
commandStore.historicalRangeCommands();
+ deps.rangeDeps.forEachUniqueTxnId(allRanges, txnId -> {
+
+ if (rangeCommands.containsKey(txnId))
+ return;
+
+ Ranges ranges = deps.rangeDeps.ranges(txnId);
+ if
(rangesForEpochHolder.get().coordinates(txnId).intersects(ranges))
+ return; // already coordinates, no need to replicate
+ if
(!rangesForEpochHolder.get().allBefore(txnId.epoch()).intersects(ranges))
+ return;
+
+ historicalRangeCommands.merge(txnId, ranges.slice(allRanges),
Ranges::with);
+ });
+ }
+
public Timestamp maxApplied(Seekables<?, ?> keysOrRanges, Ranges slice)
{
Seekables<?, ?> sliced = keysOrRanges.slice(slice, Minimal);
@@ -725,7 +739,7 @@ public abstract class InMemoryCommandStore extends
CommandStore
public <T> T mapReduce(Seekables<?, ?> keysOrRanges, Ranges slice,
TestKind testKind, TestTimestamp testTimestamp, Timestamp timestamp, TestDep
testDep, @Nullable TxnId depId, @Nullable Status minStatus, @Nullable Status
maxStatus, CommandFunction<T, T> map, T accumulate, T terminalValue)
{
accumulate = commandStore.mapReduceForKey(this, keysOrRanges,
slice, (forKey, prev) -> {
- CommandsForKey.CommandTimeseries<?> timeseries;
+ CommandTimeseries<?> timeseries;
switch (testTimestamp)
{
default: throw new AssertionError();
@@ -737,17 +751,17 @@ public abstract class InMemoryCommandStore extends
CommandStore
case MAY_EXECUTE_BEFORE:
timeseries = forKey.byExecuteAt();
}
- CommandsForKey.CommandTimeseries.TestTimestamp
remapTestTimestamp;
+ CommandTimeseries.TestTimestamp remapTestTimestamp;
switch (testTimestamp)
{
default: throw new AssertionError();
case STARTED_AFTER:
case EXECUTES_AFTER:
- remapTestTimestamp =
CommandsForKey.CommandTimeseries.TestTimestamp.AFTER;
+ remapTestTimestamp =
CommandTimeseries.TestTimestamp.AFTER;
break;
case STARTED_BEFORE:
case MAY_EXECUTE_BEFORE:
- remapTestTimestamp =
CommandsForKey.CommandTimeseries.TestTimestamp.BEFORE;
+ remapTestTimestamp =
CommandTimeseries.TestTimestamp.BEFORE;
}
return timeseries.mapReduce(testKind, remapTestTimestamp,
timestamp, testDep, depId, minStatus, maxStatus, map, prev, terminalValue);
}, accumulate, terminalValue);
@@ -866,7 +880,8 @@ public abstract class InMemoryCommandStore extends
CommandStore
return commandStore.register(this, keyOrRange, slice, command,
attrs);
}
- public CommandsForKey.CommandLoader<?> cfkLoader()
+ @Override
+ public CommandLoader<?> cfkLoader()
{
return cfkLoader;
}
diff --git a/accord-core/src/main/java/accord/impl/SafeCommandsForKey.java
b/accord-core/src/main/java/accord/impl/SafeCommandsForKey.java
index 527c12d1..6ba8d900 100644
--- a/accord-core/src/main/java/accord/impl/SafeCommandsForKey.java
+++ b/accord-core/src/main/java/accord/impl/SafeCommandsForKey.java
@@ -20,8 +20,7 @@ package accord.impl;
import accord.api.Key;
import accord.api.VisibleForImplementation;
-import accord.impl.CommandsForKey.CommandLoader;
-import accord.impl.CommandsForKey.CommandTimeseries;
+import accord.impl.CommandTimeseries.CommandLoader;
import accord.local.Command;
import accord.primitives.Timestamp;
import accord.primitives.TxnId;
diff --git a/accord-core/src/main/java/accord/local/Bootstrap.java
b/accord-core/src/main/java/accord/local/Bootstrap.java
index 9fdd2044..b66a0d9a 100644
--- a/accord-core/src/main/java/accord/local/Bootstrap.java
+++ b/accord-core/src/main/java/accord/local/Bootstrap.java
@@ -134,6 +134,8 @@ class Bootstrap
Ranges commitRanges = valid;
store.markBootstrapping(safeStore0, globalSyncId, valid);
CoordinateSyncPoint.coordinate(node, globalSyncId, commitRanges)
+ // TODO (correctness) : PreLoadContext only works with
Seekables, which doesn't allow mixing Keys and Ranges... But Deps has both Keys
AND Ranges!
+ // ATM all known implementations store ranges in-memory, but
this will not be true soon, so this will need to be addressed
.flatMap(syncPoint -> node.withEpoch(epoch, () ->
store.submit(contextFor(localSyncId, syncPoint.waitFor.keyDeps.keys()),
safeStore1 -> {
if (valid.isEmpty()) // we've lost ownership of the range
return AsyncResults.success(Ranges.EMPTY);
@@ -141,7 +143,7 @@ class Bootstrap
Commands.commitRecipientLocalSyncPoint(safeStore1,
localSyncId, syncPoint, valid);
// TODO (now): this should use a dedicated local id,
distinct from the one we use to coordinate globally, as this may also be
committed and applied locally
// TODO (now): should we even be putting any partialDeps
here? Doesn't seem like it, as they're handled on source nodes.
-
safeStore1.commandStore().registerHistoricalTransactions(syncPoint.waitFor);
+
safeStore1.registerHistoricalTransactions(syncPoint.waitFor);
return fetch = safeStore1.dataStore().fetch(node,
safeStore1, valid, syncPoint, this);
})))
.flatMap(i -> i)
diff --git a/accord-core/src/main/java/accord/local/CommandStore.java
b/accord-core/src/main/java/accord/local/CommandStore.java
index c9b54432..37046e42 100644
--- a/accord-core/src/main/java/accord/local/CommandStore.java
+++ b/accord-core/src/main/java/accord/local/CommandStore.java
@@ -115,6 +115,11 @@ public abstract class CommandStore implements AgentExecutor
return agent;
}
+ public RangesForEpochHolder rangesForEpochHolder()
+ {
+ return rangesForEpochHolder;
+ }
+
public abstract boolean inStore();
public abstract AsyncChain<Void> execute(PreLoadContext context,
Consumer<? super SafeCommandStore> consumer);
@@ -141,8 +146,6 @@ public abstract class CommandStore implements AgentExecutor
this.bootstrapBeganAt = newBootstrapBeganAt;
}
- protected abstract void registerHistoricalTransactions(Deps deps);
-
/**
* This method may be invoked on a non-CommandStore thread
*/
@@ -304,8 +307,10 @@ public abstract class CommandStore implements AgentExecutor
}
else
{
- execute(contextFor(null, deps.txnIds()), safeStore -> {
- registerHistoricalTransactions(deps);
+ // TODO (correctness) : PreLoadContext only works with
Seekables, which doesn't allow mixing Keys and Ranges... But Deps has both Keys
AND Ranges!
+ // ATM all known implementations store ranges in-memory, but
this will not be true soon, so this will need to be addressed
+ execute(contextFor(null, deps.txnIds(), deps.keyDeps.keys()),
safeStore -> {
+ safeStore.registerHistoricalTransactions(deps);
}).begin((success, fail2) -> {
if (fail2 != null) fetchMajorityDeps(coordination, node,
epoch, ranges);
else coordination.setSuccess(null);
diff --git a/accord-core/src/main/java/accord/local/CommandStores.java
b/accord-core/src/main/java/accord/local/CommandStores.java
index b8203a1f..c5dca548 100644
--- a/accord-core/src/main/java/accord/local/CommandStores.java
+++ b/accord-core/src/main/java/accord/local/CommandStores.java
@@ -37,6 +37,7 @@ import accord.utils.async.AsyncChains;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
+import java.util.Map;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.Predicate;
@@ -325,6 +326,11 @@ public abstract class CommandStores
}
}
+ protected boolean shouldBootstrap(Node node, Topology local, Topology
newLocalTopology, Range add)
+ {
+ return newLocalTopology.epoch() != 1;
+ }
+
private synchronized TopologyUpdate updateTopology(Node node, Snapshot
prev, Topology newTopology)
{
checkArgument(!newTopology.isSubset(), "Use full topology for
CommandStores.updateTopology");
@@ -371,9 +377,12 @@ public abstract class CommandStores
RangesForEpochHolder rangesHolder = new RangesForEpochHolder();
ShardHolder shardHolder = new
ShardHolder(supplier.create(nextId++, rangesHolder), rangesHolder);
rangesHolder.current = new RangesForEpoch(epoch, add,
shardHolder.store);
- // the first epoch we assume is either empty, or correctly
initialised by whatever system is migrating
- if (epoch == 1) bootstrapUpdates.add(() ->
shardHolder.store.initialise(epoch, add));
- else bootstrapUpdates.add(shardHolder.store.bootstrapper(node,
add, newLocalTopology.epoch()));
+
+ Map<Boolean, Ranges> partitioned = add.partitioningBy(range ->
shouldBootstrap(node, prev.local, newLocalTopology, range));
+ if (partitioned.containsKey(true))
+ bootstrapUpdates.add(shardHolder.store.bootstrapper(node,
partitioned.get(true), newLocalTopology.epoch()));
+ if (partitioned.containsKey(false))
+ bootstrapUpdates.add(() ->
shardHolder.store.initialise(epoch, partitioned.get(false)));
result.add(shardHolder);
}
}
@@ -554,6 +563,18 @@ public abstract class CommandStores
return snapshot.byId.get(id);
}
+ public int[] ids()
+ {
+ Snapshot snapshot = current;
+ Int2ObjectHashMap<CommandStore>.KeySet set = snapshot.byId.keySet();
+ int[] ids = new int[set.size()];
+ int idx = 0;
+ for (int a : set)
+ ids[idx++] = a;
+ Arrays.sort(ids);
+ return ids;
+ }
+
public int count()
{
return current.shards.length;
diff --git a/accord-core/src/main/java/accord/local/SafeCommandStore.java
b/accord-core/src/main/java/accord/local/SafeCommandStore.java
index dd2f7995..93532e85 100644
--- a/accord-core/src/main/java/accord/local/SafeCommandStore.java
+++ b/accord-core/src/main/java/accord/local/SafeCommandStore.java
@@ -23,6 +23,7 @@ import java.util.function.Predicate;
import accord.api.Agent;
import accord.api.DataStore;
import accord.api.ProgressLog;
+import accord.primitives.Deps;
import accord.primitives.Keys;
import accord.primitives.Ranges;
import accord.primitives.Seekable;
@@ -155,6 +156,7 @@ public interface SafeCommandStore
NodeTimeService time();
CommandStores.RangesForEpoch ranges();
Timestamp maxConflict(Seekables<?, ?> keys, Ranges slice);
+ void registerHistoricalTransactions(Deps deps);
default long latestEpoch()
{
diff --git a/accord-core/src/main/java/accord/local/SaveStatus.java
b/accord-core/src/main/java/accord/local/SaveStatus.java
index 1b2f5a31..ec0b4eb9 100644
--- a/accord-core/src/main/java/accord/local/SaveStatus.java
+++ b/accord-core/src/main/java/accord/local/SaveStatus.java
@@ -74,6 +74,18 @@ public enum SaveStatus
return this.status.compareTo(status) >= 0;
}
+ public boolean isComplete()
+ {
+ switch (this)
+ {
+ case Applied:
+ case Invalidated:
+ return true;
+ default:
+ return false;
+ }
+ }
+
// TODO (expected, testing): exhaustive testing, particularly around
PreCommitted
public static SaveStatus get(Status status, Known known)
{
diff --git a/accord-core/src/main/java/accord/primitives/Ranges.java
b/accord-core/src/main/java/accord/primitives/Ranges.java
index 9a560311..4badc911 100644
--- a/accord-core/src/main/java/accord/primitives/Ranges.java
+++ b/accord-core/src/main/java/accord/primitives/Ranges.java
@@ -22,8 +22,15 @@ import accord.api.RoutingKey;
import accord.utils.ArrayBuffers.ObjectBuffers;
import javax.annotation.Nonnull;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Predicate;
import java.util.stream.Stream;
+import com.google.common.collect.ImmutableMap;
+
import static accord.primitives.AbstractRanges.UnionMode.MERGE_OVERLAPPING;
import static accord.primitives.Routables.Slice.Overlapping;
import static accord.utils.ArrayBuffers.cachedRanges;
@@ -227,4 +234,17 @@ public class Ranges extends AbstractRanges<Ranges>
implements Iterable<Range>, S
return construct(cachedRanges.completeAndDiscard(result, count));
}
+ public Map<Boolean, Ranges> partitioningBy(Predicate<? super Range> test)
+ {
+ if (isEmpty())
+ return Collections.emptyMap();
+ List<Range> trues = new ArrayList<>();
+ List<Range> falses = new ArrayList<>();
+ for (Range range : this)
+ (test.test(range) ? trues : falses).add(range);
+ if (trues.isEmpty()) return ImmutableMap.of(Boolean.FALSE, this);
+ if (falses.isEmpty()) return ImmutableMap.of(Boolean.TRUE, this);
+ return ImmutableMap.of(Boolean.TRUE,
Ranges.ofSortedAndDeoverlapped(trues.toArray(new Range[0])),
+ Boolean.FALSE,
Ranges.ofSortedAndDeoverlapped(falses.toArray(new Range[0])));
+ }
}
diff --git a/accord-core/src/main/java/accord/primitives/Timestamp.java
b/accord-core/src/main/java/accord/primitives/Timestamp.java
index 13bc2a55..1a5a44e4 100644
--- a/accord-core/src/main/java/accord/primitives/Timestamp.java
+++ b/accord-core/src/main/java/accord/primitives/Timestamp.java
@@ -345,4 +345,14 @@ public class Timestamp implements Comparable<Timestamp>
{
return "[" + epoch() + ',' + hlc() + ',' + flags() + ',' + node + ']';
}
+
+ public static Timestamp fromString(String string)
+ {
+ String[] split = string.replaceFirst("\\[", "").replaceFirst("\\]",
"").split(",");
+ assert split.length == 4;
+ return Timestamp.fromValues(Long.parseLong(split[0]),
+ Long.parseLong(split[1]),
+ Integer.parseInt(split[2]),
+ new Id(Integer.parseInt(split[3])));
+ }
}
diff --git a/accord-core/src/main/java/accord/topology/Topologies.java
b/accord-core/src/main/java/accord/topology/Topologies.java
index fef92829..5832cce0 100644
--- a/accord-core/src/main/java/accord/topology/Topologies.java
+++ b/accord-core/src/main/java/accord/topology/Topologies.java
@@ -51,6 +51,11 @@ public interface Topologies extends TopologySorter
int size();
+ default boolean isEmpty()
+ {
+ return size() == 0;
+ }
+
int totalShards();
boolean contains(Id to);
diff --git a/accord-core/src/main/java/accord/topology/Topology.java
b/accord-core/src/main/java/accord/topology/Topology.java
index f0c5e79b..728553c1 100644
--- a/accord-core/src/main/java/accord/topology/Topology.java
+++ b/accord-core/src/main/java/accord/topology/Topology.java
@@ -193,24 +193,24 @@ public class Topology
return Arrays.binarySearch(supersetIndexes, i);
}
- public Topology forSelection(Unseekables<?, ?> select)
+ public Topology forSelection(Unseekables<?, ?> select, OnUnknown onUnknown)
{
- return forSelection(select, (ignore, index) -> true, null);
+ return forSelection(select, onUnknown, (ignore, index) -> true, null);
}
- public <P1> Topology forSelection(Unseekables<?, ?> select,
IndexedPredicate<P1> predicate, P1 param)
+ public <P1> Topology forSelection(Unseekables<?, ?> select, OnUnknown
onUnknown, IndexedPredicate<P1> predicate, P1 param)
{
- return forSubset(subsetFor(select, predicate, param));
+ return forSubset(subsetFor(select, predicate, param, onUnknown));
}
- public Topology forSelection(Unseekables<?, ?> select, Collection<Id>
nodes)
+ public Topology forSelection(Unseekables<?, ?> select, OnUnknown
onUnknown, Collection<Id> nodes)
{
- return forSelection(select, nodes, (ignore, index) -> true, null);
+ return forSelection(select, onUnknown, nodes, (ignore, index) -> true,
null);
}
- public <P1> Topology forSelection(Unseekables<?, ?> select, Collection<Id>
nodes, IndexedPredicate<P1> predicate, P1 param)
+ public <P1> Topology forSelection(Unseekables<?, ?> select, OnUnknown
onUnknown, Collection<Id> nodes, IndexedPredicate<P1> predicate, P1 param)
{
- return forSubset(subsetFor(select, predicate, param), nodes);
+ return forSubset(subsetFor(select, predicate, param, onUnknown),
nodes);
}
private Topology forSubset(int[] newSubset)
@@ -236,7 +236,9 @@ public class Topology
return new Topology(epoch, shards, ranges, nodeLookup, rangeSubset,
newSubset);
}
- private <P1> int[] subsetFor(Unseekables<?, ?> select,
IndexedPredicate<P1> predicate, P1 param)
+ public enum OnUnknown { REJECT, IGNORE }
+
+ private <P1> int[] subsetFor(Unseekables<?, ?> select,
IndexedPredicate<P1> predicate, P1 param, OnUnknown onUnknown)
{
int count = 0;
IntBuffers cachedInts = ArrayBuffers.cachedInts();
@@ -258,16 +260,35 @@ public class Topology
if (abi < 0)
{
if (ailim < as.size())
- throw new IllegalArgumentException("Range not
found for " + as.get(ailim));
+ {
+ switch (onUnknown)
+ {
+ case REJECT: throw new
IllegalArgumentException("Range not found for " + as.get(ailim));
+ case IGNORE:
+ break;
+ default:
+ throw new
IllegalArgumentException("Unknown option: " + onUnknown);
+ }
+ }
break;
}
ai = (int)abi;
+ boolean skip = false;
if (ailim < ai)
- throw new IllegalArgumentException("Range not found
for " + as.get(ailim));
+ {
+ switch (onUnknown)
+ {
+ default:
+ throw new IllegalArgumentException("Unknown
option: " + onUnknown);
+ case REJECT: throw new
IllegalArgumentException("Range not found for " + as.get(ailim));
+ case IGNORE:
+ skip = true;
+ }
+ }
bi = (int)(abi >>> 32);
- if (predicate.test(param, bi))
+ if (!skip && predicate.test(param, bi))
{
if (count == newSubset.length)
newSubset = cachedInts.resize(newSubset, count,
count * 2);
@@ -305,14 +326,14 @@ public class Topology
return cachedInts.completeAndDiscard(newSubset, count);
}
- public <P1> void visitNodeForKeysOnceOrMore(Unseekables<?, ?> select,
Consumer<Id> nodes)
+ public <P1> void visitNodeForKeysOnceOrMore(Unseekables<?, ?> select,
OnUnknown onUnknown, Consumer<Id> nodes)
{
- visitNodeForKeysOnceOrMore(select, (i1, i2) -> true, null, nodes);
+ visitNodeForKeysOnceOrMore(select, onUnknown, (i1, i2) -> true, null,
nodes);
}
- public <P1> void visitNodeForKeysOnceOrMore(Unseekables<?, ?> select,
IndexedPredicate<P1> predicate, P1 param, Consumer<Id> nodes)
+ public <P1> void visitNodeForKeysOnceOrMore(Unseekables<?, ?> select,
OnUnknown onUnknown, IndexedPredicate<P1> predicate, P1 param, Consumer<Id>
nodes)
{
- for (int shardIndex : subsetFor(select, predicate, param))
+ for (int shardIndex : subsetFor(select, predicate, param, onUnknown))
{
Shard shard = shards[shardIndex];
for (Id id : shard.nodes)
diff --git a/accord-core/src/main/java/accord/topology/TopologyManager.java
b/accord-core/src/main/java/accord/topology/TopologyManager.java
index 0fcdc8fd..aea8c535 100644
--- a/accord-core/src/main/java/accord/topology/TopologyManager.java
+++ b/accord-core/src/main/java/accord/topology/TopologyManager.java
@@ -230,16 +230,20 @@ public class TopologyManager
{
Epochs current = epochs;
- checkArgument(topology.epoch == current.nextEpoch());
+ checkArgument(topology.epoch == current.nextEpoch(), "Expected
topology update %d to be %d", topology.epoch, current.nextEpoch());
EpochState[] nextEpochs = new EpochState[current.epochs.length + 1];
List<Set<Id>> pendingSync = new
ArrayList<>(current.pendingSyncComplete);
Set<Id> alreadySyncd = Collections.emptySet();
if (!pendingSync.isEmpty())
{
- EpochState currentEpoch = current.epochs[0];
- if (current.epochs[0].syncComplete())
- currentEpoch.markPrevSynced();
- alreadySyncd = pendingSync.remove(0);
+ // if empty, then notified about an epoch from a peer before first
epoch seen
+ if (current.epochs.length != 0)
+ {
+ EpochState currentEpoch = current.epochs[0];
+ if (currentEpoch.syncComplete())
+ currentEpoch.markPrevSynced();
+ alreadySyncd = pendingSync.remove(0);
+ }
}
System.arraycopy(current.epochs, 0, nextEpochs, 1,
current.epochs.length);
@@ -308,15 +312,15 @@ public class TopologyManager
public Topologies withUnsyncedEpochs(Unseekables<?, ?> select, long
minEpoch, long maxEpoch)
{
- Invariants.checkArgument(minEpoch <= maxEpoch);
+ Invariants.checkArgument(minEpoch <= maxEpoch, "min epoch %d > max
%d", minEpoch, maxEpoch);
Epochs snapshot = epochs;
if (maxEpoch == Long.MAX_VALUE) maxEpoch = snapshot.currentEpoch;
- else Invariants.checkState(snapshot.currentEpoch >= maxEpoch);
+ else Invariants.checkState(snapshot.currentEpoch >= maxEpoch, "current
epoch %d < max %d", snapshot.currentEpoch, maxEpoch);
EpochState maxEpochState = nonNull(snapshot.get(maxEpoch));
if (minEpoch == maxEpoch && maxEpochState.syncCompleteFor(select))
- return new Single(sorter,
maxEpochState.global.forSelection(select));
+ return new Single(sorter,
maxEpochState.global.forSelection(select, Topology.OnUnknown.REJECT));
int start = (int)(snapshot.currentEpoch - maxEpoch);
int limit = (int)(Math.min(1 + snapshot.currentEpoch - minEpoch,
snapshot.epochs.length));
@@ -334,20 +338,22 @@ public class TopologyManager
{
EpochState epochState = snapshot.epochs[i];
if (epochState.epoch() < minEpoch)
- epochState.global.visitNodeForKeysOnceOrMore(select,
EpochState::shardIsUnsynced, epochState, nodes::add);
+ epochState.global.visitNodeForKeysOnceOrMore(select,
Topology.OnUnknown.IGNORE, EpochState::shardIsUnsynced, epochState, nodes::add);
else
- epochState.global.visitNodeForKeysOnceOrMore(select,
nodes::add);
+ epochState.global.visitNodeForKeysOnceOrMore(select,
Topology.OnUnknown.IGNORE, nodes::add);
}
+ Invariants.checkState(!nodes.isEmpty(), "Unable to find an epoch that
contained %s", select);
Topologies.Multi topologies = new Topologies.Multi(sorter, count);
for (int i = start; i < limit ; ++i)
{
EpochState epochState = snapshot.epochs[i];
if (epochState.epoch() < minEpoch)
- topologies.add(epochState.global.forSelection(select, nodes,
EpochState::shardIsUnsynced, epochState));
+ topologies.add(epochState.global.forSelection(select,
Topology.OnUnknown.IGNORE, nodes, EpochState::shardIsUnsynced, epochState));
else
- topologies.add(epochState.global.forSelection(select, nodes,
(ignore, idx) -> true, null));
+ topologies.add(epochState.global.forSelection(select,
Topology.OnUnknown.IGNORE, nodes, (ignore, idx) -> true, null));
}
+ Invariants.checkState(!topologies.isEmpty(), "Unable to find an epoch
that contained %s", select);
return topologies;
}
@@ -357,16 +363,18 @@ public class TopologyManager
Epochs snapshot = epochs;
if (minEpoch == maxEpoch)
- return new Single(sorter,
snapshot.get(minEpoch).global.forSelection(keys));
+ return new Single(sorter,
snapshot.get(minEpoch).global.forSelection(keys, Topology.OnUnknown.REJECT));
Set<Id> nodes = new LinkedHashSet<>();
int count = (int)(1 + maxEpoch - minEpoch);
for (int i = count - 1 ; i >= 0 ; --i)
- snapshot.get(minEpoch +
i).global().visitNodeForKeysOnceOrMore(keys, nodes::add);
+ snapshot.get(minEpoch +
i).global().visitNodeForKeysOnceOrMore(keys, Topology.OnUnknown.IGNORE,
nodes::add);
+ Invariants.checkState(!nodes.isEmpty(), "Unable to find an epoch that
contained %s", keys);
Topologies.Multi topologies = new Topologies.Multi(sorter, count);
for (int i = count - 1 ; i >= 0 ; --i)
- topologies.add(snapshot.get(minEpoch +
i).global.forSelection(keys, nodes));
+ topologies.add(snapshot.get(minEpoch +
i).global.forSelection(keys, Topology.OnUnknown.IGNORE, nodes));
+ Invariants.checkState(!topologies.isEmpty(), "Unable to find an epoch
that contained %s", keys);
return topologies;
}
@@ -374,7 +382,7 @@ public class TopologyManager
public Topologies forEpoch(Unseekables<?, ?> select, long epoch)
{
EpochState state = epochs.get(epoch);
- return new Single(sorter, state.global.forSelection(select));
+ return new Single(sorter, state.global.forSelection(select,
Topology.OnUnknown.REJECT));
}
public Shard forEpochIfKnown(RoutingKey key, long epoch)
diff --git a/accord-core/src/main/java/accord/utils/async/AsyncChains.java
b/accord-core/src/main/java/accord/utils/async/AsyncChains.java
index 809ca1c7..4c3e7eb4 100644
--- a/accord-core/src/main/java/accord/utils/async/AsyncChains.java
+++ b/accord-core/src/main/java/accord/utils/async/AsyncChains.java
@@ -462,7 +462,7 @@ public abstract class AsyncChains<V> implements
AsyncChain<V>
public static <V> AsyncChain<V> reduce(List<? extends AsyncChain<? extends
V>> chains, BiFunction<V, V, V> reducer)
{
- Invariants.checkArgument(!chains.isEmpty());
+ Invariants.checkArgument(!chains.isEmpty(), "List of chains is empty");
if (chains.size() == 1)
return (AsyncChain<V>) chains.get(0);
if (Reduce.canAppendTo(chains.get(0), reducer))
diff --git a/accord-core/src/main/java/accord/utils/async/Observable.java
b/accord-core/src/main/java/accord/utils/async/Observable.java
new file mode 100644
index 00000000..c87f12e7
--- /dev/null
+++ b/accord-core/src/main/java/accord/utils/async/Observable.java
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package accord.utils.async;
+
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.function.BiConsumer;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import java.util.stream.Collector;
+import java.util.stream.Collectors;
+
+/**
+ * Stream-like interface to which results are "pushed" (via the {@link
#onNext(Object)} method). This interface is similar to,
+ * yet different from, {@link AsyncChain}: that type works with a single
element, whereas this type works with 0-n.
+ */
+public interface Observable<T>
+{
+ void onNext(T value) throws Exception;
+
+ void onError(Throwable t);
+
+ void onCompleted();
+
+ default <R> Observable<R> map(Function<? super R, ? extends T> mapper)
+ {
+ Observable<T> self = this;
+ // since this project still targets jdk8, can't create private
classes, so to avoid adding types to the public api,
+ // use anonymous classes.
+ return new Observable<R>()
+ {
+ @Override
+ public void onNext(R value) throws Exception
+ {
+ self.onNext(mapper.apply(value));
+ }
+
+ @Override
+ public void onError(Throwable t)
+ {
+ self.onError(t);
+ }
+
+ @Override
+ public void onCompleted()
+ {
+ self.onCompleted();
+ }
+ };
+ }
+
+ static <T> Observable<T> distinct(Observable<T> callback)
+ {
+ return new Observable<T>()
+ {
+ Set<T> keys = new HashSet<>();
+
+ @Override
+ public void onNext(T value) throws Exception
+ {
+ if (keys.add(value))
+ callback.onNext(value);
+ }
+
+ @Override
+ public void onError(Throwable t)
+ {
+ keys.clear();
+ keys = null;
+ callback.onError(t);
+ }
+
+ @Override
+ public void onCompleted()
+ {
+ keys.clear();
+ keys = null;
+ callback.onCompleted();
+ }
+ };
+ }
+
+ static <T> Observable<T> forCallback(BiConsumer<? super List<T>,
Throwable> callback)
+ {
+ return forCallback(callback, Collectors.toList());
+ }
+
+ static <T, Result, Accumulator> Observable<T> forCallback(BiConsumer<?
super Result, Throwable> callback,
+ Collector<?
super T, Accumulator, Result> collector)
+ {
+ return new Observable<T>()
+ {
+ Accumulator values = collector.supplier().get();
+
+ @Override
+ public void onNext(T value)
+ {
+ collector.accumulator().accept(values, value);
+ }
+
+ @Override
+ public void onError(Throwable t)
+ {
+ values = null;
+ callback.accept(null, t);
+ }
+
+ @Override
+ public void onCompleted()
+ {
+ Result result = collector.finisher().apply(this.values);
+ this.values = null;
+ callback.accept(result, null);
+ }
+ };
+ }
+
+ static <A> AsyncChain<List<A>> asChain(Consumer<Observable<A>> work)
+ {
+ return asChain(work, Function.identity(), Collectors.toList());
+ }
+
+ static <A, Result, Accumulator> AsyncChain<Result>
asChain(Consumer<Observable<A>> work,
+ Collector<?
super A, Accumulator, Result> collector)
+ {
+ return asChain(work, Function.identity(), collector);
+ }
+
+ static <A, B, Result, Accumulator> AsyncChain<Result>
asChain(Consumer<Observable<A>> work,
+ Function<?
super A, ? extends B> mapper,
+ Collector<?
super B, Accumulator, Result> collector)
+ {
+ return new AsyncChains.Head<Result>()
+ {
+ @Override
+ protected void start(BiConsumer<? super Result, Throwable>
callback)
+ {
+ work.accept(forCallback(callback, collector).map(mapper));
+ }
+ };
+ }
+}
diff --git a/accord-core/src/test/java/accord/impl/mock/EpochSync.java
b/accord-core/src/test/java/accord/impl/mock/EpochSync.java
index 422c400c..0b4e1a50 100644
--- a/accord-core/src/test/java/accord/impl/mock/EpochSync.java
+++ b/accord-core/src/test/java/accord/impl/mock/EpochSync.java
@@ -102,7 +102,7 @@ public class EpochSync implements Runnable
public CommandSync(Node node, Route<?> route, SyncCommitted message,
Topology topology)
{
- this.tracker = new QuorumTracker(new
Single(node.topology().sorter(), topology.forSelection(route)));
+ this.tracker = new QuorumTracker(new
Single(node.topology().sorter(), topology.forSelection(route,
Topology.OnUnknown.REJECT)));
node.send(tracker.nodes(), message, this);
}
diff --git a/accord-core/src/test/java/accord/messages/PreAcceptTest.java
b/accord-core/src/test/java/accord/messages/PreAcceptTest.java
index 3bb0e488..b3a6994e 100644
--- a/accord-core/src/test/java/accord/messages/PreAcceptTest.java
+++ b/accord-core/src/test/java/accord/messages/PreAcceptTest.java
@@ -20,8 +20,8 @@ package accord.messages;
import accord.api.RoutingKey;
import accord.impl.*;
-import accord.impl.CommandsForKey.CommandLoader;
-import accord.impl.CommandsForKey.CommandTimeseries;
+import accord.impl.CommandTimeseries.CommandLoader;
+import accord.impl.CommandTimeseries;
import accord.impl.IntKey.Raw;
import accord.impl.mock.*;
import accord.local.Node;
diff --git a/accord-core/src/test/java/accord/topology/TopologyManagerTest.java
b/accord-core/src/test/java/accord/topology/TopologyManagerTest.java
index 3ab88af0..ec85d157 100644
--- a/accord-core/src/test/java/accord/topology/TopologyManagerTest.java
+++ b/accord-core/src/test/java/accord/topology/TopologyManagerTest.java
@@ -162,12 +162,12 @@ public class TopologyManagerTest
Assertions.assertFalse(service.getEpochStateUnsafe(2).syncComplete());
RoutingKeys keys = keys(150).toUnseekables();
- Assertions.assertEquals(topologies(topology2.forSelection(keys),
topology1.forSelection(keys)),
+ Assertions.assertEquals(topologies(topology2.forSelection(keys,
Topology.OnUnknown.REJECT), topology1.forSelection(keys,
Topology.OnUnknown.REJECT)),
service.withUnsyncedEpochs(keys, 2, 2));
service.onEpochSyncComplete(id(2), 2);
service.onEpochSyncComplete(id(3), 2);
- Assertions.assertEquals(topologies(topology2.forSelection(keys)),
+ Assertions.assertEquals(topologies(topology2.forSelection(keys,
Topology.OnUnknown.REJECT)),
service.withUnsyncedEpochs(keys, 2, 2));
}
diff --git a/accord-core/src/test/java/accord/topology/TopologyTest.java
b/accord-core/src/test/java/accord/topology/TopologyTest.java
index 6631c15b..24b663e1 100644
--- a/accord-core/src/test/java/accord/topology/TopologyTest.java
+++ b/accord-core/src/test/java/accord/topology/TopologyTest.java
@@ -44,7 +44,7 @@ public class TopologyTest
Assertions.assertTrue(shard.range.contains(expectedKey));
Assertions.assertEquals(expectedRange, shard.range);
- Topology subTopology =
topology.forSelection(Keys.of(expectedKey).toUnseekables());
+ Topology subTopology =
topology.forSelection(Keys.of(expectedKey).toUnseekables(),
Topology.OnUnknown.REJECT);
shard = Iterables.getOnlyElement(subTopology.shards());
Assertions.assertTrue(shard.range.contains(expectedKey));
Assertions.assertEquals(expectedRange, shard.range);
diff --git a/accord-maelstrom/src/test/java/accord/maelstrom/Runner.java
b/accord-maelstrom/src/test/java/accord/maelstrom/Runner.java
index 70b120b7..ab0053d8 100644
--- a/accord-maelstrom/src/test/java/accord/maelstrom/Runner.java
+++ b/accord-maelstrom/src/test/java/accord/maelstrom/Runner.java
@@ -185,7 +185,14 @@ public class Runner
{
logger.info("Seed {}; rerun with -D{}={}", seed, key, seed);
RandomSource randomSource = new DefaultRandom(seed);
- Runner.run(nodeCount, new StandardQueue.Factory(randomSource),
randomSource::fork, factory, commands);
+ try
+ {
+ Runner.run(nodeCount, new StandardQueue.Factory(randomSource),
randomSource::fork, factory, commands);
+ }
+ catch (Throwable t)
+ {
+ throw new AssertionError("Failure for seed " + seed, t);
+ }
}
}
}
diff --git
a/accord-maelstrom/src/test/java/accord/maelstrom/SimpleRandomTest.java
b/accord-maelstrom/src/test/java/accord/maelstrom/SimpleRandomTest.java
index e7772f75..a65fd234 100644
--- a/accord-maelstrom/src/test/java/accord/maelstrom/SimpleRandomTest.java
+++ b/accord-maelstrom/src/test/java/accord/maelstrom/SimpleRandomTest.java
@@ -24,6 +24,7 @@ import org.junit.jupiter.api.Test;
import static accord.maelstrom.Runner.test;
+// TODO (correctness) : if you run the tests with the same seed, you get
different outcomes... this makes it hard to rerun a failure found from CI
public class SimpleRandomTest
{
@Test
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]