Hi

Please see attached file: I added method testConcurrentMergeDeadlock() to 
existing test.
This test failed with error
...
Caused by: java.lang.IllegalStateException: Entry is locked [1.4.193/101]
at org.h2.mvstore.DataUtils.newIllegalStateException(DataUtils.java:766)


The same test with UPDATE instead of MERGE works fine.
 
Looks like MERGE removes lock previously acquired by SELECT FOR UPDATE


-- 
You received this message because you are subscribed to the Google Groups "H2 
Database" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to [email protected].
To post to this group, send email to [email protected].
Visit this group at https://groups.google.com/group/h2-database.
For more options, visit https://groups.google.com/d/optout.
/*
 * Copyright 2004-2014 H2 Group. Multiple-Licensed under the MPL 2.0,
 * and the EPL 1.0 (http://h2database.com/html/license.html).
 * Initial Developer: H2 Group
 */
package org.h2.test.mvcc;

import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.Statement;
import java.util.concurrent.CountDownLatch;

import org.h2.api.ErrorCode;
import org.h2.test.TestBase;
import org.h2.util.Task;

/**
 * Multi-threaded MVCC (multi version concurrency) test cases.
 */
public class TestMvccMultiThreaded extends TestBase {

    /**
     * Run just this test.
     *
     * @param a ignored
     */
    public static void main(String... a) throws Exception {
        TestBase.createCaller().init().test();
    }

    @Override
    public void test() throws Exception {
        testMergeWithUniqueKeyViolation();
        // not supported currently
        if (!config.multiThreaded) {
            testConcurrentMerge();
            testConcurrentUpdate();
            testConcurrentMergeDeadlock();
        }
    }

    private void testMergeWithUniqueKeyViolation() throws Exception {
        deleteDb(getTestName());
        Connection conn = getConnection(getTestName());
        Statement stat = conn.createStatement();
        stat.execute("create table test(x int primary key, y int unique)");
        stat.execute("insert into test values(1, 1)");
        assertThrows(ErrorCode.DUPLICATE_KEY_1, stat).
                execute("merge into test values(2, 1)");
        stat.execute("merge into test values(1, 2)");
        conn.close();

    }

    private void testConcurrentMerge() throws Exception {
        deleteDb(getTestName());
        int len = 3;
        final Connection[] connList = new Connection[len];
        for (int i = 0; i < len; i++) {
            Connection conn = getConnection(
                    getTestName() + ";MVCC=TRUE;LOCK_TIMEOUT=500");
            connList[i] = conn;
        }
        Connection conn = connList[0];
        conn.createStatement().execute(
                "create table test(id int primary key, name varchar)");
        Task[] tasks = new Task[len];
        final boolean[] stop = { false };
        for (int i = 0; i < len; i++) {
            final Connection c = connList[i];
            c.setAutoCommit(false);
            tasks[i] = new Task() {
                @Override
                public void call() throws Exception {
                    while (!stop) {
                        c.createStatement().execute(
                                "merge into test values(1, 'x')");
                        c.commit();
                        Thread.sleep(1);
                    }
                }
            };
            tasks[i].execute();
        }
        Thread.sleep(1000);
        stop[0] = true;
        for (int i = 0; i < len; i++) {
            tasks[i].get();
        }
        for (int i = 0; i < len; i++) {
            connList[i].close();
        }
        deleteDb(getTestName());
    }

    private void testConcurrentUpdate() throws Exception {
        deleteDb(getTestName());
        int len = 2;
        final Connection[] connList = new Connection[len];
        for (int i = 0; i < len; i++) {
            connList[i] = getConnection(
                    getTestName() + ";MVCC=TRUE");
        }
        Connection conn = connList[0];
        conn.createStatement().execute(
                "create table test(id int primary key, value int)");
        conn.createStatement().execute(
                "insert into test values(0, 0)");
        final int count = 1000;
        Task[] tasks = new Task[len];

        final CountDownLatch latch = new CountDownLatch(len);

        for (int i = 0; i < len; i++) {
            final int x = i;
            tasks[i] = new Task() {
                @Override
                public void call() throws Exception {
                    for (int a = 0; a < count; a++) {
                        connList[x].createStatement().execute(
                                "update test set value=value+1");
                        latch.countDown();
                        latch.await();
                    }
                }
            };
            tasks[i].execute();
        }
        for (int i = 0; i < len; i++) {
            tasks[i].get();
        }
        ResultSet rs = conn.createStatement().executeQuery("select value from test");
        rs.next();
        assertEquals(count * len, rs.getInt(1));
        for (int i = 0; i < len; i++) {
            connList[i].close();
        }
    }

    private void testConcurrentMergeDeadlock() throws Exception {
        deleteDb(getTestName());
        int len = 5;
        final Connection[] connList = new Connection[len];
        for (int i = 0; i < len; i++) {
            Connection conn = getConnection(
                    getTestName() + ";MVCC=TRUE;LOCK_TIMEOUT=500");
            conn.setAutoCommit(false);
            connList[i] = conn;
        }
        Connection conn = connList[0];
        conn.createStatement().execute(
                "create table test1(id int primary key, name varchar)");
        conn.createStatement().execute(
                "create table test2(id int primary key, name varchar)");
        Task[] tasks = new Task[len];
        final boolean[] stop = { false };
        for (int i = 0; i < len; i++) {
            final Connection c = connList[i];
            c.setAutoCommit(false);
            tasks[i] = new Task() {
                @Override
                public void call() throws Exception {
                    while (!stop) {
                        String t1,t2;
                        if (Math.random() < 0.5) {
                            t1="1"; t2="2";
                        } else {
                            t2="1"; t1="2";
                        } 
                        c.createStatement().execute(
                                "select id from test1 where id=1 for update");
                        c.createStatement().execute(
                                "select id from test2 where id=1 for update");
                        c.createStatement().execute("merge into test"+t1+" values(1, 'x')");
                        c.createStatement().execute("merge into test"+t2+" values(1, 'x')");
                        Thread.sleep(5);
                        c.commit();
                        Thread.sleep(5);
                    }
                }
            };
            tasks[i].execute();
        }
        Thread.sleep(10000);
        stop[0] = true;
        for (int i = 0; i < len; i++) {
            tasks[i].get();
        }
        for (int i = 0; i < len; i++) {
            connList[i].close();
        }
        deleteDb(getTestName());
    }
}

Reply via email to