Repository: ignite Updated Branches: refs/heads/ignite-6149 c13877f2f -> 5a4c6f354
ignite-6149 Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/5a4c6f35 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/5a4c6f35 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/5a4c6f35 Branch: refs/heads/ignite-6149 Commit: 5a4c6f3543e609ce4e83feb0a9260c55f462d949 Parents: c13877f Author: sboikov <[email protected]> Authored: Fri Aug 25 16:38:37 2017 +0300 Committer: sboikov <[email protected]> Committed: Fri Aug 25 18:10:15 2017 +0300 ---------------------------------------------------------------------- .../org/apache/ignite/internal/MvccTestApp.java | 287 ++++++++++++++----- 1 file changed, 221 insertions(+), 66 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/5a4c6f35/modules/core/src/main/java/org/apache/ignite/internal/MvccTestApp.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/MvccTestApp.java b/modules/core/src/main/java/org/apache/ignite/internal/MvccTestApp.java index 110236d..b99e805 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/MvccTestApp.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/MvccTestApp.java @@ -44,6 +44,7 @@ import org.apache.ignite.internal.util.GridAtomicLong; import org.apache.ignite.internal.util.tostring.GridToStringInclude; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; +import org.jsr166.ConcurrentHashMap8; /** * @@ -52,50 +53,17 @@ public class MvccTestApp { /** */ private static final boolean DEBUG_LOG = false; - public static void main0(String[] args) throws Exception { - final TestCluster cluster = new TestCluster(3); - - final int ACCOUNTS = 3; - - final int START_VAL = 100_000; - - final Map<Object, Object> data = new TreeMap<>(); - - for (int i = 0; i < ACCOUNTS; i++) - data.put(i, START_VAL); - - cluster.txPutAll(data); - - cluster.txTransfer(0, 1, true); - cluster.txTransfer(0, 1, true); - cluster.txTransfer(0, 2, true); - - Map<Object, Object> getData = cluster.getAll(data.keySet()); - - int sum = 0; - - for (int i = 0; i < ACCOUNTS; i++) { - Integer val = (Integer)getData.get(i); - - sum += val; - - System.out.println("Val: " + val); - } - - System.out.println("Sum: " + sum); - } - public static void main(String[] args) throws Exception { final AtomicBoolean err = new AtomicBoolean(); - for (int iter = 0; iter < 3; iter++) { + for (int iter = 0; iter < 10; iter++) { System.out.println("Iteration: " + iter); final TestCluster cluster = new TestCluster(1); - final int ACCOUNTS = 5; + final int ACCOUNTS = 4; - final int START_VAL = 100; + final int START_VAL = 10; final Map<Object, Object> data = new TreeMap<>(); @@ -108,7 +76,7 @@ public class MvccTestApp { List<Thread> threads = new ArrayList<>(); - for (int i = 0; i < 1; i++) { + for (int i = 0; i < 5; i++) { final int id = i; Thread thread = new Thread(new Runnable() { @@ -136,6 +104,16 @@ public class MvccTestApp { err.set(true); System.out.println("Invalid get sum: " + sum); + + TestDebugLog.addMessage("Invalid get sum: " + sum); + + List<Object> msgs = TestDebugLog.printMessages(true, null); + + TestDebugLog.printMessages0(msgs, "test_debug_update.txt"); + + TestDebugLog.printMessagesForThread(msgs, Thread.currentThread().getName(), "test_debug_thread.txt"); + + System.exit(1); } } } @@ -216,6 +194,8 @@ public class MvccTestApp { System.exit(1); } + + TestDebugLog.clear(); } } @@ -329,10 +309,12 @@ public class MvccTestApp { MvccUpdateVersion mvccVer = new MvccUpdateVersion(cntr, txId); if (DEBUG_LOG) { - log("update txId=" + txId + - ", id1=" + id1 + ", v1=" + newVal1 + - ", id2=" + id2 + ", v2=" + newVal2 + - ", ver=" + cntr); + TestDebugLog.msgs.add(new TestDebugLog.Msg6("update", txId, id1, newVal1, id2, newVal2, cntr)); + +// log("update txId=" + txId + +// ", id1=" + id1 + ", v1=" + newVal1 + +// ", id2=" + id2 + ", v2=" + newVal2 + +// ", ver=" + cntr); } for (Map.Entry<Object, Node> e : mappedEntries.entrySet()) { @@ -355,8 +337,10 @@ public class MvccTestApp { } } - if (DEBUG_LOG) - log("tx done " + txId); + if (DEBUG_LOG) { + TestDebugLog.msgs.add(new TestDebugLog.Msg2("tx done", txId, null)); + //log("tx done " + txId); + } crd.txDone(txId); } @@ -378,8 +362,10 @@ public class MvccTestApp { crd.queryDone(ver.cntr); - if (DEBUG_LOG) - log("query [cntr=" + ver.cntr + ", txs=" + ver.activeTxs + ", res=" + res + ']'); + if (DEBUG_LOG) { + TestDebugLog.msgs.add(new TestDebugLog.Msg3("getAll", ver.cntr, ver.activeTxs, res)); + //log("query [cntr=" + ver.cntr + ", txs=" + ver.activeTxs + ", res=" + res + ']'); + } return res; } @@ -422,46 +408,53 @@ public class MvccTestApp { /** */ @GridToStringInclude - private final ConcurrentHashMap<TxId, Long> activeTxs = new ConcurrentHashMap<>(); + private final ConcurrentHashMap8<TxId, TxId> activeTxs = new ConcurrentHashMap8<>(); CoordinatorCounter nextTxCounter(TxId txId) { - final CoordinatorCounter newCtr = new CoordinatorCounter(cntr.incrementAndGet()); + activeTxs.put(txId, txId); - Long old = activeTxs.put(txId, newCtr.cntr); + CoordinatorCounter newCtr = new CoordinatorCounter(cntr.incrementAndGet()); - assert old == null; + txId.cntr = newCtr.cntr; return newCtr; } void txDone(TxId txId) { - Long cntr = activeTxs.remove(txId); + TxId cntr = activeTxs.remove(txId); - assert cntr != null; + assert cntr != null && cntr.cntr != -1L; - commitCntr.setIfGreater(cntr); + commitCntr.setIfGreater(cntr.cntr); } MvccQueryVersion queryVersion() { + long useCntr = commitCntr.get(); + Set<TxId> txs = new HashSet<>(); Long minActive = null; - for (Map.Entry<TxId, Long> e : activeTxs.entrySet()) { + for (Map.Entry<TxId, TxId> e : activeTxs.entrySet()) { txs.add(e.getKey()); + TxId val = e.getValue(); + + while (val.cntr == -1) + Thread.yield(); + + long cntr = val.cntr; + if (minActive == null) - minActive = e.getValue(); - else if (e.getValue() < minActive) - minActive = e.getValue(); + minActive = cntr; + else if (cntr < minActive) + minActive = cntr; } - long cntr = commitCntr.get(); + if (minActive != null && minActive < useCntr) + useCntr = minActive; - if (minActive != null && minActive < cntr) - cntr = minActive; - - return new MvccQueryVersion(new CoordinatorCounter(cntr), txs); + return new MvccQueryVersion(new CoordinatorCounter(useCntr), txs); } void queryDone(CoordinatorCounter ctr) { @@ -566,6 +559,8 @@ public class MvccTestApp { * */ static class TxId { + long cntr = -1; + /** */ @GridToStringInclude final long id; @@ -655,8 +650,10 @@ public class MvccTestApp { if (ver.activeTxs.contains(val.ver.txId)) continue; - if (DEBUG_LOG) - log("get res [key=" + key + ", val=" + val.val + ", ver=" + val.ver + ']'); + if (DEBUG_LOG) { + TestDebugLog.msgs.add(new TestDebugLog.Msg3("read val", key, val, val.ver)); + //log("get res [key=" + key + ", val=" + val.val + ", ver=" + val.ver + ']'); + } return val.val; } @@ -713,7 +710,7 @@ public class MvccTestApp { class TestDebugLog { /** */ - private static final List<Object> msgs = Collections.synchronizedList(new ArrayList<>(100_000)); + static final List<Object> msgs = Collections.synchronizedList(new ArrayList<>(100_000)); /** */ private static final SimpleDateFormat DEBUG_DATE_FMT = new SimpleDateFormat("HH:mm:ss,SSS"); @@ -734,6 +731,79 @@ class TestDebugLog { } } + static class Msg2 extends Message{ + Object v1; + Object v2; + + public Msg2(String msg, Object v1, Object v2) { + super(msg); + this.v1 = v1; + this.v2 = v2; + } + public String toString() { + return "Msg [msg=" + msg + + ", v1=" + v1 + + ", v2=" + v2 + + ", msg=" + msg + + ", thread=" + thread + + ", time=" + DEBUG_DATE_FMT.format(new Date(ts)) + ']'; + } + } + + static class Msg3 extends Message{ + Object v1; + Object v2; + Object v3; + + public Msg3(String msg, Object v1, Object v2, Object v3) { + super(msg); + this.v1 = v1; + this.v2 = v2; + this.v3 = v3; + } + public String toString() { + return "Msg [msg=" + msg + + ", v1=" + v1 + + ", v2=" + v2 + + ", v3=" + v3 + + ", msg=" + msg + + ", thread=" + thread + + ", time=" + DEBUG_DATE_FMT.format(new Date(ts)) + ']'; + } + } + + static class Msg6 extends Message{ + Object v1; + Object v2; + Object v3; + Object v4; + Object v5; + Object v6; + + public Msg6(String msg, Object v1, Object v2, Object v3, Object v4, Object v5, Object v6) { + super(msg); + this.v1 = v1; + this.v2 = v2; + this.v3 = v3; + this.v4 = v4; + this.v5 = v5; + this.v6 = v6; + } + + public String toString() { + return "Msg [msg=" + msg + + ", txId=" + v1 + + ", id1=" + v2 + + ", v1=" + v3 + + ", id2=" + v4 + + ", v2=" + v5 + + ", cntr=" + v6 + + ", msg=" + msg + + ", thread=" + thread + + ", time=" + DEBUG_DATE_FMT.format(new Date(ts)) + ']'; + } + } + static class EntryMessage extends Message { Object key; Object val; @@ -775,7 +845,7 @@ class TestDebugLog { System.out.println(msg); } - public static void addEntryMessage(Object key, Object val, String msg) { + public static void addEntryMessage(Object key, Object val, Object arg, String msg) { if (key instanceof KeyCacheObject) key = ((KeyCacheObject)key).value(null, false); @@ -802,7 +872,57 @@ class TestDebugLog { } } - public static void printMessages(boolean file, Integer part) { + public static void printMessagesForThread(List<Object> msgs0, String thread0, String file) { + try { + FileOutputStream out = new FileOutputStream(file); + + PrintWriter w = new PrintWriter(out); + + for (Object msg : msgs0) { + if (msg instanceof Message) { + String thread = ((Message) msg).thread; + + if (thread.equals(thread0)) + w.println(msg.toString()); + } + } + + w.close(); + + out.close(); + } + catch (IOException e) { + e.printStackTrace(); + } + } + + public static void printMessages0(List<Object> msgs0, String file) { + try { + FileOutputStream out = new FileOutputStream(file); + + PrintWriter w = new PrintWriter(out); + + for (Object msg : msgs0) { + if (msg instanceof Message) { + String msg0 = ((Message) msg).msg; + + if (msg0.equals("tx done") || msg0.equals("update")) + + w.println(msg.toString()); + + } + } + + w.close(); + + out.close(); + } + catch (IOException e) { + e.printStackTrace(); + } + } + + public static List<Object> printMessages(boolean file, Integer part) { List<Object> msgs0; synchronized (msgs) { @@ -838,6 +958,8 @@ class TestDebugLog { for (Object msg : msgs0) System.out.println(msg); } + + return msgs0; } public static void printKeyMessages(boolean file, Object key) { @@ -892,4 +1014,37 @@ class TestDebugLog { it.remove(); } } + public static void main0(String[] args) throws Exception { + final MvccTestApp.TestCluster cluster = new MvccTestApp.TestCluster(3); + + final int ACCOUNTS = 3; + + final int START_VAL = 100_000; + + final Map<Object, Object> data = new TreeMap<>(); + + for (int i = 0; i < ACCOUNTS; i++) + data.put(i, START_VAL); + + cluster.txPutAll(data); + + cluster.txTransfer(0, 1, true); + cluster.txTransfer(0, 1, true); + cluster.txTransfer(0, 2, true); + + Map<Object, Object> getData = cluster.getAll(data.keySet()); + + int sum = 0; + + for (int i = 0; i < ACCOUNTS; i++) { + Integer val = (Integer)getData.get(i); + + sum += val; + + System.out.println("Val: " + val); + } + + System.out.println("Sum: " + sum); + } + } \ No newline at end of file
