Repository: incubator-apex-malhar
Updated Branches:
  refs/heads/master 9c11400a2 -> a91287ce7
APEXMALHAR-1988: Updating cassandra batch fetch logic to use Cassandra Paging feature Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/a91287ce Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/a91287ce Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/a91287ce Branch: refs/heads/master Commit: a91287ce7652f3f0000498ebfa33c16315f44e45 Parents: 9c11400 Author: Priyanka Gugale <[email protected]> Authored: Wed Feb 10 14:45:03 2016 +0530 Committer: Priyanka Gugale <[email protected]> Committed: Wed May 25 15:35:29 2016 -0700 ---------------------------------------------------------------------- .../AbstractCassandraInputOperator.java | 41 +++++++--- .../cassandra/CassandraPOJOInputOperator.java | 79 ++++++-------------- 2 files changed, 53 insertions(+), 67 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/a91287ce/contrib/src/main/java/com/datatorrent/contrib/cassandra/AbstractCassandraInputOperator.java ---------------------------------------------------------------------- diff --git a/contrib/src/main/java/com/datatorrent/contrib/cassandra/AbstractCassandraInputOperator.java b/contrib/src/main/java/com/datatorrent/contrib/cassandra/AbstractCassandraInputOperator.java index 366d97a..7bd47fc 100644 --- a/contrib/src/main/java/com/datatorrent/contrib/cassandra/AbstractCassandraInputOperator.java +++ b/contrib/src/main/java/com/datatorrent/contrib/cassandra/AbstractCassandraInputOperator.java @@ -19,14 +19,15 @@ package com.datatorrent.contrib.cassandra; +import com.datastax.driver.core.PagingState; import com.datastax.driver.core.ResultSet; import com.datastax.driver.core.Row; +import com.datastax.driver.core.SimpleStatement; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import 
com.datatorrent.lib.db.AbstractStoreInputOperator; -import com.datatorrent.api.AutoMetric; import com.datatorrent.api.DefaultOutputPort; import com.datatorrent.netlet.util.DTThrowable; @@ -45,16 +46,14 @@ import com.datatorrent.netlet.util.DTThrowable; public abstract class AbstractCassandraInputOperator<T> extends AbstractStoreInputOperator<T, CassandraStore> { private static final Logger logger = LoggerFactory.getLogger(AbstractCassandraInputOperator.class); - + private PagingState nextPageState; + private int fetchSize; int waitForDataTimeout = 100; - @AutoMetric - protected long tuplesRead; @Override public void beginWindow(long l) { super.beginWindow(l); - tuplesRead = 0; } /** @@ -112,20 +111,25 @@ public abstract class AbstractCassandraInputOperator<T> extends AbstractStoreInp String query = queryToRetrieveData(); logger.debug("select statement: {}", query); + SimpleStatement stmt = new SimpleStatement(query); + stmt.setFetchSize(fetchSize); try { - ResultSet result = store.getSession().execute(query); + if (nextPageState != null) { + stmt.setPagingState(nextPageState); + } + ResultSet result = store.getSession().execute(stmt); + nextPageState = result.getExecutionInfo().getPagingState(); + if (!result.isExhausted()) { for (Row row : result) { T tuple = getTuple(row); emit(tuple); - tuplesRead++; } } else { // No rows available wait for some time before retrying so as to not continuously slam the database Thread.sleep(waitForDataTimeout); } - } - catch (Exception ex) { + } catch (Exception ex) { store.disconnect(); DTThrowable.rethrow(ex); } @@ -135,4 +139,23 @@ public abstract class AbstractCassandraInputOperator<T> extends AbstractStoreInp { outputPort.emit(tuple); } + + /** + * Get page fetch Size + * @return + */ + public int getFetchSize() + { + return fetchSize; + } + + /** + * Set page fetch size + * @param fetchSize + */ + public void setFetchSize(int fetchSize) + { + this.fetchSize = fetchSize; + } + } 
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/a91287ce/contrib/src/main/java/com/datatorrent/contrib/cassandra/CassandraPOJOInputOperator.java ---------------------------------------------------------------------- diff --git a/contrib/src/main/java/com/datatorrent/contrib/cassandra/CassandraPOJOInputOperator.java b/contrib/src/main/java/com/datatorrent/contrib/cassandra/CassandraPOJOInputOperator.java index 3d87711..9c56178 100644 --- a/contrib/src/main/java/com/datatorrent/contrib/cassandra/CassandraPOJOInputOperator.java +++ b/contrib/src/main/java/com/datatorrent/contrib/cassandra/CassandraPOJOInputOperator.java @@ -61,6 +61,7 @@ import com.datatorrent.lib.util.PojoUtils.*; @Evolving public class CassandraPOJOInputOperator extends AbstractCassandraInputOperator<Object> implements Operator.ActivationListener<OperatorContext> { + private String tokenQuery; @NotNull private List<FieldInfo> fieldInfos; private Number startRow; @@ -71,15 +72,9 @@ public class CassandraPOJOInputOperator extends AbstractCassandraInputOperator<O private String query; @NotNull private String primaryKeyColumn; - @Min(1) private int limit = 10; - private String TOKEN_QUERY; - private transient DataType primaryKeyColumnType; - private transient Row lastRowInBatch; - private transient BoundStatement fetchKeyStatement; - protected final transient List<Object> setters; protected final transient List<DataType> columnDataTypes; protected transient Class<?> pojoClass; @@ -94,14 +89,19 @@ public class CassandraPOJOInputOperator extends AbstractCassandraInputOperator<O } }; - /* - * Number of records to be fetched in one time from cassandra table. + /** + * Gets number of records to be fetched at one time from cassandra table. + * @return limit */ public int getLimit() { return limit; } + /** + * Sets number of records to be fetched at one time from cassandra table. 
+ * @param limit + */ public void setLimit(int limit) { this.limit = limit; @@ -138,7 +138,7 @@ public class CassandraPOJOInputOperator extends AbstractCassandraInputOperator<O /* * Parameterized query with parameters such as %t for table name , %p for primary key, %s for start value and %l for limit. * Example of retrieveQuery: - * select * from %t where token(%p) > %s limit %l; + * select * from %t where token(%p) > %s LIMIT %l; */ public String getQuery() { @@ -196,22 +196,22 @@ public class CassandraPOJOInputOperator extends AbstractCassandraInputOperator<O public void setup(OperatorContext context) { super.setup(context); - Long keyToken; - TOKEN_QUERY = "select token(" + primaryKeyColumn + ") from " + store.keyspace + "." + tablename + " where " + primaryKeyColumn + " = ?"; - PreparedStatement statement = store.getSession().prepare(TOKEN_QUERY); - fetchKeyStatement = new BoundStatement(statement); - if (startRow != null && (keyToken = fetchKeyTokenFromDB(startRow)) != null) { - startRowToken = keyToken; - } + tokenQuery = "select token(" + primaryKeyColumn + ") from " + store.keyspace + "." + tablename + " where " + primaryKeyColumn + " = ?"; } @Override public void activate(OperatorContext context) { + Long keyToken; + if (startRow != null) { + if ((keyToken = fetchKeyTokenFromDB(startRow)) != null) { + startRowToken = keyToken; + } + } + com.datastax.driver.core.ResultSet rs = store.getSession().execute("select * from " + store.keyspace + "." 
+ tablename + " LIMIT " + 1); ColumnDefinitions rsMetaData = rs.getColumnDefinitions(); - primaryKeyColumnType = rsMetaData.getType(primaryKeyColumn); if (query.contains("%t")) { query = query.replace("%t", tablename); } @@ -282,7 +282,6 @@ public class CassandraPOJOInputOperator extends AbstractCassandraInputOperator<O @SuppressWarnings("unchecked") public Object getTuple(Row row) { - lastRowInBatch = row; Object obj; try { @@ -370,48 +369,12 @@ public class CassandraPOJOInputOperator extends AbstractCassandraInputOperator<O return query; } - - /* - * Overriding emitTupes to save primarykey column value from last row in batch. - */ - @Override - public void emitTuples() - { - super.emitTuples(); - if (lastRowInBatch != null) { - startRowToken = getPrimaryKeyToken(primaryKeyColumnType.getName()); - } - } - - private Long getPrimaryKeyToken(DataType.Name primaryKeyDataType) - { - Object keyValue; - switch (primaryKeyDataType) { - case UUID: - keyValue = lastRowInBatch.getUUID(primaryKeyColumn); - break; - case INT: - keyValue = lastRowInBatch.getInt(primaryKeyColumn); - break; - case COUNTER: - keyValue = lastRowInBatch.getLong(primaryKeyColumn); - break; - case FLOAT: - keyValue = lastRowInBatch.getFloat(primaryKeyColumn); - break; - case DOUBLE: - keyValue = lastRowInBatch.getDouble(primaryKeyColumn); - break; - default: - throw new RuntimeException("unsupported data type " + primaryKeyColumnType.getName()); - } - return fetchKeyTokenFromDB(keyValue); - } - private Long fetchKeyTokenFromDB(Object keyValue) { - fetchKeyStatement.bind(keyValue); - ResultSet rs = store.getSession().execute(fetchKeyStatement); + PreparedStatement statement = store.getSession().prepare(tokenQuery); + BoundStatement boundStatement = new BoundStatement(statement); + boundStatement.bind(keyValue); + ResultSet rs = store.getSession().execute(boundStatement); Long keyTokenValue = rs.one().getLong(0); return keyTokenValue; }
