IGNITE-1753 Wip reworking code.

Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/2961aaa7
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/2961aaa7
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/2961aaa7

Branch: refs/heads/ignite-1753-1282
Commit: 2961aaa72ad587fd1578983551c4397a8fdb3b3d
Parents: 8d325a4
Author: Alexey Kuznetsov <[email protected]>
Authored: Wed Oct 28 11:16:39 2015 +0700
Committer: Alexey Kuznetsov <[email protected]>
Committed: Wed Oct 28 11:16:39 2015 +0700

----------------------------------------------------------------------
 .../store/jdbc/CacheAbstractJdbcStore.java      |  317 ++-
 .../cache/store/jdbc/CacheJdbcPojoStore.java    | 2001 +-----------------
 .../jdbc/CacheJdbcPojoStoreConfiguration.java   |    6 +-
 .../store/jdbc/CacheJdbcPojoStoreType.java      |  272 ---
 .../store/jdbc/CacheJdbcPojoStoreTypeField.java |  172 --
 .../ignite/cache/store/jdbc/JdbcType.java       |  272 +++
 .../ignite/cache/store/jdbc/JdbcTypeField.java  |  172 ++
 .../store/jdbc/CacheJdbcPojoStoreSelfTest.java  |   29 +-
 .../store/jdbc/CacheJdbcPojoStoreTest.java      |   56 +-
 .../jdbc/CacheJdbcPortableStoreSelfTest.java    |   42 +-
 ...eJdbcStoreAbstractMultithreadedSelfTest.java |    4 +-
 .../jdbc/CacheJdbcStoreAbstractSelfTest.java    |   18 +-
 12 files changed, 830 insertions(+), 2531 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/2961aaa7/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheAbstractJdbcStore.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheAbstractJdbcStore.java
 
b/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheAbstractJdbcStore.java
index 8b3d44a..576522c 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheAbstractJdbcStore.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheAbstractJdbcStore.java
@@ -30,6 +30,7 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.Callable;
@@ -75,6 +76,10 @@ import org.jetbrains.annotations.Nullable;
 import static java.sql.Statement.EXECUTE_FAILED;
 import static java.sql.Statement.SUCCESS_NO_INFO;
 
+import static 
org.apache.ignite.cache.store.jdbc.CacheJdbcPojoStoreConfiguration.DFLT_BATCH_SIZE;
+import static 
org.apache.ignite.cache.store.jdbc.CacheJdbcPojoStoreConfiguration.DFLT_WRITE_ATTEMPTS;
+import static 
org.apache.ignite.cache.store.jdbc.CacheJdbcPojoStoreConfiguration.DFLT_PARALLEL_LOAD_CACHE_MINIMUM_THRESHOLD;
+
 /**
  * Implementation of {@link CacheStore} backed by JDBC.
  * <p>
@@ -115,28 +120,16 @@ import static java.sql.Statement.SUCCESS_NO_INFO;
  */
 @Deprecated
 public abstract class CacheAbstractJdbcStore<K, V> implements CacheStore<K, 
