This is an automated email from the ASF dual-hosted git repository.
shishkovilja pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new 1fa94e8c937 IGNITE-28477 Merge StartRoutineDiscoveryMessageV2 into StartRoutineDiscoveryMessage (#13100)
1fa94e8c937 is described below
commit 1fa94e8c9374cb0eeccd085a0e907d9e73a9dd2e
Author: Aleksandr Chesnokov <[email protected]>
AuthorDate: Wed May 20 11:18:28 2026 +0300
IGNITE-28477 Merge StartRoutineDiscoveryMessageV2 into StartRoutineDiscoveryMessage (#13100)
---
.../ignite/internal/CoreMessagesProvider.java | 4 +-
.../continuous/GridContinuousProcessor.java | 30 +++--------
.../continuous/StartRoutineDiscoveryMessage.java | 26 +++++++--
.../continuous/StartRoutineDiscoveryMessageV2.java | 62 ----------------------
.../main/resources/META-INF/classnames.properties | 1 -
.../ignite/messaging/GridMessagingSelfTest.java | 5 +-
6 files changed, 34 insertions(+), 94 deletions(-)
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/CoreMessagesProvider.java
b/modules/core/src/main/java/org/apache/ignite/internal/CoreMessagesProvider.java
index 460cb7bcc10..28777790e3d 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/CoreMessagesProvider.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/CoreMessagesProvider.java
@@ -189,7 +189,6 @@ import
org.apache.ignite.internal.processors.continuous.GridContinuousMessage;
import org.apache.ignite.internal.processors.continuous.StartRequestData;
import
org.apache.ignite.internal.processors.continuous.StartRoutineAckDiscoveryMessage;
import
org.apache.ignite.internal.processors.continuous.StartRoutineDiscoveryMessage;
-import
org.apache.ignite.internal.processors.continuous.StartRoutineDiscoveryMessageV2;
import
org.apache.ignite.internal.processors.continuous.StopRoutineAckDiscoveryMessage;
import
org.apache.ignite.internal.processors.continuous.StopRoutineDiscoveryMessage;
import org.apache.ignite.internal.processors.datastreamer.DataStreamerEntry;
@@ -519,9 +518,8 @@ public class CoreMessagesProvider extends
AbstractMarshallableMessageFactoryProv
withNoSchema(LatchAckMessage.class);
withSchema(AtomicApplicationAttributesAwareRequest.class);
withNoSchema(StartRequestData.class);
- withNoSchema(StartRoutineDiscoveryMessage.class);
withNoSchema(StartRoutineAckDiscoveryMessage.class);
- withNoSchema(StartRoutineDiscoveryMessageV2.class);
+ withNoSchema(StartRoutineDiscoveryMessage.class);
withNoSchema(StoredCacheData.class);
// [10600-10800]: Affinity & partition maps.
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
index 30bc8175de3..489f13e71fe 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
@@ -72,6 +72,7 @@ import
org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.GridCacheProcessor;
import
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.CachePartitionPartialCountersMap;
import
org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryHandler;
+import
org.apache.ignite.internal.processors.continuous.StartRoutineDiscoveryMessage.Mode;
import org.apache.ignite.internal.processors.timeout.GridTimeoutObject;
import org.apache.ignite.internal.systemview.ContinuousQueryViewWalker;
import org.apache.ignite.internal.thread.OomExceptionHandler;
@@ -211,26 +212,13 @@ public class GridContinuousProcessor extends
GridProcessorAdapter {
@Override public void onCustomEvent(AffinityTopologyVersion
topVer,
ClusterNode snd,
StartRoutineDiscoveryMessage msg) {
- assert !immutableDiscoCustomMsg;
-
if (ctx.isStopping())
return;
- processStartRequestMutable(snd, msg);
- }
- });
-
-
ctx.discovery().setCustomEventListener(StartRoutineDiscoveryMessageV2.class,
- new CustomEventListener<StartRoutineDiscoveryMessageV2>() {
- @Override public void onCustomEvent(AffinityTopologyVersion
topVer,
- ClusterNode snd,
- StartRoutineDiscoveryMessageV2 msg) {
- assert immutableDiscoCustomMsg;
-
- if (ctx.isStopping())
- return;
-
- processStartRequestImmutable(topVer, snd, msg);
+ if (immutableDiscoCustomMsg)
+ processStartRequestImmutable(topVer, snd, msg);
+ else
+ processStartRequestMutable(snd, msg);
}
});
@@ -991,9 +979,7 @@ public class GridContinuousProcessor extends
GridProcessorAdapter {
reqData.prepareMarshal(ctx);
if (!immutableDiscoCustomMsg) {
- StartRoutineDiscoveryMessage msg = new
StartRoutineDiscoveryMessage(
- routineId,
- reqData);
+ StartRoutineDiscoveryMessage msg = new
StartRoutineDiscoveryMessage(routineId, reqData, Mode.MUTABLE);
if (hnd.updateCounters() != null)
msg.addUpdateCounters(ctx.localNodeId(), hnd.updateCounters());
@@ -1001,7 +987,7 @@ public class GridContinuousProcessor extends
GridProcessorAdapter {
return msg;
}
else
- return new StartRoutineDiscoveryMessageV2(routineId, reqData);
+ return new StartRoutineDiscoveryMessage(routineId, reqData,
Mode.IMMUTABLE);
}
/**
@@ -1467,7 +1453,7 @@ public class GridContinuousProcessor extends
GridProcessorAdapter {
*/
private void processStartRequestImmutable(final AffinityTopologyVersion
topVer,
final ClusterNode snd,
- final StartRoutineDiscoveryMessageV2 msg) {
+ final StartRoutineDiscoveryMessage msg) {
StartRequestData reqData = msg.startRequestData();
ContinuousRoutineInfo routineInfo = new ContinuousRoutineInfo(snd.id(),
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRoutineDiscoveryMessage.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRoutineDiscoveryMessage.java
index af18f362fe0..ddf9e63f6c4 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRoutineDiscoveryMessage.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRoutineDiscoveryMessage.java
@@ -26,15 +26,26 @@ import
org.apache.ignite.internal.managers.communication.ErrorMessage;
import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
import org.apache.ignite.internal.util.typedef.internal.S;
+import static
org.apache.ignite.internal.processors.continuous.StartRoutineDiscoveryMessage.Mode.MUTABLE;
+
/**
* Discovery message used for Continuous Query registration.
*/
public class StartRoutineDiscoveryMessage extends AbstractContinuousMessage {
+ /** Discovery message mode. */
+ enum Mode {
+ /** Mutable discovery mode. */
+ MUTABLE,
+
+ /** Immutable discovery mode. */
+ IMMUTABLE
+ }
+
/** */
@Order(0)
StartRequestData startReqData;
- /** */
+ /** Errors collected by mutable discovery. */
@Order(1)
Map<UUID, ErrorMessage> errs = new HashMap<>();
@@ -46,14 +57,20 @@ public class StartRoutineDiscoveryMessage extends
AbstractContinuousMessage {
@Order(3)
Map<UUID, Map<Integer, Long>> updateCntrsPerNode;
+ /** Discovery message mode. */
+ @Order(4)
+ Mode mode;
+
/**
* @param routineId Routine id.
* @param startReqData Start request data.
+ * @param mode Discovery message mode.
*/
- public StartRoutineDiscoveryMessage(UUID routineId, StartRequestData
startReqData) {
+ StartRoutineDiscoveryMessage(UUID routineId, StartRequestData
startReqData, Mode mode) {
super(routineId);
this.startReqData = startReqData;
+ this.mode = mode;
}
/** */
@@ -110,11 +127,14 @@ public class StartRoutineDiscoveryMessage extends
AbstractContinuousMessage {
/** {@inheritDoc} */
@Override public boolean isMutable() {
- return true;
+ return mode == MUTABLE;
}
/** {@inheritDoc} */
@Override public DiscoveryCustomMessage ackMessage() {
+ if (!isMutable())
+ return null;
+
return new StartRoutineAckDiscoveryMessage(routineId, errs,
updateCntrs, updateCntrsPerNode);
}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRoutineDiscoveryMessageV2.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRoutineDiscoveryMessageV2.java
deleted file mode 100644
index 15b57836528..00000000000
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRoutineDiscoveryMessageV2.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.continuous;
-
-import java.util.UUID;
-import org.apache.ignite.internal.Order;
-import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
-import org.apache.ignite.internal.util.typedef.internal.S;
-
-/**
- *
- */
-public class StartRoutineDiscoveryMessageV2 extends AbstractContinuousMessage {
- /** */
- @Order(0)
- StartRequestData startReqData;
-
- /** */
- public StartRoutineDiscoveryMessageV2() {}
-
- /**
- * @param routineId Routine id.
- * @param startReqData Start request data.
- */
- StartRoutineDiscoveryMessageV2(UUID routineId, StartRequestData
startReqData) {
- super(routineId);
-
- this.startReqData = startReqData;
- }
-
- /**
- * @return Start request data.
- */
- public StartRequestData startRequestData() {
- return startReqData;
- }
-
- /** {@inheritDoc} */
- @Override public DiscoveryCustomMessage ackMessage() {
- return null;
- }
-
- /** {@inheritDoc} */
- @Override public String toString() {
- return S.toString(StartRoutineDiscoveryMessageV2.class, this,
"routineId", routineId());
- }
-}
diff --git a/modules/core/src/main/resources/META-INF/classnames.properties
b/modules/core/src/main/resources/META-INF/classnames.properties
index f6acd558984..b975cbde1df 100644
--- a/modules/core/src/main/resources/META-INF/classnames.properties
+++ b/modules/core/src/main/resources/META-INF/classnames.properties
@@ -1518,7 +1518,6 @@
org.apache.ignite.internal.processors.continuous.GridContinuousProcessor$LocalRo
org.apache.ignite.internal.processors.continuous.StartRequestData
org.apache.ignite.internal.processors.continuous.StartRoutineAckDiscoveryMessage
org.apache.ignite.internal.processors.continuous.StartRoutineDiscoveryMessage
-org.apache.ignite.internal.processors.continuous.StartRoutineDiscoveryMessageV2
org.apache.ignite.internal.processors.continuous.StopRoutineAckDiscoveryMessage
org.apache.ignite.internal.processors.continuous.StopRoutineDiscoveryMessage
org.apache.ignite.internal.processors.datastreamer.DataStreamProcessor$3
diff --git
a/modules/core/src/test/java/org/apache/ignite/messaging/GridMessagingSelfTest.java
b/modules/core/src/test/java/org/apache/ignite/messaging/GridMessagingSelfTest.java
index 6a7760ec558..0af9e2f2876 100644
---
a/modules/core/src/test/java/org/apache/ignite/messaging/GridMessagingSelfTest.java
+++
b/modules/core/src/test/java/org/apache/ignite/messaging/GridMessagingSelfTest.java
@@ -40,7 +40,6 @@ import org.apache.ignite.IgniteMessaging;
import org.apache.ignite.cluster.ClusterGroup;
import org.apache.ignite.internal.DiscoverySpiTestListener;
import
org.apache.ignite.internal.processors.continuous.StartRoutineDiscoveryMessage;
-import
org.apache.ignite.internal.processors.continuous.StartRoutineDiscoveryMessageV2;
import
org.apache.ignite.internal.processors.continuous.StopRoutineDiscoveryMessage;
import org.apache.ignite.internal.util.GridConcurrentHashSet;
import org.apache.ignite.internal.util.typedef.P2;
@@ -1051,7 +1050,7 @@ public class GridMessagingSelfTest extends
GridCommonAbstractTest implements Ser
}
}, IllegalStateException.class, null);
- lsnr.blockCustomEvent(StartRoutineDiscoveryMessage.class,
StartRoutineDiscoveryMessageV2.class);
+ lsnr.blockCustomEvent(StartRoutineDiscoveryMessage.class);
final String topic = "topic";
@@ -1149,7 +1148,7 @@ public class GridMessagingSelfTest extends
GridCommonAbstractTest implements Ser
discoSpi.setInternalListener(lsnr);
- lsnr.blockCustomEvent(StartRoutineDiscoveryMessage.class,
StartRoutineDiscoveryMessageV2.class);
+ lsnr.blockCustomEvent(StartRoutineDiscoveryMessage.class);
final String topic = "topic";