This is an automated email from the ASF dual-hosted git repository.
apolovtsev pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new ea9d9e82a9 IGNITE-20300 Metastorage command reordering wrt Safe Time
on Raft Group entry (#2513)
ea9d9e82a9 is described below
commit ea9d9e82a93cd11675acb50513f9efb5f98fbe07
Author: Roman Puchkovskiy <[email protected]>
AuthorDate: Wed Aug 30 14:28:41 2023 +0400
IGNITE-20300 Metastorage command reordering wrt Safe Time on Raft Group
entry (#2513)
---
.../command/MetaStorageWriteCommand.java | 4 +-
.../server/raft/MetaStorageListener.java | 3 +-
.../server/raft/MetaStorageWriteHandler.java | 5 +--
.../impl/StandaloneMetaStorageManager.java | 5 ++-
.../internal/raft/service/BeforeApplyHandler.java | 43 ++++++++++++++++++++++
.../internal/raft/service/RaftGroupListener.java | 12 ------
.../jraft/rpc/impl/ActionRequestProcessor.java | 38 ++++++++++++++++---
7 files changed, 85 insertions(+), 25 deletions(-)
diff --git
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/command/MetaStorageWriteCommand.java
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/command/MetaStorageWriteCommand.java
index 752713d331..7f240ffea2 100644
---
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/command/MetaStorageWriteCommand.java
+++
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/command/MetaStorageWriteCommand.java
@@ -23,7 +23,7 @@ import org.apache.ignite.internal.hlc.HybridClock;
import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.raft.Command;
import org.apache.ignite.internal.raft.WriteCommand;
-import org.apache.ignite.internal.raft.service.RaftGroupListener;
+import org.apache.ignite.internal.raft.service.BeforeApplyHandler;
import org.apache.ignite.network.annotations.WithSetter;
/** Base meta storage write command. */
@@ -44,7 +44,7 @@ public interface MetaStorageWriteCommand extends WriteCommand
{
* This is a dirty hack. This time is set by the leader node to
disseminate new safe time across
* followers and learners. Leader of the ms group reads {@link
#initiatorTime()}, adjusts its clock
* and sets safeTime as {@link HybridClock#now()} as safeTime here. This
must be done before
- * command is saved into the Raft log (see {@link
RaftGroupListener#onBeforeApply(Command)}.
+ * command is saved into the Raft log (see {@link BeforeApplyHandler#onBeforeApply(Command)}).
*/
@WithSetter
long safeTimeLong();
diff --git
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/raft/MetaStorageListener.java
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/raft/MetaStorageListener.java
index ab7db8bfbc..d3b5043d97 100644
---
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/raft/MetaStorageListener.java
+++
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/raft/MetaStorageListener.java
@@ -39,6 +39,7 @@ import
org.apache.ignite.internal.metastorage.server.time.ClusterTimeImpl;
import org.apache.ignite.internal.raft.Command;
import org.apache.ignite.internal.raft.ReadCommand;
import org.apache.ignite.internal.raft.WriteCommand;
+import org.apache.ignite.internal.raft.service.BeforeApplyHandler;
import org.apache.ignite.internal.raft.service.CommandClosure;
import org.apache.ignite.internal.raft.service.RaftGroupListener;
import org.apache.ignite.internal.util.Cursor;
@@ -48,7 +49,7 @@ import org.jetbrains.annotations.Nullable;
* Meta storage listener.
* TODO: IGNITE-14693 Implement Meta storage exception handling logic.
*/
-public class MetaStorageListener implements RaftGroupListener {
+public class MetaStorageListener implements RaftGroupListener,
BeforeApplyHandler {
private final MetaStorageWriteHandler writeHandler;
/** Storage. */
diff --git
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/raft/MetaStorageWriteHandler.java
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/raft/MetaStorageWriteHandler.java
index a8f98b58ba..6d06f8aa4e 100644
---
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/raft/MetaStorageWriteHandler.java
+++
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/raft/MetaStorageWriteHandler.java
@@ -291,10 +291,7 @@ public class MetaStorageWriteHandler {
}
}
- // TODO: IGNITE-20290 - This is insufficient, we must do this in single
thread before saving the command to the RAFT log.
- // Synchronized to make sure no reodering happens as
RaftGroupListener#beforeApply() might be invoked in different threads
- // for different commands.
- synchronized void beforeApply(Command command) {
+ void beforeApply(Command command) {
if (command instanceof MetaStorageWriteCommand) {
// Initiator sends us a timestamp to adjust to.
// Alter command by setting safe time based on the adjusted clock.
diff --git
a/modules/metastorage/src/testFixtures/java/org/apache/ignite/internal/metastorage/impl/StandaloneMetaStorageManager.java
b/modules/metastorage/src/testFixtures/java/org/apache/ignite/internal/metastorage/impl/StandaloneMetaStorageManager.java
index 2801ff77cf..a36d3c9467 100644
---
a/modules/metastorage/src/testFixtures/java/org/apache/ignite/internal/metastorage/impl/StandaloneMetaStorageManager.java
+++
b/modules/metastorage/src/testFixtures/java/org/apache/ignite/internal/metastorage/impl/StandaloneMetaStorageManager.java
@@ -39,6 +39,7 @@ import org.apache.ignite.internal.raft.ReadCommand;
import org.apache.ignite.internal.raft.WriteCommand;
import org.apache.ignite.internal.raft.client.TopologyAwareRaftGroupService;
import
org.apache.ignite.internal.raft.client.TopologyAwareRaftGroupServiceFactory;
+import org.apache.ignite.internal.raft.service.BeforeApplyHandler;
import org.apache.ignite.internal.raft.service.CommandClosure;
import org.apache.ignite.internal.raft.service.RaftGroupListener;
import org.apache.ignite.internal.vault.VaultManager;
@@ -164,7 +165,9 @@ public class StandaloneMetaStorageManager extends
MetaStorageManagerImpl {
Command command = invocation.getArgument(0);
RaftGroupListener listener = listenerCaptor.getValue();
- listener.onBeforeApply(command);
+ if (listener instanceof BeforeApplyHandler) {
+ ((BeforeApplyHandler) listener).onBeforeApply(command);
+ }
return runCommand(command, listener);
});
diff --git
a/modules/raft-api/src/main/java/org/apache/ignite/internal/raft/service/BeforeApplyHandler.java
b/modules/raft-api/src/main/java/org/apache/ignite/internal/raft/service/BeforeApplyHandler.java
new file mode 100644
index 0000000000..c8fd7ca1b4
--- /dev/null
+++
b/modules/raft-api/src/main/java/org/apache/ignite/internal/raft/service/BeforeApplyHandler.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.raft.service;
+
+import org.apache.ignite.internal.raft.Command;
+import org.apache.ignite.internal.raft.ReadCommand;
+import org.apache.ignite.internal.raft.WriteCommand;
+
+/**
+ * Handles 'before apply': that is, before a RAFT command is accepted by a
RAFT leader for processing,
+ * executes some customization on the command.
+ *
+ * <p>For {@link WriteCommand}s, {@link #onBeforeApply(Command)} is executed atomically with accepting the command: that is,
+ * before-apply/accept of one write command cannot intermingle with before-apply/accept of other write commands of the same group.
+ *
+ * <p>For {@link ReadCommand}s, no atomicity guarantees are provided.
+ */
+@SuppressWarnings("InterfaceMayBeAnnotatedFunctional")
+public interface BeforeApplyHandler {
+ /**
+ * Invoked on a leader before submitting a command to a raft group.
+ * If a command must be changed before saving to raft log,
+ * this is a place to do it.
+ *
+ * @param command The command.
+ */
+ void onBeforeApply(Command command);
+}
diff --git
a/modules/raft-api/src/main/java/org/apache/ignite/internal/raft/service/RaftGroupListener.java
b/modules/raft-api/src/main/java/org/apache/ignite/internal/raft/service/RaftGroupListener.java
index a024710203..212a34807c 100644
---
a/modules/raft-api/src/main/java/org/apache/ignite/internal/raft/service/RaftGroupListener.java
+++
b/modules/raft-api/src/main/java/org/apache/ignite/internal/raft/service/RaftGroupListener.java
@@ -20,7 +20,6 @@ package org.apache.ignite.internal.raft.service;
import java.nio.file.Path;
import java.util.Iterator;
import java.util.function.Consumer;
-import org.apache.ignite.internal.raft.Command;
import org.apache.ignite.internal.raft.ReadCommand;
import org.apache.ignite.internal.raft.WriteCommand;
@@ -82,15 +81,4 @@ public interface RaftGroupListener {
* Invoked once after a raft node has been shut down.
*/
void onShutdown();
-
- /**
- * Invoked on a leader before submitting a command to a raft group.
- * If a command must be changed before saving to raft log,
- * this is a place to do it.
- *
- * @param command The command.
- */
- default void onBeforeApply(Command command) {
- // No-op.
- }
}
diff --git
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/ActionRequestProcessor.java
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/ActionRequestProcessor.java
index ef1307f660..cd320381b1 100644
---
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/ActionRequestProcessor.java
+++
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/ActionRequestProcessor.java
@@ -19,6 +19,8 @@ package org.apache.ignite.raft.jraft.rpc.impl;
import java.io.Serializable;
import java.nio.ByteBuffer;
import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
@@ -26,6 +28,8 @@ import
org.apache.ignite.internal.raft.server.impl.JraftServerImpl;
import org.apache.ignite.internal.raft.Command;
import org.apache.ignite.internal.raft.ReadCommand;
import org.apache.ignite.internal.raft.WriteCommand;
+import
org.apache.ignite.internal.raft.server.impl.JraftServerImpl.DelegatingStateMachine;
+import org.apache.ignite.internal.raft.service.BeforeApplyHandler;
import org.apache.ignite.internal.raft.service.CommandClosure;
import org.apache.ignite.raft.jraft.Closure;
import org.apache.ignite.raft.jraft.Node;
@@ -41,7 +45,8 @@ import org.apache.ignite.raft.jraft.rpc.RaftRpcFactory;
import org.apache.ignite.raft.jraft.rpc.RpcContext;
import org.apache.ignite.raft.jraft.rpc.RpcProcessor;
import org.apache.ignite.raft.jraft.rpc.RpcRequests;
-import org.apache.ignite.raft.jraft.util.BytesUtil;import
org.apache.ignite.raft.jraft.util.Marshaller;
+import org.apache.ignite.raft.jraft.util.BytesUtil;
+import org.apache.ignite.raft.jraft.util.Marshaller;
/**
* Process action request.
@@ -53,6 +58,12 @@ public class ActionRequestProcessor implements
RpcProcessor<ActionRequest> {
private final RaftMessagesFactory factory;
+ /**
+ * Mapping from group IDs to monitors used to synchronize on (only used when the
+ * RaftGroupListener instance implements {@link BeforeApplyHandler} and the command is a write command).
+ */
+ private final Map<String, Object> groupIdsToMonitors = new
ConcurrentHashMap<>();
+
public ActionRequestProcessor(Executor executor, RaftMessagesFactory
factory) {
this.executor = executor;
this.factory = factory;
@@ -71,15 +82,32 @@ public class ActionRequestProcessor implements
RpcProcessor<ActionRequest> {
JraftServerImpl.DelegatingStateMachine fsm =
(JraftServerImpl.DelegatingStateMachine) node.getOptions().getFsm();
- // Apply a filter before committing to STM.
- fsm.getListener().onBeforeApply(request.command());
-
if (request.command() instanceof WriteCommand) {
- applyWrite(node, request, rpcCtx);
+ if (fsm.getListener() instanceof BeforeApplyHandler) {
+ synchronized (groupIdSyncMonitor(request.groupId())) {
+ callOnBeforeApply(request, fsm);
+ applyWrite(node, request, rpcCtx);
+ }
+ } else {
+ applyWrite(node, request, rpcCtx);
+ }
} else {
+ if (fsm.getListener() instanceof BeforeApplyHandler) {
+ callOnBeforeApply(request, fsm);
+ }
+
applyRead(node, request, rpcCtx);
}
}
+ private static void callOnBeforeApply(ActionRequest request,
DelegatingStateMachine fsm) {
+ ((BeforeApplyHandler)
fsm.getListener()).onBeforeApply(request.command());
+ }
+
+ private Object groupIdSyncMonitor(String groupId) {
+ assert groupId != null;
+
+ return groupIdsToMonitors.computeIfAbsent(groupId, k -> groupId);
+ }
/**
* @param node The node.