V>, LifecycleAware {
-    /** Max attempt write count. */
-    protected static final int MAX_ATTEMPT_WRITE_COUNT = 2;
-
-    /** Default batch size for put and remove operations. */
-    protected static final int DFLT_BATCH_SIZE = 512;
-
-    /** Default batch size for put and remove operations. */
-    protected static final int DFLT_PARALLEL_LOAD_CACHE_MINIMUM_THRESHOLD = 
512;
-
     /** Connection attribute property name. */
     protected static final String ATTR_CONN_PROP = "JDBC_STORE_CONNECTION";
 
-    /** Empty column value. */
-    protected static final Object[] EMPTY_COLUMN_VALUE = new Object[] { null };
-
     /** Auto-injected store session. */
     @CacheStoreSessionResource
     private CacheStoreSession ses;
 
     /** Auto injected ignite instance. */
     @IgniteInstanceResource
-    private Ignite ignite;
+    protected Ignite ignite;
 
     /** Auto-injected logger instance. */
     @LoggerResource
@@ -152,18 +145,27 @@ public abstract class CacheAbstractJdbcStore<K, V> 
implements CacheStore<K, V>,
     /** Cache with entry mapping description. (cache name, (key id, mapping 
description)). */
     protected volatile Map<String, Map<Object, EntryMapping>> cacheMappings = 
Collections.emptyMap();
 
+    /** Map for quick check whether type is POJO or Portable. */
+    private volatile Map<String, Map<String, Boolean>> keepSerializedTypes = 
new HashMap<>();
+
+    /** Maximum batch size for writeAll and deleteAll operations. */
+    private int batchSz = DFLT_BATCH_SIZE;
+
     /** Database dialect. */
     protected JdbcDialect dialect;
 
+    /** Maximum write attempts in case of database error. */
+    private int maxWrtAttempts = DFLT_WRITE_ATTEMPTS;
+
     /** Max workers thread count. These threads are responsible for load 
cache. */
     private int maxPoolSz = Runtime.getRuntime().availableProcessors();
 
-    /** Maximum batch size for writeAll and deleteAll operations. */
-    private int batchSz = DFLT_BATCH_SIZE;
-
     /** Parallel load cache minimum threshold. If {@code 0} then load 
sequentially. */
     private int parallelLoadCacheMinThreshold = 
DFLT_PARALLEL_LOAD_CACHE_MINIMUM_THRESHOLD;
 
+    /** Types that store could process. */
+    private JdbcType[] types;
+
     /**
      * Get field value from object for use as query parameter.
      *
@@ -190,26 +192,27 @@ public abstract class CacheAbstractJdbcStore<K, V> 
implements CacheStore<K, V>,
      * @throws CacheLoaderException If failed to construct cache object.
      */
     protected abstract <R> R buildObject(@Nullable String cacheName, String 
typeName,
-        Collection<CacheTypeFieldMetadata> fields, Map<String, Integer> 
loadColIdxs, ResultSet rs)
+        JdbcTypeField[] fields, Map<String, Integer> loadColIdxs, ResultSet rs)
         throws CacheLoaderException;
 
     /**
-     * Extract key type id from key object.
+     * Calculate type ID for object.
      *
-     * @param key Key object.
-     * @return Key type id.
-     * @throws CacheException If failed to get type key id from object.
+     * @param obj Object to calculate type ID for.
+     * @return Type ID.
+     * @throws CacheException If failed to calculate type ID for given object.
      */
-    protected abstract Object keyTypeId(Object key) throws CacheException;
+    protected abstract Object typeIdForObject(Object obj) throws 
CacheException;
 
     /**
-     * Extract key type id from key class name.
+     * Calculate type ID for given type name.
      *
-     * @param type String description of key type.
-     * @return Key type id.
-     * @throws CacheException If failed to get type key id from object.
+     * @param keepSerialized If {@code true} then calculate type ID for 
portable object otherwise for POJO.
+     * @param typeName String description of type name.
+     * @return Type ID.
+     * @throws CacheException If failed to get type ID for given type name.
      */
-    protected abstract Object keyTypeId(String type) throws CacheException;
+    protected abstract Object typeIdForTypeName(boolean keepSerialized, String 
typeName) throws CacheException;
 
     /**
      * Prepare internal store specific builders for provided types metadata.
@@ -218,7 +221,7 @@ public abstract class CacheAbstractJdbcStore<K, V> 
implements CacheStore<K, V>,
      * @param types Collection of types.
      * @throws CacheException If failed to prepare internal builders for types.
      */
-    protected abstract void prepareBuilders(@Nullable String cacheName, 
Collection<CacheTypeMetadata> types)
+    protected abstract void prepareBuilders(@Nullable String cacheName, 
Collection<JdbcType> types)
         throws CacheException;
 
     /**
@@ -541,35 +544,35 @@ public abstract class CacheAbstractJdbcStore<K, V> 
implements CacheStore<K, V>,
      * @param fields Fields descriptors.
      * @throws CacheException If failed to check type metadata.
      */
