Repository: apex-malhar Updated Branches: refs/heads/master 05a7ca3e4 -> 664257b4a
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/664257b4/contrib/src/main/java/com/datatorrent/contrib/cassandra/ConnectionStateManager.java ---------------------------------------------------------------------- diff --git a/contrib/src/main/java/com/datatorrent/contrib/cassandra/ConnectionStateManager.java b/contrib/src/main/java/com/datatorrent/contrib/cassandra/ConnectionStateManager.java new file mode 100644 index 0000000..cfc2b46 --- /dev/null +++ b/contrib/src/main/java/com/datatorrent/contrib/cassandra/ConnectionStateManager.java @@ -0,0 +1,477 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package com.datatorrent.contrib.cassandra; + +import java.io.Serializable; +import java.net.InetAddress; +import java.net.UnknownHostException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashSet; +import java.util.Set; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hadoop.classification.InterfaceStability; + +import com.datastax.driver.core.Cluster; +import com.datastax.driver.core.ConsistencyLevel; +import com.datastax.driver.core.Host; +import com.datastax.driver.core.Metadata; +import com.datastax.driver.core.ProtocolVersion; +import com.datastax.driver.core.QueryOptions; +import com.datastax.driver.core.Session; +import com.datastax.driver.core.policies.DCAwareRoundRobinPolicy; +import com.datastax.driver.core.policies.DowngradingConsistencyRetryPolicy; +import com.datastax.driver.core.policies.ExponentialReconnectionPolicy; +import com.datastax.driver.core.policies.LoadBalancingPolicy; +import com.datastax.driver.core.policies.ReconnectionPolicy; +import com.datastax.driver.core.policies.RetryPolicy; +import com.datastax.driver.core.policies.TokenAwarePolicy; + +import static com.google.common.base.Preconditions.checkNotNull; + +/** + * This is used to specify the connection parameters that is used to process mutations for the + * {@link AbstractUpsertOutputOperator}. + * The Connection can only be set for one table in a given space and as such is used to define the following + * - Connection Parameters + * - Defaults to be used in case the execution context tuple {@link UpsertExecutionContext} does not want to + * set one explicitly. + * Note that the {@link ConnectionBuilder} is used to build an instance of the ConnectionStateManager. + * An instance of this class is typically instantiated by implementing the following inside + * {@link AbstractUpsertOutputOperator} withConnectionBuilder() method. 
+ * {@code + @Override + public ConnectionStateManager.ConnectionBuilder withConnectionBuilder() + { + return ConnectionStateManager.withNewBuilder() + .withSeedNodes("localhost") // format of host1:port;host2:port;host3:port + .withClusterNameAs("Test Cluster") // mandatory param + .withDCNameAs("datacenter1") // mandatory param + .withTableNameAs("users") // mandatory param + .withKeySpaceNameAs("unittests") // mandatory param + .withdefaultConsistencyLevel(ConsistencyLevel.LOCAL_ONE); // Set if required. Default of LOCAL_QUORUM + // Rest of the configs are initialized to sane defaults + } + * } + * Please refer to {@link ConnectionBuilder} for details about parameters that can be used to define the connection + * and its default behaviour + */ +@InterfaceStability.Evolving +public class ConnectionStateManager implements AutoCloseable, Serializable +{ + + private static final long serialVersionUID = -6024334738016015213L; + private static final long DEFAULT_MAX_DELAY_MS = 30000L; + private static final long DEFAULT_BASE_DELAY_MS = 10000L; + // Cassandra cluster name + private final String clusterName; + // Cassandra DC Name + private final String dcName; + // Seed nodes.
The format for specifying the host names is host1:port;host2:port + private final String seedNodesStr; + private final long baseDelayMs; + private final long maxDelayMs; + // Other operational constraints + private final transient LoadBalancingPolicy loadBalancingPolicy; + private final transient RetryPolicy retryPolicy; + private final transient QueryOptions queryOptions; + private final transient ReconnectionPolicy reconnectionPolicy; + private final transient ProtocolVersion protocolVersion; + private transient Logger LOG = LoggerFactory.getLogger(ConnectionStateManager.class); + // Connection specific config elements + private transient Cluster cluster; + // Session specific metadata + private transient Session session; + private String keyspaceName; + private String tableName; + private int defaultTtlInSecs; + private ConsistencyLevel defaultConsistencyLevel; + // Standard defaults + private boolean isTTLSet = false; + + private ConnectionStateManager(final ConnectionBuilder connectionBuilder) + { + checkNotNull(connectionBuilder, "Connection Builder passed in as Null"); + checkNotNull(connectionBuilder.clusterName, "Cluster Name not set for Cassandra"); + checkNotNull(connectionBuilder.dcName, "DataCenter Name not set for Cassandra"); + checkNotNull(connectionBuilder.seedNodesStr, + "Seed nodes not set for Cassandra.
Pattern is host1:port;host2:port"); + checkNotNull(connectionBuilder.keyspaceName, "Keyspace Name not set for Cassandra"); + checkNotNull(connectionBuilder.tableName, "Table Name not set for Cassandra"); + //Required params + this.clusterName = connectionBuilder.clusterName; + this.dcName = connectionBuilder.dcName; + this.seedNodesStr = connectionBuilder.seedNodesStr; + this.keyspaceName = connectionBuilder.keyspaceName; + this.tableName = connectionBuilder.tableName; + + // optional params + if (connectionBuilder.maxDelayMs != null) { + maxDelayMs = connectionBuilder.maxDelayMs; + } else { + maxDelayMs = DEFAULT_MAX_DELAY_MS; // 30 seconds + } + if (connectionBuilder.baseDelayMs != null) { + baseDelayMs = connectionBuilder.baseDelayMs; + } else { + baseDelayMs = DEFAULT_BASE_DELAY_MS; // 10 seconds + } + if (connectionBuilder.defaultTtlInSecs != null) { + defaultTtlInSecs = connectionBuilder.defaultTtlInSecs; + isTTLSet = true; + } + if (connectionBuilder.defaultConsistencyLevel != null) { + defaultConsistencyLevel = connectionBuilder.defaultConsistencyLevel; + } else { + defaultConsistencyLevel = ConsistencyLevel.LOCAL_QUORUM; + } + if (connectionBuilder.loadBalancingPolicy != null) { + loadBalancingPolicy = connectionBuilder.loadBalancingPolicy; + } else { + loadBalancingPolicy = new TokenAwarePolicy( + DCAwareRoundRobinPolicy.builder() + .withLocalDc(dcName) + .build()); + } + if (connectionBuilder.retryPolicy != null) { + this.retryPolicy = connectionBuilder.retryPolicy; + } else { + retryPolicy = DowngradingConsistencyRetryPolicy.INSTANCE; + } + if (connectionBuilder.queryOptions != null) { + this.queryOptions = connectionBuilder.queryOptions; + } else { + this.queryOptions = new QueryOptions().setConsistencyLevel(defaultConsistencyLevel); + } + if (connectionBuilder.reconnectionPolicy != null) { + this.reconnectionPolicy = connectionBuilder.reconnectionPolicy; + } else { + this.reconnectionPolicy = new ExponentialReconnectionPolicy(baseDelayMs, maxDelayMs); 
+ } + if (connectionBuilder.protocolVersion != null) { + this.protocolVersion = connectionBuilder.protocolVersion; + } else { + this.protocolVersion = ProtocolVersion.NEWEST_SUPPORTED; + } + establishSessionWithPolicies(); + } + + public static ConnectionBuilder withNewBuilder() + { + return new ConnectionBuilder(); + } + + private void establishSessionWithPolicies() + { + Cluster.Builder clusterBuilder = Cluster.builder(); + String[] seedNodesSplit = seedNodesStr.split(";"); + Collection<InetAddress> allSeeds = new ArrayList<>(); + Set<String> allKnownPorts = new HashSet<>(); + for (String seedNode : seedNodesSplit) { + String[] nodeAndPort = seedNode.split(":"); + if (nodeAndPort.length > 1) { + allKnownPorts.add(nodeAndPort[1]); + } + try { + allSeeds.add(InetAddress.getByName(nodeAndPort[0])); + } catch (UnknownHostException e) { + LOG.error(" Error while trying to initialize the seed brokers for the cassandra cluster " + e.getMessage(), e); + } + } + clusterBuilder = clusterBuilder.addContactPoints(allSeeds); + clusterBuilder + .withClusterName(clusterName) + // We can fail if all of the nodes die in the local DC + .withLoadBalancingPolicy(loadBalancingPolicy) + // Want Strong consistency + .withRetryPolicy(retryPolicy) + // Tolerate some nodes down + .withQueryOptions(queryOptions) + // Keep establishing connections after detecting a node down + .withReconnectionPolicy(reconnectionPolicy); + // Finally initialize the cluster info + if (allKnownPorts.size() > 0) { + int shortlistedPort = Integer.parseInt(allKnownPorts.iterator().next()); + clusterBuilder = clusterBuilder.withPort(shortlistedPort); + } + cluster = clusterBuilder.build(); + Metadata metadata = cluster.getMetadata(); + LOG.info("Connected to cluster: \n" + metadata.getClusterName()); + for (Host host : metadata.getAllHosts()) { + LOG.info(String.format("Datacenter: %s; Host: %s; Rack: %s\n", + host.getDatacenter(), host.getAddress(), host.getRack())); + } + session = 
cluster.connect(keyspaceName); + } + + @Override + public void close() + { + if (session != null) { + session.close(); + } + if (cluster != null) { + cluster.close(); + } + } + + public Cluster getCluster() + { + return cluster; + } + + public void setCluster(Cluster cluster) + { + this.cluster = cluster; + } + + public Session getSession() + { + return session; + } + + public void setSession(Session session) + { + this.session = session; + } + + public String getKeyspaceName() + { + return keyspaceName; + } + + public void setKeyspaceName(String keyspaceName) + { + this.keyspaceName = keyspaceName; + } + + public String getTableName() + { + return tableName; + } + + public void setTableName(String tableName) + { + this.tableName = tableName; + } + + public int getDefaultTtlInSecs() + { + return defaultTtlInSecs; + } + + public void setDefaultTtlInSecs(int defaultTtlInSecs) + { + this.defaultTtlInSecs = defaultTtlInSecs; + } + + public boolean isTTLSet() + { + return isTTLSet; + } + + public void setTTLSet(boolean TTLSet) + { + isTTLSet = TTLSet; + } + + public static class ConnectionBuilder + { + + private String clusterName; + private String dcName; + private String seedNodesStr; + private Long baseDelayMs; // Class to enable check for nulls and is optional + private Long maxDelayMs; // Class to enable check for nulls and is optional + + private String keyspaceName; + private String tableName; + private Integer defaultTtlInSecs; // Class to enable checks for nulls and is optional + private ConsistencyLevel defaultConsistencyLevel; + + private LoadBalancingPolicy loadBalancingPolicy; + private RetryPolicy retryPolicy; + private QueryOptions queryOptions; + private ReconnectionPolicy reconnectionPolicy; + private ProtocolVersion protocolVersion; + + public static final String CLUSTER_NAME_IN_PROPS_FILE = "cluster.name"; + public static final String DC_NAME_IN_PROPS_FILE = "dc.name"; + public static final String KEYSPACE_NAME_IN_PROPS_FILE = "keyspace.name"; + 
+ public static final String TABLE_NAME_IN_PROPS_FILE = "table.name"; + public static final String SEEDNODES_IN_PROPS_FILE = "seednodes"; + + public ConnectionBuilder withClusterNameAs(String clusterName) + { + this.clusterName = clusterName; + return this; + } + + public ConnectionBuilder withDCNameAs(String dcName) + { + this.dcName = dcName; + return this; + } + + /** + * Used to specify the seed nodes of the target cassandra cluster. Format is + * host1:port;host2:port;host3:port + * @param seedNodesStr + * @return The builder instance as initially created updated with this value + */ + public ConnectionBuilder withSeedNodes(String seedNodesStr) + { + this.seedNodesStr = seedNodesStr; + return this; + } + + /** + * Used to specify the base delay while trying to set a Connection attempt policy + * @param baseDelayMillis + * @return The builder instance as initially created updated with this value + */ + public ConnectionBuilder withBaseDelayMillis(long baseDelayMillis) + { + this.baseDelayMs = baseDelayMillis; + return this; + } + + /** + * Used to specify the maximum time that can elapse before a connection is given up as a failed attempt + * @param maxDelayMillis + * @return The builder instance as initially created updated with this value + */ + public ConnectionBuilder withMaxDelayMillis(long maxDelayMillis) + { + this.maxDelayMs = maxDelayMillis; + return this; + } + + public ConnectionBuilder withKeySpaceNameAs(String keyspaceName) + { + this.keyspaceName = keyspaceName; + return this; + } + + public ConnectionBuilder withTableNameAs(String tableName) + { + this.tableName = tableName; + return this; + } + + public ConnectionBuilder withdefaultTTL(Integer defaultTtlInSecs) + { + this.defaultTtlInSecs = defaultTtlInSecs; + return this; + } + + /** + * Used to specify the default consistency level when executing the mutations on the cluster. + * Default if not set is LOCAL_QUORUM.
Can be overridden at the tuple level using {@link UpsertExecutionContext} + * @param consistencyLevel + * @return The builder instance as initially created updated with this value + */ + public ConnectionBuilder withdefaultConsistencyLevel(ConsistencyLevel consistencyLevel) + { + this.defaultConsistencyLevel = consistencyLevel; + return this; + } + + /** + * Used to define how the nodes in the cluster will be contacted for executing a mutation. + * The following is the default behaviour if not set. + * 1. Use a TokenAware approach i.e. the row key is used to decide the right node to execute the mutation + * on the target cassandra node. i.e. One of the R-1 replicas is used as the coordinator node. + * This effectively balances the traffic onto all nodes of the cassandra cluster for the given + * Operator instance. Of course this assumes the keys are evenly distributed in the cluster + * which is normally the case + * 2. Overlay TokenAware with DC aware approach - The above token aware approach is further segmented to use only + * the local DC for the mutation executions. Cassandra's multi-DC execution will take care of the cross DC + * replication thus achieving the lowest possible latencies for the given mutation of writes. + * + * Using this effectively removes the need for an extra implementation of the Partitioning logic of the Operator + * Nor would we need any extra logic ( for most use cases ) for dynamic partitioning implementations as the + * underlying driver normalizes the traffic pattern anyways. + * @param loadBalancingPolicy + * @return The builder instance as initially created updated with this value + */ + public ConnectionBuilder withLoadBalancingPolicy(LoadBalancingPolicy loadBalancingPolicy) + { + this.loadBalancingPolicy = loadBalancingPolicy; + return this; + } + + /** + * Used to specify how queries will need to be retried in case the current in progress one fails. + * The default is to use a DowngradingConsistency Policy i.e.
first LOCAL_QUORUM is attempted and if + * there is a failure because less than RF/2-1 nodes are alive, it automatically switches to the Consistency Level + * of LOCAL_ONE and so on. ( and hope that hint windows take care of the rest when the nodes come back up ) + * @param retryPolicy + * @return The builder instance as initially created updated with this value + */ + public ConnectionBuilder withRetryPolicy(RetryPolicy retryPolicy) + { + this.retryPolicy = retryPolicy; + return this; + } + + /** + * Used to set various aspects for executing a given query / mutation. + * The default is to use LOCAL_QUORUM consistency for all mutation queries + * @param queryOptions + * @return The builder instance as initially created updated with this value + */ + public ConnectionBuilder withQueryOptions(QueryOptions queryOptions) + { + this.queryOptions = queryOptions; + return this; + } + + /** + * Used to decide how to establish a connection to the cluster in case the current one fails. + * The default if not set is to use an ExponentialRetry Policy. 
+ * The baseDelay and maxDelay are the two time windows that are used to specify the retry attempts + * in an exponential manner + * @param reconnectionPolicy + * @return The builder instance as initially created updated with this value + */ + public ConnectionBuilder withReconnectionPolicy(ReconnectionPolicy reconnectionPolicy) + { + this.reconnectionPolicy = reconnectionPolicy; + return this; + } + + public ConnectionBuilder withProtocolVersion(ProtocolVersion protocolVersion) + { + this.protocolVersion = protocolVersion; + return this; + } + + protected ConnectionStateManager initialize() + { + ConnectionStateManager operatorConnection = new ConnectionStateManager(this); + return operatorConnection; + } + + } + +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/664257b4/contrib/src/main/java/com/datatorrent/contrib/cassandra/UpsertExecutionContext.java ---------------------------------------------------------------------- diff --git a/contrib/src/main/java/com/datatorrent/contrib/cassandra/UpsertExecutionContext.java b/contrib/src/main/java/com/datatorrent/contrib/cassandra/UpsertExecutionContext.java new file mode 100644 index 0000000..66d0b6c --- /dev/null +++ b/contrib/src/main/java/com/datatorrent/contrib/cassandra/UpsertExecutionContext.java @@ -0,0 +1,210 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package com.datatorrent.contrib.cassandra; + +import com.datastax.driver.core.ConsistencyLevel; + +/** + * Each mutation in the cassandra table is decided by a context which can be passed to the Operator at runtime. + * This class represents such a context. The context is to be set by the upstream operator and is used to + * define how to mutate a row by passing it as a tuple to the {@link AbstractUpsertOutputOperator} + * The row to be mutated is represented by the payload represented by the template parameter T + * The following aspects of the mutation can be controlled by the context tuple. + * 1. Collection Mutation Style + * 2. List Placement style + * 3. Null Handling styles for partial mutations of a given row + * 4. Update only if a Primary Key exists + * 5. Override the TTL that is set at the default connection config. + * See {@link ConnectionStateManager.ConnectionBuilder} to set the default TTL for all payload executions + * 6. Override the default Consistency level to be used for the current mutation + * See {@link ConnectionStateManager.ConnectionBuilder} for setting default consistency. 
+ */ +public class UpsertExecutionContext<T> +{ + + private CollectionMutationStyle collectionMutationStyle = CollectionMutationStyle.UNDEFINED; + + + private ListPlacementStyle listPlacementStyle = ListPlacementStyle.UNDEFINED; + + private NullHandlingMutationStyle nullHandlingMutationStyle = NullHandlingMutationStyle.UNDEFINED; + + private int overridingTTL; + + private ConsistencyLevel overridingConsistencyLevel; + + private boolean isTtlOverridden = false; + + private boolean isOverridingConsistencyLevelSet = false; + + private boolean updateOnlyIfPrimaryKeyExists = false; + + private T payload; + + public T getPayload() + { + return payload; + } + + public void setPayload(T payload) + { + this.payload = payload; + } + + /** + * Represents if the collection object inside the pojo needs to be added to or removed + * from the table column represented by the POJO field. Refer {@link CollectionMutationStyle} for possible values + * If set as UNDEFINED, the default behaviour is to add the entry to the collection column + * @return + */ + public CollectionMutationStyle getCollectionMutationStyle() + { + return collectionMutationStyle; + } + + public void setCollectionMutationStyle(CollectionMutationStyle collectionMutationStyle) + { + this.collectionMutationStyle = collectionMutationStyle; + } + + /** + * Represents how the POJO List type field is mutated on the cassandra column which is of type + * list. Note that this is only applicable for list collections and decides whether the POJO element ( of type list) + * will be placed before ( prepended ) or after ( appended to ) the existing list entries + * @return The List Placement style that is used to execute this mutation. Defaults to Append mode if not set + */ + public ListPlacementStyle getListPlacementStyle() + { + return listPlacementStyle; + } + + public void setListPlacementStyle(ListPlacementStyle listPlacementStyle) + { + this.listPlacementStyle = listPlacementStyle; + } + + /** + * This decides how to handle nulls for POJO fields.
This comes in handy when one would want to + * avoid a read of the column and then mutate the row i.e. in other words if the POJO is representing only a subset + * of the columns that needs to be mutated on the table. In such scenarios, set NullHandlingMutation Style to + * IGNORE_NULL_COLUMNS which would make the operator avoid an update on the column and thus preventing over-writing + * an existing value + * @return The Null handling style that is going to be used for the execution context. Defaults to Set Nulls and not + * ignore them + * */ + public NullHandlingMutationStyle getNullHandlingMutationStyle() + { + return nullHandlingMutationStyle; + } + + public void setNullHandlingMutationStyle(NullHandlingMutationStyle nullHandlingMutationStyle) + { + this.nullHandlingMutationStyle = nullHandlingMutationStyle; + } + + /** + * This decides if we want to override the default TTL if at all set in the + * {@link com.datatorrent.contrib.cassandra.ConnectionStateManager.ConnectionBuilder} that is used to execute a + * mutation. Note that TTLs are not mandatory for mutations. + * Also it is supported to have TTLs only for the current execution context but not set a default at the + * connection state manager level + * Unit of time is seconds + * @return The overriding TTL that will be used for the given execution context. + */ + public int getOverridingTTL() + { + return overridingTTL; + } + + public void setOverridingTTL(int overridingTTL) + { + this.overridingTTL = overridingTTL; + isTtlOverridden = true; + } + + public boolean isTtlOverridden() + { + return isTtlOverridden; + } + + /** + * Used to override the default consistency level that is set at the {@link ConnectionStateManager } level + * The default is to use LOCAL_QUORUM. This can be overridden at the execution context level on a per + * tuple basis. 
+ * @return The consistency level that would be used to execute the current payload mutation + * */ + public ConsistencyLevel getOverridingConsistencyLevel() + { + return overridingConsistencyLevel; + } + + public void setOverridingConsistencyLevel(ConsistencyLevel overridingConsistencyLevel) + { + this.overridingConsistencyLevel = overridingConsistencyLevel; + isOverridingConsistencyLevelSet = true; + } + + public boolean isOverridingConsistencyLevelSet() + { + return isOverridingConsistencyLevelSet; + } + + public boolean isUpdateOnlyIfPrimaryKeyExists() + { + return updateOnlyIfPrimaryKeyExists; + } + + /** + * Used to execute a mutation only if the primary key exists. This can be used to conditionally execute + * a mutation i.e. if only the primary key exists. This can be used to force an "UPDATE" only use case + * for the current mutation. + * @param updateOnlyIfPrimaryKeyExists + */ + public void setUpdateOnlyIfPrimaryKeyExists(boolean updateOnlyIfPrimaryKeyExists) + { + this.updateOnlyIfPrimaryKeyExists = updateOnlyIfPrimaryKeyExists; + } + + enum CollectionMutationStyle implements BaseMutationStyle + { + ADD_TO_EXISTING_COLLECTION, + REMOVE_FROM_EXISTING_COLLECTION, + UNDEFINED + } + + enum ListPlacementStyle implements BaseMutationStyle + { + APPEND_TO_EXISTING_LIST, + PREPEND_TO_EXISTING_LIST, + UNDEFINED + } + + enum NullHandlingMutationStyle implements BaseMutationStyle + { + IGNORE_NULL_COLUMNS, + SET_NULL_COLUMNS, + UNDEFINED + } + + interface BaseMutationStyle + { + + } + +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/664257b4/contrib/src/test/java/com/datatorrent/contrib/cassandra/AbstractUpsertOutputOperatorCodecsTest.java ---------------------------------------------------------------------- diff --git a/contrib/src/test/java/com/datatorrent/contrib/cassandra/AbstractUpsertOutputOperatorCodecsTest.java b/contrib/src/test/java/com/datatorrent/contrib/cassandra/AbstractUpsertOutputOperatorCodecsTest.java new file mode 100644 index 
0000000..87c783f --- /dev/null +++ b/contrib/src/test/java/com/datatorrent/contrib/cassandra/AbstractUpsertOutputOperatorCodecsTest.java @@ -0,0 +1,476 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package com.datatorrent.contrib.cassandra; + +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import org.junit.Before; +import org.junit.Test; + +import com.datastax.driver.core.ConsistencyLevel; +import com.datastax.driver.core.ResultSet; +import com.datastax.driver.core.Row; +import com.datatorrent.api.Attribute; +import com.datatorrent.api.Context; +import com.datatorrent.api.DAG; +import com.datatorrent.lib.helper.OperatorContextTestHelper; +import com.datatorrent.lib.helper.TestPortContext; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; + +public class AbstractUpsertOutputOperatorCodecsTest +{ + + /*** + * The schema that is used + * + * + * + * + * CREATE KEYSPACE unittests + * WITH replication = { + * 'class' : 'SimpleStrategy', + * 'replication_factor' : 1 + * }; + * + * CREATE TYPE unittests.address ( + * street text, + * city 
text, + * zip_code int, + * phones set<text> + * ); + * + * CREATE TYPE unittests.fullname ( firstname text, lastname text ); + * + * CREATE TABLE unittests.users ( + * userid text PRIMARY KEY, + * username FROZEN<fullname>, + * emails set<text>, + * top_scores list<int>, + * todo map<timestamp, text>, + * siblings tuple<int, text,text>, + * currentaddress FROZEN<address>, + * previousnames FROZEN<list<fullname>> + * ); + * + * CREATE TABLE unittests.userupdates ( + * userid text PRIMARY KEY, + * updatecount counter + * ); + * + * CREATE TABLE unittests.userstatus ( + * userid text, + * day int, + * month int, + * year int, + * employeeid text, + * currentstatus text, + * PRIMARY KEY ((userid,day,month,year), employeeid)); + */ + + public static final String APP_ID = "TestCassandraUpsertOperator"; + public static final int OPERATOR_ID_FOR_USER_UPSERTS = 0; + + + UserUpsertOperator userUpsertOperator = null; + OperatorContextTestHelper.TestIdOperatorContext contextForUserUpsertOperator; + TestPortContext testPortContextForUserUpserts; + + + @Before + public void setupApexContexts() throws Exception + { + Attribute.AttributeMap.DefaultAttributeMap attributeMap = new Attribute.AttributeMap.DefaultAttributeMap(); + attributeMap.put(DAG.APPLICATION_ID, APP_ID); + contextForUserUpsertOperator = new OperatorContextTestHelper.TestIdOperatorContext(OPERATOR_ID_FOR_USER_UPSERTS, + attributeMap); + userUpsertOperator = new UserUpsertOperator(); + + Attribute.AttributeMap.DefaultAttributeMap portAttributes = new Attribute.AttributeMap.DefaultAttributeMap(); + portAttributes.put(Context.PortContext.TUPLE_CLASS, User.class); + testPortContextForUserUpserts = new TestPortContext(portAttributes); + + userUpsertOperator.setup(contextForUserUpsertOperator); + userUpsertOperator.activate(contextForUserUpsertOperator); + userUpsertOperator.input.setup(testPortContextForUserUpserts); + } + + @Test + public void testActivateForSchemaDetection() throws Exception + { + assertEquals(12, 
userUpsertOperator.getPreparedStatementTypes().size()); + assertEquals( + " UPDATE unittests.users SET currentaddress = :currentaddress, emails = emails - :emails, " + + "todo = todo - :todo, top_scores = top_scores - :top_scores, siblings = :siblings, " + + "previousnames = :previousnames, username = :username WHERE userid = :userid", + userUpsertOperator.getPreparedStatementTypes().get(101001100L).getQueryString()); + assertEquals(8, userUpsertOperator.getColumnDefinitions().size()); + assertEquals(true, userUpsertOperator.getColumnDefinitions().get("currentaddress").isFrozen()); + assertEquals(false, userUpsertOperator.getColumnDefinitions().get("currentaddress").isCollection()); + assertEquals(true, userUpsertOperator.getColumnDefinitions().get("top_scores").isCollection()); + assertEquals(true, userUpsertOperator.getColumnDefinitions().get("username").isFrozen()); + assertEquals(false, userUpsertOperator.getColumnDefinitions().get("username").isCollection()); + } + + @Test + public void testForGetters() throws Exception + { + Map<String, Object> getters = userUpsertOperator.getGetters(); + assertNotNull(getters); + assertEquals(7, getters.size()); + } + + @Test + public void testForSingleRowInsertWithCodecs() throws Exception + { + User aUser = new User(); + aUser.setUserid("user" + System.currentTimeMillis()); + FullName fullName = new FullName("first1" + System.currentTimeMillis(), "last1" + System.currentTimeMillis()); + aUser.setUsername(fullName); + Address address = new Address("wer", "hjfh", 12, null); + aUser.setCurrentaddress(address); + UpsertExecutionContext<User> anUpdate = new UpsertExecutionContext<>(); + anUpdate.setPayload(aUser); + userUpsertOperator.beginWindow(0); + userUpsertOperator.input.process(anUpdate); + userUpsertOperator.endWindow(); + + ResultSet results = userUpsertOperator.session.execute( + "SELECT * FROM unittests.users WHERE userid = '" + aUser.getUserid() + "'"); + List<Row> rows = results.all(); + assertEquals(rows.size(), 
1); + assertTrue(results.isExhausted()); + } + + @Test + public void testForListAppend() throws Exception + { + User aUser = new User(); + String userId = "user" + System.currentTimeMillis(); + aUser.setUserid(userId); + FullName fullName = new FullName("first1" + System.currentTimeMillis(), "last1" + System.currentTimeMillis()); + aUser.setUsername(fullName); + Address address = new Address("street1", "city1", 13, null); + aUser.setCurrentaddress(address); + Set<String> emails = new HashSet<>(); + emails.add(new String("1")); + emails.add(new String("2")); + aUser.setEmails(emails); + List<Integer> topScores = new ArrayList<>(); + topScores.add(1); + topScores.add(2); + aUser.setTopScores(topScores); + UpsertExecutionContext<User> originalEntry = new UpsertExecutionContext<>(); + originalEntry.setPayload(aUser); + + UpsertExecutionContext<User> subsequentUpdateForTopScores = new UpsertExecutionContext<>(); + subsequentUpdateForTopScores.setListPlacementStyle( + UpsertExecutionContext.ListPlacementStyle.APPEND_TO_EXISTING_LIST); + subsequentUpdateForTopScores.setCollectionMutationStyle( + UpsertExecutionContext.CollectionMutationStyle.ADD_TO_EXISTING_COLLECTION); + subsequentUpdateForTopScores.setNullHandlingMutationStyle( + UpsertExecutionContext.NullHandlingMutationStyle.IGNORE_NULL_COLUMNS); + User oldUser = new User(); + oldUser.setUserid(userId); + List<Integer> topScoresAppended = new ArrayList<>(); + topScoresAppended.add(3); + oldUser.setTopScores(topScoresAppended); + subsequentUpdateForTopScores.setPayload(oldUser); + userUpsertOperator.beginWindow(1); + userUpsertOperator.input.process(originalEntry); + userUpsertOperator.input.process(subsequentUpdateForTopScores); + userUpsertOperator.endWindow(); + + ResultSet results = userUpsertOperator.session.execute( + "SELECT * FROM unittests.users WHERE userid = '" + userId + "'"); + List<Row> rows = results.all(); + Row userRow = rows.get(0); + List<Integer> topScoresEntry = userRow.getList("top_scores", 
Integer.class); + assertEquals(3, topScoresEntry.size()); + assertEquals("" + 3, "" + topScoresEntry.get(2)); + } + + @Test + public void testForListPrepend() throws Exception + { + User aUser = new User(); + String userId = "user" + System.currentTimeMillis(); + aUser.setUserid(userId); + FullName fullName = new FullName("first1" + System.currentTimeMillis(), "last1" + System.currentTimeMillis()); + aUser.setUsername(fullName); + List<Integer> topScores = new ArrayList<>(); + topScores.add(1); + topScores.add(2); + aUser.setTopScores(topScores); + UpsertExecutionContext<User> originalEntry = new UpsertExecutionContext<>(); + originalEntry.setPayload(aUser); + + UpsertExecutionContext<User> subsequentUpdateForTopScores = new UpsertExecutionContext<>(); + subsequentUpdateForTopScores.setListPlacementStyle( + UpsertExecutionContext.ListPlacementStyle.PREPEND_TO_EXISTING_LIST); + subsequentUpdateForTopScores.setCollectionMutationStyle( + UpsertExecutionContext.CollectionMutationStyle.ADD_TO_EXISTING_COLLECTION); + subsequentUpdateForTopScores.setNullHandlingMutationStyle( + UpsertExecutionContext.NullHandlingMutationStyle.IGNORE_NULL_COLUMNS); + User oldUser = new User(); + oldUser.setUserid(userId); + List<Integer> topScoresAppended = new ArrayList<>(); + topScoresAppended.add(3); + oldUser.setTopScores(topScoresAppended); + subsequentUpdateForTopScores.setPayload(oldUser); + userUpsertOperator.beginWindow(2); + userUpsertOperator.input.process(originalEntry); + userUpsertOperator.input.process(subsequentUpdateForTopScores); + userUpsertOperator.endWindow(); + + ResultSet results = userUpsertOperator.session.execute( + "SELECT * FROM unittests.users WHERE userid = '" + userId + "'"); + List<Row> rows = results.all(); + Row userRow = rows.get(0); + List<Integer> topScoresEntry = userRow.getList("top_scores", Integer.class); + assertEquals(3, topScoresEntry.size()); + assertEquals("" + 3, "" + topScoresEntry.get(0)); + } + + @Test + public void 
testForCollectionRemoval() throws Exception + { + User aUser = new User(); + String userId = "user" + System.currentTimeMillis(); + aUser.setUserid(userId); + FullName fullName = new FullName("first12" + System.currentTimeMillis(), "last12" + System.currentTimeMillis()); + aUser.setUsername(fullName); + Set<String> emails = new HashSet<>(); + emails.add(new String("1")); + emails.add(new String("2")); + aUser.setEmails(emails); + + UpsertExecutionContext<User> originalEntry = new UpsertExecutionContext<>(); + originalEntry.setPayload(aUser); + + UpsertExecutionContext<User> subsequentUpdateForEmails = new UpsertExecutionContext<>(); + subsequentUpdateForEmails.setCollectionMutationStyle( + UpsertExecutionContext.CollectionMutationStyle.REMOVE_FROM_EXISTING_COLLECTION); + subsequentUpdateForEmails.setNullHandlingMutationStyle( + UpsertExecutionContext.NullHandlingMutationStyle.IGNORE_NULL_COLUMNS); + User oldUser = new User(); + oldUser.setUserid(userId); + Set<String> updatedEmails = new HashSet<>(); + updatedEmails.add(new String("1")); + oldUser.setEmails(updatedEmails); + subsequentUpdateForEmails.setPayload(oldUser); + userUpsertOperator.beginWindow(3); + userUpsertOperator.input.process(originalEntry); + userUpsertOperator.input.process(subsequentUpdateForEmails); + userUpsertOperator.endWindow(); + + ResultSet results = userUpsertOperator.session.execute( + "SELECT * FROM unittests.users WHERE userid = '" + userId + "'"); + List<Row> rows = results.all(); + Row userRow = rows.get(0); + Set<String> existingEmailsEntry = userRow.getSet("emails", String.class); + assertEquals(1, existingEmailsEntry.size()); + assertEquals("" + 2, "" + existingEmailsEntry.iterator().next()); + } + + @Test + public void testForCollectionRemovalAndIfExists() throws Exception + { + User aUser = new User(); + String userId = "user" + System.currentTimeMillis(); + aUser.setUserid(userId); + FullName fullName = new FullName("first12" + System.currentTimeMillis(), "last12" + 
System.currentTimeMillis()); + aUser.setUsername(fullName); + Set<String> emails = new HashSet<>(); + emails.add(new String("1")); + emails.add(new String("2")); + aUser.setEmails(emails); + + UpsertExecutionContext<User> originalEntry = new UpsertExecutionContext<>(); + originalEntry.setPayload(aUser); + + UpsertExecutionContext<User> subsequentUpdateForEmails = new UpsertExecutionContext<>(); + subsequentUpdateForEmails.setCollectionMutationStyle( + UpsertExecutionContext.CollectionMutationStyle.REMOVE_FROM_EXISTING_COLLECTION); + subsequentUpdateForEmails.setNullHandlingMutationStyle( + UpsertExecutionContext.NullHandlingMutationStyle.IGNORE_NULL_COLUMNS); + subsequentUpdateForEmails.setUpdateOnlyIfPrimaryKeyExists(true); + User oldUser = new User(); + oldUser.setUserid(userId + System.currentTimeMillis()); // overriding with a non-existent user id + Set<String> updatedEmails = new HashSet<>(); + updatedEmails.add(new String("1")); + oldUser.setEmails(updatedEmails); + subsequentUpdateForEmails.setPayload(oldUser); + userUpsertOperator.beginWindow(4); + userUpsertOperator.input.process(originalEntry); + userUpsertOperator.input.process(subsequentUpdateForEmails); + userUpsertOperator.endWindow(); + + ResultSet results = userUpsertOperator.session.execute( + "SELECT * FROM unittests.users WHERE userid = '" + userId + "'"); + List<Row> rows = results.all(); + Row userRow = rows.get(0); + Set<String> existingEmailsEntry = userRow.getSet("emails", String.class); + assertEquals(2, existingEmailsEntry.size()); + assertEquals("" + 1, "" + existingEmailsEntry.iterator().next()); + } + + @Test + public void testForListAppendAndIfExists() throws Exception + { + User aUser = new User(); + String userId = "user" + System.currentTimeMillis(); + aUser.setUserid(userId); + FullName fullName = new FullName("first" + System.currentTimeMillis(), "last" + System.currentTimeMillis()); + aUser.setUsername(fullName); + Address address = new Address("street1", "city1", 13, null); + 
aUser.setCurrentaddress(address); + Set<String> emails = new HashSet<>(); + emails.add(new String("1")); + emails.add(new String("2")); + aUser.setEmails(emails); + List<Integer> topScores = new ArrayList<>(); + topScores.add(1); + topScores.add(2); + aUser.setTopScores(topScores); + UpsertExecutionContext<User> originalEntry = new UpsertExecutionContext<>(); + originalEntry.setPayload(aUser); + + UpsertExecutionContext<User> subsequentUpdateForTopScores = new UpsertExecutionContext<>(); + subsequentUpdateForTopScores.setListPlacementStyle( + UpsertExecutionContext.ListPlacementStyle.APPEND_TO_EXISTING_LIST); + subsequentUpdateForTopScores.setCollectionMutationStyle( + UpsertExecutionContext.CollectionMutationStyle.ADD_TO_EXISTING_COLLECTION); + subsequentUpdateForTopScores.setNullHandlingMutationStyle( + UpsertExecutionContext.NullHandlingMutationStyle.IGNORE_NULL_COLUMNS); + subsequentUpdateForTopScores.setUpdateOnlyIfPrimaryKeyExists(true); + User oldUser = new User(); + oldUser.setUserid(userId + System.currentTimeMillis()); + List<Integer> topScoresAppended = new ArrayList<>(); + topScoresAppended.add(3); + oldUser.setTopScores(topScoresAppended); + subsequentUpdateForTopScores.setPayload(oldUser); + userUpsertOperator.beginWindow(5); + userUpsertOperator.input.process(originalEntry); + userUpsertOperator.input.process(subsequentUpdateForTopScores); + userUpsertOperator.endWindow(); + + ResultSet results = userUpsertOperator.session.execute( + "SELECT * FROM unittests.users WHERE userid = '" + userId + "'"); + List<Row> rows = results.all(); + Row userRow = rows.get(0); + List<Integer> topScoresEntry = userRow.getList("top_scores", Integer.class); + assertEquals(2, topScoresEntry.size()); + assertEquals("" + 2, "" + topScoresEntry.get(1)); + } + + @Test + public void testForListPrependAndExplicitNullForSomeColumns() throws Exception + { + User aUser = new User(); + String userId = "user" + System.currentTimeMillis(); + aUser.setUserid(userId); + FullName 
fullName = new FullName("first24" + System.currentTimeMillis(), "last" + System.currentTimeMillis()); + aUser.setUsername(fullName); + List<Integer> topScores = new ArrayList<>(); + topScores.add(1); + topScores.add(2); + aUser.setTopScores(topScores); + UpsertExecutionContext<User> originalEntry = new UpsertExecutionContext<>(); + originalEntry.setPayload(aUser); + + UpsertExecutionContext<User> subsequentUpdateForTopScores = new UpsertExecutionContext<>(); + subsequentUpdateForTopScores.setListPlacementStyle( + UpsertExecutionContext.ListPlacementStyle.PREPEND_TO_EXISTING_LIST); + subsequentUpdateForTopScores.setCollectionMutationStyle( + UpsertExecutionContext.CollectionMutationStyle.ADD_TO_EXISTING_COLLECTION); + subsequentUpdateForTopScores.setNullHandlingMutationStyle( + UpsertExecutionContext.NullHandlingMutationStyle.SET_NULL_COLUMNS); + User oldUser = new User(); + oldUser.setUserid(userId); + List<Integer> topScoresAppended = new ArrayList<>(); + topScoresAppended.add(3); + oldUser.setTopScores(topScoresAppended); + subsequentUpdateForTopScores.setPayload(oldUser); + userUpsertOperator.beginWindow(6); + userUpsertOperator.input.process(originalEntry); + userUpsertOperator.input.process(subsequentUpdateForTopScores); + userUpsertOperator.endWindow(); + + ResultSet results = userUpsertOperator.session.execute( + "SELECT * FROM unittests.users WHERE userid = '" + userId + "'"); + List<Row> rows = results.all(); + Row userRow = rows.get(0); + FullName name = userRow.get("username", FullName.class); + assertEquals(null, name); + } + + @Test + public void testForSingleRowInsertWithTTL() throws Exception + { + User aUser = new User(); + aUser.setUserid("userWithTTL" + System.currentTimeMillis()); + FullName fullName = new FullName("firstname" + System.currentTimeMillis(), "lasName" + System.currentTimeMillis()); + aUser.setUsername(fullName); + Address address = new Address("city1", "Street1", 12, null); + aUser.setCurrentaddress(address); + 
UpsertExecutionContext<User> anUpdate = new UpsertExecutionContext<>(); + anUpdate.setOverridingTTL(5000); + anUpdate.setPayload(aUser); + userUpsertOperator.beginWindow(7); + userUpsertOperator.input.process(anUpdate); + userUpsertOperator.endWindow(); + + ResultSet results = userUpsertOperator.session.execute( + "SELECT * FROM unittests.users WHERE userid = '" + aUser.getUserid() + "'"); + List<Row> rows = results.all(); + assertEquals(rows.size(), 1); + assertTrue(results.isExhausted()); + } + + @Test + public void testForSingleRowInsertWithOverridingConsistency() throws Exception + { + User aUser = new User(); + aUser.setUserid("userWithConsistency" + System.currentTimeMillis()); + FullName fullName = new FullName("first" + System.currentTimeMillis(), "last" + System.currentTimeMillis()); + aUser.setUsername(fullName); + Address address = new Address("city21", "Street31", 12, null); + aUser.setCurrentaddress(address); + UpsertExecutionContext<User> anUpdate = new UpsertExecutionContext<>(); + anUpdate.setOverridingConsistencyLevel(ConsistencyLevel.LOCAL_SERIAL); + anUpdate.setPayload(aUser); + userUpsertOperator.beginWindow(8); + userUpsertOperator.input.process(anUpdate); + userUpsertOperator.endWindow(); + + ResultSet results = userUpsertOperator.session.execute( + "SELECT * FROM unittests.users WHERE userid = '" + aUser.getUserid() + "'"); + List<Row> rows = results.all(); + assertEquals(rows.size(), 1); + assertTrue(results.isExhausted()); + } + + +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/664257b4/contrib/src/test/java/com/datatorrent/contrib/cassandra/AbstractUpsertOutputOperatorCompositePKTest.java ---------------------------------------------------------------------- diff --git a/contrib/src/test/java/com/datatorrent/contrib/cassandra/AbstractUpsertOutputOperatorCompositePKTest.java b/contrib/src/test/java/com/datatorrent/contrib/cassandra/AbstractUpsertOutputOperatorCompositePKTest.java new file mode 100644 index 0000000..b0863e0 --- 
/dev/null +++ b/contrib/src/test/java/com/datatorrent/contrib/cassandra/AbstractUpsertOutputOperatorCompositePKTest.java @@ -0,0 +1,99 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package com.datatorrent.contrib.cassandra; + +import java.util.List; +import org.junit.Before; +import org.junit.Test; + +import com.datastax.driver.core.ResultSet; +import com.datastax.driver.core.Row; +import com.datatorrent.api.Attribute; +import com.datatorrent.api.Context; +import com.datatorrent.api.DAG; +import com.datatorrent.lib.helper.OperatorContextTestHelper; +import com.datatorrent.lib.helper.TestPortContext; + +import static org.junit.Assert.assertEquals; + +/** + * A simple test class that checks functionality when composite primary keys are present as table definitions + */ +public class AbstractUpsertOutputOperatorCompositePKTest +{ + + + public static final String APP_ID = "TestCassandraUpsertOperator"; + public static final int OPERATOR_ID_FOR_COMPOSITE_PRIMARY_KEYS = 2; + + CompositePrimaryKeyUpdateOperator compositePrimaryKeysOperator = null; + OperatorContextTestHelper.TestIdOperatorContext contextForCompositePrimaryKeysOperator; + TestPortContext testPortContextForCompositePrimaryKeys; + + @Before + 
public void setupApexContexts() throws Exception + { + Attribute.AttributeMap.DefaultAttributeMap attributeMapForCompositePrimaryKey = + new Attribute.AttributeMap.DefaultAttributeMap(); + attributeMapForCompositePrimaryKey.put(DAG.APPLICATION_ID, APP_ID); + contextForCompositePrimaryKeysOperator = new OperatorContextTestHelper.TestIdOperatorContext( + OPERATOR_ID_FOR_COMPOSITE_PRIMARY_KEYS, + attributeMapForCompositePrimaryKey); + + Attribute.AttributeMap.DefaultAttributeMap portAttributesForCompositePrimaryKeys = + new Attribute.AttributeMap.DefaultAttributeMap(); + portAttributesForCompositePrimaryKeys.put(Context.PortContext.TUPLE_CLASS, CompositePrimaryKeyRow.class); + testPortContextForCompositePrimaryKeys = new TestPortContext(portAttributesForCompositePrimaryKeys); + + compositePrimaryKeysOperator = new CompositePrimaryKeyUpdateOperator(); + compositePrimaryKeysOperator.setup(contextForCompositePrimaryKeysOperator); + compositePrimaryKeysOperator.activate(contextForCompositePrimaryKeysOperator); + compositePrimaryKeysOperator.input.setup(testPortContextForCompositePrimaryKeys); + } + + @Test + public void testForCompositeRowKeyBasedTable() throws Exception + { + CompositePrimaryKeyRow aCompositeRowKey = new CompositePrimaryKeyRow(); + String userId = new String("user1" + System.currentTimeMillis()); + String employeeId = new String(userId + System.currentTimeMillis()); + int day = 1; + int month = 12; + int year = 2017; + aCompositeRowKey.setDay(day); + aCompositeRowKey.setMonth(month); + aCompositeRowKey.setYear(year); + aCompositeRowKey.setCurrentstatus("status" + System.currentTimeMillis()); + aCompositeRowKey.setUserid(userId); + aCompositeRowKey.setEmployeeid(employeeId); + UpsertExecutionContext<CompositePrimaryKeyRow> anUpdate = new UpsertExecutionContext<>(); + anUpdate.setPayload(aCompositeRowKey); + compositePrimaryKeysOperator.beginWindow(12); + compositePrimaryKeysOperator.input.process(anUpdate); + compositePrimaryKeysOperator.endWindow(); + + 
ResultSet results = compositePrimaryKeysOperator.session.execute( + "SELECT * FROM unittests.userstatus WHERE userid = '" + userId + "' and day=" + day + " and month=" + + month + " and year=" + year + " and employeeid='" + employeeId + "'"); + List<Row> rows = results.all(); + assertEquals(rows.size(), 1); + } + + +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/664257b4/contrib/src/test/java/com/datatorrent/contrib/cassandra/AbstractUpsertOutputOperatorCountersTest.java ---------------------------------------------------------------------- diff --git a/contrib/src/test/java/com/datatorrent/contrib/cassandra/AbstractUpsertOutputOperatorCountersTest.java b/contrib/src/test/java/com/datatorrent/contrib/cassandra/AbstractUpsertOutputOperatorCountersTest.java new file mode 100644 index 0000000..b32dc43 --- /dev/null +++ b/contrib/src/test/java/com/datatorrent/contrib/cassandra/AbstractUpsertOutputOperatorCountersTest.java @@ -0,0 +1,120 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package com.datatorrent.contrib.cassandra; + +import java.util.List; +import org.junit.Before; +import org.junit.Test; +import com.datastax.driver.core.ConsistencyLevel; +import com.datastax.driver.core.ResultSet; +import com.datastax.driver.core.Row; +import com.datatorrent.api.Attribute; +import com.datatorrent.api.Context; +import com.datatorrent.api.DAG; +import com.datatorrent.lib.helper.OperatorContextTestHelper; +import com.datatorrent.lib.helper.TestPortContext; + +import static org.junit.Assert.assertEquals; + +/** + * A simple test class that tests the functionality when a table with counters is the sink for POJOS + */ +public class AbstractUpsertOutputOperatorCountersTest +{ + + public static final String APP_ID = "TestCassandraUpsertOperator"; + public static final int OPERATOR_ID_FOR_COUNTER_COLUMNS = 1; + + CounterColumnUpdatesOperator counterUpdatesOperator = null; + OperatorContextTestHelper.TestIdOperatorContext contextForCountersOperator; + TestPortContext testPortContextForCounters; + + @Before + public void setupApexContexts() throws Exception + { + Attribute.AttributeMap.DefaultAttributeMap attributeMapForCounters = + new Attribute.AttributeMap.DefaultAttributeMap(); + attributeMapForCounters.put(DAG.APPLICATION_ID, APP_ID); + contextForCountersOperator = new OperatorContextTestHelper.TestIdOperatorContext(OPERATOR_ID_FOR_COUNTER_COLUMNS, + attributeMapForCounters); + + Attribute.AttributeMap.DefaultAttributeMap portAttributesForCounters = + new Attribute.AttributeMap.DefaultAttributeMap(); + portAttributesForCounters.put(Context.PortContext.TUPLE_CLASS, CounterColumnTableEntry.class); + testPortContextForCounters = new TestPortContext(portAttributesForCounters); + + counterUpdatesOperator = new CounterColumnUpdatesOperator(); + counterUpdatesOperator.setup(contextForCountersOperator); + counterUpdatesOperator.activate(contextForCountersOperator); + counterUpdatesOperator.input.setup(testPortContextForCounters); + } + + @Test + public 
void testForSingleRowInsertForCounterTables() throws Exception + { + + CounterColumnTableEntry aCounterEntry = new CounterColumnTableEntry(); + String userId = new String("user1" + System.currentTimeMillis()); + aCounterEntry.setUserId(userId); + aCounterEntry.setUpdatecount(3); + UpsertExecutionContext<CounterColumnTableEntry> anUpdate = new UpsertExecutionContext<>(); + anUpdate.setOverridingConsistencyLevel(ConsistencyLevel.LOCAL_ONE); + anUpdate.setPayload(aCounterEntry); + counterUpdatesOperator.beginWindow(9); + counterUpdatesOperator.input.process(anUpdate); + counterUpdatesOperator.endWindow(); + + ResultSet results = counterUpdatesOperator.session.execute( + "SELECT * FROM unittests.userupdates WHERE userid = '" + userId + "'"); + List<Row> rows = results.all(); + assertEquals(rows.size(), 1); + assertEquals(3, rows.get(0).getLong("updatecount")); + + aCounterEntry = new CounterColumnTableEntry(); + aCounterEntry.setUserId(userId); + aCounterEntry.setUpdatecount(2); + anUpdate = new UpsertExecutionContext<>(); + anUpdate.setOverridingConsistencyLevel(ConsistencyLevel.LOCAL_ONE); + anUpdate.setPayload(aCounterEntry); + counterUpdatesOperator.beginWindow(10); + counterUpdatesOperator.input.process(anUpdate); + counterUpdatesOperator.endWindow(); + results = counterUpdatesOperator.session.execute( + "SELECT * FROM unittests.userupdates WHERE userid = '" + userId + "'"); + rows = results.all(); + assertEquals(5, rows.get(0).getLong("updatecount")); + aCounterEntry = new CounterColumnTableEntry(); + aCounterEntry.setUserId(userId); + aCounterEntry.setUpdatecount(-1); + anUpdate = new UpsertExecutionContext<>(); + anUpdate.setOverridingConsistencyLevel(ConsistencyLevel.LOCAL_ONE); + anUpdate.setPayload(aCounterEntry); + + counterUpdatesOperator.beginWindow(11); + counterUpdatesOperator.input.process(anUpdate); + counterUpdatesOperator.endWindow(); + + results = counterUpdatesOperator.session.execute( + "SELECT * FROM unittests.userupdates WHERE userid = '" + 
userId + "'"); + rows = results.all(); + assertEquals(4, rows.get(0).getLong("updatecount")); + + } + +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/664257b4/contrib/src/test/java/com/datatorrent/contrib/cassandra/Address.java ---------------------------------------------------------------------- diff --git a/contrib/src/test/java/com/datatorrent/contrib/cassandra/Address.java b/contrib/src/test/java/com/datatorrent/contrib/cassandra/Address.java new file mode 100644 index 0000000..faeef03 --- /dev/null +++ b/contrib/src/test/java/com/datatorrent/contrib/cassandra/Address.java @@ -0,0 +1,82 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package com.datatorrent.contrib.cassandra; + +import java.util.Set; + +/** + * + */ +public class Address +{ + private String city; + + private String street; + + private int zip_code; + private Set<String> phones; + + public Address(String city, String street, int zipcode, Set<String> phones) + { + this.city = city; + this.street = street; + this.zip_code = zipcode; + this.phones = phones; + } + + public String getCity() + { + return city; + } + + public void setCity(String city) + { + this.city = city; + } + + public String getStreet() + { + return street; + } + + public void setStreet(String street) + { + this.street = street; + } + + public int getZip_code() + { + return zip_code; + } + + public void setZip_code(int zip_code) + { + this.zip_code = zip_code; + } + + public Set<String> getPhones() + { + return phones; + } + + public void setPhones(Set<String> phones) + { + this.phones = phones; + } +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/664257b4/contrib/src/test/java/com/datatorrent/contrib/cassandra/AddressCodec.java ---------------------------------------------------------------------- diff --git a/contrib/src/test/java/com/datatorrent/contrib/cassandra/AddressCodec.java b/contrib/src/test/java/com/datatorrent/contrib/cassandra/AddressCodec.java new file mode 100644 index 0000000..017b34d --- /dev/null +++ b/contrib/src/test/java/com/datatorrent/contrib/cassandra/AddressCodec.java @@ -0,0 +1,85 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package com.datatorrent.contrib.cassandra; + +import java.nio.ByteBuffer; + +import com.datastax.driver.core.ProtocolVersion; +import com.datastax.driver.core.TypeCodec; +import com.datastax.driver.core.UDTValue; +import com.datastax.driver.core.UserType; +import com.datastax.driver.core.exceptions.InvalidTypeException; + +public class AddressCodec extends TypeCodec<Address> +{ + + private final TypeCodec<UDTValue> innerCodec; + + private final UserType userType; + + public AddressCodec(TypeCodec<UDTValue> innerCodec, Class<Address> javaType) + { + super(innerCodec.getCqlType(), javaType); + this.innerCodec = innerCodec; + this.userType = (UserType)innerCodec.getCqlType(); + } + + @Override + public ByteBuffer serialize(Address value, ProtocolVersion protocolVersion) throws InvalidTypeException + { + return innerCodec.serialize(toUDTValue(value), protocolVersion); + } + + @Override + public Address deserialize(ByteBuffer bytes, ProtocolVersion protocolVersion) throws InvalidTypeException + { + return toAddress(innerCodec.deserialize(bytes, protocolVersion)); + } + + @Override + public Address parse(String value) throws InvalidTypeException + { + return value == null || value.isEmpty() ? null : toAddress(innerCodec.parse(value)); + } + + @Override + public String format(Address value) throws InvalidTypeException + { + return value == null ? null : innerCodec.format(toUDTValue(value)); + } + + protected Address toAddress(UDTValue value) + { + return value == null ? 
null : new Address( + value.getString("city"), + value.getString("street"), + value.getInt("zip_code"), + value.getSet("phones", String.class) + ); + } + + protected UDTValue toUDTValue(Address value) + { + return value == null ? null : userType.newValue() + .setString("street", value.getStreet()) + .setInt("zip_code", value.getZip_code()) + .setString("city", value.getCity()) + .setSet("phones", value.getPhones()); + } +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/664257b4/contrib/src/test/java/com/datatorrent/contrib/cassandra/CassandraOperatorTest.java ---------------------------------------------------------------------- diff --git a/contrib/src/test/java/com/datatorrent/contrib/cassandra/CassandraOperatorTest.java b/contrib/src/test/java/com/datatorrent/contrib/cassandra/CassandraOperatorTest.java index 74f99a8..4a1f883 100644 --- a/contrib/src/test/java/com/datatorrent/contrib/cassandra/CassandraOperatorTest.java +++ b/contrib/src/test/java/com/datatorrent/contrib/cassandra/CassandraOperatorTest.java @@ -18,30 +18,45 @@ */ package com.datatorrent.contrib.cassandra; -import com.datastax.driver.core.*; -import com.datastax.driver.core.exceptions.DriverException; -import com.datatorrent.api.Attribute; -import com.datatorrent.api.Attribute.AttributeMap; -import com.datatorrent.api.Context; -import com.datatorrent.api.DAG; -import com.datatorrent.lib.helper.TestPortContext; -import com.datatorrent.lib.util.FieldInfo; -import com.datatorrent.netlet.util.DTThrowable; -import com.datatorrent.lib.helper.OperatorContextTestHelper; -import com.datatorrent.lib.testbench.CollectorTestSink; -import com.google.common.collect.Lists; - -import java.util.*; +import java.util.ArrayList; +import java.util.Date; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.UUID; import org.junit.After; +import org.junit.AfterClass; import org.junit.Assert; import org.junit.Before; import 
org.junit.BeforeClass; import org.junit.Test; -import org.junit.AfterClass; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.datastax.driver.core.BoundStatement; +import com.datastax.driver.core.Cluster; +import com.datastax.driver.core.Configuration; +import com.datastax.driver.core.PreparedStatement; +import com.datastax.driver.core.ProtocolVersion; +import com.datastax.driver.core.ResultSet; +import com.datastax.driver.core.Row; +import com.datastax.driver.core.Session; +import com.datastax.driver.core.exceptions.DriverException; +import com.google.common.collect.Lists; + +import com.datatorrent.api.Attribute; +import com.datatorrent.api.Attribute.AttributeMap; +import com.datatorrent.api.Context; +import com.datatorrent.api.DAG; +import com.datatorrent.lib.helper.OperatorContextTestHelper; +import com.datatorrent.lib.helper.TestPortContext; +import com.datatorrent.lib.testbench.CollectorTestSink; +import com.datatorrent.lib.util.FieldInfo; +import com.datatorrent.netlet.util.DTThrowable; + /** * Tests for {@link AbstractCassandraTransactionableOutputOperator} and {@link AbstractCassandraInputOperator} */ @@ -249,7 +264,7 @@ public class CassandraOperatorTest outputOperator.setup(context); Configuration config = outputOperator.getStore().getCluster().getConfiguration(); - Assert.assertEquals("Procotol version was not set to V2.", ProtocolVersion.V2, config.getProtocolOptions().getProtocolVersionEnum()); + Assert.assertEquals("Procotol version was not set to V2.", ProtocolVersion.V2, config.getProtocolOptions().getProtocolVersion()); } @Test http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/664257b4/contrib/src/test/java/com/datatorrent/contrib/cassandra/CompositePrimaryKeyRow.java ---------------------------------------------------------------------- diff --git a/contrib/src/test/java/com/datatorrent/contrib/cassandra/CompositePrimaryKeyRow.java b/contrib/src/test/java/com/datatorrent/contrib/cassandra/CompositePrimaryKeyRow.java new 
/**
 * Mutable bean for one row of a table whose row key is a composite of several columns.
 * It carries {@code userid}, {@code day}, {@code month}, {@code year}, {@code employeeid}
 * and {@code currentstatus}; every attribute has a plain getter/setter pair and no validation.
 */
public class CompositePrimaryKeyRow
{

  private String userid;
  private int day;
  private int month;
  private int year;
  private String employeeid;
  private String currentstatus;

  /** @return the user id */
  public String getUserid()
  {
    return this.userid;
  }

  /** @param userid the new user id */
  public void setUserid(String userid)
  {
    this.userid = userid;
  }

  /** @return the day value */
  public int getDay()
  {
    return this.day;
  }

  /** @param day the new day value */
  public void setDay(int day)
  {
    this.day = day;
  }

  /** @return the month value */
  public int getMonth()
  {
    return this.month;
  }

  /** @param month the new month value */
  public void setMonth(int month)
  {
    this.month = month;
  }

  /** @return the year value */
  public int getYear()
  {
    return this.year;
  }

  /** @param year the new year value */
  public void setYear(int year)
  {
    this.year = year;
  }

  /** @return the employee id */
  public String getEmployeeid()
  {
    return this.employeeid;
  }

  /** @param employeeid the new employee id */
  public void setEmployeeid(String employeeid)
  {
    this.employeeid = employeeid;
  }

  /** @return the current status */
  public String getCurrentstatus()
  {
    return this.currentstatus;
  }

  /** @param currentstatus the new current status */
  public void setCurrentstatus(String currentstatus)
  {
    this.currentstatus = currentstatus;
  }
}
+ */ +package com.datatorrent.contrib.cassandra; + +import java.util.Map; +import com.datastax.driver.core.TypeCodec; + +/** + A concrete implementation of the AbstractUpsertOperator that tests composite primary key use cases + Please run the schema as given in {@link AbstractUpsertOutputOperatorCodecsTest} + */ +public class CompositePrimaryKeyUpdateOperator extends AbstractUpsertOutputOperator +{ + @Override + public ConnectionStateManager.ConnectionBuilder withConnectionBuilder() + { + return ConnectionStateManager.withNewBuilder() + .withSeedNodes("localhost") + .withClusterNameAs("Test Cluster") + .withDCNameAs("datacenter1") + .withTableNameAs("userstatus") + .withKeySpaceNameAs("unittests"); + } + + @Override + public Map<String, TypeCodec> getCodecsForUserDefinedTypes() + { + return null; + } + + @Override + public Class getPayloadPojoClass() + { + return CompositePrimaryKeyRow.class; + } + + @Override + boolean reconcileRecord(Object T, long windowId) + { + return true; + } +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/664257b4/contrib/src/test/java/com/datatorrent/contrib/cassandra/CounterColumnTableEntry.java ---------------------------------------------------------------------- diff --git a/contrib/src/test/java/com/datatorrent/contrib/cassandra/CounterColumnTableEntry.java b/contrib/src/test/java/com/datatorrent/contrib/cassandra/CounterColumnTableEntry.java new file mode 100644 index 0000000..72b3373 --- /dev/null +++ b/contrib/src/test/java/com/datatorrent/contrib/cassandra/CounterColumnTableEntry.java @@ -0,0 +1,47 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
/**
 * Plain bean pairing a user id with a {@code long} update count.
 * Both properties keep their Java defaults ({@code null} and {@code 0}) until set.
 */
public class CounterColumnTableEntry
{
  private String userId;

  /**
   * @return the user id held by this entry, or {@code null} if none was set
   */
  public String getUserId()
  {
    return userId;
  }

  /**
   * @param userId the user id to store on this entry
   */
  public void setUserId(String userId)
  {
    this.userId = userId;
  }

  private long updatecount;

  /**
   * @return the update count held by this entry
   */
  public long getUpdatecount()
  {
    return updatecount;
  }

  /**
   * @param updatecount the update count to store on this entry
   */
  public void setUpdatecount(long updatecount)
  {
    this.updatecount = updatecount;
  }

}
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package com.datatorrent.contrib.cassandra; + +import java.util.Map; +import com.datastax.driver.core.TypeCodec; + +public class CounterColumnUpdatesOperator extends AbstractUpsertOutputOperator +{ + @Override + public ConnectionStateManager.ConnectionBuilder withConnectionBuilder() + { + return ConnectionStateManager.withNewBuilder() + .withSeedNodes("localhost") + .withClusterNameAs("Test Cluster") + .withDCNameAs("datacenter1") + .withTableNameAs("userupdates") + .withKeySpaceNameAs("unittests"); + } + + @Override + public Map<String, TypeCodec> getCodecsForUserDefinedTypes() + { + return null; + } + + @Override + public Class getPayloadPojoClass() + { + return CounterColumnTableEntry.class; + } + + @Override + boolean reconcileRecord(Object T, long windowId) + { + return true; + } +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/664257b4/contrib/src/test/java/com/datatorrent/contrib/cassandra/FullName.java ---------------------------------------------------------------------- diff --git a/contrib/src/test/java/com/datatorrent/contrib/cassandra/FullName.java b/contrib/src/test/java/com/datatorrent/contrib/cassandra/FullName.java new file mode 100644 index 0000000..61636bb --- /dev/null +++ b/contrib/src/test/java/com/datatorrent/contrib/cassandra/FullName.java @@ -0,0 +1,52 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
/**
 * Mutable bean holding a first name and a last name.
 * Neither value is validated; {@code null} is stored as given.
 */
public class FullName
{
  private String firstname;

  private String lastname;

  /**
   * Creates a name from its two parts.
   *
   * @param firstname the first name to hold
   * @param lastname the last name to hold
   */
  public FullName(String firstname, String lastname)
  {
    this.lastname = lastname;
    this.firstname = firstname;
  }

  /**
   * @return the first name currently held
   */
  public String getFirstname()
  {
    return firstname;
  }

  /**
   * @param firstname the first name to hold from now on
   */
  public void setFirstname(String firstname)
  {
    this.firstname = firstname;
  }

  /**
   * @return the last name currently held
   */
  public String getLastname()
  {
    return lastname;
  }

  /**
   * @param lastname the last name to hold from now on
   */
  public void setLastname(String lastname)
  {
    this.lastname = lastname;
  }
}
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package com.datatorrent.contrib.cassandra; + +import java.nio.ByteBuffer; + +import com.datastax.driver.core.ProtocolVersion; +import com.datastax.driver.core.TypeCodec; +import com.datastax.driver.core.UDTValue; +import com.datastax.driver.core.UserType; +import com.datastax.driver.core.exceptions.InvalidTypeException; + +public class FullNameCodec extends TypeCodec<FullName> +{ + + private final TypeCodec<UDTValue> innerCodec; + + private final UserType userType; + + public FullNameCodec(TypeCodec<UDTValue> innerCodec, Class<FullName> javaType) + { + super(innerCodec.getCqlType(), javaType); + this.innerCodec = innerCodec; + this.userType = (UserType)innerCodec.getCqlType(); + } + + @Override + public ByteBuffer serialize(FullName value, ProtocolVersion protocolVersion) throws InvalidTypeException + { + return innerCodec.serialize(toUDTValue(value), protocolVersion); + } + + @Override + public FullName deserialize(ByteBuffer bytes, ProtocolVersion protocolVersion) throws InvalidTypeException + { + return toFullName(innerCodec.deserialize(bytes, protocolVersion)); + } + + @Override + public FullName parse(String value) throws InvalidTypeException + { + return value == null || value.isEmpty() ? null : toFullName(innerCodec.parse(value)); + } + + @Override + public String format(FullName value) throws InvalidTypeException + { + return value == null ? 
null : innerCodec.format(toUDTValue(value)); + } + + protected FullName toFullName(UDTValue value) + { + return value == null ? null : new FullName( + value.getString("firstname"), + value.getString("lastname")); + } + + protected UDTValue toUDTValue(FullName value) + { + return value == null ? null : userType.newValue() + .setString("lastname", value.getLastname()) + .setString("firstname", value.getFirstname()); + } +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/664257b4/contrib/src/test/java/com/datatorrent/contrib/cassandra/User.java ---------------------------------------------------------------------- diff --git a/contrib/src/test/java/com/datatorrent/contrib/cassandra/User.java b/contrib/src/test/java/com/datatorrent/contrib/cassandra/User.java new file mode 100644 index 0000000..f4c1d8d --- /dev/null +++ b/contrib/src/test/java/com/datatorrent/contrib/cassandra/User.java @@ -0,0 +1,130 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package com.datatorrent.contrib.cassandra; + +import java.util.Date; +import java.util.List; +import java.util.Map; +import java.util.Set; + + +public class User +{ + private String userid; + private FullName username; + private Set<String> emails; + private List<Integer> topScores; + private Map<Date, String> todo; + private Address currentaddress; + private List<FullName> previousnames; + + private String nonMatchingColumn; + + private List<Integer> nonMatchingCollectionColumn; + + public String getUserid() + { + return userid; + } + + public void setUserid(String userid) + { + this.userid = userid; + } + + public FullName getUsername() + { + return username; + } + + public void setUsername(FullName username) + { + this.username = username; + } + + public Set<String> getEmails() + { + return emails; + } + + public void setEmails(Set<String> emails) + { + this.emails = emails; + } + + public List<Integer> getTopScores() + { + return topScores; + } + + public void setTopScores(List<Integer> topScores) + { + this.topScores = topScores; + } + + public Map<Date, String> getTodo() + { + return todo; + } + + public void setTodo(Map<Date, String> todo) + { + this.todo = todo; + } + + public String getNonMatchingColumn() + { + return nonMatchingColumn; + } + + public void setNonMatchingColumn(String nonMatchingColumn) + { + this.nonMatchingColumn = nonMatchingColumn; + } + + public List<Integer> getNonMatchingCollectionColumn() + { + return nonMatchingCollectionColumn; + } + + public void setNonMatchingCollectionColumn(List<Integer> nonMatchingCollectionColumn) + { + this.nonMatchingCollectionColumn = nonMatchingCollectionColumn; + } + + public Address getCurrentaddress() + { + return currentaddress; + } + + public void setCurrentaddress(Address currentaddress) + { + this.currentaddress = currentaddress; + } + + public List<FullName> getPreviousnames() + { + return previousnames; + } + + public void setPreviousnames(List<FullName> previousnames) + { + 
this.previousnames = previousnames; + } +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/664257b4/contrib/src/test/java/com/datatorrent/contrib/cassandra/UserUpsertOperator.java ---------------------------------------------------------------------- diff --git a/contrib/src/test/java/com/datatorrent/contrib/cassandra/UserUpsertOperator.java b/contrib/src/test/java/com/datatorrent/contrib/cassandra/UserUpsertOperator.java new file mode 100644 index 0000000..e15c9f6 --- /dev/null +++ b/contrib/src/test/java/com/datatorrent/contrib/cassandra/UserUpsertOperator.java @@ -0,0 +1,97 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package com.datatorrent.contrib.cassandra; + +import java.util.HashMap; +import java.util.Map; +import com.datastax.driver.core.CodecRegistry; +import com.datastax.driver.core.ConsistencyLevel; +import com.datastax.driver.core.TypeCodec; +import com.datastax.driver.core.UDTValue; +import com.datastax.driver.core.UserType; + +/** + This operator is a concrete implementation of the {@link AbstractUpsertOutputOperator}. 
This is used + in unit tests ( refer {@link AbstractUpsertOutputOperatorCodecsTest} to showcase the use cases of: + => Connectivity options + => Codecs for uder defined types + => How to specify null overrides to avoid patterns of read and then set values in a row + => Collection mutations Add / Remove + => List placements Append/Prepend if there is a list column + => Map POJO field names to Cassandra columns if they do not match exactly + => Specifying TTL + => Overriding default consistency level set with Connection builder at a tuple level if required + */ +public class UserUpsertOperator extends AbstractUpsertOutputOperator +{ + @Override + public ConnectionStateManager.ConnectionBuilder withConnectionBuilder() + { + return ConnectionStateManager.withNewBuilder() + .withSeedNodes("localhost") + .withClusterNameAs("Test Cluster") + .withDCNameAs("datacenter1") + .withTableNameAs("users") + .withKeySpaceNameAs("unittests") + .withdefaultConsistencyLevel(ConsistencyLevel.LOCAL_ONE); + } + + @Override + public Map<String, TypeCodec> getCodecsForUserDefinedTypes() + { + Map<String, TypeCodec> allCodecs = new HashMap<>(); + CodecRegistry codecRegistry = cluster.getConfiguration().getCodecRegistry(); + + UserType addressType = cluster.getMetadata().getKeyspace(getConnectionStateManager().getKeyspaceName()) + .getUserType("address"); + TypeCodec<UDTValue> addressTypeCodec = codecRegistry.codecFor(addressType); + AddressCodec addressCodec = new AddressCodec(addressTypeCodec, Address.class); + allCodecs.put("currentaddress", addressCodec); + + UserType userFullNameType = cluster.getMetadata().getKeyspace(getConnectionStateManager().getKeyspaceName()) + .getUserType("fullname"); + TypeCodec<UDTValue> userFullNameTypeCodec = codecRegistry.codecFor(userFullNameType); + FullNameCodec fullNameCodec = new FullNameCodec(userFullNameTypeCodec, FullName.class); + allCodecs.put("username", fullNameCodec); + + return allCodecs; + } + + @Override + public Class getPayloadPojoClass() 
+ { + return User.class; + } + + + @Override + protected Map<String, String> getPojoFieldNameToCassandraColumnNameOverride() + { + Map<String,String> overridingColMap = new HashMap<>(); + overridingColMap.put("topScores","top_scores"); + return overridingColMap; + } + + @Override + boolean reconcileRecord(Object T, long windowId) + { + return true; + } +} +
