This is an automated email from the ASF dual-hosted git repository.

shishkovilja pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new 1fa94e8c937 IGNITE-28477 Merge StartRoutineDiscoveryMessageV2 into StartRoutineDiscoveryMessage (#13100)
1fa94e8c937 is described below

commit 1fa94e8c9374cb0eeccd085a0e907d9e73a9dd2e
Author: Aleksandr Chesnokov <[email protected]>
AuthorDate: Wed May 20 11:18:28 2026 +0300

    IGNITE-28477 Merge StartRoutineDiscoveryMessageV2 into StartRoutineDiscoveryMessage (#13100)
---
 .../ignite/internal/CoreMessagesProvider.java      |  4 +-
 .../continuous/GridContinuousProcessor.java        | 30 +++--------
 .../continuous/StartRoutineDiscoveryMessage.java   | 26 +++++++--
 .../continuous/StartRoutineDiscoveryMessageV2.java | 62 ----------------------
 .../main/resources/META-INF/classnames.properties  |  1 -
 .../ignite/messaging/GridMessagingSelfTest.java    |  5 +-
 6 files changed, 34 insertions(+), 94 deletions(-)

diff --git a/modules/core/src/main/java/org/apache/ignite/internal/CoreMessagesProvider.java b/modules/core/src/main/java/org/apache/ignite/internal/CoreMessagesProvider.java
index 460cb7bcc10..28777790e3d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/CoreMessagesProvider.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/CoreMessagesProvider.java
@@ -189,7 +189,6 @@ import 
org.apache.ignite.internal.processors.continuous.GridContinuousMessage;
 import org.apache.ignite.internal.processors.continuous.StartRequestData;
 import 
org.apache.ignite.internal.processors.continuous.StartRoutineAckDiscoveryMessage;
 import 
org.apache.ignite.internal.processors.continuous.StartRoutineDiscoveryMessage;
-import 
org.apache.ignite.internal.processors.continuous.StartRoutineDiscoveryMessageV2;
 import 
org.apache.ignite.internal.processors.continuous.StopRoutineAckDiscoveryMessage;
 import 
org.apache.ignite.internal.processors.continuous.StopRoutineDiscoveryMessage;
 import org.apache.ignite.internal.processors.datastreamer.DataStreamerEntry;
@@ -519,9 +518,8 @@ public class CoreMessagesProvider extends 
AbstractMarshallableMessageFactoryProv
         withNoSchema(LatchAckMessage.class);
         withSchema(AtomicApplicationAttributesAwareRequest.class);
         withNoSchema(StartRequestData.class);
-        withNoSchema(StartRoutineDiscoveryMessage.class);
         withNoSchema(StartRoutineAckDiscoveryMessage.class);
-        withNoSchema(StartRoutineDiscoveryMessageV2.class);
+        withNoSchema(StartRoutineDiscoveryMessage.class);
         withNoSchema(StoredCacheData.class);
 
         // [10600-10800]: Affinity & partition maps.
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
index 30bc8175de3..489f13e71fe 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
@@ -72,6 +72,7 @@ import 
org.apache.ignite.internal.processors.cache.GridCacheContext;
 import org.apache.ignite.internal.processors.cache.GridCacheProcessor;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.CachePartitionPartialCountersMap;
 import 
org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryHandler;
+import 
org.apache.ignite.internal.processors.continuous.StartRoutineDiscoveryMessage.Mode;
 import org.apache.ignite.internal.processors.timeout.GridTimeoutObject;
 import org.apache.ignite.internal.systemview.ContinuousQueryViewWalker;
 import org.apache.ignite.internal.thread.OomExceptionHandler;
@@ -211,26 +212,13 @@ public class GridContinuousProcessor extends 
GridProcessorAdapter {
                 @Override public void onCustomEvent(AffinityTopologyVersion 
topVer,
                     ClusterNode snd,
                     StartRoutineDiscoveryMessage msg) {
-                    assert !immutableDiscoCustomMsg;
-
                     if (ctx.isStopping())
                         return;
 
-                    processStartRequestMutable(snd, msg);
-                }
-            });
-
-        
ctx.discovery().setCustomEventListener(StartRoutineDiscoveryMessageV2.class,
-            new CustomEventListener<StartRoutineDiscoveryMessageV2>() {
-                @Override public void onCustomEvent(AffinityTopologyVersion 
topVer,
-                    ClusterNode snd,
-                    StartRoutineDiscoveryMessageV2 msg) {
-                    assert immutableDiscoCustomMsg;
-
-                    if (ctx.isStopping())
-                        return;
-
-                    processStartRequestImmutable(topVer, snd, msg);
+                    if (immutableDiscoCustomMsg)
+                        processStartRequestImmutable(topVer, snd, msg);
+                    else
+                        processStartRequestMutable(snd, msg);
                 }
             });
 
@@ -991,9 +979,7 @@ public class GridContinuousProcessor extends 
GridProcessorAdapter {
         reqData.prepareMarshal(ctx);
 
         if (!immutableDiscoCustomMsg) {
-            StartRoutineDiscoveryMessage msg = new 
StartRoutineDiscoveryMessage(
-                    routineId,
-                    reqData);
+            StartRoutineDiscoveryMessage msg = new 
StartRoutineDiscoveryMessage(routineId, reqData, Mode.MUTABLE);
 
             if (hnd.updateCounters() != null)
                 msg.addUpdateCounters(ctx.localNodeId(), hnd.updateCounters());
@@ -1001,7 +987,7 @@ public class GridContinuousProcessor extends 
GridProcessorAdapter {
             return msg;
         }
         else
-            return new StartRoutineDiscoveryMessageV2(routineId, reqData);
+            return new StartRoutineDiscoveryMessage(routineId, reqData, 
Mode.IMMUTABLE);
     }
 
     /**
@@ -1467,7 +1453,7 @@ public class GridContinuousProcessor extends 
GridProcessorAdapter {
      */
     private void processStartRequestImmutable(final AffinityTopologyVersion 
topVer,
         final ClusterNode snd,
-        final StartRoutineDiscoveryMessageV2 msg) {
+        final StartRoutineDiscoveryMessage msg) {
         StartRequestData reqData = msg.startRequestData();
 
         ContinuousRoutineInfo routineInfo = new ContinuousRoutineInfo(snd.id(),
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRoutineDiscoveryMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRoutineDiscoveryMessage.java
index af18f362fe0..ddf9e63f6c4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRoutineDiscoveryMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRoutineDiscoveryMessage.java
@@ -26,15 +26,26 @@ import 
org.apache.ignite.internal.managers.communication.ErrorMessage;
 import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
 import org.apache.ignite.internal.util.typedef.internal.S;
 
+import static 
org.apache.ignite.internal.processors.continuous.StartRoutineDiscoveryMessage.Mode.MUTABLE;
+
 /**
  * Discovery message used for Continuous Query registration.
  */
 public class StartRoutineDiscoveryMessage extends AbstractContinuousMessage {
+    /** Discovery message mode. */
+    enum Mode {
+        /** Mutable discovery mode. */
+        MUTABLE,
+
+        /** Immutable discovery mode. */
+        IMMUTABLE
+    }
+
     /** */
     @Order(0)
     StartRequestData startReqData;
 
-    /** */
+    /** Errors collected by mutable discovery. */
     @Order(1)
     Map<UUID, ErrorMessage> errs = new HashMap<>();
 
@@ -46,14 +57,20 @@ public class StartRoutineDiscoveryMessage extends 
AbstractContinuousMessage {
     @Order(3)
     Map<UUID, Map<Integer, Long>> updateCntrsPerNode;
 
+    /** Discovery message mode. */
+    @Order(4)
+    Mode mode;
+
     /**
      * @param routineId Routine id.
      * @param startReqData Start request data.
+     * @param mode Discovery message mode.
      */
-    public StartRoutineDiscoveryMessage(UUID routineId, StartRequestData 
startReqData) {
+    StartRoutineDiscoveryMessage(UUID routineId, StartRequestData 
startReqData, Mode mode) {
         super(routineId);
 
         this.startReqData = startReqData;
+        this.mode = mode;
     }
 
     /** */
@@ -110,11 +127,14 @@ public class StartRoutineDiscoveryMessage extends 
AbstractContinuousMessage {
 
     /** {@inheritDoc} */
     @Override public boolean isMutable() {
-        return true;
+        return mode == MUTABLE;
     }
 
     /** {@inheritDoc} */
     @Override public DiscoveryCustomMessage ackMessage() {
+        if (!isMutable())
+            return null;
+
         return new StartRoutineAckDiscoveryMessage(routineId, errs, 
updateCntrs, updateCntrsPerNode);
     }
 
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRoutineDiscoveryMessageV2.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRoutineDiscoveryMessageV2.java
deleted file mode 100644
index 15b57836528..00000000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRoutineDiscoveryMessageV2.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.continuous;
-
-import java.util.UUID;
-import org.apache.ignite.internal.Order;
-import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
-import org.apache.ignite.internal.util.typedef.internal.S;
-
-/**
- *
- */
-public class StartRoutineDiscoveryMessageV2 extends AbstractContinuousMessage {
-    /** */
-    @Order(0)
-    StartRequestData startReqData;
-
-    /** */
-    public StartRoutineDiscoveryMessageV2() {}
-
-    /**
-     * @param routineId Routine id.
-     * @param startReqData Start request data.
-     */
-    StartRoutineDiscoveryMessageV2(UUID routineId, StartRequestData 
startReqData) {
-        super(routineId);
-
-        this.startReqData = startReqData;
-    }
-
-    /**
-     * @return Start request data.
-     */
-    public StartRequestData startRequestData() {
-        return startReqData;
-    }
-
-    /** {@inheritDoc} */
-    @Override public DiscoveryCustomMessage ackMessage() {
-        return null;
-    }
-
-    /** {@inheritDoc} */
-    @Override public String toString() {
-        return S.toString(StartRoutineDiscoveryMessageV2.class, this, 
"routineId", routineId());
-    }
-}
diff --git a/modules/core/src/main/resources/META-INF/classnames.properties b/modules/core/src/main/resources/META-INF/classnames.properties
index f6acd558984..b975cbde1df 100644
--- a/modules/core/src/main/resources/META-INF/classnames.properties
+++ b/modules/core/src/main/resources/META-INF/classnames.properties
@@ -1518,7 +1518,6 @@ 
org.apache.ignite.internal.processors.continuous.GridContinuousProcessor$LocalRo
 org.apache.ignite.internal.processors.continuous.StartRequestData
 
org.apache.ignite.internal.processors.continuous.StartRoutineAckDiscoveryMessage
 org.apache.ignite.internal.processors.continuous.StartRoutineDiscoveryMessage
-org.apache.ignite.internal.processors.continuous.StartRoutineDiscoveryMessageV2
 org.apache.ignite.internal.processors.continuous.StopRoutineAckDiscoveryMessage
 org.apache.ignite.internal.processors.continuous.StopRoutineDiscoveryMessage
 org.apache.ignite.internal.processors.datastreamer.DataStreamProcessor$3
diff --git a/modules/core/src/test/java/org/apache/ignite/messaging/GridMessagingSelfTest.java b/modules/core/src/test/java/org/apache/ignite/messaging/GridMessagingSelfTest.java
index 6a7760ec558..0af9e2f2876 100644
--- a/modules/core/src/test/java/org/apache/ignite/messaging/GridMessagingSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/messaging/GridMessagingSelfTest.java
@@ -40,7 +40,6 @@ import org.apache.ignite.IgniteMessaging;
 import org.apache.ignite.cluster.ClusterGroup;
 import org.apache.ignite.internal.DiscoverySpiTestListener;
 import 
org.apache.ignite.internal.processors.continuous.StartRoutineDiscoveryMessage;
-import 
org.apache.ignite.internal.processors.continuous.StartRoutineDiscoveryMessageV2;
 import 
org.apache.ignite.internal.processors.continuous.StopRoutineDiscoveryMessage;
 import org.apache.ignite.internal.util.GridConcurrentHashSet;
 import org.apache.ignite.internal.util.typedef.P2;
@@ -1051,7 +1050,7 @@ public class GridMessagingSelfTest extends 
GridCommonAbstractTest implements Ser
             }
         }, IllegalStateException.class, null);
 
-        lsnr.blockCustomEvent(StartRoutineDiscoveryMessage.class, 
StartRoutineDiscoveryMessageV2.class);
+        lsnr.blockCustomEvent(StartRoutineDiscoveryMessage.class);
 
         final String topic = "topic";
 
@@ -1149,7 +1148,7 @@ public class GridMessagingSelfTest extends 
GridCommonAbstractTest implements Ser
 
         discoSpi.setInternalListener(lsnr);
 
-        lsnr.blockCustomEvent(StartRoutineDiscoveryMessage.class, 
StartRoutineDiscoveryMessageV2.class);
+        lsnr.blockCustomEvent(StartRoutineDiscoveryMessage.class);
 
         final String topic = "topic";
 

Reply via email to