ignite-3478

Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/b72f362d
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/b72f362d
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/b72f362d

Branch: refs/heads/ignite-3479
Commit: b72f362d3d9c25a7f53a19a81a8294fe247f454d
Parents: 666a6ac
Author: sboikov <sboi...@gridgain.com>
Authored: Fri Sep 22 13:08:36 2017 +0300
Committer: sboikov <sboi...@gridgain.com>
Committed: Fri Sep 22 13:08:36 2017 +0300

----------------------------------------------------------------------
 .../org/apache/ignite/internal/MvccTestApp.java | 1689 -----------------
 .../apache/ignite/internal/MvccTestApp2.java    | 1750 ------------------
 .../apache/ignite/internal/MvccTestApp3.java    | 1713 -----------------
 3 files changed, 5152 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/b72f362d/modules/core/src/main/java/org/apache/ignite/internal/MvccTestApp.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/MvccTestApp.java 
b/modules/core/src/main/java/org/apache/ignite/internal/MvccTestApp.java
deleted file mode 100644
index d384339..0000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/MvccTestApp.java
+++ /dev/null
@@ -1,1689 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal;
-
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.io.PrintWriter;
-import java.text.SimpleDateFormat;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.TreeMap;
-import java.util.TreeSet;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentSkipListMap;
-import java.util.concurrent.ThreadLocalRandom;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.locks.ReadWriteLock;
-import java.util.concurrent.locks.ReentrantLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-import org.apache.ignite.internal.processors.cache.KeyCacheObject;
-import org.apache.ignite.internal.util.GridAtomicLong;
-import org.apache.ignite.internal.util.tostring.GridToStringInclude;
-import org.apache.ignite.internal.util.typedef.internal.S;
-import org.apache.ignite.internal.util.typedef.internal.U;
-import org.jetbrains.annotations.NotNull;
-import org.jsr166.ConcurrentHashMap8;
-
-/**
- *
- */
-public class MvccTestApp {
-    /** */
-    private static final boolean DEBUG_LOG = false;
-
-    /** */
-    private static final boolean SQL = false;
-
-    public static void main1(String[] args) throws Exception {
-        final MvccTestApp.TestCluster cluster = new MvccTestApp.TestCluster(1);
-
-        final int ACCOUNTS = 3;
-
-        final int START_VAL = 10;
-
-        final Map<Object, Object> data = new TreeMap<>();
-
-        for (int i = 0; i < ACCOUNTS; i++)
-            data.put(i, START_VAL);
-
-        cluster.txPutAll(data);
-
-        cluster.txTransfer(0, 1, true);
-        cluster.txTransfer(0, 1, true);
-        cluster.txTransfer(0, 2, true);
-
-        Map<Object, Object> vals = cluster.sqlAll();
-
-        System.out.println();
-
-        Map<Object, Object> getData = 
cluster.sqlAll();;//cluster.getAll(data.keySet());
-
-        int sum = 0;
-
-        for (int i = 0; i < ACCOUNTS; i++) {
-            Integer val = (Integer)getData.get(i);
-
-            sum += val;
-
-            System.out.println("Val: " + val);
-        }
-
-        System.out.println("Sum: " + sum);
-
-        cluster.cleanup();
-
-        getData = cluster.sqlAll();
-
-        System.out.println();
-//
-//        MvccQueryVersion ver1 = cluster.crd.queryVersion();
-//        MvccQueryVersion ver2 = cluster.crd.queryVersion();
-//
-//        cluster.crd.queryDone(ver2.cntr);
-//        cluster.crd.queryDone(ver1.cntr);
-    }
-
-    public static void main0(String[] args) throws Exception {
-        final MvccTestApp.TestCluster cluster = new MvccTestApp.TestCluster(1);
-
-        final int ACCOUNTS = 3;
-
-        final int START_VAL = 10;
-
-        final Map<Object, Object> data = new TreeMap<>();
-
-        for (int i = 0; i < ACCOUNTS; i++)
-            data.put(i, START_VAL);
-
-        cluster.txPutAll(data);
-
-        cluster.txRemoveTransfer(0, 1);
-
-        Map<Object, Object> getData = 
cluster.sqlAll();;//cluster.getAll(data.keySet());
-
-        int sum = 0;
-
-        for (Map.Entry<Object, Object> e : getData.entrySet()) {
-            Integer val = (Integer)e.getValue();
-
-            if (val != null)
-                sum += val;
-
-            System.out.println("Val: " + val);
-        }
-
-        System.out.println("Sum: " + sum);
-
-        cluster.cleanup();
-
-        getData = cluster.sqlAll();
-
-        System.out.println();
-//
-//        MvccQueryVersion ver1 = cluster.crd.queryVersion();
-//        MvccQueryVersion ver2 = cluster.crd.queryVersion();
-//
-//        cluster.crd.queryDone(ver2.cntr);
-//        cluster.crd.queryDone(ver1.cntr);
-    }
-
-    public static void main(String[] args) throws Exception {
-        final AtomicBoolean err = new AtomicBoolean();
-
-        final int READ_THREADS = 4;
-        final int UPDATE_THREADS = 4;
-        final int ACCOUNTS = 50;
-
-        final int START_VAL = 100000;
-
-        for (int iter = 0; iter < 1000; iter++) {
-            System.out.println("Iteration [readThreads=" + READ_THREADS +
-                ", updateThreads=" + UPDATE_THREADS + ", accounts=" + ACCOUNTS 
+ ", iter=" + iter + ']');
-
-            final TestCluster cluster = new TestCluster(1);
-
-            final Map<Object, Object> data = new TreeMap<>();
-
-            for (int i = 0; i < ACCOUNTS; i++)
-                data.put(i, START_VAL);
-
-            cluster.txPutAll(data);
-
-            final AtomicBoolean stop = new AtomicBoolean();
-
-            List<Thread> threads = new ArrayList<>();
-
-            Thread cleanupThread = new Thread(new Runnable() {
-                @Override public void run() {
-                    Thread.currentThread().setName("cleanup");
-
-                    try {
-                        while (!stop.get()) {
-                            cluster.cleanup();
-
-                            Thread.sleep(1);
-                        }
-                    }
-                    catch (Exception e) {
-                        e.printStackTrace();
-                    }
-                }
-            });
-
-            threads.add(cleanupThread);
-
-            cleanupThread.start();
-
-            final boolean REMOVES = false;
-
-            for (int i = 0; i < READ_THREADS; i++) {
-                final int id = i;
-
-                Thread thread = new Thread(new Runnable() {
-                    @Override public void run() {
-                        Thread.currentThread().setName("read" + id);
-
-                        int cnt = 0;
-
-                        while (!stop.get()) {
-                            Map<Object, Object> qryData = SQL ? 
cluster.sqlAll() : cluster.getAll(data.keySet());
-
-                            cnt++;
-
-                            int sum = 0;
-
-                            if (REMOVES) {
-                                for (Map.Entry<Object, Object> e : 
qryData.entrySet()) {
-                                    Integer val = (Integer)e.getValue();
-
-                                    if (val != null)
-                                        sum += val;
-                                    else
-                                        System.out.println("With null");
-                                }
-                            }
-                            else {
-                                for (int i = 0; i < ACCOUNTS; i++) {
-                                    Integer val = (Integer)qryData.get(i);
-
-                                    if (val == null) {
-                                        if (stop.compareAndSet(false, true)) {
-                                            stop.set(true);
-                                            err.set(true);
-
-                                            TestDebugLog.printAllAndExit("No 
value for key: " + i);
-                                        }
-                                    }
-
-                                    sum += val;
-                                }
-                            }
-
-                            if (sum != ACCOUNTS * START_VAL) {
-                                if (stop.compareAndSet(false, true)) {
-                                    stop.set(true);
-                                    err.set(true);
-
-                                    TestDebugLog.printAllAndExit("Invalid get 
sum: " + sum);
-                                }
-                            }
-
-//                            if (cnt % 100 == 0)
-//                                System.out.println("get " + cnt);
-                        }
-
-                        System.out.println("Get cnt: " + cnt);
-                    }
-                });
-
-                threads.add(thread);
-
-                thread.start();
-            }
-
-            for (int i = 0; i < UPDATE_THREADS; i++) {
-                final int id = i;
-
-                Thread thread;
-
-                if (REMOVES) {
-                    thread = new Thread(new Runnable() {
-                        @Override public void run() {
-                            Thread.currentThread().setName("update" + id);
-
-                            ThreadLocalRandom rnd = 
ThreadLocalRandom.current();
-
-                            while (!stop.get()) {
-                                int id1 = rnd.nextInt(ACCOUNTS);
-
-                                int id2 = rnd.nextInt(ACCOUNTS);
-
-                                while (id2 == id1)
-                                    id2 = rnd.nextInt(ACCOUNTS);
-
-                                if (rnd.nextBoolean()) {
-                                    cluster.txRemoveTransfer(id1, id2);
-                                }
-                                else
-                                    cluster.txTransfer(id1, id2, 
rnd.nextBoolean());
-                            }
-
-                        }
-                    });
-                }
-                else {
-                    thread = new Thread(new Runnable() {
-                        @Override public void run() {
-                            Thread.currentThread().setName("update" + id);
-
-                            ThreadLocalRandom rnd = 
ThreadLocalRandom.current();
-
-                            while (!stop.get()) {
-                                int id1 = rnd.nextInt(ACCOUNTS);
-
-                                int id2 = rnd.nextInt(ACCOUNTS);
-
-                                while (id2 == id1)
-                                    id2 = rnd.nextInt(ACCOUNTS);
-
-                                if (id1 > id2) {
-                                    int tmp = id1;
-                                    id1 = id2;
-                                    id2 = tmp;
-                                }
-
-                                cluster.txTransfer(id1, id2, 
rnd.nextBoolean());
-                            }
-
-                        }
-                    });
-                }
-
-                threads.add(thread);
-
-                thread.start();
-            }
-
-            long endTime = System.currentTimeMillis() + 2_000;
-
-            while (!stop.get()) {
-                Thread.sleep(1000);
-
-                if (System.currentTimeMillis() >= endTime)
-                    break;
-
-                //cluster.dumpMvccInfo();
-            }
-
-            stop.set(true);
-
-            for (Thread thread : threads)
-                thread.join();
-
-            Map<Object, Object> qryData = SQL ? cluster.sqlAll() : 
cluster.getAll(data.keySet());
-
-            int sum = 0;
-
-            for (int i = 0; i < ACCOUNTS; i++) {
-                Integer val = (Integer)qryData.get(i);
-
-                System.out.println("Val " + val);
-
-                if (val != null)
-                    sum += val;
-            }
-
-            System.out.println("Sum=" + sum + ", expSum=" + (ACCOUNTS * 
START_VAL));
-
-            if (err.get()) {
-                System.out.println("Error!");
-
-                System.exit(1);
-            }
-
-//            cluster.dumpMvccInfo();
-//
-//            System.out.println("Cleanup");
-//
-//            cluster.cleanup();
-//
-//            cluster.dumpMvccInfo();
-
-            TestDebugLog.clear();
-        }
-    }
-
-    /**
-     *
-     */
-    static class TestCluster {
-        /** */
-        final List<Node> nodes = new ArrayList<>();
-
-        /** */
-        final Coordinator crd;
-
-        /** */
-        final AtomicLong txIdGen = new AtomicLong(10_000);
-
-        TestCluster(int nodesNum) {
-            crd = new Coordinator();
-
-            for (int i = 0; i < nodesNum; i++)
-                nodes.add(new Node(i));
-        }
-
-        void cleanup() {
-            CoordinatorCounter cntr = crd.cleanupVersion();
-
-            for (Node node : nodes)
-                node.dataStore.cleanup(cntr);
-        }
-
-        void txPutAll(Map<Object, Object> data) {
-            TxId txId = new TxId(txIdGen.incrementAndGet());
-
-            Map<Object, Node> mappedEntries = new LinkedHashMap<>();
-
-            for (Object key : data.keySet()) {
-                int nodeIdx = nodeForKey(key);
-
-                Node node = nodes.get(nodeIdx);
-
-                node.dataStore.lockEntry(key);
-
-                mappedEntries.put(key, node);
-            }
-
-            CoordinatorCounter cntr = crd.nextTxCounter(txId);
-
-            MvccUpdateVersion mvccVer = new MvccUpdateVersion(cntr, txId);
-
-            for (Map.Entry<Object, Node> e : mappedEntries.entrySet()) {
-                Node node = e.getValue();
-
-                node.dataStore.updateEntry(e.getKey(), data.get(e.getKey()), 
mvccVer);
-            }
-
-            for (Map.Entry<Object, Node> e : mappedEntries.entrySet()) {
-                Node node = e.getValue();
-
-                node.dataStore.unlockEntry(e.getKey());
-            }
-
-            crd.txDone(txId);
-        }
-
-        void txTransfer(Integer id1, Integer id2, boolean fromFirst) {
-            TreeSet<Integer> keys = new TreeSet<>();
-
-            keys.add(id1);
-            keys.add(id2);
-
-            TxId txId = new TxId(txIdGen.incrementAndGet());
-
-            Map<Object, Node> mappedEntries = new LinkedHashMap<>();
-
-            Map<Object, Object> vals = new HashMap<>();
-
-            for (Object key : keys) {
-                int nodeIdx = nodeForKey(key);
-
-                Node node = nodes.get(nodeIdx);
-
-                node.dataStore.lockEntry(key);
-
-                vals.put(key, node.dataStore.lastValue(key));
-
-                mappedEntries.put(key, node);
-            }
-
-            CoordinatorCounter cntr = crd.nextTxCounter(txId);
-
-            Integer curVal1 = (Integer)vals.get(id1);
-            Integer curVal2 = (Integer)vals.get(id2);
-
-            boolean update = false;
-
-            Integer newVal1 = null;
-            Integer newVal2 = null;
-
-            if (curVal1 != null && curVal2 != null) {
-                if (fromFirst) {
-                    if (curVal1 > 0) {
-                        update = true;
-
-                        newVal1 = curVal1 - 1;
-                        newVal2 = curVal2 + 1;
-                    }
-                }
-                else {
-                    if (curVal2 > 0) {
-                        update = true;
-
-                        newVal1 = curVal1 + 1;
-                        newVal2 = curVal2 - 1;
-                    }
-                }
-            }
-
-            if (update) {
-                Map<Object, Object> newVals = new HashMap<>();
-
-                newVals.put(id1, newVal1);
-                newVals.put(id2, newVal2);
-
-                MvccUpdateVersion mvccVer = new MvccUpdateVersion(cntr, txId);
-
-                if (DEBUG_LOG) {
-                    TestDebugLog.msgs.add(new TestDebugLog.Msg6("update", 
txId, id1, newVal1, id2, newVal2, cntr));
-                }
-
-                for (Map.Entry<Object, Node> e : mappedEntries.entrySet()) {
-                    Node node = e.getValue();
-
-                    node.dataStore.updateEntry(e.getKey(), 
newVals.get(e.getKey()), mvccVer);
-                }
-
-                for (Map.Entry<Object, Node> e : mappedEntries.entrySet()) {
-                    Node node = e.getValue();
-
-                    node.dataStore.unlockEntry(e.getKey());
-                }
-            }
-            else {
-                for (Map.Entry<Object, Node> e : mappedEntries.entrySet()) {
-                    Node node = e.getValue();
-
-                    node.dataStore.unlockEntry(e.getKey());
-                }
-            }
-
-            crd.txDone(txId);
-
-            if (DEBUG_LOG)
-                TestDebugLog.msgs.add(new TestDebugLog.Msg2("tx done", txId, 
cntr.cntr));
-        }
-
-        void txRemoveTransfer(Integer from, Integer to) {
-            TreeSet<Integer> keys = new TreeSet<>();
-
-            keys.add(from);
-            keys.add(to);
-
-            TxId txId = new TxId(txIdGen.incrementAndGet());
-
-            Map<Object, Node> mappedEntries = new LinkedHashMap<>();
-
-            Map<Object, Object> vals = new HashMap<>();
-
-            for (Object key : keys) {
-                int nodeIdx = nodeForKey(key);
-
-                Node node = nodes.get(nodeIdx);
-
-                node.dataStore.lockEntry(key);
-
-                vals.put(key, node.dataStore.lastValue(key));
-
-                mappedEntries.put(key, node);
-            }
-
-            CoordinatorCounter cntr = crd.nextTxCounter(txId);
-
-            Integer fromVal = (Integer)vals.get(from);
-            Integer toVal = (Integer)vals.get(to);
-
-            boolean update = fromVal != null && toVal != null;
-
-            if (update) {
-                Map<Object, Object> newVals = new HashMap<>();
-
-                newVals.put(from, null);
-                newVals.put(to, fromVal + toVal);
-
-                MvccUpdateVersion mvccVer = new MvccUpdateVersion(cntr, txId);
-
-                if (DEBUG_LOG) {
-                    TestDebugLog.msgs.add(new TestDebugLog.Msg6("remove", 
txId, from, fromVal, to, toVal, cntr));
-                }
-
-                for (Map.Entry<Object, Node> e : mappedEntries.entrySet()) {
-                    Node node = e.getValue();
-
-                    node.dataStore.updateEntry(e.getKey(), 
newVals.get(e.getKey()), mvccVer);
-                }
-
-                for (Map.Entry<Object, Node> e : mappedEntries.entrySet()) {
-                    Node node = e.getValue();
-
-                    node.dataStore.unlockEntry(e.getKey());
-                }
-            }
-            else {
-                for (Map.Entry<Object, Node> e : mappedEntries.entrySet()) {
-                    Node node = e.getValue();
-
-                    node.dataStore.unlockEntry(e.getKey());
-                }
-            }
-
-            crd.txDone(txId);
-
-            if (DEBUG_LOG)
-                TestDebugLog.msgs.add(new TestDebugLog.Msg2("tx done", txId, 
cntr.cntr));
-        }
-
-        public void dumpMvccInfo() {
-            for (Node node : nodes) {
-                int sql = node.dataStore.mvccSqlIdx.size();
-
-                for (Map.Entry<Object, MvccValue> e : 
node.dataStore.mainIdx.entrySet()) {
-                    List<MvccValue> list = 
node.dataStore.mvccIdx.get(e.getKey());
-
-                    int size = 0;
-
-                    if (list != null) {
-                        synchronized (list) {
-                            size = list.size();
-                        }
-                    }
-
-                    System.out.println("Mvcc info [key=" + e.getKey() +
-                        ", val=" + e.getValue() +
-                        ", mvccVals=" + size +
-                        ", sqlVals=" + sql + ']');
-                }
-            }
-        }
-
-        public Map<Object, Object> sqlAll() {
-            MvccQueryVersion qryVer = crd.queryVersion();
-
-            Map<Object, Object> res = new HashMap<>();
-
-            for (Node node : nodes) {
-                Map<Object, Object> nodeRes = node.dataStore.sqlQuery(qryVer);
-
-                res.putAll(nodeRes);
-            }
-
-            crd.queryDone(qryVer.cntr);
-
-            if (DEBUG_LOG) {
-                TestDebugLog.msgs.add(new TestDebugLog.Msg3("sqlAll", 
qryVer.cntr, qryVer.activeTxs, res));
-            }
-
-            return res;
-        }
-
-        public Map<Object, Object> getAll(Set<?> keys) {
-            MvccQueryVersion qryVer = crd.queryVersion();
-
-            Map<Object, Object> res = new HashMap<>();
-
-            for (Object key : keys) {
-                int nodeIdx = nodeForKey(key);
-
-                Node node = nodes.get(nodeIdx);
-
-                Object val = node.dataStore.get(key, qryVer);
-
-                res.put(key, val);
-            }
-
-            crd.queryDone(qryVer.cntr);
-
-            if (DEBUG_LOG) {
-                TestDebugLog.msgs.add(new TestDebugLog.Msg3("getAll", 
qryVer.cntr, qryVer.activeTxs, res));
-            }
-
-            return res;
-        }
-
-        private int nodeForKey(Object key) {
-            return U.safeAbs(key.hashCode()) % nodes.size();
-        }
-    }
-
-    /**
-     *
-     */
-    static class Node {
-        /** */
-        final DataStore dataStore;
-
-        /** */
-        final int nodexIdx;
-
-        public Node(int nodexIdx) {
-            this.nodexIdx = nodexIdx;
-
-            dataStore = new DataStore();
-        }
-
-        @Override public String toString() {
-            return "Node [idx=" + nodexIdx + ']';
-        }
-    }
-
-    /**
-     *
-     */
-    static class Coordinator {
-        /** */
-        private final AtomicLong cntr = new AtomicLong(-1);
-
-        /** */
-        private final GridAtomicLong commitCntr = new GridAtomicLong(-1);
-
-        /** */
-        private final ConcurrentHashMap8<Long, QueryCounter> activeQueries = 
new ConcurrentHashMap8<>();
-
-        /** */
-        @GridToStringInclude
-        private final ConcurrentHashMap8<TxId, TxId> activeTxs = new 
ConcurrentHashMap8<>();
-
-        CoordinatorCounter nextTxCounter(TxId txId) {
-            activeTxs.put(txId, txId);
-
-            CoordinatorCounter newCtr = new 
CoordinatorCounter(cntr.incrementAndGet());
-
-            txId.cntr = newCtr.cntr;
-
-            return newCtr;
-        }
-
-        void txDone(TxId txId) {
-            TxId cntr = activeTxs.remove(txId);
-
-            assert cntr != null && cntr.cntr != -1L;
-
-            commitCntr.setIfGreater(cntr.cntr);
-        }
-
-        private Long minActive(Set<TxId> txs) {
-            Long minActive = null;
-
-            for (Map.Entry<TxId, TxId> e : activeTxs.entrySet()) {
-                if (txs != null)
-                    txs.add(e.getKey());
-
-                TxId val = e.getValue();
-
-                while (val.cntr == -1)
-                    Thread.yield();
-
-                long cntr = val.cntr;
-
-                if (minActive == null)
-                    minActive = cntr;
-                else if (cntr < minActive)
-                    minActive = cntr;
-            }
-
-            return minActive;
-        }
-
-        static class QueryCounter extends AtomicInteger {
-            public QueryCounter(int initialValue) {
-                super(initialValue);
-            }
-
-            boolean increment2() {
-                for (;;) {
-                    int current = get();
-                    int next = current + 1;
-
-                    if (current == 0)
-                        return false;
-
-                    if (compareAndSet(current, next))
-                        return true;
-                }
-            }
-        }
-
-        private ReadWriteLock rwLock = new ReentrantReadWriteLock();
-
-        MvccQueryVersion queryVersion() {
-            rwLock.readLock().lock();
-
-            long useCntr = commitCntr.get();
-
-            Set<TxId> txs = new HashSet<>();
-
-            Long minActive = minActive(txs);
-
-            if (minActive != null && minActive < useCntr)
-                useCntr = minActive;
-
-            MvccQueryVersion qryVer = new MvccQueryVersion(new 
CoordinatorCounter(useCntr), txs);
-
-            for (;;) {
-                QueryCounter qryCnt = activeQueries.get(useCntr);
-
-                if (qryCnt != null) {
-                    boolean inc = qryCnt.increment2();
-
-                    if (!inc) {
-                        activeQueries.remove(useCntr, qryCnt);
-
-                        continue;
-                    }
-                }
-                else {
-                    qryCnt = new QueryCounter(1);
-
-                    if (activeQueries.putIfAbsent(useCntr, qryCnt) != null)
-                        continue;
-                }
-
-                break;
-            }
-
-            rwLock.readLock().unlock();
-
-            return qryVer;
-        }
-
-        void queryDone(CoordinatorCounter cntr) {
-            AtomicInteger qryCnt = activeQueries.get(cntr.cntr);
-
-            assert qryCnt != null : cntr.cntr;
-
-            int left = qryCnt.decrementAndGet();
-
-            assert left >= 0 : left;
-
-            if (left == 0)
-                activeQueries.remove(cntr.cntr, qryCnt);
-        }
-
-        CoordinatorCounter cleanupVersion() {
-            rwLock.writeLock().lock();
-
-            long useCntr = commitCntr.get();
-
-            Long minActive = minActive(null);
-
-            if (minActive != null && minActive < useCntr)
-                useCntr = minActive - 1;
-
-            for (Long qryCntr : activeQueries.keySet()) {
-                if (qryCntr <= useCntr)
-                    useCntr = qryCntr - 1;
-            }
-
-            rwLock.writeLock().unlock();
-
-            return new CoordinatorCounter(useCntr);
-        }
-
-        @Override public String toString() {
-            return S.toString(Coordinator.class, this);
-        }
-    }
-
-    /**
-     *
-     */
-    static class CoordinatorCounter implements Comparable<CoordinatorCounter> {
-        /** */
-        private final long topVer; // TODO
-
-        /** */
-        private final long cntr;
-
-        CoordinatorCounter(long cntr) {
-            this.topVer = 1;
-            this.cntr = cntr;
-        }
-
-        @Override public int compareTo(CoordinatorCounter o) {
-            return Long.compare(cntr, o.cntr);
-        }
-
-        @Override public boolean equals(Object o) {
-            if (this == o)
-                return true;
-
-            if (o == null || getClass() != o.getClass())
-                return false;
-
-            CoordinatorCounter that = (CoordinatorCounter)o;
-
-            return cntr == that.cntr;
-        }
-
-        @Override public int hashCode() {
-            return (int)(cntr ^ (cntr >>> 32));
-        }
-
-        @Override public String toString() {
-            return "Cntr [c=" + cntr + ']';
-        }
-    }
-
-    /**
-     *
-     */
-    static class MvccUpdateVersion {
-        /** */
-        @GridToStringInclude
-        final CoordinatorCounter cntr;
-
-        /** */
-        @GridToStringInclude
-        final TxId txId;
-
-        /**
-         * @param cntr
-         */
-        MvccUpdateVersion(CoordinatorCounter cntr, TxId txId) {
-            assert cntr != null;
-
-            this.cntr = cntr;
-            this.txId = txId;
-        }
-
-        @Override public String toString() {
-            return S.toString(MvccUpdateVersion.class, this);
-        }
-    }
-
-    /**
-     *
-     */
-    static class MvccQueryVersion {
-        /** */
-        @GridToStringInclude
-        final CoordinatorCounter cntr;
-
-        /** */
-        @GridToStringInclude
-        final Collection<TxId> activeTxs;
-
-        MvccQueryVersion(CoordinatorCounter cntr, Collection<TxId> activeTxs) {
-            this.cntr = cntr;
-            this.activeTxs = activeTxs;
-        }
-
-        @Override public String toString() {
-            return S.toString(MvccQueryVersion.class, this);
-        }
-    }
-
-    /**
-     *
-     */
-    static class TxId {
-        long cntr = -1;
-
-        /** */
-        @GridToStringInclude
-        final long id;
-
-        TxId(long id) {
-            this.id = id;
-        }
-
-        @Override public boolean equals(Object o) {
-            if (this == o) return true;
-            if (o == null || getClass() != o.getClass()) return false;
-
-            TxId txId = (TxId) o;
-
-            return id == txId.id;
-        }
-
-        @Override public int hashCode() {
-            return (int) (id ^ (id >>> 32));
-        }
-
-        @Override public String toString() {
-            return S.toString(TxId.class, this);
-        }
-    }
-
-    /**
-     *
-     */
-    static class SqlKey implements Comparable<SqlKey> {
-        /** */
-        final Comparable key;
-
-        /** */
-        final Comparable val;
-
-        /** */
-        final CoordinatorCounter cntr;
-
-        public SqlKey(Object key, Object val, CoordinatorCounter cntr) {
-            this.key = (Comparable)key;
-            this.val = (Comparable)val;
-            this.cntr = cntr;
-        }
-
-        @Override public int compareTo(@NotNull SqlKey o) {
-            int cmp;
-
-            if (val != null && o.val != null)
-                cmp = val.compareTo(o.val);
-            else {
-                if (val != null)
-                    cmp = 1;
-                else
-                    cmp = o.val == null ? 0 : -1;
-            }
-
-
-            if (cmp == 0) {
-                cmp = key.compareTo(o.key);
-
-                if (cmp == 0)
-                    cmp = cntr.compareTo(o.cntr);
-            }
-
-            return cmp;
-        }
-
-        @Override public String toString() {
-            return "SqlKey [key=" + key + ", val=" + val + ']';
-        }
-    }
-
-    /**
-     *
-     */
-    static class DataStore {
-        /** */
-        private final ConcurrentHashMap<Object, ReentrantLock> locks = new 
ConcurrentHashMap<>();
-
-        /** */
-        final ConcurrentHashMap<Object, MvccValue> mainIdx = new 
ConcurrentHashMap<>();
-
-        /** */
-        final ConcurrentHashMap<Object, List<MvccValue>> mvccIdx = new 
ConcurrentHashMap<>();
-
-        /** */
-        final ConcurrentSkipListMap<SqlKey, MvccSqlValue> mvccSqlIdx = new 
ConcurrentSkipListMap<>();
-
-        void cleanup(CoordinatorCounter cleanupCntr) {
-            for (Map.Entry<Object, List<MvccValue>> e : mvccIdx.entrySet()) {
-                lockEntry(e.getKey());
-
-                try {
-                    List<MvccValue> list = e.getValue();
-
-                    synchronized (list) {
-                        for (int i = list.size() - 1; i >= 0; i--) {
-                            MvccValue val = list.get(i);
-
-                            if (val.ver.cntr.compareTo(cleanupCntr) <= 0) {
-                                if (DEBUG_LOG) {
-                                    TestDebugLog.msgs.add(new 
TestDebugLog.Msg6_1("cleanup",
-                                        e.getKey(), val.val, val.ver, 
cleanupCntr.cntr, null, null));
-                                }
-
-                                MvccValue prev;
-
-                                if (val.val != null)
-                                    prev = mainIdx.put(e.getKey(), val);
-                                else
-                                    prev = mainIdx.remove(e.getKey());
-
-                                if (prev != null) {
-                                    SqlKey key = new SqlKey(e.getKey(), 
prev.val, prev.ver.cntr);
-
-                                    MvccSqlValue old = mvccSqlIdx.remove(key);
-
-                                    assert old != null;
-                                }
-
-                                for (int j = 0; j <= i; j++) {
-                                    MvccValue rmvd = list.remove(0);
-
-                                    assert rmvd != null;
-
-                                    if (j != i || rmvd.val == null) {
-                                        SqlKey key = new SqlKey(e.getKey(), 
rmvd.val, rmvd.ver.cntr);
-
-                                        MvccSqlValue old = 
mvccSqlIdx.remove(key);
-
-                                        assert old != null;
-                                    }
-                                }
-
-                                if (list.isEmpty())
-                                    mvccIdx.remove(e.getKey());
-
-                                break;
-                            }
-                        }
-                    }
-                }
-                finally {
-                    unlockEntry(e.getKey());
-                }
-            }
-        }
-
-        void lockEntry(Object key) {
-            ReentrantLock e = lock(key);
-
-            e.lock();
-        }
-
-        void unlockEntry(Object key) {
-            ReentrantLock e = lock(key);
-
-            e.unlock();
-        }
-
-        void updateEntry(Object key, Object val, MvccUpdateVersion ver) {
-            List<MvccValue> list = mvccIdx.get(key);
-
-            if (list == null) {
-                Object old = mvccIdx.putIfAbsent(key, list = new 
ArrayList<>());
-
-                assert old == null;
-            }
-
-            MvccValue prevVal = null;
-
-            synchronized (list) {
-                if (!list.isEmpty())
-                    prevVal = list.get(list.size() - 1);
-
-                list.add(new MvccValue(val, ver));
-            }
-
-            if (prevVal == null)
-                prevVal = mainIdx.get(key);
-
-            if (prevVal != null) {
-                SqlKey prevKey = new SqlKey(key, prevVal.val, 
prevVal.ver.cntr);
-
-                MvccSqlValue old =
-                    mvccSqlIdx.put(prevKey, new MvccSqlValue(prevVal.val, 
prevVal.ver, ver));
-
-                assert old != null;
-            }
-
-            mvccSqlIdx.put(new SqlKey(key, val, ver.cntr), new 
MvccSqlValue(val, ver, null));
-        }
-
-        Object lastValue(Object key) {
-            List<MvccValue> list = mvccIdx.get(key);
-
-            if (list != null) {
-                synchronized (list) {
-                    if (list.size() > 0)
-                        return list.get(list.size() - 1).val;
-                }
-            }
-
-            MvccValue val = mainIdx.get(key);
-
-            return val != null ? val.val : null;
-        }
-
-        Map<Object, Object> sqlQuery(MvccQueryVersion qryVer) {
-            Map<Object, Object> res = new HashMap<>();
-
-            for (Map.Entry<SqlKey, MvccSqlValue> e : mvccSqlIdx.entrySet()) {
-                MvccSqlValue val = e.getValue();
-
-                if (!versionVisible(val.ver, qryVer)) {
-                    if (DEBUG_LOG) {
-                        TestDebugLog.msgs.add(new TestDebugLog.Msg3("sql skip 
mvcc val", e.getKey().key, val.val, val.ver));
-                    }
-
-                    continue;
-                }
-
-                MvccUpdateVersion newVer = val.newVer;
-
-                if (newVer != null && versionVisible(newVer, qryVer)) {
-                    if (DEBUG_LOG) {
-                        TestDebugLog.msgs.add(new TestDebugLog.Msg4("sql skip 
mvcc val2", e.getKey().key, val.val, val.ver, val.newVer));
-                    }
-
-                    continue;
-                }
-
-                Object old = res.put(e.getKey().key, e.getValue().val);
-
-                if (DEBUG_LOG) {
-                    TestDebugLog.msgs.add(new TestDebugLog.Msg4("sql get mvcc 
val", e.getKey().key, val.val, val.ver, val.newVer));
-                }
-
-                if (old != null) {
-                    TestDebugLog.printAllAndExit("Already has value for key 
[key=" + e.getKey().key +
-                        ", qryVer=" + qryVer +
-                        ", oldVal=" + old +
-                        ", newVal=" + e.getValue().val +
-                        ']');
-                }
-
-                assert old == null;
-            }
-
-            return res;
-        }
-
-        private boolean versionVisible(MvccUpdateVersion ver, MvccQueryVersion 
qryVer) {
-            int cmp = ver.cntr.compareTo(qryVer.cntr);
-
-            return cmp <= 0 && !qryVer.activeTxs.contains(ver.txId);
-        }
-
-        Object get(Object key, MvccQueryVersion ver) {
-            List<MvccValue> list = mvccIdx.get(key);
-
-            if (list != null) {
-                synchronized (list) {
-                    for (int i = list.size() - 1; i >= 0; i--) {
-                        MvccValue val = list.get(i);
-
-                        if (!versionVisible(val.ver, ver))
-                            continue;
-
-                        if (DEBUG_LOG) {
-                            TestDebugLog.msgs.add(new TestDebugLog.Msg3("read 
mvcc val", key, val, val.ver));
-                        }
-
-                        return val.val;
-                    }
-                }
-            }
-
-            MvccValue val = mainIdx.get(key);
-
-            if (val != null) {
-                int cmp = val.ver.cntr.compareTo(ver.cntr);
-
-                assert cmp <= 0 : "Committed [ver=" + val.ver + ", qryVer=" + 
ver.cntr + ']';
-
-                if (DEBUG_LOG)
-                    TestDebugLog.msgs.add(new TestDebugLog.Msg3("read comitted 
val", key, val, val.ver));
-            }
-            else {
-                if (DEBUG_LOG)
-                    TestDebugLog.msgs.add(new TestDebugLog.Msg3("read comitted 
null", key, null, null));
-            }
-
-            return val != null ? val.val : null;
-        }
-
-        private ReentrantLock lock(Object key) {
-            ReentrantLock e = locks.get(key);
-
-            if (e == null) {
-                ReentrantLock old = locks.putIfAbsent(key, e = new 
ReentrantLock());
-
-                if (old != null)
-                    e = old;
-            }
-
-            return e;
-        }
-    }
-
-    /**
-     *
-     */
-    static class MvccValue {
-        /** */
-        @GridToStringInclude
-        final Object val;
-
-        /** */
-        @GridToStringInclude
-        final MvccUpdateVersion ver;
-
-        MvccValue(Object val, MvccUpdateVersion ver) {
-            assert ver != null;
-
-            this.val = val;
-            this.ver = ver;
-        }
-
-        @Override public String toString() {
-            return S.toString(MvccValue.class, this);
-        }
-    }
-
-    /**
-     *
-     */
-    static class MvccSqlValue {
-        /** */
-        @GridToStringInclude
-        final Object val;
-
-        /** */
-        @GridToStringInclude
-        final MvccUpdateVersion ver;
-
-        /** */
-        @GridToStringInclude
-        final MvccUpdateVersion newVer;
-
-        MvccSqlValue(Object val, MvccUpdateVersion ver, MvccUpdateVersion 
newVer) {
-            assert ver != null;
-
-            this.val = val;
-            this.ver = ver;
-            this.newVer = newVer;
-        }
-
-        @Override public String toString() {
-            return S.toString(MvccSqlValue.class, this);
-        }
-    }
-
-    static void log(String msg) {
-        System.out.println(Thread.currentThread() + ": " + msg);
-    }
-}
-
-class TestDebugLog {
-    /** */
-    static final List<Object> msgs = Collections.synchronizedList(new 
ArrayList<>(100_000));
-
-    /** */
-    private static final SimpleDateFormat DEBUG_DATE_FMT = new 
SimpleDateFormat("HH:mm:ss,SSS");
-
-    static class Message {
-        String thread = Thread.currentThread().getName();
-
-        String msg;
-
-        long ts = U.currentTimeMillis();
-
-        public Message(String msg) {
-            this.msg = msg;
-        }
-
-        public String toString() {
-            return "Msg [msg=" + msg + ", thread=" + thread + ", time=" + 
DEBUG_DATE_FMT.format(new Date(ts)) + ']';
-        }
-    }
-
-    static class Msg2 extends Message{
-        Object v1;
-        Object v2;
-
-        public Msg2(String msg, Object v1, Object v2) {
-            super(msg);
-            this.v1 = v1;
-            this.v2 = v2;
-        }
-        public String toString() {
-            return "Msg [msg=" + msg +
-                ", v1=" + v1 +
-                ", v2=" + v2 +
-                ", msg=" + msg +
-                ", thread=" + thread +
-                ", time=" + DEBUG_DATE_FMT.format(new Date(ts)) + ']';
-        }
-    }
-
-    static class Msg3 extends Message{
-        Object v1;
-        Object v2;
-        Object v3;
-
-        public Msg3(String msg, Object v1, Object v2, Object v3) {
-            super(msg);
-            this.v1 = v1;
-            this.v2 = v2;
-            this.v3 = v3;
-        }
-        public String toString() {
-            return "Msg [msg=" + msg +
-                ", v1=" + v1 +
-                ", v2=" + v2 +
-                ", v3=" + v3 +
-                ", thread=" + thread +
-                ", time=" + DEBUG_DATE_FMT.format(new Date(ts)) + ']';
-        }
-    }
-
-    static class Msg4 extends Message{
-        Object v1;
-        Object v2;
-        Object v3;
-        Object v4;
-
-        public Msg4(String msg, Object v1, Object v2, Object v3, Object v4) {
-            super(msg);
-            this.v1 = v1;
-            this.v2 = v2;
-            this.v3 = v3;
-            this.v4 = v4;
-        }
-
-        public String toString() {
-            return "Msg [msg=" + msg +
-                ", v1=" + v1 +
-                ", v2=" + v2 +
-                ", v3=" + v3 +
-                ", v4=" + v4 +
-                ", thread=" + thread +
-                ", time=" + DEBUG_DATE_FMT.format(new Date(ts)) + ']';
-        }
-    }
-
-    static class Msg6 extends Message{
-        Object v1;
-        Object v2;
-        Object v3;
-        Object v4;
-        Object v5;
-        Object v6;
-
-        public Msg6(String msg, Object v1, Object v2, Object v3, Object v4, 
Object v5, Object v6) {
-            super(msg);
-            this.v1 = v1;
-            this.v2 = v2;
-            this.v3 = v3;
-            this.v4 = v4;
-            this.v5 = v5;
-            this.v6 = v6;
-        }
-
-        public String toString() {
-            return "Msg [msg=" + msg +
-                ", txId=" + v1 +
-                ", id1=" + v2 +
-                ", v1=" + v3 +
-                ", id2=" + v4 +
-                ", v2=" + v5 +
-                ", cntr=" + v6 +
-                ", thread=" + thread +
-                ", time=" + DEBUG_DATE_FMT.format(new Date(ts)) + ']';
-        }
-    }
-    static class Msg6_1 extends Message{
-        Object v1;
-        Object v2;
-        Object v3;
-        Object v4;
-        Object v5;
-        Object v6;
-
-        public Msg6_1(String msg, Object v1, Object v2, Object v3, Object v4, 
Object v5, Object v6) {
-            super(msg);
-            this.v1 = v1;
-            this.v2 = v2;
-            this.v3 = v3;
-            this.v4 = v4;
-            this.v5 = v5;
-            this.v6 = v6;
-        }
-
-        public String toString() {
-            return "Msg [msg=" + msg +
-                ", key=" + v1 +
-                ", val=" + v2 +
-                ", ver=" + v3 +
-                ", cleanupC=" + v4 +
-                ", thread=" + thread +
-                ", time=" + DEBUG_DATE_FMT.format(new Date(ts)) + ']';
-        }
-    }
-
-    static class EntryMessage extends Message {
-        Object key;
-        Object val;
-
-        public EntryMessage(Object key, Object val, String msg) {
-            super(msg);
-
-            this.key = key;
-            this.val = val;
-        }
-
-        public String toString() {
-            return "EntryMsg [key=" + key + ", val=" + val + ", msg=" + msg + 
", thread=" + thread + ", time=" + DEBUG_DATE_FMT.format(new Date(ts)) + ']';
-        }
-    }
-
-    static class PartMessage extends Message {
-        int p;
-        Object val;
-
-        public PartMessage(int p, Object val, String msg) {
-            super(msg);
-
-            this.p = p;
-            this.val = val;
-        }
-
-        public String toString() {
-            return "PartMessage [p=" + p + ", val=" + val + ", msg=" + msg + 
", thread=" + thread + ", time=" + DEBUG_DATE_FMT.format(new Date(ts)) + ']';
-        }
-    }
-
-    static final boolean out = false;
-
-    public static void addMessage(String msg) {
-        msgs.add(new Message(msg));
-
-        if (out)
-            System.out.println(msg);
-    }
-
-    public static void addEntryMessage(Object key, Object val, String msg) {
-        if (key instanceof KeyCacheObject)
-            key = ((KeyCacheObject)key).value(null, false);
-
-        EntryMessage msg0 = new EntryMessage(key, val, msg);
-
-        msgs.add(msg0);
-
-        if (out) {
-            System.out.println(msg0.toString());
-
-            System.out.flush();
-        }
-    }
-
-    public static void addPartMessage(int p, Object val, String msg) {
-        PartMessage msg0 = new PartMessage(p, val, msg);
-
-        msgs.add(msg0);
-
-        if (out) {
-            System.out.println(msg0.toString());
-
-            System.out.flush();
-        }
-    }
-
-    static void printAllAndExit(String msg) {
-        System.out.println(msg);
-
-        TestDebugLog.addMessage(msg);
-
-        List<Object> msgs = TestDebugLog.printMessages(true, null);
-
-        TestDebugLog.printMessages0(msgs, "test_debug_update.txt");
-
-        TestDebugLog.printMessagesForThread(msgs, 
Thread.currentThread().getName(), "test_debug_thread.txt");
-
-        System.exit(1);
-    }
-
-    public static void printMessagesForThread(List<Object> msgs0, String 
thread0, String file) {
-        try {
-            FileOutputStream out = new FileOutputStream(file);
-
-            PrintWriter w = new PrintWriter(out);
-
-            for (Object msg : msgs0) {
-                if (msg instanceof Message) {
-                    String thread = ((Message) msg).thread;
-
-                    if (thread.equals(thread0))
-                        w.println(msg.toString());
-                }
-            }
-
-            w.close();
-
-            out.close();
-        }
-        catch (IOException e) {
-            e.printStackTrace();
-        }
-    }
-
-    public static void printMessages0(List<Object> msgs0, String file) {
-        try {
-            FileOutputStream out = new FileOutputStream(file);
-
-            PrintWriter w = new PrintWriter(out);
-
-            for (Object msg : msgs0) {
-                if (msg instanceof Message) {
-                    String msg0 = ((Message) msg).msg;
-
-                    if (msg0.equals("tx done") || msg0.equals("update") || 
msg0.equals("cleanup"))
-                        w.println(msg.toString());
-                }
-            }
-
-            w.close();
-
-            out.close();
-        }
-        catch (IOException e) {
-            e.printStackTrace();
-        }
-    }
-
-    public static List<Object> printMessages(boolean file, Integer part) {
-        List<Object> msgs0;
-
-        synchronized (msgs) {
-            msgs0 = new ArrayList<>(msgs);
-
-            msgs.clear();
-        }
-
-        if (file) {
-            try {
-                FileOutputStream out = new FileOutputStream("test_debug.log");
-
-                PrintWriter w = new PrintWriter(out);
-
-                for (Object msg : msgs0) {
-                    if (part != null && msg instanceof PartMessage) {
-                        if (((PartMessage) msg).p != part)
-                            continue;
-                    }
-
-                    w.println(msg.toString());
-                }
-
-                w.close();
-
-                out.close();
-            }
-            catch (IOException e) {
-                e.printStackTrace();
-            }
-        }
-        else {
-            for (Object msg : msgs0)
-                System.out.println(msg);
-        }
-
-        return msgs0;
-    }
-
-    public static void printKeyMessages(boolean file, Object key) {
-        List<Object> msgs0;
-
-        synchronized (msgs) {
-            msgs0 = new ArrayList<>(msgs);
-
-            msgs.clear();
-        }
-
-        if (file) {
-            try {
-                FileOutputStream out = new FileOutputStream("test_debug.log");
-
-                PrintWriter w = new PrintWriter(out);
-
-                for (Object msg : msgs0) {
-                    if (msg instanceof EntryMessage && 
!((EntryMessage)msg).key.equals(key))
-                        continue;
-
-                    w.println(msg.toString());
-                }
-
-                w.close();
-
-                out.close();
-            }
-            catch (IOException e) {
-                e.printStackTrace();
-            }
-        }
-        else {
-            for (Object msg : msgs0) {
-                if (msg instanceof EntryMessage && 
!((EntryMessage)msg).key.equals(key))
-                    continue;
-
-                System.out.println(msg);
-            }
-        }
-    }
-
-    public static void clear() {
-        msgs.clear();
-    }
-
-    public static void clearEntries() {
-        for (Iterator it = msgs.iterator(); it.hasNext();) {
-            Object msg = it.next();
-
-            if (msg instanceof EntryMessage)
-                it.remove();
-        }
-    }
-
-}
\ No newline at end of file

Reply via email to