Hello,

MULTI_THREADED=0 fixed the issue for me, but that is not what I expected...
What exactly does MULTI_THREADED=1 do? Is there a good explanation posted anywhere?
Does it really mean that H2 is not multi-threaded by default? Would all 32
CPU cores be used under heavy load?

I've created the requested small test case (see attached
H2MultiThreadedTester.java; required dependencies are h2database v.1.4.181
and commons-pool v.1.6). It runs fine when MULTI_THREADED=0.
With MULTI_THREADED=1 it throws various kinds of exceptions.
Let me know if you have any questions regarding the test case.

Viktor

On Thursday, September 25, 2014 11:28:01 AM UTC+3, Noel Grandin wrote:
>
> Hi 
>
> If you could create us a small test case that can reproduce this, we could 
> probably fix it. 
> But it looks like a multi-threading issue, so if you're not able to create 
> a test case, I would simply suggest turning 
> MULTI_THREAD off. 
>
> Regards, Noel. 
>

-- 
The information in this message may be confidential.  It is intended solely 
for
the addressee(s).  If you are not the intended recipient, any disclosure,
copying or distribution of the message, or any action or omission taken by 
you
in reliance on it, is prohibited and may be unlawful.  Please immediately
contact the sender if you have received this message in error.

-- 
You received this message because you are subscribed to the Google Groups "H2 
Database" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to [email protected].
To post to this group, send email to [email protected].
Visit this group at http://groups.google.com/group/h2-database.
For more options, visit https://groups.google.com/d/optout.
import java.sql.Connection;
import java.sql.SQLException;
import java.sql.Statement;
import java.text.MessageFormat;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

import javax.sql.DataSource;

import org.apache.commons.dbcp.BasicDataSource;

/**
 * Stand-alone stress test that drives an in-memory H2 database from many threads at once.
 *
 * <p>For each of {@code numTables} tasks the test creates a table, fills it with random rows,
 * builds an index, fires a batch of concurrent SELECT queries at it and finally drops it.
 * All tasks share one pooled data source whose JDBC URL sets {@code MULTI_THREADED=1}.
 *
 * <p>SQL failures inside the helpers are printed and the affected transaction is rolled back;
 * the task then carries on with its next step, so one failure may cause follow-up failures.
 */
public class H2MultiThreadedTester {

    private static final int nrOfProcessors = Runtime.getRuntime().availableProcessors();
    private static final Random randomGenerator = new Random();

    private static final int numOfThreads = nrOfProcessors * 4;
    private static final int numTables = 50;
    private static final int maxNumRowsToInsert = 50000;
    private static final int numOfQueriesPerThread = 100;
    private static final String URL_TEMPLATE = "jdbc:h2:mem:queries_db;CACHE_SIZE=6048576;DATABASE_TO_UPPER=false;IGNORECASE=false;DB_CLOSE_DELAY=-1;MULTI_THREADED=1;MVCC=FALSE;LOCK_TIMEOUT=5000;LOG=0;UNDO_LOG=0";

    /**
     * Submits one create/insert/index/query/drop task per table, waits for all of them and
     * prints the total run time.
     *
     * @param args ignored
     */
    public static void main(String[] args) {
        long startTime = System.currentTimeMillis();
        ExecutorService executorService = Executors.newFixedThreadPool(numOfThreads);
        final BasicDataSource ds = getPooledDataSource();
        List<Future<Long>> futures = new ArrayList<Future<Long>>();

        try {
            for (int i = 0; i < numTables; i++) {
                Callable<Long> task = new Callable<Long>() {
                    @Override
                    public Long call() throws Exception {
                        // NOTE(review): the name is random, not guaranteed unique; two tasks
                        // drawing the same number would share (and drop) the same table.
                        String tblName = "table" + randomGenerator.nextInt(10000000);
                        long createTableTime = createTable(ds, tblName);
                        System.out.println(MessageFormat.format("Table {0} - created in [{1}] millis", new Object[] { tblName, createTableTime }));
                        long insertTime = insertRandomData(ds, tblName);
                        System.out.println(MessageFormat.format("Table {0} - data inserted in [{1}] millis", new Object[] { tblName, insertTime }));
                        long createIndexTime = createIndex(ds, tblName);
                        System.out.println(MessageFormat.format("Table {0} - index created in [{1}] millis", new Object[] { tblName, createIndexTime }));
                        long parallelQueryTime = parallelQuery(ds, tblName);
                        System.out.println(MessageFormat.format("Table {0} - data queries in [{1}] millis", new Object[] { tblName, parallelQueryTime }));
                        long removeTableTime = removeTable(ds, tblName);
                        System.out.println(MessageFormat.format("Table {0} - removed in [{1}] millis", new Object[] { tblName, removeTableTime }));
                        return parallelQueryTime;
                    }
                };

                Future<Long> submit = executorService.submit(task);
                futures.add(submit);
            }

            for (Future<Long> future : futures) {
                try {
                    future.get();
                } catch (Exception e) {
                    // Report the failure of this task and keep waiting for the others.
                    e.printStackTrace();
                }
            }
        } finally {
            // Always release the worker threads and the pooled connections, even if
            // submitting or waiting failed unexpectedly.
            executorService.shutdown();
            try {
                ds.close();
            } catch (SQLException e) {
                e.printStackTrace();
            }
        }

        System.out.println("Total run time [" + (System.currentTimeMillis() - startTime) + "] millis");
    }

