http://git-wip-us.apache.org/repos/asf/ignite/blob/2961aaa7/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheJdbcPojoStore.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheJdbcPojoStore.java b/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheJdbcPojoStore.java index f8abd16..1724fa0 100644 --- a/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheJdbcPojoStore.java +++ b/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheJdbcPojoStore.java @@ -19,127 +19,30 @@ package org.apache.ignite.cache.store.jdbc; import java.lang.reflect.Constructor; import java.lang.reflect.Method; -import java.nio.ByteBuffer; -import java.sql.BatchUpdateException; -import java.sql.Connection; -import java.sql.PreparedStatement; import java.sql.ResultSet; -import java.sql.ResultSetMetaData; import java.sql.SQLException; -import java.sql.Statement; -import java.sql.Types; -import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.HashMap; -import java.util.List; import java.util.Map; -import java.util.UUID; -import java.util.concurrent.Callable; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.Future; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReentrantLock; -import javax.cache.Cache; import javax.cache.CacheException; import javax.cache.integration.CacheLoaderException; -import javax.cache.integration.CacheWriterException; -import javax.sql.DataSource; import org.apache.ignite.Ignite; -import org.apache.ignite.IgniteCheckedException; -import org.apache.ignite.IgniteException; -import org.apache.ignite.IgniteLogger; import org.apache.ignite.IgnitePortables; -import org.apache.ignite.cache.CacheTypeFieldMetadata; -import org.apache.ignite.cache.CacheTypeMetadata; import org.apache.ignite.cache.store.CacheStore; -import org.apache.ignite.cache.store.CacheStoreSession; -import org.apache.ignite.cache.store.jdbc.dialect.BasicJdbcDialect; -import org.apache.ignite.cache.store.jdbc.dialect.DB2Dialect; -import org.apache.ignite.cache.store.jdbc.dialect.H2Dialect; -import org.apache.ignite.cache.store.jdbc.dialect.JdbcDialect; -import org.apache.ignite.cache.store.jdbc.dialect.MySQLDialect; -import org.apache.ignite.cache.store.jdbc.dialect.OracleDialect; -import org.apache.ignite.cache.store.jdbc.dialect.SQLServerDialect; import org.apache.ignite.configuration.CacheConfiguration; -import org.apache.ignite.internal.util.tostring.GridToStringExclude; -import org.apache.ignite.internal.util.typedef.C1; -import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.internal.U; -import org.apache.ignite.lang.IgniteBiInClosure; -import org.apache.ignite.lang.IgnitePredicate; -import org.apache.ignite.lifecycle.LifecycleAware; import org.apache.ignite.portable.PortableBuilder; import org.apache.ignite.portable.PortableObject; -import org.apache.ignite.resources.CacheStoreSessionResource; -import org.apache.ignite.resources.IgniteInstanceResource; -import org.apache.ignite.resources.LoggerResource; -import org.apache.ignite.transactions.Transaction; import org.jetbrains.annotations.Nullable; -import static java.sql.Statement.EXECUTE_FAILED; -import static java.sql.Statement.SUCCESS_NO_INFO; -import static org.apache.ignite.cache.store.jdbc.CacheJdbcPojoStoreConfiguration.DFLT_BATCH_SIZE; -import static org.apache.ignite.cache.store.jdbc.CacheJdbcPojoStoreConfiguration.DFLT_WRITE_ATTEMPTS; -import static org.apache.ignite.cache.store.jdbc.CacheJdbcPojoStoreConfiguration.DFLT_PARALLEL_LOAD_CACHE_MINIMUM_THRESHOLD; - /** * Implementation of {@link CacheStore} backed by JDBC and POJO via reflection. * * This implementation stores objects in underlying database using java beans mapping description via reflection. <p> * Use {@link CacheJdbcPojoStoreFactory} factory to pass {@link CacheJdbcPojoStore} to {@link CacheConfiguration}. */ -public class CacheJdbcPojoStore<K, V> implements CacheStore<K, V>, LifecycleAware { - /** Connection attribute property name. */ - private static final String ATTR_CONN_PROP = "JDBC_STORE_CONNECTION"; - - /** Auto-injected store session. */ - @CacheStoreSessionResource - private CacheStoreSession ses; - - /** Auto injected ignite instance. */ - @IgniteInstanceResource - private Ignite ignite; - - /** Auto-injected logger instance. */ - @LoggerResource - private IgniteLogger log; - - /** Lock for metadata cache. */ - @GridToStringExclude - private final Lock cacheMappingsLock = new ReentrantLock(); - - /** Data source. */ - private DataSource dataSrc; - - /** Portables. */ - private IgnitePortables portables; - - /** Cache with entry mapping description. (cache name, (key id, mapping description)). */ - private volatile Map<String, Map<Object, EntryMapping>> cacheMappings = Collections.emptyMap(); - - /** Maximum batch size for writeAll and deleteAll operations. */ - private int batchSz = DFLT_BATCH_SIZE; - - /** Database dialect. */ - private JdbcDialect dialect; - - /** Max workers thread count. These threads are responsible for load cache. */ - private int maxPoolSz = Runtime.getRuntime().availableProcessors(); - - /** Maximum write attempts in case of database error. */ - private int maxWrtAttempts = DFLT_WRITE_ATTEMPTS; - - /** Parallel load cache minimum threshold. If {@code 0} then load sequentially. */ - private int parallelLoadCacheMinThreshold = DFLT_PARALLEL_LOAD_CACHE_MINIMUM_THRESHOLD; - - /** Types that store could process. */ - private CacheJdbcPojoStoreType[] types; - - /** Map for quick check whether type is POJO or Portable. */ - private volatile Map<String, Map<String, Boolean>> keepSerializedTypes = new HashMap<>(); - +public class CacheJdbcPojoStore<K, V> extends CacheAbstractJdbcStore<K, V> { /** POJO methods cache. */ private volatile Map<String, Map<String, PojoMethodsCache>> pojoMethods = Collections.emptyMap(); @@ -147,28 +50,6 @@ public class CacheJdbcPojoStore<K, V> implements CacheStore<K, V>, LifecycleAwar private volatile Map<String, Map<String, Integer>> portableTypeIds = Collections.emptyMap(); /** - * Checks for POJO/portable format. - * - * @param cacheName Cache name to get types settings. - * @param typeName Type name to check for POJO/portable format. - * @return {@code true} If portable format configured. - * @throws CacheException In case of error. - */ - private boolean isKeepSerialized(String cacheName, String typeName) { - Map<String, Boolean> cacheTypes = keepSerializedTypes.get(cacheName); - - if (cacheTypes == null) - throw new CacheException("Failed to find types metadata for cache: " + cacheName); - - Boolean keepSerialized = cacheTypes.get(typeName); - - if (keepSerialized == null) - throw new CacheException("Failed to find type metadata for type: " + typeName); - - return keepSerialized; - } - - /** * Get field value from object for use as query parameter. * * @param cacheName Cache name. @@ -178,7 +59,7 @@ public class CacheJdbcPojoStore<K, V> implements CacheStore<K, V>, LifecycleAwar * @return Field value from object. * @throws CacheException in case of error. */ - @Nullable private Object extractParameter(@Nullable String cacheName, String typeName, String fieldName, + @Nullable protected Object extractParameter(@Nullable String cacheName, String typeName, String fieldName, Object obj) throws CacheException { return isKeepSerialized(cacheName, typeName) ? extractPortableParameter(fieldName, obj) @@ -255,8 +136,8 @@ public class CacheJdbcPojoStore<K, V> implements CacheStore<K, V>, LifecycleAwar * @return Constructed object. * @throws CacheLoaderException If failed to construct cache object. */ - private <R> R buildObject(@Nullable String cacheName, String typeName, - CacheJdbcPojoStoreTypeField[] fields, Map<String, Integer> loadColIdxs, ResultSet rs) + @Override protected <R> R buildObject(@Nullable String cacheName, String typeName, + JdbcTypeField[] fields, Map<String, Integer> loadColIdxs, ResultSet rs) throws CacheLoaderException { return (R)(isKeepSerialized(cacheName, typeName) ? buildPortableObject(cacheName, typeName, fields, loadColIdxs, rs) @@ -275,7 +156,7 @@ public class CacheJdbcPojoStore<K, V> implements CacheStore<K, V>, LifecycleAwar * @throws CacheLoaderException If failed to construct POJO. */ private Object buildPojoObject(@Nullable String cacheName, String typeName, - CacheJdbcPojoStoreTypeField[] fields, Map<String, Integer> loadColIdxs, ResultSet rs) + JdbcTypeField[] fields, Map<String, Integer> loadColIdxs, ResultSet rs) throws CacheLoaderException { Map<String, PojoMethodsCache> cacheMethods = pojoMethods.get(cacheName); @@ -290,14 +171,14 @@ public class CacheJdbcPojoStore<K, V> implements CacheStore<K, V>, LifecycleAwar try { if (mc.simple) { - CacheJdbcPojoStoreTypeField field = fields[0]; + JdbcTypeField field = fields[0]; return getColumnValue(rs, loadColIdxs.get(field.getDatabaseFieldName()), mc.cls); } Object obj = mc.ctor.newInstance(); - for (CacheJdbcPojoStoreTypeField field : fields) { + for (JdbcTypeField field : fields) { String fldJavaName = field.getJavaFieldName(); Method setter = mc.setters.get(fldJavaName); @@ -340,7 +221,7 @@ public class CacheJdbcPojoStore<K, V> implements CacheStore<K, V>, LifecycleAwar * @return Constructed portable object. * @throws CacheLoaderException If failed to construct portable object. */ - protected PortableObject buildPortableObject(String cacheName, String typeName, CacheJdbcPojoStoreTypeField[] fields, + protected PortableObject buildPortableObject(String cacheName, String typeName, JdbcTypeField[] fields, Map<String, Integer> loadColIdxs, ResultSet rs) throws CacheException { Map<String, Integer> cacheTypeIds = portableTypeIds.get(cacheName); @@ -352,21 +233,28 @@ public class CacheJdbcPojoStore<K, V> implements CacheStore<K, V>, LifecycleAwar if (typeId == null) throw new CacheLoaderException("Failed to find portable type ID for type: " + typeName); - PortableBuilder builder = portables.builder(typeId); + PortableBuilder builder = ignite.portables().builder(typeId); if (builder == null) throw new CacheException("Failed to find portable builder for type: " + typeName); try { - for (CacheJdbcPojoStoreTypeField field : fields) { + int hashCode = 1; + + for (JdbcTypeField field : fields) { Class<?> type = field.getJavaFieldType(); Integer colIdx = loadColIdxs.get(field.getDatabaseFieldName()); - builder.setField(field.getJavaFieldName(), getColumnValue(rs, colIdx, type)); + Object colVal = getColumnValue(rs, colIdx, type); + + if (colVal != null) + hashCode = 31 * hashCode + colVal.hashCode(); + + builder.setField(field.getJavaFieldName(), colVal); } - return builder.build(); + return builder.hashCode(hashCode).build(); } catch (SQLException e) { throw new CacheException("Failed to read portable object", e); @@ -380,7 +268,7 @@ public class CacheJdbcPojoStore<K, V> implements CacheStore<K, V>, LifecycleAwar * @return Type ID. * @throws CacheException If failed to calculate type ID for given object. */ - private Object typeIdForObject(Object obj) throws CacheException { + @Override protected Object typeIdForObject(Object obj) throws CacheException { if (obj instanceof PortableObject) return ((PortableObject)obj).typeId(); @@ -395,9 +283,9 @@ public class CacheJdbcPojoStore<K, V> implements CacheStore<K, V>, LifecycleAwar * @return Type ID. * @throws CacheException If failed to get type ID for given type name. */ - private Object typeIdForTypeName(boolean keepSerialized, String typeName) throws CacheException { + @Override protected Object typeIdForTypeName(boolean keepSerialized, String typeName) throws CacheException { if (keepSerialized) - return portables.typeId(typeName); + return ignite.portables().typeId(typeName); try { return Class.forName(typeName); @@ -408,17 +296,31 @@ public class CacheJdbcPojoStore<K, V> implements CacheStore<K, V>, LifecycleAwar } /** + * Prepare internal store specific builders for provided types metadata. + * + * @param cacheName Cache name to prepare builders for. + * @param types Collection of types. + * @throws CacheException If failed to prepare internal builders for types. + */ + @Override protected void prepareBuilders(@Nullable String cacheName, Collection<JdbcType> types) + throws CacheException { + preparePojoBuilders(cacheName, types); + preparePortableBuilders(cacheName, types); + } + + + /** * Prepare builders for POJOs via reflection (getters and setters). * * @param cacheName Cache name to prepare builders for. * @param types Collection of types. * @throws CacheException If failed to prepare internal builders for types. */ - private void preparePojoBuilders(@Nullable String cacheName, Collection<CacheJdbcPojoStoreType> types) + private void preparePojoBuilders(@Nullable String cacheName, Collection<JdbcType> types) throws CacheException { Map<String, PojoMethodsCache> typeMethods = U.newHashMap(types.size() * 2); - for (CacheJdbcPojoStoreType type : types) { + for (JdbcType type : types) { if (!type.isKeepSerialized()) { String keyType = type.getKeyType(); @@ -450,11 +352,11 @@ public class CacheJdbcPojoStore<K, V> implements CacheStore<K, V>, LifecycleAwar * @param types Collection of types. * @throws CacheException If failed to prepare internal builders for types. */ - private void preparePortableBuilders(@Nullable String cacheName, Collection<CacheJdbcPojoStoreType> types) + private void preparePortableBuilders(@Nullable String cacheName, Collection<JdbcType> types) throws CacheException { Map<String, Integer> typeIds = U.newHashMap(types.size() * 2); - for (CacheJdbcPojoStoreType type : types) { + for (JdbcType type : types) { if (type.isKeepSerialized()) { Ignite ignite = ignite(); @@ -478,1438 +380,76 @@ public class CacheJdbcPojoStore<K, V> implements CacheStore<K, V>, LifecycleAwar } /** - * Perform dialect resolution. - * - * @return The resolved dialect. - * @throws CacheException Indicates problems accessing the metadata. - */ - private JdbcDialect resolveDialect() throws CacheException { - Connection conn = null; - - String dbProductName = null; - - try { - conn = openConnection(false); - - dbProductName = conn.getMetaData().getDatabaseProductName(); - } - catch (SQLException e) { - throw new CacheException("Failed access to metadata for detect database dialect.", e); - } - finally { - U.closeQuiet(conn); - } - - if ("H2".equals(dbProductName)) - return new H2Dialect(); - - if ("MySQL".equals(dbProductName)) - return new MySQLDialect(); - - if (dbProductName.startsWith("Microsoft SQL Server")) - return new SQLServerDialect(); - - if ("Oracle".equals(dbProductName)) - return new OracleDialect(); - - if (dbProductName.startsWith("DB2/")) - return new DB2Dialect(); - - U.warn(log, "Failed to resolve dialect (BasicJdbcDialect will be used): " + dbProductName); - - return new BasicJdbcDialect(); - } - - /** {@inheritDoc} */ - @Override public void start() throws IgniteException { - if (dataSrc == null) - throw new IgniteException("Failed to initialize cache store (data source is not provided)."); - - if (dialect == null) { - dialect = resolveDialect(); - - if (log.isDebugEnabled() && dialect.getClass() != BasicJdbcDialect.class) - log.debug("Resolved database dialect: " + U.getSimpleName(dialect.getClass())); - } - - portables = ignite.portables(); - } - - /** {@inheritDoc} */ - @Override public void stop() throws IgniteException { - // No-op. - } - - /** - * Gets connection from a pool. - * - * @param autocommit {@code true} If connection should use autocommit mode. - * @return Pooled connection. - * @throws SQLException In case of error. - */ - protected Connection openConnection(boolean autocommit) throws SQLException { - Connection conn = dataSrc.getConnection(); - - conn.setAutoCommit(autocommit); - - return conn; - } - - /** - * @return Connection. - * @throws SQLException In case of error. - */ - private Connection connection() throws SQLException { - CacheStoreSession ses = session(); - - if (ses.transaction() != null) { - Map<String, Connection> prop = ses.properties(); - - Connection conn = prop.get(ATTR_CONN_PROP); - - if (conn == null) { - conn = openConnection(false); - - // Store connection in session to used it for other operations in the same session. - prop.put(ATTR_CONN_PROP, conn); - } - - return conn; - } - // Transaction can be null in case of simple load operation. - else - return openConnection(true); - } - - /** - * Closes connection. - * - * @param conn Connection to close. + * POJO methods cache. */ - private void closeConnection(@Nullable Connection conn) { - CacheStoreSession ses = session(); + private static class PojoMethodsCache { + /** POJO class. */ + private final Class<?> cls; - // Close connection right away if there is no transaction. - if (ses.transaction() == null) - U.closeQuiet(conn); - } + /** Constructor for POJO object. */ + private Constructor ctor; - /** - * Closes allocated resources depending on transaction status. - * - * @param conn Allocated connection. - * @param st Created statement, - */ - private void end(@Nullable Connection conn, @Nullable Statement st) { - U.closeQuiet(st); + /** {@code true} if object is a simple type. */ + private final boolean simple; - closeConnection(conn); - } + /** Cached setters for POJO object. */ + private Map<String, Method> getters; - /** {@inheritDoc} */ - @Override public void sessionEnd(boolean commit) throws CacheWriterException { - CacheStoreSession ses = session(); + /** Cached getters for POJO object. */ + private Map<String, Method> setters; - Transaction tx = ses.transaction(); + /** + * POJO methods cache. + * + * @param clsName Class name. + * @param fields Fields. + * @throws CacheException If failed to construct type cache. + */ + public PojoMethodsCache(String clsName, JdbcTypeField[] fields) throws CacheException { + try { + cls = Class.forName(clsName); - if (tx != null) { - Connection conn = ses.<String, Connection>properties().remove(ATTR_CONN_PROP); + if (simple = simpleType(cls)) + return; - assert conn != null; + ctor = cls.getDeclaredConstructor(); - try { - if (commit) - conn.commit(); - else - conn.rollback(); - } - catch (SQLException e) { - throw new CacheWriterException( - "Failed to end transaction [xid=" + tx.xid() + ", commit=" + commit + ']', e); + if (!ctor.isAccessible()) + ctor.setAccessible(true); } - finally { - U.closeQuiet(conn); + catch (ClassNotFoundException e) { + throw new CacheException("Failed to find class: " + clsName, e); } - - if (log.isDebugEnabled()) - log.debug("Transaction ended [xid=" + tx.xid() + ", commit=" + commit + ']'); - } - } - - /** - * Retrieves the value of the designated column in the current row of this <code>ResultSet</code> object and will - * convert to the requested Java data type. - * - * @param rs Result set. - * @param colIdx Column index in result set. - * @param type Class representing the Java data type to convert the designated column to. - * @return Value in column. - * @throws SQLException If a database access error occurs or this method is called. - */ - private Object getColumnValue(ResultSet rs, int colIdx, Class<?> type) throws SQLException { - Object val = rs.getObject(colIdx); - - if (val == null) - return null; - - if (type == int.class) - return rs.getInt(colIdx); - - if (type == long.class) - return rs.getLong(colIdx); - - if (type == double.class) - return rs.getDouble(colIdx); - - if (type == boolean.class || type == Boolean.class) - return rs.getBoolean(colIdx); - - if (type == byte.class) - return rs.getByte(colIdx); - - if (type == short.class) - return rs.getShort(colIdx); - - if (type == float.class) - return rs.getFloat(colIdx); - - if (type == Integer.class || type == Long.class || type == Double.class || - type == Byte.class || type == Short.class || type == Float.class) { - Number num = (Number)val; - - if (type == Integer.class) - return num.intValue(); - else if (type == Long.class) - return num.longValue(); - else if (type == Double.class) - return num.doubleValue(); - else if (type == Byte.class) - return num.byteValue(); - else if (type == Short.class) - return num.shortValue(); - else if (type == Float.class) - return num.floatValue(); - } - - if (type == UUID.class) { - if (val instanceof UUID) - return val; - - if (val instanceof byte[]) { - ByteBuffer bb = ByteBuffer.wrap((byte[])val); - - long most = bb.getLong(); - long least = bb.getLong(); - - return new UUID(most, least); + catch (NoSuchMethodException e) { + throw new CacheException("Failed to find default constructor for class: " + clsName, e); } - if (val instanceof String) - return UUID.fromString((String)val); - } - - return val; - } + setters = U.newHashMap(fields.length); - /** - * Construct load cache from range. - * - * @param em Type mapping description. - * @param clo Closure that will be applied to loaded values. - * @param lowerBound Lower bound for range. - * @param upperBound Upper bound for range. - * @return Callable for pool submit. - */ - private Callable<Void> loadCacheRange(final EntryMapping em, final IgniteBiInClosure<K, V> clo, - @Nullable final Object[] lowerBound, @Nullable final Object[] upperBound) { - return new Callable<Void>() { - @Override public Void call() throws Exception { - Connection conn = null; + getters = U.newHashMap(fields.length); - PreparedStatement stmt = null; + for (JdbcTypeField field : fields) { + String prop = capitalFirst(field.getJavaFieldName()); try { - conn = openConnection(true); - - stmt = conn.prepareStatement(lowerBound == null && upperBound == null - ? em.loadCacheQry - : em.loadCacheRangeQuery(lowerBound != null, upperBound != null)); - - int ix = 1; - - if (lowerBound != null) - for (int i = lowerBound.length; i > 0; i--) - for (int j = 0; j < i; j++) - stmt.setObject(ix++, lowerBound[j]); - - if (upperBound != null) - for (int i = upperBound.length; i > 0; i--) - for (int j = 0; j < i; j++) - stmt.setObject(ix++, upperBound[j]); - - ResultSet rs = stmt.executeQuery(); - - long t = System.currentTimeMillis(); - - while (rs.next()) { - K key = buildObject(em.cacheName, em.keyType(), em.keyColumns(), em.loadColIdxs, rs); - V val = buildObject(em.cacheName, em.valueType(), em.valueColumns(), em.loadColIdxs, rs); - - clo.apply(key, val); - } - } - catch (SQLException e) { - throw new IgniteCheckedException("Failed to load cache", e); - } - finally { - U.closeQuiet(stmt); - - U.closeQuiet(conn); + getters.put(field.getJavaFieldName(), cls.getMethod("get" + prop)); } - - return null; - } - }; - } - - /** - * Construct load cache in one select. - * - * @param m Type mapping description. - * @param clo Closure for loaded values. - * @return Callable for pool submit. - */ - private Callable<Void> loadCacheFull(EntryMapping m, IgniteBiInClosure<K, V> clo) { - return loadCacheRange(m, clo, null, null); - } - - /** - * Object is a simple type. - * - * @param cls Class. - * @return {@code True} if object is a simple type. - */ - private static boolean simpleType(Class<?> cls) { - return (Number.class.isAssignableFrom(cls) || String.class.isAssignableFrom(cls) || - java.util.Date.class.isAssignableFrom(cls) || Boolean.class.isAssignableFrom(cls) || - UUID.class.isAssignableFrom(cls)); - } - - /** - * @param cacheName Cache name to check mapping for. - * @param clsName Class name. - * @param fields Fields descriptors. - * @throws CacheException If failed to check type metadata. - */ - private static void checkMapping(@Nullable String cacheName, String clsName, - CacheJdbcPojoStoreTypeField[] fields) throws CacheException { - try { - Class<?> cls = Class.forName(clsName); - - if (simpleType(cls)) { - if (fields.length != 1) - throw new CacheException("More than one field for simple type [cache name=" + cacheName - + ", type=" + clsName + " ]"); - - CacheJdbcPojoStoreTypeField field = fields[0]; - - if (field.getDatabaseFieldName() == null) - throw new CacheException("Missing database name in mapping description [cache name=" + cacheName - + ", type=" + clsName + " ]"); - - field.setJavaFieldType(cls); - } - else - for (CacheJdbcPojoStoreTypeField field : fields) { - if (field.getDatabaseFieldName() == null) - throw new CacheException("Missing database name in mapping description [cache name=" + cacheName - + ", type=" + clsName + " ]"); - - if (field.getJavaFieldName() == null) - throw new CacheException("Missing field name in mapping description [cache name=" + cacheName - + ", type=" + clsName + " ]"); - - if (field.getJavaFieldType() == null) - throw new CacheException("Missing field type in mapping description [cache name=" + cacheName - + ", type=" + clsName + " ]"); + catch (NoSuchMethodException ignored) { + try { + getters.put(field.getJavaFieldName(), cls.getMethod("is" + prop)); + } + catch (NoSuchMethodException e) { + throw new CacheException("Failed to find getter in POJO class [clsName=" + clsName + + ", prop=" + field.getJavaFieldName() + "]", e); + } } - } - catch (ClassNotFoundException e) { - throw new CacheException("Failed to find class: " + clsName, e); - } - } - - /** - * For backward compatibility translate old field type descriptors to new format. - * - * @param oldFields Fields in old format. - * @return Fields in new format. - */ - @Deprecated - private CacheJdbcPojoStoreTypeField[] translateFields(Collection<CacheTypeFieldMetadata> oldFields) { - CacheJdbcPojoStoreTypeField[] newFields = new CacheJdbcPojoStoreTypeField[oldFields.size()]; - - int idx = 0; - - for (CacheTypeFieldMetadata oldField : oldFields) { - newFields[idx] = new CacheJdbcPojoStoreTypeField(oldField.getDatabaseType(), oldField.getDatabaseName(), - oldField.getJavaType(), oldField.getJavaName()); - - idx++; - } - - return newFields; - } - - /** - * @param cacheName Cache name to check mappings for. - * @return Type mappings for specified cache name. - * @throws CacheException If failed to initialize cache mappings. - */ - private Map<Object, EntryMapping> cacheMappings(@Nullable String cacheName) throws CacheException { - Map<Object, EntryMapping> entryMappings = cacheMappings.get(cacheName); - - if (entryMappings != null) - return entryMappings; - - cacheMappingsLock.lock(); - - try { - entryMappings = cacheMappings.get(cacheName); - - if (entryMappings != null) - return entryMappings; - - // If no types configured, check CacheTypeMetadata for backward compatibility. - if (types == null) { - CacheConfiguration ccfg = ignite().cache(cacheName).getConfiguration(CacheConfiguration.class); - - Collection<CacheTypeMetadata> oldTypes = ccfg.getTypeMetadata(); - - types = new CacheJdbcPojoStoreType[oldTypes.size()]; - - int idx = 0; - - for (CacheTypeMetadata oldType : oldTypes) { - CacheJdbcPojoStoreType newType = new CacheJdbcPojoStoreType(); - newType.setCacheName(cacheName); - - newType.setDatabaseSchema(oldType.getDatabaseSchema()); - newType.setDatabaseTable(oldType.getDatabaseTable()); - - newType.setKeyType(oldType.getKeyType()); - newType.setKeyFields(translateFields(oldType.getKeyFields())); - - newType.setValueType(oldType.getValueType()); - newType.setValueFields(translateFields(oldType.getValueFields())); - - types[idx] = newType; - - idx++; + try { + setters.put(field.getJavaFieldName(), cls.getMethod("set" + prop, field.getJavaFieldType())); } - } - - List<CacheJdbcPojoStoreType> cacheTypes = new ArrayList<>(types.length); - - for (CacheJdbcPojoStoreType type : types) - if ((cacheName != null && cacheName.equals(type.getCacheName())) || - (cacheName == null && type.getCacheName() == null)) - cacheTypes.add(type); - - entryMappings = U.newHashMap(cacheTypes.size()); - - if (!cacheTypes.isEmpty()) { - Map<String, Boolean> tk = new HashMap<>(cacheTypes.size() * 2); - - for (CacheJdbcPojoStoreType type : cacheTypes) { - boolean keepSerialized = type.isKeepSerialized(); - - String keyType = type.getKeyType(); - String valType = type.getValueType(); - - tk.put(keyType, keepSerialized); - tk.put(valType, keepSerialized); - - Object keyTypeId = typeIdForTypeName(keepSerialized, keyType); - - if (entryMappings.containsKey(keyTypeId)) - throw new CacheException("Key type must be unique in type metadata [cache name=" + cacheName + - ", key type=" + keyType + "]"); - - if (!keepSerialized) { - checkMapping(cacheName, keyType, type.getKeyFields()); - checkMapping(cacheName, valType, type.getValueFields()); - } - - entryMappings.put(keyTypeId, new EntryMapping(cacheName, dialect, type)); - } - - keepSerializedTypes.put(cacheName, tk); - - Map<String, Map<Object, EntryMapping>> mappings = new HashMap<>(cacheMappings); - mappings.put(cacheName, entryMappings); - - preparePojoBuilders(cacheName, cacheTypes); - preparePortableBuilders(cacheName, cacheTypes); - - cacheMappings = mappings; - } - - return entryMappings; - } - finally { - cacheMappingsLock.unlock(); - } - } - - /** - * @param cacheName Cache name. - * @param keyTypeId Key type id. - * @param key Key object. - * @return Entry mapping. - * @throws CacheException If mapping for key was not found. - */ - private EntryMapping entryMapping(String cacheName, Object keyTypeId, Object key) throws CacheException { - EntryMapping em = cacheMappings(cacheName).get(keyTypeId); - - if (em == null) { - String maskedCacheName = U.maskName(cacheName); - - throw new CacheException("Failed to find mapping description [key=" + key + - ", cache=" + maskedCacheName + "]. Please configure CacheJdbcPojoStoreType to associate '" + maskedCacheName + "' with JdbcPojoStore."); - } - - return em; - } - - /** {@inheritDoc} */ - @Override public void loadCache(final IgniteBiInClosure<K, V> clo, @Nullable Object... args) - throws CacheLoaderException { - ExecutorService pool = null; - - String cacheName = session().cacheName(); - - try { - pool = Executors.newFixedThreadPool(maxPoolSz); - - Collection<Future<?>> futs = new ArrayList<>(); - - if (args != null && args.length > 0) { - if (args.length % 2 != 0) - throw new CacheLoaderException("Expected even number of arguments, but found: " + args.length); - - if (log.isDebugEnabled()) - log.debug("Start loading entries from db using user queries from arguments"); - - for (int i = 0; i < args.length; i += 2) { - String keyType = args[i].toString(); - - String selQry = args[i + 1].toString(); - - // We must build cache mappings first. - cacheMappings(cacheName); - - EntryMapping em = entryMapping(cacheName, typeIdForTypeName(isKeepSerialized(cacheName, keyType), - keyType), keyType); - - futs.add(pool.submit(new LoadCacheCustomQueryWorker<>(em, selQry, clo))); - } - } - else { - Collection<EntryMapping> entryMappings = cacheMappings(session().cacheName()).values(); - - for (EntryMapping em : entryMappings) { - if (parallelLoadCacheMinThreshold > 0) { - if (log.isDebugEnabled()) - log.debug("Multithread loading entries from db [cache name=" + cacheName + - ", key type=" + em.keyType() + " ]"); - - Connection conn = null; - - try { - conn = connection(); - - PreparedStatement stmt = conn.prepareStatement(em.loadCacheSelRangeQry); - - stmt.setInt(1, parallelLoadCacheMinThreshold); - - ResultSet rs = stmt.executeQuery(); - - if (rs.next()) { - int keyCnt = em.keyCols.size(); - - Object[] upperBound = new Object[keyCnt]; - - for (int i = 0; i < keyCnt; i++) - upperBound[i] = rs.getObject(i + 1); - - futs.add(pool.submit(loadCacheRange(em, clo, null, upperBound))); - - while (rs.next()) { - Object[] lowerBound = upperBound; - - upperBound = new Object[keyCnt]; - - for (int i = 0; i < keyCnt; i++) - upperBound[i] = rs.getObject(i + 1); - - futs.add(pool.submit(loadCacheRange(em, clo, lowerBound, upperBound))); - } - - futs.add(pool.submit(loadCacheRange(em, clo, upperBound, null))); - } - else - futs.add(pool.submit(loadCacheFull(em, clo))); - } - catch (SQLException ignored) { - futs.add(pool.submit(loadCacheFull(em, clo))); - } - finally { - U.closeQuiet(conn); - } - } - else { - if (log.isDebugEnabled()) - log.debug("Single thread loading entries from db [cache name=" + cacheName + - ", key type=" + em.keyType() + " ]"); - - futs.add(pool.submit(loadCacheFull(em, clo))); - } - } - } - - for (Future<?> fut : futs) - U.get(fut); - - if (log.isDebugEnabled()) - log.debug("Cache loaded from db: " + cacheName); - } - catch (IgniteCheckedException e) { - throw new CacheLoaderException("Failed to load cache: " + cacheName, e.getCause()); - } - finally { - U.shutdownNow(getClass(), pool, log); - } - } - - /** {@inheritDoc} */ - @Nullable @Override public V load(K key) throws CacheLoaderException { - assert key != null; - - EntryMapping em = entryMapping(session().cacheName(), typeIdForObject(key), key); - - if (log.isDebugEnabled()) - log.debug("Load value from db [table= " + em.fullTableName() + ", key=" + key + "]"); - - Connection conn = null; - - PreparedStatement stmt = null; - - try { - conn = connection(); - - stmt = conn.prepareStatement(em.loadQrySingle); - - fillKeyParameters(stmt, em, key); - - ResultSet rs = stmt.executeQuery(); - - if (rs.next()) - return buildObject(em.cacheName, em.valueType(), em.valueColumns(), em.loadColIdxs, rs); - } - catch (SQLException e) { - throw new CacheLoaderException("Failed to load object [table=" + em.fullTableName() + - ", key=" + key + "]", e); - } - finally { - end(conn, stmt); - } - - return null; - } - - /** {@inheritDoc} */ - @Override public Map<K, V> loadAll(Iterable<? extends K> keys) throws CacheLoaderException { - assert keys != null; - - Connection conn = null; - - try { - conn = connection(); - - String cacheName = session().cacheName(); - - Map<Object, LoadWorker<K, V>> workers = U.newHashMap(cacheMappings(cacheName).size()); - - Map<K, V> res = new HashMap<>(); - - for (K key : keys) { - Object keyTypeId = typeIdForObject(key); - - EntryMapping em = entryMapping(cacheName, keyTypeId, key); - - LoadWorker<K, V> worker = workers.get(keyTypeId); - - if (worker == null) - workers.put(keyTypeId, worker = new LoadWorker<>(conn, em)); - - worker.keys.add(key); - - if (worker.keys.size() == em.maxKeysPerStmt) - res.putAll(workers.remove(keyTypeId).call()); - } - - for (LoadWorker<K, V> worker : workers.values()) - res.putAll(worker.call()); - - return res; - } - catch (Exception e) { - throw new CacheWriterException("Failed to load entries from database", e); - } - finally { - closeConnection(conn); - } - } - - /** - * @param insStmt Insert statement. - * @param updStmt Update statement. - * @param em Entry mapping. - * @param entry Cache entry. - * @throws CacheWriterException If failed to update record in database. - */ - private void writeUpsert(PreparedStatement insStmt, PreparedStatement updStmt, - EntryMapping em, Cache.Entry<? extends K, ? extends V> entry) throws CacheWriterException { - try { - CacheWriterException we = null; - - for (int attempt = 0; attempt < maxWrtAttempts; attempt++) { - int paramIdx = fillValueParameters(updStmt, 1, em, entry.getValue()); - - fillKeyParameters(updStmt, paramIdx, em, entry.getKey()); - - if (updStmt.executeUpdate() == 0) { - paramIdx = fillKeyParameters(insStmt, em, entry.getKey()); - - fillValueParameters(insStmt, paramIdx, em, entry.getValue()); - - try { - insStmt.executeUpdate(); - - if (attempt > 0) - U.warn(log, "Entry was inserted in database on second try [table=" + em.fullTableName() + - ", entry=" + entry + "]"); - } - catch (SQLException e) { - String sqlState = e.getSQLState(); - - SQLException nested = e.getNextException(); - - while (sqlState == null && nested != null) { - sqlState = nested.getSQLState(); - - nested = nested.getNextException(); - } - - // The error with code 23505 or 23000 is thrown when trying to insert a row that - // would violate a unique index or primary key. - if ("23505".equals(sqlState) || "23000".equals(sqlState)) { - if (we == null) - we = new CacheWriterException("Failed insert entry in database, violate a unique" + - " index or primary key [table=" + em.fullTableName() + ", entry=" + entry + "]"); - - we.addSuppressed(e); - - U.warn(log, "Failed insert entry in database, violate a unique index or primary key" + - " [table=" + em.fullTableName() + ", entry=" + entry + "]"); - - continue; - } - - throw new CacheWriterException("Failed insert entry in database [table=" + em.fullTableName() + - ", entry=" + entry, e); - } - } - - if (attempt > 0) - U.warn(log, "Entry was updated in database on second try [table=" + em.fullTableName() + - ", entry=" + entry + "]"); - - return; - } - - throw we; - } - catch (SQLException e) { - throw new CacheWriterException("Failed update entry in database [table=" + em.fullTableName() + - ", entry=" + entry + "]", e); - } - } - - /** {@inheritDoc} */ - @Override public void write(Cache.Entry<? extends K, ? extends V> entry) throws CacheWriterException { - assert entry != null; - - K key = entry.getKey(); - - EntryMapping em = entryMapping(session().cacheName(), typeIdForObject(key), key); - - if (log.isDebugEnabled()) - log.debug("Start write entry to database [table=" + em.fullTableName() + ", entry=" + entry + "]"); - - Connection conn = null; - - try { - conn = connection(); - - if (dialect.hasMerge()) { - PreparedStatement stmt = null; - - try { - stmt = conn.prepareStatement(em.mergeQry); - - int i = fillKeyParameters(stmt, em, key); - - fillValueParameters(stmt, i, em, entry.getValue()); - - int updCnt = stmt.executeUpdate(); - - if (updCnt != 1) - U.warn(log, "Unexpected number of updated entries [table=" + em.fullTableName() + - ", entry=" + entry + "expected=1, actual=" + updCnt + "]"); - } - finally { - U.closeQuiet(stmt); - } - } - else { - PreparedStatement insStmt = null; - - PreparedStatement updStmt = null; - - try { - insStmt = conn.prepareStatement(em.insQry); - - updStmt = conn.prepareStatement(em.updQry); - - writeUpsert(insStmt, updStmt, em, entry); - } - finally { - U.closeQuiet(insStmt); - - U.closeQuiet(updStmt); - } - } - } - catch (SQLException e) { - throw new CacheWriterException("Failed to write entry to database [table=" + em.fullTableName() + - ", entry=" + entry + "]", e); - } - finally { - closeConnection(conn); - } - } - - /** {@inheritDoc} */ - @Override public void writeAll(final Collection<Cache.Entry<? extends K, ? extends V>> entries) - throws CacheWriterException { - assert entries != null; - - Connection conn = null; - - try { - conn = connection(); - - String cacheName = session().cacheName(); - - Object currKeyTypeId = null; - - if (dialect.hasMerge()) { - PreparedStatement mergeStmt = null; - - try { - EntryMapping em = null; - - LazyValue<Object[]> lazyEntries = new LazyValue<Object[]>() { - @Override public Object[] create() { - return entries.toArray(); - } - }; - - int fromIdx = 0, prepared = 0; - - for (Cache.Entry<? extends K, ? extends V> entry : entries) { - K key = entry.getKey(); - - Object keyTypeId = typeIdForObject(key); - - em = entryMapping(cacheName, keyTypeId, key); - - if (currKeyTypeId == null || !currKeyTypeId.equals(keyTypeId)) { - if (mergeStmt != null) { - if (log.isDebugEnabled()) - log.debug("Write entries to db [cache name=" + cacheName + - ", key type=" + em.keyType() + ", count=" + prepared + "]"); - - executeBatch(em, mergeStmt, "writeAll", fromIdx, prepared, lazyEntries); - - U.closeQuiet(mergeStmt); - } - - mergeStmt = conn.prepareStatement(em.mergeQry); - - currKeyTypeId = keyTypeId; - - fromIdx += prepared; - - prepared = 0; - } - - int i = fillKeyParameters(mergeStmt, em, key); - - fillValueParameters(mergeStmt, i, em, entry.getValue()); - - mergeStmt.addBatch(); - - if (++prepared % batchSz == 0) { - if (log.isDebugEnabled()) - log.debug("Write entries to db [cache name=" + cacheName + - ", key type=" + em.keyType() + ", count=" + prepared + "]"); - - executeBatch(em, mergeStmt, "writeAll", fromIdx, prepared, lazyEntries); - - fromIdx += prepared; - - prepared = 0; - } - } - - if (mergeStmt != null && prepared % batchSz != 0) { - if (log.isDebugEnabled()) - log.debug("Write entries to db [cache name=" + cacheName + - ", key type=" + em.keyType() + ", count=" + prepared + "]"); - - executeBatch(em, mergeStmt, "writeAll", fromIdx, prepared, lazyEntries); - - } - } - finally { - U.closeQuiet(mergeStmt); - } - } - else { - if (log.isDebugEnabled()) - log.debug("Write entries to db one by one using update and insert statements [cache name=" + - cacheName + ", count=" + entries.size() + "]"); - - PreparedStatement insStmt = null; - - PreparedStatement updStmt = null; - - try { - for (Cache.Entry<? extends K, ? extends V> entry : entries) { - K key = entry.getKey(); - - Object keyTypeId = typeIdForObject(key); - - EntryMapping em = entryMapping(cacheName, keyTypeId, key); - - if (currKeyTypeId == null || !currKeyTypeId.equals(keyTypeId)) { - U.closeQuiet(insStmt); - - insStmt = conn.prepareStatement(em.insQry); - - U.closeQuiet(updStmt); - - updStmt = conn.prepareStatement(em.updQry); - - currKeyTypeId = keyTypeId; - } - - writeUpsert(insStmt, updStmt, em, entry); - } - } - finally { - U.closeQuiet(insStmt); - - U.closeQuiet(updStmt); - } - } - } - catch (SQLException e) { - throw new CacheWriterException("Failed to write entries in database", e); - } - finally { - closeConnection(conn); - } - } - - /** {@inheritDoc} */ - @Override public void delete(Object key) throws CacheWriterException { - assert key != null; - - EntryMapping em = entryMapping(session().cacheName(), typeIdForObject(key), key); - - if (log.isDebugEnabled()) - log.debug("Remove value from db [table=" + em.fullTableName() + ", key=" + key + "]"); - - Connection conn = null; - - PreparedStatement stmt = null; - - try { - conn = connection(); - - stmt = conn.prepareStatement(em.remQry); - - fillKeyParameters(stmt, em, key); - - int delCnt = stmt.executeUpdate(); - - if (delCnt != 1) - U.warn(log, "Unexpected number of deleted entries [table=" + em.fullTableName() + ", key=" + key + - ", expected=1, actual=" + delCnt + "]"); - } - catch (SQLException e) { - throw new CacheWriterException("Failed to remove value from database [table=" + em.fullTableName() + - ", key=" + key + "]", e); - } - finally { - end(conn, stmt); - } - } - - /** - * @param em Entry mapping. - * @param stmt Statement. - * @param desc Statement description for error message. - * @param fromIdx Objects in batch start from index. - * @param prepared Expected objects in batch. - * @param lazyObjs All objects used in batch statement as array. - * @throws SQLException If failed to execute batch statement. - */ - private void executeBatch(EntryMapping em, Statement stmt, String desc, int fromIdx, int prepared, - LazyValue<Object[]> lazyObjs) throws SQLException { - try { - int[] rowCounts = stmt.executeBatch(); - - int numOfRowCnt = rowCounts.length; - - if (numOfRowCnt != prepared) - U.warn(log, "Unexpected number of updated rows [table=" + em.fullTableName() + ", expected=" + prepared + - ", actual=" + numOfRowCnt + "]"); - - for (int i = 0; i < numOfRowCnt; i++) { - int cnt = rowCounts[i]; - - if (cnt != 1 && cnt != SUCCESS_NO_INFO) { - Object[] objs = lazyObjs.value(); - - U.warn(log, "Batch " + desc + " returned unexpected updated row count [table=" + em.fullTableName() + - ", entry=" + objs[fromIdx + i] + ", expected=1, actual=" + cnt + "]"); - } - } - } - catch (BatchUpdateException be) { - int[] rowCounts = be.getUpdateCounts(); - - for (int i = 0; i < rowCounts.length; i++) { - if (rowCounts[i] == EXECUTE_FAILED) { - Object[] objs = lazyObjs.value(); - - U.warn(log, "Batch " + desc + " failed on execution [table=" + em.fullTableName() + - ", entry=" + objs[fromIdx + i] + "]"); - } - } - - throw be; - } - } - - /** {@inheritDoc} */ - @Override public void deleteAll(final Collection<?> keys) throws CacheWriterException { - assert keys != null; - - Connection conn = null; - - try { - conn = connection(); - - LazyValue<Object[]> lazyKeys = new LazyValue<Object[]>() { - @Override public Object[] create() { - return keys.toArray(); - } - }; - - String cacheName = session().cacheName(); - - Object currKeyTypeId = null; - - EntryMapping em = null; - - PreparedStatement delStmt = null; - - int fromIdx = 0, prepared = 0; - - for (Object key : keys) { - Object keyTypeId = typeIdForObject(key); - - em = entryMapping(cacheName, keyTypeId, key); - - if (delStmt == null) { - delStmt = conn.prepareStatement(em.remQry); - - currKeyTypeId = keyTypeId; - } - - if (!currKeyTypeId.equals(keyTypeId)) { - if (log.isDebugEnabled()) - log.debug("Delete entries from db [cache name=" + cacheName + - ", key type=" + em.keyType() + ", count=" + prepared + "]"); - - executeBatch(em, delStmt, "deleteAll", fromIdx, prepared, lazyKeys); - - fromIdx += prepared; - - prepared = 0; - - currKeyTypeId = keyTypeId; - } - - fillKeyParameters(delStmt, em, key); - - delStmt.addBatch(); - - if (++prepared % batchSz == 0) { - if (log.isDebugEnabled()) - log.debug("Delete entries from db [cache name=" + cacheName + - ", key type=" + em.keyType() + ", count=" + prepared + "]"); - - executeBatch(em, delStmt, "deleteAll", fromIdx, prepared, lazyKeys); - - fromIdx += prepared; - - prepared = 0; - } - } - - if (delStmt != null && prepared % batchSz != 0) { - if (log.isDebugEnabled()) - log.debug("Delete entries from db [cache name=" + cacheName + - ", key type=" + em.keyType() + ", count=" + prepared + "]"); - - executeBatch(em, delStmt, "deleteAll", fromIdx, prepared, lazyKeys); - } - } - catch (SQLException e) { - throw new CacheWriterException("Failed to remove values from database", e); - } - finally { - closeConnection(conn); - } - } - - /** - * Sets the value of the designated parameter using the given object. - * - * @param stmt Prepare statement. - * @param i Index for parameters. - * @param field Field descriptor. - * @param fieldVal Field value. - * @throws CacheException If failed to set statement parameter. - */ - private void fillParameter(PreparedStatement stmt, int i, CacheJdbcPojoStoreTypeField field, @Nullable Object fieldVal) - throws CacheException { - try { - if (fieldVal != null) { - if (field.getJavaFieldType() == UUID.class) { - switch (field.getDatabaseFieldType()) { - case Types.BINARY: - fieldVal = U.uuidToBytes((UUID)fieldVal); - - break; - case Types.CHAR: - case Types.VARCHAR: - fieldVal = fieldVal.toString(); - - break; - } - } - - stmt.setObject(i, fieldVal); - } - else - stmt.setNull(i, field.getDatabaseFieldType()); - } - catch (SQLException e) { - throw new CacheException("Failed to set statement parameter name: " + field.getDatabaseFieldName(), e); - } - } - - /** - * @param stmt Prepare statement. - * @param idx Start index for parameters. - * @param em Entry mapping. - * @param key Key object. - * @return Next index for parameters. - * @throws CacheException If failed to set statement parameters. - */ - private int fillKeyParameters(PreparedStatement stmt, int idx, EntryMapping em, - Object key) throws CacheException { - for (CacheJdbcPojoStoreTypeField field : em.keyColumns()) { - Object fieldVal = extractParameter(em.cacheName, em.keyType(), field.getJavaFieldName(), key); - - fillParameter(stmt, idx++, field, fieldVal); - } - - return idx; - } - - /** - * @param stmt Prepare statement. - * @param m Type mapping description. - * @param key Key object. - * @return Next index for parameters. - * @throws CacheException If failed to set statement parameters. - */ - private int fillKeyParameters(PreparedStatement stmt, EntryMapping m, Object key) throws CacheException { - return fillKeyParameters(stmt, 1, m, key); - } - - /** - * @param stmt Prepare statement. - * @param idx Start index for parameters. - * @param em Type mapping description. - * @param val Value object. - * @return Next index for parameters. - * @throws CacheException If failed to set statement parameters. - */ - private int fillValueParameters(PreparedStatement stmt, int idx, EntryMapping em, Object val) - throws CacheWriterException { - for (CacheJdbcPojoStoreTypeField field : em.uniqValFields) { - Object fieldVal = extractParameter(em.cacheName, em.valueType(), field.getJavaFieldName(), val); - - fillParameter(stmt, idx++, field, fieldVal); - } - - return idx; - } - - /** - * @return Data source. - */ - public DataSource getDataSource() { - return dataSrc; - } - - /** - * @param dataSrc Data source. - */ - public void setDataSource(DataSource dataSrc) { - this.dataSrc = dataSrc; - } - - /** - * Get database dialect. - * - * @return Database dialect. - */ - public JdbcDialect getDialect() { - return dialect; - } - - /** - * Set database dialect. - * - * @param dialect Database dialect. - */ - public void setDialect(JdbcDialect dialect) { - this.dialect = dialect; - } - - /** - * Get Max workers thread count. These threads are responsible for execute query. - * - * @return Max workers thread count. - */ - public int getMaximumPoolSize() { - return maxPoolSz; - } - - /** - * Gets maximum number of write attempts in case of database error. - * - * @return Maximum number of write attempts. - */ - public int getMaximumWriteAttempts() { - return maxWrtAttempts; - } - - /** - * Sets maximum number of write attempts in case of database error. - * - * @param maxWrtAttempts Number of write attempts. - * @return {@code This} for chaining. - */ - public CacheJdbcPojoStore<K, V> setMaximumWriteAttempts(int maxWrtAttempts) { - this.maxWrtAttempts = maxWrtAttempts; - - return this; - } - - /** - * Gets types known by store. - * - * @return Types known by store. - */ - public CacheJdbcPojoStoreType[] getTypes() { - return types; - } - - /** - * Sets store configurations. - * - * @param types Store should process. - * @return {@code This} for chaining. - */ - public CacheJdbcPojoStore<K, V> setTypes(CacheJdbcPojoStoreType... types) { - this.types = types; - - return this; - } - - /** - * Set Max workers thread count. These threads are responsible for execute query. - * - * @param maxPoolSz Max workers thread count. - */ - public void setMaximumPoolSize(int maxPoolSz) { - this.maxPoolSz = maxPoolSz; - } - - /** - * Get maximum batch size for delete and delete operations. - * - * @return Maximum batch size. - */ - public int getBatchSize() { - return batchSz; - } - - /** - * Set maximum batch size for write and delete operations. - * - * @param batchSz Maximum batch size. - */ - public void setBatchSize(int batchSz) { - this.batchSz = batchSz; - } - - /** - * Parallel load cache minimum row count threshold. - * - * @return If {@code 0} then load sequentially. - */ - public int getParallelLoadCacheMinimumThreshold() { - return parallelLoadCacheMinThreshold; - } - - /** - * Parallel load cache minimum row count threshold. - * - * @param parallelLoadCacheMinThreshold Minimum row count threshold. If {@code 0} then load sequentially. - */ - public void setParallelLoadCacheMinimumThreshold(int parallelLoadCacheMinThreshold) { - this.parallelLoadCacheMinThreshold = parallelLoadCacheMinThreshold; - } - - /** - * @return Ignite instance. - */ - private Ignite ignite() { - return ignite; - } - - /** - * @return Store session. - */ - private CacheStoreSession session() { - return ses; - } - - /** - * POJO methods cache. - */ - private static class PojoMethodsCache { - /** POJO class. */ - private final Class<?> cls; - - /** Constructor for POJO object. */ - private Constructor ctor; - - /** {@code true} if object is a simple type. */ - private final boolean simple; - - /** Cached setters for POJO object. */ - private Map<String, Method> getters; - - /** Cached getters for POJO object. */ - private Map<String, Method> setters; - - /** - * POJO methods cache. - * - * @param clsName Class name. - * @param fields Fields. - * @throws CacheException If failed to construct type cache. - */ - public PojoMethodsCache(String clsName, CacheJdbcPojoStoreTypeField[] fields) throws CacheException { - try { - cls = Class.forName(clsName); - - if (simple = simpleType(cls)) - return; - - ctor = cls.getDeclaredConstructor(); - - if (!ctor.isAccessible()) - ctor.setAccessible(true); - } - catch (ClassNotFoundException e) { - throw new CacheException("Failed to find class: " + clsName, e); - } - catch (NoSuchMethodException e) { - throw new CacheException("Failed to find default constructor for class: " + clsName, e); - } - - setters = U.newHashMap(fields.length); - - getters = U.newHashMap(fields.length); - - for (CacheJdbcPojoStoreTypeField field : fields) { - String prop = capitalFirst(field.getJavaFieldName()); - - try { - getters.put(field.getJavaFieldName(), cls.getMethod("get" + prop)); - } - catch (NoSuchMethodException ignored) { - try { - getters.put(field.getJavaFieldName(), cls.getMethod("is" + prop)); - } - catch (NoSuchMethodException e) { - throw new CacheException("Failed to find getter in POJO class [clsName=" + clsName + - ", prop=" + field.getJavaFieldName() + "]", e); - } - } - - try { - setters.put(field.getJavaFieldName(), cls.getMethod("set" + prop, field.getJavaFieldType())); - } - catch (NoSuchMethodException e) { - throw new CacheException("Failed to find setter in POJO class [clsName=" + clsName + - ", prop=" + field.getJavaFieldName() + "]", e); + catch (NoSuchMethodException e) { + throw new CacheException("Failed to find setter in POJO class [clsName=" + clsName + + ", prop=" + field.getJavaFieldName() + "]", e); } } } @@ -1925,367 +465,4 @@ public class CacheJdbcPojoStore<K, V> implements CacheStore<K, V>, LifecycleAwar str.isEmpty() ? "" : Character.toUpperCase(str.charAt(0)) + str.substring(1); } } - - /** - * Entry mapping description. - */ - private static class EntryMapping { - /** Cache name. */ - private final String cacheName; - - /** Database dialect. */ - private final JdbcDialect dialect; - - /** Select border for range queries. */ - private final String loadCacheSelRangeQry; - - /** Select all items query. */ - private final String loadCacheQry; - - /** Select item query. */ - private final String loadQrySingle; - - /** Select items query. */ - private final String loadQry; - - /** Merge item(s) query. */ - private final String mergeQry; - - /** Update item query. */ - private final String insQry; - - /** Update item query. */ - private final String updQry; - - /** Remove item(s) query. */ - private final String remQry; - - /** Max key count for load query per statement. */ - private final int maxKeysPerStmt; - - /** Database key columns. */ - private final Collection<String> keyCols; - - /** Database unique value columns. */ - private final Collection<String> cols; - - /** Select query columns index. */ - private final Map<String, Integer> loadColIdxs; - - /** Unique value fields. */ - private final Collection<CacheJdbcPojoStoreTypeField> uniqValFields; - - /** Type metadata. */ - private final CacheJdbcPojoStoreType typeMeta; - - /** Full table name. */ - private final String fullTblName; - - /** - * @param cacheName Cache name. - * @param dialect JDBC dialect. - * @param typeMeta Type metadata. - */ - public EntryMapping(@Nullable String cacheName, JdbcDialect dialect, CacheJdbcPojoStoreType typeMeta) { - this.cacheName = cacheName; - - this.dialect = dialect; - - this.typeMeta = typeMeta; - - CacheJdbcPojoStoreTypeField[] keyFields = typeMeta.getKeyFields(); - - CacheJdbcPojoStoreTypeField[] valFields = typeMeta.getValueFields(); - - keyCols = databaseColumns(F.asList(keyFields)); - - uniqValFields = F.view(F.asList(valFields), new IgnitePredicate<CacheJdbcPojoStoreTypeField>() { - @Override public boolean apply(CacheJdbcPojoStoreTypeField col) { - return !keyCols.contains(col.getDatabaseFieldName()); - } - }); - - String schema = typeMeta.getDatabaseSchema(); - - String tblName = typeMeta.getDatabaseTable(); - - fullTblName = F.isEmpty(schema) ? tblName : schema + "." + tblName; - - Collection<String> uniqValCols = databaseColumns(uniqValFields); - - cols = F.concat(false, keyCols, uniqValCols); - - loadColIdxs = U.newHashMap(cols.size()); - - int idx = 1; - - for (String col : cols) - loadColIdxs.put(col, idx++); - - loadCacheQry = dialect.loadCacheQuery(fullTblName, cols); - - loadCacheSelRangeQry = dialect.loadCacheSelectRangeQuery(fullTblName, keyCols); - - loadQrySingle = dialect.loadQuery(fullTblName, keyCols, cols, 1); - - maxKeysPerStmt = dialect.getMaxParameterCount() / keyCols.size(); - - loadQry = dialect.loadQuery(fullTblName, keyCols, cols, maxKeysPerStmt); - - insQry = dialect.insertQuery(fullTblName, keyCols, uniqValCols); - - updQry = dialect.updateQuery(fullTblName, keyCols, uniqValCols); - - mergeQry = dialect.mergeQuery(fullTblName, keyCols, uniqValCols); - - remQry = dialect.removeQuery(fullTblName, keyCols); - } - - /** - * Extract database column names from {@link CacheJdbcPojoStoreTypeField}. - * - * @param dsc collection of {@link CacheJdbcPojoStoreTypeField}. - * @return Collection with database column names. - */ - private static Collection<String> databaseColumns(Collection<CacheJdbcPojoStoreTypeField> dsc) { - return F.transform(dsc, new C1<CacheJdbcPojoStoreTypeField, String>() { - /** {@inheritDoc} */ - @Override public String apply(CacheJdbcPojoStoreTypeField col) { - return col.getDatabaseFieldName(); - } - }); - } - - /** - * Construct query for select values with key count less or equal {@code maxKeysPerStmt} - * - * @param keyCnt Key count. - * @return Load query statement text. - */ - private String loadQuery(int keyCnt) { - assert keyCnt <= maxKeysPerStmt; - - if (keyCnt == maxKeysPerStmt) - return loadQry; - - if (keyCnt == 1) - return loadQrySingle; - - return dialect.loadQuery(fullTblName, keyCols, cols, keyCnt); - } - - /** - * Construct query for select values in range. - * - * @param appendLowerBound Need add lower bound for range. - * @param appendUpperBound Need add upper bound for range. - * @return Query with range. - */ - private String loadCacheRangeQuery(boolean appendLowerBound, boolean appendUpperBound) { - return dialect.loadCacheRangeQuery(fullTblName, keyCols, cols, appendLowerBound, appendUpperBound); - } - - /** - * @return Key type. - */ - protected String keyType() { - return typeMeta.getKeyType(); - } - - /** - * @return Value type. - */ - protected String valueType() { - return typeMeta.getValueType(); - } - - /** - * Gets key columns. - * - * @return Key columns. - */ - protected CacheJdbcPojoStoreTypeField[] keyColumns() { - return typeMeta.getKeyFields(); - } - - /** - * Gets value columns. - * - * @return Value columns. - */ - protected CacheJdbcPojoStoreTypeField[] valueColumns() { - return typeMeta.getValueFields(); - } - - /** - * Get full table name. - * - * @return <schema>.<table name> - */ - protected String fullTableName() { - return fullTblName; - } - } - - /** - * Worker for load cache using custom user query. - * - * @param <K1> Key type. - * @param <V1> Value type. - */ - private class LoadCacheCustomQueryWorker<K1, V1> implements Callable<Void> { - /** Entry mapping description. */ - private final EntryMapping em; - - /** User query. */ - private final String qry; - - /** Closure for loaded values. */ - private final IgniteBiInClosure<K1, V1> clo; - - /** - * @param em Entry mapping description. - * @param qry User query. - * @param clo Closure for loaded values. - */ - private LoadCacheCustomQueryWorker(EntryMapping em, String qry, IgniteBiInClosure<K1, V1> clo) { - this.em = em; - this.qry = qry; - this.clo = clo; - } - - /** {@inheritDoc} */ - @Override public Void call() throws Exception { - if (log.isDebugEnabled()) - log.debug("Load cache using custom query [cache name= " + em.cacheName + - ", key type=" + em.keyType() + ", query=" + qry + "]"); - - Connection conn = null; - - PreparedStatement stmt = null; - - try { - conn = openConnection(true); - - stmt = conn.prepareStatement(qry); - - ResultSet rs = stmt.executeQuery(); - - ResultSetMetaData meta = rs.getMetaData(); - - Map<String, Integer> colIdxs = U.newHashMap(meta.getColumnCount()); - - for (int i = 1; i <= meta.getColumnCount(); i++) - colIdxs.put(meta.getColumnLabel(i), i); - - while (rs.next()) { - K1 key = buildObject(em.cacheName, em.keyType(), em.keyColumns(), colIdxs, rs); - V1 val = buildObject(em.cacheName, em.valueType(), em.valueColumns(), colIdxs, rs); - - clo.apply(key, val); - } - - return null; - } - catch (SQLException e) { - throw new CacheLoaderException("Failed to execute custom query for load cache", e); - } - finally { - U.closeQuiet(stmt); - - U.closeQuiet(conn); - } - } - } - - /** - * Lazy initialization of value. - * - * @param <T> Cached object type - */ - private abstract static class LazyValue<T> { - /** Cached value. */ - private T val; - - /** - * @return Construct value. - */ - protected abstract T create(); - - /** - * @return Value. - */ - public T value() { - if (val == null) - val = create(); - - return val; - } - } - - /** - * Worker for load by keys. - * - * @param <K1> Key type. - * @param <V1> Value type. - */ - private class LoadWorker<K1, V1> implements Callable<Map<K1, V1>> { - /** Connection. */ - private final Connection conn; - - /** Keys for load. */ - private final Collection<K1> keys; - - /** Entry mapping description. */ - private final EntryMapping em; - - /** - * @param conn Connection. - * @param em Entry mapping description. - */ - private LoadWorker(Connection conn, EntryMapping em) { - this.conn = conn; - this.em = em; - - keys = new ArrayList<>(em.maxKeysPerStmt); - } - - /** {@inheritDoc} */ - @Override public Map<K1, V1> call() throws Exception { - if (log.isDebugEnabled()) - log.debug("Load values from db [table= " + em.fullTableName() + - ", key count=" + keys.size() + "]"); - - PreparedStatement stmt = null; - - try { - stmt = conn.prepareStatement(em.loadQuery(keys.size())); - - int idx = 1; - - for (Object key : keys) - for (CacheJdbcPojoStoreTypeField field : em.keyColumns()) { - Object fieldVal = extractParameter(em.cacheName, em.keyType(), field.getJavaFieldName(), key); - - fillParameter(stmt, idx++, field, fieldVal); - } - - ResultSet rs = stmt.executeQuery(); - - Map<K1, V1> entries = U.newHashMap(keys.size()); - - while (rs.next()) { - K1 key = buildObject(em.cacheName, em.keyType(), em.keyColumns(), em.loadColIdxs, rs); - V1 val = buildObject(em.cacheName, em.valueType(), em.valueColumns(), em.loadColIdxs, rs); - - entries.put(key, val); - } - - return entries; - } - finally { - U.closeQuiet(stmt); - } - } - } }
http://git-wip-us.apache.org/repos/asf/ignite/blob/2961aaa7/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheJdbcPojoStoreConfiguration.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheJdbcPojoStoreConfiguration.java b/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheJdbcPojoStoreConfiguration.java index b333bc7..206f1fe 100644 --- a/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheJdbcPojoStoreConfiguration.java +++ b/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheJdbcPojoStoreConfiguration.java @@ -56,7 +56,7 @@ public class CacheJdbcPojoStoreConfiguration implements Serializable { private int parallelLoadCacheMinThreshold = DFLT_PARALLEL_LOAD_CACHE_MINIMUM_THRESHOLD; /** Types that store could process. */ - private CacheJdbcPojoStoreType[] types; + private JdbcType[] types; /** * Empty constructor (all values are initialized to their defaults). @@ -212,7 +212,7 @@ public class CacheJdbcPojoStoreConfiguration implements Serializable { * * @return Types known by store. */ - public CacheJdbcPojoStoreType[] getTypes() { + public JdbcType[] getTypes() { return types; } @@ -222,7 +222,7 @@ public class CacheJdbcPojoStoreConfiguration implements Serializable { * @param types Store should process. * @return {@code This} for chaining. */ - public CacheJdbcPojoStoreConfiguration setTypes(CacheJdbcPojoStoreType... types) { + public CacheJdbcPojoStoreConfiguration setTypes(JdbcType... types) { this.types = types; return this; http://git-wip-us.apache.org/repos/asf/ignite/blob/2961aaa7/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheJdbcPojoStoreType.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheJdbcPojoStoreType.java b/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheJdbcPojoStoreType.java deleted file mode 100644 index e755165..0000000 --- a/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheJdbcPojoStoreType.java +++ /dev/null @@ -1,272 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.cache.store.jdbc; - -import java.io.Serializable; -import org.apache.ignite.internal.util.tostring.GridToStringInclude; - -/** - * Description for type that could be stored into database by store. - */ -public class CacheJdbcPojoStoreType implements Serializable { - /** */ - private static final long serialVersionUID = 0L; - - /** Cache name. */ - private String cacheName; - - /** Schema name in database. */ - private String dbSchema; - - /** Table name in database. */ - private String dbTbl; - - /** Key class used to store key in cache. */ - private String keyType; - - /** List of fields descriptors for key object. */ - @GridToStringInclude - private CacheJdbcPojoStoreTypeField[] keyFields; - - /** Value class used to store value in cache. */ - private String valType; - - /** List of fields descriptors for value object. */ - @GridToStringInclude - private CacheJdbcPojoStoreTypeField[] valFields; - - /** If {@code true} object is stored as IgniteObject. */ - private boolean keepSerialized; - - /** - * Empty constructor (all values are initialized to their defaults). - */ - public CacheJdbcPojoStoreType() { - /* No-op. */ - } - - /** - * Copy constructor. - * - * @param type Type to copy. - */ - public CacheJdbcPojoStoreType(CacheJdbcPojoStoreType type) { - cacheName = type.getCacheName(); - - dbSchema = type.getDatabaseSchema(); - dbTbl = type.getDatabaseTable(); - - keyType = type.getKeyType(); - keyFields = type.getKeyFields(); - - valType = type.getValueType(); - valFields = type.getValueFields(); - - keepSerialized = type.isKeepSerialized(); - } - - /** - * Gets associated cache name. - * - * @return Сache name. - */ - public String getCacheName() { - return cacheName; - } - - /** - * Sets associated cache name. - * - * @param cacheName Cache name. - */ - public CacheJdbcPojoStoreType setCacheName(String cacheName) { - this.cacheName = cacheName; - - return this; - } - - /** - * Gets database schema name. - * - * @return Schema name. - */ - public String getDatabaseSchema() { - return dbSchema; - } - - /** - * Sets database schema name. - * - * @param dbSchema Schema name. - */ - public CacheJdbcPojoStoreType setDatabaseSchema(String dbSchema) { - this.dbSchema = dbSchema; - - return this; - } - - /** - * Gets table name in database. - * - * @return Table name in database. - */ - public String getDatabaseTable() { - return dbTbl; - } - - /** - * Table name in database. - * - * @param dbTbl Table name in database. - * @return {@code this} for chaining. - */ - public CacheJdbcPojoStoreType setDatabaseTable(String dbTbl) { - this.dbTbl = dbTbl; - - return this; - } - - /** - * Gets key type. - * - * @return Key type. - */ - public String getKeyType() { - return keyType; - } - - /** - * Sets key type. - * - * @param keyType Key type. - * @return {@code this} for chaining. - */ - public CacheJdbcPojoStoreType setKeyType(String keyType) { - this.keyType = keyType; - - return this; - } - - /** - * Sets key type. - * - * @param cls Key type class. - * @return {@code this} for chaining. - */ - public CacheJdbcPojoStoreType setKeyType(Class<?> cls) { - setKeyType(cls.getName()); - - return this; - } - - /** - * Gets value type. - * - * @return Key type. - */ - public String getValueType() { - return valType; - } - - /** - * Sets value type. - * - * @param valType Value type. - * @return {@code this} for chaining. - */ - public CacheJdbcPojoStoreType setValueType(String valType) { - this.valType = valType; - - return this; - } - - /** - * Sets value type. - * - * @param cls Value type class. - * @return {@code this} for chaining. - */ - public CacheJdbcPojoStoreType setValueType(Class<?> cls) { - setValueType(cls.getName()); - - return this; - } - - /** - * Gets optional persistent key fields (needed only if {@link CacheJdbcPojoStore} is used). - * - * @return Persistent key fields. - */ - public CacheJdbcPojoStoreTypeField[] getKeyFields() { - return keyFields; - } - - /** - * Sets optional persistent key fields (needed only if {@link CacheJdbcPojoStore} is used). - * - * @param keyFields Persistent key fields. - * @return {@code this} for chaining. - */ - public CacheJdbcPojoStoreType setKeyFields(CacheJdbcPojoStoreTypeField... keyFields) { - this.keyFields = keyFields; - - return this; - } - - /** - * Gets optional persistent value fields (needed only if {@link CacheJdbcPojoStore} is used). - * - * @return Persistent value fields. - */ - public CacheJdbcPojoStoreTypeField[] getValueFields() { - return valFields; - } - - /** - * Sets optional persistent value fields (needed only if {@link CacheJdbcPojoStore} is used). - * - * @param valFields Persistent value fields. - * @return {@code this} for chaining. - */ - public CacheJdbcPojoStoreType setValueFields(CacheJdbcPojoStoreTypeField... valFields) { - this.valFields = valFields; - - return this; - } - - /** - * Gets how value stored in cache. - * - * @return {@code true} if object is stored as IgniteObject. - */ - public boolean isKeepSerialized() { - return keepSerialized; - } - - /** - * Sets how value stored in cache. - * - * @param keepSerialized {@code true} if object is stored as IgniteObject. - * @return {@code this} for chaining. - */ - public CacheJdbcPojoStoreType setKeepSerialized(boolean keepSerialized) { - this.keepSerialized = keepSerialized; - - return this; - } -}
