http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/ffd84fa0/eagle-core/eagle-query/eagle-storage-jdbc/pom.xml ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-query/eagle-storage-jdbc/pom.xml b/eagle-core/eagle-query/eagle-storage-jdbc/pom.xml index c099e98..c37247e 100644 --- a/eagle-core/eagle-query/eagle-storage-jdbc/pom.xml +++ b/eagle-core/eagle-query/eagle-storage-jdbc/pom.xml @@ -51,5 +51,9 @@ <groupId>org.apache.torque</groupId> <artifactId>torque-runtime</artifactId> </dependency> + <dependency> + <groupId>org.apache.derby</groupId> + <artifactId>derby</artifactId> + </dependency> </dependencies> </project>
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/ffd84fa0/eagle-core/eagle-query/eagle-storage-jdbc/src/main/java/org/apache/eagle/storage/jdbc/JdbcConstants.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-query/eagle-storage-jdbc/src/main/java/org/apache/eagle/storage/jdbc/JdbcConstants.java b/eagle-core/eagle-query/eagle-storage-jdbc/src/main/java/org/apache/eagle/storage/jdbc/JdbcConstants.java index 7fb4dbd..af5be8f 100644 --- a/eagle-core/eagle-query/eagle-storage-jdbc/src/main/java/org/apache/eagle/storage/jdbc/JdbcConstants.java +++ b/eagle-core/eagle-query/eagle-storage-jdbc/src/main/java/org/apache/eagle/storage/jdbc/JdbcConstants.java @@ -16,6 +16,8 @@ */ package org.apache.eagle.storage.jdbc; +import java.sql.Types; + /** * Jdbc Storage Constants */ @@ -25,6 +27,9 @@ public class JdbcConstants { public static final String METRIC_NAME_COLUMN_NAME = "metric"; public static final String ROW_KEY_COLUMN_NAME = "uuid"; + public static final int DEFAULT_TYPE_FOR_COMPLEX_TYPE = Types.BLOB; + public static final int DEFAULT_VARCHAR_SIZE =1024; + // Eagle JDBC Storage Configuration public final static String EAGLE_DB_USERNAME = "eagle.service.storage-username"; public final static String EAGLE_DB_PASSWORD = "eagle.service.storage-password"; @@ -34,4 +39,8 @@ public class JdbcConstants { public final static String EAGLE_DATABASE= "eagle.service.storage-database"; public final static String EAGLE_DRIVER_CLASS= "eagle.service.storage-driver-class"; public final static String EAGLE_CONN_MAX_SIZE= "eagle.service.storage-connection-max"; + + public static final boolean isReservedField(String columnName){ + return TIMESTAMP_COLUMN_NAME.equals(columnName) || METRIC_NAME_COLUMN_NAME.equals(columnName) || ROW_KEY_COLUMN_NAME.equals(columnName); + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/ffd84fa0/eagle-core/eagle-query/eagle-storage-jdbc/src/main/java/org/apache/eagle/storage/jdbc/JdbcStorage.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-query/eagle-storage-jdbc/src/main/java/org/apache/eagle/storage/jdbc/JdbcStorage.java b/eagle-core/eagle-query/eagle-storage-jdbc/src/main/java/org/apache/eagle/storage/jdbc/JdbcStorage.java index 66b3c5a..c490dcd 100644 --- a/eagle-core/eagle-query/eagle-storage-jdbc/src/main/java/org/apache/eagle/storage/jdbc/JdbcStorage.java +++ b/eagle-core/eagle-query/eagle-storage-jdbc/src/main/java/org/apache/eagle/storage/jdbc/JdbcStorage.java @@ -30,13 +30,17 @@ import org.apache.eagle.storage.jdbc.entity.impl.JdbcEntityUpdaterImpl; import org.apache.eagle.storage.jdbc.entity.impl.JdbcEntityWriterImpl; import org.apache.eagle.storage.jdbc.schema.JdbcEntityDefinition; import org.apache.eagle.storage.jdbc.schema.JdbcEntityDefinitionManager; +import org.apache.eagle.storage.jdbc.schema.JdbcEntitySchemaManager; import org.apache.eagle.storage.operation.CompiledQuery; import org.apache.eagle.storage.result.ModifyResult; import org.apache.eagle.storage.result.QueryResult; +import org.apache.torque.ConstraintViolationException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; +import java.sql.SQLIntegrityConstraintViolationException; +import java.util.LinkedList; import java.util.List; import java.util.Map; @@ -52,8 +56,9 @@ public class JdbcStorage extends DataStorageBase { try { JdbcEntityDefinitionManager.load(); ConnectionManagerFactory.getInstance(); + JdbcEntitySchemaManager.getInstance().init(); } catch (Exception e) { - LOG.error("Failed to initialize connection manager",e); + LOG.error("Failed to start connection manager",e); throw new IOException(e); } } @@ -70,14 +75,14 @@ public class JdbcStorage extends DataStorageBase { } catch (Exception e) { LOG.error(e.getMessage(), e); result.setSuccess(false); - throw new IOException(e); + throw new IOException(e.getCause()); } return result; } @Override public <E extends TaggedLogAPIEntity> ModifyResult<String> create(List<E> entities, EntityDefinition entityDefinition) throws IOException { - ModifyResult<String> result = new ModifyResult<String>(); + ModifyResult<String> result = new ModifyResult<>(); try { JdbcEntityDefinition jdbcEntityDefinition = JdbcEntityDefinitionManager.getJdbcEntityDefinition(entityDefinition); JdbcEntityWriter writer = new JdbcEntityWriterImpl(jdbcEntityDefinition); @@ -86,9 +91,9 @@ public class JdbcStorage extends DataStorageBase { result.setSize(keys.size()); result.setSuccess(true); } catch (Exception e) { - LOG.error(e.getMessage(), e); + LOG.error(e.getMessage(), e.getCause()); result.setSuccess(false); - throw new IOException(e); + throw new IOException(e.getCause()); } return result; } @@ -105,7 +110,7 @@ public class JdbcStorage extends DataStorageBase { } catch (Exception e) { LOG.error(e.getMessage(), e); result.setSuccess(false); - throw new IOException(e); + throw new IOException(e.getCause()); } return result; } @@ -123,7 +128,7 @@ public class JdbcStorage extends DataStorageBase { } catch (Exception e) { LOG.error(e.getMessage(), e); result.setSuccess(false); - throw new IOException(e); + throw new IOException(e.getCause()); } return result; } @@ -140,7 +145,7 @@ public class JdbcStorage extends DataStorageBase { } catch (Exception e) { LOG.error(e.getMessage(), e); result.setSuccess(false); - throw new IOException(e); + throw new IOException(e.getCause()); } return result; } @@ -170,7 +175,7 @@ public class JdbcStorage extends DataStorageBase { } catch (Exception e) { LOG.error(e.getMessage(), e); result.setSuccess(false); - throw new IOException(e); + throw new IOException(e.getCause()); } return result; } @@ -195,7 +200,7 @@ public class JdbcStorage extends DataStorageBase { } catch (Exception e) { LOG.error(e.getMessage(), e); result.setSuccess(false); - throw new IOException(e); + throw new IOException(e.getCause()); } return result; } http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/ffd84fa0/eagle-core/eagle-query/eagle-storage-jdbc/src/main/java/org/apache/eagle/storage/jdbc/conn/impl/EncodedRowkeyPrimaryKeyBuilder.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-query/eagle-storage-jdbc/src/main/java/org/apache/eagle/storage/jdbc/conn/impl/EncodedRowkeyPrimaryKeyBuilder.java b/eagle-core/eagle-query/eagle-storage-jdbc/src/main/java/org/apache/eagle/storage/jdbc/conn/impl/EncodedRowkeyPrimaryKeyBuilder.java new file mode 100644 index 0000000..4ab8f56 --- /dev/null +++ b/eagle-core/eagle-query/eagle-storage-jdbc/src/main/java/org/apache/eagle/storage/jdbc/conn/impl/EncodedRowkeyPrimaryKeyBuilder.java @@ -0,0 +1,45 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.eagle.storage.jdbc.conn.impl; + + +import org.apache.eagle.common.EagleBase64Wrapper; +import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity; +import org.apache.eagle.log.entity.meta.EntityDefinition; +import org.apache.eagle.log.entity.meta.EntityDefinitionManager; +import org.apache.eagle.log.entity.old.RowkeyHelper; +import org.apache.eagle.storage.jdbc.conn.PrimaryKeyBuilder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class EncodedRowkeyPrimaryKeyBuilder implements PrimaryKeyBuilder<String> { + private static final Logger LOG = LoggerFactory.getLogger(EncodedRowkeyPrimaryKeyBuilder.class); + + @Override + public <T> String build(T t) { + if(t == null) return null; + + try { + EntityDefinition entityDefinition + = EntityDefinitionManager.getEntityDefinitionByEntityClass((Class<? extends TaggedLogAPIEntity>) t.getClass()); + return EagleBase64Wrapper.encodeByteArray2URLSafeString(RowkeyHelper.getRowkey((TaggedLogAPIEntity) t,entityDefinition)); + } catch (Exception e) { + LOG.error("Got error to build rowKey for {}",t,e); + throw new RuntimeException("Got error to build rowKey",e); + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/ffd84fa0/eagle-core/eagle-query/eagle-storage-jdbc/src/main/java/org/apache/eagle/storage/jdbc/conn/impl/TorqueStatementPeerImpl.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-query/eagle-storage-jdbc/src/main/java/org/apache/eagle/storage/jdbc/conn/impl/TorqueStatementPeerImpl.java b/eagle-core/eagle-query/eagle-storage-jdbc/src/main/java/org/apache/eagle/storage/jdbc/conn/impl/TorqueStatementPeerImpl.java index 69e1192..833d7ae 100644 --- a/eagle-core/eagle-query/eagle-storage-jdbc/src/main/java/org/apache/eagle/storage/jdbc/conn/impl/TorqueStatementPeerImpl.java +++ b/eagle-core/eagle-query/eagle-storage-jdbc/src/main/java/org/apache/eagle/storage/jdbc/conn/impl/TorqueStatementPeerImpl.java @@ -45,7 +45,8 @@ public class TorqueStatementPeerImpl<T> implements StatementExecutor { this.basePeer.setTableMap(tableMap); } - private static PrimaryKeyBuilder<String> _primaryKeyBuilderInstance = new UUIDPrimaryKeyBuilder();; + private static PrimaryKeyBuilder<String> _primaryKeyBuilderInstance = new EncodedRowkeyPrimaryKeyBuilder(); + @Override public PrimaryKeyBuilder<String> getPrimaryKeyBuilder() { return _primaryKeyBuilderInstance; http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/ffd84fa0/eagle-core/eagle-query/eagle-storage-jdbc/src/main/java/org/apache/eagle/storage/jdbc/criteria/impl/ExpressionCriterionBuilder.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-query/eagle-storage-jdbc/src/main/java/org/apache/eagle/storage/jdbc/criteria/impl/ExpressionCriterionBuilder.java b/eagle-core/eagle-query/eagle-storage-jdbc/src/main/java/org/apache/eagle/storage/jdbc/criteria/impl/ExpressionCriterionBuilder.java index ec92849..b3eb462 100644 --- a/eagle-core/eagle-query/eagle-storage-jdbc/src/main/java/org/apache/eagle/storage/jdbc/criteria/impl/ExpressionCriterionBuilder.java +++ b/eagle-core/eagle-query/eagle-storage-jdbc/src/main/java/org/apache/eagle/storage/jdbc/criteria/impl/ExpressionCriterionBuilder.java @@ -19,6 +19,7 @@ package org.apache.eagle.storage.jdbc.criteria.impl; import org.apache.eagle.log.entity.EntityQualifierUtils; import org.apache.eagle.storage.jdbc.criteria.CriterionBuilder; import org.apache.eagle.query.parser.*; +import org.apache.eagle.storage.jdbc.schema.JdbcEntityDefinition; import org.apache.torque.ColumnImpl; import org.apache.torque.criteria.Criterion; import org.apache.torque.criteria.SqlEnum; @@ -33,10 +34,12 @@ import java.util.regex.Matcher; public class ExpressionCriterionBuilder implements CriterionBuilder { private final String tableName; private final ORExpression expression; + private final JdbcEntityDefinition jdbcEntityDefinition; - public ExpressionCriterionBuilder(ORExpression expression,String tableName) { + public ExpressionCriterionBuilder(ORExpression expression, JdbcEntityDefinition entityDefinition) { this.expression = expression; - this.tableName = tableName; + this.tableName = entityDefinition.getJdbcTableName(); + this.jdbcEntityDefinition = entityDefinition; } @Override @@ -65,23 +68,58 @@ public class ExpressionCriterionBuilder implements CriterionBuilder { } private Criterion toAtomicCriterion(AtomicExpression atomic){ - Object left = toColumn(atomic.getKeyType(), atomic.getKey(),atomic.getOp()); - Object right = toColumn(atomic.getValueType(), atomic.getValue(),atomic.getOp()); + Class<?> columnType = locateColumnType(atomic); + Object left = toColumn(atomic.getKeyType(), atomic.getKey(),atomic.getOp(),columnType); + Object right = toColumn(atomic.getValueType(), atomic.getValue(), atomic.getOp(),columnType); SqlEnum op = toSqlEnum(atomic.getOp()); return new Criterion(left,right,op); } - private Object toColumn(TokenType tokenType,String value,ComparisonOperator op){ - if(op.equals(ComparisonOperator.CONTAINS) && tokenType.equals(TokenType.STRING)){ - return "%"+value+"%"; - }else if(tokenType.equals(TokenType.ID)){ - return new ColumnImpl(this.tableName,parseEntityAttribute(value)); - }else if(!tokenType.equals(TokenType.ID) && op.equals(ComparisonOperator.IN)){ + private Class<?> locateColumnType(AtomicExpression atomic) { + String columnName = null; + if(atomic.getKeyType().equals(TokenType.ID)){ + columnName = parseEntityAttribute(atomic.getKey()); + }else if(atomic.getValueType().equals(TokenType.ID)){ + columnName = parseEntityAttribute(atomic.getValue()); + } + if(jdbcEntityDefinition.getInternal().getDisplayNameMap().containsKey(columnName)){ + try { + return jdbcEntityDefinition.getColumnType(columnName); + } catch (NoSuchFieldException e) { + throw new RuntimeException(e); + } + }else{ + return null; + } + } + + /** + * this place is used for rewriting query for jdbc connection + * @param tokenType + * @param value + * @param op + * @return + */ + private Object toColumn(TokenType tokenType,String value,ComparisonOperator op, Class<?> columnType) { + if (op.equals(ComparisonOperator.CONTAINS) && tokenType.equals(TokenType.STRING)) { + return "%" + value + "%"; + } else if (tokenType.equals(TokenType.ID)) { + return new ColumnImpl(this.tableName, parseEntityAttribute(value)); + } else if (!tokenType.equals(TokenType.ID) && op.equals(ComparisonOperator.IN)) { return EntityQualifierUtils.parseList(value); - }else if(tokenType.equals(TokenType.NUMBER)){ + } else if (tokenType.equals(TokenType.NUMBER)) { // TODO: currently only treat all number value as double - return Double.parseDouble(value); - }else{ + if(columnType.equals(Long.class)) { + return Long.parseLong(value); + } else { + return Double.parseDouble(value); + } + } else if (op.equals(ComparisonOperator.LIKE) && value.equals(".*")){ + return "%"; + } else{ + if((boolean.class.equals(columnType) || Boolean.class.equals(columnType)) && value != null){ + return Boolean.valueOf(value); + } // TODO: parse type according entity field type return value; } http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/ffd84fa0/eagle-core/eagle-query/eagle-storage-jdbc/src/main/java/org/apache/eagle/storage/jdbc/criteria/impl/QueryCriteriaBuilder.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-query/eagle-storage-jdbc/src/main/java/org/apache/eagle/storage/jdbc/criteria/impl/QueryCriteriaBuilder.java b/eagle-core/eagle-query/eagle-storage-jdbc/src/main/java/org/apache/eagle/storage/jdbc/criteria/impl/QueryCriteriaBuilder.java index 4d590de..a82d185 100644 --- a/eagle-core/eagle-query/eagle-storage-jdbc/src/main/java/org/apache/eagle/storage/jdbc/criteria/impl/QueryCriteriaBuilder.java +++ b/eagle-core/eagle-query/eagle-storage-jdbc/src/main/java/org/apache/eagle/storage/jdbc/criteria/impl/QueryCriteriaBuilder.java @@ -23,6 +23,7 @@ import org.apache.eagle.query.parser.ORExpression; import org.apache.eagle.storage.jdbc.criteria.CriteriaBuilder; import org.apache.eagle.storage.jdbc.criteria.CriterionBuilder; import org.apache.eagle.storage.jdbc.JdbcConstants; +import org.apache.eagle.storage.jdbc.schema.JdbcEntityDefinition; import org.apache.eagle.storage.operation.CompiledQuery; import org.apache.torque.ColumnImpl; import org.apache.torque.criteria.Criteria; @@ -38,10 +39,18 @@ public class QueryCriteriaBuilder implements CriteriaBuilder { private final CompiledQuery query; private final String tableName; + private final Boolean limitEnabled; + private final JdbcEntityDefinition jdbcEntityDefinition; - public QueryCriteriaBuilder(CompiledQuery query, String tableName){ + public QueryCriteriaBuilder(CompiledQuery query, JdbcEntityDefinition entityDefinition){ + this(query,entityDefinition,true); + } + + public QueryCriteriaBuilder(CompiledQuery query, JdbcEntityDefinition entityDefinition, Boolean limitEnabled){ this.query = query; - this.tableName = tableName; + this.tableName = entityDefinition.getJdbcTableName(); + this.limitEnabled = limitEnabled; + this.jdbcEntityDefinition = entityDefinition; } @Override @@ -79,6 +88,12 @@ public class QueryCriteriaBuilder implements CriteriaBuilder { } } + // If no columns are specified, then select * by default + if(root.getSelectColumns() == null || root.getSelectColumns().size() ==0){ + // SELECT * + root.addSelectColumn(new ColumnImpl(this.tableName, "*")); + } + // FROM $tableName root.addFrom(this.tableName); @@ -87,7 +102,7 @@ public class QueryCriteriaBuilder implements CriteriaBuilder { .and(new Criterion(new ColumnImpl(this.tableName, JdbcConstants.TIMESTAMP_COLUMN_NAME),query.getEndTime(), SqlEnum.LESS_THAN)); ORExpression expression = searchCondition.getQueryExpression(); if(expression!=null){ - CriterionBuilder criterionBuilder = new ExpressionCriterionBuilder(expression,tableName); + CriterionBuilder criterionBuilder = new ExpressionCriterionBuilder(expression,this.jdbcEntityDefinition); where = where.and(criterionBuilder.build()); } @@ -98,8 +113,10 @@ public class QueryCriteriaBuilder implements CriteriaBuilder { root.where(where); - // LIMITED BY $pageSize - root.setLimit((int) searchCondition.getPageSize()); + if(this.limitEnabled) { + // LIMITED BY $pageSize + root.setLimit((int) searchCondition.getPageSize()); + } // TODO: GROUP BY if(query.isHasAgg()){ http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/ffd84fa0/eagle-core/eagle-query/eagle-storage-jdbc/src/main/java/org/apache/eagle/storage/jdbc/entity/JdbcEntitySerDeserHelper.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-query/eagle-storage-jdbc/src/main/java/org/apache/eagle/storage/jdbc/entity/JdbcEntitySerDeserHelper.java b/eagle-core/eagle-query/eagle-storage-jdbc/src/main/java/org/apache/eagle/storage/jdbc/entity/JdbcEntitySerDeserHelper.java index 651ee19..3750f71 100644 --- a/eagle-core/eagle-query/eagle-storage-jdbc/src/main/java/org/apache/eagle/storage/jdbc/entity/JdbcEntitySerDeserHelper.java +++ b/eagle-core/eagle-query/eagle-storage-jdbc/src/main/java/org/apache/eagle/storage/jdbc/entity/JdbcEntitySerDeserHelper.java @@ -28,6 +28,8 @@ import org.apache.commons.beanutils.PropertyUtils; import org.apache.torque.ColumnImpl; import org.apache.torque.util.ColumnValues; import org.apache.torque.util.JdbcTypedValue; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.beans.PropertyDescriptor; import java.io.IOException; @@ -44,6 +46,7 @@ import java.util.Map; * @since 3/26/15 */ public class JdbcEntitySerDeserHelper { + private final static Logger LOG = LoggerFactory.getLogger(JdbcEntitySerDeserHelper.class); /** * * @param row @@ -55,7 +58,7 @@ public class JdbcEntitySerDeserHelper { * @throws InvocationTargetException * @throws NoSuchMethodException */ - public static <E extends TaggedLogAPIEntity> E buildEntity(Map<String, Object> row, JdbcEntityDefinition entityDefinition) throws IllegalAccessException, InstantiationException, InvocationTargetException, NoSuchMethodException { + public static <E extends TaggedLogAPIEntity> E buildEntity(Map<String, Object> row, JdbcEntityDefinition entityDefinition) throws IOException { EntityDefinition ed = entityDefinition.getInternal(); Class<? extends TaggedLogAPIEntity> clazz = ed.getEntityClass(); @@ -63,24 +66,35 @@ public class JdbcEntitySerDeserHelper { throw new NullPointerException("Entity class of service "+ed.getService()+" is null"); } - TaggedLogAPIEntity obj = clazz.newInstance(); - Map<String, Qualifier> map = ed.getDisplayNameMap(); + TaggedLogAPIEntity obj = null; + try { + obj = clazz.newInstance(); + } catch (Exception e) { + LOG.error(e.getMessage(),e.getCause()); + throw new IOException(e); + } + Map<String, Qualifier> rawmap = ed.getDisplayNameMap(); + // rdbms may contains field which is not case insensitive, we need convert all into lower case + Map<String, Qualifier> map = new HashMap<>(); + for(Map.Entry<String, Qualifier> e : rawmap.entrySet()){ + map.put(e.getKey().toLowerCase(), e.getValue()); + } for(Map.Entry<String, Object> entry : row.entrySet()){ // timestamp; - if(JdbcConstants.TIMESTAMP_COLUMN_NAME.equals(entry.getKey())){ + if(JdbcConstants.TIMESTAMP_COLUMN_NAME.equalsIgnoreCase(entry.getKey())){ obj.setTimestamp((Long) entry.getValue()); continue; } // set metric as prefix for generic metric if(entityDefinition.getInternal().getService().equals(GenericMetricEntity.GENERIC_METRIC_SERVICE) && - JdbcConstants.METRIC_NAME_COLUMN_NAME.equals(entry.getKey())){ + JdbcConstants.METRIC_NAME_COLUMN_NAME.equalsIgnoreCase(entry.getKey())){ obj.setPrefix((String) entry.getValue()); continue; } // rowkey: uuid - if(JdbcConstants.ROW_KEY_COLUMN_NAME.equals(entry.getKey())){ + if(JdbcConstants.ROW_KEY_COLUMN_NAME.equalsIgnoreCase(entry.getKey())){ obj.setEncodedRowkey((String) entry.getValue()); continue; } @@ -91,16 +105,37 @@ public class JdbcEntitySerDeserHelper { if(obj.getTags() == null){ obj.setTags(new HashMap<String, String>()); } - obj.getTags().put(entry.getKey(), (String) entry.getValue()); + // get normalized tag name, not efficient, but we need make it work first + String key = null; + if(ed.getTags() != null) { + for (String tag : ed.getTags()) { + if (tag.equalsIgnoreCase(entry.getKey())) { + key = tag; + break; + } + } + } + try { + obj.getTags().put(key == null ? entry.getKey() : key, (String) entry.getValue()); + }catch (ClassCastException ex){ + LOG.error("Tag value {} = {} is not String",key,entry.getValue(),ex); + throw ex; + } continue; } // parse different types of qualifiers String fieldName = q.getDisplayName(); // PropertyDescriptor pd = PropertyUtils.getPropertyDescriptor(obj, fieldName); - PropertyDescriptor pd = getPropertyDescriptor(obj,fieldName); - if(entry.getValue() != null){ - pd.getWriteMethod().invoke(obj, entry.getValue()); + PropertyDescriptor pd = null; + try { + pd = getPropertyDescriptor(obj,fieldName); + if(entry.getValue() != null) { + pd.getWriteMethod().invoke(obj, entry.getValue()); + } + } catch (Exception ex){ + LOG.error("Failed to set value {} = {}",fieldName,entry.getValue(),ex); + throw new IOException(String.format("Failed to set value %s = %s",fieldName,entry.getValue()),ex); } } @@ -166,7 +201,19 @@ public class JdbcEntitySerDeserHelper { if(serDeser==null){ throw new IOException("SQLSerDeser for column: "+columnName+" is null"); } - Object value = serDeser.readValue(resultSet, columnName, entityDefinition); + Object value; + + if (entityDefinition.isField(columnName)) { + try { + value = serDeser.toJavaTypedValue(resultSet, entityDefinition.getColumnType(columnName), columnName, entityDefinition.getColumnQualifier(columnName)); + } catch (NoSuchFieldException e) { + LOG.error("No field {} in entity {}", columnName, entityDefinition.getInternal().getEntityClass()); + throw new IOException(String.format("No field %s in entity %s", columnName, entityDefinition.getInternal().getEntityClass()), e); + } + }else{ + // treat as tag or others + value = resultSet.getObject(columnName); + } row.put(columnName,value); } return row; @@ -205,7 +252,8 @@ public class JdbcEntitySerDeserHelper { Class<?> fieldType = qualifier.getSerDeser().type(); JdbcSerDeser jdbcSerDeser = JdbcEntityDefinitionManager.getJdbcSerDeser(fieldType); - JdbcTypedValue jdbcTypedValue = jdbcSerDeser.getJdbcTypedValue(fieldValue, fieldType); + + JdbcTypedValue jdbcTypedValue = jdbcSerDeser.toJdbcTypedValue(fieldValue, fieldType, qualifier); columnValues.put(new ColumnImpl(tableName,displayName),jdbcTypedValue); } http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/ffd84fa0/eagle-core/eagle-query/eagle-storage-jdbc/src/main/java/org/apache/eagle/storage/jdbc/entity/JdbcEntityUtils.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-query/eagle-storage-jdbc/src/main/java/org/apache/eagle/storage/jdbc/entity/JdbcEntityUtils.java b/eagle-core/eagle-query/eagle-storage-jdbc/src/main/java/org/apache/eagle/storage/jdbc/entity/JdbcEntityUtils.java new file mode 100644 index 0000000..b2788ea --- /dev/null +++ b/eagle-core/eagle-query/eagle-storage-jdbc/src/main/java/org/apache/eagle/storage/jdbc/entity/JdbcEntityUtils.java @@ -0,0 +1,26 @@ +package org.apache.eagle.storage.jdbc.entity; + +import org.apache.eagle.log.entity.meta.Qualifier; +import org.apache.eagle.storage.jdbc.schema.JdbcEntityDefinition; + +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +public class JdbcEntityUtils { + public static Qualifier getColumnQualifier(JdbcEntityDefinition entityDefinition,String columnName){ + return entityDefinition.getInternal().getDisplayNameMap().get(columnName); + } +} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/ffd84fa0/eagle-core/eagle-query/eagle-storage-jdbc/src/main/java/org/apache/eagle/storage/jdbc/entity/JdbcEntityWriter.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-query/eagle-storage-jdbc/src/main/java/org/apache/eagle/storage/jdbc/entity/JdbcEntityWriter.java b/eagle-core/eagle-query/eagle-storage-jdbc/src/main/java/org/apache/eagle/storage/jdbc/entity/JdbcEntityWriter.java index 072174e..eb5a074 100644 --- a/eagle-core/eagle-query/eagle-storage-jdbc/src/main/java/org/apache/eagle/storage/jdbc/entity/JdbcEntityWriter.java +++ b/eagle-core/eagle-query/eagle-storage-jdbc/src/main/java/org/apache/eagle/storage/jdbc/entity/JdbcEntityWriter.java @@ -33,5 +33,5 @@ public interface JdbcEntityWriter<E extends TaggedLogAPIEntity> { * @return primary keys' list * @throws Exception */ - public List<String> write(List<E> entities) throws Exception; + List<String> write(List<E> entities) throws Exception; } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/ffd84fa0/eagle-core/eagle-query/eagle-storage-jdbc/src/main/java/org/apache/eagle/storage/jdbc/entity/impl/JdbcEntityDeleterImpl.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-query/eagle-storage-jdbc/src/main/java/org/apache/eagle/storage/jdbc/entity/impl/JdbcEntityDeleterImpl.java b/eagle-core/eagle-query/eagle-storage-jdbc/src/main/java/org/apache/eagle/storage/jdbc/entity/impl/JdbcEntityDeleterImpl.java index 35e1078..4ebd792 100644 --- a/eagle-core/eagle-query/eagle-storage-jdbc/src/main/java/org/apache/eagle/storage/jdbc/entity/impl/JdbcEntityDeleterImpl.java +++ b/eagle-core/eagle-query/eagle-storage-jdbc/src/main/java/org/apache/eagle/storage/jdbc/entity/impl/JdbcEntityDeleterImpl.java @@ -61,7 +61,7 @@ public class JdbcEntityDeleterImpl<E extends TaggedLogAPIEntity> implements Jdbc @Override public int deleteByQuery(CompiledQuery query) throws Exception { - QueryCriteriaBuilder criteriaBuilder = new QueryCriteriaBuilder(query,this.jdbcEntityDefinition.getJdbcTableName()); + QueryCriteriaBuilder criteriaBuilder = new QueryCriteriaBuilder(query,this.jdbcEntityDefinition,false); Criteria criteria = criteriaBuilder.build(); return deleteByCriteria(criteria); } http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/ffd84fa0/eagle-core/eagle-query/eagle-storage-jdbc/src/main/java/org/apache/eagle/storage/jdbc/entity/impl/JdbcEntityReaderImpl.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-query/eagle-storage-jdbc/src/main/java/org/apache/eagle/storage/jdbc/entity/impl/JdbcEntityReaderImpl.java b/eagle-core/eagle-query/eagle-storage-jdbc/src/main/java/org/apache/eagle/storage/jdbc/entity/impl/JdbcEntityReaderImpl.java index d0d6a61..a7b93b5 100644 --- a/eagle-core/eagle-query/eagle-storage-jdbc/src/main/java/org/apache/eagle/storage/jdbc/entity/impl/JdbcEntityReaderImpl.java +++ b/eagle-core/eagle-query/eagle-storage-jdbc/src/main/java/org/apache/eagle/storage/jdbc/entity/impl/JdbcEntityReaderImpl.java @@ -24,6 +24,7 @@ import org.apache.eagle.storage.jdbc.entity.JdbcEntityReader; import org.apache.eagle.storage.jdbc.schema.JdbcEntityDefinition; import org.apache.eagle.storage.operation.CompiledQuery; import org.apache.commons.lang.time.StopWatch; +import org.apache.torque.ColumnImpl; import org.apache.torque.criteria.Criteria; import org.apache.torque.om.mapper.RecordMapper; import org.apache.torque.sql.SqlBuilder; @@ -46,9 +47,10 @@ public class JdbcEntityReaderImpl implements JdbcEntityReader { this.jdbcEntityDefinition = jdbcEntityDefinition; } + @Override @SuppressWarnings("unchecked") public <E extends Object> List<E> query(CompiledQuery query) throws Exception { - QueryCriteriaBuilder criteriaBuilder = new QueryCriteriaBuilder(query,this.jdbcEntityDefinition.getJdbcTableName()); + QueryCriteriaBuilder criteriaBuilder = new QueryCriteriaBuilder(query,this.jdbcEntityDefinition); Criteria criteria = criteriaBuilder.build(); String displaySql = SqlBuilder.buildQuery(criteria).getDisplayString(); @@ -88,6 +90,7 @@ public class JdbcEntityReaderImpl implements JdbcEntityReader { try { stopWatch.start(); TorqueStatementPeerImpl peer = ConnectionManagerFactory.getInstance().getStatementExecutor(); + criteria.addSelectColumn(new ColumnImpl(jdbcEntityDefinition.getJdbcTableName(),"*")); result = peer.delegate().doSelect(criteria, recordMapper); LOG.info(String.format("Read %s records in %s ms (sql: %s)",result.size(),stopWatch.getTime(),displaySql)); }catch (Exception ex){ http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/ffd84fa0/eagle-core/eagle-query/eagle-storage-jdbc/src/main/java/org/apache/eagle/storage/jdbc/entity/impl/JdbcEntityUpdaterImpl.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-query/eagle-storage-jdbc/src/main/java/org/apache/eagle/storage/jdbc/entity/impl/JdbcEntityUpdaterImpl.java b/eagle-core/eagle-query/eagle-storage-jdbc/src/main/java/org/apache/eagle/storage/jdbc/entity/impl/JdbcEntityUpdaterImpl.java index 6f9bf3d..4baa216 100644 --- a/eagle-core/eagle-query/eagle-storage-jdbc/src/main/java/org/apache/eagle/storage/jdbc/entity/impl/JdbcEntityUpdaterImpl.java +++ b/eagle-core/eagle-query/eagle-storage-jdbc/src/main/java/org/apache/eagle/storage/jdbc/entity/impl/JdbcEntityUpdaterImpl.java @@ -16,7 +16,10 @@ */ package org.apache.eagle.storage.jdbc.entity.impl; +import org.apache.commons.configuration.ConfigurationFactory; import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity; +import org.apache.eagle.log.entity.meta.EntityDefinitionManager; +import org.apache.eagle.log.entity.old.RowkeyHelper; import org.apache.eagle.storage.jdbc.conn.ConnectionManager; import org.apache.eagle.storage.jdbc.conn.ConnectionManagerFactory; import org.apache.eagle.storage.jdbc.conn.impl.TorqueStatementPeerImpl; @@ -33,6 +36,7 @@ import org.slf4j.LoggerFactory; import java.sql.Connection; import java.util.Arrays; +import java.util.Collections; import java.util.List; /** @@ -59,7 +63,11 @@ public class JdbcEntityUpdaterImpl<E extends TaggedLogAPIEntity> implements Jdbc try { for (E entity : entities) { String primaryKey = entity.getEncodedRowkey(); - PrimaryKeyCriteriaBuilder pkBuilder = new PrimaryKeyCriteriaBuilder(Arrays.asList(primaryKey), this.jdbcEntityDefinition.getJdbcTableName()); + if(primaryKey==null) { + primaryKey = ConnectionManagerFactory.getInstance().getStatementExecutor().getPrimaryKeyBuilder().build(entity); + entity.setEncodedRowkey(primaryKey); + } + PrimaryKeyCriteriaBuilder pkBuilder = new PrimaryKeyCriteriaBuilder(Collections.singletonList(primaryKey), this.jdbcEntityDefinition.getJdbcTableName()); Criteria selectCriteria = pkBuilder.build(); if(LOG.isDebugEnabled()) LOG.debug("Updating by query: "+SqlBuilder.buildQuery(selectCriteria).getDisplayString()); ColumnValues columnValues = JdbcEntitySerDeserHelper.buildColumnValues(entity, this.jdbcEntityDefinition); http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/ffd84fa0/eagle-core/eagle-query/eagle-storage-jdbc/src/main/java/org/apache/eagle/storage/jdbc/entity/impl/JdbcEntityWriterImpl.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-query/eagle-storage-jdbc/src/main/java/org/apache/eagle/storage/jdbc/entity/impl/JdbcEntityWriterImpl.java b/eagle-core/eagle-query/eagle-storage-jdbc/src/main/java/org/apache/eagle/storage/jdbc/entity/impl/JdbcEntityWriterImpl.java index d1dc9bd..de15384 100644 --- a/eagle-core/eagle-query/eagle-storage-jdbc/src/main/java/org/apache/eagle/storage/jdbc/entity/impl/JdbcEntityWriterImpl.java +++ b/eagle-core/eagle-query/eagle-storage-jdbc/src/main/java/org/apache/eagle/storage/jdbc/entity/impl/JdbcEntityWriterImpl.java @@ -20,17 +20,22 @@ import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity; import org.apache.eagle.storage.jdbc.conn.ConnectionManager; import org.apache.eagle.storage.jdbc.conn.ConnectionManagerFactory; import org.apache.eagle.storage.jdbc.conn.impl.TorqueStatementPeerImpl; +import org.apache.eagle.storage.jdbc.criteria.impl.PrimaryKeyCriteriaBuilder; import org.apache.eagle.storage.jdbc.entity.JdbcEntitySerDeserHelper; import org.apache.eagle.storage.jdbc.entity.JdbcEntityWriter; import org.apache.eagle.storage.jdbc.schema.JdbcEntityDefinition; import org.apache.commons.lang.time.StopWatch; +import org.apache.torque.ConstraintViolationException; +import org.apache.torque.criteria.Criteria; import org.apache.torque.om.ObjectKey; +import org.apache.torque.sql.SqlBuilder; import org.apache.torque.util.ColumnValues; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.sql.Connection; import java.util.ArrayList; +import java.util.Collections; import java.util.List; /** @@ -68,17 +73,33 @@ public class JdbcEntityWriterImpl<E extends TaggedLogAPIEntity> implements JdbcE entity.setEncodedRowkey(peer.getPrimaryKeyBuilder().build(entity)); ColumnValues columnValues = JdbcEntitySerDeserHelper.buildColumnValues(entity, this.jdbcEntityDefinition); - // TODO: implement batch insert for better performance - ObjectKey key = peer.delegate().doInsert(columnValues,connection); - + ObjectKey key = null; try { + // TODO: implement batch insert for better performance + key = peer.delegate().doInsert(columnValues,connection); + if (key != null) { keys.add((String) key.getValue()); } else { keys.add(entity.getEncodedRowkey()); } } catch (ClassCastException ex) { + assert key != null; throw new RuntimeException("Key is not in type of String (VARCHAR) , but JdbcType (java.sql.Types): " + key.getJdbcType() + ", value: " + key.getValue(), ex); + } catch (ConstraintViolationException e){ + // Override with updating if duplicated key exception + if(e.getMessage().contains("The statement was aborted because it would have caused a duplicate key value in a unique or primary key constraint or unique index identified by")){ + String primaryKey = entity.getEncodedRowkey(); + if(primaryKey==null) { + primaryKey = ConnectionManagerFactory.getInstance().getStatementExecutor().getPrimaryKeyBuilder().build(entity); + entity.setEncodedRowkey(primaryKey); + } + PrimaryKeyCriteriaBuilder pkBuilder = new PrimaryKeyCriteriaBuilder(Collections.singletonList(primaryKey), this.jdbcEntityDefinition.getJdbcTableName()); + Criteria selectCriteria = pkBuilder.build(); + if(LOG.isDebugEnabled()) LOG.debug("Updating by query: "+ SqlBuilder.buildQuery(selectCriteria).getDisplayString()); + peer.delegate().doUpdate(selectCriteria, columnValues, connection); + keys.add(primaryKey); + } } } http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/ffd84fa0/eagle-core/eagle-query/eagle-storage-jdbc/src/main/java/org/apache/eagle/storage/jdbc/schema/IJdbcEntityDDLManager.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-query/eagle-storage-jdbc/src/main/java/org/apache/eagle/storage/jdbc/schema/IJdbcEntityDDLManager.java b/eagle-core/eagle-query/eagle-storage-jdbc/src/main/java/org/apache/eagle/storage/jdbc/schema/IJdbcEntityDDLManager.java new file mode 100644 index 0000000..9e3f10b --- /dev/null +++ b/eagle-core/eagle-query/eagle-storage-jdbc/src/main/java/org/apache/eagle/storage/jdbc/schema/IJdbcEntityDDLManager.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.eagle.storage.jdbc.schema; + +/** + * @since 3/27/15 + */ +public interface IJdbcEntityDDLManager { + void init(); + void reinit(); + void shutdown(); +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/ffd84fa0/eagle-core/eagle-query/eagle-storage-jdbc/src/main/java/org/apache/eagle/storage/jdbc/schema/JdbcEntityDefinition.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-query/eagle-storage-jdbc/src/main/java/org/apache/eagle/storage/jdbc/schema/JdbcEntityDefinition.java b/eagle-core/eagle-query/eagle-storage-jdbc/src/main/java/org/apache/eagle/storage/jdbc/schema/JdbcEntityDefinition.java index 16cee99..eb5e874 100644 --- a/eagle-core/eagle-query/eagle-storage-jdbc/src/main/java/org/apache/eagle/storage/jdbc/schema/JdbcEntityDefinition.java +++ b/eagle-core/eagle-query/eagle-storage-jdbc/src/main/java/org/apache/eagle/storage/jdbc/schema/JdbcEntityDefinition.java @@ -19,8 +19,12 @@ package org.apache.eagle.storage.jdbc.schema; import org.apache.eagle.log.entity.GenericMetricEntity; import org.apache.eagle.log.entity.meta.EntityDefinition; import org.apache.eagle.log.entity.meta.Qualifier; +import org.apache.eagle.storage.jdbc.JdbcConstants; import org.apache.eagle.storage.jdbc.schema.serializer.JdbcSerDeser; +import java.util.Map; +import java.util.Set; + /** * @since 3/26/15 */ @@ -52,42 +56,42 @@ public class JdbcEntityDefinition { } public Class<?> getColumnType(String fieldName) throws NoSuchFieldException { - return internal.getEntityClass().getField(fieldName).getType(); + if (fieldName.equals(JdbcConstants.TIMESTAMP_COLUMN_NAME)){ + return Long.class; + }else if(fieldName.equals(JdbcConstants.ROW_KEY_COLUMN_NAME)) { + return String.class; + }else if(fieldName.equals(JdbcConstants.METRIC_NAME_COLUMN_NAME)){ + return String.class; + } + for(String realField:internal.getDisplayNameMap().keySet()){ + if(realField.equalsIgnoreCase(fieldName)){ + return internal.getEntityClass().getDeclaredField(realField).getType(); + } + } + throw new NoSuchFieldException(fieldName); } - /** - * - * TODO: Generate table schema DDL according entity definition - * - * @link https://db.apache.org/ddlutils/ - * - * CREATE TABLE ${prefix}${tableName}{ - * prefix prefix; - * encodedRowkey varchar; - * intField1 int; - * longField bitint; - * tag varchar; - * } PRIMARY KEY(encodedRowkey); - * - * CREATE TABLE ${metricTable}{ - * encodedRowkey varchar; - * prefix varchar; - * intField1 int; - * longField bitint; - * tag varchar; - * } PRIMARY KEY(rowkey,prefix); - * - * @param tagsFields - * @return - */ - @SuppressWarnings("unused") - public String getJdbcSchemaDDL(String[] tagsFields){ - throw new RuntimeException("TODO: not implemented yet"); + public Class<?> getColumnTypeOrNull(String fieldName){ + try { + return getColumnType(fieldName); + } catch (NoSuchFieldException e) { + return null; + } + } + + public Integer getJdbcColumnTypeCodeOrNull(String fieldName){ + Class<?> columnType; + try { + columnType = getColumnType(fieldName); + return JdbcEntityDefinitionManager.getJdbcType(columnType); + } catch (NoSuchFieldException e) { + return null; + } } @SuppressWarnings("unchecked") public JdbcSerDeser getJdbcSerDeser(String columnName) { - Qualifier qualifier = this.internal.getQualifierNameMap().get(columnName); + Qualifier qualifier = this.getColumnQualifier(columnName); if(qualifier == null){ return JdbcEntityDefinitionManager.DEFAULT_JDBC_SERDESER; }else { @@ -98,4 +102,22 @@ public class JdbcEntityDefinition { public boolean isGenericMetric(){ return this.internal.getEntityClass().equals(GenericMetricEntity.class); } + + public boolean isField(String columnName){ + for(String name:this.internal.getDisplayNameMap().keySet()){ + if(name.equalsIgnoreCase(columnName)){ + return true; + } + } + return false; + } + + public Qualifier getColumnQualifier(String columnName) { + for(Map.Entry<String,Qualifier> entry:this.internal.getDisplayNameMap().entrySet()){ + if(entry.getKey().equalsIgnoreCase(columnName)){ + return entry.getValue(); + } + } + return null; + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/ffd84fa0/eagle-core/eagle-query/eagle-storage-jdbc/src/main/java/org/apache/eagle/storage/jdbc/schema/JdbcEntityDefinitionManager.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-query/eagle-storage-jdbc/src/main/java/org/apache/eagle/storage/jdbc/schema/JdbcEntityDefinitionManager.java b/eagle-core/eagle-query/eagle-storage-jdbc/src/main/java/org/apache/eagle/storage/jdbc/schema/JdbcEntityDefinitionManager.java index cfad442..cf163fb 100644 --- a/eagle-core/eagle-query/eagle-storage-jdbc/src/main/java/org/apache/eagle/storage/jdbc/schema/JdbcEntityDefinitionManager.java +++ b/eagle-core/eagle-query/eagle-storage-jdbc/src/main/java/org/apache/eagle/storage/jdbc/schema/JdbcEntityDefinitionManager.java @@ -18,9 +18,12 @@ package org.apache.eagle.storage.jdbc.schema; import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity; import org.apache.eagle.log.entity.meta.EntityDefinition; +import org.apache.eagle.log.entity.meta.EntityDefinitionManager; import org.apache.eagle.log.entity.meta.EntitySerDeser; import org.apache.eagle.storage.jdbc.schema.serializer.JdbcSerDeser; import org.apache.eagle.storage.jdbc.schema.serializer.DefaultJdbcSerDeser; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.sql.Types; import java.util.HashMap; @@ -33,9 +36,13 @@ import java.util.Map; * @since 3/27/15 */ public class JdbcEntityDefinitionManager { + private final static Logger LOG = LoggerFactory.getLogger(JdbcEntityDefinitionManager.class); private final static Map<Class<? extends TaggedLogAPIEntity>,JdbcEntityDefinition> sqlEntityDefinitionCache = new HashMap<Class<? extends TaggedLogAPIEntity>,JdbcEntityDefinition>(); + private static Boolean initialized = false; public static JdbcEntityDefinition getJdbcEntityDefinition(EntityDefinition entityDefinition){ + checkInit(); + Class<? extends TaggedLogAPIEntity> entityClass = entityDefinition.getEntityClass(); JdbcEntityDefinition jdbcEntityDefinition = sqlEntityDefinitionCache.get(entityClass); if(jdbcEntityDefinition == null){ @@ -45,9 +52,37 @@ public class JdbcEntityDefinitionManager { return jdbcEntityDefinition; } + public static Map<Class<? extends TaggedLogAPIEntity>,JdbcEntityDefinition> getJdbcEntityDefinitionMap(){ + checkInit(); + return sqlEntityDefinitionCache; + } + + public static JdbcEntityDefinition getJdbcEntityDefinition(Class<? extends TaggedLogAPIEntity> clazz) throws IllegalAccessException, InstantiationException { + checkInit(); + return getJdbcEntityDefinition(EntityDefinitionManager.getEntityDefinitionByEntityClass(clazz)); + } + + private static void checkInit(){ + if (!initialized) { + try { + Map<String,EntityDefinition> entries = EntityDefinitionManager.entities(); + for (Map.Entry<String, EntityDefinition> entry : entries.entrySet()) { + Class<? extends TaggedLogAPIEntity> entityClass = entry.getValue().getEntityClass(); + JdbcEntityDefinition jdbcEntityDefinition = sqlEntityDefinitionCache.get(entityClass); + if(jdbcEntityDefinition == null){ + jdbcEntityDefinition = new JdbcEntityDefinition(entry.getValue()); + sqlEntityDefinitionCache.put(entityClass, jdbcEntityDefinition); + } + } + initialized = true; + } catch (Exception e) { + throw new RuntimeException(e); + } + } + } + public static void load(){ - // TODO: 1. load all SQLEntityDefinition on init - // TODO: 2. do more initializing works + checkInit(); } public static DefaultJdbcSerDeser DEFAULT_JDBC_SERDESER = new DefaultJdbcSerDeser(); @@ -83,15 +118,19 @@ public class JdbcEntityDefinitionManager { * @see java.sql.Types * * @param fieldType entity field type class - * @return java.sql.Types + * @return java.sql.Types, return Types.NULL if not found */ public static Integer getJdbcType(Class<?> fieldType) { - if(!_classJdbcType.containsKey(fieldType)){ - throw new IllegalArgumentException("Unable to locate jdbc type for: "+fieldType); + if(fieldType == null){ + return Types.NULL; + } else if(!_classJdbcType.containsKey(fieldType)){ + LOG.debug("Unable to locate simple jdbc type for: {}, return type as JAVA_OBJECT",fieldType); + return Types.JAVA_OBJECT; } return _classJdbcType.get(fieldType); } + /** * Register fieldType with SQL types * @@ -119,7 +158,10 @@ public class JdbcEntityDefinitionManager { registerJdbcType(double.class, Types.DOUBLE); registerJdbcType(long.class, Types.BIGINT); registerJdbcType(short.class, Types.INTEGER); - registerJdbcType(char[].class, Types.VARCHAR); + // registerJdbcType(char[].class, Types.VARCHAR); registerJdbcType(char.class, Types.CHAR); + + registerJdbcType(Boolean.class, Types.BOOLEAN); + registerJdbcType(boolean.class, Types.BOOLEAN); } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/ffd84fa0/eagle-core/eagle-query/eagle-storage-jdbc/src/main/java/org/apache/eagle/storage/jdbc/schema/JdbcEntitySchemaManager.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-query/eagle-storage-jdbc/src/main/java/org/apache/eagle/storage/jdbc/schema/JdbcEntitySchemaManager.java b/eagle-core/eagle-query/eagle-storage-jdbc/src/main/java/org/apache/eagle/storage/jdbc/schema/JdbcEntitySchemaManager.java new file mode 100644 index 0000000..8489da7 --- /dev/null +++ b/eagle-core/eagle-query/eagle-storage-jdbc/src/main/java/org/apache/eagle/storage/jdbc/schema/JdbcEntitySchemaManager.java @@ -0,0 +1,224 @@ +package org.apache.eagle.storage.jdbc.schema; + +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import org.apache.ddlutils.Platform; +import org.apache.ddlutils.PlatformFactory; +import org.apache.ddlutils.model.*; +import org.apache.eagle.log.entity.GenericMetricEntity; +import org.apache.eagle.log.entity.meta.Qualifier; +import org.apache.eagle.storage.jdbc.JdbcConstants; +import org.apache.eagle.storage.jdbc.conn.ConnectionConfig; +import org.apache.eagle.storage.jdbc.conn.ConnectionConfigFactory; +import org.apache.eagle.storage.jdbc.conn.ConnectionManagerFactory; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.sql.Connection; +import java.sql.SQLException; +import java.sql.Types; +import java.util.Collection; +import java.util.Map; + +public class JdbcEntitySchemaManager implements IJdbcEntityDDLManager { + private final static Logger LOG = LoggerFactory.getLogger(JdbcEntitySchemaManager.class); + private Database database; + private Platform platform; + + private static IJdbcEntityDDLManager instance; + + private JdbcEntitySchemaManager(){ + instance = null; + ConnectionConfig config = ConnectionConfigFactory.getFromEagleConfig(); + this.platform = PlatformFactory.createNewPlatformInstance(config.getAdapter()); + Connection connection = null; + try { + connection = ConnectionManagerFactory.getInstance().getConnection(); + this.database = platform.readModelFromDatabase(connection,config.getDatabaseName()); + LOG.info("Loaded "+database); + } catch (Exception e) { + throw new RuntimeException(e.getMessage(),e); + } finally { + if(connection!=null){ + try { + connection.close(); + } catch (SQLException e) { + LOG.warn(e.getMessage(),e); + } + } + } + } + + public static IJdbcEntityDDLManager getInstance(){ + if(instance == null){ + instance = new JdbcEntitySchemaManager(); + } + return instance; + } + + @Override + public void init() { + Connection connection = null; + try { + Database _database = identifyNewTables(); + if(_database.getTableCount() >0 ) { + LOG.info("Creating {} new tables (totally {} tables)", _database.getTableCount(),database.getTableCount()); + connection = ConnectionManagerFactory.getInstance().getConnection(); + this.platform.createTables(connection,_database, false, true); + LOG.info("Created {} new tables: ",_database.getTableCount(),_database.getTables()); + } else { + LOG.debug("All the {} tables have already been created, no new tables", database.getTableCount()); + } + } catch (Exception e) { + LOG.error(e.getMessage(),e); + throw new IllegalStateException(e); + } finally { + if(connection != null){ + try { + connection.close(); + } catch (SQLException e) { + LOG.warn(e.getMessage(),e); + } + } + } + } + + private Database identifyNewTables(){ + Database _database = new Database(); + _database.setName(database.getName()); + Collection<JdbcEntityDefinition> entityDefinitions = JdbcEntityDefinitionManager.getJdbcEntityDefinitionMap().values(); + LOG.info("Initializing database and creating tables"); + for (JdbcEntityDefinition entityDefinition : entityDefinitions) { + if (database.findTable(entityDefinition.getJdbcTableName()) == null) { + Table table = createTable(entityDefinition); + LOG.info("Creating {}", table.toVerboseString()); + _database.addTable(table); + database.addTable(table); + } else { + LOG.debug("Table {} already exists", entityDefinition.getJdbcTableName()); + } + } + return _database; + } + + @Override + public void reinit(){ + Connection connection = null; + try { + identifyNewTables(); + connection = ConnectionManagerFactory.getInstance().getConnection(); + this.platform.createTables(connection,database, true, true); + } catch (Exception e) { + LOG.error(e.getMessage(),e); + throw new IllegalStateException(e); + } finally { + if(connection != null){ + try { + connection.close(); + } catch (SQLException e) { + LOG.warn(e.getMessage(),e); + } + } + } + } + + @Override + public void shutdown() { + this.platform.shutdownDatabase(); + } + + private Table createTable(JdbcEntityDefinition entityDefinition){ + Table table = new Table(); + table.setName(entityDefinition.getJdbcTableName()); + buildTable(entityDefinition,table); + return table; + } + + private Column createTagColumn(String tagName){ + Column tagColumn = new Column(); + tagColumn.setName(tagName); + tagColumn.setTypeCode(Types.VARCHAR); + tagColumn.setJavaName(tagName); +// tagColumn.setScale(1024); + tagColumn.setSize(String.valueOf(JdbcConstants.DEFAULT_VARCHAR_SIZE)); + tagColumn.setDefaultValue(null); + tagColumn.setDescription("eagle entity tag column for "+tagName); + return tagColumn; + } + + private void buildTable(JdbcEntityDefinition entityDefinition, Table table){ + // METRIC + if(entityDefinition.getInternal().getService() + .equals(GenericMetricEntity.GENERIC_METRIC_SERVICE)){ + Column metricColumn = new Column(); + metricColumn.setName(JdbcConstants.METRIC_NAME_COLUMN_NAME); + metricColumn.setTypeCode(Types.VARCHAR); +// metricColumn.setSizeAndScale(1024,1024); + metricColumn.setDescription("eagle entity metric column"); + table.addColumn(metricColumn); + } + + // ROWKEY + Column pkColumn = new Column(); + pkColumn.setName(JdbcConstants.ROW_KEY_COLUMN_NAME); + pkColumn.setPrimaryKey(true); + pkColumn.setRequired(true); + pkColumn.setTypeCode(Types.VARCHAR); + pkColumn.setSize("1024"); +// pkColumn.setSizeAndScale(1024,10240); + + pkColumn.setDescription("eagle entity row-key column"); + table.addColumn(pkColumn); + + // TIMESTAMP + Column tsColumn = new Column(); + tsColumn.setName(JdbcConstants.TIMESTAMP_COLUMN_NAME); + tsColumn.setTypeCode(Types.BIGINT); + tsColumn.setDescription("eagle entity timestamp column"); + table.addColumn(tsColumn); + + // TAGS + if(entityDefinition.getInternal().getTags() != null) { +// Index index = new UniqueIndex(); + for (String tag : entityDefinition.getInternal().getTags()) { + Column tagColumn = createTagColumn(tag); + tagColumn.setSize(String.valueOf(JdbcConstants.DEFAULT_VARCHAR_SIZE)); + table.addColumn(tagColumn); +// IndexColumn indexColumn = new IndexColumn(); +// indexColumn.setName(tag); +// indexColumn.setOrdinalPosition(0); +// index.addColumn(indexColumn); +// index.setName(entityDefinition.getJdbcTableName()+"_tags_unique_index"); + } +// TODO: enable index when experiencing performance issue on tag filtering. +// table.addIndex(index); + } + + for(Map.Entry<String,Qualifier> entry: entityDefinition.getInternal().getDisplayNameMap().entrySet()){ + Column fieldColumn = new Column(); + fieldColumn.setName(entry.getKey()); + fieldColumn.setJavaName(entry.getKey()); + Integer typeCode = entityDefinition.getJdbcColumnTypeCodeOrNull(entry.getKey()); + typeCode = typeCode == null? Types.VARCHAR:typeCode; + if(typeCode == Types.VARCHAR) fieldColumn.setSize(String.valueOf(JdbcConstants.DEFAULT_VARCHAR_SIZE)); + fieldColumn.setTypeCode(typeCode); + fieldColumn.setDescription("eagle field column "+entry.getKey()+":"+entityDefinition.getColumnTypeOrNull(entry.getKey())); + table.addColumn(fieldColumn); + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/ffd84fa0/eagle-core/eagle-query/eagle-storage-jdbc/src/main/java/org/apache/eagle/storage/jdbc/schema/ddl/JdbcEntityDdlManager.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-query/eagle-storage-jdbc/src/main/java/org/apache/eagle/storage/jdbc/schema/ddl/JdbcEntityDdlManager.java b/eagle-core/eagle-query/eagle-storage-jdbc/src/main/java/org/apache/eagle/storage/jdbc/schema/ddl/JdbcEntityDdlManager.java deleted file mode 100644 index 1e622a5..0000000 --- a/eagle-core/eagle-query/eagle-storage-jdbc/src/main/java/org/apache/eagle/storage/jdbc/schema/ddl/JdbcEntityDdlManager.java +++ /dev/null @@ -1,24 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.eagle.storage.jdbc.schema.ddl; - -/** - * @since 3/27/15 - */ -public interface JdbcEntityDdlManager { - void init(); -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/ffd84fa0/eagle-core/eagle-query/eagle-storage-jdbc/src/main/java/org/apache/eagle/storage/jdbc/schema/ddl/package-info.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-query/eagle-storage-jdbc/src/main/java/org/apache/eagle/storage/jdbc/schema/ddl/package-info.java b/eagle-core/eagle-query/eagle-storage-jdbc/src/main/java/org/apache/eagle/storage/jdbc/schema/ddl/package-info.java deleted file mode 100644 index b732b31..0000000 --- a/eagle-core/eagle-query/eagle-storage-jdbc/src/main/java/org/apache/eagle/storage/jdbc/schema/ddl/package-info.java +++ /dev/null @@ -1,24 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -/** - * Manage RDBMS schemas according java entity definition - * - * TODO: not implemented yet - * - * @since 3/31/15 - */ -package org.apache.eagle.storage.jdbc.schema.ddl; \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/ffd84fa0/eagle-core/eagle-query/eagle-storage-jdbc/src/main/java/org/apache/eagle/storage/jdbc/schema/package-info.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-query/eagle-storage-jdbc/src/main/java/org/apache/eagle/storage/jdbc/schema/package-info.java b/eagle-core/eagle-query/eagle-storage-jdbc/src/main/java/org/apache/eagle/storage/jdbc/schema/package-info.java new file mode 100644 index 0000000..adc653a --- /dev/null +++ b/eagle-core/eagle-query/eagle-storage-jdbc/src/main/java/org/apache/eagle/storage/jdbc/schema/package-info.java @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +/** + * Manage RDBMS schemas according java entity definition + * + * TODO: not implemented yet + * + * @since 3/31/15 + */ +package org.apache.eagle.storage.jdbc.schema; \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/ffd84fa0/eagle-core/eagle-query/eagle-storage-jdbc/src/main/java/org/apache/eagle/storage/jdbc/schema/serializer/DefaultJdbcSerDeser.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-query/eagle-storage-jdbc/src/main/java/org/apache/eagle/storage/jdbc/schema/serializer/DefaultJdbcSerDeser.java b/eagle-core/eagle-query/eagle-storage-jdbc/src/main/java/org/apache/eagle/storage/jdbc/schema/serializer/DefaultJdbcSerDeser.java index f096d5f..3414e8c 100644 --- a/eagle-core/eagle-query/eagle-storage-jdbc/src/main/java/org/apache/eagle/storage/jdbc/schema/serializer/DefaultJdbcSerDeser.java +++ b/eagle-core/eagle-query/eagle-storage-jdbc/src/main/java/org/apache/eagle/storage/jdbc/schema/serializer/DefaultJdbcSerDeser.java @@ -16,13 +16,15 @@ */ package org.apache.eagle.storage.jdbc.schema.serializer; -import org.apache.eagle.storage.jdbc.schema.JdbcEntityDefinition; +import org.apache.eagle.log.entity.meta.Qualifier; +import org.apache.eagle.storage.jdbc.JdbcConstants; import org.apache.eagle.storage.jdbc.schema.JdbcEntityDefinitionManager; import org.apache.torque.util.JdbcTypedValue; import java.io.IOException; import java.sql.ResultSet; import java.sql.SQLException; +import java.sql.Types; /** * @since 3/26/15 @@ -30,12 +32,19 @@ import java.sql.SQLException; public class DefaultJdbcSerDeser<T,R> implements JdbcSerDeser<T> { @Override - public T readValue(ResultSet result, String fieldName, JdbcEntityDefinition JdbcEntityDefinition) throws IOException { + public T toJavaTypedValue(ResultSet result, Class<?> fieldType, String fieldName, Qualifier qualifier) throws IOException { + int jdbcType = JdbcEntityDefinitionManager.getJdbcType(fieldType); try { - Object val = result.getObject(fieldName); - return (T) val; + if(Types.JAVA_OBJECT == jdbcType){ + byte[] bytes = result.getBytes(fieldName); + return (T) qualifier.getSerDeser().deserialize(bytes); + } else if(Types.BOOLEAN == jdbcType){ + return (T) new Boolean(result.getBoolean(fieldName)); + } else { + return (T) result.getObject(fieldName); + } } catch (SQLException e) { - throw new IOException(e); + throw new IOException("Field: "+fieldName+", java type:"+fieldType+", jdbc type: "+jdbcType,e); } } @@ -46,7 +55,13 @@ public class DefaultJdbcSerDeser<T,R> implements JdbcSerDeser<T> { * @return */ @Override - public JdbcTypedValue getJdbcTypedValue(Object fieldValue, Class<?> fieldType) { - return new JdbcTypedValue(fieldValue, JdbcEntityDefinitionManager.getJdbcType(fieldType)); + public JdbcTypedValue toJdbcTypedValue(Object fieldValue, Class<?> fieldType, Qualifier qualifier) { + int jdbcTypeCode = JdbcEntityDefinitionManager.getJdbcType(fieldType); + if(Types.JAVA_OBJECT == jdbcTypeCode){ + byte[] bytes = qualifier.getSerDeser().serialize(fieldValue); + return new JdbcTypedValue(bytes, JdbcConstants.DEFAULT_TYPE_FOR_COMPLEX_TYPE); + } else { + return new JdbcTypedValue(fieldValue, jdbcTypeCode); + } } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/ffd84fa0/eagle-core/eagle-query/eagle-storage-jdbc/src/main/java/org/apache/eagle/storage/jdbc/schema/serializer/JdbcSerDeser.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-query/eagle-storage-jdbc/src/main/java/org/apache/eagle/storage/jdbc/schema/serializer/JdbcSerDeser.java b/eagle-core/eagle-query/eagle-storage-jdbc/src/main/java/org/apache/eagle/storage/jdbc/schema/serializer/JdbcSerDeser.java index 3d4f793..ffc5ea2 100644 --- a/eagle-core/eagle-query/eagle-storage-jdbc/src/main/java/org/apache/eagle/storage/jdbc/schema/serializer/JdbcSerDeser.java +++ b/eagle-core/eagle-query/eagle-storage-jdbc/src/main/java/org/apache/eagle/storage/jdbc/schema/serializer/JdbcSerDeser.java @@ -16,6 +16,7 @@ */ package org.apache.eagle.storage.jdbc.schema.serializer; +import org.apache.eagle.log.entity.meta.Qualifier; import org.apache.eagle.storage.jdbc.schema.JdbcEntityDefinition; import org.apache.torque.util.JdbcTypedValue; @@ -26,6 +27,21 @@ import java.sql.ResultSet; * @since 3/26/15 */ public interface JdbcSerDeser<T> { - T readValue(ResultSet result, String fieldName, JdbcEntityDefinition JdbcEntityDefinition) throws IOException; - JdbcTypedValue getJdbcTypedValue(Object fieldValue, Class<?> fieldType); + /** + * for entity read + * convert value from jdbc storage into user-typed object + * @param result + * @return + * @throws IOException + */ + T toJavaTypedValue(ResultSet result, Class<?> fieldType, String fieldName, Qualifier qualifier) throws IOException; + + /** + * for write entity + * convert user-typed fieldValue into fieldType-compatible value and persist that value into jdbc storage + * @param fieldValue + * @param fieldType + * @return + */ + JdbcTypedValue toJdbcTypedValue(Object fieldValue, Class<?> fieldType, Qualifier qualifier); } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/ffd84fa0/eagle-core/eagle-query/eagle-storage-jdbc/src/main/java/org/apache/eagle/storage/jdbc/schema/serializer/JsonJdbcSerDeser.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-query/eagle-storage-jdbc/src/main/java/org/apache/eagle/storage/jdbc/schema/serializer/JsonJdbcSerDeser.java b/eagle-core/eagle-query/eagle-storage-jdbc/src/main/java/org/apache/eagle/storage/jdbc/schema/serializer/JsonJdbcSerDeser.java index cece94d..5a934b8 100644 --- a/eagle-core/eagle-query/eagle-storage-jdbc/src/main/java/org/apache/eagle/storage/jdbc/schema/serializer/JsonJdbcSerDeser.java +++ b/eagle-core/eagle-query/eagle-storage-jdbc/src/main/java/org/apache/eagle/storage/jdbc/schema/serializer/JsonJdbcSerDeser.java @@ -16,7 +16,7 @@ */ package org.apache.eagle.storage.jdbc.schema.serializer; -import org.apache.eagle.storage.jdbc.schema.JdbcEntityDefinition; +import org.apache.eagle.log.entity.meta.Qualifier; import org.apache.torque.util.JdbcTypedValue; import org.codehaus.jackson.map.ObjectMapper; @@ -33,21 +33,19 @@ public class JsonJdbcSerDeser<T extends Object> implements JdbcSerDeser<T> { @SuppressWarnings("unchecked") @Override - public T readValue(ResultSet result, String fieldName, JdbcEntityDefinition jdbcEntityDefinition) throws IOException { + public T toJavaTypedValue(ResultSet result, Class<?> fieldType, String fieldName, Qualifier qualifier) throws IOException { try { String jsonString = result.getString(fieldName); - return (T) objectMapper.readValue(jsonString, jdbcEntityDefinition.getColumnType(fieldName)); + return (T) objectMapper.readValue(jsonString, fieldType); } catch (IOException e) { throw e; } catch (SQLException e) { throw new IOException(e); - } catch (NoSuchFieldException e) { - throw new IOException(e); } } @Override - public JdbcTypedValue getJdbcTypedValue(Object fieldValue, Class<?> fieldType) { + public JdbcTypedValue toJdbcTypedValue(Object fieldValue, Class<?> fieldType, Qualifier qualifier) { try { return new JdbcTypedValue(objectMapper.writeValueAsString(objectMapper.writeValueAsString(fieldValue)), Types.VARCHAR); } catch (IOException e) { http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/ffd84fa0/eagle-core/eagle-query/eagle-storage-jdbc/src/test/java/org/apache/eagle/storage/jdbc/TestJdbcStorage.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-query/eagle-storage-jdbc/src/test/java/org/apache/eagle/storage/jdbc/TestJdbcStorage.java b/eagle-core/eagle-query/eagle-storage-jdbc/src/test/java/org/apache/eagle/storage/jdbc/TestJdbcStorage.java index c0ba13c..0b4178e 100644 --- a/eagle-core/eagle-query/eagle-storage-jdbc/src/test/java/org/apache/eagle/storage/jdbc/TestJdbcStorage.java +++ b/eagle-core/eagle-query/eagle-storage-jdbc/src/test/java/org/apache/eagle/storage/jdbc/TestJdbcStorage.java @@ -23,7 +23,6 @@ import org.apache.eagle.log.entity.meta.EntityDefinition; import org.apache.eagle.log.entity.meta.EntityDefinitionManager; import org.apache.eagle.log.entity.test.TestTimeSeriesAPIEntity; import org.apache.eagle.storage.DataStorageManager; -import org.apache.eagle.storage.exception.IllegalDataStorageTypeException; import org.apache.eagle.storage.exception.QueryCompileException; import org.apache.eagle.storage.operation.CompiledQuery; import org.apache.eagle.storage.operation.RawQuery; @@ -35,60 +34,77 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.UUID; +import java.util.*; public class TestJdbcStorage { - JdbcStorage storage; EntityDefinition entityDefinition; + long baseTimestamp; final static Logger LOG = LoggerFactory.getLogger(TestJdbcStorage.class); @Before - public void setUp() throws IOException, IllegalAccessException, InstantiationException, IllegalDataStorageTypeException { + public void setUp() throws Exception { + entityDefinition = EntityDefinitionManager.getEntityDefinitionByEntityClass(TestTimeSeriesAPIEntity.class); + entityDefinition.setTags(new String[]{"cluster","datacenter","random"}); storage = (JdbcStorage) DataStorageManager.getDataStorageByEagleConfig(); storage.init(); - entityDefinition = EntityDefinitionManager.getEntityDefinitionByEntityClass(TestTimeSeriesAPIEntity.class); + GregorianCalendar gc = new GregorianCalendar(); + gc.clear(); + gc.set(2014, 1, 6, 1, 40, 12); + gc.setTimeZone(TimeZone.getTimeZone("UTC")); + baseTimestamp = gc.getTime().getTime(); + System.out.println("timestamp:" + baseTimestamp); } - //@Test + @Test public void testReadBySimpleQuery() throws QueryCompileException, IOException { RawQuery rawQuery = new RawQuery(); - rawQuery.setQuery("TestTimeSeriesAPIEntity[]{*}"); - rawQuery.setStartTime("2014-01-06 01:40:02"); - rawQuery.setEndTime("2016-01-06 01:40:02"); + rawQuery.setQuery("TestTimeSeriesAPIEntity[@cluster=\"c4ut\"]{*}"); + System.out.println(DateTimeUtil.millisecondsToHumanDateWithSeconds(baseTimestamp)); + rawQuery.setStartTime(DateTimeUtil.millisecondsToHumanDateWithSeconds(baseTimestamp)); + rawQuery.setEndTime(DateTimeUtil.millisecondsToHumanDateWithMilliseconds(baseTimestamp+2000)); rawQuery.setPageSize(1000); CompiledQuery query = new CompiledQuery(rawQuery); QueryResult<TestTimeSeriesAPIEntity> result = storage.query(query, entityDefinition); Assert.assertNotNull(result); } -// @Test + @Test public void testReadByComplexQuery() throws QueryCompileException, IOException { RawQuery rawQuery = new RawQuery(); - rawQuery.setQuery("TestTimeSeriesAPIEntity[@cluster=\"cluster\" AND @field4 > 1000 AND @field7 CONTAINS \"subtext\" OR @jobID =\"jobID\" ]{@field1,@field2}"); - rawQuery.setStartTime("2015-01-06 01:40:02"); - rawQuery.setEndTime("2016-01-06 01:40:02"); + rawQuery.setQuery("TestTimeSeriesAPIEntity[@cluster=\"c4ut\" AND @field4 > 1000 OR @datacenter =\"d4ut\" ]{@field1,@field2}"); + rawQuery.setStartTime(DateTimeUtil.millisecondsToHumanDateWithSeconds(baseTimestamp)); + rawQuery.setEndTime(DateTimeUtil.millisecondsToHumanDateWithSeconds(baseTimestamp + 2000)); + rawQuery.setPageSize(1000); + CompiledQuery query = new CompiledQuery(rawQuery); + storage.query(query,entityDefinition); + } + + @Test + public void testReadByComplexQueryWithLike() throws QueryCompileException, IOException { + RawQuery rawQuery = new RawQuery(); + rawQuery.setQuery("TestTimeSeriesAPIEntity[@cluster=\"c4ut\" AND @field4 > 1000 AND @field7 CONTAINS \"99404f47e309\" OR @datacenter =\"d4ut\" ]{@field1,@field2}"); + rawQuery.setStartTime(DateTimeUtil.millisecondsToHumanDateWithSeconds(baseTimestamp)); + rawQuery.setEndTime(DateTimeUtil.millisecondsToHumanDateWithSeconds(baseTimestamp + 2000)); rawQuery.setPageSize(1000); CompiledQuery query = new CompiledQuery(rawQuery); storage.query(query,entityDefinition); } - //@Test + @Test public void testWrite() throws IOException { List<TestTimeSeriesAPIEntity> entityList = new ArrayList<TestTimeSeriesAPIEntity>(); - int i= 0; while( i++ < 1000){ - entityList.add(newInstance()); + TestTimeSeriesAPIEntity entity = newInstance(); + + entityList.add(entity); } ModifyResult<String> result = storage.create(entityList, entityDefinition); Assert.assertTrue(result.getSize() > 0); } - //@Test + @Test public void testWriteAndRead() throws IOException, QueryCompileException { // record insert init time long startTime = System.currentTimeMillis(); @@ -103,18 +119,18 @@ public class TestJdbcStorage { // record insertion finish time long endTime = System.currentTimeMillis(); - // init read in time range [startTime, endTime) + // init read in time range [startTime, endTime) RawQuery rawQuery = new RawQuery(); rawQuery.setQuery("TestTimeSeriesAPIEntity[]{*}"); rawQuery.setStartTime(DateTimeUtil.millisecondsToHumanDateWithSeconds(startTime)); - rawQuery.setEndTime(DateTimeUtil.millisecondsToHumanDateWithSeconds(endTime+1)); - rawQuery.setPageSize(1000000); + rawQuery.setEndTime(DateTimeUtil.millisecondsToHumanDateWithSeconds(endTime+1000)); + rawQuery.setPageSize(10000); CompiledQuery query = new CompiledQuery(rawQuery); QueryResult queryResult = storage.query(query, entityDefinition); Assert.assertTrue(queryResult.getSize() >= 1000); } - //@Test + @Test public void testWriteAndAggregation() throws IOException, QueryCompileException { // record insert init time long startTime = System.currentTimeMillis(); @@ -133,14 +149,14 @@ public class TestJdbcStorage { RawQuery rawQuery = new RawQuery(); rawQuery.setQuery("TestTimeSeriesAPIEntity[]<@cluster,@datacenter>{count,max(@field1),min(@field2),sum(@field3)}"); rawQuery.setStartTime(DateTimeUtil.millisecondsToHumanDateWithSeconds(startTime)); - rawQuery.setEndTime(DateTimeUtil.millisecondsToHumanDateWithSeconds(endTime)); + rawQuery.setEndTime(DateTimeUtil.millisecondsToHumanDateWithSeconds(endTime+1000)); rawQuery.setPageSize(1000000); CompiledQuery query = new CompiledQuery(rawQuery); QueryResult queryResult = storage.query(query, entityDefinition); Assert.assertTrue(queryResult.getSize() >= 1); } - //@Test + @Test public void testWriteAndDelete() throws IOException, QueryCompileException { // record insert init time long startTime = System.currentTimeMillis(); @@ -158,15 +174,15 @@ public class TestJdbcStorage { // delete in time range [startTime, endTime) RawQuery rawQuery = new RawQuery(); rawQuery.setQuery("TestTimeSeriesAPIEntity[]{*}"); - rawQuery.setStartTime(DateTimeUtil.millisecondsToHumanDateWithSeconds(startTime)); - rawQuery.setEndTime(DateTimeUtil.millisecondsToHumanDateWithSeconds(endTime)); + rawQuery.setStartTime(DateTimeUtil.millisecondsToHumanDateWithSeconds(startTime-1000)); + rawQuery.setEndTime(DateTimeUtil.millisecondsToHumanDateWithSeconds(endTime+1000)); rawQuery.setPageSize(1000000); CompiledQuery query = new CompiledQuery(rawQuery); ModifyResult<String> queryResult = storage.delete(query, entityDefinition); Assert.assertTrue(queryResult.getSize() >= 1000); } - //@Test + @Test public void testWriteAndUpdate() throws IOException, QueryCompileException { // Write 1000 entities List<TestTimeSeriesAPIEntity> entityList = new ArrayList<TestTimeSeriesAPIEntity>(); @@ -190,7 +206,7 @@ public class TestJdbcStorage { * * @throws IOException */ - //@Test +// @Test public void testWriterPerformance() throws IOException { StopWatch stopWatch = new StopWatch(); stopWatch.start(); @@ -221,13 +237,14 @@ public class TestJdbcStorage { instance.setTags(new HashMap<String, String>() {{ put("cluster", "c4ut"); put("datacenter", "d4ut"); + put("random",UUID.randomUUID().toString()); }}); instance.setTimestamp(System.currentTimeMillis()); return instance; } @Test - public void test() { + public void testInitSuccessfully() { } } http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/ffd84fa0/eagle-core/eagle-query/eagle-storage-jdbc/src/test/java/org/apache/eagle/storage/jdbc/conn/TestConnectionFactory.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-query/eagle-storage-jdbc/src/test/java/org/apache/eagle/storage/jdbc/conn/TestConnectionFactory.java b/eagle-core/eagle-query/eagle-storage-jdbc/src/test/java/org/apache/eagle/storage/jdbc/conn/TestConnectionFactory.java deleted file mode 100644 index 78b0210..0000000 --- a/eagle-core/eagle-query/eagle-storage-jdbc/src/test/java/org/apache/eagle/storage/jdbc/conn/TestConnectionFactory.java +++ /dev/null @@ -1,55 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.eagle.storage.jdbc.conn; - -import junit.framework.Assert; -import org.junit.Test; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.sql.Connection; -import java.sql.ResultSet; -import java.sql.SQLException; -import java.sql.Statement; - -/** - * @since 3/27/15 - */ -public class TestConnectionFactory { - final static Logger LOG = LoggerFactory.getLogger(TestConnectionFactory.class); - -// @Test - public void testConnection(){ - try { - Connection connection = ConnectionManagerFactory.getInstance().getConnection(); - Statement statement = connection.createStatement(); - ResultSet resultSet = statement.executeQuery("select 1"); - Assert.assertTrue(resultSet.next()); - Assert.assertEquals(1, resultSet.getInt(1)); - } catch (SQLException e) { - LOG.error(e.getMessage(),e); - Assert.fail(e.getMessage()); - } catch (Exception e) { - e.printStackTrace(); - } - } - - @Test - public void test() { - - } -}