This is an automated email from the ASF dual-hosted git repository.

tkalkirill pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 653cc4c8da IGNITE-20262 Refuse accepting partition Raft commands when 
not enough schemas are available (#2484)
653cc4c8da is described below

commit 653cc4c8da89529ba81e7b457162fae1516c3d2b
Author: Roman Puchkovskiy <[email protected]>
AuthorDate: Fri Sep 1 17:00:28 2023 +0400

    IGNITE-20262 Refuse accepting partition Raft commands when not enough 
schemas are available (#2484)
---
 .../java/org/apache/ignite/internal/raft/Loza.java |  11 ++
 .../internal/raft/server/impl/JraftServerImpl.java |  18 ++-
 ...erceptor.java => ActionRequestInterceptor.java} |  16 +--
 .../raft/jraft/rpc/impl/IgniteRpcServer.java       |   5 +-
 ...ava => InterceptingActionRequestProcessor.java} |  34 +++--
 .../rpc/impl/NullActionRequestInterceptor.java     |  33 +++++
 .../impl/core/AppendEntriesRequestInterceptor.java |   1 +
 .../InterceptingAppendEntriesRequestProcessor.java |   6 +-
 .../ignite/raft/jraft/rpc/TestIgniteRpcServer.java |   4 +-
 .../org/apache/ignite/internal/app/IgniteImpl.java |   2 +
 .../schema/CatalogVersionSufficiency.java          |  40 ++++++
 .../schema/CheckCatalogVersionOnActionRequest.java |  78 ++++++++++++
 .../schema/CheckCatalogVersionOnAppendEntries.java |  21 ++--
 .../schema/CatalogVersionSufficiencyTest.java      |  56 +++++++++
 .../CheckCatalogVersionOnActionRequestTest.java    | 138 +++++++++++++++++++++
 15 files changed, 421 insertions(+), 42 deletions(-)

diff --git 
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/Loza.java 
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/Loza.java
index 6a62911fec..38e2714fc1 100644
--- a/modules/raft/src/main/java/org/apache/ignite/internal/raft/Loza.java
+++ b/modules/raft/src/main/java/org/apache/ignite/internal/raft/Loza.java
@@ -47,6 +47,7 @@ import org.apache.ignite.network.MessagingService;
 import org.apache.ignite.network.TopologyService;
 import org.apache.ignite.raft.jraft.RaftMessagesFactory;
 import org.apache.ignite.raft.jraft.option.NodeOptions;
+import org.apache.ignite.raft.jraft.rpc.impl.ActionRequestInterceptor;
 import org.apache.ignite.raft.jraft.rpc.impl.RaftGroupEventsClientListener;
 import 
org.apache.ignite.raft.jraft.rpc.impl.core.AppendEntriesRequestInterceptor;
 import org.apache.ignite.raft.jraft.util.Utils;
@@ -157,6 +158,16 @@ public class Loza implements RaftManager {
         
raftServer.appendEntriesRequestInterceptor(appendEntriesRequestInterceptor);
     }
 
