Repository: ignite
Updated Branches:
  refs/heads/ignite-1753 bbdc33eb0 -> 1dfef7aa2


IGNITE-1753 WIP on reworking JDBC POJO store to new configuration.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/1dfef7aa
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/1dfef7aa
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/1dfef7aa

Branch: refs/heads/ignite-1753
Commit: 1dfef7aa266c3120e838d10f24bb1ac9c05ef11a
Parents: bbdc33e
Author: Alexey Kuznetsov <akuznet...@apache.org>
Authored: Mon Oct 26 14:37:02 2015 +0700
Committer: Alexey Kuznetsov <akuznet...@apache.org>
Committed: Mon Oct 26 14:37:02 2015 +0700

----------------------------------------------------------------------
 .../cache/store/jdbc/CacheJdbcPojoStore.java    | 202 +++++++++++++------
 1 file changed, 140 insertions(+), 62 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/1dfef7aa/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheJdbcPojoStore.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheJdbcPojoStore.java
 
b/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheJdbcPojoStore.java
index b25017c..f9225c5 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheJdbcPojoStore.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheJdbcPojoStore.java
@@ -71,6 +71,7 @@ import org.apache.ignite.lang.IgniteBiInClosure;
 import org.apache.ignite.lang.IgnitePredicate;
 import org.apache.ignite.lifecycle.LifecycleAware;
 import org.apache.ignite.portable.PortableBuilder;
+import org.apache.ignite.portable.PortableObject;
 import org.apache.ignite.resources.CacheStoreSessionResource;
 import org.apache.ignite.resources.IgniteInstanceResource;
 import org.apache.ignite.resources.LoggerResource;
@@ -134,13 +135,35 @@ public class CacheJdbcPojoStore<K, V> implements 
CacheStore<K, V>, LifecycleAwar
     private CacheJdbcPojoStoreType[] types;
 
     /** Map for quick check whether type is POJO or Portable. */
-    private volatile Map<String, Map<String, Boolean>> typesKind = 
Collections.emptyMap();
+    private volatile Map<String, Map<String, Boolean>> keepSerializedTypes = 
new HashMap<>();
 
     /** POJO methods cache. */
     private volatile Map<String, Map<String, PojoMethodsCache>> pojoMethods = 
Collections.emptyMap();
 
     /** Portables builders cache. */
