// Source: http://git-wip-us.apache.org/repos/asf/usergrid/blob/3075dce1/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/QakkaUtils.java
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.apache.usergrid.persistence.qakka.core;

import com.datastax.driver.core.utils.UUIDs;

import java.util.UUID;

/**
 * Static helper methods for UUID and String checks used by the Qakka queue code.
 */
public class QakkaUtils {

    /**
     * Creates a new time-based UUID by delegating to {@link UUIDs#timeBased()}.
     *
     * @return a freshly generated time-based UUID
     */
    public static UUID getTimeUuid() {
        return UUIDs.timeBased();
    }

    /**
     * Tells whether the given UUID is a version 1 (time-based) UUID.
     *
     * @param uuid UUID to inspect, may be null
     * @return true when {@code uuid} is non-null and its version is 1; false otherwise
     *         (a null argument yields false instead of a NullPointerException)
     */
    public static Boolean isTimeUuid(UUID uuid) {
        return uuid != null && uuid.version() == 1;
    }

    /**
     * Tells whether the given string is null or has zero length.
     * Whitespace-only strings are NOT considered empty.
     *
     * @param s string to inspect, may be null
     * @return true when {@code s} is null or ""
     */
    public static Boolean isNullOrEmpty(String s) {
        return s == null || s.isEmpty();
    }

    /**
     * Builds a time-based UUID for the given timestamp by delegating to
     * {@link UUIDs#startOf(long)}.
     *
     * NOTE(review): per the DataStax driver documentation, startOf() returns the smallest
     * possible version 1 UUID for that timestamp; such UUIDs are not unique and are meant
     * for range queries rather than as identifiers -- confirm callers use it that way.
     *
     * @param when timestamp passed straight through to the driver
     * @return the "start of" UUID for {@code when}
     */
    public static UUID getTimeUUID(long when) {
        return UUIDs.startOf( when );
    }

}
// Source: http://git-wip-us.apache.org/repos/asf/usergrid/blob/3075dce1/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/Queue.java
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.apache.usergrid.persistence.qakka.core;

import org.apache.usergrid.persistence.qakka.serialization.queues.DatabaseQueue;

import javax.xml.bind.annotation.XmlRootElement;

/**
 * Configuration bean describing one queue: its name, type, regions, default destinations,
 * default delay, retry count, handling timeout and dead letter queue name.
 * Converts to and from the persisted {@link DatabaseQueue} form.
 */
@XmlRootElement
public class Queue {

    // will eventually control these via properties file
    private static final Integer DEFAULT_RETRY_COUNT = 3;
    private static final Integer DEFAULT_HANDLING_TIMEOUT_SEC = 30;
    private static final String DEFAULT_DEAD_LETTER_QUEUE_EXTENSION = "_DLQ";

    private String name;
    private String queueType;
    private String regions;
    private String defaultDestinations;
    private Long defaultDelayMs;
    private Integer retryCount;
    private Integer handlingTimeoutSec;
    private String deadLetterQueue;

    public Queue() {} // Jackson needs no-arg ctor

    /**
     * Fully specified constructor; every other constructor ends up here.
     */
    public Queue(String name, String queueType, String regions, String defaultDestinations, Long defaultDelayMs,
                 Integer retryCount, Integer handlingTimeoutSec, String deadLetterQueue) {
        this.name = name;
        this.queueType = queueType;
        this.regions = regions;
        this.defaultDestinations = defaultDestinations;
        this.defaultDelayMs = defaultDelayMs;
        this.retryCount = retryCount;
        this.handlingTimeoutSec = handlingTimeoutSec;
        this.deadLetterQueue = deadLetterQueue;
    }

    /**
     * Uses the default retry count (3), the default handling timeout (30 seconds) and a
     * dead letter queue named {@code name + "_DLQ"}.
     */
    public Queue(String name, String queueType, String regions, String defaultDestinations, Long defaultDelayMs) {
        this(name, queueType, regions, defaultDestinations, defaultDelayMs,
                DEFAULT_RETRY_COUNT, DEFAULT_HANDLING_TIMEOUT_SEC, name + DEFAULT_DEAD_LETTER_QUEUE_EXTENSION);
    }

    /**
     * Same defaults as the five-argument constructor, with a default delay of zero.
     */
    public Queue(String name, String queueType, String regions, String defaultDestinations) {
        this(name, queueType, regions, defaultDestinations, 0L);
    }

    /**
     * Creates a {@link QueueType#MULTIREGION} queue whose regions and default destinations
     * are both {@link Regions#LOCAL}, with all other values defaulted.
     */
    public Queue(String name) {
        this(name, QueueType.MULTIREGION, Regions.LOCAL, Regions.LOCAL);
    }

    /**
     * Builds a queue from its persisted form. The queue type is not read from the
     * {@link DatabaseQueue}; it is always set to {@link QueueType#MULTIREGION}.
     */
    public Queue(DatabaseQueue databaseQueue) {
        this( databaseQueue.getName(),
              QueueType.MULTIREGION,
              databaseQueue.getRegions(),
              databaseQueue.getDefaultDestinations(),
              databaseQueue.getDefaultDelayMs(),
              databaseQueue.getRetryCount(),
              databaseQueue.getHandlingTimeoutSec(),
              databaseQueue.getDeadLetterQueue());
    }

    public String getName() {
        return name;
    }
    public void setName(String name) {
        this.name = name;
    }

    /** The queue type has a getter only; it is fixed at construction time. */
    public String getQueueType() {
        return queueType;
    }

    public String getRegions() {
        return regions;
    }
    public void setRegions(String regions) {
        this.regions = regions;
    }

    public String getDefaultDestinations() {
        return defaultDestinations;
    }
    public void setDefaultDestinations(String defaultDestinations) {
        this.defaultDestinations = defaultDestinations;
    }

    public Long getDefaultDelayMs() {
        return defaultDelayMs;
    }
    public void setDefaultDelayMs(Long defaultDelayMs) {
        this.defaultDelayMs = defaultDelayMs;
    }

    public Integer getRetryCount() {
        return retryCount;
    }
    public void setRetryCount(Integer retryCount) {
        this.retryCount = retryCount;
    }

    public Integer getHandlingTimeoutSec() {
        return handlingTimeoutSec;
    }
    public void setHandlingTimeoutSec(Integer handlingTimeoutSec) {
        this.handlingTimeoutSec = handlingTimeoutSec;
    }

    public String getDeadLetterQueue() {
        return deadLetterQueue;
    }
    public void setDeadLetterQueue(String deadLetterQueue) {
        this.deadLetterQueue = deadLetterQueue;
    }

    /**
     * Converts this queue to its persisted form. The queue type is not part of
     * {@link DatabaseQueue} and is therefore not carried over.
     */
    public DatabaseQueue toDatabaseQueue() {
        return new DatabaseQueue(
                name,
                regions,
                defaultDestinations,
                defaultDelayMs,
                retryCount,
                handlingTimeoutSec,
                deadLetterQueue);
    }
}

// Source: http://git-wip-us.apache.org/repos/asf/usergrid/blob/3075dce1/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/QueueManager.java
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.apache.usergrid.persistence.qakka.core;

import java.util.List;

/**
 * Operations for creating, updating, deleting and looking up queue configurations.
 */
public interface QueueManager {

    /** Create a queue from the given configuration. */
    void createQueue(Queue queue);

    /** Store a changed configuration for the queue identified by {@code queue.getName()}. */
    void updateQueueConfig(Queue queue);

    /** Delete the queue with the given name. */
    void deleteQueue(String queueName);

    /** Look up the configuration of the queue with the given name. */
    Queue getQueueConfig(String queueName);

    /** List the names of the known queues. */
    List<String> getListOfQueues();
}

// Source: http://git-wip-us.apache.org/repos/asf/usergrid/blob/3075dce1/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/QueueMessage.java
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.apache.usergrid.persistence.qakka.core;

import javax.xml.bind.annotation.XmlRootElement;
import java.io.Serializable;
import java.util.UUID;

/**
 * A message as seen by queue clients: identifiers, routing information, timing
 * information and either the inline payload ({@code data}) or a link to it ({@code href}).
 *
 * The "...Date" properties hold absolute times in epoch milliseconds; the matching
 * "...Ms" accessors translate to and from an offset relative to
 * {@link System#currentTimeMillis()} at the moment of the call.
 */
@XmlRootElement
public class QueueMessage implements Serializable {

    private UUID queueMessageId;
    private UUID messageId;

