IGNITE-10643: SQL: Unified affinity key column resolution for queries. This closes #5641. This closes #5649.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/c1edb300 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/c1edb300 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/c1edb300 Branch: refs/heads/ignite-10639 Commit: c1edb300979b6a4127a9a676e8d504f0dac459a6 Parents: 84a8e81 Author: devozerov <[email protected]> Authored: Tue Dec 18 11:07:30 2018 +0300 Committer: devozerov <[email protected]> Committed: Tue Dec 18 11:07:30 2018 +0300 ---------------------------------------------------------------------- .../processors/cache/CacheObjectContext.java | 35 +- .../cache/GridCacheAffinityManager.java | 6 +- .../processors/cache/GridCacheContext.java | 22 +- .../processors/cache/GridCacheContextInfo.java | 88 ++--- .../processors/cache/GridCacheProcessor.java | 4 +- .../cache/binary/CacheObjectBinaryContext.java | 62 ---- .../binary/CacheObjectBinaryProcessor.java | 149 -------- .../binary/CacheObjectBinaryProcessorImpl.java | 306 +++++++++++++--- .../cache/binary/IgniteBinaryImpl.java | 4 +- .../reader/StandaloneWalRecordsIterator.java | 2 +- .../cacheobject/IgniteCacheObjectProcessor.java | 119 ++++++- .../IgniteCacheObjectProcessorImpl.java | 346 ------------------- .../cacheobject/UserKeyCacheObjectImpl.java | 2 +- .../processors/query/GridQueryProcessor.java | 15 +- .../query/GridQueryTypeDescriptor.java | 5 + .../query/QueryTypeDescriptorImpl.java | 25 +- .../internal/processors/query/QueryUtils.java | 35 +- .../binary/BinaryMarshallerSelfTest.java | 2 +- ...IgniteClientCacheInitializationFailTest.java | 2 +- .../wal/IgniteWalIteratorSwitchSegmentTest.java | 4 +- .../query/h2/DmlStatementsProcessor.java | 2 +- .../processors/query/h2/H2RowCacheRegistry.java | 2 +- .../processors/query/h2/H2TableDescriptor.java | 6 +- .../internal/processors/query/h2/H2Utils.java | 4 +- .../processors/query/h2/IgniteH2Indexing.java | 4 +- .../query/h2/affinity/PartitionExtractor.java | 36 +- 
.../query/h2/dml/UpdatePlanBuilder.java | 2 +- .../query/h2/opt/GridH2CollocationModel.java | 10 +- .../query/h2/opt/GridH2IndexBase.java | 6 +- .../query/h2/opt/GridH2RowDescriptor.java | 11 +- .../processors/query/h2/opt/GridH2Table.java | 74 ++-- ...ityKeyNameAndValueFieldNameConflictTest.java | 6 + ...acheDistributedJoinCollocatedAndNotTest.java | 2 + .../AndOperationExtractPartitionSelfTest.java | 2 +- ...sappearedCacheCauseRetryMessageSelfTest.java | 6 +- ...appearedCacheWasNotFoundMessageSelfTest.java | 4 +- .../InOperationExtractPartitionSelfTest.java | 5 +- .../query/h2/twostep/JoinSqlTestHelper.java | 72 ++-- .../NonCollocatedRetryMessageSelfTest.java | 4 +- .../h2/twostep/RetryCauseMessageSelfTest.java | 4 +- 40 files changed, 648 insertions(+), 847 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/c1edb300/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheObjectContext.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheObjectContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheObjectContext.java index a34b63b..d121a5e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheObjectContext.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheObjectContext.java @@ -25,22 +25,29 @@ import org.apache.ignite.internal.GridKernalContext; */ public class CacheObjectContext implements CacheObjectValueContext { /** */ - private GridKernalContext kernalCtx; + private final GridKernalContext kernalCtx; /** */ - private String cacheName; + private final String cacheName; /** */ + @SuppressWarnings("deprecation") private AffinityKeyMapper dfltAffMapper; + /** Whether custom affinity mapper is used. 
*/ + private final boolean customAffMapper; + /** */ - private boolean cpyOnGet; + private final boolean cpyOnGet; /** */ - private boolean storeVal; + private final boolean storeVal; /** */ - private boolean addDepInfo; + private final boolean addDepInfo; + + /** Binary enabled flag. */ + private final boolean binaryEnabled; /** * @param kernalCtx Kernal context. @@ -48,19 +55,25 @@ public class CacheObjectContext implements CacheObjectValueContext { * @param cpyOnGet Copy on get flag. * @param storeVal {@code True} if should store unmarshalled value in cache. * @param addDepInfo {@code true} if deployment info should be associated with the objects of this cache. + * @param binaryEnabled Binary enabled flag. */ + @SuppressWarnings("deprecation") public CacheObjectContext(GridKernalContext kernalCtx, String cacheName, AffinityKeyMapper dfltAffMapper, + boolean customAffMapper, boolean cpyOnGet, boolean storeVal, - boolean addDepInfo) { + boolean addDepInfo, + boolean binaryEnabled) { this.kernalCtx = kernalCtx; this.cacheName = cacheName; this.dfltAffMapper = dfltAffMapper; + this.customAffMapper = customAffMapper; this.cpyOnGet = cpyOnGet; this.storeVal = storeVal; this.addDepInfo = addDepInfo; + this.binaryEnabled = binaryEnabled; } /** @@ -88,10 +101,18 @@ public class CacheObjectContext implements CacheObjectValueContext { /** * @return Default affinity mapper. */ + @SuppressWarnings("deprecation") public AffinityKeyMapper defaultAffMapper() { return dfltAffMapper; } + /** + * @return Whether custom affinity mapper is used. 
+ */ + public boolean customAffinityMapper() { + return customAffMapper; + } + /** {@inheritDoc} */ @Override public GridKernalContext kernalContext() { return kernalCtx; @@ -99,7 +120,7 @@ public class CacheObjectContext implements CacheObjectValueContext { /** {@inheritDoc} */ @Override public boolean binaryEnabled() { - return false; + return binaryEnabled; } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/c1edb300/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAffinityManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAffinityManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAffinityManager.java index e20855f..1315c67 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAffinityManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAffinityManager.java @@ -193,10 +193,12 @@ public class GridCacheAffinityManager extends GridCacheManagerAdapter { * @return Affinity key. */ public Object affinityKey(Object key) { + CacheObjectContext coCtx = cctx.cacheObjectContext(); + if (key instanceof CacheObject && !(key instanceof BinaryObject)) - key = ((CacheObject)key).value(cctx.cacheObjectContext(), false); + key = ((CacheObject)key).value(coCtx, false); - return (key instanceof GridCacheInternal ? cctx.defaultAffMapper() : affMapper).affinityKey(key); + return (key instanceof GridCacheInternal ? 
coCtx.defaultAffMapper() : affMapper).affinityKey(key); } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/c1edb300/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java index ffb1886..426a29c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java @@ -47,7 +47,6 @@ import org.apache.ignite.binary.BinaryField; import org.apache.ignite.binary.BinaryObjectBuilder; import org.apache.ignite.cache.CacheInterceptor; import org.apache.ignite.cache.CacheMode; -import org.apache.ignite.cache.affinity.AffinityKeyMapper; import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.configuration.IgniteConfiguration; @@ -256,9 +255,6 @@ public class GridCacheContext<K, V> implements Externalizable { /** */ private boolean deferredDel; - /** */ - private boolean customAffMapper; - /** Whether {@link EventType#EVT_CACHE_REBALANCE_STARTED} was sent (used only for REPLICATED cache). */ private volatile boolean rebalanceStartedEvtSent; @@ -465,13 +461,6 @@ public class GridCacheContext<K, V> implements Externalizable { } /** - * @return {@code True} if custom {@link AffinityKeyMapper} is configured for cache. - */ - public boolean customAffinityMapper() { - return customAffMapper; - } - - /** * @return Dynamic deployment ID. */ public IgniteUuid dynamicDeploymentId() { @@ -1228,13 +1217,6 @@ public class GridCacheContext<K, V> implements Externalizable { } /** - * @return Default affinity key mapper. 
- */ - public AffinityKeyMapper defaultAffMapper() { - return cacheObjCtx.defaultAffMapper(); - } - - /** * @return Compression manager. */ public CacheCompressionManager compress() { @@ -1248,8 +1230,6 @@ public class GridCacheContext<K, V> implements Externalizable { */ public void cacheObjectContext(CacheObjectContext cacheObjCtx) { this.cacheObjCtx = cacheObjCtx; - - customAffMapper = cacheCfg.getAffinityMapper().getClass() != cacheObjCtx.defaultAffMapper().getClass(); } /** @@ -2302,7 +2282,7 @@ public class GridCacheContext<K, V> implements Externalizable { BinaryObjectBuilderImpl builder0 = (BinaryObjectBuilderImpl)buider; - if (!customAffinityMapper()) { + if (!cacheObjCtx.customAffinityMapper()) { CacheDefaultBinaryAffinityKeyMapper mapper = (CacheDefaultBinaryAffinityKeyMapper)cacheObjCtx.defaultAffMapper(); http://git-wip-us.apache.org/repos/asf/ignite/blob/c1edb300/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContextInfo.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContextInfo.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContextInfo.java index 673c32e..958f4f8 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContextInfo.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContextInfo.java @@ -18,7 +18,6 @@ package org.apache.ignite.internal.processors.cache; import org.apache.ignite.configuration.CacheConfiguration; -import org.apache.ignite.internal.GridKernalContext; import org.apache.ignite.internal.util.tostring.GridToStringExclude; import org.apache.ignite.internal.util.typedef.internal.CU; import org.apache.ignite.lang.IgniteUuid; @@ -29,15 +28,9 @@ import org.jetbrains.annotations.Nullable; */ @GridToStringExclude public class GridCacheContextInfo<K, V> { - /** Full 
cache context. Can be {@code null} in case a cache is not started. */ - @Nullable private volatile GridCacheContext gridCacheContext; - /** Cache is client or not. */ private final boolean clientCache; - /** Kernal context. */ - private final GridKernalContext ctx; - /** Dynamic cache deployment ID. */ private final IgniteUuid dynamicDeploymentId; @@ -50,96 +43,90 @@ public class GridCacheContextInfo<K, V> { /** Cache ID. */ private final int cacheId; + /** Full cache context. Can be {@code null} in case a cache is not started. */ + @Nullable private volatile GridCacheContext cctx; + /** * Constructor of full cache context. * - * @param gridCacheContext Cache context. + * @param cctx Cache context. * @param clientCache Client cache or not. */ - public GridCacheContextInfo(GridCacheContext<K, V> gridCacheContext, boolean clientCache) { - this.gridCacheContext = gridCacheContext; - this.ctx = gridCacheContext.kernalContext(); - this.config = gridCacheContext.config(); - this.dynamicDeploymentId = null; - this.groupId = gridCacheContext.groupId(); - this.cacheId = gridCacheContext.cacheId(); + public GridCacheContextInfo(GridCacheContext<K, V> cctx, boolean clientCache) { + config = cctx.config(); + dynamicDeploymentId = null; + groupId = cctx.groupId(); + cacheId = cctx.cacheId(); + this.clientCache = clientCache; + + this.cctx = cctx; } /** * Constructor of not started cache context. * * @param cacheDesc Cache descriptor. - * @param ctx Kernal context. 
*/ - public GridCacheContextInfo(DynamicCacheDescriptor cacheDesc, GridKernalContext ctx) { - this.config = cacheDesc.cacheConfiguration(); - this.dynamicDeploymentId = cacheDesc.deploymentId(); - this.groupId = cacheDesc.groupId(); - this.ctx = ctx; - this.clientCache = true; - - this.cacheId = CU.cacheId(config.getName()); + public GridCacheContextInfo(DynamicCacheDescriptor cacheDesc) { + config = cacheDesc.cacheConfiguration(); + dynamicDeploymentId = cacheDesc.deploymentId(); + groupId = cacheDesc.groupId(); + cacheId = CU.cacheId(config.getName()); + clientCache = true; } /** * @return Cache configuration. */ public CacheConfiguration config() { - return isCacheContextInited() ? gridCacheContext.config() : config; + return config; } /** * @return Cache name. */ public String name() { - return isCacheContextInited() ? gridCacheContext.name() : config.getName(); - } - - /** - * @return {@code true} in case cache use custom affinity mapper. - */ - public boolean customAffinityMapper() { - return isCacheContextInited() && gridCacheContext.customAffinityMapper(); + return config.getName(); } /** * @return Cache group id. */ public int groupId() { - return isCacheContextInited() ? gridCacheContext.groupId() : groupId; + return groupId; } /** * @return Cache id. */ public int cacheId() { - return isCacheContextInited() ? gridCacheContext.cacheId() : cacheId; + return cacheId; } /** * @return {@code true} in case affinity node. */ public boolean affinityNode() { - return isCacheContextInited() && gridCacheContext.affinityNode(); + return cctx != null && cctx.affinityNode(); } /** * @return Cache context. {@code null} for not started cache. */ - @Nullable public GridCacheContext gridCacheContext() { - return gridCacheContext; + @Nullable public GridCacheContext cacheContext() { + return cctx; } /** * @return Dynamic deployment ID. 
*/ public IgniteUuid dynamicDeploymentId() { - GridCacheContext ctx = gridCacheContext; + GridCacheContext cctx0 = cctx; - if (ctx != null) - return ctx.dynamicDeploymentId(); + if (cctx0 != null) + return cctx0.dynamicDeploymentId(); assert dynamicDeploymentId != null : "Deployment id is not set and cache context is not initialized: " + this; @@ -149,13 +136,13 @@ public class GridCacheContextInfo<K, V> { /** * Set real cache context in case cache has been fully initted and start. * - * @param gridCacheCtx Initted cache context. + * @param cctx Initted cache context. */ - public void initCacheContext(GridCacheContext<?, ?> gridCacheCtx) { - assert this.gridCacheContext == null : this.gridCacheContext; - assert gridCacheCtx != null; + public void initCacheContext(GridCacheContext<?, ?> cctx) { + assert this.cctx == null : this.cctx; + assert cctx != null; - this.gridCacheContext = gridCacheCtx; + this.cctx = cctx; } /** @@ -166,17 +153,10 @@ public class GridCacheContextInfo<K, V> { } /** - * @return Kernal context. - */ - public GridKernalContext context() { - return ctx; - } - - /** * @return {@code true} If Cache context is initted. 
*/ public boolean isCacheContextInited() { - return gridCacheContext != null; + return cctx != null; } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/c1edb300/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java index 885ee22..5bd9eb8 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java @@ -1298,7 +1298,9 @@ public class GridCacheProcessor extends GridProcessorAdapter { public void initQueryStructuresForNotStartedCache(DynamicCacheDescriptor cacheDesc) throws IgniteCheckedException { QuerySchema schema = cacheDesc.schema() != null ? 
cacheDesc.schema() : new QuerySchema(); - GridCacheContextInfo cacheInfo = new GridCacheContextInfo(cacheDesc, ctx); + CacheObjectContext coCtx = cacheDesc.cacheObjectContext(ctx.cacheObjects()); + + GridCacheContextInfo cacheInfo = new GridCacheContextInfo(cacheDesc); ctx.query().onCacheStart(cacheInfo, schema, cacheDesc.sql()); } http://git-wip-us.apache.org/repos/asf/ignite/blob/c1edb300/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryContext.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryContext.java deleted file mode 100644 index 7f7e26e..0000000 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryContext.java +++ /dev/null @@ -1,62 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.ignite.internal.processors.cache.binary; - -import org.apache.ignite.configuration.CacheConfiguration; -import org.apache.ignite.internal.GridKernalContext; -import org.apache.ignite.internal.processors.cache.CacheDefaultBinaryAffinityKeyMapper; -import org.apache.ignite.internal.processors.cache.CacheObjectContext; -import org.apache.ignite.internal.processors.cache.GridCacheDefaultAffinityKeyMapper; - -/** - * - */ -public class CacheObjectBinaryContext extends CacheObjectContext { - /** */ - private boolean binaryEnabled; - - /** - * @param kernalCtx Kernal context. - * @param ccfg Cache configuration. - * @param binaryEnabled Binary enabled flag. - * @param cpyOnGet Copy on get flag. - * @param storeVal {@code True} if should store unmarshalled value in cache. - * @param depEnabled {@code true} if deployment is enabled for the given cache. - */ - public CacheObjectBinaryContext(GridKernalContext kernalCtx, - CacheConfiguration ccfg, - boolean cpyOnGet, - boolean storeVal, - boolean binaryEnabled, - boolean depEnabled) { - super(kernalCtx, - ccfg.getName(), - binaryEnabled ? 
new CacheDefaultBinaryAffinityKeyMapper(ccfg.getKeyConfiguration()) : - new GridCacheDefaultAffinityKeyMapper(), - cpyOnGet, - storeVal, - depEnabled); - - this.binaryEnabled = binaryEnabled; - } - - /** {@inheritDoc} */ - @Override public boolean binaryEnabled() { - return binaryEnabled; - } -} http://git-wip-us.apache.org/repos/asf/ignite/blob/c1edb300/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessor.java deleted file mode 100644 index 781bc5e..0000000 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessor.java +++ /dev/null @@ -1,149 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.ignite.internal.processors.cache.binary; - -import java.util.Collection; -import java.util.Map; -import org.apache.ignite.IgniteBinary; -import org.apache.ignite.IgniteException; -import org.apache.ignite.binary.BinaryObject; -import org.apache.ignite.binary.BinaryObjectBuilder; -import org.apache.ignite.binary.BinaryType; -import org.apache.ignite.internal.binary.BinaryFieldMetadata; -import org.apache.ignite.internal.processors.cacheobject.IgniteCacheObjectProcessor; -import org.jetbrains.annotations.Nullable; - -/** - * Extended cache object processor interface with additional methods for binary. - */ -public interface CacheObjectBinaryProcessor extends IgniteCacheObjectProcessor { - /** - * @param clsName Class name. - * @return Builder. - */ - public BinaryObjectBuilder builder(String clsName); - - /** - * Creates builder initialized by existing binary object. - * - * @param binaryObj Binary object to edit. - * @return Binary builder. - */ - public BinaryObjectBuilder builder(BinaryObject binaryObj); - - /** - * @param typeId Type ID. - * @param newMeta New meta data. - * @param failIfUnregistered Fail if unregistered. - * @throws IgniteException In case of error. - */ - public void addMeta(int typeId, final BinaryType newMeta, boolean failIfUnregistered) throws IgniteException; - - /** - * Adds metadata locally without triggering discovery exchange. - * - * Must be used only during startup and only if it is guaranteed that all nodes have the same copy - * of BinaryType. - * - * @param typeId Type ID. - * @param newMeta New meta data. - * @throws IgniteException In case of error. - */ - public void addMetaLocally(int typeId, final BinaryType newMeta) throws IgniteException; - - /** - * @param typeId Type ID. - * @param typeName Type name. - * @param affKeyFieldName Affinity key field name. - * @param fieldTypeIds Fields map. - * @param isEnum Enum flag. - * @param enumMap Enum name to ordinal mapping. 
- * @throws IgniteException In case of error. - */ - public void updateMetadata(int typeId, String typeName, @Nullable String affKeyFieldName, - Map<String, BinaryFieldMetadata> fieldTypeIds, boolean isEnum, @Nullable Map<String, Integer> enumMap) - throws IgniteException; - - /** - * @param typeId Type ID. - * @return Meta data. - * @throws IgniteException In case of error. - */ - @Nullable public BinaryType metadata(int typeId) throws IgniteException; - - - /** - * @param typeId Type ID. - * @param schemaId Schema ID. - * @return Meta data. - * @throws IgniteException In case of error. - */ - @Nullable public BinaryType metadata(int typeId, int schemaId) throws IgniteException; - - /** - * @param typeIds Type ID. - * @return Meta data. - * @throws IgniteException In case of error. - */ - public Map<Integer, BinaryType> metadata(Collection<Integer> typeIds) throws IgniteException; - - /** - * @return Metadata for all types. - * @throws IgniteException In case of error. - */ - public Collection<BinaryType> metadata() throws IgniteException; - - /** - * @param typeName Type name. - * @param ord ordinal. - * @return Enum object. - * @throws IgniteException If failed. - */ - public BinaryObject buildEnum(String typeName, int ord) throws IgniteException; - - /** - * @param typeName Type name. - * @param name Name. - * @return Enum object. - * @throws IgniteException If failed. - */ - public BinaryObject buildEnum(String typeName, String name) throws IgniteException; - - /** - * Register enum type - * - * @param typeName Type name. - * @param vals Mapping of enum constant names to ordinals. - * @return Binary Type for registered enum. - */ - public BinaryType registerEnum(String typeName, Map<String, Integer> vals) throws IgniteException; - - /** - * @return Binaries interface. - * @throws IgniteException If failed. - */ - @Override public IgniteBinary binary() throws IgniteException; - - /** - * @param obj Original object. 
- * @param failIfUnregistered Throw exception if class isn't registered. - * @return Binary object (in case binary marshaller is used). - * @throws IgniteException If failed. - */ - public Object marshalToBinary(Object obj, boolean failIfUnregistered) throws IgniteException; -} http://git-wip-us.apache.org/repos/asf/ignite/blob/c1edb300/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java index 8dcc510..0a1e887 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java @@ -19,10 +19,13 @@ package org.apache.ignite.internal.processors.cache.binary; import java.io.File; import java.io.Serializable; +import java.math.BigDecimal; +import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.HashMap; +import java.util.HashSet; import java.util.LinkedHashSet; import java.util.List; import java.util.Map; @@ -43,6 +46,7 @@ import org.apache.ignite.binary.BinaryObjectBuilder; import org.apache.ignite.binary.BinaryObjectException; import org.apache.ignite.binary.BinaryType; import org.apache.ignite.binary.BinaryTypeConfiguration; +import org.apache.ignite.cache.affinity.AffinityKeyMapper; import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.configuration.BinaryConfiguration; import org.apache.ignite.configuration.CacheConfiguration; @@ -67,14 +71,25 @@ import org.apache.ignite.internal.binary.GridBinaryMarshaller; import 
org.apache.ignite.internal.binary.builder.BinaryObjectBuilderImpl; import org.apache.ignite.internal.binary.streams.BinaryInputStream; import org.apache.ignite.internal.binary.streams.BinaryOffheapInputStream; +import org.apache.ignite.internal.processors.GridProcessorAdapter; +import org.apache.ignite.internal.processors.cache.CacheDefaultBinaryAffinityKeyMapper; import org.apache.ignite.internal.processors.cache.CacheObject; +import org.apache.ignite.internal.processors.cache.CacheObjectByteArrayImpl; import org.apache.ignite.internal.processors.cache.CacheObjectContext; +import org.apache.ignite.internal.processors.cache.CacheObjectImpl; import org.apache.ignite.internal.processors.cache.CacheObjectValueContext; import org.apache.ignite.internal.processors.cache.GridCacheContext; +import org.apache.ignite.internal.processors.cache.GridCacheDefaultAffinityKeyMapper; import org.apache.ignite.internal.processors.cache.GridCacheUtils; +import org.apache.ignite.internal.processors.cache.IncompleteCacheObject; import org.apache.ignite.internal.processors.cache.KeyCacheObject; +import org.apache.ignite.internal.processors.cache.KeyCacheObjectImpl; import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx; -import org.apache.ignite.internal.processors.cacheobject.IgniteCacheObjectProcessorImpl; +import org.apache.ignite.internal.processors.cacheobject.IgniteCacheObjectProcessor; +import org.apache.ignite.internal.processors.cacheobject.UserCacheObjectByteArrayImpl; +import org.apache.ignite.internal.processors.cacheobject.UserCacheObjectImpl; +import org.apache.ignite.internal.processors.cacheobject.UserKeyCacheObjectImpl; +import org.apache.ignite.internal.processors.query.QueryUtils; import org.apache.ignite.internal.util.GridUnsafe; import org.apache.ignite.internal.util.IgniteUtils; import org.apache.ignite.internal.util.MutableSingletonList; @@ -90,6 +105,7 @@ import org.apache.ignite.internal.util.typedef.internal.U; import 
org.apache.ignite.lang.IgniteBiTuple; import org.apache.ignite.lang.IgniteClosure; import org.apache.ignite.lang.IgniteFuture; +import org.apache.ignite.lang.IgniteUuid; import org.apache.ignite.marshaller.Marshaller; import org.apache.ignite.spi.IgniteNodeValidationResult; import org.apache.ignite.spi.discovery.DiscoveryDataBag; @@ -108,8 +124,10 @@ import static org.apache.ignite.internal.GridComponent.DiscoveryDataExchangeType /** * Binary processor implementation. */ -public class CacheObjectBinaryProcessorImpl extends IgniteCacheObjectProcessorImpl implements - CacheObjectBinaryProcessor { +public class CacheObjectBinaryProcessorImpl extends GridProcessorAdapter implements IgniteCacheObjectProcessor { + /** Immutable classes. */ + private static final Collection<Class<?>> IMMUTABLE_CLS = new HashSet<>(); + /** */ private volatile boolean discoveryStarted; @@ -139,7 +157,8 @@ public class CacheObjectBinaryProcessorImpl extends IgniteCacheObjectProcessorIm private long waitSchemaTimeout = IgniteSystemProperties.getLong(IGNITE_WAIT_SCHEMA_UPDATE, 30_000); /** For tests. */ - public static boolean useTestBinaryCtx = false; + @SuppressWarnings("PublicField") + public static boolean useTestBinaryCtx; /** */ @GridToStringExclude @@ -154,9 +173,28 @@ public class CacheObjectBinaryProcessorImpl extends IgniteCacheObjectProcessorIm /** Cached affinity key field names. */ private final ConcurrentHashMap<Integer, T1<BinaryField>> affKeyFields = new ConcurrentHashMap<>(); + /* + * Static initializer + */ + static { + IMMUTABLE_CLS.add(String.class); + IMMUTABLE_CLS.add(Boolean.class); + IMMUTABLE_CLS.add(Byte.class); + IMMUTABLE_CLS.add(Short.class); + IMMUTABLE_CLS.add(Character.class); + IMMUTABLE_CLS.add(Integer.class); + IMMUTABLE_CLS.add(Long.class); + IMMUTABLE_CLS.add(Float.class); + IMMUTABLE_CLS.add(Double.class); + IMMUTABLE_CLS.add(UUID.class); + IMMUTABLE_CLS.add(IgniteUuid.class); + IMMUTABLE_CLS.add(BigDecimal.class); + } + /** * @param ctx Kernal context. 
*/ + @SuppressWarnings("deprecation") public CacheObjectBinaryProcessorImpl(GridKernalContext ctx) { super(ctx); @@ -277,7 +315,7 @@ public class CacheObjectBinaryProcessorImpl extends IgniteCacheObjectProcessorIm } /** {@inheritDoc} */ - @Override public void onDisconnected(IgniteFuture<?> reconnectFut) throws IgniteCheckedException { + @Override public void onDisconnected(IgniteFuture<?> reconnectFut) { this.reconnectFut = reconnectFut; if (transport != null) @@ -291,7 +329,7 @@ public class CacheObjectBinaryProcessorImpl extends IgniteCacheObjectProcessorIm /** {@inheritDoc} */ @Override public IgniteInternalFuture<?> onReconnected(boolean clusterRestarted) throws IgniteCheckedException { - this.reconnectFut = null; + reconnectFut = null; return super.onReconnected(clusterRestarted); } @@ -304,13 +342,33 @@ public class CacheObjectBinaryProcessorImpl extends IgniteCacheObjectProcessorIm } /** {@inheritDoc} */ + @Nullable @Override public CacheObject prepareForCache(@Nullable CacheObject obj, GridCacheContext cctx) { + if (obj == null) + return null; + + return obj.prepareForCache(cctx.cacheObjectContext()); + } + + /** {@inheritDoc} */ @Override public int typeId(String typeName) { if (binaryCtx == null) - return super.typeId(typeName); + return 0; return binaryCtx.typeId(typeName); } + /** {@inheritDoc} */ + @Override public boolean immutable(Object obj) { + assert obj != null; + + return IMMUTABLE_CLS.contains(obj.getClass()); + } + + /** {@inheritDoc} */ + @Override public void onContinuousProcessorStarted(GridKernalContext ctx) { + // No-op. + } + /** * @param obj Object. * @return Bytes. 
@@ -462,7 +520,8 @@ public class CacheObjectBinaryProcessorImpl extends IgniteCacheObjectProcessorIm } /** {@inheritDoc} */ - @Override public void addMeta(final int typeId, final BinaryType newMeta, boolean failIfUnregistered) throws BinaryObjectException { + @Override public void addMeta(final int typeId, final BinaryType newMeta, boolean failIfUnregistered) + throws BinaryObjectException { assert newMeta != null; assert newMeta instanceof BinaryTypeImpl; @@ -827,7 +886,7 @@ public class CacheObjectBinaryProcessorImpl extends IgniteCacheObjectProcessorIm return field; } else { - affKeyFields.putIfAbsent(typeId, new T1<BinaryField>(null)); + affKeyFields.putIfAbsent(typeId, new T1<>(null)); return null; } @@ -846,7 +905,7 @@ public class CacheObjectBinaryProcessorImpl extends IgniteCacheObjectProcessorIm if (obj == null) return null; - return isBinaryObject(obj) ? ((BinaryObject)obj).field(fieldName) : super.field(obj, fieldName); + return isBinaryObject(obj) ? ((BinaryObject)obj).field(fieldName) : null; } /** {@inheritDoc} */ @@ -862,30 +921,44 @@ public class CacheObjectBinaryProcessorImpl extends IgniteCacheObjectProcessorIm } /** {@inheritDoc} */ - @Override public CacheObjectContext contextForCache(CacheConfiguration cfg) throws IgniteCheckedException { - assert cfg != null; - - boolean binaryEnabled = marsh instanceof BinaryMarshaller && !GridCacheUtils.isSystemCache(cfg.getName()) && - !GridCacheUtils.isIgfsCache(ctx.config(), cfg.getName()); - - CacheObjectContext ctx0 = super.contextForCache(cfg); - - CacheObjectContext res = new CacheObjectBinaryContext(ctx, - cfg, - ctx0.copyOnGet(), - ctx0.storeValue(), - binaryEnabled, - ctx0.addDeploymentInfo()); - - ctx.resource().injectGeneric(res.defaultAffMapper()); - - return res; + @SuppressWarnings("deprecation") + @Override public CacheObjectContext contextForCache(CacheConfiguration ccfg) throws IgniteCheckedException { + assert ccfg != null; + + boolean storeVal = !ccfg.isCopyOnRead() || 
(!isBinaryEnabled(ccfg) && + (QueryUtils.isEnabled(ccfg) || ctx.config().isPeerClassLoadingEnabled())); + + boolean binaryEnabled = marsh instanceof BinaryMarshaller && !GridCacheUtils.isSystemCache(ccfg.getName()) && + !GridCacheUtils.isIgfsCache(ctx.config(), ccfg.getName()); + + AffinityKeyMapper cacheAffMapper = ccfg.getAffinityMapper(); + + boolean customAffMapper = + cacheAffMapper != null && + !(cacheAffMapper instanceof CacheDefaultBinaryAffinityKeyMapper) && + !(cacheAffMapper instanceof GridCacheDefaultAffinityKeyMapper); + + AffinityKeyMapper dfltAffMapper = binaryEnabled ? + new CacheDefaultBinaryAffinityKeyMapper(ccfg.getKeyConfiguration()) : + new GridCacheDefaultAffinityKeyMapper(); + + ctx.resource().injectGeneric(dfltAffMapper); + + return new CacheObjectContext(ctx, + ccfg.getName(), + dfltAffMapper, + customAffMapper, + ccfg.isCopyOnRead(), + storeVal, + ctx.config().isPeerClassLoadingEnabled() && !isBinaryEnabled(ccfg), + binaryEnabled + ); } /** {@inheritDoc} */ @Override public byte[] marshal(CacheObjectValueContext ctx, Object val) throws IgniteCheckedException { if (!ctx.binaryEnabled() || binaryMarsh == null) - return super.marshal(ctx, val); + return CU.marshal(ctx.kernalContext().cache().context(), ctx.addDeploymentInfo(), val); byte[] arr = binaryMarsh.marshal(val, false); @@ -898,7 +971,7 @@ public class CacheObjectBinaryProcessorImpl extends IgniteCacheObjectProcessorIm @Override public Object unmarshal(CacheObjectValueContext ctx, byte[] bytes, ClassLoader clsLdr) throws IgniteCheckedException { if (!ctx.binaryEnabled() || binaryMarsh == null) - return super.unmarshal(ctx, bytes, clsLdr); + return U.unmarshal(ctx.kernalContext(), bytes, U.resolveClassLoader(clsLdr, ctx.kernalContext().config())); return binaryMarsh.unmarshal(bytes, clsLdr); } @@ -906,8 +979,19 @@ public class CacheObjectBinaryProcessorImpl extends IgniteCacheObjectProcessorIm /** {@inheritDoc} */ @Override public KeyCacheObject toCacheKeyObject(CacheObjectContext ctx, 
@Nullable GridCacheContext cctx, Object obj, boolean userObj) { - if (!ctx.binaryEnabled()) - return super.toCacheKeyObject(ctx, cctx, obj, userObj); + if (!ctx.binaryEnabled()) { + if (obj instanceof KeyCacheObject) { + KeyCacheObject key = (KeyCacheObject)obj; + + if (key.partition() == -1) + // Assume all KeyCacheObjects except BinaryObject can not be reused for another cache. + key.partition(partition(ctx, cctx, key)); + + return (KeyCacheObject)obj; + } + + return toCacheKeyObject0(ctx, cctx, obj, userObj); + } if (obj instanceof KeyCacheObject) { KeyCacheObject key = (KeyCacheObject)obj; @@ -926,7 +1010,7 @@ public class CacheObjectBinaryProcessorImpl extends IgniteCacheObjectProcessorIm obj = toBinary(obj, false); if (obj instanceof BinaryObjectImpl) { - ((BinaryObjectImpl)obj).partition(partition(ctx, cctx, obj)); + ((KeyCacheObject) obj).partition(partition(ctx, cctx, obj)); return (KeyCacheObject)obj; } @@ -934,11 +1018,33 @@ public class CacheObjectBinaryProcessorImpl extends IgniteCacheObjectProcessorIm return toCacheKeyObject0(ctx, cctx, obj, userObj); } + /** + * @param obj Object. + * @param userObj If {@code true} then given object is object provided by user and should be copied + * before stored in cache. + * @return Key cache object. 
+ */ + protected KeyCacheObject toCacheKeyObject0(CacheObjectContext ctx, + @Nullable GridCacheContext cctx, + Object obj, + boolean userObj) { + int part = partition(ctx, cctx, obj); + + if (!userObj) + return new KeyCacheObjectImpl(obj, null, part); + + return new UserKeyCacheObjectImpl(obj, part); + } + /** {@inheritDoc} */ @Nullable @Override public CacheObject toCacheObject(CacheObjectContext ctx, @Nullable Object obj, boolean userObj, boolean failIfUnregistered) { - if (!ctx.binaryEnabled()) - return super.toCacheObject(ctx, obj, userObj, failIfUnregistered); + if (!ctx.binaryEnabled()) { + if (obj == null || obj instanceof CacheObject) + return (CacheObject)obj; + + return toCacheObject0(obj, userObj); + } if (obj == null || obj instanceof CacheObject) return (CacheObject)obj; @@ -951,23 +1057,136 @@ public class CacheObjectBinaryProcessorImpl extends IgniteCacheObjectProcessorIm return toCacheObject0(obj, userObj); } + /** + * @param obj Object. + * @param userObj If {@code true} then given object is object provided by user and should be copied + * before stored in cache. + * @return Cache object. + */ + private CacheObject toCacheObject0(@Nullable Object obj, boolean userObj) { + assert obj != null; + + if (obj instanceof byte[]) { + if (!userObj) + return new CacheObjectByteArrayImpl((byte[])obj); + + return new UserCacheObjectByteArrayImpl((byte[])obj); + } + + if (!userObj) + return new CacheObjectImpl(obj, null); + + return new UserCacheObjectImpl(obj, null); + } + + /** + * @param ctx Cache objects context. + * @param cctx Cache context. + * @param obj Object. + * @return Object partition. + */ + private int partition(CacheObjectContext ctx, @Nullable GridCacheContext cctx, Object obj) { + try { + return cctx != null ? 
+ cctx.affinity().partition(obj, false) : + ctx.kernalContext().affinity().partition0(ctx.cacheName(), obj, null); + } + catch (IgniteCheckedException e) { + U.error(log, "Failed to get partition", e); + + return -1; + } + } + /** {@inheritDoc} */ @Override public CacheObject toCacheObject(CacheObjectContext ctx, byte type, byte[] bytes) { - if (type == BinaryObjectImpl.TYPE_BINARY) - return new BinaryObjectImpl(binaryContext(), bytes, 0); - else if (type == BinaryObjectImpl.TYPE_BINARY_ENUM) - return new BinaryEnumObjectImpl(binaryContext(), bytes); + switch (type) { + case BinaryObjectImpl.TYPE_BINARY: + return new BinaryObjectImpl(binaryContext(), bytes, 0); - return super.toCacheObject(ctx, type, bytes); + case BinaryObjectImpl.TYPE_BINARY_ENUM: + return new BinaryEnumObjectImpl(binaryContext(), bytes); + + case CacheObject.TYPE_BYTE_ARR: + return new CacheObjectByteArrayImpl(bytes); + + case CacheObject.TYPE_REGULAR: + return new CacheObjectImpl(null, bytes); + } + + throw new IllegalArgumentException("Invalid object type: " + type); } /** {@inheritDoc} */ @Override public KeyCacheObject toKeyCacheObject(CacheObjectContext ctx, byte type, byte[] bytes) throws IgniteCheckedException { - if (type == BinaryObjectImpl.TYPE_BINARY) - return new BinaryObjectImpl(binaryContext(), bytes, 0); + switch (type) { + case BinaryObjectImpl.TYPE_BINARY: + return new BinaryObjectImpl(binaryContext(), bytes, 0); + + case CacheObject.TYPE_BYTE_ARR: + throw new IllegalArgumentException("Byte arrays cannot be used as cache keys."); + + case CacheObject.TYPE_REGULAR: + return new KeyCacheObjectImpl(ctx.kernalContext().cacheObjects().unmarshal(ctx, bytes, null), bytes, -1); + } + + throw new IllegalArgumentException("Invalid object type: " + type); + } + + /** {@inheritDoc} */ + @Override public CacheObject toCacheObject(CacheObjectContext ctx, ByteBuffer buf) { + int len = buf.getInt(); - return super.toKeyCacheObject(ctx, type, bytes); + assert len >= 0 : len; + + byte type = 
buf.get(); + + byte[] data = new byte[len]; + + buf.get(data); + + return toCacheObject(ctx, type, data); + } + + /** {@inheritDoc} */ + @Override public IncompleteCacheObject toCacheObject(CacheObjectContext ctx, ByteBuffer buf, + @Nullable IncompleteCacheObject incompleteObj) { + if (incompleteObj == null) + incompleteObj = new IncompleteCacheObject(buf); + + if (incompleteObj.isReady()) + return incompleteObj; + + incompleteObj.readData(buf); + + if (incompleteObj.isReady()) + incompleteObj.object(toCacheObject(ctx, incompleteObj.type(), incompleteObj.data())); + + return incompleteObj; + } + + /** {@inheritDoc} */ + @Override public IncompleteCacheObject toKeyCacheObject(CacheObjectContext ctx, ByteBuffer buf, + @Nullable IncompleteCacheObject incompleteObj) throws IgniteCheckedException { + if (incompleteObj == null) + incompleteObj = new IncompleteCacheObject(buf); + + if (incompleteObj.isReady()) + return incompleteObj; + + incompleteObj.readData(buf); + + if (incompleteObj.isReady()) + incompleteObj.object(toKeyCacheObject(ctx, incompleteObj.type(), incompleteObj.data())); + + return incompleteObj; + } + + /** {@inheritDoc} */ + @Nullable @Override public CacheObject toCacheObject(CacheObjectContext ctx, @Nullable Object obj, + boolean userObj) { + return toCacheObject(ctx, obj, userObj, false); } /** {@inheritDoc} */ @@ -1177,6 +1396,7 @@ public class CacheObjectBinaryProcessorImpl extends IgniteCacheObjectProcessorIm } /** */ + @SuppressWarnings("PublicInnerClass") public static class TestBinaryContext extends BinaryContext { /** */ private List<TestBinaryContextListener> listeners; http://git-wip-us.apache.org/repos/asf/ignite/blob/c1edb300/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/IgniteBinaryImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/IgniteBinaryImpl.java 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/IgniteBinaryImpl.java index 5f4cdcd..0ee8a0c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/IgniteBinaryImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/IgniteBinaryImpl.java @@ -37,12 +37,12 @@ public class IgniteBinaryImpl implements IgniteBinary { private GridKernalContext ctx; /** */ - private CacheObjectBinaryProcessor proc; + private IgniteCacheObjectProcessor proc; /** * @param ctx Context. */ - public IgniteBinaryImpl(GridKernalContext ctx, CacheObjectBinaryProcessor proc) { + public IgniteBinaryImpl(GridKernalContext ctx, IgniteCacheObjectProcessor proc) { this.ctx = ctx; this.proc = proc; http://git-wip-us.apache.org/repos/asf/ignite/blob/c1edb300/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneWalRecordsIterator.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneWalRecordsIterator.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneWalRecordsIterator.java index 00495e1..238f99a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneWalRecordsIterator.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneWalRecordsIterator.java @@ -400,7 +400,7 @@ class StandaloneWalRecordsIterator extends AbstractWalRecordsIterator { IgniteCacheObjectProcessor processor ) throws IgniteCheckedException { final CacheObjectContext fakeCacheObjCtx = new CacheObjectContext( - kernalCtx, null, null, false, false, false); + kernalCtx, null, null, false, false, false, false, false); final List<DataEntry> entries = dataRec.writeEntries(); 
final List<DataEntry> postProcessedEntries = new ArrayList<>(entries.size()); http://git-wip-us.apache.org/repos/asf/ignite/blob/c1edb300/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/IgniteCacheObjectProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/IgniteCacheObjectProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/IgniteCacheObjectProcessor.java index 0759a93..f59a99e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/IgniteCacheObjectProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/IgniteCacheObjectProcessor.java @@ -18,11 +18,18 @@ package org.apache.ignite.internal.processors.cacheobject; import java.nio.ByteBuffer; +import java.util.Collection; +import java.util.Map; + import org.apache.ignite.IgniteBinary; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteException; +import org.apache.ignite.binary.BinaryObject; +import org.apache.ignite.binary.BinaryObjectBuilder; +import org.apache.ignite.binary.BinaryType; import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.internal.GridKernalContext; +import org.apache.ignite.internal.binary.BinaryFieldMetadata; import org.apache.ignite.internal.processors.GridProcessor; import org.apache.ignite.internal.processors.cache.CacheObject; import org.apache.ignite.internal.processors.cache.CacheObjectContext; @@ -213,6 +220,116 @@ public interface IgniteCacheObjectProcessor extends GridProcessor { /** * @return Ignite binary interface. + * @throws IgniteException If failed. + */ + public IgniteBinary binary() throws IgniteException; + + /** + * @param clsName Class name. + * @return Builder. 
+ */ + public BinaryObjectBuilder builder(String clsName); + + /** + * Creates builder initialized by existing binary object. + * + * @param binaryObj Binary object to edit. + * @return Binary builder. + */ + public BinaryObjectBuilder builder(BinaryObject binaryObj); + + /** + * @param typeId Type ID. + * @param newMeta New meta data. + * @param failIfUnregistered Fail if unregistered. + * @throws IgniteException In case of error. + */ + public void addMeta(int typeId, final BinaryType newMeta, boolean failIfUnregistered) throws IgniteException; + + /** + * Adds metadata locally without triggering discovery exchange. + * + * Must be used only during startup and only if it is guaranteed that all nodes have the same copy + * of BinaryType. + * + * @param typeId Type ID. + * @param newMeta New meta data. + * @throws IgniteException In case of error. + */ + public void addMetaLocally(int typeId, final BinaryType newMeta) throws IgniteException; + + /** + * @param typeId Type ID. + * @param typeName Type name. + * @param affKeyFieldName Affinity key field name. + * @param fieldTypeIds Fields map. + * @param isEnum Enum flag. + * @param enumMap Enum name to ordinal mapping. + * @throws IgniteException In case of error. + */ + public void updateMetadata(int typeId, String typeName, @Nullable String affKeyFieldName, + Map<String, BinaryFieldMetadata> fieldTypeIds, boolean isEnum, @Nullable Map<String, Integer> enumMap) + throws IgniteException; + + /** + * @param typeId Type ID. + * @return Meta data. + * @throws IgniteException In case of error. + */ + @Nullable public BinaryType metadata(int typeId) throws IgniteException; + + + /** + * @param typeId Type ID. + * @param schemaId Schema ID. + * @return Meta data. + * @throws IgniteException In case of error. + */ + @Nullable public BinaryType metadata(int typeId, int schemaId) throws IgniteException; + + /** + * @param typeIds Type ID. + * @return Meta data. + * @throws IgniteException In case of error. 
+ */ + public Map<Integer, BinaryType> metadata(Collection<Integer> typeIds) throws IgniteException; + + /** + * @return Metadata for all types. + * @throws IgniteException In case of error. + */ + public Collection<BinaryType> metadata() throws IgniteException; + + /** + * @param typeName Type name. + * @param ord ordinal. + * @return Enum object. + * @throws IgniteException If failed. + */ + public BinaryObject buildEnum(String typeName, int ord) throws IgniteException; + + /** + * @param typeName Type name. + * @param name Name. + * @return Enum object. + * @throws IgniteException If failed. + */ + public BinaryObject buildEnum(String typeName, String name) throws IgniteException; + + /** + * Register enum type + * + * @param typeName Type name. + * @param vals Mapping of enum constant names to ordinals. + * @return Binary Type for registered enum. + */ + public BinaryType registerEnum(String typeName, Map<String, Integer> vals) throws IgniteException; + + /** + * @param obj Original object. + * @param failIfUnregistered Throw exception if class isn't registered. + * @return Binary object (in case binary marshaller is used). + * @throws IgniteException If failed. 
*/ - public IgniteBinary binary(); + public Object marshalToBinary(Object obj, boolean failIfUnregistered) throws IgniteException; } http://git-wip-us.apache.org/repos/asf/ignite/blob/c1edb300/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/IgniteCacheObjectProcessorImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/IgniteCacheObjectProcessorImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/IgniteCacheObjectProcessorImpl.java deleted file mode 100644 index 2170d54..0000000 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/IgniteCacheObjectProcessorImpl.java +++ /dev/null @@ -1,346 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.ignite.internal.processors.cacheobject; - -import java.math.BigDecimal; -import java.nio.ByteBuffer; -import java.util.Collection; -import java.util.HashSet; -import java.util.UUID; -import org.apache.ignite.IgniteBinary; -import org.apache.ignite.IgniteCheckedException; -import org.apache.ignite.IgniteException; -import org.apache.ignite.configuration.CacheConfiguration; -import org.apache.ignite.internal.GridKernalContext; -import org.apache.ignite.internal.processors.GridProcessorAdapter; -import org.apache.ignite.internal.processors.cache.CacheObject; -import org.apache.ignite.internal.processors.cache.CacheObjectByteArrayImpl; -import org.apache.ignite.internal.processors.cache.CacheObjectContext; -import org.apache.ignite.internal.processors.cache.CacheObjectImpl; -import org.apache.ignite.internal.processors.cache.CacheObjectValueContext; -import org.apache.ignite.internal.processors.cache.GridCacheContext; -import org.apache.ignite.internal.processors.cache.GridCacheDefaultAffinityKeyMapper; -import org.apache.ignite.internal.processors.cache.IncompleteCacheObject; -import org.apache.ignite.internal.processors.cache.KeyCacheObject; -import org.apache.ignite.internal.processors.cache.KeyCacheObjectImpl; -import org.apache.ignite.internal.processors.query.QueryUtils; -import org.apache.ignite.internal.util.typedef.internal.CU; -import org.apache.ignite.internal.util.typedef.internal.U; -import org.apache.ignite.lang.IgniteUuid; -import org.jetbrains.annotations.Nullable; - -/** - * - */ -public class IgniteCacheObjectProcessorImpl extends GridProcessorAdapter implements IgniteCacheObjectProcessor { - /** Immutable classes. 
*/ - private static final Collection<Class<?>> IMMUTABLE_CLS = new HashSet<>(); - - /** */ - private IgniteBinary noOpBinary = new NoOpBinary(); - - /* - * Static initializer - */ - static { - IMMUTABLE_CLS.add(String.class); - IMMUTABLE_CLS.add(Boolean.class); - IMMUTABLE_CLS.add(Byte.class); - IMMUTABLE_CLS.add(Short.class); - IMMUTABLE_CLS.add(Character.class); - IMMUTABLE_CLS.add(Integer.class); - IMMUTABLE_CLS.add(Long.class); - IMMUTABLE_CLS.add(Float.class); - IMMUTABLE_CLS.add(Double.class); - IMMUTABLE_CLS.add(UUID.class); - IMMUTABLE_CLS.add(IgniteUuid.class); - IMMUTABLE_CLS.add(BigDecimal.class); - } - - /** - * @param ctx Context. - */ - public IgniteCacheObjectProcessorImpl(GridKernalContext ctx) { - super(ctx); - } - - /** {@inheritDoc} */ - @Override public IgniteBinary binary() { - return noOpBinary; - } - - /** {@inheritDoc} */ - @Nullable @Override public CacheObject prepareForCache(@Nullable CacheObject obj, GridCacheContext cctx) { - if (obj == null) - return null; - - return obj.prepareForCache(cctx.cacheObjectContext()); - } - - /** {@inheritDoc} */ - @Override public byte[] marshal(CacheObjectValueContext ctx, Object val) throws IgniteCheckedException { - return CU.marshal(ctx.kernalContext().cache().context(), ctx.addDeploymentInfo(), val); - } - - /** {@inheritDoc} */ - @Override public Object unmarshal(CacheObjectValueContext ctx, byte[] bytes, ClassLoader clsLdr) - throws IgniteCheckedException { - return U.unmarshal(ctx.kernalContext(), bytes, U.resolveClassLoader(clsLdr, ctx.kernalContext().config())); - } - - /** {@inheritDoc} */ - @Override public KeyCacheObject toCacheKeyObject(CacheObjectContext ctx, - @Nullable GridCacheContext cctx, - Object obj, - boolean userObj) { - if (obj instanceof KeyCacheObject) { - KeyCacheObject key = (KeyCacheObject)obj; - - if (key.partition() == -1) - // Assume all KeyCacheObjects except BinaryObject can not be reused for another cache. 
- key.partition(partition(ctx, cctx, key)); - - return (KeyCacheObject)obj; - } - - return toCacheKeyObject0(ctx, cctx, obj, userObj); - } - - /** - * @param obj Object. - * @param userObj If {@code true} then given object is object provided by user and should be copied - * before stored in cache. - * @return Key cache object. - */ - protected KeyCacheObject toCacheKeyObject0(CacheObjectContext ctx, - @Nullable GridCacheContext cctx, - Object obj, - boolean userObj) { - int part = partition(ctx, cctx, obj); - - if (!userObj) - return new KeyCacheObjectImpl(obj, null, part); - - return new UserKeyCacheObjectImpl(obj, part); - } - - /** {@inheritDoc} */ - @Override public CacheObject toCacheObject(CacheObjectContext ctx, byte type, byte[] bytes) { - switch (type) { - case CacheObject.TYPE_BYTE_ARR: - return new CacheObjectByteArrayImpl(bytes); - - case CacheObject.TYPE_REGULAR: - return new CacheObjectImpl(null, bytes); - } - - throw new IllegalArgumentException("Invalid object type: " + type); - } - - /** {@inheritDoc} */ - @Override public KeyCacheObject toKeyCacheObject(CacheObjectContext ctx, byte type, byte[] bytes) throws IgniteCheckedException { - switch (type) { - case CacheObject.TYPE_BYTE_ARR: - throw new IllegalArgumentException("Byte arrays cannot be used as cache keys."); - - case CacheObject.TYPE_REGULAR: - return new KeyCacheObjectImpl(ctx.kernalContext().cacheObjects().unmarshal(ctx, bytes, null), bytes, -1); - } - - throw new IllegalArgumentException("Invalid object type: " + type); - } - - /** {@inheritDoc} */ - @Override public CacheObject toCacheObject(CacheObjectContext ctx, ByteBuffer buf) { - int len = buf.getInt(); - - assert len >= 0 : len; - - byte type = buf.get(); - - byte[] data = new byte[len]; - - buf.get(data); - - return toCacheObject(ctx, type, data); - } - - /** {@inheritDoc} */ - @Override public IncompleteCacheObject toCacheObject( - final CacheObjectContext ctx, - final ByteBuffer buf, - @Nullable IncompleteCacheObject 
incompleteObj - ) throws IgniteCheckedException { - if (incompleteObj == null) - incompleteObj = new IncompleteCacheObject(buf); - - if (incompleteObj.isReady()) - return incompleteObj; - - incompleteObj.readData(buf); - - if (incompleteObj.isReady()) - incompleteObj.object(toCacheObject(ctx, incompleteObj.type(), incompleteObj.data())); - - return incompleteObj; - } - - /** {@inheritDoc} */ - @Override public IncompleteCacheObject toKeyCacheObject( - final CacheObjectContext ctx, - final ByteBuffer buf, - @Nullable IncompleteCacheObject incompleteObj - ) throws IgniteCheckedException { - if (incompleteObj == null) - incompleteObj = new IncompleteCacheObject(buf); - - if (incompleteObj.isReady()) - return incompleteObj; - - incompleteObj.readData(buf); - - if (incompleteObj.isReady()) - incompleteObj.object(toKeyCacheObject(ctx, incompleteObj.type(), incompleteObj.data())); - - return incompleteObj; - } - - /** {@inheritDoc} */ - @Nullable @Override public CacheObject toCacheObject(CacheObjectContext ctx, - @Nullable Object obj, - boolean userObj) { - return toCacheObject(ctx, obj, userObj, false); - } - - /** {@inheritDoc} */ - @Nullable @Override public CacheObject toCacheObject(CacheObjectContext ctx, @Nullable Object obj, boolean userObj, - boolean failIfUnregistered) { - if (obj == null || obj instanceof CacheObject) - return (CacheObject)obj; - - return toCacheObject0(obj, userObj); - } - - /** - * @param obj Object. - * @param userObj If {@code true} then given object is object provided by user and should be copied - * before stored in cache. - * @return Cache object. 
- */ - protected CacheObject toCacheObject0(@Nullable Object obj, boolean userObj) { - assert obj != null; - - if (obj instanceof byte[]) { - if (!userObj) - return new CacheObjectByteArrayImpl((byte[])obj); - - return new UserCacheObjectByteArrayImpl((byte[])obj); - } - - if (!userObj) - return new CacheObjectImpl(obj, null); - - return new UserCacheObjectImpl(obj, null); - } - - /** - * @param ctx Cache objects context. - * @param cctx Cache context. - * @param obj Object. - * @return Object partition. - */ - protected final int partition(CacheObjectContext ctx, @Nullable GridCacheContext cctx, Object obj) { - try { - return cctx != null ? - cctx.affinity().partition(obj, false) : - ctx.kernalContext().affinity().partition0(ctx.cacheName(), obj, null); - } - catch (IgniteCheckedException e) { - U.error(log, "Failed to get partition", e); - - return -1; - } - } - - /** {@inheritDoc} */ - @Override public CacheObjectContext contextForCache(CacheConfiguration ccfg) throws IgniteCheckedException { - assert ccfg != null; - - boolean storeVal = !ccfg.isCopyOnRead() || (!isBinaryEnabled(ccfg) && - (QueryUtils.isEnabled(ccfg) || ctx.config().isPeerClassLoadingEnabled())); - - CacheObjectContext res = new CacheObjectContext(ctx, - ccfg.getName(), - ccfg.getAffinityMapper() != null ? ccfg.getAffinityMapper() : new GridCacheDefaultAffinityKeyMapper(), - ccfg.isCopyOnRead(), - storeVal, - ctx.config().isPeerClassLoadingEnabled() && !isBinaryEnabled(ccfg)); - - ctx.resource().injectGeneric(res.defaultAffMapper()); - - return res; - } - - /** {@inheritDoc} */ - @Override public boolean immutable(Object obj) { - assert obj != null; - - return IMMUTABLE_CLS.contains(obj.getClass()); - } - - /** {@inheritDoc} */ - @Override public void onContinuousProcessorStarted(GridKernalContext ctx) throws IgniteCheckedException { - // No-op. 
- } - - /** {@inheritDoc} */ - @Override public int typeId(String typeName) { - return 0; - } - - /** {@inheritDoc} */ - @Override public Object unwrapTemporary(GridCacheContext ctx, Object obj) throws IgniteException { - return obj; - } - - /** {@inheritDoc} */ - @Override public boolean isBinaryObject(Object obj) { - return false; - } - - /** {@inheritDoc} */ - @Override public boolean isBinaryEnabled(CacheConfiguration<?, ?> ccfg) { - return false; - } - - /** {@inheritDoc} */ - @Override public int typeId(Object obj) { - return 0; - } - - /** {@inheritDoc} */ - @Override public Object field(Object obj, String fieldName) { - return null; - } - - /** {@inheritDoc} */ - @Override public boolean hasField(Object obj, String fieldName) { - return false; - } - -} http://git-wip-us.apache.org/repos/asf/ignite/blob/c1edb300/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/UserKeyCacheObjectImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/UserKeyCacheObjectImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/UserKeyCacheObjectImpl.java index de57667..24bf22b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/UserKeyCacheObjectImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/UserKeyCacheObjectImpl.java @@ -44,7 +44,7 @@ public class UserKeyCacheObjectImpl extends KeyCacheObjectImpl { * @param key Key. * @param part Partition. 
*/ - UserKeyCacheObjectImpl(Object key, int part) { + public UserKeyCacheObjectImpl(Object key, int part) { super(key, null, part); } http://git-wip-us.apache.org/repos/asf/ignite/blob/c1edb300/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java index 65c8205..eee21e2 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java @@ -682,7 +682,7 @@ public class GridQueryProcessor extends GridProcessorAdapter { ctx.cache().context().database().checkpointReadLock(); try { - if (cacheInfo.isClientCache() && cacheInfo.isCacheContextInited() && idx.initCacheContext(cacheInfo.gridCacheContext())) + if (cacheInfo.isClientCache() && cacheInfo.isCacheContextInited() && idx.initCacheContext(cacheInfo.cacheContext())) return; synchronized (stateMux) { @@ -701,8 +701,15 @@ public class GridQueryProcessor extends GridProcessorAdapter { if (!F.isEmpty(qryEntities)) { for (QueryEntity qryEntity : qryEntities) { - QueryTypeCandidate cand = QueryUtils.typeForQueryEntity(cacheName, schemaName, cacheInfo, qryEntity, - mustDeserializeClss, escape); + QueryTypeCandidate cand = QueryUtils.typeForQueryEntity( + ctx, + cacheName, + schemaName, + cacheInfo, + qryEntity, + mustDeserializeClss, + escape + ); cands.add(cand); } @@ -1448,7 +1455,7 @@ public class GridQueryProcessor extends GridProcessorAdapter { SchemaIndexCacheVisitor visitor; if (cacheInfo.isCacheContextInited()) { - GridCacheContext cctx = cacheInfo.gridCacheContext(); + GridCacheContext cctx = cacheInfo.cacheContext(); SchemaIndexCacheFilter filter = new 
TableCacheFilter(cctx, op0.tableName()); http://git-wip-us.apache.org/repos/asf/ignite/blob/c1edb300/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryTypeDescriptor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryTypeDescriptor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryTypeDescriptor.java index af42452..13ae9bf 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryTypeDescriptor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryTypeDescriptor.java @@ -138,6 +138,11 @@ public interface GridQueryTypeDescriptor { public String affinityKey(); /** + * @return Whether custom affinity key mapper exists. + */ + public boolean customAffinityKeyMapper(); + + /** * @return BinaryObject's type ID if indexed value is BinaryObject, otherwise value class' hash code. 
*/ public int typeId(); http://git-wip-us.apache.org/repos/asf/ignite/blob/c1edb300/modules/core/src/main/java/org/apache/ignite/internal/processors/query/QueryTypeDescriptorImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/QueryTypeDescriptorImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/QueryTypeDescriptorImpl.java index 87394f6..722205b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/QueryTypeDescriptorImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/QueryTypeDescriptorImpl.java @@ -108,6 +108,9 @@ public class QueryTypeDescriptorImpl implements GridQueryTypeDescriptor { private String affKey; /** */ + private boolean customAffKeyMapper; + + /** */ private String keyFieldName; /** */ @@ -123,7 +126,7 @@ public class QueryTypeDescriptorImpl implements GridQueryTypeDescriptor { private List<GridQueryProperty> propsWithDefaultValue; /** */ - @Nullable private CacheObjectContext coCtx; + private final CacheObjectContext coCtx; /** * Constructor. @@ -131,7 +134,7 @@ public class QueryTypeDescriptorImpl implements GridQueryTypeDescriptor { * @param cacheName Cache name. * @param coCtx Cache object context. */ - public QueryTypeDescriptorImpl(String cacheName, @Nullable CacheObjectContext coCtx) { + public QueryTypeDescriptorImpl(String cacheName, CacheObjectContext coCtx) { this.cacheName = cacheName; this.coCtx = coCtx; } @@ -481,6 +484,18 @@ public class QueryTypeDescriptorImpl implements GridQueryTypeDescriptor { this.affKey = affKey; } + /** {@inheritDoc} */ + @Override public boolean customAffinityKeyMapper() { + return customAffKeyMapper; + } + + /** + * @param customAffKeyMapper Whether custom affinity key mapper is set. 
+ */ + public void customAffinityKeyMapper(boolean customAffKeyMapper) { + this.customAffKeyMapper = customAffKeyMapper; + } + /** * @return Aliases. */ @@ -564,14 +579,12 @@ public class QueryTypeDescriptorImpl implements GridQueryTypeDescriptor { boolean isKey = false; if (F.eq(prop.name(), keyFieldName) || (keyFieldName == null && F.eq(prop.name(), KEY_FIELD_NAME))) { - propVal = key instanceof KeyCacheObject && coCtx != null ? - ((KeyCacheObject)key).value(coCtx, true) : key; + propVal = key instanceof KeyCacheObject ? ((CacheObject) key).value(coCtx, true) : key; isKey = true; } else if (F.eq(prop.name(), valFieldName) || (valFieldName == null && F.eq(prop.name(), VAL_FIELD_NAME))) { - propVal = val instanceof CacheObject && coCtx != null ? - ((CacheObject)val).value(coCtx, true) : val; + propVal = val instanceof CacheObject ? ((CacheObject)val).value(coCtx, true) : val; } else { propVal = prop.value(key, val); http://git-wip-us.apache.org/repos/asf/ignite/blob/c1edb300/modules/core/src/main/java/org/apache/ignite/internal/processors/query/QueryUtils.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/QueryUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/QueryUtils.java index 08d44bd..e251bd6 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/QueryUtils.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/QueryUtils.java @@ -404,16 +404,15 @@ public class QueryUtils { * @return Type candidate. * @throws IgniteCheckedException If failed. 
*/ - public static QueryTypeCandidate typeForQueryEntity(String cacheName, String schemaName, + public static QueryTypeCandidate typeForQueryEntity(GridKernalContext ctx, String cacheName, String schemaName, GridCacheContextInfo cacheInfo, QueryEntity qryEntity, List<Class<?>> mustDeserializeClss, boolean escape) throws IgniteCheckedException { - GridKernalContext ctx = cacheInfo.context(); CacheConfiguration<?, ?> ccfg = cacheInfo.config(); boolean binaryEnabled = ctx.cacheObjects().isBinaryEnabled(ccfg); - CacheObjectContext coCtx = binaryEnabled ? ctx.cacheObjects().contextForCache(ccfg) : null; + CacheObjectContext coCtx = ctx.cacheObjects().contextForCache(ccfg); QueryTypeDescriptorImpl desc = new QueryTypeDescriptorImpl(cacheName, coCtx); @@ -493,27 +492,31 @@ public class QueryUtils { String affField = null; - // Need to setup affinity key for distributed joins. - String keyType = qryEntity.getKeyType(); + if (!coCtx.customAffinityMapper()) { + String keyType = qryEntity.getKeyType(); - if (!cacheInfo.customAffinityMapper() && keyType != null) { - if (coCtx != null) { + if (keyType != null) { CacheDefaultBinaryAffinityKeyMapper mapper = (CacheDefaultBinaryAffinityKeyMapper)coCtx.defaultAffMapper(); BinaryField field = mapper.affinityKeyField(keyType); - if (field != null) - affField = field.name(); - } - } + if (field != null) { + String affField0 = field.name(); - if (affField != null) { - if (!escape) - affField = normalizeObjectName(affField, false); + if (!F.isEmpty(qryEntity.getKeyFields()) && qryEntity.getKeyFields().contains(affField0)) { + affField = affField0; - desc.affinityKey(affField); + if (!escape) + affField = normalizeObjectName(affField, false); + } + } + } } + else + desc.customAffinityKeyMapper(true); + + desc.affinityKey(affField); } else { processClassMeta(qryEntity, desc, coCtx); @@ -531,6 +534,8 @@ public class QueryUtils { desc.affinityKey(affField); } } + else + desc.customAffinityKeyMapper(true); typeId = new 
QueryTypeIdKey(cacheName, valCls); altTypeId = new QueryTypeIdKey(cacheName, valTypeId); http://git-wip-us.apache.org/repos/asf/ignite/blob/c1edb300/modules/core/src/test/java/org/apache/ignite/internal/binary/BinaryMarshallerSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/binary/BinaryMarshallerSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/binary/BinaryMarshallerSelfTest.java index eaca668..17689aa 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/binary/BinaryMarshallerSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/binary/BinaryMarshallerSelfTest.java @@ -2601,7 +2601,7 @@ public class BinaryMarshallerSelfTest extends GridCommonAbstractTest { BinaryObjectImpl po = marshal(simpleObject(), marsh); - CacheObjectContext coCtx = new CacheObjectContext(newContext(), null, null, false, true, false); + CacheObjectContext coCtx = new CacheObjectContext(newContext(), null, null, false, false, true, false, false); assert po.value(coCtx, false) == po.value(coCtx, false); http://git-wip-us.apache.org/repos/asf/ignite/blob/c1edb300/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteClientCacheInitializationFailTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteClientCacheInitializationFailTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteClientCacheInitializationFailTest.java index a30ae20..167a7cd 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteClientCacheInitializationFailTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteClientCacheInitializationFailTest.java @@ -345,7 +345,7 @@ public class IgniteClientCacheInitializationFailTest extends 
GridCommonAbstractT /** {@inheritDoc} */ @Override public void registerCache(String cacheName, String schemaName, GridCacheContextInfo<?, ?> cacheInfo) throws IgniteCheckedException { - if (FAILED_CACHES.contains(cacheInfo.name()) && cacheInfo.gridCacheContext().kernalContext().clientNode()) + if (FAILED_CACHES.contains(cacheInfo.name()) && cacheInfo.cacheContext().kernalContext().clientNode()) throw new IgniteCheckedException("Test query exception " + cacheInfo.name() + " " + new Random().nextInt()); } http://git-wip-us.apache.org/repos/asf/ignite/blob/c1edb300/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalIteratorSwitchSegmentTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalIteratorSwitchSegmentTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalIteratorSwitchSegmentTest.java index 001f77c..0f6c03f 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalIteratorSwitchSegmentTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalIteratorSwitchSegmentTest.java @@ -40,6 +40,7 @@ import org.apache.ignite.internal.pagemem.wal.record.WALRecord; import org.apache.ignite.internal.processors.cache.GridCacheIoManager; import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; import org.apache.ignite.internal.processors.cache.WalStateManager; +import org.apache.ignite.internal.processors.cache.binary.CacheObjectBinaryProcessorImpl; import org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager; import org.apache.ignite.internal.processors.cache.persistence.IgniteCacheDatabaseSharedManager; import 
org.apache.ignite.internal.processors.cache.persistence.file.FileIO; @@ -52,7 +53,6 @@ import org.apache.ignite.internal.processors.cache.persistence.wal.reader.Standa import org.apache.ignite.internal.processors.cache.persistence.wal.serializer.RecordSerializer; import org.apache.ignite.internal.processors.cache.persistence.wal.serializer.RecordSerializerFactoryImpl; import org.apache.ignite.internal.processors.cacheobject.IgniteCacheObjectProcessor; -import org.apache.ignite.internal.processors.cacheobject.IgniteCacheObjectProcessorImpl; import org.apache.ignite.internal.processors.subscription.GridInternalSubscriptionProcessor; import org.apache.ignite.internal.util.typedef.T2; import org.apache.ignite.internal.util.typedef.internal.U; @@ -124,7 +124,7 @@ public class IgniteWalIteratorSwitchSegmentTest extends GridCommonAbstractTest { GridKernalContext kctx = new StandaloneGridKernalContext( log, null, null) { @Override public IgniteCacheObjectProcessor cacheObjects() { - return new IgniteCacheObjectProcessorImpl(this); + return new CacheObjectBinaryProcessorImpl(this); } }; http://git-wip-us.apache.org/repos/asf/ignite/blob/c1edb300/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/DmlStatementsProcessor.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/DmlStatementsProcessor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/DmlStatementsProcessor.java index a8d447c..dfd677b 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/DmlStatementsProcessor.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/DmlStatementsProcessor.java @@ -1233,7 +1233,7 @@ public class DmlStatementsProcessor { IgniteQueryErrorCode.TABLE_NOT_FOUND); } - H2Utils.checkAndStartNotStartedCache(tbl); + 
H2Utils.checkAndStartNotStartedCache(ctx, tbl); UpdatePlan plan = UpdatePlanBuilder.planForBulkLoad(cmd, tbl); http://git-wip-us.apache.org/repos/asf/ignite/blob/c1edb300/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2RowCacheRegistry.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2RowCacheRegistry.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2RowCacheRegistry.java index ca135f1..7c06cd0 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2RowCacheRegistry.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2RowCacheRegistry.java @@ -69,7 +69,7 @@ public class H2RowCacheRegistry { HashMap<Integer, H2RowCache> caches0 = copy(); if (cacheInfo.affinityNode()) { - GridCacheContext cacheCtx = cacheInfo.gridCacheContext(); + GridCacheContext cacheCtx = cacheInfo.cacheContext(); assert cacheCtx != null; http://git-wip-us.apache.org/repos/asf/ignite/blob/c1edb300/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2TableDescriptor.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2TableDescriptor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2TableDescriptor.java index 7ec8b9b..5a0cc44 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2TableDescriptor.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2TableDescriptor.java @@ -174,7 +174,7 @@ public class H2TableDescriptor implements GridH2SystemIndexFactory { * @return Cache context. 
*/ public GridCacheContext cache() { - return cacheInfo.gridCacheContext(); + return cacheInfo.cacheContext(); } /** @@ -204,7 +204,7 @@ public class H2TableDescriptor implements GridH2SystemIndexFactory { */ H2RowFactory rowFactory(GridH2RowDescriptor rowDesc) { if (cacheInfo.affinityNode()) - return new H2RowFactory(rowDesc, cacheInfo.gridCacheContext()); + return new H2RowFactory(rowDesc, cacheInfo.cacheContext()); return null; } @@ -437,7 +437,7 @@ public class H2TableDescriptor implements GridH2SystemIndexFactory { if (cacheInfo.affinityNode()) { assert pkHashIdx == null : pkHashIdx; - pkHashIdx = new H2PkHashIndex(cacheInfo.gridCacheContext(), tbl, PK_HASH_IDX_NAME, cols); + pkHashIdx = new H2PkHashIndex(cacheInfo.cacheContext(), tbl, PK_HASH_IDX_NAME, cols); return pkHashIdx; }
