http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7253addd/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java index 342ebd0..f417311 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java @@ -770,32 +770,29 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter GridCacheVersionConflictContext<?, ?> conflictCtx = null; if (conflictNeedResolve) { -// TODO IGNITE-51. -// IgniteBiTuple<GridCacheOperation, GridCacheVersionConflictContext<K, V>> -// conflictRes = conflictResolve(op, txEntry, val, valBytes, explicitVer, -// cached); -// -// assert conflictRes != null; -// -// conflictCtx = conflictRes.get2(); -// -// if (conflictCtx.isUseOld()) -// op = NOOP; -// else if (conflictCtx.isUseNew()) { -// txEntry.ttl(conflictCtx.ttl()); -// txEntry.conflictExpireTime(conflictCtx.expireTime()); -// } -// else { -// assert conflictCtx.isMerge(); -// -// op = conflictRes.get1(); -// val = conflictCtx.mergeValue(); -// valBytes = null; -// explicitVer = writeVersion(); -// -// txEntry.ttl(conflictCtx.ttl()); -// txEntry.conflictExpireTime(conflictCtx.expireTime()); -// } + IgniteBiTuple<GridCacheOperation, GridCacheVersionConflictContext> conflictRes = + conflictResolve(op, txEntry, val, explicitVer, cached); + + assert conflictRes != null; + + conflictCtx = conflictRes.get2(); + + if (conflictCtx.isUseOld()) + op = NOOP; + else if (conflictCtx.isUseNew()) { + txEntry.ttl(conflictCtx.ttl()); + txEntry.conflictExpireTime(conflictCtx.expireTime()); + } + else { + assert conflictCtx.isMerge(); + + op = conflictRes.get1(); + val = txEntry.context().toCacheObject(conflictCtx.mergeValue()); + explicitVer = writeVersion(); + + txEntry.ttl(conflictCtx.ttl()); + txEntry.conflictExpireTime(conflictCtx.expireTime()); + } } else // Nullify explicit version so that innerSet/innerRemove will work as usual. @@ -1886,11 +1883,11 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter } /** {@inheritDoc} */ - @Override public <K, V> IgniteInternalFuture<?> putAllDrAsync( + @Override public IgniteInternalFuture<?> putAllDrAsync( GridCacheContext cacheCtx, - Map<? extends K, GridCacheDrInfo<V>> drMap + Map<KeyCacheObject, GridCacheDrInfo> drMap ) { - return putAllAsync0(cacheCtx, + return this.<Object, Object>putAllAsync0(cacheCtx, null, null, null, @@ -1918,9 +1915,9 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter } /** {@inheritDoc} */ - @Override public <K> IgniteInternalFuture<?> removeAllDrAsync( + @Override public IgniteInternalFuture<?> removeAllDrAsync( GridCacheContext cacheCtx, - Map<? extends K, GridCacheVersion> drMap + Map<KeyCacheObject, GridCacheVersion> drMap ) { return removeAllAsync0(cacheCtx, null, drMap, null, false, null); } @@ -1960,20 +1957,20 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter */ protected <K, V> IgniteInternalFuture<Set<KeyCacheObject>> enlistWrite( final GridCacheContext cacheCtx, - Collection<? extends K> keys, + Collection<?> keys, @Nullable GridCacheEntryEx cached, @Nullable ExpiryPolicy expiryPlc, boolean implicit, - @Nullable Map<? extends K, ? extends V> lookup, - @Nullable Map<? extends K, EntryProcessor<K, V, Object>> invokeMap, + @Nullable Map<?, ?> lookup, + @Nullable Map<?, EntryProcessor<K, V, Object>> invokeMap, @Nullable Object[] invokeArgs, boolean retval, boolean lockOnly, CacheEntryPredicate[] filter, final GridCacheReturn<CacheObject> ret, Collection<KeyCacheObject> enlisted, - @Nullable Map<? extends K, GridCacheDrInfo<V>> drPutMap, - @Nullable Map<? extends K, GridCacheVersion> drRmvMap + @Nullable Map<KeyCacheObject, GridCacheDrInfo> drPutMap, + @Nullable Map<KeyCacheObject, GridCacheVersion> drRmvMap ) { assert cached == null || keys.size() == 1; assert cached == null || F.first(keys).equals(cached.key()); @@ -1998,14 +1995,14 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter groupLockSanityCheck(cacheCtx, keys); - for (K key : keys) { + for (Object key : keys) { if (key == null) { setRollbackOnly(); throw new NullPointerException("Null key."); } - V val = rmv || lookup == null ? null : lookup.get(key); + Object val = rmv || lookup == null ? null : lookup.get(key); EntryProcessor entryProcessor = invokeMap == null ? null : invokeMap.get(key); GridCacheVersion drVer; @@ -2013,7 +2010,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter long drExpireTime; if (drPutMap != null) { - GridCacheDrInfo<V> info = drPutMap.get(key); + GridCacheDrInfo info = drPutMap.get(key); assert info != null; @@ -2510,7 +2507,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter @Nullable Map<? extends K, ? extends V> map, @Nullable Map<? extends K, ? extends EntryProcessor<K, V, Object>> invokeMap, @Nullable final Object[] invokeArgs, - @Nullable final Map<? extends K, GridCacheDrInfo<V>> drMap, + @Nullable final Map<KeyCacheObject, GridCacheDrInfo> drMap, final boolean retval, @Nullable GridCacheEntryEx cached, @Nullable final CacheEntryPredicate[] filter @@ -2523,14 +2520,14 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter needReturnValue(true); // Cached entry may be passed only from entry wrapper. - final Map<K, V> map0; - final Map<K, EntryProcessor<K, V, Object>> invokeMap0; + final Map<?, ?> map0; + final Map<?, EntryProcessor<K, V, Object>> invokeMap0; if (drMap != null) { assert map == null; - map0 = (Map<K, V>)F.viewReadOnly(drMap, new IgniteClosure<GridCacheDrInfo<V>, V>() { - @Override public V apply(GridCacheDrInfo<V> val) { + map0 = F.viewReadOnly(drMap, new IgniteClosure<GridCacheDrInfo, Object>() { + @Override public Object apply(GridCacheDrInfo val) { return val.value(); } }); @@ -2538,7 +2535,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter invokeMap0 = null; } else { - map0 = (Map<K, V>)map; + map0 = map; invokeMap0 = (Map<K, EntryProcessor<K, V, Object>>)invokeMap; } @@ -2573,7 +2570,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter } try { - Set<? extends K> keySet = map0 != null ? map0.keySet() : invokeMap0.keySet(); + Set<?> keySet = map0 != null ? map0.keySet() : invokeMap0.keySet(); Collection<KeyCacheObject> enlisted = new ArrayList<>(); @@ -2721,7 +2718,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter private <K, V> IgniteInternalFuture<GridCacheReturn<CacheObject>> removeAllAsync0( final GridCacheContext cacheCtx, @Nullable final Collection<? extends K> keys, - @Nullable Map<? extends K, GridCacheVersion> drMap, + @Nullable Map<KeyCacheObject, GridCacheVersion> drMap, @Nullable GridCacheEntryEx cached, final boolean retval, @Nullable final CacheEntryPredicate[] filter) { @@ -2730,7 +2727,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter if (retval) needReturnValue(true); - final Collection<? extends K> keys0; + final Collection<?> keys0; if (drMap != null) { assert keys == null;
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7253addd/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalEx.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalEx.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalEx.java index 1af6378..936e4e8 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalEx.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalEx.java @@ -129,18 +129,18 @@ public interface IgniteTxLocalEx extends IgniteInternalTx { * @param drMap DR map to put. * @return Future for DR put operation. */ - public <K, V> IgniteInternalFuture<?> putAllDrAsync( + public IgniteInternalFuture<?> putAllDrAsync( GridCacheContext cacheCtx, - Map<? extends K, GridCacheDrInfo<V>> drMap); + Map<KeyCacheObject, GridCacheDrInfo> drMap); /** * @param cacheCtx Cache context. * @param drMap DR map. * @return Future for asynchronous remove. */ - public <K> IgniteInternalFuture<?> removeAllDrAsync( + public IgniteInternalFuture<?> removeAllDrAsync( GridCacheContext cacheCtx, - Map<? extends K, GridCacheVersion> drMap); + Map<KeyCacheObject, GridCacheVersion> drMap); /** * Performs keys locking for affinity-based group lock transactions. http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7253addd/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCacheRawVersionedEntry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCacheRawVersionedEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCacheRawVersionedEntry.java index 23585fb..8235ec0 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCacheRawVersionedEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCacheRawVersionedEntry.java @@ -18,31 +18,26 @@ package org.apache.ignite.internal.processors.cache.version; import org.apache.ignite.*; -import org.apache.ignite.internal.util.*; +import org.apache.ignite.internal.*; +import org.apache.ignite.internal.processors.cache.*; +import org.apache.ignite.internal.processors.dataload.*; import org.apache.ignite.internal.util.typedef.internal.*; import org.apache.ignite.marshaller.*; +import org.apache.ignite.plugin.extensions.communication.*; import org.jetbrains.annotations.*; import java.io.*; -import java.util.*; +import java.nio.*; /** * Raw versioned entry. */ -public class GridCacheRawVersionedEntry<K, V> implements GridCacheVersionedEntry<K, V>, GridCacheVersionable, - Map.Entry<K, V>, Externalizable { - /** */ - private static final long serialVersionUID = 0L; - - /** Key. */ - private K key; - +public class GridCacheRawVersionedEntry<K, V> extends IgniteDataLoaderEntry implements + GridCacheVersionedEntry<K, V>, GridCacheVersionable, Externalizable { /** Key bytes. */ + @GridDirectTransient private byte[] keyBytes; - /** Value. */ - private V val; - /** Value bytes. */ private byte[] valBytes; @@ -63,21 +58,44 @@ public class GridCacheRawVersionedEntry<K, V> implements GridCacheVersionedEntry } /** - * Constructor. + * Constructor used for local store load when key and value are available. * * @param key Key. - * @param keyBytes Key bytes. * @param val Value. - * @param valBytes Value bytes. * @param expireTime Expire time. * @param ttl TTL. * @param ver Version. */ - public GridCacheRawVersionedEntry(K key, @Nullable byte[] keyBytes, @Nullable V val, @Nullable byte[] valBytes, - long ttl, long expireTime, GridCacheVersion ver) { + public GridCacheRawVersionedEntry(KeyCacheObject key, + @Nullable CacheObject val, + long ttl, + long expireTime, + GridCacheVersion ver) { + assert key != null; + this.key = key; - this.keyBytes = keyBytes; this.val = val; + this.ttl = ttl; + this.expireTime = expireTime; + this.ver = ver; + } + + /** + * Constructor used in receiver hub where marshalled key and value are available and we do not want to + * unmarshal value. + * + * @param keyBytes Key. + * @param valBytes Value bytes. + * @param expireTime Expire time. + * @param ttl TTL. + * @param ver Version. + */ + public GridCacheRawVersionedEntry(byte[] keyBytes, + byte[] valBytes, + long ttl, + long expireTime, + GridCacheVersion ver) { + this.keyBytes = keyBytes; this.valBytes = valBytes; this.ttl = ttl; this.expireTime = expireTime; @@ -88,7 +106,14 @@ public class GridCacheRawVersionedEntry<K, V> implements GridCacheVersionedEntry @Override public K key() { assert key != null : "Entry is being improperly processed."; - return key; + return key.value(null, false); + } + + /** + * @param key Key. + */ + public void key(KeyCacheObject key) { + this.key = key; } /** @@ -100,7 +125,7 @@ public class GridCacheRawVersionedEntry<K, V> implements GridCacheVersionedEntry /** {@inheritDoc} */ @Override public V value() { - return val; + return val != null ? val.<V>value(null, false) : null; } /** @@ -149,90 +174,205 @@ public class GridCacheRawVersionedEntry<K, V> implements GridCacheVersionedEntry * Perform internal unmarshal of this entry. It must be performed after entry is deserialized and before * its restored key/value are needed. * + * @param ctx Context. * @param marsh Marshaller. * @throws IgniteCheckedException If failed. */ - public void unmarshal(Marshaller marsh) throws IgniteCheckedException { - unmarshalKey(marsh); + public void unmarshal(CacheObjectContext ctx, Marshaller marsh) throws IgniteCheckedException { + unmarshalKey(ctx, marsh); - if (valBytes != null && val == null) + if (val == null && valBytes != null) { val = marsh.unmarshal(valBytes, null); + + val.finishUnmarshal(ctx, null); + } + } + + /** + * @param ctx Context. + * @throws IgniteCheckedException If failed. + */ + public void unmarshal(CacheObjectContext ctx) throws IgniteCheckedException { + assert key != null; + + key.finishUnmarshal(ctx, null); + + if (val != null) + val.finishUnmarshal(ctx, null); } /** * Perform internal key unmarshal of this entry. It must be performed after entry is deserialized and before * its restored key/value are needed. * + * @param ctx Context. * @param marsh Marshaller. * @throws IgniteCheckedException If failed. */ - public void unmarshalKey(Marshaller marsh) throws IgniteCheckedException { - if (key == null) + public void unmarshalKey(CacheObjectContext ctx, Marshaller marsh) throws IgniteCheckedException { + if (key == null) { + assert keyBytes != null; + key = marsh.unmarshal(keyBytes, null); + + key.finishUnmarshal(ctx, null); + } } /** * Perform internal marshal of this entry before it will be serialized. * + * @param ctx Context. * @param marsh Marshaller. * @throws IgniteCheckedException If failed. */ - public void marshal(Marshaller marsh) throws IgniteCheckedException { - if (keyBytes == null) + public void marshal(CacheObjectContext ctx, Marshaller marsh) throws IgniteCheckedException { + if (keyBytes == null) { + key.prepareMarshal(ctx); + keyBytes = marsh.marshal(key); + } + + if (valBytes == null && val != null) { + val.prepareMarshal(ctx); - if (valBytes == null && val != null) valBytes = marsh.marshal(val); + } + } + + /** + * @param ctx Context. + * @throws IgniteCheckedException If failed. + */ + public void prepareDirectMarshal(CacheObjectContext ctx) throws IgniteCheckedException { + key.prepareMarshal(ctx); + + if (val != null) + val.prepareMarshal(ctx); } /** {@inheritDoc} */ - @Override public void writeExternal(ObjectOutput out) throws IOException { - assert keyBytes != null; + @Override public byte directType() { + return 103; + } + + /** {@inheritDoc} */ + @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) { + reader.setBuffer(buf); + + if (!reader.beforeMessageRead()) + return false; + + if (!super.readFrom(buf, reader)) + return false; + + switch (reader.state()) { + case 2: + expireTime = reader.readLong("expireTime"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 3: + ttl = reader.readLong("ttl"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 4: + valBytes = reader.readByteArray("valBytes"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); - U.writeByteArray(out, keyBytes); - U.writeByteArray(out, valBytes); + case 5: + ver = reader.readMessage("ver"); - out.writeLong(ttl); + if (!reader.isLastRead()) + return false; - if (ttl != 0) - out.writeLong(expireTime); + reader.incrementState(); - out.writeObject(ver); + } + + assert key != null; + assert !(val != null && valBytes != null); + + return true; } /** {@inheritDoc} */ - @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { - keyBytes = U.readByteArray(in); - valBytes = U.readByteArray(in); + @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) { + assert key != null; + assert !(val != null && valBytes != null); + + writer.setBuffer(buf); + + if (!super.writeTo(buf, writer)) + return false; + + if (!writer.isHeaderWritten()) { + if (!writer.writeHeader(directType(), fieldsCount())) + return false; + + writer.onHeaderWritten(); + } + + switch (writer.state()) { + case 2: + if (!writer.writeLong("expireTime", expireTime)) + return false; + + writer.incrementState(); + + case 3: + if (!writer.writeLong("ttl", ttl)) + return false; - ttl = in.readLong(); + writer.incrementState(); - if (ttl != 0) - expireTime = in.readLong(); + case 4: + if (!writer.writeByteArray("valBytes", valBytes)) + return false; - ver = (GridCacheVersion)in.readObject(); + writer.incrementState(); - assert keyBytes != null; + case 5: + if (!writer.writeMessage("ver", ver)) + return false; + + writer.incrementState(); + + } + + return true; } /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(GridCacheRawVersionedEntry.class, this, "keyBytesLen", - keyBytes != null ? keyBytes.length : "n/a", "valBytesLen", valBytes != null ? valBytes.length : "n/a"); + @Override public byte fieldsCount() { + return 6; } /** {@inheritDoc} */ - @Override public K getKey() { - return key(); + @Override public void writeExternal(ObjectOutput out) throws IOException { + assert false; } /** {@inheritDoc} */ - @Override public V getValue() { - return value(); + @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { + assert false; } /** {@inheritDoc} */ - @Override public V setValue(V val) { - throw new UnsupportedOperationException(); + @Override public String toString() { + return S.toString(GridCacheRawVersionedEntry.class, this, "keyBytesLen", + keyBytes != null ? keyBytes.length : "n/a", "valBytesLen", + valBytes != null ? valBytes.length : "n/a"); } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7253addd/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCacheVersionEx.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCacheVersionEx.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCacheVersionEx.java index 9a6cbd2..1fe8a25 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCacheVersionEx.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCacheVersionEx.java @@ -17,7 +17,10 @@ package org.apache.ignite.internal.processors.cache.version; +import org.apache.ignite.plugin.extensions.communication.*; + import java.io.*; +import java.nio.*; /** * Extended cache version which also has additional DR version. @@ -87,6 +90,66 @@ public class GridCacheVersionEx extends GridCacheVersion { } /** {@inheritDoc} */ + @Override public byte directType() { + return 104; + } + + /** {@inheritDoc} */ + @Override public byte fieldsCount() { + return 5; + } + + /** {@inheritDoc} */ + @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) { + writer.setBuffer(buf); + + if (!super.writeTo(buf, writer)) + return false; + + if (!writer.isHeaderWritten()) { + if (!writer.writeHeader(directType(), fieldsCount())) + return false; + + writer.onHeaderWritten(); + } + + switch (writer.state()) { + case 4: + if (!writer.writeMessage("drVer", drVer)) + return false; + + writer.incrementState(); + + } + + return true; + } + + /** {@inheritDoc} */ + @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) { + reader.setBuffer(buf); + + if (!reader.beforeMessageRead()) + return false; + + if (!super.readFrom(buf, reader)) + return false; + + switch (reader.state()) { + case 4: + drVer = reader.readMessage("drVer"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + } + + return true; + } + + /** {@inheritDoc} */ @Override public void readExternal(ObjectInput in) throws IOException { super.readExternal(in); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7253addd/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCacheVersionManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCacheVersionManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCacheVersionManager.java index 74b38f9..20dd6bf 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCacheVersionManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCacheVersionManager.java @@ -37,7 +37,7 @@ import static org.apache.ignite.events.EventType.*; * like, for example GridCacheContext, as it may be reused between different * caches. */ -public class GridCacheVersionManager<K, V> extends GridCacheSharedManagerAdapter<K, V> { +public class GridCacheVersionManager extends GridCacheSharedManagerAdapter { /** Timestamp used as base time for cache topology version (January 1, 2014). */ public static final long TOP_VER_BASE_TIME = 1388520000000L; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7253addd/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/GridDataLoadUpdateJob.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/GridDataLoadUpdateJob.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/GridDataLoadUpdateJob.java index 7d46e23..e54a281 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/GridDataLoadUpdateJob.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/GridDataLoadUpdateJob.java @@ -57,6 +57,7 @@ class GridDataLoadUpdateJob implements GridPlainCallable<Object> { * @param cacheName Cache name. * @param col Entries to put. * @param ignoreDepOwnership {@code True} to ignore deployment ownership. + * @param skipStore Skip store flag. * @param updater Updater. */ GridDataLoadUpdateJob( http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7253addd/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/IgniteDataLoaderEntry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/IgniteDataLoaderEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/IgniteDataLoaderEntry.java index 89bebe4..f719cfc 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/IgniteDataLoaderEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/IgniteDataLoaderEntry.java @@ -31,11 +31,11 @@ import java.util.*; public class IgniteDataLoaderEntry implements Map.Entry<KeyCacheObject, CacheObject>, Message { /** */ @GridToStringInclude - private KeyCacheObject key; + protected KeyCacheObject key; /** */ @GridToStringInclude - private CacheObject val; + protected CacheObject val; /** * http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7253addd/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/IgniteDataLoaderImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/IgniteDataLoaderImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/IgniteDataLoaderImpl.java index 30fd8bb..154e685 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/IgniteDataLoaderImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/IgniteDataLoaderImpl.java @@ -243,6 +243,13 @@ public class IgniteDataLoaderImpl<K, V> implements IgniteDataLoader<K, V>, Delay } /** + * @return Cache object context. + */ + public CacheObjectContext cacheObjectContext() { + return cacheObjCtx; + } + + /** * Enters busy lock. */ private void enterBusy() { http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7253addd/modules/core/src/main/java/org/apache/ignite/internal/processors/dr/GridDrDataLoadCacheUpdater.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/dr/GridDrDataLoadCacheUpdater.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/dr/GridDrDataLoadCacheUpdater.java index 115141d..ae8c77b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/dr/GridDrDataLoadCacheUpdater.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/dr/GridDrDataLoadCacheUpdater.java @@ -32,19 +32,20 @@ import java.util.*; /** * Data center replication cache updater for data loader. */ -public class GridDrDataLoadCacheUpdater<K, V> implements IgniteDataLoader.Updater<K, V>, +public class GridDrDataLoadCacheUpdater implements IgniteDataLoader.Updater<KeyCacheObject, CacheObject>, GridDataLoadCacheUpdaters.InternalUpdater { /** */ private static final long serialVersionUID = 0L; /** {@inheritDoc} */ - @Override public void update(IgniteCache<K, V> cache0, Collection<Map.Entry<K, V>> col) { + @Override public void update(IgniteCache<KeyCacheObject, CacheObject> cache0, + Collection<Map.Entry<KeyCacheObject, CacheObject>> col) { try { String cacheName = cache0.getConfiguration(CacheConfiguration.class).getName(); GridKernalContext ctx = ((IgniteKernal)cache0.unwrap(Ignite.class)).context(); IgniteLogger log = ctx.log(GridDrDataLoadCacheUpdater.class); - GridCacheAdapter<K, V> cache = ctx.cache().internalCache(cacheName); + GridCacheAdapter cache = ctx.cache().internalCache(cacheName); assert !F.isEmpty(col); @@ -56,20 +57,24 @@ public class GridDrDataLoadCacheUpdater<K, V> implements IgniteDataLoader.Update if (!f.isDone()) f.get(); - for (Map.Entry<K, V> entry0 : col) { - GridCacheRawVersionedEntry<K, V> entry = (GridCacheRawVersionedEntry<K, V>)entry0; + CacheObjectContext cacheObjCtx = cache.context().cacheObjectContext(); - entry.unmarshal(ctx.config().getMarshaller()); + for (Map.Entry<KeyCacheObject, CacheObject> entry0 : col) { + GridCacheRawVersionedEntry entry = (GridCacheRawVersionedEntry)entry0; - K key = entry.key(); + entry.unmarshal(cacheObjCtx, ctx.config().getMarshaller()); + + KeyCacheObject key = entry.getKey(); // Ensure that updater to not receive special-purpose values for TTL and expire time. assert entry.ttl() != CU.TTL_NOT_CHANGED && entry.ttl() != CU.TTL_ZERO && entry.ttl() >= 0; assert entry.expireTime() != CU.EXPIRE_TIME_CALCULATE && entry.expireTime() >= 0; - GridCacheDrInfo<V> val = entry.value() != null ? entry.ttl() != CU.TTL_ETERNAL ? - new GridCacheDrExpirationInfo<>(entry.value(), entry.version(), entry.ttl(), entry.expireTime()) : - new GridCacheDrInfo<>(entry.value(), entry.version()) : null; + CacheObject cacheVal = entry.getValue(); + + GridCacheDrInfo val = cacheVal != null ? entry.ttl() != CU.TTL_ETERNAL ? + new GridCacheDrExpirationInfo(cacheVal, entry.version(), entry.ttl(), entry.expireTime()) : + new GridCacheDrInfo(cacheVal, entry.version()) : null; if (val == null) cache.removeAllConflict(Collections.singletonMap(key, entry.version())); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7253addd/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTtlManagerLoadTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTtlManagerLoadTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTtlManagerLoadTest.java index 6b97f9a..1235a1d 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTtlManagerLoadTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTtlManagerLoadTest.java @@ -60,7 +60,7 @@ public class GridCacheTtlManagerLoadTest extends GridCacheTtlManagerSelfTest { } }, 1); - GridCacheTtlManager<Object, Object> ttlMgr = g.internalCache().context().ttl(); + GridCacheTtlManager ttlMgr = g.internalCache().context().ttl(); for (int i = 0; i < 300; i++) { U.sleep(1000); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7253addd/modules/core/src/test/java/org/apache/ignite/loadtests/hashmap/GridCacheTestContext.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/loadtests/hashmap/GridCacheTestContext.java b/modules/core/src/test/java/org/apache/ignite/loadtests/hashmap/GridCacheTestContext.java index f1f58f9..644644e 100644 --- a/modules/core/src/test/java/org/apache/ignite/loadtests/hashmap/GridCacheTestContext.java +++ b/modules/core/src/test/java/org/apache/ignite/loadtests/hashmap/GridCacheTestContext.java @@ -48,7 +48,7 @@ public class GridCacheTestContext<K, V> extends GridCacheContext<K, V> { new GridCacheSharedContext<>( ctx, new IgniteTxManager(), - new GridCacheVersionManager<K, V>(), + new GridCacheVersionManager(), new GridCacheMvccManager(), new GridCacheDeploymentManager<K, V>(), new GridCachePartitionExchangeManager<K, V>(), @@ -67,8 +67,8 @@ public class GridCacheTestContext<K, V> extends GridCacheContext<K, V> { new CacheContinuousQueryManager<K, V>(), new GridCacheAffinityManager<K, V>(), new CacheDataStructuresManager<K, V>(), - new GridCacheTtlManager<K, V>(), - new GridOsCacheDrManager<K, V>(), - new CacheNoopJtaManager<K, V>()); + new GridCacheTtlManager(), + new GridOsCacheDrManager(), + new CacheNoopJtaManager()); } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7253addd/modules/jta/src/main/java/org/apache/ignite/internal/processors/cache/jta/CacheJtaManager.java ---------------------------------------------------------------------- diff --git a/modules/jta/src/main/java/org/apache/ignite/internal/processors/cache/jta/CacheJtaManager.java b/modules/jta/src/main/java/org/apache/ignite/internal/processors/cache/jta/CacheJtaManager.java index 6077e4a..7153c7d 100644 --- a/modules/jta/src/main/java/org/apache/ignite/internal/processors/cache/jta/CacheJtaManager.java +++ b/modules/jta/src/main/java/org/apache/ignite/internal/processors/cache/jta/CacheJtaManager.java @@ -28,7 +28,7 @@ import javax.transaction.*; /** * Implementation of {@link CacheJtaManagerAdapter}. */ -public class CacheJtaManager<K, V> extends CacheJtaManagerAdapter<K, V> { +public class CacheJtaManager extends CacheJtaManagerAdapter { /** */ private final ThreadLocal<GridCacheXAResource> xaRsrc = new ThreadLocal<>();