    private String queueName;
    private String sendingRegion;
    private String receivingRegion;

    private Long delayUntilDate;
    private Long expirationDate;
    private Long createDate;
    private Long retries;

    private Boolean dataReceived;

    /** MIME content type of data */
    private String contentType;

    /** If contentType is application/json then data will be the JSON payload for this queue message */
    private String data;

    /** If contentType is not application/json then href will be the URL where the payload may be fetched */
    private String href;


    public QueueMessage() {} // for Jackson

    /**
     * Creates a queue message.
     *
     * @param queueMessageId ID of this queue message; when null a new time-based UUID is generated
     */
    public QueueMessage(
            UUID queueMessageId, String queueName, String sendingRegion, String receivingRegion, UUID messageId,
            Long delayUntilDate, Long expirationDate, Long createDate, Long retries, Boolean dataReceived) {

        this.queueMessageId = queueMessageId != null ? queueMessageId : QakkaUtils.getTimeUuid();
        this.queueName = queueName;
        this.sendingRegion = sendingRegion;
        this.receivingRegion = receivingRegion;
        this.messageId = messageId;
        this.delayUntilDate = delayUntilDate;
        this.expirationDate = expirationDate;
        this.createDate = createDate;
        this.retries = retries;
        this.dataReceived = dataReceived;
    }

    public UUID getQueueMessageId() {
        return queueMessageId;
    }

    public void setQueueMessageId(UUID queueMessageId) {
        this.queueMessageId = queueMessageId;
    }

    public String getQueueName() {
        return queueName;
    }

    public String getSendingRegion() {
        return sendingRegion;
    }

    public String getReceivingRegion() {
        return receivingRegion;
    }

    public UUID getMessageId() {
        return messageId;
    }

    public Long getDelayUntilDate() {
        return delayUntilDate;
    }

    /**
     * @return milliseconds from now until the delay-until date (negative once that date has
     *         passed), or null when no delay-until date is set
     */
    public Long getDelayUntilMs() {
        if ( delayUntilDate == null ) {
            return null;
        }
        return delayUntilDate - System.currentTimeMillis();
    }

    public void setDelayUntilDate(Long delayUntilDate) {
        this.delayUntilDate = delayUntilDate;
    }

    /**
     * Sets the delay-until date to "now plus {@code delayMs}".
     *
     * @param delayMs offset in milliseconds; null clears the delay-until date, mirroring
     *                {@link #getDelayUntilMs()} which reports null for an unset date
     *                (previously a null argument failed with a NullPointerException)
     */
    public void setDelayUntilMs(Long delayMs) {
        this.delayUntilDate = delayMs == null ? null : Long.valueOf( System.currentTimeMillis() + delayMs );
    }

    public Long getExpirationDate() {
        return expirationDate;
    }

    /**
     * @return milliseconds from now until the expiration date (negative once that date has
     *         passed), or null when no expiration date is set
     */
    public Long getExpirationMs() {
        if ( expirationDate == null ) {
            return null;
        }
        return expirationDate - System.currentTimeMillis();
    }

    public void setExpirationDate(Long expirationDate) {
        this.expirationDate = expirationDate;
    }

    /**
     * Sets the expiration date to "now plus {@code expirationMs}".
     *
     * @param expirationMs offset in milliseconds; null clears the expiration date, mirroring
     *                     {@link #getExpirationMs()} which reports null for an unset date
     *                     (previously a null argument failed with a NullPointerException)
     */
    public void setExpirationMs(Long expirationMs) {
        this.expirationDate =
                expirationMs == null ? null : Long.valueOf( System.currentTimeMillis() + expirationMs );
    }

    public Long getCreateDate() {
        return createDate;
    }

    public void setCreateDate(Long createDate) {
        this.createDate = createDate;
    }

    public Long getRetries() {
        return retries;
    }

    public void setRetries(Long retries) {
        this.retries = retries;
    }

    public Boolean getDataReceived() {
        return dataReceived;
    }

    public void setDataReceived(Boolean dataReceived) {
        this.dataReceived = dataReceived;
    }


    public String getData() {
        return data;
    }

    public void setData(String data) {
        this.data = data;
    }

    public String getContentType() {
        return contentType;
    }

    public void setContentType(String contentType) {
        this.contentType = contentType;
    }

    public String getHref() {
        return href;
    }

    public void setHref(String href) {
        this.href = href;
    }

}

// Source: http://git-wip-us.apache.org/repos/asf/usergrid/blob/3075dce1/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/QueueMessageManager.java
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.apache.usergrid.persistence.qakka.core;

import java.nio.ByteBuffer;
import java.util.List;
import java.util.UUID;


/**
 * Operations for sending, receiving, acknowledging and re-queueing queue messages.
 */
public interface QueueMessageManager {

    /**
     * Send Queue Message to one or more destination regions.
     *
     * @param queueName Name of queue
     * @param destinationRegions List of destination regions
     * @param delayMs Delay, in milliseconds, before sending queue message
     * @param expirationSecs Time, in seconds, before message expires
     * @param contentType Content type of message data
     * @param messageData Message content
     */
    void sendMessages(String queueName, List<String> destinationRegions,
                      Long delayMs, Long expirationSecs, String contentType, ByteBuffer messageData);

    /**
     * Get next available messages from the specified queue.
     *
     * @param queueName Name of queue
     * @param count Number of messages to get
     * @return List of next messages, empty if none are available
     */
    List<QueueMessage> getNextMessages(String queueName, int count);

    /**
     * Acknowledge that message has been received and is no longer inflight.
     *
     * @param queueName Name of queue
     * @param queueMessageId ID of queue message
     */
    void ackMessage(String queueName, UUID queueMessageId);

    /**
     * Put message back in the queue.
     *
     * @param queueName Name of the queue
     * @param messageId ID of the queue message
     * @param delayMs Delay, in milliseconds, before re-queueing message
     */
    void requeueMessage(String queueName, UUID messageId, Long delayMs);

    /**
     * Clear all messages from queue.
     *
     * @param queueName Name of queue
     */
    void clearMessages(String queueName);

    /**
     * Get message payload data.
     *
     * @param messageId ID of the message whose payload is wanted
     */
    ByteBuffer getMessageData(UUID messageId);

    /**
     * Get message from messages available or messages inflight storage.
     *
     * @param queueName Name of queue
     * @param queueMessageId ID of queue message
     */
    QueueMessage getMessage(String queueName, UUID queueMessageId);
}

// Source: http://git-wip-us.apache.org/repos/asf/usergrid/blob/3075dce1/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/QueueType.java
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.apache.usergrid.persistence.qakka.core;


/**
 * String constants naming the supported queue types.
 */
public class QueueType {
    /** Queue type name for a local queue. */
    public static final String LOCAL = "_LOCAL";
    /** Queue type name for a multi-region queue. */
    public static final String MULTIREGION = "_MULTIREGION";
}

// Source: http://git-wip-us.apache.org/repos/asf/usergrid/blob/3075dce1/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/Regions.java
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.apache.usergrid.persistence.qakka.core;

import com.google.inject.Inject;
import com.google.inject.Singleton;
import org.apache.usergrid.persistence.actorsystem.ActorSystemFig;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;


/**
 * Resolves the symbolic region names {@link #LOCAL}, {@link #ALL} and {@link #REMOTE}
 * (or a literal region name) to concrete region lists, based on the local region and
 * the comma-separated region list supplied by {@link ActorSystemFig}.
 */
@Singleton
public class Regions {
    public static final String LOCAL = "LOCAL";
    public static final String ALL = "ALL";
    public static final String REMOTE = "REMOTE";

    // load regions from properties
    String localRegion;
    List<String> regionList;


    @Inject
    public Regions( ActorSystemFig actorSystemFig ) {
        localRegion = actorSystemFig.getRegionLocal();
        // NOTE(review): entries are not trimmed, so "a, b" yields " b" -- confirm the
        // configured list never contains whitespace around the commas.
        regionList = Arrays.asList( actorSystemFig.getRegionsList().split(",") );
    }


