APEXMALHAR-1988: Updating Cassandra batch fetch logic to use Cassandra Paging feature
Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/d094a04f Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/d094a04f Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/d094a04f Branch: refs/heads/master Commit: d094a04fcbb3cea459610e55dae30ef2c86179a7 Parents: 4b126aa Author: Priyanka Gugale <[email protected]> Authored: Wed Feb 10 14:45:03 2016 +0530 Committer: Priyanka Gugale <[email protected]> Committed: Tue May 17 14:31:14 2016 -0700 ---------------------------------------------------------------------- .../AbstractCassandraInputOperator.java | 36 ++++++++- .../cassandra/CassandraPOJOInputOperator.java | 77 +++++--------------- 2 files changed, 52 insertions(+), 61 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/d094a04f/contrib/src/main/java/com/datatorrent/contrib/cassandra/AbstractCassandraInputOperator.java ---------------------------------------------------------------------- diff --git a/contrib/src/main/java/com/datatorrent/contrib/cassandra/AbstractCassandraInputOperator.java b/contrib/src/main/java/com/datatorrent/contrib/cassandra/AbstractCassandraInputOperator.java index 366d97a..0f2f0d0 100644 --- a/contrib/src/main/java/com/datatorrent/contrib/cassandra/AbstractCassandraInputOperator.java +++ b/contrib/src/main/java/com/datatorrent/contrib/cassandra/AbstractCassandraInputOperator.java @@ -19,8 +19,10 @@ package com.datatorrent.contrib.cassandra; +import com.datastax.driver.core.PagingState; import com.datastax.driver.core.ResultSet; import com.datastax.driver.core.Row; +import com.datastax.driver.core.SimpleStatement; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -45,7 +47,8 @@ import com.datatorrent.netlet.util.DTThrowable; public abstract class AbstractCassandraInputOperator<T> extends 
AbstractStoreInputOperator<T, CassandraStore> { private static final Logger logger = LoggerFactory.getLogger(AbstractCassandraInputOperator.class); - + private PagingState nextPageState; + private int fetchSize; int waitForDataTimeout = 100; @AutoMetric protected long tuplesRead; @@ -112,8 +115,15 @@ public abstract class AbstractCassandraInputOperator<T> extends AbstractStoreInp String query = queryToRetrieveData(); logger.debug("select statement: {}", query); + SimpleStatement stmt = new SimpleStatement(query); + stmt.setFetchSize(fetchSize); try { - ResultSet result = store.getSession().execute(query); + if (nextPageState != null) { + stmt.setPagingState(nextPageState); + } + ResultSet result = store.getSession().execute(stmt); + nextPageState = result.getExecutionInfo().getPagingState(); + if (!result.isExhausted()) { for (Row row : result) { T tuple = getTuple(row); @@ -124,8 +134,7 @@ public abstract class AbstractCassandraInputOperator<T> extends AbstractStoreInp // No rows available wait for some time before retrying so as to not continuously slam the database Thread.sleep(waitForDataTimeout); } - } - catch (Exception ex) { + } catch (Exception ex) { store.disconnect(); DTThrowable.rethrow(ex); } @@ -135,4 +144,23 @@ public abstract class AbstractCassandraInputOperator<T> extends AbstractStoreInp { outputPort.emit(tuple); } + + /** + * Get page fetch Size + * @return + */ + public int getFetchSize() + { + return fetchSize; + } + + /** + * Set page fetch size + * @param fetchSize + */ + public void setFetchSize(int fetchSize) + { + this.fetchSize = fetchSize; + } + } http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/d094a04f/contrib/src/main/java/com/datatorrent/contrib/cassandra/CassandraPOJOInputOperator.java ---------------------------------------------------------------------- diff --git a/contrib/src/main/java/com/datatorrent/contrib/cassandra/CassandraPOJOInputOperator.java 
b/contrib/src/main/java/com/datatorrent/contrib/cassandra/CassandraPOJOInputOperator.java index 3d87711..b7301cb 100644 --- a/contrib/src/main/java/com/datatorrent/contrib/cassandra/CassandraPOJOInputOperator.java +++ b/contrib/src/main/java/com/datatorrent/contrib/cassandra/CassandraPOJOInputOperator.java @@ -61,6 +61,7 @@ import com.datatorrent.lib.util.PojoUtils.*; @Evolving public class CassandraPOJOInputOperator extends AbstractCassandraInputOperator<Object> implements Operator.ActivationListener<OperatorContext> { + private String TOKEN_QUERY; @NotNull private List<FieldInfo> fieldInfos; private Number startRow; @@ -71,15 +72,9 @@ public class CassandraPOJOInputOperator extends AbstractCassandraInputOperator<O private String query; @NotNull private String primaryKeyColumn; - @Min(1) private int limit = 10; - private String TOKEN_QUERY; - private transient DataType primaryKeyColumnType; - private transient Row lastRowInBatch; - private transient BoundStatement fetchKeyStatement; - protected final transient List<Object> setters; protected final transient List<DataType> columnDataTypes; protected transient Class<?> pojoClass; @@ -94,14 +89,19 @@ public class CassandraPOJOInputOperator extends AbstractCassandraInputOperator<O } }; - /* - * Number of records to be fetched in one time from cassandra table. + /** + * Gets number of records to be fetched at one time from cassandra table. + * @return limit */ public int getLimit() { return limit; } + /** + * Sets number of records to be fetched at one time from cassandra table. + * @param limit + */ public void setLimit(int limit) { this.limit = limit; @@ -138,7 +138,7 @@ public class CassandraPOJOInputOperator extends AbstractCassandraInputOperator<O /* * Parameterized query with parameters such as %t for table name , %p for primary key, %s for start value and %l for limit. 
* Example of retrieveQuery: - * select * from %t where token(%p) > %s limit %l; + * select * from %t where token(%p) > %s LIMIT %l; */ public String getQuery() { @@ -196,22 +196,22 @@ public class CassandraPOJOInputOperator extends AbstractCassandraInputOperator<O public void setup(OperatorContext context) { super.setup(context); - Long keyToken; TOKEN_QUERY = "select token(" + primaryKeyColumn + ") from " + store.keyspace + "." + tablename + " where " + primaryKeyColumn + " = ?"; - PreparedStatement statement = store.getSession().prepare(TOKEN_QUERY); - fetchKeyStatement = new BoundStatement(statement); - if (startRow != null && (keyToken = fetchKeyTokenFromDB(startRow)) != null) { - startRowToken = keyToken; - } } @Override public void activate(OperatorContext context) { + Long keyToken; + if (startRow != null) { + if ((keyToken = fetchKeyTokenFromDB(startRow)) != null) { + startRowToken = keyToken; + } + } + com.datastax.driver.core.ResultSet rs = store.getSession().execute("select * from " + store.keyspace + "." + tablename + " LIMIT " + 1); ColumnDefinitions rsMetaData = rs.getColumnDefinitions(); - primaryKeyColumnType = rsMetaData.getType(primaryKeyColumn); if (query.contains("%t")) { query = query.replace("%t", tablename); } @@ -282,7 +282,6 @@ public class CassandraPOJOInputOperator extends AbstractCassandraInputOperator<O @SuppressWarnings("unchecked") public Object getTuple(Row row) { - lastRowInBatch = row; Object obj; try { @@ -370,48 +369,12 @@ public class CassandraPOJOInputOperator extends AbstractCassandraInputOperator<O return query; } - - /* - * Overriding emitTupes to save primarykey column value from last row in batch. 
- */ - @Override - public void emitTuples() - { - super.emitTuples(); - if (lastRowInBatch != null) { - startRowToken = getPrimaryKeyToken(primaryKeyColumnType.getName()); - } - } - - private Long getPrimaryKeyToken(DataType.Name primaryKeyDataType) - { - Object keyValue; - switch (primaryKeyDataType) { - case UUID: - keyValue = lastRowInBatch.getUUID(primaryKeyColumn); - break; - case INT: - keyValue = lastRowInBatch.getInt(primaryKeyColumn); - break; - case COUNTER: - keyValue = lastRowInBatch.getLong(primaryKeyColumn); - break; - case FLOAT: - keyValue = lastRowInBatch.getFloat(primaryKeyColumn); - break; - case DOUBLE: - keyValue = lastRowInBatch.getDouble(primaryKeyColumn); - break; - default: - throw new RuntimeException("unsupported data type " + primaryKeyColumnType.getName()); - } - return fetchKeyTokenFromDB(keyValue); - } - private Long fetchKeyTokenFromDB(Object keyValue) { - fetchKeyStatement.bind(keyValue); - ResultSet rs = store.getSession().execute(fetchKeyStatement); + PreparedStatement statement = store.getSession().prepare(TOKEN_QUERY); + BoundStatement boundStatement = new BoundStatement(statement); + boundStatement.bind(keyValue); + ResultSet rs = store.getSession().execute(boundStatement); Long keyTokenValue = rs.one().getLong(0); return keyTokenValue; }
