http://git-wip-us.apache.org/repos/asf/ignite/blob/f6e98254/modules/core/src/main/java/org/apache/ignite/internal/MvccTestApp2.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/MvccTestApp2.java b/modules/core/src/main/java/org/apache/ignite/internal/MvccTestApp2.java new file mode 100644 index 0000000..397c408 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/MvccTestApp2.java @@ -0,0 +1,1708 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal; + +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.PrintWriter; +import java.text.SimpleDateFormat; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Date; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.TreeMap; +import java.util.TreeSet; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.ConcurrentSkipListMap; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; +import org.apache.ignite.internal.processors.cache.KeyCacheObject; +import org.apache.ignite.internal.util.GridAtomicLong; +import org.apache.ignite.internal.util.tostring.GridToStringInclude; +import org.apache.ignite.internal.util.typedef.internal.S; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.jetbrains.annotations.NotNull; +import org.jsr166.ConcurrentHashMap8; + +/** + * + */ +public class MvccTestApp2 { + /** */ + private static final boolean DEBUG_LOG = false; + + /** */ + private static final boolean SQL = false; + + public static void main1(String[] args) throws Exception { + final TestCluster cluster = new TestCluster(1); + + final int ACCOUNTS = 3; + + final int START_VAL = 10; + + final Map<Object, Object> data = new TreeMap<>(); + + for (int i = 0; i < ACCOUNTS; i++) + data.put(i, START_VAL); + + cluster.txPutAll(data); + + cluster.txTransfer(0, 1, true); + cluster.txTransfer(0, 1, true); + cluster.txTransfer(0, 2, true); + + Map<Object, 
Object> vals = cluster.sqlAll(); + + System.out.println(); + + Map<Object, Object> getData = cluster.sqlAll();;//cluster.getAll(data.keySet()); + + int sum = 0; + + for (int i = 0; i < ACCOUNTS; i++) { + Integer val = (Integer)getData.get(i); + + sum += val; + + System.out.println("Val: " + val); + } + + System.out.println("Sum: " + sum); + + cluster.cleanup(); + + getData = cluster.sqlAll(); + + System.out.println(); +// +// MvccQueryVersion ver1 = cluster.crd.queryVersion(); +// MvccQueryVersion ver2 = cluster.crd.queryVersion(); +// +// cluster.crd.queryDone(ver2.cntr); +// cluster.crd.queryDone(ver1.cntr); + } + + public static void main0(String[] args) throws Exception { + final TestCluster cluster = new TestCluster(1); + + final int ACCOUNTS = 3; + + final int START_VAL = 10; + + final Map<Object, Object> data = new TreeMap<>(); + + for (int i = 0; i < ACCOUNTS; i++) + data.put(i, START_VAL); + + cluster.txPutAll(data); + + cluster.txRemoveTransfer(0, 1); + + Map<Object, Object> getData = cluster.sqlAll();;//cluster.getAll(data.keySet()); + + int sum = 0; + + for (Map.Entry<Object, Object> e : getData.entrySet()) { + Integer val = (Integer)e.getValue(); + + if (val != null) + sum += val; + + System.out.println("Val: " + val); + } + + System.out.println("Sum: " + sum); + + cluster.cleanup(); + + getData = cluster.sqlAll(); + + System.out.println(); +// +// MvccQueryVersion ver1 = cluster.crd.queryVersion(); +// MvccQueryVersion ver2 = cluster.crd.queryVersion(); +// +// cluster.crd.queryDone(ver2.cntr); +// cluster.crd.queryDone(ver1.cntr); + } + + public static void main(String[] args) throws Exception { + final AtomicBoolean err = new AtomicBoolean(); + + final int READ_THREADS = 4; + final int UPDATE_THREADS = 4; + final int ACCOUNTS = 50; + + final int START_VAL = 100000; + + for (int iter = 0; iter < 1000; iter++) { + System.out.println("Iteration [readThreads=" + READ_THREADS + + ", updateThreads=" + UPDATE_THREADS + ", accounts=" + ACCOUNTS + ", iter=" 
+ iter + ']'); + + final TestCluster cluster = new TestCluster(1); + + final Map<Object, Object> data = new TreeMap<>(); + + for (int i = 0; i < ACCOUNTS; i++) + data.put(i, START_VAL); + + cluster.txPutAll(data); + + final AtomicBoolean stop = new AtomicBoolean(); + + List<Thread> threads = new ArrayList<>(); + + Thread cleanupThread = new Thread(new Runnable() { + @Override public void run() { + Thread.currentThread().setName("cleanup"); + + try { + while (!stop.get()) { + cluster.cleanup(); + + Thread.sleep(1); + } + } + catch (Exception e) { + e.printStackTrace(); + } + } + }); + + threads.add(cleanupThread); + + cleanupThread.start(); + + final boolean REMOVES = false; + + for (int i = 0; i < READ_THREADS; i++) { + final int id = i; + + Thread thread = new Thread(new Runnable() { + @Override public void run() { + Thread.currentThread().setName("read" + id); + + int cnt = 0; + + while (!stop.get()) { + Map<Object, Object> qryData = SQL ? cluster.sqlAll() : cluster.getAll(data.keySet()); + + cnt++; + + int sum = 0; + + if (REMOVES) { + for (Map.Entry<Object, Object> e : qryData.entrySet()) { + Integer val = (Integer)e.getValue(); + + if (val != null) + sum += val; + else + System.out.println("With null"); + } + } + else { + for (int i = 0; i < ACCOUNTS; i++) { + Integer val = (Integer)qryData.get(i); + + if (val == null) { + if (stop.compareAndSet(false, true)) { + stop.set(true); + err.set(true); + + TestDebugLog.printAllAndExit("No value for key: " + i); + } + + return; + } + + sum += val; + } + } + + if (sum != ACCOUNTS * START_VAL) { + if (stop.compareAndSet(false, true)) { + stop.set(true); + err.set(true); + + TestDebugLog.printAllAndExit("Invalid get sum: " + sum); + } + } + +// if (cnt % 100 == 0) +// System.out.println("get " + cnt); + } + + System.out.println("Get cnt: " + cnt); + } + }); + + threads.add(thread); + + thread.start(); + } + + for (int i = 0; i < UPDATE_THREADS; i++) { + final int id = i; + + Thread thread; + + if (REMOVES) { + thread = 
new Thread(new Runnable() { + @Override public void run() { + Thread.currentThread().setName("update" + id); + + ThreadLocalRandom rnd = ThreadLocalRandom.current(); + + while (!stop.get()) { + int id1 = rnd.nextInt(ACCOUNTS); + + int id2 = rnd.nextInt(ACCOUNTS); + + while (id2 == id1) + id2 = rnd.nextInt(ACCOUNTS); + + if (rnd.nextBoolean()) { + cluster.txRemoveTransfer(id1, id2); + } + else + cluster.txTransfer(id1, id2, rnd.nextBoolean()); + } + + } + }); + } + else { + thread = new Thread(new Runnable() { + @Override public void run() { + Thread.currentThread().setName("update" + id); + + ThreadLocalRandom rnd = ThreadLocalRandom.current(); + + while (!stop.get()) { + int id1 = rnd.nextInt(ACCOUNTS); + + int id2 = rnd.nextInt(ACCOUNTS); + + while (id2 == id1) + id2 = rnd.nextInt(ACCOUNTS); + + if (id1 > id2) { + int tmp = id1; + id1 = id2; + id2 = tmp; + } + + cluster.txTransfer(id1, id2, rnd.nextBoolean()); + } + + } + }); + } + + threads.add(thread); + + thread.start(); + } + + long endTime = System.currentTimeMillis() + 2_000; + + while (!stop.get()) { + Thread.sleep(1000); + + if (System.currentTimeMillis() >= endTime) + break; + + //cluster.dumpMvccInfo(); + } + + stop.set(true); + + for (Thread thread : threads) + thread.join(); + + Map<Object, Object> qryData = SQL ? 
cluster.sqlAll() : cluster.getAll(data.keySet()); + + int sum = 0; + + for (int i = 0; i < ACCOUNTS; i++) { + Integer val = (Integer)qryData.get(i); + + System.out.println("Val " + val); + + if (val != null) + sum += val; + } + + System.out.println("Sum=" + sum + ", expSum=" + (ACCOUNTS * START_VAL)); + + if (err.get()) { + System.out.println("Error!"); + + System.exit(1); + } + +// cluster.dumpMvccInfo(); +// +// System.out.println("Cleanup"); +// +// cluster.cleanup(); +// +// cluster.dumpMvccInfo(); + + TestDebugLog.clear(); + } + } + + /** + * + */ + static class TestCluster { + /** */ + final List<Node> nodes = new ArrayList<>(); + + /** */ + final Coordinator crd; + + /** */ + final AtomicLong txIdGen = new AtomicLong(10_000); + + TestCluster(int nodesNum) { + crd = new Coordinator(); + + for (int i = 0; i < nodesNum; i++) + nodes.add(new Node(i)); + } + + void cleanup() { + CoordinatorCounter cntr = crd.cleanupVersion(); + + for (Node node : nodes) + node.dataStore.cleanup(cntr); + } + + void txPutAll(Map<Object, Object> data) { + TxId txId = new TxId(txIdGen.incrementAndGet()); + + Map<Object, Node> mappedEntries = new LinkedHashMap<>(); + + for (Object key : data.keySet()) { + int nodeIdx = nodeForKey(key); + + Node node = nodes.get(nodeIdx); + + node.dataStore.lockEntry(key); + + mappedEntries.put(key, node); + } + + CoordinatorCounter cntr = crd.nextTxCounter(txId); + + MvccUpdateVersion mvccVer = new MvccUpdateVersion(cntr, txId); + + for (Map.Entry<Object, Node> e : mappedEntries.entrySet()) { + Node node = e.getValue(); + + node.dataStore.updateEntry(e.getKey(), data.get(e.getKey()), mvccVer); + } + + for (Map.Entry<Object, Node> e : mappedEntries.entrySet()) { + Node node = e.getValue(); + + node.dataStore.unlockEntry(e.getKey()); + } + + crd.txDone(txId, cntr.cntr); + } + + void txTransfer(Integer id1, Integer id2, boolean fromFirst) { + TreeSet<Integer> keys = new TreeSet<>(); + + keys.add(id1); + keys.add(id2); + + TxId txId = new 
TxId(txIdGen.incrementAndGet()); + + Map<Object, Node> mappedEntries = new LinkedHashMap<>(); + + Map<Object, Object> vals = new HashMap<>(); + + for (Object key : keys) { + int nodeIdx = nodeForKey(key); + + Node node = nodes.get(nodeIdx); + + node.dataStore.lockEntry(key); + + vals.put(key, node.dataStore.lastValue(key)); + + mappedEntries.put(key, node); + } + + CoordinatorCounter cntr = crd.nextTxCounter(txId); + + Integer curVal1 = (Integer)vals.get(id1); + Integer curVal2 = (Integer)vals.get(id2); + + boolean update = false; + + Integer newVal1 = null; + Integer newVal2 = null; + + if (curVal1 != null && curVal2 != null) { + if (fromFirst) { + if (curVal1 > 0) { + update = true; + + newVal1 = curVal1 - 1; + newVal2 = curVal2 + 1; + } + } + else { + if (curVal2 > 0) { + update = true; + + newVal1 = curVal1 + 1; + newVal2 = curVal2 - 1; + } + } + } + + if (update) { + Map<Object, Object> newVals = new HashMap<>(); + + newVals.put(id1, newVal1); + newVals.put(id2, newVal2); + + MvccUpdateVersion mvccVer = new MvccUpdateVersion(cntr, txId); + + if (DEBUG_LOG) { + TestDebugLog.msgs.add(new TestDebugLog.Msg6("update", txId, id1, newVal1, id2, newVal2, cntr)); + } + + for (Map.Entry<Object, Node> e : mappedEntries.entrySet()) { + Node node = e.getValue(); + + node.dataStore.updateEntry(e.getKey(), newVals.get(e.getKey()), mvccVer); + } + + for (Map.Entry<Object, Node> e : mappedEntries.entrySet()) { + Node node = e.getValue(); + + node.dataStore.unlockEntry(e.getKey()); + } + } + else { + for (Map.Entry<Object, Node> e : mappedEntries.entrySet()) { + Node node = e.getValue(); + + node.dataStore.unlockEntry(e.getKey()); + } + } + + crd.txDone(txId, cntr.cntr); + +// if (DEBUG_LOG) +// TestDebugLog.msgs.add(new TestDebugLog.Msg2("tx done", txId, cntr.cntr)); + } + + void txRemoveTransfer(Integer from, Integer to) { + TreeSet<Integer> keys = new TreeSet<>(); + + keys.add(from); + keys.add(to); + + TxId txId = new TxId(txIdGen.incrementAndGet()); + + Map<Object, Node> 
mappedEntries = new LinkedHashMap<>(); + + Map<Object, Object> vals = new HashMap<>(); + + for (Object key : keys) { + int nodeIdx = nodeForKey(key); + + Node node = nodes.get(nodeIdx); + + node.dataStore.lockEntry(key); + + vals.put(key, node.dataStore.lastValue(key)); + + mappedEntries.put(key, node); + } + + CoordinatorCounter cntr = crd.nextTxCounter(txId); + + Integer fromVal = (Integer)vals.get(from); + Integer toVal = (Integer)vals.get(to); + + boolean update = fromVal != null && toVal != null; + + if (update) { + Map<Object, Object> newVals = new HashMap<>(); + + newVals.put(from, null); + newVals.put(to, fromVal + toVal); + + MvccUpdateVersion mvccVer = new MvccUpdateVersion(cntr, txId); + + if (DEBUG_LOG) { + TestDebugLog.msgs.add(new TestDebugLog.Msg6("remove", txId, from, fromVal, to, toVal, cntr)); + } + + for (Map.Entry<Object, Node> e : mappedEntries.entrySet()) { + Node node = e.getValue(); + + node.dataStore.updateEntry(e.getKey(), newVals.get(e.getKey()), mvccVer); + } + + for (Map.Entry<Object, Node> e : mappedEntries.entrySet()) { + Node node = e.getValue(); + + node.dataStore.unlockEntry(e.getKey()); + } + } + else { + for (Map.Entry<Object, Node> e : mappedEntries.entrySet()) { + Node node = e.getValue(); + + node.dataStore.unlockEntry(e.getKey()); + } + } + + crd.txDone(txId, cntr.cntr); + + if (DEBUG_LOG) + TestDebugLog.msgs.add(new TestDebugLog.Msg2("tx done", txId, cntr.cntr)); + } + + public void dumpMvccInfo() { + for (Node node : nodes) { + int sql = node.dataStore.mvccSqlIdx.size(); + + for (Map.Entry<Object, MvccValue> e : node.dataStore.mainIdx.entrySet()) { + List<MvccValue> list = node.dataStore.mvccIdx.get(e.getKey()); + + int size = 0; + + if (list != null) { + synchronized (list) { + size = list.size(); + } + } + + System.out.println("Mvcc info [key=" + e.getKey() + + ", val=" + e.getValue() + + ", mvccVals=" + size + + ", sqlVals=" + sql + ']'); + } + } + } + + public Map<Object, Object> sqlAll() { + MvccQueryVersion qryVer = 
crd.queryVersion(); + + Map<Object, Object> res = new HashMap<>(); + + for (Node node : nodes) { + Map<Object, Object> nodeRes = node.dataStore.sqlQuery(qryVer); + + res.putAll(nodeRes); + } + + crd.queryDone(qryVer.cntr); + + if (DEBUG_LOG) { + TestDebugLog.msgs.add(new TestDebugLog.Msg3("sqlAll", qryVer.cntr, qryVer.activeTxs, res)); + } + + return res; + } + + public Map<Object, Object> getAll(Set<?> keys) { + MvccQueryVersion qryVer = crd.queryVersion(); + + Map<Object, Object> res = new HashMap<>(); + + for (Object key : keys) { + int nodeIdx = nodeForKey(key); + + Node node = nodes.get(nodeIdx); + + Object val = node.dataStore.get(key, qryVer); + + res.put(key, val); + } + + crd.queryDone(qryVer.cntr); + + if (DEBUG_LOG) { + TestDebugLog.msgs.add(new TestDebugLog.Msg3("getAll", qryVer.cntr, qryVer.activeTxs, res)); + } + + return res; + } + + private int nodeForKey(Object key) { + return U.safeAbs(key.hashCode()) % nodes.size(); + } + } + + /** + * + */ + static class Node { + /** */ + final DataStore dataStore; + + /** */ + final int nodexIdx; + + public Node(int nodexIdx) { + this.nodexIdx = nodexIdx; + + dataStore = new DataStore(); + } + + @Override public String toString() { + return "Node [idx=" + nodexIdx + ']'; + } + } + + /** + * + */ + static class Coordinator { + /** */ + private final AtomicLong cntr = new AtomicLong(-1); + + /** */ + private final GridAtomicLong commitCntr = new GridAtomicLong(-1); + + /** */ + private final ConcurrentHashMap8<Long, QueryCounter> activeQueries = new ConcurrentHashMap8<>(); + + /** */ + @GridToStringInclude + private final ConcurrentHashMap8<TxId, Long> activeTxs = new ConcurrentHashMap8<>(); + + CoordinatorCounter nextTxCounter(TxId txId) { + long cur = cntr.get(); + + activeTxs.put(txId, cur + 1); + + CoordinatorCounter newCtr = new CoordinatorCounter(cntr.incrementAndGet()); + + return newCtr; + } + + void txDone(TxId txId, long cntr) { + Long rmvd = activeTxs.remove(txId); + + assert rmvd != null; + + 
commitCntr.setIfGreater(cntr); + } + + private GridAtomicLong minActive0 = new GridAtomicLong(0); + + private Long minActive(Set<TxId> txs) { + Long minActive = null; + + for (Map.Entry<TxId, Long> e : activeTxs.entrySet()) { + if (txs != null) + txs.add(e.getKey()); + +// TxId val = e.getValue(); +// +// while (val.cntr == -1) +// Thread.yield(); + + long cntr = e.getValue(); + + if (minActive == null) + minActive = cntr; + else if (cntr < minActive) + minActive = cntr; + } + + if (minActive != null) { + if (!minActive0.setIfGreater(minActive)) + return minActive0.get(); + } + + return minActive; + } + + static class QueryCounter extends AtomicInteger { + public QueryCounter(int initialValue) { + super(initialValue); + } + + boolean increment2() { + for (;;) { + int current = get(); + int next = current + 1; + + if (current == 0) + return false; + + if (compareAndSet(current, next)) + return true; + } + } + } + + private ReadWriteLock rwLock = new ReentrantReadWriteLock(); + + MvccQueryVersion queryVersion() { + rwLock.readLock().lock(); + + long useCntr = commitCntr.get(); + + Set<TxId> txs = new HashSet<>(); + + Long minActive = minActive(txs); + + if (minActive != null && minActive < useCntr) + useCntr = minActive - 1; + + MvccQueryVersion qryVer = new MvccQueryVersion(new CoordinatorCounter(useCntr), txs); + + for (;;) { + QueryCounter qryCnt = activeQueries.get(useCntr); + + if (qryCnt != null) { + boolean inc = qryCnt.increment2(); + + if (!inc) { + activeQueries.remove(useCntr, qryCnt); + + continue; + } + } + else { + qryCnt = new QueryCounter(1); + + if (activeQueries.putIfAbsent(useCntr, qryCnt) != null) + continue; + } + + break; + } + + rwLock.readLock().unlock(); + + return qryVer; + } + + void queryDone(CoordinatorCounter cntr) { + AtomicInteger qryCnt = activeQueries.get(cntr.cntr); + + assert qryCnt != null : cntr.cntr; + + int left = qryCnt.decrementAndGet(); + + assert left >= 0 : left; + + if (left == 0) + activeQueries.remove(cntr.cntr, 
qryCnt); + } + + CoordinatorCounter cleanupVersion() { + rwLock.writeLock().lock(); + + long useCntr = commitCntr.get(); + + Long minActive = minActive(null); + + if (minActive != null && minActive < useCntr) + useCntr = minActive - 1; + + for (Long qryCntr : activeQueries.keySet()) { + if (qryCntr <= useCntr) + useCntr = qryCntr - 1; + } + + rwLock.writeLock().unlock(); + + return new CoordinatorCounter(useCntr); + } + + @Override public String toString() { + return S.toString(Coordinator.class, this); + } + } + + /** + * + */ + static class CoordinatorCounter implements Comparable<CoordinatorCounter> { + /** */ + private final long topVer; // TODO + + /** */ + private final long cntr; + + CoordinatorCounter(long cntr) { + this.topVer = 1; + this.cntr = cntr; + } + + @Override public int compareTo(CoordinatorCounter o) { + return Long.compare(cntr, o.cntr); + } + + @Override public boolean equals(Object o) { + if (this == o) + return true; + + if (o == null || getClass() != o.getClass()) + return false; + + CoordinatorCounter that = (CoordinatorCounter)o; + + return cntr == that.cntr; + } + + @Override public int hashCode() { + return (int)(cntr ^ (cntr >>> 32)); + } + + @Override public String toString() { + return "Cntr [c=" + cntr + ']'; + } + } + + /** + * + */ + static class MvccUpdateVersion { + /** */ + @GridToStringInclude + final CoordinatorCounter cntr; + + /** */ + @GridToStringInclude + final TxId txId; + + /** + * @param cntr + */ + MvccUpdateVersion(CoordinatorCounter cntr, TxId txId) { + assert cntr != null; + + this.cntr = cntr; + this.txId = txId; + } + + @Override public String toString() { + return S.toString(MvccUpdateVersion.class, this); + } + } + + /** + * + */ + static class MvccQueryVersion { + /** */ + @GridToStringInclude + final CoordinatorCounter cntr; + + /** */ + @GridToStringInclude + final Collection<TxId> activeTxs; + + MvccQueryVersion(CoordinatorCounter cntr, Collection<TxId> activeTxs) { + this.cntr = cntr; + this.activeTxs = 
activeTxs; + } + + @Override public String toString() { + return S.toString(MvccQueryVersion.class, this); + } + } + + /** + * + */ + static class TxId { + /** */ + @GridToStringInclude + final long id; + + TxId(long id) { + this.id = id; + } + + @Override public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + + TxId txId = (TxId) o; + + return id == txId.id; + } + + @Override public int hashCode() { + return (int) (id ^ (id >>> 32)); + } + + @Override public String toString() { + return S.toString(TxId.class, this); + } + } + + /** + * + */ + static class SqlKey implements Comparable<SqlKey> { + /** */ + final Comparable key; + + /** */ + final Comparable val; + + /** */ + final CoordinatorCounter cntr; + + public SqlKey(Object key, Object val, CoordinatorCounter cntr) { + this.key = (Comparable)key; + this.val = (Comparable)val; + this.cntr = cntr; + } + + @Override public int compareTo(@NotNull SqlKey o) { + int cmp; + + if (val != null && o.val != null) + cmp = val.compareTo(o.val); + else { + if (val != null) + cmp = 1; + else + cmp = o.val == null ? 
0 : -1; + } + + + if (cmp == 0) { + cmp = key.compareTo(o.key); + + if (cmp == 0) + cmp = cntr.compareTo(o.cntr); + } + + return cmp; + } + + @Override public String toString() { + return "SqlKey [key=" + key + ", val=" + val + ']'; + } + } + + /** + * + */ + static class DataStore { + /** */ + private final ConcurrentHashMap<Object, ReentrantLock> locks = new ConcurrentHashMap<>(); + + /** */ + final ConcurrentHashMap<Object, MvccValue> mainIdx = new ConcurrentHashMap<>(); + + /** */ + final ConcurrentHashMap<Object, List<MvccValue>> mvccIdx = new ConcurrentHashMap<>(); + + /** */ + final ConcurrentSkipListMap<SqlKey, MvccSqlValue> mvccSqlIdx = new ConcurrentSkipListMap<>(); + + void cleanup(CoordinatorCounter cleanupCntr) { + for (Map.Entry<Object, List<MvccValue>> e : mvccIdx.entrySet()) { + lockEntry(e.getKey()); + + try { + List<MvccValue> list = e.getValue(); + + synchronized (list) { + for (int i = list.size() - 1; i >= 0; i--) { + MvccValue val = list.get(i); + + if (val.ver.cntr.compareTo(cleanupCntr) <= 0) { + if (DEBUG_LOG) { + TestDebugLog.msgs.add(new TestDebugLog.Msg6_1("cleanup", + e.getKey(), val.val, val.ver, cleanupCntr.cntr, null, null)); + } + + MvccValue prev; + + if (val.val != null) + prev = mainIdx.put(e.getKey(), val); + else + prev = mainIdx.remove(e.getKey()); + + if (prev != null) { + SqlKey key = new SqlKey(e.getKey(), prev.val, prev.ver.cntr); + + MvccSqlValue old = mvccSqlIdx.remove(key); + + assert old != null; + } + + for (int j = 0; j <= i; j++) { + MvccValue rmvd = list.remove(0); + + assert rmvd != null; + + if (j != i || rmvd.val == null) { + SqlKey key = new SqlKey(e.getKey(), rmvd.val, rmvd.ver.cntr); + + MvccSqlValue old = mvccSqlIdx.remove(key); + + assert old != null; + } + } + + if (list.isEmpty()) + mvccIdx.remove(e.getKey()); + + break; + } + } + } + } + finally { + unlockEntry(e.getKey()); + } + } + } + + void lockEntry(Object key) { + ReentrantLock e = lock(key); + + e.lock(); + } + + void unlockEntry(Object key) { + 
ReentrantLock e = lock(key); + + e.unlock(); + } + + void updateEntry(Object key, Object val, MvccUpdateVersion ver) { + List<MvccValue> list = mvccIdx.get(key); + + if (list == null) { + Object old = mvccIdx.putIfAbsent(key, list = new ArrayList<>()); + + assert old == null; + } + + MvccValue prevVal = null; + + synchronized (list) { + if (!list.isEmpty()) + prevVal = list.get(list.size() - 1); + + list.add(new MvccValue(val, ver)); + } + + if (prevVal == null) + prevVal = mainIdx.get(key); + + if (prevVal != null) { + SqlKey prevKey = new SqlKey(key, prevVal.val, prevVal.ver.cntr); + + MvccSqlValue old = + mvccSqlIdx.put(prevKey, new MvccSqlValue(prevVal.val, prevVal.ver, ver)); + + assert old != null; + } + + mvccSqlIdx.put(new SqlKey(key, val, ver.cntr), new MvccSqlValue(val, ver, null)); + } + + Object lastValue(Object key) { + List<MvccValue> list = mvccIdx.get(key); + + if (list != null) { + synchronized (list) { + if (list.size() > 0) + return list.get(list.size() - 1).val; + } + } + + MvccValue val = mainIdx.get(key); + + return val != null ? 
val.val : null; + } + + Map<Object, Object> sqlQuery(MvccQueryVersion qryVer) { + Map<Object, Object> res = new HashMap<>(); + + for (Map.Entry<SqlKey, MvccSqlValue> e : mvccSqlIdx.entrySet()) { + MvccSqlValue val = e.getValue(); + + if (!versionVisible(val.ver, qryVer)) { + if (DEBUG_LOG) { + TestDebugLog.msgs.add(new TestDebugLog.Msg3("sql skip mvcc val", e.getKey().key, val.val, val.ver)); + } + + continue; + } + + MvccUpdateVersion newVer = val.newVer; + + if (newVer != null && versionVisible(newVer, qryVer)) { + if (DEBUG_LOG) { + TestDebugLog.msgs.add(new TestDebugLog.Msg4("sql skip mvcc val2", e.getKey().key, val.val, val.ver, val.newVer)); + } + + continue; + } + + Object old = res.put(e.getKey().key, e.getValue().val); + + if (DEBUG_LOG) { + //TestDebugLog.msgs.add(new TestDebugLog.Msg4("sql get mvcc val", e.getKey().key, val.val, val.ver, val.newVer)); + } + + if (old != null) { + TestDebugLog.printAllAndExit("Already has value for key [key=" + e.getKey().key + + ", qryVer=" + qryVer + + ", oldVal=" + old + + ", newVal=" + e.getValue().val + + ']'); + } + + assert old == null; + } + + return res; + } + + private boolean versionVisible(MvccUpdateVersion ver, MvccQueryVersion qryVer) { + int cmp = ver.cntr.compareTo(qryVer.cntr); + + return cmp <= 0;// && !qryVer.activeTxs.contains(ver.txId); + } + + Object get(Object key, MvccQueryVersion ver) { + List<MvccValue> list = mvccIdx.get(key); + + if (list != null) { + synchronized (list) { + for (int i = list.size() - 1; i >= 0; i--) { + MvccValue val = list.get(i); + + if (!versionVisible(val.ver, ver)) + continue; + + if (DEBUG_LOG) { + TestDebugLog.msgs.add(new TestDebugLog.Msg3("read mvcc val", key, val.val, val.ver)); + } + + return val.val; + } + } + } + + MvccValue val = mainIdx.get(key); + + if (val != null) { + int cmp = val.ver.cntr.compareTo(ver.cntr); + + if (DEBUG_LOG) { + if (cmp > 0) { + synchronized (TestDebugLog.msgs) { + TestDebugLog.msgs.add(new TestDebugLog.Message("Committed [key=" + key + 
", ver=" + val.ver + ", qryVer=" + ver.cntr + ']')); + + TestDebugLog.printAllAndExit("Committed [key=" + key + ", ver=" + val.ver + ", qryVer=" + ver + ']'); + } + } + } + + assert cmp <= 0 : "Committed [ver=" + val.ver + ", qryVer=" + ver.cntr + ']'; + + if (DEBUG_LOG) + TestDebugLog.msgs.add(new TestDebugLog.Msg3("read comitted val", key, val, val.ver)); + } + else { + if (DEBUG_LOG) + TestDebugLog.msgs.add(new TestDebugLog.Msg3("read comitted null", key, null, null)); + } + + return val != null ? val.val : null; + } + + private ReentrantLock lock(Object key) { + ReentrantLock e = locks.get(key); + + if (e == null) { + ReentrantLock old = locks.putIfAbsent(key, e = new ReentrantLock()); + + if (old != null) + e = old; + } + + return e; + } + } + + /** + * + */ + static class MvccValue { + /** */ + @GridToStringInclude + final Object val; + + /** */ + @GridToStringInclude + final MvccUpdateVersion ver; + + MvccValue(Object val, MvccUpdateVersion ver) { + assert ver != null; + + this.val = val; + this.ver = ver; + } + + @Override public String toString() { + return S.toString(MvccValue.class, this); + } + } + + /** + * + */ + static class MvccSqlValue { + /** */ + @GridToStringInclude + final Object val; + + /** */ + @GridToStringInclude + final MvccUpdateVersion ver; + + /** */ + @GridToStringInclude + final MvccUpdateVersion newVer; + + MvccSqlValue(Object val, MvccUpdateVersion ver, MvccUpdateVersion newVer) { + assert ver != null; + + this.val = val; + this.ver = ver; + this.newVer = newVer; + } + + @Override public String toString() { + return S.toString(MvccSqlValue.class, this); + } + } + + static void log(String msg) { + System.out.println(Thread.currentThread() + ": " + msg); + } + + static class TestDebugLog { + /** */ + //static final List<Object> msgs = Collections.synchronizedList(new ArrayList<>(1_000_000)); + static final ConcurrentLinkedQueue<Object> msgs = new ConcurrentLinkedQueue<>(); + + + + /** */ + private static final SimpleDateFormat 
DEBUG_DATE_FMT = new SimpleDateFormat("HH:mm:ss,SSS"); + + static class Message { + String thread = Thread.currentThread().getName(); + + String msg; + + long ts = U.currentTimeMillis(); + + public Message(String msg) { + this.msg = msg; + } + + public String toString() { + return "Msg [msg=" + msg + ", thread=" + thread + ", time=" + DEBUG_DATE_FMT.format(new Date(ts)) + ']'; + } + } + + static class Msg2 extends Message{ + Object v1; + Object v2; + + public Msg2(String msg, Object v1, Object v2) { + super(msg); + this.v1 = v1; + this.v2 = v2; + } + public String toString() { + return "Msg [msg=" + msg + + ", v1=" + v1 + + ", v2=" + v2 + + ", msg=" + msg + + ", thread=" + thread + + ", time=" + DEBUG_DATE_FMT.format(new Date(ts)) + ']'; + } + } + + static class Msg3 extends Message{ + Object v1; + Object v2; + Object v3; + + public Msg3(String msg, Object v1, Object v2, Object v3) { + super(msg); + this.v1 = v1; + this.v2 = v2; + this.v3 = v3; + } + public String toString() { + return "Msg [msg=" + msg + + ", v1=" + v1 + + ", v2=" + v2 + + ", v3=" + v3 + + ", thread=" + thread + + ", time=" + DEBUG_DATE_FMT.format(new Date(ts)) + ']'; + } + } + + static class Msg4 extends Message{ + Object v1; + Object v2; + Object v3; + Object v4; + + public Msg4(String msg, Object v1, Object v2, Object v3, Object v4) { + super(msg); + this.v1 = v1; + this.v2 = v2; + this.v3 = v3; + this.v4 = v4; + } + + public String toString() { + return "Msg [msg=" + msg + + ", v1=" + v1 + + ", v2=" + v2 + + ", v3=" + v3 + + ", v4=" + v4 + + ", thread=" + thread + + ", time=" + DEBUG_DATE_FMT.format(new Date(ts)) + ']'; + } + } + + static class Msg6 extends Message{ + Object v1; + Object v2; + Object v3; + Object v4; + Object v5; + Object v6; + + public Msg6(String msg, Object v1, Object v2, Object v3, Object v4, Object v5, Object v6) { + super(msg); + this.v1 = v1; + this.v2 = v2; + this.v3 = v3; + this.v4 = v4; + this.v5 = v5; + this.v6 = v6; + } + + public String toString() { + return "Msg 
[msg=" + msg + + ", txId=" + v1 + + ", id1=" + v2 + + ", v1=" + v3 + + ", id2=" + v4 + + ", v2=" + v5 + + ", cntr=" + v6 + + ", thread=" + thread + + ", time=" + DEBUG_DATE_FMT.format(new Date(ts)) + ']'; + } + } + static class Msg6_1 extends Message{ + Object v1; + Object v2; + Object v3; + Object v4; + Object v5; + Object v6; + + public Msg6_1(String msg, Object v1, Object v2, Object v3, Object v4, Object v5, Object v6) { + super(msg); + this.v1 = v1; + this.v2 = v2; + this.v3 = v3; + this.v4 = v4; + this.v5 = v5; + this.v6 = v6; + } + + public String toString() { + return "Msg [msg=" + msg + + ", key=" + v1 + + ", val=" + v2 + + ", ver=" + v3 + + ", cleanupC=" + v4 + + ", thread=" + thread + + ", time=" + DEBUG_DATE_FMT.format(new Date(ts)) + ']'; + } + } + + static class EntryMessage extends Message { + Object key; + Object val; + + public EntryMessage(Object key, Object val, String msg) { + super(msg); + + this.key = key; + this.val = val; + } + + public String toString() { + return "EntryMsg [key=" + key + ", val=" + val + ", msg=" + msg + ", thread=" + thread + ", time=" + DEBUG_DATE_FMT.format(new Date(ts)) + ']'; + } + } + + static class PartMessage extends Message { + int p; + Object val; + + public PartMessage(int p, Object val, String msg) { + super(msg); + + this.p = p; + this.val = val; + } + + public String toString() { + return "PartMessage [p=" + p + ", val=" + val + ", msg=" + msg + ", thread=" + thread + ", time=" + DEBUG_DATE_FMT.format(new Date(ts)) + ']'; + } + } + + static final boolean out = false; + + public static void addMessage(String msg) { + msgs.add(new Message(msg)); + + if (out) + System.out.println(msg); + } + + public static void addEntryMessage(Object key, Object val, String msg) { + if (key instanceof KeyCacheObject) + key = ((KeyCacheObject)key).value(null, false); + + EntryMessage msg0 = new EntryMessage(key, val, msg); + + msgs.add(msg0); + + if (out) { + System.out.println(msg0.toString()); + + System.out.flush(); + } + } + 
+ public static void addPartMessage(int p, Object val, String msg) { + PartMessage msg0 = new PartMessage(p, val, msg); + + msgs.add(msg0); + + if (out) { + System.out.println(msg0.toString()); + + System.out.flush(); + } + } + + static void printAllAndExit(String msg) { + System.out.println(msg); + + TestDebugLog.addMessage(msg); + + List<Object> msgs = TestDebugLog.printMessages(true, null); + + TestDebugLog.printMessages0(msgs, "test_debug_update.txt"); + + TestDebugLog.printMessagesForThread(msgs, Thread.currentThread().getName(), "test_debug_thread.txt"); + + System.exit(1); + } + + public static void printMessagesForThread(List<Object> msgs0, String thread0, String file) { + try { + FileOutputStream out = new FileOutputStream(file); + + PrintWriter w = new PrintWriter(out); + + for (Object msg : msgs0) { + if (msg instanceof Message) { + String thread = ((Message) msg).thread; + + if (thread.equals(thread0)) + w.println(msg.toString()); + } + } + + w.close(); + + out.close(); + } + catch (IOException e) { + e.printStackTrace(); + } + } + + public static void printMessages0(List<Object> msgs0, String file) { + try { + FileOutputStream out = new FileOutputStream(file); + + PrintWriter w = new PrintWriter(out); + + for (Object msg : msgs0) { + if (msg instanceof Message) { + String msg0 = ((Message) msg).msg; + + if (msg0.equals("tx done") || msg0.equals("update") || msg0.equals("cleanup")) + w.println(msg.toString()); + } + } + + w.close(); + + out.close(); + } + catch (IOException e) { + e.printStackTrace(); + } + } + + public static List<Object> printMessages(boolean file, Integer part) { + List<Object> msgs0; + + synchronized (msgs) { + msgs0 = new ArrayList<>(msgs); + + msgs.clear(); + } + + if (file) { + try { + FileOutputStream out = new FileOutputStream("test_debug.log"); + + PrintWriter w = new PrintWriter(out); + + for (Object msg : msgs0) { + if (part != null && msg instanceof PartMessage) { + if (((PartMessage) msg).p != part) + continue; + } + + 
w.println(msg.toString()); + } + + w.close(); + + out.close(); + } + catch (IOException e) { + e.printStackTrace(); + } + } + else { + for (Object msg : msgs0) + System.out.println(msg); + } + + return msgs0; + } + + public static void printKeyMessages(boolean file, Object key) { + List<Object> msgs0; + + synchronized (msgs) { + msgs0 = new ArrayList<>(msgs); + + msgs.clear(); + } + + if (file) { + try { + FileOutputStream out = new FileOutputStream("test_debug.log"); + + PrintWriter w = new PrintWriter(out); + + for (Object msg : msgs0) { + if (msg instanceof EntryMessage && !((EntryMessage)msg).key.equals(key)) + continue; + + w.println(msg.toString()); + } + + w.close(); + + out.close(); + } + catch (IOException e) { + e.printStackTrace(); + } + } + else { + for (Object msg : msgs0) { + if (msg instanceof EntryMessage && !((EntryMessage)msg).key.equals(key)) + continue; + + System.out.println(msg); + } + } + } + + public static void clear() { + msgs.clear(); + } + + public static void clearEntries() { + for (Iterator it = msgs.iterator(); it.hasNext();) { + Object msg = it.next(); + + if (msg instanceof EntryMessage) + it.remove(); + } + } + + }}
http://git-wip-us.apache.org/repos/asf/ignite/blob/f6e98254/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java index 97e06bf..eae435e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java @@ -102,6 +102,12 @@ import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxFi import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxPrepareRequest; import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxPrepareResponse; import org.apache.ignite.internal.processors.cache.distributed.near.GridNearUnlockRequest; +import org.apache.ignite.internal.processors.cache.mvcc.CoordinatorMvccCounterResponse; +import org.apache.ignite.internal.processors.cache.mvcc.CoordinatorQueryAckRequest; +import org.apache.ignite.internal.processors.cache.mvcc.CoordinatorQueryCounterRequest; +import org.apache.ignite.internal.processors.cache.mvcc.CoordinatorTxAckRequest; +import org.apache.ignite.internal.processors.cache.mvcc.CoordinatorTxAckResponse; +import org.apache.ignite.internal.processors.cache.mvcc.CoordinatorTxCounterRequest; import org.apache.ignite.internal.processors.cache.query.GridCacheQueryRequest; import org.apache.ignite.internal.processors.cache.query.GridCacheQueryResponse; import org.apache.ignite.internal.processors.cache.query.GridCacheSqlQuery; @@ -875,6 +881,36 @@ public class GridIoMessageFactory implements MessageFactory { break; + case 129: + msg = new CoordinatorTxCounterRequest(); + + break; + + 
case 130: + msg = new CoordinatorMvccCounterResponse(); + + break; + + case 131: + msg = new CoordinatorTxAckRequest(); + + break; + + case 132: + msg = new CoordinatorTxAckResponse(); + + break; + + case 133: + msg = new CoordinatorQueryCounterRequest(); + + break; + + case 134: + msg = new CoordinatorQueryAckRequest(); + + break; + // [-3..119] [124..128] [-23..-27] [-36..-55]- this // [120..123] - DR http://git-wip-us.apache.org/repos/asf/ignite/blob/f6e98254/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java index 5e2c8db..19bd05d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java @@ -237,6 +237,9 @@ class ClusterCachesInfo { CU.checkAttributeMismatch(log, rmtAttr.groupName(), rmt, "groupName", "Cache group name", locAttr.groupName(), rmtAttr.groupName(), true); + CU.checkAttributeMismatch(log, rmtAttr.cacheName(), rmt, "mvccEnabled", "MVCC mode", + locAttr.mvccEnabled(), rmtAttr.mvccEnabled(), true); + if (rmtAttr.cacheMode() != LOCAL) { CU.checkAttributeMismatch(log, rmtAttr.cacheName(), rmt, "interceptor", "Cache Interceptor", locAttr.interceptorClassName(), rmtAttr.interceptorClassName(), true); http://git-wip-us.apache.org/repos/asf/ignite/blob/f6e98254/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAttributes.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAttributes.java 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAttributes.java index d64ee8b..c1f03fa 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAttributes.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAttributes.java @@ -329,6 +329,13 @@ public class GridCacheAttributes implements Serializable { } /** + * @return MVCC enabled flag. + */ + public boolean mvccEnabled() { + return ccfg.isMvccEnabled(); + } + + /** * @param obj Object to get class of. * @return Class name or {@code null}. */ http://git-wip-us.apache.org/repos/asf/ignite/blob/f6e98254/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java index b6faf47..b504625 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java @@ -2033,6 +2033,13 @@ public class GridCacheContext<K, V> implements Externalizable { } /** + * @return {@code True} if mvcc is enabled for cache. + */ + public boolean mvccEnabled() { + return cacheCfg.isMvccEnabled(); + } + + /** * @param part Partition. * @param affNodes Affinity nodes. * @param topVer Topology version. 
http://git-wip-us.apache.org/repos/asf/ignite/blob/f6e98254/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java index bd950fa..030f7e7 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java @@ -86,6 +86,7 @@ import org.apache.ignite.internal.processors.cache.dr.GridCacheDrManager; import org.apache.ignite.internal.processors.cache.jta.CacheJtaManagerAdapter; import org.apache.ignite.internal.processors.cache.local.GridLocalCache; import org.apache.ignite.internal.processors.cache.local.atomic.GridLocalAtomicCache; +import org.apache.ignite.internal.processors.cache.mvcc.CacheCoordinatorsSharedManager; import org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager; import org.apache.ignite.internal.processors.cache.persistence.IgniteCacheDatabaseSharedManager; import org.apache.ignite.internal.processors.cache.persistence.MemoryPolicy; @@ -456,10 +457,16 @@ public class GridCacheProcessor extends GridProcessorAdapter { ctx.igfsHelper().validateCacheConfiguration(cc); - if (cc.getAtomicityMode() == ATOMIC) + if (cc.getAtomicityMode() == ATOMIC) { assertParameter(cc.getTransactionManagerLookupClassName() == null, "transaction manager can not be used with ATOMIC cache"); + assertParameter(!cc.isMvccEnabled(), "MVCC can not used with ATOMIC cache"); + } + + if (cc.getCacheMode() == LOCAL) + assertParameter(!cc.isMvccEnabled(), "MVCC can not used with LOCAL cache"); + if (cc.getEvictionPolicy() != null && !cc.isOnheapCacheEnabled()) throw new IgniteCheckedException("Onheap 
cache must be enabled if eviction policy is configured [cacheName=" + U.maskName(cc.getName()) + "]"); @@ -2170,6 +2177,7 @@ public class GridCacheProcessor extends GridProcessorAdapter { @SuppressWarnings("unchecked") private GridCacheSharedContext createSharedContext(GridKernalContext kernalCtx, Collection<CacheStoreSessionListener> storeSesLsnrs) throws IgniteCheckedException { + CacheCoordinatorsSharedManager coord = new CacheCoordinatorsSharedManager(); IgniteTxManager tm = new IgniteTxManager(); GridCacheMvccManager mvccMgr = new GridCacheMvccManager(); GridCacheVersionManager verMgr = new GridCacheVersionManager(); @@ -2208,6 +2216,7 @@ public class GridCacheProcessor extends GridProcessorAdapter { return new GridCacheSharedContext( kernalCtx, + coord, tm, verMgr, mvccMgr, http://git-wip-us.apache.org/repos/asf/ignite/blob/f6e98254/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java index 82d960a..09c8b1a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java @@ -46,6 +46,7 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartit import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTopologyFuture; import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxLocal; import org.apache.ignite.internal.processors.cache.jta.CacheJtaManagerAdapter; +import org.apache.ignite.internal.processors.cache.mvcc.CacheCoordinatorsSharedManager; import 
org.apache.ignite.internal.processors.cache.persistence.IgniteCacheDatabaseSharedManager; import org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteCacheSnapshotManager; import org.apache.ignite.internal.processors.cache.store.CacheStoreManager; @@ -122,6 +123,9 @@ public class GridCacheSharedContext<K, V> { /** Ttl cleanup manager. */ private GridCacheSharedTtlCleanupManager ttlMgr; + /** Cache mvcc coordinator. */ + private CacheCoordinatorsSharedManager coord; + /** Cache contexts map. */ private ConcurrentHashMap8<Integer, GridCacheContext<K, V>> ctxMap; @@ -163,6 +167,7 @@ public class GridCacheSharedContext<K, V> { /** * @param kernalCtx Context. + * @param coord Cache mvcc coordinator manager. * @param txMgr Transaction manager. * @param verMgr Version manager. * @param mvccMgr MVCC manager. @@ -176,6 +181,7 @@ public class GridCacheSharedContext<K, V> { */ public GridCacheSharedContext( GridKernalContext kernalCtx, + CacheCoordinatorsSharedManager coord, IgniteTxManager txMgr, GridCacheVersionManager verMgr, GridCacheMvccManager mvccMgr, @@ -193,7 +199,21 @@ public class GridCacheSharedContext<K, V> { ) { this.kernalCtx = kernalCtx; - setManagers(mgrs, txMgr, jtaMgr, verMgr, mvccMgr, pageStoreMgr, walMgr, dbMgr, snpMgr, depMgr, exchMgr, affMgr, ioMgr, ttlMgr); + setManagers(mgrs, + coord, + txMgr, + jtaMgr, + verMgr, + mvccMgr, + pageStoreMgr, + walMgr, + dbMgr, + snpMgr, + depMgr, + exchMgr, + affMgr, + ioMgr, + ttlMgr); this.storeSesLsnrs = storeSesLsnrs; @@ -335,7 +355,9 @@ public class GridCacheSharedContext<K, V> { void onReconnected(boolean active) throws IgniteCheckedException { List<GridCacheSharedManager<K, V>> mgrs = new LinkedList<>(); - setManagers(mgrs, txMgr, + setManagers(mgrs, + coord, + txMgr, jtaMgr, verMgr, mvccMgr, @@ -374,6 +396,7 @@ public class GridCacheSharedContext<K, V> { /** * @param mgrs Managers list. + * @param coord Cache mvcc coordinator manager. * @param txMgr Transaction manager. 
* @param jtaMgr JTA manager. * @param verMgr Version manager. @@ -385,6 +408,7 @@ public class GridCacheSharedContext<K, V> { * @param ttlMgr Ttl cleanup manager. */ private void setManagers(List<GridCacheSharedManager<K, V>> mgrs, + CacheCoordinatorsSharedManager coord, IgniteTxManager txMgr, CacheJtaManagerAdapter jtaMgr, GridCacheVersionManager verMgr, @@ -398,6 +422,7 @@ public class GridCacheSharedContext<K, V> { CacheAffinitySharedManager affMgr, GridCacheIoManager ioMgr, GridCacheSharedTtlCleanupManager ttlMgr) { + this.coord = add(mgrs, coord); this.mvccMgr = add(mgrs, mvccMgr); this.verMgr = add(mgrs, verMgr); this.txMgr = add(mgrs, txMgr); @@ -738,6 +763,13 @@ public class GridCacheSharedContext<K, V> { } /** + * @return Cache mvcc coordinator manager. + */ + public CacheCoordinatorsSharedManager coordinators() { + return coord; + } + + /** * @return Node ID. */ public UUID localNodeId() { http://git-wip-us.apache.org/repos/asf/ignite/blob/f6e98254/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsSharedManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsSharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsSharedManager.java new file mode 100644 index 0000000..e5d07ea --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsSharedManager.java @@ -0,0 +1,454 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.ignite.internal.processors.cache.mvcc;

import java.util.List;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.events.DiscoveryEvent;
import org.apache.ignite.events.Event;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
import org.apache.ignite.internal.managers.communication.GridMessageListener;
import org.apache.ignite.internal.managers.discovery.DiscoCache;
import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
import org.apache.ignite.internal.processors.cache.GridCacheSharedManagerAdapter;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.jetbrains.annotations.Nullable;

import static org.apache.ignite.events.EventType.EVT_NODE_FAILED;
import static org.apache.ignite.events.EventType.EVT_NODE_LEFT;
import static org.apache.ignite.internal.GridTopic.TOPIC_CACHE_COORDINATOR;
import static org.apache.ignite.internal.managers.communication.GridIoPolicy.SYSTEM_POOL;

/**
 * Shared cache manager for MVCC coordinator messaging.
 * <p>
 * As a requester it sends counter and acknowledge requests to a coordinator node over
 * {@code TOPIC_CACHE_COORDINATOR} and tracks the pending futures. As a receiver it assigns
 * counters, tracks transactions that have not been acknowledged yet and answers the requests.
 */
public class CacheCoordinatorsSharedManager<K, V> extends GridCacheSharedManagerAdapter<K, V> {
    /** Source of counters handed out to transactions. */
    private final AtomicLong mvccCntr = new AtomicLong(0L);

    /** Counter reported to queries; it is only read in this class (see TODO IGNITE-3478). */
    private final AtomicLong committedCntr = new AtomicLong(0L);

    /** Counters of transactions that were assigned a counter and are not acknowledged yet. */
    private final ConcurrentHashMap<GridCacheVersion, Long> activeTxs = new ConcurrentHashMap<>();

    /** Pending counter requests by future ID. */
    private final ConcurrentMap<Long, MvccCounterFuture> cntrFuts = new ConcurrentHashMap<>();

    /** Pending tx commit acknowledgements by future ID. */
    private final ConcurrentMap<Long, TxAckFuture> ackFuts = new ConcurrentHashMap<>();

    /** Generator of future IDs, shared by counter and acknowledge futures. */
    private final AtomicLong futIdCntr = new AtomicLong();

    /** {@inheritDoc} */
    @Override protected void start0() throws IgniteCheckedException {
        super.start0();

        cctx.gridEvents().addLocalEventListener(new CacheCoordinatorDiscoveryListener(),
            EVT_NODE_FAILED, EVT_NODE_LEFT);

        cctx.gridIO().addMessageListener(TOPIC_CACHE_COORDINATOR, new CoordinatorMessageListener());
    }

    /**
     * Requests a counter for a transaction from the coordinator.
     *
     * @param crd Coordinator.
     * @param txId Transaction ID.
     * @return Counter request future.
     */
    public IgniteInternalFuture<Long> requestTxCounter(ClusterNode crd, GridCacheVersion txId) {
        MvccCounterFuture fut = new MvccCounterFuture(futIdCntr.incrementAndGet(), crd);

        cntrFuts.put(fut.id, fut);

        try {
            cctx.gridIO().sendToGridTopic(crd,
                TOPIC_CACHE_COORDINATOR,
                new CoordinatorTxCounterRequest(fut.id, txId),
                SYSTEM_POOL);
        }
        catch (IgniteCheckedException e) {
            if (cntrFuts.remove(fut.id) != null)
                fut.onDone(e);
        }

        return fut;
    }

    /**
     * Requests a counter for a query from the coordinator.
     *
     * @param crd Coordinator.
     * @return Counter request future.
     */
    public IgniteInternalFuture<Long> requestQueryCounter(ClusterNode crd) {
        MvccCounterFuture fut = new MvccCounterFuture(futIdCntr.incrementAndGet(), crd);

        cntrFuts.put(fut.id, fut);

        try {
            cctx.gridIO().sendToGridTopic(crd,
                TOPIC_CACHE_COORDINATOR,
                new CoordinatorQueryCounterRequest(fut.id),
                SYSTEM_POOL);
        }
        catch (IgniteCheckedException e) {
            if (cntrFuts.remove(fut.id) != null)
                fut.onDone(e);
        }

        return fut;
    }

    /**
     * Notifies the coordinator that a transaction is committed and waits for its response.
     * If the coordinator has left, the future completes normally.
     *
     * @param crd Coordinator.
     * @param txId Transaction ID.
     * @return Acknowledge future.
     */
    public IgniteInternalFuture<Void> ackTxCommit(ClusterNode crd, GridCacheVersion txId) {
        TxAckFuture fut = new TxAckFuture(futIdCntr.incrementAndGet(), crd);

        ackFuts.put(fut.id, fut);

        try {
            cctx.gridIO().sendToGridTopic(crd,
                TOPIC_CACHE_COORDINATOR,
                new CoordinatorTxAckRequest(fut.id, txId),
                SYSTEM_POOL);
        }
        catch (ClusterTopologyCheckedException e) {
            // The future lives in ackFuts (not cntrFuts), so it has to be unregistered from there.
            if (ackFuts.remove(fut.id) != null)
                fut.onDone();
        }
        catch (IgniteCheckedException e) {
            if (ackFuts.remove(fut.id) != null)
                fut.onDone(e);
        }

        return fut;
    }

    /**
     * Notifies the coordinator that a transaction is rolled back. No response is requested and
     * send failures are only logged.
     *
     * @param crd Coordinator.
     * @param txId Transaction ID.
     */
    public void ackTxRollback(ClusterNode crd, GridCacheVersion txId) {
        CoordinatorTxAckRequest msg = new CoordinatorTxAckRequest(0, txId);

        msg.skipResponse(true);

        try {
            cctx.gridIO().sendToGridTopic(crd,
                TOPIC_CACHE_COORDINATOR,
                msg,
                SYSTEM_POOL);
        }
        catch (ClusterTopologyCheckedException e) {
            if (log.isDebugEnabled())
                log.debug("Failed to send tx rollback ack, node left [msg=" + msg + ", node=" + crd.id() + ']');
        }
        catch (IgniteCheckedException e) {
            U.error(log, "Failed to send tx rollback ack [msg=" + msg + ", node=" + crd.id() + ']', e);
        }
    }

    /**
     * Assigns the next counter to a transaction and registers the transaction as active.
     *
     * @param txId Transaction ID.
     * @return Counter.
     */
    private long assignTxCounter(GridCacheVersion txId) {
        long nextCtr = mvccCntr.incrementAndGet();

        Object old = activeTxs.put(txId, nextCtr);

        assert old == null : txId;

        return nextCtr;
    }

    /**
     * Assigns a counter for the requested transaction and sends it back to the requester.
     *
     * @param nodeId Sender node ID.
     * @param msg Message.
     */
    private void processCoordinatorTxCounterRequest(UUID nodeId, CoordinatorTxCounterRequest msg) {
        ClusterNode node = cctx.discovery().node(nodeId);

        if (node == null) {
            if (log.isDebugEnabled())
                log.debug("Ignore tx counter request processing, node left [msg=" + msg + ", node=" + nodeId + ']');

            return;
        }

        long nextCtr = assignTxCounter(msg.txId());

        try {
            cctx.gridIO().sendToGridTopic(node,
                TOPIC_CACHE_COORDINATOR,
                new CoordinatorMvccCounterResponse(nextCtr, msg.futureId()),
                SYSTEM_POOL);
        }
        catch (ClusterTopologyCheckedException e) {
            if (log.isDebugEnabled())
                log.debug("Failed to send tx counter response, node left [msg=" + msg + ", node=" + nodeId + ']');
        }
        catch (IgniteCheckedException e) {
            U.error(log, "Failed to send tx counter response [msg=" + msg + ", node=" + nodeId + ']', e);
        }
    }

    /**
     * Completes the counter future the response is addressed to.
     *
     * @param nodeId Sender node ID.
     * @param msg Message.
     */
    private void processCoordinatorCounterResponse(UUID nodeId, CoordinatorMvccCounterResponse msg) {
        MvccCounterFuture fut = cntrFuts.remove(msg.futureId());

        if (fut != null)
            fut.onResponse(msg.counter());
        else {
            if (cctx.discovery().alive(nodeId))
                U.warn(log, "Failed to find coordinator counter future [node=" + nodeId + ", msg=" + msg + ']');
            else if (log.isDebugEnabled())
                log.debug("Failed to find query counter future [node=" + nodeId + ", msg=" + msg + ']');
        }
    }

    /**
     * Assigns a counter for a query and sends it back to the requester. If the response can not
     * be sent the query is treated as done.
     *
     * @param nodeId Sender node ID.
     * @param msg Message.
     */
    private void processCoordinatorQueryStateRequest(UUID nodeId, CoordinatorQueryCounterRequest msg) {
        ClusterNode node = cctx.discovery().node(nodeId);

        if (node == null) {
            if (log.isDebugEnabled())
                log.debug("Ignore query counter request processing, node left [msg=" + msg + ", node=" + nodeId + ']');

            return;
        }

        long qryCntr = assignQueryCounter(nodeId);

        // Constructor order is (counter, future ID); both are long, so a swap compiles silently.
        CoordinatorMvccCounterResponse res = new CoordinatorMvccCounterResponse(qryCntr, msg.futureId());

        try {
            cctx.gridIO().sendToGridTopic(node,
                TOPIC_CACHE_COORDINATOR,
                res,
                SYSTEM_POOL);
        }
        catch (ClusterTopologyCheckedException e) {
            if (log.isDebugEnabled())
                log.debug("Failed to send query counter response, node left [msg=" + msg + ", node=" + nodeId + ']');

            onQueryDone(qryCntr);
        }
        catch (IgniteCheckedException e) {
            U.error(log, "Failed to send query counter response [msg=" + msg + ", node=" + nodeId + ']', e);

            onQueryDone(qryCntr);
        }
    }

    /**
     * @param msg Message.
     */
    private void processCoordinatorQueryAckRequest(CoordinatorQueryAckRequest msg) {
        onQueryDone(msg.counter());
    }

    /**
     * Unregisters the acknowledged transaction and, unless the sender asked to skip it,
     * sends a response.
     *
     * @param nodeId Sender node ID.
     * @param msg Message.
     */
    private void processCoordinatorTxAckRequest(UUID nodeId, CoordinatorTxAckRequest msg) {
        activeTxs.remove(msg.txId());

        if (!msg.skipResponse()) {
            try {
                cctx.gridIO().sendToGridTopic(nodeId,
                    TOPIC_CACHE_COORDINATOR,
                    new CoordinatorTxAckResponse(msg.futureId()),
                    SYSTEM_POOL);
            }
            catch (ClusterTopologyCheckedException e) {
                if (log.isDebugEnabled())
                    log.debug("Failed to send tx ack response, node left [msg=" + msg + ", node=" + nodeId + ']');
            }
            catch (IgniteCheckedException e) {
                U.error(log, "Failed to send tx ack response [msg=" + msg + ", node=" + nodeId + ']', e);
            }
        }
    }

    /**
     * Completes the acknowledge future the response is addressed to.
     *
     * @param nodeId Sender node ID.
     * @param msg Message.
     */
    private void processCoordinatorTxAckResponse(UUID nodeId, CoordinatorTxAckResponse msg) {
        // Remove (not get): a completed future must not stay registered.
        TxAckFuture fut = ackFuts.remove(msg.futureId());

        if (fut != null)
            fut.onResponse();
        else {
            if (cctx.discovery().alive(nodeId))
                U.warn(log, "Failed to find tx ack future [node=" + nodeId + ", msg=" + msg + ']');
            else if (log.isDebugEnabled())
                log.debug("Failed to find tx ack future [node=" + nodeId + ", msg=" + msg + ']');
        }
    }

    /**
     * @param qryNodeId Node initiated query.
     * @return Counter for query.
     */
    private long assignQueryCounter(UUID qryNodeId) {
        // TODO IGNITE-3478
        return committedCntr.get();
    }

    /**
     * @param cntr Query counter.
     */
    private void onQueryDone(long cntr) {
        // TODO IGNITE-3478
    }

    /**
     * Picks the first server node of the topology as coordinator.
     *
     * @param discoCache Cluster topology.
     * @return Assigned coordinator, or {@code null} if there are no server nodes.
     */
    @Nullable public ClusterNode assignCoordinator(DiscoCache discoCache) {
        // TODO IGNITE-3478
        List<ClusterNode> srvNodes = discoCache.serverNodes();

        return srvNodes.isEmpty() ? null : srvNodes.get(0);
    }

    /**
     * Future for a counter requested from a coordinator.
     */
    private class MvccCounterFuture extends GridFutureAdapter<Long> {
        /** Future ID. */
        private final Long id;

        /** Coordinator the request was sent to. */
        private final ClusterNode crd;

        /**
         * @param id Future ID.
         * @param crd Coordinator.
         */
        MvccCounterFuture(Long id, ClusterNode crd) {
            this.id = id;
            this.crd = crd;
        }

        /**
         * @param cntr Counter.
         */
        void onResponse(long cntr) {
            onDone(cntr);
        }

        /**
         * Fails the future if the node that left is the coordinator it waits for.
         *
         * @param nodeId Failed node ID.
         */
        void onNodeLeft(UUID nodeId) {
            if (crd.id().equals(nodeId) && cntrFuts.remove(id) != null)
                onDone(new ClusterTopologyCheckedException("Failed to request counter, node failed: " + nodeId));
        }
    }

    /**
     * Future for a tx commit acknowledgement sent to a coordinator.
     */
    private class TxAckFuture extends GridFutureAdapter<Void> {
        /** Future ID. */
        private final Long id;

        /** Coordinator the request was sent to. */
        private final ClusterNode crd;

        /**
         * @param id Future ID.
         * @param crd Coordinator.
         */
        TxAckFuture(Long id, ClusterNode crd) {
            this.id = id;
            this.crd = crd;
        }

        /**
         * Completes the future on coordinator response.
         */
        void onResponse() {
            onDone();
        }

        /**
         * Completes the future normally if the node that left is the coordinator it waits for.
         *
         * @param nodeId Failed node ID.
         */
        void onNodeLeft(UUID nodeId) {
            if (crd.id().equals(nodeId) && ackFuts.remove(id) != null)
                onDone();
        }
    }

    /**
     * Completes pending futures that wait for a node which left or failed.
     */
    private class CacheCoordinatorDiscoveryListener implements GridLocalEventListener {
        /** {@inheritDoc} */
        @Override public void onEvent(Event evt) {
            assert evt instanceof DiscoveryEvent : evt;

            DiscoveryEvent discoEvt = (DiscoveryEvent)evt;

            UUID nodeId = discoEvt.eventNode().id();

            for (MvccCounterFuture fut : cntrFuts.values())
                fut.onNodeLeft(nodeId);

            // Without this a commit acknowledgement would wait forever for a coordinator that is gone.
            for (TxAckFuture fut : ackFuts.values())
                fut.onNodeLeft(nodeId);
        }
    }

    /**
     * Dispatches coordinator messages to the matching handler by message type.
     */
    private class CoordinatorMessageListener implements GridMessageListener {
        /** {@inheritDoc} */
        @Override public void onMessage(UUID nodeId, Object msg, byte plc) {
            if (msg instanceof CoordinatorTxCounterRequest)
                processCoordinatorTxCounterRequest(nodeId, (CoordinatorTxCounterRequest)msg);
            else if (msg instanceof CoordinatorMvccCounterResponse)
                processCoordinatorCounterResponse(nodeId, (CoordinatorMvccCounterResponse)msg);
            else if (msg instanceof CoordinatorTxAckRequest)
                processCoordinatorTxAckRequest(nodeId, (CoordinatorTxAckRequest)msg);
            else if (msg instanceof CoordinatorTxAckResponse)
                processCoordinatorTxAckResponse(nodeId, (CoordinatorTxAckResponse)msg);
            else if (msg instanceof CoordinatorQueryAckRequest)
                processCoordinatorQueryAckRequest((CoordinatorQueryAckRequest)msg);
            else if (msg instanceof CoordinatorQueryCounterRequest)
                processCoordinatorQueryStateRequest(nodeId, (CoordinatorQueryCounterRequest)msg);
            else
                U.warn(log, "Unexpected message received: " + msg);
        }
    }
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/f6e98254/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CoordinatorMvccCounterResponse.java
---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CoordinatorMvccCounterResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CoordinatorMvccCounterResponse.java new file mode 100644 index 0000000..5005477 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CoordinatorMvccCounterResponse.java @@ -0,0 +1,147 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.cache.mvcc; + +import java.nio.ByteBuffer; +import org.apache.ignite.internal.managers.communication.GridIoMessageFactory; +import org.apache.ignite.internal.util.typedef.internal.S; +import org.apache.ignite.plugin.extensions.communication.Message; +import org.apache.ignite.plugin.extensions.communication.MessageReader; +import org.apache.ignite.plugin.extensions.communication.MessageWriter; + +/** + * + */ +public class CoordinatorMvccCounterResponse implements Message { + /** */ + private static final long serialVersionUID = 0L; + + /** */ + private long cntr; + + /** */ + private long futId; + + /** + * Required by {@link GridIoMessageFactory}. + */ + public CoordinatorMvccCounterResponse() { + // No-op. + } + + /** + * @param cntr Counter. + * @param futId Future ID. + */ + CoordinatorMvccCounterResponse(long cntr, long futId) { + this.cntr = cntr; + this.futId = futId; + } + + /** + * @return Future ID. + */ + public long futureId() { + return futId; + } + + /** + * @return Counter. 
+ */ + public long counter() { + return cntr; + } + + /** {@inheritDoc} */ + @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) { + writer.setBuffer(buf); + + if (!writer.isHeaderWritten()) { + if (!writer.writeHeader(directType(), fieldsCount())) + return false; + + writer.onHeaderWritten(); + } + + switch (writer.state()) { + case 0: + if (!writer.writeLong("cntr", cntr)) + return false; + + writer.incrementState(); + + case 1: + if (!writer.writeLong("futId", futId)) + return false; + + writer.incrementState(); + + } + + return true; + } + + /** {@inheritDoc} */ + @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) { + reader.setBuffer(buf); + + if (!reader.beforeMessageRead()) + return false; + + switch (reader.state()) { + case 0: + cntr = reader.readLong("cntr"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 1: + futId = reader.readLong("futId"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + } + + return reader.afterMessageRead(CoordinatorMvccCounterResponse.class); + } + + /** {@inheritDoc} */ + @Override public short directType() { + return 130; + } + + /** {@inheritDoc} */ + @Override public byte fieldsCount() { + return 2; + } + + /** {@inheritDoc} */ + @Override public void onAckReceived() { + // No-op. + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(CoordinatorMvccCounterResponse.class, this); + } +}