    /**
     * Translates a region specifier into a list of region names.
     *
     * @param region {@link #ALL}, {@link #LOCAL}, {@link #REMOTE} or a literal region name;
     *               must not be null (switching on a null String throws NullPointerException)
     * @return a copy of all configured regions for ALL; only the local region for LOCAL;
     *         all configured regions minus the local one for REMOTE; otherwise a
     *         single-element list holding {@code region} as given, without validation
     */
    public List<String> getRegions(String region) {
        switch (region) {
            case ALL:
                return new ArrayList<>(regionList);

            case LOCAL:
                return Collections.singletonList(localRegion);

            case REMOTE: {
                List<String> remotes = new ArrayList<>(regionList);
                remotes.remove(localRegion);
                return remotes;
            }

            default:
                // parse regions into list -- assume a single region now, but can do region1,region2

                // validate regions

                return Collections.singletonList(region);
        }
    }

    public String getLocalRegion() {
        return localRegion;
    }

    /**
     * @return true when {@code region} is one of the configured region names
     */
    public Boolean isValidRegion(String region) {
        return regionList.contains(region);
    }

}
// Source: http://git-wip-us.apache.org/repos/asf/usergrid/blob/3075dce1/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/impl/InMemoryQueue.java
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.apache.usergrid.persistence.qakka.core.impl;

import com.google.inject.Inject;
import com.google.inject.Singleton;
import org.apache.usergrid.persistence.qakka.QakkaFig;
import org.apache.usergrid.persistence.qakka.distributed.actors.QueueRefresher;
import org.apache.usergrid.persistence.qakka.serialization.queuemessages.DatabaseQueueMessage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.HashMap;
import java.util.Map;
import java.util.Queue;
import java.util.UUID;
import java.util.concurrent.ConcurrentLinkedQueue;


/**
 * Holds, per queue name, an in-memory FIFO of {@link DatabaseQueueMessage}s together with
 * the ID of the newest message that has been added to that queue.
 */
@Singleton
public class InMemoryQueue {
    // Was created with QueueRefresher.class, which attributed this class's log output to the wrong logger.
    private static final Logger logger = LoggerFactory.getLogger( InMemoryQueue.class );

    private final Map<String, Queue<DatabaseQueueMessage>> queuesByName;
    private final Map<String, UUID> newestByQueueName;


    @Inject
    InMemoryQueue(QakkaFig qakkaFig) {
        // NOTE(review): getQueueInMemorySize() is used here as the initial capacity of the
        // per-queue-name maps; confirm that is the intended meaning of the setting.
        queuesByName = new HashMap<>( qakkaFig.getQueueInMemorySize() );
        newestByQueueName = new HashMap<>( qakkaFig.getQueueInMemorySize() );
    }

    /** Returns the queue for the given name, creating it on first use. */
    private Queue<DatabaseQueueMessage> getQueue( String queueName ) {
        synchronized ( queuesByName ) {
            return queuesByName.computeIfAbsent( queueName, name -> new ConcurrentLinkedQueue<>() );
        }
    }

    /**
     * Decides whether {@code candidate} is newer than {@code current}.
     *
     * {@link UUID#compareTo(UUID)} compares the raw most/least significant bits, and for
     * version 1 UUIDs the low 32 bits of the timestamp are stored in the highest bits, so
     * compareTo is not chronological. When both UUIDs are version 1 their timestamps are
     * compared instead; compareTo is only the tie-breaker and the fallback for other versions.
     */
    private static boolean isNewer( UUID candidate, UUID current ) {
        if ( candidate.version() == 1 && current.version() == 1 ) {
            int byTime = Long.compare( candidate.timestamp(), current.timestamp() );
            if ( byTime != 0 ) {
                return byTime > 0;
            }
        }
        return candidate.compareTo( current ) > 0;
    }

    /**
     * Appends a message to the named queue and records its queue message ID as the newest
     * one for that queue if it is newer than the one recorded so far.
     *
     * @param queueName name of the queue, created on first use
     * @param databaseQueueMessage message to append
     */
    public void add( String queueName, DatabaseQueueMessage databaseQueueMessage ) {
        UUID candidate = databaseQueueMessage.getQueueMessageId();

        // The read-compare-write on the plain HashMap is guarded the same way queuesByName is,
        // so concurrent callers cannot lose an update or corrupt the map.
        synchronized ( newestByQueueName ) {
            UUID newest = newestByQueueName.get( queueName );
            if ( newest == null || isNewer( candidate, newest ) ) {
                newest = candidate;
            }
            newestByQueueName.put( queueName, newest );
        }

        getQueue( queueName ).add( databaseQueueMessage );
    }

    /**
     * @return the newest queue message ID recorded for the queue, or null if nothing has
     *         been added under that name
     */
    public UUID getNewest( String queueName ) {
        synchronized ( newestByQueueName ) {
            return newestByQueueName.get( queueName );
        }
    }

    /**
     * @return the head of the named queue, removed from it, or null if the queue is empty
     */
    public DatabaseQueueMessage poll( String queueName ) {
        return getQueue( queueName ).poll();
    }

    /**
     * @return number of messages currently held for the named queue; note that
     *         {@link ConcurrentLinkedQueue#size()} traverses the queue, so this is not
     *         a constant-time call
     */
    public int size( String queueName ) {
        return getQueue( queueName ).size();
    }
}

// Source: http://git-wip-us.apache.org/repos/asf/usergrid/blob/3075dce1/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/impl/QueueManagerImpl.java
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.apache.usergrid.persistence.qakka.core.impl;

import com.google.inject.Inject;
import org.apache.commons.lang3.StringUtils;
import org.apache.usergrid.persistence.actorsystem.ActorSystemFig;
import org.apache.usergrid.persistence.qakka.core.QakkaUtils;
import org.apache.usergrid.persistence.qakka.core.Queue;
import org.apache.usergrid.persistence.qakka.core.QueueManager;
import org.apache.usergrid.persistence.qakka.core.Regions;
import org.apache.usergrid.persistence.qakka.distributed.DistributedQueueService;
import org.apache.usergrid.persistence.qakka.serialization.queues.DatabaseQueue;
import org.apache.usergrid.persistence.qakka.serialization.queues.QueueSerialization;
import org.apache.usergrid.persistence.qakka.serialization.sharding.Shard;
import org.apache.usergrid.persistence.qakka.serialization.sharding.ShardSerialization;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;


/**
 * {@link QueueManager} that persists queue configuration through {@link QueueSerialization},
 * creates the initial shards through {@link ShardSerialization} and notifies the
 * {@link DistributedQueueService} about new or changed queues.
 */
public class QueueManagerImpl implements QueueManager {
    private final ActorSystemFig actorSystemFig;
    private final QueueSerialization queueSerialization;
    private final DistributedQueueService distributedQueueService;
    private final ShardSerialization shardSerialization;


    @Inject
    public QueueManagerImpl(
            ActorSystemFig actorSystemFig,
            QueueSerialization queueSerialization,
            DistributedQueueService distributedQueueService,
            ShardSerialization shardSerialization ) {

        this.actorSystemFig = actorSystemFig;
        this.queueSerialization = queueSerialization;
        this.distributedQueueService = distributedQueueService;
        this.shardSerialization = shardSerialization;
    }

    /**
     * Persists the queue, creates one DEFAULT and one INFLIGHT shard per region the queue
     * lives in, then initializes and refreshes the queue in the distributed queue service.
     */
    @Override
    public void createQueue(Queue queue) {

        queueSerialization.writeQueue(queue.toDatabaseQueue());

        String queueName = queue.getName();

        for ( String region : resolveRegions( queue.getRegions() ) ) {
            createInitialShard( queueName, region, Shard.Type.DEFAULT );
            createInitialShard( queueName, region, Shard.Type.INFLIGHT );
        }

        distributedQueueService.initQueue( queueName );
        distributedQueueService.refreshQueue( queueName );
    }

    /**
     * Expands a queue's region specifier: LOCAL, null or empty means the local region only,
     * ALL means every configured region, anything else is treated as a comma-separated list.
     */
    private List<String> resolveRegions( String regionSpec ) {
        List<String> regions = new ArrayList<>();

        if ( Regions.LOCAL.equals( regionSpec ) || StringUtils.isEmpty( regionSpec ) ) {
            regions.add( actorSystemFig.getRegionLocal() );

        } else if ( Regions.ALL.equals( regionSpec ) ) {
            regions.addAll( Arrays.asList( actorSystemFig.getRegionsList().split(",") ) );

        } else {
            regions.addAll( Arrays.asList( regionSpec.split( "," ) ) );
        }
        return regions;
    }

    /** Creates the first shard (shard ID 1) of the given type for a queue in a region. */
    private void createInitialShard( String queueName, String region, Shard.Type type ) {
        Shard shard = new Shard( queueName, region, type, 1L, QakkaUtils.getTimeUuid() );
        shardSerialization.createShard( shard );
    }

