http://git-wip-us.apache.org/repos/asf/ignite/blob/b783d2b7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java index 82e5f2a..214ba45 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java @@ -218,8 +218,8 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter timeout, invalidate, storeEnabled, - onePhaseCommit, - txSize, + onePhaseCommit, + txSize, subjId, taskNameHash ); @@ -432,6 +432,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter final Collection<KeyCacheObject> keys, boolean skipVals, boolean needVer, + boolean keepBinary, final GridInClosure3<KeyCacheObject, Object, GridCacheVersion> c ) { assert cacheCtx.isLocal() : cacheCtx.name(); @@ -467,7 +468,8 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter CU.subjectId(this, cctx), null, resolveTaskName(), - expiryPlc); + expiryPlc, + txEntry == null ? keepBinary : txEntry.keepBinary()); if (res == null) { if (misses == null) @@ -722,9 +724,13 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter } if (intercept) { - Object interceptorVal = cacheCtx.config().getInterceptor() - .onBeforePut(new CacheLazyEntry(cacheCtx, key, e.cached().rawGetOrUnmarshal(true)), - CU.value(val, cacheCtx, false)); + Object interceptorVal = cacheCtx.config().getInterceptor().onBeforePut( + new CacheLazyEntry( + cacheCtx, + key, + e.cached().rawGetOrUnmarshal(true), + e.keepBinary()), + cacheCtx.cacheObjectContext().unwrapPortableIfNeeded(val, e.keepBinary(), false)); if (interceptorVal == null) continue; @@ -739,7 +745,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter if (putMap == null) putMap = new LinkedHashMap<>(writeMap().size(), 1.0f); - putMap.put(CU.value(key, cacheCtx, false), F.t(CU.value(val, cacheCtx, false), ver)); + putMap.put(key, F.<Object, GridCacheVersion>t(val, ver)); } } else if (op == DELETE) { @@ -768,7 +774,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter if (intercept) { IgniteBiTuple<Boolean, Object> t = cacheCtx.config().getInterceptor().onBeforeRemove( - new CacheLazyEntry(cacheCtx, key, e.cached().rawGetOrUnmarshal(true))); + new CacheLazyEntry(cacheCtx, key, e.cached().rawGetOrUnmarshal(true), e.keepBinary())); if (cacheCtx.cancelRemove(t)) continue; @@ -781,7 +787,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter if (rmvCol == null) rmvCol = new ArrayList<>(); - rmvCol.add(key.value(cacheCtx.cacheObjectContext(), false)); + rmvCol.add(key); } } else if (log.isDebugEnabled()) @@ -1018,6 +1024,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter txEntry.ttl(), evt, metrics, + txEntry.keepBinary(), topVer, null, cached.detached() ? DR_NONE : drType, @@ -1038,6 +1045,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter txEntry.ttl(), false, metrics, + txEntry.keepBinary(), topVer, CU.empty0(), DR_NONE, @@ -1057,6 +1065,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter false, evt, metrics, + txEntry.keepBinary(), topVer, null, cached.detached() ? DR_NONE : drType, @@ -1074,6 +1083,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter false, false, metrics, + txEntry.keepBinary(), topVer, CU.empty0(), DR_NONE, @@ -1415,7 +1425,8 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter CU.subjectId(this, cctx), transformClo, resolveTaskName(), - null); + null, + txEntry.keepBinary()); if (val != null) { if (!readCommitted() && !skipVals) @@ -1474,7 +1485,8 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter CU.subjectId(this, cctx), null, resolveTaskName(), - accessPlc) : null; + accessPlc, + !deserializePortable) : null; if (res != null) { val = res.get1(); @@ -1493,7 +1505,8 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter CU.subjectId(this, cctx), null, resolveTaskName(), - accessPlc); + accessPlc, + !deserializePortable); } if (val != null) { @@ -1524,7 +1537,8 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter -1L, -1L, null, - skipStore); + skipStore, + !deserializePortable); // As optimization, mark as checked immediately // for non-pessimistic if value is not null. @@ -1648,6 +1662,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter missedMap.keySet(), skipVals, needReadVer, + !deserializePortable, new GridInClosure3<KeyCacheObject, Object, GridCacheVersion>() { @Override public void apply(KeyCacheObject key, Object val, GridCacheVersion loadVer) { if (isRollbackOnly()) { @@ -1782,8 +1797,9 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter // Load keys only after the locks have been acquired. for (KeyCacheObject cacheKey : lockKeys) { - K keyVal = - (K)(keepCacheObjects ? cacheKey : cacheKey.value(cacheCtx.cacheObjectContext(), false)); + K keyVal = (K) + (keepCacheObjects ? cacheKey : + cacheCtx.cacheObjectContext().unwrapPortableIfNeeded(cacheKey, !deserializePortable)); if (retMap.containsKey(keyVal)) // We already have a return value. @@ -1816,7 +1832,8 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter CU.subjectId(IgniteTxLocalAdapter.this, cctx), transformClo, resolveTaskName(), - null); + null, + txEntry.keepBinary()); // If value is in cache and passed the filter. if (val != null) { @@ -2047,7 +2064,8 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter @Nullable Map<KeyCacheObject, GridCacheDrInfo> drPutMap, @Nullable Map<KeyCacheObject, GridCacheVersion> drRmvMap, boolean skipStore, - final boolean singleRmv + final boolean singleRmv, + final boolean keepBinary ) { assert retval || invokeMap == null; @@ -2152,7 +2170,8 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter CU.subjectId(this, cctx), entryProcessor, resolveTaskName(), - null) : null; + null, + keepBinary) : null; if (res != null) { old = res.get1(); @@ -2171,7 +2190,8 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter CU.subjectId(this, cctx), entryProcessor, resolveTaskName(), - null); + null, + keepBinary); } } catch (ClusterTopologyCheckedException e) { @@ -2186,7 +2206,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter if (old != null && hasFilters && !filter(entry.context(), cacheKey, old, filter)) { skipped = skip(skipped, cacheKey); - ret.set(cacheCtx, old, false); + ret.set(cacheCtx, old, false, keepBinary); if (!readCommitted()) { // Enlist failed filters as reads for non-read-committed mode, @@ -2202,7 +2222,8 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter -1L, -1L, null, - skipStore); + skipStore, + keepBinary); txEntry.markValid(); @@ -2233,7 +2254,8 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter drTtl, drExpireTime, drVer, - skipStore); + skipStore, + keepBinary); if (!implicit() && readCommitted() && !cacheCtx.offheapTiered()) cacheCtx.evicts().touch(entry, topologyVersion()); @@ -2255,7 +2277,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter assert txEntry.op() != TRANSFORM : txEntry; if (retval) - ret.set(cacheCtx, null, true); + ret.set(cacheCtx, null, true, keepBinary); else ret.success(true); } @@ -2268,7 +2290,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter } if (retval && !transform) - ret.set(cacheCtx, old, true); + ret.set(cacheCtx, old, true, keepBinary); else { if (txEntry.op() == TRANSFORM) { GridCacheVersion ver; @@ -2296,7 +2318,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter // Pessimistic. else { if (retval && !transform) - ret.set(cacheCtx, old, true); + ret.set(cacheCtx, old, true, keepBinary); else ret.success(true); } @@ -2324,7 +2346,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter if (hasFilters && !filter(entry.context(), cacheKey, v, filter)) { skipped = skip(skipped, cacheKey); - ret.set(cacheCtx, v, false); + ret.set(cacheCtx, v, false, keepBinary); continue; } @@ -2343,7 +2365,8 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter drTtl, drExpireTime, drVer, - skipStore); + skipStore, + keepBinary); enlisted.add(cacheKey); @@ -2370,7 +2393,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter txEntry.markValid(); if (retval && !transform) - ret.set(cacheCtx, v, true); + ret.set(cacheCtx, v, true, keepBinary); else ret.success(true); } @@ -2391,6 +2414,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter missedForLoad, skipVals, needReadVer, + keepBinary, new GridInClosure3<KeyCacheObject, Object, GridCacheVersion>() { @Override public void apply(KeyCacheObject key, @Nullable Object val, @@ -2412,7 +2436,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter assert !hasFilters && !retval; assert val == null || Boolean.TRUE.equals(val) : val; - ret.set(cacheCtx, null, val != null); + ret.set(cacheCtx, null, val != null, keepBinary); } else { CacheObject cacheVal = cacheCtx.toCacheObject(val); @@ -2437,7 +2461,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter else { boolean success = !hasFilters || isAll(e.context(), key, cacheVal, filter); - ret.set(cacheCtx, cacheVal, success); + ret.set(cacheCtx, cacheVal, success, keepBinary); } } } @@ -2559,7 +2583,8 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter CU.subjectId(this, cctx), null, resolveTaskName(), - null); + null, + txEntry.keepBinary()); } } else { @@ -2587,7 +2612,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter } } else - ret.value(cacheCtx, v); + ret.value(cacheCtx, v, txEntry.keepBinary()); } boolean pass = F.isEmpty(filter) || cacheCtx.isAll(cached, filter); @@ -2665,7 +2690,8 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter for (T2<EntryProcessor<Object, Object, Object>, Object[]> t : txEntry.entryProcessors()) { CacheInvokeEntry<Object, Object> invokeEntry = - new CacheInvokeEntry(txEntry.context(), txEntry.key(), key0, cacheVal, val0, ver); + new CacheInvokeEntry(txEntry.context(), txEntry.key(), key0, cacheVal, val0, ver, + txEntry.keepBinary()); EntryProcessor<Object, Object, ?> entryProcessor = t.get1(); @@ -2805,7 +2831,8 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter drMap, null, opCtx != null && opCtx.skipStore(), - false); + false, + opCtx != null && opCtx.isKeepBinary()); if (pessimistic()) { // Loose all skipped. @@ -3032,7 +3059,8 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter null, drMap, opCtx != null && opCtx.skipStore(), - singleRmv + singleRmv, + opCtx != null && opCtx.isKeepBinary() ); if (log.isDebugEnabled()) @@ -3157,12 +3185,12 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter * Checks if portable values should be deserialized. * * @param cacheCtx Cache context. - * @return {@code True} if portables should be deserialized, {@code false} otherwise. + * @return {@code True} if binary should be deserialized, {@code false} otherwise. */ private boolean deserializePortables(GridCacheContext cacheCtx) { CacheOperationContext opCtx = cacheCtx.operationContextPerCall(); - return opCtx == null || !opCtx.isKeepPortable(); + return opCtx == null || !opCtx.isKeepBinary(); } /** @@ -3283,7 +3311,9 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter long drTtl, long drExpireTime, @Nullable GridCacheVersion drVer, - boolean skipStore) { + boolean skipStore, + boolean keepBinary + ) { assert invokeArgs == null || op == TRANSFORM; IgniteTxKey key = entry.txKey(); @@ -3353,7 +3383,8 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter entry, filter, drVer, - skipStore); + skipStore, + keepBinary); txEntry.conflictExpireTime(drExpireTime);
http://git-wip-us.apache.org/repos/asf/ignite/blob/b783d2b7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalEx.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalEx.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalEx.java index 0d83338..08f2e43 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalEx.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalEx.java @@ -166,5 +166,6 @@ public interface IgniteTxLocalEx extends IgniteInternalTx { Collection<KeyCacheObject> keys, boolean skipVals, boolean needVer, + boolean keepBinary, GridInClosure3<KeyCacheObject, Object, GridCacheVersion> c); } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/b783d2b7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java index 1f51b8a..ec0a318 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java @@ -1404,7 +1404,7 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter { assert serReadVer == null || (tx.optimistic() && tx.serializable()) : txEntry1; - if (!entry1.tmLock(tx, timeout, serOrder, serReadVer)) { + if (!entry1.tmLock(tx, timeout, serOrder, serReadVer, txEntry1.keepBinary())) { // Unlock locks locked so far. for (IgniteTxEntry txEntry2 : entries) { if (txEntry2 == txEntry1) http://git-wip-us.apache.org/repos/asf/ignite/blob/b783d2b7/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/IgniteCacheObjectProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/IgniteCacheObjectProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/IgniteCacheObjectProcessor.java index ecbe294..239b1b9 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/IgniteCacheObjectProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/IgniteCacheObjectProcessor.java @@ -17,6 +17,7 @@ package org.apache.ignite.internal.processors.cacheobject; +import org.apache.ignite.IgniteBinary; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteException; import org.apache.ignite.configuration.CacheConfiguration; @@ -164,4 +165,9 @@ public interface IgniteCacheObjectProcessor extends GridProcessor { * @return {@code True} if object is of known immutable type. */ public boolean immutable(Object obj); + + /** + * @return Ignite binary interface. + */ + public IgniteBinary binary(); } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/b783d2b7/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/IgniteCacheObjectProcessorImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/IgniteCacheObjectProcessorImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/IgniteCacheObjectProcessorImpl.java index 0bc102e..0d36e0e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/IgniteCacheObjectProcessorImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/IgniteCacheObjectProcessorImpl.java @@ -22,6 +22,7 @@ import java.util.Arrays; import java.util.Collection; import java.util.HashSet; import java.util.UUID; +import org.apache.ignite.IgniteBinary; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteException; import org.apache.ignite.cache.CacheMemoryMode; @@ -56,6 +57,9 @@ public class IgniteCacheObjectProcessorImpl extends GridProcessorAdapter impleme /** Immutable classes. */ private static final Collection<Class<?>> IMMUTABLE_CLS = new HashSet<>(); + /** */ + private IgniteBinary noOpBinary = new NoOpBinary(); + /** * */ @@ -82,6 +86,11 @@ public class IgniteCacheObjectProcessorImpl extends GridProcessorAdapter impleme } /** {@inheritDoc} */ + @Override public IgniteBinary binary() { + return noOpBinary; + } + + /** {@inheritDoc} */ @Nullable @Override public CacheObject prepareForCache(@Nullable CacheObject obj, GridCacheContext cctx) { if (obj == null) return null; http://git-wip-us.apache.org/repos/asf/ignite/blob/b783d2b7/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/NoOpBinary.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/NoOpBinary.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/NoOpBinary.java new file mode 100644 index 0000000..c20f278 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/NoOpBinary.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ignite.internal.processors.cacheobject; + +import java.util.Collection; +import org.apache.ignite.IgniteBinary; +import org.apache.ignite.binary.BinaryObject; +import org.apache.ignite.binary.BinaryObjectBuilder; +import org.apache.ignite.binary.BinaryObjectException; +import org.apache.ignite.binary.BinaryType; +import org.jetbrains.annotations.Nullable; + +/** + * + */ +public class NoOpBinary implements IgniteBinary { + /** {@inheritDoc} */ + @Override public int typeId(String typeName) { + return 0; + } + + /** {@inheritDoc} */ + @Override public <T> T toBinary(@Nullable Object obj) throws BinaryObjectException { + throw unsupported(); + } + + /** {@inheritDoc} */ + @Override public BinaryObjectBuilder builder(int typeId) throws BinaryObjectException { + throw unsupported(); + } + + /** {@inheritDoc} */ + @Override public BinaryObjectBuilder builder(String typeName) throws BinaryObjectException { + throw unsupported(); + } + + /** {@inheritDoc} */ + @Override public BinaryObjectBuilder builder(BinaryObject portableObj) throws BinaryObjectException { + throw unsupported(); + } + + /** {@inheritDoc} */ + @Override public BinaryType metadata(Class<?> cls) throws BinaryObjectException { + throw unsupported(); + } + + /** {@inheritDoc} */ + @Override public BinaryType metadata(String typeName) throws BinaryObjectException { + throw unsupported(); + } + + /** {@inheritDoc} */ + @Override public BinaryType metadata(int typeId) throws BinaryObjectException { + throw unsupported(); + } + + /** {@inheritDoc} */ + @Override public Collection<BinaryType> metadata() throws BinaryObjectException { + throw unsupported(); + } + + /** {@inheritDoc} */ + private BinaryObjectException unsupported() { + return new BinaryObjectException("Binary marshaller is not configured."); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/b783d2b7/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java index 5150d83..a2aab77 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java @@ -289,6 +289,7 @@ public class DataStreamProcessor<K, V> extends GridProcessorAdapter { col, req.ignoreDeploymentOwnership(), req.skipStore(), + req.keepBinary(), updater); Exception err = null; http://git-wip-us.apache.org/repos/asf/ignite/blob/b783d2b7/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerEntry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerEntry.java index 982b691..874d2e2 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerEntry.java @@ -82,10 +82,10 @@ public class DataStreamerEntry implements Map.Entry<KeyCacheObject, CacheObject> * @param ctx Cache context. * @return Map entry unwrapping internal key and value. */ - public <K, V> Map.Entry<K, V> toEntry(final GridCacheContext ctx) { + public <K, V> Map.Entry<K, V> toEntry(final GridCacheContext ctx, final boolean keepBinary) { return new Map.Entry<K, V>() { @Override public K getKey() { - return key.value(ctx.cacheObjectContext(), false); + return (K)ctx.cacheObjectContext().unwrapPortableIfNeeded(key, keepBinary, false); } @Override public V setValue(V val) { @@ -93,7 +93,7 @@ public class DataStreamerEntry implements Map.Entry<KeyCacheObject, CacheObject> } @Override public V getValue() { - return val != null ? val.<V>value(ctx.cacheObjectContext(), false) : null; + return (V)ctx.cacheObjectContext().unwrapPortableIfNeeded(val, keepBinary, false); } }; } http://git-wip-us.apache.org/repos/asf/ignite/blob/b783d2b7/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java index 2190bf6..27eff0c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java @@ -211,6 +211,9 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed private boolean skipStore; /** */ + private boolean keepBinary; + + /** */ private int maxRemapCnt = DFLT_MAX_REMAP_CNT; /** Whether a warning at {@link DataStreamerImpl#allowOverwrite()} printed */ @@ -405,6 +408,16 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed } /** {@inheritDoc} */ + @Override public boolean keepBinary() { + return keepBinary; + } + + /** {@inheritDoc} */ + @Override public void keepBinary(boolean keepBinary) { + this.keepBinary = keepBinary; + } + + /** {@inheritDoc} */ @Override @Nullable public String cacheName() { return cacheName; } @@ -1243,7 +1256,7 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed if (isLocNode) { fut = ctx.closure().callLocalSafe( - new DataStreamerUpdateJob(ctx, log, cacheName, entries, false, skipStore, rcvr), false); + new DataStreamerUpdateJob(ctx, log, cacheName, entries, false, skipStore, keepBinary, rcvr), false); locFuts.add(fut); @@ -1331,6 +1344,7 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed entries, true, skipStore, + keepBinary, dep != null ? dep.deployMode() : null, dep != null ? jobPda0.deployClass().getName() : null, dep != null ? dep.userVersion() : null, http://git-wip-us.apache.org/repos/asf/ignite/blob/b783d2b7/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerRequest.java index c1a1528..3d65304 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerRequest.java @@ -64,6 +64,9 @@ public class DataStreamerRequest implements Message { /** */ private boolean skipStore; + /** Keep binary flag. */ + private boolean keepBinary; + /** */ private DeploymentMode depMode; @@ -117,6 +120,7 @@ public class DataStreamerRequest implements Message { Collection<DataStreamerEntry> entries, boolean ignoreDepOwnership, boolean skipStore, + boolean keepBinary, DeploymentMode depMode, String sampleClsName, String userVer, @@ -133,6 +137,7 @@ public class DataStreamerRequest implements Message { this.entries = entries; this.ignoreDepOwnership = ignoreDepOwnership; this.skipStore = skipStore; + this.keepBinary = keepBinary; this.depMode = depMode; this.sampleClsName = sampleClsName; this.userVer = userVer; @@ -192,6 +197,13 @@ public class DataStreamerRequest implements Message { } /** + * @return Keep binary flag. + */ + public boolean keepBinary() { + return keepBinary; + } + + /** * @return Deployment mode. */ public DeploymentMode deploymentMode() { @@ -294,48 +306,54 @@ public class DataStreamerRequest implements Message { writer.incrementState(); case 6: - if (!writer.writeMap("ldrParticipants", ldrParticipants, MessageCollectionItemType.UUID, MessageCollectionItemType.IGNITE_UUID)) + if (!writer.writeBoolean("keepBinary", keepBinary)) return false; writer.incrementState(); case 7: - if (!writer.writeLong("reqId", reqId)) + if (!writer.writeMap("ldrParticipants", ldrParticipants, MessageCollectionItemType.UUID, MessageCollectionItemType.IGNITE_UUID)) return false; writer.incrementState(); case 8: - if (!writer.writeByteArray("resTopicBytes", resTopicBytes)) + if (!writer.writeLong("reqId", reqId)) return false; writer.incrementState(); case 9: - if (!writer.writeString("sampleClsName", sampleClsName)) + if (!writer.writeByteArray("resTopicBytes", resTopicBytes)) return false; writer.incrementState(); case 10: - if (!writer.writeBoolean("skipStore", skipStore)) + if (!writer.writeString("sampleClsName", sampleClsName)) return false; writer.incrementState(); case 11: - if (!writer.writeMessage("topVer", topVer)) + if (!writer.writeBoolean("skipStore", skipStore)) return false; writer.incrementState(); case 12: - if (!writer.writeByteArray("updaterBytes", updaterBytes)) + if (!writer.writeMessage("topVer", topVer)) return false; writer.incrementState(); case 13: + if (!writer.writeByteArray("updaterBytes", updaterBytes)) + return false; + + writer.incrementState(); + + case 14: if (!writer.writeString("userVer", userVer)) return false; @@ -407,7 +425,7 @@ public class DataStreamerRequest implements Message { reader.incrementState(); case 6: - ldrParticipants = reader.readMap("ldrParticipants", MessageCollectionItemType.UUID, MessageCollectionItemType.IGNITE_UUID, false); + keepBinary = reader.readBoolean("keepBinary"); if (!reader.isLastRead()) return false; @@ -415,7 +433,7 @@ public class DataStreamerRequest implements Message { reader.incrementState(); case 7: - reqId = reader.readLong("reqId"); + ldrParticipants = reader.readMap("ldrParticipants", MessageCollectionItemType.UUID, MessageCollectionItemType.IGNITE_UUID, false); if (!reader.isLastRead()) return false; @@ -423,7 +441,7 @@ public class DataStreamerRequest implements Message { reader.incrementState(); case 8: - resTopicBytes = reader.readByteArray("resTopicBytes"); + reqId = reader.readLong("reqId"); if (!reader.isLastRead()) return false; @@ -431,7 +449,7 @@ public class DataStreamerRequest implements Message { reader.incrementState(); case 9: - sampleClsName = reader.readString("sampleClsName"); + resTopicBytes = reader.readByteArray("resTopicBytes"); if (!reader.isLastRead()) return false; @@ -439,7 +457,7 @@ public class DataStreamerRequest implements Message { reader.incrementState(); case 10: - skipStore = reader.readBoolean("skipStore"); + sampleClsName = reader.readString("sampleClsName"); if (!reader.isLastRead()) return false; @@ -447,7 +465,7 @@ public class DataStreamerRequest implements Message { reader.incrementState(); case 11: - topVer = reader.readMessage("topVer"); + skipStore = reader.readBoolean("skipStore"); if (!reader.isLastRead()) return false; @@ -455,7 +473,7 @@ public class DataStreamerRequest implements Message { reader.incrementState(); case 12: - updaterBytes = reader.readByteArray("updaterBytes"); + topVer = reader.readMessage("topVer"); if (!reader.isLastRead()) return false; @@ -463,6 +481,14 @@ public class DataStreamerRequest implements Message { reader.incrementState(); case 13: + updaterBytes = reader.readByteArray("updaterBytes"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 14: userVer = reader.readString("userVer"); if (!reader.isLastRead()) @@ -482,6 +508,6 @@ public class DataStreamerRequest implements Message { /** {@inheritDoc} */ @Override public byte fieldsCount() { - return 14; + return 15; } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/ignite/blob/b783d2b7/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerUpdateJob.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerUpdateJob.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerUpdateJob.java index 42084a3..c49087f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerUpdateJob.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerUpdateJob.java @@ -56,6 +56,9 @@ class DataStreamerUpdateJob implements GridPlainCallable<Object> { /** */ private final StreamReceiver rcvr; + /** */ + private boolean keepBinary; + /** * @param ctx Context. * @param log Log. @@ -72,6 +75,7 @@ class DataStreamerUpdateJob implements GridPlainCallable<Object> { Collection<DataStreamerEntry> col, boolean ignoreDepOwnership, boolean skipStore, + boolean keepBinary, StreamReceiver<?, ?> rcvr) { this.ctx = ctx; this.log = log; @@ -83,6 +87,7 @@ class DataStreamerUpdateJob implements GridPlainCallable<Object> { this.col = col; this.ignoreDepOwnership = ignoreDepOwnership; this.skipStore = skipStore; + this.keepBinary = keepBinary; this.rcvr = rcvr; } @@ -99,6 +104,9 @@ class DataStreamerUpdateJob implements GridPlainCallable<Object> { if (skipStore) cache = (IgniteCacheProxy<?, ?>)cache.withSkipStore(); + if (keepBinary) + cache = (IgniteCacheProxy<?, ?>)cache.withKeepBinary(); + if (ignoreDepOwnership) cache.context().deploy().ignoreOwnership(true); @@ -122,7 +130,7 @@ class DataStreamerUpdateJob implements GridPlainCallable<Object> { if (unwrapEntries()) { Collection<Map.Entry> col0 = F.viewReadOnly(col, new C1<DataStreamerEntry, Map.Entry>() { @Override public Map.Entry apply(DataStreamerEntry e) { - return e.toEntry(cctx); + return e.toEntry(cctx, keepBinary); } }); http://git-wip-us.apache.org/repos/asf/ignite/blob/b783d2b7/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformAbstractTarget.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformAbstractTarget.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformAbstractTarget.java index 0f46517..09cb29d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformAbstractTarget.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformAbstractTarget.java @@ -20,8 +20,8 @@ package org.apache.ignite.internal.processors.platform; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteLogger; import org.apache.ignite.internal.IgniteInternalFuture; -import org.apache.ignite.internal.portable.PortableRawReaderEx; -import org.apache.ignite.internal.portable.PortableRawWriterEx; +import org.apache.ignite.internal.portable.BinaryRawReaderEx; +import org.apache.ignite.internal.portable.BinaryRawWriterEx; import org.apache.ignite.internal.processors.platform.memory.PlatformMemory; import org.apache.ignite.internal.processors.platform.memory.PlatformOutputStream; import org.apache.ignite.internal.processors.platform.utils.PlatformFutureUtils; @@ -62,7 +62,7 @@ public abstract class PlatformAbstractTarget implements PlatformTarget { /** {@inheritDoc} */ @Override public long inStreamOutLong(int type, long memPtr) throws Exception { try (PlatformMemory mem = platformCtx.memory().get(memPtr)) { - PortableRawReaderEx reader = platformCtx.reader(mem); + BinaryRawReaderEx reader = platformCtx.reader(mem); if (type == OP_META) { platformCtx.processMetadata(reader); @@ -80,7 +80,7 @@ public abstract class PlatformAbstractTarget implements PlatformTarget { /** {@inheritDoc} */ @Override public Object inStreamOutObject(int type, long memPtr) throws Exception { try (PlatformMemory mem = platformCtx.memory().get(memPtr)) { - PortableRawReaderEx reader = platformCtx.reader(mem); + BinaryRawReaderEx reader = platformCtx.reader(mem); return processInStreamOutObject(type, reader); } @@ -104,7 +104,7 @@ public abstract class PlatformAbstractTarget implements PlatformTarget { try (PlatformMemory mem = platformCtx.memory().get(memPtr)) { PlatformOutputStream out = mem.output(); - PortableRawWriterEx writer = platformCtx.writer(out); + BinaryRawWriterEx writer = platformCtx.writer(out); processOutStream(type, writer); @@ -128,12 +128,12 @@ public abstract class PlatformAbstractTarget implements PlatformTarget { /** {@inheritDoc} */ @Override public void inStreamOutStream(int type, long inMemPtr, long outMemPtr) throws Exception { try (PlatformMemory inMem = platformCtx.memory().get(inMemPtr)) { - PortableRawReaderEx reader = platformCtx.reader(inMem); + BinaryRawReaderEx reader = platformCtx.reader(inMem); try (PlatformMemory outMem = platformCtx.memory().get(outMemPtr)) { PlatformOutputStream out = outMem.output(); - PortableRawWriterEx writer = platformCtx.writer(out); + BinaryRawWriterEx writer = platformCtx.writer(out); processInStreamOutStream(type, reader, writer); @@ -148,12 +148,12 @@ public abstract class PlatformAbstractTarget implements PlatformTarget { /** {@inheritDoc} */ @Override public void inObjectStreamOutStream(int type, Object arg, long inMemPtr, long outMemPtr) throws Exception { try (PlatformMemory inMem = platformCtx.memory().get(inMemPtr)) { - PortableRawReaderEx reader = platformCtx.reader(inMem); + BinaryRawReaderEx reader = platformCtx.reader(inMem); try (PlatformMemory outMem = platformCtx.memory().get(outMemPtr)) { PlatformOutputStream out = outMem.output(); - PortableRawWriterEx writer = platformCtx.writer(out); + BinaryRawWriterEx writer = platformCtx.writer(out); processInObjectStreamOutStream(type, arg, reader, writer); @@ -233,7 +233,7 @@ public abstract class PlatformAbstractTarget implements PlatformTarget { * @return Result. * @throws IgniteCheckedException In case of exception. */ - protected long processInStreamOutLong(int type, PortableRawReaderEx reader) throws IgniteCheckedException { + protected long processInStreamOutLong(int type, BinaryRawReaderEx reader) throws IgniteCheckedException { return throwUnsupported(type); } @@ -245,7 +245,7 @@ public abstract class PlatformAbstractTarget implements PlatformTarget { * @param writer Portable writer. * @throws IgniteCheckedException In case of exception. */ - protected void processInStreamOutStream(int type, PortableRawReaderEx reader, PortableRawWriterEx writer) + protected void processInStreamOutStream(int type, BinaryRawReaderEx reader, BinaryRawWriterEx writer) throws IgniteCheckedException { throwUnsupported(type); } @@ -258,7 +258,7 @@ public abstract class PlatformAbstractTarget implements PlatformTarget { * @return Result. * @throws IgniteCheckedException In case of exception. */ - protected Object processInStreamOutObject(int type, PortableRawReaderEx reader) throws IgniteCheckedException { + protected Object processInStreamOutObject(int type, BinaryRawReaderEx reader) throws IgniteCheckedException { return throwUnsupported(type); } @@ -271,8 +271,8 @@ public abstract class PlatformAbstractTarget implements PlatformTarget { * @param writer Portable writer. * @throws IgniteCheckedException In case of exception. */ - protected void processInObjectStreamOutStream(int type, @Nullable Object arg, PortableRawReaderEx reader, - PortableRawWriterEx writer) throws IgniteCheckedException { + protected void processInObjectStreamOutStream(int type, @Nullable Object arg, BinaryRawReaderEx reader, + BinaryRawWriterEx writer) throws IgniteCheckedException { throwUnsupported(type); } @@ -293,7 +293,7 @@ public abstract class PlatformAbstractTarget implements PlatformTarget { * @param writer Portable writer. * @throws IgniteCheckedException In case of exception. */ - protected void processOutStream(int type, PortableRawWriterEx writer) throws IgniteCheckedException { + protected void processOutStream(int type, BinaryRawWriterEx writer) throws IgniteCheckedException { throwUnsupported(type); } http://git-wip-us.apache.org/repos/asf/ignite/blob/b783d2b7/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformContext.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformContext.java index a9b7d02..12eb36e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformContext.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformContext.java @@ -21,8 +21,8 @@ import org.apache.ignite.cluster.ClusterMetrics; import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.events.Event; import org.apache.ignite.internal.GridKernalContext; -import org.apache.ignite.internal.portable.PortableRawReaderEx; -import org.apache.ignite.internal.portable.PortableRawWriterEx; +import org.apache.ignite.internal.portable.BinaryRawReaderEx; +import org.apache.ignite.internal.portable.BinaryRawWriterEx; import org.apache.ignite.internal.processors.platform.cache.PlatformCacheEntryFilter; import org.apache.ignite.internal.processors.platform.cache.PlatformCacheEntryProcessor; import org.apache.ignite.internal.processors.platform.cache.query.PlatformContinuousQuery; @@ -71,7 +71,7 @@ public interface PlatformContext { * @param mem Memory. * @return Reader. */ - public PortableRawReaderEx reader(PlatformMemory mem); + public BinaryRawReaderEx reader(PlatformMemory mem); /** * Get memory reader. @@ -79,7 +79,7 @@ public interface PlatformContext { * @param in Input. * @return Reader. */ - public PortableRawReaderEx reader(PlatformInputStream in); + public BinaryRawReaderEx reader(PlatformInputStream in); /** * Get memory writer. @@ -87,7 +87,7 @@ public interface PlatformContext { * @param mem Memory. * @return Writer. */ - public PortableRawWriterEx writer(PlatformMemory mem); + public BinaryRawWriterEx writer(PlatformMemory mem); /** * Get memory writer. @@ -95,7 +95,7 @@ public interface PlatformContext { * @param out Output. * @return Writer. */ - public PortableRawWriterEx writer(PlatformOutputStream out); + public BinaryRawWriterEx writer(PlatformOutputStream out); /** * Sends node info to native platform, if necessary. @@ -110,7 +110,7 @@ public interface PlatformContext { * @param writer Writer. * @param node Node. */ - public void writeNode(PortableRawWriterEx writer, ClusterNode node); + public void writeNode(BinaryRawWriterEx writer, ClusterNode node); /** * Writes multiple node ids to a stream and sends node info to native platform, if necessary. @@ -118,14 +118,14 @@ public interface PlatformContext { * @param writer Writer. * @param nodes Nodes. */ - public void writeNodes(PortableRawWriterEx writer, Collection<ClusterNode> nodes); + public void writeNodes(BinaryRawWriterEx writer, Collection<ClusterNode> nodes); /** * Process metadata from the platform. * * @param reader Reader. */ - public void processMetadata(PortableRawReaderEx reader); + public void processMetadata(BinaryRawReaderEx reader); /** * Write metadata for the given type ID. @@ -133,14 +133,14 @@ public interface PlatformContext { * @param writer Writer. * @param typeId Type ID. */ - public void writeMetadata(PortableRawWriterEx writer, int typeId); + public void writeMetadata(BinaryRawWriterEx writer, int typeId); /** * Write all available metadata. * * @param writer Writer. */ - public void writeAllMetadata(PortableRawWriterEx writer); + public void writeAllMetadata(BinaryRawWriterEx writer); /** * Write cluster metrics. @@ -148,7 +148,7 @@ public interface PlatformContext { * @param writer Writer. * @param metrics Metrics. */ - public void writeClusterMetrics(PortableRawWriterEx writer, @Nullable ClusterMetrics metrics); + public void writeClusterMetrics(BinaryRawWriterEx writer, @Nullable ClusterMetrics metrics); /** * @@ -190,7 +190,7 @@ public interface PlatformContext { * @param writer Writer. * @param evt Event. */ - public void writeEvent(PortableRawWriterEx writer, Event evt); + public void writeEvent(BinaryRawWriterEx writer, Event evt); /** * Create local event filter. http://git-wip-us.apache.org/repos/asf/ignite/blob/b783d2b7/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformContextImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformContextImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformContextImpl.java index 177a732..c513600 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformContextImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformContextImpl.java @@ -34,10 +34,10 @@ import org.apache.ignite.events.SwapSpaceEvent; import org.apache.ignite.events.TaskEvent; import org.apache.ignite.internal.GridKernalContext; import org.apache.ignite.internal.portable.GridPortableMarshaller; -import org.apache.ignite.internal.portable.PortableMetaDataImpl; -import org.apache.ignite.internal.portable.PortableRawReaderEx; -import org.apache.ignite.internal.portable.PortableRawWriterEx; -import org.apache.ignite.internal.processors.cache.portable.CacheObjectPortableProcessorImpl; +import org.apache.ignite.internal.portable.BinaryMetaDataImpl; +import org.apache.ignite.internal.portable.BinaryRawReaderEx; +import org.apache.ignite.internal.portable.BinaryRawWriterEx; +import org.apache.ignite.internal.processors.cache.portable.CacheObjectBinaryProcessorImpl; import org.apache.ignite.internal.processors.platform.cache.PlatformCacheEntryFilter; import org.apache.ignite.internal.processors.platform.cache.PlatformCacheEntryFilterImpl; import org.apache.ignite.internal.processors.platform.cache.PlatformCacheEntryProcessor; @@ -70,13 +70,12 @@ import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.T4; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteBiTuple; -import org.apache.ignite.portable.PortableMetadata; +import org.apache.ignite.binary.BinaryType; import org.jetbrains.annotations.Nullable; import java.sql.Timestamp; import java.util.Collection; import java.util.Collections; -import java.util.Date; import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; @@ -105,7 +104,7 @@ public class PlatformContextImpl implements PlatformContext { private final PlatformCallbackGateway gate; /** Cache object processor. */ - private final CacheObjectPortableProcessorImpl cacheObjProc; + private final CacheObjectBinaryProcessorImpl cacheObjProc; /** Node ids that has been sent to native platform. */ private final Set<UUID> sentNodes = Collections.newSetFromMap(new ConcurrentHashMap<UUID, Boolean>()); @@ -150,7 +149,7 @@ public class PlatformContextImpl implements PlatformContext { this.gate = gate; this.mem = mem; - cacheObjProc = (CacheObjectPortableProcessorImpl)ctx.cacheObjects(); + cacheObjProc = (CacheObjectBinaryProcessorImpl)ctx.cacheObjects(); marsh = cacheObjProc.marshaller(); } @@ -171,22 +170,22 @@ public class PlatformContextImpl implements PlatformContext { } /** {@inheritDoc} */ - @Override public PortableRawReaderEx reader(PlatformMemory mem) { + @Override public BinaryRawReaderEx reader(PlatformMemory mem) { return reader(mem.input()); } /** {@inheritDoc} */ - @Override public PortableRawReaderEx reader(PlatformInputStream in) { + @Override public BinaryRawReaderEx reader(PlatformInputStream in) { return marsh.reader(in); } /** {@inheritDoc} */ - @Override public PortableRawWriterEx writer(PlatformMemory mem) { + @Override public BinaryRawWriterEx writer(PlatformMemory mem) { return writer(mem.output()); } /** {@inheritDoc} */ - @Override public PortableRawWriterEx writer(PlatformOutputStream out) { + @Override public BinaryRawWriterEx writer(PlatformOutputStream out) { return marsh.writer(out); } @@ -199,7 +198,7 @@ public class PlatformContextImpl implements PlatformContext { try (PlatformMemory mem0 = mem.allocate()) { PlatformOutputStream out = mem0.output(); - PortableRawWriterEx w = writer(out); + BinaryRawWriterEx w = writer(out); w.writeUuid(node.id()); @@ -234,7 +233,7 @@ public class PlatformContextImpl implements PlatformContext { } /** {@inheritDoc} */ - @Override public void writeNode(PortableRawWriterEx writer, ClusterNode node) { + @Override public void writeNode(BinaryRawWriterEx writer, ClusterNode node) { if (node == null) { writer.writeUuid(null); @@ -247,7 +246,7 @@ public class PlatformContextImpl implements PlatformContext { } /** {@inheritDoc} */ - @Override public void writeNodes(PortableRawWriterEx writer, Collection<ClusterNode> nodes) { + @Override public void writeNodes(BinaryRawWriterEx writer, Collection<ClusterNode> nodes) { if (nodes == null) { writer.writeInt(-1); @@ -264,7 +263,7 @@ public class PlatformContextImpl implements PlatformContext { } /** {@inheritDoc} */ - @Override public void writeClusterMetrics(PortableRawWriterEx writer, @Nullable ClusterMetrics metrics) { + @Override public void writeClusterMetrics(BinaryRawWriterEx writer, @Nullable ClusterMetrics metrics) { if (metrics == null) writer.writeBoolean(false); else { @@ -340,17 +339,17 @@ public class PlatformContextImpl implements PlatformContext { /** {@inheritDoc} */ @SuppressWarnings("ConstantConditions") - @Override public void processMetadata(PortableRawReaderEx reader) { + @Override public void processMetadata(BinaryRawReaderEx reader) { Collection<T4<Integer, String, String, Map<String, Integer>>> metas = PlatformUtils.readCollection(reader, new PlatformReaderClosure<T4<Integer, String, String, Map<String, Integer>>>() { - @Override public T4<Integer, String, String, Map<String, Integer>> read(PortableRawReaderEx reader) { + @Override public T4<Integer, String, String, Map<String, Integer>> read(BinaryRawReaderEx reader) { int typeId = reader.readInt(); String typeName = reader.readString(); String affKey = reader.readString(); Map<String, Integer> fields = PlatformUtils.readMap(reader, new PlatformReaderBiClosure<String, Integer>() { - @Override public IgniteBiTuple<String, Integer> read(PortableRawReaderEx reader) { + @Override public IgniteBiTuple<String, Integer> read(BinaryRawReaderEx reader) { return F.t(reader.readString(), reader.readInt()); } }); @@ -365,17 +364,17 @@ public class PlatformContextImpl implements PlatformContext { } /** {@inheritDoc} */ - @Override public void writeMetadata(PortableRawWriterEx writer, int typeId) { + @Override public void writeMetadata(BinaryRawWriterEx writer, int typeId) { writeMetadata0(writer, typeId, cacheObjProc.metadata(typeId)); } /** {@inheritDoc} */ - @Override public void writeAllMetadata(PortableRawWriterEx writer) { - Collection<PortableMetadata> metas = cacheObjProc.metadata(); + @Override public void writeAllMetadata(BinaryRawWriterEx writer) { + Collection<BinaryType> metas = cacheObjProc.metadata(); writer.writeInt(metas.size()); - for (org.apache.ignite.portable.PortableMetadata m : metas) + for (BinaryType m : metas) writeMetadata0(writer, cacheObjProc.typeId(m.typeName()), m); } @@ -386,18 +385,18 @@ public class PlatformContextImpl implements PlatformContext { * @param typeId Type id. * @param meta Metadata. */ - private void writeMetadata0(PortableRawWriterEx writer, int typeId, PortableMetadata meta) { + private void writeMetadata0(BinaryRawWriterEx writer, int typeId, BinaryType meta) { if (meta == null) writer.writeBoolean(false); else { writer.writeBoolean(true); - Map<String, String> metaFields = ((PortableMetaDataImpl)meta).fields0(); + Map<String, String> metaFields = ((BinaryMetaDataImpl)meta).fields0(); Map<String, Integer> fields = U.newHashMap(metaFields.size()); for (Map.Entry<String, String> metaField : metaFields.entrySet()) - fields.put(metaField.getKey(), CacheObjectPortableProcessorImpl.fieldTypeId(metaField.getValue())); + fields.put(metaField.getKey(), CacheObjectBinaryProcessorImpl.fieldTypeId(metaField.getValue())); writer.writeInt(typeId); writer.writeString(meta.typeName()); @@ -428,7 +427,7 @@ public class PlatformContextImpl implements PlatformContext { } /** {@inheritDoc} */ - @Override public void writeEvent(PortableRawWriterEx writer, Event evt) { + @Override public void writeEvent(BinaryRawWriterEx writer, Event evt) { assert writer != null; if (evt == null) @@ -566,7 +565,7 @@ public class PlatformContextImpl implements PlatformContext { * @param writer Writer. * @param evt Event. */ - private void writeCommonEventData(PortableRawWriterEx writer, EventAdapter evt) { + private void writeCommonEventData(BinaryRawWriterEx writer, EventAdapter evt) { PlatformUtils.writeIgniteUuid(writer, evt.id()); writer.writeLong(evt.localOrder()); writeNode(writer, evt.node()); http://git-wip-us.apache.org/repos/asf/ignite/blob/b783d2b7/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformExtendedException.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformExtendedException.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformExtendedException.java index 825db6c..e8e7b9b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformExtendedException.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformExtendedException.java @@ -17,7 +17,7 @@ package org.apache.ignite.internal.processors.platform; -import org.apache.ignite.internal.portable.PortableRawWriterEx; +import org.apache.ignite.internal.portable.BinaryRawWriterEx; /** * Denotes an exception which has some data to be written in a special manner. @@ -53,5 +53,5 @@ public abstract class PlatformExtendedException extends PlatformException { * * @param writer Writer. */ - public abstract void writeData(PortableRawWriterEx writer); + public abstract void writeData(BinaryRawWriterEx writer); } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/b783d2b7/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformProcessorImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformProcessorImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformProcessorImpl.java index d783928..27d7da3 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformProcessorImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformProcessorImpl.java @@ -26,7 +26,7 @@ import org.apache.ignite.configuration.PlatformConfiguration; import org.apache.ignite.internal.GridKernalContext; import org.apache.ignite.internal.IgniteComputeImpl; import org.apache.ignite.internal.cluster.ClusterGroupAdapter; -import org.apache.ignite.internal.portable.PortableRawWriterEx; +import org.apache.ignite.internal.portable.BinaryRawWriterEx; import org.apache.ignite.internal.processors.GridProcessorAdapter; import org.apache.ignite.internal.processors.cache.IgniteCacheProxy; import org.apache.ignite.internal.processors.datastreamer.DataStreamerImpl; @@ -122,7 +122,7 @@ public class PlatformProcessorImpl extends GridProcessorAdapter implements Platf try (PlatformMemory mem = platformCtx.memory().allocate()) { PlatformOutputStream out = mem.output(); - PortableRawWriterEx writer = platformCtx.writer(out); + BinaryRawWriterEx writer = platformCtx.writer(out); writer.writeString(ctx.gridName()); @@ -253,6 +253,8 @@ public class PlatformProcessorImpl extends GridProcessorAdapter implements Platf throws IgniteCheckedException { IgniteDataStreamer ldr = ctx.dataStream().dataStreamer(cacheName); + ldr.keepBinary(true); + return new PlatformDataStreamer(platformCtx, cacheName, (DataStreamerImpl)ldr, keepPortable); } http://git-wip-us.apache.org/repos/asf/ignite/blob/b783d2b7/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCache.java index ecdfc2c..6ec52d6 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCache.java @@ -28,8 +28,8 @@ import org.apache.ignite.cache.query.ScanQuery; import org.apache.ignite.cache.query.SqlFieldsQuery; import org.apache.ignite.cache.query.SqlQuery; import org.apache.ignite.cache.query.TextQuery; -import org.apache.ignite.internal.portable.PortableRawReaderEx; -import org.apache.ignite.internal.portable.PortableRawWriterEx; +import org.apache.ignite.internal.portable.BinaryRawReaderEx; +import org.apache.ignite.internal.portable.BinaryRawWriterEx; import org.apache.ignite.internal.processors.cache.CacheOperationContext; import org.apache.ignite.internal.processors.cache.CachePartialUpdateCheckedException; import org.apache.ignite.internal.processors.cache.IgniteCacheProxy; @@ -44,6 +44,7 @@ import org.apache.ignite.internal.processors.platform.utils.PlatformFutureUtils; import org.apache.ignite.internal.processors.platform.utils.PlatformUtils; import org.apache.ignite.internal.util.GridConcurrentFactory; import org.apache.ignite.internal.util.typedef.C1; +import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteFuture; import org.jetbrains.annotations.Nullable; @@ -179,7 +180,7 @@ public class PlatformCache extends PlatformAbstractTarget { /** Underlying JCache. */ private final IgniteCacheProxy cache; - /** Whether this cache is created with "keepPortable" flag on the other side. */ + /** Whether this cache is created with "keepBinary" flag on the other side. */ private final boolean keepPortable; /** */ @@ -232,7 +233,7 @@ public class PlatformCache extends PlatformAbstractTarget { if (keepPortable) return this; - return new PlatformCache(platformCtx, cache.withSkipStore(), true); + return new PlatformCache(platformCtx, cache.withKeepBinary(), true); } /** @@ -276,7 +277,7 @@ public class PlatformCache extends PlatformAbstractTarget { } /** {@inheritDoc} */ - @Override protected long processInStreamOutLong(int type, PortableRawReaderEx reader) throws IgniteCheckedException { + @Override protected long processInStreamOutLong(int type, BinaryRawReaderEx reader) throws IgniteCheckedException { switch (type) { case OP_PUT: cache.put(reader.readObjectDetached(), reader.readObjectDetached()); @@ -372,7 +373,7 @@ public class PlatformCache extends PlatformAbstractTarget { /** * Loads cache via localLoadCache or loadCache. */ - private void loadCache0(PortableRawReaderEx reader, boolean loc) throws IgniteCheckedException { + private void loadCache0(BinaryRawReaderEx reader, boolean loc) throws IgniteCheckedException { PlatformCacheEntryFilter filter = null; Object pred = reader.readObjectDetached(); @@ -389,7 +390,7 @@ public class PlatformCache extends PlatformAbstractTarget { } /** {@inheritDoc} */ - @Override protected Object processInStreamOutObject(int type, PortableRawReaderEx reader) + @Override protected Object processInStreamOutObject(int type, BinaryRawReaderEx reader) throws IgniteCheckedException { switch (type) { case OP_QRY_SQL: @@ -432,7 +433,7 @@ public class PlatformCache extends PlatformAbstractTarget { * @param reader Reader. * @return Arguments. */ - @Nullable private Object[] readQueryArgs(PortableRawReaderEx reader) { + @Nullable private Object[] readQueryArgs(BinaryRawReaderEx reader) { int cnt = reader.readInt(); if (cnt > 0) { @@ -448,7 +449,7 @@ public class PlatformCache extends PlatformAbstractTarget { } /** {@inheritDoc} */ - @Override protected void processOutStream(int type, PortableRawWriterEx writer) throws IgniteCheckedException { + @Override protected void processOutStream(int type, BinaryRawWriterEx writer) throws IgniteCheckedException { switch (type) { case OP_GET_NAME: writer.writeObject(cache.getName()); @@ -521,7 +522,7 @@ public class PlatformCache extends PlatformAbstractTarget { /** {@inheritDoc} */ @SuppressWarnings({"IfMayBeConditional", "ConstantConditions"}) - @Override protected void processInStreamOutStream(int type, PortableRawReaderEx reader, PortableRawWriterEx writer) + @Override protected void processInStreamOutStream(int type, BinaryRawReaderEx reader, BinaryRawWriterEx writer) throws IgniteCheckedException { switch (type) { case OP_GET: { @@ -640,7 +641,7 @@ public class PlatformCache extends PlatformAbstractTarget { * @param writer Writer. * @param results Results. */ - private static void writeInvokeAllResult(PortableRawWriterEx writer, Map<Object, EntryProcessorResult> results) { + private static void writeInvokeAllResult(BinaryRawWriterEx writer, Map<Object, EntryProcessorResult> results) { if (results == null) { writer.writeInt(-1); @@ -674,7 +675,7 @@ public class PlatformCache extends PlatformAbstractTarget { * @param writer Writer. * @param ex Exception. */ - private static void writeError(PortableRawWriterEx writer, Exception ex) { + private static void writeError(BinaryRawWriterEx writer, Exception ex) { if (ex.getCause() instanceof PlatformNativeException) writer.writeObjectDetached(((PlatformNativeException)ex.getCause()).cause()); else { @@ -845,7 +846,7 @@ public class PlatformCache extends PlatformAbstractTarget { /** * Runs specified query. */ - private PlatformQueryCursor runQuery(PortableRawReaderEx reader, Query qry) throws IgniteCheckedException { + private PlatformQueryCursor runQuery(BinaryRawReaderEx reader, Query qry) throws IgniteCheckedException { try { QueryCursorEx cursor = (QueryCursorEx) cache.query(qry); @@ -861,7 +862,7 @@ public class PlatformCache extends PlatformAbstractTarget { /** * Runs specified fields query. */ - private PlatformFieldsQueryCursor runFieldsQuery(PortableRawReaderEx reader, Query qry) + private PlatformFieldsQueryCursor runFieldsQuery(BinaryRawReaderEx reader, Query qry) throws IgniteCheckedException { try { QueryCursorEx cursor = (QueryCursorEx) cache.query(qry); @@ -877,7 +878,7 @@ public class PlatformCache extends PlatformAbstractTarget { /** * Reads the query of specified type. */ - private Query readInitialQuery(PortableRawReaderEx reader) throws IgniteCheckedException { + private Query readInitialQuery(BinaryRawReaderEx reader) throws IgniteCheckedException { int typ = reader.readInt(); switch (typ) { @@ -900,7 +901,7 @@ public class PlatformCache extends PlatformAbstractTarget { /** * Reads sql query. */ - private Query readSqlQuery(PortableRawReaderEx reader) { + private Query readSqlQuery(BinaryRawReaderEx reader) { boolean loc = reader.readBoolean(); String sql = reader.readString(); String typ = reader.readString(); @@ -914,7 +915,7 @@ public class PlatformCache extends PlatformAbstractTarget { /** * Reads fields query. */ - private Query readFieldsQuery(PortableRawReaderEx reader) { + private Query readFieldsQuery(BinaryRawReaderEx reader) { boolean loc = reader.readBoolean(); String sql = reader.readString(); final int pageSize = reader.readInt(); @@ -927,7 +928,7 @@ public class PlatformCache extends PlatformAbstractTarget { /** * Reads text query. */ - private Query readTextQuery(PortableRawReaderEx reader) { + private Query readTextQuery(BinaryRawReaderEx reader) { boolean loc = reader.readBoolean(); String txt = reader.readString(); String typ = reader.readString(); @@ -939,7 +940,7 @@ public class PlatformCache extends PlatformAbstractTarget { /** * Reads scan query. */ - private Query readScanQuery(PortableRawReaderEx reader) { + private Query readScanQuery(BinaryRawReaderEx reader) { boolean loc = reader.readBoolean(); final int pageSize = reader.readInt(); @@ -966,7 +967,7 @@ public class PlatformCache extends PlatformAbstractTarget { */ private static class GetAllWriter implements PlatformFutureUtils.Writer { /** <inheritDoc /> */ - @Override public void write(PortableRawWriterEx writer, Object obj, Throwable err) { + @Override public void write(BinaryRawWriterEx writer, Object obj, Throwable err) { assert obj instanceof Map; PlatformUtils.writeNullableMap(writer, (Map) obj); @@ -983,7 +984,7 @@ public class PlatformCache extends PlatformAbstractTarget { */ private static class EntryProcessorInvokeWriter implements PlatformFutureUtils.Writer { /** <inheritDoc /> */ - @Override public void write(PortableRawWriterEx writer, Object obj, Throwable err) { + @Override public void write(BinaryRawWriterEx writer, Object obj, Throwable err) { if (err == null) { writer.writeBoolean(false); // No error. @@ -1007,7 +1008,7 @@ public class PlatformCache extends PlatformAbstractTarget { */ private static class EntryProcessorInvokeAllWriter implements PlatformFutureUtils.Writer { /** <inheritDoc /> */ - @Override public void write(PortableRawWriterEx writer, Object obj, Throwable err) { + @Override public void write(BinaryRawWriterEx writer, Object obj, Throwable err) { writeInvokeAllResult(writer, (Map)obj); } http://git-wip-us.apache.org/repos/asf/ignite/blob/b783d2b7/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCacheEntryFilterImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCacheEntryFilterImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCacheEntryFilterImpl.java index 5f8ec8f..6998451 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCacheEntryFilterImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCacheEntryFilterImpl.java @@ -18,7 +18,7 @@ package org.apache.ignite.internal.processors.platform.cache; import org.apache.ignite.Ignite; -import org.apache.ignite.internal.portable.PortableRawWriterEx; +import org.apache.ignite.internal.portable.BinaryRawWriterEx; import org.apache.ignite.internal.processors.platform.PlatformAbstractPredicate; import org.apache.ignite.internal.processors.platform.PlatformContext; import org.apache.ignite.internal.processors.platform.memory.PlatformMemory; @@ -58,7 +58,7 @@ public class PlatformCacheEntryFilterImpl extends PlatformAbstractPredicate impl try (PlatformMemory mem = ctx.memory().allocate()) { PlatformOutputStream out = mem.output(); - PortableRawWriterEx writer = ctx.writer(out); + BinaryRawWriterEx writer = ctx.writer(out); writer.writeObject(k); writer.writeObject(v); @@ -94,7 +94,7 @@ public class PlatformCacheEntryFilterImpl extends PlatformAbstractPredicate impl try (PlatformMemory mem = ctx.memory().allocate()) { PlatformOutputStream out = mem.output(); - PortableRawWriterEx writer = ctx.writer(out); + BinaryRawWriterEx writer = ctx.writer(out); writer.writeObject(pred); http://git-wip-us.apache.org/repos/asf/ignite/blob/b783d2b7/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCacheEntryProcessorImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCacheEntryProcessorImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCacheEntryProcessorImpl.java index f59a63f..ce06ec1 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCacheEntryProcessorImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCacheEntryProcessorImpl.java @@ -26,8 +26,8 @@ import javax.cache.processor.MutableEntry; import org.apache.ignite.Ignite; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.internal.IgniteKernal; -import org.apache.ignite.internal.portable.PortableRawReaderEx; -import org.apache.ignite.internal.portable.PortableRawWriterEx; +import org.apache.ignite.internal.portable.BinaryRawReaderEx; +import org.apache.ignite.internal.portable.BinaryRawWriterEx; import org.apache.ignite.internal.processors.platform.PlatformContext; import org.apache.ignite.internal.processors.platform.PlatformProcessor; import org.apache.ignite.internal.processors.platform.memory.PlatformInputStream; @@ -119,7 +119,7 @@ public class PlatformCacheEntryProcessorImpl implements PlatformCacheEntryProces try (PlatformMemory outMem = ctx.memory().allocate()) { PlatformOutputStream out = outMem.output(); - PortableRawWriterEx writer = ctx.writer(out); + BinaryRawWriterEx writer = ctx.writer(out); writeEntryAndProcessor(entry, writer); @@ -132,7 +132,7 @@ public class PlatformCacheEntryProcessorImpl implements PlatformCacheEntryProces in.synchronize(); - PortableRawReaderEx reader = ctx.reader(in); + BinaryRawReaderEx reader = ctx.reader(in); return readResultAndUpdateEntry(ctx, entry, reader); } @@ -145,7 +145,7 @@ public class PlatformCacheEntryProcessorImpl implements PlatformCacheEntryProces * @param entry Entry to process. * @param writer Writer. */ - private void writeEntryAndProcessor(MutableEntry entry, PortableRawWriterEx writer) { + private void writeEntryAndProcessor(MutableEntry entry, BinaryRawWriterEx writer) { writer.writeObject(entry.getKey()); writer.writeObject(entry.getValue()); @@ -170,7 +170,7 @@ public class PlatformCacheEntryProcessorImpl implements PlatformCacheEntryProces * @throws javax.cache.processor.EntryProcessorException If processing has failed in user code. */ @SuppressWarnings("unchecked") - private Object readResultAndUpdateEntry(PlatformContext ctx, MutableEntry entry, PortableRawReaderEx reader) { + private Object readResultAndUpdateEntry(PlatformContext ctx, MutableEntry entry, BinaryRawReaderEx reader) { byte state = reader.readByte(); switch (state) { http://git-wip-us.apache.org/repos/asf/ignite/blob/b783d2b7/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCacheIterator.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCacheIterator.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCacheIterator.java index 78ca683..a5659af 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCacheIterator.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCacheIterator.java @@ -20,7 +20,7 @@ package org.apache.ignite.internal.processors.platform.cache; import java.util.Iterator; import javax.cache.Cache; import org.apache.ignite.IgniteCheckedException; -import org.apache.ignite.internal.portable.PortableRawWriterEx; +import org.apache.ignite.internal.portable.BinaryRawWriterEx; import org.apache.ignite.internal.processors.platform.PlatformAbstractTarget; import org.apache.ignite.internal.processors.platform.PlatformContext; @@ -47,7 +47,7 @@ public class PlatformCacheIterator extends PlatformAbstractTarget { } /** {@inheritDoc} */ - @Override protected void processOutStream(int type, PortableRawWriterEx writer) throws IgniteCheckedException { + @Override protected void processOutStream(int type, BinaryRawWriterEx writer) throws IgniteCheckedException { switch (type) { case OP_NEXT: if (iter.hasNext()) { http://git-wip-us.apache.org/repos/asf/ignite/blob/b783d2b7/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCachePartialUpdateException.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCachePartialUpdateException.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCachePartialUpdateException.java index ef17a06..8314e3a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCachePartialUpdateException.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCachePartialUpdateException.java @@ -17,7 +17,7 @@ package org.apache.ignite.internal.processors.platform.cache; -import org.apache.ignite.internal.portable.PortableRawWriterEx; +import org.apache.ignite.internal.portable.BinaryRawWriterEx; import org.apache.ignite.internal.processors.cache.CachePartialUpdateCheckedException; import org.apache.ignite.internal.processors.platform.PlatformContext; import org.apache.ignite.internal.processors.platform.PlatformExtendedException; @@ -49,7 +49,7 @@ public class PlatformCachePartialUpdateException extends PlatformExtendedExcepti } /** {@inheritDoc} */ - @Override public void writeData(PortableRawWriterEx writer) { + @Override public void writeData(BinaryRawWriterEx writer) { Collection keys = ((CachePartialUpdateCheckedException)getCause()).failedKeys(); writer.writeBoolean(keepPortable);
