This is an automated email from the ASF dual-hosted git repository.
jlewandowski pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra-accord.git
The following commit(s) were added to refs/heads/trunk by this push:
new 5ffe3d50 CEP-15: Add Accord configuration stub
5ffe3d50 is described below
commit 5ffe3d504bb5aa1ff1c2b96d817791e40f7ced0f
Author: Jacek Lewandowski <[email protected]>
AuthorDate: Mon Oct 9 14:56:57 2023 +0200
CEP-15: Add Accord configuration stub
Patch by Jacek Lewandowski; reviewed by David Capwell for CASSANDRA-18221
---
.../src/main/java/accord/config/LocalConfig.java | 29 ++++++++++++++++
.../java/accord/config/MutableLocalConfig.java | 40 ++++++++++++++++++++++
.../main/java/accord/impl/SimpleProgressLog.java | 4 ++-
accord-core/src/main/java/accord/local/Node.java | 10 +++++-
accord-core/src/test/java/accord/Utils.java | 6 +++-
.../src/test/java/accord/impl/basic/Cluster.java | 5 ++-
.../test/java/accord/impl/mock/MockCluster.java | 6 +++-
.../java/accord/local/ImmutableCommandTest.java | 5 ++-
.../src/main/java/accord/maelstrom/Cluster.java | 5 ++-
.../src/main/java/accord/maelstrom/Main.java | 5 ++-
10 files changed, 107 insertions(+), 8 deletions(-)
diff --git a/accord-core/src/main/java/accord/config/LocalConfig.java
b/accord-core/src/main/java/accord/config/LocalConfig.java
new file mode 100644
index 00000000..0ced3b98
--- /dev/null
+++ b/accord-core/src/main/java/accord/config/LocalConfig.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package accord.config;
+
+import java.time.Duration;
+
+public interface LocalConfig
+{
+ default Duration getProgressLogScheduleDelay()
+ {
+ return Duration.ofSeconds(1);
+ }
+}
diff --git a/accord-core/src/main/java/accord/config/MutableLocalConfig.java
b/accord-core/src/main/java/accord/config/MutableLocalConfig.java
new file mode 100644
index 00000000..f3c42782
--- /dev/null
+++ b/accord-core/src/main/java/accord/config/MutableLocalConfig.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package accord.config;
+
+import java.time.Duration;
+
+import com.google.common.annotations.VisibleForTesting;
+
+@VisibleForTesting
+public class MutableLocalConfig implements LocalConfig
+{
+ private volatile Duration progressLogScheduleDelay =
LocalConfig.super.getProgressLogScheduleDelay();
+
+ public void setProgressLogScheduleDelay(Duration progressLogScheduleDelay)
+ {
+ this.progressLogScheduleDelay = progressLogScheduleDelay;
+ }
+
+ @Override
+ public Duration getProgressLogScheduleDelay()
+ {
+ return progressLogScheduleDelay;
+ }
+}
diff --git a/accord-core/src/main/java/accord/impl/SimpleProgressLog.java
b/accord-core/src/main/java/accord/impl/SimpleProgressLog.java
index 89291a1e..420d54db 100644
--- a/accord-core/src/main/java/accord/impl/SimpleProgressLog.java
+++ b/accord-core/src/main/java/accord/impl/SimpleProgressLog.java
@@ -18,6 +18,7 @@
package accord.impl;
+import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
@@ -679,7 +680,8 @@ public class SimpleProgressLog implements
ProgressLog.Factory
return;
isScheduled = true;
- node.scheduler().once(() -> commandStore.execute(empty(), ignore
-> run()).begin(commandStore.agent()), 1L, TimeUnit.SECONDS);
+ Duration delay = node.localConfig().getProgressLogScheduleDelay();
+ node.scheduler().once(() -> commandStore.execute(empty(), ignore
-> run()).begin(commandStore.agent()), delay.toNanos(), TimeUnit.NANOSECONDS);
}
@Override
diff --git a/accord-core/src/main/java/accord/local/Node.java
b/accord-core/src/main/java/accord/local/Node.java
index 8e4237f1..f0ee2bd1 100644
--- a/accord-core/src/main/java/accord/local/Node.java
+++ b/accord-core/src/main/java/accord/local/Node.java
@@ -49,6 +49,7 @@ import accord.api.Result;
import accord.api.RoutingKey;
import accord.api.Scheduler;
import accord.api.TopologySorter;
+import accord.config.LocalConfig;
import accord.coordinate.CoordinateTransaction;
import accord.coordinate.MaybeRecover;
import accord.coordinate.Outcome;
@@ -145,6 +146,7 @@ public class Node implements ConfigurationService.Listener,
NodeTimeService
private final AtomicReference<Timestamp> now;
private final Agent agent;
private final RandomSource random;
+ private final LocalConfig localConfig;
// TODO (expected, consider): this really needs to be thought through some
more, as it needs to be per-instance in some cases, and per-node in others
private final Scheduler scheduler;
@@ -155,9 +157,10 @@ public class Node implements
ConfigurationService.Listener, NodeTimeService
public Node(Id id, MessageSink messageSink, LocalMessage.Handler
localMessageHandler,
ConfigurationService configService, LongSupplier nowSupplier,
ToLongFunction<TimeUnit> nowTimeUnit,
Supplier<DataStore> dataSupplier, ShardDistributor
shardDistributor, Agent agent, RandomSource random, Scheduler scheduler,
TopologySorter.Supplier topologySorter,
- Function<Node, ProgressLog.Factory> progressLogFactory,
CommandStores.Factory factory)
+ Function<Node, ProgressLog.Factory> progressLogFactory,
CommandStores.Factory factory, LocalConfig localConfig)
{
this.id = id;
+ this.localConfig = localConfig;
this.messageSink = messageSink;
this.localMessageHandler = localMessageHandler;
this.configService = configService;
@@ -173,6 +176,11 @@ public class Node implements
ConfigurationService.Listener, NodeTimeService
configService.registerListener(this);
}
+ public LocalConfig localConfig()
+ {
+ return localConfig;
+ }
+
/**
* This starts the node for tests and makes sure that the provided
topology is acknowledged correctly. This method is not
* safe for production systems as it doesn't handle restarts and partially
acknowledged histories
diff --git a/accord-core/src/test/java/accord/Utils.java
b/accord-core/src/test/java/accord/Utils.java
index 77a8bd81..6d42388c 100644
--- a/accord-core/src/test/java/accord/Utils.java
+++ b/accord-core/src/test/java/accord/Utils.java
@@ -27,6 +27,7 @@ import com.google.common.collect.Sets;
import accord.api.MessageSink;
import accord.api.Scheduler;
+import accord.config.LocalConfig;
import accord.impl.InMemoryCommandStores;
import accord.impl.IntKey;
import accord.impl.SimpleProgressLog;
@@ -38,6 +39,7 @@ import accord.impl.mock.MockStore;
import accord.local.Node;
import accord.local.NodeTimeService;
import accord.local.ShardDistributor;
+import accord.config.MutableLocalConfig;
import accord.messages.LocalMessage;
import accord.primitives.Keys;
import accord.primitives.Range;
@@ -140,6 +142,7 @@ public class Utils
{
MockStore store = new MockStore();
Scheduler scheduler = new ThreadPoolScheduler();
+ LocalConfig localConfig = new MutableLocalConfig();
Node node = new Node(nodeId,
messageSink,
LocalMessage::process,
@@ -153,7 +156,8 @@ public class Utils
scheduler,
SizeOfIntersectionSorter.SUPPLIER,
SimpleProgressLog::new,
- InMemoryCommandStores.Synchronized::new);
+ InMemoryCommandStores.Synchronized::new,
+ localConfig);
awaitUninterruptibly(node.unsafeStart());
return node;
}
diff --git a/accord-core/src/test/java/accord/impl/basic/Cluster.java
b/accord-core/src/test/java/accord/impl/basic/Cluster.java
index a7eff1cd..efeadfec 100644
--- a/accord-core/src/test/java/accord/impl/basic/Cluster.java
+++ b/accord-core/src/test/java/accord/impl/basic/Cluster.java
@@ -35,6 +35,7 @@ import java.util.function.Function;
import java.util.function.LongSupplier;
import java.util.function.Supplier;
+import accord.config.LocalConfig;
import accord.impl.MessageListener;
import org.junit.jupiter.api.Assertions;
import org.slf4j.Logger;
@@ -55,6 +56,7 @@ import accord.local.Node;
import accord.local.Node.Id;
import accord.local.NodeTimeService;
import accord.local.ShardDistributor;
+import accord.config.MutableLocalConfig;
import accord.messages.LocalMessage;
import accord.messages.MessageType;
import accord.messages.Message;
@@ -229,11 +231,12 @@ public class Cluster implements Scheduler
MessageSink messageSink = sinks.create(id,
randomSupplier.get());
LongSupplier nowSupplier = nowSupplierSupplier.get();
BurnTestConfigurationService configService = new
BurnTestConfigurationService(id, executor, randomSupplier, topology,
lookup::get, topologyUpdates);
+ LocalConfig localConfig = new MutableLocalConfig();
Node node = new Node(id, messageSink, LocalMessage::process,
configService, nowSupplier, NodeTimeService.unixWrapper(TimeUnit.MILLISECONDS,
nowSupplier),
() -> new ListStore(id), new
ShardDistributor.EvenSplit<>(8, ignore -> new IntHashKey.Splitter()),
executor.agent(),
randomSupplier.get(), sinks,
SizeOfIntersectionSorter.SUPPLIER,
- SimpleProgressLog::new,
DelayedCommandStores.factory(sinks.pending));
+ SimpleProgressLog::new,
DelayedCommandStores.factory(sinks.pending), localConfig);
lookup.put(id, node);
CoordinateDurabilityScheduling durability = new
CoordinateDurabilityScheduling(node);
// TODO (desired): randomise
diff --git a/accord-core/src/test/java/accord/impl/mock/MockCluster.java
b/accord-core/src/test/java/accord/impl/mock/MockCluster.java
index fdd8a91b..4b3d198d 100644
--- a/accord-core/src/test/java/accord/impl/mock/MockCluster.java
+++ b/accord-core/src/test/java/accord/impl/mock/MockCluster.java
@@ -34,6 +34,7 @@ import org.slf4j.LoggerFactory;
import accord.NetworkFilter;
import accord.api.MessageSink;
+import accord.config.LocalConfig;
import accord.impl.InMemoryCommandStores;
import accord.impl.IntKey;
import accord.impl.SimpleProgressLog;
@@ -45,6 +46,7 @@ import accord.local.Node;
import accord.local.Node.Id;
import accord.local.NodeTimeService;
import accord.local.ShardDistributor;
+import accord.config.MutableLocalConfig;
import accord.messages.Callback;
import accord.messages.LocalMessage;
import accord.messages.Reply;
@@ -119,6 +121,7 @@ public class MockCluster implements Network, AutoCloseable,
Iterable<Node>
MockStore store = new MockStore();
MessageSink messageSink = messageSinkFactory.apply(id, this);
MockConfigurationService configurationService = new
MockConfigurationService(messageSink, onFetchTopology, topology);
+ LocalConfig localConfig = new MutableLocalConfig();
Node node = new Node(id,
messageSink,
LocalMessage::process,
@@ -132,7 +135,8 @@ public class MockCluster implements Network, AutoCloseable,
Iterable<Node>
new ThreadPoolScheduler(),
SizeOfIntersectionSorter.SUPPLIER,
SimpleProgressLog::new,
- InMemoryCommandStores.SingleThread::new);
+ InMemoryCommandStores.SingleThread::new,
+ localConfig);
awaitUninterruptibly(node.unsafeStart());
node.onTopologyUpdate(topology, true);
return node;
diff --git a/accord-core/src/test/java/accord/local/ImmutableCommandTest.java
b/accord-core/src/test/java/accord/local/ImmutableCommandTest.java
index 15061ea5..ef573d11 100644
--- a/accord-core/src/test/java/accord/local/ImmutableCommandTest.java
+++ b/accord-core/src/test/java/accord/local/ImmutableCommandTest.java
@@ -32,6 +32,7 @@ import accord.api.ProgressLog;
import accord.api.RoutingKey;
import accord.api.Scheduler;
import accord.api.TestableConfigurationService;
+import accord.config.LocalConfig;
import accord.impl.InMemoryCommandStore;
import accord.impl.InMemoryCommandStores;
import accord.impl.IntKey;
@@ -43,6 +44,7 @@ import accord.impl.mock.MockConfigurationService;
import accord.impl.mock.MockStore;
import accord.local.Node.Id;
import accord.local.SaveStatus.LocalExecution;
+import accord.config.MutableLocalConfig;
import accord.primitives.FullKeyRoute;
import accord.primitives.Keys;
import accord.primitives.Participants;
@@ -109,10 +111,11 @@ public class ImmutableCommandTest
private static Node createNode(Id id, CommandStoreSupport storeSupport)
{
MockCluster.Clock clock = new MockCluster.Clock(100);
+ LocalConfig localConfig = new MutableLocalConfig();
Node node = new Node(id, null, null, new
MockConfigurationService(null, (epoch, service) -> { },
storeSupport.local.get()),
clock,
NodeTimeService.unixWrapper(TimeUnit.MICROSECONDS, clock),
() -> storeSupport.data, new
ShardDistributor.EvenSplit(8, ignore -> new IntKey.Splitter()), new
TestAgent(), new DefaultRandom(), Scheduler.NEVER_RUN_SCHEDULED,
- SizeOfIntersectionSorter.SUPPLIER, ignore -> ignore2
-> new NoOpProgressLog(), InMemoryCommandStores.Synchronized::new);
+ SizeOfIntersectionSorter.SUPPLIER, ignore -> ignore2
-> new NoOpProgressLog(), InMemoryCommandStores.Synchronized::new, localConfig);
awaitUninterruptibly(node.unsafeStart());
node.onTopologyUpdate(storeSupport.local.get(), true);
return node;
diff --git a/accord-maelstrom/src/main/java/accord/maelstrom/Cluster.java
b/accord-maelstrom/src/main/java/accord/maelstrom/Cluster.java
index 1af7d4c2..0184765d 100644
--- a/accord-maelstrom/src/main/java/accord/maelstrom/Cluster.java
+++ b/accord-maelstrom/src/main/java/accord/maelstrom/Cluster.java
@@ -41,6 +41,8 @@ import java.util.function.Supplier;
import accord.api.MessageSink;
import accord.api.Scheduler;
+import accord.config.LocalConfig;
+import accord.config.MutableLocalConfig;
import accord.impl.InMemoryCommandStores;
import accord.impl.SimpleProgressLog;
import accord.impl.SizeOfIntersectionSorter;
@@ -314,12 +316,13 @@ public class Cluster implements Scheduler
{
MessageSink messageSink = sinks.create(node,
randomSupplier.get());
LongSupplier nowSupplier = nowSupplierSupplier.get();
+ LocalConfig localConfig = new MutableLocalConfig();
lookup.put(node, new Node(node, messageSink,
LocalMessage::process, new SimpleConfigService(topology),
nowSupplier,
NodeTimeService.unixWrapper(TimeUnit.MICROSECONDS, nowSupplier),
MaelstromStore::new, new
ShardDistributor.EvenSplit(8, ignore -> new MaelstromKey.Splitter()),
MaelstromAgent.INSTANCE,
randomSupplier.get(), sinks,
SizeOfIntersectionSorter.SUPPLIER,
- SimpleProgressLog::new,
InMemoryCommandStores.SingleThread::new));
+ SimpleProgressLog::new,
InMemoryCommandStores.SingleThread::new, localConfig));
}
AsyncResult<?> startup =
AsyncChains.reduce(lookup.values().stream().map(Node::unsafeStart).collect(toList()),
(a, b) -> null).beginAsResult();
diff --git a/accord-maelstrom/src/main/java/accord/maelstrom/Main.java
b/accord-maelstrom/src/main/java/accord/maelstrom/Main.java
index 871123a3..40e73ff2 100644
--- a/accord-maelstrom/src/main/java/accord/maelstrom/Main.java
+++ b/accord-maelstrom/src/main/java/accord/maelstrom/Main.java
@@ -32,6 +32,8 @@ import java.util.function.Supplier;
import accord.api.MessageSink;
import accord.api.Scheduler;
+import accord.config.LocalConfig;
+import accord.config.MutableLocalConfig;
import accord.coordinate.Timeout;
import accord.impl.InMemoryCommandStores;
import accord.impl.SimpleProgressLog;
@@ -173,11 +175,12 @@ public class Main
MaelstromInit init = (MaelstromInit) packet.body;
topology = topologyFactory.toTopology(init.cluster);
sink = new StdoutSink(System::currentTimeMillis, scheduler, start,
init.self, out, err);
+ LocalConfig localConfig = new MutableLocalConfig();
on = new Node(init.self, sink, LocalMessage::process, new
SimpleConfigService(topology),
System::currentTimeMillis,
NodeTimeService.unixWrapper(TimeUnit.MILLISECONDS, System::currentTimeMillis),
MaelstromStore::new, new
ShardDistributor.EvenSplit(8, ignore -> new MaelstromKey.Splitter()),
MaelstromAgent.INSTANCE, new DefaultRandom(),
scheduler, SizeOfIntersectionSorter.SUPPLIER,
- SimpleProgressLog::new,
InMemoryCommandStores.SingleThread::new);
+ SimpleProgressLog::new,
InMemoryCommandStores.SingleThread::new, localConfig);
awaitUninterruptibly(on.unsafeStart());
err.println("Initialized node " + init.self);
err.flush();
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]