This is an automated email from the ASF dual-hosted git repository.
zstan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new be246366346 IGNITE-20861 P2P prevent to load already deployed class
from different node on SHARED mode (#11041)
be246366346 is described below
commit be2463663468876cc51f6757d5993c0399b13754
Author: Evgeniy Stanilovskiy <[email protected]>
AuthorDate: Sun Jul 28 13:38:40 2024 +0300
IGNITE-20861 P2P prevent to load already deployed class from different node
on SHARED mode (#11041)
---
.../ignite/internal/binary/BinaryMarshaller.java | 2 +-
.../deployment/GridDeploymentPerVersionStore.java | 25 +++++-
.../managers/deployment/P2PClassLoadingIssues.java | 8 ++
.../datastreamer/DataStreamerUpdateJob.java | 5 +-
.../jdk/JdkMarshallerObjectInputStream.java | 2 +-
...acheAtomicEntryProcessorDeploymentSelfTest.java | 97 ++++++++++++++++++++++
.../ignite/p2p/GridP2PSameClassLoaderSelfTest.java | 25 ++----
.../apache/ignite/p2p/SharedDeploymentTest.java | 2 +-
.../CacheDeploymentEntryProcessorMultipleEnts.java | 51 ++++++++++++
.../apache/ignite/tests/p2p/cache/Container.java | 2 +-
10 files changed, 193 insertions(+), 26 deletions(-)
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryMarshaller.java
b/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryMarshaller.java
index 7db76c1ddab..21251db8572 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryMarshaller.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryMarshaller.java
@@ -97,7 +97,7 @@ public class BinaryMarshaller extends
AbstractNodeNameAwareMarshaller {
}
/** {@inheritDoc} */
- @Override protected <T> T unmarshal0(byte[] bytes, @Nullable ClassLoader
clsLdr) throws IgniteCheckedException {
+ @Override protected <T> T unmarshal0(byte[] bytes, @Nullable ClassLoader
clsLdr) {
return impl.deserialize(bytes, clsLdr);
}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/managers/deployment/GridDeploymentPerVersionStore.java
b/modules/core/src/main/java/org/apache/ignite/internal/managers/deployment/GridDeploymentPerVersionStore.java
index 5c2e84112bc..7c6c3246fd5 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/managers/deployment/GridDeploymentPerVersionStore.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/managers/deployment/GridDeploymentPerVersionStore.java
@@ -329,6 +329,26 @@ public class GridDeploymentPerVersionStore extends
GridDeploymentStoreAdapter {
if (isDeadClassLoader(meta))
return null;
+ boolean skipSearchDeployment = false;
+
+ // Check whether the class is already deployed by an existing deployment.
+ if (meta.deploymentMode() == SHARED) {
+ Collection<GridDeployment> created = getDeployments();
+
+ for (GridDeployment dep0 : created) {
+ // Skip deployments the sender already participates in (hot redeploy from the same node) and undeployed ones.
+ if
(dep0.participants().containsKey(meta.senderNodeId()) || dep0.undeployed())
+ continue;
+
+ IgniteBiTuple<Class<?>, Throwable> cls =
dep0.deployedClass(meta.className(), meta.alias());
+
+ if (cls.getKey() != null && cls.getValue() == null) {
+
((SharedDeployment)dep0).addParticipant(meta.senderNodeId(),
meta.classLoaderId());
+ skipSearchDeployment = true;
+ }
+ }
+ }
+
if (!F.isEmpty(meta.participants())) {
Map<UUID, IgniteUuid> participants = new LinkedHashMap<>();
@@ -376,7 +396,8 @@ public class GridDeploymentPerVersionStore extends
GridDeploymentStoreAdapter {
return null;
}
- dep = (SharedDeployment)searchDeploymentCache(meta);
+ if (!skipSearchDeployment)
+ dep = (SharedDeployment)searchDeploymentCache(meta);
if (dep == null) {
List<SharedDeployment> deps =
cache.get(meta.userVersion());
@@ -1243,8 +1264,6 @@ public class GridDeploymentPerVersionStore extends
GridDeploymentStoreAdapter {
/** {@inheritDoc} */
@Override public void onDeployed(Class<?> cls) {
- assert !Thread.holdsLock(mux);
-
boolean isTask = isTask(cls);
String msg = (isTask ? "Task" : "Class") + " was deployed in
SHARED or CONTINUOUS mode: " + cls;
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/managers/deployment/P2PClassLoadingIssues.java
b/modules/core/src/main/java/org/apache/ignite/internal/managers/deployment/P2PClassLoadingIssues.java
index efdeb0ae2c1..ec9cc81d9cb 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/managers/deployment/P2PClassLoadingIssues.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/managers/deployment/P2PClassLoadingIssues.java
@@ -38,6 +38,14 @@ public class P2PClassLoadingIssues {
throw error;
}
+ /** Wraps the given {@link NoClassDefFoundError} into a {@link P2PClassNotFoundException}.
+ *
+ * @param e Error to be wrapped.
+ */
+ public static P2PClassNotFoundException
wrapWithP2PFailure(NoClassDefFoundError e) {
+ return new P2PClassNotFoundException("P2P class loading failed", e);
+ }
+
/**
* Returns @{code true} if the given Throwable is an error caused by a P2P
class-loading failure.
*
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerUpdateJob.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerUpdateJob.java
index ffe5b944d6f..b4a9bd0133b 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerUpdateJob.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerUpdateJob.java
@@ -21,7 +21,6 @@ import java.util.Collection;
import java.util.Map;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.internal.GridKernalContext;
-import org.apache.ignite.internal.managers.deployment.P2PClassLoadingIssues;
import org.apache.ignite.internal.processors.cache.CacheObject;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.IgniteCacheProxy;
@@ -33,6 +32,8 @@ import org.apache.ignite.plugin.security.SecurityPermission;
import org.apache.ignite.stream.StreamReceiver;
import org.jetbrains.annotations.Nullable;
+import static
org.apache.ignite.internal.managers.deployment.P2PClassLoadingIssues.wrapWithP2PFailure;
+
/**
* Job to put entries to cache on affinity node.
*/
@@ -146,7 +147,7 @@ class DataStreamerUpdateJob implements
GridPlainCallable<Object> {
return null;
}
catch (NoClassDefFoundError e) {
- return
P2PClassLoadingIssues.rethrowDisarmedP2PClassLoadingFailure(e);
+ throw wrapWithP2PFailure(e);
}
finally {
if (ignoreDepOwnership)
diff --git
a/modules/core/src/main/java/org/apache/ignite/marshaller/jdk/JdkMarshallerObjectInputStream.java
b/modules/core/src/main/java/org/apache/ignite/marshaller/jdk/JdkMarshallerObjectInputStream.java
index d9fdd3d2f1c..7e7c8f107e2 100644
---
a/modules/core/src/main/java/org/apache/ignite/marshaller/jdk/JdkMarshallerObjectInputStream.java
+++
b/modules/core/src/main/java/org/apache/ignite/marshaller/jdk/JdkMarshallerObjectInputStream.java
@@ -51,7 +51,7 @@ class JdkMarshallerObjectInputStream extends
ObjectInputStream {
}
/** {@inheritDoc} */
- @Override protected Class<?> resolveClass(ObjectStreamClass desc) throws
IOException, ClassNotFoundException {
+ @Override protected Class<?> resolveClass(ObjectStreamClass desc) throws
ClassNotFoundException {
// NOTE: DO NOT CHANGE TO 'clsLoader.loadClass()'
// Must have 'Class.forName()' instead of clsLoader.loadClass()
// due to weird ClassNotFoundExceptions for arrays of classes
diff --git
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAtomicEntryProcessorDeploymentSelfTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAtomicEntryProcessorDeploymentSelfTest.java
index f1128573c3e..2fe83283a45 100644
---
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAtomicEntryProcessorDeploymentSelfTest.java
+++
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAtomicEntryProcessorDeploymentSelfTest.java
@@ -17,8 +17,10 @@
package org.apache.ignite.internal.processors.cache;
+import java.util.HashMap;
import java.util.Map;
import java.util.TreeSet;
+import java.util.concurrent.TimeUnit;
import javax.cache.processor.EntryProcessorResult;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.cache.CacheAtomicityMode;
@@ -27,12 +29,16 @@ import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.DeploymentMode;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.configuration.NearCacheConfiguration;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.testframework.GridTestUtils;
import org.apache.ignite.testframework.config.GridTestProperties;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.junit.Test;
import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
import static org.apache.ignite.cache.CacheMode.PARTITIONED;
+import static org.apache.ignite.cache.CacheMode.REPLICATED;
import static org.apache.ignite.cache.CacheRebalanceMode.SYNC;
import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
@@ -116,6 +122,7 @@ public class
GridCacheAtomicEntryProcessorDeploymentSelfTest extends GridCommonA
depMode = DeploymentMode.SHARED;
doTestInvoke();
+ doTestInvokeEx();
}
/**
@@ -168,6 +175,96 @@ public class
GridCacheAtomicEntryProcessorDeploymentSelfTest extends GridCommonA
}
}
+ /**
+ * Scenario: two different client nodes invoke entry processors on an
overlapping collection of keys.
+ * @throws Exception If failed.
+ */
+ private void doTestInvokeEx() throws Exception {
+ String testCacheName = "dynamic_params";
+
+ String prcClsName =
"org.apache.ignite.tests.p2p.CacheDeploymentEntryProcessorMultipleEnts";
+
+ String contClsName = "org.apache.ignite.tests.p2p.cache.Container";
+
+ try {
+ startGrid(0);
+ IgniteEx cli1 = startClientGrid(1);
+ IgniteEx cli2 = startClientGrid(2);
+
+ Class procCls1 =
cli1.configuration().getClassLoader().loadClass(prcClsName);
+ Class procCls2 =
cli2.configuration().getClassLoader().loadClass(prcClsName);
+
+ Class contCls1 =
cli1.configuration().getClassLoader().loadClass(contClsName);
+ Class contCls2 =
cli2.configuration().getClassLoader().loadClass(contClsName);
+
+ // Additional check that the test value class cannot be loaded via Class.forName() here.
+ try {
+ Class.forName(TEST_VALUE);
+ fail();
+ }
+ catch (ClassNotFoundException e) {
+ // No op.
+ }
+
+ Class<?> cacheValClazz =
grid(2).configuration().getClassLoader().loadClass(TEST_VALUE);
+ Object cacheVal = cacheValClazz.newInstance();
+
+ CacheConfiguration<Long, Object> ccfg = new CacheConfiguration<>();
+ ccfg.setCacheMode(REPLICATED);
+ ccfg.setAtomicityMode(ATOMIC);
+ ccfg.setName(testCacheName);
+
+ IgniteCache<Long, Object> processedCache = cli1.createCache(ccfg);
+
+ Map<Long, Object> map = new HashMap<>();
+ for (long i = 0; i < 100; i++) {
+ map.put(i, cacheVal);
+ }
+
+ processedCache.putAll(map);
+
+ IgniteCache<Object, Object> cache1 = cli1.cache(testCacheName);
+ IgniteCache<Object, Object> cache2 = cli2.cache(testCacheName);
+
+ Object cont1 =
contCls1.getDeclaredConstructor(Object.class).newInstance(map);
+ Object cont2 =
contCls2.getDeclaredConstructor(Object.class).newInstance(map);
+
+ for (int i = 0; i < 10; ++i) {
+ IgniteCache<Object, Object> procCache1 = cache1;
+ IgniteInternalFuture<Object> f1 = GridTestUtils.runAsync(() ->
{
+ for (long key = 0; key < 10; key++) {
+ procCache1.invoke(key,
+
(CacheEntryProcessor)procCls1.getDeclaredConstructor(Object.class).newInstance(cont1));
+ }
+ });
+
+ IgniteCache<Object, Object> procCache2 = cache2;
+ IgniteInternalFuture<Object> f2 = GridTestUtils.runAsync(() ->
{
+ for (long key = 10; key > 0; key--) {
+ procCache2.invoke(key,
+
(CacheEntryProcessor)procCls2.getDeclaredConstructor(Object.class).newInstance(cont2));
+ };
+ });
+
+ long duration = TimeUnit.SECONDS.toMillis(30);
+
+ f1.get(duration);
+ f2.get(duration);
+
+ stopAllClients(true);
+
+ cli1 = startClientGrid(1);
+ cli2 = startClientGrid(2);
+
+ cache1 = cli1.cache(testCacheName);
+ cache2 = cli2.cache(testCacheName);
+ }
+ }
+ finally {
+ stopAllGrids();
+ }
+ }
+
/**
* @throws Exception In case of error.
*/
diff --git
a/modules/core/src/test/java/org/apache/ignite/p2p/GridP2PSameClassLoaderSelfTest.java
b/modules/core/src/test/java/org/apache/ignite/p2p/GridP2PSameClassLoaderSelfTest.java
index 504b4d6e206..5000fe323a0 100644
---
a/modules/core/src/test/java/org/apache/ignite/p2p/GridP2PSameClassLoaderSelfTest.java
+++
b/modules/core/src/test/java/org/apache/ignite/p2p/GridP2PSameClassLoaderSelfTest.java
@@ -20,6 +20,7 @@ package org.apache.ignite.p2p;
import java.net.MalformedURLException;
import java.net.URL;
import java.net.URLClassLoader;
+import java.util.UUID;
import org.apache.ignite.Ignite;
import org.apache.ignite.configuration.DeploymentMode;
import org.apache.ignite.configuration.IgniteConfiguration;
@@ -93,16 +94,18 @@ public class GridP2PSameClassLoaderSelfTest extends
GridCommonAbstractTest {
Class task1 = CLASS_LOADER.loadClass(TEST_TASK1_NAME);
Class task2 = CLASS_LOADER.loadClass(TEST_TASK2_NAME);
+ UUID id = ignite2.cluster().localNode().id();
+
// Execute task1 and task2 from node1 on node2 and make sure that
they reuse same class loader on node2.
- Integer res1 = (Integer)ignite1.compute().execute(task1,
ignite2.cluster().localNode().id());
- Integer res2 = (Integer)ignite1.compute().execute(task2,
ignite2.cluster().localNode().id());
+ Integer res1 = (Integer)ignite1.compute().execute(task1, id);
+ Integer res2 = (Integer)ignite1.compute().execute(task2, id);
assert res1.equals(res2); // Class loaders are same
- Integer res3 = (Integer)ignite3.compute().execute(task1,
ignite2.cluster().localNode().id());
- Integer res4 = (Integer)ignite3.compute().execute(task2,
ignite2.cluster().localNode().id());
+ Integer res3 = (Integer)ignite3.compute().execute(task1, id);
+ Integer res4 = (Integer)ignite3.compute().execute(task2, id);
- assert res3.equals(res4);
+ assertEquals(res3, res4);
}
finally {
stopGrid(1);
@@ -147,18 +150,6 @@ public class GridP2PSameClassLoaderSelfTest extends
GridCommonAbstractTest {
processTest();
}
- /**
- * Test GridDeploymentMode.SHARED mode.
- *
- * @throws Exception if error occur.
- */
- @Test
- public void testSharedMode() throws Exception {
- depMode = DeploymentMode.SHARED;
-
- processTest();
- }
-
/**
* Return true if and only if all elements of array are different.
*
diff --git
a/modules/core/src/test/java/org/apache/ignite/p2p/SharedDeploymentTest.java
b/modules/core/src/test/java/org/apache/ignite/p2p/SharedDeploymentTest.java
index 0d4b6ccb903..51966fa93a9 100644
--- a/modules/core/src/test/java/org/apache/ignite/p2p/SharedDeploymentTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/p2p/SharedDeploymentTest.java
@@ -130,7 +130,7 @@ public class SharedDeploymentTest extends
GridCommonAbstractTest {
new
URL(GridTestProperties.getProperty("p2p.uri.cls.second"))}), ignite3, 10_000);
for (Object o: res)
- assertEquals(o, 43);
+ assertEquals(43, o);
}
finally {
stopAllGrids();
diff --git
a/modules/extdata/p2p/src/main/java/org/apache/ignite/tests/p2p/CacheDeploymentEntryProcessorMultipleEnts.java
b/modules/extdata/p2p/src/main/java/org/apache/ignite/tests/p2p/CacheDeploymentEntryProcessorMultipleEnts.java
new file mode 100644
index 00000000000..bdb16113bb0
--- /dev/null
+++
b/modules/extdata/p2p/src/main/java/org/apache/ignite/tests/p2p/CacheDeploymentEntryProcessorMultipleEnts.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.tests.p2p;
+
+import java.util.Map;
+import javax.cache.processor.EntryProcessorException;
+import javax.cache.processor.MutableEntry;
+import org.apache.ignite.cache.CacheEntryProcessor;
+import org.apache.ignite.tests.p2p.cache.Container;
+
+/**
+ * Entry processor for p2p tests.
+ */
+public class CacheDeploymentEntryProcessorMultipleEnts implements
CacheEntryProcessor<String, CacheDeploymentTestValue, Boolean> {
+ /** */
+ private Map<Long, CacheDeploymentTestValue> entToProcess;
+
+ /** */
+ public CacheDeploymentEntryProcessorMultipleEnts(Object container) {
+ entToProcess = (Map<Long,
CacheDeploymentTestValue>)((Container)container).field;
+ }
+
+ /** {@inheritDoc} */
+ @Override public Boolean process(MutableEntry<String,
CacheDeploymentTestValue> entry,
+ Object... arguments) throws EntryProcessorException {
+ boolean pr = false;
+
+ for (CacheDeploymentTestValue ent : entToProcess.values()) {
+ CacheDeploymentTestValue key = ent;
+ pr = key != null;
+ }
+ CacheDeploymentTestValue val = entry.getValue();
+
+ return pr;
+ }
+}
diff --git
a/modules/extdata/p2p/src/main/java/org/apache/ignite/tests/p2p/cache/Container.java
b/modules/extdata/p2p/src/main/java/org/apache/ignite/tests/p2p/cache/Container.java
index fff36123f27..625890dcc6a 100644
---
a/modules/extdata/p2p/src/main/java/org/apache/ignite/tests/p2p/cache/Container.java
+++
b/modules/extdata/p2p/src/main/java/org/apache/ignite/tests/p2p/cache/Container.java
@@ -20,7 +20,7 @@ package org.apache.ignite.tests.p2p.cache;
/** */
public class Container {
/** */
- private Object field;
+ public Object field;
/** */
public Container(Object field) {