+    /**
+     * Sets {@link ActionRequestInterceptor} to use. Should only be called 
from the same thread that is used
+     * to {@link #start()} the component.
+     *
+     * @param actionRequestInterceptor Interceptor to use.
+     */
+    public void actionRequestInterceptor(ActionRequestInterceptor 
actionRequestInterceptor) {
+        raftServer.actionRequestInterceptor(actionRequestInterceptor);
+    }
+
     /** {@inheritDoc} */
     @Override
     public void start() {
diff --git 
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/impl/JraftServerImpl.java
 
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/impl/JraftServerImpl.java
index 07a3eff40c..4a647e0799 100644
--- 
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/impl/JraftServerImpl.java
+++ 
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/impl/JraftServerImpl.java
@@ -75,8 +75,10 @@ import 
org.apache.ignite.raft.jraft.disruptor.StripedDisruptor;
 import org.apache.ignite.raft.jraft.entity.PeerId;
 import org.apache.ignite.raft.jraft.error.RaftError;
 import org.apache.ignite.raft.jraft.option.NodeOptions;
+import org.apache.ignite.raft.jraft.rpc.impl.ActionRequestInterceptor;
 import org.apache.ignite.raft.jraft.rpc.impl.IgniteRpcClient;
 import org.apache.ignite.raft.jraft.rpc.impl.IgniteRpcServer;
+import org.apache.ignite.raft.jraft.rpc.impl.NullActionRequestInterceptor;
 import org.apache.ignite.raft.jraft.rpc.impl.RaftGroupEventsClientListener;
 import 
org.apache.ignite.raft.jraft.rpc.impl.core.AppendEntriesRequestInterceptor;
 import 
org.apache.ignite.raft.jraft.rpc.impl.core.NullAppendEntriesRequestInterceptor;
@@ -133,6 +135,9 @@ public class JraftServerImpl implements RaftServer {
     /** Interceptor for AppendEntriesRequests. Not thread-safe, should be 
assigned and read in the same thread. */
     private AppendEntriesRequestInterceptor appendEntriesRequestInterceptor = 
new NullAppendEntriesRequestInterceptor();
 
+    /** Interceptor for ActionRequests. Not thread-safe, should be assigned 
and read in the same thread. */
+    private ActionRequestInterceptor actionRequestInterceptor = new 
NullActionRequestInterceptor();
+
     /** The number of parallel raft groups starts. */
     private static final int SIMULTANEOUS_GROUP_START_PARALLELISM = 
Math.min(Utils.cpus() * 3, 25);
 
@@ -209,6 +214,16 @@ public class JraftServerImpl implements RaftServer {
         this.appendEntriesRequestInterceptor = appendEntriesRequestInterceptor;
     }
 
+    /**
+     * Sets {@link ActionRequestInterceptor} to use. Should only be called 
from the same thread that is used
+     * to {@link #start()} the component.
+     *
+     * @param actionRequestInterceptor Interceptor to use.
+     */
+    public void actionRequestInterceptor(ActionRequestInterceptor 
actionRequestInterceptor) {
+        this.actionRequestInterceptor = actionRequestInterceptor;
+    }
+
     /** {@inheritDoc} */
     @Override
     public void start() {
@@ -256,7 +271,8 @@ public class JraftServerImpl implements RaftServer {
                 requestExecutor,
                 serviceEventInterceptor,
                 raftGroupEventsClientListener,
-                appendEntriesRequestInterceptor
+                appendEntriesRequestInterceptor,
+                actionRequestInterceptor
         );
 
         if (opts.getfSMCallerExecutorDisruptor() == null) {
diff --git 
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/core/AppendEntriesRequestInterceptor.java
 
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/ActionRequestInterceptor.java
similarity index 70%
copy from 
modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/core/AppendEntriesRequestInterceptor.java
copy to 
modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/ActionRequestInterceptor.java
index 600aa3d562..d00bccacd7 100644
--- 
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/core/AppendEntriesRequestInterceptor.java
+++ 
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/ActionRequestInterceptor.java
@@ -15,26 +15,26 @@
  * limitations under the License.
  */
 
-package org.apache.ignite.raft.jraft.rpc.impl.core;
+package org.apache.ignite.raft.jraft.rpc.impl;
 
-import org.apache.ignite.raft.jraft.rpc.Message;
+import org.apache.ignite.raft.jraft.rpc.ActionRequest;import 
org.apache.ignite.raft.jraft.rpc.Message;
 import org.apache.ignite.raft.jraft.rpc.RaftServerService;
-import org.apache.ignite.raft.jraft.rpc.RpcRequestClosure;
+import org.apache.ignite.raft.jraft.rpc.RpcContext;import 
org.apache.ignite.raft.jraft.rpc.RpcRequestClosure;
 import org.apache.ignite.raft.jraft.rpc.RpcRequests.AppendEntriesRequest;
 import org.jetbrains.annotations.Nullable;
 
 /**
- * Intercepts {@link AppendEntriesRequest}s as they come in. It might be used 
to handle such a request in a non-standard
+ * Intercepts {@link ActionRequest}s as they come in. It might be used to 
handle such a request in a non-standard
  * way (like returning EBUSY under special circumstances instead of the 
standard behavior).
  */
-public interface AppendEntriesRequestInterceptor {
+@SuppressWarnings("InterfaceMayBeAnnotatedFunctional")
+public interface ActionRequestInterceptor {
     /**
      * Intercepts handling of an incoming request. If non-null message is 
returned, the standard handling is omitted.
      *
-     * @param service Server service.
+     * @param rpcCtx RPC context.
      * @param request Request in question.
-     * @param done Done closure.
      * @return A message to return to the caller, or {@code null} if standard 
handling should be used.
      */
-    @Nullable Message intercept(RaftServerService service, 
AppendEntriesRequest request,  RpcRequestClosure done);
+    @Nullable Message intercept(RpcContext rpcCtx, ActionRequest request);
 }
diff --git 
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/IgniteRpcServer.java
 
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/IgniteRpcServer.java
index 4a6536fc2f..c4e0c8e1f6 100644
--- 
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/IgniteRpcServer.java
+++ 
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/IgniteRpcServer.java
@@ -91,7 +91,8 @@ public class IgniteRpcServer implements RpcServer<Void> {
             Executor rpcExecutor,
             RaftServiceEventInterceptor serviceEventInterceptor,
             RaftGroupEventsClientListener raftGroupEventsClientListener,
-            AppendEntriesRequestInterceptor appendEntriesRequestFilter
+            AppendEntriesRequestInterceptor appendEntriesRequestFilter,
+            ActionRequestInterceptor actionRequestInterceptor
     ) {
         this.service = service;
         this.nodeManager = nodeManager;
@@ -122,7 +123,7 @@ public class IgniteRpcServer implements RpcServer<Void> {
         registerProcessor(new RemoveLearnersRequestProcessor(rpcExecutor, 
raftMessagesFactory));
         registerProcessor(new ResetLearnersRequestProcessor(rpcExecutor, 
raftMessagesFactory));
         // common client integration
-        registerProcessor(new ActionRequestProcessor(rpcExecutor, 
raftMessagesFactory));
+        registerProcessor(new InterceptingActionRequestProcessor(rpcExecutor, 
raftMessagesFactory, actionRequestInterceptor));
         registerProcessor(new NotifyElectProcessor(raftMessagesFactory, 
serviceEventInterceptor));
         registerProcessor(new 
RaftGroupEventsProcessor(raftGroupEventsClientListener));
 
diff --git 
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/core/InterceptingAppendEntriesRequestProcessor.java
 
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/InterceptingActionRequestProcessor.java
similarity index 50%
copy from 
modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/core/InterceptingAppendEntriesRequestProcessor.java
copy to 
modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/InterceptingActionRequestProcessor.java
index 3a3b336fe9..dac65dc7be 100644
--- 
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/core/InterceptingAppendEntriesRequestProcessor.java
+++ 
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/InterceptingActionRequestProcessor.java
@@ -15,41 +15,37 @@
  * limitations under the License.
  */
 
-package org.apache.ignite.raft.jraft.rpc.impl.core;
+package org.apache.ignite.raft.jraft.rpc.impl;
 
 import java.util.concurrent.Executor;
 import org.apache.ignite.raft.jraft.RaftMessagesFactory;
+import org.apache.ignite.raft.jraft.rpc.ActionRequest;
 import org.apache.ignite.raft.jraft.rpc.Message;
-import org.apache.ignite.raft.jraft.rpc.RaftServerService;
-import org.apache.ignite.raft.jraft.rpc.RpcRequestClosure;
-import org.apache.ignite.raft.jraft.rpc.RpcRequests.AppendEntriesRequest;
+import org.apache.ignite.raft.jraft.rpc.RpcContext;
 
 /**
- * Extension of the standard {@link AppendEntriesRequestProcessor} that allows 
to add some interception logic.
+ * Extension of the standard {@link ActionRequestProcessor} that allows to add 
some interception logic.
  *
- * @see AppendEntriesRequestInterceptor
+ * @see ActionRequestInterceptor
  */
-public class InterceptingAppendEntriesRequestProcessor extends 
AppendEntriesRequestProcessor {
-    private final AppendEntriesRequestInterceptor interceptor;
+public class InterceptingActionRequestProcessor extends ActionRequestProcessor 
{
+    private final ActionRequestInterceptor interceptor;
 
-    /**
-     * Constructor.
-     */
-    public InterceptingAppendEntriesRequestProcessor(Executor executor, 
RaftMessagesFactory msgFactory,
-            AppendEntriesRequestInterceptor interceptor) {
+    /** Constructor. */
+    public InterceptingActionRequestProcessor(Executor executor, 
RaftMessagesFactory msgFactory, ActionRequestInterceptor interceptor) {
         super(executor, msgFactory);
 
         this.interceptor = interceptor;
     }
 
     @Override
-    public Message processRequest0(RaftServerService service, 
AppendEntriesRequest request, RpcRequestClosure done) {
-        Message result = interceptor.intercept(service, request,  done);
+    public void handleRequest(RpcContext rpcCtx, ActionRequest request) {
+        Message interceptionResult = interceptor.intercept(rpcCtx, request);
 
-        if (result != null) {
-            return result;
+        if (interceptionResult != null) {
+            rpcCtx.sendResponse(interceptionResult);
+        } else {
+            super.handleRequest(rpcCtx, request);
         }
-
-        return super.processRequest0(service, request, done);
     }
 }
diff --git 
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/NullActionRequestInterceptor.java
 
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/NullActionRequestInterceptor.java
new file mode 100644
index 0000000000..77c46ede23
--- /dev/null
+++ 
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/NullActionRequestInterceptor.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.raft.jraft.rpc.impl;
+
+import org.apache.ignite.raft.jraft.rpc.ActionRequest;import 
org.apache.ignite.raft.jraft.rpc.Message;
+import org.apache.ignite.raft.jraft.rpc.RpcContext;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * An {@link ActionRequestInterceptor} that never intercepts anything and 
always asks the standard handling
+ * to be used.
+ */
+public class NullActionRequestInterceptor implements ActionRequestInterceptor {
+    @Override
+    public @Nullable Message intercept(RpcContext rpcCtx, ActionRequest 
request) {
+        return null;
+    }
+}
diff --git 
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/core/AppendEntriesRequestInterceptor.java
 
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/core/AppendEntriesRequestInterceptor.java
index 600aa3d562..37844dc967 100644
--- 
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/core/AppendEntriesRequestInterceptor.java
+++ 
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/core/AppendEntriesRequestInterceptor.java
@@ -27,6 +27,7 @@ import org.jetbrains.annotations.Nullable;
  * Intercepts {@link AppendEntriesRequest}s as they come in. It might be used 
to handle such a request in a non-standard
  * way (like returning EBUSY under special circumstances instead of the 
standard behavior).
  */
+@SuppressWarnings("InterfaceMayBeAnnotatedFunctional")
 public interface AppendEntriesRequestInterceptor {
     /**
      * Intercepts handling of an incoming request. If non-null message is 
returned, the standard handling is omitted.
diff --git 
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/core/InterceptingAppendEntriesRequestProcessor.java
 
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/core/InterceptingAppendEntriesRequestProcessor.java
index 3a3b336fe9..872d6cb45e 100644
--- 
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/core/InterceptingAppendEntriesRequestProcessor.java
+++ 
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/core/InterceptingAppendEntriesRequestProcessor.java
@@ -44,10 +44,10 @@ public class InterceptingAppendEntriesRequestProcessor 
extends AppendEntriesRequ
 
     @Override
     public Message processRequest0(RaftServerService service, 
AppendEntriesRequest request, RpcRequestClosure done) {
-        Message result = interceptor.intercept(service, request,  done);
+        Message interceptionResult = interceptor.intercept(service, request,  
done);
 
-        if (result != null) {
-            return result;
+        if (interceptionResult != null) {
+            return interceptionResult;
         }
 
         return super.processRequest0(service, request, done);
diff --git 
a/modules/raft/src/testFixtures/java/org/apache/ignite/raft/jraft/rpc/TestIgniteRpcServer.java
 
b/modules/raft/src/testFixtures/java/org/apache/ignite/raft/jraft/rpc/TestIgniteRpcServer.java
index 1fc359858f..3aeef2a2b5 100644
--- 
a/modules/raft/src/testFixtures/java/org/apache/ignite/raft/jraft/rpc/TestIgniteRpcServer.java
+++ 
b/modules/raft/src/testFixtures/java/org/apache/ignite/raft/jraft/rpc/TestIgniteRpcServer.java
@@ -23,6 +23,7 @@ import org.apache.ignite.network.ClusterService;
 import org.apache.ignite.raft.jraft.NodeManager;
 import org.apache.ignite.raft.jraft.option.NodeOptions;
 import org.apache.ignite.raft.jraft.rpc.impl.IgniteRpcServer;
+import org.apache.ignite.raft.jraft.rpc.impl.NullActionRequestInterceptor;
 import 
org.apache.ignite.raft.jraft.rpc.impl.core.NullAppendEntriesRequestInterceptor;
 import org.apache.ignite.raft.jraft.rpc.impl.RaftGroupEventsClientListener;
 import org.apache.ignite.raft.messages.TestMessageGroup;
@@ -46,7 +47,8 @@ public class TestIgniteRpcServer extends IgniteRpcServer {
                 requestExecutor,
                 new RaftServiceEventInterceptor(),
                 new RaftGroupEventsClientListener(),
-                new NullAppendEntriesRequestInterceptor()
+                new NullAppendEntriesRequestInterceptor(),
+                new NullActionRequestInterceptor()
         );
 
         
clusterService.messagingService().addMessageHandler(TestMessageGroup.class, new 
RpcMessageHandler());
diff --git 
a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java 
b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
index 86bda67d1c..b4e8ed6a60 100644
--- 
a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
+++ 
b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
@@ -129,6 +129,7 @@ import 
org.apache.ignite.internal.storage.DataStorageModules;
 import org.apache.ignite.internal.table.distributed.TableManager;
 import org.apache.ignite.internal.table.distributed.TableMessageGroup;
 import 
org.apache.ignite.internal.table.distributed.raft.snapshot.outgoing.OutgoingSnapshotsManager;
+import 
org.apache.ignite.internal.table.distributed.schema.CheckCatalogVersionOnActionRequest;
 import 
org.apache.ignite.internal.table.distributed.schema.CheckCatalogVersionOnAppendEntries;
 import org.apache.ignite.internal.table.distributed.schema.SchemaSyncService;
 import 
org.apache.ignite.internal.table.distributed.schema.SchemaSyncServiceImpl;
@@ -536,6 +537,7 @@ public class IgniteImpl implements Ignite {
         );
 
         raftMgr.appendEntriesRequestInterceptor(new 
CheckCatalogVersionOnAppendEntries(catalogManager));
+        raftMgr.actionRequestInterceptor(new 
CheckCatalogVersionOnActionRequest(catalogManager));
 
         SchemaSyncService schemaSyncService = new 
SchemaSyncServiceImpl(metaStorageMgr.clusterTime(), delayDurationMsSupplier);
 
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/schema/CatalogVersionSufficiency.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/schema/CatalogVersionSufficiency.java
new file mode 100644
index 0000000000..499249eeae
--- /dev/null
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/schema/CatalogVersionSufficiency.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed.schema;
+
+import org.apache.ignite.internal.catalog.CatalogService;
+
+/**
+ * Logic that allows to determine whether the logcal Catalog version is 
sufficient.
+ */
+public class CatalogVersionSufficiency {
+    private CatalogVersionSufficiency() {
+        // Deny instantiation.
+    }
+
+    /**
+     * Determines whether the local Catalog version is sufficient.
+     *
+     * @param requiredCatalogVersion Minimal catalog version that is required 
to present.
+     * @param catalogService Catalog service.
+     * @return {@code true} iff the local Catalog version is sufficient.
+     */
+    static boolean isMetadataAvailableFor(int requiredCatalogVersion, 
CatalogService catalogService) {
+        return requiredCatalogVersion <= catalogService.latestCatalogVersion();
+    }
+}
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/schema/CheckCatalogVersionOnActionRequest.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/schema/CheckCatalogVersionOnActionRequest.java
new file mode 100644
index 0000000000..e5a599b225
--- /dev/null
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/schema/CheckCatalogVersionOnActionRequest.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed.schema;
+
+import static 
org.apache.ignite.internal.table.distributed.schema.CatalogVersionSufficiency.isMetadataAvailableFor;
+
+import org.apache.ignite.internal.catalog.CatalogService;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.raft.Command;
+import 
org.apache.ignite.internal.table.distributed.command.CatalogVersionAware;
+import org.apache.ignite.raft.jraft.Node;
+import org.apache.ignite.raft.jraft.entity.PeerId;
+import org.apache.ignite.raft.jraft.error.RaftError;
+import org.apache.ignite.raft.jraft.rpc.ActionRequest;
+import org.apache.ignite.raft.jraft.rpc.Message;
+import org.apache.ignite.raft.jraft.rpc.RaftRpcFactory;
+import org.apache.ignite.raft.jraft.rpc.RpcContext;
+import org.apache.ignite.raft.jraft.rpc.impl.ActionRequestInterceptor;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * An {@link ActionRequestInterceptor} that rejects requests (by returning 
EBUSY error code) if the incoming command
+ * requires catalog version that is not available locally yet.
+ */
+public class CheckCatalogVersionOnActionRequest implements 
ActionRequestInterceptor {
+    private static final IgniteLogger LOG = 
Loggers.forClass(CheckCatalogVersionOnActionRequest.class);
+
+    private final CatalogService catalogService;
+
+    public CheckCatalogVersionOnActionRequest(CatalogService catalogService) {
+        this.catalogService = catalogService;
+    }
+
+    @Override
+    public @Nullable Message intercept(RpcContext rpcCtx, ActionRequest 
request) {
+        Node node = rpcCtx.getNodeManager().get(request.groupId(), new 
PeerId(rpcCtx.getLocalConsistentId()));
+
+        Command command = request.command();
+
+        if (command instanceof CatalogVersionAware) {
+            int requiredCatalogVersion = ((CatalogVersionAware) 
command).requiredCatalogVersion();
+
+            if (!isMetadataAvailableFor(requiredCatalogVersion, 
catalogService)) {
+                // TODO: IGNITE-20298 - throttle logging.
+                LOG.warn(
+                        "Metadata not yet available, group {}, required level 
{}; rejecting ActionRequest with EBUSY.",
+                        request.groupId(), requiredCatalogVersion
+                );
+
+                return RaftRpcFactory.DEFAULT //
+                    .newResponse(
+                            node.getRaftOptions().getRaftMessagesFactory(),
+                            RaftError.EBUSY,
+                            "Metadata not yet available, group '%s', required 
level %d; rejecting ActionRequest with EBUSY.",
+                            request.groupId(), requiredCatalogVersion
+                    );
+            }
+        }
+
+        return null;
+    }
+}
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/schema/CheckCatalogVersionOnAppendEntries.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/schema/CheckCatalogVersionOnAppendEntries.java
index f0887784d3..062ade979b 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/schema/CheckCatalogVersionOnAppendEntries.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/schema/CheckCatalogVersionOnAppendEntries.java
@@ -17,6 +17,8 @@
 
 package org.apache.ignite.internal.table.distributed.schema;
 
+import static 
org.apache.ignite.internal.table.distributed.schema.CatalogVersionSufficiency.isMetadataAvailableFor;
+
 import java.nio.ByteBuffer;
 import org.apache.ignite.internal.catalog.CatalogService;
 import org.apache.ignite.internal.logger.IgniteLogger;
@@ -64,13 +66,20 @@ public class CheckCatalogVersionOnAppendEntries implements 
AppendEntriesRequestI
         for (RaftOutter.EntryMeta entry : request.entriesList()) {
             int requiredCatalogVersion = 
readRequiredCatalogVersionForMeta(allData, entry, 
node.getOptions().getCommandsMarshaller());
 
-            if (requiredCatalogVersion != NO_VERSION_REQUIREMENT && 
!isMetadataAvailableFor(requiredCatalogVersion)) {
+            if (requiredCatalogVersion != NO_VERSION_REQUIREMENT && 
!isMetadataAvailableFor(requiredCatalogVersion, catalogService)) {
                 // TODO: IGNITE-20298 - throttle logging.
-                LOG.warn("Metadata not yet available, group {}, required level 
{}.", request.groupId(), requiredCatalogVersion);
+                LOG.warn(
+                        "Metadata not yet available, group {}, required level 
{}; rejecting AppendEntriesRequest with EBUSY.",
+                        request.groupId(), requiredCatalogVersion
+                );
 
                 return RaftRpcFactory.DEFAULT //
-                    
.newResponse(node.getRaftOptions().getRaftMessagesFactory(), RaftError.EBUSY,
-                            "Metadata not yet available, group '%s', required 
level %d.", request.groupId(), requiredCatalogVersion);
+                    .newResponse(
+                            node.getRaftOptions().getRaftMessagesFactory(),
+                            RaftError.EBUSY,
+                            "Metadata not yet available, group '%s', required 
level %d; rejecting AppendEntriesRequest with EBUSY.",
+                            request.groupId(), requiredCatalogVersion
+                    );
             }
 
             offset += (int) entry.dataLen();
@@ -98,8 +107,4 @@ public class CheckCatalogVersionOnAppendEntries implements 
AppendEntriesRequestI
 
         return NO_VERSION_REQUIREMENT;
     }
-
-    private boolean isMetadataAvailableFor(int requiredCatalogVersion) {
-        return requiredCatalogVersion <= catalogService.latestCatalogVersion();
-    }
 }
diff --git 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/schema/CatalogVersionSufficiencyTest.java
 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/schema/CatalogVersionSufficiencyTest.java
new file mode 100644
index 0000000000..5cfff5c3d8
--- /dev/null
+++ 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/schema/CatalogVersionSufficiencyTest.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed.schema;
+
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.when;
+
+import org.apache.ignite.internal.catalog.CatalogService;
+import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+@ExtendWith(MockitoExtension.class)
+class CatalogVersionSufficiencyTest extends BaseIgniteAbstractTest {
+    @Mock
+    private CatalogService catalogService;
+
+    @Test
+    void exceedingLocalVersionIsSufficient() {
+        when(catalogService.latestCatalogVersion()).thenReturn(10);
+
+        assertTrue(CatalogVersionSufficiency.isMetadataAvailableFor(8, 
catalogService));
+    }
+
+    @Test
+    void equalLocalVersionIsSufficient() {
+        when(catalogService.latestCatalogVersion()).thenReturn(10);
+
+        assertTrue(CatalogVersionSufficiency.isMetadataAvailableFor(10, 
catalogService));
+    }
+
+    @Test
+    void lowerLocalVersionIsSufficient() {
+        when(catalogService.latestCatalogVersion()).thenReturn(10);
+
+        assertFalse(CatalogVersionSufficiency.isMetadataAvailableFor(12, 
catalogService));
+    }
+}
diff --git 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/schema/CheckCatalogVersionOnActionRequestTest.java
 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/schema/CheckCatalogVersionOnActionRequestTest.java
new file mode 100644
index 0000000000..1061d75f79
--- /dev/null
+++ 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/schema/CheckCatalogVersionOnActionRequestTest.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed.schema;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.instanceOf;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.notNullValue;
+import static org.hamcrest.Matchers.nullValue;
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.anyString;
+import static org.mockito.Mockito.lenient;
+import static org.mockito.Mockito.when;
+
+import java.util.UUID;
+import org.apache.ignite.internal.catalog.CatalogService;
+import org.apache.ignite.internal.raft.Command;
+import org.apache.ignite.internal.replicator.message.ReplicaMessagesFactory;
+import org.apache.ignite.internal.table.distributed.TableMessagesFactory;
+import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
+import org.apache.ignite.raft.jraft.Node;
+import org.apache.ignite.raft.jraft.NodeManager;
+import org.apache.ignite.raft.jraft.RaftMessagesFactory;
+import org.apache.ignite.raft.jraft.error.RaftError;
+import org.apache.ignite.raft.jraft.option.RaftOptions;
+import org.apache.ignite.raft.jraft.rpc.ActionRequest;
+import org.apache.ignite.raft.jraft.rpc.Message;
+import org.apache.ignite.raft.jraft.rpc.RpcContext;
+import org.apache.ignite.raft.jraft.rpc.RpcRequests.ErrorResponse;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.InjectMocks;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+@ExtendWith(MockitoExtension.class)
+class CheckCatalogVersionOnActionRequestTest extends BaseIgniteAbstractTest {
+    private final ReplicaMessagesFactory replicaMessagesFactory = new 
ReplicaMessagesFactory();
+
+    private final TableMessagesFactory tableMessagesFactory = new 
TableMessagesFactory();
+
+    private final RaftMessagesFactory raftMessagesFactory = new 
RaftMessagesFactory();
+
+    @Mock
+    private CatalogService catalogService;
+
+    @InjectMocks
+    private CheckCatalogVersionOnActionRequest interceptor;
+
+    @Mock
+    private RpcContext rpcContext;
+
+    @Mock
+    private NodeManager nodeManager;
+
+    @Mock
+    private Node node;
+
+    private final RaftOptions raftOptions = new RaftOptions();
+
+    @BeforeEach
+    void initMocks() {
+        when(rpcContext.getNodeManager()).thenReturn(nodeManager);
+        when(nodeManager.get(anyString(), any())).thenReturn(node);
+        lenient().when(node.getRaftOptions()).thenReturn(raftOptions);
+    }
+
+    @Test
+    void delegatesWhenCommandHasNoRequiredCatalogVersion() {
+        ActionRequest request = raftMessagesFactory.actionRequest()
+                .groupId("test")
+                .command(commandWithoutRequiredCatalogVersion())
+                .build();
+
+        assertThat(interceptor.intercept(rpcContext, request), 
is(nullValue()));
+    }
+
+    private Command commandWithoutRequiredCatalogVersion() {
+        return replicaMessagesFactory.safeTimeSyncCommand().build();
+    }
+
+    @Test
+    void delegatesWhenHavingEnoughMetadata() {
+        when(catalogService.latestCatalogVersion()).thenReturn(5);
+
+        ActionRequest request = raftMessagesFactory.actionRequest()
+                .groupId("test")
+                .command(commandWithRequiredCatalogVersion(3))
+                .build();
+
+        assertThat(interceptor.intercept(rpcContext, request), 
is(nullValue()));
+    }
+
+    private Command commandWithRequiredCatalogVersion(int requiredVersion) {
+        return tableMessagesFactory.updateCommand()
+                
.tablePartitionId(tableMessagesFactory.tablePartitionIdMessage().build())
+                .txId(UUID.randomUUID())
+                .rowUuid(UUID.randomUUID())
+                .requiredCatalogVersion(requiredVersion)
+                .build();
+    }
+
+    @Test
+    void returnsErrorCodeBusyWhenNotHavingEnoughMetadata() {
+        when(catalogService.latestCatalogVersion()).thenReturn(5);
+
+        ActionRequest request = raftMessagesFactory.actionRequest()
+                .groupId("test")
+                .command(commandWithRequiredCatalogVersion(6))
+                .build();
+
+        Message result = interceptor.intercept(rpcContext, request);
+
+        assertThat(result, is(notNullValue()));
+        assertThat(result, instanceOf(ErrorResponse.class));
+
+        ErrorResponse errorResponse = (ErrorResponse) result;
+        assertThat(errorResponse.errorCode(), is(RaftError.EBUSY.getNumber()));
+        assertThat(errorResponse.errorMsg(),
+                is("Metadata not yet available, group 'test', required level 
6; rejecting ActionRequest with EBUSY."));
+    }
+}


Reply via email to