http://git-wip-us.apache.org/repos/asf/eagle/blob/6e919c2e/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/EntityDefinitionManager.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/EntityDefinitionManager.java b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/EntityDefinitionManager.java index 8795ba0..a4557f2 100755 --- a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/EntityDefinitionManager.java +++ b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/EntityDefinitionManager.java @@ -35,440 +35,483 @@ import java.util.concurrent.ConcurrentHashMap; * static initialization of all registered entities. As of now, dynamic registration is not supported */ public class EntityDefinitionManager { - private static final Logger LOG = LoggerFactory.getLogger(EntityDefinitionManager.class); - private static volatile boolean initialized = false; - /** - * using concurrent hashmap is due to the fact that entity can be registered any time from any thread - */ - private static Map<String, EntityDefinition> entityServiceMap = new ConcurrentHashMap<String, EntityDefinition>(); - private static Map<Class<? extends TaggedLogAPIEntity>, EntityDefinition> classMap = new ConcurrentHashMap<Class<? extends TaggedLogAPIEntity>, EntityDefinition>(); - private static Map<Class<?>, EntitySerDeser<?>> _serDeserMap = new ConcurrentHashMap<Class<?>, EntitySerDeser<?>>(); - private static Map<Class<?>, Integer> _serDeserClassIDMap = new ConcurrentHashMap<Class<?>, Integer>(); - private static Map<Integer, Class<?>> _serIDDeserClassMap = new ConcurrentHashMap<Integer, Class<?>>(); - private static Map<String, Map<Integer, EntityDefinition>> entityPrefixMap = new ConcurrentHashMap<String, Map<Integer, EntityDefinition>>(); - private static Map<String, Map<Integer, IndexDefinition>> indexPrefixMap = new ConcurrentHashMap<String, Map<Integer, IndexDefinition>>(); - - static{ - int id = 0; - _serDeserMap.put(NullObject.class, new NullSerDeser()); - _serIDDeserClassMap.put(id, NullObject.class); - _serDeserClassIDMap.put(NullObject.class, id++); - - _serDeserMap.put(String.class, new StringSerDeser()); - _serIDDeserClassMap.put(id, String.class); - _serDeserClassIDMap.put(String.class, id++); - - _serDeserMap.put(long.class, new LongSerDeser()); - _serIDDeserClassMap.put(id, long.class); - _serDeserClassIDMap.put(long.class, id++); - - _serDeserMap.put(Long.class, new LongSerDeser()); - _serIDDeserClassMap.put(id, Long.class); - _serDeserClassIDMap.put(Long.class, id++); - - _serDeserMap.put(int.class, new IntSerDeser()); - _serIDDeserClassMap.put(id, int.class); - _serDeserClassIDMap.put(int.class, id++); - - _serDeserMap.put(Integer.class, new IntSerDeser()); - _serIDDeserClassMap.put(id, Integer.class); - _serDeserClassIDMap.put(Integer.class, id++); - - _serDeserMap.put(Double.class, new DoubleSerDeser()); - _serIDDeserClassMap.put(id, Double.class); - _serDeserClassIDMap.put(Double.class, id++); - - _serDeserMap.put(double.class, new DoubleSerDeser()); - _serIDDeserClassMap.put(id, double.class); - _serDeserClassIDMap.put(double.class, id++); - - _serDeserMap.put(int[].class, new IntArraySerDeser()); - _serIDDeserClassMap.put(id, int[].class); - _serDeserClassIDMap.put(int[].class, id++); - - _serDeserMap.put(double[].class, new DoubleArraySerDeser()); - _serIDDeserClassMap.put(id, double[].class); - _serDeserClassIDMap.put(double[].class, id++); - - _serDeserMap.put(double[][].class, new Double2DArraySerDeser()); - _serIDDeserClassMap.put(id, double[][].class); - _serDeserClassIDMap.put(double[][].class, id++); - - _serDeserMap.put(Boolean.class, new BooleanSerDeser()); - _serIDDeserClassMap.put(id, Boolean.class); - _serDeserClassIDMap.put(Boolean.class, id++); - - _serDeserMap.put(boolean.class, new BooleanSerDeser()); - _serIDDeserClassMap.put(id, boolean.class); - _serDeserClassIDMap.put(boolean.class, id++); - - _serDeserMap.put(String[].class, new StringArraySerDeser()); - _serIDDeserClassMap.put(id, String[].class); - _serDeserClassIDMap.put(String[].class, id++); - - _serDeserMap.put(Map.class, new MapSerDeser()); - _serIDDeserClassMap.put(id, Map.class); - _serDeserClassIDMap.put(Map.class, id++); - - _serDeserMap.put(List.class, new ListSerDeser()); - _serIDDeserClassMap.put(id, List.class); - _serDeserClassIDMap.put(List.class, id++); - } - - - - @SuppressWarnings("rawtypes") - public static EntitySerDeser getSerDeser(Class<?> clazz){ - return _serDeserMap.get(clazz); - } - - /** - * Get internal ID by the predefined registered class - * @param clazz original for serialization/deserialization - * @return the internal id if the input class has been registered, otherwise return -1 - */ - public static int getIDBySerDerClass(Class<?> clazz) { - final Integer id = _serDeserClassIDMap.get(clazz); - if (id == null) { - return -1; - } - return id; - } - - - /** - * Get the predefined registered class by internal ID - * @param id the internal class ID - * @return the predefined registered class, if the class hasn't been registered, return null - */ - public static Class<?> getClassByID(int id) { - return _serIDDeserClassMap.get(id); - } - - /** - * it is allowed that user can register their own entity - * @param clazz entity class - * @throws IllegalArgumentException - */ - public static void registerEntity(Class<? extends TaggedLogAPIEntity> clazz) throws IllegalArgumentException{ - registerEntity(createEntityDefinition(clazz)); - } - - /** - * it is allowed that user can register their own entity - * @deprecated This API is deprecated since we need to use Service annotation to define service name for entities - * @param serviceName entity service name - * @param clazz entity class - * @throws IllegalArgumentException - * - */ + private static final Logger LOG = LoggerFactory.getLogger(EntityDefinitionManager.class); + private static volatile boolean initialized = false; + /** + * using concurrent hashmap is due to the fact that entity can be registered any time from any thread + */ + private static Map<String, EntityDefinition> entityServiceMap = new ConcurrentHashMap<String, EntityDefinition>(); + private static Map<Class<? extends TaggedLogAPIEntity>, EntityDefinition> classMap = new ConcurrentHashMap<Class<? extends TaggedLogAPIEntity>, EntityDefinition>(); + private static Map<Class<?>, EntitySerDeser<?>> _serDeserMap = new ConcurrentHashMap<Class<?>, EntitySerDeser<?>>(); + private static Map<Class<?>, Integer> _serDeserClassIDMap = new ConcurrentHashMap<Class<?>, Integer>(); + private static Map<Integer, Class<?>> _serIDDeserClassMap = new ConcurrentHashMap<Integer, Class<?>>(); + private static Map<String, Map<Integer, EntityDefinition>> entityPrefixMap = new ConcurrentHashMap<String, Map<Integer, EntityDefinition>>(); + private static Map<String, Map<Integer, IndexDefinition>> indexPrefixMap = new ConcurrentHashMap<String, Map<Integer, IndexDefinition>>(); + + static { + int id = 0; + _serDeserMap.put(NullObject.class, new NullSerDeser()); + _serIDDeserClassMap.put(id, NullObject.class); + _serDeserClassIDMap.put(NullObject.class, id++); + + _serDeserMap.put(String.class, new StringSerDeser()); + _serIDDeserClassMap.put(id, String.class); + _serDeserClassIDMap.put(String.class, id++); + + _serDeserMap.put(long.class, new LongSerDeser()); + _serIDDeserClassMap.put(id, long.class); + _serDeserClassIDMap.put(long.class, id++); + + _serDeserMap.put(Long.class, new LongSerDeser()); + _serIDDeserClassMap.put(id, Long.class); + _serDeserClassIDMap.put(Long.class, id++); + + _serDeserMap.put(int.class, new IntSerDeser()); + _serIDDeserClassMap.put(id, int.class); + _serDeserClassIDMap.put(int.class, id++); + + _serDeserMap.put(Integer.class, new IntSerDeser()); + _serIDDeserClassMap.put(id, Integer.class); + _serDeserClassIDMap.put(Integer.class, id++); + + _serDeserMap.put(Double.class, new DoubleSerDeser()); + _serIDDeserClassMap.put(id, Double.class); + _serDeserClassIDMap.put(Double.class, id++); + + _serDeserMap.put(double.class, new DoubleSerDeser()); + _serIDDeserClassMap.put(id, double.class); + _serDeserClassIDMap.put(double.class, id++); + + _serDeserMap.put(int[].class, new IntArraySerDeser()); + _serIDDeserClassMap.put(id, int[].class); + _serDeserClassIDMap.put(int[].class, id++); + + _serDeserMap.put(double[].class, new DoubleArraySerDeser()); + _serIDDeserClassMap.put(id, double[].class); + _serDeserClassIDMap.put(double[].class, id++); + + _serDeserMap.put(double[][].class, new Double2DArraySerDeser()); + _serIDDeserClassMap.put(id, double[][].class); + _serDeserClassIDMap.put(double[][].class, id++); + + _serDeserMap.put(Boolean.class, new BooleanSerDeser()); + _serIDDeserClassMap.put(id, Boolean.class); + _serDeserClassIDMap.put(Boolean.class, id++); + + _serDeserMap.put(boolean.class, new BooleanSerDeser()); + _serIDDeserClassMap.put(id, boolean.class); + _serDeserClassIDMap.put(boolean.class, id++); + + _serDeserMap.put(String[].class, new StringArraySerDeser()); + _serIDDeserClassMap.put(id, String[].class); + _serDeserClassIDMap.put(String[].class, id++); + + _serDeserMap.put(Map.class, new MapSerDeser()); + _serIDDeserClassMap.put(id, Map.class); + _serDeserClassIDMap.put(Map.class, id++); + + _serDeserMap.put(List.class, new ListSerDeser()); + _serIDDeserClassMap.put(id, List.class); + _serDeserClassIDMap.put(List.class, id++); + } + + @SuppressWarnings("rawtypes") + public static EntitySerDeser getSerDeser(Class<?> clazz) { + return _serDeserMap.get(clazz); + } + + /** + * Get internal ID by the predefined registered class + * + * @param clazz original for serialization/deserialization + * @return the internal id if the input class has been registered, otherwise return -1 + */ + public static int getIDBySerDerClass(Class<?> clazz) { + final Integer id = _serDeserClassIDMap.get(clazz); + if (id == null) { + return -1; + } + return id; + } + + /** + * Get the predefined registered class by internal ID + * + * @param id the internal class ID + * @return the predefined registered class, if the class hasn't been registered, return null + */ + public static Class<?> getClassByID(int id) { + return _serIDDeserClassMap.get(id); + } + + /** + * it is allowed that user can register their own entity + * + * @param clazz entity class + * @throws IllegalArgumentException + */ + public static void registerEntity(Class<? extends TaggedLogAPIEntity> clazz) + throws IllegalArgumentException { + registerEntity(createEntityDefinition(clazz)); + } + + /** + * it is allowed that user can register their own entity + * + * @deprecated This API is deprecated since we need to use Service annotation to define service name for + * entities + * @param serviceName entity service name + * @param clazz entity class + * @throws IllegalArgumentException + */ @Deprecated - public static void registerEntity(String serviceName, Class<? extends TaggedLogAPIEntity> clazz) throws IllegalArgumentException{ - registerEntity(serviceName, createEntityDefinition(clazz)); - } - - /** - * it is allowed that user can register their own entity definition - * @param entityDef entity definition - * @throws IllegalArgumentException - */ - public static void registerEntity(EntityDefinition entityDef) { - registerEntity(entityDef.getService(), entityDef); - } - - /** - * it is allowed that user can register their own entity definition - * @deprecated This API is deprecated since we need to use Service annotation to define service name for entities. - * - * @param entityDef entity definition - * @throws IllegalArgumentException - */ - public static void registerEntity(String serviceName, EntityDefinition entityDef) { - final String table = entityDef.getTable(); - if (entityServiceMap.containsKey(serviceName)) { - final EntityDefinition existing = entityServiceMap.get(serviceName); - if (entityDef.getClass().equals(existing.getClass())) { - return; - } - throw new IllegalArgumentException("Service " + serviceName + " has already been registered by " + existing.getClass().getName() + ", so class " + entityDef.getClass() + " can NOT be registered"); - } - synchronized (EntityDefinitionManager.class) { - checkPrefix(entityDef); - entityServiceMap.put(serviceName, entityDef); - Map<Integer, EntityDefinition> entityHashMap = entityPrefixMap.get(table); - if (entityHashMap == null) { - entityHashMap = new ConcurrentHashMap<Integer, EntityDefinition>(); - entityPrefixMap.put(table, entityHashMap); - } - entityHashMap.put(entityDef.getPrefix().hashCode(), entityDef); - final IndexDefinition[] indexes = entityDef.getIndexes(); - if (indexes != null) { - for (IndexDefinition index : indexes) { - Map<Integer, IndexDefinition> indexHashMap = indexPrefixMap.get(table); - if (indexHashMap == null) { - indexHashMap = new ConcurrentHashMap<Integer, IndexDefinition>(); - indexPrefixMap.put(table, indexHashMap); - } - indexHashMap.put(index.getIndexPrefix().hashCode(), index); - } - } - classMap.put(entityDef.getEntityClass(), entityDef); - } - if(LOG.isDebugEnabled()) { - LOG.debug(entityDef.getEntityClass().getSimpleName() + " entity registered successfully, table name: " + entityDef.getTable() + - ", prefix: " + entityDef.getPrefix() + ", service: " + serviceName + ", CF: " + entityDef.getColumnFamily()); - }else{ - LOG.info(String.format("Registered %s (%s)", entityDef.getEntityClass().getSimpleName(), serviceName)); + public static void registerEntity(String serviceName, Class<? extends TaggedLogAPIEntity> clazz) + throws IllegalArgumentException { + registerEntity(serviceName, createEntityDefinition(clazz)); + } + + /** + * it is allowed that user can register their own entity definition + * + * @param entityDef entity definition + * @throws IllegalArgumentException + */ + public static void registerEntity(EntityDefinition entityDef) { + registerEntity(entityDef.getService(), entityDef); + } + + /** + * it is allowed that user can register their own entity definition + * + * @deprecated This API is deprecated since we need to use Service annotation to define service name for + * entities. + * @param entityDef entity definition + * @throws IllegalArgumentException + */ + @Deprecated + public static void registerEntity(String serviceName, EntityDefinition entityDef) { + final String table = entityDef.getTable(); + if (entityServiceMap.containsKey(serviceName)) { + final EntityDefinition existing = entityServiceMap.get(serviceName); + if (entityDef.getClass().equals(existing.getClass())) { + return; + } + throw new IllegalArgumentException("Service " + serviceName + " has already been registered by " + + existing.getClass().getName() + ", so class " + + entityDef.getClass() + " can NOT be registered"); + } + synchronized (EntityDefinitionManager.class) { + checkPrefix(entityDef); + entityServiceMap.put(serviceName, entityDef); + Map<Integer, EntityDefinition> entityHashMap = entityPrefixMap.get(table); + if (entityHashMap == null) { + entityHashMap = new ConcurrentHashMap<Integer, EntityDefinition>(); + entityPrefixMap.put(table, entityHashMap); + } + entityHashMap.put(entityDef.getPrefix().hashCode(), entityDef); + final IndexDefinition[] indexes = entityDef.getIndexes(); + if (indexes != null) { + for (IndexDefinition index : indexes) { + Map<Integer, IndexDefinition> indexHashMap = indexPrefixMap.get(table); + if (indexHashMap == null) { + indexHashMap = new ConcurrentHashMap<Integer, IndexDefinition>(); + indexPrefixMap.put(table, indexHashMap); + } + indexHashMap.put(index.getIndexPrefix().hashCode(), index); + } + } + classMap.put(entityDef.getEntityClass(), entityDef); + } + if (LOG.isDebugEnabled()) { + LOG.debug(entityDef.getEntityClass().getSimpleName() + + " entity registered successfully, table name: " + entityDef.getTable() + ", prefix: " + + entityDef.getPrefix() + ", service: " + serviceName + ", CF: " + + entityDef.getColumnFamily()); + } else { + LOG.info(String.format("Registered %s (%s)", entityDef.getEntityClass().getSimpleName(), + serviceName)); } - } - - private static void checkPrefix(EntityDefinition entityDef) { - final Integer entityPrefixHashcode = entityDef.getPrefix().hashCode(); - if (entityPrefixMap.containsKey(entityDef.getTable())) { - final Map<Integer, EntityDefinition> entityHashMap = entityPrefixMap.get(entityDef.getTable()); - if (entityHashMap.containsKey(entityPrefixHashcode) && (!entityDef.equals(entityHashMap.get(entityPrefixHashcode)))) { - throw new IllegalArgumentException("Failed to register entity " + entityDef.getClass().getName() + ", because of the prefix hash code conflict! The entity prefix " + entityDef.getPrefix() + " has already been registered by entity service " + entityHashMap.get(entityPrefixHashcode).getService()); - } - final IndexDefinition[] indexes = entityDef.getIndexes(); - if (indexes != null) { - for (IndexDefinition index : indexes) { - final Integer indexPrefixHashcode = index.getIndexPrefix().hashCode(); - if (entityHashMap.containsKey(indexPrefixHashcode)) { - throw new IllegalArgumentException("Failed to register entity " + entityDef.getClass().getName() + ", because of the prefix hash code conflict! The index prefix " + index.getIndexPrefix() + " has already been registered by entity " + entityHashMap.get(indexPrefixHashcode).getService()); - } - final Map<Integer, IndexDefinition> indexHashMap = indexPrefixMap.get(entityDef.getTable()); - if (indexHashMap != null && indexHashMap.containsKey(indexPrefixHashcode) && (!index.equals(indexHashMap.get(indexPrefixHashcode)))) { - throw new IllegalArgumentException("Failed to register entity " + entityDef.getClass().getName() + ", because of the prefix hash code conflict! The index prefix " + index.getIndexPrefix() + " has already been registered by entity " + indexHashMap.get(indexPrefixHashcode).getEntityDefinition().getService()); - } - } - } - } - } - - /** - * Get entity definition by name - * @param serviceName - * @return - * @throws IllegalAccessException - * @throws InstantiationException - */ - public static EntityDefinition getEntityByServiceName(String serviceName) throws InstantiationException, IllegalAccessException{ - checkInit(); - return entityServiceMap.get(serviceName); - } - - public static EntityDefinition getEntityDefinitionByEntityClass(Class<? extends TaggedLogAPIEntity> clazz) throws InstantiationException, IllegalAccessException { - checkInit(); - return classMap.get(clazz); - } - - private static void checkInit() throws InstantiationException, IllegalAccessException { - if (!initialized) { - synchronized (EntityDefinitionManager.class) { - if (!initialized) { - EntityRepositoryScanner.scan(); - initialized = true; - } - } - } - } - - public static void load() throws IllegalAccessException, InstantiationException { - checkInit(); - } - - /** - * UserPrincipal can register their own field SerDeser - * @param clazz class of the the SerDeser - * @param entitySerDeser entity or field SerDeser - * @throws IllegalArgumentException - */ - public static void registerSerDeser(Class<?> clazz, EntitySerDeser<?> entitySerDeser) { - _serDeserMap.put(clazz, entitySerDeser); - } - - /** - * Check whether the entity class is time series, false by default - * @param clazz - * @return - */ - public static boolean isTimeSeries(Class<? extends TaggedLogAPIEntity> clazz){ - TimeSeries ts = clazz.getAnnotation(TimeSeries.class); - return ts != null && ts.value(); - } - - @SuppressWarnings("unchecked") - public static EntityDefinition createEntityDefinition(Class<? extends TaggedLogAPIEntity> cls) { - - final EntityDefinition ed = new EntityDefinition(); - - ed.setEntityClass(cls); - // parse cls' annotations - Table table = cls.getAnnotation(Table.class); - if(table == null || table.value().isEmpty()){ - throw new IllegalArgumentException("Entity class must have a non-empty table name annotated with @Table"); - } - String tableName = table.value(); - if(EagleConfigFactory.load().isTableNamePrefixedWithEnvironment()){ - tableName = EagleConfigFactory.load().getEnv() + "_" + tableName; - } - ed.setTable(tableName); - - ColumnFamily family = cls.getAnnotation(ColumnFamily.class); - if(family == null || family.value().isEmpty()){ - throw new IllegalArgumentException("Entity class must have a non-empty column family name annotated with @ColumnFamily"); - } - ed.setColumnFamily(family.value()); - - Prefix prefix = cls.getAnnotation(Prefix.class); - if(prefix == null || prefix.value().isEmpty()){ - throw new IllegalArgumentException("Entity class must have a non-empty prefix name annotated with @Prefix"); - } - ed.setPrefix(prefix.value()); - - TimeSeries ts = cls.getAnnotation(TimeSeries.class); - if(ts == null){ - throw new IllegalArgumentException("Entity class must have a non-empty timeseries name annotated with @TimeSeries"); - } - ed.setTimeSeries(ts.value()); - - Service service = cls.getAnnotation(Service.class); - if(service == null || service.value().isEmpty()){ - ed.setService(cls.getSimpleName()); - } else { - ed.setService(service.value()); - } - - Metric m = cls.getAnnotation(Metric.class); - Map<String, Class<?>> dynamicFieldTypes = new HashMap<String, Class<?>>(); - if(m != null){ - // metric has to be timeseries - if(!ts.value()){ - throw new IllegalArgumentException("Metric entity must be time series as well"); - } - MetricDefinition md = new MetricDefinition(); - md.setInterval(m.interval()); - ed.setMetricDefinition(md); - } - - java.lang.reflect.Field[] fields = cls.getDeclaredFields(); - for(java.lang.reflect.Field f : fields){ - Column column = f.getAnnotation(Column.class); - if(column == null || column.value().isEmpty()){ - continue; - } - Class<?> fldCls = f.getType(); - // intrusive check field type for metric entity - checkFieldTypeForMetric(ed.getMetricDefinition(), f.getName(), fldCls, dynamicFieldTypes); - Qualifier q = new Qualifier(); - q.setDisplayName(f.getName()); - q.setQualifierName(column.value()); - EntitySerDeser<?> serDeser = _serDeserMap.get(fldCls); - if(serDeser == null){ -// throw new IllegalArgumentException(fldCls.getName() + " in field " + f.getName() + -// " of entity " + cls.getSimpleName() + " has no serializer associated "); - serDeser = DefaultJavaObjctSerDeser.INSTANCE; - } - - q.setSerDeser((EntitySerDeser<Object>)serDeser); - ed.getQualifierNameMap().put(q.getQualifierName(), q); - ed.getDisplayNameMap().put(q.getDisplayName(), q); - // TODO: should refine rules, consider fields like "hCol", getter method should be gethCol() according to org.apache.commons.beanutils.PropertyUtils - final String propertyName = f.getName().substring(0,1).toUpperCase() + f.getName().substring(1); - String getterName = "get" + propertyName; - try { - Method method = cls.getMethod(getterName); - ed.getQualifierGetterMap().put(f.getName(), method); - } catch (Exception e) { - // Check if the type is boolean - getterName = "is" + propertyName; - try { - Method method = cls.getMethod(getterName); - ed.getQualifierGetterMap().put(f.getName(), method); - } catch (Exception e1) { - throw new IllegalArgumentException("Field " + f.getName() + " hasn't defined valid getter method: " + getterName, e); - } - } - if(LOG.isDebugEnabled()) LOG.debug("Field registered " + q); - } - - // TODO: Lazy create because not used at all - // dynamically create bean class - if(ed.getMetricDefinition() != null){ - Class<?> metricCls = createDynamicClassForMetric(cls.getName()+"_SingleTimestamp", dynamicFieldTypes); - ed.getMetricDefinition().setSingleTimestampEntityClass(metricCls); - } - - final Partition partition = cls.getAnnotation(Partition.class); - if (partition != null) { - final String[] partitions = partition.value(); - ed.setPartitions(partitions); - // Check if partition fields are all tag fields. Partition field can't be column field, must be tag field. - for (String part : partitions) { - if (!ed.isTag(part)) { - throw new IllegalArgumentException("Partition field can't be column field, must be tag field. " - + "Partition name: " + part); - } - } - } - - final Indexes indexes = cls.getAnnotation(Indexes.class); - if (indexes != null) { - final Index[] inds = indexes.value(); - final IndexDefinition[] indexDefinitions = new IndexDefinition[inds.length]; - for (int i = 0; i < inds.length; ++i) { - final Index ind = inds[i]; - indexDefinitions[i] = new IndexDefinition(ed, ind); - } - ed.setIndexes(indexDefinitions); - } - - final ServicePath path = cls.getAnnotation(ServicePath.class); - if (path != null) { - if (path.path() != null && (!path.path().isEmpty())) { - ed.setServiceCreationPath(path.path()); - } - } - - final Tags tags = cls.getAnnotation(Tags.class); - if(tags != null) { - String[] tagNames = tags.value(); - ed.setTags(tagNames); - } - - return ed; - } - - private static void checkFieldTypeForMetric(MetricDefinition md, String fieldName, Object fldCls, Map<String, Class<?>> dynamicFieldTypes){ - if(md != null){ - if(fldCls.equals(int[].class)){ - dynamicFieldTypes.put(fieldName, int.class); - return; - }else if(fldCls.equals(long[].class)){ - dynamicFieldTypes.put(fieldName, long.class); - return; - }else if(fldCls.equals(double[].class)){ - dynamicFieldTypes.put(fieldName, double.class); - return; - } - throw new IllegalArgumentException("Fields for metric entity must be one of int[], long[] or double[]"); - } - } - - private static Class<?> createDynamicClassForMetric(final String className, Map<String, Class<?>> dynamicFieldTypes){ - BeanGenerator beanGenerator = new BeanGenerator(); - beanGenerator.setNamingPolicy(new NamingPolicy(){ - @Override - public String getClassName(String prefix,String source, Object key, Predicate names){ - return className; - }}); - BeanGenerator.addProperties(beanGenerator, dynamicFieldTypes); - beanGenerator.setSuperclass(TaggedLogAPIEntity.class); - return (Class<?>) beanGenerator.createClass(); - } - - public static Map<String, EntityDefinition> entities() throws Exception{ - checkInit(); - return entityServiceMap; - } + } + + private static void checkPrefix(EntityDefinition entityDef) { + final Integer entityPrefixHashcode = entityDef.getPrefix().hashCode(); + if (entityPrefixMap.containsKey(entityDef.getTable())) { + final Map<Integer, EntityDefinition> entityHashMap = entityPrefixMap.get(entityDef.getTable()); + if (entityHashMap.containsKey(entityPrefixHashcode) + && (!entityDef.equals(entityHashMap.get(entityPrefixHashcode)))) { + throw new IllegalArgumentException("Failed to register entity " + entityDef.getClass() + .getName() + ", because of the prefix hash code conflict! The entity prefix " + + entityDef.getPrefix() + + " has already been registered by entity service " + + entityHashMap.get(entityPrefixHashcode).getService()); + } + final IndexDefinition[] indexes = entityDef.getIndexes(); + if (indexes != null) { + for (IndexDefinition index : indexes) { + final Integer indexPrefixHashcode = index.getIndexPrefix().hashCode(); + if (entityHashMap.containsKey(indexPrefixHashcode)) { + throw new IllegalArgumentException("Failed to register entity " + entityDef.getClass() + .getName() + ", because of the prefix hash code conflict! The index prefix " + + index.getIndexPrefix() + + " has already been registered by entity " + + entityHashMap.get(indexPrefixHashcode) + .getService()); + } + final Map<Integer, IndexDefinition> indexHashMap = indexPrefixMap + .get(entityDef.getTable()); + if (indexHashMap != null && indexHashMap.containsKey(indexPrefixHashcode) + && (!index.equals(indexHashMap.get(indexPrefixHashcode)))) { + throw new IllegalArgumentException("Failed to register entity " + entityDef.getClass() + .getName() + ", because of the prefix hash code conflict! The index prefix " + + index.getIndexPrefix() + + " has already been registered by entity " + + indexHashMap.get(indexPrefixHashcode) + .getEntityDefinition().getService()); + } + } + } + } + } + + /** + * Get entity definition by name + * + * @param serviceName + * @return + * @throws IllegalAccessException + * @throws InstantiationException + */ + public static EntityDefinition getEntityByServiceName(String serviceName) + throws InstantiationException, IllegalAccessException { + checkInit(); + return entityServiceMap.get(serviceName); + } + + public static EntityDefinition getEntityDefinitionByEntityClass(Class<? extends TaggedLogAPIEntity> clazz) + throws InstantiationException, IllegalAccessException { + checkInit(); + return classMap.get(clazz); + } + + private static void checkInit() throws InstantiationException, IllegalAccessException { + if (!initialized) { + synchronized (EntityDefinitionManager.class) { + if (!initialized) { + EntityRepositoryScanner.scan(); + initialized = true; + } + } + } + } + + public static void load() throws IllegalAccessException, InstantiationException { + checkInit(); + } + + /** + * UserPrincipal can register their own field SerDeser + * + * @param clazz class of the the SerDeser + * @param entitySerDeser entity or field SerDeser + * @throws IllegalArgumentException + */ + public static void registerSerDeser(Class<?> clazz, EntitySerDeser<?> entitySerDeser) { + _serDeserMap.put(clazz, entitySerDeser); + } + + /** + * Check whether the entity class is time series, false by default + * + * @param clazz + * @return + */ + public static boolean isTimeSeries(Class<? extends TaggedLogAPIEntity> clazz) { + TimeSeries ts = clazz.getAnnotation(TimeSeries.class); + return ts != null && ts.value(); + } + + @SuppressWarnings("unchecked") + public static EntityDefinition createEntityDefinition(Class<? extends TaggedLogAPIEntity> cls) { + + final EntityDefinition ed = new EntityDefinition(); + + ed.setEntityClass(cls); + // parse cls' annotations + Table table = cls.getAnnotation(Table.class); + if (table == null || table.value().isEmpty()) { + throw new IllegalArgumentException("Entity class must have a non-empty table name annotated with @Table"); + } + String tableName = table.value(); + if (EagleConfigFactory.load().isTableNamePrefixedWithEnvironment()) { + tableName = EagleConfigFactory.load().getEnv() + "_" + tableName; + } + ed.setTable(tableName); + + ColumnFamily family = cls.getAnnotation(ColumnFamily.class); + if (family == null || family.value().isEmpty()) { + throw new IllegalArgumentException("Entity class must have a non-empty column family name annotated with @ColumnFamily"); + } + ed.setColumnFamily(family.value()); + + Prefix prefix = cls.getAnnotation(Prefix.class); + if (prefix == null || prefix.value().isEmpty()) { + throw new IllegalArgumentException("Entity class must have a non-empty prefix name annotated with @Prefix"); + } + ed.setPrefix(prefix.value()); + + TimeSeries ts = cls.getAnnotation(TimeSeries.class); + if (ts == null) { + throw new IllegalArgumentException("Entity class must have a non-empty timeseries name annotated with @TimeSeries"); + } + ed.setTimeSeries(ts.value()); + + Service service = cls.getAnnotation(Service.class); + if (service == null || service.value().isEmpty()) { + ed.setService(cls.getSimpleName()); + } else { + ed.setService(service.value()); + } + + Metric m = cls.getAnnotation(Metric.class); + Map<String, Class<?>> dynamicFieldTypes = new HashMap<String, Class<?>>(); + if (m != null) { + // metric has to be timeseries + if (!ts.value()) { + throw new IllegalArgumentException("Metric entity must be time series as well"); + } + MetricDefinition md = new MetricDefinition(); + md.setInterval(m.interval()); + ed.setMetricDefinition(md); + } + + java.lang.reflect.Field[] fields = cls.getDeclaredFields(); + for (java.lang.reflect.Field f : fields) { + Column column = f.getAnnotation(Column.class); + if (column == null || column.value().isEmpty()) { + continue; + } + Class<?> fldCls = f.getType(); + // intrusive check field type for metric entity + checkFieldTypeForMetric(ed.getMetricDefinition(), f.getName(), fldCls, dynamicFieldTypes); + Qualifier q = new Qualifier(); + q.setDisplayName(f.getName()); + q.setQualifierName(column.value()); + EntitySerDeser<?> serDeser = _serDeserMap.get(fldCls); + if (serDeser == null) { + // throw new IllegalArgumentException(fldCls.getName() + " in field " + f.getName() + + // " of entity " + cls.getSimpleName() + " has no serializer associated "); + serDeser = DefaultJavaObjctSerDeser.INSTANCE; + } + + q.setSerDeser((EntitySerDeser<Object>)serDeser); + ed.getQualifierNameMap().put(q.getQualifierName(), q); + ed.getDisplayNameMap().put(q.getDisplayName(), q); + // TODO: should refine rules, consider fields like "hCol", getter method should be gethCol() + // according to org.apache.commons.beanutils.PropertyUtils + final String propertyName = f.getName().substring(0, 1).toUpperCase() + f.getName().substring(1); + String getterName = "get" + propertyName; + try { + Method method = cls.getMethod(getterName); + ed.getQualifierGetterMap().put(f.getName(), method); + } catch (Exception e) { + // Check if the type is boolean + getterName = "is" + propertyName; + try { + Method method = cls.getMethod(getterName); + ed.getQualifierGetterMap().put(f.getName(), method); + } catch (Exception e1) { + throw new IllegalArgumentException("Field " + f.getName() + + " hasn't defined valid getter method: " + getterName, + e); + } + } + if (LOG.isDebugEnabled()) { + LOG.debug("Field registered " + q); + } + } + + // TODO: Lazy create because not used at all + // dynamically create bean class + if (ed.getMetricDefinition() != null) { + Class<?> metricCls = createDynamicClassForMetric(cls.getName() + "_SingleTimestamp", + dynamicFieldTypes); + ed.getMetricDefinition().setSingleTimestampEntityClass(metricCls); + } + + final Partition partition = cls.getAnnotation(Partition.class); + if (partition != null) { + final String[] partitions = partition.value(); + ed.setPartitions(partitions); + // Check if partition fields are all tag fields. Partition field can't be column field, must be + // tag field. + for (String part : partitions) { + if (!ed.isTag(part)) { + throw new IllegalArgumentException("Partition field can't be column field, must be tag field. " + + "Partition name: " + part); + } + } + } + + final Indexes indexes = cls.getAnnotation(Indexes.class); + if (indexes != null) { + final Index[] inds = indexes.value(); + final IndexDefinition[] indexDefinitions = new IndexDefinition[inds.length]; + for (int i = 0; i < inds.length; ++i) { + final Index ind = inds[i]; + indexDefinitions[i] = new IndexDefinition(ed, ind); + } + ed.setIndexes(indexDefinitions); + } + + final ServicePath path = cls.getAnnotation(ServicePath.class); + if (path != null) { + if (path.path() != null && (!path.path().isEmpty())) { + ed.setServiceCreationPath(path.path()); + } + } + + final Tags tags = cls.getAnnotation(Tags.class); + if (tags != null) { + String[] tagNames = tags.value(); + ed.setTags(tagNames); + } + + return ed; + } + + private static void checkFieldTypeForMetric(MetricDefinition md, String fieldName, Object fldCls, + Map<String, Class<?>> dynamicFieldTypes) { + if (md != null) { + if (fldCls.equals(int[].class)) { + dynamicFieldTypes.put(fieldName, int.class); + return; + } else if (fldCls.equals(long[].class)) { + dynamicFieldTypes.put(fieldName, long.class); + return; + } else if (fldCls.equals(double[].class)) { + dynamicFieldTypes.put(fieldName, double.class); + return; + } + throw new IllegalArgumentException("Fields for metric entity must be one of int[], long[] or double[]"); + } + } + + private static Class<?> createDynamicClassForMetric(final String className, + Map<String, Class<?>> dynamicFieldTypes) { + BeanGenerator beanGenerator = new BeanGenerator(); + beanGenerator.setNamingPolicy(new NamingPolicy() { + @Override + public String getClassName(String prefix, String source, Object key, Predicate names) { + return className; + } + }); + BeanGenerator.addProperties(beanGenerator, dynamicFieldTypes); + beanGenerator.setSuperclass(TaggedLogAPIEntity.class); + return (Class<?>)beanGenerator.createClass(); + } + + public static Map<String, EntityDefinition> entities() throws Exception { + checkInit(); + return entityServiceMap; + } }
http://git-wip-us.apache.org/repos/asf/eagle/blob/6e919c2e/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/EntitySerDeser.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/EntitySerDeser.java b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/EntitySerDeser.java index 25d55e0..08caeab 100755 --- a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/EntitySerDeser.java +++ b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/EntitySerDeser.java @@ -17,7 +17,9 @@ package org.apache.eagle.log.entity.meta; public interface EntitySerDeser<T> { - public T deserialize(byte[] bytes); - public byte[] serialize(T t); - public Class<T> type(); + public T deserialize(byte[] bytes); + + public byte[] serialize(T t); + + public Class<T> type(); } http://git-wip-us.apache.org/repos/asf/eagle/blob/6e919c2e/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/EntitySerDeserializer.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/EntitySerDeserializer.java b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/EntitySerDeserializer.java index a7ec4e4..1e1ca48 100755 --- a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/EntitySerDeserializer.java +++ b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/EntitySerDeserializer.java @@ -26,54 +26,54 @@ import java.util.HashMap; import java.util.Map; public class EntitySerDeserializer { - private static final Logger LOG = LoggerFactory.getLogger(EntitySerDeserializer.class); - - // TODO throws seperate exceptions - @SuppressWarnings("unchecked") - public <T> T readValue(Map<String, byte[]> qualifierValues, EntityDefinition ed) throws Exception{ - Class<? extends TaggedLogAPIEntity> clazz = ed.getEntityClass(); - if(clazz == null){ - throw new NullPointerException("Entity class of service "+ed.getService()+" is null"); - } - TaggedLogAPIEntity obj = clazz.newInstance(); - Map<String, Qualifier> map = ed.getQualifierNameMap(); - for(Map.Entry<String, byte[]> entry : qualifierValues.entrySet()){ - Qualifier q = map.get(entry.getKey()); - if(q == null){ - // if it's not pre-defined qualifier, it must be tag unless it's a bug - if(obj.getTags() == null){ - obj.setTags(new HashMap<String, String>()); - } - obj.getTags().put(entry.getKey(), new StringSerDeser().deserialize(entry.getValue())); - continue; - } - - // TODO performance loss compared with new operator - // parse different types of qualifiers - String fieldName = q.getDisplayName(); - PropertyDescriptor pd = PropertyUtils.getPropertyDescriptor(obj, fieldName); - if(entry.getValue() != null){ - Object args = q.getSerDeser().deserialize(entry.getValue()); - pd.getWriteMethod().invoke(obj, args); -// if (logger.isDebugEnabled()) { -// logger.debug(entry.getKey() + ":" + args + " is deserialized"); -// } - } - } - return (T)obj; - } - - public Map<String, byte[]> writeValue(TaggedLogAPIEntity entity, EntityDefinition ed) throws Exception{ - Map<String, byte[]> qualifierValues = new HashMap<String, byte[]>(); - // iterate all modified qualifiers - for(String fieldName : entity.modifiedQualifiers()){ - PropertyDescriptor pd = PropertyUtils.getPropertyDescriptor(entity, fieldName); - Object obj = pd.getReadMethod().invoke(entity); - Qualifier q = ed.getDisplayNameMap().get(fieldName); - EntitySerDeser<Object> ser = q.getSerDeser(); - byte[] value = ser.serialize(obj); - qualifierValues.put(q.getQualifierName(), value); - } - return qualifierValues; - } + private static final Logger LOG = LoggerFactory.getLogger(EntitySerDeserializer.class); + + // TODO throws seperate exceptions + @SuppressWarnings("unchecked") + public <T> T readValue(Map<String, byte[]> qualifierValues, EntityDefinition ed) throws Exception { + Class<? extends TaggedLogAPIEntity> clazz = ed.getEntityClass(); + if (clazz == null) { + throw new NullPointerException("Entity class of service " + ed.getService() + " is null"); + } + TaggedLogAPIEntity obj = clazz.newInstance(); + Map<String, Qualifier> map = ed.getQualifierNameMap(); + for (Map.Entry<String, byte[]> entry : qualifierValues.entrySet()) { + Qualifier q = map.get(entry.getKey()); + if (q == null) { + // if it's not pre-defined qualifier, it must be tag unless it's a bug + if (obj.getTags() == null) { + obj.setTags(new HashMap<String, String>()); + } + obj.getTags().put(entry.getKey(), new StringSerDeser().deserialize(entry.getValue())); + continue; + } + + // TODO performance loss compared with new operator + // parse different types of qualifiers + String fieldName = q.getDisplayName(); + PropertyDescriptor pd = PropertyUtils.getPropertyDescriptor(obj, fieldName); + if (entry.getValue() != null) { + Object args = q.getSerDeser().deserialize(entry.getValue()); + pd.getWriteMethod().invoke(obj, args); + // if (logger.isDebugEnabled()) { + // logger.debug(entry.getKey() + ":" + args + " is deserialized"); + // } + } + } + return (T)obj; + } + + public Map<String, byte[]> writeValue(TaggedLogAPIEntity entity, EntityDefinition ed) throws Exception { + Map<String, byte[]> qualifierValues = new HashMap<String, byte[]>(); + // iterate all modified qualifiers + for (String fieldName : entity.modifiedQualifiers()) { + PropertyDescriptor pd = PropertyUtils.getPropertyDescriptor(entity, fieldName); + Object obj = pd.getReadMethod().invoke(entity); + Qualifier q = ed.getDisplayNameMap().get(fieldName); + EntitySerDeser<Object> ser = q.getSerDeser(); + byte[] value = ser.serialize(obj); + qualifierValues.put(q.getQualifierName(), value); + } + return qualifierValues; + } } http://git-wip-us.apache.org/repos/asf/eagle/blob/6e919c2e/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/Index.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/Index.java b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/Index.java index c7dc113..d13e550 100755 --- a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/Index.java +++ b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/Index.java @@ -21,12 +21,16 @@ import java.lang.annotation.Retention; import java.lang.annotation.RetentionPolicy; import java.lang.annotation.Target; -@Target({ElementType.TYPE}) +@Target({ + ElementType.TYPE +}) @Retention(RetentionPolicy.RUNTIME) public @interface Index { public String name(); + public String[] columns(); + public boolean unique(); -// boolean unique() default true; + // boolean unique() default true; } http://git-wip-us.apache.org/repos/asf/eagle/blob/6e919c2e/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/IndexDefinition.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/IndexDefinition.java b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/IndexDefinition.java index 2e62420..810ad6b 100755 --- a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/IndexDefinition.java +++ b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/IndexDefinition.java @@ -39,297 +39,304 @@ import org.apache.eagle.query.parser.ORExpression; import org.apache.eagle.common.ByteUtil; /** - * Eagle index schema definition. - * - * 1. Index schema can be defined in entity class by annotation. - * 2. One index schema can contain multiple fields/tags, defined in order - * 3. We only support immutable indexing for now - * 4. When entity is created or deleted, the corresponding index entity should be created or deleted at the same time - * 5. Index transparency to queries. Queries go through index when and only when index can serve all search conditions after query rewrite - * - * + * Eagle index schema definition. 1. Index schema can be defined in entity class by annotation. 2. One index + * schema can contain multiple fields/tags, defined in order 3. We only support immutable indexing for now 4. + * When entity is created or deleted, the corresponding index entity should be created or deleted at the same + * time 5. Index transparency to queries. Queries go through index when and only when index can serve all + * search conditions after query rewrite */ public class IndexDefinition { - - public enum IndexType { - UNIQUE_INDEX, - NON_CLUSTER_INDEX, - NON_INDEX - } - - private final EntityDefinition entityDef; - private final Index index; - private final IndexColumn[] columns; - private final String indexPrefix; - - private static final byte[] EMPTY_VALUE = new byte[0]; - private static final Charset UTF_8_CHARSET = Charset.forName("UTF-8"); - public static final int EMPTY_PARTITION_DEFAULT_HASH_CODE = 0; - public static final int MAX_INDEX_VALUE_BYTE_LENGTH = 65535; - - private static final String FIELD_NAME_PATTERN_STRING = "^@(.*)$"; - private static final Pattern FIELD_NAME_PATTERN = Pattern.compile(FIELD_NAME_PATTERN_STRING); - private final static Logger LOG = LoggerFactory.getLogger(IndexDefinition.class); - - public IndexDefinition(EntityDefinition entityDef, Index index) { - this.entityDef = entityDef; - this.index = index; - this.indexPrefix = entityDef.getPrefix() + "_" + index.name(); - final String[] indexColumns = index.columns(); - this.columns = new IndexColumn[indexColumns.length]; - for (int i = 0; i < indexColumns.length; ++i) { - final String name = indexColumns[i]; - final boolean isTag = entityDef.isTag(name); - final Qualifier qualifier = isTag ? null : entityDef.getDisplayNameMap().get(name); - columns[i] = new IndexColumn(name, isTag, qualifier); - } - LOG.info("Created index " + index.name() + " for " + entityDef.getEntityClass().getSimpleName()); - } - - public EntityDefinition getEntityDefinition() { - return entityDef; - } - - public Index getIndex() { - return index; - } - - public String getIndexName() { - return index.name(); - } - - public IndexColumn[] getIndexColumns() { - return columns; - } - - public String getIndexPrefix() { - return indexPrefix; - } - - public boolean isUnique() { - return index.unique(); - } - - /** - * Check if the query is suitable to go through index. If true, then return the value of index fields in order. Otherwise return null. - * TODO: currently index fields should be string type. - * - * @param query query expression after re-write - * @param rowkeys if the query can go through the index, all rowkeys will be added into rowkeys. - * @return true if the query can go through the index, otherwise return false - */ - public IndexType canGoThroughIndex(ORExpression query, List<byte[]> rowkeys) { - if (query == null || query.getANDExprList() == null || query.getANDExprList().isEmpty()) - return IndexType.NON_CLUSTER_INDEX; - if (rowkeys != null) { - rowkeys.clear(); - } - final Map<String, String> indexfieldMap = new HashMap<String, String>(); - for(ANDExpression andExpr : query.getANDExprList()) { - indexfieldMap.clear(); - for(AtomicExpression ae : andExpr.getAtomicExprList()) { - // TODO temporarily ignore those fields which are not for attributes - final String fieldName = parseEntityAttribute(ae.getKey()); - if(fieldName != null && ComparisonOperator.EQUAL.equals(ae.getOp())){ - indexfieldMap.put(fieldName, ae.getValue()); - } - } - final String[] partitions = entityDef.getPartitions(); - int[] partitionValueHashs = null; - if (partitions != null) { - partitionValueHashs = new int[partitions.length]; - for (int i = 0; i < partitions.length; ++i) { - final String value = indexfieldMap.get(partitions[i]); - if (value == null) { - throw new IllegalArgumentException("Partition " + partitions[i] + " is not defined in the query: " + query.toString()); - } - partitionValueHashs[i] = value.hashCode(); - } - } - final byte[][] indexFieldValues = new byte[columns.length][]; - for (int i = 0; i < columns.length; ++i) { - final IndexColumn col = columns[i]; - if (!indexfieldMap.containsKey(col.getColumnName())) { - // If we have to use scan anyway, there's no need to go through index - return IndexType.NON_INDEX; - } - final String value = indexfieldMap.get(col.getColumnName()); - indexFieldValues[i] = value.getBytes(); - } - final byte[] rowkey = generateUniqueIndexRowkey(indexFieldValues, partitionValueHashs, null); - if (rowkeys != null) { - rowkeys.add(rowkey); - } - } - if (index.unique()) { - return IndexType.UNIQUE_INDEX; - } - return IndexType.NON_CLUSTER_INDEX; - } - - private String parseEntityAttribute(String fieldName) { - Matcher m = FIELD_NAME_PATTERN.matcher(fieldName); - if(m.find()){ - return m.group(1); - } - return null; - } - - // TODO: We should move index rowkey generation later since this class is for general purpose, not only for hbase. - public byte[] generateIndexRowkey(TaggedLogAPIEntity entity) throws IllegalAccessException, InvocationTargetException, NoSuchMethodException { - if (entity.getClass() != entityDef.getEntityClass()) { - throw new IllegalArgumentException("Expected entity class: " + entityDef.getEntityClass().getName() + ", but got class " + entity.getClass().getName()); - } - final byte[][] indexValues = generateIndexValues(entity); - final int[] partitionHashCodes = generatePartitionHashCodes(entity); - SortedMap<Integer, Integer> tagMap = null; - if (!index.unique()) { - // non cluster index - tagMap = RowkeyBuilder.generateSortedTagMap(entityDef.getPartitions(), entity.getTags()); - } - - return generateUniqueIndexRowkey(indexValues, partitionHashCodes, tagMap); - } - - private byte[] generateUniqueIndexRowkey(byte[][] indexValues, int[] partitionHashCodes, SortedMap<Integer, Integer> tagMap) { - final int prefixHashCode = indexPrefix.hashCode(); - int totalLength = 4; - totalLength += (partitionHashCodes != null) ? (4 * partitionHashCodes.length) : 0; - - totalLength += (2 * indexValues.length); - for (int i = 0; i < indexValues.length; ++i) { - final byte[] value = indexValues[i]; - totalLength += value.length; - } - if (tagMap != null && (!tagMap.isEmpty())) { - totalLength += tagMap.size() * 8; - } - - int offset = 0; - final byte[] rowkey = new byte[totalLength]; - - // 1. set prefix - ByteUtil.intToBytes(prefixHashCode, rowkey, offset); - offset += 4; - - // 2. set partition - if (partitionHashCodes != null) { - for (Integer partitionHashCode : partitionHashCodes) { - ByteUtil.intToBytes(partitionHashCode, rowkey, offset); - offset += 4; - } - } - - // 3. set index values - for (int i = 0; i < columns.length; ++i) { - ByteUtil.shortToBytes((short)indexValues[i].length, rowkey, offset); - offset += 2; - for (int j = 0; j < indexValues[i].length; ++j) { - rowkey[offset++] = indexValues[i][j]; - } - } - - // Check if it's non clustered index, then set the tag/value hash code - if (tagMap != null && (!tagMap.isEmpty())) { - // 4. set tag key/value hashes - for (Map.Entry<Integer, Integer> entry : tagMap.entrySet()) { - ByteUtil.intToBytes(entry.getKey(), rowkey, offset); - offset += 4; - ByteUtil.intToBytes(entry.getValue(), rowkey, offset); - offset += 4; - } - } - - return rowkey; - } - - private int[] generatePartitionHashCodes(TaggedLogAPIEntity entity) { - final String[] partitions = entityDef.getPartitions(); - int[] result = null; - if (partitions != null) { - result = new int[partitions.length]; - final Map<String, String> tags = entity.getTags(); - for (int i = 0 ; i < partitions.length; ++i) { - final String partition = partitions[i]; - final String tagValue = tags.get(partition); - if (tagValue != null) { - result[i] = tagValue.hashCode(); - } else { - result[i] = EMPTY_PARTITION_DEFAULT_HASH_CODE; - } - } - } - return result; - } - - private byte[][] generateIndexValues(TaggedLogAPIEntity entity) throws IllegalAccessException, InvocationTargetException, NoSuchMethodException { - - final byte[][] result = new byte[columns.length][]; - for (int i = 0; i < columns.length; ++i) { - final IndexColumn column = columns[i]; - final String columnName = column.getColumnName(); - if (column.isTag) { - final Map<String, String> tags = entity.getTags(); - if (tags == null || tags.get(columnName) == null) { - result[i] = EMPTY_VALUE; - } else { - result[i] = tags.get(columnName).getBytes(UTF_8_CHARSET); - } - } else { - PropertyDescriptor pd = column.getPropertyDescriptor(); - if (pd == null) { - pd = PropertyUtils.getPropertyDescriptor(entity, columnName); - column.setPropertyDescriptor(pd); - } - final Object value = pd.getReadMethod().invoke(entity); - if (value == null) { - result[i] = EMPTY_VALUE; - } else { - final Qualifier q = column.getQualifier(); - result[i] = q.getSerDeser().serialize(value); - } - } - if (result[i].length > MAX_INDEX_VALUE_BYTE_LENGTH) { - throw new IllegalArgumentException("Index field value exceeded the max length: " + MAX_INDEX_VALUE_BYTE_LENGTH + ", actual length: " + result[i].length); - } - } - return result; - } - - /** - * Index column definition class - * - */ - public static class IndexColumn { - private final String columnName; - private final boolean isTag; - private final Qualifier qualifier; - private PropertyDescriptor propertyDescriptor; - - public IndexColumn(String columnName, boolean isTag, Qualifier qualifier) { - this.columnName = columnName; - this.isTag = isTag; - this.qualifier = qualifier; - } - - public String getColumnName() { - return columnName; - } - public boolean isTag() { - return isTag; - } - - public Qualifier getQualifier() { - return qualifier; - } - - public PropertyDescriptor getPropertyDescriptor() { - return propertyDescriptor; - } - - public void setPropertyDescriptor(PropertyDescriptor propertyDescriptor) { - this.propertyDescriptor = propertyDescriptor; - } - - } + + public enum IndexType { + UNIQUE_INDEX, + NON_CLUSTER_INDEX, + NON_INDEX + } + + private final EntityDefinition entityDef; + private final Index index; + private final IndexColumn[] columns; + private final String indexPrefix; + + private static final byte[] EMPTY_VALUE = new byte[0]; + private static final Charset UTF_8_CHARSET = Charset.forName("UTF-8"); + public static final int EMPTY_PARTITION_DEFAULT_HASH_CODE = 0; + public static final int MAX_INDEX_VALUE_BYTE_LENGTH = 65535; + + private static final String FIELD_NAME_PATTERN_STRING = "^@(.*)$"; + private static final Pattern FIELD_NAME_PATTERN = Pattern.compile(FIELD_NAME_PATTERN_STRING); + private static final Logger LOG = LoggerFactory.getLogger(IndexDefinition.class); + + public IndexDefinition(EntityDefinition entityDef, Index index) { + this.entityDef = entityDef; + this.index = index; + this.indexPrefix = entityDef.getPrefix() + "_" + index.name(); + final String[] indexColumns = index.columns(); + this.columns = new IndexColumn[indexColumns.length]; + for (int i = 0; i < indexColumns.length; ++i) { + final String name = indexColumns[i]; + final boolean isTag = entityDef.isTag(name); + final Qualifier qualifier = isTag ? null : entityDef.getDisplayNameMap().get(name); + columns[i] = new IndexColumn(name, isTag, qualifier); + } + LOG.info("Created index " + index.name() + " for " + entityDef.getEntityClass().getSimpleName()); + } + + public EntityDefinition getEntityDefinition() { + return entityDef; + } + + public Index getIndex() { + return index; + } + + public String getIndexName() { + return index.name(); + } + + public IndexColumn[] getIndexColumns() { + return columns; + } + + public String getIndexPrefix() { + return indexPrefix; + } + + public boolean isUnique() { + return index.unique(); + } + + /** + * Check if the query is suitable to go through index. If true, then return the value of index fields in + * order. Otherwise return null. TODO: currently index fields should be string type. + * + * @param query query expression after re-write + * @param rowkeys if the query can go through the index, all rowkeys will be added into rowkeys. + * @return true if the query can go through the index, otherwise return false + */ + public IndexType canGoThroughIndex(ORExpression query, List<byte[]> rowkeys) { + if (query == null || query.getANDExprList() == null || query.getANDExprList().isEmpty()) { + return IndexType.NON_CLUSTER_INDEX; + } + if (rowkeys != null) { + rowkeys.clear(); + } + final Map<String, String> indexfieldMap = new HashMap<String, String>(); + for (ANDExpression andExpr : query.getANDExprList()) { + indexfieldMap.clear(); + for (AtomicExpression ae : andExpr.getAtomicExprList()) { + // TODO temporarily ignore those fields which are not for attributes + final String fieldName = parseEntityAttribute(ae.getKey()); + if (fieldName != null && ComparisonOperator.EQUAL.equals(ae.getOp())) { + indexfieldMap.put(fieldName, ae.getValue()); + } + } + final String[] partitions = entityDef.getPartitions(); + int[] partitionValueHashs = null; + if (partitions != null) { + partitionValueHashs = new int[partitions.length]; + for (int i = 0; i < partitions.length; ++i) { + final String value = indexfieldMap.get(partitions[i]); + if (value == null) { + throw new IllegalArgumentException("Partition " + partitions[i] + + " is not defined in the query: " + + query.toString()); + } + partitionValueHashs[i] = value.hashCode(); + } + } + final byte[][] indexFieldValues = new byte[columns.length][]; + for (int i = 0; i < columns.length; ++i) { + final IndexColumn col = columns[i]; + if (!indexfieldMap.containsKey(col.getColumnName())) { + // If we have to use scan anyway, there's no need to go through index + return IndexType.NON_INDEX; + } + final String value = indexfieldMap.get(col.getColumnName()); + indexFieldValues[i] = value.getBytes(); + } + final byte[] rowkey = generateUniqueIndexRowkey(indexFieldValues, partitionValueHashs, null); + if (rowkeys != null) { + rowkeys.add(rowkey); + } + } + if (index.unique()) { + return IndexType.UNIQUE_INDEX; + } + return IndexType.NON_CLUSTER_INDEX; + } + + private String parseEntityAttribute(String fieldName) { + Matcher m = FIELD_NAME_PATTERN.matcher(fieldName); + if (m.find()) { + return m.group(1); + } + return null; + } + + // TODO: We should move index rowkey generation later since this class is for general purpose, not only + // for hbase. + public byte[] generateIndexRowkey(TaggedLogAPIEntity entity) + throws IllegalAccessException, InvocationTargetException, NoSuchMethodException { + if (entity.getClass() != entityDef.getEntityClass()) { + throw new IllegalArgumentException("Expected entity class: " + + entityDef.getEntityClass().getName() + ", but got class " + + entity.getClass().getName()); + } + final byte[][] indexValues = generateIndexValues(entity); + final int[] partitionHashCodes = generatePartitionHashCodes(entity); + SortedMap<Integer, Integer> tagMap = null; + if (!index.unique()) { + // non cluster index + tagMap = RowkeyBuilder.generateSortedTagMap(entityDef.getPartitions(), entity.getTags()); + } + + return generateUniqueIndexRowkey(indexValues, partitionHashCodes, tagMap); + } + + private byte[] generateUniqueIndexRowkey(byte[][] indexValues, int[] partitionHashCodes, + SortedMap<Integer, Integer> tagMap) { + final int prefixHashCode = indexPrefix.hashCode(); + int totalLength = 4; + totalLength += (partitionHashCodes != null) ? (4 * partitionHashCodes.length) : 0; + + totalLength += (2 * indexValues.length); + for (int i = 0; i < indexValues.length; ++i) { + final byte[] value = indexValues[i]; + totalLength += value.length; + } + if (tagMap != null && (!tagMap.isEmpty())) { + totalLength += tagMap.size() * 8; + } + + int offset = 0; + final byte[] rowkey = new byte[totalLength]; + + // 1. set prefix + ByteUtil.intToBytes(prefixHashCode, rowkey, offset); + offset += 4; + + // 2. set partition + if (partitionHashCodes != null) { + for (Integer partitionHashCode : partitionHashCodes) { + ByteUtil.intToBytes(partitionHashCode, rowkey, offset); + offset += 4; + } + } + + // 3. set index values + for (int i = 0; i < columns.length; ++i) { + ByteUtil.shortToBytes((short)indexValues[i].length, rowkey, offset); + offset += 2; + for (int j = 0; j < indexValues[i].length; ++j) { + rowkey[offset++] = indexValues[i][j]; + } + } + + // Check if it's non clustered index, then set the tag/value hash code + if (tagMap != null && (!tagMap.isEmpty())) { + // 4. set tag key/value hashes + for (Map.Entry<Integer, Integer> entry : tagMap.entrySet()) { + ByteUtil.intToBytes(entry.getKey(), rowkey, offset); + offset += 4; + ByteUtil.intToBytes(entry.getValue(), rowkey, offset); + offset += 4; + } + } + + return rowkey; + } + + private int[] generatePartitionHashCodes(TaggedLogAPIEntity entity) { + final String[] partitions = entityDef.getPartitions(); + int[] result = null; + if (partitions != null) { + result = new int[partitions.length]; + final Map<String, String> tags = entity.getTags(); + for (int i = 0; i < partitions.length; ++i) { + final String partition = partitions[i]; + final String tagValue = tags.get(partition); + if (tagValue != null) { + result[i] = tagValue.hashCode(); + } else { + result[i] = EMPTY_PARTITION_DEFAULT_HASH_CODE; + } + } + } + return result; + } + + private byte[][] generateIndexValues(TaggedLogAPIEntity entity) + throws IllegalAccessException, InvocationTargetException, NoSuchMethodException { + + final byte[][] result = new byte[columns.length][]; + for (int i = 0; i < columns.length; ++i) { + final IndexColumn column = columns[i]; + final String columnName = column.getColumnName(); + if (column.isTag) { + final Map<String, String> tags = entity.getTags(); + if (tags == null || tags.get(columnName) == null) { + result[i] = EMPTY_VALUE; + } else { + result[i] = tags.get(columnName).getBytes(UTF_8_CHARSET); + } + } else { + PropertyDescriptor pd = column.getPropertyDescriptor(); + if (pd == null) { + pd = PropertyUtils.getPropertyDescriptor(entity, columnName); + column.setPropertyDescriptor(pd); + } + final Object value = pd.getReadMethod().invoke(entity); + if (value == null) { + result[i] = EMPTY_VALUE; + } else { + final Qualifier q = column.getQualifier(); + result[i] = q.getSerDeser().serialize(value); + } + } + if (result[i].length > MAX_INDEX_VALUE_BYTE_LENGTH) { + throw new IllegalArgumentException("Index field value exceeded the max length: " + + MAX_INDEX_VALUE_BYTE_LENGTH + ", actual length: " + + result[i].length); + } + } + return result; + } + + /** + * Index column definition class + */ + public static class IndexColumn { + private final String columnName; + private final boolean isTag; + private final Qualifier qualifier; + private PropertyDescriptor propertyDescriptor; + + public IndexColumn(String columnName, boolean isTag, Qualifier qualifier) { + this.columnName = columnName; + this.isTag = isTag; + this.qualifier = qualifier; + } + + public String getColumnName() { + return columnName; + } + + public boolean isTag() { + return isTag; + } + + public Qualifier getQualifier() { + return qualifier; + } + + public PropertyDescriptor getPropertyDescriptor() { + return propertyDescriptor; + } + + public void setPropertyDescriptor(PropertyDescriptor propertyDescriptor) { + this.propertyDescriptor = propertyDescriptor; + } + + } } http://git-wip-us.apache.org/repos/asf/eagle/blob/6e919c2e/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/Indexes.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/Indexes.java b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/Indexes.java index 3c82a0a..b8ada4a 100644 --- a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/Indexes.java +++ b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/Indexes.java @@ -21,9 +21,11 @@ import java.lang.annotation.Retention; import java.lang.annotation.RetentionPolicy; import java.lang.annotation.Target; -@Target({ElementType.TYPE}) +@Target({ + ElementType.TYPE +}) @Retention(RetentionPolicy.RUNTIME) public @interface Indexes { - public Index[] value(); + public Index[] value(); } http://git-wip-us.apache.org/repos/asf/eagle/blob/6e919c2e/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/IntArraySerDeser.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/IntArraySerDeser.java b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/IntArraySerDeser.java index 8831223..3daf4a1 100755 --- a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/IntArraySerDeser.java +++ b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/IntArraySerDeser.java @@ -18,54 +18,56 @@ package org.apache.eagle.log.entity.meta; import org.apache.eagle.common.ByteUtil; -/** - * serialize int array which is stored like the following - * <int><int>*size, where the first <int> is the size of int +/* + * serialize int array which is stored like the following <int><int>*size, where the first entry is the size + * of int */ -public class IntArraySerDeser implements EntitySerDeser<int[]>{ +public class IntArraySerDeser implements EntitySerDeser<int[]> { + + public IntArraySerDeser() { + } - public IntArraySerDeser(){} + @Override + public int[] deserialize(byte[] bytes) { + if (bytes.length < 4) { + return null; + } + int offset = 0; + // get size of int array + int size = ByteUtil.bytesToInt(bytes, offset); + offset += 4; + int[] values = new int[size]; + for (int i = 0; i < size; i++) { + values[i] = ByteUtil.bytesToInt(bytes, offset); + offset += 4; + } + return values; + } - @Override - public int[] deserialize(byte[] bytes){ - if(bytes.length < 4) - return null; - int offset = 0; - // get size of int array - int size = ByteUtil.bytesToInt(bytes, offset); - offset += 4; - int[] values = new int[size]; - for(int i=0; i<size; i++){ - values[i] = ByteUtil.bytesToInt(bytes, offset); - offset += 4; - } - return values; - } - - /** - * - * @param obj - * @return - */ - @Override - public byte[] serialize(int[] obj){ - if(obj == null) - return null; - int size = obj.length; - byte[] array = new byte[4 + 4*size]; - byte[] first = ByteUtil.intToBytes(size); - int offset = 0; - System.arraycopy(first, 0, array, offset, first.length); - offset += first.length; - for(int i=0; i<size; i++){ - System.arraycopy(ByteUtil.intToBytes(obj[i]), 0, array, offset, 4); - offset += 4; - } - return array; - } + /** + * @param obj + * @return + */ + @Override + public byte[] serialize(int[] obj) { + if (obj == null) { + return null; + } + int size = obj.length; + byte[] array = new byte[4 + 4 * size]; + byte[] first = ByteUtil.intToBytes(size); + int offset = 0; + System.arraycopy(first, 0, array, offset, first.length); + offset += first.length; + for (int i = 0; i < size; i++) { + System.arraycopy(ByteUtil.intToBytes(obj[i]), 0, array, offset, 4); + offset += 4; + } + return array; + } - @Override - public Class<int[]> type() { - return int[].class; - } + @Override + public Class<int[]> type() { + return int[].class; + } } http://git-wip-us.apache.org/repos/asf/eagle/blob/6e919c2e/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/IntSerDeser.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/IntSerDeser.java b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/IntSerDeser.java index 695badd..8353499 100755 --- a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/IntSerDeser.java +++ b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/IntSerDeser.java @@ -18,25 +18,28 @@ package org.apache.eagle.log.entity.meta; import org.apache.eagle.common.ByteUtil; -public class IntSerDeser implements EntitySerDeser<Integer>{ - public IntSerDeser(){} +public class IntSerDeser implements EntitySerDeser<Integer> { + public IntSerDeser() { + } - @Override - public Integer deserialize(byte[] bytes){ - if(bytes.length < 4) - return null; - return Integer.valueOf(ByteUtil.bytesToInt(bytes)); - } - - @Override - public byte[] serialize(Integer obj){ - if(obj == null) - return null; - return ByteUtil.intToBytes(obj); - } + @Override + public Integer deserialize(byte[] bytes) { + if (bytes.length < 4) { + return null; + } + return Integer.valueOf(ByteUtil.bytesToInt(bytes)); + } - @Override - public Class<Integer> type() { - return Integer.class; - } + @Override + public byte[] serialize(Integer obj) { + if (obj == null) { + return null; + } + return ByteUtil.intToBytes(obj); + } + + @Override + public Class<Integer> type() { + return Integer.class; + } } http://git-wip-us.apache.org/repos/asf/eagle/blob/6e919c2e/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/ListSerDeser.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/ListSerDeser.java b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/ListSerDeser.java index eaf5e92..b77f3ff 100644 --- a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/ListSerDeser.java +++ b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/ListSerDeser.java @@ -25,104 +25,104 @@ import org.apache.eagle.common.ByteUtil; /** * Serialization/deserialization for map type - * */ @SuppressWarnings("rawtypes") public class ListSerDeser implements EntitySerDeser<List> { - @SuppressWarnings({ "unchecked" }) - @Override - public List deserialize(byte[] bytes) { - if (bytes == null || bytes.length == 0) { - return null; - } - final List list = new ArrayList(); - int offset = 0; - // get size of int array - final int size = ByteUtil.bytesToInt(bytes, offset); - offset += 4; - - for (int i = 0; i < size; ++i) { - final int valueID = ByteUtil.bytesToInt(bytes, offset); - offset += 4; - final Class<?> valueClass = EntityDefinitionManager.getClassByID(valueID); - if (valueClass == null) { - throw new IllegalArgumentException("Unsupported value type ID: " + valueID); - } - final EntitySerDeser valueSerDer = EntityDefinitionManager.getSerDeser(valueClass); - final int valueLength = ByteUtil.bytesToInt(bytes, offset); - offset += 4; - final byte[] valueContent = new byte[valueLength]; - System.arraycopy(bytes, offset, valueContent, 0, valueLength); - offset += valueLength; - final Object value = valueSerDer.deserialize(valueContent); - - list.add(value); - } - return list; - } + @SuppressWarnings({ + "unchecked" + }) + @Override + public List deserialize(byte[] bytes) { + if (bytes == null || bytes.length == 0) { + return null; + } + final List list = new ArrayList(); + int offset = 0; + // get size of int array + final int size = ByteUtil.bytesToInt(bytes, offset); + offset += 4; - /** - * size + value1 type id + value length + value1 binary content + ... - * 4B 4B 4B value1 bytes - */ - @SuppressWarnings({ "unchecked" }) - @Override - public byte[] serialize(List list) { - if(list == null) - return null; - final int size = list.size(); - final int[] valueIDs = new int[size]; - final byte[][] valueBytes = new byte[size][]; - - int totalSize = 4 + size * 8; - int i = 0; - Iterator iter = list.iterator(); - while (iter.hasNext()) { - final Object value = iter.next(); - Class<?> valueClass = value.getClass(); - int valueTypeID = EntityDefinitionManager.getIDBySerDerClass(valueClass); + for (int i = 0; i < size; ++i) { + final int valueID = ByteUtil.bytesToInt(bytes, offset); + offset += 4; + final Class<?> valueClass = EntityDefinitionManager.getClassByID(valueID); + if (valueClass == null) { + throw new IllegalArgumentException("Unsupported value type ID: " + valueID); + } + final EntitySerDeser valueSerDer = EntityDefinitionManager.getSerDeser(valueClass); + final int valueLength = ByteUtil.bytesToInt(bytes, offset); + offset += 4; + final byte[] valueContent = new byte[valueLength]; + System.arraycopy(bytes, offset, valueContent, 0, valueLength); + offset += valueLength; + final Object value = valueSerDer.deserialize(valueContent); - if (valueTypeID == -1) { - if (value instanceof List) { - valueClass = List.class; - valueTypeID = EntityDefinitionManager.getIDBySerDerClass(valueClass); - } - else if (value instanceof Map) { - valueClass = Map.class; - valueTypeID = EntityDefinitionManager.getIDBySerDerClass(valueClass); - } - else { - throw new IllegalArgumentException("Unsupported class: " + valueClass.getName()); - } - } - valueIDs[i] = valueTypeID; - final EntitySerDeser valueSerDer = EntityDefinitionManager.getSerDeser(valueClass); - if (valueSerDer == null) { - throw new IllegalArgumentException("Unsupported class: " + valueClass.getName()); - } - valueBytes[i] = valueSerDer.serialize(value); - totalSize += valueBytes[i].length; - ++i; - } - final byte[] result = new byte[totalSize]; - int offset = 0; - ByteUtil.intToBytes(size, result, offset); - offset += 4; - for (i = 0; i < size; ++i) { - ByteUtil.intToBytes(valueIDs[i], result, offset); - offset += 4; - ByteUtil.intToBytes(valueBytes[i].length, result, offset); - offset += 4; - System.arraycopy(valueBytes[i], 0, result, offset, valueBytes[i].length); - offset += valueBytes[i].length; - } - return result; - } + list.add(value); + } + return list; + } - @Override - public Class<List> type() { - return List.class; - } -} + /** + * size + value1 type id + value length + value1 binary content + ... 4B 4B 4B value1 bytes + */ + @SuppressWarnings({ + "unchecked" + }) + @Override + public byte[] serialize(List list) { + if (list == null) { + return null; + } + final int size = list.size(); + final int[] valueIDs = new int[size]; + final byte[][] valueBytes = new byte[size][]; + + int totalSize = 4 + size * 8; + int i = 0; + Iterator iter = list.iterator(); + while (iter.hasNext()) { + final Object value = iter.next(); + Class<?> valueClass = value.getClass(); + int valueTypeID = EntityDefinitionManager.getIDBySerDerClass(valueClass); + if (valueTypeID == -1) { + if (value instanceof List) { + valueClass = List.class; + valueTypeID = EntityDefinitionManager.getIDBySerDerClass(valueClass); + } else if (value instanceof Map) { + valueClass = Map.class; + valueTypeID = EntityDefinitionManager.getIDBySerDerClass(valueClass); + } else { + throw new IllegalArgumentException("Unsupported class: " + valueClass.getName()); + } + } + valueIDs[i] = valueTypeID; + final EntitySerDeser valueSerDer = EntityDefinitionManager.getSerDeser(valueClass); + if (valueSerDer == null) { + throw new IllegalArgumentException("Unsupported class: " + valueClass.getName()); + } + valueBytes[i] = valueSerDer.serialize(value); + totalSize += valueBytes[i].length; + ++i; + } + final byte[] result = new byte[totalSize]; + int offset = 0; + ByteUtil.intToBytes(size, result, offset); + offset += 4; + for (i = 0; i < size; ++i) { + ByteUtil.intToBytes(valueIDs[i], result, offset); + offset += 4; + ByteUtil.intToBytes(valueBytes[i].length, result, offset); + offset += 4; + System.arraycopy(valueBytes[i], 0, result, offset, valueBytes[i].length); + offset += valueBytes[i].length; + } + return result; + } + + @Override + public Class<List> type() { + return List.class; + } +} http://git-wip-us.apache.org/repos/asf/eagle/blob/6e919c2e/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/LongSerDeser.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/LongSerDeser.java b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/LongSerDeser.java index 914cd95..6f0c6ab 100755 --- a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/LongSerDeser.java +++ b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/LongSerDeser.java @@ -18,26 +18,29 @@ package org.apache.eagle.log.entity.meta; import org.apache.eagle.common.ByteUtil; -public class LongSerDeser implements EntitySerDeser<Long>{ - public LongSerDeser(){} +public class LongSerDeser implements EntitySerDeser<Long> { + public LongSerDeser() { + } - @Override - public Long deserialize(byte[] bytes){ - if(bytes.length < 8) - return null; -// return new Long(ByteUtil.bytesToLong(bytes)); - return Long.valueOf(ByteUtil.bytesToLong(bytes)); - } - - @Override - public byte[] serialize(Long obj){ - if(obj == null) - return null; - return ByteUtil.longToBytes(obj); - } + @Override + public Long deserialize(byte[] bytes) { + if (bytes.length < 8) { + return null; + } + // return new Long(ByteUtil.bytesToLong(bytes)); + return Long.valueOf(ByteUtil.bytesToLong(bytes)); + } - @Override - public Class<Long> type() { - return Long.class; - } + @Override + public byte[] serialize(Long obj) { + if (obj == null) { + return null; + } + return ByteUtil.longToBytes(obj); + } + + @Override + public Class<Long> type() { + return Long.class; + } }
