This is an automated email from the ASF dual-hosted git repository.
tkalkirill pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 653cc4c8da IGNITE-20262 Refuse accepting partition Raft commands when
not enough schemas are available (#2484)
653cc4c8da is described below
commit 653cc4c8da89529ba81e7b457162fae1516c3d2b
Author: Roman Puchkovskiy <[email protected]>
AuthorDate: Fri Sep 1 17:00:28 2023 +0400
IGNITE-20262 Refuse accepting partition Raft commands when not enough
schemas are available (#2484)
---
.../java/org/apache/ignite/internal/raft/Loza.java | 11 ++
.../internal/raft/server/impl/JraftServerImpl.java | 18 ++-
...erceptor.java => ActionRequestInterceptor.java} | 16 +--
.../raft/jraft/rpc/impl/IgniteRpcServer.java | 5 +-
...ava => InterceptingActionRequestProcessor.java} | 34 +++--
.../rpc/impl/NullActionRequestInterceptor.java | 33 +++++
.../impl/core/AppendEntriesRequestInterceptor.java | 1 +
.../InterceptingAppendEntriesRequestProcessor.java | 6 +-
.../ignite/raft/jraft/rpc/TestIgniteRpcServer.java | 4 +-
.../org/apache/ignite/internal/app/IgniteImpl.java | 2 +
.../schema/CatalogVersionSufficiency.java | 40 ++++++
.../schema/CheckCatalogVersionOnActionRequest.java | 78 ++++++++++++
.../schema/CheckCatalogVersionOnAppendEntries.java | 21 ++--
.../schema/CatalogVersionSufficiencyTest.java | 56 +++++++++
.../CheckCatalogVersionOnActionRequestTest.java | 138 +++++++++++++++++++++
15 files changed, 421 insertions(+), 42 deletions(-)
diff --git
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/Loza.java
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/Loza.java
index 6a62911fec..38e2714fc1 100644
--- a/modules/raft/src/main/java/org/apache/ignite/internal/raft/Loza.java
+++ b/modules/raft/src/main/java/org/apache/ignite/internal/raft/Loza.java
@@ -47,6 +47,7 @@ import org.apache.ignite.network.MessagingService;
import org.apache.ignite.network.TopologyService;
import org.apache.ignite.raft.jraft.RaftMessagesFactory;
import org.apache.ignite.raft.jraft.option.NodeOptions;
+import org.apache.ignite.raft.jraft.rpc.impl.ActionRequestInterceptor;
import org.apache.ignite.raft.jraft.rpc.impl.RaftGroupEventsClientListener;
import
org.apache.ignite.raft.jraft.rpc.impl.core.AppendEntriesRequestInterceptor;
import org.apache.ignite.raft.jraft.util.Utils;
@@ -157,6 +158,16 @@ public class Loza implements RaftManager {
raftServer.appendEntriesRequestInterceptor(appendEntriesRequestInterceptor);
}
+ /**
+ * Sets {@link ActionRequestInterceptor} to use. Should only be called
from the same thread that is used
+ * to {@link #start()} the component.
+ *
+ * @param actionRequestInterceptor Interceptor to use.
+ */
+ public void actionRequestInterceptor(ActionRequestInterceptor
actionRequestInterceptor) {
+ raftServer.actionRequestInterceptor(actionRequestInterceptor);
+ }
+
/** {@inheritDoc} */
@Override
public void start() {
diff --git
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/impl/JraftServerImpl.java
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/impl/JraftServerImpl.java
index 07a3eff40c..4a647e0799 100644
---
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/impl/JraftServerImpl.java
+++
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/impl/JraftServerImpl.java
@@ -75,8 +75,10 @@ import
org.apache.ignite.raft.jraft.disruptor.StripedDisruptor;
import org.apache.ignite.raft.jraft.entity.PeerId;
import org.apache.ignite.raft.jraft.error.RaftError;
import org.apache.ignite.raft.jraft.option.NodeOptions;
+import org.apache.ignite.raft.jraft.rpc.impl.ActionRequestInterceptor;
import org.apache.ignite.raft.jraft.rpc.impl.IgniteRpcClient;
import org.apache.ignite.raft.jraft.rpc.impl.IgniteRpcServer;
+import org.apache.ignite.raft.jraft.rpc.impl.NullActionRequestInterceptor;
import org.apache.ignite.raft.jraft.rpc.impl.RaftGroupEventsClientListener;
import
org.apache.ignite.raft.jraft.rpc.impl.core.AppendEntriesRequestInterceptor;
import
org.apache.ignite.raft.jraft.rpc.impl.core.NullAppendEntriesRequestInterceptor;
@@ -133,6 +135,9 @@ public class JraftServerImpl implements RaftServer {
/** Interceptor for AppendEntriesRequests. Not thread-safe, should be
assigned and read in the same thread. */
private AppendEntriesRequestInterceptor appendEntriesRequestInterceptor =
new NullAppendEntriesRequestInterceptor();
+ /** Interceptor for ActionRequests. Not thread-safe, should be assigned
and read in the same thread. */
+ private ActionRequestInterceptor actionRequestInterceptor = new
NullActionRequestInterceptor();
+
/** The number of parallel raft groups starts. */
private static final int SIMULTANEOUS_GROUP_START_PARALLELISM =
Math.min(Utils.cpus() * 3, 25);
@@ -209,6 +214,16 @@ public class JraftServerImpl implements RaftServer {
this.appendEntriesRequestInterceptor = appendEntriesRequestInterceptor;
}
+ /**
+ * Sets {@link ActionRequestInterceptor} to use. Should only be called
from the same thread that is used
+ * to {@link #start()} the component.
+ *
+ * @param actionRequestInterceptor Interceptor to use.
+ */
+ public void actionRequestInterceptor(ActionRequestInterceptor
actionRequestInterceptor) {
+ this.actionRequestInterceptor = actionRequestInterceptor;
+ }
+
/** {@inheritDoc} */
@Override
public void start() {
@@ -256,7 +271,8 @@ public class JraftServerImpl implements RaftServer {
requestExecutor,
serviceEventInterceptor,
raftGroupEventsClientListener,
- appendEntriesRequestInterceptor
+ appendEntriesRequestInterceptor,
+ actionRequestInterceptor
);
if (opts.getfSMCallerExecutorDisruptor() == null) {
diff --git
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/core/AppendEntriesRequestInterceptor.java
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/ActionRequestInterceptor.java
similarity index 70%
copy from
modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/core/AppendEntriesRequestInterceptor.java
copy to
modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/ActionRequestInterceptor.java
index 600aa3d562..d00bccacd7 100644
---
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/core/AppendEntriesRequestInterceptor.java
+++
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/ActionRequestInterceptor.java
@@ -15,26 +15,26 @@
* limitations under the License.
*/
-package org.apache.ignite.raft.jraft.rpc.impl.core;
+package org.apache.ignite.raft.jraft.rpc.impl;
-import org.apache.ignite.raft.jraft.rpc.Message;
+import org.apache.ignite.raft.jraft.rpc.ActionRequest;import
org.apache.ignite.raft.jraft.rpc.Message;
import org.apache.ignite.raft.jraft.rpc.RaftServerService;
-import org.apache.ignite.raft.jraft.rpc.RpcRequestClosure;
+import org.apache.ignite.raft.jraft.rpc.RpcContext;import
org.apache.ignite.raft.jraft.rpc.RpcRequestClosure;
import org.apache.ignite.raft.jraft.rpc.RpcRequests.AppendEntriesRequest;
import org.jetbrains.annotations.Nullable;
/**
- * Intercepts {@link AppendEntriesRequest}s as they come in. It might be used
to handle such a request in a non-standard
+ * Intercepts {@link ActionRequest}s as they come in. It might be used to
handle such a request in a non-standard
* way (like returning EBUSY under special circumstances instead of the
standard behavior).
*/
-public interface AppendEntriesRequestInterceptor {
+@SuppressWarnings("InterfaceMayBeAnnotatedFunctional")
+public interface ActionRequestInterceptor {
/**
* Intercepts handling of an incoming request. If non-null message is
returned, the standard handling is omitted.
*
- * @param service Server service.
+ * @param rpcCtx RPC context.
* @param request Request in question.
- * @param done Done closure.
* @return A message to return to the caller, or {@code null} if standard
handling should be used.
*/
- @Nullable Message intercept(RaftServerService service,
AppendEntriesRequest request, RpcRequestClosure done);
+ @Nullable Message intercept(RpcContext rpcCtx, ActionRequest request);
}
diff --git
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/IgniteRpcServer.java
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/IgniteRpcServer.java
index 4a6536fc2f..c4e0c8e1f6 100644
---
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/IgniteRpcServer.java
+++
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/IgniteRpcServer.java
@@ -91,7 +91,8 @@ public class IgniteRpcServer implements RpcServer<Void> {
Executor rpcExecutor,
RaftServiceEventInterceptor serviceEventInterceptor,
RaftGroupEventsClientListener raftGroupEventsClientListener,
- AppendEntriesRequestInterceptor appendEntriesRequestFilter
+ AppendEntriesRequestInterceptor appendEntriesRequestFilter,
+ ActionRequestInterceptor actionRequestInterceptor
) {
this.service = service;
this.nodeManager = nodeManager;
@@ -122,7 +123,7 @@ public class IgniteRpcServer implements RpcServer<Void> {
registerProcessor(new RemoveLearnersRequestProcessor(rpcExecutor,
raftMessagesFactory));
registerProcessor(new ResetLearnersRequestProcessor(rpcExecutor,
raftMessagesFactory));
// common client integration
- registerProcessor(new ActionRequestProcessor(rpcExecutor,
raftMessagesFactory));
+ registerProcessor(new InterceptingActionRequestProcessor(rpcExecutor,
raftMessagesFactory, actionRequestInterceptor));
registerProcessor(new NotifyElectProcessor(raftMessagesFactory,
serviceEventInterceptor));
registerProcessor(new
RaftGroupEventsProcessor(raftGroupEventsClientListener));
diff --git
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/core/InterceptingAppendEntriesRequestProcessor.java
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/InterceptingActionRequestProcessor.java
similarity index 50%
copy from
modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/core/InterceptingAppendEntriesRequestProcessor.java
copy to
modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/InterceptingActionRequestProcessor.java
index 3a3b336fe9..dac65dc7be 100644
---
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/core/InterceptingAppendEntriesRequestProcessor.java
+++
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/InterceptingActionRequestProcessor.java
@@ -15,41 +15,37 @@
* limitations under the License.
*/
-package org.apache.ignite.raft.jraft.rpc.impl.core;
+package org.apache.ignite.raft.jraft.rpc.impl;
import java.util.concurrent.Executor;
import org.apache.ignite.raft.jraft.RaftMessagesFactory;
+import org.apache.ignite.raft.jraft.rpc.ActionRequest;
import org.apache.ignite.raft.jraft.rpc.Message;
-import org.apache.ignite.raft.jraft.rpc.RaftServerService;
-import org.apache.ignite.raft.jraft.rpc.RpcRequestClosure;
-import org.apache.ignite.raft.jraft.rpc.RpcRequests.AppendEntriesRequest;
+import org.apache.ignite.raft.jraft.rpc.RpcContext;
/**
- * Extension of the standard {@link AppendEntriesRequestProcessor} that allows
to add some interception logic.
+ * Extension of the standard {@link ActionRequestProcessor} that allows to add
some interception logic.
*
- * @see AppendEntriesRequestInterceptor
+ * @see ActionRequestInterceptor
*/
-public class InterceptingAppendEntriesRequestProcessor extends
AppendEntriesRequestProcessor {
- private final AppendEntriesRequestInterceptor interceptor;
+public class InterceptingActionRequestProcessor extends ActionRequestProcessor
{
+ private final ActionRequestInterceptor interceptor;
- /**
- * Constructor.
- */
- public InterceptingAppendEntriesRequestProcessor(Executor executor,
RaftMessagesFactory msgFactory,
- AppendEntriesRequestInterceptor interceptor) {
+ /** Constructor. */
+ public InterceptingActionRequestProcessor(Executor executor,
RaftMessagesFactory msgFactory, ActionRequestInterceptor interceptor) {
super(executor, msgFactory);
this.interceptor = interceptor;
}
@Override
- public Message processRequest0(RaftServerService service,
AppendEntriesRequest request, RpcRequestClosure done) {
- Message result = interceptor.intercept(service, request, done);
+ public void handleRequest(RpcContext rpcCtx, ActionRequest request) {
+ Message interceptionResult = interceptor.intercept(rpcCtx, request);
- if (result != null) {
- return result;
+ if (interceptionResult != null) {
+ rpcCtx.sendResponse(interceptionResult);
+ } else {
+ super.handleRequest(rpcCtx, request);
}
-
- return super.processRequest0(service, request, done);
}
}
diff --git
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/NullActionRequestInterceptor.java
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/NullActionRequestInterceptor.java
new file mode 100644
index 0000000000..77c46ede23
--- /dev/null
+++
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/NullActionRequestInterceptor.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.raft.jraft.rpc.impl;
+
+import org.apache.ignite.raft.jraft.rpc.ActionRequest;import
org.apache.ignite.raft.jraft.rpc.Message;
+import org.apache.ignite.raft.jraft.rpc.RpcContext;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * An {@link ActionRequestInterceptor} that never intercepts anything and
always asks the standard handling
+ * to be used.
+ */
+public class NullActionRequestInterceptor implements ActionRequestInterceptor {
+ @Override
+ public @Nullable Message intercept(RpcContext rpcCtx, ActionRequest
request) {
+ return null;
+ }
+}
diff --git
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/core/AppendEntriesRequestInterceptor.java
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/core/AppendEntriesRequestInterceptor.java
index 600aa3d562..37844dc967 100644
---
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/core/AppendEntriesRequestInterceptor.java
+++
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/core/AppendEntriesRequestInterceptor.java
@@ -27,6 +27,7 @@ import org.jetbrains.annotations.Nullable;
* Intercepts {@link AppendEntriesRequest}s as they come in. It might be used
to handle such a request in a non-standard
* way (like returning EBUSY under special circumstances instead of the
standard behavior).
*/
+@SuppressWarnings("InterfaceMayBeAnnotatedFunctional")
public interface AppendEntriesRequestInterceptor {
/**
* Intercepts handling of an incoming request. If non-null message is
returned, the standard handling is omitted.
diff --git
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/core/InterceptingAppendEntriesRequestProcessor.java
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/core/InterceptingAppendEntriesRequestProcessor.java
index 3a3b336fe9..872d6cb45e 100644
---
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/core/InterceptingAppendEntriesRequestProcessor.java
+++
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/core/InterceptingAppendEntriesRequestProcessor.java
@@ -44,10 +44,10 @@ public class InterceptingAppendEntriesRequestProcessor
extends AppendEntriesRequ
@Override
public Message processRequest0(RaftServerService service,
AppendEntriesRequest request, RpcRequestClosure done) {
- Message result = interceptor.intercept(service, request, done);
+ Message interceptionResult = interceptor.intercept(service, request,
done);
- if (result != null) {
- return result;
+ if (interceptionResult != null) {
+ return interceptionResult;
}
return super.processRequest0(service, request, done);
diff --git
a/modules/raft/src/testFixtures/java/org/apache/ignite/raft/jraft/rpc/TestIgniteRpcServer.java
b/modules/raft/src/testFixtures/java/org/apache/ignite/raft/jraft/rpc/TestIgniteRpcServer.java
index 1fc359858f..3aeef2a2b5 100644
---
a/modules/raft/src/testFixtures/java/org/apache/ignite/raft/jraft/rpc/TestIgniteRpcServer.java
+++
b/modules/raft/src/testFixtures/java/org/apache/ignite/raft/jraft/rpc/TestIgniteRpcServer.java
@@ -23,6 +23,7 @@ import org.apache.ignite.network.ClusterService;
import org.apache.ignite.raft.jraft.NodeManager;
import org.apache.ignite.raft.jraft.option.NodeOptions;
import org.apache.ignite.raft.jraft.rpc.impl.IgniteRpcServer;
+import org.apache.ignite.raft.jraft.rpc.impl.NullActionRequestInterceptor;
import
org.apache.ignite.raft.jraft.rpc.impl.core.NullAppendEntriesRequestInterceptor;
import org.apache.ignite.raft.jraft.rpc.impl.RaftGroupEventsClientListener;
import org.apache.ignite.raft.messages.TestMessageGroup;
@@ -46,7 +47,8 @@ public class TestIgniteRpcServer extends IgniteRpcServer {
requestExecutor,
new RaftServiceEventInterceptor(),
new RaftGroupEventsClientListener(),
- new NullAppendEntriesRequestInterceptor()
+ new NullAppendEntriesRequestInterceptor(),
+ new NullActionRequestInterceptor()
);
clusterService.messagingService().addMessageHandler(TestMessageGroup.class, new
RpcMessageHandler());
diff --git
a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
index 86bda67d1c..b4e8ed6a60 100644
---
a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
+++
b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
@@ -129,6 +129,7 @@ import
org.apache.ignite.internal.storage.DataStorageModules;
import org.apache.ignite.internal.table.distributed.TableManager;
import org.apache.ignite.internal.table.distributed.TableMessageGroup;
import
org.apache.ignite.internal.table.distributed.raft.snapshot.outgoing.OutgoingSnapshotsManager;
+import
org.apache.ignite.internal.table.distributed.schema.CheckCatalogVersionOnActionRequest;
import
org.apache.ignite.internal.table.distributed.schema.CheckCatalogVersionOnAppendEntries;
import org.apache.ignite.internal.table.distributed.schema.SchemaSyncService;
import
org.apache.ignite.internal.table.distributed.schema.SchemaSyncServiceImpl;
@@ -536,6 +537,7 @@ public class IgniteImpl implements Ignite {
);
raftMgr.appendEntriesRequestInterceptor(new
CheckCatalogVersionOnAppendEntries(catalogManager));
+ raftMgr.actionRequestInterceptor(new
CheckCatalogVersionOnActionRequest(catalogManager));
SchemaSyncService schemaSyncService = new
SchemaSyncServiceImpl(metaStorageMgr.clusterTime(), delayDurationMsSupplier);
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/schema/CatalogVersionSufficiency.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/schema/CatalogVersionSufficiency.java
new file mode 100644
index 0000000000..499249eeae
--- /dev/null
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/schema/CatalogVersionSufficiency.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed.schema;
+
+import org.apache.ignite.internal.catalog.CatalogService;
+
+/**
+ * Logic that allows to determine whether the logcal Catalog version is
sufficient.
+ */
+public class CatalogVersionSufficiency {
+ private CatalogVersionSufficiency() {
+ // Deny instantiation.
+ }
+
+ /**
+ * Determines whether the local Catalog version is sufficient.
+ *
+ * @param requiredCatalogVersion Minimal catalog version that is required
to present.
+ * @param catalogService Catalog service.
+ * @return {@code true} iff the local Catalog version is sufficient.
+ */
+ static boolean isMetadataAvailableFor(int requiredCatalogVersion,
CatalogService catalogService) {
+ return requiredCatalogVersion <= catalogService.latestCatalogVersion();
+ }
+}
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/schema/CheckCatalogVersionOnActionRequest.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/schema/CheckCatalogVersionOnActionRequest.java
new file mode 100644
index 0000000000..e5a599b225
--- /dev/null
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/schema/CheckCatalogVersionOnActionRequest.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed.schema;
+
+import static
org.apache.ignite.internal.table.distributed.schema.CatalogVersionSufficiency.isMetadataAvailableFor;
+
+import org.apache.ignite.internal.catalog.CatalogService;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.raft.Command;
+import
org.apache.ignite.internal.table.distributed.command.CatalogVersionAware;
+import org.apache.ignite.raft.jraft.Node;
+import org.apache.ignite.raft.jraft.entity.PeerId;
+import org.apache.ignite.raft.jraft.error.RaftError;
+import org.apache.ignite.raft.jraft.rpc.ActionRequest;
+import org.apache.ignite.raft.jraft.rpc.Message;
+import org.apache.ignite.raft.jraft.rpc.RaftRpcFactory;
+import org.apache.ignite.raft.jraft.rpc.RpcContext;
+import org.apache.ignite.raft.jraft.rpc.impl.ActionRequestInterceptor;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * An {@link ActionRequestInterceptor} that rejects requests (by returning
EBUSY error code) if the incoming command
+ * requires catalog version that is not available locally yet.
+ */
+public class CheckCatalogVersionOnActionRequest implements
ActionRequestInterceptor {
+ private static final IgniteLogger LOG =
Loggers.forClass(CheckCatalogVersionOnActionRequest.class);
+
+ private final CatalogService catalogService;
+
+ public CheckCatalogVersionOnActionRequest(CatalogService catalogService) {
+ this.catalogService = catalogService;
+ }
+
+ @Override
+ public @Nullable Message intercept(RpcContext rpcCtx, ActionRequest
request) {
+ Node node = rpcCtx.getNodeManager().get(request.groupId(), new
PeerId(rpcCtx.getLocalConsistentId()));
+
+ Command command = request.command();
+
+ if (command instanceof CatalogVersionAware) {
+ int requiredCatalogVersion = ((CatalogVersionAware)
command).requiredCatalogVersion();
+
+ if (!isMetadataAvailableFor(requiredCatalogVersion,
catalogService)) {
+ // TODO: IGNITE-20298 - throttle logging.
+ LOG.warn(
+ "Metadata not yet available, group {}, required level
{}; rejecting ActionRequest with EBUSY.",
+ request.groupId(), requiredCatalogVersion
+ );
+
+ return RaftRpcFactory.DEFAULT //
+ .newResponse(
+ node.getRaftOptions().getRaftMessagesFactory(),
+ RaftError.EBUSY,
+ "Metadata not yet available, group '%s', required
level %d; rejecting ActionRequest with EBUSY.",
+ request.groupId(), requiredCatalogVersion
+ );
+ }
+ }
+
+ return null;
+ }
+}
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/schema/CheckCatalogVersionOnAppendEntries.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/schema/CheckCatalogVersionOnAppendEntries.java
index f0887784d3..062ade979b 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/schema/CheckCatalogVersionOnAppendEntries.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/schema/CheckCatalogVersionOnAppendEntries.java
@@ -17,6 +17,8 @@
package org.apache.ignite.internal.table.distributed.schema;
+import static
org.apache.ignite.internal.table.distributed.schema.CatalogVersionSufficiency.isMetadataAvailableFor;
+
import java.nio.ByteBuffer;
import org.apache.ignite.internal.catalog.CatalogService;
import org.apache.ignite.internal.logger.IgniteLogger;
@@ -64,13 +66,20 @@ public class CheckCatalogVersionOnAppendEntries implements
AppendEntriesRequestI
for (RaftOutter.EntryMeta entry : request.entriesList()) {
int requiredCatalogVersion =
readRequiredCatalogVersionForMeta(allData, entry,
node.getOptions().getCommandsMarshaller());
- if (requiredCatalogVersion != NO_VERSION_REQUIREMENT &&
!isMetadataAvailableFor(requiredCatalogVersion)) {
+ if (requiredCatalogVersion != NO_VERSION_REQUIREMENT &&
!isMetadataAvailableFor(requiredCatalogVersion, catalogService)) {
// TODO: IGNITE-20298 - throttle logging.
- LOG.warn("Metadata not yet available, group {}, required level
{}.", request.groupId(), requiredCatalogVersion);
+ LOG.warn(
+ "Metadata not yet available, group {}, required level
{}; rejecting AppendEntriesRequest with EBUSY.",
+ request.groupId(), requiredCatalogVersion
+ );
return RaftRpcFactory.DEFAULT //
-
.newResponse(node.getRaftOptions().getRaftMessagesFactory(), RaftError.EBUSY,
- "Metadata not yet available, group '%s', required
level %d.", request.groupId(), requiredCatalogVersion);
+ .newResponse(
+ node.getRaftOptions().getRaftMessagesFactory(),
+ RaftError.EBUSY,
+ "Metadata not yet available, group '%s', required
level %d; rejecting AppendEntriesRequest with EBUSY.",
+ request.groupId(), requiredCatalogVersion
+ );
}
offset += (int) entry.dataLen();
@@ -98,8 +107,4 @@ public class CheckCatalogVersionOnAppendEntries implements
AppendEntriesRequestI
return NO_VERSION_REQUIREMENT;
}
-
- private boolean isMetadataAvailableFor(int requiredCatalogVersion) {
- return requiredCatalogVersion <= catalogService.latestCatalogVersion();
- }
}
diff --git
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/schema/CatalogVersionSufficiencyTest.java
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/schema/CatalogVersionSufficiencyTest.java
new file mode 100644
index 0000000000..5cfff5c3d8
--- /dev/null
+++
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/schema/CatalogVersionSufficiencyTest.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed.schema;
+
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.when;
+
+import org.apache.ignite.internal.catalog.CatalogService;
+import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+@ExtendWith(MockitoExtension.class)
+class CatalogVersionSufficiencyTest extends BaseIgniteAbstractTest {
+ @Mock
+ private CatalogService catalogService;
+
+ @Test
+ void exceedingLocalVersionIsSufficient() {
+ when(catalogService.latestCatalogVersion()).thenReturn(10);
+
+ assertTrue(CatalogVersionSufficiency.isMetadataAvailableFor(8,
catalogService));
+ }
+
+ @Test
+ void equalLocalVersionIsSufficient() {
+ when(catalogService.latestCatalogVersion()).thenReturn(10);
+
+ assertTrue(CatalogVersionSufficiency.isMetadataAvailableFor(10,
catalogService));
+ }
+
+ @Test
+ void lowerLocalVersionIsSufficient() {
+ when(catalogService.latestCatalogVersion()).thenReturn(10);
+
+ assertFalse(CatalogVersionSufficiency.isMetadataAvailableFor(12,
catalogService));
+ }
+}
diff --git
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/schema/CheckCatalogVersionOnActionRequestTest.java
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/schema/CheckCatalogVersionOnActionRequestTest.java
new file mode 100644
index 0000000000..1061d75f79
--- /dev/null
+++
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/schema/CheckCatalogVersionOnActionRequestTest.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed.schema;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.instanceOf;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.notNullValue;
+import static org.hamcrest.Matchers.nullValue;
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.anyString;
+import static org.mockito.Mockito.lenient;
+import static org.mockito.Mockito.when;
+
+import java.util.UUID;
+import org.apache.ignite.internal.catalog.CatalogService;
+import org.apache.ignite.internal.raft.Command;
+import org.apache.ignite.internal.replicator.message.ReplicaMessagesFactory;
+import org.apache.ignite.internal.table.distributed.TableMessagesFactory;
+import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
+import org.apache.ignite.raft.jraft.Node;
+import org.apache.ignite.raft.jraft.NodeManager;
+import org.apache.ignite.raft.jraft.RaftMessagesFactory;
+import org.apache.ignite.raft.jraft.error.RaftError;
+import org.apache.ignite.raft.jraft.option.RaftOptions;
+import org.apache.ignite.raft.jraft.rpc.ActionRequest;
+import org.apache.ignite.raft.jraft.rpc.Message;
+import org.apache.ignite.raft.jraft.rpc.RpcContext;
+import org.apache.ignite.raft.jraft.rpc.RpcRequests.ErrorResponse;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.InjectMocks;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+@ExtendWith(MockitoExtension.class)
+class CheckCatalogVersionOnActionRequestTest extends BaseIgniteAbstractTest {
+ private final ReplicaMessagesFactory replicaMessagesFactory = new
ReplicaMessagesFactory();
+
+ private final TableMessagesFactory tableMessagesFactory = new
TableMessagesFactory();
+
+ private final RaftMessagesFactory raftMessagesFactory = new
RaftMessagesFactory();
+
+ @Mock
+ private CatalogService catalogService;
+
+ @InjectMocks
+ private CheckCatalogVersionOnActionRequest interceptor;
+
+ @Mock
+ private RpcContext rpcContext;
+
+ @Mock
+ private NodeManager nodeManager;
+
+ @Mock
+ private Node node;
+
+ private final RaftOptions raftOptions = new RaftOptions();
+
+ @BeforeEach
+ void initMocks() {
+ when(rpcContext.getNodeManager()).thenReturn(nodeManager);
+ when(nodeManager.get(anyString(), any())).thenReturn(node);
+ lenient().when(node.getRaftOptions()).thenReturn(raftOptions);
+ }
+
+ @Test
+ void delegatesWhenCommandHasNoRequiredCatalogVersion() {
+ ActionRequest request = raftMessagesFactory.actionRequest()
+ .groupId("test")
+ .command(commandWithoutRequiredCatalogVersion())
+ .build();
+
+ assertThat(interceptor.intercept(rpcContext, request),
is(nullValue()));
+ }
+
+ private Command commandWithoutRequiredCatalogVersion() {
+ return replicaMessagesFactory.safeTimeSyncCommand().build();
+ }
+
+ @Test
+ void delegatesWhenHavingEnoughMetadata() {
+ when(catalogService.latestCatalogVersion()).thenReturn(5);
+
+ ActionRequest request = raftMessagesFactory.actionRequest()
+ .groupId("test")
+ .command(commandWithRequiredCatalogVersion(3))
+ .build();
+
+ assertThat(interceptor.intercept(rpcContext, request),
is(nullValue()));
+ }
+
+ private Command commandWithRequiredCatalogVersion(int requiredVersion) {
+ return tableMessagesFactory.updateCommand()
+
.tablePartitionId(tableMessagesFactory.tablePartitionIdMessage().build())
+ .txId(UUID.randomUUID())
+ .rowUuid(UUID.randomUUID())
+ .requiredCatalogVersion(requiredVersion)
+ .build();
+ }
+
+ @Test
+ void returnsErrorCodeBusyWhenNotHavingEnoughMetadata() {
+ when(catalogService.latestCatalogVersion()).thenReturn(5);
+
+ ActionRequest request = raftMessagesFactory.actionRequest()
+ .groupId("test")
+ .command(commandWithRequiredCatalogVersion(6))
+ .build();
+
+ Message result = interceptor.intercept(rpcContext, request);
+
+ assertThat(result, is(notNullValue()));
+ assertThat(result, instanceOf(ErrorResponse.class));
+
+ ErrorResponse errorResponse = (ErrorResponse) result;
+ assertThat(errorResponse.errorCode(), is(RaftError.EBUSY.getNumber()));
+ assertThat(errorResponse.errorMsg(),
+ is("Metadata not yet available, group 'test', required level
6; rejecting ActionRequest with EBUSY."));
+ }
+}