Repository: apex-malhar
Updated Branches:
  refs/heads/master a017dfaa4 -> c46398f11
APEXMALHAR-2432 fix javadoc for cassandra upsert operator


Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/c46398f1
Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/c46398f1
Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/c46398f1

Branch: refs/heads/master
Commit: c46398f11fef77af25391ac48d03075f00e61d74
Parents: a017dfa
Author: Ananth <[email protected]>
Authored: Mon Mar 20 06:54:32 2017 +1100
Committer: Ananth <[email protected]>
Committed: Mon Mar 20 06:54:32 2017 +1100

----------------------------------------------------------------------
 .../cassandra/AbstractUpsertOutputOperator.java | 136 +++++++++++--------
 .../cassandra/ConnectionStateManager.java       |  48 ++++---
 .../cassandra/UpsertExecutionContext.java       |  24 ++--
 3 files changed, 119 insertions(+), 89 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/c46398f1/contrib/src/main/java/com/datatorrent/contrib/cassandra/AbstractUpsertOutputOperator.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/com/datatorrent/contrib/cassandra/AbstractUpsertOutputOperator.java b/contrib/src/main/java/com/datatorrent/contrib/cassandra/AbstractUpsertOutputOperator.java
index b790492..f287385 100644
--- a/contrib/src/main/java/com/datatorrent/contrib/cassandra/AbstractUpsertOutputOperator.java
+++ b/contrib/src/main/java/com/datatorrent/contrib/cassandra/AbstractUpsertOutputOperator.java
@@ -69,61 +69,84 @@ import com.datatorrent.common.util.BaseOperator;
 import static com.google.common.base.Preconditions.checkNotNull;
 
 /**
- * An abstract operator that is used to mutate cassandra rows using PreparedStatements for faster executions
- * and accommodates EXACTLY_ONCE Semantics if concrete implementations choose to implement an abstract method with
- * meaningful implementation (as Cassandra is not a pure transactional database , the burden is on the concrete
- * implementation of the operator ONLY during the reconciliation window (and not for any other windows).
+ * <p>An abstract operator that is used to mutate cassandra rows using PreparedStatements for faster executions.
+ * It accommodates EXACTLY_ONCE Semantics if concrete implementations choose to implement an abstract method with
+ * meaningful implementation. Cassandra not being a pure transactional database , the burden is on the concrete
+ * implementation of the operator to handle these semantics for EXACTLY_ONCE scenarios.
+ * It may also be noted that the transaction previously committed check is ONLY invoked
+ * during the reconciliation window (and not for any other windows).</p>
  *
+ * <p>
  * The typical implementation model is as follows :
- * 1. Create a concrete implementation of this class by extending this class and implement a few methods.
- * 2. Define the payload that is the POJO that represents a Cassandra Row is part of this execution context
- * {@link UpsertExecutionContext}. The payload is a template Parameter of this class
- * 3. The Upstream operator that wants to write to Cassandra does the following
- * a. Create an instance of {@link UpsertExecutionContext}
- * b. Set the payload ( an instance of the POJO created as step two above )
- * c. Set additional execution context parameters like CollectionHandling style, List placement Styles
- * overriding TTLs, Update only if Primary keys exist and Consistency Levels etc.
- * 4. The concrete implementation would then execute this context as a cassandra row mutation
+ * <ol>
+ * <li> Create a concrete implementation of this class by extending this class and implementing a few methods.</li>
+ * <li> Define the payload ( a POJO ) that represents a Cassandra Row and is also part of this execution context </li>
+ * {@link UpsertExecutionContext}. The payload POJO is a template Parameter of this class
+ * <li> The Upstream operator that wants to write to Cassandra does the following </li> <ol type="a">
+ * <li> Create an instance of {@link UpsertExecutionContext} </li>
+ * <li> Set the payload ( an instance of the POJO created as step two above )</li>
+ * <li> Set additional execution context parameters like CollectionHandling style, List placement Styles </li>
+ * overriding TTLs, Update only if Primary keys exist and Consistency Levels etc. </ol>
  *
+ * <li> The concrete implementation would then execute this context as a cassandra row mutation </li>
+ * </ol>
+ * </p>
+ *
+ * <p>Please refer unit tests in the source repository for some concrete examples of usage.</p>
+ *
+ *
+ * <p>
  * This operator supports the following features
- * 1. Highly customizable Connection policies. This is achieved by specifying the ConnectionStateManager.
+ * <ol>
+ * <li> The user need not surface multiple operator instances for multiple update patterns. The incoming POJO that
+ * is part of the UpsertExecutionContext tuple is automatically interpreted along with the context to account for
+ * the right Update Statement to be executed.</li>
+ * <li>Highly customizable Connection policies. This is achieved by specifying the ConnectionStateManager.
  * There are quite a few connection management aspects that can be
  * controlled via {@link ConnectionStateManager} like consistency, load balancing, connection retries,
- * table to use, keyspace to use etc. Please refer javadoc of {@link ConnectionStateManager}
- * 2. Support for Collections : Map, List and Sets are supported
- * User Defined types as part of collections is also supported.
- * 3. Support exists for both adding to an existing collection or removing entries from an existing collection.
- * The POJO field that represents a collection is used to represent the collection that is added or removed.
+ * table to use, keyspace to use etc. Please refer javadoc of {@link ConnectionStateManager} </li>
+ * <li> Support for Collections : Map, List and Sets are supported
+ * User Defined types as part of collections are also supported. </li>
+ * <li> Support exists for both adding to an existing collection or removing entries from an existing collection.
+ * The POJO field that represents a collection is used to represent the collection that is to be added or removed
+ * from the database table.
  * Thus this can be used to avoid a pattern of read and then write the final value into the cassandra column
- * which can be used for low latency / high write pattern applications as we can avoid a read in the process.
- * 4. Supports List Placements : The execution context can be used to specify where the new incoming list
+ * which can be used for low latency / high write pattern applications as we can avoid a read in the process. </li>
+ * <li> Supports List Placements : The execution context can be used to specify where the new incoming list
  * is to be added ( in case there is an existing list in the current column of the current row being mutated.
- * Supported options are APPEND or PREPEND to an existing list
- * 5. Support for User Defined Types. A pojo can have fields that represent the Cassandra Columns that are custom
+ * Supported options are APPEND or PREPEND to an existing list </li>
+ * <li> Support for User Defined Types. A pojo can have fields that represent the Cassandra Columns that are custom
  * user defined types. Concrete implementations of the operator provide a mapping of the cassandra column name
- * to the TypeCodec that is to be used for that field inside cassandra. Please refer javadocs of
- * {@link this.getCodecsForUserDefinedTypes() } for more details
- * 6. Support for custom mapping of POJO payload field names to that of cassandra columns. Practically speaking,
+ * to the TypeCodec that is to be used for that field inside cassandra. Please refer javadoc of the method
+ * getCodecsForUserDefinedTypes() in this class for more details </li>
+ * <li> Support for custom mapping of POJO payload field names to that of cassandra columns. Practically speaking,
  * POJO field names might not always match with Cassandra Column names and hence this support. This will also avoid
  * writing a POJO just for the cassandra operator and thus an existing POJO can be passed around to this operator.
- * Please refer javadoc {@link this.getPojoFieldNameToCassandraColumnNameOverride()} for an example
- * 7. TTL support - A default TTL can be set for the Connection ( via {@link ConnectionStateManager} and then used
- * for all mutations. This TTL can further be overridden at a tuple execution level to accomodate use cases of
- * setting custom column expirations typically useful in wide row implementations.
- * 8. Support for Counter Column tables. Counter tables are also supported with the values inside the incoming
- * POJO added/subtracted from the counter column accordingly. Please note that the value is not absolute set but
- * rather representing the value that needs to be added to or subtracted from the current counter.
- * 9. Support for Composite Primary Keys is also supported. All the POJO fields that map to the composite
- * primary key are used to resolve the primary key in case of a Composite Primary key table
- * 10. Support for conditional updates : This operator can be used as an Update Only operator as opposed to an
+ * Please refer javadoc {@link this.getPojoFieldNameToCassandraColumnNameOverride()} for an example </li>
+ * <li> TTL support - A default TTL can be set for the Connection ( via {@link ConnectionStateManager} ) and then used
+ * for all mutations. This TTL can further be overridden at a tuple execution level to accommodate use cases of
+ * setting custom column expiry typically useful in wide row implementations. </li>
+ * <li> Support for Counter Column tables. Counter tables are also supported with the values inside the incoming
+ * POJO added/subtracted from the counter column accordingly.Please note that the value is not an absolute overwrite.
+ * Rather the pojo column value represents the value that needs to be added to or subtracted from the current
+ * counter. </li>
+ * <li> Support for Composite Primary Keys is also supported. All the POJO fields that map to the composite
+ * primary key are used to resolve the primary key in case of a Composite Primary key table </li>
+ * <li> Support for conditional updates : This operator can be used as an Update Only operator as opposed to an
  * Upsert operator. i.e. Update only IF EXISTS . This is achieved by setting the appropriate boolean in the
- * {@link UpsertExecutionContext} tuple that is passed from the upstream operator.
- * 11. Lenient mapping of POJO fields to Cassandra column names. By default the POJO field names are case insensitive
+ * {@link UpsertExecutionContext} tuple that is passed from the upstream operator.</li>
+ * <li> Lenient mapping of POJO fields to Cassandra column names. By default the POJO field names are case insensitive
  * to cassandra column names. This can be further enhanced by over-riding mappings. Please refer feature 6 above.
- * 12. Defaults can be overridden at at tuple execution level for TTL & Consistency Policies
- * 13. Support for handling Nulls i.e. whether null values in the POJO are to be persisted as is or to be ignored so
- * that the application need not perform a read to populate a POJO field if it is not available in the context
- * 14. A few autometrics are provided for monitoring the latency aspects of the cassandra cluster
+ * </li>
+ * <li> Defaults can be overridden at at tuple execution level for TTL & Consistency Policies </li>
+ * <li> Support for handling Nulls i.e. whether null values in the POJO are to be persisted as is or to be ignored so
+ * that the application need not perform a read to populate a POJO field if it is not available in the context</li>
+ * <li> A few autometrics are provided for monitoring the latency aspects of the cassandra cluster </li>
+ * </ol>
+ * </p>
+ *
+ * <b>Note that the operator also supports mutating only few columns of a given row (apart from the normal upsert use
+ * cases) if required. This is achieved by setting the appropriate directives in the UpsertExecutionContext tuple</b>
  *
  * @since 3.6.0
  */
@@ -956,20 +979,21 @@ public abstract class AbstractUpsertOutputOperator extends BaseOperator implemen
 
   /**
    * Concrete implementations can override this method to provide a custom map of a POJO file name to the cassandra
-   * column name. This is useful when POJOs that are acting as payloads
-   * 1. Cannot comply with code conventions of POJO as opposed to cassandra column names Ex: Cassandra column names
-   * might have underscores and POJO fields might not be in that format.
-   * It may be noted case sensitivity is ignored when trying to match Cassandra Column names
-   * {@code
-   * @Override
-  protected Map<String, String> getPojoFieldNameToCassandraColumnNameOverride()
-  {
-    Map<String,String> overridingColMap = new HashMap<>();
-    overridingColMap.put("topScores","top_scores"); // topScores is POJO field name and top_scores is Cassandra col
-    return overridingColMap;
-  }
-   *
-   * }
+   * column name. This is useful when POJOs that are acting as payloads cannot comply with code conventions of POJO
+   * as opposed to cassandra column names Ex: Cassandra column names might have underscores and POJO fields
+   * might not be in that format. It may be noted case sensitivity is ignored when trying to match Cassandra
+   * Column names in this operator. The following snippet provides an overview of the custom mapping that can be
+   * done by the concrete operator implementation if required.
+   * <p>
+   * <pre>
+   * protected Map<String, String> getPojoFieldNameToCassandraColumnNameOverride()
+   * {
+   *   Map<String,String> overridingColMap = new HashMap<>();
+   *   overridingColMap.put("topScores","top_scores"); // topScores is POJO field name and top_scores is Cassandra col
+   *   return overridingColMap;
+   * }
+   * </pre>
+   * </p>
    * @return A map of the POJO field name as key and value as the Cassandra Column name
    */
   protected Map<String,String> getPojoFieldNameToCassandraColumnNameOverride()


http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/c46398f1/contrib/src/main/java/com/datatorrent/contrib/cassandra/ConnectionStateManager.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/com/datatorrent/contrib/cassandra/ConnectionStateManager.java b/contrib/src/main/java/com/datatorrent/contrib/cassandra/ConnectionStateManager.java
index b4fd21f..f089137 100644
--- a/contrib/src/main/java/com/datatorrent/contrib/cassandra/ConnectionStateManager.java
+++ b/contrib/src/main/java/com/datatorrent/contrib/cassandra/ConnectionStateManager.java
@@ -49,31 +49,37 @@ import com.datastax.driver.core.policies.TokenAwarePolicy;
 import static com.google.common.base.Preconditions.checkNotNull;
 
 /**
- * This is used to specify the connection parameters that is used to process mutations for the
+ * <p>This is used to specify the connection parameters that is used to process mutations for the
  * {@link AbstractUpsertOutputOperator}.
- * The Connection can only be set for one table in a given space and as such is used to define the following
- * - Connection Parameters
- * - Defaults to be used in case the execution context tuple {@link UpsertExecutionContext} does not want to
- * set one explicitly.
+ * The Connection can only be set for one table in a given keyspace and as such is used to define the following
+ * <ul>
+ * <li>Connection Parameters</li>
+ * <li>Defaults to be used in case the execution context tuple {@link UpsertExecutionContext} does not want to
+ * set one explicitly.</li>
+ * </ul></p>
+ * <p>
  * Note that the {@link ConnectionBuilder} is used to build an instance of the ConnectionStateManager.
  * An instance of this class is typically instantiated by implementing the following inside
  * {@link AbstractUpsertOutputOperator} withConnectionBuilder() method.
- * {@code
-  @Override
-  public ConnectionStateManager.ConnectionBuilder withConnectionBuilder()
-  {
-    return ConnectionStateManager.withNewBuilder()
-        .withSeedNodes("localhost") // format of host1:port;host2:port;host3:port
-        .withClusterNameAs("Test Cluster") // mandatory param
-        .withDCNameAs("datacenter1") // mandatory param
-        .withTableNameAs("users") // mandatory param
-        .withKeySpaceNameAs("unittests") // mandatory param
-        .withdefaultConsistencyLevel(ConsistencyLevel.LOCAL_ONE); // Set if required. Default of LOCAL_QUORUM
-        // Rest of the configs are initialized to sane defaults
-  }
- * }
- * Please refer {@link ConnectionBuilder} for details about parameters that can be used to define the connection
- * and its default behaviour
+ * </p>
+ * <p> A typical implementation of the ConnectionBuilder would like this:
+ * <pre>
+ *
+ * public ConnectionStateManager.ConnectionBuilder withConnectionBuilder()
+ * {
+ *   return ConnectionStateManager.withNewBuilder()
+ *       .withSeedNodes("localhost") // format of host1:port;host2:port;host3:port
+ *       .withClusterNameAs("Test Cluster") // mandatory param
+ *       .withDCNameAs("datacenter1") // mandatory param
+ *       .withTableNameAs("users") // mandatory param
+ *       .withKeySpaceNameAs("unittests") // mandatory param
+ *       .withdefaultConsistencyLevel(ConsistencyLevel.LOCAL_ONE); // Set if required. Default of LOCAL_QUORUM
+ *       // Rest of the configs are initialized to sane defaults
+ * }
+ * </pre>
+ * </p>
+ * <p>Please refer {@link ConnectionBuilder} for details about parameters that can be used to define the connection
+ * and its default behaviour </p>
  *
  * @since 3.6.0
  */


http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/c46398f1/contrib/src/main/java/com/datatorrent/contrib/cassandra/UpsertExecutionContext.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/com/datatorrent/contrib/cassandra/UpsertExecutionContext.java b/contrib/src/main/java/com/datatorrent/contrib/cassandra/UpsertExecutionContext.java
index 65e4af2..92be546 100644
--- a/contrib/src/main/java/com/datatorrent/contrib/cassandra/UpsertExecutionContext.java
+++ b/contrib/src/main/java/com/datatorrent/contrib/cassandra/UpsertExecutionContext.java
@@ -21,20 +21,20 @@ package com.datatorrent.contrib.cassandra;
 import com.datastax.driver.core.ConsistencyLevel;
 
 /**
- * Each mutation in the cassandra table is decided by a context which can be passed to the Operator at runtime.
+ * <p>Each mutation in the cassandra table is decided by a context which can be passed to the Operator at runtime.
  * This class represents such a context. The context is to be set by the upstream operator and is used to
  * define how to mutate a row by passing it as a tuple to the {@link AbstractUpsertOutputOperator}
- * The row to be mutated is represented by the payload represented by the template parameter T
- * The following aspects of the mutation can be controlled by the context tuple.
- * 1. Collection Mutation Style
- * 2. List Placement style
- * 3. Null Handling styles for partial mutations of a given row
- * 4. Update only if a Primary Key exists
- * 5. Override the TTL that is set at the default connection config.
- * See {@link ConnectionStateManager.ConnectionBuilder} to set the default TTL for all payload executions
- * 6. Override the default Consistency level to be used for the current mutation
- * See {@link ConnectionStateManager.ConnectionBuilder} for setting default consistency.
- *
+ * The row to be mutated is represented by the payload represented by the template parameter T</p>
+ * <p>The following aspects of the mutation can be controlled by the context tuple.<il>
+ * <li>Collection Mutation Style</li>
+ * <li>List Placement style</li>
+ * <li>Null Handling styles for partial mutations of a given row</li>
+ * <li>Update only if a Primary Key exists</li>
+ * <li>Override the TTL that is set at the default connection config.
+ * See {@link ConnectionStateManager.ConnectionBuilder} to set the default TTL for all payload executions</li>
+ * <li>Override the default Consistency level to be used for the current mutation
+ * See {@link ConnectionStateManager.ConnectionBuilder} for setting default consistency.</li>
+ * </il></p>
  * @since 3.6.0
  */
 public class UpsertExecutionContext<T>
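The javadoc of getPojoFieldNameToCassandraColumnNameOverride() and the "lenient mapping" feature together describe how a POJO field name is matched to a Cassandra column: an optional override (POJO field name to column name), and case-insensitive matching otherwise. The sketch below illustrates that idea only. It is not the operator's code: the class ColumnNameResolver and its resolve() method are hypothetical, and the sketch assumes the override is applied first and its result is then matched case-insensitively against the table's column names.

```java
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;
import java.util.Set;

// Hypothetical helper, not part of apex-malhar: resolves a POJO field name to a
// Cassandra column name by applying an optional override first and then matching
// case-insensitively against the table's column names.
final class ColumnNameResolver
{
  // lower-cased column name -> column name as defined in the table
  private final Map<String, String> columnsByLowerCaseName = new HashMap<>();
  // POJO field name -> Cassandra column name (same shape as the override map in the javadoc)
  private final Map<String, String> fieldToColumnOverrides;

  ColumnNameResolver(Set<String> tableColumns, Map<String, String> fieldToColumnOverrides)
  {
    for (String column : tableColumns) {
      columnsByLowerCaseName.put(column.toLowerCase(Locale.ROOT), column);
    }
    this.fieldToColumnOverrides = new HashMap<>(fieldToColumnOverrides);
  }

  /** Returns the matching column name, or null when the field maps to no column. */
  String resolve(String pojoFieldName)
  {
    // An explicit override wins; otherwise the field name itself is the candidate
    String candidate = fieldToColumnOverrides.getOrDefault(pojoFieldName, pojoFieldName);
    return columnsByLowerCaseName.get(candidate.toLowerCase(Locale.ROOT));
  }
}
```

With table columns userid, username and top_scores and the override topScores -> top_scores, the fields userId, UserName and topScores resolve to userid, username and top_scores respectively, while a field with no matching column resolves to null.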

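Several features listed in the operator javadoc (adding to or removing from a collection, APPEND or PREPEND list placement, counter deltas, a per-tuple TTL override, composite primary keys and Update only IF EXISTS) correspond to standard CQL UPDATE forms. The following hypothetical sketch only renders statement text to make that correspondence concrete. The enum ColumnUpdateStyle and the class UpdateCqlSketch are illustrative names, not the operator's types; the operator itself executes PreparedStatements, and this sketch makes no claim about the exact statements it generates.

```java
import java.util.List;
import java.util.StringJoiner;

// Hypothetical mutation styles for illustration only; they mirror the options
// described in the javadoc but are NOT the operator's actual enums.
enum ColumnUpdateStyle
{
  OVERWRITE, ADD_TO_COLLECTION, REMOVE_FROM_COLLECTION, PREPEND_TO_LIST, COUNTER_DELTA
}

// Hypothetical sketch (not apex-malhar code): renders the CQL UPDATE text that a
// single-column mutation corresponds to. Values are left as bind markers (?).
final class UpdateCqlSketch
{
  private UpdateCqlSketch()
  {
  }

  static String assignment(String column, ColumnUpdateStyle style)
  {
    switch (style) {
      case ADD_TO_COLLECTION:      // list APPEND, set union or map merge
      case COUNTER_DELTA:          // counter += bound delta (a negative delta subtracts)
        return column + " = " + column + " + ?";
      case REMOVE_FROM_COLLECTION: // remove the bound elements from a list or set
        return column + " = " + column + " - ?";
      case PREPEND_TO_LIST:        // list PREPEND
        return column + " = ? + " + column;
      default:                     // plain upsert of the column value
        return column + " = ?";
    }
  }

  static String buildUpdate(String keyspace, String table, String column, ColumnUpdateStyle style,
      List<String> primaryKeyColumns, Integer ttlSeconds, boolean updateOnlyIfExists)
  {
    if (primaryKeyColumns.isEmpty()) {
      throw new IllegalArgumentException("At least one primary key column is required");
    }
    if (style == ColumnUpdateStyle.COUNTER_DELTA && ttlSeconds != null) {
      throw new IllegalArgumentException("Cassandra does not accept a TTL on counter updates");
    }
    StringBuilder cql = new StringBuilder("UPDATE ").append(keyspace).append('.').append(table);
    if (ttlSeconds != null) {
      cql.append(" USING TTL ").append(ttlSeconds); // per-tuple TTL override
    }
    cql.append(" SET ").append(assignment(column, style));
    // Every component of a (possibly composite) primary key is bound in the WHERE clause
    StringJoiner where = new StringJoiner(" AND ", " WHERE ", "");
    for (String pkColumn : primaryKeyColumns) {
      where.add(pkColumn + " = ?");
    }
    cql.append(where.toString());
    if (updateOnlyIfExists) {
      cql.append(" IF EXISTS"); // update-only semantics instead of an upsert
    }
    return cql.toString();
  }
}
```

For example, a PREPEND to the list column top_scores of unittests.users with a 3600 second TTL and IF EXISTS renders as `UPDATE unittests.users USING TTL 3600 SET top_scores = ? + top_scores WHERE userid = ? IF EXISTS`. A counter delta on a table keyed by (site, day) renders as `UPDATE ks.page_views SET views = views + ? WHERE site = ? AND day = ?`. Both avoid a read before the write, which is the low-latency pattern the javadoc describes.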