Repository: ignite Updated Branches: refs/heads/master 2d4360707 -> 05c5939ae
IGNITE-3708 Fixed multithreaded loading entries for MySql. Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/d399db92 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/d399db92 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/d399db92 Branch: refs/heads/master Commit: d399db92ab4e147a3933a42dd5635b225665ac63 Parents: 974467a Author: Alexey Kuznetsov <[email protected]> Authored: Mon Aug 22 15:00:06 2016 +0700 Committer: Alexey Kuznetsov <[email protected]> Committed: Mon Aug 22 15:00:06 2016 +0700 ---------------------------------------------------------------------- .../store/jdbc/CacheAbstractJdbcStore.java | 43 +++++++++++--------- .../store/jdbc/dialect/BasicJdbcDialect.java | 7 +++- .../cache/store/jdbc/dialect/JdbcDialect.java | 11 ++++- .../cache/store/jdbc/dialect/MySQLDialect.java | 18 +++++++- .../jdbc/CacheJdbcPojoStoreFactorySelfTest.java | 11 +++-- 5 files changed, 65 insertions(+), 25 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/d399db92/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheAbstractJdbcStore.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheAbstractJdbcStore.java b/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheAbstractJdbcStore.java index c16f2c6..aad05e0 100644 --- a/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheAbstractJdbcStore.java +++ b/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheAbstractJdbcStore.java @@ -495,10 +495,11 @@ public abstract class CacheAbstractJdbcStore<K, V> implements CacheStore<K, V>, * @param clo Closure that will be applied to loaded values. * @param lowerBound Lower bound for range. * @param upperBound Upper bound for range. + * @param fetchSize Number of rows to fetch from DB. * @return Callable for pool submit. */ private Callable<Void> loadCacheRange(final EntryMapping em, final IgniteBiInClosure<K, V> clo, - @Nullable final Object[] lowerBound, @Nullable final Object[] upperBound) { + @Nullable final Object[] lowerBound, @Nullable final Object[] upperBound, final int fetchSize) { return new Callable<Void>() { @Override public Void call() throws Exception { Connection conn = null; @@ -512,6 +513,8 @@ public abstract class CacheAbstractJdbcStore<K, V> implements CacheStore<K, V>, ? em.loadCacheQry : em.loadCacheRangeQuery(lowerBound != null, upperBound != null)); + stmt.setFetchSize(fetchSize); + int idx = 1; if (lowerBound != null) @@ -555,7 +558,7 @@ public abstract class CacheAbstractJdbcStore<K, V> implements CacheStore<K, V>, * @return Callable for pool submit. */ private Callable<Void> loadCacheFull(EntryMapping m, IgniteBiInClosure<K, V> clo) { - return loadCacheRange(m, clo, null, null); + return loadCacheRange(m, clo, null, null, dialect.getFetchSize()); } /** @@ -811,10 +814,6 @@ public abstract class CacheAbstractJdbcStore<K, V> implements CacheStore<K, V>, for (EntryMapping em : entryMappings) { if (parallelLoadCacheMinThreshold > 0) { - if (log.isDebugEnabled()) - log.debug("Multithread loading entries from db [cache=" + U.maskName(cacheName) + - ", keyType=" + em.keyType() + " ]"); - Connection conn = null; try { @@ -827,6 +826,10 @@ public abstract class CacheAbstractJdbcStore<K, V> implements CacheStore<K, V>, ResultSet rs = stmt.executeQuery(); if (rs.next()) { + if (log.isDebugEnabled()) + log.debug("Multithread loading entries from db [cache=" + U.maskName(cacheName) + + ", keyType=" + em.keyType() + " ]"); + int keyCnt = em.keyCols.size(); Object[] upperBound = new Object[keyCnt]; @@ -834,7 +837,7 @@ public abstract class CacheAbstractJdbcStore<K, V> implements CacheStore<K, V>, for (int i = 0; i < keyCnt; i++) upperBound[i] = rs.getObject(i + 1); - futs.add(pool.submit(loadCacheRange(em, clo, null, upperBound))); + futs.add(pool.submit(loadCacheRange(em, clo, null, upperBound, 0))); while (rs.next()) { Object[] lowerBound = upperBound; @@ -844,28 +847,28 @@ public abstract class CacheAbstractJdbcStore<K, V> implements CacheStore<K, V>, for (int i = 0; i < keyCnt; i++) upperBound[i] = rs.getObject(i + 1); - futs.add(pool.submit(loadCacheRange(em, clo, lowerBound, upperBound))); + futs.add(pool.submit(loadCacheRange(em, clo, lowerBound, upperBound, 0))); } - futs.add(pool.submit(loadCacheRange(em, clo, upperBound, null))); + futs.add(pool.submit(loadCacheRange(em, clo, upperBound, null, 0))); + + continue; } - else - futs.add(pool.submit(loadCacheFull(em, clo))); } - catch (SQLException ignored) { - futs.add(pool.submit(loadCacheFull(em, clo))); + catch (SQLException e) { + log.warning("Failed to load entries from db in multithreaded mode [cache=" + U.maskName(cacheName) + + ", keyType=" + em.keyType() + " ]", e); } finally { U.closeQuiet(conn); } } - else { - if (log.isDebugEnabled()) - log.debug("Single thread loading entries from db [cache=" + U.maskName(cacheName) + - ", keyType=" + em.keyType() + " ]"); - futs.add(pool.submit(loadCacheFull(em, clo))); - } + if (log.isDebugEnabled()) + log.debug("Single thread loading entries from db [cache=" + U.maskName(cacheName) + + ", keyType=" + em.keyType() + " ]"); + + futs.add(pool.submit(loadCacheFull(em, clo))); } } @@ -1926,6 +1929,8 @@ public abstract class CacheAbstractJdbcStore<K, V> implements CacheStore<K, V>, stmt = conn.prepareStatement(qry); + stmt.setFetchSize(dialect.getFetchSize()); + ResultSet rs = stmt.executeQuery(); ResultSetMetaData meta = rs.getMetaData(); http://git-wip-us.apache.org/repos/asf/ignite/blob/d399db92/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/dialect/BasicJdbcDialect.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/dialect/BasicJdbcDialect.java b/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/dialect/BasicJdbcDialect.java index abb59d3..cd9c986 100644 --- a/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/dialect/BasicJdbcDialect.java +++ b/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/dialect/BasicJdbcDialect.java @@ -274,4 +274,9 @@ public class BasicJdbcDialect implements JdbcDialect { public void setMaxParameterCount(int maxParamsCnt) { this.maxParamsCnt = maxParamsCnt; } -} \ No newline at end of file + + /** {@inheritDoc} */ + @Override public int getFetchSize() { + return 0; + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/d399db92/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/dialect/JdbcDialect.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/dialect/JdbcDialect.java b/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/dialect/JdbcDialect.java index 38e981f..9daa00b 100644 --- a/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/dialect/JdbcDialect.java +++ b/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/dialect/JdbcDialect.java @@ -115,4 +115,13 @@ public interface JdbcDialect extends Serializable { * @return Max query parameters count. */ public int getMaxParameterCount(); -} \ No newline at end of file + + /** + * Gives the JDBC driver a hint how many rows should be fetched from the database when more rows are needed. + * If the value specified is zero, then the hint is ignored. + * The default value is zero. + * + * @return The fetch size for result sets. + */ + public int getFetchSize(); +} http://git-wip-us.apache.org/repos/asf/ignite/blob/d399db92/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/dialect/MySQLDialect.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/dialect/MySQLDialect.java b/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/dialect/MySQLDialect.java index f7512a7..84e6d05 100644 --- a/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/dialect/MySQLDialect.java +++ b/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/dialect/MySQLDialect.java @@ -29,6 +29,15 @@ public class MySQLDialect extends BasicJdbcDialect { private static final long serialVersionUID = 0L; /** {@inheritDoc} */ + @Override public String loadCacheSelectRangeQuery(String fullTblName, Collection<String> keyCols) { + String cols = mkString(keyCols, ","); + + return String.format("SELECT %s " + + "FROM (SELECT %s, @rownum := @rownum + 1 AS rn FROM %s, (SELECT @rownum := 0) r ORDER BY %s) as r " + + "WHERE mod(rn, ?) = 0", cols, cols, fullTblName, cols); + } + + /** {@inheritDoc} */ @Override public boolean hasMerge() { return true; } @@ -48,4 +57,11 @@ public class MySQLDialect extends BasicJdbcDialect { return String.format("INSERT INTO %s (%s) VALUES (%s) ON DUPLICATE KEY UPDATE %s", fullTblName, mkString(cols, ", "), repeat("?", cols.size(), "", ",", ""), updPart); } -} \ No newline at end of file + + /** {@inheritDoc} */ + @Override public int getFetchSize() { + // Workaround for known issue with MySQL large result set. + // See: http://dev.mysql.com/doc/connector-j/5.1/en/connector-j-reference-implementation-notes.html + return Integer.MIN_VALUE; + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/d399db92/modules/spring/src/test/java/org/apache/ignite/cache/store/jdbc/CacheJdbcPojoStoreFactorySelfTest.java ---------------------------------------------------------------------- diff --git a/modules/spring/src/test/java/org/apache/ignite/cache/store/jdbc/CacheJdbcPojoStoreFactorySelfTest.java b/modules/spring/src/test/java/org/apache/ignite/cache/store/jdbc/CacheJdbcPojoStoreFactorySelfTest.java index 2f36017..dfa1452 100644 --- a/modules/spring/src/test/java/org/apache/ignite/cache/store/jdbc/CacheJdbcPojoStoreFactorySelfTest.java +++ b/modules/spring/src/test/java/org/apache/ignite/cache/store/jdbc/CacheJdbcPojoStoreFactorySelfTest.java @@ -129,9 +129,9 @@ public class CacheJdbcPojoStoreFactorySelfTest extends GridCommonAbstractTest { } /** - * + * Dummy JDBC dialect that does nothing. */ - public static class DummyDialect implements JdbcDialect, Serializable { + public static class DummyDialect implements JdbcDialect { /** {@inheritDoc} */ @Override public String loadCacheSelectRangeQuery(String fullTblName, Collection<String> keyCols) { return null; @@ -185,5 +185,10 @@ public class CacheJdbcPojoStoreFactorySelfTest extends GridCommonAbstractTest { @Override public int getMaxParameterCount() { return 0; } + + /** {@inheritDoc} */ + @Override public int getFetchSize() { + return 0; + } } -} \ No newline at end of file +}
