Repository: ignite Updated Branches: refs/heads/ignite-6149 a02bf51e5 -> 6d6e0c62b
ignite-6149 Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/6d6e0c62 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/6d6e0c62 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/6d6e0c62 Branch: refs/heads/ignite-6149 Commit: 6d6e0c62b51c59b39306342192d27f70e7e6790d Parents: a02bf51 Author: sboikov <[email protected]> Authored: Tue Aug 29 14:48:40 2017 +0300 Committer: sboikov <[email protected]> Committed: Tue Aug 29 18:04:54 2017 +0300 ---------------------------------------------------------------------- .../org/apache/ignite/internal/MvccTestApp.java | 566 ++++++++++++++++--- 1 file changed, 480 insertions(+), 86 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/6d6e0c62/modules/core/src/main/java/org/apache/ignite/internal/MvccTestApp.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/MvccTestApp.java b/modules/core/src/main/java/org/apache/ignite/internal/MvccTestApp.java index fe36e1f..a4bdd91 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/MvccTestApp.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/MvccTestApp.java @@ -35,6 +35,7 @@ import java.util.Set; import java.util.TreeMap; import java.util.TreeSet; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; @@ -47,6 +48,7 @@ import org.apache.ignite.internal.util.GridAtomicLong; import org.apache.ignite.internal.util.tostring.GridToStringInclude; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; +import org.jetbrains.annotations.NotNull; import org.jsr166.ConcurrentHashMap8; /** @@ -56,12 +58,15 @@ public class MvccTestApp { /** */ private static final boolean DEBUG_LOG = false; - public static void main0(String[] args) throws Exception { - final MvccTestApp.TestCluster cluster = new MvccTestApp.TestCluster(3); + /** */ + private static final boolean SQL = true; + + public static void main1(String[] args) throws Exception { + final MvccTestApp.TestCluster cluster = new MvccTestApp.TestCluster(1); final int ACCOUNTS = 3; - final int START_VAL = 100_000; + final int START_VAL = 10; final Map<Object, Object> data = new TreeMap<>(); @@ -74,7 +79,11 @@ public class MvccTestApp { cluster.txTransfer(0, 1, true); cluster.txTransfer(0, 2, true); - Map<Object, Object> getData = cluster.getAll(data.keySet()); + Map<Object, Object> vals = cluster.sqlAll(); + + System.out.println(); + + Map<Object, Object> getData = cluster.sqlAll();;//cluster.getAll(data.keySet()); int sum = 0; @@ -90,26 +99,71 @@ public class MvccTestApp { cluster.cleanup(); - getData = cluster.getAll(data.keySet()); + getData = cluster.sqlAll(); + + System.out.println(); +// +// MvccQueryVersion ver1 = cluster.crd.queryVersion(); +// MvccQueryVersion ver2 = cluster.crd.queryVersion(); +// +// cluster.crd.queryDone(ver2.cntr); +// cluster.crd.queryDone(ver1.cntr); + } + + public static void main0(String[] args) throws Exception { + final MvccTestApp.TestCluster cluster = new MvccTestApp.TestCluster(1); + + final int ACCOUNTS = 3; + + final int START_VAL = 10; + + final Map<Object, Object> data = new TreeMap<>(); + + for (int i = 0; i < ACCOUNTS; i++) + data.put(i, START_VAL); + + cluster.txPutAll(data); + + cluster.txRemoveTransfer(0, 1); + + Map<Object, Object> getData = cluster.sqlAll();;//cluster.getAll(data.keySet()); + + int sum = 0; + + for (Map.Entry<Object, Object> e : getData.entrySet()) { + Integer val = (Integer)e.getValue(); + + if (val != null) + sum += val; - MvccQueryVersion ver1 = cluster.crd.queryVersion(); - MvccQueryVersion ver2 = cluster.crd.queryVersion(); + System.out.println("Val: " + val); + } - cluster.crd.queryDone(ver2.cntr); - cluster.crd.queryDone(ver1.cntr); + System.out.println("Sum: " + sum); + cluster.cleanup(); + + getData = cluster.sqlAll(); + + System.out.println(); +// +// MvccQueryVersion ver1 = cluster.crd.queryVersion(); +// MvccQueryVersion ver2 = cluster.crd.queryVersion(); +// +// cluster.crd.queryDone(ver2.cntr); +// cluster.crd.queryDone(ver1.cntr); } public static void main(String[] args) throws Exception { final AtomicBoolean err = new AtomicBoolean(); - final int READ_THREADS = 4; - final int UPDATE_THREADS = 6; + final int READ_THREADS = 3; + final int UPDATE_THREADS = 4; final int ACCOUNTS = 100; - final int START_VAL = 1000; + final int START_VAL = 100000; - for (int iter = 0; iter < 10; iter++) { + for (int iter = 0; iter < 5; iter++) { System.out.println("Iteration [readThreads=" + READ_THREADS + ", updateThreads=" + UPDATE_THREADS + ", accounts=" + ACCOUNTS + ", iter=" + iter + ']'); @@ -147,6 +201,8 @@ public class MvccTestApp { cleanupThread.start(); + final boolean REMOVES = true; + for (int i = 0; i < READ_THREADS; i++) { final int id = i; @@ -157,16 +213,37 @@ public class MvccTestApp { int cnt = 0; while (!stop.get()) { - Map<Object, Object> getData = cluster.getAll(data.keySet()); + Map<Object, Object> qryData = SQL ? cluster.sqlAll() : cluster.getAll(data.keySet()); cnt++; int sum = 0; - for (int i = 0; i < ACCOUNTS; i++) { - Integer val = (Integer)getData.get(i); + if (REMOVES) { + for (Map.Entry<Object, Object> e : qryData.entrySet()) { + Integer val = (Integer)e.getValue(); - sum += val; + if (val != null) + sum += val; + else + System.out.println("With null"); + } + } + else { + for (int i = 0; i < ACCOUNTS; i++) { + Integer val = (Integer)qryData.get(i); + + if (val == null) { + if (stop.compareAndSet(false, true)) { + stop.set(true); + err.set(true); + + TestDebugLog.printAllAndExit("No value for key: " + i); + } + } + + sum += val; + } } if (sum != ACCOUNTS * START_VAL) { @@ -174,19 +251,12 @@ public class MvccTestApp { stop.set(true); err.set(true); - System.out.println("Invalid get sum: " + sum); - - TestDebugLog.addMessage("Invalid get sum: " + sum); - - List<Object> msgs = TestDebugLog.printMessages(true, null); - - TestDebugLog.printMessages0(msgs, "test_debug_update.txt"); - - TestDebugLog.printMessagesForThread(msgs, Thread.currentThread().getName(), "test_debug_thread.txt"); - - System.exit(1); + TestDebugLog.printAllAndExit("Invalid get sum: " + sum); } } + +// if (cnt % 100 == 0) +// System.out.println("get " + cnt); } System.out.println("Get cnt: " + cnt); @@ -201,31 +271,60 @@ public class MvccTestApp { for (int i = 0; i < UPDATE_THREADS; i++) { final int id = i; - Thread thread = new Thread(new Runnable() { - @Override public void run() { - Thread.currentThread().setName("update" + id); + Thread thread; - ThreadLocalRandom rnd = ThreadLocalRandom.current(); + if (REMOVES) { + thread = new Thread(new Runnable() { + @Override public void run() { + Thread.currentThread().setName("update" + id); - while (!stop.get()) { - int id1 = rnd.nextInt(ACCOUNTS); + ThreadLocalRandom rnd = ThreadLocalRandom.current(); - int id2 = rnd.nextInt(ACCOUNTS); + while (!stop.get()) { + int id1 = rnd.nextInt(ACCOUNTS); - while (id2 == id1) - id2 = rnd.nextInt(ACCOUNTS); + int id2 = rnd.nextInt(ACCOUNTS); - if (id1 > id2) { - int tmp = id1; - id1 = id2; - id2 = tmp; + while (id2 == id1) + id2 = rnd.nextInt(ACCOUNTS); + + if (rnd.nextBoolean()) { + cluster.txRemoveTransfer(id1, id2); + } + else + cluster.txTransfer(id1, id2, rnd.nextBoolean()); } - cluster.txTransfer(id1, id2, rnd.nextBoolean()); } + }); + } + else { + thread = new Thread(new Runnable() { + @Override public void run() { + Thread.currentThread().setName("update" + id); - } - }); + ThreadLocalRandom rnd = ThreadLocalRandom.current(); + + while (!stop.get()) { + int id1 = rnd.nextInt(ACCOUNTS); + + int id2 = rnd.nextInt(ACCOUNTS); + + while (id2 == id1) + id2 = rnd.nextInt(ACCOUNTS); + + if (id1 > id2) { + int tmp = id1; + id1 = id2; + id2 = tmp; + } + + cluster.txTransfer(id1, id2, rnd.nextBoolean()); + } + + } + }); + } threads.add(thread); @@ -240,7 +339,7 @@ public class MvccTestApp { if (System.currentTimeMillis() >= endTime) break; - //cluster.dumpMvccInfo(); + cluster.dumpMvccInfo(); } stop.set(true); @@ -248,16 +347,17 @@ public class MvccTestApp { for (Thread thread : threads) thread.join(); - Map<Object, Object> getData = cluster.getAll(data.keySet()); + Map<Object, Object> qryData = SQL ? cluster.sqlAll() : cluster.getAll(data.keySet()); int sum = 0; for (int i = 0; i < ACCOUNTS; i++) { - Integer val = (Integer)getData.get(i); + Integer val = (Integer)qryData.get(i); System.out.println("Val " + val); - sum += val; + if (val != null) + sum += val; } System.out.println("Sum=" + sum + ", expSum=" + (ACCOUNTS * START_VAL)); @@ -330,6 +430,10 @@ public class MvccTestApp { Node node = e.getValue(); node.dataStore.updateEntry(e.getKey(), data.get(e.getKey()), mvccVer); + } + + for (Map.Entry<Object, Node> e : mappedEntries.entrySet()) { + Node node = e.getValue(); node.dataStore.unlockEntry(e.getKey()); } @@ -368,29 +472,31 @@ public class MvccTestApp { boolean update = false; - Map<Object, Object> newVals = new HashMap<>(); - Integer newVal1 = null; Integer newVal2 = null; - if (fromFirst) { - if (curVal1 > 0) { - update = true; + if (curVal1 != null && curVal2 != null) { + if (fromFirst) { + if (curVal1 > 0) { + update = true; - newVal1 = curVal1 - 1; - newVal2 = curVal2 + 1; + newVal1 = curVal1 - 1; + newVal2 = curVal2 + 1; + } } - } - else { - if (curVal2 > 0) { - update = true; + else { + if (curVal2 > 0) { + update = true; - newVal1 = curVal1 + 1; - newVal2 = curVal2 - 1; + newVal1 = curVal1 + 1; + newVal2 = curVal2 - 1; + } } } if (update) { + Map<Object, Object> newVals = new HashMap<>(); + newVals.put(id1, newVal1); newVals.put(id2, newVal2); @@ -398,11 +504,6 @@ public class MvccTestApp { if (DEBUG_LOG) { TestDebugLog.msgs.add(new TestDebugLog.Msg6("update", txId, id1, newVal1, id2, newVal2, cntr)); - -// log("update txId=" + txId + -// ", id1=" + id1 + ", v1=" + newVal1 + -// ", id2=" + id2 + ", v2=" + newVal2 + -// ", ver=" + cntr); } for (Map.Entry<Object, Node> e : mappedEntries.entrySet()) { @@ -425,16 +526,85 @@ public class MvccTestApp { } } - if (DEBUG_LOG) { - TestDebugLog.msgs.add(new TestDebugLog.Msg2("tx done", txId, null)); - //log("tx done " + txId); + crd.txDone(txId); + + if (DEBUG_LOG) + TestDebugLog.msgs.add(new TestDebugLog.Msg2("tx done", txId, cntr.cntr)); + } + + void txRemoveTransfer(Integer from, Integer to) { + TreeSet<Integer> keys = new TreeSet<>(); + + keys.add(from); + keys.add(to); + + TxId txId = new TxId(txIdGen.incrementAndGet()); + + Map<Object, Node> mappedEntries = new LinkedHashMap<>(); + + Map<Object, Object> vals = new HashMap<>(); + + for (Object key : keys) { + int nodeIdx = nodeForKey(key); + + Node node = nodes.get(nodeIdx); + + node.dataStore.lockEntry(key); + + vals.put(key, node.dataStore.lastValue(key)); + + mappedEntries.put(key, node); + } + + CoordinatorCounter cntr = crd.nextTxCounter(txId); + + Integer fromVal = (Integer)vals.get(from); + Integer toVal = (Integer)vals.get(to); + + boolean update = fromVal != null && toVal != null; + + if (update) { + Map<Object, Object> newVals = new HashMap<>(); + + newVals.put(from, null); + newVals.put(to, fromVal + toVal); + + MvccUpdateVersion mvccVer = new MvccUpdateVersion(cntr, txId); + + if (DEBUG_LOG) { + TestDebugLog.msgs.add(new TestDebugLog.Msg6("remove", txId, from, fromVal, to, toVal, cntr)); + } + + for (Map.Entry<Object, Node> e : mappedEntries.entrySet()) { + Node node = e.getValue(); + + node.dataStore.updateEntry(e.getKey(), newVals.get(e.getKey()), mvccVer); + } + + for (Map.Entry<Object, Node> e : mappedEntries.entrySet()) { + Node node = e.getValue(); + + node.dataStore.unlockEntry(e.getKey()); + } + } + else { + for (Map.Entry<Object, Node> e : mappedEntries.entrySet()) { + Node node = e.getValue(); + + node.dataStore.unlockEntry(e.getKey()); + } } crd.txDone(txId); + + if (DEBUG_LOG) + TestDebugLog.msgs.add(new TestDebugLog.Msg2("tx done", txId, cntr.cntr)); } public void dumpMvccInfo() { for (Node node : nodes) { + int sql = node.dataStore.mvccSqlIdx.size(); + for (Map.Entry<Object, MvccValue> e : node.dataStore.mainIdx.entrySet()) { List<MvccValue> list = node.dataStore.mvccIdx.get(e.getKey()); @@ -448,13 +618,34 @@ public class MvccTestApp { System.out.println("Mvcc info [key=" + e.getKey() + ", val=" + e.getValue() + - ", mvccVals=" + size + ']'); + ", mvccVals=" + size + + ", sqlVals=" + sql + ']'); } } } + public Map<Object, Object> sqlAll() { + MvccQueryVersion qryVer = crd.queryVersion(); + + Map<Object, Object> res = new HashMap<>(); + + for (Node node : nodes) { + Map<Object, Object> nodeRes = node.dataStore.sqlQuery(qryVer); + + res.putAll(nodeRes); + } + + crd.queryDone(qryVer.cntr); + + if (DEBUG_LOG) { + TestDebugLog.msgs.add(new TestDebugLog.Msg3("sqlAll", qryVer.cntr, qryVer.activeTxs, res)); + } + + return res; + } + public Map<Object, Object> getAll(Set<?> keys) { - MvccQueryVersion ver = crd.queryVersion(); + MvccQueryVersion qryVer = crd.queryVersion(); Map<Object, Object> res = new HashMap<>(); @@ -463,16 +654,15 @@ public class MvccTestApp { Node node = nodes.get(nodeIdx); - Object val = node.dataStore.get(key, ver); + Object val = node.dataStore.get(key, qryVer); res.put(key, val); } - crd.queryDone(ver.cntr); + crd.queryDone(qryVer.cntr); if (DEBUG_LOG) { - TestDebugLog.msgs.add(new TestDebugLog.Msg3("getAll", ver.cntr, ver.activeTxs, res)); - //log("query [cntr=" + ver.cntr + ", txs=" + ver.activeTxs + ", res=" + res + ']'); + TestDebugLog.msgs.add(new TestDebugLog.Msg3("getAll", qryVer.cntr, qryVer.activeTxs, res)); } return res; @@ -786,6 +976,53 @@ public class MvccTestApp { /** * */ + static class SqlKey implements Comparable<SqlKey> { + /** */ + final Comparable key; + + /** */ + final Comparable val; + + /** */ + final CoordinatorCounter cntr; + + public SqlKey(Object key, Object val, CoordinatorCounter cntr) { + this.key = (Comparable)key; + this.val = (Comparable)val; + this.cntr = cntr; + } + + @Override public int compareTo(@NotNull SqlKey o) { + int cmp; + + if (val != null && o.val != null) + cmp = val.compareTo(o.val); + else { + if (val != null) + cmp = 1; + else + cmp = o.val == null ? 0 : -1; + } + + + if (cmp == 0) { + cmp = key.compareTo(o.key); + + if (cmp == 0) + cmp = cntr.compareTo(o.cntr); + } + + return cmp; + } + + @Override public String toString() { + return "SqlKey [key=" + key + ", val=" + val + ']'; + } + } + + /** + * + */ static class DataStore { /** */ private final ConcurrentHashMap<Object, ReentrantLock> locks = new ConcurrentHashMap<>(); @@ -796,6 +1033,9 @@ public class MvccTestApp { /** */ final ConcurrentHashMap<Object, List<MvccValue>> mvccIdx = new ConcurrentHashMap<>(); + /** */ + final ConcurrentSkipListMap<SqlKey, MvccSqlValue> mvccSqlIdx = new ConcurrentSkipListMap<>(); + void cleanup(CoordinatorCounter cleanupCntr) { for (Map.Entry<Object, List<MvccValue>> e : mvccIdx.entrySet()) { lockEntry(e.getKey()); @@ -813,10 +1053,34 @@ public class MvccTestApp { e.getKey(), val.val, val.ver, cleanupCntr.cntr, null, null)); } - mainIdx.put(e.getKey(), val); + MvccValue prev; + + if (val.val != null) + prev = mainIdx.put(e.getKey(), val); + else + prev = mainIdx.remove(e.getKey()); + + if (prev != null) { + SqlKey key = new SqlKey(e.getKey(), prev.val, prev.ver.cntr); + + MvccSqlValue old = mvccSqlIdx.remove(key); + + assert old != null; + } + + for (int j = 0; j <= i; j++) { + MvccValue rmvd = list.remove(0); - for (int j = 0; j <= i; j++) - list.remove(0); + assert rmvd != null; + + if (j != i || rmvd.val == null) { + SqlKey key = new SqlKey(e.getKey(), rmvd.val, rmvd.ver.cntr); + + MvccSqlValue old = mvccSqlIdx.remove(key); + + assert old != null; + } + } if (list.isEmpty()) mvccIdx.remove(e.getKey()); @@ -853,9 +1117,28 @@ public class MvccTestApp { assert old == null; } + MvccValue prevVal = null; + synchronized (list) { + if (!list.isEmpty()) + prevVal = list.get(list.size() - 1); + list.add(new MvccValue(val, ver)); } + + if (prevVal == null) + prevVal = mainIdx.get(key); + + if (prevVal != null) { + SqlKey prevKey = new SqlKey(key, prevVal.val, prevVal.ver.cntr); + + MvccSqlValue old = + mvccSqlIdx.put(prevKey, new MvccSqlValue(prevVal.val, prevVal.ver, ver)); + + assert old != null; + } + + mvccSqlIdx.put(new SqlKey(key, val, ver.cntr), new MvccSqlValue(val, ver, null)); } Object lastValue(Object key) { @@ -873,6 +1156,56 @@ public class MvccTestApp { return val != null ? val.val : null; } + Map<Object, Object> sqlQuery(MvccQueryVersion qryVer) { + Map<Object, Object> res = new HashMap<>(); + + for (Map.Entry<SqlKey, MvccSqlValue> e : mvccSqlIdx.entrySet()) { + MvccSqlValue val = e.getValue(); + + if (!versionVisible(val.ver, qryVer)) { + if (DEBUG_LOG) { + TestDebugLog.msgs.add(new TestDebugLog.Msg3("sql skip mvcc val", e.getKey().key, val.val, val.ver)); + } + + continue; + } + + MvccUpdateVersion newVer = val.newVer; + + if (newVer != null && versionVisible(newVer, qryVer)) { + if (DEBUG_LOG) { + TestDebugLog.msgs.add(new TestDebugLog.Msg4("sql skip mvcc val2", e.getKey().key, val.val, val.ver, val.newVer)); + } + + continue; + } + + Object old = res.put(e.getKey().key, e.getValue().val); + + if (DEBUG_LOG) { + TestDebugLog.msgs.add(new TestDebugLog.Msg4("sql get mvcc val", e.getKey().key, val.val, val.ver, val.newVer)); + } + + if (old != null) { + TestDebugLog.printAllAndExit("Already has value for key [key=" + e.getKey().key + + ", qryVer=" + qryVer + + ", oldVal=" + old + + ", newVal=" + e.getValue().val + + ']'); + } + + assert old == null; + } + + return res; + } + + private boolean versionVisible(MvccUpdateVersion ver, MvccQueryVersion qryVer) { + int cmp = ver.cntr.compareTo(qryVer.cntr); + + return cmp <= 0 && !qryVer.activeTxs.contains(ver.txId); + } + Object get(Object key, MvccQueryVersion ver) { List<MvccValue> list = mvccIdx.get(key); @@ -881,17 +1214,11 @@ public class MvccTestApp { for (int i = list.size() - 1; i >= 0; i--) { MvccValue val = list.get(i); - int cmp = val.ver.cntr.compareTo(ver.cntr); - - if (cmp > 0) - continue; - - if (ver.activeTxs.contains(val.ver.txId)) + if (!versionVisible(val.ver, ver)) continue; if (DEBUG_LOG) { TestDebugLog.msgs.add(new TestDebugLog.Msg3("read mvcc val", key, val, val.ver)); - //log("get res [key=" + key + ", val=" + val.val + ", ver=" + val.ver + ']'); } return val.val; @@ -955,6 +1282,35 @@ public class MvccTestApp { } } + /** + * + */ + static class MvccSqlValue { + /** */ + @GridToStringInclude + final Object val; + + /** */ + @GridToStringInclude + final MvccUpdateVersion ver; + + /** */ + @GridToStringInclude + final MvccUpdateVersion newVer; + + MvccSqlValue(Object val, MvccUpdateVersion ver, MvccUpdateVersion newVer) { + assert ver != null; + + this.val = val; + this.ver = ver; + this.newVer = newVer; + } + + @Override public String toString() { + return S.toString(MvccSqlValue.class, this); + } + } + static void log(String msg) { System.out.println(Thread.currentThread() + ": " + msg); } @@ -1018,7 +1374,31 @@ class TestDebugLog { ", v1=" + v1 + ", v2=" + v2 + ", v3=" + v3 + - ", msg=" + msg + + ", thread=" + thread + + ", time=" + DEBUG_DATE_FMT.format(new Date(ts)) + ']'; + } + } + + static class Msg4 extends Message{ + Object v1; + Object v2; + Object v3; + Object v4; + + public Msg4(String msg, Object v1, Object v2, Object v3, Object v4) { + super(msg); + this.v1 = v1; + this.v2 = v2; + this.v3 = v3; + this.v4 = v4; + } + + public String toString() { + return "Msg [msg=" + msg + + ", v1=" + v1 + + ", v2=" + v2 + + ", v3=" + v3 + + ", v4=" + v4 + ", thread=" + thread + ", time=" + DEBUG_DATE_FMT.format(new Date(ts)) + ']'; } @@ -1151,6 +1531,20 @@ class TestDebugLog { } } + static void printAllAndExit(String msg) { + System.out.println(msg); + + TestDebugLog.addMessage(msg); + + List<Object> msgs = TestDebugLog.printMessages(true, null); + + TestDebugLog.printMessages0(msgs, "test_debug_update.txt"); + + TestDebugLog.printMessagesForThread(msgs, Thread.currentThread().getName(), "test_debug_thread.txt"); + + System.exit(1); + } + public static void printMessagesForThread(List<Object> msgs0, String thread0, String file) { try { FileOutputStream out = new FileOutputStream(file);
