Merge with master - WIP.

Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/f7d89fdb
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/f7d89fdb
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/f7d89fdb

Branch: refs/heads/ignite-3477
Commit: f7d89fdb311761fd5a4a4b8594001fb6cae13e88
Parents: 2e55963
Author: Ilya Lantukh <[email protected]>
Authored: Thu Dec 22 16:23:55 2016 +0300
Committer: Ilya Lantukh <[email protected]>
Committed: Thu Dec 22 16:23:55 2016 +0300

----------------------------------------------------------------------
 .../processors/cache/GridCacheProxyImpl.java    | 24 ++++++++++----
 .../processors/cache/GridCacheTtlManager.java   |  3 +-
 .../processors/cache/IgniteInternalCache.java   |  8 +++++
 .../dht/atomic/GridDhtAtomicCache.java          |  6 +++-
 .../GridDhtPartitionsSingleMessage.java         |  1 -
 .../apache/ignite/internal/util/GridUnsafe.java | 11 ++++++
 .../ignite/internal/util/IgniteUtils.java       |  3 ++
 .../query/h2/DmlStatementsProcessor.java        |  2 +-
 .../h2/twostep/GridReduceQueryExecutor.java     | 35 ++++++--------------
 9 files changed, 59 insertions(+), 34 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/f7d89fdb/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java
index a3f32a8..8c73026 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java
@@ -209,7 +209,8 @@ public class GridCacheProxyImpl<K, V> implements 
IgniteInternalCache<K, V>, Exte
     }
 
     /** {@inheritDoc} */
