Add benchmark.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/2df2c1f6 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/2df2c1f6 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/2df2c1f6 Branch: refs/heads/ignite-6022-proto Commit: 2df2c1f6af2650247a3e717f4ca10a9d817b376b Parents: 392e8ee Author: devozerov <[email protected]> Authored: Mon Dec 18 15:52:54 2017 +0300 Committer: devozerov <[email protected]> Committed: Mon Dec 18 15:52:54 2017 +0300 ---------------------------------------------------------------------- .../odbc/jdbc/JdbcRequestHandler.java | 2 +- modules/indexing/pom.xml | 6 + .../query/h2/opt/JdbcBatchLoader.java | 167 +++++++++++++++++++ 3 files changed, 174 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/2df2c1f6/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcRequestHandler.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcRequestHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcRequestHandler.java index c28c831..7e508a8 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcRequestHandler.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcRequestHandler.java @@ -465,7 +465,7 @@ public class JdbcRequestHandler implements ClientListenerRequestHandler { } } - public static volatile boolean STREAMER = false; + public static volatile boolean STREAMER = true; /** * @param req Request. 
// New file introduced by commit 2df2c1f6:
// modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/JdbcBatchLoader.java
// (the same commit also adds the com.mchange:c3p0:0.9.5.2 dependency to modules/indexing/pom.xml)
package org.apache.ignite.internal.processors.query.h2.opt;

import com.mchange.v2.c3p0.ComboPooledDataSource;
import org.apache.ignite.Ignite;
import org.apache.ignite.Ignition;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.util.typedef.internal.U;

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.Statement;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * Benchmark utility that loads rows into an Ignite cluster through the JDBC thin
 * driver using a pool of worker threads, each inserting one batch of rows via
 * {@link PreparedStatement#executeBatch()}.
 */
public class JdbcBatchLoader {
    /** DDL for the benchmark table (idempotent thanks to IF NOT EXISTS). */
    private static final String SQL_CREATE = "CREATE TABLE IF NOT EXISTS Person(" +
        " id integer PRIMARY KEY," +
        " name varchar(50)," +
        " age integer," +
        " salary integer" +
        ")";

    /** Parameterized INSERT used for batching. */
    private static final String SQL_INSERT =
        "INSERT INTO Person(id, name, age, salary) VALUES (?, ?, ?, ?)";

    /**
     * Print a message to the console.
     *
     * @param msg Message to log.
     */
    private static void log(String msg) {
        U.debug(msg);
    }

    /**
     * Main entry point: starts a local Ignite node and runs the load against it.
     *
     * @param args Command line arguments (ignored).
     */
    public static void main(String[] args) {
        IgniteConfiguration cfg = new IgniteConfiguration().setLocalHost("127.0.0.1");

        try (Ignite node = Ignition.start(cfg)) {
            try {
                JdbcBatchLoader ldr = new JdbcBatchLoader();

                ldr.load(10_000_000, 10_000, 8, "127.0.0.1");
            }
            catch (Exception e) {
                log("Failed to load data into cloud");

                e.printStackTrace();
            }
        }
    }

    /**
     * Load data into cloud.
     *
     * @param total Total number of rows to load.
     * @param batch Batch size.
     * @param threads How many threads to use.
     * @param addr JDBC endpoint address.
     * @throws Exception If failed to load data to cloud.
     */
    public void load(int total, int batch, int threads, String addr) throws Exception {
        log("Connecting to IGNITE...");

        ComboPooledDataSource dataSrc = new ComboPooledDataSource();

        dataSrc.setDriverClass("org.apache.ignite.IgniteJdbcThinDriver");
        dataSrc.setJdbcUrl("jdbc:ignite:thin://" + addr);

        // try-with-resources so the Statement is closed even if SQL_CREATE fails
        // (the original closed it only on the success path).
        try (Connection conn = dataSrc.getConnection();
             Statement stmt = conn.createStatement()) {
            stmt.execute(SQL_CREATE);
        }

        // NOTE: a trailing partial batch (total % batch rows) is intentionally not
        // loaded; callers are expected to pass total divisible by batch.
        int cnt = total / batch;

        CountDownLatch latch = new CountDownLatch(cnt);

        ExecutorService exec = Executors.newFixedThreadPool(threads);

        log("Start loading of " + total + " records...");

        long start = System.currentTimeMillis();

        for (int i = 0; i < cnt; i++)
            exec.execute(new Worker(dataSrc, i, batch, latch));

        latch.await();

        log("Loading time: " + (System.currentTimeMillis() - start) / 1000 + " seconds");
        log("Loading finished!");

        U.shutdownNow(JdbcBatchLoader.class, exec, null);
        dataSrc.close();
    }

    /**
     * Worker that inserts one contiguous batch of rows [start, finish) and
     * counts down the control latch when done (successfully or not).
     */
    private static class Worker implements Runnable {
        /** Shared connection pool. */
        private final ComboPooledDataSource dataSrc;

        /** Packet (batch) ID, used for error reporting. */
        private final int packet;

        /** Control latch to complete loading. */
        private final CountDownLatch latch;

        /** First row ID of this batch (inclusive). */
        private final int start;

        /** Last row ID of this batch (exclusive). */
        private final int finish;

        /**
         * @param dataSrc Data source.
         * @param packet Packet ID.
         * @param batch Batch size.
         * @param latch Control latch to complete loading.
         */
        private Worker(ComboPooledDataSource dataSrc, int packet, int batch, CountDownLatch latch) {
            this.dataSrc = dataSrc;
            this.packet = packet;
            this.latch = latch;

            start = packet * batch;
            finish = start + batch;
        }

        /** {@inheritDoc} */
        @Override public void run() {
            // try-with-resources closes the PreparedStatement as well as the
            // Connection (the original never closed the statement).
            try (Connection conn = dataSrc.getConnection();
                 PreparedStatement pstmt = conn.prepareStatement(SQL_INSERT)) {
                for (int i = start; i < finish; i++) {
                    pstmt.setInt(1, i);
                    pstmt.setString(2, "Some name" + i);
                    pstmt.setInt(3, 100);
                    pstmt.setInt(4, 200);

                    pstmt.addBatch();
                }

                pstmt.executeBatch();
            }
            catch (Throwable e) {
                log("Failed to load packet: [packet=" + packet + ", err=" + e.getMessage() + "]");

                e.printStackTrace();
            }
            finally {
                // Always release the latch so load() cannot hang on a failed packet.
                latch.countDown();
            }
        }
    }
}