    /**
     * Builds the connection pool used by every task.
     *
     * @return a pool for {@link #URL_TEMPLATE} with 10 initial and at most 50 active
     *         connections, auto-commit disabled by default
     */
    private static BasicDataSource getPooledDataSource() {
        BasicDataSource ds = new BasicDataSource();
        ds.setDriverClassName("org.h2.Driver");
        ds.setUrl(URL_TEMPLATE);
        ds.setUsername("sa");
        ds.setPassword("");
        ds.setInitialSize(10);
        ds.setMaxActive(50);
        ds.setDefaultAutoCommit(false);

        return ds;
    }

    /**
     * Creates the three-column test table if it does not exist yet.
     *
     * @param dataSource source of the connection to use
     * @param tableName  name of the table to create
     * @return elapsed wall-clock time in milliseconds
     * @throws SQLException if rolling back or releasing the JDBC resources fails
     */
    private static long createTable(DataSource dataSource, String tableName) throws SQLException {
        long startTime = System.currentTimeMillis();
        executeUpdateInTransaction(dataSource,
                MessageFormat.format("CREATE MEMORY TABLE IF NOT EXISTS {0} (field1 BIGINT, field2 VARCHAR, field3 VARCHAR) NOT PERSISTENT", new Object[] { tableName }));
        return (System.currentTimeMillis() - startTime);
    }

    /**
     * Drops the given table.
     *
     * @param dataSource source of the connection to use
     * @param tableName  name of the table to drop
     * @return elapsed wall-clock time in milliseconds
     * @throws SQLException if rolling back or releasing the JDBC resources fails
     */
    private static long removeTable(DataSource dataSource, String tableName) throws SQLException {
        long startTime = System.currentTimeMillis();
        executeUpdateInTransaction(dataSource, MessageFormat.format("DROP TABLE {0}", new Object[] { tableName }));
        return (System.currentTimeMillis() - startTime);
    }

    /**
     * Creates an index over all three columns of the given table if it does not exist yet.
     *
     * @param dataSource source of the connection to use
     * @param tableName  name of the table to index
     * @return elapsed wall-clock time in milliseconds
     * @throws SQLException if rolling back or releasing the JDBC resources fails
     */
    private static long createIndex(DataSource dataSource, String tableName) throws SQLException {
        long startTime = System.currentTimeMillis();
        executeUpdateInTransaction(dataSource, MessageFormat.format("CREATE INDEX IF NOT EXISTS {0}_INDEX ON {0} (field1 ASC, field2 ASC, field3 ASC)", new Object[] { tableName }));
        return (System.currentTimeMillis() - startTime);
    }

    /**
     * Inserts a random number of rows (0 inclusive to {@code maxNumRowsToInsert} exclusive)
     * into the table within a single transaction. Each row holds the loop index and two
     * random UUID strings.
     *
     * @param dataSource source of the connection to use
     * @param tableName  name of the table to fill
     * @return elapsed wall-clock time in milliseconds
     * @throws SQLException if rolling back or releasing the JDBC resources fails
     */
    private static long insertRandomData(DataSource dataSource, String tableName) throws SQLException {
        long startTime = System.currentTimeMillis();

        Connection con = null;
        Statement st = null;
        try {
            con = dataSource.getConnection();
            con.setAutoCommit(false);
            int numRowsToInsert = randomGenerator.nextInt(maxNumRowsToInsert);
            System.out.println(MessageFormat.format("Table {0} - inserting [{1}] data rows", new Object[] { tableName, numRowsToInsert }));
            // One statement is reused for every row; creating a fresh one per iteration
            // would leave all but the last statement unclosed.
            st = con.createStatement();
            for (int i = 0; i < numRowsToInsert; i++) {
                st.executeUpdate(MessageFormat.format("INSERT INTO {0} (field1, field2, field3) VALUES ({1,number, #}, ''{2}'', ''{3}'')", new Object[] { tableName, i,
                        UUID.randomUUID().toString(), UUID.randomUUID().toString() }));
            }
            con.commit();
        } catch (SQLException e) {
            e.printStackTrace();
            rollback(con);
        } finally {
            closeResources(st, con);
        }

        return (System.currentTimeMillis() - startTime);
    }

