http://git-wip-us.apache.org/repos/asf/usergrid/blob/3075dce1/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/exceptions/QakkaException.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/exceptions/QakkaException.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/exceptions/QakkaException.java new file mode 100644 index 0000000..84e0681 --- /dev/null +++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/exceptions/QakkaException.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.usergrid.persistence.qakka.exceptions; + + +public class QakkaException extends Exception { + + public QakkaException(String message) { + super( message ); + } + + public QakkaException(String message, Throwable cause) { + super( message, cause ); + } +}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/3075dce1/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/exceptions/QakkaRuntimeException.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/exceptions/QakkaRuntimeException.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/exceptions/QakkaRuntimeException.java new file mode 100644 index 0000000..fbb788b --- /dev/null +++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/exceptions/QakkaRuntimeException.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.usergrid.persistence.qakka.exceptions; + + +public class QakkaRuntimeException extends RuntimeException { + + public QakkaRuntimeException(String message) { + super( message ); + } + + public QakkaRuntimeException(String message, Throwable cause) { + super( message, cause ); + } +} http://git-wip-us.apache.org/repos/asf/usergrid/blob/3075dce1/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/MultiShardMessageIterator.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/MultiShardMessageIterator.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/MultiShardMessageIterator.java new file mode 100644 index 0000000..1c733a6 --- /dev/null +++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/MultiShardMessageIterator.java @@ -0,0 +1,181 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.usergrid.persistence.qakka.serialization; + +import com.datastax.driver.core.Row; +import com.datastax.driver.core.Statement; +import com.datastax.driver.core.querybuilder.Clause; +import com.datastax.driver.core.querybuilder.QueryBuilder; +import org.apache.usergrid.persistence.qakka.core.CassandraClient; +import org.apache.usergrid.persistence.qakka.serialization.queuemessages.DatabaseQueueMessage; +import org.apache.usergrid.persistence.qakka.serialization.queuemessages.impl.QueueMessageSerializationImpl; +import org.apache.usergrid.persistence.qakka.serialization.sharding.Shard; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.*; + +import static org.apache.usergrid.persistence.qakka.serialization.queuemessages.impl.QueueMessageSerializationImpl.*; + + +public class MultiShardMessageIterator implements Iterator<DatabaseQueueMessage> { + + private static final Logger logger = LoggerFactory.getLogger( MultiShardMessageIterator.class ); + + private final CassandraClient cassandraClient; + + + private final int PAGE_SIZE = 100; + private final String queueName; + private final String region; + private final DatabaseQueueMessage.Type messageType; + private final Iterator<Shard> shardIterator; + + private Iterator<DatabaseQueueMessage> currentIterator; + private Shard currentShard; + private UUID nextStart; + + + public MultiShardMessageIterator( + final CassandraClient cassandraClient, + final String queueName, + final String region, + final DatabaseQueueMessage.Type messageType, + final Iterator<Shard> shardIterator, + final UUID nextStart) { + + this.queueName = queueName; + this.region = region; + this.messageType = messageType; + this.shardIterator = shardIterator; + this.nextStart = nextStart; + this.cassandraClient = cassandraClient; + + if (shardIterator == null) { + throw new RuntimeException("shardIterator cannot be null"); + } + + } + + @Override + public boolean hasNext() { + + if ( shardIterator.hasNext() && currentIterator == null) { + advance(); + } + + if ( shardIterator.hasNext() && !currentIterator.hasNext()) { + advance(); + } + + if ( !shardIterator.hasNext() && ( currentIterator == null || !currentIterator.hasNext()) ) { + advance(); + } + + return currentIterator.hasNext(); + } + + @Override + public DatabaseQueueMessage next() { + + if ( !hasNext() ) { + throw new NoSuchElementException( "No next message exists" ); + } + + return currentIterator.next(); + } + + private void advance(){ + + if (currentShard == null){ + currentShard = shardIterator.next(); + } + + Clause queueNameClause = QueryBuilder.eq( COLUMN_QUEUE_NAME, queueName); + Clause regionClause = QueryBuilder.eq( COLUMN_REGION, region); + Clause shardIdClause = QueryBuilder.eq( COLUMN_SHARD_ID, currentShard.getShardId()); + + // if we have a pointer from the shard and this is the first seek, init from the pointer's position + if ( currentShard.getPointer() != null && nextStart == null ){ + nextStart = currentShard.getPointer(); + } + + Statement query; + + if ( nextStart == null) { + + query = QueryBuilder.select().all().from(QueueMessageSerializationImpl.getTableName(messageType)) + .where(queueNameClause) + .and(regionClause) + .and(shardIdClause) + .limit(PAGE_SIZE); + } else { + + Clause messageIdClause = QueryBuilder.gt( COLUMN_QUEUE_MESSAGE_ID, nextStart); + query = QueryBuilder.select().all().from(QueueMessageSerializationImpl.getTableName(messageType)) + .where(queueNameClause) + .and(regionClause) + .and(shardIdClause) + .and(messageIdClause) + .limit(PAGE_SIZE); + } + + List<Row> rows = cassandraClient.getSession().execute(query).all(); + + if ( (rows == null || rows.size() == 0) && shardIterator.hasNext()) { + + currentShard = shardIterator.next(); + advance(); + + } else { + + currentIterator = getIteratorFromRows(rows); + + } + } + + + private Iterator<DatabaseQueueMessage> getIteratorFromRows(List<Row> rows){ + + List<DatabaseQueueMessage> messages = new ArrayList<>(rows.size()); + + rows.forEach(row -> { + + final String queueName = row.getString( COLUMN_QUEUE_NAME); + final String region = row.getString( COLUMN_REGION); + final long shardId = row.getLong( COLUMN_SHARD_ID); + final UUID queueMessageId = row.getUUID( COLUMN_QUEUE_MESSAGE_ID); + final UUID messageId = row.getUUID( COLUMN_MESSAGE_ID); + final long queuedAt = row.getLong( COLUMN_QUEUED_AT); + final long inflightAt = row.getLong( COLUMN_INFLIGHT_AT); + + messages.add(new DatabaseQueueMessage( + messageId, messageType, queueName, region, shardId, queuedAt, inflightAt, queueMessageId)); + + //queueMessageId is internal to the messages_available and messages_inflight tables + nextStart = queueMessageId; + + }); + + return messages.iterator(); + + } + +} http://git-wip-us.apache.org/repos/asf/usergrid/blob/3075dce1/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/Result.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/Result.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/Result.java new file mode 100644 index 0000000..b2b0934 --- /dev/null +++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/Result.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.usergrid.persistence.qakka.serialization; + +import com.datastax.driver.core.PagingState; + +import java.util.List; + + +/** + * Created by Dave Johnson ([email protected]) on 8/8/16. + */ +public interface Result<T> { + PagingState getPagingState(); + + List<T> getEntities(); +} http://git-wip-us.apache.org/repos/asf/usergrid/blob/3075dce1/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/auditlog/AuditLog.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/auditlog/AuditLog.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/auditlog/AuditLog.java new file mode 100644 index 0000000..dcf0d1a --- /dev/null +++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/auditlog/AuditLog.java @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.usergrid.persistence.qakka.serialization.auditlog; + +import java.util.UUID; + + +public class AuditLog { + + public enum Status { SUCCESS, ERROR }; + + public enum Action { SEND, ACK, GET, REQUEUE }; + + Action action; + Status status; + String queueName; + String region; + UUID messageId; + + UUID queueMessageId; + long transfer_time; + + public AuditLog( + Action action, + Status status, + String queueName, + String region, + UUID messageId, + UUID queueMessageId, + long transfer_time) { + + this.action = action; + this.status = status; + this.queueName = queueName; + this.region = region; + this.messageId = messageId; + this.queueMessageId = queueMessageId; + this.transfer_time = transfer_time; + } + + public Action getAction() { + return action; + } + + public Status getStatus() { + return status; + } + + public String getQueueName() { + return queueName; + } + + public void setQueueName(String queueName) { + this.queueName = queueName; + } + + public String getRegion() { + return region; + } + + public void setRegion(String region) { + this.region = region; + } + + public UUID getMessageId() { + return messageId; + } + + public void setMessageId(UUID messageId) { + this.messageId = messageId; + } + + public long getTransfer_time() { + return transfer_time; + } + + public void setTransfer_time(long transfer_time) { + this.transfer_time = transfer_time; + } + + public UUID getQueueMessageId() { + return queueMessageId; + } +} http://git-wip-us.apache.org/repos/asf/usergrid/blob/3075dce1/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/auditlog/AuditLogSerialization.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/auditlog/AuditLogSerialization.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/auditlog/AuditLogSerialization.java new file mode 100644 index 0000000..95f2dbe --- /dev/null +++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/auditlog/AuditLogSerialization.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.usergrid.persistence.qakka.serialization.auditlog; + +import org.apache.usergrid.persistence.core.migration.schema.Migration; +import org.apache.usergrid.persistence.qakka.serialization.Result; + +import java.util.UUID; + + +public interface AuditLogSerialization extends Migration { + + /** + * Record audit log record. + */ + void recordAuditLog( + AuditLog.Action action, + AuditLog.Status status, + String queueName, + String region, + UUID messageId, + UUID queueMessageId); + + /** + * Get all audit logs for a specific queue message. + */ + Result<AuditLog> getAuditLogs(UUID messageId); +} http://git-wip-us.apache.org/repos/asf/usergrid/blob/3075dce1/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/auditlog/impl/AuditLogSerializationImpl.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/auditlog/impl/AuditLogSerializationImpl.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/auditlog/impl/AuditLogSerializationImpl.java new file mode 100644 index 0000000..d9dbab6 --- /dev/null +++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/auditlog/impl/AuditLogSerializationImpl.java @@ -0,0 +1,148 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.usergrid.persistence.qakka.serialization.auditlog.impl; + +import com.datastax.driver.core.PagingState; +import com.datastax.driver.core.ResultSet; +import com.datastax.driver.core.Statement; +import com.datastax.driver.core.querybuilder.QueryBuilder; +import com.google.inject.Inject; +import org.apache.usergrid.persistence.core.astyanax.MultiTenantColumnFamilyDefinition; +import org.apache.usergrid.persistence.core.datastax.TableDefinition; +import org.apache.usergrid.persistence.core.datastax.impl.TableDefinitionStringImpl; +import org.apache.usergrid.persistence.qakka.core.CassandraClient; +import org.apache.usergrid.persistence.qakka.serialization.Result; +import org.apache.usergrid.persistence.qakka.serialization.auditlog.AuditLog; +import org.apache.usergrid.persistence.qakka.serialization.auditlog.AuditLogSerialization; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.UUID; +import java.util.stream.Collectors; + + +public class AuditLogSerializationImpl implements AuditLogSerialization { + + private static final Logger logger = LoggerFactory.getLogger( AuditLogSerializationImpl.class ); + + private final CassandraClient cassandraClient; + + public final static String TABLE_AUDIT_LOG = "audit_log"; + + public final static String COLUMN_ACTION = "action"; + public final static String COLUMN_STATUS = "status"; + public final static String COLUMN_QUEUE_NAME = "queue_name"; + public final static String COLUMN_REGION = "region"; + public final static String COLUMN_MESSAGE_ID = "message_id"; + public final static String COLUMN_QUEUE_MESSAGE_ID = "queue_message_id"; + public final static String COLUMN_TRANSFER_TIME = "transfer_time"; + + + // design note: want to be able to query this by message_id, so we can do "garbage collection" + // of message data items that have been processed in all target regions + + static final String CQL = + "CREATE TABLE IF NOT EXISTS audit_log ( " + + "action text, " + + "status text, " + + "queue_name text, " + + "region text, " + + "message_id timeuuid, " + + "queue_message_id timeuuid, " + + "transfer_time bigint, " + + "PRIMARY KEY (message_id, transfer_time) " + + ") WITH CLUSTERING ORDER BY (transfer_time ASC); "; + + + @Inject + public AuditLogSerializationImpl( CassandraClient cassandraClient ) { + this.cassandraClient = cassandraClient; + } + + + @Override + public void recordAuditLog( + AuditLog.Action action, + AuditLog.Status status, + String queueName, + String region, + UUID messageId, + UUID queueMessageId ) { + + Statement insert = QueryBuilder.insertInto(TABLE_AUDIT_LOG) + .value(COLUMN_ACTION, action.toString() ) + .value(COLUMN_STATUS, status.toString() ) + .value(COLUMN_QUEUE_NAME, queueName ) + .value(COLUMN_REGION, region ) + .value(COLUMN_MESSAGE_ID, messageId ) + .value(COLUMN_QUEUE_MESSAGE_ID, queueMessageId ) + .value(COLUMN_TRANSFER_TIME, System.currentTimeMillis() ); + cassandraClient.getSession().execute(insert); + } + + + @Override + public Result<AuditLog> getAuditLogs( UUID messageId ) { + + Statement query = QueryBuilder.select().all().from(TABLE_AUDIT_LOG) + .where( QueryBuilder.eq( COLUMN_MESSAGE_ID, messageId ) ); + + ResultSet rs = cassandraClient.getSession().execute( query ); + + final List<AuditLog> auditLogs = rs.all().stream().map( row -> + new AuditLog( + AuditLog.Action.valueOf( row.getString( COLUMN_ACTION )), + AuditLog.Status.valueOf( row.getString( COLUMN_STATUS )), + row.getString( COLUMN_QUEUE_NAME ), + row.getString( COLUMN_REGION ), + row.getUUID( COLUMN_MESSAGE_ID ), + row.getUUID( COLUMN_QUEUE_MESSAGE_ID ), + row.getLong( COLUMN_TRANSFER_TIME ) ) + ).collect( Collectors.toList() ); + + return new Result<AuditLog>() { + + @Override + public PagingState getPagingState() { + return null; // no paging + } + + @Override + public List<AuditLog> getEntities() { + return auditLogs; + } + }; + + } + + + @Override + public Collection<MultiTenantColumnFamilyDefinition> getColumnFamilies() { + return Collections.EMPTY_LIST; + } + + @Override + public Collection<TableDefinition> getTables() { + return Collections.singletonList( new TableDefinitionStringImpl( TABLE_AUDIT_LOG, CQL ) ); + } +} http://git-wip-us.apache.org/repos/asf/usergrid/blob/3075dce1/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/DatabaseQueueMessage.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/DatabaseQueueMessage.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/DatabaseQueueMessage.java new file mode 100644 index 0000000..dab47d5 --- /dev/null +++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/DatabaseQueueMessage.java @@ -0,0 +1,155 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.usergrid.persistence.qakka.serialization.queuemessages; + +import java.util.UUID; + + +public class DatabaseQueueMessage { + + public enum Type { + DEFAULT, INFLIGHT + } + + private final String queueName; + private final String region; + private final Long queuedAt; + private final UUID messageId; + private final UUID queueMessageId; + + private Type type; + private Long inflightAt; + + private Long shardId; + + + public DatabaseQueueMessage( + final UUID messageId, + final Type type, + final String queueName, + final String region, + final Long shardId, + final Long queuedAt, + final Long inflightAt, + UUID queueMessageId){ + + this.messageId = messageId; + this.type = type; + this.queueName = queueName; + this.region = region; + this.shardId = shardId; + this.queuedAt = queuedAt; + this.inflightAt = inflightAt; + this.queueMessageId = queueMessageId; + + } + + public void setType(Type type) { + this.type = type; + } + + public String getQueueName() { + return queueName; + } + + public String getRegion() { + return region; + } + + public Long getShardId() { + return shardId; + } + + public UUID getMessageId() { + return messageId; + } + + public Type getType() { + return type; + } + + public Long getQueuedAt() { + return queuedAt; + } + + public UUID getQueueMessageId() { + return queueMessageId; + } + + public Long getInflightAt() { + return inflightAt; + } + + public void setInflightAt(Long inflightAt) { + this.inflightAt = inflightAt; + } + + public void setShardId(Long shardId) { + this.shardId = shardId; + } + + + + @Override + public int hashCode() { + int result = queueName.hashCode(); + result = ( 31 * result ) + region.hashCode(); + result = ( 31 * result ) + (int)( shardId != null ? shardId : 0L ); + result = ( 31 * result ) + messageId.hashCode(); + result = ( 31 * result ) + type.hashCode(); + + return result; + } + + @Override + public boolean equals(Object obj) { + + if( this == obj){ + return true; + } + + if( !(obj instanceof DatabaseQueueMessage)){ + return false; + } + + DatabaseQueueMessage that = (DatabaseQueueMessage) obj; + + if( !this.queueName.equalsIgnoreCase(that.queueName)){ + return false; + } + if( !this.region.equalsIgnoreCase(that.region)){ + return false; + } + if( this.shardId != that.shardId){ + return false; + } + if( !messageId.equals(that.messageId)){ + return false; + } + if( !type.equals(that.type)){ + return false; + } + + return true; + + } + + +} http://git-wip-us.apache.org/repos/asf/usergrid/blob/3075dce1/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/DatabaseQueueMessageBody.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/DatabaseQueueMessageBody.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/DatabaseQueueMessageBody.java new file mode 100644 index 0000000..c4c7fce --- /dev/null +++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/DatabaseQueueMessageBody.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.usergrid.persistence.qakka.serialization.queuemessages; + +import com.google.common.base.Preconditions; + +import java.nio.ByteBuffer; + + +public class DatabaseQueueMessageBody { + + + private final ByteBuffer blob; + private final String contentType; + + + public DatabaseQueueMessageBody(final ByteBuffer blob, final String contentType){ + + Preconditions.checkNotNull(blob, "Blob data cannot be null"); + + this.blob = blob; + this.contentType = contentType; + + } + + public ByteBuffer getBlob() { + return blob; + } + + public String getContentType() { + return contentType; + } + + +} http://git-wip-us.apache.org/repos/asf/usergrid/blob/3075dce1/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/MessageCounterSerialization.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/MessageCounterSerialization.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/MessageCounterSerialization.java new file mode 100644 index 0000000..cbbf11f --- /dev/null +++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/MessageCounterSerialization.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.usergrid.persistence.qakka.serialization.queuemessages; + +import org.apache.usergrid.persistence.core.migration.schema.Migration; + + +public interface MessageCounterSerialization extends Migration { + + void incrementCounter(String queueName, DatabaseQueueMessage.Type type, long increment); + + void decrementCounter(String queueName, DatabaseQueueMessage.Type type, long increment); + + long getCounterValue(String name, DatabaseQueueMessage.Type type); +} http://git-wip-us.apache.org/repos/asf/usergrid/blob/3075dce1/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/QueueMessageSerialization.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/QueueMessageSerialization.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/QueueMessageSerialization.java new file mode 100644 index 0000000..3ebe735 --- /dev/null +++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/QueueMessageSerialization.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.usergrid.persistence.qakka.serialization.queuemessages; + +import org.apache.usergrid.persistence.core.migration.schema.Migration; + +import java.util.UUID; + + +public interface QueueMessageSerialization extends Migration { + + /** + * Write message to storage.. + * If queueMessageId or createdTime are null, then values will be generated. + */ + UUID writeMessage(final DatabaseQueueMessage message); + + DatabaseQueueMessage loadMessage( + final String queueName, + final String region, + final Long shardIdOrNull, + final DatabaseQueueMessage.Type type, + final UUID queueMessageId); + + void deleteMessage( + final String queueName, + final String region, + final Long shardIdOrNull, + final DatabaseQueueMessage.Type type, + final UUID queueMessageId); + + void writeMessageData(final UUID messageId, final DatabaseQueueMessageBody messageBody); + + DatabaseQueueMessageBody loadMessageData(final UUID messageId); + + void deleteMessageData(final UUID messageId); +} http://git-wip-us.apache.org/repos/asf/usergrid/blob/3075dce1/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/impl/MessageCounterSerializationImpl.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/impl/MessageCounterSerializationImpl.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/impl/MessageCounterSerializationImpl.java new file mode 100644 index 0000000..65ffc47 --- /dev/null +++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/impl/MessageCounterSerializationImpl.java @@ -0,0 +1,204 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.usergrid.persistence.qakka.serialization.queuemessages.impl; + +import com.datastax.driver.core.ResultSet; +import com.datastax.driver.core.Row; +import com.datastax.driver.core.Statement; +import com.datastax.driver.core.querybuilder.QueryBuilder; +import com.google.inject.Inject; +import com.google.inject.Singleton; +import org.apache.usergrid.persistence.core.astyanax.MultiTenantColumnFamilyDefinition; +import org.apache.usergrid.persistence.core.datastax.TableDefinition; +import org.apache.usergrid.persistence.core.datastax.impl.TableDefinitionStringImpl; +import org.apache.usergrid.persistence.qakka.QakkaFig; +import org.apache.usergrid.persistence.qakka.core.CassandraClient; +import org.apache.usergrid.persistence.qakka.exceptions.NotFoundException; +import org.apache.usergrid.persistence.qakka.exceptions.QakkaRuntimeException; +import org.apache.usergrid.persistence.qakka.serialization.sharding.Shard; +import org.apache.usergrid.persistence.qakka.serialization.sharding.ShardCounterSerialization; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.text.MessageFormat; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicLong; + + +@Singleton +public class MessageCounterSerializationImpl implements ShardCounterSerialization { + private static final Logger logger = LoggerFactory.getLogger( MessageCounterSerializationImpl.class ); + + private final CassandraClient cassandraClient; + + final static String TABLE_SHARD_COUNTERS = "counters"; + final static String COLUMN_QUEUE_NAME = "queue_name"; + final static String COLUMN_SHARD_ID = "shard_id"; + final static String COLUMN_COUNTER_VALUE = "counter_value"; + final static String COLUMN_SHARD_TYPE = "shard_type"; + + // design note: counters based on DataStax example here: + // https://docs.datastax.com/en/cql/3.1/cql/cql_using/use_counter_t.html + + static final String CQL = + "CREATE TABLE IF NOT EXISTS shard_counters ( " + + "counter_value counter, " + + "queue_name varchar, " + + "shard_type varchar, " + + "shard_id bigint, " + + "PRIMARY KEY (queue_name, shard_type, shard_id) " + + ");"; + + + final long maxInMemoryIncrement; + + class InMemoryCount { + long baseCount; + final AtomicLong increment = new AtomicLong( 0L ); + InMemoryCount( long baseCount ) { + this.baseCount = baseCount; + } + public long value() { + return baseCount + increment.get(); + } + public AtomicLong getIncrement() { + return increment; + } + void setBaseCount( long baseCount ) { + this.baseCount = baseCount; + } + } + + private Map<String, InMemoryCount> inMemoryCounters = new ConcurrentHashMap<>(200); + + + @Inject + public MessageCounterSerializationImpl(QakkaFig qakkaFig, CassandraClient cassandraClient ) { + this.maxInMemoryIncrement = qakkaFig.getMaxInMemoryShardCounter(); + this.cassandraClient = cassandraClient; + } + + + @Override + public void incrementCounter(String queueName, Shard.Type type, long shardId, long increment ) { + + String key = queueName + type + shardId; + synchronized ( inMemoryCounters ) { + + if ( inMemoryCounters.get( key ) == null ) { + + Long value = retrieveCounterFromStorage( queueName, type, shardId ); + + if ( value == null ) { + incrementCounterInStorage( queueName, type, shardId, 0L ); + inMemoryCounters.put( key, new InMemoryCount( 0L )); + } else { + inMemoryCounters.put( key, new InMemoryCount( value )); + } + inMemoryCounters.get( key ).getIncrement().addAndGet( increment ); + return; + } + } + + InMemoryCount inMemoryCount = inMemoryCounters.get( key ); + + synchronized ( inMemoryCount ) { + long totalIncrement = inMemoryCount.getIncrement().addAndGet( increment ); + + if (totalIncrement > maxInMemoryIncrement) { + incrementCounterInStorage( queueName, type, shardId, totalIncrement ); + inMemoryCount.setBaseCount( retrieveCounterFromStorage( queueName, type, shardId ) ); + inMemoryCount.getIncrement().set( 0L ); + } + } + + } + + + @Override + public long getCounterValue( String queueName, Shard.Type type, long shardId ) { + + String key = queueName + type + shardId; + + synchronized ( inMemoryCounters ) { + + if ( inMemoryCounters.get( key ) == null ) { + + Long value = retrieveCounterFromStorage( queueName, type, shardId ); + + if ( value == null ) { + throw new NotFoundException( + MessageFormat.format( "No counter found for queue {0} type {1} shardId {2}", + queueName, type, shardId )); + } else { + inMemoryCounters.put( key, new InMemoryCount( value )); + } + } + } + + return inMemoryCounters.get( key ).value(); + } + + void incrementCounterInStorage( String queueName, Shard.Type type, long shardId, long increment ) { + + Statement update = QueryBuilder.update( TABLE_SHARD_COUNTERS ) + .where( QueryBuilder.eq( COLUMN_QUEUE_NAME, queueName ) ) + .and( QueryBuilder.eq( COLUMN_SHARD_TYPE, type.toString() ) ) + .and( QueryBuilder.eq( COLUMN_SHARD_ID, shardId ) ) + .with( QueryBuilder.incr( COLUMN_COUNTER_VALUE, increment ) ); + cassandraClient.getSession().execute( update ); + } + + + Long retrieveCounterFromStorage( String queueName, Shard.Type type, long shardId ) { + + Statement query = QueryBuilder.select().from( TABLE_SHARD_COUNTERS ) + .where( QueryBuilder.eq( COLUMN_QUEUE_NAME, queueName ) ) + .and( QueryBuilder.eq( COLUMN_SHARD_TYPE, type.toString()) ) + .and( QueryBuilder.eq( COLUMN_SHARD_ID, shardId ) ); + + ResultSet resultSet = cassandraClient.getSession().execute( query ); + List<Row> all = resultSet.all(); + + if ( all.size() > 1 ) { + throw new QakkaRuntimeException( + "Multiple rows for counter " + queueName + " type " + type + " shardId " + shardId ); + } + if ( all.isEmpty() ) { + return null; + } + return all.get(0).getLong( COLUMN_COUNTER_VALUE ); + } + + + @Override + public Collection<MultiTenantColumnFamilyDefinition> getColumnFamilies() { + return Collections.EMPTY_LIST; + } + + @Override + public Collection<TableDefinition> getTables() { + return Collections.singletonList( new TableDefinitionStringImpl( TABLE_SHARD_COUNTERS, CQL ) ); + } + +} http://git-wip-us.apache.org/repos/asf/usergrid/blob/3075dce1/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/impl/QueueMessageSerializationImpl.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/impl/QueueMessageSerializationImpl.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/impl/QueueMessageSerializationImpl.java new file mode 100644 index 0000000..f55b936 --- /dev/null +++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/impl/QueueMessageSerializationImpl.java @@ -0,0 +1,320 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.usergrid.persistence.qakka.serialization.queuemessages.impl; + +import com.datastax.driver.core.ResultSet; +import com.datastax.driver.core.Row; +import com.datastax.driver.core.Statement; +import com.datastax.driver.core.querybuilder.Clause; +import com.datastax.driver.core.querybuilder.QueryBuilder; +import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; +import com.google.inject.Inject; +import org.apache.usergrid.persistence.actorsystem.ActorSystemFig; +import org.apache.usergrid.persistence.core.astyanax.MultiTenantColumnFamilyDefinition; +import org.apache.usergrid.persistence.core.datastax.TableDefinition; +import org.apache.usergrid.persistence.core.datastax.impl.TableDefinitionStringImpl; +import org.apache.usergrid.persistence.qakka.core.CassandraClient; +import org.apache.usergrid.persistence.qakka.core.QakkaUtils; +import org.apache.usergrid.persistence.qakka.serialization.queuemessages.DatabaseQueueMessage; +import org.apache.usergrid.persistence.qakka.serialization.queuemessages.DatabaseQueueMessageBody; +import org.apache.usergrid.persistence.qakka.serialization.queuemessages.QueueMessageSerialization; +import org.apache.usergrid.persistence.qakka.serialization.sharding.Shard; +import org.apache.usergrid.persistence.qakka.serialization.sharding.ShardCounterSerialization; +import org.apache.usergrid.persistence.qakka.serialization.sharding.ShardStrategy; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Collection; +import java.util.Collections; +import java.util.UUID; + + +public class QueueMessageSerializationImpl implements QueueMessageSerialization { + + private static final Logger logger = LoggerFactory.getLogger( QueueMessageSerializationImpl.class ); + + private final CassandraClient cassandraClient; + + private final ActorSystemFig actorSystemFig; + private final ShardStrategy shardStrategy; + private final ShardCounterSerialization shardCounterSerialization; + + public final static String COLUMN_QUEUE_NAME = "queue_name"; + public final static String COLUMN_REGION = "region"; + public final static String COLUMN_SHARD_ID = "shard_id"; + public final static String COLUMN_QUEUED_AT = "queued_at"; + public final static String COLUMN_INFLIGHT_AT = "inflight_at"; + public final static String COLUMN_QUEUE_MESSAGE_ID = "queue_message_id"; + public final static String COLUMN_MESSAGE_ID = "message_id"; + public final static String COLUMN_CONTENT_TYPE = "content_type"; + public final static String COLUMN_MESSAGE_DATA = "data"; + + public final static String TABLE_MESSAGES_AVAILABLE = "messages_available"; + + public final static String TABLE_MESSAGES_INFLIGHT = "messages_inflight"; + + public final static String TABLE_MESSAGE_DATA = "message_data"; + + static final String MESSAGES_AVAILABLE = + "CREATE TABLE IF NOT EXISTS messages_available ( " + + "queue_name text, " + + "region text, " + + "shard_id bigint, " + + "queue_message_id timeuuid, " + + "message_id uuid, " + + "queued_at bigint, " + + "inflight_at bigint, " + + "PRIMARY KEY ((queue_name, region, shard_id), queue_message_id ) " + + ") WITH CLUSTERING ORDER BY (queue_message_id ASC); "; + + static final String MESSAGES_INFLIGHT = + "CREATE TABLE IF NOT EXISTS messages_inflight ( " + + "queue_name text, " + + "region text, " + + "shard_id bigint, " + + "queue_message_id timeuuid, " + + "message_id uuid, " + + "queued_at bigint, " + + "inflight_at bigint, " + + "PRIMARY KEY ((queue_name, region, shard_id), queue_message_id ) " + + ") WITH CLUSTERING ORDER BY (queue_message_id ASC); "; + + static final String MESSAGE_DATA = + "CREATE TABLE IF NOT EXISTS message_data ( " + + "message_id uuid, " + + "data blob, " + + "content_type text, " + + "PRIMARY KEY ((message_id)) " + + "); "; + + @Inject + public QueueMessageSerializationImpl( + ActorSystemFig actorSystemFig, + ShardStrategy shardStrategy, + ShardCounterSerialization shardCounterSerialization, + CassandraClient cassandraClient + ) { + this.actorSystemFig = actorSystemFig; + this.shardStrategy = shardStrategy; + this.shardCounterSerialization = shardCounterSerialization; + this.cassandraClient = cassandraClient; + } + + + @Override + public UUID writeMessage(final DatabaseQueueMessage message) { + + final UUID queueMessageId = message.getQueueMessageId() == null ? + QakkaUtils.getTimeUuid() : message.getQueueMessageId(); + + long queuedAt = message.getQueuedAt() == null ? + System.currentTimeMillis() : message.getQueuedAt(); + + long inflightAt = message.getInflightAt() == null ? + message.getQueuedAt() : message.getInflightAt(); + + Shard.Type shardType = DatabaseQueueMessage.Type.DEFAULT.equals( message.getType() ) ? + Shard.Type.DEFAULT : Shard.Type.INFLIGHT; + + if ( message.getShardId() == null ) { + Shard shard = shardStrategy.selectShard( + message.getQueueName(), actorSystemFig.getRegionLocal(), shardType, queueMessageId ); + message.setShardId( shard.getShardId() ); + } + + Statement insert = QueryBuilder.insertInto(getTableName(message.getType())) + .value( COLUMN_QUEUE_NAME, message.getQueueName()) + .value( COLUMN_REGION, message.getRegion()) + .value( COLUMN_SHARD_ID, message.getShardId()) + .value( COLUMN_MESSAGE_ID, message.getMessageId()) + .value( COLUMN_QUEUE_MESSAGE_ID, queueMessageId) + .value( COLUMN_INFLIGHT_AT, inflightAt ) + .value( COLUMN_QUEUED_AT, queuedAt); + + cassandraClient.getSession().execute(insert); + + shardCounterSerialization.incrementCounter( message.getQueueName(), shardType, message.getShardId(), 1 ); + + return queueMessageId; + } + + + @Override + public DatabaseQueueMessage loadMessage( + final String queueName, + final String region, + final Long shardIdOrNull, + final DatabaseQueueMessage.Type type, + final UUID queueMessageId ) { + + if ( queueMessageId == null ) { + return null; + } + + final long shardId; + if ( shardIdOrNull == null ) { + Shard.Type shardType = DatabaseQueueMessage.Type.DEFAULT.equals( type ) ? + Shard.Type.DEFAULT : Shard.Type.INFLIGHT; + Shard shard = shardStrategy.selectShard( + queueName, actorSystemFig.getRegionLocal(), shardType, queueMessageId ); + shardId = shard.getShardId(); + } else { + shardId = shardIdOrNull; + } + + Clause queueNameClause = QueryBuilder.eq( COLUMN_QUEUE_NAME, queueName ); + Clause regionClause = QueryBuilder.eq( COLUMN_REGION, region ); + Clause shardIdClause = QueryBuilder.eq( COLUMN_SHARD_ID, shardId ); + Clause queueMessageIdClause = QueryBuilder.eq( COLUMN_QUEUE_MESSAGE_ID, queueMessageId); + + Statement select = QueryBuilder.select().from(getTableName( type )) + .where(queueNameClause) + .and(regionClause) + .and(shardIdClause) + .and(queueMessageIdClause); + + Row row = cassandraClient.getSession().execute(select).one(); + + if (row == null) { + return null; + } + + return new DatabaseQueueMessage( + row.getUUID( COLUMN_MESSAGE_ID), + type, + row.getString( COLUMN_QUEUE_NAME), + row.getString( COLUMN_REGION), + row.getLong( COLUMN_SHARD_ID), + row.getLong( COLUMN_QUEUED_AT), + row.getLong( COLUMN_INFLIGHT_AT), + row.getUUID( COLUMN_QUEUE_MESSAGE_ID) + ); + } + + + @Override + public void deleteMessage( + final String queueName, + final String region, + final Long shardIdOrNull, + final DatabaseQueueMessage.Type type, + final UUID queueMessageId ) { + + final long shardId; + if ( shardIdOrNull == null ) { + Shard.Type shardType = DatabaseQueueMessage.Type.DEFAULT.equals( type ) ? + Shard.Type.DEFAULT : Shard.Type.INFLIGHT; + Shard shard = shardStrategy.selectShard( + queueName, actorSystemFig.getRegionLocal(), shardType, queueMessageId ); + shardId = shard.getShardId(); + } else { + shardId = shardIdOrNull; + } + + Clause queueNameClause = QueryBuilder.eq( COLUMN_QUEUE_NAME, queueName ); + Clause regionClause = QueryBuilder.eq( COLUMN_REGION, region ); + Clause shardIdClause = QueryBuilder.eq( COLUMN_SHARD_ID, shardId ); + Clause queueMessageIdClause = QueryBuilder.eq( COLUMN_QUEUE_MESSAGE_ID, queueMessageId); + + Statement delete = QueryBuilder.delete().from(getTableName( type )) + .where(queueNameClause) + .and(regionClause) + .and(shardIdClause) + .and(queueMessageIdClause); + + ResultSet resultSet = cassandraClient.getSession().execute( delete ); + + String s = "s"; + } + + + @Override + public DatabaseQueueMessageBody loadMessageData(final UUID messageId ){ + + Clause messageIdClause = QueryBuilder.eq( COLUMN_MESSAGE_ID, messageId ); + + Statement select = QueryBuilder.select().from( TABLE_MESSAGE_DATA).where(messageIdClause); + + Row row = cassandraClient.getSession().execute(select).one(); + if ( row == null ) { + return null; + } + + return new DatabaseQueueMessageBody( + row.getBytes( COLUMN_MESSAGE_DATA), + row.getString( COLUMN_CONTENT_TYPE)); + } + + + @Override + public void writeMessageData( final UUID messageId, final DatabaseQueueMessageBody messageBody ) { + Preconditions.checkArgument(QakkaUtils.isTimeUuid(messageId), "MessageId is not a type 1 UUID"); + + Statement insert = QueryBuilder.insertInto(TABLE_MESSAGE_DATA) + .value( COLUMN_MESSAGE_ID, messageId) + .value( COLUMN_MESSAGE_DATA, messageBody.getBlob()) + .value( COLUMN_CONTENT_TYPE, messageBody.getContentType()); + + cassandraClient.getSession().execute(insert); + } + + + @Override + public void deleteMessageData( final UUID messageId ) { + + Clause messageIdClause = QueryBuilder.eq(COLUMN_MESSAGE_ID, messageId); + + Statement delete = QueryBuilder.delete().from(TABLE_MESSAGE_DATA) + .where(messageIdClause); + + cassandraClient.getSession().execute(delete); + } + + + public static String getTableName(DatabaseQueueMessage.Type messageType){ + + String table; + if( messageType.equals(DatabaseQueueMessage.Type.DEFAULT)) { + table = TABLE_MESSAGES_AVAILABLE; + }else if (messageType.equals(DatabaseQueueMessage.Type.INFLIGHT)) { + table = TABLE_MESSAGES_INFLIGHT; + }else{ + throw new IllegalArgumentException("Unknown DatabaseQueueMessage Type"); + } + + return table; + } + + @Override + public Collection<MultiTenantColumnFamilyDefinition> getColumnFamilies() { + return Collections.EMPTY_LIST; + } + + @Override + public Collection<TableDefinition> getTables() { + return Lists.newArrayList( + new TableDefinitionStringImpl( TABLE_MESSAGES_AVAILABLE, MESSAGES_AVAILABLE ), + new TableDefinitionStringImpl( TABLE_MESSAGES_INFLIGHT, MESSAGES_INFLIGHT ), + new TableDefinitionStringImpl( TABLE_MESSAGE_DATA, MESSAGE_DATA ) + ); + } + +} http://git-wip-us.apache.org/repos/asf/usergrid/blob/3075dce1/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queues/DatabaseQueue.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queues/DatabaseQueue.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queues/DatabaseQueue.java new file mode 100644 index 0000000..dcf273a --- /dev/null +++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queues/DatabaseQueue.java @@ -0,0 +1,114 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.usergrid.persistence.qakka.serialization.queues; + +public class DatabaseQueue { + + private final String name; + private final String regions; + + private String defaultDestinations; + private Long defaultDelayMs; + private Integer retryCount; + private Integer handlingTimeoutSec; + private String deadLetterQueue; + + + public DatabaseQueue(final String name, + final String regions, + final String defaultDestinations, + final Long defaultDelayMs, + final Integer retryCount, + final Integer handlingTimeoutSec, + final String deadLetterQueue ){ + + this.name = name; + this.regions = regions; + this.defaultDestinations = defaultDestinations; + this.defaultDelayMs = defaultDelayMs; + this.retryCount = retryCount; + this.handlingTimeoutSec = handlingTimeoutSec; + this.deadLetterQueue = deadLetterQueue; + + } + + public String getName() { + return name; + } + + public String getDeadLetterQueue() { + return deadLetterQueue; + } + + public Integer getHandlingTimeoutSec() { + return handlingTimeoutSec; + } + + public Integer getRetryCount() { + return retryCount; + } + + public Long getDefaultDelayMs() { + return defaultDelayMs; + } + + public String getDefaultDestinations() { + return defaultDestinations; + } + + public String getRegions() { + return regions; + } + + @Override + public int hashCode() { + int result = name.hashCode(); + result = ( 31 * result ) + regions.hashCode(); + + return result; + } + + @Override + public boolean equals(Object obj) { + + if( this == obj){ + return true; + } + + if( !(obj instanceof DatabaseQueue)){ + return false; + } + + DatabaseQueue that = (DatabaseQueue) obj; + + if( !this.name.equalsIgnoreCase(that.name)){ + return false; + } + if( !this.regions.equals(that.regions)){ + return false; + } + + return true; + + } + + + +} http://git-wip-us.apache.org/repos/asf/usergrid/blob/3075dce1/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queues/QueueSerialization.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queues/QueueSerialization.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queues/QueueSerialization.java new file mode 100644 index 0000000..b00d269 --- /dev/null +++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queues/QueueSerialization.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.usergrid.persistence.qakka.serialization.queues; + +import org.apache.usergrid.persistence.core.migration.schema.Migration; + +import java.util.List; + + +public interface QueueSerialization extends Migration { + + void writeQueue(DatabaseQueue queue); + + DatabaseQueue getQueue(String name); + + void deleteQueue(String name); + + List<String> getListOfQueues(); +} http://git-wip-us.apache.org/repos/asf/usergrid/blob/3075dce1/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queues/impl/QueueSerializationImpl.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queues/impl/QueueSerializationImpl.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queues/impl/QueueSerializationImpl.java new file mode 100644 index 0000000..932097a --- /dev/null +++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queues/impl/QueueSerializationImpl.java @@ -0,0 +1,157 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.usergrid.persistence.qakka.serialization.queues.impl; + + +import com.datastax.driver.core.ResultSet; +import com.datastax.driver.core.Row; +import com.datastax.driver.core.Statement; +import com.datastax.driver.core.querybuilder.Clause; +import com.datastax.driver.core.querybuilder.QueryBuilder; +import com.google.inject.Inject; +import org.apache.usergrid.persistence.core.astyanax.MultiTenantColumnFamilyDefinition; +import org.apache.usergrid.persistence.core.datastax.TableDefinition; +import org.apache.usergrid.persistence.core.datastax.impl.TableDefinitionStringImpl; +import org.apache.usergrid.persistence.qakka.core.CassandraClient; +import org.apache.usergrid.persistence.qakka.serialization.queuemessages.impl.QueueMessageSerializationImpl; +import org.apache.usergrid.persistence.qakka.serialization.queues.DatabaseQueue; +import org.apache.usergrid.persistence.qakka.serialization.queues.QueueSerialization; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.stream.Collectors; + +public class QueueSerializationImpl implements QueueSerialization { + + private static final Logger logger = LoggerFactory.getLogger( QueueMessageSerializationImpl.class ); + + private final CassandraClient cassandraClient; + + public final static String COLUMN_QUEUE_NAME = "queue_name"; + public final static String COLUMN_REGIONS = "regions"; + public final static String COLUMN_DEFAULT_DESTINATIONS = "default_destinations"; + public final static String COLUMN_DEFAULT_DELAY_MS = "default_delay_ms"; + public final static String COLUMN_RETRY_COUNT = "retry_count"; + public final static String COLUMN_HANDLING_TIMEOUT_SEC = "handling_timeout_sec"; + public final static String COLUMN_DEAD_LETTER_QUEUE = "dead_letter_queue"; + + + public final static String TABLE_QUEUES = "queues"; + + static final String CQL = + "CREATE TABLE IF NOT EXISTS queues ( " + + "queue_name text, " + + "regions text, " + + "default_destinations text, " + + "default_delay_ms bigint, " + + "retry_count int, " + + "handling_timeout_sec int, " + + "dead_letter_queue text, " + + "PRIMARY KEY ((queue_name)) " + + "); "; + + + @Inject + public QueueSerializationImpl( CassandraClient cassandraClient ) { + this.cassandraClient = cassandraClient; + } + + + @Override + public void writeQueue(DatabaseQueue queue) { + + Statement insert = QueryBuilder.insertInto(TABLE_QUEUES) + .value(COLUMN_QUEUE_NAME, queue.getName()) + .value(COLUMN_REGIONS, queue.getRegions()) + .value(COLUMN_DEFAULT_DESTINATIONS, queue.getDefaultDestinations()) + .value(COLUMN_DEFAULT_DELAY_MS, queue.getDefaultDelayMs()) + .value(COLUMN_RETRY_COUNT, queue.getRetryCount()) + .value(COLUMN_HANDLING_TIMEOUT_SEC, queue.getHandlingTimeoutSec()) + .value(COLUMN_DEAD_LETTER_QUEUE, queue.getDeadLetterQueue()); + + + cassandraClient.getSession().execute(insert); + + } + + @Override + public DatabaseQueue getQueue(String name) { + + Clause queueNameClause = QueryBuilder.eq(COLUMN_QUEUE_NAME, name); + + Statement query = QueryBuilder.select().all().from(TABLE_QUEUES) + .where(queueNameClause); + + Row row = cassandraClient.getSession().execute(query).one(); + + if(row == null){ + return null; + } + + final String queueName = row.getString(COLUMN_QUEUE_NAME); + final String regions = row.getString(COLUMN_REGIONS); + final String defaultDestinations = row.getString(COLUMN_DEFAULT_DESTINATIONS); + final long defaultDelayMs = row.getLong(COLUMN_DEFAULT_DELAY_MS); + final int retryCount = row.getInt(COLUMN_RETRY_COUNT); + final int handlingTimeoutSec = row.getInt(COLUMN_HANDLING_TIMEOUT_SEC); + final String deadLetterQueue = row.getString(COLUMN_DEAD_LETTER_QUEUE); + + return new DatabaseQueue( queueName, regions, defaultDestinations, defaultDelayMs, retryCount, + handlingTimeoutSec, deadLetterQueue); + + } + + @Override + public void deleteQueue(String name) { + + Clause queueNameClause = QueryBuilder.eq(COLUMN_QUEUE_NAME, name); + + Statement delete = QueryBuilder.delete().from(TABLE_QUEUES) + .where(queueNameClause); + + cassandraClient.getSession().execute(delete); + } + + @Override + public List<String> getListOfQueues() { + + Statement select = QueryBuilder.select().all().from( TABLE_QUEUES ); + ResultSet rs = cassandraClient.getSession().execute( select ); + + return rs.all().stream() + .map( row -> row.getString( COLUMN_QUEUE_NAME )) + .collect( Collectors.toList() ); + } + + + @Override + public Collection<MultiTenantColumnFamilyDefinition> getColumnFamilies() { + return Collections.EMPTY_LIST; + } + + @Override + public Collection<TableDefinition> getTables() { + return Collections.singletonList( new TableDefinitionStringImpl( "queues", CQL ) ); + } + +} http://git-wip-us.apache.org/repos/asf/usergrid/blob/3075dce1/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/sharding/Shard.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/sharding/Shard.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/sharding/Shard.java new file mode 100644 index 0000000..20c802d --- /dev/null +++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/sharding/Shard.java @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.usergrid.persistence.qakka.serialization.sharding; + + +import java.util.UUID; + +public class Shard { + + public enum Type { + + DEFAULT, INFLIGHT + } + + private String queueName; + private String region; + private long shardId; + private Type type; + private UUID pointer; + + public Shard(final String queueName, final String region, final Type type, final long shardId, UUID pointer){ + + this.queueName = queueName; + this.region = region; + this.type = type; + this.shardId = shardId; + this.pointer = pointer; + + } + + public String getQueueName() { + return queueName; + } + + public String getRegion() { + return region; + } + + public long getShardId() { + return shardId; + } + + public Type getType() { + return type; + } + + public UUID getPointer() { + return pointer; + } + + public void setPointer(UUID pointer) { + this.pointer = pointer; + } + + @Override + public int hashCode() { + int result = queueName.hashCode(); + result = ( 31 * result ) + region.hashCode(); + result = ( 31 * result ) + (int)shardId; + result = ( 31 * result ) + type.hashCode(); + return result; + } + + @Override + public boolean equals(Object obj) { + + if( this == obj){ + return true; + } + + if( !(obj instanceof Shard)){ + return false; + } + + Shard that = (Shard) obj; + + if( !this.queueName.equalsIgnoreCase(that.queueName)){ + return false; + } + if( !this.region.equalsIgnoreCase(that.region)){ + return false; + } + if( this.shardId != that.shardId){ + return false; + } + if( !this.type.equals(that.type)){ + return false; + } + + return true; + + } + +} http://git-wip-us.apache.org/repos/asf/usergrid/blob/3075dce1/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/sharding/ShardCounterSerialization.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/sharding/ShardCounterSerialization.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/sharding/ShardCounterSerialization.java new file mode 100644 index 0000000..c29c548 --- /dev/null +++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/sharding/ShardCounterSerialization.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.usergrid.persistence.qakka.serialization.sharding; + +import org.apache.usergrid.persistence.core.migration.schema.Migration; + + +public interface ShardCounterSerialization extends Migration { + + void incrementCounter(String queueName, Shard.Type type, long shardId, long increment); + + long getCounterValue(String name, Shard.Type type, long shardId); +} http://git-wip-us.apache.org/repos/asf/usergrid/blob/3075dce1/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/sharding/ShardIterator.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/sharding/ShardIterator.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/sharding/ShardIterator.java new file mode 100644 index 0000000..31e31ce --- /dev/null +++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/sharding/ShardIterator.java @@ -0,0 +1,142 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.usergrid.persistence.qakka.serialization.sharding; + +import com.datastax.driver.core.Row; +import com.datastax.driver.core.Statement; +import com.datastax.driver.core.querybuilder.Clause; +import com.datastax.driver.core.querybuilder.QueryBuilder; +import org.apache.usergrid.persistence.qakka.core.CassandraClient; +import org.apache.usergrid.persistence.qakka.serialization.sharding.impl.ShardSerializationImpl; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.*; + +public class ShardIterator implements Iterator<Shard> { + + private static final Logger logger = LoggerFactory.getLogger( ShardIterator.class ); + + private final CassandraClient cassandraClient; + + private final int PAGE_SIZE = 100; + private final String queueName; + private final String region; + private final Shard.Type shardType; + private final Optional<Long> shardId; + + private Iterator<Shard> currentIterator; + + private long nextStart = 0L; + + + public ShardIterator( + final CassandraClient cassandraClient, + final String queueName, + final String region, + final Shard.Type shardtype, + final Optional<Long> lastShardId){ + + this.queueName = queueName; + this.region = region; + this.shardType = shardtype; + this.shardId = lastShardId.isPresent() ? lastShardId : Optional.of(0L); + this.cassandraClient = cassandraClient; + } + + @Override + public boolean hasNext() { + + if(currentIterator == null || !currentIterator.hasNext()){ + advance(); + } + + return currentIterator.hasNext(); + + } + + @Override + public Shard next() { + + if ( !hasNext() ) { + throw new NoSuchElementException( "No next shard exists" ); + } + + return currentIterator.next(); + + } + + private void advance(){ + + + Clause queueNameClause = QueryBuilder.eq( ShardSerializationImpl.COLUMN_QUEUE_NAME, queueName); + Clause regionClause = QueryBuilder.eq( ShardSerializationImpl.COLUMN_REGION, region); + Clause activeClause = QueryBuilder.eq( ShardSerializationImpl.COLUMN_ACTIVE, 1); + Clause shardIdClause; + if(nextStart == 0L && shardId.isPresent()){ + shardIdClause = QueryBuilder.gt( ShardSerializationImpl.COLUMN_SHARD_ID, shardId.get()); + }else if( nextStart == 0L && !shardId.isPresent()){ + shardIdClause = QueryBuilder.gte( ShardSerializationImpl.COLUMN_SHARD_ID, 0L); + + }else{ + shardIdClause = QueryBuilder.gt( ShardSerializationImpl.COLUMN_SHARD_ID, nextStart); + } + + + + Statement query = QueryBuilder.select().all().from(ShardSerializationImpl.getTableName(shardType)) + .where(queueNameClause) + .and(regionClause) + .and(activeClause) + .and(shardIdClause) + .limit(PAGE_SIZE); + + List<Row> rows = cassandraClient.getSession().execute(query).all(); + + + currentIterator = getIteratorFromRows(rows); + + + } + + + private Iterator<Shard> getIteratorFromRows( List<Row> rows){ + + List<Shard> shards = new ArrayList<>(rows.size()); + + rows.forEach(row -> { + + final String queueName = row.getString( ShardSerializationImpl.COLUMN_QUEUE_NAME); + final String region = row.getString( ShardSerializationImpl.COLUMN_REGION); + final long shardId = row.getLong( ShardSerializationImpl.COLUMN_SHARD_ID); + final UUID pointer = row.getUUID( ShardSerializationImpl.COLUMN_POINTER); + + shards.add(new Shard(queueName, region, shardType, shardId, pointer)); + + nextStart = shardId; + + }); + + return shards.iterator(); + + } + + +} http://git-wip-us.apache.org/repos/asf/usergrid/blob/3075dce1/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/sharding/ShardSerialization.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/sharding/ShardSerialization.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/sharding/ShardSerialization.java new file mode 100644 index 0000000..c0173ab --- /dev/null +++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/sharding/ShardSerialization.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.usergrid.persistence.qakka.serialization.sharding; + +import org.apache.usergrid.persistence.core.migration.schema.Migration; + + +public interface ShardSerialization extends Migration { + + void createShard(final Shard shard); + + Shard loadShard(final Shard shard); + + void deleteShard(final Shard shard); + + void updateShardPointer(final Shard shard); + +} http://git-wip-us.apache.org/repos/asf/usergrid/blob/3075dce1/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/sharding/ShardStrategy.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/sharding/ShardStrategy.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/sharding/ShardStrategy.java new file mode 100644 index 0000000..013f0b6 --- /dev/null +++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/sharding/ShardStrategy.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.usergrid.persistence.qakka.serialization.sharding; + +import java.util.UUID; + + +public interface ShardStrategy { + + /** + * Select shard that should be used for the specified Queue Message. + * @param queueName Name of queue + * @param region Region + * @param type Indicates whether message is inflight or available + * @param pointer Queue Message ID (must be Time-based) + */ + Shard selectShard(String queueName, String region, Shard.Type type, UUID pointer); +} http://git-wip-us.apache.org/repos/asf/usergrid/blob/3075dce1/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/sharding/impl/PlaceholderShardStrategy.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/sharding/impl/PlaceholderShardStrategy.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/sharding/impl/PlaceholderShardStrategy.java new file mode 100644 index 0000000..3ca79e6 --- /dev/null +++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/sharding/impl/PlaceholderShardStrategy.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.usergrid.persistence.qakka.serialization.sharding.impl; + + +import org.apache.usergrid.persistence.qakka.serialization.sharding.Shard; +import org.apache.usergrid.persistence.qakka.serialization.sharding.ShardStrategy; + +import java.util.HashMap; +import java.util.Map; +import java.util.UUID; + + +// TODO: delete me! + +public class PlaceholderShardStrategy implements ShardStrategy { + + private Map<String, Shard> shardMap = new HashMap<>(); + + + @Override + public Shard selectShard(String queueName, String region, Shard.Type type, UUID pointer) { + String key = queueName + region + type; + shardMap.putIfAbsent( key, new Shard( queueName, region, type, 0L, pointer ) ); + return shardMap.get( key ); + } +}
