APEXMALHAR-1920 #resolve #comment Add JDBC dimension output operator to Malhar
Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/6937c20b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/6937c20b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/6937c20b

Branch: refs/heads/devel-3
Commit: 6937c20b100f57583ecc9fbcefc2fc77a1076fae
Parents: 5cecce4
Author: Timothy Farkas <[email protected]>
Authored: Sun Feb 21 20:10:54 2016 +0530
Committer: Timothy Farkas <[email protected]>
Committed: Tue Mar 15 01:59:01 2016 -0700

----------------------------------------------------------------------
 .../db/jdbc/JDBCDimensionalOutputOperator.java | 464 +++++++++++++++++++
 1 file changed, 464 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/6937c20b/library/src/main/java/com/datatorrent/lib/db/jdbc/JDBCDimensionalOutputOperator.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/db/jdbc/JDBCDimensionalOutputOperator.java b/library/src/main/java/com/datatorrent/lib/db/jdbc/JDBCDimensionalOutputOperator.java
new file mode 100644
index 0000000..3021521
--- /dev/null
+++ b/library/src/main/java/com/datatorrent/lib/db/jdbc/JDBCDimensionalOutputOperator.java
@@ -0,0 +1,464 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.lib.db.jdbc;
+
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+
+import java.util.List;
+import java.util.Map;
+
+import javax.validation.constraints.Min;
+import javax.validation.constraints.NotNull;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
+import com.datatorrent.api.Context;
+
+import com.datatorrent.lib.appdata.gpo.GPOMutable;
+import com.datatorrent.lib.appdata.schemas.DimensionalConfigurationSchema;
+import com.datatorrent.lib.appdata.schemas.FieldsDescriptor;
+import com.datatorrent.lib.appdata.schemas.Type;
+import com.datatorrent.lib.db.AbstractPassThruTransactionableStoreOutputOperator;
+import com.datatorrent.lib.dimensions.DimensionsDescriptor;
+import com.datatorrent.lib.dimensions.DimensionsEvent.Aggregate;
+import com.datatorrent.lib.dimensions.DimensionsEvent.EventKey;
+import com.datatorrent.lib.dimensions.aggregator.AggregatorRegistry;
+
+import it.unimi.dsi.fastutil.ints.Int2ObjectMap;
+
+/**
+ * This operator writes updates emitted by a {@link DimensionsStoreHDHT}
+ * operator to a MySQL database. Updates are written to the database in the
+ * following fashion: <br/>
+ * <br/>
+ * <ol>
+ * <li>Aggregates are received from an upstream
+ * {@link AbstractDimensionsComputationFlexibleSingleSchema} operator.</li>
+ * <li>Each aggregate is written to a different table based on its dimension
+ * combination, time bucket, and corresponding aggregation.</li>
+ * </ol>
+ */
+@org.apache.hadoop.classification.InterfaceStability.Evolving
+public class JDBCDimensionalOutputOperator
+    extends AbstractPassThruTransactionableStoreOutputOperator<Aggregate, JdbcTransactionalStore>
+{
+  protected static final int DEFAULT_BATCH_SIZE = 1000;
+
+  @Min(1)
+  private int batchSize;
+  private final List<Aggregate> tuples;
+
+  private transient int batchStartIdx;
+
+  @NotNull
+  private Map<Integer, Map<String, String>> tableNames;
+  @NotNull
+  private String eventSchema;
+  @NotNull
+  private AggregatorRegistry aggregatorRegistry = AggregatorRegistry.DEFAULT_AGGREGATOR_REGISTRY;
+  private DimensionalConfigurationSchema schema;
+
+  private transient Map<Integer, Map<Integer, PreparedStatement>> ddIDToAggIDToStatement = Maps.newHashMap();
+
+  public JDBCDimensionalOutputOperator()
+  {
+    tuples = Lists.newArrayList();
+    batchSize = DEFAULT_BATCH_SIZE;
+    batchStartIdx = 0;
+    store = new JdbcTransactionalStore();
+  }
+
+  @Override
+  public void setup(Context.OperatorContext context)
+  {
+    super.setup(context);
+
+    LOG.info("Done setting up super");
+    aggregatorRegistry.setup();
+
+    //Create prepared statements
+    schema = new DimensionalConfigurationSchema(eventSchema, aggregatorRegistry);
+
+    List<FieldsDescriptor> keyFDs = schema.getDimensionsDescriptorIDToKeyDescriptor();
+
+    for (int ddID = 0; ddID < keyFDs.size(); ddID++) {
+      LOG.info("ddID {}", ddID);
+      FieldsDescriptor keyFD = keyFDs.get(ddID);
+      Int2ObjectMap<FieldsDescriptor> aggIDToAggFD = schema
+          .getDimensionsDescriptorIDToAggregatorIDToOutputAggregatorDescriptor().get(ddID);
+
+      Map<Integer, PreparedStatement> aggIDToStatement = ddIDToAggIDToStatement.get(ddID);
+
+      if (aggIDToStatement == null) {
+        aggIDToStatement = Maps.newHashMap();
+        ddIDToAggIDToStatement.put(ddID, aggIDToStatement);
+      }
+
+      for (Map.Entry<String, String> aggTable : tableNames.get(ddID).entrySet()) {
+        int aggID = aggregatorRegistry.getIncrementalAggregatorNameToID().get(aggTable.getKey());
+
+        LOG.info("aggID {}", aggID);
+        FieldsDescriptor aggFD = aggIDToAggFD.get(aggID);
+
+        //The time bucket is not a table column, so it is removed from the key names.
+        List<String> keyNames = keyFD.getFieldList();
+        keyNames.remove(DimensionsDescriptor.DIMENSION_TIME_BUCKET);
+
+        LOG.info("List fields {}", keyNames);
+        List<String> aggregateNames = aggFD.getFieldList();
+        LOG.info("List fields {}", aggregateNames);
+        String tableName = aggTable.getValue();
+
+        String statementString = buildStatement(tableName, keyNames, aggregateNames);
+
+        try {
+          aggIDToStatement.put(aggID, store.getConnection().prepareStatement(statementString));
+        } catch (SQLException ex) {
+          throw new RuntimeException(ex);
+        }
+      }
+    }
+  }
+
+  private String buildStatement(String tableName, List<String> keyNames, List<String> aggregateNames)
+  {
+    LOG.info("building statement");
+    StringBuilder sb = new StringBuilder();
+    sb.append("INSERT INTO ");
+    sb.append(tableName);
+    sb.append(" (");
+
+    addList(sb, keyNames);
+    sb.append(",");
+    addList(sb, aggregateNames);
+
+    sb.append(") VALUES (");
+
+    for (int qCounter = 0;; qCounter++) {
+      sb.append("?");
+
+      if (qCounter == keyNames.size() + aggregateNames.size() - 1) {
+        break;
+      }
+
+      sb.append(",");
+    }
+
+    sb.append(") ON DUPLICATE KEY UPDATE ");
+
+    addOnDuplicate(sb, aggregateNames);
+
+    return sb.toString();
+  }
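+
+  /*
+   * buildStatement produces a MySQL upsert. For example, with hypothetical
+   * key columns [publisher, advertiser] and aggregate columns
+   * [impressions, cost], the statement built for a table named
+   * "ads_sum_minute" is (shown wrapped):
+   *
+   *   INSERT INTO ads_sum_minute (publisher,advertiser,impressions,cost)
+   *   VALUES (?,?,?,?)
+   *   ON DUPLICATE KEY UPDATE impressions=VALUES(impressions),cost=VALUES(cost)
+   */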
+
+  private void addOnDuplicate(StringBuilder sb, List<String> names)
+  {
+    LOG.info("add Duplicate");
+    for (int index = 0;; index++) {
+      String name = names.get(index);
+      sb.append(name);
+      sb.append("=");
+      sb.append("VALUES(");
+      sb.append(name);
+      sb.append(")");
+
+      if (index == names.size() - 1) {
+        break;
+      }
+
+      sb.append(",");
+    }
+  }
+
+  private void addList(StringBuilder sb, List<String> names)
+  {
+    for (int index = 0;; index++) {
+      sb.append(names.get(index));
+
+      if (index == names.size() - 1) {
+        break;
+      }
+
+      sb.append(",");
+    }
+  }
+
+  /**
+   * This sets the table names that correspond to the dimensions combinations
+   * specified in your {@link DimensionalConfigurationSchema}. The structure of
+   * this property is as follows: <br/>
+   * <br/>
+   * <ol>
+   * <li>The first key is the dimensions descriptor id assigned to a dimension
+   * combination and time bucket pair in your
+   * {@link DimensionalConfigurationSchema}. <br/>
+   * <br/>
+   * The dimensions descriptor id is determined by the following factors:
+   * <ul>
+   * <li>The dimension combinations specified in the
+   * {@link DimensionalConfigurationSchema}.</li>
+   * <li>The time buckets defined in your
+   * {@link DimensionalConfigurationSchema}.</li>
+   * </ul>
+   * The dimensions descriptor id is computed in the following way:
+   * <ol>
+   * <li>The first dimensions descriptor id is 0.</li>
+   * <li>A dimension combination is selected.</li>
+   * <li>A time bucket is selected.</li>
+   * <li>The current dimension combination and time bucket pair is assigned a
+   * dimensions descriptor id.</li>
+   * <li>The current dimensions descriptor id is incremented.</li>
+   * <li>Steps 3 - 5 are repeated until all the time buckets are done.</li>
+   * <li>Steps 2 - 6 are repeated until all the dimension combinations are
+   * done.</li>
+   * </ol>
+   * <br/>
+   * </li>
+   * <li>The second key is the name of an aggregation being performed for that
+   * dimensions combination.</li>
+   * <li>The value is the name of the output MySQL table.</li>
+   * </ol>
+   * An example configuration is sketched below this method.
+   *
+   * @param tableNames
+   *          The table names that correspond to the dimensions combinations
+   *          specified in your {@link DimensionalConfigurationSchema}.
+   */
+  public void setTableNames(Map<Integer, Map<String, String>> tableNames)
+  {
+    this.tableNames = Preconditions.checkNotNull(tableNames);
+  }
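+
+  /*
+   * Example (with a hypothetical schema and hypothetical table names): a
+   * schema with one dimension combination [publisher, advertiser] and the
+   * time buckets 1m and 1h yields the dimensions descriptor ids 0 (1m) and
+   * 1 (1h). If only the SUM aggregation is used, a matching configuration
+   * would be:
+   *
+   *   Map<Integer, Map<String, String>> tables = Maps.newHashMap();
+   *   tables.put(0, Collections.singletonMap("SUM", "ads_sum_minute"));
+   *   tables.put(1, Collections.singletonMap("SUM", "ads_sum_hour"));
+   *   operator.setTableNames(tables);
+   */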
+
+  /**
+   * Sets the JSON corresponding to the {@link DimensionalConfigurationSchema}
+   * which was set on the upstream {@link AppDataSingleSchemaDimensionStoreHDHT}
+   * and {@link AbstractDimensionsComputationFlexibleSingleSchema} operators.
+   *
+   * @param eventSchema
+   *          The JSON corresponding to the
+   *          {@link DimensionalConfigurationSchema} which was set on the
+   *          upstream {@link AppDataSingleSchemaDimensionStoreHDHT} and
+   *          {@link AbstractDimensionsComputationFlexibleSingleSchema}
+   *          operators.
+   */
+  public void setEventSchema(String eventSchema)
+  {
+    this.eventSchema = eventSchema;
+  }
+
+  /**
+   * Sets the {@link AggregatorRegistry} that is used to determine which
+   * aggregators correspond to which ids.
+   *
+   * @param aggregatorRegistry
+   *          The {@link AggregatorRegistry} that is used to determine which
+   *          aggregators correspond to which ids.
+   */
+  public void setAggregatorRegistry(AggregatorRegistry aggregatorRegistry)
+  {
+    this.aggregatorRegistry = aggregatorRegistry;
+  }
+
+  @Override
+  public void endWindow()
+  {
+    //Process any remaining tuples.
+    if (tuples.size() - batchStartIdx > 0) {
+      processBatch();
+    }
+    super.endWindow();
+    tuples.clear();
+    batchStartIdx = 0;
+  }
+
+  @Override
+  public void processTuple(Aggregate tuple)
+  {
+    tuples.add(tuple);
+    if ((tuples.size() - batchStartIdx) >= batchSize) {
+      processBatch();
+    }
+  }
+
+  /**
+   * Processes all the tuples in the batch once the batch size for the operator
+   * is reached.
+   */
+  private void processBatch()
+  {
+    LOG.info("start {} end {}", batchStartIdx, tuples.size());
+    try {
+      for (int i = batchStartIdx; i < tuples.size(); i++) {
+        setStatementParameters(tuples.get(i));
+      }
+
+      for (Map.Entry<Integer, Map<Integer, PreparedStatement>> ddIDToAggIDToStatementEntry : ddIDToAggIDToStatement
+          .entrySet()) {
+        for (Map.Entry<Integer, PreparedStatement> entry : ddIDToAggIDToStatementEntry.getValue().entrySet()) {
+          entry.getValue().executeBatch();
+          entry.getValue().clearBatch();
+        }
+      }
+    } catch (SQLException e) {
+      throw new RuntimeException("processing batch", e);
+    } finally {
+      //Mark all buffered tuples as consumed.
+      batchStartIdx = tuples.size();
+    }
+  }
+
+  /**
+   * Sets the parameters on the {@link java.sql.PreparedStatement} based on the
+   * values in the given {@link Aggregate}.
+   *
+   * @param aggregate
+   *          The {@link Aggregate} whose values will be set on the
+   *          corresponding {@link java.sql.PreparedStatement}.
+   */
+  private void setStatementParameters(Aggregate aggregate)
+  {
+    EventKey eventKey = aggregate.getEventKey();
+
+    int ddID = eventKey.getDimensionDescriptorID();
+    int aggID = eventKey.getAggregatorID();
+
+    LOG.info("Setting statement params {} {}", ddID, aggID);
+
+    FieldsDescriptor keyFD = schema.getDimensionsDescriptorIDToKeyDescriptor().get(ddID);
+    FieldsDescriptor aggFD = schema.getDimensionsDescriptorIDToAggregatorIDToOutputAggregatorDescriptor().get(ddID)
+        .get(aggID);
+
+    GPOMutable key = eventKey.getKey();
+    key.setFieldDescriptor(keyFD);
+
+    GPOMutable value = aggregate.getAggregates();
+    value.setFieldDescriptor(aggFD);
+
+    int qCounter = 1;
+
+    PreparedStatement ps = ddIDToAggIDToStatement.get(ddID).get(aggID);
+
+    try {
+      qCounter = setParams(ps, key, qCounter, true);
+      setParams(ps, value, qCounter, false);
+      ps.addBatch();
+    } catch (SQLException ex) {
+      throw new RuntimeException(ex);
+    }
+  }
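+
+  /*
+   * Binding order: setStatementParameters binds the key fields first
+   * (positions 1 through k, with the time bucket field skipped) and the
+   * aggregate fields after them, matching the column order produced by
+   * buildStatement. For the hypothetical example above, that is
+   * publisher -> 1, advertiser -> 2, impressions -> 3, cost -> 4.
+   */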
+
+  /**
+   * @param ps
+   *          The {@link java.sql.PreparedStatement} which will do an insert
+   *          into the MySQL database.
+   * @param gpo
+   *          The {@link GPOMutable} object whose values need to be set in the
+   *          prepared statement.
+   * @param qCounter
+   *          The current index in the prepared statement.
+   * @param isKey
+   *          TODO currently unused
+   * @return The current index in the prepared statement.
+   * @throws SQLException
+   */
+  private int setParams(PreparedStatement ps, GPOMutable gpo, int qCounter, boolean isKey) throws SQLException
+  {
+    FieldsDescriptor fd = gpo.getFieldDescriptor();
+
+    Map<String, Type> fieldToType = fd.getFieldToType();
+    List<String> fields = fd.getFieldList();
+
+    for (int fieldCounter = 0; fieldCounter < fields.size(); fieldCounter++, qCounter++) {
+      String fieldName = fields.get(fieldCounter);
+
+      //The time bucket is not a column, so skip it without advancing qCounter.
+      if (fieldName.equals(DimensionsDescriptor.DIMENSION_TIME_BUCKET)) {
+        qCounter--;
+        continue;
+      }
+
+      Type type = fieldToType.get(fieldName);
+
+      LOG.info("Field Name {} {}", fieldName, qCounter);
+
+      switch (type) {
+        case BOOLEAN: {
+          ps.setByte(qCounter, (byte)(gpo.getFieldBool(fieldName) ? 1 : 0));
+          break;
+        }
+        case BYTE: {
+          ps.setByte(qCounter, gpo.getFieldByte(fieldName));
+          break;
+        }
+        case CHAR: {
+          ps.setString(qCounter, Character.toString(gpo.getFieldChar(fieldName)));
+          break;
+        }
+        case STRING: {
+          ps.setString(qCounter, gpo.getFieldString(fieldName));
+          break;
+        }
+        case SHORT: {
+          ps.setInt(qCounter, gpo.getFieldShort(fieldName));
+          break;
+        }
+        case INTEGER: {
+          ps.setInt(qCounter, gpo.getFieldInt(fieldName));
+          break;
+        }
+        case LONG: {
+          ps.setLong(qCounter, gpo.getFieldLong(fieldName));
+          break;
+        }
+        case FLOAT: {
+          ps.setFloat(qCounter, gpo.getFieldFloat(fieldName));
+          break;
+        }
+        case DOUBLE: {
+          ps.setDouble(qCounter, gpo.getFieldDouble(fieldName));
+          break;
+        }
+        default: {
+          throw new UnsupportedOperationException("The type: " + type + " is not supported.");
+        }
+      }
+    }
+
+    return qCounter;
+  }
+
+  /**
+   * Sets the size of a batch operation.<br/>
+   * <b>Default:</b> {@value #DEFAULT_BATCH_SIZE}
+   *
+   * @param batchSize
+   *          size of a batch
+   */
+  public void setBatchSize(int batchSize)
+  {
+    this.batchSize = batchSize;
+  }
+
+  private static final Logger LOG = LoggerFactory.getLogger(JDBCDimensionalOutputOperator.class);
+}
