Repository: incubator-apex-malhar Updated Branches: refs/heads/devel-3 f7f9085e8 -> 9194a72c3
Cassandra integration with Schema Support Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/effc8385 Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/effc8385 Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/effc8385 Branch: refs/heads/devel-3 Commit: effc8385af173448f0dfc9e8e2243b5fd728acef Parents: f7f9085 Author: Chandni Singh <[email protected]> Authored: Sun Aug 2 21:27:59 2015 -0700 Committer: ishark <[email protected]> Committed: Fri Sep 18 13:02:44 2015 -0700 ---------------------------------------------------------------------- .../AbstractCassandraInputOperator.java | 9 +- .../cassandra/CassandraPOJOInputOperator.java | 276 +++++++++---------- .../cassandra/CassandraPOJOOutputOperator.java | 145 +++++----- .../cassandra/CassandraOperatorTest.java | 159 +++++------ 4 files changed, 293 insertions(+), 296 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/effc8385/contrib/src/main/java/com/datatorrent/contrib/cassandra/AbstractCassandraInputOperator.java ---------------------------------------------------------------------- diff --git a/contrib/src/main/java/com/datatorrent/contrib/cassandra/AbstractCassandraInputOperator.java b/contrib/src/main/java/com/datatorrent/contrib/cassandra/AbstractCassandraInputOperator.java index 0c14a0e..7560768 100644 --- a/contrib/src/main/java/com/datatorrent/contrib/cassandra/AbstractCassandraInputOperator.java +++ b/contrib/src/main/java/com/datatorrent/contrib/cassandra/AbstractCassandraInputOperator.java @@ -99,14 +99,14 @@ public abstract class AbstractCassandraInputOperator<T> extends AbstractStoreInp public void emitTuples() { String query = queryToRetrieveData(); - logger.debug(String.format("select statement: %s", query)); + logger.debug("select statement: {}", query); try { ResultSet result = store.getSession().execute(query); if (!result.isExhausted()) { for (Row row : result) { T tuple = getTuple(row); - outputPort.emit(tuple); + emit(tuple); } } else { // No rows available wait for some time before retrying so as to not continuously slam the database @@ -118,4 +118,9 @@ public abstract class AbstractCassandraInputOperator<T> extends AbstractStoreInp DTThrowable.rethrow(ex); } } + + protected void emit(T tuple) + { + outputPort.emit(tuple); + } } http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/effc8385/contrib/src/main/java/com/datatorrent/contrib/cassandra/CassandraPOJOInputOperator.java ---------------------------------------------------------------------- diff --git a/contrib/src/main/java/com/datatorrent/contrib/cassandra/CassandraPOJOInputOperator.java b/contrib/src/main/java/com/datatorrent/contrib/cassandra/CassandraPOJOInputOperator.java index 9d8e356..13f4dd0 100644 --- a/contrib/src/main/java/com/datatorrent/contrib/cassandra/CassandraPOJOInputOperator.java +++ b/contrib/src/main/java/com/datatorrent/contrib/cassandra/CassandraPOJOInputOperator.java @@ -21,17 +21,23 @@ import java.util.*; import javax.validation.constraints.Min; import javax.validation.constraints.NotNull; +import org.apache.hadoop.classification.InterfaceStability.Evolving; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import com.datastax.driver.core.ColumnDefinitions; import com.datastax.driver.core.DataType; import com.datastax.driver.core.Row; -import org.apache.hadoop.classification.InterfaceStability.Evolving; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import com.datatorrent.api.Context; +import com.datatorrent.api.Context.OperatorContext; +import com.datatorrent.api.DefaultOutputPort; +import com.datatorrent.api.Operator; +import com.datatorrent.api.annotation.OutputPortFieldAnnotation; +import com.datatorrent.lib.util.FieldInfo; import com.datatorrent.lib.util.PojoUtils; import com.datatorrent.lib.util.PojoUtils.*; -import com.datatorrent.api.Context.OperatorContext; /** * <p> @@ -49,29 +55,38 @@ import com.datatorrent.api.Context.OperatorContext; * @since 3.0.0 */ @Evolving -public class CassandraPOJOInputOperator extends AbstractCassandraInputOperator<Object> +public class CassandraPOJOInputOperator extends AbstractCassandraInputOperator<Object> implements Operator.ActivationListener<OperatorContext> { @NotNull - private List<String> columns; - private final transient List<DataType> columnDataTypes; + private List<FieldInfo> fieldInfos; private Number startRow = 0; @NotNull - private List<String> expressions; - @NotNull private String tablename; - private final transient List<Object> setters; @NotNull private String query; - - private transient Class<?> objectClass = null; @NotNull - protected String primaryKeyColumn; - protected transient DataType primaryKeyColumnType; - private transient Row lastRowInBatch; + private String primaryKeyColumn; @Min(1) private int limit = 10; + private transient DataType primaryKeyColumnType; + private transient Row lastRowInBatch; + + protected final transient List<Object> setters; + protected final transient List<DataType> columnDataTypes; + protected transient Class<?> pojoClass; + + @OutputPortFieldAnnotation(schemaRequired = true) + public final transient DefaultOutputPort<Object> outputPort = new DefaultOutputPort<Object>() + { + @Override + public void setup(Context.PortContext context) + { + pojoClass = context.getValue(Context.PortContext.TUPLE_CLASS); + } + }; + /* * Number of records to be fetched in one time from cassandra table. */ @@ -114,25 +129,6 @@ public class CassandraPOJOInputOperator extends AbstractCassandraInputOperator<O } /* - * POJO class which is generated as output from this operator. - * Example: - * public class TestPOJO{ int intfield; public int getInt(){} public void setInt(){} } - * outputClass = TestPOJO - * POJOs will be generated on fly in later implementation. - */ - private String outputClass; - - public String getOutputClass() - { - return outputClass; - } - - public void setOutputClass(String outputClass) - { - this.outputClass = outputClass; - } - - /* * Parameterized query with parameters such as %t for table name , %p for primary key, %s for start value and %l for limit. * Example of retrieveQuery: * select * from %t where token(%p) > %s limit %l; @@ -147,31 +143,25 @@ public class CassandraPOJOInputOperator extends AbstractCassandraInputOperator<O this.query = query.replace("%t", tablename); } - /* - * An ArrayList of Java expressions that will yield the cassandra column value to be set in output object. - * Each expression corresponds to one column in the Cassandra table. + /** + * A list of {@link FieldInfo}s where each item maps a column name to a pojo field name. */ - public List<String> getExpressions() + public List<FieldInfo> getFieldInfos() { - return expressions; + return fieldInfos; } - public void setExpressions(List<String> expressions) - { - this.expressions = expressions; - } - - /* - * List of column names specified by User in the same order as expressions for the particular fields. + /** + * Sets the {@link FieldInfo}s. A {@link FieldInfo} maps a store column to a pojo field name.<br/> + * The value from fieldInfo.column is assigned to fieldInfo.pojoFieldExpression. + * + * @description $[].columnName name of the database column name + * @description $[].pojoFieldExpression pojo field name or expression + * @useSchema $[].pojoFieldExpression outputPort.fields[].name */ - public List<String> getColumns() + public void setFieldInfos(List<FieldInfo> fieldInfos) { - return columns; - } - - public void setColumns(List<String> columns) - { - this.columns = columns; + this.fieldInfos = fieldInfos; } /* @@ -196,87 +186,72 @@ public class CassandraPOJOInputOperator extends AbstractCassandraInputOperator<O } @Override - public void setup(OperatorContext context) + public void activate(OperatorContext context) { - super.setup(context); - if (setters.isEmpty()) { - try { - // This code will be replaced after integration of creating POJOs on the fly utility. - objectClass = Class.forName(outputClass); - } - catch (ClassNotFoundException ex) { - throw new RuntimeException(ex); - } + com.datastax.driver.core.ResultSet rs = store.getSession().execute("select * from " + store.keyspace + "." + tablename + " LIMIT " + 1); + ColumnDefinitions rsMetaData = rs.getColumnDefinitions(); - com.datastax.driver.core.ResultSet rs = store.getSession().execute("select * from " + store.keyspace + "." + tablename + " LIMIT " + 1); - ColumnDefinitions rsMetaData = rs.getColumnDefinitions(); - - primaryKeyColumnType = rsMetaData.getType(primaryKeyColumn); - if(query.contains("%p")) - { - query = query.replace("%p", primaryKeyColumn); - } - if(query.contains("%l")) - { - query = query.replace("%l", limit+""); - } - - logger.debug("query is {}",query); - - //In case columns is a subset - int columnSize = columns.size(); - for (int i = 0; i < columnSize; i++) { - // Get the designated column's data type. - DataType type = rsMetaData.getType(columns.get(i)); - columnDataTypes.add(type); - Object setter; - final String setterExpr = expressions.get(i); - switch (type.getName()) { - case ASCII: - case TEXT: - case VARCHAR: - setter = PojoUtils.createSetter(objectClass, setterExpr, String.class); - break; - case BOOLEAN: - setter = PojoUtils.createSetterBoolean(objectClass, setterExpr); - break; - case INT: - setter = PojoUtils.createSetterInt(objectClass, setterExpr); - break; - case BIGINT: - case COUNTER: - setter = PojoUtils.createSetterLong(objectClass, setterExpr); - break; - case FLOAT: - setter = PojoUtils.createSetterFloat(objectClass, setterExpr); - break; - case DOUBLE: - setter = PojoUtils.createSetterDouble(objectClass, setterExpr); - break; - case DECIMAL: - setter = PojoUtils.createSetter(objectClass, setterExpr, BigDecimal.class); - break; - case SET: - setter = PojoUtils.createSetter(objectClass, setterExpr, Set.class); - break; - case MAP: - setter = PojoUtils.createSetter(objectClass, setterExpr, Map.class); - break; - case LIST: - setter = PojoUtils.createSetter(objectClass, setterExpr, List.class); - break; - case TIMESTAMP: - setter = PojoUtils.createSetter(objectClass, setterExpr, Date.class); - break; - case UUID: - setter = PojoUtils.createSetter(objectClass, setterExpr, UUID.class); - break; - default: - setter = PojoUtils.createSetter(objectClass, setterExpr, Object.class); - break; - } - setters.add(setter); + primaryKeyColumnType = rsMetaData.getType(primaryKeyColumn); + if (query.contains("%p")) { + query = query.replace("%p", primaryKeyColumn); + } + if (query.contains("%l")) { + query = query.replace("%l", limit + ""); + } + + LOG.debug("query is {}", query); + + for (FieldInfo fieldInfo : fieldInfos) { + // Get the designated column's data type. + DataType type = rsMetaData.getType(fieldInfo.getColumnName()); + columnDataTypes.add(type); + Object setter; + final String setterExpr = fieldInfo.getPojoFieldExpression(); + switch (type.getName()) { + case ASCII: + case TEXT: + case VARCHAR: + setter = PojoUtils.createSetter(pojoClass, setterExpr, String.class); + break; + case BOOLEAN: + setter = PojoUtils.createSetterBoolean(pojoClass, setterExpr); + break; + case INT: + setter = PojoUtils.createSetterInt(pojoClass, setterExpr); + break; + case BIGINT: + case COUNTER: + setter = PojoUtils.createSetterLong(pojoClass, setterExpr); + break; + case FLOAT: + setter = PojoUtils.createSetterFloat(pojoClass, setterExpr); + break; + case DOUBLE: + setter = PojoUtils.createSetterDouble(pojoClass, setterExpr); + break; + case DECIMAL: + setter = PojoUtils.createSetter(pojoClass, setterExpr, BigDecimal.class); + break; + case SET: + setter = PojoUtils.createSetter(pojoClass, setterExpr, Set.class); + break; + case MAP: + setter = PojoUtils.createSetter(pojoClass, setterExpr, Map.class); + break; + case LIST: + setter = PojoUtils.createSetter(pojoClass, setterExpr, List.class); + break; + case TIMESTAMP: + setter = PojoUtils.createSetter(pojoClass, setterExpr, Date.class); + break; + case UUID: + setter = PojoUtils.createSetter(pojoClass, setterExpr, UUID.class); + break; + default: + setter = PojoUtils.createSetter(pojoClass, setterExpr, Object.class); + break; } + setters.add(setter); } } @@ -285,23 +260,19 @@ public class CassandraPOJOInputOperator extends AbstractCassandraInputOperator<O public Object getTuple(Row row) { lastRowInBatch = row; - Object obj = null; - final int size = columnDataTypes.size(); + Object obj; try { // This code will be replaced after integration of creating POJOs on the fly utility. - obj = objectClass.newInstance(); - } - catch (InstantiationException ex) { - throw new RuntimeException(ex); + obj = pojoClass.newInstance(); } - catch (IllegalAccessException ex) { + catch (InstantiationException | IllegalAccessException ex) { throw new RuntimeException(ex); } - for (int i = 0; i < size; i++) { + for (int i = 0; i < columnDataTypes.size(); i++) { DataType type = columnDataTypes.get(i); - String columnName = columns.get(i); + String columnName = fieldInfos.get(i).getColumnName(); switch (type.getName()) { case UUID: final UUID id = row.getUUID(columnName); @@ -370,16 +341,10 @@ public class CassandraPOJOInputOperator extends AbstractCassandraInputOperator<O @Override public String queryToRetrieveData() { - String parameterizedQuery; - if(query.contains("%v")) - { - parameterizedQuery = query.replace("%v", startRow+""); + if (query.contains("%v")) { + return query.replace("%v", startRow + ""); } - else - { - parameterizedQuery = query; - } - return parameterizedQuery; + return query; } @@ -411,5 +376,16 @@ public class CassandraPOJOInputOperator extends AbstractCassandraInputOperator<O } - private static final Logger logger = LoggerFactory.getLogger(CassandraPOJOInputOperator.class); + @Override + protected void emit(Object tuple) + { + outputPort.emit(tuple); + } + + @Override + public void deactivate() + { + } + + private static final Logger LOG = LoggerFactory.getLogger(CassandraPOJOInputOperator.class); } http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/effc8385/contrib/src/main/java/com/datatorrent/contrib/cassandra/CassandraPOJOOutputOperator.java ---------------------------------------------------------------------- diff --git a/contrib/src/main/java/com/datatorrent/contrib/cassandra/CassandraPOJOOutputOperator.java b/contrib/src/main/java/com/datatorrent/contrib/cassandra/CassandraPOJOOutputOperator.java index 61ef26c..bc4f97e 100644 --- a/contrib/src/main/java/com/datatorrent/contrib/cassandra/CassandraPOJOOutputOperator.java +++ b/contrib/src/main/java/com/datatorrent/contrib/cassandra/CassandraPOJOOutputOperator.java @@ -15,16 +15,6 @@ */ package com.datatorrent.contrib.cassandra; -import com.datastax.driver.core.*; -import com.datastax.driver.core.exceptions.DriverException; -import com.datatorrent.lib.util.PojoUtils; -import com.datatorrent.lib.util.PojoUtils.GetterBoolean; -import com.datatorrent.lib.util.PojoUtils.GetterDouble; -import com.datatorrent.lib.util.PojoUtils.GetterFloat; -import com.datatorrent.lib.util.PojoUtils.GetterInt; -import com.datatorrent.lib.util.PojoUtils.GetterLong; -import com.datatorrent.lib.util.PojoUtils.Getter; - import java.math.BigDecimal; import java.util.*; @@ -34,6 +24,18 @@ import org.apache.hadoop.classification.InterfaceStability.Evolving; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.datastax.driver.core.*; +import com.datastax.driver.core.exceptions.DriverException; + +import com.datatorrent.api.Context; +import com.datatorrent.api.DefaultInputPort; +import com.datatorrent.api.Operator; +import com.datatorrent.api.annotation.InputPortFieldAnnotation; + +import com.datatorrent.lib.util.FieldInfo; +import com.datatorrent.lib.util.PojoUtils; +import com.datatorrent.lib.util.PojoUtils.*; + /** * <p> * CassandraOutputOperator class.</p> @@ -45,46 +47,36 @@ import org.slf4j.LoggerFactory; * @since 2.1.0 */ @Evolving -public class CassandraPOJOOutputOperator extends AbstractCassandraTransactionableOutputOperatorPS<Object> +public class CassandraPOJOOutputOperator extends AbstractCassandraTransactionableOutputOperatorPS<Object> implements Operator.ActivationListener<Context.OperatorContext> { - private static final long serialVersionUID = 201506181024L; @NotNull - private ArrayList<String> columns; - private final transient ArrayList<DataType> columnDataTypes; + private List<FieldInfo> fieldInfos; @NotNull - private ArrayList<String> expressions; - private final transient ArrayList<Object> getters; - - /* - * An ArrayList of Java expressions that will yield the field value from the POJO. - * Each expression corresponds to one column in the Cassandra table. - */ - public ArrayList<String> getExpressions() - { - return expressions; - } + private String tablename; - public void setExpressions(ArrayList<String> expressions) - { - this.expressions = expressions; - } + protected final transient ArrayList<DataType> columnDataTypes; + protected final transient ArrayList<Object> getters; + protected transient Class<?> pojoClass; - /* - * An ArrayList of Columns in the Cassandra Table. + /** + * The input port on which tuples are received for writing. */ - public ArrayList<String> getColumns() + @InputPortFieldAnnotation(optional = true) + public final transient DefaultInputPort<Object> input = new DefaultInputPort<Object>() { - return columns; - } - - public void setColumns(ArrayList<String> columns) - { - this.columns = columns; - } + @Override + public void setup(Context.PortContext context) + { + pojoClass = context.getValue(Context.PortContext.TUPLE_CLASS); + } - @NotNull - private String tablename; + @Override + public void process(Object tuple) + { + CassandraPOJOOutputOperator.super.input.process(tuple); + } + }; /* * Tablename in cassandra. @@ -106,63 +98,63 @@ public class CassandraPOJOOutputOperator extends AbstractCassandraTransactionabl getters = new ArrayList<Object>(); } - public void processFirstTuple(Object tuple) + @Override + public void activate(Context.OperatorContext context) { com.datastax.driver.core.ResultSet rs = store.getSession().execute("select * from " + store.keyspace + "." + tablename); final ColumnDefinitions rsMetaData = rs.getColumnDefinitions(); final int numberOfColumns = rsMetaData.size(); - final Class<?> fqcn = tuple.getClass(); for (int i = 0; i < numberOfColumns; i++) { // get the designated column's data type. final DataType type = rsMetaData.getType(i); columnDataTypes.add(type); final Object getter; - final String getterExpr = expressions.get(i); + final String getterExpr = fieldInfos.get(i).getPojoFieldExpression(); switch (type.getName()) { case ASCII: case TEXT: case VARCHAR: - getter = PojoUtils.createGetter(fqcn, getterExpr, String.class); + getter = PojoUtils.createGetter(pojoClass, getterExpr, String.class); break; case BOOLEAN: - getter = PojoUtils.createGetterBoolean(fqcn, getterExpr); + getter = PojoUtils.createGetterBoolean(pojoClass, getterExpr); break; case INT: - getter = PojoUtils.createGetterInt(fqcn, getterExpr); + getter = PojoUtils.createGetterInt(pojoClass, getterExpr); break; case BIGINT: case COUNTER: - getter = PojoUtils.createGetterLong(fqcn, getterExpr); + getter = PojoUtils.createGetterLong(pojoClass, getterExpr); break; case FLOAT: - getter = PojoUtils.createGetterFloat(fqcn, getterExpr); + getter = PojoUtils.createGetterFloat(pojoClass, getterExpr); break; case DOUBLE: - getter = PojoUtils.createGetterDouble(fqcn, getterExpr); + getter = PojoUtils.createGetterDouble(pojoClass, getterExpr); break; case DECIMAL: - getter = PojoUtils.createGetter(fqcn, getterExpr, BigDecimal.class); + getter = PojoUtils.createGetter(pojoClass, getterExpr, BigDecimal.class); break; case SET: - getter = PojoUtils.createGetter(fqcn, getterExpr, Set.class); + getter = PojoUtils.createGetter(pojoClass, getterExpr, Set.class); break; case MAP: - getter = PojoUtils.createGetter(fqcn, getterExpr, Map.class); + getter = PojoUtils.createGetter(pojoClass, getterExpr, Map.class); break; case LIST: - getter = PojoUtils.createGetter(fqcn, getterExpr, List.class); + getter = PojoUtils.createGetter(pojoClass, getterExpr, List.class); break; case TIMESTAMP: - getter = PojoUtils.createGetter(fqcn, getterExpr, Date.class); + getter = PojoUtils.createGetter(pojoClass, getterExpr, Date.class); break; case UUID: - getter = PojoUtils.createGetter(fqcn, getterExpr, UUID.class); + getter = PojoUtils.createGetter(pojoClass, getterExpr, UUID.class); break; default: - getter = PojoUtils.createGetter(fqcn, getterExpr, Object.class); + getter = PojoUtils.createGetter(pojoClass, getterExpr, Object.class); break; } getters.add(getter); @@ -170,17 +162,22 @@ public class CassandraPOJOOutputOperator extends AbstractCassandraTransactionabl } @Override + public void deactivate() + { + } + + @Override protected PreparedStatement getUpdateCommand() { - StringBuilder queryfields = new StringBuilder(""); - StringBuilder values = new StringBuilder(""); - for (String column: columns) { + StringBuilder queryfields = new StringBuilder(); + StringBuilder values = new StringBuilder(); + for (FieldInfo fieldInfo: fieldInfos) { if (queryfields.length() == 0) { - queryfields.append(column); + queryfields.append(fieldInfo.getColumnName()); values.append("?"); } else { - queryfields.append(",").append(column); + queryfields.append(",").append(fieldInfo.getColumnName()); values.append(",").append("?"); } } @@ -197,9 +194,6 @@ public class CassandraPOJOOutputOperator extends AbstractCassandraTransactionabl @SuppressWarnings("unchecked") protected Statement setStatementParameters(PreparedStatement updateCommand, Object tuple) throws DriverException { - if (getters.isEmpty()) { - processFirstTuple(tuple); - } final BoundStatement boundStmnt = new BoundStatement(updateCommand); final int size = columnDataTypes.size(); for (int i = 0; i < size; i++) { @@ -263,5 +257,26 @@ public class CassandraPOJOOutputOperator extends AbstractCassandraTransactionabl return boundStmnt; } + /** + * A list of {@link FieldInfo}s where each item maps a column name to a pojo field name. + */ + public List<FieldInfo> getFieldInfos() + { + return fieldInfos; + } + + /** + * Sets the {@link FieldInfo}s. A {@link FieldInfo} maps a store column to a pojo field name.<br/> + * The value from fieldInfo.column is assigned to fieldInfo.pojoFieldExpression. + * + * @description $[].columnName name of the database column name + * @description $[].pojoFieldExpression pojo field name or expression + * @useSchema $[].pojoFieldExpression input.fields[].name + */ + public void setFieldInfos(List<FieldInfo> fieldInfos) + { + this.fieldInfos = fieldInfos; + } + private static final Logger LOG = LoggerFactory.getLogger(CassandraPOJOOutputOperator.class); } http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/effc8385/contrib/src/test/java/com/datatorrent/contrib/cassandra/CassandraOperatorTest.java ---------------------------------------------------------------------- diff --git a/contrib/src/test/java/com/datatorrent/contrib/cassandra/CassandraOperatorTest.java b/contrib/src/test/java/com/datatorrent/contrib/cassandra/CassandraOperatorTest.java index 6f5f75c..ab7c91e 100644 --- a/contrib/src/test/java/com/datatorrent/contrib/cassandra/CassandraOperatorTest.java +++ b/contrib/src/test/java/com/datatorrent/contrib/cassandra/CassandraOperatorTest.java @@ -17,13 +17,19 @@ package com.datatorrent.contrib.cassandra; import com.datastax.driver.core.*; import com.datastax.driver.core.exceptions.DriverException; +import com.datatorrent.api.Attribute; import com.datatorrent.api.Attribute.AttributeMap; +import com.datatorrent.api.Context; import com.datatorrent.api.DAG; + +import com.datatorrent.lib.helper.TestPortContext; +import com.datatorrent.lib.util.FieldInfo; import com.datatorrent.netlet.util.DTThrowable; import com.datatorrent.lib.helper.OperatorContextTestHelper; import com.datatorrent.lib.testbench.CollectorTestSink; import com.google.common.collect.Lists; import java.util.*; + import org.junit.Assert; import org.junit.BeforeClass; import org.junit.Test; @@ -62,6 +68,7 @@ public class CassandraOperatorTest @BeforeClass public static void setup() { + @SuppressWarnings("UnusedDeclaration") Class<?> clazz = org.codehaus.janino.CompilerFactory.class; try { cluster = Cluster.builder() .addContactPoint(NODE).build(); @@ -100,8 +107,6 @@ public class CassandraOperatorTest private static class TestOutputOperator extends CassandraPOJOOutputOperator { - private static final long serialVersionUID = 201506181038L; - public long getNumOfEventsInStore() { String countQuery = "SELECT count(*) from " + TABLE_NAME + ";"; @@ -117,65 +122,52 @@ public class CassandraOperatorTest { String recordsQuery = "SELECT * from " + TABLE_NAME + ";"; ResultSet resultSetRecords = session.execute(recordsQuery); - int count = 0; for (Row row: resultSetRecords) { - LOG.debug("Boolean value is {}", row.getBool("test")); - Assert.assertEquals(true, row.getBool("test")); - LOG.debug("lastname returned is {}", row.getString("lastname")); - Assert.assertEquals("abclast", row.getString("lastname")); - LOG.debug("Double value returned is {}", row.getDouble("doubleValue")); - Assert.assertEquals("Double value is", 2.0, row.getDouble("doubleValue"), 2); - LOG.debug("Float value returned is {}", row.getFloat("floatValue")); - LOG.debug("age returned is {}", row.getInt("age")); - LOG.debug("set returned is {} ", row.getSet("set1", Integer.class)); - LOG.debug("list returned is {}", row.getList("list1", Integer.class)); - LOG.debug("map returned is {}", row.getMap("map1", String.class, Integer.class)); - LOG.debug("date returned is {}", row.getDate("last_visited")); - Assert.assertNotEquals(new Date(System.currentTimeMillis()), row.getDate("last_visited")); - if (count == 0) { - Assert.assertEquals(2, row.getInt("age")); - Assert.assertEquals(2.0, row.getFloat("floatValue"), 2); + int age = row.getInt("age"); + Assert.assertEquals("check boolean", true, row.getBool("test")); + Assert.assertEquals("check last name", "abclast", row.getString("lastname")); + Assert.assertEquals("check double", 2.0, row.getDouble("doubleValue"), 2); + LOG.debug("age returned is {}", age); + Assert.assertNotEquals("check date", new Date(System.currentTimeMillis()), row.getDate("last_visited")); + if (age == 2) { + Assert.assertEquals("check float", 2.0, row.getFloat("floatValue"), 2); Set<Integer> set = new HashSet<Integer>(); List<Integer> list = new ArrayList<Integer>(); Map<String, Integer> map = new HashMap<String, Integer>(); set.add(2); list.add(2); map.put("key2", 2); - Assert.assertEquals(set, row.getSet("set1", Integer.class)); - Assert.assertEquals(map, row.getMap("map1", String.class, Integer.class)); - Assert.assertEquals(list, row.getList("list1", Integer.class)); + + Assert.assertEquals("check set", set, row.getSet("set1", Integer.class)); + Assert.assertEquals("check map", map, row.getMap("map1", String.class, Integer.class)); + Assert.assertEquals("check list", list, row.getList("list1", Integer.class)); } - if (count == 1) { - Assert.assertEquals(0, row.getInt("age")); - Assert.assertEquals(0.0, row.getFloat("floatValue"), 2); + if (age == 0) { + Assert.assertEquals("check float", 0.0, row.getFloat("floatValue"), 2); Set<Integer> set = new HashSet<Integer>(); List<Integer> list = new ArrayList<Integer>(); Map<String, Integer> map = new HashMap<String, Integer>(); set.add(0); list.add(0); map.put("key0", 0); - Assert.assertEquals(set, row.getSet("set1", Integer.class)); - Assert.assertEquals(map, row.getMap("map1", String.class, Integer.class)); - Assert.assertEquals(list, row.getList("list1", Integer.class)); + Assert.assertEquals("check set", set, row.getSet("set1", Integer.class)); + Assert.assertEquals("check map", map, row.getMap("map1", String.class, Integer.class)); + Assert.assertEquals("check list", list, row.getList("list1", Integer.class)); } - if (count == 2) { - Assert.assertEquals(1, row.getInt("age")); - Assert.assertEquals(1.0, row.getFloat("floatValue"), 2); + if (age == 1) { + Assert.assertEquals("check float", 1.0, row.getFloat("floatValue"), 2); Set<Integer> set = new HashSet<Integer>(); List<Integer> list = new ArrayList<Integer>(); Map<String, Integer> map = new HashMap<String, Integer>(); set.add(1); list.add(1); map.put("key1", 1); - Assert.assertEquals(set, row.getSet("set1", Integer.class)); - Assert.assertEquals(map, row.getMap("map1", String.class, Integer.class)); - Assert.assertEquals(list, row.getList("list1", Integer.class)); + Assert.assertEquals("check set", set, row.getSet("set1", Integer.class)); + Assert.assertEquals("check map", map, row.getMap("map1", String.class, Integer.class)); + Assert.assertEquals("check list", list, row.getList("list1", Integer.class)); } - count++; } - } - } private static class TestInputOperator extends CassandraPOJOInputOperator @@ -237,34 +229,29 @@ public class CassandraOperatorTest TestOutputOperator outputOperator = new TestOutputOperator(); outputOperator.setTablename(TABLE_NAME); - ArrayList<String> columns = new ArrayList<String>(); - columns.add("id"); - columns.add("age"); - columns.add("doubleValue"); - columns.add("floatValue"); - columns.add("last_visited"); - columns.add("lastname"); - columns.add("list1"); - columns.add("map1"); - columns.add("set1"); - columns.add("test"); - outputOperator.setColumns(columns); - ArrayList<String> expressions = new ArrayList<String>(); - expressions.add("id"); - expressions.add("age"); - expressions.add("doubleValue"); - expressions.add("floatValue"); - expressions.add("last_visited"); - expressions.add("lastname"); - expressions.add("list1"); - expressions.add("map1"); - expressions.add("set1"); - expressions.add("test"); - outputOperator.setExpressions(expressions); - outputOperator.setStore(transactionalStore); + List<FieldInfo> fieldInfos = Lists.newArrayList(); + fieldInfos.add(new FieldInfo("id", "id", null)); + fieldInfos.add(new FieldInfo("age", "age", null)); + fieldInfos.add(new FieldInfo("doubleValue", "doubleValue", null)); + fieldInfos.add(new FieldInfo("floatValue", "floatValue", null)); + fieldInfos.add(new FieldInfo("last_visited", "last_visited", null)); + fieldInfos.add(new FieldInfo("lastname", "lastname", null)); + fieldInfos.add(new FieldInfo("list1", "list1", null)); + fieldInfos.add(new FieldInfo("map1", "map1", null)); + fieldInfos.add(new FieldInfo("set1", "set1", null)); + fieldInfos.add(new FieldInfo("test", "test", null)); + outputOperator.setStore(transactionalStore); + outputOperator.setFieldInfos(fieldInfos); outputOperator.setup(context); + Attribute.AttributeMap.DefaultAttributeMap portAttributes = new Attribute.AttributeMap.DefaultAttributeMap(); + portAttributes.put(Context.PortContext.TUPLE_CLASS, TestPojo.class); + TestPortContext tpc = new TestPortContext(portAttributes); + + outputOperator.input.setup(tpc); + outputOperator.activate(context); + List<TestPojo> events = Lists.newArrayList(); for (int i = 0; i < 3; i++) { Set<Integer> set = new HashSet<Integer>(); @@ -277,7 +264,7 @@ public class CassandraOperatorTest } outputOperator.beginWindow(0); - for (TestPojo event: events) { + for (TestPojo event : events) { outputOperator.input.process(event); } outputOperator.endWindow(); @@ -290,7 +277,7 @@ public class CassandraOperatorTest * This test can be run on cassandra server installed on node17. */ @Test - public void TestCassandraInputOperator() + public void testCassandraInputOperator() { String query1 = "SELECT * FROM " + KEYSPACE + "." + "%t;"; CassandraStore store = new CassandraStore(); @@ -303,25 +290,28 @@ public class CassandraOperatorTest TestInputOperator inputOperator = new TestInputOperator(); inputOperator.setStore(store); - inputOperator.setOutputClass("com.datatorrent.contrib.cassandra.TestInputPojo"); inputOperator.setTablename(TABLE_NAME_INPUT); inputOperator.setQuery(query1); - ArrayList<String> columns = new ArrayList<String>(); - columns.add("id"); - columns.add("age"); - columns.add("lastname"); - - inputOperator.setColumns(columns); - ArrayList<String> expressions = new ArrayList<String>(); - expressions.add("id"); - expressions.add("age"); - expressions.add("lastname"); - inputOperator.setExpressions(expressions); - inputOperator.insertEventsInTable(30); inputOperator.setPrimaryKeyColumn("id"); - CollectorTestSink<Object> sink = new CollectorTestSink<Object>(); + + List<FieldInfo> fieldInfos = Lists.newArrayList(); + fieldInfos.add(new FieldInfo("id", "id", null)); + fieldInfos.add(new FieldInfo("age", "age", null)); + fieldInfos.add(new FieldInfo("lastname", "lastname", null)); + inputOperator.setFieldInfos(fieldInfos); + + inputOperator.insertEventsInTable(30); + CollectorTestSink<Object> sink = new CollectorTestSink<>(); inputOperator.outputPort.setSink(sink); + + Attribute.AttributeMap.DefaultAttributeMap portAttributes = new Attribute.AttributeMap.DefaultAttributeMap(); + portAttributes.put(Context.PortContext.TUPLE_CLASS, TestInputPojo.class); + TestPortContext tpc = new TestPortContext(portAttributes); + inputOperator.setup(context); + inputOperator.outputPort.setup(tpc); + inputOperator.activate(context); + inputOperator.beginWindow(0); inputOperator.emitTuples(); inputOperator.endWindow(); @@ -336,8 +326,14 @@ public class CassandraOperatorTest } sink.clear(); + inputOperator.columnDataTypes.clear(); + String query2 = "SELECT * FROM " + KEYSPACE + "." + "%t where token(%p) > %v;"; inputOperator.setQuery(query2); + inputOperator.setup(context); + inputOperator.outputPort.setup(tpc); + inputOperator.activate(context); + inputOperator.setStartRow(10); inputOperator.beginWindow(1); inputOperator.emitTuples(); @@ -345,15 +341,20 @@ public class CassandraOperatorTest Assert.assertEquals("rows from db", 14, sink.collectedTuples.size()); sink.clear(); + inputOperator.columnDataTypes.clear(); + String query3 = "SELECT * FROM " + KEYSPACE + "." + "%t where token(%p) > %v LIMIT %l;"; inputOperator.setQuery(query3); + inputOperator.setup(context); + inputOperator.outputPort.setup(tpc); + inputOperator.activate(context); + inputOperator.setStartRow(1); inputOperator.setLimit(10); inputOperator.beginWindow(2); inputOperator.emitTuples(); inputOperator.endWindow(); Assert.assertEquals("rows from db", 10, sink.collectedTuples.size()); - } public static class TestPojo
