APEXMALHAR-2181 Cassandra Upsert Operator
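In outline, a concrete operator supplies the connection settings and the payload POJO class, and upstream operators wrap each row in an UpsertExecutionContext before emitting it to the input port. A minimal sketch against the API added below (modeled on the UserUpsertOperator test in this commit; the connection values are placeholders, not part of the commit):

package com.datatorrent.contrib.cassandra;

import java.util.Map;

import com.datastax.driver.core.TypeCodec;

// Minimal concrete subclass; see UserUpsertOperator in the tests below for the real reference.
public class SketchUserUpsertOperator extends AbstractUpsertOutputOperator
{
  @Override
  public ConnectionStateManager.ConnectionBuilder withConnectionBuilder()
  {
    // Same builder methods the operator itself uses when reading the properties file.
    return new ConnectionStateManager.ConnectionBuilder()
        .withClusterNameAs("testCluster")
        .withDCNameAs("datacenter1")
        .withKeySpaceNameAs("testks")
        .withTableNameAs("users")
        .withSeedNodes("127.0.0.1");
  }

  @Override
  public Map<String, TypeCodec> getCodecsForUserDefinedTypes()
  {
    return null; // no UDT columns in this sketch; see AddressCodec/FullNameCodec for real ones
  }

  @Override
  public Class getPayloadPojoClass()
  {
    return User.class; // the POJO mapped to the rows of the configured table
  }

  @Override
  boolean reconcileRecord(Object tuple, long windowId)
  {
    return true; // let replayed writes through; real implementations may consult cassandra here
  }
}

Per tuple, upstream code would then build an UpsertExecutionContext, set the payload (assuming a setter mirroring getPayload()), optionally override TTL, consistency level or the collection/list mutation styles, and emit it to this operator's input port.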
Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/664257b4 Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/664257b4 Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/664257b4 Branch: refs/heads/master Commit: 664257b4a90d200137c4254428aa806dcd55c92d Parents: 05a7ca3 Author: ananthc <[email protected]> Authored: Tue Nov 22 07:26:12 2016 +1100 Committer: ananthc <[email protected]> Committed: Tue Nov 22 07:26:12 2016 +1100 ---------------------------------------------------------------------- contrib/pom.xml | 8 +- .../cassandra/AbstractUpsertOutputOperator.java | 1012 ++++++++++++++++++ .../cassandra/CassandraPOJOInputOperator.java | 2 +- .../cassandra/CassandraPOJOOutputOperator.java | 2 +- .../contrib/cassandra/CassandraPojoUtils.java | 316 ++++++ .../CassandraPreparedStatementGenerator.java | 287 +++++ .../cassandra/ConnectionStateManager.java | 477 +++++++++ .../cassandra/UpsertExecutionContext.java | 210 ++++ .../AbstractUpsertOutputOperatorCodecsTest.java | 476 ++++++++ ...ractUpsertOutputOperatorCompositePKTest.java | 99 ++ ...bstractUpsertOutputOperatorCountersTest.java | 120 +++ .../datatorrent/contrib/cassandra/Address.java | 82 ++ .../contrib/cassandra/AddressCodec.java | 85 ++ .../cassandra/CassandraOperatorTest.java | 47 +- .../cassandra/CompositePrimaryKeyRow.java | 98 ++ .../CompositePrimaryKeyUpdateOperator.java | 58 + .../cassandra/CounterColumnTableEntry.java | 47 + .../cassandra/CounterColumnUpdatesOperator.java | 54 + .../datatorrent/contrib/cassandra/FullName.java | 52 + .../contrib/cassandra/FullNameCodec.java | 80 ++ .../com/datatorrent/contrib/cassandra/User.java | 130 +++ .../contrib/cassandra/UserUpsertOperator.java | 97 ++ 22 files changed, 3817 insertions(+), 22 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/664257b4/contrib/pom.xml ---------------------------------------------------------------------- diff --git a/contrib/pom.xml b/contrib/pom.xml index 84a7e05..566d03d 100755 --- a/contrib/pom.xml +++ b/contrib/pom.xml @@ -487,13 +487,13 @@ <dependency> <groupId>com.datastax.cassandra</groupId> <artifactId>cassandra-driver-core</artifactId> - <version>2.1.8</version> + <version>3.1.0</version> <optional>true</optional> </dependency> <dependency> <groupId>com.google.guava</groupId> <artifactId>guava</artifactId> - <version>14.0.1</version> + <version>16.0.1</version> <scope>provided</scope> <optional>true</optional> </dependency> @@ -639,13 +639,13 @@ <artifactId>*</artifactId> </exclusion> </exclusions> - </dependency> + </dependency> <dependency> <groupId>com.googlecode.json-simple</groupId> <artifactId>json-simple</artifactId> <version>1.1.1</version> <optional>true</optional> - </dependency> + </dependency> <dependency> <groupId>com.github.fge</groupId> <artifactId>json-schema-validator</artifactId> http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/664257b4/contrib/src/main/java/com/datatorrent/contrib/cassandra/AbstractUpsertOutputOperator.java ---------------------------------------------------------------------- diff --git a/contrib/src/main/java/com/datatorrent/contrib/cassandra/AbstractUpsertOutputOperator.java b/contrib/src/main/java/com/datatorrent/contrib/cassandra/AbstractUpsertOutputOperator.java new file mode 100644 index 0000000..95f98fe --- /dev/null +++ 
b/contrib/src/main/java/com/datatorrent/contrib/cassandra/AbstractUpsertOutputOperator.java @@ -0,0 +1,1012 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package com.datatorrent.contrib.cassandra; + +import java.io.IOException; +import java.io.InputStream; +import java.lang.reflect.Field; +import java.util.EnumSet; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.Set; + + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.apex.malhar.lib.wal.FSWindowDataManager; +import org.apache.apex.malhar.lib.wal.WindowDataManager; +import org.apache.hadoop.classification.InterfaceStability; + +import com.codahale.metrics.Timer; + +import com.datastax.driver.core.BoundStatement; +import com.datastax.driver.core.Cluster; +import com.datastax.driver.core.CodecRegistry; +import com.datastax.driver.core.ColumnMetadata; +import com.datastax.driver.core.ConsistencyLevel; +import com.datastax.driver.core.DataType; +import com.datastax.driver.core.Host; +import com.datastax.driver.core.KeyspaceMetadata; +import com.datastax.driver.core.Metrics; +import com.datastax.driver.core.PreparedStatement; +import com.datastax.driver.core.ResultSet; +import com.datastax.driver.core.Session; +import com.datastax.driver.core.TableMetadata; +import com.datastax.driver.core.TypeCodec; + + + +import com.datatorrent.api.AutoMetric; +import com.datatorrent.api.Context; +import com.datatorrent.api.DefaultInputPort; +import com.datatorrent.api.Operator; +import com.datatorrent.api.annotation.InputPortFieldAnnotation; +import com.datatorrent.api.annotation.Stateless; +import com.datatorrent.common.util.BaseOperator; + + +import static com.google.common.base.Preconditions.checkNotNull; + +/** + * An abstract operator that mutates cassandra rows using PreparedStatements for faster execution + * and accommodates EXACTLY_ONCE semantics if concrete implementations choose to implement an abstract method + * meaningfully. As Cassandra is not a purely transactional database, the burden is on the concrete + * implementation of the operator, and ONLY during the reconciliation window (not for any other windows). + * + * The typical implementation model is as follows: + * 1. Create a concrete implementation of this class by extending this class and implementing a few methods. + * 2. Define the payload, i.e. the POJO that represents a Cassandra row and is part of the execution context + * {@link UpsertExecutionContext}. The payload is a template parameter of this class. + * 3. The upstream operator that wants to write to Cassandra does the following: + * a. 
Create an instance of {@link UpsertExecutionContext} + * b. Set the payload (an instance of the POJO created in step two above) + * c. Set additional execution context parameters like collection handling style, list placement styles, + * overriding TTLs, update only if primary keys exist, consistency levels etc. + * 4. The concrete implementation would then execute this context as a cassandra row mutation + * + * This operator supports the following features: + * 1. Highly customizable connection policies. This is achieved by specifying the ConnectionStateManager. + * There are quite a few connection management aspects that can be + * controlled via {@link ConnectionStateManager} like consistency, load balancing, connection retries, + * table to use, keyspace to use etc. Please refer to the javadoc of {@link ConnectionStateManager} + * 2. Support for Collections: Map, List and Set are supported. + * User Defined Types as part of collections are also supported. + * 3. Support exists for both adding to and removing entries from an existing collection. + * The POJO field that represents a collection is used to represent the collection that is added or removed. + * This can be used to avoid a read-then-write pattern for the final value of the cassandra column, + * which suits low latency / high write pattern applications as a read is avoided in the process. + * 4. Supports list placements: The execution context can be used to specify where the new incoming list + * is to be added (in case there is an existing list in the current column of the current row being mutated). + * Supported options are APPEND or PREPEND to an existing list. + * 5. Support for User Defined Types. A POJO can have fields that represent the Cassandra columns that are custom + * user defined types. Concrete implementations of the operator provide a mapping of the cassandra column name + * to the TypeCodec that is to be used for that field inside cassandra. Please refer to the javadocs of + * {@link this.getCodecsForUserDefinedTypes()} for more details + * 6. Support for custom mapping of POJO payload field names to those of cassandra columns. Practically speaking, + * POJO field names might not always match Cassandra column names, hence this support. This also avoids + * writing a POJO just for the cassandra operator, so an existing POJO can be passed around to this operator. + * Please refer to the javadoc of {@link this.getPojoFieldNameToCassandraColumnNameOverride()} for an example + * 7. TTL support - A default TTL can be set for the connection (via {@link ConnectionStateManager}) and then used + * for all mutations. This TTL can further be overridden at a tuple execution level to accommodate use cases of + * setting custom column expirations, typically useful in wide row implementations. + * 8. Support for counter column tables. Counter tables are also supported, with the values inside the incoming + * POJO added to/subtracted from the counter column accordingly. Please note that the value is not set absolutely + * but rather represents the amount that needs to be added to or subtracted from the current counter. + * 9. Composite primary keys are also supported. All the POJO fields that map to the composite + * primary key are used to resolve the primary key in case of a composite primary key table + * 10. Support for conditional updates: This operator can be used as an Update Only operator as opposed to an + * Upsert operator, i.e. update only IF EXISTS. 
This is achieved by setting the appropriate boolean in the + * {@link UpsertExecutionContext} tuple that is passed from the upstream operator. + * 11. Lenient mapping of POJO fields to Cassandra column names. By default the POJO field names are matched + * case-insensitively to cassandra column names. This can be further customized by overriding mappings. + * Please refer to feature 6 above. + * 12. Defaults can be overridden at a tuple execution level for TTL & consistency policies + * 13. Support for handling nulls, i.e. whether null values in the POJO are to be persisted as is or to be ignored, + * so that the application need not perform a read to populate a POJO field if it is not available in the context + * 14. A few autometrics are provided for monitoring the latency aspects of the cassandra cluster + */ +@InterfaceStability.Evolving +public abstract class AbstractUpsertOutputOperator extends BaseOperator implements + Operator.ActivationListener<Context.OperatorContext>, Operator.CheckpointNotificationListener +{ + + protected ConnectionStateManager connectionStateManager; + + private WindowDataManager windowDataManager; + + private long currentWindowId; + + private transient boolean isInSafeMode; + + private transient long reconcilingWindowId; + + private transient boolean isInReconcilingMode; + + protected transient Session session; + + protected transient Cluster cluster; + + transient Map<String, TypeCodec> complexTypeCodecs; + + transient Map<String, Class> userDefinedTypesClass; + + transient Map<String, DataType> columnDefinitions; + + transient Map<String, String> colNamesMap; + + transient Set<String> pkColumnNames; + + transient Set<String> counterColumns; + + transient Set<String> collectionColumns; + + transient Set<String> listColumns; + + transient Set<String> mapColumns; + + transient Set<String> setColumns; + + transient Set<String> userDefinedTypeColumns; + + transient Set<String> regularColumns; + + protected Map<String, Object> getters; + + protected Map<String, TypeCodec> codecsForCassandraColumnNames; + + CassandraPreparedStatementGenerator cassandraPreparedStatementGenerationUtil; + + transient Map<Long, PreparedStatement> preparedStatementTypes; + transient Class<?> tuplePayloadClass; + + private static final transient Logger LOG = LoggerFactory.getLogger(AbstractUpsertOutputOperator.class); + + private static final String CASSANDRA_CONNECTION_PROPS_FILENAME = "CassandraOutputOperator.properties"; + + // Metrics + + @AutoMetric + transient long ignoredRequestsDuetoIfExistsCheck = 0; + + @AutoMetric + transient long successfullWrites = 0; + + @AutoMetric + long totalWriteRequestsAttempted = 0; + + @AutoMetric + transient int numberOfHostsWrittenTo = 0; + + @AutoMetric + transient double fifteenMinuteWriteRateLatency = 0.0; + + @AutoMetric + transient double fiveMinuteWriteRateLatency = 0.0; + + @AutoMetric + transient double oneMinuteWriteRateLatency = 0.0; + + @AutoMetric + transient double meanWriteRateLatency = 0.0; + + @AutoMetric + transient long totalIgnoresInThisWindow = 0; + + @AutoMetric + long totalIgnoresSinceStart = 0; + + @AutoMetric + transient long totalWriteTimeoutsInThisWindow = 0; + + @AutoMetric + long totalWriteTimeoutsSinceStart = 0; + + @AutoMetric + transient long totalWriteRetriesInThisWindow = 0; + + @AutoMetric + long totalWriteRetriesSinceStart = 0; + + @AutoMetric + transient long writesWithConsistencyOne = 0; + + @AutoMetric + transient long writesWithConsistencyTwo = 0; + + @AutoMetric + transient long writesWithConsistencyThree = 0; + + @AutoMetric + 
transient long writesWithConsistencyAll = 0; + + @AutoMetric + transient long writesWithConsistencyLocalOne = 0; + + @AutoMetric + transient long writesWithConsistencyQuorum = 0; + + @AutoMetric + transient long writesWithConsistencyLocalQuorum = 0; + + @AutoMetric + transient long writeWithConsistencyLocalSerial = 0; + + @AutoMetric + transient long writesWithConsistencyEachQuorum = 0; + + @AutoMetric + transient long writesWithConsistencySerial = 0; + + @AutoMetric + transient long writesWithConsistencyAny = 0; + + transient Set<Host> uniqueHostsWrittenToInCurrentWindow; + + + @InputPortFieldAnnotation + public final transient DefaultInputPort<UpsertExecutionContext> input = new DefaultInputPort<UpsertExecutionContext>() + { + @Override + public void process(final UpsertExecutionContext tuple) + { + if (!isEligbleForPassivation(tuple)) { + return; + } + BoundStatement stmnt = setDefaultsAndPrepareBoundStatement(tuple); + ResultSet result = session.execute(stmnt); + updatePerRowMetric(result); + } + }; // end of input port implementation + + @Override + public void setup(Context.OperatorContext context) + { + super.setup(context); + windowDataManager = getWindowDataManager(); + if ( windowDataManager == null) { + windowDataManager = new FSWindowDataManager(); + } + windowDataManager.setup(context); + } + + @Override + public void teardown() + { + super.teardown(); + if (null != windowDataManager) { + windowDataManager.teardown(); + } + } + + + /** + * Primarily resets the per window counter metrics. + * @param windowId The windowid as provided by the apex framework + */ + @Override + public void beginWindow(long windowId) + { + super.beginWindow(windowId); + totalIgnoresInThisWindow = 0; + totalWriteTimeoutsInThisWindow = 0; + totalWriteRetriesInThisWindow = 0; + uniqueHostsWrittenToInCurrentWindow.clear(); + successfullWrites = 0; + ignoredRequestsDuetoIfExistsCheck = 0; + writesWithConsistencyOne = 0; + writesWithConsistencyTwo = 0; + writesWithConsistencyThree = 0; + writesWithConsistencyAll = 0; + writesWithConsistencyLocalOne = 0; + writesWithConsistencyQuorum = 0; + writesWithConsistencyLocalQuorum = 0; + writeWithConsistencyLocalSerial = 0; + writesWithConsistencyEachQuorum = 0; + writesWithConsistencySerial = 0; + writesWithConsistencyAny = 0; + currentWindowId = windowId; + if ( currentWindowId != Stateless.WINDOW_ID) { + if (currentWindowId > reconcilingWindowId) { + isInSafeMode = false; + isInReconcilingMode = false; + } + if (currentWindowId == reconcilingWindowId) { + isInReconcilingMode = true; + isInSafeMode = false; + } + if (currentWindowId < reconcilingWindowId) { + isInReconcilingMode = false; + isInSafeMode = true; + } + } + } + + /** + * Builds the metrics that can be sent to Application master. + * Note that some of the metrics are computed in the cassandra driver itself and hence just + * extracted from the driver state itself. 
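+ * For example, the driver's error counters are cumulative over the lifetime of the cluster object, so the + * per-window figures are derived as deltas, e.g. + * {@code totalIgnoresInThisWindow = errors.getIgnores().getCount() - totalIgnoresSinceStart}, + * after which the running total is refreshed to the new cumulative count.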
+ */ + @Override + public void endWindow() + { + super.endWindow(); + Timer timerForThisWindow = session.getCluster().getMetrics().getRequestsTimer(); + totalWriteRequestsAttempted += timerForThisWindow.getCount(); + numberOfHostsWrittenTo = uniqueHostsWrittenToInCurrentWindow.size(); + fifteenMinuteWriteRateLatency = timerForThisWindow.getFifteenMinuteRate(); + fiveMinuteWriteRateLatency = timerForThisWindow.getFiveMinuteRate(); + oneMinuteWriteRateLatency = timerForThisWindow.getOneMinuteRate(); + meanWriteRateLatency = timerForThisWindow.getMeanRate(); + Metrics.Errors errors = session.getCluster().getMetrics().getErrorMetrics(); + totalIgnoresInThisWindow = errors.getIgnores().getCount() - totalIgnoresSinceStart; + totalIgnoresSinceStart = errors.getIgnores().getCount(); + totalWriteTimeoutsInThisWindow = errors.getWriteTimeouts().getCount() - totalWriteTimeoutsSinceStart; + totalWriteTimeoutsSinceStart = errors.getWriteTimeouts().getCount(); + totalWriteRetriesInThisWindow = errors.getRetriesOnWriteTimeout().getCount() - totalWriteRetriesSinceStart; + totalWriteRetriesSinceStart = errors.getRetriesOnWriteTimeout().getCount(); + try { + // we do not need any particular state and hence reuse the window id itself + windowDataManager.save(currentWindowId,currentWindowId); + } catch (IOException e) { + LOG.error("Error while persisting the current window state " + currentWindowId + " because " + e.getMessage()); + throw new RuntimeException(e.getMessage()); + } + } + + /** + * Initializes cassandra cluster connections as specified by the ConnectionStateManager. + * Aspects that are initialized here include identifying primary key column names, non-primary key columns, + * collection type columns and counter columns. + * It also queries the keyspace and table metadata for extracting the above information. 
It finally prepares all possible prepared statements that can be executed in the lifetime of the operator + for various permutations like APPEND/REMOVE COLLECTIONS, LIST APPEND/PREPEND, set nulls, set TTLs etc. + @param context The apex framework context + */ + @Override + public void activate(Context.OperatorContext context) + { + ConnectionStateManager.ConnectionBuilder connectionBuilder = withConnectionBuilder(); + if (connectionBuilder == null) { + connectionBuilder = buildConnectionBuilderFromPropertiesFile(); + } + checkNotNull(connectionBuilder, " Connection Builder cannot be null."); + connectionStateManager = connectionBuilder.initialize(); + cluster = connectionStateManager.getCluster(); + session = connectionStateManager.getSession(); + checkNotNull(session, "Cassandra session cannot be null"); + tuplePayloadClass = getPayloadPojoClass(); + columnDefinitions = new HashMap<>(); + counterColumns = new HashSet<>(); + collectionColumns = new HashSet<>(); + pkColumnNames = new HashSet<>(); + listColumns = new HashSet<>(); + mapColumns = new HashSet<>(); + setColumns = new HashSet<>(); + codecsForCassandraColumnNames = new HashMap<>(); + userDefinedTypeColumns = new HashSet<>(); + regularColumns = new HashSet<>(); + colNamesMap = new HashMap<>(); + getters = new HashMap<>(); + userDefinedTypesClass = new HashMap<>(); + uniqueHostsWrittenToInCurrentWindow = new HashSet<>(); + registerCodecs(); + KeyspaceMetadata keyspaceMetadata = cluster.getMetadata() + .getKeyspace(connectionStateManager.getKeyspaceName()); + TableMetadata tableMetadata = keyspaceMetadata + .getTable(connectionStateManager.getTableName()); + registerPrimaryKeyColumnDefinitions(tableMetadata); + registerNonPKColumnDefinitions(tableMetadata); + preparedStatementTypes = new HashMap<>(); + generatePreparedStatements(); + registerGettersForPayload(); + isInSafeMode = false; + isInReconcilingMode = false; + reconcilingWindowId = Stateless.WINDOW_ID; + if ( (context.getValue(Context.OperatorContext.ACTIVATION_WINDOW_ID) != Stateless.WINDOW_ID) && + context.getValue(Context.OperatorContext.ACTIVATION_WINDOW_ID) < + windowDataManager.getLargestCompletedWindow()) { + isInSafeMode = true; + reconcilingWindowId = windowDataManager.getLargestCompletedWindow() + 1; + isInReconcilingMode = false; + } + } + + @Override + public void deactivate() + { + connectionStateManager.close(); + } + + + @Override + public void committed(long windowId) + { + try { + windowDataManager.committed(windowId); + } catch (IOException e) { + LOG.error("Error while committing the window id " + windowId + " because " + e.getMessage()); + throw new RuntimeException(e.getMessage()); + } + } + + @Override + public void beforeCheckpoint(long windowId) + { + // Nothing to be done here. Prevents concrete implementations from being forced to implement this. + } + + @Override + public void checkpointed(long windowId) + { + // Nothing to be done here. Concrete operators can override if needed. 
+ } + + private ConnectionStateManager.ConnectionBuilder buildConnectionBuilderFromPropertiesFile() + { + ConnectionStateManager.ConnectionBuilder propFileBasedConnectionBuilder = null; + Properties config = new Properties(); + try (InputStream cassandraPropsFile = getClass().getClassLoader().getResourceAsStream( + CASSANDRA_CONNECTION_PROPS_FILENAME)) { + config.load(cassandraPropsFile); + propFileBasedConnectionBuilder = new ConnectionStateManager.ConnectionBuilder(); + return propFileBasedConnectionBuilder + .withClusterNameAs(config.getProperty(ConnectionStateManager.ConnectionBuilder.CLUSTER_NAME_IN_PROPS_FILE)) + .withDCNameAs(config.getProperty(ConnectionStateManager.ConnectionBuilder.DC_NAME_IN_PROPS_FILE)) + .withKeySpaceNameAs(config.getProperty(ConnectionStateManager.ConnectionBuilder.KEYSPACE_NAME_IN_PROPS_FILE)) + .withTableNameAs(config.getProperty(ConnectionStateManager.ConnectionBuilder.TABLE_NAME_IN_PROPS_FILE)) + .withSeedNodes(config.getProperty(ConnectionStateManager.ConnectionBuilder.SEEDNODES_IN_PROPS_FILE)); + } catch (Exception ex) { + LOG.error("Error while trying to load cassandra config from properties file " + + CASSANDRA_CONNECTION_PROPS_FILENAME + " because " + ex.getMessage(), ex); + return null; + } + } + + protected boolean isEligbleForPassivation(final UpsertExecutionContext tuple) + { + if (isInSafeMode) { + return false; + } + if (isInReconcilingMode) { + return reconcileRecord(tuple,currentWindowId); + } + return true; + } + + private BoundStatement setDefaultsAndPrepareBoundStatement(UpsertExecutionContext tuple) + { + UpsertExecutionContext.NullHandlingMutationStyle nullHandlingMutationStyle = tuple.getNullHandlingMutationStyle(); + if (UpsertExecutionContext.NullHandlingMutationStyle.UNDEFINED == nullHandlingMutationStyle) { + nullHandlingMutationStyle = UpsertExecutionContext.NullHandlingMutationStyle.SET_NULL_COLUMNS; + } + boolean setNulls = true; + if (nullHandlingMutationStyle != UpsertExecutionContext.NullHandlingMutationStyle.SET_NULL_COLUMNS) { + setNulls = false; + } + UpsertExecutionContext.CollectionMutationStyle collectionMutationStyle = tuple.getCollectionMutationStyle(); + if ((collectionMutationStyle == null) || + (collectionMutationStyle == UpsertExecutionContext.CollectionMutationStyle.UNDEFINED) ) { + tuple.setCollectionMutationStyle(UpsertExecutionContext.CollectionMutationStyle.ADD_TO_EXISTING_COLLECTION); + } + UpsertExecutionContext.ListPlacementStyle listPlacementStyle = tuple.getListPlacementStyle(); + if ( (listPlacementStyle == null) || (listPlacementStyle == UpsertExecutionContext.ListPlacementStyle.UNDEFINED) ) { + tuple.setListPlacementStyle(UpsertExecutionContext.ListPlacementStyle.APPEND_TO_EXISTING_LIST); + } + PreparedStatement preparedStatement = resolvePreparedStatementForCurrentExecutionContext(tuple); + BoundStatement stmnt = processPayloadForExecution(preparedStatement, tuple, setNulls); + if ((tuple.isTtlOverridden()) || (connectionStateManager.isTTLSet())) { + int ttlToUse = connectionStateManager.getDefaultTtlInSecs(); + if (tuple.isTtlOverridden()) { + ttlToUse = tuple.getOverridingTTL(); + } + stmnt.setInt(CassandraPreparedStatementGenerator.TTL_PARAM_NAME, ttlToUse); + } + if (tuple.isOverridingConsistencyLevelSet()) { + ConsistencyLevel currentConsistencyLevel = tuple.getOverridingConsistencyLevel(); + if (currentConsistencyLevel.isSerial()) { + stmnt.setSerialConsistencyLevel(tuple.getOverridingConsistencyLevel()); + } else { + stmnt.setConsistencyLevel(tuple.getOverridingConsistencyLevel()); + } + } + 
LOG.debug("Executing statement " + preparedStatement.getQueryString()); + return stmnt; + } + + private void updatePerRowMetric(ResultSet result) + { + uniqueHostsWrittenToInCurrentWindow.add(result.getExecutionInfo().getQueriedHost()); + updateConsistencyLevelMetrics(result.getExecutionInfo().getAchievedConsistencyLevel()); + successfullWrites += 1; + if (!result.wasApplied()) { + ignoredRequestsDuetoIfExistsCheck += 1; + } + } + + private void updateConsistencyLevelMetrics(ConsistencyLevel resultConsistencyLevel) + { + if (resultConsistencyLevel == null) { + return; + } + switch (resultConsistencyLevel) { + case ALL: + writesWithConsistencyAll += 1; + break; + case ANY: + writesWithConsistencyAny += 1; + break; + case EACH_QUORUM: + writesWithConsistencyEachQuorum += 1; + break; + case LOCAL_ONE: + writesWithConsistencyLocalOne += 1; + break; + case LOCAL_QUORUM: + writesWithConsistencyLocalQuorum += 1; + break; + case LOCAL_SERIAL: + writeWithConsistencyLocalSerial += 1; + break; + case ONE: + writesWithConsistencyOne += 1; + break; + case QUORUM: + writesWithConsistencyQuorum += 1; + break; + case SERIAL: + writesWithConsistencySerial += 1; + break; + case THREE: + writesWithConsistencyThree += 1; + break; + case TWO: + writesWithConsistencyTwo += 1; + break; + default: + break; + } + } + + /** + * Shortlists the prepared statement from a cache that is populated initially. + * @param tuple The execution context that is used to mutate the current cassandra row + * @return The prepared statement that is applicable for the current execution context + */ + private PreparedStatement resolvePreparedStatementForCurrentExecutionContext(UpsertExecutionContext tuple) + { + EnumSet<OperationContext> operationContextValue = EnumSet.noneOf(OperationContext.class); + + UpsertExecutionContext.CollectionMutationStyle collectionMutationStyle = tuple.getCollectionMutationStyle(); + if (collectionMutationStyle != null) { + if (collectionMutationStyle == UpsertExecutionContext.CollectionMutationStyle.ADD_TO_EXISTING_COLLECTION) { + operationContextValue.add(OperationContext.COLLECTIONS_APPEND); + } + if (collectionMutationStyle == UpsertExecutionContext.CollectionMutationStyle.REMOVE_FROM_EXISTING_COLLECTION) { + operationContextValue.add(OperationContext.COLLECTIONS_REMOVE); + } + } + UpsertExecutionContext.ListPlacementStyle listPlacementStyle = tuple.getListPlacementStyle(); + boolean isListContextSet = false; + if ((listPlacementStyle != null) && (collectionMutationStyle == + UpsertExecutionContext.CollectionMutationStyle.ADD_TO_EXISTING_COLLECTION)) { + if (listPlacementStyle == UpsertExecutionContext.ListPlacementStyle.APPEND_TO_EXISTING_LIST) { + operationContextValue.add(OperationContext.LIST_APPEND); + isListContextSet = true; + } + if (listPlacementStyle == UpsertExecutionContext.ListPlacementStyle.PREPEND_TO_EXISTING_LIST) { + operationContextValue.add(OperationContext.LIST_PREPEND); + isListContextSet = true; + } + } + if (!isListContextSet) { + // use cases when remove is specified but we do not want to build separate prepared statements + operationContextValue.add(OperationContext.LIST_APPEND); + } + if ((connectionStateManager.isTTLSet()) || (tuple.isTtlOverridden())) { + operationContextValue.add(OperationContext.TTL_SET); + } else { + operationContextValue.add(OperationContext.TTL_NOT_SET); + } + if (tuple.isUpdateOnlyIfPrimaryKeyExists()) { + operationContextValue.add(OperationContext.IF_EXISTS_CHECK_PRESENT); + } else { + 
operationContextValue.add(OperationContext.IF_EXISTS_CHECK_ABSENT); + } + return preparedStatementTypes.get(CassandraPreparedStatementGenerator + .getSlotIndexForMutationContextPreparedStatement(operationContextValue)); + } + + /** + * Generates a BoundStatement that can be executed for the given incoming tuple. This bound statement is then + * executed as a command + * @param ps The prepared statement that was shortlisted to execute the given tuple + * @param tuple The tuple that represents the current execution context + * @param setNulls Whether null values in the POJO are to be set explicitly on the statement or the + * corresponding columns left unset + * @return The BoundStatement appropriately built + */ + @SuppressWarnings(value = "unchecked") + private BoundStatement processPayloadForExecution(final PreparedStatement ps, final UpsertExecutionContext tuple, + final boolean setNulls) + { + BoundStatement boundStatement = ps.bind(); + Object pojoPayload = tuple.getPayload(); + for (String cassandraColName : getters.keySet()) { + DataType dataType = columnDefinitions.get(cassandraColName); + CassandraPojoUtils.populateBoundStatementWithValue(boundStatement,getters,dataType,cassandraColName, + pojoPayload,setNulls,codecsForCassandraColumnNames); + } + return boundStatement; + } + + + /** + * Builds the map that manages the getters for a given cassandra column, + * handling aspects like case-insensitivity and overriding of column names to custom mappings + */ + private void registerGettersForPayload() + { + Field[] classFields = tuplePayloadClass.getDeclaredFields(); + Set<String> allColNames = new HashSet<>(); + Map<String, DataType> dataTypeMap = new HashMap<>(); + Map<String,String> overridingColnamesMap = getPojoFieldNameToCassandraColumnNameOverride(); + if ( overridingColnamesMap == null) { + overridingColnamesMap = new HashMap<>(); + } + allColNames.addAll(pkColumnNames); + allColNames.addAll(regularColumns); + Set<String> normalizedColNames = new HashSet<>(); + Iterator<String> simpleColIterator = allColNames.iterator(); + while (simpleColIterator.hasNext()) { + String aCol = simpleColIterator.next(); + normalizedColNames.add(aCol.toLowerCase()); + dataTypeMap.put(aCol.toLowerCase(), columnDefinitions.get(aCol)); + colNamesMap.put(aCol.toLowerCase(), aCol); + codecsForCassandraColumnNames.put(aCol, complexTypeCodecs.get(aCol.toLowerCase())); + } + for (Field aField : classFields) { + String aFieldName = aField.getName(); + if ( (normalizedColNames.contains(aFieldName.toLowerCase())) || + (overridingColnamesMap.containsKey(aFieldName)) ) { + + String getterExpr = aFieldName; + DataType returnDataTypeOfGetter = dataTypeMap.get(aFieldName.toLowerCase()); + if (returnDataTypeOfGetter == null) { + returnDataTypeOfGetter = dataTypeMap.get(overridingColnamesMap.get(aFieldName)); + } + Object getter = CassandraPojoUtils.resolveGetterForField(tuplePayloadClass,getterExpr, + returnDataTypeOfGetter,userDefinedTypesClass); + String resolvedColumnName = colNamesMap.get(aFieldName.toLowerCase()); + if (overridingColnamesMap.containsKey(aFieldName)) { + resolvedColumnName = overridingColnamesMap.get(aFieldName); + } + getters.put(resolvedColumnName, getter); + } + } + } + + private void registerCodecs() + { + complexTypeCodecs = getCodecsForUserDefinedTypes(); + if (complexTypeCodecs != null) { + CodecRegistry registry = cluster.getConfiguration().getCodecRegistry(); + if (cluster.getConfiguration().getProtocolOptions().getProtocolVersion().toInt() < 4) { + LOG.error("Custom codecs are not supported for protocol 
version < 4"); + throw new RuntimeException("Custom codecs are not supported for protocol version < 4"); + } + for (String typeCodecStr : complexTypeCodecs.keySet()) { + TypeCodec codec = complexTypeCodecs.get(typeCodecStr); + registry.register(codec); + userDefinedTypesClass.put(typeCodecStr, codec.getJavaType().getRawType()); + } + } else { + complexTypeCodecs = new HashMap<>(); + } + } + + private void registerNonPKColumnDefinitions(final TableMetadata tableMetadata) + { + List<ColumnMetadata> colInfoForTable = tableMetadata.getColumns(); + for (ColumnMetadata aColumnDefinition : colInfoForTable) { + if (aColumnDefinition.getType().isCollection()) { + collectionColumns.add(aColumnDefinition.getName()); + } + if (!pkColumnNames.contains(aColumnDefinition.getName())) { + columnDefinitions.put(aColumnDefinition.getName(), aColumnDefinition.getType()); + regularColumns.add(aColumnDefinition.getName()); + } + parseForSpecialDataType(aColumnDefinition); + } + } + + private void parseForSpecialDataType(final ColumnMetadata aColumnDefinition) + { + switch (aColumnDefinition.getType().getName()) { + case COUNTER: + counterColumns.add(aColumnDefinition.getName()); + break; + case MAP: + mapColumns.add(aColumnDefinition.getName()); + break; + case SET: + setColumns.add(aColumnDefinition.getName()); + break; + case LIST: + listColumns.add(aColumnDefinition.getName()); + break; + case UDT: + userDefinedTypeColumns.add(aColumnDefinition.getName()); + break; + default: + break; + } + } + + private void registerPrimaryKeyColumnDefinitions(final TableMetadata tableMetadata) + { + List<ColumnMetadata> primaryKeyColumns = tableMetadata.getPrimaryKey(); + for (ColumnMetadata primaryColumn : primaryKeyColumns) { + columnDefinitions.put(primaryColumn.getName(), primaryColumn.getType()); + pkColumnNames.add(primaryColumn.getName()); + parseForSpecialDataType(primaryColumn); + } + } + + private void generatePreparedStatements() + { + cassandraPreparedStatementGenerationUtil = new CassandraPreparedStatementGenerator( + pkColumnNames, counterColumns, listColumns, + mapColumns, setColumns, columnDefinitions); + cassandraPreparedStatementGenerationUtil.generatePreparedStatements(session, preparedStatementTypes, + connectionStateManager.getKeyspaceName(), connectionStateManager.getTableName()); + } + + public Map<String, DataType> getColumnDefinitions() + { + return columnDefinitions; + } + + public void setColumnDefinitions(final Map<String, DataType> columnDefinitions) + { + this.columnDefinitions = columnDefinitions; + } + + public Map<String, Class> getUserDefinedTypesClass() + { + return userDefinedTypesClass; + } + + public void setUserDefinedTypesClass(final Map<String, Class> userDefinedTypesClass) + { + this.userDefinedTypesClass = userDefinedTypesClass; + } + + public Set<String> getPkColumnNames() + { + return pkColumnNames; + } + + public void setPkColumnNames(final Set<String> pkColumnNames) + { + this.pkColumnNames = pkColumnNames; + } + + public Set<String> getCounterColumns() + { + return counterColumns; + } + + public void setCounterColumns(final Set<String> counterColumns) + { + this.counterColumns = counterColumns; + } + + public Set<String> getCollectionColumns() + { + return collectionColumns; + } + + public void setCollectionColumns(final Set<String> collectionColumns) + { + this.collectionColumns = collectionColumns; + } + + public Set<String> getListColumns() + { + return listColumns; + } + + public void setListColumns(final Set<String> listColumns) + { + this.listColumns = listColumns; + } 
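  // The Long keys of the preparedStatementTypes map are slot indexes computed by
  // CassandraPreparedStatementGenerator.getSlotIndexForMutationContextPreparedStatement()
  // from the EnumSet of OperationContext values chosen in
  // resolvePreparedStatementForCurrentExecutionContext() above. That encoding falls outside this
  // excerpt; a plausible minimal sketch, assuming one bit per enum constant so that disjoint
  // context sets can be combined by simple addition:
  //
  //   static long slotIndexFor(EnumSet<OperationContext> contexts)
  //   {
  //     long slot = 0L;
  //     for (OperationContext context : contexts) {
  //       slot |= 1L << context.ordinal(); // unique bit per flag
  //     }
  //     return slot;
  //   }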
+ + public Set<String> getMapColumns() + { + return mapColumns; + } + + public void setMapColumns(Set<String> mapColumns) + { + this.mapColumns = mapColumns; + } + + public Set<String> getSetColumns() + { + return setColumns; + } + + public void setSetColumns(Set<String> setColumns) + { + this.setColumns = setColumns; + } + + public Set<String> getUserDefinedTypeColumns() + { + return userDefinedTypeColumns; + } + + public void setUserDefinedTypeColumns(Set<String> userDefinedTypeColumns) + { + this.userDefinedTypeColumns = userDefinedTypeColumns; + } + + public Set<String> getRegularColumns() + { + return regularColumns; + } + + public void setRegularColumns(Set<String> regularColumns) + { + this.regularColumns = regularColumns; + } + + public Map<Long, PreparedStatement> getPreparedStatementTypes() + { + return preparedStatementTypes; + } + + public void setPreparedStatementTypes(Map<Long, PreparedStatement> preparedStatementTypes) + { + this.preparedStatementTypes = preparedStatementTypes; + } + + public Map<String, Object> getGetters() + { + return getters; + } + + public void setGetters(Map<String, Object> getters) + { + this.getters = getters; + } + + public ConnectionStateManager getConnectionStateManager() + { + return connectionStateManager; + } + + public void setConnectionStateManager(ConnectionStateManager connectionStateManager) + { + this.connectionStateManager = connectionStateManager; + } + + public WindowDataManager getWindowDataManager() + { + return windowDataManager; + } + + public void setWindowDataManager(WindowDataManager windowDataManager) + { + this.windowDataManager = windowDataManager; + } + + /*** + * Implementing concrete Operator instances define the connection builder properties by implementing this method. + * Please refer to {@link com.datatorrent.contrib.cassandra.ConnectionStateManager.ConnectionBuilder} for + * an example implementation of the ConnectionStateManager instantiation. + * Note that if this method returns null, the connection properties are + * fetched from a properties file loaded from the classpath. + * @return The connection state manager that is to be used for this Operator. + */ + public ConnectionStateManager.ConnectionBuilder withConnectionBuilder() + { + return null; + } + + /** + * The implementing concrete operator needs to implement this map. The key is the name of the cassandra column + * that this codec is used to map to. The TypeCodec represents the codec for that column in cassandra. + * Please refer to the test example UserUpsertOperator.java for an implementation. + * Concrete implementations can return null if there are no user defined types + * @return A map giving column name to the codec to be used + */ + public abstract Map<String, TypeCodec> getCodecsForUserDefinedTypes(); + + /** + * Defines the POJO class that is used to represent the row in the table that is set in the ConnectionStateManager + * instance. The Class that is returned here should match the template type of the execution context + * {@link UpsertExecutionContext} + * @return The class that is used as the payload of the execution context. + */ + public abstract Class getPayloadPojoClass(); + + /** + * Concrete implementations can override this method to provide a custom map of a POJO field name to the cassandra + * column name. This is useful when POJOs that are acting as payloads: + 1. 
Cannot comply with the cassandra column naming conventions, e.g. cassandra column names + * might have underscores while the POJO fields are not in that format. + * Note that case sensitivity is ignored when trying to match Cassandra column names + * {@code + * @Override + protected Map<String, String> getPojoFieldNameToCassandraColumnNameOverride() + { + Map<String,String> overridingColMap = new HashMap<>(); + overridingColMap.put("topScores","top_scores"); // topScores is POJO field name and top_scores is Cassandra col + return overridingColMap; + } + * + * } + * @return A map of the POJO field name as key and value as the Cassandra column name + */ + protected Map<String,String> getPojoFieldNameToCassandraColumnNameOverride() + { + return new HashMap<>(); + } + + /** + * + * Since Cassandra is not a strictly transactional system, if the Apex operator crashes when a window is in + * transit, we might be replaying the same data to be written to cassandra. In such situations, control is + * handed to the concrete operator instance to decide whether to let the write happen + * or simply skip it. Return true if the write needs to go through or return false to prevent the write + * from happening. + * Note that this check only happens for one window of data when an operator is resuming from a previous start. + * In the case of a restart from a previously checkpointed window, the operator simply runs in a "safe mode" + * until it reaches the reconciliation window. This is the only window in which this method is called. Hence it + * might be okay if this method is "heavy". For example, the implementor can choose to read from cassandra for the + * incoming record key entry and decide to let the write through or ignore it completely. This is on a per-tuple + * basis, just for the reconciliation window. Post reconciliation window, the data simply flows through + * without this check. 
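+ * A worked illustration of the surrounding recovery phases (derived from activate() and beginWindow() above): + * suppose windows up to id 17 had completed, as recorded by the {@link WindowDataManager}, before a crash. On + * restart, reconcilingWindowId becomes 18; replayed windows with id < 18 run in safe mode and their tuples are + * skipped since they were already written, window 18 invokes this method for every tuple, and windows with + * id > 18 are processed normally.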
+ * @param T + * @param windowId + * @return Whether the current POJO that is being passed in should be allowed to write into the cassandra row just for + * the reconciling window phase + ***/ + + abstract boolean reconcileRecord(Object T, long windowId); + + enum OperationContext + { + UNDEFINED, + COLLECTIONS_APPEND, + COLLECTIONS_REMOVE, + LIST_APPEND, + LIST_PREPEND, + TTL_SET, + TTL_NOT_SET, + IF_EXISTS_CHECK_PRESENT, + IF_EXISTS_CHECK_ABSENT, + } +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/664257b4/contrib/src/main/java/com/datatorrent/contrib/cassandra/CassandraPOJOInputOperator.java ---------------------------------------------------------------------- diff --git a/contrib/src/main/java/com/datatorrent/contrib/cassandra/CassandraPOJOInputOperator.java b/contrib/src/main/java/com/datatorrent/contrib/cassandra/CassandraPOJOInputOperator.java index 9c56178..f43777f 100644 --- a/contrib/src/main/java/com/datatorrent/contrib/cassandra/CassandraPOJOInputOperator.java +++ b/contrib/src/main/java/com/datatorrent/contrib/cassandra/CassandraPOJOInputOperator.java @@ -345,7 +345,7 @@ public class CassandraPOJOInputOperator extends AbstractCassandraInputOperator<O ((Setter<Object, List<?>>)setters.get(i)).set(obj, list); break; case TIMESTAMP: - final Date date = row.getDate(columnName); + final Date date = new Date(row.getDate(columnName).getMillisSinceEpoch()); ((Setter<Object, Date>)setters.get(i)).set(obj, date); break; default: http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/664257b4/contrib/src/main/java/com/datatorrent/contrib/cassandra/CassandraPOJOOutputOperator.java ---------------------------------------------------------------------- diff --git a/contrib/src/main/java/com/datatorrent/contrib/cassandra/CassandraPOJOOutputOperator.java b/contrib/src/main/java/com/datatorrent/contrib/cassandra/CassandraPOJOOutputOperator.java index 2d1fea3..a191bb0 100644 --- a/contrib/src/main/java/com/datatorrent/contrib/cassandra/CassandraPOJOOutputOperator.java +++ b/contrib/src/main/java/com/datatorrent/contrib/cassandra/CassandraPOJOOutputOperator.java @@ -295,7 +295,7 @@ public class CassandraPOJOOutputOperator extends AbstractCassandraTransactionabl break; case TIMESTAMP: final Date date = ((Getter<Object, Date>)getters.get(i)).get(tuple); - boundStmnt.setDate(i, date); + boundStmnt.setDate(i, LocalDate.fromMillisSinceEpoch(date.getTime())); break; default: throw new RuntimeException("unsupported data type " + type.getName()); http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/664257b4/contrib/src/main/java/com/datatorrent/contrib/cassandra/CassandraPojoUtils.java ---------------------------------------------------------------------- diff --git a/contrib/src/main/java/com/datatorrent/contrib/cassandra/CassandraPojoUtils.java b/contrib/src/main/java/com/datatorrent/contrib/cassandra/CassandraPojoUtils.java new file mode 100644 index 0000000..b1f5f4a --- /dev/null +++ b/contrib/src/main/java/com/datatorrent/contrib/cassandra/CassandraPojoUtils.java @@ -0,0 +1,316 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package com.datatorrent.contrib.cassandra; + +import java.math.BigDecimal; +import java.util.Date; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.UUID; + +import com.datastax.driver.core.BoundStatement; +import com.datastax.driver.core.DataType; +import com.datastax.driver.core.LocalDate; +import com.datastax.driver.core.TypeCodec; +import com.datatorrent.lib.util.PojoUtils; + +/*** + * Used to manage simple data type based getters for given cassandra columns + */ +public class CassandraPojoUtils +{ + /*** + * Resolves a getter that can be associated with the given field name in the POJO, matching the given + * cassandra data type + * @param tuplePayloadClass The tuple class that is used to build the getter from + * @param getterExpr The name of the field representing the getter that needs to be generated + * @param returnDataTypeOfGetter The data type of the cassandra column + * @param userDefinedTypesClass A map that can provide a UDT class given a column name + * @return The getter object that can be used to extract the value at runtime + */ + public static Object resolveGetterForField(Class tuplePayloadClass, String getterExpr, + DataType returnDataTypeOfGetter, Map<String,Class> userDefinedTypesClass) + { + Object getter = null; + switch (returnDataTypeOfGetter.getName()) { + case INT: + getter = PojoUtils.createGetterInt(tuplePayloadClass, getterExpr); + break; + case BIGINT: + case COUNTER: + getter = PojoUtils.createGetterLong(tuplePayloadClass, getterExpr); + break; + case ASCII: + case TEXT: + case VARCHAR: + getter = PojoUtils.createGetter(tuplePayloadClass, getterExpr, String.class); + break; + case BOOLEAN: + getter = PojoUtils.createGetterBoolean(tuplePayloadClass, getterExpr); + break; + case FLOAT: + getter = PojoUtils.createGetterFloat(tuplePayloadClass, getterExpr); + break; + case DOUBLE: + getter = PojoUtils.createGetterDouble(tuplePayloadClass, getterExpr); + break; + case DECIMAL: + getter = PojoUtils.createGetter(tuplePayloadClass, getterExpr, BigDecimal.class); + break; + case SET: + getter = PojoUtils.createGetter(tuplePayloadClass, getterExpr, Set.class); + break; + case MAP: + getter = PojoUtils.createGetter(tuplePayloadClass, getterExpr, Map.class); + break; + case LIST: + getter = PojoUtils.createGetter(tuplePayloadClass, getterExpr, List.class); + break; + case TIMESTAMP: + getter = PojoUtils.createGetter(tuplePayloadClass, getterExpr, Date.class); + break; + case UUID: + getter = PojoUtils.createGetter(tuplePayloadClass, getterExpr, UUID.class); + break; + case UDT: + getter = PojoUtils.createGetter(tuplePayloadClass, getterExpr, userDefinedTypesClass.get(getterExpr)); + break; + default: + getter = PojoUtils.createGetter(tuplePayloadClass, getterExpr, Object.class); + break; + } + return getter; + } + + /*** + * Populates a given bound statement column with a value, given a POJO and the map representing the getters + * @param boundStatement The statement that needs to be populated with the value + * @param getters A map keyed by column name giving the applicable getter + * 
@param dataType The data type of the cassandra column + * @param cassandraColName The name of the cassandra column + * @param pojoPayload The POJO from which the value needs to be extracted + * @param setNulls Whether nulls can be set explicitly + * @param codecsForCassandraColumnNames A map from cassandra column name to the codec to use + */ + @SuppressWarnings(value = "unchecked") + public static void populateBoundStatementWithValue(BoundStatement boundStatement, Map<String,Object> getters, + DataType dataType, String cassandraColName, Object pojoPayload, boolean setNulls, + Map<String,TypeCodec> codecsForCassandraColumnNames) + { + switch (dataType.getName()) { + case BOOLEAN: + PojoUtils.GetterBoolean<Object> boolGetter = ((PojoUtils.GetterBoolean<Object>)getters + .get(cassandraColName)); + if (boolGetter != null) { + final boolean bool = boolGetter.get(pojoPayload); + boundStatement.setBool(cassandraColName, bool); + } else { + boundStatement.unset(cassandraColName); + } + break; + case INT: + PojoUtils.GetterInt<Object> inGetter = ((PojoUtils.GetterInt<Object>)getters.get(cassandraColName)); + if (inGetter != null) { + final int intValue = inGetter.get(pojoPayload); + boundStatement.setInt(cassandraColName, intValue); + } else { + boundStatement.unset(cassandraColName); + } + break; + case BIGINT: + case COUNTER: + PojoUtils.GetterLong<Object> longGetter = ((PojoUtils.GetterLong<Object>)getters.get(cassandraColName)); + if (longGetter != null) { + final long longValue = longGetter.get(pojoPayload); + boundStatement.setLong(cassandraColName, longValue); + } else { + boundStatement.unset(cassandraColName); + } + break; + case FLOAT: + PojoUtils.GetterFloat<Object> floatGetter = ((PojoUtils.GetterFloat<Object>)getters.get(cassandraColName)); + if (floatGetter != null) { + final float floatValue = floatGetter.get(pojoPayload); + boundStatement.setFloat(cassandraColName, floatValue); + } else { + boundStatement.unset(cassandraColName); + } + break; + case DOUBLE: + PojoUtils.GetterDouble<Object> doubleGetter = ((PojoUtils.GetterDouble<Object>)getters + .get(cassandraColName)); + if (doubleGetter != null) { + final double doubleValue = doubleGetter.get(pojoPayload); + boundStatement.setDouble(cassandraColName, doubleValue); + } else { + boundStatement.unset(cassandraColName); + } + break; + case DECIMAL: + PojoUtils.Getter<Object, BigDecimal> bigDecimalGetter = ((PojoUtils.Getter<Object, BigDecimal>)getters + .get(cassandraColName)); + if (bigDecimalGetter != null) { + final BigDecimal decimal = bigDecimalGetter.get(pojoPayload); + if (decimal == null) { + if (!setNulls) { + boundStatement.unset(cassandraColName); + } else { + boundStatement.setDecimal(cassandraColName, null); + } + } else { + boundStatement.setDecimal(cassandraColName, decimal); + } + } else { + boundStatement.unset(cassandraColName); + } + break; + case UUID: + PojoUtils.Getter<Object, UUID> uuidGetter = ((PojoUtils.Getter<Object, UUID>)getters.get(cassandraColName)); + if (uuidGetter != null) { + final UUID uuid = uuidGetter.get(pojoPayload); + if (uuid == null) { + if (!setNulls) { + boundStatement.unset(cassandraColName); + } else { + boundStatement.setUUID(cassandraColName, null); + } + } else { + boundStatement.setUUID(cassandraColName, uuid); + } + } else { + boundStatement.unset(cassandraColName); + } + break; + case ASCII: + case VARCHAR: + case TEXT: + PojoUtils.Getter<Object, String> stringGetter = ((PojoUtils.Getter<Object, String>)getters + .get(cassandraColName)); + if (stringGetter != null) { + 
final String ascii = stringGetter.get(pojoPayload); + if (ascii == null) { + if (!setNulls) { + boundStatement.unset(cassandraColName); + } else { + boundStatement.setString(cassandraColName, null); + } + } else { + boundStatement.setString(cassandraColName, ascii); + } + } else { + boundStatement.unset(cassandraColName); + } + break; + case SET: + PojoUtils.Getter<Object, Set<?>> getterForSet = ((PojoUtils.Getter<Object, Set<?>>)getters + .get(cassandraColName)); + if (getterForSet != null) { + final Set<?> set = getterForSet.get(pojoPayload); + if (set == null) { + if (!setNulls) { + boundStatement.unset(cassandraColName); + } else { + boundStatement.setSet(cassandraColName, null); + } + } else { + boundStatement.setSet(cassandraColName, set); + } + } else { + boundStatement.unset(cassandraColName); + } + break; + case MAP: + PojoUtils.Getter<Object, Map<?, ?>> mapGetter = ((PojoUtils.Getter<Object, Map<?, ?>>)getters + .get(cassandraColName)); + if (mapGetter != null) { + final Map<?, ?> map = mapGetter.get(pojoPayload); + if (map == null) { + if (!setNulls) { + boundStatement.unset(cassandraColName); + } else { + boundStatement.setMap(cassandraColName, null); + } + } else { + boundStatement.setMap(cassandraColName, map); + } + } else { + boundStatement.unset(cassandraColName); + } + break; + case LIST: + PojoUtils.Getter<Object, List<?>> listGetter = ((PojoUtils.Getter<Object, List<?>>)getters + .get(cassandraColName)); + if (listGetter != null) { + final List<?> list = listGetter.get(pojoPayload); + if (list == null) { + if (!setNulls) { + boundStatement.unset(cassandraColName); + } else { + boundStatement.setList(cassandraColName, null); + } + } else { + boundStatement.setList(cassandraColName, list); + } + } else { + boundStatement.unset(cassandraColName); + } + break; + case TIMESTAMP: + PojoUtils.Getter<Object, Date> dateGetter = ((PojoUtils.Getter<Object, Date>)getters.get(cassandraColName)); + if (dateGetter != null) { + final Date date = dateGetter.get(pojoPayload); + if (date == null) { + if (!setNulls) { + boundStatement.unset(cassandraColName); + } else { + boundStatement.setDate(cassandraColName, null); + } + } else { + boundStatement.setDate(cassandraColName, LocalDate.fromMillisSinceEpoch(date.getTime())); + } + } else { + boundStatement.unset(cassandraColName); + } + break; + case UDT: + PojoUtils.Getter<Object, Object> udtGetter = ((PojoUtils.Getter<Object, Object>)getters + .get(cassandraColName)); + if (udtGetter != null) { + final Object udtPayload = udtGetter.get(pojoPayload); + if (udtPayload == null) { + if (!setNulls) { + boundStatement.unset(cassandraColName); + } else { + boundStatement.setUDTValue(cassandraColName, null); + } + } else { + boundStatement.set(cassandraColName, udtPayload, codecsForCassandraColumnNames + .get(cassandraColName).getJavaType().getRawType()); + } + } else { + boundStatement.unset(cassandraColName); + } + break; + default: + throw new RuntimeException("Type not supported for " + dataType.getName()); + } + } +} + http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/664257b4/contrib/src/main/java/com/datatorrent/contrib/cassandra/CassandraPreparedStatementGenerator.java ---------------------------------------------------------------------- diff --git a/contrib/src/main/java/com/datatorrent/contrib/cassandra/CassandraPreparedStatementGenerator.java b/contrib/src/main/java/com/datatorrent/contrib/cassandra/CassandraPreparedStatementGenerator.java new file mode 100644 index 0000000..355fccf --- /dev/null +++ 
b/contrib/src/main/java/com/datatorrent/contrib/cassandra/CassandraPreparedStatementGenerator.java
@@ -0,0 +1,287 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.contrib.cassandra;
+
+
+import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Set;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.datastax.driver.core.DataType;
+import com.datastax.driver.core.PreparedStatement;
+import com.datastax.driver.core.Session;
+
+/***
+ * Generates the CQL strings from which the operator's prepared statements are built.
+ */
+public class CassandraPreparedStatementGenerator
+{
+
+  private Set<String> pkColumnNames;
+
+  private Set<String> counterColumns;
+
+  private Set<String> listColumns;
+
+  private Set<String> mapColumns;
+
+  private Set<String> setColumns;
+
+  private Map<String, DataType> columnDefinitions;
+
+  private static final transient Logger LOG = LoggerFactory.getLogger(CassandraPreparedStatementGenerator.class);
+
+  public static final String TTL_PARAM_NAME = "ttl";
+
+  public CassandraPreparedStatementGenerator(Set<String> pkColumnNames, Set<String> counterColumns,
+      Set<String> listColumns, Set<String> mapColumns, Set<String> setColumns,
+      Map<String, DataType> columnDefinitions)
+  {
+    this.pkColumnNames = pkColumnNames;
+    this.counterColumns = counterColumns;
+    this.listColumns = listColumns;
+    this.mapColumns = mapColumns;
+    this.setColumns = setColumns;
+    this.columnDefinitions = columnDefinitions;
+  }
+
+  public void generatePreparedStatements(Session session, Map<Long, PreparedStatement> preparedStatementTypes,
+      String keyspaceName, String tableName)
+  {
+    Map<Long, String> stringsWithoutPKAndExistsClauses = generatePreparedStatementsQueryStrings(keyspaceName, tableName);
+    String ifExistsClause = " IF EXISTS";
+    Map<Long, String> finalSetOfQueryStrings = new HashMap<>();
+    for (Long currentIndexPos : stringsWithoutPKAndExistsClauses.keySet()) {
+      StringBuilder aQueryStub = new StringBuilder(stringsWithoutPKAndExistsClauses.get(currentIndexPos));
+      buildWhereClauseForPrimaryKeys(aQueryStub);
+      finalSetOfQueryStrings.put(currentIndexPos +
+          getSlotIndexForMutationContextPreparedStatement(EnumSet.of(
+              AbstractUpsertOutputOperator.OperationContext.IF_EXISTS_CHECK_ABSENT)),
+          aQueryStub.toString());
+      if (counterColumns.size() == 0) {
+        // IF EXISTS cannot be used on counter column tables
+        finalSetOfQueryStrings.put(currentIndexPos +
+            getSlotIndexForMutationContextPreparedStatement(EnumSet.of(
+                AbstractUpsertOutputOperator.OperationContext.IF_EXISTS_CHECK_PRESENT)),
+            aQueryStub.toString() + ifExistsClause);
+      }
+    }
+    for (Long currentIndexPos : finalSetOfQueryStrings.keySet()) {
+      String currentQueryStr = finalSetOfQueryStrings.get(currentIndexPos);
+      LOG.info("Registering query support for " + currentQueryStr);
+      PreparedStatement preparedStatementForThisQuery = session.prepare(currentQueryStr);
+      preparedStatementTypes.put(currentIndexPos, preparedStatementForThisQuery);
+    }
+  }
+
+  private void buildWhereClauseForPrimaryKeys(final StringBuilder queryExpression)
+  {
+    queryExpression.append(" WHERE ");
+    int count = 0;
+    for (String pkColName : pkColumnNames) {
+      if (count > 0) {
+        queryExpression.append(" AND ");
+      }
+      count += 1;
+      queryExpression.append(" ").append(pkColName).append(" = :").append(pkColName);
+    }
+  }
+
+
+  private void buildQueryStringForTTLSetCollectionsAppendAndListPrepend(StringBuilder updateQueryRoot,
+      String ttlSetString, Map<Long, String> queryStrings)
+  {
+    // TTL set, collections append, list prepend
+    StringBuilder queryExpTTLSetCollAppendListPrepend = new StringBuilder(updateQueryRoot.toString());
+    queryExpTTLSetCollAppendListPrepend.append(ttlSetString);
+    buildNonPKColumnsExpression(queryExpTTLSetCollAppendListPrepend,
+        UpsertExecutionContext.ListPlacementStyle.PREPEND_TO_EXISTING_LIST,
+        UpsertExecutionContext.CollectionMutationStyle.ADD_TO_EXISTING_COLLECTION);
+    queryStrings.put(getSlotIndexForMutationContextPreparedStatement(EnumSet.of(
+        AbstractUpsertOutputOperator.OperationContext.TTL_SET,
+        AbstractUpsertOutputOperator.OperationContext.COLLECTIONS_APPEND,
+        AbstractUpsertOutputOperator.OperationContext.LIST_PREPEND
+    )), queryExpTTLSetCollAppendListPrepend.toString());
+  }
+
+  private void buildQueryStringForTTLSetCollectionsAppendAndListAppend(StringBuilder updateQueryRoot,
+      String ttlSetString, Map<Long, String> queryStrings)
+  {
+    // TTL set, collections append, list append
+    StringBuilder queryExpTTLSetCollAppendListAppend = new StringBuilder(updateQueryRoot.toString());
+    queryExpTTLSetCollAppendListAppend.append(ttlSetString);
+    buildNonPKColumnsExpression(queryExpTTLSetCollAppendListAppend,
+        UpsertExecutionContext.ListPlacementStyle.APPEND_TO_EXISTING_LIST,
+        UpsertExecutionContext.CollectionMutationStyle.ADD_TO_EXISTING_COLLECTION);
+    queryStrings.put(getSlotIndexForMutationContextPreparedStatement(EnumSet.of(
+        AbstractUpsertOutputOperator.OperationContext.TTL_SET,
+        AbstractUpsertOutputOperator.OperationContext.COLLECTIONS_APPEND,
+        AbstractUpsertOutputOperator.OperationContext.LIST_APPEND
+    )), queryExpTTLSetCollAppendListAppend.toString());
+  }
+
+  private void buildQueryStringForTTLSetCollectionsRemove(StringBuilder updateQueryRoot,
+      String ttlSetString, Map<Long, String> queryStrings)
+  {
+    // TTL set, collections remove
+    StringBuilder queryExpTTLSetCollRemove = new StringBuilder(updateQueryRoot.toString());
+    queryExpTTLSetCollRemove.append(ttlSetString);
+    buildNonPKColumnsExpression(queryExpTTLSetCollRemove,
+        UpsertExecutionContext.ListPlacementStyle.APPEND_TO_EXISTING_LIST, // just in case the user sets it
+        UpsertExecutionContext.CollectionMutationStyle.REMOVE_FROM_EXISTING_COLLECTION);
+    queryStrings.put(getSlotIndexForMutationContextPreparedStatement(EnumSet.of(
+        AbstractUpsertOutputOperator.OperationContext.TTL_SET,
+        AbstractUpsertOutputOperator.OperationContext.COLLECTIONS_REMOVE,
+        AbstractUpsertOutputOperator.OperationContext.LIST_APPEND
+    )), queryExpTTLSetCollRemove.toString());
+  }
+
+  private void buildQueryStringForTTLNotSetCollectionsAppendWithListPrepend(StringBuilder updateQueryRoot,
+      Map<Long, String> queryStrings)
+  {
+    // TTL not set, collections append, list prepend
+    StringBuilder queryExpTTLNotSetCollAppendListPrepend = new StringBuilder(updateQueryRoot.toString());
+    queryExpTTLNotSetCollAppendListPrepend.append(" SET ");
+    buildNonPKColumnsExpression(queryExpTTLNotSetCollAppendListPrepend,
+        UpsertExecutionContext.ListPlacementStyle.PREPEND_TO_EXISTING_LIST,
+        UpsertExecutionContext.CollectionMutationStyle.ADD_TO_EXISTING_COLLECTION);
+    queryStrings.put(getSlotIndexForMutationContextPreparedStatement(EnumSet.of(
+        AbstractUpsertOutputOperator.OperationContext.TTL_NOT_SET,
+        AbstractUpsertOutputOperator.OperationContext.COLLECTIONS_APPEND,
+        AbstractUpsertOutputOperator.OperationContext.LIST_PREPEND
+    )), queryExpTTLNotSetCollAppendListPrepend.toString());
+  }
+
+  private void buildQueryStringForTTLNotSetCollectionsAppendWithListAppend(StringBuilder updateQueryRoot,
+      Map<Long, String> queryStrings)
+  {
+    // TTL not set, collections append, list append
+    StringBuilder queryExpTTLNotSetCollAppendListAppend = new StringBuilder(updateQueryRoot.toString());
+    queryExpTTLNotSetCollAppendListAppend.append(" SET ");
+    buildNonPKColumnsExpression(queryExpTTLNotSetCollAppendListAppend,
+        UpsertExecutionContext.ListPlacementStyle.APPEND_TO_EXISTING_LIST,
+        UpsertExecutionContext.CollectionMutationStyle.ADD_TO_EXISTING_COLLECTION);
+    queryStrings.put(getSlotIndexForMutationContextPreparedStatement(EnumSet.of(
+        AbstractUpsertOutputOperator.OperationContext.TTL_NOT_SET,
+        AbstractUpsertOutputOperator.OperationContext.COLLECTIONS_APPEND,
+        AbstractUpsertOutputOperator.OperationContext.LIST_APPEND
+    )), queryExpTTLNotSetCollAppendListAppend.toString());
+  }
+
+
+  private void buildQueryStringForTTLNotSetCollectionsRemove(StringBuilder updateQueryRoot,
+      Map<Long, String> queryStrings)
+  {
+    // TTL not set, collections remove
+    StringBuilder queryExpTTLNotSetCollRemove = new StringBuilder(updateQueryRoot.toString());
+    queryExpTTLNotSetCollRemove.append(" SET ");
+    buildNonPKColumnsExpression(queryExpTTLNotSetCollRemove,
+        UpsertExecutionContext.ListPlacementStyle.APPEND_TO_EXISTING_LIST, // just in case the user sets it
+        UpsertExecutionContext.CollectionMutationStyle.REMOVE_FROM_EXISTING_COLLECTION);
+    queryStrings.put(getSlotIndexForMutationContextPreparedStatement(EnumSet.of(
+        AbstractUpsertOutputOperator.OperationContext.TTL_NOT_SET,
+        AbstractUpsertOutputOperator.OperationContext.COLLECTIONS_REMOVE,
+        AbstractUpsertOutputOperator.OperationContext.LIST_APPEND
+    )), queryExpTTLNotSetCollRemove.toString());
+  }
+
+  private Map<Long, String> generatePreparedStatementsQueryStrings(String keyspaceName, String tableName)
+  {
+    Map<Long, String> queryStrings = new HashMap<>();
+    // UPDATE keyspace_name.table_name USING option AND option SET assignment, assignment, ... WHERE row_specification
+    StringBuilder updateQueryRoot = new StringBuilder(" UPDATE " + keyspaceName + "." + tableName + " ");
+    String ttlSetString = " USING ttl :" + TTL_PARAM_NAME + " SET ";
+    buildQueryStringForTTLSetCollectionsAppendAndListPrepend(updateQueryRoot, ttlSetString, queryStrings);
+    buildQueryStringForTTLSetCollectionsAppendAndListAppend(updateQueryRoot, ttlSetString, queryStrings);
+    buildQueryStringForTTLSetCollectionsRemove(updateQueryRoot, ttlSetString, queryStrings);
+    buildQueryStringForTTLNotSetCollectionsAppendWithListPrepend(updateQueryRoot, queryStrings);
+    buildQueryStringForTTLNotSetCollectionsAppendWithListAppend(updateQueryRoot, queryStrings);
+    buildQueryStringForTTLNotSetCollectionsRemove(updateQueryRoot, queryStrings);
+    return queryStrings;
+  }
+
+
+  public static long getSlotIndexForMutationContextPreparedStatement(
+      final EnumSet<AbstractUpsertOutputOperator.OperationContext> context)
+  {
+    Iterator<AbstractUpsertOutputOperator.OperationContext> itrForContexts = context.iterator();
+    long indexValue = 0;
+    while (itrForContexts.hasNext()) {
+      AbstractUpsertOutputOperator.OperationContext aContext = itrForContexts.next();
+      indexValue += Math.pow(10, aContext.ordinal());
+    }
+    return indexValue;
+  }
+
+  private void buildNonPKColumnsExpression(final StringBuilder queryExpression,
+      UpsertExecutionContext.ListPlacementStyle listPlacementStyle,
+      UpsertExecutionContext.CollectionMutationStyle collectionMutationStyle)
+  {
+    int count = 0;
+    for (String colNameEntry : columnDefinitions.keySet()) {
+      if (pkColumnNames.contains(colNameEntry)) {
+        continue;
+      }
+      if (count > 0) {
+        queryExpression.append(",");
+      }
+      count += 1;
+      if (counterColumns.contains(colNameEntry)) {
+        queryExpression.append(" " + colNameEntry + " = " + colNameEntry + " + :" + colNameEntry);
+        continue;
+      }
+      DataType dataType = columnDefinitions.get(colNameEntry);
+      if ((!dataType.isCollection()) && (!counterColumns.contains(colNameEntry))) {
+        queryExpression.append(" " + colNameEntry + " = :" + colNameEntry);
+        continue;
+      }
+      if ((dataType.isCollection()) && (!dataType.isFrozen())) {
+        if (collectionMutationStyle == UpsertExecutionContext.CollectionMutationStyle.REMOVE_FROM_EXISTING_COLLECTION) {
+          queryExpression.append(" " + colNameEntry + " = " + colNameEntry + " - :" + colNameEntry);
+        }
+        if (collectionMutationStyle == UpsertExecutionContext.CollectionMutationStyle.ADD_TO_EXISTING_COLLECTION) {
+          if ((setColumns.contains(colNameEntry)) || (mapColumns.contains(colNameEntry))) {
+            queryExpression.append(" " + colNameEntry + " = " + colNameEntry + " + :" + colNameEntry);
+          }
+          if ((listColumns.contains(colNameEntry)) &&
+              (listPlacementStyle == UpsertExecutionContext.ListPlacementStyle.APPEND_TO_EXISTING_LIST)) {
+            queryExpression.append(" " + colNameEntry + " = " + colNameEntry + " + :" + colNameEntry);
+          }
+          if ((listColumns.contains(colNameEntry)) &&
+              (listPlacementStyle == UpsertExecutionContext.ListPlacementStyle.PREPEND_TO_EXISTING_LIST)) {
+            queryExpression.append(" " + colNameEntry + " = :" + colNameEntry + " + " + colNameEntry);
+          }
+        }
+      } else {
+        if ((dataType.isCollection()) && (dataType.isFrozen())) {
+          queryExpression.append(" " + colNameEntry + " = :" + colNameEntry);
+        }
+      }
+    }
+  }
+
+}
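
The slot-index scheme in getSlotIndexForMutationContextPreparedStatement above is compact but non-obvious: each OperationContext present in a mutation contributes 10 raised to its ordinal to a long key, so every combination of contexts occupies a distinct decimal digit pattern and therefore selects a distinct prepared statement. The following minimal sketch reproduces that arithmetic with a hypothetical stand-in enum; the constant names mirror those referenced in the diff, but their declaration order, and hence their ordinals, in the real AbstractUpsertOutputOperator.OperationContext may differ.

import java.util.EnumSet;

public class SlotIndexSketch
{
  // Hypothetical stand-in for AbstractUpsertOutputOperator.OperationContext.
  // Each constant contributes 10^ordinal to the slot index.
  enum OperationContext
  {
    TTL_SET,                 // ordinal 0 -> contributes 1
    TTL_NOT_SET,             // ordinal 1 -> contributes 10
    COLLECTIONS_APPEND,      // ordinal 2 -> contributes 100
    COLLECTIONS_REMOVE,      // ordinal 3 -> contributes 1000
    LIST_APPEND,             // ordinal 4 -> contributes 10000
    LIST_PREPEND,            // ordinal 5 -> contributes 100000
    IF_EXISTS_CHECK_PRESENT, // ordinal 6 -> contributes 1000000
    IF_EXISTS_CHECK_ABSENT   // ordinal 7 -> contributes 10000000
  }

  // Same arithmetic as the operator's method: sum 10^ordinal over the set.
  static long slotIndex(EnumSet<OperationContext> contexts)
  {
    long indexValue = 0;
    for (OperationContext aContext : contexts) {
      indexValue += Math.pow(10, aContext.ordinal());
    }
    return indexValue;
  }

  public static void main(String[] args)
  {
    // TTL_SET + COLLECTIONS_APPEND + LIST_PREPEND -> 1 + 100 + 100000 = 100101
    System.out.println(slotIndex(EnumSet.of(
        OperationContext.TTL_SET,
        OperationContext.COLLECTIONS_APPEND,
        OperationContext.LIST_PREPEND)));
  }
}

To illustrate the generated CQL itself: for a hypothetical table ks.users with primary key column user_id and a non-frozen list column addresses, the TTL-set, collections-append, list-append variant built above would read roughly UPDATE ks.users USING ttl :ttl SET addresses = addresses + :addresses WHERE user_id = :user_id, and generatePreparedStatements then registers it twice, once as-is (for the IF_EXISTS_CHECK_ABSENT slot) and once with an IF EXISTS suffix, the latter only when the table has no counter columns.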
