Repository: incubator-ignite Updated Branches: refs/heads/ignite-950 204a408f9 -> 9e2943db8
ignite-950: checking whether a footer is supported for object or not Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/549a1a71 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/549a1a71 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/549a1a71 Branch: refs/heads/ignite-950 Commit: 549a1a716b5e618606e5fea30174bb2fea27f04a Parents: 204a408 Author: Denis Magda <[email protected]> Authored: Tue Jun 16 11:13:19 2015 +0300 Committer: Denis Magda <[email protected]> Committed: Tue Jun 16 11:13:19 2015 +0300 ---------------------------------------------------------------------- .../cacheobject/IgniteCacheObjectProcessor.java | 11 +++ .../IgniteCacheObjectProcessorImpl.java | 80 ++++++++++++++++++-- .../optimized/OptimizedClassDescriptor.java | 30 +++++--- .../optimized/OptimizedMarshaller.java | 21 +++++ .../optimized/OptimizedMarshallerUtils.java | 10 ++- .../optimized/OptimizedObjectInputStream.java | 10 ++- .../optimized/OptimizedObjectOutputStream.java | 29 ++++--- 7 files changed, 160 insertions(+), 31 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/549a1a71/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/IgniteCacheObjectProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/IgniteCacheObjectProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/IgniteCacheObjectProcessor.java index dc0d1e5..c339037 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/IgniteCacheObjectProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/IgniteCacheObjectProcessor.java @@ -97,6 +97,17 @@ public 
interface IgniteCacheObjectProcessor extends GridProcessor { public boolean hasField(Object obj, String fieldName); /** + * Checks whether a footer injection into a serialized form of the object is supported. + * Footer contains information on fields location in the serialized form, thus enabling fast queries without a need + * to deserialize the object. + * + * @param obj Object. + * @return {@code true} if the footer is supported. + * @throws IgniteCheckedException If failed. + */ + public boolean footerSupported(Object obj) throws IgniteCheckedException; + + /** * @param ctx Cache object context. * @param val Value. * @return Value bytes. http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/549a1a71/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/IgniteCacheObjectProcessorImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/IgniteCacheObjectProcessorImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/IgniteCacheObjectProcessorImpl.java index c9f3444..a002d36 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/IgniteCacheObjectProcessorImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/IgniteCacheObjectProcessorImpl.java @@ -27,10 +27,13 @@ import org.apache.ignite.internal.processors.query.*; import org.apache.ignite.internal.util.*; import org.apache.ignite.internal.util.typedef.internal.*; import org.apache.ignite.lang.*; +import org.apache.ignite.marshaller.*; +import org.apache.ignite.marshaller.optimized.*; import org.jetbrains.annotations.*; import java.math.*; import java.util.*; +import java.util.concurrent.*; import static org.apache.ignite.cache.CacheMemoryMode.*; @@ -44,6 +47,21 @@ public class IgniteCacheObjectProcessorImpl extends GridProcessorAdapter impleme /** Immutable classes. 
*/ private static final Collection<Class<?>> IMMUTABLE_CLS = new HashSet<>(); + /** */ + private static final OptimizedObjectMetadata EMPTY_META = new OptimizedObjectMetadata(); + + /** */ + private volatile IgniteCacheProxy<OptimizedObjectMetadataKey, OptimizedObjectMetadata> metaDataCache; + + /** Metadata updates collected before metadata cache is initialized. */ + private final ConcurrentHashMap<Integer, OptimizedObjectMetadata> metaBuf = new ConcurrentHashMap<>(); + + /** */ + private final CountDownLatch startLatch = new CountDownLatch(1); + + /** */ + private OptimizedMarshaller optMarsh; + /** * */ @@ -62,6 +80,58 @@ public class IgniteCacheObjectProcessorImpl extends GridProcessorAdapter impleme IMMUTABLE_CLS.add(BigDecimal.class); } + /** {@inheritDoc} */ + @Override public void start() throws IgniteCheckedException { + super.start(); + + Marshaller marsh = ctx.config().getMarshaller(); + + if (marsh instanceof OptimizedMarshaller) { + optMarsh = (OptimizedMarshaller)marsh; + + OptimizedObjectMetadataHandler metaHandler = new OptimizedObjectMetadataHandler() { + @Override public void addMeta(int typeId, OptimizedObjectMetadata meta) { + if (metaBuf.contains(typeId)) + return; + + metaBuf.put(typeId, meta); + + if (metaDataCache != null) + metaDataCache.putIfAbsent(new OptimizedObjectMetadataKey(typeId), meta); + } + + @Override public OptimizedObjectMetadata metadata(int typeId) { + if (metaDataCache == null) + U.awaitQuiet(startLatch); + + OptimizedObjectMetadata meta = metaBuf.get(typeId); + + if (meta != null) + return meta == EMPTY_META ? null : meta; + + meta = metaDataCache.localPeek(new OptimizedObjectMetadataKey(typeId)); + + if (meta == null) + meta = EMPTY_META; + + return meta == EMPTY_META ? 
null : meta; + } + }; + + optMarsh.setMetadataHandler(metaHandler); + } + } + + /** {@inheritDoc} */ + @Override public void onUtilityCacheStarted() throws IgniteCheckedException { + metaDataCache = ctx.cache().jcache(CU.UTILITY_CACHE_NAME); + + startLatch.countDown(); + + for (Map.Entry<Integer, OptimizedObjectMetadata> e : metaBuf.entrySet()) + metaDataCache.putIfAbsent(new OptimizedObjectMetadataKey(e.getKey()), e.getValue()); + } + /** * @param ctx Context. */ @@ -213,11 +283,6 @@ public class IgniteCacheObjectProcessorImpl extends GridProcessorAdapter impleme } /** {@inheritDoc} */ - @Override public void onUtilityCacheStarted() throws IgniteCheckedException { - // No-op. - } - - /** {@inheritDoc} */ @Override public int typeId(String typeName) { return 0; } @@ -253,6 +318,11 @@ public class IgniteCacheObjectProcessorImpl extends GridProcessorAdapter impleme return false; } + /** {@inheritDoc} */ + @Override public boolean footerSupported(Object obj) throws IgniteCheckedException { + return optMarsh != null && optMarsh.footerSupported(obj); + } + /** * Wraps key provided by user, must be serialized before stored in cache. */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/549a1a71/modules/core/src/main/java/org/apache/ignite/marshaller/optimized/OptimizedClassDescriptor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/marshaller/optimized/OptimizedClassDescriptor.java b/modules/core/src/main/java/org/apache/ignite/marshaller/optimized/OptimizedClassDescriptor.java index ba3dfcf..0ef39b1 100644 --- a/modules/core/src/main/java/org/apache/ignite/marshaller/optimized/OptimizedClassDescriptor.java +++ b/modules/core/src/main/java/org/apache/ignite/marshaller/optimized/OptimizedClassDescriptor.java @@ -119,6 +119,8 @@ class OptimizedClassDescriptor { * @param ctx Context. * @param mapper ID mapper. * @param metaHandler Metadata handler. 
+ * @param tryEnableMeta Try to enable meta during {@code OptimizedClassDescriptor} registration. Meta is supported + * only for objects that support footer injection in their serialized form. * @throws IOException In case of error. */ @SuppressWarnings("ForLoopReplaceableByForEach") @@ -127,7 +129,8 @@ class OptimizedClassDescriptor { ConcurrentMap<Class, OptimizedClassDescriptor> clsMap, MarshallerContext ctx, OptimizedMarshallerIdMapper mapper, - OptimizedObjectMetadataHandler metaHandler) + OptimizedObjectMetadataHandler metaHandler, + boolean tryEnableMeta) throws IOException { this.cls = cls; this.typeId = typeId; @@ -487,7 +490,7 @@ class OptimizedClassDescriptor { this.fields = new Fields(fields, fieldsIndexingEnabled); - if (fieldsIndexingEnabled && metaHandler.metadata(typeId) == null) { + if (tryEnableMeta && fieldsIndexingEnabled && metaHandler.metadata(typeId) == null) { OptimizedObjectMetadata meta = new OptimizedObjectMetadata(); for (ClassFields clsFields : this.fields.fields) @@ -496,9 +499,6 @@ class OptimizedClassDescriptor { U.debug("putting to cache: " + typeId); - if (typeId == 2104067130) { - System.out.println(); - } metaHandler.addMeta(typeId, meta); U.debug("put to cache: " + typeId); @@ -659,9 +659,8 @@ class OptimizedClassDescriptor { case OBJ_ARR: OptimizedClassDescriptor compDesc = classDescriptor(clsMap, - obj.getClass().getComponentType(), - ctx, - mapper, metaHandler); + obj.getClass().getComponentType(), ctx, + mapper, metaHandler, false); compDesc.writeTypeData(out); @@ -720,7 +719,8 @@ class OptimizedClassDescriptor { break; case CLS: - OptimizedClassDescriptor clsDesc = classDescriptor(clsMap, (Class<?>)obj, ctx, mapper, metaHandler); + OptimizedClassDescriptor clsDesc = classDescriptor(clsMap, (Class<?>)obj, ctx, mapper, metaHandler, + false); clsDesc.writeTypeData(out); @@ -815,6 +815,18 @@ class OptimizedClassDescriptor { } /** + * Returns type ID. + * + * @return Type ID. 
+ */ + public int typeId() { + if (typeId == 0) + return resolveTypeId(cls.getName(), mapper); + + return typeId; + } + + /** * @param cls Class. * @return Type. */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/549a1a71/modules/core/src/main/java/org/apache/ignite/marshaller/optimized/OptimizedMarshaller.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/marshaller/optimized/OptimizedMarshaller.java b/modules/core/src/main/java/org/apache/ignite/marshaller/optimized/OptimizedMarshaller.java index a4971c4..4e5b1a7 100644 --- a/modules/core/src/main/java/org/apache/ignite/marshaller/optimized/OptimizedMarshaller.java +++ b/modules/core/src/main/java/org/apache/ignite/marshaller/optimized/OptimizedMarshaller.java @@ -144,6 +144,7 @@ public class OptimizedMarshaller extends AbstractMarshaller { * performance reason to avoid costly recreation for every serialization routine. If {@code 0} (default), * pool is not used and each thread has its own cached object stream which it keeps reusing. * <p> + * * Since each stream has an internal buffer, creating a stream for each thread can lead to * high memory consumption if many large messages are marshalled or unmarshalled concurrently. * Consider using pool in this case. This will limit number of streams that can be created and, @@ -289,6 +290,26 @@ public class OptimizedMarshaller extends AbstractMarshaller { } /** + * Checks whether a footer injection into a serialized form of the object is supported. + * Footer contains information on fields location in the serialized form, thus enabling fast queries without a need + * to deserialize the object. + * + * @param obj Object. + * @return {@code true} if the footer is supported. 
+ */ + public boolean footerSupported(Object obj) throws IgniteCheckedException { + try { + OptimizedClassDescriptor desc = OptimizedMarshallerUtils.classDescriptor(clsMap, obj.getClass(), ctx, + mapper, metaHandler, true); + + return metaHandler.metadata(desc.typeId()) != null; + } + catch (IOException e) { + throw new IgniteCheckedException("Failed to get class descriptor.", e); + } + } + + /** * Checks whether {@code GridOptimizedMarshaller} is able to work on the current JVM. * <p> * As long as {@code GridOptimizedMarshaller} uses JVM-private API, which is not guaranteed http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/549a1a71/modules/core/src/main/java/org/apache/ignite/marshaller/optimized/OptimizedMarshallerUtils.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/marshaller/optimized/OptimizedMarshallerUtils.java b/modules/core/src/main/java/org/apache/ignite/marshaller/optimized/OptimizedMarshallerUtils.java index bf6a4fa..138e732 100644 --- a/modules/core/src/main/java/org/apache/ignite/marshaller/optimized/OptimizedMarshallerUtils.java +++ b/modules/core/src/main/java/org/apache/ignite/marshaller/optimized/OptimizedMarshallerUtils.java @@ -181,6 +181,8 @@ class OptimizedMarshallerUtils { * @param ctx Context. * @param mapper ID mapper. * @param metaHandler Metadata handler. + * @param tryEnableMeta Try to enable meta during {@code OptimizedClassDescriptor} registration. Meta is supported + * only for objects that support footer injection in their serialized form. * @return Descriptor. * @throws IOException In case of error. 
*/ @@ -189,7 +191,8 @@ class OptimizedMarshallerUtils { Class cls, MarshallerContext ctx, OptimizedMarshallerIdMapper mapper, - OptimizedObjectMetadataHandler metaHandler) + OptimizedObjectMetadataHandler metaHandler, + boolean tryEnableMeta) throws IOException { OptimizedClassDescriptor desc = clsMap.get(cls); @@ -206,7 +209,8 @@ class OptimizedMarshallerUtils { throw new IOException("Failed to register class: " + cls.getName(), e); } - desc = new OptimizedClassDescriptor(cls, registered ? typeId : 0, clsMap, ctx, mapper, metaHandler); + desc = new OptimizedClassDescriptor(cls, registered ? typeId : 0, clsMap, ctx, mapper, metaHandler, + tryEnableMeta); if (registered) { OptimizedClassDescriptor old = clsMap.putIfAbsent(cls, desc); @@ -281,7 +285,7 @@ class OptimizedMarshallerUtils { if (desc == null) { OptimizedClassDescriptor old = clsMap.putIfAbsent(cls, desc = new OptimizedClassDescriptor(cls, resolveTypeId(cls.getName(), mapper), clsMap, ctx, mapper, - metaHandler)); + metaHandler, false)); if (old != null) desc = old; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/549a1a71/modules/core/src/main/java/org/apache/ignite/marshaller/optimized/OptimizedObjectInputStream.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/marshaller/optimized/OptimizedObjectInputStream.java b/modules/core/src/main/java/org/apache/ignite/marshaller/optimized/OptimizedObjectInputStream.java index 846fb08..70a2f3f 100644 --- a/modules/core/src/main/java/org/apache/ignite/marshaller/optimized/OptimizedObjectInputStream.java +++ b/modules/core/src/main/java/org/apache/ignite/marshaller/optimized/OptimizedObjectInputStream.java @@ -250,7 +250,7 @@ class OptimizedObjectInputStream extends ObjectInputStream { int typeId = readInt(); OptimizedClassDescriptor desc = typeId == 0 ? 
- classDescriptor(clsMap, U.forName(readUTF(), clsLdr), ctx, mapper, metaHandler): + classDescriptor(clsMap, U.forName(readUTF(), clsLdr), ctx, mapper, metaHandler, false): classDescriptor(clsMap, typeId, clsLdr, ctx, mapper, metaHandler); curCls = desc.describedClass(); @@ -540,10 +540,12 @@ class OptimizedObjectInputStream extends ObjectInputStream { } } - int footerLen = in.readInt(); + if (metaHandler.metadata(resolveTypeId(cls.getName(), mapper)) != null) { + int footerLen = in.readInt(); - if (footerLen != EMPTY_FOOTER) - in.skipBytes(footerLen - 4); + if (footerLen != EMPTY_FOOTER) + in.skipBytes(footerLen - 4); + } return obj; } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/549a1a71/modules/core/src/main/java/org/apache/ignite/marshaller/optimized/OptimizedObjectOutputStream.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/marshaller/optimized/OptimizedObjectOutputStream.java b/modules/core/src/main/java/org/apache/ignite/marshaller/optimized/OptimizedObjectOutputStream.java index 2ead955..05d3e3d 100644 --- a/modules/core/src/main/java/org/apache/ignite/marshaller/optimized/OptimizedObjectOutputStream.java +++ b/modules/core/src/main/java/org/apache/ignite/marshaller/optimized/OptimizedObjectOutputStream.java @@ -194,7 +194,8 @@ class OptimizedObjectOutputStream extends ObjectOutputStream { obj instanceof Object[] ? Object[].class : obj.getClass(), ctx, mapper, - metaHandler); + metaHandler, + false); if (desc.excluded()) { writeByte(NULL); @@ -220,7 +221,8 @@ class OptimizedObjectOutputStream extends ObjectOutputStream { obj instanceof Object[] ? 
Object[].class : obj.getClass(), ctx, mapper, - metaHandler); + metaHandler, + false); } if (handle >= 0) { @@ -317,9 +319,13 @@ class OptimizedObjectOutputStream extends ObjectOutputStream { @SuppressWarnings("ForLoopReplaceableByForEach") void writeSerializable(Object obj, List<Method> mtds, OptimizedClassDescriptor.Fields fields, int headerPos) throws IOException { - Footer footer = new Footer(fields); + Footer footer = null; - footer.headerPos(headerPos); + if (metaHandler.metadata(resolveTypeId(obj.getClass().getName(), mapper)) != null) { + footer = new Footer(fields); + + footer.headerPos(headerPos); + } for (int i = 0; i < mtds.size(); i++) { Method mtd = mtds.get(i); @@ -343,7 +349,8 @@ class OptimizedObjectOutputStream extends ObjectOutputStream { writeFields(obj, fields.fields(i), footer); } - footer.write(); + if (footer != null) + footer.write(); } /** @@ -553,12 +560,12 @@ class OptimizedObjectOutputStream extends ObjectOutputStream { if (t.field() != null) { int handle = writeObject0(getObject(obj, t.offset())); - if (handle >= 0) + if (footer != null && handle >= 0) footer.disable(); } } - if (t.field() != null) { + if (footer != null && t.field() != null) { int fieldLen = out.size() - size; footer.put(t.id(), t.type(), fieldLen); @@ -813,13 +820,15 @@ class OptimizedObjectOutputStream extends ObjectOutputStream { case OTHER: int handle = writeObject0(t.get2()); - if (handle >= 0) + if (footer != null && handle >= 0) footer.disable(); } - int fieldLen = out.size() - size; + if (footer != null) { + int fieldLen = out.size() - size; - footer.put(t.get1().id(), t.get1().type(), fieldLen); + footer.put(t.get1().id(), t.get1().type(), fieldLen); + } } }
