Merge branch 'master' into ignite-1186
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/846e8e5d Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/846e8e5d Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/846e8e5d Branch: refs/heads/ignite-1186 Commit: 846e8e5d46fcf206369a6dacd2b56a45ac33b1cd Parents: 5455a9f 8917269 Author: nikolay_tikhonov <[email protected]> Authored: Tue Mar 1 13:41:48 2016 +0300 Committer: nikolay_tikhonov <[email protected]> Committed: Tue Mar 1 14:09:35 2016 +0300 ---------------------------------------------------------------------- .../internal/binary/BinaryClassDescriptor.java | 2 +- .../processors/cache/GridCacheProcessor.java | 20 + .../processors/cache/GridCacheUtils.java | 15 + .../cache/affinity/GridCacheAffinityImpl.java | 7 +- .../CacheContinuousQueryHandlerV2.java | 11 +- .../continuous/CacheContinuousQueryManager.java | 2 +- .../processors/query/GridQueryProcessor.java | 13 +- .../ignite/internal/visor/cache/VisorCache.java | 2 +- .../cache/VisorCacheAggregatedMetrics.java | 113 +- .../internal/visor/cache/VisorCacheMetrics.java | 88 +- .../cache/VisorCacheMetricsCollectorTask.java | 21 +- .../visor/cache/VisorCacheMetricsV2.java | 66 + .../internal/visor/cache/VisorCacheV2.java | 2 +- .../ignite/resources/JobContextResource.java | 4 +- .../org/apache/ignite/spi/IgniteSpiAdapter.java | 15 + .../ignite/spi/IgniteSpiConsistencyChecked.java | 8 + .../ignite/spi/discovery/tcp/ClientImpl.java | 6 +- .../ignite/spi/discovery/tcp/ServerImpl.java | 2 +- .../TcpDiscoveryMulticastIpFinder.java | 1 + .../spi/swapspace/file/FileSwapSpaceSpi.java | 2 + .../spi/swapspace/noop/NoopSwapSpaceSpi.java | 2 + .../cache/GridCacheAbstractFullApiSelfTest.java | 10 +- ...ridCacheSwapSpaceSpiConsistencySelfTest.java | 146 + .../IgniteCacheConfigVariationsFullApiTest.java | 5851 ++++++++++++++++++ ...acheContinuousQueryRandomOperationsTest.java | 51 + .../GridSwapSpaceSpiConsistencySelfTest.java | 131 + .../configvariations/CacheStartMode.java | 29 + .../configvariations/ConfigFactory.java | 39 + .../configvariations/ConfigParameter.java | 34 + .../configvariations/ConfigVariations.java | 346 ++ .../ConfigVariationsFactory.java | 197 + .../ConfigVariationsTestSuiteBuilder.java | 382 ++ .../IgniteConfigVariationsTestSuite.java | 50 + .../configvariations/Parameters.java | 377 ++ .../configvariations/VariationsIterator.java | 174 + .../configvariations/VariationsTestsConfig.java | 161 + .../testframework/junits/GridAbstractTest.java | 43 +- ...IgniteCacheConfigVariationsAbstractTest.java | 583 ++ .../IgniteConfigVariationsAbstractTest.java | 420 ++ .../ConfigVariationsTestSuiteBuilderTest.java | 112 + .../testframework/test/ParametersTest.java | 87 + .../test/VariationsIteratorTest.java | 156 + .../ignite/testsuites/IgniteBasicTestSuite.java | 8 + ...heBasicConfigVariationsFullApiTestSuite.java | 41 + .../testsuites/IgniteCacheTestSuite5.java | 2 + .../IgniteSpiSwapSpaceSelfTestSuite.java | 2 + .../processors/query/h2/IgniteH2Indexing.java | 104 +- .../cache/CacheQueryNewClientSelfTest.java | 108 + .../IgniteCacheQuerySelfTestSuite.java | 6 +- .../commands/cache/VisorCacheCommand.scala | 30 +- .../yardstick/IgniteBenchmarkArguments.java | 11 + .../org/apache/ignite/yardstick/IgniteNode.java | 2 + .../cache/IgniteCacheAbstractBenchmark.java | 54 + 53 files changed, 9994 insertions(+), 155 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/846e8e5d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandlerV2.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandlerV2.java index 6fc2041,0000000..dbe2a46 mode 100644,000000..100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandlerV2.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandlerV2.java @@@ -1,217 -1,0 +1,222 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.query.continuous; + +import java.io.Externalizable; +import java.io.IOException; +import java.io.ObjectInput; +import java.io.ObjectOutput; +import java.util.UUID; +import javax.cache.configuration.Factory; +import javax.cache.event.CacheEntryEventFilter; +import javax.cache.event.CacheEntryUpdatedListener; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.internal.GridKernalContext; +import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryManager.JCacheQueryRemoteFilter; +import org.apache.ignite.internal.processors.continuous.GridContinuousHandler; +import org.apache.ignite.internal.util.typedef.internal.S; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.jetbrains.annotations.Nullable; + +/** + * Continuous query handler V2 version. Contains {@link Factory} for remote listener. + */ +public class CacheContinuousQueryHandlerV2<K, V> extends CacheContinuousQueryHandler<K, V> { + /** */ + private static final long serialVersionUID = 0L; + + /** Remote filter factory. */ + private Factory<? extends CacheEntryEventFilter> rmtFilterFactory; + + /** Deployable object for filter factory. */ + private DeployableObject rmtFilterFactoryDep; + + /** Event types for JCache API. */ + private byte types = 0; + + /** */ + protected transient CacheEntryEventFilter filter; + + /** + * Required by {@link Externalizable}. + */ + public CacheContinuousQueryHandlerV2() { + // No-op. + } + + /** + * Constructor. + * + * @param cacheName Cache name. + * @param topic Topic for ordered messages. + * @param locLsnr Local listener. + * @param rmtFilterFactory Remote filter factory. + * @param internal Internal flag. + * @param notifyExisting Notify existing flag. + * @param oldValRequired Old value required flag. + * @param sync Synchronous flag. + * @param ignoreExpired Ignore expired events flag. + * @param skipPrimaryCheck Whether to skip primary check for REPLICATED cache. + * @param taskHash Task name hash code. + * @param locCache {@code True} if local cache. + * @param keepBinary Keep binary flag. + * @param types Event types. + */ + public CacheContinuousQueryHandlerV2( + String cacheName, + Object topic, + CacheEntryUpdatedListener<K, V> locLsnr, + Factory<? extends CacheEntryEventFilter<K, V>> rmtFilterFactory, + boolean internal, + boolean notifyExisting, + boolean oldValRequired, + boolean sync, + boolean ignoreExpired, + int taskHash, + boolean skipPrimaryCheck, + boolean locCache, + boolean keepBinary, + boolean ignoreClsNotFound, + @Nullable Byte types) { + super(cacheName, + topic, + locLsnr, + null, + internal, + notifyExisting, + oldValRequired, + sync, + ignoreExpired, + taskHash, + skipPrimaryCheck, + locCache, + keepBinary, + ignoreClsNotFound); + + assert rmtFilterFactory != null; + + this.rmtFilterFactory = rmtFilterFactory; + - if (types != null) ++ if (types != null) { ++ assert types != 0; ++ + this.types = types; ++ } + } + + /** {@inheritDoc} */ + @Override public CacheEntryEventFilter getEventFilter() { + if (filter == null) { + assert rmtFilterFactory != null; + ++ Factory<? extends CacheEntryEventFilter> factory = rmtFilterFactory; ++ + if (types != 0) - rmtFilterFactory = new JCacheRemoteQueryFactory(rmtFilterFactory, types); ++ factory = new JCacheRemoteQueryFactory(rmtFilterFactory, types); + - filter = rmtFilterFactory.create(); ++ filter = factory.create(); + } + + return filter; + } + + /** {@inheritDoc} */ + @Override public void p2pMarshal(GridKernalContext ctx) throws IgniteCheckedException { + super.p2pMarshal(ctx); + + if (rmtFilterFactory != null && !U.isGrid(rmtFilterFactory.getClass())) + rmtFilterFactoryDep = new DeployableObject(rmtFilterFactory, ctx); + } + + /** {@inheritDoc} */ + @Override public void p2pUnmarshal(UUID nodeId, GridKernalContext ctx) throws IgniteCheckedException { + super.p2pUnmarshal(nodeId, ctx); + + if (rmtFilterFactoryDep != null) + rmtFilterFactory = rmtFilterFactoryDep.unmarshal(nodeId, ctx); + } + + /** {@inheritDoc} */ + @Override public GridContinuousHandler clone() { + return super.clone(); + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(CacheContinuousQueryHandlerV2.class, this); + } + + /** {@inheritDoc} */ + @Override public void writeExternal(ObjectOutput out) throws IOException { + super.writeExternal(out); + + boolean b = rmtFilterFactoryDep != null; + + out.writeBoolean(b); + + if (b) + out.writeObject(rmtFilterFactoryDep); + else + out.writeObject(rmtFilterFactory); + + out.writeByte(types); + } + + /** {@inheritDoc} */ + @SuppressWarnings("unchecked") + @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { + super.readExternal(in); + + boolean b = in.readBoolean(); + + if (b) + rmtFilterFactoryDep = (DeployableObject)in.readObject(); + else + rmtFilterFactory = (Factory)in.readObject(); + + types = in.readByte(); + } + + /** + * + */ + private static class JCacheRemoteQueryFactory implements Factory<CacheEntryEventFilter> { + /** */ + private static final long serialVersionUID = 0L; + + /** Factory. */ + protected Factory<? extends CacheEntryEventFilter> impl; + + /** */ + private byte types; + + /** + * @param impl Factory. + * @param types Types. + */ + public JCacheRemoteQueryFactory(@Nullable Factory<? extends CacheEntryEventFilter> impl, byte types) { + this.impl = impl; + this.types = types; + } + + /** {@inheritDoc} */ + @Override public JCacheQueryRemoteFilter create() { + return new JCacheQueryRemoteFilter(impl != null ? impl.create() : null, types); + } + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/846e8e5d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java index 2a05865,409c1da..bfe70f1 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java @@@ -608,235 -589,6 +608,235 @@@ public class CacheContinuousQueryManage keepBinary, ignoreClassNotFound); + return executeQuery0(locLsnr, + bufSize, + timeInterval, + autoUnsubscribe, + notifyExisting, + loc, + keepBinary, + hnd); + } + + + /** + * @param locLsnr Local listener. + * @param types JCache event types. + * @param bufSize Buffer size. + * @param timeInterval Time interval. + * @param autoUnsubscribe Auto unsubscribe flag. + * @param internal Internal flag. + * @param notifyExisting Notify existing flag. + * @param oldValRequired Old value required flag. + * @param sync Synchronous flag. + * @param ignoreExpired Ignore expired event flag. + * @param loc Local flag. + * @return Continuous routine ID. + * @throws IgniteCheckedException In case of error. + */ + private UUID executeJCacheQueryFactory(CacheEntryUpdatedListener locLsnr, + final Factory<CacheEntryEventFilter> rmtFilterFactory, + byte types, + int bufSize, + long timeInterval, + boolean autoUnsubscribe, + boolean internal, + boolean notifyExisting, + boolean oldValRequired, + boolean sync, + boolean ignoreExpired, + boolean loc, + final boolean keepBinary, + boolean ignoreClassNotFound) throws IgniteCheckedException + { + assert types != 0 : types; + + cctx.checkSecurity(SecurityPermission.CACHE_READ); + + int taskNameHash = !internal && cctx.kernalContext().security().enabled() ? + cctx.kernalContext().job().currentTaskNameHash() : 0; + + boolean skipPrimaryCheck = loc && cctx.config().getCacheMode() == CacheMode.REPLICATED && cctx.affinityNode(); + + boolean v2 = rmtFilterFactory != null && useV2Protocol(cctx.discovery().allNodes()); + + GridContinuousHandler hnd; + + if (v2) + hnd = new CacheContinuousQueryHandlerV2( + cctx.name(), + TOPIC_CACHE.topic(topicPrefix, cctx.localNodeId(), seq.getAndIncrement()), + locLsnr, + rmtFilterFactory, + internal, + notifyExisting, + oldValRequired, + sync, + ignoreExpired, + taskNameHash, + skipPrimaryCheck, + cctx.isLocal(), + keepBinary, + ignoreClassNotFound, + types); + else { + JCacheQueryRemoteFilter jCacheFilter; + + CacheEntryEventFilter filter = null; + + if (rmtFilterFactory != null) { + filter = rmtFilterFactory.create(); + + if (!(filter instanceof Serializable)) + throw new IgniteCheckedException("Topology has nodes of the old versions. In this case " + + "EntryEventFilter must implement java.io.Serializable interface. Filter: " + filter); + } + + jCacheFilter = new JCacheQueryRemoteFilter(filter, types); + + hnd = new CacheContinuousQueryHandler( + cctx.name(), + TOPIC_CACHE.topic(topicPrefix, cctx.localNodeId(), seq.getAndIncrement()), + locLsnr, + jCacheFilter, + internal, + notifyExisting, + oldValRequired, + sync, + ignoreExpired, + taskNameHash, + skipPrimaryCheck, + cctx.isLocal(), + keepBinary, + ignoreClassNotFound); + } + + return executeQuery0(locLsnr, + bufSize, + timeInterval, + autoUnsubscribe, + notifyExisting, + loc, + keepBinary, + hnd); + } + + /** + * @param locLsnr Local listener. + * @param bufSize Buffer size. + * @param timeInterval Time interval. + * @param autoUnsubscribe Auto unsubscribe flag. + * @param internal Internal flag. + * @param notifyExisting Notify existing flag. + * @param oldValRequired Old value required flag. + * @param sync Synchronous flag. + * @param ignoreExpired Ignore expired event flag. + * @param loc Local flag. + * @return Continuous routine ID. + * @throws IgniteCheckedException In case of error. + */ + private UUID executeQueryWithFilterFactory(CacheEntryUpdatedListener locLsnr, + final Factory<? extends CacheEntryEventFilter> rmtFilterFactory, + int bufSize, + long timeInterval, + boolean autoUnsubscribe, + boolean internal, + boolean notifyExisting, + boolean oldValRequired, + boolean sync, + boolean ignoreExpired, + boolean loc, + final boolean keepBinary, + boolean ignoreClassNotFound) throws IgniteCheckedException + { + cctx.checkSecurity(SecurityPermission.CACHE_READ); + + int taskNameHash = !internal && cctx.kernalContext().security().enabled() ? + cctx.kernalContext().job().currentTaskNameHash() : 0; + + boolean skipPrimaryCheck = loc && cctx.config().getCacheMode() == CacheMode.REPLICATED && cctx.affinityNode(); + + boolean v2 = rmtFilterFactory != null && useV2Protocol(cctx.discovery().allNodes()); + + GridContinuousHandler hnd; + + if (v2) + hnd = new CacheContinuousQueryHandlerV2( + cctx.name(), + TOPIC_CACHE.topic(topicPrefix, cctx.localNodeId(), seq.getAndIncrement()), + locLsnr, + rmtFilterFactory, + internal, + notifyExisting, + oldValRequired, + sync, + ignoreExpired, + taskNameHash, + skipPrimaryCheck, + cctx.isLocal(), + keepBinary, + ignoreClassNotFound, - (byte)0); ++ null); + else { + CacheEntryEventFilter fltr = null; + + if (rmtFilterFactory != null) { + fltr = rmtFilterFactory.create(); + + if (!(fltr instanceof CacheEntryEventSerializableFilter)) + throw new IgniteCheckedException("Topology has nodes of the old versions. In this case " + + "EntryEventFilter should implement org.apache.ignite.cache.CacheEntryEventSerializableFilter " + + "interface. Filter: " + fltr); + } + + hnd = new CacheContinuousQueryHandler( + cctx.name(), + TOPIC_CACHE.topic(topicPrefix, cctx.localNodeId(), seq.getAndIncrement()), + locLsnr, + (CacheEntryEventSerializableFilter)fltr, + internal, + notifyExisting, + oldValRequired, + sync, + ignoreExpired, + taskNameHash, + skipPrimaryCheck, + cctx.isLocal(), + keepBinary, + ignoreClassNotFound); + } + + return executeQuery0(locLsnr, + bufSize, + timeInterval, + autoUnsubscribe, + notifyExisting, + loc, + keepBinary, + hnd); + } + + /** + * @param locLsnr Local listener. + * @param bufSize Buffer size. + * @param timeInterval Time interval. + * @param autoUnsubscribe Auto unsubscribe flag. + * @param notifyExisting Notify existing flag. + * @param loc Local flag. + * @param keepBinary Keep binary. + * @param hnd Handler. + * @return Continuous routine ID. + * @throws IgniteCheckedException In case of error. + */ + private UUID executeQuery0(CacheEntryUpdatedListener locLsnr, + int bufSize, + long timeInterval, + boolean autoUnsubscribe, + boolean notifyExisting, + boolean loc, + final boolean keepBinary, + final GridContinuousHandler hnd) + throws IgniteCheckedException { IgnitePredicate<ClusterNode> pred = (loc || cctx.config().getCacheMode() == CacheMode.LOCAL) ? F.nodeForNodeId(cctx.localNodeId()) : F.<ClusterNode>alwaysTrue(); http://git-wip-us.apache.org/repos/asf/ignite/blob/846e8e5d/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryRandomOperationsTest.java ---------------------------------------------------------------------- diff --cc modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryRandomOperationsTest.java index 23b9d85,62ed66f..c18cf35 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryRandomOperationsTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryRandomOperationsTest.java @@@ -28,11 -28,11 +28,14 @@@ import java.util.SortedMap import java.util.TreeMap; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; ++import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import javax.cache.Cache; import javax.cache.configuration.Factory; import javax.cache.event.CacheEntryEvent; ++import javax.cache.event.CacheEntryEventFilter; ++import javax.cache.event.CacheEntryListenerException; import javax.cache.event.CacheEntryUpdatedListener; import javax.cache.integration.CacheLoaderException; import javax.cache.integration.CacheWriterException; @@@ -40,7 -40,7 +43,9 @@@ import javax.cache.processor.EntryProce import javax.cache.processor.MutableEntry; import org.apache.ignite.Ignite; import org.apache.ignite.IgniteCache; ++import org.apache.ignite.IgniteException; import org.apache.ignite.cache.CacheAtomicityMode; ++import org.apache.ignite.cache.CacheEntryEventSerializableFilter; import org.apache.ignite.cache.CacheMemoryMode; import org.apache.ignite.cache.CacheMode; import org.apache.ignite.cache.affinity.Affinity; @@@ -55,6 -55,6 +60,7 @@@ import org.apache.ignite.internal.util. import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder; import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; ++import org.apache.ignite.testframework.GridTestUtils; import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; import org.apache.ignite.transactions.Transaction; import org.apache.ignite.transactions.TransactionConcurrency; @@@ -132,6 -132,6 +138,51 @@@ public class CacheContinuousQueryRandom /** * @throws Exception If failed. */ ++ public void testFilterAndFactoryProvided() throws Exception { ++ CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED, ++ 1, ++ ATOMIC, ++ ONHEAP_TIERED, ++ false); ++ ++ final IgniteCache<Object, Object> cache = grid(0).getOrCreateCache(ccfg); ++ ++ try { ++ final ContinuousQuery qry = new ContinuousQuery(); ++ ++ qry.setRemoteFilterFactory(new Factory<CacheEntryEventFilter>() { ++ @Override public CacheEntryEventFilter create() { ++ return null; ++ } ++ }); ++ ++ qry.setRemoteFilter(new CacheEntryEventSerializableFilter() { ++ @Override public boolean evaluate(CacheEntryEvent event) throws CacheEntryListenerException { ++ return false; ++ } ++ }); ++ ++ qry.setLocalListener(new CacheEntryUpdatedListener() { ++ @Override public void onUpdated(Iterable iterable) throws CacheEntryListenerException { ++ // No-op. ++ } ++ }); ++ ++ GridTestUtils.assertThrows(log, new Callable<Object>() { ++ @Override public Object call() throws Exception { ++ return cache.query(qry); ++ } ++ }, IgniteException.class, null); ++ ++ } ++ finally { ++ cache.destroy(); ++ } ++ } ++ ++ /** ++ * @throws Exception If failed. ++ */ public void testAtomicClient() throws Exception { CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED, 1, http://git-wip-us.apache.org/repos/asf/ignite/blob/846e8e5d/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java ----------------------------------------------------------------------
