Repository: ignite Updated Branches: refs/heads/ignite-1753-1282 [created] 8c1a71b28
http://git-wip-us.apache.org/repos/asf/ignite/blob/8c1a71b2/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheJdbcPojoStoreConfiguration.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheJdbcPojoStoreConfiguration.java b/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheJdbcPojoStoreConfiguration.java new file mode 100644 index 0000000..b333bc7 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheJdbcPojoStoreConfiguration.java @@ -0,0 +1,230 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.cache.store.jdbc; + +import org.apache.ignite.cache.store.jdbc.dialect.*; + +import java.io.*; + +/** + * JDBC POJO store configuration. + */ +public class CacheJdbcPojoStoreConfiguration implements Serializable { + /** */ + private static final long serialVersionUID = 0L; + + /** Default value for write attempts. */ + public static final int DFLT_WRITE_ATTEMPTS = 2; + + /** Default batch size for put and remove operations. */ + public static final int DFLT_BATCH_SIZE = 512; + + /** Default batch size for put and remove operations. */ + public static final int DFLT_PARALLEL_LOAD_CACHE_MINIMUM_THRESHOLD = 512; + + /** Maximum batch size for writeAll and deleteAll operations. */ + private int batchSz = DFLT_BATCH_SIZE; + + /** Name of data source bean. */ + private String dataSrcBean; + + /** Database dialect. */ + private JdbcDialect dialect; + + /** Max workers thread count. These threads are responsible for load cache. */ + private int maxPoolSz = Runtime.getRuntime().availableProcessors(); + + /** Maximum write attempts in case of database error. */ + private int maxWrtAttempts = DFLT_WRITE_ATTEMPTS; + + /** Parallel load cache minimum threshold. If {@code 0} then load sequentially. */ + private int parallelLoadCacheMinThreshold = DFLT_PARALLEL_LOAD_CACHE_MINIMUM_THRESHOLD; + + /** Types that store could process. */ + private CacheJdbcPojoStoreType[] types; + + /** + * Empty constructor (all values are initialized to their defaults). + */ + public CacheJdbcPojoStoreConfiguration() { + /* No-op. */ + } + + /** + * Copy constructor. + * + * @param cfg Configuration to copy. + */ + public CacheJdbcPojoStoreConfiguration(CacheJdbcPojoStoreConfiguration cfg) { + // Order alphabetically for maintenance purposes. + batchSz = cfg.getBatchSize(); + dataSrcBean = cfg.getDataSourceBean(); + dialect = cfg.getDialect(); + maxPoolSz = cfg.getMaximumPoolSize(); + maxWrtAttempts = cfg.getMaximumWriteAttempts(); + parallelLoadCacheMinThreshold = cfg.getParallelLoadCacheMinimumThreshold(); + types = cfg.getTypes(); + } + + /** + * Get maximum batch size for delete and delete operations. + * + * @return Maximum batch size. + */ + public int getBatchSize() { + return batchSz; + } + + /** + * Set maximum batch size for write and delete operations. + * + * @param batchSz Maximum batch size. + * @return {@code This} for chaining. + */ + public CacheJdbcPojoStoreConfiguration setBatchSize(int batchSz) { + this.batchSz = batchSz; + + return this; + } + + /** + * Gets name of the data source bean. + * + * @return Data source bean name. + */ + public String getDataSourceBean() { + return dataSrcBean; + } + + /** + * Sets name of the data source bean. + * + * @param dataSrcBean Data source bean name. + * @return {@code This} for chaining. + */ + public CacheJdbcPojoStoreConfiguration setDataSourceBean(String dataSrcBean) { + this.dataSrcBean = dataSrcBean; + + return this; + } + + /** + * Get database dialect. + * + * @return Database dialect. + */ + public JdbcDialect getDialect() { + return dialect; + } + + /** + * Set database dialect. + * + * @param dialect Database dialect. + * @return {@code This} for chaining. + */ + public CacheJdbcPojoStoreConfiguration setDialect(JdbcDialect dialect) { + this.dialect = dialect; + + return this; + } + + /** + * Get maximum workers thread count. These threads are responsible for queries execution. + * + * @return Maximum workers thread count. + */ + public int getMaximumPoolSize() { + return maxPoolSz; + } + + /** + * Set Maximum workers thread count. These threads are responsible for queries execution. + * + * @param maxPoolSz Max workers thread count. + * @return {@code This} for chaining. + */ + public CacheJdbcPojoStoreConfiguration setMaximumPoolSize(int maxPoolSz) { + this.maxPoolSz = maxPoolSz; + + return this; + } + + /** + * Gets maximum number of write attempts in case of database error. + * + * @return Maximum number of write attempts. + */ + public int getMaximumWriteAttempts() { + return maxWrtAttempts; + } + + /** + * Sets maximum number of write attempts in case of database error. + * + * @param maxWrtAttempts Number of write attempts. + * @return {@code This} for chaining. + */ + public CacheJdbcPojoStoreConfiguration setMaximumWriteAttempts(int maxWrtAttempts) { + this.maxWrtAttempts = maxWrtAttempts; + + return this; + } + + /** + * Parallel load cache minimum row count threshold. + * + * @return If {@code 0} then load sequentially. + */ + public int getParallelLoadCacheMinimumThreshold() { + return parallelLoadCacheMinThreshold; + } + + /** + * Parallel load cache minimum row count threshold. + * + * @param parallelLoadCacheMinThreshold Minimum row count threshold. If {@code 0} then load sequentially. + * @return {@code This} for chaining. + */ + public CacheJdbcPojoStoreConfiguration setParallelLoadCacheMinimumThreshold(int parallelLoadCacheMinThreshold) { + this.parallelLoadCacheMinThreshold = parallelLoadCacheMinThreshold; + + return this; + } + + /** + * Gets types known by store. + * + * @return Types known by store. + */ + public CacheJdbcPojoStoreType[] getTypes() { + return types; + } + + /** + * Sets store configurations. + * + * @param types Store should process. + * @return {@code This} for chaining. + */ + public CacheJdbcPojoStoreConfiguration setTypes(CacheJdbcPojoStoreType... types) { + this.types = types; + + return this; + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/8c1a71b2/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheJdbcPojoStoreFactory.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheJdbcPojoStoreFactory.java b/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheJdbcPojoStoreFactory.java index c90a69b..6d8f8af 100644 --- a/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheJdbcPojoStoreFactory.java +++ b/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheJdbcPojoStoreFactory.java @@ -61,43 +61,65 @@ public class CacheJdbcPojoStoreFactory<K, V> implements Factory<CacheJdbcPojoSto /** */ private static final long serialVersionUID = 0L; + /** POJO store configuration. */ + private CacheJdbcPojoStoreConfiguration cfg; + /** Name of data source bean. */ + @Deprecated private String dataSrcBean; - /** Data source. */ - private transient DataSource dataSrc; - /** Database dialect. */ + @Deprecated private JdbcDialect dialect; + /** Data source. */ + private transient DataSource dataSrc; + /** Application context. */ @SpringApplicationContextResource - private transient Object appContext; + private transient Object appCtx; /** {@inheritDoc} */ @Override public CacheJdbcPojoStore<K, V> create() { CacheJdbcPojoStore<K, V> store = new CacheJdbcPojoStore<>(); - store.setDialect(dialect); + // For backward compatibility create store configuration. + if (cfg == null) { + cfg = new CacheJdbcPojoStoreConfiguration(); + + cfg.setDataSourceBean(dataSrcBean); + cfg.setDialect(dialect); + } + + store.setBatchSize(cfg.getBatchSize()); + store.setDialect(cfg.getDialect()); + store.setMaximumPoolSize(cfg.getMaximumPoolSize()); + store.setMaximumWriteAttempts(cfg.getMaximumWriteAttempts()); + store.setParallelLoadCacheMinimumThreshold(cfg.getParallelLoadCacheMinimumThreshold()); + store.setTypes(cfg.getTypes()); if (dataSrc != null) store.setDataSource(dataSrc); - else if (dataSrcBean != null) { - if (appContext == null) - throw new IgniteException("Spring application context resource is not injected."); + else { + String dtSrcBean = cfg.getDataSourceBean(); - IgniteSpringHelper spring; + if (dtSrcBean != null) { + if (appCtx == null) + throw new IgniteException("Spring application context resource is not injected."); - try { - spring = IgniteComponentType.SPRING.create(false); + IgniteSpringHelper spring; - DataSource data = spring.loadBeanFromAppContext(appContext, dataSrcBean); + try { + spring = IgniteComponentType.SPRING.create(false); - store.setDataSource(data); - } - catch (Exception e) { - throw new IgniteException("Failed to load bean in application context [beanName=" + dataSrcBean + - ", igniteConfig=" + appContext + ']', e); + DataSource data = spring.loadBeanFromAppContext(appCtx, dtSrcBean); + + store.setDataSource(data); + } + catch (Exception e) { + throw new IgniteException("Failed to load bean in application context [beanName=" + dtSrcBean + + ", igniteConfig=" + appCtx + ']', e); + } } } @@ -105,27 +127,27 @@ public class CacheJdbcPojoStoreFactory<K, V> implements Factory<CacheJdbcPojoSto } /** - * Sets name of the data source bean. + * Sets store configuration. * - * @param dataSrcBean Data source bean name. + * @param cfg Configuration to use. * @return {@code This} for chaining. - * @see CacheJdbcPojoStore#setDataSource(DataSource) */ - public CacheJdbcPojoStoreFactory<K, V> setDataSourceBean(String dataSrcBean) { - this.dataSrcBean = dataSrcBean; + public CacheJdbcPojoStoreFactory<K, V> setConfiguration(CacheJdbcPojoStoreConfiguration cfg) { + this.cfg = cfg; return this; } /** - * Sets data source. Data source should be fully configured and ready-to-use. + * Sets name of the data source bean. * - * @param dataSrc Data source. + * @param dataSrcBean Data source bean name. * @return {@code This} for chaining. * @see CacheJdbcPojoStore#setDataSource(DataSource) */ - public CacheJdbcPojoStoreFactory<K, V> setDataSource(DataSource dataSrc) { - this.dataSrc = dataSrc; + @Deprecated + public CacheJdbcPojoStoreFactory<K, V> setDataSourceBean(String dataSrcBean) { + this.dataSrcBean = dataSrcBean; return this; } @@ -136,12 +158,26 @@ public class CacheJdbcPojoStoreFactory<K, V> implements Factory<CacheJdbcPojoSto * @param dialect Database dialect. * @see CacheJdbcPojoStore#setDialect(JdbcDialect) */ + @Deprecated public void setDialect(JdbcDialect dialect) { this.dialect = dialect; } + /** + * Sets data source. Data source should be fully configured and ready-to-use. + * + * @param dataSrc Data source. + * @return {@code This} for chaining. + * @see CacheJdbcPojoStore#setDataSource(DataSource) + */ + public CacheJdbcPojoStoreFactory<K, V> setDataSource(DataSource dataSrc) { + this.dataSrc = dataSrc; + + return this; + } + /** {@inheritDoc} */ @Override public String toString() { return S.toString(CacheJdbcPojoStoreFactory.class, this); } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/ignite/blob/8c1a71b2/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheJdbcPojoStoreType.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheJdbcPojoStoreType.java b/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheJdbcPojoStoreType.java new file mode 100644 index 0000000..e755165 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheJdbcPojoStoreType.java @@ -0,0 +1,272 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.cache.store.jdbc; + +import java.io.Serializable; +import org.apache.ignite.internal.util.tostring.GridToStringInclude; + +/** + * Description for type that could be stored into database by store. + */ +public class CacheJdbcPojoStoreType implements Serializable { + /** */ + private static final long serialVersionUID = 0L; + + /** Cache name. */ + private String cacheName; + + /** Schema name in database. */ + private String dbSchema; + + /** Table name in database. */ + private String dbTbl; + + /** Key class used to store key in cache. */ + private String keyType; + + /** List of fields descriptors for key object. */ + @GridToStringInclude + private CacheJdbcPojoStoreTypeField[] keyFields; + + /** Value class used to store value in cache. */ + private String valType; + + /** List of fields descriptors for value object. */ + @GridToStringInclude + private CacheJdbcPojoStoreTypeField[] valFields; + + /** If {@code true} object is stored as IgniteObject. */ + private boolean keepSerialized; + + /** + * Empty constructor (all values are initialized to their defaults). + */ + public CacheJdbcPojoStoreType() { + /* No-op. */ + } + + /** + * Copy constructor. + * + * @param type Type to copy. + */ + public CacheJdbcPojoStoreType(CacheJdbcPojoStoreType type) { + cacheName = type.getCacheName(); + + dbSchema = type.getDatabaseSchema(); + dbTbl = type.getDatabaseTable(); + + keyType = type.getKeyType(); + keyFields = type.getKeyFields(); + + valType = type.getValueType(); + valFields = type.getValueFields(); + + keepSerialized = type.isKeepSerialized(); + } + + /** + * Gets associated cache name. + * + * @return Сache name. + */ + public String getCacheName() { + return cacheName; + } + + /** + * Sets associated cache name. + * + * @param cacheName Cache name. + */ + public CacheJdbcPojoStoreType setCacheName(String cacheName) { + this.cacheName = cacheName; + + return this; + } + + /** + * Gets database schema name. + * + * @return Schema name. + */ + public String getDatabaseSchema() { + return dbSchema; + } + + /** + * Sets database schema name. + * + * @param dbSchema Schema name. + */ + public CacheJdbcPojoStoreType setDatabaseSchema(String dbSchema) { + this.dbSchema = dbSchema; + + return this; + } + + /** + * Gets table name in database. + * + * @return Table name in database. + */ + public String getDatabaseTable() { + return dbTbl; + } + + /** + * Table name in database. + * + * @param dbTbl Table name in database. + * @return {@code this} for chaining. + */ + public CacheJdbcPojoStoreType setDatabaseTable(String dbTbl) { + this.dbTbl = dbTbl; + + return this; + } + + /** + * Gets key type. + * + * @return Key type. + */ + public String getKeyType() { + return keyType; + } + + /** + * Sets key type. + * + * @param keyType Key type. + * @return {@code this} for chaining. + */ + public CacheJdbcPojoStoreType setKeyType(String keyType) { + this.keyType = keyType; + + return this; + } + + /** + * Sets key type. + * + * @param cls Key type class. + * @return {@code this} for chaining. + */ + public CacheJdbcPojoStoreType setKeyType(Class<?> cls) { + setKeyType(cls.getName()); + + return this; + } + + /** + * Gets value type. + * + * @return Key type. + */ + public String getValueType() { + return valType; + } + + /** + * Sets value type. + * + * @param valType Value type. + * @return {@code this} for chaining. + */ + public CacheJdbcPojoStoreType setValueType(String valType) { + this.valType = valType; + + return this; + } + + /** + * Sets value type. + * + * @param cls Value type class. + * @return {@code this} for chaining. + */ + public CacheJdbcPojoStoreType setValueType(Class<?> cls) { + setValueType(cls.getName()); + + return this; + } + + /** + * Gets optional persistent key fields (needed only if {@link CacheJdbcPojoStore} is used). + * + * @return Persistent key fields. + */ + public CacheJdbcPojoStoreTypeField[] getKeyFields() { + return keyFields; + } + + /** + * Sets optional persistent key fields (needed only if {@link CacheJdbcPojoStore} is used). + * + * @param keyFields Persistent key fields. + * @return {@code this} for chaining. + */ + public CacheJdbcPojoStoreType setKeyFields(CacheJdbcPojoStoreTypeField... keyFields) { + this.keyFields = keyFields; + + return this; + } + + /** + * Gets optional persistent value fields (needed only if {@link CacheJdbcPojoStore} is used). + * + * @return Persistent value fields. + */ + public CacheJdbcPojoStoreTypeField[] getValueFields() { + return valFields; + } + + /** + * Sets optional persistent value fields (needed only if {@link CacheJdbcPojoStore} is used). + * + * @param valFields Persistent value fields. + * @return {@code this} for chaining. + */ + public CacheJdbcPojoStoreType setValueFields(CacheJdbcPojoStoreTypeField... valFields) { + this.valFields = valFields; + + return this; + } + + /** + * Gets how value stored in cache. + * + * @return {@code true} if object is stored as IgniteObject. + */ + public boolean isKeepSerialized() { + return keepSerialized; + } + + /** + * Sets how value stored in cache. + * + * @param keepSerialized {@code true} if object is stored as IgniteObject. + * @return {@code this} for chaining. + */ + public CacheJdbcPojoStoreType setKeepSerialized(boolean keepSerialized) { + this.keepSerialized = keepSerialized; + + return this; + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/8c1a71b2/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheJdbcPojoStoreTypeField.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheJdbcPojoStoreTypeField.java b/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheJdbcPojoStoreTypeField.java new file mode 100644 index 0000000..46a2647 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheJdbcPojoStoreTypeField.java @@ -0,0 +1,160 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.cache.store.jdbc; + +import java.io.Serializable; +import org.apache.ignite.internal.util.typedef.internal.S; + +/** + * Description of how field declared in database and in cache. + */ +public class CacheJdbcPojoStoreTypeField implements Serializable { + /** */ + private static final long serialVersionUID = 0L; + + /** Field JDBC type in database. */ + private int dbFieldType; + + /** Field name in database. */ + private String dbFieldName; + + /** Field java type. */ + private Class<?> javaFieldType; + + /** Field name in java object. */ + private String javaFieldName; + + /** + * Default constructor. + */ + public CacheJdbcPojoStoreTypeField() { + // No-op. + } + + /** + * Full constructor. + * + * @param dbFieldType Field JDBC type in database. + * @param dbFieldName Field name in database. + * @param javaFieldType Field java type. + * @param javaFieldName Field name in java object. + */ + public CacheJdbcPojoStoreTypeField(int dbFieldType, String dbFieldName, Class<?> javaFieldType, String javaFieldName) { + this.dbFieldType = dbFieldType; + this.dbFieldName = dbFieldName; + this.javaFieldType = javaFieldType; + this.javaFieldName = javaFieldName; + } + + /** + * Copy constructor. + * + * @param field Field to copy. + */ + public CacheJdbcPojoStoreTypeField(CacheJdbcPojoStoreTypeField field) { + this(field.getDatabaseFieldType(), field.getDatabaseFieldName(), + field.getJavaFieldType(), field.getJavaFieldName()); + } + + /** + * @return Column JDBC type in database. + */ + public int getDatabaseFieldType() { + return dbFieldType; + } + + /** + * @param dbType Column JDBC type in database. + */ + public void setDatabaseFieldType(int dbType) { + this.dbFieldType = dbType; + } + + + /** + * @return Column name in database. + */ + public String getDatabaseFieldName() { + return dbFieldName; + } + + /** + * @param dbName Column name in database. + */ + public void setDatabaseFieldName(String dbName) { + this.dbFieldName = dbName; + } + + /** + * @return Field java type. + */ + public Class<?> getJavaFieldType() { + return javaFieldType; + } + + /** + * @param javaType Corresponding java type. + */ + public void setJavaFieldType(Class<?> javaType) { + this.javaFieldType = javaType; + } + + /** + * @return Field name in java object. + */ + public String getJavaFieldName() { + return javaFieldName; + } + + /** + * @param javaName Field name in java object. + */ + public void setJavaFieldName(String javaName) { + this.javaFieldName = javaName; + } + + /** {@inheritDoc} */ + @Override public boolean equals(Object o) { + if (this == o) + return true; + + if (!(o instanceof CacheJdbcPojoStoreTypeField)) + return false; + + CacheJdbcPojoStoreTypeField that = (CacheJdbcPojoStoreTypeField)o; + + return dbFieldType == that.dbFieldType && dbFieldName.equals(that.dbFieldName) && + javaFieldType == that.javaFieldType && javaFieldName.equals(that.javaFieldName); + } + + /** {@inheritDoc} */ + @Override public int hashCode() { + int res = dbFieldType; + res = 31 * res + dbFieldName.hashCode(); + + res = 31 * res + javaFieldType.hashCode(); + res = 31 * res + javaFieldName.hashCode(); + + return res; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(CacheJdbcPojoStoreTypeField.class, this); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/8c1a71b2/modules/core/src/test/java/org/apache/ignite/cache/store/jdbc/CacheJdbcPojoStoreTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/cache/store/jdbc/CacheJdbcPojoStoreTest.java b/modules/core/src/test/java/org/apache/ignite/cache/store/jdbc/CacheJdbcPojoStoreTest.java index 0ad2cad..b2d871c 100644 --- a/modules/core/src/test/java/org/apache/ignite/cache/store/jdbc/CacheJdbcPojoStoreTest.java +++ b/modules/core/src/test/java/org/apache/ignite/cache/store/jdbc/CacheJdbcPojoStoreTest.java @@ -17,23 +17,19 @@ package org.apache.ignite.cache.store.jdbc; -import java.net.MalformedURLException; -import java.net.URL; import java.sql.Connection; import java.sql.PreparedStatement; import java.sql.SQLException; import java.sql.Statement; import java.sql.Timestamp; +import java.sql.Types; import java.util.ArrayList; import java.util.Collection; -import java.util.HashMap; -import java.util.Map; +import java.util.UUID; import java.util.concurrent.ConcurrentLinkedQueue; import javax.cache.integration.CacheWriterException; -import org.apache.ignite.IgniteException; -import org.apache.ignite.cache.CacheTypeMetadata; -import org.apache.ignite.cache.store.jdbc.dialect.BasicJdbcDialect; -import org.apache.ignite.cache.store.jdbc.dialect.JdbcDialect; + +import org.apache.ignite.cache.store.jdbc.dialect.H2Dialect; import org.apache.ignite.cache.store.jdbc.model.Organization; import org.apache.ignite.cache.store.jdbc.model.OrganizationKey; import org.apache.ignite.cache.store.jdbc.model.Person; @@ -41,16 +37,11 @@ import org.apache.ignite.cache.store.jdbc.model.PersonComplexKey; import org.apache.ignite.cache.store.jdbc.model.PersonKey; import org.apache.ignite.internal.processors.cache.CacheEntryImpl; import org.apache.ignite.internal.util.typedef.CI2; -import org.apache.ignite.internal.util.typedef.X; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteBiInClosure; import org.apache.ignite.testframework.GridTestUtils; import org.apache.ignite.testframework.junits.cache.GridAbstractCacheStoreSelfTest; import org.h2.jdbcx.JdbcConnectionPool; -import org.springframework.beans.BeansException; -import org.springframework.beans.factory.xml.XmlBeanDefinitionReader; -import org.springframework.context.support.GenericApplicationContext; -import org.springframework.core.io.UrlResource; /** * Class for {@code PojoCacheStore} tests. @@ -59,9 +50,6 @@ public class CacheJdbcPojoStoreTest extends GridAbstractCacheStoreSelfTest<Cache /** DB connection URL. */ private static final String DFLT_CONN_URL = "jdbc:h2:mem:autoCacheStore;DB_CLOSE_DELAY=-1"; - /** Default config with mapping. */ - private static final String DFLT_MAPPING_CONFIG = "modules/core/src/test/config/store/jdbc/ignite-type-metadata.xml"; - /** Organization count. */ protected static final int ORGANIZATION_CNT = 1000; @@ -77,71 +65,89 @@ public class CacheJdbcPojoStoreTest extends GridAbstractCacheStoreSelfTest<Cache /** {@inheritDoc} */ @Override protected CacheJdbcPojoStore<Object, Object> store() { - CacheJdbcPojoStore<Object, Object> store = new CacheJdbcPojoStore<>(); - -// PGPoolingDataSource ds = new PGPoolingDataSource(); -// ds.setUser("postgres"); -// ds.setPassword("postgres"); -// ds.setServerName("ip"); -// ds.setDatabaseName("postgres"); -// store.setDataSource(ds); - -// MysqlDataSource ds = new MysqlDataSource(); -// ds.setURL("jdbc:mysql://ip:port/dbname"); -// ds.setUser("mysql"); -// ds.setPassword("mysql"); - + CacheJdbcPojoStoreFactory<Object, Object> storeFactory = new CacheJdbcPojoStoreFactory<>(); + + CacheJdbcPojoStoreConfiguration storeCfg = new CacheJdbcPojoStoreConfiguration(); + storeCfg.setDialect(new H2Dialect()); + + CacheJdbcPojoStoreType[] storeTypes = new CacheJdbcPojoStoreType[6]; + + storeTypes[0] = new CacheJdbcPojoStoreType(); + storeTypes[0].setDatabaseSchema("PUBLIC"); + storeTypes[0].setDatabaseTable("ORGANIZATION"); + storeTypes[0].setKeyType("org.apache.ignite.cache.store.jdbc.model.OrganizationKey"); + storeTypes[0].setKeyFields(new CacheJdbcPojoStoreTypeField(Types.INTEGER, "ID", Integer.class, "id")); + + storeTypes[0].setValueType("org.apache.ignite.cache.store.jdbc.model.Organization"); + storeTypes[0].setValueFields( + new CacheJdbcPojoStoreTypeField(Types.INTEGER, "ID", Integer.class, "id"), + new CacheJdbcPojoStoreTypeField(Types.VARCHAR, "NAME", String.class, "name"), + new CacheJdbcPojoStoreTypeField(Types.VARCHAR, "CITY", String.class, "city")); + + storeTypes[1] = new CacheJdbcPojoStoreType(); + storeTypes[1].setDatabaseSchema("PUBLIC"); + storeTypes[1].setDatabaseTable("PERSON"); + storeTypes[1].setKeyType("org.apache.ignite.cache.store.jdbc.model.PersonKey"); + storeTypes[1].setKeyFields(new CacheJdbcPojoStoreTypeField(Types.INTEGER, "ID", Integer.class, "id")); + + storeTypes[1].setValueType("org.apache.ignite.cache.store.jdbc.model.Person"); + storeTypes[1].setValueFields( + new CacheJdbcPojoStoreTypeField(Types.INTEGER, "ID", Integer.class, "id"), + new CacheJdbcPojoStoreTypeField(Types.INTEGER, "ORG_ID", Integer.class, "orgId"), + new CacheJdbcPojoStoreTypeField(Types.VARCHAR, "NAME", String.class, "name")); + + storeTypes[2] = new CacheJdbcPojoStoreType(); + storeTypes[2].setDatabaseSchema("PUBLIC"); + storeTypes[2].setDatabaseTable("PERSON_COMPLEX"); + storeTypes[2].setKeyType("org.apache.ignite.cache.store.jdbc.model.PersonComplexKey"); + storeTypes[2].setKeyFields( + new CacheJdbcPojoStoreTypeField(Types.INTEGER, "ID", int.class, "id"), + new CacheJdbcPojoStoreTypeField(Types.INTEGER, "ORG_ID", int.class, "orgId"), + new CacheJdbcPojoStoreTypeField(Types.INTEGER, "CITY_ID", int.class, "cityId")); + + storeTypes[2].setValueType("org.apache.ignite.cache.store.jdbc.model.Person"); + storeTypes[2].setValueFields( + new CacheJdbcPojoStoreTypeField(Types.INTEGER, "ID", Integer.class, "id"), + new CacheJdbcPojoStoreTypeField(Types.INTEGER, "ORG_ID", Integer.class, "orgId"), + new CacheJdbcPojoStoreTypeField(Types.VARCHAR, "NAME", String.class, "name"), + new CacheJdbcPojoStoreTypeField(Types.VARCHAR, "SALARY", Integer.class, "salary")); + + storeTypes[3] = new CacheJdbcPojoStoreType(); + storeTypes[3].setDatabaseSchema("PUBLIC"); + storeTypes[3].setDatabaseTable("TIMESTAMP_ENTRIES"); + storeTypes[3].setKeyType("java.sql.Timestamp"); + storeTypes[3].setKeyFields(new CacheJdbcPojoStoreTypeField(Types.TIMESTAMP, "KEY", Timestamp.class, null)); + + storeTypes[3].setValueType("java.lang.Integer"); + storeTypes[3].setValueFields(new CacheJdbcPojoStoreTypeField(Types.INTEGER, "VAL", Integer.class, null)); + + storeTypes[4] = new CacheJdbcPojoStoreType(); + storeTypes[4].setDatabaseSchema("PUBLIC"); + storeTypes[4].setDatabaseTable("STRING_ENTRIES"); + storeTypes[4].setKeyType("java.lang.String"); + storeTypes[4].setKeyFields(new CacheJdbcPojoStoreTypeField(Types.VARCHAR, "KEY", String.class, null)); + + storeTypes[4].setValueType("java.lang.String"); + storeTypes[4].setValueFields(new CacheJdbcPojoStoreTypeField(Types.VARCHAR, "VAL", Integer.class, null)); + + storeTypes[5] = new CacheJdbcPojoStoreType(); + storeTypes[5].setDatabaseSchema("PUBLIC"); + storeTypes[5].setDatabaseTable("UUID_ENTRIES"); + storeTypes[5].setKeyType("java.util.UUID"); + storeTypes[5].setKeyFields(new CacheJdbcPojoStoreTypeField(Types.BINARY, "KEY", UUID.class, null)); + + storeTypes[5].setValueType("java.util.UUID"); + storeTypes[5].setValueFields(new CacheJdbcPojoStoreTypeField(Types.BINARY, "VAL", UUID.class, null)); + + storeCfg.setTypes(storeTypes); + + storeFactory.setConfiguration(storeCfg); + + CacheJdbcPojoStore<Object, Object> store = storeFactory.create(); + + // H2 DataSource store.setDataSource(JdbcConnectionPool.create(DFLT_CONN_URL, "sa", "")); - URL cfgUrl; - - try { - cfgUrl = new URL(DFLT_MAPPING_CONFIG); - } - catch (MalformedURLException ignore) { - cfgUrl = U.resolveIgniteUrl(DFLT_MAPPING_CONFIG); - } - - if (cfgUrl == null) - throw new IgniteException("Failed to resolve metadata path: " + DFLT_MAPPING_CONFIG); - - try { - GenericApplicationContext springCtx = new GenericApplicationContext(); - - new XmlBeanDefinitionReader(springCtx).loadBeanDefinitions(new UrlResource(cfgUrl)); - - springCtx.refresh(); - - Collection<CacheTypeMetadata> typeMeta = springCtx.getBeansOfType(CacheTypeMetadata.class).values(); - - Map<Integer, Map<Object, CacheAbstractJdbcStore.EntryMapping>> cacheMappings = new HashMap<>(); - - JdbcDialect dialect = store.resolveDialect(); - - GridTestUtils.setFieldValue(store, CacheAbstractJdbcStore.class, "dialect", dialect); - - Map<Object, CacheAbstractJdbcStore.EntryMapping> entryMappings = U.newHashMap(typeMeta.size()); - - for (CacheTypeMetadata type : typeMeta) - entryMappings.put(store.keyTypeId(type.getKeyType()), - new CacheAbstractJdbcStore.EntryMapping(null, dialect, type)); - - store.prepareBuilders(null, typeMeta); - - cacheMappings.put(null, entryMappings); - - GridTestUtils.setFieldValue(store, CacheAbstractJdbcStore.class, "cacheMappings", cacheMappings); - } - catch (BeansException e) { - if (X.hasCause(e, ClassNotFoundException.class)) - throw new IgniteException("Failed to instantiate Spring XML application context " + - "(make sure all classes used in Spring configuration are present at CLASSPATH) " + - "[springUrl=" + cfgUrl + ']', e); - else - throw new IgniteException("Failed to instantiate Spring XML application context [springUrl=" + - cfgUrl + ", err=" + e.getMessage() + ']', e); - } - return store; } @@ -152,7 +158,7 @@ public class CacheJdbcPojoStoreTest extends GridAbstractCacheStoreSelfTest<Cache @Override protected void inject(CacheJdbcPojoStore<Object, Object> store) throws Exception { getTestResources().inject(store); - GridTestUtils.setFieldValue(store, CacheAbstractJdbcStore.class, "ses", ses); + GridTestUtils.setFieldValue(store, CacheJdbcPojoStore.class, "ses", ses); } /** {@inheritDoc} */ @@ -224,7 +230,6 @@ public class CacheJdbcPojoStoreTest extends GridAbstractCacheStoreSelfTest<Cache super.beforeTest(); } - /** * @throws Exception If failed. */ @@ -274,7 +279,7 @@ public class CacheJdbcPojoStoreTest extends GridAbstractCacheStoreSelfTest<Cache if (i > 0) prnComplexStmt.setInt(5, 1000 + i * 500); else // Add person with null salary - prnComplexStmt.setNull(5, java.sql.Types.INTEGER); + prnComplexStmt.setNull(5, Types.INTEGER); prnComplexStmt.addBatch(); } @@ -302,9 +307,9 @@ public class CacheJdbcPojoStoreTest extends GridAbstractCacheStoreSelfTest<Cache Person val = (Person)v; - assert key.getId() == val.getId(); - assert key.getOrgId() == val.getOrgId(); - assertEquals("name" + key.getId(), val.getName()); + assertTrue("Key ID should be the same as value ID", key.getId() == val.getId()); + assertTrue("Key orgID should be the same as value orgID", key.getOrgId() == val.getOrgId()); + assertEquals("name" + key.getId(), val.getName()); prnComplexKeys.add((PersonComplexKey)k); } @@ -351,25 +356,23 @@ public class CacheJdbcPojoStoreTest extends GridAbstractCacheStoreSelfTest<Cache * @throws Exception If failed. */ public void testWriteRetry() throws Exception { + CacheJdbcPojoStore<Object, Object> store = store(); + // Special dialect that will skip updates, to test write retry. - BasicJdbcDialect dialect = new BasicJdbcDialect() { + store.setDialect(new H2Dialect() { /** {@inheritDoc} */ - @Override public String updateQuery(String tblName, Collection<String> keyCols, Iterable<String> valCols) { - return super.updateQuery(tblName, keyCols, valCols) + " AND 1 = 0"; + @Override public boolean hasMerge() { + return false; } - }; - - store.setDialect(dialect); - - Map<String, Map<Object, CacheAbstractJdbcStore.EntryMapping>> cacheMappings = - GridTestUtils.getFieldValue(store, CacheAbstractJdbcStore.class, "cacheMappings"); - - CacheAbstractJdbcStore.EntryMapping em = cacheMappings.get(null).get(OrganizationKey.class); - CacheTypeMetadata typeMeta = GridTestUtils.getFieldValue(em, CacheAbstractJdbcStore.EntryMapping.class, "typeMeta"); + /** {@inheritDoc} */ + @Override public String updateQuery(String tblName, Collection<String> keyCols, + Iterable<String> valCols) { + return super.updateQuery(tblName, keyCols, valCols) + " AND 1 = 0"; + } + }); - cacheMappings.get(null).put(OrganizationKey.class, - new CacheAbstractJdbcStore.EntryMapping(null, dialect, typeMeta)); + inject(store); Connection conn = store.openConnection(false); @@ -392,6 +395,8 @@ public class CacheJdbcPojoStoreTest extends GridAbstractCacheStoreSelfTest<Cache try { store.write(new CacheEntryImpl<>(k1, v1)); + + fail("CacheWriterException wasn't thrown."); } catch (CacheWriterException e) { if (!e.getMessage().startsWith("Failed insert entry in database, violate a unique index or primary key") || @@ -418,4 +423,4 @@ public class CacheJdbcPojoStoreTest extends GridAbstractCacheStoreSelfTest<Cache assertNull(store.load(k)); } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/ignite/blob/8c1a71b2/modules/core/src/test/java/org/apache/ignite/cache/store/jdbc/CacheJdbcStoreAbstractMultithreadedSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/cache/store/jdbc/CacheJdbcStoreAbstractMultithreadedSelfTest.java b/modules/core/src/test/java/org/apache/ignite/cache/store/jdbc/CacheJdbcStoreAbstractMultithreadedSelfTest.java index 757cedd..42cc4c9 100644 --- a/modules/core/src/test/java/org/apache/ignite/cache/store/jdbc/CacheJdbcStoreAbstractMultithreadedSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/cache/store/jdbc/CacheJdbcStoreAbstractMultithreadedSelfTest.java @@ -60,7 +60,7 @@ import static org.apache.ignite.testframework.GridTestUtils.runMultiThreadedAsyn /** * */ -public abstract class CacheJdbcStoreAbstractMultithreadedSelfTest<T extends CacheAbstractJdbcStore> +public abstract class CacheJdbcStoreAbstractMultithreadedSelfTest<T extends CacheJdbcPojoStore> extends GridCommonAbstractTest { /** Default config with mapping. */ private static final String DFLT_MAPPING_CONFIG = "modules/core/src/test/config/store/jdbc/ignite-type-metadata.xml"; @@ -78,7 +78,7 @@ public abstract class CacheJdbcStoreAbstractMultithreadedSelfTest<T extends Cach private static final int BATCH_CNT = 2000; /** Cache store. */ - protected static CacheAbstractJdbcStore store; + protected static CacheJdbcPojoStore store; /** {@inheritDoc} */ @Override protected void beforeTestsStarted() throws Exception { @@ -308,4 +308,4 @@ public abstract class CacheJdbcStoreAbstractMultithreadedSelfTest<T extends Cach } }, 8, "tx"); } -} \ No newline at end of file +}