-    protected volatile Map<String, Map<String, PortableBuilder>> 
portableBuilders = Collections.emptyMap();
+    private volatile Map<String, Map<String, PortableBuilder>> 
portableBuilders = Collections.emptyMap();
+
+    /**
+     * Checks for POJO/portable format.
+     *
+     * @param cacheName Cache name to get types settings.
+     * @param typeName Type name to check for POJO/portable format.
+     * @return {@code true} If portable format configured.
+     * @throws CacheException In case of error.
+     */
+    private boolean isKeepSerialized(String cacheName, String typeName) {
+        Map<String, Boolean> cacheTypes = keepSerializedTypes.get(cacheName);
+
+        if (cacheTypes == null)
+            throw new CacheException("Failed to find types metadata for cache: 
" + cacheName);
+
+        Boolean keepSerialized = cacheTypes.get(typeName);
+
+        if (keepSerialized == null)
+            throw new CacheException("Failed to find type metadata for type: " 
+ typeName);
+
+        return keepSerialized;
+    }
 
     /**
      * Get field value from object for use as query parameter.
@@ -154,11 +177,33 @@ public class CacheJdbcPojoStore<K, V> implements 
CacheStore<K, V>, LifecycleAwar
      */
     @Nullable private Object extractParameter(@Nullable String cacheName, 
String typeName, String fieldName,
         Object obj) throws CacheException {
+        return isKeepSerialized(cacheName, typeName)
+            ? extractPortableParameter(fieldName, obj)
+            : extractPojoParameter(cacheName, typeName, fieldName, obj);
+    }
+
+    /**
+     * Get field value from POJO for use as query parameter.
+     *
+     * @param cacheName Cache name.
+     * @param typeName Type name.
+     * @param fieldName Field name.
+     * @param obj Cache object.
+     * @return Field value from object.
+     * @throws CacheException in case of error.
+     */
+    @Nullable private Object extractPojoParameter(@Nullable String cacheName, 
String typeName, String fieldName,
+        Object obj) throws CacheException {
         try {
-            PojoMethodsCache mc = pojoMethods.get(cacheName).get(typeName);
+            Map<String, PojoMethodsCache> cacheMethods = 
pojoMethods.get(cacheName);
+
+            if (cacheMethods == null)
+                throw new CacheException("Failed to find POJO type metadata 
for cache: " + cacheName);
+
+            PojoMethodsCache mc = cacheMethods.get(typeName);
 
             if (mc == null)
-                throw new CacheException("Failed to find cache type metadata 
for type: " + typeName);
+                throw new CacheException("Failed to find POJO type metadata 
for type: " + typeName);
 
             if (mc.simple)
                 return obj;
@@ -177,6 +222,25 @@ public class CacheJdbcPojoStore<K, V> implements 
CacheStore<K, V>, LifecycleAwar
     }
 
     /**
+     * Get field value from Portable for use as query parameter.
+     *
+     * @param fieldName Field name to extract query parameter for.
+     * @param obj Object to process.
+     * @return Field value from object.
+     * @throws CacheException in case of error.
+     */
+    private Object extractPortableParameter(String fieldName, Object obj) 
throws CacheException {
+        if (obj instanceof PortableObject) {
+            PortableObject pobj = (PortableObject)obj;
+
+            return pobj.field(fieldName);
+        }
+
+        throw new CacheException("Failed to read property value from non 
portable object [class name=" +
+            obj.getClass() + ", property=" + fieldName + "]");
+    }
+
+    /**
      * Construct object from query result.
      *
      * @param <R> Type of result object.
@@ -191,42 +255,30 @@ public class CacheJdbcPojoStore<K, V> implements 
CacheStore<K, V>, LifecycleAwar
     private <R> R buildObject(@Nullable String cacheName, String typeName,
         CacheJdbcPojoStoreTypeField[] fields, Map<String, Integer> 
loadColIdxs, ResultSet rs)
         throws CacheLoaderException {
-
-        Map<String, Boolean> z = typesKind.get(cacheName);
-
-        if (z == null)
-            throw new CacheLoaderException("Failed to find type metadata for 
cache: " + cacheName);
-
-        Boolean keepSerialized = z.get(typeName);
-
-        if (keepSerialized == null)
-            throw new CacheLoaderException("Failed to find type metadata for 
type: " + typeName);
-
-        return (R)(keepSerialized
+        return (R)(isKeepSerialized(cacheName, typeName)
             ? buildPortableObject(cacheName, typeName, fields, loadColIdxs, rs)
             : buildPojoObject(cacheName, typeName, fields, loadColIdxs, rs));
     }
 
     /**
-     * Construct object from query result.
+     * Construct POJO from query result.
      *
-     * @param <R> Type of result object.
      * @param cacheName Cache name.
      * @param typeName Type name.
      * @param fields Fields descriptors.
      * @param loadColIdxs Select query columns index.
      * @param rs ResultSet.
-     * @return Constructed object.
-     * @throws CacheLoaderException If failed to construct cache object.
+     * @return Constructed POJO.
+     * @throws CacheLoaderException If failed to construct POJO.
      */
-    private <R> R buildPojoObject(@Nullable String cacheName, String typeName,
+    private Object buildPojoObject(@Nullable String cacheName, String typeName,
         CacheJdbcPojoStoreTypeField[] fields, Map<String, Integer> 
loadColIdxs, ResultSet rs)
         throws CacheLoaderException {
 
         Map<String, PojoMethodsCache> z = pojoMethods.get(cacheName);
 
         if (z == null)
-            throw new CacheLoaderException("Failed to find POJO type metadata 
for cache: " + cacheName);
+            throw new CacheLoaderException("Failed to find POJO types metadata 
for cache: " + cacheName);
 
         PojoMethodsCache mc = z.get(typeName);
 
@@ -237,7 +289,7 @@ public class CacheJdbcPojoStore<K, V> implements 
CacheStore<K, V>, LifecycleAwar
             if (mc.simple) {
                 CacheJdbcPojoStoreTypeField field = fields[0];
 
-                return (R)getColumnValue(rs, 
loadColIdxs.get(field.getDatabaseFieldName()), mc.cls);
+                return getColumnValue(rs, 
loadColIdxs.get(field.getDatabaseFieldName()), mc.cls);
             }
 
             Object obj = mc.ctor.newInstance();
@@ -264,7 +316,7 @@ public class CacheJdbcPojoStore<K, V> implements 
CacheStore<K, V>, LifecycleAwar
                 }
             }
 
-            return (R)obj;
+            return obj;
         }
         catch (SQLException e) {
             throw new CacheLoaderException("Failed to read object of class: " 
+ typeName, e);
@@ -274,17 +326,27 @@ public class CacheJdbcPojoStore<K, V> implements 
CacheStore<K, V>, LifecycleAwar
         }
     }
 
-    /** {@inheritDoc} */
-    protected <R> R buildPortableObject(String cacheName, String typeName, 
CacheJdbcPojoStoreTypeField[] fields,
+    /**
+     * Construct portable object from query result.
+     *
+     * @param cacheName Cache name.
+     * @param typeName Type name.
+     * @param fields Fields descriptors.
+     * @param loadColIdxs Select query columns index.
+     * @param rs ResultSet.
+     * @return Constructed portable object.
+     * @throws CacheLoaderException If failed to construct portable object.
+     */
+    protected PortableObject buildPortableObject(String cacheName, String 
typeName, CacheJdbcPojoStoreTypeField[] fields,
         Map<String, Integer> loadColIdxs, ResultSet rs) throws CacheException {
-        Map<String, PortableBuilder> z = portableBuilders.get(cacheName);
+        Map<String, PortableBuilder> cachePortables = 
portableBuilders.get(cacheName);
 
-        if (z == null)
-            throw new CacheException("Failed to find portable builder for 
cache: " + cacheName);
+        if (cachePortables == null)
+            throw new CacheException("Failed to find portable builders for 
cache: " + cacheName);
 
-        PortableBuilder b = z.get(typeName);
+        PortableBuilder builder = cachePortables.get(typeName);
 
-        if (b == null)
+        if (builder == null)
             throw new CacheException("Failed to find portable builder for 
type: " + typeName);
 
         try {
@@ -293,10 +355,10 @@ public class CacheJdbcPojoStore<K, V> implements 
CacheStore<K, V>, LifecycleAwar
 
                 Integer colIdx = loadColIdxs.get(field.getDatabaseFieldName());
 
-                b.setField(field.getJavaFieldName(), getColumnValue(rs, 
colIdx, type));
+                builder.setField(field.getJavaFieldName(), getColumnValue(rs, 
colIdx, type));
             }
 
-            return (R)b.build();
+            return builder.build();
         }
         catch (SQLException e) {
             throw new CacheException("Failed to read portable object", e);
@@ -304,29 +366,36 @@ public class CacheJdbcPojoStore<K, V> implements 
CacheStore<K, V>, LifecycleAwar
     }
 
     /**
-     * Extract key type id from key object.
+     * Calculate type ID for object.
      *
-     * @param key Key object.
-     * @return Key type id.
-     * @throws CacheException If failed to get type key id from object.
+     * @param obj Object to calculate type ID for.
+     * @return Type ID.
+     * @throws CacheException If failed to calculate type ID for given object.
      */
-    private Object keyTypeId(Object key) throws CacheException {
-        return key.getClass();
+    private Object typeIdForObject(Object obj) throws CacheException {
+        if (obj instanceof PortableObject)
+            return ((PortableObject)obj).typeId();
+
+        return obj.getClass();
     }
 
     /**
-     * Extract key type id from key class name.
+     * Calculate type ID for given type name.
      *
-     * @param type String description of key type.
-     * @return Key type id.
-     * @throws CacheException If failed to get type key id from object.
+     * @param keepSerialized If {@code true} then calculate type ID for 
portable object otherwise for POJO.
+     * @param typeName String description of type name.
+     * @return Type ID.
+     * @throws CacheException If failed to get type ID for given type name.
      */
-    private Object keyTypeId(String type) throws CacheException {
+    private Object typeIdForTypeName(boolean keepSerialized, String typeName) 
throws CacheException {
+        if (keepSerialized)
+            return ignite().portables().typeId(typeName);
+
         try {
-            return Class.forName(type);
+            return Class.forName(typeName);
         }
         catch (ClassNotFoundException e) {
-            throw new CacheException("Failed to find class: " + type, e);
+            throw new CacheException("Failed to find class: " + typeName, e);
         }
     }
 
@@ -838,22 +907,27 @@ public class CacheJdbcPojoStore<K, V> implements 
CacheStore<K, V>, LifecycleAwar
                 Map<String, Boolean> tk = new HashMap<>(cacheTypes.size() * 2);
 
                 for (CacheJdbcPojoStoreType type : cacheTypes) {
-                    tk.put(type.getKeyType(), type.isKeepSerialized());
-                    tk.put(type.getValueType(), type.isKeepSerialized());
+                    boolean keepSerialized = type.isKeepSerialized();
 
-                    Object keyTypeId = keyTypeId(type.getKeyType());
+                    String keyType = type.getKeyType();
+                    String valType = type.getValueType();
+
+                    tk.put(keyType, keepSerialized);
+                    tk.put(valType, keepSerialized);
+
+                    Object keyTypeId = typeIdForTypeName(keepSerialized, 
keyType);
 
                     if (entryMappings.containsKey(keyTypeId))
                         throw new CacheException("Key type must be unique in 
type metadata [cache name=" + cacheName +
-                            ", key type=" + type.getKeyType() + "]");
+                            ", key type=" + keyType + "]");
 
-                    checkMapping(cacheName, type.getKeyType(), 
type.getKeyFields());
-                    checkMapping(cacheName, type.getValueType(), 
type.getValueFields());
+                    checkMapping(cacheName, keyType, type.getKeyFields());
+                    checkMapping(cacheName, valType, type.getValueFields());
 
-                    entryMappings.put(keyTypeId(type.getKeyType()), new 
EntryMapping(cacheName, dialect, type));
+                    entryMappings.put(keyTypeId, new EntryMapping(cacheName, 
dialect, type));
                 }
 
-                typesKind.put(cacheName, tk);
+                keepSerializedTypes.put(cacheName, tk);
 
                 Map<String, Map<Object, EntryMapping>> mappings = new 
HashMap<>(cacheMappings);
                 mappings.put(cacheName, entryMappings);
@@ -915,7 +989,11 @@ public class CacheJdbcPojoStore<K, V> implements 
CacheStore<K, V>, LifecycleAwar
 
                     String selQry = args[i + 1].toString();
 
-                    EntryMapping em = entryMapping(cacheName, 
keyTypeId(keyType), keyType);
+                    // We must build cache mappings first.
+                    cacheMappings(cacheName);
+
+                    EntryMapping em = entryMapping(cacheName, 
typeIdForTypeName(isKeepSerialized(cacheName, keyType),
+                        keyType), keyType);
 
                     futs.add(pool.submit(new LoadCacheCustomQueryWorker<>(em, 
selQry, clo)));
                 }
@@ -1000,7 +1078,7 @@ public class CacheJdbcPojoStore<K, V> implements 
CacheStore<K, V>, LifecycleAwar
     @Nullable @Override public V load(K key) throws CacheLoaderException {
         assert key != null;
 
-        EntryMapping em = entryMapping(session().cacheName(), keyTypeId(key), 
key);
+        EntryMapping em = entryMapping(session().cacheName(), 
typeIdForObject(key), key);
 
         if (log.isDebugEnabled())
             log.debug("Load value from db [table= " + em.fullTableName() + ", 
key=" + key + "]");
@@ -1048,7 +1126,7 @@ public class CacheJdbcPojoStore<K, V> implements 
CacheStore<K, V>, LifecycleAwar
             Map<K, V> res = new HashMap<>();
 
             for (K key : keys) {
-                Object keyTypeId = keyTypeId(key);
+                Object keyTypeId = typeIdForObject(key);
 
                 EntryMapping em = entryMapping(cacheName, keyTypeId, key);
 
@@ -1157,7 +1235,7 @@ public class CacheJdbcPojoStore<K, V> implements 
CacheStore<K, V>, LifecycleAwar
 
         K key = entry.getKey();
 
-        EntryMapping em = entryMapping(session().cacheName(), keyTypeId(key), 
key);
+        EntryMapping em = entryMapping(session().cacheName(), 
typeIdForObject(key), key);
 
         if (log.isDebugEnabled())
             log.debug("Start write entry to database [table=" + 
em.fullTableName() + ", entry=" + entry + "]");
@@ -1246,7 +1324,7 @@ public class CacheJdbcPojoStore<K, V> implements 
CacheStore<K, V>, LifecycleAwar
                     for (Cache.Entry<? extends K, ? extends V> entry : 
entries) {
                         K key = entry.getKey();
 
-                        Object keyTypeId = keyTypeId(key);
+                        Object keyTypeId = typeIdForObject(key);
 
                         em = entryMapping(cacheName, keyTypeId, key);
 
@@ -1314,7 +1392,7 @@ public class CacheJdbcPojoStore<K, V> implements 
CacheStore<K, V>, LifecycleAwar
                     for (Cache.Entry<? extends K, ? extends V> entry : 
entries) {
                         K key = entry.getKey();
 
-                        Object keyTypeId = keyTypeId(key);
+                        Object keyTypeId = typeIdForObject(key);
 
                         EntryMapping em = entryMapping(cacheName, keyTypeId, 
key);
 
@@ -1352,7 +1430,7 @@ public class CacheJdbcPojoStore<K, V> implements 
CacheStore<K, V>, LifecycleAwar
     @Override public void delete(Object key) throws CacheWriterException {
         assert key != null;
 
-        EntryMapping em = entryMapping(session().cacheName(), keyTypeId(key), 
key);
+        EntryMapping em = entryMapping(session().cacheName(), 
typeIdForObject(key), key);
 
         if (log.isDebugEnabled())
             log.debug("Remove value from db [table=" + em.fullTableName() + ", 
key=" + key + "]");
@@ -1456,7 +1534,7 @@ public class CacheJdbcPojoStore<K, V> implements 
CacheStore<K, V>, LifecycleAwar
             int fromIdx = 0, prepared = 0;
 
             for (Object key : keys) {
-                Object keyTypeId = keyTypeId(key);
+                Object keyTypeId = typeIdForObject(key);
 
                 em = entryMapping(cacheName, keyTypeId, key);
 

Reply via email to