IGNITE-5795 Register binary metadata during cache start - Fixes #4852. Signed-off-by: Alexey Goncharuk <[email protected]>
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/3bb03444 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/3bb03444 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/3bb03444 Branch: refs/heads/ignite-9720 Commit: 3bb03444246f863096063d084393676a84d2bc0e Parents: 5939a94 Author: Anton Kalashnikov <[email protected]> Authored: Fri Oct 19 17:51:41 2018 +0300 Committer: Alexey Goncharuk <[email protected]> Committed: Fri Oct 19 17:56:36 2018 +0300 ---------------------------------------------------------------------- .../apache/ignite/internal/IgniteKernal.java | 2 +- .../binary/BinaryCachingMetadataHandler.java | 25 +- .../ignite/internal/binary/BinaryContext.java | 59 +++- .../internal/binary/BinaryMetadataHandler.java | 10 + .../binary/BinaryNoopMetadataHandler.java | 6 + .../binary/builder/BinaryObjectBuilderImpl.java | 2 +- .../internal/client/thin/TcpIgniteClient.java | 6 + .../processors/cache/GridCacheProcessor.java | 9 +- .../binary/CacheObjectBinaryProcessorImpl.java | 5 + .../processors/query/GridQueryProcessor.java | 77 ++++- .../binary/TestCachingMetadataHandler.java | 6 + .../cache/CacheRegisterMetadataLocallyTest.java | 287 +++++++++++++++++++ .../cache/index/AbstractSchemaSelfTest.java | 6 +- .../index/H2DynamicIndexAbstractSelfTest.java | 48 ++-- .../IgniteCacheWithIndexingTestSuite.java | 2 + 15 files changed, 494 insertions(+), 56 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/3bb03444/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java index c6ec9be..40347d7 100644 --- 
a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java @@ -1005,6 +1005,7 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable { // Start processors before discovery manager, so they will // be able to start receiving messages once discovery completes. try { + startProcessor(new GridMarshallerMappingProcessor(ctx)); startProcessor(new PdsConsistentIdProcessor(ctx)); startProcessor(new MvccProcessorImpl(ctx)); startProcessor(createComponent(DiscoveryNodeValidationProcessor.class, ctx)); @@ -1028,7 +1029,6 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable { startProcessor(createHadoopComponent()); startProcessor(new DataStructuresProcessor(ctx)); startProcessor(createComponent(PlatformProcessor.class, ctx)); - startProcessor(new GridMarshallerMappingProcessor(ctx)); // Start plugins. for (PluginProvider provider : ctx.plugins().allProviders()) { http://git-wip-us.apache.org/repos/asf/ignite/blob/3bb03444/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryCachingMetadataHandler.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryCachingMetadataHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryCachingMetadataHandler.java index a0559cb..b60dc097 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryCachingMetadataHandler.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryCachingMetadataHandler.java @@ -46,24 +46,29 @@ public class BinaryCachingMetadataHandler implements BinaryMetadataHandler { } /** {@inheritDoc} */ - @Override public synchronized void addMeta(int typeId, BinaryType type, boolean failIfUnregistered) throws BinaryObjectException { - synchronized (this) { - BinaryType oldType = metas.put(typeId, type); + 
@Override public synchronized void addMeta(int typeId, BinaryType type, + boolean failIfUnregistered) throws BinaryObjectException { + BinaryType oldType = metas.put(typeId, type); - if (oldType != null) { - BinaryMetadata oldMeta = ((BinaryTypeImpl)oldType).metadata(); - BinaryMetadata newMeta = ((BinaryTypeImpl)type).metadata(); + if (oldType != null) { + BinaryMetadata oldMeta = ((BinaryTypeImpl)oldType).metadata(); + BinaryMetadata newMeta = ((BinaryTypeImpl)type).metadata(); - BinaryMetadata mergedMeta = BinaryUtils.mergeMetadata(oldMeta, newMeta); + BinaryMetadata mergedMeta = BinaryUtils.mergeMetadata(oldMeta, newMeta); - BinaryType mergedType = mergedMeta.wrap(((BinaryTypeImpl)oldType).context()); + BinaryType mergedType = mergedMeta.wrap(((BinaryTypeImpl)oldType).context()); - metas.put(typeId, mergedType); - } + metas.put(typeId, mergedType); } } /** {@inheritDoc} */ + @Override public synchronized void addMetaLocally(int typeId, BinaryType meta, boolean failIfUnregistered) + throws BinaryObjectException { + addMeta(typeId, meta, failIfUnregistered); + } + + /** {@inheritDoc} */ @Override public synchronized BinaryType metadata(int typeId) throws BinaryObjectException { return metas.get(typeId); } http://git-wip-us.apache.org/repos/asf/ignite/blob/3bb03444/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryContext.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryContext.java b/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryContext.java index 7885d95..7ab74e0 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryContext.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryContext.java @@ -617,6 +617,18 @@ public class BinaryContext { */ public BinaryClassDescriptor descriptorForClass(Class<?> cls, boolean deserialize, boolean failIfUnregistered) throws 
BinaryObjectException { + return descriptorForClass(cls, deserialize, failIfUnregistered, false); + } + + /** + * @param cls Class. + * @param failIfUnregistered Throw exception if class isn't registered. + * @param onlyLocReg {@code true} if descriptor need to register only locally when registration is required at all. + * @return Class descriptor. + * @throws BinaryObjectException In case of error. + */ + public BinaryClassDescriptor descriptorForClass(Class<?> cls, boolean deserialize, boolean failIfUnregistered, + boolean onlyLocReg) throws BinaryObjectException { assert cls != null; BinaryClassDescriptor desc = descByCls.get(cls); @@ -625,7 +637,7 @@ public class BinaryContext { if (failIfUnregistered) throw new UnregisteredClassException(cls); - desc = registerClassDescriptor(cls, deserialize); + desc = registerClassDescriptor(cls, deserialize, onlyLocReg); } else if (!desc.registered()) { if (!desc.userType()) { @@ -662,7 +674,7 @@ public class BinaryContext { if (failIfUnregistered) throw new UnregisteredClassException(cls); - desc = registerUserClassDescriptor(desc); + desc = registerUserClassDescriptor(desc, onlyLocReg); } } @@ -715,7 +727,7 @@ public class BinaryContext { } if (desc == null) { - desc = registerClassDescriptor(cls, deserialize); + desc = registerClassDescriptor(cls, deserialize, false); assert desc.typeId() == typeId : "Duplicate typeId [typeId=" + typeId + ", cls=" + cls + ", desc=" + desc + "]"; @@ -728,9 +740,10 @@ public class BinaryContext { * Creates and registers {@link BinaryClassDescriptor} for the given {@code class}. * * @param cls Class. + * @param onlyLocReg {@code true} if descriptor need to register only locally when registration is required at all. * @return Class descriptor. 
*/ - private BinaryClassDescriptor registerClassDescriptor(Class<?> cls, boolean deserialize) { + private BinaryClassDescriptor registerClassDescriptor(Class<?> cls, boolean deserialize, boolean onlyLocReg) { BinaryClassDescriptor desc; String clsName = cls.getName(); @@ -759,7 +772,7 @@ public class BinaryContext { desc = old; } else - desc = registerUserClassDescriptor(cls, deserialize); + desc = registerUserClassDescriptor(cls, deserialize, onlyLocReg); return desc; } @@ -768,9 +781,10 @@ public class BinaryContext { * Creates and registers {@link BinaryClassDescriptor} for the given user {@code class}. * * @param cls Class. + * @param onlyLocReg {@code true} if descriptor need to register only locally. * @return Class descriptor. */ - private BinaryClassDescriptor registerUserClassDescriptor(Class<?> cls, boolean deserialize) { + private BinaryClassDescriptor registerUserClassDescriptor(Class<?> cls, boolean deserialize, boolean onlyLocReg) { boolean registered; final String clsName = cls.getName(); @@ -781,7 +795,7 @@ public class BinaryContext { final int typeId = mapper.typeId(clsName); - registered = registerUserClassName(typeId, cls.getName()); + registered = registerUserClassName(typeId, cls.getName(), onlyLocReg); BinarySerializer serializer = serializerForClass(cls); @@ -799,9 +813,22 @@ public class BinaryContext { registered ); - if (!deserialize) - metaHnd.addMeta(typeId, new BinaryMetadata(typeId, typeName, desc.fieldsMeta(), affFieldName, null, - desc.isEnum(), cls.isEnum() ? enumMap(cls) : null).wrap(this), false); + if (!deserialize) { + BinaryMetadata binaryMetadata = new BinaryMetadata( + typeId, + typeName, + desc.fieldsMeta(), + affFieldName, + null, + desc.isEnum(), + cls.isEnum() ? 
enumMap(cls) : null + ); + + if (onlyLocReg) + metaHnd.addMetaLocally(typeId, binaryMetadata.wrap(this), false); + else + metaHnd.addMeta(typeId, binaryMetadata.wrap(this), false); + } descByCls.put(cls, desc); @@ -814,12 +841,13 @@ public class BinaryContext { * Creates and registers {@link BinaryClassDescriptor} for the given user {@code class}. * * @param desc Old descriptor that should be re-registered. + * @param onlyLocReg {@code true} if descriptor need to register only locally. * @return Class descriptor. */ - private BinaryClassDescriptor registerUserClassDescriptor(BinaryClassDescriptor desc) { + private BinaryClassDescriptor registerUserClassDescriptor(BinaryClassDescriptor desc, boolean onlyLocReg) { boolean registered; - registered = registerUserClassName(desc.typeId(), desc.describedClass().getName()); + registered = registerUserClassName(desc.typeId(), desc.describedClass().getName(), onlyLocReg); if (registered) { BinarySerializer serializer = desc.initialSerializer(); @@ -1191,15 +1219,18 @@ public class BinaryContext { * * @param typeId Type ID. * @param clsName Class Name. + * @param onlyLocReg {@code true} if descriptor need to register only locally. * @return {@code True} if the mapping was registered successfully. */ - public boolean registerUserClassName(int typeId, String clsName) { + public boolean registerUserClassName(int typeId, String clsName, boolean onlyLocReg) { IgniteCheckedException e = null; boolean res = false; try { - res = marshCtx.registerClassName(JAVA_ID, typeId, clsName); + res = onlyLocReg + ? 
marshCtx.registerClassNameLocally(JAVA_ID, typeId, clsName) + : marshCtx.registerClassName(JAVA_ID, typeId, clsName); } catch (DuplicateTypeIdException dupEx) { // Ignore if trying to register mapped type name of the already registered class name and vise versa http://git-wip-us.apache.org/repos/asf/ignite/blob/3bb03444/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryMetadataHandler.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryMetadataHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryMetadataHandler.java index 85ab137..d1336bf 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryMetadataHandler.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryMetadataHandler.java @@ -36,6 +36,16 @@ public interface BinaryMetadataHandler { public void addMeta(int typeId, BinaryType meta, boolean failIfUnregistered) throws BinaryObjectException; /** + * Adds meta data locally on current node without sending any messages. + * + * @param typeId Type ID. + * @param meta Metadata. + * @param failIfUnregistered Fail if unregistered. + * @throws BinaryObjectException In case of error. + */ + public void addMetaLocally(int typeId, BinaryType meta, boolean failIfUnregistered) throws BinaryObjectException; + + /** * Gets meta data for provided type ID. * * @param typeId Type ID. 
http://git-wip-us.apache.org/repos/asf/ignite/blob/3bb03444/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryNoopMetadataHandler.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryNoopMetadataHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryNoopMetadataHandler.java index 4ee2428..a552d61 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryNoopMetadataHandler.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryNoopMetadataHandler.java @@ -48,6 +48,12 @@ public class BinaryNoopMetadataHandler implements BinaryMetadataHandler { } /** {@inheritDoc} */ + @Override public void addMetaLocally(int typeId, BinaryType meta, boolean failIfUnregistered) + throws BinaryObjectException { + // No-op. + } + + /** {@inheritDoc} */ @Override public BinaryType metadata(int typeId) throws BinaryObjectException { return null; } http://git-wip-us.apache.org/repos/asf/ignite/blob/3bb03444/modules/core/src/main/java/org/apache/ignite/internal/binary/builder/BinaryObjectBuilderImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/binary/builder/BinaryObjectBuilderImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/binary/builder/BinaryObjectBuilderImpl.java index abd63cd..5414a25 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/binary/builder/BinaryObjectBuilderImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/binary/builder/BinaryObjectBuilderImpl.java @@ -364,7 +364,7 @@ public class BinaryObjectBuilderImpl implements BinaryObjectBuilder { if (affFieldName0 == null) affFieldName0 = ctx.affinityKeyFieldName(typeId); - ctx.registerUserClassName(typeId, typeName); + ctx.registerUserClassName(typeId, typeName, false); 
ctx.updateMetadata(typeId, new BinaryMetadata(typeId, typeName, fieldsMeta, affFieldName0, Collections.singleton(curSchema), false, null), writer.failIfUnregistered()); http://git-wip-us.apache.org/repos/asf/ignite/blob/3bb03444/modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpIgniteClient.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpIgniteClient.java b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpIgniteClient.java index 5040816..856f41c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpIgniteClient.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpIgniteClient.java @@ -263,6 +263,12 @@ public class TcpIgniteClient implements IgniteClient { } /** {@inheritDoc} */ + @Override public void addMetaLocally(int typeId, BinaryType meta, boolean failIfUnregistered) + throws BinaryObjectException { + throw new UnsupportedOperationException("Can't register metadata locally for thin client."); + } + + /** {@inheritDoc} */ @Override public BinaryType metadata(int typeId) throws BinaryObjectException { BinaryType meta = cache.metadata(typeId); http://git-wip-us.apache.org/repos/asf/ignite/blob/3bb03444/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java index ec88a93..4a6bed4 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java @@ -3823,8 +3823,13 @@ public class GridCacheProcessor extends 
GridProcessorAdapter implements Metastor return msg0.needExchange(); } - if (msg instanceof DynamicCacheChangeBatch) - return cachesInfo.onCacheChangeRequested((DynamicCacheChangeBatch)msg, topVer); + if (msg instanceof DynamicCacheChangeBatch) { + boolean changeRequested = cachesInfo.onCacheChangeRequested((DynamicCacheChangeBatch)msg, topVer); + + ctx.query().onCacheChangeRequested((DynamicCacheChangeBatch)msg); + + return changeRequested; + } if (msg instanceof DynamicCacheChangeFailureMessage) cachesInfo.onCacheChangeRequested((DynamicCacheChangeFailureMessage)msg, topVer); http://git-wip-us.apache.org/repos/asf/ignite/blob/3bb03444/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java index 137db9f..f3078cb 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java @@ -207,6 +207,11 @@ public class CacheObjectBinaryProcessorImpl extends IgniteCacheObjectProcessorIm CacheObjectBinaryProcessorImpl.this.addMeta(typeId, newMeta0.wrap(binaryCtx), failIfUnregistered); } + @Override public void addMetaLocally(int typeId, BinaryType meta, boolean failIfUnregistered) + throws BinaryObjectException { + CacheObjectBinaryProcessorImpl.this.addMetaLocally(typeId, meta); + } + @Override public BinaryType metadata(int typeId) throws BinaryObjectException { return CacheObjectBinaryProcessorImpl.this.metadata(typeId); } 
http://git-wip-us.apache.org/repos/asf/ignite/blob/3bb03444/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java index 43310a7..ea588ff 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java @@ -17,6 +17,8 @@ package org.apache.ignite.internal.processors.query; +import javax.cache.Cache; +import javax.cache.CacheException; import java.sql.PreparedStatement; import java.sql.SQLException; import java.util.ArrayList; @@ -34,8 +36,6 @@ import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.atomic.AtomicBoolean; -import javax.cache.Cache; -import javax.cache.CacheException; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteDataStreamer; import org.apache.ignite.IgniteException; @@ -62,16 +62,20 @@ import org.apache.ignite.internal.processors.GridProcessorAdapter; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.CacheObject; import org.apache.ignite.internal.processors.cache.CacheObjectContext; +import org.apache.ignite.internal.processors.cache.DynamicCacheChangeBatch; +import org.apache.ignite.internal.processors.cache.DynamicCacheChangeRequest; import org.apache.ignite.internal.processors.cache.DynamicCacheDescriptor; import org.apache.ignite.internal.processors.cache.GridCacheAdapter; import org.apache.ignite.internal.processors.cache.GridCacheContext; import 
org.apache.ignite.internal.processors.cache.KeyCacheObject; +import org.apache.ignite.internal.processors.cache.binary.CacheObjectBinaryProcessorImpl; import org.apache.ignite.internal.processors.cache.mvcc.MvccSnapshot; import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow; import org.apache.ignite.internal.processors.cache.query.CacheQueryFuture; import org.apache.ignite.internal.processors.cache.query.CacheQueryType; import org.apache.ignite.internal.processors.cache.query.GridCacheQueryType; import org.apache.ignite.internal.processors.cache.query.IgniteQueryErrorCode; +import org.apache.ignite.internal.processors.cacheobject.IgniteCacheObjectProcessor; import org.apache.ignite.internal.processors.query.property.QueryBinaryProperty; import org.apache.ignite.internal.processors.query.schema.SchemaIndexCacheFilter; import org.apache.ignite.internal.processors.query.schema.SchemaIndexCacheVisitor; @@ -257,6 +261,8 @@ public class GridQueryProcessor extends GridProcessorAdapter { ctxs.queries().evictDetailMetrics(); } }, QRY_DETAIL_METRICS_EVICTION_FREQ, QRY_DETAIL_METRICS_EVICTION_FREQ); + + registerMetadataForRegisteredCaches(); } /** {@inheritDoc} */ @@ -904,6 +910,73 @@ public class GridQueryProcessor extends GridProcessorAdapter { } /** + * Register metadata locally for already registered caches. + */ + private void registerMetadataForRegisteredCaches() { + for (DynamicCacheDescriptor cacheDescriptor : ctx.cache().cacheDescriptors().values()) { + registerBinaryMetadata(cacheDescriptor.cacheConfiguration(), cacheDescriptor.schema()); + } + } + + /** + * Handle of cache change request. + * + * @param batch Dynamic cache change batch request. + */ + public void onCacheChangeRequested(DynamicCacheChangeBatch batch) { + for (DynamicCacheChangeRequest req : batch.requests()) { + if (!req.start()) + continue; + + registerBinaryMetadata(req.startCacheConfiguration(), req.schema()); + } + } + + /** + * Register binary metadata locally. 
+ * + * @param ccfg Cache configuration. + * @param schema Schema for which register metadata is required. + */ + private void registerBinaryMetadata(CacheConfiguration ccfg, QuerySchema schema) { + if (schema != null) { + Collection<QueryEntity> qryEntities = schema.entities(); + + if (!F.isEmpty(qryEntities)) { + boolean binaryEnabled = ctx.cacheObjects().isBinaryEnabled(ccfg); + + if (binaryEnabled) { + for (QueryEntity qryEntity : qryEntities) { + Class<?> keyCls = U.box(U.classForName(qryEntity.findKeyType(), null, true)); + Class<?> valCls = U.box(U.classForName(qryEntity.findValueType(), null, true)); + + if (keyCls != null) + registerDescriptorLocallyIfNeeded(keyCls); + + if (valCls != null) + registerDescriptorLocallyIfNeeded(valCls); + } + } + } + } + } + + /** + * Register class metadata locally if it didn't do it earlier. + * + * @param cls Class for which the metadata should be registered. + */ + private void registerDescriptorLocallyIfNeeded(Class<?> cls) { + IgniteCacheObjectProcessor cacheObjProc = ctx.cacheObjects(); + + if (cacheObjProc instanceof CacheObjectBinaryProcessorImpl) { + ((CacheObjectBinaryProcessorImpl)cacheObjProc) + .binaryContext() + .descriptorForClass(cls, false, false, true); + } + } + + /** * Handle custom discovery message. * * @param msg Message. 
http://git-wip-us.apache.org/repos/asf/ignite/blob/3bb03444/modules/core/src/test/java/org/apache/ignite/internal/binary/TestCachingMetadataHandler.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/binary/TestCachingMetadataHandler.java b/modules/core/src/test/java/org/apache/ignite/internal/binary/TestCachingMetadataHandler.java index c515f81..47138dd 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/binary/TestCachingMetadataHandler.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/binary/TestCachingMetadataHandler.java @@ -39,6 +39,12 @@ public class TestCachingMetadataHandler implements BinaryMetadataHandler { } /** {@inheritDoc} */ + @Override public void addMetaLocally(int typeId, BinaryType meta, boolean failIfUnregistered) + throws BinaryObjectException { + addMeta(typeId, meta, failIfUnregistered); + } + + /** {@inheritDoc} */ @Override public BinaryType metadata(int typeId) throws BinaryObjectException { return metas.get(typeId); } http://git-wip-us.apache.org/repos/asf/ignite/blob/3bb03444/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/CacheRegisterMetadataLocallyTest.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/CacheRegisterMetadataLocallyTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/CacheRegisterMetadataLocallyTest.java new file mode 100644 index 0000000..d4066c2 --- /dev/null +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/CacheRegisterMetadataLocallyTest.java @@ -0,0 +1,287 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache; + +import java.util.Collections; +import java.util.concurrent.ConcurrentLinkedQueue; +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.IgniteException; +import org.apache.ignite.binary.BinaryType; +import org.apache.ignite.cache.QueryEntity; +import org.apache.ignite.cache.affinity.AffinityKeyMapped; +import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.internal.managers.communication.GridIoMessage; +import org.apache.ignite.internal.managers.discovery.CustomMessageWrapper; +import org.apache.ignite.internal.processors.cache.binary.MetadataRequestMessage; +import org.apache.ignite.internal.processors.cache.binary.MetadataResponseMessage; +import org.apache.ignite.internal.processors.cache.binary.MetadataUpdateProposedMessage; +import org.apache.ignite.lang.IgniteInClosure; +import org.apache.ignite.plugin.extensions.communication.Message; +import org.apache.ignite.spi.IgniteSpiException; +import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi; +import org.apache.ignite.spi.discovery.DiscoverySpiCustomMessage; +import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; +import 
org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; + +/** + * Tests, that binary metadata is registered correctly during the start without extra request to grid. + */ +public class CacheRegisterMetadataLocallyTest extends GridCommonAbstractTest { + /** */ + private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true); + + /** */ + private static final String STATIC_CACHE_NAME = "staticCache"; + + /** */ + private static final String DYNAMIC_CACHE_NAME = "dynamicCache"; + + /** Holder of sent custom messages. */ + private final ConcurrentLinkedQueue<Object> customMessages = new ConcurrentLinkedQueue<>(); + + /** Holder of sent communication messages. */ + private final ConcurrentLinkedQueue<Object> communicationMessages = new ConcurrentLinkedQueue<>(); + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName); + + cfg.setDiscoverySpi(new TcpDiscoverySpi() { + @Override public void sendCustomEvent(DiscoverySpiCustomMessage msg) throws IgniteException { + if (msg instanceof CustomMessageWrapper) + customMessages.add(((CustomMessageWrapper)msg).delegate()); + else + customMessages.add(msg); + + super.sendCustomEvent(msg); + } + }); + + cfg.setCommunicationSpi(new TcpCommunicationSpi() { + @Override public void sendMessage(ClusterNode node, Message msg, + IgniteInClosure<IgniteException> ackC) throws IgniteSpiException { + if (msg instanceof GridIoMessage) + communicationMessages.add(((GridIoMessage)msg).message()); + + super.sendMessage(node, msg, ackC); + } + + @Override public void sendMessage(ClusterNode node, Message msg) throws IgniteSpiException { + if (msg instanceof GridIoMessage) + 
communicationMessages.add(((GridIoMessage)msg).message()); + + super.sendMessage(node, msg); + } + }); + + ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(IP_FINDER); + + if (igniteInstanceName.equals("client")) + cfg.setClientMode(true); + + cfg.setCacheConfiguration(cacheConfiguration(STATIC_CACHE_NAME, StaticKey.class, StaticValue.class)); + + return cfg; + } + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + stopAllGrids(); + + cleanPersistenceDir(); + + customMessages.clear(); + communicationMessages.clear(); + } + + /** + * @throws Exception If failed. + */ + public void testAffinityKeyRegisteredStaticCache() throws Exception { + Ignite ignite = startGrid(); + + assertEquals("affKey", getAffinityKey(ignite, StaticKey.class)); + assertEquals("affKey", getAffinityKey(ignite, StaticValue.class)); + } + + /** + * @throws Exception If failed. + */ + public void testAffinityKeyRegisteredDynamicCache() throws Exception { + Ignite ignite = startGrid(); + + ignite.createCache(cacheConfiguration(DYNAMIC_CACHE_NAME, DynamicKey.class, DynamicValue.class)); + + assertEquals("affKey", getAffinityKey(ignite, DynamicKey.class)); + assertEquals("affKey", getAffinityKey(ignite, DynamicValue.class)); + } + + /** + * @throws Exception If failed. + */ + public void testClientFindsValueByAffinityKeyStaticCacheWithoutExtraRequest() throws Exception { + Ignite srv = startGrid(); + IgniteCache<StaticKey, StaticValue> cache = srv.cache(STATIC_CACHE_NAME); + + testClientFindsValueByAffinityKey(cache, new StaticKey(1), new StaticValue(2)); + + assertCustomMessages(2); //MetadataUpdateProposedMessage for update schema. + assertCommunicationMessages(); + } + + /** + * @throws Exception If failed. 
+ */ + public void testClientFindsValueByAffinityKeyDynamicCacheWithoutExtraRequest() throws Exception { + Ignite srv = startGrid(); + IgniteCache<DynamicKey, DynamicValue> cache = + srv.createCache(cacheConfiguration(DYNAMIC_CACHE_NAME, DynamicKey.class, DynamicValue.class)); + + testClientFindsValueByAffinityKey(cache, new DynamicKey(3), new DynamicValue(4)); + + //Expected only DynamicCacheChangeBatch for start cache and MetadataUpdateProposedMessage for update schema. + assertCustomMessages(3); + assertCommunicationMessages(); + } + + /** + * @param ignite Ignite instance. + * @param keyCls Key class. + * @return Name of affinity key field of the given class. + */ + private <K> String getAffinityKey(Ignite ignite, Class<K> keyCls) { + BinaryType binType = ignite.binary().type(keyCls); + + return binType.affinityKeyFieldName(); + } + + /** + * @param cache Cache instance. + * @param key Test key. + * @param val Test value. + * @throws Exception If failed. + */ + private <K, V> void testClientFindsValueByAffinityKey(IgniteCache<K, V> cache, K key, V val) throws Exception { + cache.put(key, val); + + assertTrue(cache.containsKey(key)); + + Ignite client = startGrid("client"); + + IgniteCache<K, V> clientCache = client.cache(cache.getName()); + + assertTrue(clientCache.containsKey(key)); + } + + /** + * @param name Cache name. + * @param keyCls Key {@link Class}. + * @param valCls Value {@link Class}. + * @param <K> Key type. + * @param <V> Value type. 
+ * @return Cache configuration. + */ + private static <K, V> CacheConfiguration<K, V> cacheConfiguration(String name, Class<K> keyCls, Class<V> valCls) { + CacheConfiguration<K, V> cfg = new CacheConfiguration<>(name); + cfg.setQueryEntities(Collections.singleton(new QueryEntity(keyCls, valCls))); + return cfg; + } + + /** + * Checks that exactly {@code expMsgCnt} messages were recorded in {@code customMessages} and that each of them is + * a {@link DynamicCacheChangeBatch} or a {@link org.apache.ignite.internal.processors.cache.binary.MetadataUpdateProposedMessage}. + * Any other type, e.g. {@link org.apache.ignite.internal.processors.marshaller.MappingProposedMessage}, fails the + * assertion, because the corresponding metadata is expected to be registered locally during the start. + * + * @param expMsgCnt Count of expected messages. + */ + private void assertCustomMessages(int expMsgCnt) { + assertEquals(customMessages.toString(), expMsgCnt, customMessages.size()); + + customMessages.forEach(cm -> assertTrue(cm.toString(), cm instanceof DynamicCacheChangeBatch || cm instanceof MetadataUpdateProposedMessage)); + } + + /** + * Checks that no recorded communication message is a binary metadata request or response + * ({@link MetadataRequestMessage}, {@link MetadataResponseMessage}); such an extra request is expected to be + * skipped because the metadata is expected to be registered locally during the start. + */ + private void assertCommunicationMessages() { + communicationMessages.forEach(cm -> + assertFalse(cm.toString(), cm instanceof MetadataRequestMessage || cm instanceof MetadataResponseMessage) + ); + } + + /** Key class of the statically configured cache. */ + private static class StaticKey { + /** Affinity key field. */ + @AffinityKeyMapped + private int affKey; + + /** + * @param affKey Affinity key. + */ + StaticKey(int affKey) { + this.affKey = affKey; + } + } + + /** Value class of the statically configured cache. */ + private static class StaticValue { + /** Affinity key field. */ + @AffinityKeyMapped + private int affKey; + + /** + * Stores the given affinity key, consistently with the other test key/value classes. + * + * @param affKey Affinity key. + */ + StaticValue(int affKey) { + this.affKey = affKey; + } + } + + /** Key class of the dynamically created cache. */ + private static class DynamicKey { + /** */ + @AffinityKeyMapped + private int affKey; + + /** + * @param affKey Affinity key. 
+ */ + DynamicKey(int affKey) { + this.affKey = affKey; + } + } + + /** */ + private static class DynamicValue { + /** */ + @AffinityKeyMapped + private int affKey; + + /** + * @param affKey Affinity key. + */ + DynamicValue(int affKey) { + this.affKey = affKey; + } + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/3bb03444/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/AbstractSchemaSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/AbstractSchemaSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/AbstractSchemaSelfTest.java index 7f1e2e7..5f0e18e 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/AbstractSchemaSelfTest.java +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/AbstractSchemaSelfTest.java @@ -629,21 +629,21 @@ public abstract class AbstractSchemaSelfTest extends GridCommonAbstractTest { public static class ValueClass { /** Field 1. */ @QuerySqlField - private String field1; + private Long field1; /** * Constructor. * * @param field1 Field 1. 
*/ - public ValueClass(String field1) { + public ValueClass(Long field1) { this.field1 = field1; } /** * @return Field 1 */ - public String field1() { + public Long field1() { return field1; } } http://git-wip-us.apache.org/repos/asf/ignite/blob/3bb03444/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/H2DynamicIndexAbstractSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/H2DynamicIndexAbstractSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/H2DynamicIndexAbstractSelfTest.java index 2d7a636..bfc3881 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/H2DynamicIndexAbstractSelfTest.java +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/H2DynamicIndexAbstractSelfTest.java @@ -60,9 +60,9 @@ public abstract class H2DynamicIndexAbstractSelfTest extends AbstractSchemaSelfT IgniteCache<KeyClass, ValueClass> cache = client().cache(CACHE_NAME); - cache.put(new KeyClass(1), new ValueClass("val1")); - cache.put(new KeyClass(2), new ValueClass("val2")); - cache.put(new KeyClass(3), new ValueClass("val3")); + cache.put(new KeyClass(1), new ValueClass(1L)); + cache.put(new KeyClass(2), new ValueClass(2L)); + cache.put(new KeyClass(3), new ValueClass(3L)); } /** {@inheritDoc} */ @@ -89,14 +89,14 @@ public abstract class H2DynamicIndexAbstractSelfTest extends AbstractSchemaSelfT continue; List<List<?>> locRes = ignite(i).cache("cache").query(new SqlFieldsQuery("explain select \"id\" from " + - "\"cache\".\"ValueClass\" where \"field1\" = 'A'").setLocal(true)).getAll(); + "\"cache\".\"ValueClass\" where \"field1\" = 1").setLocal(true)).getAll(); assertEquals(F.asList( Collections.singletonList("SELECT\n" + " \"id\"\n" + "FROM \"cache\".\"ValueClass\"\n" + - " /* \"cache\".\"idx_1\": \"field1\" = 
'A' */\n" + - "WHERE \"field1\" = 'A'") + " /* \"cache\".\"idx_1\": \"field1\" = 1 */\n" + + "WHERE \"field1\" = 1") ), locRes); } @@ -106,7 +106,7 @@ public abstract class H2DynamicIndexAbstractSelfTest extends AbstractSchemaSelfT assertSize(2); - cache.put(new KeyClass(4), new ValueClass("someVal")); + cache.put(new KeyClass(4), new ValueClass(1L)); assertSize(3); } @@ -162,14 +162,14 @@ public abstract class H2DynamicIndexAbstractSelfTest extends AbstractSchemaSelfT continue; List<List<?>> locRes = ignite(i).cache("cache").query(new SqlFieldsQuery("explain select \"id\" from " + - "\"cache\".\"ValueClass\" where \"field1\" = 'A'").setLocal(true)).getAll(); + "\"cache\".\"ValueClass\" where \"field1\" = 1").setLocal(true)).getAll(); assertEquals(F.asList( Collections.singletonList("SELECT\n" + " \"id\"\n" + "FROM \"cache\".\"ValueClass\"\n" + " /* \"cache\".\"ValueClass\".__SCAN_ */\n" + - "WHERE \"field1\" = 'A'") + "WHERE \"field1\" = 1") ), locRes); } @@ -204,38 +204,39 @@ public abstract class H2DynamicIndexAbstractSelfTest extends AbstractSchemaSelfT public void testIndexState() { IgniteCache<KeyClass, ValueClass> cache = cache(); - assertColumnValues("val1", "val2", "val3"); + assertColumnValues(1L, 2L, 3L); cache.query(new SqlFieldsQuery("CREATE INDEX \"" + IDX_NAME_1_ESCAPED + "\" ON \"" + TBL_NAME_ESCAPED + "\"(\"" + FIELD_NAME_1_ESCAPED + "\" ASC)")); - assertColumnValues("val1", "val2", "val3"); + assertColumnValues(1L, 2L, 3L); cache.remove(new KeyClass(2)); - assertColumnValues("val1", "val3"); + assertColumnValues(1L, 3L); - cache.put(new KeyClass(0), new ValueClass("someVal")); + cache.put(new KeyClass(0), new ValueClass(0L)); - assertColumnValues("someVal", "val1", "val3"); + assertColumnValues(0L, 1L, 3L); cache.query(new SqlFieldsQuery("DROP INDEX \"" + IDX_NAME_1_ESCAPED + "\"")); - assertColumnValues("someVal", "val1", "val3"); + assertColumnValues(0L, 1L, 3L); } /** * Check that values of {@code field1} match what we expect. 
* @param vals Expected values. */ - private void assertColumnValues(String... vals) { + private void assertColumnValues(Long... vals) { List<List<?>> expRes = new ArrayList<>(vals.length); - for (String v : vals) + for (Long v : vals) expRes.add(Collections.singletonList(v)); - assertEquals(expRes, cache().query(new SqlFieldsQuery("SELECT \"" + FIELD_NAME_1_ESCAPED + "\" FROM \"" + - TBL_NAME_ESCAPED + "\" ORDER BY \"id\"")).getAll()); + List<List<?>> all = cache().query(new SqlFieldsQuery("SELECT \"" + FIELD_NAME_1_ESCAPED + "\" FROM \"" + + TBL_NAME_ESCAPED + "\" ORDER BY \"id\"")).getAll(); + assertEquals(expRes, all); } /** @@ -245,8 +246,9 @@ public abstract class H2DynamicIndexAbstractSelfTest extends AbstractSchemaSelfT private void assertSize(long expSize) { assertEquals(expSize, cache().size()); - assertEquals(expSize, cache().query(new SqlFieldsQuery("SELECT COUNT(*) from \"ValueClass\"")) - .getAll().get(0).get(0)); + Object actual = cache().query(new SqlFieldsQuery("SELECT COUNT(*) from \"ValueClass\"")) + .getAll().get(0).get(0); + assertEquals(expSize, actual); } /** @@ -313,8 +315,8 @@ public abstract class H2DynamicIndexAbstractSelfTest extends AbstractSchemaSelfT entity.setValueType(ValueClass.class.getName()); entity.addQueryField("id", Long.class.getName(), null); - entity.addQueryField(FIELD_NAME_1_ESCAPED, String.class.getName(), null); - entity.addQueryField(FIELD_NAME_2_ESCAPED, String.class.getName(), null); + entity.addQueryField(FIELD_NAME_1_ESCAPED, Long.class.getName(), null); + entity.addQueryField(FIELD_NAME_2_ESCAPED, Long.class.getName(), null); entity.setKeyFields(Collections.singleton("id")); http://git-wip-us.apache.org/repos/asf/ignite/blob/3bb03444/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheWithIndexingTestSuite.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheWithIndexingTestSuite.java 
b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheWithIndexingTestSuite.java index 8517ebb..fae196f 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheWithIndexingTestSuite.java +++ b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheWithIndexingTestSuite.java @@ -19,6 +19,7 @@ package org.apache.ignite.testsuites; import junit.framework.TestSuite; import org.apache.ignite.internal.processors.cache.BinaryTypeMismatchLoggingTest; +import org.apache.ignite.internal.processors.cache.CacheRegisterMetadataLocallyTest; import org.apache.ignite.internal.processors.cache.CacheBinaryKeyConcurrentQueryTest; import org.apache.ignite.internal.processors.cache.CacheConfigurationP2PTest; import org.apache.ignite.internal.processors.cache.CacheIndexStreamerTest; @@ -80,6 +81,7 @@ public class IgniteCacheWithIndexingTestSuite extends TestSuite { suite.addTestSuite(CacheOperationsWithExpirationTest.class); suite.addTestSuite(CacheBinaryKeyConcurrentQueryTest.class); suite.addTestSuite(CacheQueryFilterExpiredTest.class); + suite.addTestSuite(CacheRegisterMetadataLocallyTest.class); suite.addTestSuite(ClientReconnectAfterClusterRestartTest.class);
