Introduce maxTtl for queue message and message payload data, default is two weeks.
Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/8b79fb87 Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/8b79fb87 Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/8b79fb87 Branch: refs/heads/usergrid-1318-queue Commit: 8b79fb875a3aea88458e08429286d2119d6a3c88 Parents: 6c204b9 Author: Dave Johnson <[email protected]> Authored: Tue Sep 20 14:30:20 2016 -0400 Committer: Dave Johnson <[email protected]> Committed: Tue Sep 20 14:30:20 2016 -0400 ---------------------------------------------------------------------- .../usergrid/persistence/qakka/QakkaFig.java | 7 +++++++ .../impl/QueueMessageSerializationImpl.java | 22 ++++++++++++-------- 2 files changed, 20 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/usergrid/blob/8b79fb87/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/QakkaFig.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/QakkaFig.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/QakkaFig.java index aa4e349..c66001d 100644 --- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/QakkaFig.java +++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/QakkaFig.java @@ -62,6 +62,8 @@ public interface QakkaFig extends GuicyFig, Serializable { String QUEUE_LONG_POLL_TIME_MILLIS = "queue.long.polling.time.millis"; + String QUEUE_MAX_TTL = "queue.max.ttl"; + /** True if Qakka is running standlone */ @Key(QUEUE_STANDALONE) @@ -141,4 +143,9 @@ public interface QakkaFig extends GuicyFig, Serializable { @Key(QUEUE_LONG_POLL_TIME_MILLIS) @Default("5000") long getLongPollTimeMillis(); + + /** Max time-to-live for queue message and payload data */ + @Key(QUEUE_MAX_TTL) + @Default("1209600") // default is two weeks + int getMaxTtlSeconds(); } http://git-wip-us.apache.org/repos/asf/usergrid/blob/8b79fb87/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/impl/QueueMessageSerializationImpl.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/impl/QueueMessageSerializationImpl.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/impl/QueueMessageSerializationImpl.java index d868021..02862c4 100644 --- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/impl/QueueMessageSerializationImpl.java +++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/impl/QueueMessageSerializationImpl.java @@ -19,7 +19,6 @@ package org.apache.usergrid.persistence.qakka.serialization.queuemessages.impl; -import com.datastax.driver.core.ResultSet; import com.datastax.driver.core.Row; import com.datastax.driver.core.Statement; import com.datastax.driver.core.querybuilder.Clause; @@ -32,6 +31,7 @@ import org.apache.usergrid.persistence.core.CassandraConfig; import org.apache.usergrid.persistence.core.astyanax.MultiTenantColumnFamilyDefinition; import org.apache.usergrid.persistence.core.datastax.TableDefinition; import org.apache.usergrid.persistence.core.datastax.impl.TableDefinitionStringImpl; +import org.apache.usergrid.persistence.qakka.QakkaFig; import org.apache.usergrid.persistence.qakka.core.CassandraClient; import org.apache.usergrid.persistence.qakka.core.QakkaUtils; import org.apache.usergrid.persistence.qakka.serialization.queuemessages.DatabaseQueueMessage; @@ -49,12 +49,13 @@ import java.util.UUID; public class QueueMessageSerializationImpl implements QueueMessageSerialization { - private static final Logger logger = LoggerFactory.getLogger( QueueMessageSerializationImpl.class ); private final CassandraClient cassandraClient; private final CassandraConfig cassandraConfig; + private final int maxTtl; + private final ActorSystemFig actorSystemFig; private final ShardStrategy shardStrategy; private final ShardCounterSerialization shardCounterSerialization; @@ -109,17 +110,20 @@ public class QueueMessageSerializationImpl implements QueueMessageSerialization @Inject public QueueMessageSerializationImpl( - CassandraConfig cassandraConfig, + CassandraConfig cassandraConfig, ActorSystemFig actorSystemFig, ShardStrategy shardStrategy, ShardCounterSerialization shardCounterSerialization, - CassandraClient cassandraClient + CassandraClient cassandraClient, + QakkaFig qakkaFig ) { this.cassandraConfig = cassandraConfig; this.actorSystemFig = actorSystemFig; this.shardStrategy = shardStrategy; this.shardCounterSerialization = shardCounterSerialization; this.cassandraClient = cassandraClient; + + this.maxTtl = qakkaFig.getMaxTtlSeconds(); } @@ -151,7 +155,8 @@ public class QueueMessageSerializationImpl implements QueueMessageSerialization .value( COLUMN_MESSAGE_ID, message.getMessageId()) .value( COLUMN_QUEUE_MESSAGE_ID, queueMessageId) .value( COLUMN_INFLIGHT_AT, inflightAt ) - .value( COLUMN_QUEUED_AT, queuedAt); + .value( COLUMN_QUEUED_AT, queuedAt) + .using( QueryBuilder.ttl( maxTtl ) ); cassandraClient.getQueueMessageSession().execute(insert); @@ -244,9 +249,7 @@ public class QueueMessageSerializationImpl implements QueueMessageSerialization .and(shardIdClause) .and(queueMessageIdClause); - ResultSet resultSet = cassandraClient.getQueueMessageSession().execute( delete ); - - String s = "s"; + cassandraClient.getQueueMessageSession().execute( delete ); } @@ -275,7 +278,8 @@ public class QueueMessageSerializationImpl implements QueueMessageSerialization Statement insert = QueryBuilder.insertInto(TABLE_MESSAGE_DATA) .value( COLUMN_MESSAGE_ID, messageId) .value( COLUMN_MESSAGE_DATA, messageBody.getBlob()) - .value( COLUMN_CONTENT_TYPE, messageBody.getContentType()); + .value( COLUMN_CONTENT_TYPE, messageBody.getContentType()) + .using( QueryBuilder.ttl( maxTtl ) ); cassandraClient.getApplicationSession().execute(insert); }
