Repository: ignite
Updated Branches:
  refs/heads/ignite-1607 61df8c26d -> 353249795


ignite-1607 WIP


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/35324979
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/35324979
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/35324979

Branch: refs/heads/ignite-1607
Commit: 3532497953875f15a99a466baa33c22fdb962749
Parents: 61df8c2
Author: sboikov <[email protected]>
Authored: Fri Oct 23 17:05:36 2015 +0300
Committer: sboikov <[email protected]>
Committed: Fri Oct 23 17:05:36 2015 +0300

----------------------------------------------------------------------
 .../cache/version/GridCacheVersionManager.java  |  18 +-
 .../datastreamer/DataStreamProcessor.java       |   3 +-
 .../datastreamer/DataStreamerImpl.java          |  36 +---
 .../datastreamer/DataStreamerRequest.java       |  34 +---
 .../datastreamer/DataStreamerUpdateJob.java     |  33 ++--
 .../DataStreamerUpdateAfterLoadTest.java        | 184 +++++++++++++++++++
 .../DateStreamerUpdateAfterLoadTest.java        | 184 -------------------
 .../ignite/testsuites/IgniteCacheTestSuite.java |   4 +-
 8 files changed, 222 insertions(+), 274 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/35324979/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCacheVersionManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCacheVersionManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCacheVersionManager.java