-    @Override public void localLoadCache(IgniteBiPredicate<K, V> p, @Nullable 
Object[] args) throws IgniteCheckedException {
+    @Override public void localLoadCache(IgniteBiPredicate<K, V> p,
+        @Nullable Object[] args) throws IgniteCheckedException {
         CacheOperationContext prev = gate.enter(opCtx);
 
         try {
@@ -915,6 +916,18 @@ public class GridCacheProxyImpl<K, V> implements 
IgniteInternalCache<K, V>, Exte
     }
 
     /** {@inheritDoc} */
+    @Override public Set<Cache.Entry<K, V>> entrySetx(CacheEntryPredicate... 
filter) {
+        CacheOperationContext prev = gate.enter(opCtx);
+
+        try {
+            return delegate.entrySetx(filter);
+        }
+        finally {
+            gate.leave(prev);
+        }
+    }
+
+    /** {@inheritDoc} */
     @Override public Iterator<Cache.Entry<K, V>> scanIterator(boolean 
keepBinary,
         @Nullable IgniteBiPredicate<Object, Object> p) throws 
IgniteCheckedException {
         CacheOperationContext prev = gate.enter(opCtx);
@@ -981,8 +994,7 @@ public class GridCacheProxyImpl<K, V> implements 
IgniteInternalCache<K, V>, Exte
     @Nullable @Override public V localPeek(K key,
         CachePeekMode[] peekModes,
         @Nullable IgniteCacheExpiryPolicy plc)
-        throws IgniteCheckedException
-    {
+        throws IgniteCheckedException {
         CacheOperationContext prev = gate.enter(opCtx);
 
         try {
@@ -1190,8 +1202,7 @@ public class GridCacheProxyImpl<K, V> implements 
IgniteInternalCache<K, V>, Exte
 
     /** {@inheritDoc} */
     @Override public IgniteInternalFuture<?> 
removeAllConflictAsync(Map<KeyCacheObject, GridCacheVersion> drMap)
-        throws IgniteCheckedException
-    {
+        throws IgniteCheckedException {
         CacheOperationContext prev = gate.enter(opCtx);
 
         try {
@@ -1341,7 +1352,8 @@ public class GridCacheProxyImpl<K, V> implements 
IgniteInternalCache<K, V>, Exte
     }
 
     /** {@inheritDoc} */
-    @Override public boolean lockAll(@Nullable Collection<? extends K> keys, 
long timeout) throws IgniteCheckedException {
+    @Override public boolean lockAll(@Nullable Collection<? extends K> keys,
+        long timeout) throws IgniteCheckedException {
         CacheOperationContext prev = gate.enter(opCtx);
 
         try {

http://git-wip-us.apache.org/repos/asf/ignite/blob/f7d89fdb/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheTtlManager.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheTtlManager.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheTtlManager.java
index 7ccf890..abe1b45 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheTtlManager.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheTtlManager.java
@@ -125,7 +125,8 @@ public class GridCacheTtlManager extends 
GridCacheManagerAdapter {
         U.cancel(cleanupWorker);
         U.join(cleanupWorker, log);
 
-        pendingEntries.clear();
+        if (pendingEntries != null)
+            pendingEntries.clear();
 
         cctx.shared().ttl().unregister(this);
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/f7d89fdb/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteInternalCache.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteInternalCache.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteInternalCache.java
index 568f92e..c2790db 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteInternalCache.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteInternalCache.java
@@ -940,6 +940,14 @@ public interface IgniteInternalCache<K, V> extends 
Iterable<Cache.Entry<K, V>> {
     @Nullable public Set<Cache.Entry<K, V>> entrySet(int part);
 
     /**
+     * Gets entry set containing internal entries.
+     *
+     * @param filter Filter.
+     * @return Entry set.
+     */
+    public Set<Cache.Entry<K, V>> entrySetx(CacheEntryPredicate... filter);
+
+    /**
      * Starts new transaction with the specified concurrency and isolation.
      *
      * @param concurrency Concurrency.

http://git-wip-us.apache.org/repos/asf/ignite/blob/f7d89fdb/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
index 2f2706e..e2bd45b 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
@@ -466,7 +466,7 @@ public class GridDhtAtomicCache<K, V> extends 
GridDhtCacheAdapter<K, V> {
     }
 
     /** {@inheritDoc} */
-    @Nullable @Override public V get0(K key, boolean deserializeBinary, 
boolean needVer) throws IgniteCheckedException {
+    @Nullable public V get0(K key, boolean deserializeBinary, boolean needVer) 
throws IgniteCheckedException {
         ctx.checkSecurity(SecurityPermission.CACHE_READ);
 
         if (keyCheck)
@@ -549,12 +549,15 @@ public class GridDhtAtomicCache<K, V> extends 
GridDhtCacheAdapter<K, V> {
     /** {@inheritDoc} */
     @Override protected Map<K, V> getAll0(Collection<? extends K> keys, 
boolean deserializeBinary, boolean needVer)
         throws IgniteCheckedException {
+        CacheOperationContext opCtx = ctx.operationContextPerCall();
+
         return getAllAsyncInternal(keys,
             !ctx.config().isReadFromBackup(),
             true,
             null,
             ctx.kernalContext().job().currentTaskName(),
             deserializeBinary,
+            opCtx != null && opCtx.recovery(),
             false,
             true,
             needVer,
@@ -580,6 +583,7 @@ public class GridDhtAtomicCache<K, V> extends 
GridDhtCacheAdapter<K, V> {
             subjId,
             taskName,
             deserializeBinary,
+            recovery,
             skipVals,
             canRemap,
             needVer,

http://git-wip-us.apache.org/repos/asf/ignite/blob/f7d89fdb/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java
index 0975a07..b380d9b 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java
@@ -221,7 +221,6 @@ public class GridDhtPartitionsSingleMessage extends 
GridDhtPartitionsAbstractMes
                 GridDhtPartitionMap2 map1 = parts.get(e.getKey());
 
                 assert map1 != null : e.getKey();
-                assert F.isEmpty(map1.map());
                 assert !map1.hasMovingPartitions();
 
                 GridDhtPartitionMap2 map2 = parts.get(e.getValue());

http://git-wip-us.apache.org/repos/asf/ignite/blob/f7d89fdb/modules/core/src/main/java/org/apache/ignite/internal/util/GridUnsafe.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/util/GridUnsafe.java 
b/modules/core/src/main/java/org/apache/ignite/internal/util/GridUnsafe.java
index 6e9efdb..783ab96 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/GridUnsafe.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/GridUnsafe.java
@@ -1081,6 +1081,17 @@ public abstract class GridUnsafe {
     }
 
     /**
+     * Copies memory.
+     *
+     * @param src Source.
+     * @param dst Dst.
+     * @param len Length.
+     */
+    public static void copyMemory(long src, long dst, long len) {
+        UNSAFE.copyMemory(src, dst, len);
+    }
+
+    /**
      * Sets all bytes in a given block of memory to a copy of another block.
      *
      * @param srcBase Source base.

http://git-wip-us.apache.org/repos/asf/ignite/blob/f7d89fdb/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java 
b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
index 4a8a33c..c418acc 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
@@ -9183,6 +9183,9 @@ public abstract class IgniteUtils {
      * @throws IgniteCheckedException If failed.
      */
     public static File resolveWorkDirectory(String path, boolean delIfExist) 
throws IgniteCheckedException {
+        if (1 == 1)
+            return resolveWorkDirectory(defaultWorkDirectory(), path, 
delIfExist);
+
         if (path == null) {
             String ggWork0 = igniteWork;
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/f7d89fdb/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/DmlStatementsProcessor.java
----------------------------------------------------------------------
diff --git 
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/DmlStatementsProcessor.java
 
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/DmlStatementsProcessor.java
index 4030758..9d1bbd8 100644
--- 
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/DmlStatementsProcessor.java
+++ 
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/DmlStatementsProcessor.java
@@ -145,7 +145,7 @@ public class DmlStatementsProcessor {
 
                 if (opCtx == null)
                     // Mimics behavior of GridCacheAdapter#keepBinary and 
GridCacheProxyImpl#keepBinary
-                    newOpCtx = new CacheOperationContext(false, null, true, 
null, false, null);
+                    newOpCtx = new CacheOperationContext(false, null, true, 
null, false, null, false);
                 else if (!opCtx.isKeepBinary())
                     newOpCtx = opCtx.keepBinary();
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/f7d89fdb/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
----------------------------------------------------------------------
diff --git 
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
 
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
index 333908f..51f8bef 100644
--- 
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
+++ 
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
@@ -586,7 +586,6 @@ public class GridReduceQueryExecutor {
 
                 IgniteProductVersion minNodeVer = 
cctx.shared().exchange().minimumNodeVersion(topVer);
 
-                final boolean oldStyle = 
minNodeVer.compareToIgnoreTimestamp(DISTRIBUTED_JOIN_SINCE) < 0;
                 final boolean distributedJoins = qry.distributedJoins();
 
                 cancel.set(new Runnable() {
@@ -597,30 +596,18 @@ public class GridReduceQueryExecutor {
 
                 boolean retry = false;
 
-                if (oldStyle && distributedJoins)
-                    throw new CacheException("Failed to enable distributed 
joins. Topology contains older data nodes.");
-
                 if (send(nodes,
-                    oldStyle ?
-                        new GridQueryRequest(qryReqId,
-                            r.pageSize,
-                            space,
-                            mapQrys,
-                            topVer,
-                            extraSpaces(space, qry.spaces()),
-                            null,
-                            timeoutMillis) :
-                        new GridH2QueryRequest()
-                            .requestId(qryReqId)
-                            .topologyVersion(topVer)
-                            .pageSize(r.pageSize)
-                            .caches(qry.caches())
-                            .tables(distributedJoins ? qry.tables() : null)
-                            .partitions(convert(partsMap))
-                            .queries(mapQrys)
-                            .flags(distributedJoins ? 
GridH2QueryRequest.FLAG_DISTRIBUTED_JOINS : 0)
-                            .timeout(timeoutMillis),
-                    oldStyle && partsMap != null ? new 
ExplicitPartitionsSpecializer(partsMap) : null,
+                    new GridH2QueryRequest()
+                        .requestId(qryReqId)
+                        .topologyVersion(topVer)
+                        .pageSize(r.pageSize)
+                        .caches(qry.caches())
+                        .tables(distributedJoins ? qry.tables() : null)
+                        .partitions(convert(partsMap))
+                        .queries(mapQrys)
+                        .flags(distributedJoins ? 
GridH2QueryRequest.FLAG_DISTRIBUTED_JOINS : 0)
+                        .timeout(timeoutMillis),
+                    null,
                     distributedJoins)
                     ) {
                     awaitAllReplies(r, nodes);

Reply via email to