Introduce maxTtl for queue message and message payload data, default is two 
weeks.


Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/8b79fb87
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/8b79fb87
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/8b79fb87

Branch: refs/heads/usergrid-1318-queue
Commit: 8b79fb875a3aea88458e08429286d2119d6a3c88
Parents: 6c204b9
Author: Dave Johnson <[email protected]>
Authored: Tue Sep 20 14:30:20 2016 -0400
Committer: Dave Johnson <[email protected]>
Committed: Tue Sep 20 14:30:20 2016 -0400

----------------------------------------------------------------------
 .../usergrid/persistence/qakka/QakkaFig.java    |  7 +++++++
 .../impl/QueueMessageSerializationImpl.java     | 22 ++++++++++++--------
 2 files changed, 20 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/usergrid/blob/8b79fb87/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/QakkaFig.java
----------------------------------------------------------------------
diff --git 
a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/QakkaFig.java
 
b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/QakkaFig.java
index aa4e349..c66001d 100644
--- 
a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/QakkaFig.java
+++ 
b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/QakkaFig.java
@@ -62,6 +62,8 @@ public interface QakkaFig extends GuicyFig, Serializable {
 
     String QUEUE_LONG_POLL_TIME_MILLIS            = 
"queue.long.polling.time.millis";
 
+    String QUEUE_MAX_TTL                          = "queue.max.ttl";
+
 
     /** True if Qakka is running standlone */
     @Key(QUEUE_STANDALONE)
@@ -141,4 +143,9 @@ public interface QakkaFig extends GuicyFig, Serializable {
     @Key(QUEUE_LONG_POLL_TIME_MILLIS)
     @Default("5000")
     long getLongPollTimeMillis();
+
+    /** Max time-to-live for queue message and payload data */
+    @Key(QUEUE_MAX_TTL)
+    @Default("1209600") // default is two weeks
+    int getMaxTtlSeconds();
 }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/8b79fb87/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/impl/QueueMessageSerializationImpl.java
----------------------------------------------------------------------
diff --git 
a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/impl/QueueMessageSerializationImpl.java
 
b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/impl/QueueMessageSerializationImpl.java
index d868021..02862c4 100644
--- 
a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/impl/QueueMessageSerializationImpl.java
+++ 
b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/impl/QueueMessageSerializationImpl.java
@@ -19,7 +19,6 @@
 
 package org.apache.usergrid.persistence.qakka.serialization.queuemessages.impl;
 
-import com.datastax.driver.core.ResultSet;
 import com.datastax.driver.core.Row;
 import com.datastax.driver.core.Statement;
 import com.datastax.driver.core.querybuilder.Clause;
@@ -32,6 +31,7 @@ import org.apache.usergrid.persistence.core.CassandraConfig;
 import 
org.apache.usergrid.persistence.core.astyanax.MultiTenantColumnFamilyDefinition;
 import org.apache.usergrid.persistence.core.datastax.TableDefinition;
 import 
org.apache.usergrid.persistence.core.datastax.impl.TableDefinitionStringImpl;
+import org.apache.usergrid.persistence.qakka.QakkaFig;
 import org.apache.usergrid.persistence.qakka.core.CassandraClient;
 import org.apache.usergrid.persistence.qakka.core.QakkaUtils;
 import 
org.apache.usergrid.persistence.qakka.serialization.queuemessages.DatabaseQueueMessage;
@@ -49,12 +49,13 @@ import java.util.UUID;
 
 
 public class QueueMessageSerializationImpl implements 
QueueMessageSerialization {
-
     private static final Logger logger = LoggerFactory.getLogger( 
QueueMessageSerializationImpl.class );
 
     private final CassandraClient cassandraClient;
     private final CassandraConfig cassandraConfig;
 
+    private final int maxTtl;
+
     private final ActorSystemFig            actorSystemFig;
     private final ShardStrategy             shardStrategy;
     private final ShardCounterSerialization shardCounterSerialization;
@@ -109,17 +110,20 @@ public class QueueMessageSerializationImpl implements 
QueueMessageSerialization
 
     @Inject
     public QueueMessageSerializationImpl(
-            CassandraConfig              cassandraConfig,
+            CassandraConfig           cassandraConfig,
             ActorSystemFig            actorSystemFig,
             ShardStrategy             shardStrategy,
             ShardCounterSerialization shardCounterSerialization,
-            CassandraClient           cassandraClient
+            CassandraClient           cassandraClient,
+            QakkaFig                  qakkaFig
         ) {
         this.cassandraConfig              = cassandraConfig;
         this.actorSystemFig            = actorSystemFig;
         this.shardStrategy             = shardStrategy;
         this.shardCounterSerialization = shardCounterSerialization;
         this.cassandraClient = cassandraClient;
+
+        this.maxTtl = qakkaFig.getMaxTtlSeconds();
     }
 
 
@@ -151,7 +155,8 @@ public class QueueMessageSerializationImpl implements 
QueueMessageSerialization
                 .value( COLUMN_MESSAGE_ID,       message.getMessageId())
                 .value( COLUMN_QUEUE_MESSAGE_ID, queueMessageId)
                 .value( COLUMN_INFLIGHT_AT,      inflightAt )
-                .value( COLUMN_QUEUED_AT,        queuedAt);
+                .value( COLUMN_QUEUED_AT,        queuedAt)
+            .using( QueryBuilder.ttl( maxTtl ) );
 
         cassandraClient.getQueueMessageSession().execute(insert);
 
@@ -244,9 +249,7 @@ public class QueueMessageSerializationImpl implements 
QueueMessageSerialization
                 .and(shardIdClause)
                 .and(queueMessageIdClause);
 
-        ResultSet resultSet = 
cassandraClient.getQueueMessageSession().execute( delete );
-
-        String s = "s";
+        cassandraClient.getQueueMessageSession().execute( delete );
     }
 
 
@@ -275,7 +278,8 @@ public class QueueMessageSerializationImpl implements 
QueueMessageSerialization
         Statement insert = QueryBuilder.insertInto(TABLE_MESSAGE_DATA)
                 .value( COLUMN_MESSAGE_ID, messageId)
                 .value( COLUMN_MESSAGE_DATA, messageBody.getBlob())
-                .value( COLUMN_CONTENT_TYPE, messageBody.getContentType());
+                .value( COLUMN_CONTENT_TYPE, messageBody.getContentType())
+            .using( QueryBuilder.ttl( maxTtl ) );
 
         cassandraClient.getApplicationSession().execute(insert);
     }

Reply via email to