Repository: apex-malhar
Updated Branches:
  refs/heads/master 05a7ca3e4 -> 664257b4a


http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/664257b4/contrib/src/main/java/com/datatorrent/contrib/cassandra/ConnectionStateManager.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/com/datatorrent/contrib/cassandra/ConnectionStateManager.java b/contrib/src/main/java/com/datatorrent/contrib/cassandra/ConnectionStateManager.java
new file mode 100644
index 0000000..cfc2b46
--- /dev/null
+++ b/contrib/src/main/java/com/datatorrent/contrib/cassandra/ConnectionStateManager.java
@@ -0,0 +1,477 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.contrib.cassandra;
+
+import java.io.Serializable;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Set;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.classification.InterfaceStability;
+
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.ConsistencyLevel;
+import com.datastax.driver.core.Host;
+import com.datastax.driver.core.Metadata;
+import com.datastax.driver.core.ProtocolVersion;
+import com.datastax.driver.core.QueryOptions;
+import com.datastax.driver.core.Session;
+import com.datastax.driver.core.policies.DCAwareRoundRobinPolicy;
+import com.datastax.driver.core.policies.DowngradingConsistencyRetryPolicy;
+import com.datastax.driver.core.policies.ExponentialReconnectionPolicy;
+import com.datastax.driver.core.policies.LoadBalancingPolicy;
+import com.datastax.driver.core.policies.ReconnectionPolicy;
+import com.datastax.driver.core.policies.RetryPolicy;
+import com.datastax.driver.core.policies.TokenAwarePolicy;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * This is used to specify the connection parameters that are used to process mutations for the
+ * {@link AbstractUpsertOutputOperator}.
+ * The connection can only be set for one table in a given keyspace and as such is used to define the following:
+ * - Connection parameters
+ * - Defaults to be used in case the execution context tuple {@link UpsertExecutionContext} does not
+ * set one explicitly.
+ * Note that the {@link ConnectionBuilder} is used to build an instance of the ConnectionStateManager.
+ * An instance of this class is typically instantiated by implementing the following inside the
+ * {@link AbstractUpsertOutputOperator} withConnectionBuilder() method.
+ * {@code
+      @Override
+      public ConnectionStateManager.ConnectionBuilder withConnectionBuilder()
+      {
+        return ConnectionStateManager.withNewBuilder()
+        .withSeedNodes("localhost") // format of host1:port;host2:port;host3:port
+        .withClusterNameAs("Test Cluster") // mandatory param
+        .withDCNameAs("datacenter1") // mandatory param
+        .withTableNameAs("users") // mandatory param
+        .withKeySpaceNameAs("unittests") // mandatory param
+        .withdefaultConsistencyLevel(ConsistencyLevel.LOCAL_ONE); // Set if required. Default is LOCAL_QUORUM
+        // Rest of the configs are initialized to sane defaults
+      }
+ * }
+ * Please refer to {@link ConnectionBuilder} for details about the parameters that can be used to define the
+ * connection and its default behaviour.
+ */
[email protected]
+public class ConnectionStateManager implements AutoCloseable, Serializable
+{
+
+  private static final long serialVersionUID = -6024334738016015213L;
+  private static final long DEFAULT_MAX_DELAY_MS = 30000L;
+  private static final long DEFAULT_BASE_DELAY_MS = 10000L;
+  // Cassandra cluster name
+  private final String clusterName;
+  // Cassandra DC Name
+  private final String dcName;
+  // Seed nodes. The format for specifying the host names is host1:port;host2:port
+  private final String seedNodesStr;
+  private final long baseDelayMs;
+  private final long maxDelayMs;
+  // Other operational constraints
+  private final transient LoadBalancingPolicy loadBalancingPolicy;
+  private final transient RetryPolicy retryPolicy;
+  private final transient QueryOptions queryOptions;
+  private final transient ReconnectionPolicy reconnectionPolicy;
+  private final transient ProtocolVersion protocolVersion;
+  private transient Logger LOG = LoggerFactory.getLogger(ConnectionStateManager.class);
+  // Connection specific config elements
+  private transient Cluster cluster;
+  // Session specific metadata
+  private transient Session session;
+  private String keyspaceName;
+  private String tableName;
+  private int defaultTtlInSecs;
+  private ConsistencyLevel defaultConsistencyLevel;
+  // Standard defaults
+  private boolean isTTLSet = false;
+
+  private ConnectionStateManager(final ConnectionBuilder connectionBuilder)
+  {
+    checkNotNull(connectionBuilder, "Connection Builder passed in as Null");
+    checkNotNull(connectionBuilder.clusterName, "Cluster Name not set for Cassandra");
+    checkNotNull(connectionBuilder.dcName, "DataCenter Name not set for Cassandra");
+    checkNotNull(connectionBuilder.seedNodesStr,
+        "Seed nodes not set for Cassandra. Pattern is host1:port;host2:port");
+    checkNotNull(connectionBuilder.keyspaceName, "Keyspace Name not set for Cassandra");
+    checkNotNull(connectionBuilder.tableName, "Table Name not set for Cassandra");
+    //Required params
+    this.clusterName = connectionBuilder.clusterName;
+    this.dcName = connectionBuilder.dcName;
+    this.seedNodesStr = connectionBuilder.seedNodesStr;
+    this.keyspaceName = connectionBuilder.keyspaceName;
+    this.tableName = connectionBuilder.tableName;
+
+    // optional params
+    if (connectionBuilder.maxDelayMs != null) {
+      maxDelayMs = connectionBuilder.maxDelayMs;
+    } else {
+      maxDelayMs = DEFAULT_MAX_DELAY_MS; // 30 seconds
+    }
+    if (connectionBuilder.baseDelayMs != null) {
+      baseDelayMs = connectionBuilder.baseDelayMs;
+    } else {
+      baseDelayMs = DEFAULT_BASE_DELAY_MS; // 10 seconds
+    }
+    if (connectionBuilder.defaultTtlInSecs != null) {
+      defaultTtlInSecs = connectionBuilder.defaultTtlInSecs;
+      isTTLSet = true;
+    }
+    if (connectionBuilder.defaultConsistencyLevel != null) {
+      defaultConsistencyLevel = connectionBuilder.defaultConsistencyLevel;
+    } else {
+      defaultConsistencyLevel = ConsistencyLevel.LOCAL_QUORUM;
+    }
+    if (connectionBuilder.loadBalancingPolicy != null) {
+      loadBalancingPolicy = connectionBuilder.loadBalancingPolicy;
+    } else {
+      loadBalancingPolicy = new TokenAwarePolicy(
+        DCAwareRoundRobinPolicy.builder()
+          .withLocalDc(dcName)
+          .build());
+    }
+    if (connectionBuilder.retryPolicy != null) {
+      this.retryPolicy = connectionBuilder.retryPolicy;
+    } else {
+      retryPolicy = DowngradingConsistencyRetryPolicy.INSTANCE;
+    }
+    if (connectionBuilder.queryOptions != null) {
+      this.queryOptions = connectionBuilder.queryOptions;
+    } else {
+      this.queryOptions = new QueryOptions().setConsistencyLevel(defaultConsistencyLevel);
+    }
+    if (connectionBuilder.reconnectionPolicy != null) {
+      this.reconnectionPolicy = connectionBuilder.reconnectionPolicy;
+    } else {
+      this.reconnectionPolicy = new ExponentialReconnectionPolicy(baseDelayMs, maxDelayMs);
+    }
+    if (connectionBuilder.protocolVersion != null) {
+      this.protocolVersion = connectionBuilder.protocolVersion;
+    } else {
+      this.protocolVersion = ProtocolVersion.NEWEST_SUPPORTED;
+    }
+    establishSessionWithPolicies();
+  }
+
+  public static ConnectionBuilder withNewBuilder()
+  {
+    return new ConnectionBuilder();
+  }
+
+  private void establishSessionWithPolicies()
+  {
+    Cluster.Builder clusterBuilder = Cluster.builder();
+    String[] seedNodesSplit = seedNodesStr.split(";");
+    Collection<InetAddress> allSeeds = new ArrayList<>();
+    Set<String> allKnownPorts = new HashSet<>();
+    for (String seedNode : seedNodesSplit) {
+      String[] nodeAndPort = seedNode.split(":");
+      if (nodeAndPort.length > 1) {
+        allKnownPorts.add(nodeAndPort[1]);
+      }
+      try {
+        allSeeds.add(InetAddress.getByName(nodeAndPort[0]));
+      } catch (UnknownHostException e) {
+        LOG.error("Error while trying to initialize the seed nodes for the Cassandra cluster " + e.getMessage(), e);
+      }
+    }
+    clusterBuilder = clusterBuilder.addContactPoints(allSeeds);
+    clusterBuilder
+      .withClusterName(clusterName)
+      // We can fail if all of the nodes die in the local DC
+      .withLoadBalancingPolicy(loadBalancingPolicy)
+      // Want Strong consistency
+      .withRetryPolicy(retryPolicy)
+      // Tolerate some nodes down
+      .withQueryOptions(queryOptions)
+      // Keep establishing connections after detecting a node down
+      .withReconnectionPolicy(reconnectionPolicy);
+    // Finally initialize the cluster info
+    if (allKnownPorts.size() > 0) {
+      int shortlistedPort = Integer.parseInt(allKnownPorts.iterator().next());
+      clusterBuilder = clusterBuilder.withPort(shortlistedPort);
+    }
+    cluster = clusterBuilder.build();
+    Metadata metadata = cluster.getMetadata();
+    LOG.info("Connected to cluster: \n" + metadata.getClusterName());
+    for (Host host : metadata.getAllHosts()) {
+      LOG.info(String.format("Datacenter: %s; Host: %s; Rack: %s\n",
+          host.getDatacenter(), host.getAddress(), host.getRack()));
+    }
+    session = cluster.connect(keyspaceName);
+  }
+
+  @Override
+  public void close()
+  {
+    if (session != null) {
+      session.close();
+    }
+    if (cluster != null) {
+      cluster.close();
+    }
+  }
+
+  public Cluster getCluster()
+  {
+    return cluster;
+  }
+
+  public void setCluster(Cluster cluster)
+  {
+    this.cluster = cluster;
+  }
+
+  public Session getSession()
+  {
+    return session;
+  }
+
+  public void setSession(Session session)
+  {
+    this.session = session;
+  }
+
+  public String getKeyspaceName()
+  {
+    return keyspaceName;
+  }
+
+  public void setKeyspaceName(String keyspaceName)
+  {
+    this.keyspaceName = keyspaceName;
+  }
+
+  public String getTableName()
+  {
+    return tableName;
+  }
+
+  public void setTableName(String tableName)
+  {
+    this.tableName = tableName;
+  }
+
+  public int getDefaultTtlInSecs()
+  {
+    return defaultTtlInSecs;
+  }
+
+  public void setDefaultTtlInSecs(int defaultTtlInSecs)
+  {
+    this.defaultTtlInSecs = defaultTtlInSecs;
+  }
+
+  public boolean isTTLSet()
+  {
+    return isTTLSet;
+  }
+
+  public void setTTLSet(boolean TTLSet)
+  {
+    isTTLSet = TTLSet;
+  }
+
+  public static class ConnectionBuilder
+  {
+
+    private String clusterName;
+    private String dcName;
+    private String seedNodesStr;
+    private Long baseDelayMs; // Boxed type so a null check can detect that this optional value is unset
+    private Long maxDelayMs; // Boxed type so a null check can detect that this optional value is unset
+
+    private String keyspaceName;
+    private String tableName;
+    private Integer defaultTtlInSecs; // Boxed type so a null check can detect that this optional value is unset
+    private ConsistencyLevel defaultConsistencyLevel;
+
+    private LoadBalancingPolicy loadBalancingPolicy;
+    private RetryPolicy retryPolicy;
+    private QueryOptions queryOptions;
+    private ReconnectionPolicy reconnectionPolicy;
+    private ProtocolVersion protocolVersion;
+
+    public static final String CLUSTER_NAME_IN_PROPS_FILE = "cluster.name";
+    public static final String DC_NAME_IN_PROPS_FILE = "dc.name";
+    public static final String KEYSPACE_NAME_IN_PROPS_FILE = "keyspace.name";
+    public static final String TABLE_NAME_IN_PROPS_FILE = "table.name";
+    public static final String SEEDNODES_IN_PROPS_FILE = "seednodes";
+
+    public ConnectionBuilder withClusterNameAs(String clusterName)
+    {
+      this.clusterName = clusterName;
+      return this;
+    }
+
+    public ConnectionBuilder withDCNameAs(String dcName)
+    {
+      this.dcName = dcName;
+      return this;
+    }
+
+    /**
+     * Used to specify the seed nodes of the target Cassandra cluster. Format is
+     * host1:port;host2:port;host3:port
+     * @param seedNodesStr
+     * @return The builder instance, updated with this value
+     */
+    public ConnectionBuilder withSeedNodes(String seedNodesStr)
+    {
+      this.seedNodesStr = seedNodesStr;
+      return this;
+    }
+
+    /**
+     * Used to specify the base delay of the reconnection policy used for connection attempts
+     * @param baseDelayMillis
+     * @return The builder instance, updated with this value
+     */
+    public ConnectionBuilder withBaseDelayMillis(long baseDelayMillis)
+    {
+      this.baseDelayMs = baseDelayMillis;
+      return this;
+    }
+
+    /**
+     * Used to specify the maximum delay of the reconnection policy, i.e. the upper bound on the wait
+     * between successive reconnection attempts
+     * @param maxDelayMillis
+     * @return The builder instance, updated with this value
+     */
+    public ConnectionBuilder withMaxDelayMillis(long maxDelayMillis)
+    {
+      this.maxDelayMs = maxDelayMillis;
+      return this;
+    }
+
+    public ConnectionBuilder withKeySpaceNameAs(String keyspaceName)
+    {
+      this.keyspaceName = keyspaceName;
+      return this;
+    }
+
+    public ConnectionBuilder withTableNameAs(String tableName)
+    {
+      this.tableName = tableName;
+      return this;
+    }
+
+    public ConnectionBuilder withdefaultTTL(Integer defaultTtlInSecs)
+    {
+      this.defaultTtlInSecs = defaultTtlInSecs;
+      return this;
+    }
+
+    /**
+     * Used to specify the default consistency level when executing the mutations on the cluster.
+     * Default if not set is LOCAL_QUORUM. Can be overridden at the tuple level using {@link UpsertExecutionContext}
+     * @param consistencyLevel
+     * @return The builder instance, updated with this value
+     */
+    public ConnectionBuilder withdefaultConsistencyLevel(ConsistencyLevel consistencyLevel)
+    {
+      this.defaultConsistencyLevel = consistencyLevel;
+      return this;
+    }
+
+    /**
+     * Used to define how the nodes in the cluster will be contacted for executing a mutation.
+     * The following is the default behaviour if not set:
+     * 1. Use a token-aware approach, i.e. the row key is used to decide the right node to execute the mutation
+     *    on the target Cassandra node, i.e. one of the replicas owning the row is used as the coordinator node.
+     *    This effectively balances the traffic onto all nodes of the Cassandra cluster for the given
+     *    Operator instance. Of course this assumes the keys are evenly distributed in the cluster,
+     *    which is normally the case.
+     * 2. Overlay token awareness with a DC-aware approach - the above token-aware approach is further restricted
+     *    to use only the local DC for the mutation executions. Cassandra's multi-DC replication will take care of
+     *    the cross-DC replication, thus achieving the lowest possible latencies for the given write mutations.
+     *
+     * Using this effectively removes the need for an extra implementation of the partitioning logic in the
+     * Operator, nor would we need any extra logic (for most use cases) for dynamic partitioning implementations,
+     * as the underlying driver normalizes the traffic pattern anyway.
+     * @param loadBalancingPolicy
+     * @return The builder instance, updated with this value
+     */
+    public ConnectionBuilder withLoadBalancingPolicy(LoadBalancingPolicy loadBalancingPolicy)
+    {
+      this.loadBalancingPolicy = loadBalancingPolicy;
+      return this;
+    }
+
+    /**
+     * Used to specify how queries will be retried in case the one currently in progress fails.
+     * The default is to use the DowngradingConsistencyRetryPolicy, i.e. LOCAL_QUORUM is attempted first and if
+     * there is a failure because not enough replicas are alive, the consistency level is automatically downgraded
+     * towards LOCAL_ONE and so on (and relies on hinted handoff to take care of the rest when the nodes come back up).
+     * @param retryPolicy
+     * @return The builder instance, updated with this value
+     */
+    public ConnectionBuilder withRetryPolicy(RetryPolicy retryPolicy)
+    {
+      this.retryPolicy = retryPolicy;
+      return this;
+    }
+
+    /**
+     * Used to set various aspects of executing a given query / mutation.
+     * The default is to use LOCAL_QUORUM consistency for all mutation queries
+     * @param queryOptions
+     * @return The builder instance, updated with this value
+     */
+    public ConnectionBuilder withQueryOptions(QueryOptions queryOptions)
+    {
+      this.queryOptions = queryOptions;
+      return this;
+    }
+
+    /**
+     * Used to decide how to re-establish a connection to the cluster in case the current one fails.
+     * The default if not set is to use an ExponentialReconnectionPolicy.
+     * The baseDelay and maxDelay are the two time windows that are used to space the retry attempts
+     * in an exponentially growing manner
+     * @param reconnectionPolicy
+     * @return The builder instance, updated with this value
+     */
+    public ConnectionBuilder withReconnectionPolicy(ReconnectionPolicy reconnectionPolicy)
+    {
+      this.reconnectionPolicy = reconnectionPolicy;
+      return this;
+    }
+
+    public ConnectionBuilder withProtocolVersion(ProtocolVersion protocolVersion)
+    {
+      this.protocolVersion = protocolVersion;
+      return this;
+    }
+
+    protected ConnectionStateManager initialize()
+    {
+      ConnectionStateManager operatorConnection = new ConnectionStateManager(this);
+      return operatorConnection;
+    }
+
+  }
+
+}
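
For reference, a fuller variant of the builder usage shown in the class Javadoc above, exercising the
optional settings as well. This is only a sketch: it assumes the withConnectionBuilder() hook on
AbstractUpsertOutputOperator that the Javadoc refers to (that class is not part of this file), and the
seed node, keyspace and delay values are placeholders.

      @Override
      public ConnectionStateManager.ConnectionBuilder withConnectionBuilder()
      {
        return ConnectionStateManager.withNewBuilder()
            .withSeedNodes("node1:9042;node2:9042")                   // host1:port;host2:port format
            .withClusterNameAs("Test Cluster")                        // mandatory
            .withDCNameAs("datacenter1")                              // mandatory
            .withKeySpaceNameAs("unittests")                          // mandatory
            .withTableNameAs("users")                                 // mandatory
            .withdefaultConsistencyLevel(ConsistencyLevel.LOCAL_ONE)  // default is LOCAL_QUORUM
            .withdefaultTTL(86400)                                    // optional, in seconds
            .withBaseDelayMillis(10000L)                              // optional reconnection base delay
            .withMaxDelayMillis(30000L);                              // optional reconnection max delay cap
      }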

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/664257b4/contrib/src/main/java/com/datatorrent/contrib/cassandra/UpsertExecutionContext.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/com/datatorrent/contrib/cassandra/UpsertExecutionContext.java b/contrib/src/main/java/com/datatorrent/contrib/cassandra/UpsertExecutionContext.java
new file mode 100644
index 0000000..66d0b6c
--- /dev/null
+++ b/contrib/src/main/java/com/datatorrent/contrib/cassandra/UpsertExecutionContext.java
@@ -0,0 +1,210 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.contrib.cassandra;
+
+import com.datastax.driver.core.ConsistencyLevel;
+
+/**
+ * Each mutation in the Cassandra table is driven by a context which can be passed to the Operator at runtime.
+ * This class represents such a context. The context is to be set by the upstream operator and is used to
+ * define how to mutate a row by passing it as a tuple to the {@link AbstractUpsertOutputOperator}.
+ * The row to be mutated is represented by the payload of type T (the template parameter).
+ * The following aspects of the mutation can be controlled by the context tuple:
+ *  1. Collection mutation style
+ *  2. List placement style
+ *  3. Null handling style for partial mutations of a given row
+ *  4. Update only if the primary key exists
+ *  5. Override the TTL that is set in the default connection config.
+ *  See {@link ConnectionStateManager.ConnectionBuilder} to set the default TTL for all payload executions
+ *  6. Override the default consistency level to be used for the current mutation.
+ *  See {@link ConnectionStateManager.ConnectionBuilder} for setting default consistency.
+ */
+public class UpsertExecutionContext<T>
+{
+
+  private CollectionMutationStyle collectionMutationStyle = CollectionMutationStyle.UNDEFINED;
+
+
+  private ListPlacementStyle listPlacementStyle = ListPlacementStyle.UNDEFINED;
+
+  private NullHandlingMutationStyle nullHandlingMutationStyle = NullHandlingMutationStyle.UNDEFINED;
+
+  private int overridingTTL;
+
+  private ConsistencyLevel overridingConsistencyLevel;
+
+  private boolean isTtlOverridden = false;
+
+  private boolean isOverridingConsistencyLevelSet = false;
+
+  private boolean updateOnlyIfPrimaryKeyExists = false;
+
+  private T payload;
+
+  public T getPayload()
+  {
+    return payload;
+  }
+
+  public void setPayload(T payload)
+  {
+    this.payload = payload;
+  }
+
+  /**
+   * Represents whether the collection object inside the POJO needs to be added to or removed
+   * from the table column represented by the POJO field. Refer to {@link CollectionMutationStyle} for possible values.
+   * If set as UNDEFINED, the default behaviour is to add the entry to the collection column.
+   * @return The collection mutation style for this execution context
+   */
+  public CollectionMutationStyle getCollectionMutationStyle()
+  {
+    return collectionMutationStyle;
+  }
+
+  public void setCollectionMutationStyle(CollectionMutationStyle collectionMutationStyle)
+  {
+    this.collectionMutationStyle = collectionMutationStyle;
+  }
+
+  /**
+   * Represents how the POJO List type field is mutated on the Cassandra column which is of type
+   * list. Note that this is only applicable for list collections and decides whether the POJO element (of type list)
+   * will be prepended or appended to the existing list column.
+   * @return The list placement style that is used to execute this mutation. Defaults to append mode if not set
+   */
+  public ListPlacementStyle getListPlacementStyle()
+  {
+    return listPlacementStyle;
+  }
+
+  public void setListPlacementStyle(ListPlacementStyle listPlacementStyle)
+  {
+    this.listPlacementStyle = listPlacementStyle;
+  }
+
+  /**
+   * This decides how to handle nulls for POJO fields. This comes in handy when one would want to
+   * avoid a read of the column and then mutate the row, i.e. in other words if the POJO is representing only a subset
+   * of the columns that need to be mutated on the table. In such scenarios, set the NullHandlingMutationStyle to
+   * IGNORE_NULL_COLUMNS, which makes the operator avoid an update on the column and thus prevents over-writing
+   * an existing value.
+   * @return The null handling style that is going to be used for the execution context. Defaults to setting nulls
+   * rather than ignoring them
+   */
+  public NullHandlingMutationStyle getNullHandlingMutationStyle()
+  {
+    return nullHandlingMutationStyle;
+  }
+
+  public void setNullHandlingMutationStyle(NullHandlingMutationStyle nullHandlingMutationStyle)
+  {
+    this.nullHandlingMutationStyle = nullHandlingMutationStyle;
+  }
+
+  /**
+   * This decides if we want to override the default TTL, if at all set in the
+   * {@link com.datatorrent.contrib.cassandra.ConnectionStateManager.ConnectionBuilder}, that is used to execute a
+   * mutation. Note that TTLs are not mandatory for mutations.
+   * It is also supported to set a TTL only for the current execution context without setting a default at the
+   * connection state manager level.
+   * Unit of time is seconds.
+   * @return The overriding TTL that will be used for the given execution context.
+   */
+  public int getOverridingTTL()
+  {
+    return overridingTTL;
+  }
+
+  public void setOverridingTTL(int overridingTTL)
+  {
+    this.overridingTTL = overridingTTL;
+    isTtlOverridden = true;
+  }
+
+  public boolean isTtlOverridden()
+  {
+    return isTtlOverridden;
+  }
+
+  /**
+   * Used to override the default consistency level that is set at the {@link ConnectionStateManager} level.
+   * The default is to use LOCAL_QUORUM. This can be overridden at the execution context level on a per
+   * tuple basis.
+   * @return The consistency level that would be used to execute the current payload mutation
+   */
+  public ConsistencyLevel getOverridingConsistencyLevel()
+  {
+    return overridingConsistencyLevel;
+  }
+
+  public void setOverridingConsistencyLevel(ConsistencyLevel overridingConsistencyLevel)
+  {
+    this.overridingConsistencyLevel = overridingConsistencyLevel;
+    isOverridingConsistencyLevelSet = true;
+  }
+
+  public boolean isOverridingConsistencyLevelSet()
+  {
+    return isOverridingConsistencyLevelSet;
+  }
+
+  public boolean isUpdateOnlyIfPrimaryKeyExists()
+  {
+    return updateOnlyIfPrimaryKeyExists;
+  }
+
+  /**
+   * Used to execute a mutation only if the primary key already exists, i.e. it conditionally executes
+   * the mutation and can be used to force an "UPDATE"-only use case for the current mutation.
+   * @param updateOnlyIfPrimaryKeyExists
+   */
+  public void setUpdateOnlyIfPrimaryKeyExists(boolean updateOnlyIfPrimaryKeyExists)
+  {
+    this.updateOnlyIfPrimaryKeyExists = updateOnlyIfPrimaryKeyExists;
+  }
+
+  enum CollectionMutationStyle implements BaseMutationStyle
+  {
+    ADD_TO_EXISTING_COLLECTION,
+    REMOVE_FROM_EXISTING_COLLECTION,
+    UNDEFINED
+  }
+
+  enum ListPlacementStyle implements BaseMutationStyle
+  {
+    APPEND_TO_EXISTING_LIST,
+    PREPEND_TO_EXISTING_LIST,
+    UNDEFINED
+  }
+
+  enum NullHandlingMutationStyle implements BaseMutationStyle
+  {
+    IGNORE_NULL_COLUMNS,
+    SET_NULL_COLUMNS,
+    UNDEFINED
+  }
+
+  interface BaseMutationStyle
+  {
+
+  }
+
+}
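
For reference, a minimal sketch of how an upstream operator might populate such a context tuple for a
partial row update. It assumes a User POJO like the one used in the unit tests below; the
partiallyPopulatedUser instance is hypothetical and carries only the columns to be changed.

      UpsertExecutionContext<User> update = new UpsertExecutionContext<>();
      // Only mutate the columns the POJO actually carries; leave other columns untouched.
      update.setNullHandlingMutationStyle(
          UpsertExecutionContext.NullHandlingMutationStyle.IGNORE_NULL_COLUMNS);
      // Grow the collection/list columns instead of replacing them.
      update.setCollectionMutationStyle(
          UpsertExecutionContext.CollectionMutationStyle.ADD_TO_EXISTING_COLLECTION);
      update.setListPlacementStyle(
          UpsertExecutionContext.ListPlacementStyle.APPEND_TO_EXISTING_LIST);
      update.setOverridingTTL(3600);                 // seconds; overrides any connection-level default
      update.setUpdateOnlyIfPrimaryKeyExists(true);  // behave as an UPDATE rather than an upsert
      update.setPayload(partiallyPopulatedUser);     // hypothetical User instance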

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/664257b4/contrib/src/test/java/com/datatorrent/contrib/cassandra/AbstractUpsertOutputOperatorCodecsTest.java
----------------------------------------------------------------------
diff --git a/contrib/src/test/java/com/datatorrent/contrib/cassandra/AbstractUpsertOutputOperatorCodecsTest.java b/contrib/src/test/java/com/datatorrent/contrib/cassandra/AbstractUpsertOutputOperatorCodecsTest.java
new file mode 100644
index 0000000..87c783f
--- /dev/null
+++ b/contrib/src/test/java/com/datatorrent/contrib/cassandra/AbstractUpsertOutputOperatorCodecsTest.java
@@ -0,0 +1,476 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.contrib.cassandra;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.junit.Before;
+import org.junit.Test;
+
+import com.datastax.driver.core.ConsistencyLevel;
+import com.datastax.driver.core.ResultSet;
+import com.datastax.driver.core.Row;
+import com.datatorrent.api.Attribute;
+import com.datatorrent.api.Context;
+import com.datatorrent.api.DAG;
+import com.datatorrent.lib.helper.OperatorContextTestHelper;
+import com.datatorrent.lib.helper.TestPortContext;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+public class AbstractUpsertOutputOperatorCodecsTest
+{
+
+  /***
+   * The schema that is used
+   *
+   *
+   *
+   *
+   * CREATE KEYSPACE unittests
+   * WITH replication = {
+   * 'class' : 'SimpleStrategy',
+   * 'replication_factor' : 1
+   * };
+   *
+   * CREATE TYPE unittests.address (
+   * street text,
+   * city text,
+   * zip_code int,
+   * phones set<text>
+   * );
+   *
+   * CREATE TYPE unittests.fullname ( firstname text, lastname text );
+   *
+   * CREATE TABLE unittests.users (
+   * userid text PRIMARY KEY,
+   * username FROZEN<fullname>,
+   * emails set<text>,
+   * top_scores list<int>,
+   * todo map<timestamp, text>,
+   * siblings tuple<int, text,text>,
+   * currentaddress FROZEN<address>,
+   * previousnames FROZEN<list<fullname>>
+   * );
+   *
+   * CREATE TABLE unittests.userupdates (
+   * userid text PRIMARY KEY,
+   * updatecount counter
+   * );
+   *
+   * CREATE TABLE unittests.userstatus (
+   * userid text,
+   * day int,
+   * month int,
+   * year int,
+   * employeeid text,
+   * currentstatus text,
+   * PRIMARY KEY ((userid,day,month,year), employeeid));
+   */
+
+  public static final String APP_ID = "TestCassandraUpsertOperator";
+  public static final int OPERATOR_ID_FOR_USER_UPSERTS = 0;
+
+
+  UserUpsertOperator userUpsertOperator = null;
+  OperatorContextTestHelper.TestIdOperatorContext contextForUserUpsertOperator;
+  TestPortContext testPortContextForUserUpserts;
+
+
+  @Before
+  public void setupApexContexts() throws Exception
+  {
+    Attribute.AttributeMap.DefaultAttributeMap attributeMap = new Attribute.AttributeMap.DefaultAttributeMap();
+    attributeMap.put(DAG.APPLICATION_ID, APP_ID);
+    contextForUserUpsertOperator = new OperatorContextTestHelper.TestIdOperatorContext(OPERATOR_ID_FOR_USER_UPSERTS,
+            attributeMap);
+    userUpsertOperator = new UserUpsertOperator();
+
+    Attribute.AttributeMap.DefaultAttributeMap portAttributes = new Attribute.AttributeMap.DefaultAttributeMap();
+    portAttributes.put(Context.PortContext.TUPLE_CLASS, User.class);
+    testPortContextForUserUpserts = new TestPortContext(portAttributes);
+
+    userUpsertOperator.setup(contextForUserUpsertOperator);
+    userUpsertOperator.activate(contextForUserUpsertOperator);
+    userUpsertOperator.input.setup(testPortContextForUserUpserts);
+  }
+
+  @Test
+  public void testActivateForSchemaDetection() throws Exception
+  {
+    assertEquals(12, userUpsertOperator.getPreparedStatementTypes().size());
+    assertEquals(
+        " UPDATE unittests.users  SET  currentaddress = :currentaddress, emails = emails - :emails, " +
+        "todo = todo - :todo, top_scores = top_scores - :top_scores, siblings = :siblings, " +
+        "previousnames = :previousnames, username = :username WHERE  userid = :userid",
+        userUpsertOperator.getPreparedStatementTypes().get(101001100L).getQueryString());
+    assertEquals(8, userUpsertOperator.getColumnDefinitions().size());
+    assertEquals(true, userUpsertOperator.getColumnDefinitions().get("currentaddress").isFrozen());
+    assertEquals(false, userUpsertOperator.getColumnDefinitions().get("currentaddress").isCollection());
+    assertEquals(true, userUpsertOperator.getColumnDefinitions().get("top_scores").isCollection());
+    assertEquals(true, userUpsertOperator.getColumnDefinitions().get("username").isFrozen());
+    assertEquals(false, userUpsertOperator.getColumnDefinitions().get("username").isCollection());
+  }
+
+  @Test
+  public void testForGetters() throws Exception
+  {
+    Map<String, Object> getters = userUpsertOperator.getGetters();
+    assertNotNull(getters);
+    assertEquals(7, getters.size());
+  }
+
+  @Test
+  public void testForSingleRowInsertWithCodecs() throws Exception
+  {
+    User aUser = new User();
+    aUser.setUserid("user" + System.currentTimeMillis());
+    FullName fullName = new FullName("first1" + System.currentTimeMillis(), "last1" + System.currentTimeMillis());
+    aUser.setUsername(fullName);
+    Address address = new Address("wer", "hjfh", 12, null);
+    aUser.setCurrentaddress(address);
+    UpsertExecutionContext<User> anUpdate = new UpsertExecutionContext<>();
+    anUpdate.setPayload(aUser);
+    userUpsertOperator.beginWindow(0);
+    userUpsertOperator.input.process(anUpdate);
+    userUpsertOperator.endWindow();
+
+    ResultSet results = userUpsertOperator.session.execute(
+        "SELECT * FROM unittests.users WHERE userid = '" + aUser.getUserid() + 
"'");
+    List<Row> rows = results.all();
+    assertEquals(rows.size(), 1);
+    assertTrue(results.isExhausted());
+  }
+
+  @Test
+  public void testForListAppend() throws Exception
+  {
+    User aUser = new User();
+    String userId = "user" + System.currentTimeMillis();
+    aUser.setUserid(userId);
+    FullName fullName = new FullName("first1" + System.currentTimeMillis(), "last1" + System.currentTimeMillis());
+    aUser.setUsername(fullName);
+    Address address = new Address("street1", "city1", 13, null);
+    aUser.setCurrentaddress(address);
+    Set<String> emails = new HashSet<>();
+    emails.add(new String("1"));
+    emails.add(new String("2"));
+    aUser.setEmails(emails);
+    List<Integer> topScores = new ArrayList<>();
+    topScores.add(1);
+    topScores.add(2);
+    aUser.setTopScores(topScores);
+    UpsertExecutionContext<User> originalEntry = new UpsertExecutionContext<>();
+    originalEntry.setPayload(aUser);
+
+    UpsertExecutionContext<User> subsequentUpdateForTopScores = new UpsertExecutionContext<>();
+    subsequentUpdateForTopScores.setListPlacementStyle(
+        UpsertExecutionContext.ListPlacementStyle.APPEND_TO_EXISTING_LIST);
+    subsequentUpdateForTopScores.setCollectionMutationStyle(
+        UpsertExecutionContext.CollectionMutationStyle.ADD_TO_EXISTING_COLLECTION);
+    subsequentUpdateForTopScores.setNullHandlingMutationStyle(
+        UpsertExecutionContext.NullHandlingMutationStyle.IGNORE_NULL_COLUMNS);
+    User oldUser = new User();
+    oldUser.setUserid(userId);
+    List<Integer> topScoresAppended = new ArrayList<>();
+    topScoresAppended.add(3);
+    oldUser.setTopScores(topScoresAppended);
+    subsequentUpdateForTopScores.setPayload(oldUser);
+    userUpsertOperator.beginWindow(1);
+    userUpsertOperator.input.process(originalEntry);
+    userUpsertOperator.input.process(subsequentUpdateForTopScores);
+    userUpsertOperator.endWindow();
+
+    ResultSet results = userUpsertOperator.session.execute(
+        "SELECT * FROM unittests.users WHERE userid = '" + userId + "'");
+    List<Row> rows = results.all();
+    Row userRow = rows.get(0);
+    List<Integer> topScoresEntry = userRow.getList("top_scores", Integer.class);
+    assertEquals(3, topScoresEntry.size());
+    assertEquals("" + 3, "" + topScoresEntry.get(2));
+  }
+
+  @Test
+  public void testForListPrepend() throws Exception
+  {
+    User aUser = new User();
+    String userId = "user" + System.currentTimeMillis();
+    aUser.setUserid(userId);
+    FullName fullName = new FullName("first1" + System.currentTimeMillis(), "last1" + System.currentTimeMillis());
+    aUser.setUsername(fullName);
+    List<Integer> topScores = new ArrayList<>();
+    topScores.add(1);
+    topScores.add(2);
+    aUser.setTopScores(topScores);
+    UpsertExecutionContext<User> originalEntry = new UpsertExecutionContext<>();
+    originalEntry.setPayload(aUser);
+
+    UpsertExecutionContext<User> subsequentUpdateForTopScores = new UpsertExecutionContext<>();
+    subsequentUpdateForTopScores.setListPlacementStyle(
+        UpsertExecutionContext.ListPlacementStyle.PREPEND_TO_EXISTING_LIST);
+    subsequentUpdateForTopScores.setCollectionMutationStyle(
+        UpsertExecutionContext.CollectionMutationStyle.ADD_TO_EXISTING_COLLECTION);
+    subsequentUpdateForTopScores.setNullHandlingMutationStyle(
+        UpsertExecutionContext.NullHandlingMutationStyle.IGNORE_NULL_COLUMNS);
+    User oldUser = new User();
+    oldUser.setUserid(userId);
+    List<Integer> topScoresAppended = new ArrayList<>();
+    topScoresAppended.add(3);
+    oldUser.setTopScores(topScoresAppended);
+    subsequentUpdateForTopScores.setPayload(oldUser);
+    userUpsertOperator.beginWindow(2);
+    userUpsertOperator.input.process(originalEntry);
+    userUpsertOperator.input.process(subsequentUpdateForTopScores);
+    userUpsertOperator.endWindow();
+
+    ResultSet results = userUpsertOperator.session.execute(
+        "SELECT * FROM unittests.users WHERE userid = '" + userId + "'");
+    List<Row> rows = results.all();
+    Row userRow = rows.get(0);
+    List<Integer> topScoresEntry = userRow.getList("top_scores", Integer.class);
+    assertEquals(3, topScoresEntry.size());
+    assertEquals("" + 3, "" + topScoresEntry.get(0));
+  }
+
+  @Test
+  public void testForCollectionRemoval() throws Exception
+  {
+    User aUser = new User();
+    String userId = "user" + System.currentTimeMillis();
+    aUser.setUserid(userId);
+    FullName fullName = new FullName("first12" + System.currentTimeMillis(), "last12" + System.currentTimeMillis());
+    aUser.setUsername(fullName);
+    Set<String> emails = new HashSet<>();
+    emails.add(new String("1"));
+    emails.add(new String("2"));
+    aUser.setEmails(emails);
+
+    UpsertExecutionContext<User> originalEntry = new UpsertExecutionContext<>();
+    originalEntry.setPayload(aUser);
+
+    UpsertExecutionContext<User> subsequentUpdateForEmails = new UpsertExecutionContext<>();
+    subsequentUpdateForEmails.setCollectionMutationStyle(
+        UpsertExecutionContext.CollectionMutationStyle.REMOVE_FROM_EXISTING_COLLECTION);
+    subsequentUpdateForEmails.setNullHandlingMutationStyle(
+        UpsertExecutionContext.NullHandlingMutationStyle.IGNORE_NULL_COLUMNS);
+    User oldUser = new User();
+    oldUser.setUserid(userId);
+    Set<String> updatedEmails = new HashSet<>();
+    updatedEmails.add(new String("1"));
+    oldUser.setEmails(updatedEmails);
+    subsequentUpdateForEmails.setPayload(oldUser);
+    userUpsertOperator.beginWindow(3);
+    userUpsertOperator.input.process(originalEntry);
+    userUpsertOperator.input.process(subsequentUpdateForEmails);
+    userUpsertOperator.endWindow();
+
+    ResultSet results = userUpsertOperator.session.execute(
+        "SELECT * FROM unittests.users WHERE userid = '" + userId + "'");
+    List<Row> rows = results.all();
+    Row userRow = rows.get(0);
+    Set<String> existingEmailsEntry = userRow.getSet("emails", String.class);
+    assertEquals(1, existingEmailsEntry.size());
+    assertEquals("" + 2, "" + existingEmailsEntry.iterator().next());
+  }
+
+  @Test
+  public void testForCollectionRemovalAndIfExists() throws Exception
+  {
+    User aUser = new User();
+    String userId = "user" + System.currentTimeMillis();
+    aUser.setUserid(userId);
+    FullName fullName = new FullName("first12" + System.currentTimeMillis(), "last12" + System.currentTimeMillis());
+    aUser.setUsername(fullName);
+    Set<String> emails = new HashSet<>();
+    emails.add(new String("1"));
+    emails.add(new String("2"));
+    aUser.setEmails(emails);
+
+    UpsertExecutionContext<User> originalEntry = new UpsertExecutionContext<>();
+    originalEntry.setPayload(aUser);
+
+    UpsertExecutionContext<User> subsequentUpdateForEmails = new UpsertExecutionContext<>();
+    subsequentUpdateForEmails.setCollectionMutationStyle(
+        UpsertExecutionContext.CollectionMutationStyle.REMOVE_FROM_EXISTING_COLLECTION);
+    subsequentUpdateForEmails.setNullHandlingMutationStyle(
+        UpsertExecutionContext.NullHandlingMutationStyle.IGNORE_NULL_COLUMNS);
+    subsequentUpdateForEmails.setUpdateOnlyIfPrimaryKeyExists(true);
+    User oldUser = new User();
+    oldUser.setUserid(userId + System.currentTimeMillis()); // overriding with a non-existent user id
+    Set<String> updatedEmails = new HashSet<>();
+    updatedEmails.add(new String("1"));
+    oldUser.setEmails(updatedEmails);
+    subsequentUpdateForEmails.setPayload(oldUser);
+    userUpsertOperator.beginWindow(4);
+    userUpsertOperator.input.process(originalEntry);
+    userUpsertOperator.input.process(subsequentUpdateForEmails);
+    userUpsertOperator.endWindow();
+
+    ResultSet results = userUpsertOperator.session.execute(
+        "SELECT * FROM unittests.users WHERE userid = '" + userId + "'");
+    List<Row> rows = results.all();
+    Row userRow = rows.get(0);
+    Set<String> existingEmailsEntry = userRow.getSet("emails", String.class);
+    assertEquals(2, existingEmailsEntry.size());
+    assertEquals("" + 1, "" + existingEmailsEntry.iterator().next());
+  }
+
+  @Test
+  public void testForListAppendAndIfExists() throws Exception
+  {
+    User aUser = new User();
+    String userId = "user" + System.currentTimeMillis();
+    aUser.setUserid(userId);
+    FullName fullName = new FullName("first" + System.currentTimeMillis(), "last" + System.currentTimeMillis());
+    aUser.setUsername(fullName);
+    Address address = new Address("street1", "city1", 13, null);
+    aUser.setCurrentaddress(address);
+    Set<String> emails = new HashSet<>();
+    emails.add(new String("1"));
+    emails.add(new String("2"));
+    aUser.setEmails(emails);
+    List<Integer> topScores = new ArrayList<>();
+    topScores.add(1);
+    topScores.add(2);
+    aUser.setTopScores(topScores);
+    UpsertExecutionContext<User> originalEntry = new UpsertExecutionContext<>();
+    originalEntry.setPayload(aUser);
+
+    UpsertExecutionContext<User> subsequentUpdateForTopScores = new UpsertExecutionContext<>();
+    subsequentUpdateForTopScores.setListPlacementStyle(
+        UpsertExecutionContext.ListPlacementStyle.APPEND_TO_EXISTING_LIST);
+    subsequentUpdateForTopScores.setCollectionMutationStyle(
+        UpsertExecutionContext.CollectionMutationStyle.ADD_TO_EXISTING_COLLECTION);
+    subsequentUpdateForTopScores.setNullHandlingMutationStyle(
+        UpsertExecutionContext.NullHandlingMutationStyle.IGNORE_NULL_COLUMNS);
+    subsequentUpdateForTopScores.setUpdateOnlyIfPrimaryKeyExists(true);
+    User oldUser = new User();
+    oldUser.setUserid(userId + System.currentTimeMillis());
+    List<Integer> topScoresAppended = new ArrayList<>();
+    topScoresAppended.add(3);
+    oldUser.setTopScores(topScoresAppended);
+    subsequentUpdateForTopScores.setPayload(oldUser);
+    userUpsertOperator.beginWindow(5);
+    userUpsertOperator.input.process(originalEntry);
+    userUpsertOperator.input.process(subsequentUpdateForTopScores);
+    userUpsertOperator.endWindow();
+
+    ResultSet results = userUpsertOperator.session.execute(
+        "SELECT * FROM unittests.users WHERE userid = '" + userId + "'");
+    List<Row> rows = results.all();
+    Row userRow = rows.get(0);
+    List<Integer> topScoresEntry = userRow.getList("top_scores", Integer.class);
+    assertEquals(2, topScoresEntry.size());
+    assertEquals("" + 2, "" + topScoresEntry.get(1));
+  }
+
+  @Test
+  public void testForListPrependAndExplicitNullForSomeColumns() throws Exception
+  {
+    User aUser = new User();
+    String userId = "user" + System.currentTimeMillis();
+    aUser.setUserid(userId);
+    FullName fullName = new FullName("first24" + System.currentTimeMillis(), "last" + System.currentTimeMillis());
+    aUser.setUsername(fullName);
+    List<Integer> topScores = new ArrayList<>();
+    topScores.add(1);
+    topScores.add(2);
+    aUser.setTopScores(topScores);
+    UpsertExecutionContext<User> originalEntry = new UpsertExecutionContext<>();
+    originalEntry.setPayload(aUser);
+
+    UpsertExecutionContext<User> subsequentUpdateForTopScores = new UpsertExecutionContext<>();
+    subsequentUpdateForTopScores.setListPlacementStyle(
+        UpsertExecutionContext.ListPlacementStyle.PREPEND_TO_EXISTING_LIST);
+    subsequentUpdateForTopScores.setCollectionMutationStyle(
+        UpsertExecutionContext.CollectionMutationStyle.ADD_TO_EXISTING_COLLECTION);
+    subsequentUpdateForTopScores.setNullHandlingMutationStyle(
+        UpsertExecutionContext.NullHandlingMutationStyle.SET_NULL_COLUMNS);
+    User oldUser = new User();
+    oldUser.setUserid(userId);
+    List<Integer> topScoresAppended = new ArrayList<>();
+    topScoresAppended.add(3);
+    oldUser.setTopScores(topScoresAppended);
+    subsequentUpdateForTopScores.setPayload(oldUser);
+    userUpsertOperator.beginWindow(6);
+    userUpsertOperator.input.process(originalEntry);
+    userUpsertOperator.input.process(subsequentUpdateForTopScores);
+    userUpsertOperator.endWindow();
+
+    ResultSet results = userUpsertOperator.session.execute(
+        "SELECT * FROM unittests.users WHERE userid = '" + userId + "'");
+    List<Row> rows = results.all();
+    Row userRow = rows.get(0);
+    FullName name = userRow.get("username", FullName.class);
+    assertEquals(null, name);
+  }
+
+  @Test
+  public void testForSingleRowInsertWithTTL() throws Exception
+  {
+    User aUser = new User();
+    aUser.setUserid("userWithTTL" + System.currentTimeMillis());
+    FullName fullName = new FullName("firstname" + System.currentTimeMillis(), "lasName" + System.currentTimeMillis());
+    aUser.setUsername(fullName);
+    Address address = new Address("city1", "Street1", 12, null);
+    aUser.setCurrentaddress(address);
+    UpsertExecutionContext<User> anUpdate = new UpsertExecutionContext<>();
+    anUpdate.setOverridingTTL(5000);
+    anUpdate.setPayload(aUser);
+    userUpsertOperator.beginWindow(7);
+    userUpsertOperator.input.process(anUpdate);
+    userUpsertOperator.endWindow();
+
+    ResultSet results = userUpsertOperator.session.execute(
+        "SELECT * FROM unittests.users WHERE userid = '" + aUser.getUserid() + 
"'");
+    List<Row> rows = results.all();
+    assertEquals(rows.size(), 1);
+    assertTrue(results.isExhausted());
+  }
+
+  @Test
+  public void testForSingleRowInsertWithOverridingConsistency() throws Exception
+  {
+    User aUser = new User();
+    aUser.setUserid("userWithConsistency" + System.currentTimeMillis());
+    FullName fullName = new FullName("first" + System.currentTimeMillis(), "last" + System.currentTimeMillis());
+    aUser.setUsername(fullName);
+    Address address = new Address("city21", "Street31", 12, null);
+    aUser.setCurrentaddress(address);
+    UpsertExecutionContext<User> anUpdate = new UpsertExecutionContext<>();
+    anUpdate.setOverridingConsistencyLevel(ConsistencyLevel.LOCAL_SERIAL);
+    anUpdate.setPayload(aUser);
+    userUpsertOperator.beginWindow(8);
+    userUpsertOperator.input.process(anUpdate);
+    userUpsertOperator.endWindow();
+
+    ResultSet results = userUpsertOperator.session.execute(
+        "SELECT * FROM unittests.users WHERE userid = '" + aUser.getUserid() + 
"'");
+    List<Row> rows = results.all();
+    assertEquals(rows.size(), 1);
+    assertTrue(results.isExhausted());
+  }
+
+
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/664257b4/contrib/src/test/java/com/datatorrent/contrib/cassandra/AbstractUpsertOutputOperatorCompositePKTest.java
----------------------------------------------------------------------
diff --git a/contrib/src/test/java/com/datatorrent/contrib/cassandra/AbstractUpsertOutputOperatorCompositePKTest.java b/contrib/src/test/java/com/datatorrent/contrib/cassandra/AbstractUpsertOutputOperatorCompositePKTest.java
new file mode 100644
index 0000000..b0863e0
--- /dev/null
+++ b/contrib/src/test/java/com/datatorrent/contrib/cassandra/AbstractUpsertOutputOperatorCompositePKTest.java
@@ -0,0 +1,99 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.contrib.cassandra;
+
+import java.util.List;
+import org.junit.Before;
+import org.junit.Test;
+
+import com.datastax.driver.core.ResultSet;
+import com.datastax.driver.core.Row;
+import com.datatorrent.api.Attribute;
+import com.datatorrent.api.Context;
+import com.datatorrent.api.DAG;
+import com.datatorrent.lib.helper.OperatorContextTestHelper;
+import com.datatorrent.lib.helper.TestPortContext;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * A simple test class that checks functionality when composite primary keys are present in the table definition
+ */
+public class AbstractUpsertOutputOperatorCompositePKTest
+{
+
+
+  public static final String APP_ID = "TestCassandraUpsertOperator";
+  public static final int OPERATOR_ID_FOR_COMPOSITE_PRIMARY_KEYS = 2;
+
+  CompositePrimaryKeyUpdateOperator compositePrimaryKeysOperator = null;
+  OperatorContextTestHelper.TestIdOperatorContext contextForCompositePrimaryKeysOperator;
+  TestPortContext testPortContextForCompositePrimaryKeys;
+
+  @Before
+  public void setupApexContexts() throws Exception
+  {
+    Attribute.AttributeMap.DefaultAttributeMap attributeMapForCompositePrimaryKey =
+        new Attribute.AttributeMap.DefaultAttributeMap();
+    attributeMapForCompositePrimaryKey.put(DAG.APPLICATION_ID, APP_ID);
+    contextForCompositePrimaryKeysOperator = new OperatorContextTestHelper.TestIdOperatorContext(
+                OPERATOR_ID_FOR_COMPOSITE_PRIMARY_KEYS,
+                attributeMapForCompositePrimaryKey);
+
+    Attribute.AttributeMap.DefaultAttributeMap portAttributesForCompositePrimaryKeys =
+        new Attribute.AttributeMap.DefaultAttributeMap();
+    portAttributesForCompositePrimaryKeys.put(Context.PortContext.TUPLE_CLASS, CompositePrimaryKeyRow.class);
+    testPortContextForCompositePrimaryKeys = new TestPortContext(portAttributesForCompositePrimaryKeys);
+
+    compositePrimaryKeysOperator = new CompositePrimaryKeyUpdateOperator();
+    compositePrimaryKeysOperator.setup(contextForCompositePrimaryKeysOperator);
+    compositePrimaryKeysOperator.activate(contextForCompositePrimaryKeysOperator);
+    compositePrimaryKeysOperator.input.setup(testPortContextForCompositePrimaryKeys);
+  }
+
+  @Test
+  public void testForCompositeRowKeyBasedTable() throws Exception
+  {
+    CompositePrimaryKeyRow aCompositeRowKey = new CompositePrimaryKeyRow();
+    String userId = new String("user1" + System.currentTimeMillis());
+    String employeeId = new String(userId + System.currentTimeMillis());
+    int day = 1;
+    int month = 12;
+    int year = 2017;
+    aCompositeRowKey.setDay(day);
+    aCompositeRowKey.setMonth(month);
+    aCompositeRowKey.setYear(year);
+    aCompositeRowKey.setCurrentstatus("status" + System.currentTimeMillis());
+    aCompositeRowKey.setUserid(userId);
+    aCompositeRowKey.setEmployeeid(employeeId);
+    UpsertExecutionContext<CompositePrimaryKeyRow> anUpdate = new UpsertExecutionContext<>();
+    anUpdate.setPayload(aCompositeRowKey);
+    compositePrimaryKeysOperator.beginWindow(12);
+    compositePrimaryKeysOperator.input.process(anUpdate);
+    compositePrimaryKeysOperator.endWindow();
+
+    ResultSet results = compositePrimaryKeysOperator.session.execute(
+        "SELECT * FROM unittests.userstatus WHERE userid = '" + userId + "' 
and day=" + day + " and month=" +
+        month + " and year=" + year + " and employeeid='" + employeeId + "'");
+    List<Row> rows = results.all();
+    assertEquals(rows.size(), 1);
+  }
+
+
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/664257b4/contrib/src/test/java/com/datatorrent/contrib/cassandra/AbstractUpsertOutputOperatorCountersTest.java
----------------------------------------------------------------------
diff --git a/contrib/src/test/java/com/datatorrent/contrib/cassandra/AbstractUpsertOutputOperatorCountersTest.java b/contrib/src/test/java/com/datatorrent/contrib/cassandra/AbstractUpsertOutputOperatorCountersTest.java
new file mode 100644
index 0000000..b32dc43
--- /dev/null
+++ b/contrib/src/test/java/com/datatorrent/contrib/cassandra/AbstractUpsertOutputOperatorCountersTest.java
@@ -0,0 +1,120 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.contrib.cassandra;
+
+import java.util.List;
+import org.junit.Before;
+import org.junit.Test;
+import com.datastax.driver.core.ConsistencyLevel;
+import com.datastax.driver.core.ResultSet;
+import com.datastax.driver.core.Row;
+import com.datatorrent.api.Attribute;
+import com.datatorrent.api.Context;
+import com.datatorrent.api.DAG;
+import com.datatorrent.lib.helper.OperatorContextTestHelper;
+import com.datatorrent.lib.helper.TestPortContext;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * A simple test class that tests the functionality when a table with counters is the sink for POJOs
+ */
+public class AbstractUpsertOutputOperatorCountersTest
+{
+
+  public static final String APP_ID = "TestCassandraUpsertOperator";
+  public static final int OPERATOR_ID_FOR_COUNTER_COLUMNS = 1;
+
+  CounterColumnUpdatesOperator counterUpdatesOperator = null;
+  OperatorContextTestHelper.TestIdOperatorContext contextForCountersOperator;
+  TestPortContext testPortContextForCounters;
+
+  @Before
+  public void setupApexContexts() throws Exception
+  {
+    Attribute.AttributeMap.DefaultAttributeMap attributeMapForCounters =
+        new Attribute.AttributeMap.DefaultAttributeMap();
+    attributeMapForCounters.put(DAG.APPLICATION_ID, APP_ID);
+    contextForCountersOperator = new OperatorContextTestHelper.TestIdOperatorContext(OPERATOR_ID_FOR_COUNTER_COLUMNS,
+                attributeMapForCounters);
+
+    Attribute.AttributeMap.DefaultAttributeMap portAttributesForCounters =
+        new Attribute.AttributeMap.DefaultAttributeMap();
+    portAttributesForCounters.put(Context.PortContext.TUPLE_CLASS, CounterColumnTableEntry.class);
+    testPortContextForCounters = new TestPortContext(portAttributesForCounters);
+
+    counterUpdatesOperator = new CounterColumnUpdatesOperator();
+    counterUpdatesOperator.setup(contextForCountersOperator);
+    counterUpdatesOperator.activate(contextForCountersOperator);
+    counterUpdatesOperator.input.setup(testPortContextForCounters);
+  }
+
+  @Test
+  public void testForSingleRowInsertForCounterTables() throws Exception
+  {
+
+    CounterColumnTableEntry aCounterEntry = new CounterColumnTableEntry();
+    String userId = new String("user1" + System.currentTimeMillis());
+    aCounterEntry.setUserId(userId);
+    aCounterEntry.setUpdatecount(3);
+    UpsertExecutionContext<CounterColumnTableEntry> anUpdate = new UpsertExecutionContext<>();
+    anUpdate.setOverridingConsistencyLevel(ConsistencyLevel.LOCAL_ONE);
+    anUpdate.setPayload(aCounterEntry);
+    counterUpdatesOperator.beginWindow(9);
+    counterUpdatesOperator.input.process(anUpdate);
+    counterUpdatesOperator.endWindow();
+
+    ResultSet results = counterUpdatesOperator.session.execute(
+        "SELECT * FROM unittests.userupdates WHERE userid = '" + userId + "'");
+    List<Row> rows = results.all();
+    assertEquals(1, rows.size());
+    assertEquals(3, rows.get(0).getLong("updatecount"));
+
+    aCounterEntry = new CounterColumnTableEntry();
+    aCounterEntry.setUserId(userId);
+    aCounterEntry.setUpdatecount(2);
+    anUpdate = new UpsertExecutionContext<>();
+    anUpdate.setOverridingConsistencyLevel(ConsistencyLevel.LOCAL_ONE);
+    anUpdate.setPayload(aCounterEntry);
+    counterUpdatesOperator.beginWindow(10);
+    counterUpdatesOperator.input.process(anUpdate);
+    counterUpdatesOperator.endWindow();
+    results = counterUpdatesOperator.session.execute(
+                "SELECT * FROM unittests.userupdates WHERE userid = '" + userId + "'");
+    rows = results.all();
+    assertEquals(5, rows.get(0).getLong("updatecount"));
+    aCounterEntry = new CounterColumnTableEntry();
+    aCounterEntry.setUserId(userId);
+    aCounterEntry.setUpdatecount(-1);
+    anUpdate = new UpsertExecutionContext<>();
+    anUpdate.setOverridingConsistencyLevel(ConsistencyLevel.LOCAL_ONE);
+    anUpdate.setPayload(aCounterEntry);
+
+    counterUpdatesOperator.beginWindow(11);
+    counterUpdatesOperator.input.process(anUpdate);
+    counterUpdatesOperator.endWindow();
+
+    results = counterUpdatesOperator.session.execute(
+                "SELECT * FROM unittests.userupdates WHERE userid = '" + userId + "'");
+    rows = results.all();
+    assertEquals(4, rows.get(0).getLong("updatecount"));
+
+  }
+
+}
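Note: the counter table this test queries (unittests.userupdates) is not created in this patch. A minimal sketch of a schema consistent with the assertions above, assuming a single text partition key and one counter column, could be set up with the DataStax driver as below; the contact point and replication options are illustrative only, not the schema the test suite actually ships.

import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.Session;

public class CounterTableSetupSketch
{
  public static void main(String[] args)
  {
    try (Cluster cluster = Cluster.builder().addContactPoint("localhost").build();
        Session session = cluster.connect()) {
      session.execute("CREATE KEYSPACE IF NOT EXISTS unittests WITH replication = "
          + "{'class': 'SimpleStrategy', 'replication_factor': 1}");
      // Counter columns only support increments/decrements, which is why the test
      // expects 3, then 3 + 2 = 5, then 5 - 1 = 4 for the same userid.
      session.execute("CREATE TABLE IF NOT EXISTS unittests.userupdates "
          + "(userid text PRIMARY KEY, updatecount counter)");
    }
  }
}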

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/664257b4/contrib/src/test/java/com/datatorrent/contrib/cassandra/Address.java
----------------------------------------------------------------------
diff --git a/contrib/src/test/java/com/datatorrent/contrib/cassandra/Address.java b/contrib/src/test/java/com/datatorrent/contrib/cassandra/Address.java
new file mode 100644
index 0000000..faeef03
--- /dev/null
+++ b/contrib/src/test/java/com/datatorrent/contrib/cassandra/Address.java
@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.contrib.cassandra;
+
+import java.util.Set;
+
+/**
+ * A test POJO representing an address; it is mapped to the Cassandra user defined type handled by {@link AddressCodec}.
+ */
+public class Address
+{
+  private String city;
+
+  private String street;
+
+  private int zip_code;
+  private Set<String> phones;
+
+  public Address(String city, String street, int zipcode, Set<String> phones)
+  {
+    this.city = city;
+    this.street = street;
+    this.zip_code = zipcode;
+    this.phones = phones;
+  }
+
+  public String getCity()
+  {
+    return city;
+  }
+
+  public void setCity(String city)
+  {
+    this.city = city;
+  }
+
+  public String getStreet()
+  {
+    return street;
+  }
+
+  public void setStreet(String street)
+  {
+    this.street = street;
+  }
+
+  public int getZip_code()
+  {
+    return zip_code;
+  }
+
+  public void setZip_code(int zip_code)
+  {
+    this.zip_code = zip_code;
+  }
+
+  public Set<String> getPhones()
+  {
+    return phones;
+  }
+
+  public void setPhones(Set<String> phones)
+  {
+    this.phones = phones;
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/664257b4/contrib/src/test/java/com/datatorrent/contrib/cassandra/AddressCodec.java
----------------------------------------------------------------------
diff --git a/contrib/src/test/java/com/datatorrent/contrib/cassandra/AddressCodec.java b/contrib/src/test/java/com/datatorrent/contrib/cassandra/AddressCodec.java
new file mode 100644
index 0000000..017b34d
--- /dev/null
+++ b/contrib/src/test/java/com/datatorrent/contrib/cassandra/AddressCodec.java
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.contrib.cassandra;
+
+import java.nio.ByteBuffer;
+
+import com.datastax.driver.core.ProtocolVersion;
+import com.datastax.driver.core.TypeCodec;
+import com.datastax.driver.core.UDTValue;
+import com.datastax.driver.core.UserType;
+import com.datastax.driver.core.exceptions.InvalidTypeException;
+
+public class AddressCodec extends TypeCodec<Address>
+{
+
+  private final TypeCodec<UDTValue> innerCodec;
+
+  private final UserType userType;
+
+  public AddressCodec(TypeCodec<UDTValue> innerCodec, Class<Address> javaType)
+  {
+    super(innerCodec.getCqlType(), javaType);
+    this.innerCodec = innerCodec;
+    this.userType = (UserType)innerCodec.getCqlType();
+  }
+
+  @Override
+  public ByteBuffer serialize(Address value, ProtocolVersion protocolVersion) throws InvalidTypeException
+  {
+    return innerCodec.serialize(toUDTValue(value), protocolVersion);
+  }
+
+  @Override
+  public Address deserialize(ByteBuffer bytes, ProtocolVersion protocolVersion) throws InvalidTypeException
+  {
+    return toAddress(innerCodec.deserialize(bytes, protocolVersion));
+  }
+
+  @Override
+  public Address parse(String value) throws InvalidTypeException
+  {
+    return value == null || value.isEmpty() ? null : toAddress(innerCodec.parse(value));
+  }
+
+  @Override
+  public String format(Address value) throws InvalidTypeException
+  {
+    return value == null ? null : innerCodec.format(toUDTValue(value));
+  }
+
+  protected Address toAddress(UDTValue value)
+  {
+    return value == null ? null : new Address(
+      value.getString("city"),
+      value.getString("street"),
+      value.getInt("zip_code"),
+      value.getSet("phones", String.class)
+    );
+  }
+
+  protected UDTValue toUDTValue(Address value)
+  {
+    return value == null ? null : userType.newValue()
+      .setString("street", value.getStreet())
+      .setInt("zip_code", value.getZip_code())
+      .setString("city", value.getCity())
+      .setSet("phones", value.getPhones());
+  }
+}
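For reference, codecs like the one above are handed to the operator through getCodecsForUserDefinedTypes() (see UserUpsertOperator later in this patch). Outside the operator, a standalone registration against the driver's CodecRegistry would look roughly like the sketch below; the contact point, keyspace and UDT name ("localhost", "unittests", "address") are assumptions taken from the tests, not part of this change.

import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.CodecRegistry;
import com.datastax.driver.core.Session;
import com.datastax.driver.core.TypeCodec;
import com.datastax.driver.core.UDTValue;
import com.datastax.driver.core.UserType;

public class AddressCodecRegistrationSketch
{
  public static void main(String[] args)
  {
    try (Cluster cluster = Cluster.builder().addContactPoint("localhost").build();
        Session session = cluster.connect("unittests")) {
      // Resolve the driver's default codec for the raw 'address' UDT and wrap it with AddressCodec.
      CodecRegistry registry = cluster.getConfiguration().getCodecRegistry();
      UserType addressType = cluster.getMetadata().getKeyspace("unittests").getUserType("address");
      TypeCodec<UDTValue> innerCodec = registry.codecFor(addressType);
      registry.register(new AddressCodec(innerCodec, Address.class));
      // From here on, Address instances can be bound to / read from columns of type 'address'.
    }
  }
}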

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/664257b4/contrib/src/test/java/com/datatorrent/contrib/cassandra/CassandraOperatorTest.java
----------------------------------------------------------------------
diff --git a/contrib/src/test/java/com/datatorrent/contrib/cassandra/CassandraOperatorTest.java b/contrib/src/test/java/com/datatorrent/contrib/cassandra/CassandraOperatorTest.java
index 74f99a8..4a1f883 100644
--- a/contrib/src/test/java/com/datatorrent/contrib/cassandra/CassandraOperatorTest.java
+++ b/contrib/src/test/java/com/datatorrent/contrib/cassandra/CassandraOperatorTest.java
@@ -18,30 +18,45 @@
  */
 package com.datatorrent.contrib.cassandra;
 
-import com.datastax.driver.core.*;
-import com.datastax.driver.core.exceptions.DriverException;
-import com.datatorrent.api.Attribute;
-import com.datatorrent.api.Attribute.AttributeMap;
-import com.datatorrent.api.Context;
-import com.datatorrent.api.DAG;
-import com.datatorrent.lib.helper.TestPortContext;
-import com.datatorrent.lib.util.FieldInfo;
-import com.datatorrent.netlet.util.DTThrowable;
-import com.datatorrent.lib.helper.OperatorContextTestHelper;
-import com.datatorrent.lib.testbench.CollectorTestSink;
-import com.google.common.collect.Lists;
-
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
 
 import org.junit.After;
+import org.junit.AfterClass;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
-import org.junit.AfterClass;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.datastax.driver.core.BoundStatement;
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.Configuration;
+import com.datastax.driver.core.PreparedStatement;
+import com.datastax.driver.core.ProtocolVersion;
+import com.datastax.driver.core.ResultSet;
+import com.datastax.driver.core.Row;
+import com.datastax.driver.core.Session;
+import com.datastax.driver.core.exceptions.DriverException;
+import com.google.common.collect.Lists;
+
+import com.datatorrent.api.Attribute;
+import com.datatorrent.api.Attribute.AttributeMap;
+import com.datatorrent.api.Context;
+import com.datatorrent.api.DAG;
+import com.datatorrent.lib.helper.OperatorContextTestHelper;
+import com.datatorrent.lib.helper.TestPortContext;
+import com.datatorrent.lib.testbench.CollectorTestSink;
+import com.datatorrent.lib.util.FieldInfo;
+import com.datatorrent.netlet.util.DTThrowable;
+
 /**
 * Tests for {@link AbstractCassandraTransactionableOutputOperator} and {@link AbstractCassandraInputOperator}
  */
@@ -249,7 +264,7 @@ public class CassandraOperatorTest
     outputOperator.setup(context);
 
    Configuration config = outputOperator.getStore().getCluster().getConfiguration();
-    Assert.assertEquals("Procotol version was not set to V2.", ProtocolVersion.V2, config.getProtocolOptions().getProtocolVersionEnum());
+    Assert.assertEquals("Protocol version was not set to V2.", ProtocolVersion.V2, config.getProtocolOptions().getProtocolVersion());
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/664257b4/contrib/src/test/java/com/datatorrent/contrib/cassandra/CompositePrimaryKeyRow.java
----------------------------------------------------------------------
diff --git a/contrib/src/test/java/com/datatorrent/contrib/cassandra/CompositePrimaryKeyRow.java b/contrib/src/test/java/com/datatorrent/contrib/cassandra/CompositePrimaryKeyRow.java
new file mode 100644
index 0000000..9aa2dae
--- /dev/null
+++ b/contrib/src/test/java/com/datatorrent/contrib/cassandra/CompositePrimaryKeyRow.java
@@ -0,0 +1,98 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.contrib.cassandra;
+
+/**
+ * A POJO that represents a row in a table whose primary key is a composite primary key.
+ */
+public class CompositePrimaryKeyRow
+{
+
+  private String userid;
+
+  private int day;
+
+  private int month;
+
+  private int year;
+
+  private String employeeid;
+
+  private String currentstatus;
+
+  public String getUserid()
+  {
+    return userid;
+  }
+
+  public void setUserid(String userid)
+  {
+    this.userid = userid;
+  }
+
+  public int getDay()
+  {
+    return day;
+  }
+
+  public void setDay(int day)
+  {
+    this.day = day;
+  }
+
+  public int getMonth()
+  {
+    return month;
+  }
+
+  public void setMonth(int month)
+  {
+    this.month = month;
+  }
+
+  public int getYear()
+  {
+    return year;
+  }
+
+  public void setYear(int year)
+  {
+    this.year = year;
+  }
+
+  public String getEmployeeid()
+  {
+    return employeeid;
+  }
+
+  public void setEmployeeid(String employeeid)
+  {
+    this.employeeid = employeeid;
+  }
+
+  public String getCurrentstatus()
+  {
+    return currentstatus;
+  }
+
+  public void setCurrentstatus(String currentstatus)
+  {
+    this.currentstatus = currentstatus;
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/664257b4/contrib/src/test/java/com/datatorrent/contrib/cassandra/CompositePrimaryKeyUpdateOperator.java
----------------------------------------------------------------------
diff --git a/contrib/src/test/java/com/datatorrent/contrib/cassandra/CompositePrimaryKeyUpdateOperator.java b/contrib/src/test/java/com/datatorrent/contrib/cassandra/CompositePrimaryKeyUpdateOperator.java
new file mode 100644
index 0000000..f2d6737
--- /dev/null
+++ b/contrib/src/test/java/com/datatorrent/contrib/cassandra/CompositePrimaryKeyUpdateOperator.java
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.contrib.cassandra;
+
+import java.util.Map;
+import com.datastax.driver.core.TypeCodec;
+
+/**
+ * A concrete implementation of the {@link AbstractUpsertOutputOperator} that tests composite primary key use cases.
+ * Please run the schema as given in {@link AbstractUpsertOutputOperatorCodecsTest}.
+ */
+public class CompositePrimaryKeyUpdateOperator extends AbstractUpsertOutputOperator
+{
+  @Override
+  public ConnectionStateManager.ConnectionBuilder withConnectionBuilder()
+  {
+    return ConnectionStateManager.withNewBuilder()
+                .withSeedNodes("localhost")
+                .withClusterNameAs("Test Cluster")
+                .withDCNameAs("datacenter1")
+                .withTableNameAs("userstatus")
+                .withKeySpaceNameAs("unittests");
+  }
+
+  @Override
+  public Map<String, TypeCodec> getCodecsForUserDefinedTypes()
+  {
+    return null;
+  }
+
+  @Override
+  public Class getPayloadPojoClass()
+  {
+    return CompositePrimaryKeyRow.class;
+  }
+
+  @Override
+  boolean reconcileRecord(Object T, long windowId)
+  {
+    return true;
+  }
+}
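The schema for unittests.userstatus referenced above ships with AbstractUpsertOutputOperatorCodecsTest and is not part of this excerpt. One plausible layout that matches the CompositePrimaryKeyRow fields (userid as the partition key, the date parts and employeeid as clustering columns) is sketched below; treat it as a guess, not the authoritative DDL.

import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.Session;

public class UserStatusTableSketch
{
  public static void main(String[] args)
  {
    try (Cluster cluster = Cluster.builder().addContactPoint("localhost").build();
        Session session = cluster.connect()) {
      // Guessed composite primary key; the authoritative DDL lives in the codecs test, not here.
      session.execute("CREATE TABLE IF NOT EXISTS unittests.userstatus ("
          + "userid text, day int, month int, year int, employeeid text, currentstatus text, "
          + "PRIMARY KEY ((userid), day, month, year, employeeid))");
    }
  }
}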

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/664257b4/contrib/src/test/java/com/datatorrent/contrib/cassandra/CounterColumnTableEntry.java
----------------------------------------------------------------------
diff --git a/contrib/src/test/java/com/datatorrent/contrib/cassandra/CounterColumnTableEntry.java b/contrib/src/test/java/com/datatorrent/contrib/cassandra/CounterColumnTableEntry.java
new file mode 100644
index 0000000..72b3373
--- /dev/null
+++ b/contrib/src/test/java/com/datatorrent/contrib/cassandra/CounterColumnTableEntry.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.contrib.cassandra;
+
+public class CounterColumnTableEntry
+{
+  private String userId;
+
+  private long updatecount;
+
+  public String getUserId()
+  {
+    return userId;
+  }
+
+  public void setUserId(String userId)
+  {
+    this.userId = userId;
+  }
+
+  public long getUpdatecount()
+  {
+    return updatecount;
+  }
+
+  public void setUpdatecount(long updatecount)
+  {
+    this.updatecount = updatecount;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/664257b4/contrib/src/test/java/com/datatorrent/contrib/cassandra/CounterColumnUpdatesOperator.java
----------------------------------------------------------------------
diff --git a/contrib/src/test/java/com/datatorrent/contrib/cassandra/CounterColumnUpdatesOperator.java b/contrib/src/test/java/com/datatorrent/contrib/cassandra/CounterColumnUpdatesOperator.java
new file mode 100644
index 0000000..1ec416c
--- /dev/null
+++ b/contrib/src/test/java/com/datatorrent/contrib/cassandra/CounterColumnUpdatesOperator.java
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.contrib.cassandra;
+
+import java.util.Map;
+import com.datastax.driver.core.TypeCodec;
+
+public class CounterColumnUpdatesOperator extends AbstractUpsertOutputOperator
+{
+  @Override
+  public ConnectionStateManager.ConnectionBuilder withConnectionBuilder()
+  {
+    return ConnectionStateManager.withNewBuilder()
+                .withSeedNodes("localhost")
+                .withClusterNameAs("Test Cluster")
+                .withDCNameAs("datacenter1")
+                .withTableNameAs("userupdates")
+                .withKeySpaceNameAs("unittests");
+  }
+
+  @Override
+  public Map<String, TypeCodec> getCodecsForUserDefinedTypes()
+  {
+    return null;
+  }
+
+  @Override
+  public Class getPayloadPojoClass()
+  {
+    return CounterColumnTableEntry.class;
+  }
+
+  @Override
+  boolean reconcileRecord(Object T, long windowId)
+  {
+    return true;
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/664257b4/contrib/src/test/java/com/datatorrent/contrib/cassandra/FullName.java
----------------------------------------------------------------------
diff --git a/contrib/src/test/java/com/datatorrent/contrib/cassandra/FullName.java b/contrib/src/test/java/com/datatorrent/contrib/cassandra/FullName.java
new file mode 100644
index 0000000..61636bb
--- /dev/null
+++ b/contrib/src/test/java/com/datatorrent/contrib/cassandra/FullName.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.contrib.cassandra;
+
+public class FullName
+{
+  private String firstname;
+
+  private String lastname;
+
+  public FullName(String firstname, String lastname)
+  {
+    this.firstname = firstname;
+    this.lastname = lastname;
+  }
+
+  public String getFirstname()
+  {
+    return firstname;
+  }
+
+  public void setFirstname(String firstname)
+  {
+    this.firstname = firstname;
+  }
+
+  public String getLastname()
+  {
+    return lastname;
+  }
+
+  public void setLastname(String lastname)
+  {
+    this.lastname = lastname;
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/664257b4/contrib/src/test/java/com/datatorrent/contrib/cassandra/FullNameCodec.java
----------------------------------------------------------------------
diff --git a/contrib/src/test/java/com/datatorrent/contrib/cassandra/FullNameCodec.java b/contrib/src/test/java/com/datatorrent/contrib/cassandra/FullNameCodec.java
new file mode 100644
index 0000000..9816d43
--- /dev/null
+++ b/contrib/src/test/java/com/datatorrent/contrib/cassandra/FullNameCodec.java
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.contrib.cassandra;
+
+import java.nio.ByteBuffer;
+
+import com.datastax.driver.core.ProtocolVersion;
+import com.datastax.driver.core.TypeCodec;
+import com.datastax.driver.core.UDTValue;
+import com.datastax.driver.core.UserType;
+import com.datastax.driver.core.exceptions.InvalidTypeException;
+
+public class FullNameCodec extends TypeCodec<FullName>
+{
+
+  private final TypeCodec<UDTValue> innerCodec;
+
+  private final UserType userType;
+
+  public FullNameCodec(TypeCodec<UDTValue> innerCodec, Class<FullName> javaType)
+  {
+    super(innerCodec.getCqlType(), javaType);
+    this.innerCodec = innerCodec;
+    this.userType = (UserType)innerCodec.getCqlType();
+  }
+
+  @Override
+  public ByteBuffer serialize(FullName value, ProtocolVersion protocolVersion) throws InvalidTypeException
+  {
+    return innerCodec.serialize(toUDTValue(value), protocolVersion);
+  }
+
+  @Override
+  public FullName deserialize(ByteBuffer bytes, ProtocolVersion protocolVersion) throws InvalidTypeException
+  {
+    return toFullName(innerCodec.deserialize(bytes, protocolVersion));
+  }
+
+  @Override
+  public FullName parse(String value) throws InvalidTypeException
+  {
+    return value == null || value.isEmpty() ? null : toFullName(innerCodec.parse(value));
+  }
+
+  @Override
+  public String format(FullName value) throws InvalidTypeException
+  {
+    return value == null ? null : innerCodec.format(toUDTValue(value));
+  }
+
+  protected FullName toFullName(UDTValue value)
+  {
+    return value == null ? null : new FullName(
+      value.getString("firstname"),
+      value.getString("lastname"));
+  }
+
+  protected UDTValue toUDTValue(FullName value)
+  {
+    return value == null ? null : userType.newValue()
+      .setString("lastname", value.getLastname())
+      .setString("firstname", value.getFirstname());
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/664257b4/contrib/src/test/java/com/datatorrent/contrib/cassandra/User.java
----------------------------------------------------------------------
diff --git a/contrib/src/test/java/com/datatorrent/contrib/cassandra/User.java b/contrib/src/test/java/com/datatorrent/contrib/cassandra/User.java
new file mode 100644
index 0000000..f4c1d8d
--- /dev/null
+++ b/contrib/src/test/java/com/datatorrent/contrib/cassandra/User.java
@@ -0,0 +1,130 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.contrib.cassandra;
+
+import java.util.Date;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+
+public class User
+{
+  private String userid;
+  private FullName username;
+  private Set<String> emails;
+  private List<Integer> topScores;
+  private Map<Date, String> todo;
+  private Address currentaddress;
+  private List<FullName> previousnames;
+
+  private String nonMatchingColumn;
+
+  private List<Integer> nonMatchingCollectionColumn;
+
+  public String getUserid()
+  {
+    return userid;
+  }
+
+  public void setUserid(String userid)
+  {
+    this.userid = userid;
+  }
+
+  public FullName getUsername()
+  {
+    return username;
+  }
+
+  public void setUsername(FullName username)
+  {
+    this.username = username;
+  }
+
+  public Set<String> getEmails()
+  {
+    return emails;
+  }
+
+  public void setEmails(Set<String> emails)
+  {
+    this.emails = emails;
+  }
+
+  public List<Integer> getTopScores()
+  {
+    return topScores;
+  }
+
+  public void setTopScores(List<Integer> topScores)
+  {
+    this.topScores = topScores;
+  }
+
+  public Map<Date, String> getTodo()
+  {
+    return todo;
+  }
+
+  public void setTodo(Map<Date, String> todo)
+  {
+    this.todo = todo;
+  }
+
+  public String getNonMatchingColumn()
+  {
+    return nonMatchingColumn;
+  }
+
+  public void setNonMatchingColumn(String nonMatchingColumn)
+  {
+    this.nonMatchingColumn = nonMatchingColumn;
+  }
+
+  public List<Integer> getNonMatchingCollectionColumn()
+  {
+    return nonMatchingCollectionColumn;
+  }
+
+  public void setNonMatchingCollectionColumn(List<Integer> nonMatchingCollectionColumn)
+  {
+    this.nonMatchingCollectionColumn = nonMatchingCollectionColumn;
+  }
+
+  public Address getCurrentaddress()
+  {
+    return currentaddress;
+  }
+
+  public void setCurrentaddress(Address currentaddress)
+  {
+    this.currentaddress = currentaddress;
+  }
+
+  public List<FullName> getPreviousnames()
+  {
+    return previousnames;
+  }
+
+  public void setPreviousnames(List<FullName> previousnames)
+  {
+    this.previousnames = previousnames;
+  }
+}
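The User POJO above maps onto a table with UDT and collection columns; the real DDL is expected to come from AbstractUpsertOutputOperatorCodecsTest. A sketch consistent with the fields above, the two codecs in this patch, and the topScores to top_scores override in UserUpsertOperator (later in this patch) might look like the following; all names and types here are inferred, not authoritative.

import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.Session;

public class UsersTableSketch
{
  public static void main(String[] args)
  {
    try (Cluster cluster = Cluster.builder().addContactPoint("localhost").build();
        Session session = cluster.connect()) {
      // UDTs matching the field accesses in FullNameCodec and AddressCodec.
      session.execute("CREATE TYPE IF NOT EXISTS unittests.fullname (firstname text, lastname text)");
      session.execute("CREATE TYPE IF NOT EXISTS unittests.address "
          + "(street text, city text, zip_code int, phones set<text>)");
      // top_scores is the column the POJO field topScores is mapped to by UserUpsertOperator.
      session.execute("CREATE TABLE IF NOT EXISTS unittests.users ("
          + "userid text PRIMARY KEY, username frozen<fullname>, emails set<text>, "
          + "top_scores list<int>, todo map<timestamp, text>, currentaddress frozen<address>, "
          + "previousnames list<frozen<fullname>>)");
    }
  }
}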

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/664257b4/contrib/src/test/java/com/datatorrent/contrib/cassandra/UserUpsertOperator.java
----------------------------------------------------------------------
diff --git a/contrib/src/test/java/com/datatorrent/contrib/cassandra/UserUpsertOperator.java b/contrib/src/test/java/com/datatorrent/contrib/cassandra/UserUpsertOperator.java
new file mode 100644
index 0000000..e15c9f6
--- /dev/null
+++ b/contrib/src/test/java/com/datatorrent/contrib/cassandra/UserUpsertOperator.java
@@ -0,0 +1,97 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.contrib.cassandra;
+
+import java.util.HashMap;
+import java.util.Map;
+import com.datastax.driver.core.CodecRegistry;
+import com.datastax.driver.core.ConsistencyLevel;
+import com.datastax.driver.core.TypeCodec;
+import com.datastax.driver.core.UDTValue;
+import com.datastax.driver.core.UserType;
+
+/**
+  This operator is a concrete implementation of the {@link AbstractUpsertOutputOperator}. This is used
+  in unit tests (refer {@link AbstractUpsertOutputOperatorCodecsTest}) to showcase the use cases of:
+    => Connectivity options
+    => Codecs for user defined types
+    => How to specify null overrides to avoid patterns of read and then set values in a row
+    => Collection mutations Add / Remove
+    => List placements Append/Prepend if there is a list column
+    => Map POJO field names to Cassandra columns if they do not match exactly
+    => Specifying TTL
+    => Overriding the default consistency level set with the Connection builder at a tuple level if required
+ */
+public class UserUpsertOperator extends AbstractUpsertOutputOperator
+{
+  @Override
+  public ConnectionStateManager.ConnectionBuilder withConnectionBuilder()
+  {
+    return ConnectionStateManager.withNewBuilder()
+                .withSeedNodes("localhost")
+                .withClusterNameAs("Test Cluster")
+                .withDCNameAs("datacenter1")
+                .withTableNameAs("users")
+                .withKeySpaceNameAs("unittests")
+                .withdefaultConsistencyLevel(ConsistencyLevel.LOCAL_ONE);
+  }
+
+  @Override
+  public Map<String, TypeCodec> getCodecsForUserDefinedTypes()
+  {
+    Map<String, TypeCodec> allCodecs = new HashMap<>();
+    CodecRegistry codecRegistry = cluster.getConfiguration().getCodecRegistry();
+
+    UserType addressType = cluster.getMetadata().getKeyspace(getConnectionStateManager().getKeyspaceName())
+        .getUserType("address");
+    TypeCodec<UDTValue> addressTypeCodec = codecRegistry.codecFor(addressType);
+    AddressCodec addressCodec = new AddressCodec(addressTypeCodec, Address.class);
+    allCodecs.put("currentaddress", addressCodec);
+
+    UserType userFullNameType = cluster.getMetadata().getKeyspace(getConnectionStateManager().getKeyspaceName())
+        .getUserType("fullname");
+    TypeCodec<UDTValue> userFullNameTypeCodec = codecRegistry.codecFor(userFullNameType);
+    FullNameCodec fullNameCodec = new FullNameCodec(userFullNameTypeCodec, FullName.class);
+    allCodecs.put("username", fullNameCodec);
+
+    return allCodecs;
+  }
+
+  @Override
+  public Class getPayloadPojoClass()
+  {
+    return User.class;
+  }
+
+
+  @Override
+  protected Map<String, String> getPojoFieldNameToCassandraColumnNameOverride()
+  {
+    Map<String,String> overridingColMap = new HashMap<>();
+    overridingColMap.put("topScores","top_scores");
+    return overridingColMap;
+  }
+
+  @Override
+  boolean reconcileRecord(Object T, long windowId)
+  {
+    return true;
+  }
+}
+
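To exercise this operator the way the tests do, a small driver along the lines of AbstractUpsertOutputOperatorCountersTest (earlier in this patch) could push a User through it. The context and port wiring below mirrors that test; the application id, operator id and field values are purely illustrative, and the class is assumed to sit in the same package as the test POJOs.

import com.datastax.driver.core.ConsistencyLevel;
import com.datatorrent.api.Attribute;
import com.datatorrent.api.Context;
import com.datatorrent.api.DAG;
import com.datatorrent.lib.helper.OperatorContextTestHelper;
import com.datatorrent.lib.helper.TestPortContext;

public class UserUpsertOperatorSketch
{
  public static void main(String[] args)
  {
    // Operator and port contexts, set up the same way as in AbstractUpsertOutputOperatorCountersTest.
    Attribute.AttributeMap.DefaultAttributeMap attributes = new Attribute.AttributeMap.DefaultAttributeMap();
    attributes.put(DAG.APPLICATION_ID, "UserUpsertSketch");
    OperatorContextTestHelper.TestIdOperatorContext context =
        new OperatorContextTestHelper.TestIdOperatorContext(2, attributes);
    Attribute.AttributeMap.DefaultAttributeMap portAttributes = new Attribute.AttributeMap.DefaultAttributeMap();
    portAttributes.put(Context.PortContext.TUPLE_CLASS, User.class);
    TestPortContext portContext = new TestPortContext(portAttributes);

    UserUpsertOperator operator = new UserUpsertOperator();
    operator.setup(context);
    operator.activate(context);
    operator.input.setup(portContext);

    User aUser = new User();
    aUser.setUserid("user" + System.currentTimeMillis());
    aUser.setUsername(new FullName("Jane", "Doe"));

    UpsertExecutionContext<User> tuple = new UpsertExecutionContext<>();
    tuple.setPayload(aUser);
    // Per-tuple override of the default consistency level configured in withConnectionBuilder().
    tuple.setOverridingConsistencyLevel(ConsistencyLevel.LOCAL_QUORUM);

    operator.beginWindow(1);
    operator.input.process(tuple);
    operator.endWindow();
  }
}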
