Merge with master - WIP.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/f7d89fdb Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/f7d89fdb Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/f7d89fdb Branch: refs/heads/ignite-3477 Commit: f7d89fdb311761fd5a4a4b8594001fb6cae13e88 Parents: 2e55963 Author: Ilya Lantukh <[email protected]> Authored: Thu Dec 22 16:23:55 2016 +0300 Committer: Ilya Lantukh <[email protected]> Committed: Thu Dec 22 16:23:55 2016 +0300 ---------------------------------------------------------------------- .../processors/cache/GridCacheProxyImpl.java | 24 ++++++++++---- .../processors/cache/GridCacheTtlManager.java | 3 +- .../processors/cache/IgniteInternalCache.java | 8 +++++ .../dht/atomic/GridDhtAtomicCache.java | 6 +++- .../GridDhtPartitionsSingleMessage.java | 1 - .../apache/ignite/internal/util/GridUnsafe.java | 11 ++++++ .../ignite/internal/util/IgniteUtils.java | 3 ++ .../query/h2/DmlStatementsProcessor.java | 2 +- .../h2/twostep/GridReduceQueryExecutor.java | 35 ++++++-------------- 9 files changed, 59 insertions(+), 34 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/f7d89fdb/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java index a3f32a8..8c73026 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java @@ -209,7 +209,8 @@ public class GridCacheProxyImpl<K, V> implements IgniteInternalCache<K, V>, Exte } /** {@inheritDoc} */ - @Override public void localLoadCache(IgniteBiPredicate<K, V> p, @Nullable Object[] args) throws IgniteCheckedException { + @Override public void localLoadCache(IgniteBiPredicate<K, V> p, + @Nullable Object[] args) throws IgniteCheckedException { CacheOperationContext prev = gate.enter(opCtx); try { @@ -915,6 +916,18 @@ public class GridCacheProxyImpl<K, V> implements IgniteInternalCache<K, V>, Exte } /** {@inheritDoc} */ + @Override public Set<Cache.Entry<K, V>> entrySetx(CacheEntryPredicate... filter) { + CacheOperationContext prev = gate.enter(opCtx); + + try { + return delegate.entrySetx(filter); + } + finally { + gate.leave(prev); + } + } + + /** {@inheritDoc} */ @Override public Iterator<Cache.Entry<K, V>> scanIterator(boolean keepBinary, @Nullable IgniteBiPredicate<Object, Object> p) throws IgniteCheckedException { CacheOperationContext prev = gate.enter(opCtx); @@ -981,8 +994,7 @@ public class GridCacheProxyImpl<K, V> implements IgniteInternalCache<K, V>, Exte @Nullable @Override public V localPeek(K key, CachePeekMode[] peekModes, @Nullable IgniteCacheExpiryPolicy plc) - throws IgniteCheckedException - { + throws IgniteCheckedException { CacheOperationContext prev = gate.enter(opCtx); try { @@ -1190,8 +1202,7 @@ public class GridCacheProxyImpl<K, V> implements IgniteInternalCache<K, V>, Exte /** {@inheritDoc} */ @Override public IgniteInternalFuture<?> removeAllConflictAsync(Map<KeyCacheObject, GridCacheVersion> drMap) - throws IgniteCheckedException - { + throws IgniteCheckedException { CacheOperationContext prev = gate.enter(opCtx); try { @@ -1341,7 +1352,8 @@ public class GridCacheProxyImpl<K, V> implements IgniteInternalCache<K, V>, Exte } /** {@inheritDoc} */ - @Override public boolean lockAll(@Nullable Collection<? extends K> keys, long timeout) throws IgniteCheckedException { + @Override public boolean lockAll(@Nullable Collection<? extends K> keys, + long timeout) throws IgniteCheckedException { CacheOperationContext prev = gate.enter(opCtx); try { http://git-wip-us.apache.org/repos/asf/ignite/blob/f7d89fdb/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheTtlManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheTtlManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheTtlManager.java index 7ccf890..abe1b45 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheTtlManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheTtlManager.java @@ -125,7 +125,8 @@ public class GridCacheTtlManager extends GridCacheManagerAdapter { U.cancel(cleanupWorker); U.join(cleanupWorker, log); - pendingEntries.clear(); + if (pendingEntries != null) + pendingEntries.clear(); cctx.shared().ttl().unregister(this); } http://git-wip-us.apache.org/repos/asf/ignite/blob/f7d89fdb/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteInternalCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteInternalCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteInternalCache.java index 568f92e..c2790db 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteInternalCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteInternalCache.java @@ -940,6 +940,14 @@ public interface IgniteInternalCache<K, V> extends Iterable<Cache.Entry<K, V>> { @Nullable public Set<Cache.Entry<K, V>> entrySet(int part); /** + * Gets entry set containing internal entries. + * + * @param filter Filter. + * @return Entry set. + */ + public Set<Cache.Entry<K, V>> entrySetx(CacheEntryPredicate... filter); + + /** * Starts new transaction with the specified concurrency and isolation. * * @param concurrency Concurrency. http://git-wip-us.apache.org/repos/asf/ignite/blob/f7d89fdb/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java index 2f2706e..e2bd45b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java @@ -466,7 +466,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { } /** {@inheritDoc} */ - @Nullable @Override public V get0(K key, boolean deserializeBinary, boolean needVer) throws IgniteCheckedException { + @Nullable public V get0(K key, boolean deserializeBinary, boolean needVer) throws IgniteCheckedException { ctx.checkSecurity(SecurityPermission.CACHE_READ); if (keyCheck) @@ -549,12 +549,15 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { /** {@inheritDoc} */ @Override protected Map<K, V> getAll0(Collection<? extends K> keys, boolean deserializeBinary, boolean needVer) throws IgniteCheckedException { + CacheOperationContext opCtx = ctx.operationContextPerCall(); + return getAllAsyncInternal(keys, !ctx.config().isReadFromBackup(), true, null, ctx.kernalContext().job().currentTaskName(), deserializeBinary, + opCtx != null && opCtx.recovery(), false, true, needVer, @@ -580,6 +583,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { subjId, taskName, deserializeBinary, + recovery, skipVals, canRemap, needVer, http://git-wip-us.apache.org/repos/asf/ignite/blob/f7d89fdb/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java index 0975a07..b380d9b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java @@ -221,7 +221,6 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes GridDhtPartitionMap2 map1 = parts.get(e.getKey()); assert map1 != null : e.getKey(); - assert F.isEmpty(map1.map()); assert !map1.hasMovingPartitions(); GridDhtPartitionMap2 map2 = parts.get(e.getValue()); http://git-wip-us.apache.org/repos/asf/ignite/blob/f7d89fdb/modules/core/src/main/java/org/apache/ignite/internal/util/GridUnsafe.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/GridUnsafe.java b/modules/core/src/main/java/org/apache/ignite/internal/util/GridUnsafe.java index 6e9efdb..783ab96 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/GridUnsafe.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/GridUnsafe.java @@ -1081,6 +1081,17 @@ public abstract class GridUnsafe { } /** + * Copies memory. + * + * @param src Source. + * @param dst Dst. + * @param len Length. + */ + public static void copyMemory(long src, long dst, long len) { + UNSAFE.copyMemory(src, dst, len); + } + + /** * Sets all bytes in a given block of memory to a copy of another block. * * @param srcBase Source base. http://git-wip-us.apache.org/repos/asf/ignite/blob/f7d89fdb/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java index 4a8a33c..c418acc 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java @@ -9183,6 +9183,9 @@ public abstract class IgniteUtils { * @throws IgniteCheckedException If failed. */ public static File resolveWorkDirectory(String path, boolean delIfExist) throws IgniteCheckedException { + if (1 == 1) + return resolveWorkDirectory(defaultWorkDirectory(), path, delIfExist); + if (path == null) { String ggWork0 = igniteWork; http://git-wip-us.apache.org/repos/asf/ignite/blob/f7d89fdb/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/DmlStatementsProcessor.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/DmlStatementsProcessor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/DmlStatementsProcessor.java index 4030758..9d1bbd8 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/DmlStatementsProcessor.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/DmlStatementsProcessor.java @@ -145,7 +145,7 @@ public class DmlStatementsProcessor { if (opCtx == null) // Mimics behavior of GridCacheAdapter#keepBinary and GridCacheProxyImpl#keepBinary - newOpCtx = new CacheOperationContext(false, null, true, null, false, null); + newOpCtx = new CacheOperationContext(false, null, true, null, false, null, false); else if (!opCtx.isKeepBinary()) newOpCtx = opCtx.keepBinary(); http://git-wip-us.apache.org/repos/asf/ignite/blob/f7d89fdb/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java index 333908f..51f8bef 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java @@ -586,7 +586,6 @@ public class GridReduceQueryExecutor { IgniteProductVersion minNodeVer = cctx.shared().exchange().minimumNodeVersion(topVer); - final boolean oldStyle = minNodeVer.compareToIgnoreTimestamp(DISTRIBUTED_JOIN_SINCE) < 0; final boolean distributedJoins = qry.distributedJoins(); cancel.set(new Runnable() { @@ -597,30 +596,18 @@ public class GridReduceQueryExecutor { boolean retry = false; - if (oldStyle && distributedJoins) - throw new CacheException("Failed to enable distributed joins. Topology contains older data nodes."); - if (send(nodes, - oldStyle ? - new GridQueryRequest(qryReqId, - r.pageSize, - space, - mapQrys, - topVer, - extraSpaces(space, qry.spaces()), - null, - timeoutMillis) : - new GridH2QueryRequest() - .requestId(qryReqId) - .topologyVersion(topVer) - .pageSize(r.pageSize) - .caches(qry.caches()) - .tables(distributedJoins ? qry.tables() : null) - .partitions(convert(partsMap)) - .queries(mapQrys) - .flags(distributedJoins ? GridH2QueryRequest.FLAG_DISTRIBUTED_JOINS : 0) - .timeout(timeoutMillis), - oldStyle && partsMap != null ? new ExplicitPartitionsSpecializer(partsMap) : null, + new GridH2QueryRequest() + .requestId(qryReqId) + .topologyVersion(topVer) + .pageSize(r.pageSize) + .caches(qry.caches()) + .tables(distributedJoins ? qry.tables() : null) + .partitions(convert(partsMap)) + .queries(mapQrys) + .flags(distributedJoins ? GridH2QueryRequest.FLAG_DISTRIBUTED_JOINS : 0) + .timeout(timeoutMillis), + null, distributedJoins) ) { awaitAllReplies(r, nodes);
