Repository: ignite Updated Branches: refs/heads/ignite-1753 bbdc33eb0 -> 1dfef7aa2
IGNITE-1753 WIP on reworking JDBC POJO store to new configuration. Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/1dfef7aa Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/1dfef7aa Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/1dfef7aa Branch: refs/heads/ignite-1753 Commit: 1dfef7aa266c3120e838d10f24bb1ac9c05ef11a Parents: bbdc33e Author: Alexey Kuznetsov <akuznet...@apache.org> Authored: Mon Oct 26 14:37:02 2015 +0700 Committer: Alexey Kuznetsov <akuznet...@apache.org> Committed: Mon Oct 26 14:37:02 2015 +0700 ---------------------------------------------------------------------- .../cache/store/jdbc/CacheJdbcPojoStore.java | 202 +++++++++++++------ 1 file changed, 140 insertions(+), 62 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/1dfef7aa/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheJdbcPojoStore.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheJdbcPojoStore.java b/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheJdbcPojoStore.java index b25017c..f9225c5 100644 --- a/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheJdbcPojoStore.java +++ b/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheJdbcPojoStore.java @@ -71,6 +71,7 @@ import org.apache.ignite.lang.IgniteBiInClosure; import org.apache.ignite.lang.IgnitePredicate; import org.apache.ignite.lifecycle.LifecycleAware; import org.apache.ignite.portable.PortableBuilder; +import org.apache.ignite.portable.PortableObject; import org.apache.ignite.resources.CacheStoreSessionResource; import org.apache.ignite.resources.IgniteInstanceResource; import org.apache.ignite.resources.LoggerResource; @@ -134,13 +135,35 @@ public class CacheJdbcPojoStore<K, V> implements CacheStore<K, V>, LifecycleAwar private CacheJdbcPojoStoreType[] types; /** Map for quick check whether type is POJO or Portable. */ - private volatile Map<String, Map<String, Boolean>> typesKind = Collections.emptyMap(); + private volatile Map<String, Map<String, Boolean>> keepSerializedTypes = new HashMap<>(); /** POJO methods cache. */ private volatile Map<String, Map<String, PojoMethodsCache>> pojoMethods = Collections.emptyMap(); /** Portables builders cache. */ - protected volatile Map<String, Map<String, PortableBuilder>> portableBuilders = Collections.emptyMap(); + private volatile Map<String, Map<String, PortableBuilder>> portableBuilders = Collections.emptyMap(); + + /** + * Checks for POJO/portable format. + * + * @param cacheName Cache name to get types settings. + * @param typeName Type name to check for POJO/portable format. + * @return {@code true} If portable format configured. + * @throws CacheException In case of error. + */ + private boolean isKeepSerialized(String cacheName, String typeName) { + Map<String, Boolean> cacheTypes = keepSerializedTypes.get(cacheName); + + if (cacheTypes == null) + throw new CacheException("Failed to find types metadata for cache: " + cacheName); + + Boolean keepSerialized = cacheTypes.get(typeName); + + if (keepSerialized == null) + throw new CacheException("Failed to find type metadata for type: " + typeName); + + return keepSerialized; + } /** * Get field value from object for use as query parameter. @@ -154,11 +177,33 @@ public class CacheJdbcPojoStore<K, V> implements CacheStore<K, V>, LifecycleAwar */ @Nullable private Object extractParameter(@Nullable String cacheName, String typeName, String fieldName, Object obj) throws CacheException { + return isKeepSerialized(cacheName, typeName) + ? extractPortableParameter(fieldName, obj) + : extractPojoParameter(cacheName, typeName, fieldName, obj); + } + + /** + * Get field value from POJO for use as query parameter. + * + * @param cacheName Cache name. + * @param typeName Type name. + * @param fieldName Field name. + * @param obj Cache object. + * @return Field value from object. + * @throws CacheException in case of error. + */ + @Nullable private Object extractPojoParameter(@Nullable String cacheName, String typeName, String fieldName, + Object obj) throws CacheException { try { - PojoMethodsCache mc = pojoMethods.get(cacheName).get(typeName); + Map<String, PojoMethodsCache> cacheMethods = pojoMethods.get(cacheName); + + if (cacheMethods == null) + throw new CacheException("Failed to find POJO type metadata for cache: " + cacheName); + + PojoMethodsCache mc = cacheMethods.get(typeName); if (mc == null) - throw new CacheException("Failed to find cache type metadata for type: " + typeName); + throw new CacheException("Failed to find POJO type metadata for type: " + typeName); if (mc.simple) return obj; @@ -177,6 +222,25 @@ public class CacheJdbcPojoStore<K, V> implements CacheStore<K, V>, LifecycleAwar } /** + * Get field value from Portable for use as query parameter. + * + * @param fieldName Field name to extract query parameter for. + * @param obj Object to process. + * @return Field value from object. + * @throws CacheException in case of error. + */ + private Object extractPortableParameter(String fieldName, Object obj) throws CacheException { + if (obj instanceof PortableObject) { + PortableObject pobj = (PortableObject)obj; + + return pobj.field(fieldName); + } + + throw new CacheException("Failed to read property value from non portable object [class name=" + + obj.getClass() + ", property=" + fieldName + "]"); + } + + /** * Construct object from query result. * * @param <R> Type of result object. @@ -191,42 +255,30 @@ public class CacheJdbcPojoStore<K, V> implements CacheStore<K, V>, LifecycleAwar private <R> R buildObject(@Nullable String cacheName, String typeName, CacheJdbcPojoStoreTypeField[] fields, Map<String, Integer> loadColIdxs, ResultSet rs) throws CacheLoaderException { - - Map<String, Boolean> z = typesKind.get(cacheName); - - if (z == null) - throw new CacheLoaderException("Failed to find type metadata for cache: " + cacheName); - - Boolean keepSerialized = z.get(typeName); - - if (keepSerialized == null) - throw new CacheLoaderException("Failed to find type metadata for type: " + typeName); - - return (R)(keepSerialized + return (R)(isKeepSerialized(cacheName, typeName) ? buildPortableObject(cacheName, typeName, fields, loadColIdxs, rs) : buildPojoObject(cacheName, typeName, fields, loadColIdxs, rs)); } /** - * Construct object from query result. + * Construct POJO from query result. * - * @param <R> Type of result object. * @param cacheName Cache name. * @param typeName Type name. * @param fields Fields descriptors. * @param loadColIdxs Select query columns index. * @param rs ResultSet. - * @return Constructed object. - * @throws CacheLoaderException If failed to construct cache object. + * @return Constructed POJO. + * @throws CacheLoaderException If failed to construct POJO. */ - private <R> R buildPojoObject(@Nullable String cacheName, String typeName, + private Object buildPojoObject(@Nullable String cacheName, String typeName, CacheJdbcPojoStoreTypeField[] fields, Map<String, Integer> loadColIdxs, ResultSet rs) throws CacheLoaderException { Map<String, PojoMethodsCache> z = pojoMethods.get(cacheName); if (z == null) - throw new CacheLoaderException("Failed to find POJO type metadata for cache: " + cacheName); + throw new CacheLoaderException("Failed to find POJO types metadata for cache: " + cacheName); PojoMethodsCache mc = z.get(typeName); @@ -237,7 +289,7 @@ public class CacheJdbcPojoStore<K, V> implements CacheStore<K, V>, LifecycleAwar if (mc.simple) { CacheJdbcPojoStoreTypeField field = fields[0]; - return (R)getColumnValue(rs, loadColIdxs.get(field.getDatabaseFieldName()), mc.cls); + return getColumnValue(rs, loadColIdxs.get(field.getDatabaseFieldName()), mc.cls); } Object obj = mc.ctor.newInstance(); @@ -264,7 +316,7 @@ public class CacheJdbcPojoStore<K, V> implements CacheStore<K, V>, LifecycleAwar } } - return (R)obj; + return obj; } catch (SQLException e) { throw new CacheLoaderException("Failed to read object of class: " + typeName, e); @@ -274,17 +326,27 @@ public class CacheJdbcPojoStore<K, V> implements CacheStore<K, V>, LifecycleAwar } } - /** {@inheritDoc} */ - protected <R> R buildPortableObject(String cacheName, String typeName, CacheJdbcPojoStoreTypeField[] fields, + /** + * Construct portable object from query result. + * + * @param cacheName Cache name. + * @param typeName Type name. + * @param fields Fields descriptors. + * @param loadColIdxs Select query columns index. + * @param rs ResultSet. + * @return Constructed portable object. + * @throws CacheLoaderException If failed to construct portable object. + */ + protected PortableObject buildPortableObject(String cacheName, String typeName, CacheJdbcPojoStoreTypeField[] fields, Map<String, Integer> loadColIdxs, ResultSet rs) throws CacheException { - Map<String, PortableBuilder> z = portableBuilders.get(cacheName); + Map<String, PortableBuilder> cachePortables = portableBuilders.get(cacheName); - if (z == null) - throw new CacheException("Failed to find portable builder for cache: " + cacheName); + if (cachePortables == null) + throw new CacheException("Failed to find portable builders for cache: " + cacheName); - PortableBuilder b = z.get(typeName); + PortableBuilder builder = cachePortables.get(typeName); - if (b == null) + if (builder == null) throw new CacheException("Failed to find portable builder for type: " + typeName); try { @@ -293,10 +355,10 @@ public class CacheJdbcPojoStore<K, V> implements CacheStore<K, V>, LifecycleAwar Integer colIdx = loadColIdxs.get(field.getDatabaseFieldName()); - b.setField(field.getJavaFieldName(), getColumnValue(rs, colIdx, type)); + builder.setField(field.getJavaFieldName(), getColumnValue(rs, colIdx, type)); } - return (R)b.build(); + return builder.build(); } catch (SQLException e) { throw new CacheException("Failed to read portable object", e); @@ -304,29 +366,36 @@ public class CacheJdbcPojoStore<K, V> implements CacheStore<K, V>, LifecycleAwar } /** - * Extract key type id from key object. + * Calculate type ID for object. * - * @param key Key object. - * @return Key type id. - * @throws CacheException If failed to get type key id from object. + * @param obj Object to calculate type ID for. + * @return Type ID. + * @throws CacheException If failed to calculate type ID for given object. */ - private Object keyTypeId(Object key) throws CacheException { - return key.getClass(); + private Object typeIdForObject(Object obj) throws CacheException { + if (obj instanceof PortableObject) + return ((PortableObject)obj).typeId(); + + return obj.getClass(); } /** - * Extract key type id from key class name. + * Calculate type ID for given type name. * - * @param type String description of key type. - * @return Key type id. - * @throws CacheException If failed to get type key id from object. + * @param keepSerialized If {@code true} then calculate type ID for portable object otherwise for POJO. + * @param typeName String description of type name. + * @return Type ID. + * @throws CacheException If failed to get type ID for given type name. */ - private Object keyTypeId(String type) throws CacheException { + private Object typeIdForTypeName(boolean keepSerialized, String typeName) throws CacheException { + if (keepSerialized) + return ignite().portables().typeId(typeName); + try { - return Class.forName(type); + return Class.forName(typeName); } catch (ClassNotFoundException e) { - throw new CacheException("Failed to find class: " + type, e); + throw new CacheException("Failed to find class: " + typeName, e); } } @@ -838,22 +907,27 @@ public class CacheJdbcPojoStore<K, V> implements CacheStore<K, V>, LifecycleAwar Map<String, Boolean> tk = new HashMap<>(cacheTypes.size() * 2); for (CacheJdbcPojoStoreType type : cacheTypes) { - tk.put(type.getKeyType(), type.isKeepSerialized()); - tk.put(type.getValueType(), type.isKeepSerialized()); + boolean keepSerialized = type.isKeepSerialized(); - Object keyTypeId = keyTypeId(type.getKeyType()); + String keyType = type.getKeyType(); + String valType = type.getValueType(); + + tk.put(keyType, keepSerialized); + tk.put(valType, keepSerialized); + + Object keyTypeId = typeIdForTypeName(keepSerialized, keyType); if (entryMappings.containsKey(keyTypeId)) throw new CacheException("Key type must be unique in type metadata [cache name=" + cacheName + - ", key type=" + type.getKeyType() + "]"); + ", key type=" + keyType + "]"); - checkMapping(cacheName, type.getKeyType(), type.getKeyFields()); - checkMapping(cacheName, type.getValueType(), type.getValueFields()); + checkMapping(cacheName, keyType, type.getKeyFields()); + checkMapping(cacheName, valType, type.getValueFields()); - entryMappings.put(keyTypeId(type.getKeyType()), new EntryMapping(cacheName, dialect, type)); + entryMappings.put(keyTypeId, new EntryMapping(cacheName, dialect, type)); } - typesKind.put(cacheName, tk); + keepSerializedTypes.put(cacheName, tk); Map<String, Map<Object, EntryMapping>> mappings = new HashMap<>(cacheMappings); mappings.put(cacheName, entryMappings); @@ -915,7 +989,11 @@ public class CacheJdbcPojoStore<K, V> implements CacheStore<K, V>, LifecycleAwar String selQry = args[i + 1].toString(); - EntryMapping em = entryMapping(cacheName, keyTypeId(keyType), keyType); + // We must build cache mappings first. + cacheMappings(cacheName); + + EntryMapping em = entryMapping(cacheName, typeIdForTypeName(isKeepSerialized(cacheName, keyType), + keyType), keyType); futs.add(pool.submit(new LoadCacheCustomQueryWorker<>(em, selQry, clo))); } @@ -1000,7 +1078,7 @@ public class CacheJdbcPojoStore<K, V> implements CacheStore<K, V>, LifecycleAwar @Nullable @Override public V load(K key) throws CacheLoaderException { assert key != null; - EntryMapping em = entryMapping(session().cacheName(), keyTypeId(key), key); + EntryMapping em = entryMapping(session().cacheName(), typeIdForObject(key), key); if (log.isDebugEnabled()) log.debug("Load value from db [table= " + em.fullTableName() + ", key=" + key + "]"); @@ -1048,7 +1126,7 @@ public class CacheJdbcPojoStore<K, V> implements CacheStore<K, V>, LifecycleAwar Map<K, V> res = new HashMap<>(); for (K key : keys) { - Object keyTypeId = keyTypeId(key); + Object keyTypeId = typeIdForObject(key); EntryMapping em = entryMapping(cacheName, keyTypeId, key); @@ -1157,7 +1235,7 @@ public class CacheJdbcPojoStore<K, V> implements CacheStore<K, V>, LifecycleAwar K key = entry.getKey(); - EntryMapping em = entryMapping(session().cacheName(), keyTypeId(key), key); + EntryMapping em = entryMapping(session().cacheName(), typeIdForObject(key), key); if (log.isDebugEnabled()) log.debug("Start write entry to database [table=" + em.fullTableName() + ", entry=" + entry + "]"); @@ -1246,7 +1324,7 @@ public class CacheJdbcPojoStore<K, V> implements CacheStore<K, V>, LifecycleAwar for (Cache.Entry<? extends K, ? extends V> entry : entries) { K key = entry.getKey(); - Object keyTypeId = keyTypeId(key); + Object keyTypeId = typeIdForObject(key); em = entryMapping(cacheName, keyTypeId, key); @@ -1314,7 +1392,7 @@ public class CacheJdbcPojoStore<K, V> implements CacheStore<K, V>, LifecycleAwar for (Cache.Entry<? extends K, ? extends V> entry : entries) { K key = entry.getKey(); - Object keyTypeId = keyTypeId(key); + Object keyTypeId = typeIdForObject(key); EntryMapping em = entryMapping(cacheName, keyTypeId, key); @@ -1352,7 +1430,7 @@ public class CacheJdbcPojoStore<K, V> implements CacheStore<K, V>, LifecycleAwar @Override public void delete(Object key) throws CacheWriterException { assert key != null; - EntryMapping em = entryMapping(session().cacheName(), keyTypeId(key), key); + EntryMapping em = entryMapping(session().cacheName(), typeIdForObject(key), key); if (log.isDebugEnabled()) log.debug("Remove value from db [table=" + em.fullTableName() + ", key=" + key + "]"); @@ -1456,7 +1534,7 @@ public class CacheJdbcPojoStore<K, V> implements CacheStore<K, V>, LifecycleAwar int fromIdx = 0, prepared = 0; for (Object key : keys) { - Object keyTypeId = keyTypeId(key); + Object keyTypeId = typeIdForObject(key); em = entryMapping(cacheName, keyTypeId, key);