    /**
     * Runs an arbitrary SQL statement on a pooled connection. A failure is printed and the
     * connection is rolled back; no commit is issued by this method.
     *
     * @param dataSource source of the connection to use
     * @param query      SQL text to execute
     * @throws SQLException if rolling back or releasing the JDBC resources fails
     */
    private static void execute(DataSource dataSource, String query) throws SQLException {
        Connection con = null;
        Statement st = null;
        try {
            con = dataSource.getConnection();
            st = con.createStatement();
            st.execute(query);
        } catch (SQLException e) {
            e.printStackTrace();
            rollback(con);
        } finally {
            closeResources(st, con);
        }
    }

    /**
     * Runs a single update statement in its own transaction: commit on success, print and
     * roll back on failure.
     *
     * @param dataSource source of the connection to use
     * @param query      SQL text to execute
     * @throws SQLException if rolling back or releasing the JDBC resources fails
     */
    private static void executeUpdateInTransaction(DataSource dataSource, String query) throws SQLException {
        Connection con = null;
        Statement st = null;
        try {
            con = dataSource.getConnection();
            con.setAutoCommit(false);
            st = con.createStatement();
            st.executeUpdate(query);
            con.commit();
        } catch (SQLException e) {
            e.printStackTrace();
            rollback(con);
        } finally {
            closeResources(st, con);
        }
    }

    /**
     * Rolls back the connection if one was obtained. The null check matters when
     * {@code getConnection()} itself failed: there is nothing to roll back then, and
     * dereferencing the null connection would hide the original error behind a
     * {@link NullPointerException}.
     */
    private static void rollback(Connection con) throws SQLException {
        if (con != null) {
            con.rollback();
        }
    }

    /**
     * Closes the statement and then the connection, tolerating nulls. The connection is
     * closed even when closing the statement throws, so it is always handed back.
     */
    private static void closeResources(Statement st, Connection con) throws SQLException {
        try {
            if (st != null) {
                st.close();
            }
        } finally {
            if (con != null) {
                con.close();
            }
        }
    }

    /**
     * Fires {@code numOfQueriesPerThread} COUNT(DISTINCT) queries with random range bounds at
     * the table from a dedicated thread pool and waits for all of them.
     *
     * @param dataSource source of the connections to use
     * @param tableName  name of the table to query
     * @return elapsed wall-clock time in milliseconds
     */
    private static long parallelQuery(final DataSource dataSource, final String tableName) {
        long queryStartTime = System.currentTimeMillis();
        ExecutorService executorService = Executors.newFixedThreadPool(numOfThreads);
        List<Future<Void>> f = new ArrayList<Future<Void>>();

        try {
            System.out.println(MessageFormat.format("Table {0} - querying in parallel [{1} queries]", new Object[] { tableName, numOfQueriesPerThread }));
            for (int i = 0; i < numOfQueriesPerThread; i++) {
                Callable<Void> task = new Callable<Void>() {
                    @Override
                    public Void call() throws Exception {
                        // The two bounds are drawn independently, so the lower bound may
                        // exceed the upper one and the range may be empty.
                        execute(dataSource, MessageFormat.format(
                                "SELECT COUNT(DISTINCT field2) FROM {0} WHERE field1 >= {1,number,#} AND field1 < {2,number,#} AND field3 IS NOT NULL", new Object[] { tableName,
                                        randomGenerator.nextInt(maxNumRowsToInsert), randomGenerator.nextInt(maxNumRowsToInsert) }));
                        return null;
                    }
                };
                Future<Void> submit = executorService.submit(task);
                f.add(submit);
            }

            for (Future<Void> future : f) {
                try {
                    future.get();
                } catch (Exception e) {
                    // Report the failed query and keep waiting for the remaining ones.
                    e.printStackTrace();
                }
            }
        } finally {
            // Release the per-table worker threads even if submission fails part-way.
            executorService.shutdown();
        }

        return (System.currentTimeMillis() - queryStartTime);
    }
}

Reply via email to