Repository: ignite Updated Branches: refs/heads/ignite-2.4 d753298b4 -> 63445893f
IGNITE-3935 Use PeerDeployAware for streamer transformer - Fixes #3378. Signed-off-by: Alexey Goncharuk <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/63445893 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/63445893 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/63445893 Branch: refs/heads/ignite-2.4 Commit: 63445893f1bc75dc9777184499f7eabc1d4e51b1 Parents: d753298 Author: Denis Mekhanikov <[email protected]> Authored: Thu Jan 18 11:36:18 2018 +0300 Committer: Alexey Goncharuk <[email protected]> Committed: Thu Jan 18 11:42:07 2018 +0300 ---------------------------------------------------------------------- .../apache/ignite/IgniteSystemProperties.java | 15 +++ .../apache/ignite/stream/StreamTransformer.java | 60 ++++++++- .../ignite/p2p/P2PStreamingClassLoaderTest.java | 121 +++++++++++++++++++ .../testsuites/IgniteP2PSelfTestSuite.java | 2 + .../tests/p2p/NoopCacheEntryProcessor.java | 36 ++++++ 5 files changed, 229 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/63445893/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java index 1bbaacd..9d520fb 100644 --- a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java +++ b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java @@ -23,11 +23,13 @@ import java.util.Iterator; import java.util.Map; import java.util.Properties; import javax.net.ssl.HostnameVerifier; +import org.apache.ignite.cache.CacheEntryProcessor; import org.apache.ignite.cluster.ClusterGroup; import org.apache.ignite.configuration.DataStorageConfiguration; import org.apache.ignite.internal.client.GridClient; import org.apache.ignite.internal.marshaller.optimized.OptimizedMarshaller; import org.apache.ignite.lang.IgnitePredicate; +import org.apache.ignite.stream.StreamTransformer; import org.jetbrains.annotations.Nullable; /** @@ -567,6 +569,19 @@ public final class IgniteSystemProperties { public static final String IGNITE_SERVICES_COMPATIBILITY_MODE = "IGNITE_SERVICES_COMPATIBILITY_MODE"; /** + * Manages backward compatibility of {@link StreamTransformer#from(CacheEntryProcessor)} method. + * <p> + * If the property is {@code true}, then the wrapped {@link CacheEntryProcessor} won't be able to be loaded over + * P2P class loading. + * <p> + * If the property is {@code false}, then another implementation of {@link StreamTransformer} will be returned, + * that fixes P2P class loading for {@link CacheEntryProcessor}, but it will be incompatible with old versions + * of Ignite. + */ + public static final String IGNITE_STREAM_TRANSFORMER_COMPATIBILITY_MODE = + "IGNITE_STREAM_TRANSFORMER_COMPATIBILITY_MODE"; + + /** * When set to {@code true} tree-based data structures - {@code TreeMap} and {@code TreeSet} - will not be * wrapped into special holders introduced to overcome serialization issue caused by missing {@code Comparable} * interface on {@code BinaryObject}. http://git-wip-us.apache.org/repos/asf/ignite/blob/63445893/modules/core/src/main/java/org/apache/ignite/stream/StreamTransformer.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/stream/StreamTransformer.java b/modules/core/src/main/java/org/apache/ignite/stream/StreamTransformer.java index 176973e..9951c7c 100644 --- a/modules/core/src/main/java/org/apache/ignite/stream/StreamTransformer.java +++ b/modules/core/src/main/java/org/apache/ignite/stream/StreamTransformer.java @@ -24,7 +24,10 @@ import javax.cache.processor.EntryProcessorException; import javax.cache.processor.MutableEntry; import org.apache.ignite.IgniteCache; import org.apache.ignite.IgniteException; +import org.apache.ignite.IgniteSystemProperties; import org.apache.ignite.cache.CacheEntryProcessor; +import org.apache.ignite.internal.util.lang.GridPeerDeployAware; +import org.apache.ignite.internal.util.typedef.internal.U; /** * Convenience adapter to transform update existing values in streaming cache @@ -39,6 +42,10 @@ public abstract class StreamTransformer<K, V> implements StreamReceiver<K, V>, E /** */ private static final long serialVersionUID = 0L; + /** Compatibility mode flag. */ + private static final boolean compatibilityMode = + IgniteSystemProperties.getBoolean(IgniteSystemProperties.IGNITE_STREAM_TRANSFORMER_COMPATIBILITY_MODE); + /** {@inheritDoc} */ @Override public void receive(IgniteCache<K, V> cache, Collection<Map.Entry<K, V>> entries) throws IgniteException { for (Map.Entry<K, V> entry : entries) @@ -52,10 +59,53 @@ public abstract class StreamTransformer<K, V> implements StreamReceiver<K, V>, E * @return Stream transformer. */ public static <K, V> StreamTransformer<K, V> from(final CacheEntryProcessor<K, V, Object> ep) { - return new StreamTransformer<K, V>() { - @Override public Object process(MutableEntry<K, V> entry, Object... args) throws EntryProcessorException { - return ep.process(entry, args); - } - }; + if (compatibilityMode) + return new StreamTransformer<K, V>() { + @Override public Object process(MutableEntry<K, V> entry, Object... args) throws EntryProcessorException { + return ep.process(entry, args); + } + }; + else + return new EntryProcessorWrapper<>(ep); + } + + /** + * @param <K> Key type. + * @param <V> Value type. + */ + private static class EntryProcessorWrapper<K, V> extends StreamTransformer<K,V> implements GridPeerDeployAware { + /** */ + private static final long serialVersionUID = 0L; + + /** */ + private CacheEntryProcessor<K, V, Object> ep; + + /** */ + private transient ClassLoader ldr; + + /** + * @param ep Entry processor. + */ + EntryProcessorWrapper(CacheEntryProcessor<K, V, Object> ep) { + this.ep = ep; + } + + /** {@inheritDoc} */ + @Override public Object process(MutableEntry<K, V> entry, Object... args) throws EntryProcessorException { + return ep.process(entry, args); + } + + /** {@inheritDoc} */ + @Override public Class<?> deployClass() { + return ep.getClass(); + } + + /** {@inheritDoc} */ + @Override public ClassLoader classLoader() { + if (ldr == null) + ldr = U.detectClassLoader(deployClass()); + + return ldr; + } } } http://git-wip-us.apache.org/repos/asf/ignite/blob/63445893/modules/core/src/test/java/org/apache/ignite/p2p/P2PStreamingClassLoaderTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/p2p/P2PStreamingClassLoaderTest.java b/modules/core/src/test/java/org/apache/ignite/p2p/P2PStreamingClassLoaderTest.java new file mode 100644 index 0000000..a541813 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/p2p/P2PStreamingClassLoaderTest.java @@ -0,0 +1,121 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.p2p; + +import java.lang.reflect.Constructor; +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.IgniteDataStreamer; +import org.apache.ignite.cache.CacheEntryProcessor; +import org.apache.ignite.configuration.DeploymentMode; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.stream.StreamTransformer; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; +import org.apache.ignite.testframework.junits.common.GridCommonTest; + +/** */ +@GridCommonTest(group = "P2P") +public class P2PStreamingClassLoaderTest extends GridCommonAbstractTest { + /** */ + private static final String ENTRY_PROCESSOR_CLASS_NAME = "org.apache.ignite.tests.p2p.NoopCacheEntryProcessor"; + + /** */ + private static final String CACHE_NAME = "cache"; + + /** + * Current deployment mode. Used in {@link #getConfiguration(String)}. + */ + private DeploymentMode depMode; + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName); + + if (igniteInstanceName.startsWith("client")) + cfg.setClientMode(true); + + cfg.setDeploymentMode(depMode); + + return cfg; + } + + /** + * @throws Exception if error occur + */ + @SuppressWarnings("unchecked") + private void processTest() throws Exception { + try { + startGrid("server"); + Ignite client = startGrid("client"); + + ClassLoader ldr = getExternalClassLoader(); + + Class<?> epCls = ldr.loadClass(ENTRY_PROCESSOR_CLASS_NAME); + + Constructor<?> epCtr = epCls.getConstructor(); + + CacheEntryProcessor ep = (CacheEntryProcessor)epCtr.newInstance(); + + IgniteCache<Integer, String> cache = client.createCache(CACHE_NAME); + + try (IgniteDataStreamer<Integer, String> streamer = client.dataStreamer(CACHE_NAME)) { + streamer.receiver(StreamTransformer.from(ep)); + + streamer.addData(1, "1"); + } + + assertEquals("1", cache.get(1)); + } + finally { + stopAllGrids(); + } + } + + /** + * Test GridDeploymentMode.PRIVATE mode. + * + * @throws Exception if error occur. + */ + public void testPrivateMode() throws Exception { + depMode = DeploymentMode.PRIVATE; + + processTest(); + } + + /** + * Test {@link DeploymentMode#CONTINUOUS} mode. + * + * @throws Exception if error occur. + */ + public void testContinuousMode() throws Exception { + depMode = DeploymentMode.CONTINUOUS; + + processTest(); + } + + /** + * Test GridDeploymentMode.SHARED mode. + * + * @throws Exception if error occur. + */ + public void testSharedMode() throws Exception { + depMode = DeploymentMode.SHARED; + + processTest(); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/63445893/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteP2PSelfTestSuite.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteP2PSelfTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteP2PSelfTestSuite.java index 3c50baf..524283b 100644 --- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteP2PSelfTestSuite.java +++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteP2PSelfTestSuite.java @@ -35,6 +35,7 @@ import org.apache.ignite.p2p.GridP2PRemoteClassLoadersSelfTest; import org.apache.ignite.p2p.GridP2PSameClassLoaderSelfTest; import org.apache.ignite.p2p.GridP2PTimeoutSelfTest; import org.apache.ignite.p2p.GridP2PUndeploySelfTest; +import org.apache.ignite.p2p.P2PStreamingClassLoaderTest; import org.apache.ignite.p2p.SharedDeploymentTest; import org.apache.ignite.testframework.GridTestUtils; @@ -73,6 +74,7 @@ public class IgniteP2PSelfTestSuite extends TestSuite { suite.addTest(new TestSuite(GridP2PMissedResourceCacheSizeSelfTest.class)); suite.addTest(new TestSuite(GridP2PContinuousDeploymentSelfTest.class)); suite.addTest(new TestSuite(DeploymentClassLoaderCallableTest.class)); + suite.addTest(new TestSuite(P2PStreamingClassLoaderTest.class)); suite.addTest(new TestSuite(SharedDeploymentTest.class)); GridTestUtils.addTestIfNeeded(suite, GridDeploymentMessageCountSelfTest.class, ignoredTests); http://git-wip-us.apache.org/repos/asf/ignite/blob/63445893/modules/extdata/p2p/src/main/java/org/apache/ignite/tests/p2p/NoopCacheEntryProcessor.java ---------------------------------------------------------------------- diff --git a/modules/extdata/p2p/src/main/java/org/apache/ignite/tests/p2p/NoopCacheEntryProcessor.java b/modules/extdata/p2p/src/main/java/org/apache/ignite/tests/p2p/NoopCacheEntryProcessor.java new file mode 100644 index 0000000..c473d2f --- /dev/null +++ b/modules/extdata/p2p/src/main/java/org/apache/ignite/tests/p2p/NoopCacheEntryProcessor.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.tests.p2p; + +import javax.cache.processor.EntryProcessorException; +import javax.cache.processor.MutableEntry; +import org.apache.ignite.cache.CacheEntryProcessor; + +/** + * @param <K> Key type. + * @param <V> Value type. + */ +@SuppressWarnings("unchecked") +public class NoopCacheEntryProcessor<K, V> implements CacheEntryProcessor<K, V, Object> { + /** {@inheritDoc} */ + @Override public Object process(MutableEntry e, Object... args) throws EntryProcessorException { + e.setValue(args[0]); + + return null; + } +}
