Repository: apex-malhar Updated Branches: refs/heads/master ddd5bcf1a -> 32840a2ce
APEXMALHAR-1966: Update casandra output opreator Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/32840a2c Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/32840a2c Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/32840a2c Branch: refs/heads/master Commit: 32840a2cee5b4c9fde1c80c14df86679e47621eb Parents: ddd5bcf Author: Priyanka Gugale <[email protected]> Authored: Fri Apr 1 14:50:21 2016 +0530 Committer: Priyanka Gugale <[email protected]> Committed: Fri Jul 1 15:28:36 2016 +0530 ---------------------------------------------------------------------- .../cassandra/CassandraOutputOperator.java | 4 +- ...tCassandraTransactionableOutputOperator.java | 35 ++++- ...assandraTransactionableOutputOperatorPS.java | 84 ----------- .../cassandra/CassandraPOJOOutputOperator.java | 150 +++++++++++++++---- .../cassandra/CassandraOperatorTest.java | 134 +++++++++++++---- 5 files changed, 256 insertions(+), 151 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/32840a2c/benchmark/src/main/java/com/datatorrent/benchmark/cassandra/CassandraOutputOperator.java ---------------------------------------------------------------------- diff --git a/benchmark/src/main/java/com/datatorrent/benchmark/cassandra/CassandraOutputOperator.java b/benchmark/src/main/java/com/datatorrent/benchmark/cassandra/CassandraOutputOperator.java index 7d6f08c..666746b 100644 --- a/benchmark/src/main/java/com/datatorrent/benchmark/cassandra/CassandraOutputOperator.java +++ b/benchmark/src/main/java/com/datatorrent/benchmark/cassandra/CassandraOutputOperator.java @@ -23,7 +23,7 @@ import com.datastax.driver.core.BoundStatement; import com.datastax.driver.core.PreparedStatement; import com.datastax.driver.core.Statement; import com.datastax.driver.core.exceptions.DriverException; -import com.datatorrent.contrib.cassandra.AbstractCassandraTransactionableOutputOperatorPS; +import com.datatorrent.contrib.cassandra.AbstractCassandraTransactionableOutputOperator; /** @@ -31,7 +31,7 @@ import com.datatorrent.contrib.cassandra.AbstractCassandraTransactionableOutputO * * @since 1.0.3 */ -public class CassandraOutputOperator extends AbstractCassandraTransactionableOutputOperatorPS<Integer>{ +public class CassandraOutputOperator extends AbstractCassandraTransactionableOutputOperator<Integer>{ private int id = 0; http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/32840a2c/contrib/src/main/java/com/datatorrent/contrib/cassandra/AbstractCassandraTransactionableOutputOperator.java ---------------------------------------------------------------------- diff --git a/contrib/src/main/java/com/datatorrent/contrib/cassandra/AbstractCassandraTransactionableOutputOperator.java b/contrib/src/main/java/com/datatorrent/contrib/cassandra/AbstractCassandraTransactionableOutputOperator.java index 9694bf0..9048383 100644 --- a/contrib/src/main/java/com/datatorrent/contrib/cassandra/AbstractCassandraTransactionableOutputOperator.java +++ b/contrib/src/main/java/com/datatorrent/contrib/cassandra/AbstractCassandraTransactionableOutputOperator.java @@ -20,14 +20,20 @@ package com.datatorrent.contrib.cassandra; import java.util.Collection; +import javax.annotation.Nonnull; + import com.datastax.driver.core.BatchStatement; +import com.datastax.driver.core.PreparedStatement; import com.datastax.driver.core.Statement; import com.datastax.driver.core.exceptions.DriverException; +import com.datatorrent.api.Context; +import com.datatorrent.api.Context.OperatorContext; +import com.datatorrent.api.Operator.ActivationListener; import com.datatorrent.lib.db.AbstractBatchTransactionableStoreOutputOperator; /** * <p> - * Generic base output adaptor which creates a transaction at the start of window. Subclasses should provide implementation for getting the update statement. <br/> + * Generic Cassandra output adaptor which creates a transaction at the start of window. Subclasses should provide implementation for getting the update statement and setting the statement parameters. <br/> * </p> * * <p> @@ -48,20 +54,33 @@ import com.datatorrent.lib.db.AbstractBatchTransactionableStoreOutputOperator; * @param <T>type of tuple</T> * @since 1.0.2 */ -public abstract class AbstractCassandraTransactionableOutputOperator<T> extends AbstractBatchTransactionableStoreOutputOperator<T, CassandraTransactionalStore> { +public abstract class AbstractCassandraTransactionableOutputOperator<T> extends AbstractBatchTransactionableStoreOutputOperator<T, CassandraTransactionalStore> implements ActivationListener<Context.OperatorContext> +{ + private transient PreparedStatement updateCommand; - public AbstractCassandraTransactionableOutputOperator(){ - super(); + @Override + public void activate(OperatorContext context) + { + updateCommand = getUpdateCommand(); } /** + * Gets the statement which insert/update the table in the database. + * + * @return the cql statement to update a tuple in the database. + */ + @Nonnull + protected abstract PreparedStatement getUpdateCommand(); + + /** * Sets the parameter of the insert/update statement with values from the tuple. * * @param tuple tuple * @return statement The statement to execute * @throws DriverException */ - protected abstract Statement getUpdateStatement(T tuple) throws DriverException; + protected abstract Statement setStatementParameters(PreparedStatement updateCommand, T tuple) throws DriverException; + @Override public void processBatch(Collection<T> tuples) @@ -69,8 +88,12 @@ public abstract class AbstractCassandraTransactionableOutputOperator<T> extends BatchStatement batchCommand = store.getBatchCommand(); for(T tuple: tuples) { - batchCommand.add(getUpdateStatement(tuple)); + batchCommand.add(setStatementParameters(updateCommand, tuple)); } } + @Override + public void deactivate() + { + } } http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/32840a2c/contrib/src/main/java/com/datatorrent/contrib/cassandra/AbstractCassandraTransactionableOutputOperatorPS.java ---------------------------------------------------------------------- diff --git a/contrib/src/main/java/com/datatorrent/contrib/cassandra/AbstractCassandraTransactionableOutputOperatorPS.java b/contrib/src/main/java/com/datatorrent/contrib/cassandra/AbstractCassandraTransactionableOutputOperatorPS.java deleted file mode 100644 index 21f1840..0000000 --- a/contrib/src/main/java/com/datatorrent/contrib/cassandra/AbstractCassandraTransactionableOutputOperatorPS.java +++ /dev/null @@ -1,84 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package com.datatorrent.contrib.cassandra; - -import com.datastax.driver.core.PreparedStatement; -import com.datastax.driver.core.Statement; -import com.datastax.driver.core.exceptions.DriverException; -import com.datatorrent.api.Context; - -import javax.annotation.Nonnull; - - -/** - * <p> - * Generic Cassandra Output Adaptor which creates a transaction at the start of window. Subclasses should provide implementation for getting the update statement and setting the statement parameters. <br/> - * </p> - * - * <p> - * Executes batch of CQL updates and closes the transaction at the end of the window. - * Each tuple corresponds to an CQL update statement. The operator groups the updates in a batch - * and submits them with one call to the database. Batch processing improves performance considerably and also provides atomicity.<br/> - * The size of a batch is equal to the size of the window. - * </p> - * - * <p> - * The tuples in a window are stored in check-pointed collection which is cleared in the endWindow(). - * This is needed for the recovery. The operator writes a tuple exactly once in the database, which is why - * only when all the updates are executed, the transaction is committed in the end window call. - * </p> - * @displayName Abstract Cassandra Transactionable Output With Prepared Statement - * @category Output - * @tags cassandra, batch, transactionable - * @param <T>type of tuple</T> - * @since 1.0.2 - */ -public abstract class AbstractCassandraTransactionableOutputOperatorPS<T> extends AbstractCassandraTransactionableOutputOperator<T>{ - - private transient PreparedStatement updateCommand; - - /** - * Gets the statement which insert/update the table in the database. - * - * @return the cql statement to update a tuple in the database. - */ - @Nonnull - protected abstract PreparedStatement getUpdateCommand(); - - @Override - public void setup(Context.OperatorContext context) - { - super.setup(context); - updateCommand = getUpdateCommand(); - } - - /** - * Sets the parameter of the insert/update statement with values from the tuple. - * - * @param tuple tuple - * @return statement The statement to execute - * @throws DriverException - */ - protected abstract Statement setStatementParameters(PreparedStatement updateCommand, T tuple) throws DriverException; - - @Override - protected Statement getUpdateStatement(T tuple){ - return setStatementParameters(updateCommand, tuple); - } -} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/32840a2c/contrib/src/main/java/com/datatorrent/contrib/cassandra/CassandraPOJOOutputOperator.java ---------------------------------------------------------------------- diff --git a/contrib/src/main/java/com/datatorrent/contrib/cassandra/CassandraPOJOOutputOperator.java b/contrib/src/main/java/com/datatorrent/contrib/cassandra/CassandraPOJOOutputOperator.java index 5f3235a..2d1fea3 100644 --- a/contrib/src/main/java/com/datatorrent/contrib/cassandra/CassandraPOJOOutputOperator.java +++ b/contrib/src/main/java/com/datatorrent/contrib/cassandra/CassandraPOJOOutputOperator.java @@ -18,23 +18,23 @@ */ package com.datatorrent.contrib.cassandra; +import java.lang.reflect.Field; import java.math.BigDecimal; import java.util.*; -import javax.validation.constraints.NotNull; - import org.apache.hadoop.classification.InterfaceStability.Evolving; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.google.common.collect.Lists; import com.datastax.driver.core.*; import com.datastax.driver.core.exceptions.DriverException; - +import com.datatorrent.api.AutoMetric; import com.datatorrent.api.Context; import com.datatorrent.api.DefaultInputPort; -import com.datatorrent.api.Operator; +import com.datatorrent.api.DefaultOutputPort; import com.datatorrent.api.annotation.InputPortFieldAnnotation; - +import com.datatorrent.api.annotation.OutputPortFieldAnnotation; import com.datatorrent.lib.util.FieldInfo; import com.datatorrent.lib.util.PojoUtils; import com.datatorrent.lib.util.PojoUtils.*; @@ -50,17 +50,21 @@ import com.datatorrent.lib.util.PojoUtils.*; * @since 2.1.0 */ @Evolving -public class CassandraPOJOOutputOperator extends AbstractCassandraTransactionableOutputOperatorPS<Object> implements Operator.ActivationListener<Context.OperatorContext> +public class CassandraPOJOOutputOperator extends AbstractCassandraTransactionableOutputOperator<Object> { - @NotNull private List<FieldInfo> fieldInfos; - @NotNull private String tablename; + private String query; protected final transient ArrayList<DataType> columnDataTypes; protected final transient ArrayList<Object> getters; protected transient Class<?> pojoClass; + @AutoMetric + private long successfulRecords; + @AutoMetric + private long errorRecords; + /** * The input port on which tuples are received for writing. */ @@ -81,18 +85,8 @@ public class CassandraPOJOOutputOperator extends AbstractCassandraTransactionabl }; - /* - * Tablename in cassandra. - */ - public String getTablename() - { - return tablename; - } - - public void setTablename(String tablename) - { - this.tablename = tablename; - } + @OutputPortFieldAnnotation(error = true) + public final transient DefaultOutputPort<Object> error = new DefaultOutputPort<>(); public CassandraPOJOOutputOperator() { @@ -102,20 +96,29 @@ public class CassandraPOJOOutputOperator extends AbstractCassandraTransactionabl } @Override + public void beginWindow(long windowId) + { + super.beginWindow(windowId); + successfulRecords = 0; + errorRecords = 0; + } + + @Override public void activate(Context.OperatorContext context) { com.datastax.driver.core.ResultSet rs = store.getSession().execute("select * from " + store.keyspace + "." + tablename); - final ColumnDefinitions rsMetaData = rs.getColumnDefinitions(); - final int numberOfColumns = rsMetaData.size(); + if(fieldInfos == null) { + populateFieldInfosFromPojo(rsMetaData); + } - for (int i = 0; i < numberOfColumns; i++) { + for (FieldInfo fieldInfo : getFieldInfos()) { // get the designated column's data type. - final DataType type = rsMetaData.getType(i); + final DataType type = rsMetaData.getType(fieldInfo.getColumnName()); columnDataTypes.add(type); final Object getter; - final String getterExpr = fieldInfos.get(i).getPojoFieldExpression(); + final String getterExpr = fieldInfo.getPojoFieldExpression(); switch (type.getName()) { case ASCII: case TEXT: @@ -162,24 +165,64 @@ public class CassandraPOJOOutputOperator extends AbstractCassandraTransactionabl } getters.add(getter); } + super.activate(context); } - @Override - public void deactivate() + private void populateFieldInfosFromPojo(ColumnDefinitions rsMetaData) + { + fieldInfos = Lists.newArrayList(); + Field[] fields = pojoClass.getDeclaredFields(); + for (int i = 0; i < rsMetaData.size(); i++) { + String columnName = rsMetaData.getName(i); + String pojoField = getMatchingField(fields, columnName); + if (pojoField != null && pojoField.length() != 0) { + fieldInfos.add(new FieldInfo(columnName, pojoField, null)); + } else { + LOG.warn("Couldn't find corrosponding pojo field for column: " + columnName); + } + } + } + + private String getMatchingField(Field[] fields, String columnName) { + for (Field f : fields) { + if (f.getName().equalsIgnoreCase(columnName)) { + return f.getName(); + } + } + return null; } + + /** + * {@inheritDoc} <br/> + * If statement/query is not specified by user, insert query is constructed from fileInfo object and table name. + */ @Override protected PreparedStatement getUpdateCommand() { + PreparedStatement statement; + if (query == null) { + statement = prepareStatementFromFieldsAndTableName(); + } else { + statement = store.getSession().prepare(query); + } + LOG.debug("Statement is: " + statement.getQueryString()); + return statement; + } + + private PreparedStatement prepareStatementFromFieldsAndTableName() + { + if (tablename == null || tablename.length() == 0) { + throw new RuntimeException("Please sepcify query or table name."); + } StringBuilder queryfields = new StringBuilder(); StringBuilder values = new StringBuilder(); for (FieldInfo fieldInfo: fieldInfos) { if (queryfields.length() == 0) { queryfields.append(fieldInfo.getColumnName()); values.append("?"); - } - else { + } else { queryfields.append(",").append(fieldInfo.getColumnName()); values.append(",").append("?"); } @@ -191,6 +234,7 @@ public class CassandraPOJOOutputOperator extends AbstractCassandraTransactionabl + "VALUES (" + values.toString() + ");"; LOG.debug("statement is {}", statement); return store.getSession().prepare(statement); + } @Override @@ -260,6 +304,18 @@ public class CassandraPOJOOutputOperator extends AbstractCassandraTransactionabl return boundStmnt; } + @Override + public void processTuple(Object tuple) + { + try { + super.processTuple(tuple); + successfulRecords++; + } catch (RuntimeException e) { + LOG.error(e.getMessage()); + error.emit(tuple); + errorRecords++; + } + } /** * A list of {@link FieldInfo}s where each item maps a column name to a pojo field name. */ @@ -281,5 +337,41 @@ public class CassandraPOJOOutputOperator extends AbstractCassandraTransactionabl this.fieldInfos = fieldInfos; } + /** + * Gets cassandra table name + * @return tableName + */ + public String getTablename() + { + return tablename; + } + + /** + * Sets cassandra table name (optional if query is specified) + * @param tablename + */ + public void setTablename(String tablename) + { + this.tablename = tablename; + } + + /** + * Gets cql Query + * @return query + */ + public String getQuery() + { + return query; + } + + /** + * Sets cql Query + * @param query + */ + public void setQuery(String query) + { + this.query = query; + } + private static final Logger LOG = LoggerFactory.getLogger(CassandraPOJOOutputOperator.class); } http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/32840a2c/contrib/src/test/java/com/datatorrent/contrib/cassandra/CassandraOperatorTest.java ---------------------------------------------------------------------- diff --git a/contrib/src/test/java/com/datatorrent/contrib/cassandra/CassandraOperatorTest.java b/contrib/src/test/java/com/datatorrent/contrib/cassandra/CassandraOperatorTest.java index f4aa29a..74f99a8 100644 --- a/contrib/src/test/java/com/datatorrent/contrib/cassandra/CassandraOperatorTest.java +++ b/contrib/src/test/java/com/datatorrent/contrib/cassandra/CassandraOperatorTest.java @@ -33,7 +33,9 @@ import com.google.common.collect.Lists; import java.util.*; +import org.junit.After; import org.junit.Assert; +import org.junit.Before; import org.junit.BeforeClass; import org.junit.Test; import org.junit.AfterClass; @@ -54,6 +56,8 @@ public class CassandraOperatorTest private static final int OPERATOR_ID = 0; private static Cluster cluster = null; private static Session session = null; + private OperatorContextTestHelper.TestIdOperatorContext context; + private TestPortContext tpc; @SuppressWarnings("unused") private static class TestEvent @@ -107,6 +111,25 @@ public class CassandraOperatorTest } } + @Before + public void setupForTest() + { + AttributeMap.DefaultAttributeMap attributeMap = new AttributeMap.DefaultAttributeMap(); + attributeMap.put(DAG.APPLICATION_ID, APP_ID); + context = new OperatorContextTestHelper.TestIdOperatorContext(OPERATOR_ID, attributeMap); + + Attribute.AttributeMap.DefaultAttributeMap portAttributes = new Attribute.AttributeMap.DefaultAttributeMap(); + portAttributes.put(Context.PortContext.TUPLE_CLASS, TestPojo.class); + tpc = new TestPortContext(portAttributes); + } + + @After + public void afterTest() + { + session.execute("TRUNCATE " + CassandraTransactionalStore.DEFAULT_META_TABLE); + session.execute("TRUNCATE " + KEYSPACE + "." + TABLE_NAME); + } + private static class TestOutputOperator extends CassandraPOJOOutputOperator { public long getNumOfEventsInStore() @@ -220,23 +243,9 @@ public class CassandraOperatorTest @Test public void testCassandraProtocolVersion() { - CassandraTransactionalStore transactionalStore = new CassandraTransactionalStore(); - transactionalStore.setNode(NODE); - transactionalStore.setKeyspace(KEYSPACE); - transactionalStore.setProtocolVersion("v2"); - - AttributeMap.DefaultAttributeMap attributeMap = new AttributeMap.DefaultAttributeMap(); - attributeMap.put(DAG.APPLICATION_ID, APP_ID); - OperatorContextTestHelper.TestIdOperatorContext context = new OperatorContextTestHelper.TestIdOperatorContext(OPERATOR_ID, attributeMap); - - TestOutputOperator outputOperator = new TestOutputOperator(); - - outputOperator.setTablename(TABLE_NAME); - List<FieldInfo> fieldInfos = Lists.newArrayList(); - fieldInfos.add(new FieldInfo("id", "id", null)); + TestOutputOperator outputOperator = setupForOutputOperatorTest(); + outputOperator.getStore().setProtocolVersion("v2"); - outputOperator.setStore(transactionalStore); - outputOperator.setFieldInfos(fieldInfos); outputOperator.setup(context); Configuration config = outputOperator.getStore().getCluster().getConfiguration(); @@ -246,17 +255,8 @@ public class CassandraOperatorTest @Test public void testCassandraOutputOperator() { - CassandraTransactionalStore transactionalStore = new CassandraTransactionalStore(); - transactionalStore.setNode(NODE); - transactionalStore.setKeyspace(KEYSPACE); - - AttributeMap.DefaultAttributeMap attributeMap = new AttributeMap.DefaultAttributeMap(); - attributeMap.put(DAG.APPLICATION_ID, APP_ID); - OperatorContextTestHelper.TestIdOperatorContext context = new OperatorContextTestHelper.TestIdOperatorContext(OPERATOR_ID, attributeMap); + TestOutputOperator outputOperator = setupForOutputOperatorTest(); - TestOutputOperator outputOperator = new TestOutputOperator(); - - outputOperator.setTablename(TABLE_NAME); List<FieldInfo> fieldInfos = Lists.newArrayList(); fieldInfos.add(new FieldInfo("id", "id", null)); fieldInfos.add(new FieldInfo("age", "age", null)); @@ -269,14 +269,37 @@ public class CassandraOperatorTest fieldInfos.add(new FieldInfo("set1", "set1", null)); fieldInfos.add(new FieldInfo("test", "test", null)); - outputOperator.setStore(transactionalStore); outputOperator.setFieldInfos(fieldInfos); outputOperator.setup(context); + outputOperator.input.setup(tpc); + outputOperator.activate(context); - Attribute.AttributeMap.DefaultAttributeMap portAttributes = new Attribute.AttributeMap.DefaultAttributeMap(); - portAttributes.put(Context.PortContext.TUPLE_CLASS, TestPojo.class); - TestPortContext tpc = new TestPortContext(portAttributes); + List<TestPojo> events = Lists.newArrayList(); + for (int i = 0; i < 3; i++) { + Set<Integer> set = new HashSet<Integer>(); + set.add(i); + List<Integer> list = new ArrayList<Integer>(); + list.add(i); + Map<String, Integer> map = new HashMap<String, Integer>(); + map.put("key" + i, i); + events.add(new TestPojo(UUID.randomUUID(), i, "abclast", true, i, 2.0, set, list, map, new Date(System.currentTimeMillis()))); + } + + outputOperator.beginWindow(0); + for (TestPojo event : events) { + outputOperator.input.process(event); + } + outputOperator.endWindow(); + + Assert.assertEquals("rows in db", 3, outputOperator.getNumOfEventsInStore()); + outputOperator.getEventsInStore(); + } + @Test + public void testPopulateFieldInfo() + { + TestOutputOperator outputOperator = setupForOutputOperatorTest(); + outputOperator.setup(context); outputOperator.input.setup(tpc); outputOperator.activate(context); @@ -301,6 +324,57 @@ public class CassandraOperatorTest outputOperator.getEventsInStore(); } + @Test + public void testupdateQueryWithParameters() throws InterruptedException + { + UUID id = UUID.fromString("94ab597c-a5ff-4997-8343-68993d446b14"); + TestPojo testPojo = new TestPojo(id, 20, "Laura", true, 10, 2.0, new HashSet<Integer>(), new ArrayList<Integer>(), null, new Date(System.currentTimeMillis())); + String insert = "INSERT INTO " + KEYSPACE + "." + TABLE_NAME + " (ID, age, lastname, test, floatValue, doubleValue)" + " VALUES (94ab597c-a5ff-4997-8343-68993d446b14, 20, 'Laura', true, 10, 2.0);"; + session.execute(insert); + String recordsQuery = "SELECT * from " + TABLE_NAME + ";"; + ResultSet resultSetRecords = session.execute(recordsQuery); + Row row = resultSetRecords.iterator().next(); + Assert.assertEquals("Updated last name", "Laura", row.getString("lastname")); + Thread.sleep(1000); // wait till cassandra writes the record + + // update record + String updateLastName = "Laurel"; + String updateQuery = "update " + KEYSPACE + "." + TABLE_NAME + " set lastname='" + updateLastName + "' where id=?"; + // set specific files required by update command in order as per query + List<FieldInfo> fieldInfos = Lists.newArrayList(); + fieldInfos.add(new FieldInfo("id", "id", null)); + + // reset the operator to run new query + TestOutputOperator outputOperator = setupForOutputOperatorTest(); + outputOperator.setQuery(updateQuery); + outputOperator.setFieldInfos(fieldInfos); + outputOperator.setup(context); + outputOperator.input.setup(tpc); + outputOperator.activate(context); + + outputOperator.beginWindow(1); + outputOperator.input.process(testPojo); + outputOperator.endWindow(); + + recordsQuery = "SELECT * from " + TABLE_NAME + ";"; + resultSetRecords = session.execute(recordsQuery); + row = resultSetRecords.iterator().next(); + Assert.assertEquals("Updated last name", updateLastName, row.getString("lastname")); + } + + private TestOutputOperator setupForOutputOperatorTest() + { + CassandraTransactionalStore transactionalStore = new CassandraTransactionalStore(); + transactionalStore.setNode(NODE); + transactionalStore.setKeyspace(KEYSPACE); + + TestOutputOperator operator = new TestOutputOperator(); + operator = new TestOutputOperator(); + operator.setTablename(TABLE_NAME); + operator.setStore(transactionalStore); + return operator; + } + /* * This test can be run on cassandra server installed on node17. */
