This is an automated email from the ASF dual-hosted git repository.

zstan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new be246366346 IGNITE-20861 P2P prevent to load already deployed class from different node on SHARED mode (#11041)
be246366346 is described below

commit be2463663468876cc51f6757d5993c0399b13754
Author: Evgeniy Stanilovskiy <[email protected]>
AuthorDate: Sun Jul 28 13:38:40 2024 +0300

    IGNITE-20861 P2P prevent to load already deployed class from different node on SHARED mode (#11041)
---
 .../ignite/internal/binary/BinaryMarshaller.java   |  2 +-
 .../deployment/GridDeploymentPerVersionStore.java  | 25 +++++-
 .../managers/deployment/P2PClassLoadingIssues.java |  8 ++
 .../datastreamer/DataStreamerUpdateJob.java        |  5 +-
 .../jdk/JdkMarshallerObjectInputStream.java        |  2 +-
 ...acheAtomicEntryProcessorDeploymentSelfTest.java | 97 ++++++++++++++++++++++
 .../ignite/p2p/GridP2PSameClassLoaderSelfTest.java | 25 ++----
 .../apache/ignite/p2p/SharedDeploymentTest.java    |  2 +-
 .../CacheDeploymentEntryProcessorMultipleEnts.java | 51 ++++++++++++
 .../apache/ignite/tests/p2p/cache/Container.java   |  2 +-
 10 files changed, 193 insertions(+), 26 deletions(-)

diff --git a/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryMarshaller.java b/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryMarshaller.java
index 7db76c1ddab..21251db8572 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryMarshaller.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryMarshaller.java
@@ -97,7 +97,7 @@ public class BinaryMarshaller extends 
AbstractNodeNameAwareMarshaller {
     }
 
     /** {@inheritDoc} */
-    @Override protected <T> T unmarshal0(byte[] bytes, @Nullable ClassLoader 
clsLdr) throws IgniteCheckedException {
+    @Override protected <T> T unmarshal0(byte[] bytes, @Nullable ClassLoader 
clsLdr) {
         return impl.deserialize(bytes, clsLdr);
     }
 
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/deployment/GridDeploymentPerVersionStore.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/deployment/GridDeploymentPerVersionStore.java
index 5c2e84112bc..7c6c3246fd5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/deployment/GridDeploymentPerVersionStore.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/deployment/GridDeploymentPerVersionStore.java
@@ -329,6 +329,26 @@ public class GridDeploymentPerVersionStore extends 
GridDeploymentStoreAdapter {
                 if (isDeadClassLoader(meta))
                     return null;
 
+                boolean skipSearchDeployment = false;
+
+                // Check already exist deployment.
+                if (meta.deploymentMode() == SHARED) {
+                    Collection<GridDeployment> created = getDeployments();
+
+                    for (GridDeployment dep0 : created) {
+                        // hot redeploy from same node
+                        if 
(dep0.participants().containsKey(meta.senderNodeId()) || dep0.undeployed())
+                            continue;
+
+                        IgniteBiTuple<Class<?>, Throwable> cls = 
dep0.deployedClass(meta.className(), meta.alias());
+
+                        if (cls.getKey() != null && cls.getValue() == null) {
+                            
((SharedDeployment)dep0).addParticipant(meta.senderNodeId(), 
meta.classLoaderId());
+                            skipSearchDeployment = true;
+                        }
+                    }
+                }
+
                 if (!F.isEmpty(meta.participants())) {
                     Map<UUID, IgniteUuid> participants = new LinkedHashMap<>();
 
@@ -376,7 +396,8 @@ public class GridDeploymentPerVersionStore extends 
GridDeploymentStoreAdapter {
                     return null;
                 }
 
-                dep = (SharedDeployment)searchDeploymentCache(meta);
+                if (!skipSearchDeployment)
+                    dep = (SharedDeployment)searchDeploymentCache(meta);
 
                 if (dep == null) {
                     List<SharedDeployment> deps = 
cache.get(meta.userVersion());
@@ -1243,8 +1264,6 @@ public class GridDeploymentPerVersionStore extends 
GridDeploymentStoreAdapter {
 
         /** {@inheritDoc} */
         @Override public void onDeployed(Class<?> cls) {
-            assert !Thread.holdsLock(mux);
-
             boolean isTask = isTask(cls);
 
             String msg = (isTask ? "Task" : "Class") + " was deployed in 
SHARED or CONTINUOUS mode: " + cls;
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/deployment/P2PClassLoadingIssues.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/deployment/P2PClassLoadingIssues.java
index efdeb0ae2c1..ec9cc81d9cb 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/deployment/P2PClassLoadingIssues.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/deployment/P2PClassLoadingIssues.java
@@ -38,6 +38,14 @@ public class P2PClassLoadingIssues {
         throw error;
     }
 
+    /** Wraps specific exception.
+     *
+     * @param e Exception to be wrapped.
+     */
+    public static P2PClassNotFoundException 
wrapWithP2PFailure(NoClassDefFoundError e) {
+        return new P2PClassNotFoundException("P2P class loading failed", e);
+    }
+
     /**
      * Returns @{code true} if the given Throwable is an error caused by a P2P 
class-loading failure.
      *
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerUpdateJob.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerUpdateJob.java
index ffe5b944d6f..b4a9bd0133b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerUpdateJob.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerUpdateJob.java
@@ -21,7 +21,6 @@ import java.util.Collection;
 import java.util.Map;
 import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.internal.GridKernalContext;
-import org.apache.ignite.internal.managers.deployment.P2PClassLoadingIssues;
 import org.apache.ignite.internal.processors.cache.CacheObject;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import org.apache.ignite.internal.processors.cache.IgniteCacheProxy;
@@ -33,6 +32,8 @@ import org.apache.ignite.plugin.security.SecurityPermission;
 import org.apache.ignite.stream.StreamReceiver;
 import org.jetbrains.annotations.Nullable;
 
+import static 
org.apache.ignite.internal.managers.deployment.P2PClassLoadingIssues.wrapWithP2PFailure;
+
 /**
  * Job to put entries to cache on affinity node.
  */
@@ -146,7 +147,7 @@ class DataStreamerUpdateJob implements 
GridPlainCallable<Object> {
             return null;
         }
         catch (NoClassDefFoundError e) {
-            return 
P2PClassLoadingIssues.rethrowDisarmedP2PClassLoadingFailure(e);
+            throw wrapWithP2PFailure(e);
         }
         finally {
             if (ignoreDepOwnership)
diff --git a/modules/core/src/main/java/org/apache/ignite/marshaller/jdk/JdkMarshallerObjectInputStream.java b/modules/core/src/main/java/org/apache/ignite/marshaller/jdk/JdkMarshallerObjectInputStream.java
index d9fdd3d2f1c..7e7c8f107e2 100644
--- a/modules/core/src/main/java/org/apache/ignite/marshaller/jdk/JdkMarshallerObjectInputStream.java
+++ b/modules/core/src/main/java/org/apache/ignite/marshaller/jdk/JdkMarshallerObjectInputStream.java
@@ -51,7 +51,7 @@ class JdkMarshallerObjectInputStream extends 
ObjectInputStream {
     }
 
     /** {@inheritDoc} */
-    @Override protected Class<?> resolveClass(ObjectStreamClass desc) throws 
IOException, ClassNotFoundException {
+    @Override protected Class<?> resolveClass(ObjectStreamClass desc) throws 
ClassNotFoundException {
         // NOTE: DO NOT CHANGE TO 'clsLoader.loadClass()'
         // Must have 'Class.forName()' instead of clsLoader.loadClass()
         // due to weird ClassNotFoundExceptions for arrays of classes
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAtomicEntryProcessorDeploymentSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAtomicEntryProcessorDeploymentSelfTest.java
index f1128573c3e..2fe83283a45 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAtomicEntryProcessorDeploymentSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAtomicEntryProcessorDeploymentSelfTest.java
@@ -17,8 +17,10 @@
 
 package org.apache.ignite.internal.processors.cache;
 
+import java.util.HashMap;
 import java.util.Map;
 import java.util.TreeSet;
+import java.util.concurrent.TimeUnit;
 import javax.cache.processor.EntryProcessorResult;
 import org.apache.ignite.IgniteCache;
 import org.apache.ignite.cache.CacheAtomicityMode;
@@ -27,12 +29,16 @@ import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.configuration.DeploymentMode;
 import org.apache.ignite.configuration.IgniteConfiguration;
 import org.apache.ignite.configuration.NearCacheConfiguration;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.testframework.GridTestUtils;
 import org.apache.ignite.testframework.config.GridTestProperties;
 import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
 import org.junit.Test;
 
 import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
 import static org.apache.ignite.cache.CacheMode.PARTITIONED;
+import static org.apache.ignite.cache.CacheMode.REPLICATED;
 import static org.apache.ignite.cache.CacheRebalanceMode.SYNC;
 import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
 
@@ -116,6 +122,7 @@ public class 
GridCacheAtomicEntryProcessorDeploymentSelfTest extends GridCommonA
         depMode = DeploymentMode.SHARED;
 
         doTestInvoke();
+        doTestInvokeEx();
     }
 
     /**
@@ -168,6 +175,96 @@ public class 
GridCacheAtomicEntryProcessorDeploymentSelfTest extends GridCommonA
         }
     }
 
+    /**
+     * Scenario: 2 different client nodes invoke entry processors on 
intersected collection of keys.
+     * @throws Exception
+     */
+    private void doTestInvokeEx() throws Exception {
+        String testCacheName = "dynamic_params";
+
+        String prcClsName = 
"org.apache.ignite.tests.p2p.CacheDeploymentEntryProcessorMultipleEnts";
+
+        String contClsName = "org.apache.ignite.tests.p2p.cache.Container";
+
+        try {
+            startGrid(0);
+            IgniteEx cli1 = startClientGrid(1);
+            IgniteEx cli2 = startClientGrid(2);
+
+            Class procCls1 = 
cli1.configuration().getClassLoader().loadClass(prcClsName);
+            Class procCls2 = 
cli2.configuration().getClassLoader().loadClass(prcClsName);
+
+            Class contCls1 = 
cli1.configuration().getClassLoader().loadClass(contClsName);
+            Class contCls2 = 
cli2.configuration().getClassLoader().loadClass(contClsName);
+
+            // just one more additional class unavailability check.
+            try {
+                Class.forName(TEST_VALUE);
+                fail();
+            }
+            catch (ClassNotFoundException e) {
+                // No op.
+            }
+
+            Class<?> cacheValClazz = 
grid(2).configuration().getClassLoader().loadClass(TEST_VALUE);
+            Object cacheVal = cacheValClazz.newInstance();
+
+            CacheConfiguration<Long, Object> ccfg = new CacheConfiguration<>();
+            ccfg.setCacheMode(REPLICATED);
+            ccfg.setAtomicityMode(ATOMIC);
+            ccfg.setName(testCacheName);
+
+            IgniteCache<Long, Object> processedCache = cli1.createCache(ccfg);
+
+            Map<Long, Object> map = new HashMap<>();
+            for (long i = 0; i < 100; i++) {
+                map.put(i, cacheVal);
+            }
+
+            processedCache.putAll(map);
+
+            IgniteCache<Object, Object> cache1 = cli1.cache(testCacheName);
+            IgniteCache<Object, Object> cache2 = cli2.cache(testCacheName);
+
+            Object cont1 = 
contCls1.getDeclaredConstructor(Object.class).newInstance(map);
+            Object cont2 = 
contCls2.getDeclaredConstructor(Object.class).newInstance(map);
+
+            for (int i = 0; i < 10; ++i) {
+                IgniteCache<Object, Object> procCache1 = cache1;
+                IgniteInternalFuture<Object> f1 = GridTestUtils.runAsync(() -> 
{
+                    for (long key = 0; key < 10; key++) {
+                        procCache1.invoke(key,
+                                
(CacheEntryProcessor)procCls1.getDeclaredConstructor(Object.class).newInstance(cont1));
+                    }
+                });
+
+                IgniteCache<Object, Object> procCache2 = cache2;
+                IgniteInternalFuture<Object> f2 = GridTestUtils.runAsync(() -> 
{
+                    for (long key = 10; key > 0; key--) {
+                        procCache2.invoke(key,
+                                
(CacheEntryProcessor)procCls2.getDeclaredConstructor(Object.class).newInstance(cont2));
+                    };
+                });
+
+                long duration = TimeUnit.SECONDS.toMillis(30);
+
+                f1.get(duration);
+                f2.get(duration);
+
+                stopAllClients(true);
+
+                cli1 = startClientGrid(1);
+                cli2 = startClientGrid(2);
+
+                cache1 = cli1.cache(testCacheName);
+                cache2 = cli2.cache(testCacheName);
+            }
+        }
+        finally {
+            stopAllGrids();
+        }
+    }
+
     /**
      * @throws Exception In case of error.
      */
diff --git a/modules/core/src/test/java/org/apache/ignite/p2p/GridP2PSameClassLoaderSelfTest.java b/modules/core/src/test/java/org/apache/ignite/p2p/GridP2PSameClassLoaderSelfTest.java
index 504b4d6e206..5000fe323a0 100644
--- a/modules/core/src/test/java/org/apache/ignite/p2p/GridP2PSameClassLoaderSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/p2p/GridP2PSameClassLoaderSelfTest.java
@@ -20,6 +20,7 @@ package org.apache.ignite.p2p;
 import java.net.MalformedURLException;
 import java.net.URL;
 import java.net.URLClassLoader;
+import java.util.UUID;
 import org.apache.ignite.Ignite;
 import org.apache.ignite.configuration.DeploymentMode;
 import org.apache.ignite.configuration.IgniteConfiguration;
@@ -93,16 +94,18 @@ public class GridP2PSameClassLoaderSelfTest extends 
GridCommonAbstractTest {
             Class task1 = CLASS_LOADER.loadClass(TEST_TASK1_NAME);
             Class task2 = CLASS_LOADER.loadClass(TEST_TASK2_NAME);
 
+            UUID id = ignite2.cluster().localNode().id();
+
             // Execute task1 and task2 from node1 on node2 and make sure that 
they reuse same class loader on node2.
-            Integer res1 = (Integer)ignite1.compute().execute(task1, 
ignite2.cluster().localNode().id());
-            Integer res2 = (Integer)ignite1.compute().execute(task2, 
ignite2.cluster().localNode().id());
+            Integer res1 = (Integer)ignite1.compute().execute(task1, id);
+            Integer res2 = (Integer)ignite1.compute().execute(task2, id);
 
             assert res1.equals(res2); // Class loaders are same
 
-            Integer res3 = (Integer)ignite3.compute().execute(task1, 
ignite2.cluster().localNode().id());
-            Integer res4 = (Integer)ignite3.compute().execute(task2, 
ignite2.cluster().localNode().id());
+            Integer res3 = (Integer)ignite3.compute().execute(task1, id);
+            Integer res4 = (Integer)ignite3.compute().execute(task2, id);
 
-            assert res3.equals(res4);
+            assertEquals(res3, res4);
         }
         finally {
             stopGrid(1);
@@ -147,18 +150,6 @@ public class GridP2PSameClassLoaderSelfTest extends 
GridCommonAbstractTest {
         processTest();
     }
 
-    /**
-     * Test GridDeploymentMode.SHARED mode.
-     *
-     * @throws Exception if error occur.
-     */
-    @Test
-    public void testSharedMode() throws Exception {
-        depMode = DeploymentMode.SHARED;
-
-        processTest();
-    }
-
     /**
      * Return true if and only if all elements of array are different.
      *
diff --git a/modules/core/src/test/java/org/apache/ignite/p2p/SharedDeploymentTest.java b/modules/core/src/test/java/org/apache/ignite/p2p/SharedDeploymentTest.java
index 0d4b6ccb903..51966fa93a9 100644
--- a/modules/core/src/test/java/org/apache/ignite/p2p/SharedDeploymentTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/p2p/SharedDeploymentTest.java
@@ -130,7 +130,7 @@ public class SharedDeploymentTest extends 
GridCommonAbstractTest {
                 new 
URL(GridTestProperties.getProperty("p2p.uri.cls.second"))}), ignite3, 10_000);
 
             for (Object o: res)
-                assertEquals(o, 43);
+                assertEquals(43, o);
         }
         finally {
             stopAllGrids();
diff --git a/modules/extdata/p2p/src/main/java/org/apache/ignite/tests/p2p/CacheDeploymentEntryProcessorMultipleEnts.java b/modules/extdata/p2p/src/main/java/org/apache/ignite/tests/p2p/CacheDeploymentEntryProcessorMultipleEnts.java
new file mode 100644
index 00000000000..bdb16113bb0
--- /dev/null
+++ b/modules/extdata/p2p/src/main/java/org/apache/ignite/tests/p2p/CacheDeploymentEntryProcessorMultipleEnts.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.tests.p2p;
+
+import java.util.Map;
+import javax.cache.processor.EntryProcessorException;
+import javax.cache.processor.MutableEntry;
+import org.apache.ignite.cache.CacheEntryProcessor;
+import org.apache.ignite.tests.p2p.cache.Container;
+
+/**
+ * Entry processor for p2p tests.
+ */
+public class CacheDeploymentEntryProcessorMultipleEnts implements 
CacheEntryProcessor<String, CacheDeploymentTestValue, Boolean> {
+    /** */
+    private Map<Long, CacheDeploymentTestValue> entToProcess;
+
+    /** */
+    public CacheDeploymentEntryProcessorMultipleEnts(Object container) {
+        entToProcess = (Map<Long, 
CacheDeploymentTestValue>)((Container)container).field;
+    }
+
+    /** {@inheritDoc} */
+    @Override public Boolean process(MutableEntry<String, 
CacheDeploymentTestValue> entry,
+            Object... arguments) throws EntryProcessorException {
+        boolean pr = false;
+
+        for (CacheDeploymentTestValue ent : entToProcess.values()) {
+            CacheDeploymentTestValue key = ent;
+            pr = key != null;
+        }
+        CacheDeploymentTestValue val = entry.getValue();
+
+        return pr;
+    }
+}
diff --git a/modules/extdata/p2p/src/main/java/org/apache/ignite/tests/p2p/cache/Container.java b/modules/extdata/p2p/src/main/java/org/apache/ignite/tests/p2p/cache/Container.java
index fff36123f27..625890dcc6a 100644
--- a/modules/extdata/p2p/src/main/java/org/apache/ignite/tests/p2p/cache/Container.java
+++ b/modules/extdata/p2p/src/main/java/org/apache/ignite/tests/p2p/cache/Container.java
@@ -20,7 +20,7 @@ package org.apache.ignite.tests.p2p.cache;
 /** */
 public class Container {
     /** */
-    private Object field;
+    public Object field;
 
     /** */
     public Container(Object field) {

Reply via email to