    /**
     * Persists the changed configuration, then re-initializes and refreshes the queue in
     * the distributed queue service. No shards are created or changed here.
     */
    @Override
    public void updateQueueConfig(Queue queue) {

        queueSerialization.writeQueue(queue.toDatabaseQueue());

        String queueName = queue.getName();
        distributedQueueService.initQueue( queueName );
        distributedQueueService.refreshQueue( queueName );
    }

    /**
     * Removes the persisted queue configuration. Nothing else is cleaned up yet (see TODO).
     */
    @Override
    public void deleteQueue(String queueName) {

        queueSerialization.deleteQueue(queueName);

        // TODO: implement delete queue for Akka, stop schedulers, etc.
        //qas.deleteQueue(queueName);
    }

    /**
     * @return the configuration of the named queue, or null when no such queue is persisted
     */
    @Override
    public Queue getQueueConfig(String queueName) {

        DatabaseQueue stored = queueSerialization.getQueue(queueName);
        return stored == null ? null : new Queue( stored );
    }

    @Override
    public List<String> getListOfQueues() {
        return queueSerialization.getListOfQueues();
    }
}

http://git-wip-us.apache.org/repos/asf/usergrid/blob/3075dce1/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/impl/QueueMessageManagerImpl.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/impl/QueueMessageManagerImpl.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/impl/QueueMessageManagerImpl.java new file mode 100644 index 0000000..bcd0f58 --- /dev/null +++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/impl/QueueMessageManagerImpl.java @@ -0,0 +1,299 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.usergrid.persistence.qakka.core.impl; + +import com.google.inject.Inject; +import com.google.inject.Singleton; +import org.apache.usergrid.persistence.actorsystem.ActorSystemFig; +import org.apache.usergrid.persistence.qakka.api.URIStrategy; +import org.apache.usergrid.persistence.qakka.core.QakkaUtils; +import org.apache.usergrid.persistence.qakka.core.QueueManager; +import org.apache.usergrid.persistence.qakka.core.QueueMessage; +import org.apache.usergrid.persistence.qakka.core.QueueMessageManager; +import org.apache.usergrid.persistence.qakka.distributed.DistributedQueueService; +import org.apache.usergrid.persistence.qakka.exceptions.BadRequestException; +import org.apache.usergrid.persistence.qakka.exceptions.NotFoundException; +import org.apache.usergrid.persistence.qakka.exceptions.QakkaRuntimeException; +import org.apache.usergrid.persistence.qakka.serialization.queuemessages.DatabaseQueueMessage; +import org.apache.usergrid.persistence.qakka.serialization.queuemessages.DatabaseQueueMessageBody; +import org.apache.usergrid.persistence.qakka.serialization.queuemessages.QueueMessageSerialization; +import org.apache.usergrid.persistence.qakka.serialization.transferlog.TransferLogSerialization; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.UnsupportedEncodingException; +import java.net.URISyntaxException; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.UUID; + + +@Singleton +public class QueueMessageManagerImpl implements QueueMessageManager { + + private static final Logger logger = LoggerFactory.getLogger( QueueMessageManagerImpl.class ); + + private final ActorSystemFig actorSystemFig; + private final QueueManager queueManager; + private final QueueMessageSerialization queueMessageSerialization; + private final DistributedQueueService distributedQueueService; + private final TransferLogSerialization 
transferLogSerialization; + private final URIStrategy uriStrategy; + + + @Inject + public QueueMessageManagerImpl( + ActorSystemFig actorSystemFig, + QueueManager queueManager, + QueueMessageSerialization queueMessageSerialization, + DistributedQueueService distributedQueueService, + TransferLogSerialization transferLogSerialization, + URIStrategy uriStrategy + ) { + + this.actorSystemFig = actorSystemFig; + this.queueManager = queueManager; + this.queueMessageSerialization = queueMessageSerialization; + this.distributedQueueService = distributedQueueService; + this.transferLogSerialization = transferLogSerialization; + this.uriStrategy = uriStrategy; + } + + + @Override + public void sendMessages(String queueName, List<String> destinationRegions, + Long delayMs, Long expirationSecs, String contentType, ByteBuffer messageData) { + + // TODO: implement delay and expiration + +// Preconditions.checkArgument(delayMs == null || delayMs > 0L, +// "Delay milliseconds must be greater than zero"); +// Preconditions.checkArgument(expirationSecs == null || expirationSecs > 0L, +// "Expiration seconds must be greater than zero"); + + if ( queueManager.getQueueConfig( queueName ) == null ) { + throw new NotFoundException( "Queue not found: " + queueName ); + } + + // get current time + Long currentTimeMs = System.currentTimeMillis(); + + // create message id + UUID messageId = QakkaUtils.getTimeUuid(); + + Long deliveryTime = delayMs != null ? currentTimeMs + delayMs : null; + Long expirationTime = expirationSecs != null ? 
currentTimeMs + (1000 * expirationSecs) : null; + + // write message data to C* + queueMessageSerialization.writeMessageData( + messageId, new DatabaseQueueMessageBody(messageData, contentType)); + + for (String region : destinationRegions) { + + transferLogSerialization.recordTransferLog( + queueName, actorSystemFig.getRegionLocal(), region, messageId ); + + // send message to destination region's queue + DistributedQueueService.Status status = null; + try { + status = distributedQueueService.sendMessageToRegion( + queueName, + actorSystemFig.getRegionLocal(), + region, + messageId, + deliveryTime, + expirationTime ); + + //logger.debug("Send message to queueName {} in region {}", queueName, region ); + + } catch ( QakkaRuntimeException qae ) { + logger.error("Error sending message " + messageId + " to " + region, qae); + } + } + } + + + @Override + public List<QueueMessage> getNextMessages(String queueName, int count) { + + if ( queueManager.getQueueConfig( queueName ) == null ) { + throw new NotFoundException( "Queue not found: " + queueName ); + } + + Collection<DatabaseQueueMessage> dbMessages = distributedQueueService.getNextMessages( queueName, count ); + + List<QueueMessage> queueMessages = joinMessages( queueName, dbMessages ); + + if ( queueMessages.size() < count && queueMessages.size() < dbMessages.size() ) { + logger.debug("Messages failed to join for queue:{}, get more", queueName); + + // some messages failed to join, get more + dbMessages = distributedQueueService.getNextMessages( queueName, count - queueMessages.size() ); + queueMessages.addAll( joinMessages( queueName, dbMessages ) ); + } + + return queueMessages; + } + + + private List<QueueMessage> joinMessages( String queueName, Collection<DatabaseQueueMessage> dbMessages) { + + List<QueueMessage> queueMessages = new ArrayList<>(); + + for (DatabaseQueueMessage dbMessage : dbMessages) { + + DatabaseQueueMessageBody data = queueMessageSerialization.loadMessageData( dbMessage.getMessageId() ); + 
+ if ( data != null ) { + + QueueMessage queueMessage = new QueueMessage( + dbMessage.getQueueMessageId(), + queueName, + null, // sending region + dbMessage.getRegion(), // receiving region + dbMessage.getMessageId(), + null, // delay until date + null, // expiration date + dbMessage.getQueuedAt(), + null, // retries + true ); + + queueMessage.setContentType( data.getContentType() ); + if ( "application/json".equals( data.getContentType() )) { + try { + String json = new String( data.getBlob().array(), "UTF-8"); + queueMessage.setData( json ); + + } catch (UnsupportedEncodingException e) { + logger.error("Error unencoding data for messageId=" + queueMessage.getMessageId(), e); + } + } else { + try { + queueMessage.setHref( uriStrategy.queueMessageDataURI( + queueName, queueMessage.getQueueMessageId()).toString()); + + } catch (URISyntaxException e) { + throw new QakkaRuntimeException( "Error forming URI for message data", e ); + } + } + + queueMessages.add( queueMessage ); + } + } + + return queueMessages; + } + + + @Override + public void ackMessage(String queueName, UUID queueMessageId) { + + if ( queueManager.getQueueConfig( queueName ) == null ) { + throw new NotFoundException( "Queue not found: " + queueName ); + } + + DistributedQueueService.Status status = distributedQueueService.ackMessage( queueName, queueMessageId ); + + if ( DistributedQueueService.Status.BAD_REQUEST.equals( status )) { + throw new BadRequestException( "Message not inflight" ); + + } else if ( DistributedQueueService.Status.ERROR.equals( status )) { + throw new QakkaRuntimeException( "Unable to ack message due to error" ); + } + } + + + @Override + public void requeueMessage(String queueName, UUID messageId, Long delayMs) { + + if ( queueManager.getQueueConfig( queueName ) == null ) { + throw new NotFoundException( "Queue not found: " + queueName ); + } + + // TODO: implement requeueMessage + + throw new UnsupportedOperationException( "requeueMessage not yet implemented" ); + } + + + 
@Override + public void clearMessages(String queueName) { + + if ( queueManager.getQueueConfig( queueName ) == null ) { + throw new NotFoundException( "Queue not found: " + queueName ); + } + + // TODO: implement clearMessages + + throw new UnsupportedOperationException( "clearMessages not yet implemented" ); + } + + + @Override + public ByteBuffer getMessageData( UUID messageId ) { + DatabaseQueueMessageBody body = queueMessageSerialization.loadMessageData( messageId ); + return body != null ? body.getBlob() : null; + } + + + /** + * Get but do not put inflight specified queue message, first looking in INFLIGHT table then DEFAULT. + */ + @Override + public QueueMessage getMessage( String queueName, UUID queueMessageId ) { + + QueueMessage queueMessage = null; + + // first look in INFLIGHT storage + + + DatabaseQueueMessage dbMessage = queueMessageSerialization.loadMessage( + queueName, actorSystemFig.getRegionLocal(), null, + DatabaseQueueMessage.Type.INFLIGHT, queueMessageId ); + + if ( dbMessage == null ) { + + // not found, so now look in DEFAULT storage + + dbMessage = queueMessageSerialization.loadMessage( + queueName, actorSystemFig.getRegionLocal(), null, + DatabaseQueueMessage.Type.DEFAULT, queueMessageId ); + } + + if ( dbMessage != null ) { + queueMessage = new QueueMessage( + dbMessage.getQueueMessageId(), + queueName, + null, // sending region + dbMessage.getRegion(), // receiving region + dbMessage.getMessageId(), + null, // delay until date + null, // expiration date + dbMessage.getQueuedAt(), + null, // retries + true ); + } + + return queueMessage; + } + +} http://git-wip-us.apache.org/repos/asf/usergrid/blob/3075dce1/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/DistributedQueueService.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/DistributedQueueService.java 
b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/DistributedQueueService.java new file mode 100644 index 0000000..c2ca6b1 --- /dev/null +++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/DistributedQueueService.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.usergrid.persistence.qakka.distributed; + +import org.apache.usergrid.persistence.qakka.serialization.queuemessages.DatabaseQueueMessage; + +import java.util.Collection; +import java.util.UUID; + + +/** + * Interface to distributed part of Qakka queue implementation. 
+ */ +public interface DistributedQueueService { + + enum Status { SUCCESS, ERROR, BAD_REQUEST }; + + void init(); + + void initQueue(String queueName); + + void refresh(); + + void refreshQueue(String queueName); + + void processTimeouts(); + + Status sendMessageToRegion( + String queueName, + String sourceRegion, + String destRegion, + UUID messageId, + Long deliveryTime, + Long expirationTime); + + Collection<DatabaseQueueMessage> getNextMessages(String queueName, int numMessages); + + Status ackMessage(String queueName, UUID messageId); + + Status requeueMessage(String queueName, UUID messageId); + + Status clearMessages(String queueName); +} http://git-wip-us.apache.org/repos/asf/usergrid/blob/3075dce1/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActor.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActor.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActor.java new file mode 100644 index 0000000..6ecffba --- /dev/null +++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActor.java @@ -0,0 +1,207 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.usergrid.persistence.qakka.distributed.actors; + +import akka.actor.ActorRef; +import akka.actor.Cancellable; +import akka.actor.Props; +import akka.actor.UntypedActor; +import com.codahale.metrics.Timer; +import com.google.inject.Injector; +import org.apache.usergrid.persistence.qakka.App; +import org.apache.usergrid.persistence.qakka.MetricsService; +import org.apache.usergrid.persistence.qakka.QakkaFig; +import org.apache.usergrid.persistence.qakka.core.impl.InMemoryQueue; +import org.apache.usergrid.persistence.qakka.distributed.DistributedQueueService; +import org.apache.usergrid.persistence.qakka.distributed.messages.*; +import org.apache.usergrid.persistence.qakka.serialization.queuemessages.DatabaseQueueMessage; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import scala.concurrent.duration.Duration; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.TimeUnit; + + +public class QueueActor extends UntypedActor { + private static final Logger logger = LoggerFactory.getLogger( QueueActor.class ); + + private final QakkaFig qakkaFig; + private final InMemoryQueue inMemoryQueue; + private final QueueActorHelper queueActorHelper; + private final MetricsService metricsService; + + private final Map<String, Cancellable> refreshSchedulersByQueueName = new HashMap<>(); + private final Map<String, Cancellable> timeoutSchedulersByQueueName = new HashMap<>(); + private final Map<String, Cancellable> 
shardAllocationSchedulersByQueueName = new HashMap<>(); + + private final Map<String, ActorRef> queueReadersByQueueName = new HashMap<>(); + private final Map<String, ActorRef> queueTimeoutersByQueueName = new HashMap<>(); + private final Map<String, ActorRef> shardAllocatorsByQueueName = new HashMap<>(); + + + public QueueActor() { + + Injector injector = App.INJECTOR; + + qakkaFig = injector.getInstance( QakkaFig.class ); + inMemoryQueue = injector.getInstance( InMemoryQueue.class ); + queueActorHelper = injector.getInstance( QueueActorHelper.class ); + metricsService = injector.getInstance( MetricsService.class ); + } + + @Override + public void onReceive(Object message) { + + if ( message instanceof QueueInitRequest) { + QueueInitRequest request = (QueueInitRequest)message; + + if ( refreshSchedulersByQueueName.get( request.getQueueName() ) == null ) { + Cancellable scheduler = getContext().system().scheduler().schedule( + Duration.create( 0, TimeUnit.MILLISECONDS), + Duration.create( qakkaFig.getQueueRefreshMilliseconds(), TimeUnit.MILLISECONDS), + self(), + new QueueRefreshRequest( request.getQueueName() ), + getContext().dispatcher(), + getSelf()); + refreshSchedulersByQueueName.put( request.getQueueName(), scheduler ); + } + + if ( timeoutSchedulersByQueueName.get( request.getQueueName() ) == null ) { + Cancellable scheduler = getContext().system().scheduler().schedule( + Duration.create( 0, TimeUnit.MILLISECONDS), + Duration.create( qakkaFig.getQueueTimeoutSeconds()/2, TimeUnit.SECONDS), + self(), + new QueueTimeoutRequest( request.getQueueName() ), + getContext().dispatcher(), + getSelf()); + timeoutSchedulersByQueueName.put( request.getQueueName(), scheduler ); + } + + if ( shardAllocationSchedulersByQueueName.get( request.getQueueName() ) == null ) { + Cancellable scheduler = getContext().system().scheduler().schedule( + Duration.create( 0, TimeUnit.MILLISECONDS), + Duration.create( qakkaFig.getShardAllocationCheckFrequencyMillis(), 
TimeUnit.MILLISECONDS), + self(), + new ShardCheckRequest( request.getQueueName() ), + getContext().dispatcher(), + getSelf()); + shardAllocationSchedulersByQueueName.put( request.getQueueName(), scheduler ); + } + + } else if ( message instanceof QueueRefreshRequest ) { + QueueRefreshRequest request = (QueueRefreshRequest)message; + + if ( queueReadersByQueueName.get( request.getQueueName() ) == null ) { + ActorRef readerRef = getContext().actorOf( Props.create( + QueueRefresher.class, request.getQueueName()), request.getQueueName() + "_reader"); + queueReadersByQueueName.put( request.getQueueName(), readerRef ); + } + + // hand-off to queue's reader + queueReadersByQueueName.get( request.getQueueName() ).tell( request, self() ); + + } else if ( message instanceof QueueTimeoutRequest ) { + QueueTimeoutRequest request = (QueueTimeoutRequest)message; + + if ( queueTimeoutersByQueueName.get( request.getQueueName() ) == null ) { + ActorRef readerRef = getContext().actorOf( Props.create( + QueueTimeouter.class, request.getQueueName()), request.getQueueName() + "_timeouter"); + queueTimeoutersByQueueName.put( request.getQueueName(), readerRef ); + } + + // hand-off to queue's timeouter + queueTimeoutersByQueueName.get( request.getQueueName() ).tell( request, self() ); + + + } else if ( message instanceof ShardCheckRequest ) { + ShardCheckRequest request = (ShardCheckRequest)message; + + if ( shardAllocatorsByQueueName.get( request.getQueueName() ) == null ) { + ActorRef readerRef = getContext().actorOf( Props.create( + ShardAllocator.class, request.getQueueName()), request.getQueueName() + "_shard_allocator"); + shardAllocatorsByQueueName.put( request.getQueueName(), readerRef ); + } + + // hand-off to queue's shard allocator + shardAllocatorsByQueueName.get( request.getQueueName() ).tell( request, self() ); + + + } else if ( message instanceof QueueGetRequest) { + + Timer.Context timer = metricsService.getMetricRegistry().timer( MetricsService.GET_TIME_GET ).time(); + 
try { + QueueGetRequest queueGetRequest = (QueueGetRequest) message; + + Collection<DatabaseQueueMessage> queueMessages = new ArrayList<>(); + + while (queueMessages.size() < queueGetRequest.getNumRequested()) { + + DatabaseQueueMessage queueMessage = inMemoryQueue.poll( queueGetRequest.getQueueName() ); + + if (queueMessage != null) { + if (queueActorHelper.putInflight( queueGetRequest.getQueueName(), queueMessage )) { + queueMessages.add( queueMessage ); + } + } else { + logger.debug("in-memory queue for {} is empty, object is: {}", + queueGetRequest.getQueueName(), inMemoryQueue ); + break; + } + } + + getSender().tell( new QueueGetResponse( + DistributedQueueService.Status.SUCCESS, queueMessages ), getSender() ); + + } finally { + timer.close(); + } + + + } else if ( message instanceof QueueAckRequest) { + + Timer.Context timer = metricsService.getMetricRegistry().timer( MetricsService.ACK_TIME_ACK ).time(); + try { + + QueueAckRequest queueAckRequest = (QueueAckRequest) message; + + DistributedQueueService.Status status = queueActorHelper.ackQueueMessage( + queueAckRequest.getQueueName(), + queueAckRequest.getQueueMessageId() ); + + getSender().tell( new QueueAckResponse( + queueAckRequest.getQueueName(), + queueAckRequest.getQueueMessageId(), + status ), getSender() ); + + } finally { + timer.close(); + } + + } else { + unhandled( message ); + } + + } + +} http://git-wip-us.apache.org/repos/asf/usergrid/blob/3075dce1/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActorHelper.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActorHelper.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActorHelper.java new file mode 100644 index 0000000..26db903 --- /dev/null +++ 
b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActorHelper.java @@ -0,0 +1,167 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.usergrid.persistence.qakka.distributed.actors; + +import com.google.inject.Inject; +import org.apache.usergrid.persistence.actorsystem.ActorSystemFig; +import org.apache.usergrid.persistence.qakka.distributed.DistributedQueueService; +import org.apache.usergrid.persistence.qakka.serialization.auditlog.AuditLog; +import org.apache.usergrid.persistence.qakka.serialization.auditlog.AuditLogSerialization; +import org.apache.usergrid.persistence.qakka.serialization.queuemessages.DatabaseQueueMessage; +import org.apache.usergrid.persistence.qakka.serialization.queuemessages.QueueMessageSerialization; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.UUID; + + +public class QueueActorHelper { + private static final Logger logger = LoggerFactory.getLogger( QueueActorHelper.class ); + + private final ActorSystemFig actorSystemFig; + private final QueueMessageSerialization messageSerialization; + private final AuditLogSerialization auditLogSerialization; + + + @Inject + public 
QueueActorHelper( + ActorSystemFig actorSystemFig, + QueueMessageSerialization messageSerialization, + AuditLogSerialization auditLogSerialization + ) { + + this.actorSystemFig = actorSystemFig; + this.messageSerialization = messageSerialization; + this.auditLogSerialization = auditLogSerialization; + } + + + DatabaseQueueMessage loadDatabaseQueueMessage( + String queueName, UUID queueMessageId, DatabaseQueueMessage.Type type ) { + + try { + return messageSerialization.loadMessage( + queueName, + actorSystemFig.getRegionLocal(), + null, + type, + queueMessageId ); + + } catch (Throwable t) { + logger.error( "Error reading queueMessage", t ); + } + + return null; + } + + + DistributedQueueService.Status ackQueueMessage(String queueName, UUID queueMessageId ) { + + DatabaseQueueMessage queueMessage = loadDatabaseQueueMessage( + queueName, queueMessageId, DatabaseQueueMessage.Type.INFLIGHT ); + + if ( queueMessage == null ) { + return DistributedQueueService.Status.BAD_REQUEST; + } + + boolean error = false; + try { + messageSerialization.deleteMessage( + queueName, + actorSystemFig.getRegionLocal(), + null, + DatabaseQueueMessage.Type.INFLIGHT, + queueMessageId ); + + } catch (Throwable t) { + logger.error( "Error deleting queueMessage for ack", t ); + error = true; + } + + if ( !error ) { + + auditLogSerialization.recordAuditLog( + AuditLog.Action.ACK, + AuditLog.Status.SUCCESS, + queueName, + actorSystemFig.getRegionLocal(), + queueMessage.getMessageId(), + queueMessageId ); + + return DistributedQueueService.Status.SUCCESS; + + } else { + + auditLogSerialization.recordAuditLog( + AuditLog.Action.ACK, + AuditLog.Status.ERROR, + queueName, + actorSystemFig.getRegionLocal(), + queueMessage.getMessageId(), + queueMessageId ); + + return DistributedQueueService.Status.ERROR; + } + } + + + boolean putInflight( String queueName, DatabaseQueueMessage queueMessage ) { + + UUID qmid = queueMessage.getQueueMessageId(); + try { + queueMessage.setType( 
DatabaseQueueMessage.Type.INFLIGHT ); + queueMessage.setShardId( null ); + queueMessage.setInflightAt( System.currentTimeMillis() ); + messageSerialization.writeMessage( queueMessage ); + + messageSerialization.deleteMessage( + queueName, + actorSystemFig.getRegionLocal(), + null, + DatabaseQueueMessage.Type.DEFAULT, + qmid); + + //logger.debug("Put message {} inflight for queue name {}", qmid, queueName); + + } catch ( Throwable t ) { + logger.error("Error putting inflight queue message " + qmid + " queue name: " + queueName, t); + + auditLogSerialization.recordAuditLog( + AuditLog.Action.GET, + AuditLog.Status.ERROR, + queueName, + actorSystemFig.getRegionLocal(), + queueMessage.getMessageId(), + qmid); + + return false; + } + + auditLogSerialization.recordAuditLog( + AuditLog.Action.GET, + AuditLog.Status.SUCCESS, + queueName, + actorSystemFig.getRegionLocal(), + queueMessage.getMessageId(), + qmid); + + return true; + } +} http://git-wip-us.apache.org/repos/asf/usergrid/blob/3075dce1/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActorRouter.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActorRouter.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActorRouter.java new file mode 100644 index 0000000..97e591c --- /dev/null +++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActorRouter.java @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.usergrid.persistence.qakka.distributed.actors; + +import akka.actor.ActorRef; +import akka.actor.Props; +import akka.actor.UntypedActor; +import akka.routing.ConsistentHashingRouter; +import akka.routing.FromConfig; +import org.apache.usergrid.persistence.qakka.distributed.messages.*; + + +/** + * Use consistent hashing to route messages to QueueActors + */ +public class QueueActorRouter extends UntypedActor { + + private final ActorRef routerRef; + + + public QueueActorRouter() { + routerRef = getContext().actorOf( + FromConfig.getInstance().props( Props.create(QueueActor.class)), "router"); + } + + @Override + public void onReceive(Object message) { + + // TODO: can we do something smarter than this if-then-else structure + // e.g. if message is recognized as one of ours, then we just pass it on? 
+ + if ( message instanceof QueueGetRequest) { + QueueGetRequest qgr = (QueueGetRequest) message; + + ConsistentHashingRouter.ConsistentHashableEnvelope envelope = + new ConsistentHashingRouter.ConsistentHashableEnvelope( message, qgr.getQueueName() ); + routerRef.tell( envelope, getSender() ); + + } else if ( message instanceof QueueAckRequest) { + QueueAckRequest qar = (QueueAckRequest)message; + + ConsistentHashingRouter.ConsistentHashableEnvelope envelope = + new ConsistentHashingRouter.ConsistentHashableEnvelope( message, qar.getQueueName() ); + routerRef.tell( envelope, getSender()); + + } else if ( message instanceof QueueInitRequest) { + QueueInitRequest qar = (QueueInitRequest)message; + + ConsistentHashingRouter.ConsistentHashableEnvelope envelope = + new ConsistentHashingRouter.ConsistentHashableEnvelope( message, qar.getQueueName() ); + routerRef.tell( envelope, getSender()); + + } else if ( message instanceof QueueRefreshRequest) { + QueueRefreshRequest qar = (QueueRefreshRequest)message; + + ConsistentHashingRouter.ConsistentHashableEnvelope envelope = + new ConsistentHashingRouter.ConsistentHashableEnvelope( message, qar.getQueueName() ); + routerRef.tell( envelope, getSender()); + + } else if ( message instanceof QueueTimeoutRequest) { + QueueTimeoutRequest qar = (QueueTimeoutRequest)message; + + ConsistentHashingRouter.ConsistentHashableEnvelope envelope = + new ConsistentHashingRouter.ConsistentHashableEnvelope( message, qar.getQueueName() ); + routerRef.tell( envelope, getSender()); + + } else if ( message instanceof ShardCheckRequest) { + ShardCheckRequest qar = (ShardCheckRequest)message; + + ConsistentHashingRouter.ConsistentHashableEnvelope envelope = + new ConsistentHashingRouter.ConsistentHashableEnvelope( message, qar.getQueueName() ); + routerRef.tell( envelope, getSender()); + + } else { + unhandled(message); + } + } +} 
http://git-wip-us.apache.org/repos/asf/usergrid/blob/3075dce1/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueRefresher.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueRefresher.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueRefresher.java new file mode 100644 index 0000000..03ab1ec --- /dev/null +++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueRefresher.java @@ -0,0 +1,124 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.usergrid.persistence.qakka.distributed.actors; + +import akka.actor.UntypedActor; +import com.codahale.metrics.Timer; +import com.google.inject.Injector; +import org.apache.usergrid.persistence.actorsystem.ActorSystemFig; +import org.apache.usergrid.persistence.qakka.App; +import org.apache.usergrid.persistence.qakka.MetricsService; +import org.apache.usergrid.persistence.qakka.QakkaFig; +import org.apache.usergrid.persistence.qakka.core.CassandraClient; +import org.apache.usergrid.persistence.qakka.core.CassandraClientImpl; +import org.apache.usergrid.persistence.qakka.core.impl.InMemoryQueue; +import org.apache.usergrid.persistence.qakka.distributed.messages.QueueRefreshRequest; +import org.apache.usergrid.persistence.qakka.exceptions.QakkaRuntimeException; +import org.apache.usergrid.persistence.qakka.serialization.MultiShardMessageIterator; +import org.apache.usergrid.persistence.qakka.serialization.queuemessages.DatabaseQueueMessage; +import org.apache.usergrid.persistence.qakka.serialization.queuemessages.QueueMessageSerialization; +import org.apache.usergrid.persistence.qakka.serialization.sharding.Shard; +import org.apache.usergrid.persistence.qakka.serialization.sharding.ShardIterator; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Optional; +import java.util.UUID; + + +public class QueueRefresher extends UntypedActor { + private static final Logger logger = LoggerFactory.getLogger( QueueRefresher.class ); + + private final String queueName; + + private final QueueMessageSerialization serialization; + private final InMemoryQueue inMemoryQueue; + private final QakkaFig qakkaFig; + private final ActorSystemFig actorSystemFig; + private final MetricsService metricsService; + private final CassandraClient cassandraClient; + + public QueueRefresher(String queueName ) { + this.queueName = queueName; + + Injector injector = App.INJECTOR; + + serialization = injector.getInstance( 
QueueMessageSerialization.class ); + inMemoryQueue = injector.getInstance( InMemoryQueue.class ); + qakkaFig = injector.getInstance( QakkaFig.class ); + actorSystemFig = injector.getInstance( ActorSystemFig.class ); + metricsService = injector.getInstance( MetricsService.class ); + cassandraClient = injector.getInstance( CassandraClientImpl.class ); + } + + + @Override + public void onReceive(Object message) { + + if ( message instanceof QueueRefreshRequest ) { + + QueueRefreshRequest request = (QueueRefreshRequest) message; + + if (!request.getQueueName().equals( queueName )) { + throw new QakkaRuntimeException( + "QueueWriter for " + queueName + ": Incorrect queueName " + request.getQueueName() ); + } + + Timer.Context timer = metricsService.getMetricRegistry().timer( MetricsService.REFRESH_TIME).time(); + + try { + + if (inMemoryQueue.size( queueName ) < qakkaFig.getQueueInMemorySize()) { + + ShardIterator shardIterator = new ShardIterator( + cassandraClient, queueName, actorSystemFig.getRegionLocal(), + Shard.Type.DEFAULT, Optional.empty() ); + + UUID since = inMemoryQueue.getNewest( queueName ); + + String region = actorSystemFig.getRegionLocal(); + MultiShardMessageIterator multiShardIterator = new MultiShardMessageIterator( + cassandraClient, queueName, region, DatabaseQueueMessage.Type.DEFAULT, + shardIterator, since); + + int need = qakkaFig.getQueueInMemorySize() - inMemoryQueue.size( queueName ); + int count = 0; + + while ( multiShardIterator.hasNext() && count < need ) { + DatabaseQueueMessage queueMessage = multiShardIterator.next(); + inMemoryQueue.add( queueName, queueMessage ); + count++; + } + + if ( count > 0 ) { + logger.debug( "Added {} in-memory for queue {}, new size = {}", + count, queueName, inMemoryQueue.size( queueName ) ); + } + } + + } finally { + timer.close(); + } + + } else { + unhandled( message ); + } + } +} 
http://git-wip-us.apache.org/repos/asf/usergrid/blob/3075dce1/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueSender.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueSender.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueSender.java new file mode 100644 index 0000000..8bd733b --- /dev/null +++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueSender.java @@ -0,0 +1,220 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */

package org.apache.usergrid.persistence.qakka.distributed.actors;

import akka.actor.ActorRef;
import akka.actor.UntypedActor;
import akka.cluster.client.ClusterClient;
import akka.pattern.Patterns;
import akka.util.Timeout;
import com.codahale.metrics.Timer;
import com.google.inject.Injector;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.usergrid.persistence.actorsystem.ActorSystemFig;
import org.apache.usergrid.persistence.actorsystem.ActorSystemManager;
import org.apache.usergrid.persistence.qakka.App;
import org.apache.usergrid.persistence.qakka.MetricsService;
import org.apache.usergrid.persistence.qakka.QakkaFig;
import org.apache.usergrid.persistence.qakka.distributed.DistributedQueueService;
import org.apache.usergrid.persistence.qakka.distributed.messages.QueueSendRequest;
import org.apache.usergrid.persistence.qakka.distributed.messages.QueueSendResponse;
import org.apache.usergrid.persistence.qakka.distributed.messages.QueueWriteRequest;
import org.apache.usergrid.persistence.qakka.distributed.messages.QueueWriteResponse;
import org.apache.usergrid.persistence.qakka.exceptions.QakkaException;
import org.apache.usergrid.persistence.qakka.exceptions.QakkaRuntimeException;
import org.apache.usergrid.persistence.qakka.serialization.auditlog.AuditLog;
import org.apache.usergrid.persistence.qakka.serialization.auditlog.AuditLogSerialization;
import org.apache.usergrid.persistence.qakka.serialization.transferlog.TransferLogSerialization;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.concurrent.Await;
import scala.concurrent.Future;

import java.util.UUID;
import java.util.concurrent.TimeUnit;


/**
 * Actor that delivers a queue message to its destination region.
 *
 * <p>On a {@link QueueSendRequest} the actor first replies {@code SUCCESS} to the requester and
 * then asks the destination region to write the message: through the local client actor when
 * the destination is the current region, otherwise through the {@link ClusterClient} of the
 * destination region. The ask is repeated up to {@code QakkaFig.getMaxSendRetries()} times; the
 * resulting {@link QueueWriter.WriteStatus} is then handed to {@link #logResponse}.</p>
 *
 * <p>All collaborators are looked up from {@code App.INJECTOR} in the constructor.</p>
 */
public class QueueSender extends UntypedActor {
    private static final Logger logger = LoggerFactory.getLogger( QueueSender.class );

    private final String name = RandomStringUtils.randomAlphanumeric( 4 );

    private final ActorSystemManager       actorSystemManager;
    private final TransferLogSerialization transferLogSerialization;
    private final AuditLogSerialization    auditLogSerialization;
    private final ActorSystemFig           actorSystemFig;
    private final QakkaFig                 qakkaFig;
    private final MetricsService           metricsService;

    public QueueSender() {

        Injector injector = App.INJECTOR;

        actorSystemManager       = injector.getInstance( ActorSystemManager.class );
        transferLogSerialization = injector.getInstance( TransferLogSerialization.class );
        auditLogSerialization    = injector.getInstance( AuditLogSerialization.class );
        actorSystemFig           = injector.getInstance( ActorSystemFig.class );
        qakkaFig                 = injector.getInstance( QakkaFig.class );
        metricsService           = injector.getInstance( MetricsService.class );
    }


    /**
     * Handles {@link QueueSendRequest}; everything else is reported as unhandled.
     *
     * @throws QakkaRuntimeException propagated from {@link #sendMessageToRegion} when every
     *                               attempt failed
     */
    @Override
    public void onReceive( Object message ) {

        if ( !( message instanceof QueueSendRequest ) ) {
            unhandled( message );
            return;
        }

        QueueSendRequest request = (QueueSendRequest) message;

        // As far as the caller is concerned we are done: acknowledge before the (slow) send.
        // The reply names this actor as its sender rather than echoing the requester's own ref.
        getSender().tell(
            new QueueSendResponse( DistributedQueueService.Status.SUCCESS ), getSelf() );

        // NOTE(review): sendMessageToRegion throws once its attempts are exhausted, so in that
        // case logResponse is not reached and the exception propagates out of onReceive.
        final QueueWriter.WriteStatus writeStatus = sendMessageToRegion(
            request.getQueueName(),
            request.getSourceRegion(),
            request.getDestRegion(),
            request.getMessageId(),
            request.getDeliveryTime(),
            request.getExpirationTime() );

        logResponse( writeStatus, request.getQueueName(), request.getDestRegion(), request.getMessageId() );
    }


    /**
     * Asks the destination region to write the message, retrying on failure.
     *
     * <p>An attempt counts as failed when the reply carries {@code WriteStatus.ERROR}, is not a
     * {@link QueueWriteResponse}, is {@code null}, or when waiting for it throws (which is how
     * {@code Await.result} reports a timeout).</p>
     *
     * @return the send status of the first reply whose status is not {@code ERROR}
     * @throws QakkaRuntimeException if no attempt out of {@code QakkaFig.getMaxSendRetries()}
     *                               produced such a reply
     */
    QueueWriter.WriteStatus sendMessageToRegion(

        String queueName,
        String sourceRegion,
        String destRegion,
        UUID messageId,
        Long deliveryTime,
        Long expirationTime ) {

        // The timer context is closed (and the sample recorded) on return and on throw alike.
        try ( Timer.Context ignored =
                  metricsService.getMetricRegistry().timer( MetricsService.SEND_TIME_SEND ).time() ) {

            final int maxRetries = qakkaFig.getMaxSendRetries();

            final QueueWriteRequest request = new QueueWriteRequest(
                queueName, sourceRegion, destRegion, messageId, deliveryTime, expirationTime );

            for ( int attempt = 1; attempt <= maxRetries; attempt++ ) {
                try {
                    Timeout timeout = new Timeout( qakkaFig.getSendTimeoutSeconds(), TimeUnit.SECONDS );

                    // wait for response...
                    final Object response =
                        Await.result( askDestination( request, destRegion, timeout ), timeout.duration() );

                    if ( response instanceof QueueWriteResponse ) {
                        QueueWriter.WriteStatus status = ( (QueueWriteResponse) response ).getSendStatus();

                        if ( !QueueWriter.WriteStatus.ERROR.equals( status ) ) {
                            if ( attempt > 1 ) {
                                logger.debug( "queueAdd TOTAL_SUCCESS after {} retries", attempt );
                            }
                            return status;
                        }
                        logger.debug( "ERROR STATUS adding to queue, retrying {}", attempt );

                    } else if ( response != null ) {
                        logger.debug( "UNEXPECTED RESPONSE {} adding to queue, retrying {}",
                            response.getClass().getName(), attempt );

                    } else {
                        logger.debug( "NULL RESPONSE adding to queue, retrying {}", attempt );
                    }

                } catch ( Exception e ) {
                    // Includes the timeout raised by Await.result.
                    logger.debug( "ERROR adding to queue, retrying " + attempt, e );
                }
            }

            throw new QakkaRuntimeException( "Error adding to queue after " + maxRetries + " attempts" );
        }
    }


    /** Sends the write request to the local client actor or to the remote region's cluster client. */
    private Future<Object> askDestination( QueueWriteRequest request, String destRegion, Timeout timeout ) {

        if ( actorSystemManager.getCurrentRegion().equals( destRegion ) ) {
            // send to current region via local clientActor
            ActorRef clientActor = actorSystemManager.getClientActor();
            return Patterns.ask( clientActor, request, timeout );
        }

        // send to remote region via cluster client for that region
        ActorRef clusterClient = actorSystemManager.getClusterClient( destRegion );
        return Patterns.ask(
            clusterClient, new ClusterClient.Send( "/user/clientActor", request ), timeout );
    }


    /**
     * Follows up on the outcome of a send.
     *
     * <ul>
     *   <li>{@code ERROR}: logs the failure and records a SEND/ERROR audit log entry.</li>
     *   <li>{@code SUCCESS_XFERLOG_NOTDELETED}: removes the transfer log entry here, since the
     *       status says it was not deleted on the receiving side; a failure to do so is logged
     *       and otherwise ignored.</li>
     *   <li>Any other status, or {@code null}: nothing to do.</li>
     * </ul>
     */
    void logResponse( QueueWriter.WriteStatus writeStatus, String queueName, String region, UUID messageId ) {

        if ( QueueWriter.WriteStatus.ERROR.equals( writeStatus ) ) {

            logger.debug( "ERROR status sending message: {}, {}, {}, {}",
                new Object[]{queueName, actorSystemFig.getRegionLocal(), region, messageId} );

            auditLogSerialization.recordAuditLog(
                AuditLog.Action.SEND,
                AuditLog.Status.ERROR,
                queueName,
                region,
                messageId,
                null );

        } else if ( QueueWriter.WriteStatus.SUCCESS_XFERLOG_NOTDELETED.equals( writeStatus ) ) {

            // queue actor failed to clean up transfer log
            try {
                transferLogSerialization.removeTransferLog(
                    queueName, actorSystemFig.getRegionLocal(), region, messageId );

            } catch ( QakkaException se ) {
                logger.error( "Unable to delete remove transfer log for {}, {}, {}, {}",
                    new Object[]{queueName, actorSystemFig.getRegionLocal(), region, messageId} );
                logger.debug( "Unable to delete remove transfer log exception is:", se );
            }
        }
    }

}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/3075dce1/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueSenderRouter.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueSenderRouter.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueSenderRouter.java new file mode 100644 index 0000000..20603a5 --- /dev/null +++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueSenderRouter.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */

package org.apache.usergrid.persistence.qakka.distributed.actors;

import akka.actor.ActorRef;
import akka.actor.Props;
import akka.actor.UntypedActor;
import akka.routing.FromConfig;
import org.apache.usergrid.persistence.qakka.distributed.messages.QueueSendRequest;


/**
 * Front actor for a router of {@link QueueSender} routees.
 *
 * <p>The router is created as a child named {@code "router"} with {@link FromConfig} props, so
 * its routing settings are not fixed in this class. Every {@link QueueSendRequest} is passed
 * on to the router with the original sender preserved; any other message is reported as
 * unhandled.</p>
 */
public class QueueSenderRouter extends UntypedActor {

    private final ActorRef router;


    public QueueSenderRouter() {

        Props senderProps = FromConfig.getInstance().props( Props.create( QueueSender.class ) );
        router = getContext().actorOf( senderProps, "router" );
    }

    @Override
    public void onReceive( Object message ) {

        if ( !( message instanceof QueueSendRequest ) ) {
            unhandled( message );
            return;
        }

        // Keep the original sender so the routee's reply goes to the requester, not to us.
        router.tell( message, getSender() );
    }
}