-    private static void checkMapping(@Nullable String cacheName, String 
clsName,
-        Collection<CacheTypeFieldMetadata> fields) throws CacheException {
+    private void checkMapping(@Nullable String cacheName, String clsName,
+        JdbcTypeField[] fields) throws CacheException {
         try {
             Class<?> cls = Class.forName(clsName);
 
             if (simpleType(cls)) {
-                if (fields.size() != 1)
+                if (fields.length != 1)
                     throw new CacheException("More than one field for simple 
type [cache name=" + cacheName
                         + ", type=" + clsName + " ]");
 
-                CacheTypeFieldMetadata field = F.first(fields);
+                JdbcTypeField field = fields[0];
 
-                if (field.getDatabaseName() == null)
+                if (field.getDatabaseFieldName() == null)
                     throw new CacheException("Missing database name in mapping 
description [cache name=" + cacheName
                         + ", type=" + clsName + " ]");
 
-                field.setJavaType(cls);
+                field.setJavaFieldType(cls);
             }
             else
-                for (CacheTypeFieldMetadata field : fields) {
-                    if (field.getDatabaseName() == null)
+                for (JdbcTypeField field : fields) {
+                    if (field.getDatabaseFieldName() == null)
                         throw new CacheException("Missing database name in 
mapping description [cache name=" + cacheName
                             + ", type=" + clsName + " ]");
 
-                    if (field.getJavaName() == null)
+                    if (field.getJavaFieldName() == null)
                         throw new CacheException("Missing field name in 
mapping description [cache name=" + cacheName
                             + ", type=" + clsName + " ]");
 
-                    if (field.getJavaType() == null)
+                    if (field.getJavaFieldType() == null)
                         throw new CacheException("Missing field type in 
mapping description [cache name=" + cacheName
                             + ", type=" + clsName + " ]");
                 }
@@ -580,6 +583,50 @@ public abstract class CacheAbstractJdbcStore<K, V> 
implements CacheStore<K, V>,
     }
 
     /**
+     * For backward compatibility translate old field type descriptors to new 
format.
+     *
+     * @param oldFields Fields in old format.
+     * @return Fields in new format.
+     */
+    @Deprecated
+    private JdbcTypeField[] translateFields(Collection<CacheTypeFieldMetadata> 
oldFields) {
+        JdbcTypeField[] newFields = new JdbcTypeField[oldFields.size()];
+
+        int idx = 0;
+
+        for (CacheTypeFieldMetadata oldField : oldFields) {
+            newFields[idx] = new JdbcTypeField(oldField.getDatabaseType(), 
oldField.getDatabaseName(),
+                oldField.getJavaType(), oldField.getJavaName());
+
+            idx++;
+        }
+
+        return newFields;
+    }
+
+    /**
+     * Checks for POJO/portable format.
+     *
+     * @param cacheName Cache name to get types settings.
+     * @param typeName Type name to check for POJO/portable format.
+     * @return {@code true} if portable format is configured for the type.
+     * @throws CacheException If metadata for the cache or type is not found.
+     */
+    protected boolean isKeepSerialized(String cacheName, String typeName) {
+        Map<String, Boolean> cacheTypes = keepSerializedTypes.get(cacheName);
+
+        if (cacheTypes == null)
+            throw new CacheException("Failed to find types metadata for cache: 
" + cacheName);
+
+        Boolean keepSerialized = cacheTypes.get(typeName);
+
+        if (keepSerialized == null)
+            throw new CacheException("Failed to find type metadata for type: " 
+ typeName);
+
+        return keepSerialized;
+    }
+
+    /**
      * @param cacheName Cache name to check mappings for.
      * @return Type mappings for specified cache name.
      * @throws CacheException If failed to initialize cache mappings.
@@ -598,32 +645,80 @@ public abstract class CacheAbstractJdbcStore<K, V> 
implements CacheStore<K, V>,
             if (entryMappings != null)
                 return entryMappings;
 
-            CacheConfiguration ccfg = 
ignite().cache(cacheName).getConfiguration(CacheConfiguration.class);
+            // If no types configured, check CacheTypeMetadata for backward 
compatibility.
+            if (types == null) {
+                CacheConfiguration ccfg = 
ignite().cache(cacheName).getConfiguration(CacheConfiguration.class);
+
+                Collection<CacheTypeMetadata> oldTypes = 
ccfg.getTypeMetadata();
+
+                types = new JdbcType[oldTypes.size()];
+
+                int idx = 0;
 
-            Collection<CacheTypeMetadata> types = ccfg.getTypeMetadata();
+                for (CacheTypeMetadata oldType : oldTypes) {
+                    JdbcType newType = new JdbcType();
 
-            entryMappings = U.newHashMap(types.size());
+                    newType.setCacheName(cacheName);
 
-            for (CacheTypeMetadata type : types) {
-                Object keyTypeId = keyTypeId(type.getKeyType());
+                    newType.setDatabaseSchema(oldType.getDatabaseSchema());
+                    newType.setDatabaseTable(oldType.getDatabaseTable());
 
-                if (entryMappings.containsKey(keyTypeId))
-                    throw new CacheException("Key type must be unique in type 
metadata [cache name=" + cacheName +
-                        ", key type=" + type.getKeyType() + "]");
+                    newType.setKeyType(oldType.getKeyType());
+                    
newType.setKeyFields(translateFields(oldType.getKeyFields()));
 
-                checkMapping(cacheName, type.getKeyType(), 
type.getKeyFields());
-                checkMapping(cacheName, type.getValueType(), 
type.getValueFields());
+                    newType.setValueType(oldType.getValueType());
+                    
newType.setValueFields(translateFields(oldType.getValueFields()));
 
-                entryMappings.put(keyTypeId(type.getKeyType()), new 
EntryMapping(cacheName, dialect, type));
+                    types[idx] = newType;
+
+                    idx++;
+                }
             }
 
-            Map<String, Map<Object, EntryMapping>> mappings = new 
HashMap<>(cacheMappings);
+            List<JdbcType> cacheTypes = new ArrayList<>(types.length);
+
+            for (JdbcType type : types)
+                if ((cacheName != null && 
cacheName.equals(type.getCacheName())) ||
+                    (cacheName == null && type.getCacheName() == null))
+                    cacheTypes.add(type);
+
+            entryMappings = U.newHashMap(cacheTypes.size());
+
+            if (!cacheTypes.isEmpty()) {
+                Map<String, Boolean> tk = new HashMap<>(cacheTypes.size() * 2);
 
-            mappings.put(cacheName, entryMappings);
+                for (JdbcType type : cacheTypes) {
+                    boolean keepSerialized = type.isKeepSerialized();
 
-            prepareBuilders(cacheName, types);
+                    String keyType = type.getKeyType();
+                    String valType = type.getValueType();
 
-            cacheMappings = mappings;
+                    tk.put(keyType, keepSerialized);
+                    tk.put(valType, keepSerialized);
+
+                    Object keyTypeId = typeIdForTypeName(keepSerialized, 
keyType);
+
+                    if (entryMappings.containsKey(keyTypeId))
+                        throw new CacheException("Key type must be unique in 
type metadata [cache name=" + cacheName +
+                            ", key type=" + keyType + "]");
+
+                    if (!keepSerialized) {
+                        checkMapping(cacheName, keyType, type.getKeyFields());
+                        checkMapping(cacheName, valType, 
type.getValueFields());
+                    }
+
+                    entryMappings.put(keyTypeId, new EntryMapping(cacheName, 
dialect, type));
+                }
+
+                keepSerializedTypes.put(cacheName, tk);
+
+                Map<String, Map<Object, EntryMapping>> mappings = new 
HashMap<>(cacheMappings);
+                mappings.put(cacheName, entryMappings);
+
+                prepareBuilders(cacheName, cacheTypes);
+
+                cacheMappings = mappings;
+            }
 
             return entryMappings;
         }
@@ -646,7 +741,7 @@ public abstract class CacheAbstractJdbcStore<K, V> 
implements CacheStore<K, V>,
             String maskedCacheName = U.maskName(cacheName);
 
             throw new CacheException("Failed to find mapping description 
[key=" + key +
-                ", cache=" + maskedCacheName + "]. Please configure 
CacheTypeMetadata to associate '" + maskedCacheName + "' with JdbcPojoStore.");
+                ", cache=" + maskedCacheName + "]. Please configure JdbcType 
to associate '" + maskedCacheName + "' with JdbcPojoStore.");
         }
 
         return em;
@@ -671,12 +766,16 @@ public abstract class CacheAbstractJdbcStore<K, V> 
implements CacheStore<K, V>,
                 if (log.isDebugEnabled())
                     log.debug("Start loading entries from db using user 
queries from arguments...");
 
+                // We must build cache mappings first.
+                cacheMappings(cacheName);
+
                 for (int i = 0; i < args.length; i += 2) {
                     String keyType = args[i].toString();
 
                     String selQry = args[i + 1].toString();
 
-                    EntryMapping em = entryMapping(cacheName, 
keyTypeId(keyType), keyType);
+                    EntryMapping em = entryMapping(cacheName, 
typeIdForTypeName(isKeepSerialized(cacheName, keyType),
+                        keyType), keyType);
 
                     futs.add(pool.submit(new LoadCacheCustomQueryWorker<>(em, 
selQry, clo)));
                 }
@@ -762,7 +861,7 @@ public abstract class CacheAbstractJdbcStore<K, V> 
implements CacheStore<K, V>,
     @Nullable @Override public V load(K key) throws CacheLoaderException {
         assert key != null;
 
-        EntryMapping em = entryMapping(session().cacheName(), keyTypeId(key), 
key);
+        EntryMapping em = entryMapping(session().cacheName(), 
typeIdForObject(key), key);
 
         if (log.isDebugEnabled())
             log.debug("Load value from db [table= " + em.fullTableName() + ", 
key=" + key + "]");
@@ -810,7 +909,7 @@ public abstract class CacheAbstractJdbcStore<K, V> 
implements CacheStore<K, V>,
             Map<K, V> res = new HashMap<>();
 
             for (K key : keys) {
-                Object keyTypeId = keyTypeId(key);
+                Object keyTypeId = typeIdForObject(key);
 
                 EntryMapping em = entryMapping(cacheName, keyTypeId, key);
 
@@ -850,7 +949,7 @@ public abstract class CacheAbstractJdbcStore<K, V> 
implements CacheStore<K, V>,
         try {
             CacheWriterException we = null;
 
-            for (int attempt = 0; attempt < MAX_ATTEMPT_WRITE_COUNT; 
attempt++) {
+            for (int attempt = 0; attempt < maxWrtAttempts; attempt++) {
                 int paramIdx = fillValueParameters(updStmt, 1, em, 
entry.getValue());
 
                 fillKeyParameters(updStmt, paramIdx, em, entry.getKey());
@@ -919,7 +1018,7 @@ public abstract class CacheAbstractJdbcStore<K, V> 
implements CacheStore<K, V>,
 
         K key = entry.getKey();
 
-        EntryMapping em = entryMapping(session().cacheName(), keyTypeId(key), 
key);
+        EntryMapping em = entryMapping(session().cacheName(), 
typeIdForObject(key), key);
 
         if (log.isDebugEnabled())
             log.debug("Start write entry to database [table=" + 
em.fullTableName() + ", entry=" + entry + "]");
@@ -1008,7 +1107,7 @@ public abstract class CacheAbstractJdbcStore<K, V> 
implements CacheStore<K, V>,
                     for (Cache.Entry<? extends K, ? extends V> entry : 
entries) {
                         K key = entry.getKey();
 
-                        Object keyTypeId = keyTypeId(key);
+                        Object keyTypeId = typeIdForObject(key);
 
                         em = entryMapping(cacheName, keyTypeId, key);
 
@@ -1077,7 +1176,7 @@ public abstract class CacheAbstractJdbcStore<K, V> 
implements CacheStore<K, V>,
                     for (Cache.Entry<? extends K, ? extends V> entry : 
entries) {
                         K key = entry.getKey();
 
-                        Object keyTypeId = keyTypeId(key);
+                        Object keyTypeId = typeIdForObject(key);
 
                         EntryMapping em = entryMapping(cacheName, keyTypeId, 
key);
 
@@ -1115,7 +1214,7 @@ public abstract class CacheAbstractJdbcStore<K, V> 
implements CacheStore<K, V>,
     @Override public void delete(Object key) throws CacheWriterException {
         assert key != null;
 
-        EntryMapping em = entryMapping(session().cacheName(), keyTypeId(key), 
key);
+        EntryMapping em = entryMapping(session().cacheName(), 
typeIdForObject(key), key);
 
         if (log.isDebugEnabled())
             log.debug("Remove value from db [table=" + em.fullTableName() + ", 
key=" + key + "]");
@@ -1219,7 +1318,7 @@ public abstract class CacheAbstractJdbcStore<K, V> 
implements CacheStore<K, V>,
             int fromIdx = 0, prepared = 0;
 
             for (Object key : keys) {
-                Object keyTypeId = keyTypeId(key);
+                Object keyTypeId = typeIdForObject(key);
 
                 em = entryMapping(cacheName, keyTypeId, key);
 
@@ -1285,12 +1384,12 @@ public abstract class CacheAbstractJdbcStore<K, V> 
implements CacheStore<K, V>,
      * @param fieldVal Field value.
      * @throws CacheException If failed to set statement parameter.
      */
-    protected void fillParameter(PreparedStatement stmt, int i, 
CacheTypeFieldMetadata field, @Nullable Object fieldVal)
+    protected void fillParameter(PreparedStatement stmt, int i, JdbcTypeField 
field, @Nullable Object fieldVal)
         throws CacheException {
         try {
             if (fieldVal != null) {
-                if (field.getJavaType() == UUID.class) {
-                    switch (field.getDatabaseType()) {
+                if (field.getJavaFieldType() == UUID.class) {
+                    switch (field.getDatabaseFieldType()) {
                         case Types.BINARY:
                             fieldVal = U.uuidToBytes((UUID)fieldVal);
 
@@ -1306,10 +1405,10 @@ public abstract class CacheAbstractJdbcStore<K, V> 
implements CacheStore<K, V>,
                 stmt.setObject(i, fieldVal);
             }
             else
-                stmt.setNull(i, field.getDatabaseType());
+                stmt.setNull(i, field.getDatabaseFieldType());
         }
         catch (SQLException e) {
-            throw new CacheException("Failed to set statement parameter name: 
" + field.getDatabaseName(), e);
+            throw new CacheException("Failed to set statement parameter name: 
" + field.getDatabaseFieldName(), e);
         }
     }
 
@@ -1323,8 +1422,8 @@ public abstract class CacheAbstractJdbcStore<K, V> 
implements CacheStore<K, V>,
      */
     protected int fillKeyParameters(PreparedStatement stmt, int idx, 
EntryMapping em,
         Object key) throws CacheException {
-        for (CacheTypeFieldMetadata field : em.keyColumns()) {
-            Object fieldVal = extractParameter(em.cacheName, em.keyType(), 
field.getJavaName(), key);
+        for (JdbcTypeField field : em.keyColumns()) {
+            Object fieldVal = extractParameter(em.cacheName, em.keyType(), 
field.getJavaFieldName(), key);
 
             fillParameter(stmt, idx++, field, fieldVal);
         }
@@ -1353,8 +1452,8 @@ public abstract class CacheAbstractJdbcStore<K, V> 
implements CacheStore<K, V>,
      */
     protected int fillValueParameters(PreparedStatement stmt, int idx, 
EntryMapping em, Object val)
         throws CacheWriterException {
-        for (CacheTypeFieldMetadata field : em.uniqValFields) {
-            Object fieldVal = extractParameter(em.cacheName, em.valueType(), 
field.getJavaName(), val);
+        for (JdbcTypeField field : em.uniqValFields) {
+            Object fieldVal = extractParameter(em.cacheName, em.valueType(), 
field.getJavaFieldName(), val);
 
             fillParameter(stmt, idx++, field, fieldVal);
         }
@@ -1413,6 +1512,42 @@ public abstract class CacheAbstractJdbcStore<K, V> 
implements CacheStore<K, V>,
     }
 
     /**
+     * Gets maximum number of write attempts in case of database error.
+     *
+     * @return Maximum number of write attempts.
+     */
+    public int getMaximumWriteAttempts() {
+        return maxWrtAttempts;
+    }
+
+    /**
+     * Sets maximum number of write attempts in case of database error.
+     *
+     * @param maxWrtAttempts Number of write attempts.
+     */
+    public void setMaximumWriteAttempts(int maxWrtAttempts) {
+        this.maxWrtAttempts = maxWrtAttempts;
+    }
+
+    /**
+     * Gets types known by store.
+     *
+     * @return Types known by store.
+     */
+    public JdbcType[] getTypes() {
+        return types;
+    }
+
+    /**
+     * Sets store configurations.
+     *
+     * @param types Types that store should process.
+     */
+    public void setTypes(JdbcType... types) {
+        this.types = types;
+    }
+
+    /**
      * Get maximum batch size for delete and delete operations.
      *
      * @return Maximum batch size.
@@ -1509,10 +1644,10 @@ public abstract class CacheAbstractJdbcStore<K, V> 
implements CacheStore<K, V>,
         private final Map<String, Integer> loadColIdxs;
 
         /** Unique value fields. */
-        private final Collection<CacheTypeFieldMetadata> uniqValFields;
+        private final Collection<JdbcTypeField> uniqValFields;
 
         /** Type metadata. */
-        private final CacheTypeMetadata typeMeta;
+        private final JdbcType typeMeta;
 
         /** Full table name. */
         private final String fullTblName;
@@ -1522,22 +1657,22 @@ public abstract class CacheAbstractJdbcStore<K, V> 
implements CacheStore<K, V>,
          * @param dialect JDBC dialect.
          * @param typeMeta Type metadata.
          */
-        public EntryMapping(@Nullable String cacheName, JdbcDialect dialect, 
CacheTypeMetadata typeMeta) {
+        public EntryMapping(@Nullable String cacheName, JdbcDialect dialect, 
JdbcType typeMeta) {
             this.cacheName = cacheName;
 
             this.dialect = dialect;
 
             this.typeMeta = typeMeta;
 
-            Collection<CacheTypeFieldMetadata> keyFields = 
typeMeta.getKeyFields();
+            JdbcTypeField[] keyFields = typeMeta.getKeyFields();
 
-            Collection<CacheTypeFieldMetadata> valFields = 
typeMeta.getValueFields();
+            JdbcTypeField[] valFields = typeMeta.getValueFields();
 
-            keyCols = databaseColumns(keyFields);
+            keyCols = databaseColumns(F.asList(keyFields));
 
-            uniqValFields = F.view(valFields, new 
IgnitePredicate<CacheTypeFieldMetadata>() {
-                @Override public boolean apply(CacheTypeFieldMetadata col) {
-                    return !keyCols.contains(col.getDatabaseName());
+            uniqValFields = F.view(F.asList(valFields), new 
IgnitePredicate<JdbcTypeField>() {
+                @Override public boolean apply(JdbcTypeField col) {
+                    return !keyCols.contains(col.getDatabaseFieldName());
                 }
             });
 
@@ -1578,16 +1713,16 @@ public abstract class CacheAbstractJdbcStore<K, V> 
implements CacheStore<K, V>,
         }
 
         /**
-         * Extract database column names from {@link CacheTypeFieldMetadata}.
+         * Extract database column names from {@link JdbcTypeField}.
          *
-         * @param dsc collection of {@link CacheTypeFieldMetadata}.
+         * @param dsc collection of {@link JdbcTypeField}.
          * @return Collection with database column names.
          */
-        private static Collection<String> 
databaseColumns(Collection<CacheTypeFieldMetadata> dsc) {
-            return F.transform(dsc, new C1<CacheTypeFieldMetadata, String>() {
+        private static Collection<String> 
databaseColumns(Collection<JdbcTypeField> dsc) {
+            return F.transform(dsc, new C1<JdbcTypeField, String>() {
                 /** {@inheritDoc} */
-                @Override public String apply(CacheTypeFieldMetadata col) {
-                    return col.getDatabaseName();
+                @Override public String apply(JdbcTypeField col) {
+                    return col.getDatabaseFieldName();
                 }
             });
         }
@@ -1640,7 +1775,7 @@ public abstract class CacheAbstractJdbcStore<K, V> 
implements CacheStore<K, V>,
          *
          * @return Key columns.
          */
-        protected Collection<CacheTypeFieldMetadata> keyColumns() {
+        protected JdbcTypeField[] keyColumns() {
             return typeMeta.getKeyFields();
         }
 
@@ -1649,7 +1784,7 @@ public abstract class CacheAbstractJdbcStore<K, V> 
implements CacheStore<K, V>,
          *
          * @return Value columns.
          */
-        protected Collection<CacheTypeFieldMetadata> valueColumns() {
+        protected JdbcTypeField[] valueColumns() {
             return typeMeta.getValueFields();
         }
 
@@ -1799,8 +1934,8 @@ public abstract class CacheAbstractJdbcStore<K, V> 
implements CacheStore<K, V>,
                 int idx = 1;
 
                 for (Object key : keys)
-                    for (CacheTypeFieldMetadata field : em.keyColumns()) {
-                        Object fieldVal = extractParameter(em.cacheName, 
em.keyType(), field.getJavaName(), key);
+                    for (JdbcTypeField field : em.keyColumns()) {
+                        Object fieldVal = extractParameter(em.cacheName, 
em.keyType(), field.getJavaFieldName(), key);
 
                         fillParameter(stmt, idx++, field, fieldVal);
                     }

Reply via email to