index 87fe515..68d03cd 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCacheVersionManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCacheVersionManager.java
@@ -61,6 +61,9 @@ public class GridCacheVersionManager extends GridCacheSharedManagerAdapter {
     private long gridStartTime;
 
     /** */
+    private GridCacheVersion ISOLATED_STREAMER_VER;
+
+    /** */
     private final GridLocalEventListener discoLsnr = new 
GridLocalEventListener() {
         @Override public void onEvent(Event evt) {
             assert evt.type() == EVT_NODE_METRICS_UPDATED;
@@ -154,8 +157,19 @@ public class GridCacheVersionManager extends GridCacheSharedManagerAdapter {
      *
      * @return Version for entries loaded with isolated streamer.
      */
-    public GridCacheVersion nextForIsolatedStreamer() {
-        return next(0, true, false);
+    public GridCacheVersion isolatedStreamerVersion() {
+        if (ISOLATED_STREAMER_VER == null) {
+            long topVer = 1;
+
+            if (gridStartTime == 0)
+                gridStartTime = 
cctx.kernalContext().discovery().gridStartTime();
+
+            topVer += (gridStartTime - TOP_VER_BASE_TIME) / 1000;
+
+            ISOLATED_STREAMER_VER = new GridCacheVersion((int)topVer, 0, 0, 1, 
dataCenterId);
+        }
+
+        return ISOLATED_STREAMER_VER;
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/35324979/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java
index 20a013b..5150d83 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java
@@ -289,8 +289,7 @@ public class DataStreamProcessor<K, V> extends GridProcessorAdapter {
                 col,
                 req.ignoreDeploymentOwnership(),
                 req.skipStore(),
-                updater,
-                req.version());
+                updater);
 
             Exception err = null;
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/35324979/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
index 9f07541..2190bf6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
@@ -213,9 +213,6 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed
     /** */
     private int maxRemapCnt = DFLT_MAX_REMAP_CNT;
 
-    /** */
-    private GridCacheVersion ver;
-
     /** Whether a warning at {@link DataStreamerImpl#allowOverwrite()} printed 
*/
     private static boolean isWarningPrinted;
 
@@ -302,8 +299,6 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed
         fut = new DataStreamerFuture(this);
 
         publicFut = new IgniteCacheFutureImpl<>(fut);
-
-        ver = ctx.cache().context().versions().nextForIsolatedStreamer();
     }
 
     /**
@@ -1247,16 +1242,8 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed
             IgniteInternalFuture<Object> fut;
 
             if (isLocNode) {
-                DataStreamerUpdateJob job = new DataStreamerUpdateJob(ctx,
-                    log,
-                    cacheName,
-                    entries,
-                    false,
-                    skipStore,
-                    rcvr,
-                    rcvr == ISOLATED_UPDATER ? ver : null);
-
-                fut = ctx.closure().callLocalSafe(job, false);
+                fut = ctx.closure().callLocalSafe(
+                    new DataStreamerUpdateJob(ctx, log, cacheName, entries, 
false, skipStore, rcvr), false);
 
                 locFuts.add(fut);
 
@@ -1350,8 +1337,7 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed
                     dep != null ? dep.participants() : null,
                     dep != null ? dep.classLoaderId() : null,
                     dep == null,
-                    topVer,
-                    rcvr == ISOLATED_UPDATER ? ver : null);
+                    topVer);
 
                 try {
                     ctx.io().send(node, TOPIC_DATASTREAM, req, PUBLIC_POOL);
@@ -1551,7 +1537,7 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed
     /**
      * Isolated receiver which only loads entry initial value.
      */
-    static class IsolatedUpdater implements StreamReceiver<KeyCacheObject, 
CacheObject>,
+    private static class IsolatedUpdater implements 
StreamReceiver<KeyCacheObject, CacheObject>,
         DataStreamerCacheUpdaters.InternalUpdater {
         /** */
         private static final long serialVersionUID = 0L;
@@ -1559,17 +1545,6 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed
         /** {@inheritDoc} */
         @Override public void receive(IgniteCache<KeyCacheObject, CacheObject> 
cache,
             Collection<Map.Entry<KeyCacheObject, CacheObject>> entries) {
-            receive(cache, entries, null);
-        }
-
-        /**
-         * @param cache Cache.
-         * @param entries Entries.
-         * @param ver Entries version.
-         */
-        void receive(IgniteCache<KeyCacheObject, CacheObject> cache,
-            Collection<Map.Entry<KeyCacheObject, CacheObject>> entries,
-            GridCacheVersion ver) {
             IgniteCacheProxy<KeyCacheObject, CacheObject> proxy = 
(IgniteCacheProxy<KeyCacheObject, CacheObject>)cache;
 
             GridCacheAdapter<KeyCacheObject, CacheObject> internalCache = 
proxy.context().cache();
@@ -1581,8 +1556,7 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed
 
             AffinityTopologyVersion topVer = 
cctx.affinity().affinityTopologyVersion();
 
-            if (ver == null)
-                ver = cctx.versions().next(topVer);
+            GridCacheVersion ver = cctx.versions().isolatedStreamerVersion();
 
             long ttl = CU.TTL_ETERNAL;
             long expiryTime = CU.EXPIRE_TIME_ETERNAL;

http://git-wip-us.apache.org/repos/asf/ignite/blob/35324979/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerRequest.java
index 59810ee..c1a1528 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerRequest.java
@@ -25,7 +25,6 @@ import org.apache.ignite.configuration.DeploymentMode;
 import org.apache.ignite.internal.GridDirectCollection;
 import org.apache.ignite.internal.GridDirectMap;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
-import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
 import org.apache.ignite.internal.util.tostring.GridToStringInclude;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.lang.IgniteUuid;
@@ -88,9 +87,6 @@ public class DataStreamerRequest implements Message {
     /** Topology version. */
     private AffinityTopologyVersion topVer;
 
-    /** */
-    private GridCacheVersion ver;
-
     /**
      * {@code Externalizable} support.
      */
@@ -113,7 +109,6 @@ public class DataStreamerRequest implements Message {
      * @param clsLdrId Class loader ID.
      * @param forceLocDep Force local deployment.
      * @param topVer Topology version.
-     * @param ver Entries version for {@link DataStreamerImpl.IsolatedUpdater}.
      */
     public DataStreamerRequest(long reqId,
         byte[] resTopicBytes,
@@ -128,8 +123,7 @@ public class DataStreamerRequest implements Message {
         Map<UUID, IgniteUuid> ldrParticipants,
         IgniteUuid clsLdrId,
         boolean forceLocDep,
-        @NotNull AffinityTopologyVersion topVer,
-        @Nullable GridCacheVersion ver) {
+        @NotNull AffinityTopologyVersion topVer) {
         assert topVer != null;
 
         this.reqId = reqId;
@@ -146,14 +140,6 @@ public class DataStreamerRequest implements Message {
         this.clsLdrId = clsLdrId;
         this.forceLocDep = forceLocDep;
         this.topVer = topVer;
-        this.ver = ver;
-    }
-
-    /**
-     * @return Version.
-     */
-    @Nullable public GridCacheVersion version() {
-        return ver;
     }
 
     /**
@@ -355,12 +341,6 @@ public class DataStreamerRequest implements Message {
 
                 writer.incrementState();
 
-            case 14:
-                if (!writer.writeMessage("ver", ver))
-                    return false;
-
-                writer.incrementState();
-
         }
 
         return true;
@@ -490,14 +470,6 @@ public class DataStreamerRequest implements Message {
 
                 reader.incrementState();
 
-            case 14:
-                ver = reader.readMessage("ver");
-
-                if (!reader.isLastRead())
-                    return false;
-
-                reader.incrementState();
-
         }
 
         return reader.afterMessageRead(DataStreamerRequest.class);
@@ -510,6 +482,6 @@ public class DataStreamerRequest implements Message {
 
     /** {@inheritDoc} */
     @Override public byte fieldsCount() {
-        return 15;
+        return 14;
     }
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/35324979/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerUpdateJob.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerUpdateJob.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerUpdateJob.java
index 12eee88..42084a3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerUpdateJob.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerUpdateJob.java
@@ -24,7 +24,6 @@ import org.apache.ignite.internal.GridKernalContext;
 import org.apache.ignite.internal.processors.cache.CacheObject;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import org.apache.ignite.internal.processors.cache.IgniteCacheProxy;
-import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
 import org.apache.ignite.internal.util.lang.GridPlainCallable;
 import org.apache.ignite.internal.util.typedef.C1;
 import org.apache.ignite.internal.util.typedef.F;
@@ -57,9 +56,6 @@ class DataStreamerUpdateJob implements GridPlainCallable<Object> {
     /** */
     private final StreamReceiver rcvr;
 
-    /** */
-    private final GridCacheVersion ver;
-
     /**
      * @param ctx Context.
      * @param log Log.
@@ -68,7 +64,6 @@ class DataStreamerUpdateJob implements GridPlainCallable<Object> {
      * @param ignoreDepOwnership {@code True} to ignore deployment ownership.
      * @param skipStore Skip store flag.
      * @param rcvr Updater.
-     * @param ver Entries version for {@link DataStreamerImpl.IsolatedUpdater}.
      */
     DataStreamerUpdateJob(
         GridKernalContext ctx,
@@ -77,8 +72,7 @@ class DataStreamerUpdateJob implements GridPlainCallable<Object> {
         Collection<DataStreamerEntry> col,
         boolean ignoreDepOwnership,
         boolean skipStore,
-        StreamReceiver<?, ?> rcvr,
-        @Nullable GridCacheVersion ver) {
+        StreamReceiver<?, ?> rcvr) {
         this.ctx = ctx;
         this.log = log;
 
@@ -90,7 +84,6 @@ class DataStreamerUpdateJob implements GridPlainCallable<Object> {
         this.ignoreDepOwnership = ignoreDepOwnership;
         this.skipStore = skipStore;
         this.rcvr = rcvr;
-        this.ver = ver;
     }
 
     /** {@inheritDoc} */
@@ -126,21 +119,17 @@ class DataStreamerUpdateJob implements GridPlainCallable<Object> {
                     checkSecurityPermission(SecurityPermission.CACHE_REMOVE);
             }
 
-            if (rcvr instanceof DataStreamerImpl.IsolatedUpdater)
-                ((DataStreamerImpl.IsolatedUpdater)rcvr).receive(cache, 
(Collection)col, ver);
-            else {
-                if (unwrapEntries()) {
-                    Collection<Map.Entry> col0 = F.viewReadOnly(col, new 
C1<DataStreamerEntry, Map.Entry>() {
-                        @Override public Map.Entry apply(DataStreamerEntry e) {
-                            return e.toEntry(cctx);
-                        }
-                    });
-
-                    rcvr.receive(cache, col0);
-                }
-                else
-                    rcvr.receive(cache, col);
+            if (unwrapEntries()) {
+                Collection<Map.Entry> col0 = F.viewReadOnly(col, new 
C1<DataStreamerEntry, Map.Entry>() {
+                    @Override public Map.Entry apply(DataStreamerEntry e) {
+                        return e.toEntry(cctx);
+                    }
+                });
+
+                rcvr.receive(cache, col0);
             }
+            else
+                rcvr.receive(cache, col);
 
             return null;
         }

http://git-wip-us.apache.org/repos/asf/ignite/blob/35324979/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerUpdateAfterLoadTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerUpdateAfterLoadTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerUpdateAfterLoadTest.java
new file mode 100644
index 0000000..32e204b
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerUpdateAfterLoadTest.java
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.datastreamer;
+
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteDataStreamer;
+import org.apache.ignite.cache.CacheAtomicWriteOrderMode;
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
+import static org.apache.ignite.cache.CacheAtomicWriteOrderMode.CLOCK;
+import static org.apache.ignite.cache.CacheAtomicWriteOrderMode.PRIMARY;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
+
+/**
+ *
+ */
+public class DataStreamerUpdateAfterLoadTest extends GridCommonAbstractTest {
+    /** */
+    private static final TcpDiscoveryIpFinder IP_FINDER = new 
TcpDiscoveryVmIpFinder(true);
+
+    /** */
+    private boolean client;
+
+    /** */
+    private static final int NODES = 4;
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) 
throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(IP_FINDER);
+
+        cfg.setClientMode(client);
+
+        return cfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTestsStarted() throws Exception {
+        super.beforeTestsStarted();
+
+        startGridsMultiThreaded(NODES - 1);
+
+        client = true;
+
+        startGrid(NODES - 1);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTestsStopped() throws Exception {
+        stopAllGrids();
+
+        super.afterTestsStopped();
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testUpdateAfterLoad() throws Exception {
+        Ignite ignite0 = ignite(0);
+
+        for (CacheConfiguration<Integer, Integer> ccfg : 
cacheConfigurations()) {
+            int key = 0;
+
+            try (IgniteCache<Integer, Integer> cache = 
ignite0.createCache(ccfg)) {
+                key = testLoadAndUpdate(cache.getName(), key, false);
+
+                testLoadAndUpdate(cache.getName(), key, true);
+
+                ignite0.destroyCache(cache.getName());
+            }
+        }
+    }
+
+    /**
+     * @param cacheName Cache name.
+     * @param key Key.
+     * @param allowOverwrite Streamer flag.
+     * @return Next key.
+     * @throws Exception If failed.
+     */
+    private int testLoadAndUpdate(String cacheName, int key, boolean 
allowOverwrite) throws Exception {
+        for (int loadNode = 0; loadNode < NODES; loadNode++) {
+            Ignite loadIgnite = ignite(loadNode);
+
+            for (int updateNode = 0; updateNode < NODES; updateNode++) {
+                try (IgniteDataStreamer<Integer, Integer> streamer = 
loadIgnite.dataStreamer(cacheName)) {
+                    streamer.allowOverwrite(allowOverwrite);
+
+                    streamer.addData(key, key);
+                }
+
+                Ignite updateIgnite = ignite(updateNode);
+
+                IgniteCache<Integer, Integer> cache = 
updateIgnite.cache(cacheName);
+
+                if (allowOverwrite)
+                    atomicClockModeDelay(cache);
+
+                updateIgnite.cache(cacheName).put(key, key + 1);
+
+                checkValue(key, key + 1, cacheName);
+
+                key++;
+            }
+        }
+
+        return key;
+    }
+
+    /**
+     * @param key Key.
+     * @param val Value.
+     * @param cacheName Cache name.
+     */
+    private void checkValue(Integer key, Integer val, String cacheName) {
+        for (int i = 0; i < NODES; i++) {
+            IgniteCache<Integer, Integer> cache = ignite(i).cache(cacheName);
+
+            assertEquals("Unexpected value " + i, val, cache.get(key));
+        }
+    }
+
+    /**
+     * @return Cache configurations to test.
+     */
+    private List<CacheConfiguration<Integer, Integer>> cacheConfigurations() {
+        List<CacheConfiguration<Integer, Integer>> ccfgs = new ArrayList<>();
+
+        ccfgs.add(cacheConfiguration(CacheAtomicityMode.ATOMIC, PRIMARY, 1, 
"cache-" + ccfgs.size()));
+        ccfgs.add(cacheConfiguration(CacheAtomicityMode.ATOMIC, PRIMARY, 0, 
"cache-" + ccfgs.size()));
+        ccfgs.add(cacheConfiguration(CacheAtomicityMode.ATOMIC, CLOCK, 1, 
"cache-" + ccfgs.size()));
+        ccfgs.add(cacheConfiguration(CacheAtomicityMode.TRANSACTIONAL, null, 
1, "cache-" + ccfgs.size()));
+        ccfgs.add(cacheConfiguration(CacheAtomicityMode.TRANSACTIONAL, null, 
0, "cache-" + ccfgs.size()));
+
+        return ccfgs;
+    }
+
+    /**
+     * @param atomicityMode Cache atomicity mode.
+     * @param writeOrderMode Cache write order mode.
+     * @param backups Number of backups.
+     * @param name Cache name.
+     * @return Cache configuration.
+     */
+    private CacheConfiguration<Integer, Integer> 
cacheConfiguration(CacheAtomicityMode atomicityMode,
+        CacheAtomicWriteOrderMode writeOrderMode,
+        int backups,
+        String name) {
+        CacheConfiguration<Integer, Integer> ccfg = new CacheConfiguration<>();
+
+        ccfg.setName(name);
+        ccfg.setAtomicityMode(atomicityMode);
+        ccfg.setBackups(backups);
+        ccfg.setAtomicWriteOrderMode(writeOrderMode);
+        ccfg.setWriteSynchronizationMode(FULL_SYNC);
+
+        return ccfg;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/35324979/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DateStreamerUpdateAfterLoadTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DateStreamerUpdateAfterLoadTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DateStreamerUpdateAfterLoadTest.java
deleted file mode 100644
index fc3e9e0..0000000
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DateStreamerUpdateAfterLoadTest.java
+++ /dev/null
@@ -1,184 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.datastreamer;
-
-import java.util.ArrayList;
-import java.util.List;
-import org.apache.ignite.Ignite;
-import org.apache.ignite.IgniteCache;
-import org.apache.ignite.IgniteDataStreamer;
-import org.apache.ignite.cache.CacheAtomicWriteOrderMode;
-import org.apache.ignite.cache.CacheAtomicityMode;
-import org.apache.ignite.configuration.CacheConfiguration;
-import org.apache.ignite.configuration.IgniteConfiguration;
-import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
-import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
-import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
-import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
-
-import static org.apache.ignite.cache.CacheAtomicWriteOrderMode.CLOCK;
-import static org.apache.ignite.cache.CacheAtomicWriteOrderMode.PRIMARY;
-import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
-
-/**
- *
- */
-public class DateStreamerUpdateAfterLoadTest extends GridCommonAbstractTest {
-    /** */
-    private static final TcpDiscoveryIpFinder IP_FINDER = new 
TcpDiscoveryVmIpFinder(true);
-
-    /** */
-    private boolean client;
-
-    /** */
-    private static final int NODES = 4;
-
-    /** {@inheritDoc} */
-    @Override protected IgniteConfiguration getConfiguration(String gridName) 
throws Exception {
-        IgniteConfiguration cfg = super.getConfiguration(gridName);
-
-        ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(IP_FINDER);
-
-        cfg.setClientMode(client);
-
-        return cfg;
-    }
-
-    /** {@inheritDoc} */
-    @Override protected void beforeTestsStarted() throws Exception {
-        super.beforeTestsStarted();
-
-        startGridsMultiThreaded(NODES - 1);
-
-        client = true;
-
-        startGrid(NODES - 1);
-    }
-
-    /** {@inheritDoc} */
-    @Override protected void afterTestsStopped() throws Exception {
-        stopAllGrids();
-
-        super.afterTestsStopped();
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
-    public void testUpdateAfterLoad() throws Exception {
-        Ignite ignite0 = ignite(0);
-
-        for (CacheConfiguration<Integer, Integer> ccfg : 
cacheConfigurations()) {
-            int key = 0;
-
-            try (IgniteCache<Integer, Integer> cache = 
ignite0.createCache(ccfg)) {
-                key = testLoadAndUpdate(cache.getName(), key, false);
-
-                testLoadAndUpdate(cache.getName(), key, true);
-
-                ignite0.destroyCache(cache.getName());
-            }
-        }
-    }
-
-    /**
-     * @param cacheName Cache name.
-     * @param key Key.
-     * @param allowOverwrite Streamer flag.
-     * @return Next key.
-     * @throws Exception If failed.
-     */
-    private int testLoadAndUpdate(String cacheName, int key, boolean 
allowOverwrite) throws Exception {
-        for (int loadNode = 0; loadNode < NODES; loadNode++) {
-            Ignite loadIgnite = ignite(loadNode);
-
-            for (int updateNode = 0; updateNode < NODES; updateNode++) {
-                try (IgniteDataStreamer<Integer, Integer> streamer = 
loadIgnite.dataStreamer(cacheName)) {
-                    streamer.allowOverwrite(allowOverwrite);
-
-                    streamer.addData(key, key);
-                }
-
-                Ignite updateIgnite = ignite(updateNode);
-
-                IgniteCache<Integer, Integer> cache = 
updateIgnite.cache(cacheName);
-
-                if (allowOverwrite)
-                    atomicClockModeDelay(cache);
-
-                updateIgnite.cache(cacheName).put(key, key + 1);
-
-                checkValue(key, key + 1, cacheName);
-
-                key++;
-            }
-        }
-
-        return key;
-    }
-
-    /**
-     * @param key Key.
-     * @param val Value.
-     * @param cacheName Cache name.
-     */
-    private void checkValue(Integer key, Integer val, String cacheName) {
-        for (int i = 0; i < NODES; i++) {
-            IgniteCache<Integer, Integer> cache = ignite(i).cache(cacheName);
-
-            assertEquals("Unexpected value " + i, val, cache.get(key));
-        }
-    }
-
-    /**
-     * @return Cache configurations to test.
-     */
-    private List<CacheConfiguration<Integer, Integer>> cacheConfigurations() {
-        List<CacheConfiguration<Integer, Integer>> ccfgs = new ArrayList<>();
-
-        ccfgs.add(cacheConfiguration(CacheAtomicityMode.ATOMIC, PRIMARY, 1, 
"cache-" + ccfgs.size()));
-        ccfgs.add(cacheConfiguration(CacheAtomicityMode.ATOMIC, PRIMARY, 0, 
"cache-" + ccfgs.size()));
-        ccfgs.add(cacheConfiguration(CacheAtomicityMode.ATOMIC, CLOCK, 1, 
"cache-" + ccfgs.size()));
-        ccfgs.add(cacheConfiguration(CacheAtomicityMode.TRANSACTIONAL, null, 
1, "cache-" + ccfgs.size()));
-        ccfgs.add(cacheConfiguration(CacheAtomicityMode.TRANSACTIONAL, null, 
0, "cache-" + ccfgs.size()));
-
-        return ccfgs;
-    }
-
-    /**
-     * @param atomicityMode Cache atomicity mode.
-     * @param writeOrderMode Cache write order mode.
-     * @param backups Number of backups.
-     * @param name Cache name.
-     * @return Cache configuration.
-     */
-    private CacheConfiguration<Integer, Integer> 
cacheConfiguration(CacheAtomicityMode atomicityMode,
-        CacheAtomicWriteOrderMode writeOrderMode,
-        int backups,
-        String name) {
-        CacheConfiguration<Integer, Integer> ccfg = new CacheConfiguration<>();
-
-        ccfg.setName(name);
-        ccfg.setAtomicityMode(atomicityMode);
-        ccfg.setBackups(backups);
-        ccfg.setAtomicWriteOrderMode(writeOrderMode);
-        ccfg.setWriteSynchronizationMode(FULL_SYNC);
-
-        return ccfg;
-    }
-}

http://git-wip-us.apache.org/repos/asf/ignite/blob/35324979/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java
index 8f719cf..7925642 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java
@@ -122,7 +122,7 @@ import org.apache.ignite.internal.processors.datastreamer.DataStreamProcessorSel
 import 
org.apache.ignite.internal.processors.datastreamer.DataStreamerImplSelfTest;
 import 
org.apache.ignite.internal.processors.datastreamer.DataStreamerMultiThreadedSelfTest;
 import 
org.apache.ignite.internal.processors.datastreamer.DataStreamerMultinodeCreateCacheTest;
-import 
org.apache.ignite.internal.processors.datastreamer.DateStreamerUpdateAfterLoadTest;
+import 
org.apache.ignite.internal.processors.datastreamer.DataStreamerUpdateAfterLoadTest;
 import org.apache.ignite.testframework.GridTestUtils;
 
 /**
@@ -214,7 +214,7 @@ public class IgniteCacheTestSuite extends TestSuite {
         suite.addTestSuite(GridCacheAffinityApiSelfTest.class);
         suite.addTestSuite(GridCacheStoreValueBytesSelfTest.class);
         GridTestUtils.addTestIfNeeded(suite, 
DataStreamProcessorSelfTest.class, ignoredTests);
-        GridTestUtils.addTestIfNeeded(suite, 
DateStreamerUpdateAfterLoadTest.class, ignoredTests);
+        GridTestUtils.addTestIfNeeded(suite, 
DataStreamerUpdateAfterLoadTest.class, ignoredTests);
             suite.addTestSuite(DataStreamerMultiThreadedSelfTest.class);
         suite.addTestSuite(DataStreamerMultinodeCreateCacheTest.class);
         suite.addTestSuite(DataStreamerImplSelfTest.class);

Reply via email to