
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;
import java.util.Random;
import org.apache.derby.iapi.services.context.ContextManager;
import org.apache.derby.iapi.services.context.ContextService;
import org.apache.derby.iapi.services.io.FormatableBitSet;
import org.apache.derby.iapi.sql.execute.ExecIndexRow;
import org.apache.derby.iapi.sql.execute.ExecRow;
import org.apache.derby.iapi.sql.execute.ExecutionFactory;
import org.apache.derby.iapi.store.access.AccessFactory;
import org.apache.derby.iapi.store.access.ConglomerateController;
import org.apache.derby.iapi.store.access.DynamicCompiledOpenConglomInfo;
import org.apache.derby.iapi.store.access.ScanController;
import org.apache.derby.iapi.store.access.StaticCompiledOpenConglomInfo;
import org.apache.derby.iapi.store.access.TransactionController;
import org.apache.derby.iapi.types.RowLocation;
import org.apache.derby.iapi.types.SQLInteger;
import org.apache.derby.iapi.types.SQLVarchar;
import org.apache.derby.impl.sql.execute.GenericExecutionFactory;
import org.apache.derby.impl.store.access.RAMAccessManager;
import org.apache.derby.impl.store.access.heap.HeapRowLocation;

public class SimpleSelect {
    private static long heapId = -1;
    private static long indexId = -1;
    private static ExecutionFactory ef;
    private static ContextService csf;
    private static AccessFactory af;

    public static void main(String[] args) throws Exception {

        if (args.length != 3) {
            System.out.println("usage: SimpleSelect clients warmup runtime");
            System.exit(1);
        }

        int numberOfClients = Integer.parseInt(args[0]);
        long warmup = Long.parseLong(args[1]) * 1000;
        long runtime = Long.parseLong(args[2]) * 1000;

        Class.forName("org.apache.derby.jdbc.EmbeddedDriver");
        Connection c = DriverManager.getConnection("jdbc:derby:db");
        Statement s = c.createStatement();
        ResultSet rs = s.executeQuery(
                "select conglomeratenumber, isindex from " +
                "sys.sysconglomerates sc, sys.systables st " +
                "where st.tablename = 'SINGLE_RECORD_100000_0_VARCHAR' " +
                "and sc.tableid = st.tableid");
        while (rs.next()) {
            long conglomerate = rs.getLong(1);
            if (rs.getBoolean(2)) {
                System.out.print("Found index conglomerate: ");
                indexId = conglomerate;
            } else {
                System.out.print("Found heap conglomerate: ");
                heapId = conglomerate;
            }
            System.out.println(conglomerate);
        }
        rs.close();
        s.close();
        c.close();

        af = RAMAccessManager.DEFAULT_INSTANCE;
        System.out.println(af);
        csf = ContextService.getFactory();
        System.out.println(csf);
        ef = GenericExecutionFactory.DEFAULT_INSTANCE;
        System.out.println(ef);

        System.out.println("Starting " + numberOfClients + " client(s)...");
        Thread[] clientThreads = new Thread[numberOfClients];
        Client[] clients = new Client[numberOfClients];

        for (int i = 0; i < clientThreads.length; i++) {
            clients[i] = new Client();
            clientThreads[i] = new Thread(clients[i]);
            clientThreads[i].start();
        }

        System.out.println("warmup...");
        Thread.sleep(warmup);
        System.out.println("collect...");
        collect = true;
        long time = System.currentTimeMillis();
        Thread.sleep(runtime);
        stop = true;
        collect = false;
        time = System.currentTimeMillis() - time;
        long txCount = 0;
        for (int i = 0; i < clientThreads.length; i++) {
            clientThreads[i].join();
            txCount += clients[i].txCount;
        }
        System.out.println("tps: " + (double) txCount * 1000 / time);
    }

    private static volatile boolean stop;
    private static volatile boolean collect;

    private static class Client implements Runnable {
        
        int txCount;
        
        public void run() {
            try {
                run_();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }

        private void run_() throws Exception {
            ContextManager cm = csf.newContextManager();
            csf.setCurrentContextManager(cm);

            TransactionController tc = af.getTransaction(cm);

            StaticCompiledOpenConglomInfo statIdx =
                    tc.getStaticCompiledConglomInfo(indexId);
            StaticCompiledOpenConglomInfo statHeap =
                    tc.getStaticCompiledConglomInfo(heapId);
            DynamicCompiledOpenConglomInfo dynIdx =
                    tc.getDynamicCompiledConglomInfo(indexId);
            DynamicCompiledOpenConglomInfo dynHeap =
                    tc.getDynamicCompiledConglomInfo(heapId);

            SQLInteger key = new SQLInteger();
            ExecRow keyRow = ef.getValueRow(1);
            keyRow.setColumn(1, key);
            
            ExecIndexRow indexRow = ef.getIndexableRow(2);
            SQLInteger id = new SQLInteger();
            indexRow.setColumn(1, id);
            RowLocation loc = new HeapRowLocation();
            indexRow.setColumn(2, loc);

            ExecRow heapRow = ef.getValueRow(2);
            SQLVarchar val = new SQLVarchar();
            heapRow.setColumn(2, val);
            FormatableBitSet heapCols = new FormatableBitSet(2);
            heapCols.set(1);
            
            Random r = new Random();

            while (!stop) {
                int x = r.nextInt(100000);
                key.setValue(x);

                ScanController sc = tc.openCompiledScan(
                        true, 0, TransactionController.MODE_RECORD,
                        TransactionController.ISOLATION_READ_COMMITTED,
                        null, // columns
                        keyRow.getRowArray(), // start key
                        ScanController.GE,
                        null, // qualifiers
                        keyRow.getRowArray(), // stop key
                        ScanController.GT,
                        statIdx,
                        dynIdx);

                if (!sc.fetchNext(indexRow.getRowArray())) {
                    throw new Exception("index row not found");
                }

                if (x != id.getInt()) {
                    throw new Exception("unexpected id");
                }

                ConglomerateController cc =
                        tc.openCompiledConglomerate(
                            true,
                            TransactionController.OPENMODE_SECONDARY_LOCKED,
                            TransactionController.MODE_RECORD,
                            TransactionController.ISOLATION_READ_COMMITTED,
                            statHeap, dynHeap);
                
                if (!cc.fetch(loc, heapRow.getRowArray(), heapCols)) {
                    throw new Exception("heap row not found");
                }
                
                String data = val.getString();
                if (data.length() != 100) {
                    throw new Exception("unexpected length: " + data.length());
                }

                sc.close();
                cc.close();
                tc.commit();
                if (collect) txCount++;
            }
        }
        
    }
}
