Initial integration of Qakka into the Usergrid Queue module, and a Qakka-based LegacyQueueManager implementation.
Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/3075dce1 Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/3075dce1 Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/3075dce1 Branch: refs/heads/usergrid-1318-queue Commit: 3075dce1631cc7f225ddb34aa08fb16a4a8f486a Parents: 9016fd2 Author: Dave Johnson <[email protected]> Authored: Tue Sep 13 13:44:42 2016 -0400 Committer: Dave Johnson <[email protected]> Committed: Tue Sep 13 13:44:42 2016 -0400 ---------------------------------------------------------------------- stack/core/pom.xml | 2 +- .../usergrid/persistence/entities/Receipt.java | 33 +- .../usergrid/corepersistence/index/RxTest.java | 23 +- stack/corepersistence/queryindex/pom.xml | 1 + .../persistence/index/guice/IndexModule.java | 1 - stack/corepersistence/queue/pom.xml | 156 +++++++ .../apache/usergrid/persistence/qakka/App.java | 94 ++++ .../persistence/qakka/MetricsService.java | 39 ++ .../usergrid/persistence/qakka/QakkaFig.java | 131 ++++++ .../usergrid/persistence/qakka/QakkaModule.java | 120 +++++ .../persistence/qakka/api/ApiResponse.java | 68 +++ .../persistence/qakka/api/QueueResource.java | 443 +++++++++++++++++++ .../persistence/qakka/api/StatusResource.java | 82 ++++ .../persistence/qakka/api/URIStrategy.java | 31 ++ .../qakka/api/impl/BadRequestMapper.java | 43 ++ .../qakka/api/impl/GuiceFeature.java | 45 ++ .../qakka/api/impl/JacksonProvider.java | 44 ++ .../qakka/api/impl/JerseyResourceConfig.java | 31 ++ .../qakka/api/impl/NotFoundMapper.java | 43 ++ .../qakka/api/impl/StartupListener.java | 70 +++ .../qakka/api/impl/URIStrategyLocalhost.java | 49 ++ .../persistence/qakka/core/CassandraClient.java | 29 ++ .../qakka/core/CassandraClientImpl.java | 49 ++ .../persistence/qakka/core/QakkaUtils.java | 44 ++ .../usergrid/persistence/qakka/core/Queue.java | 147 ++++++ .../persistence/qakka/core/QueueManager.java | 35 ++ 
.../persistence/qakka/core/QueueMessage.java | 186 ++++++++ .../qakka/core/QueueMessageManager.java | 83 ++++ .../persistence/qakka/core/QueueType.java | 26 ++ .../persistence/qakka/core/Regions.java | 84 ++++ .../qakka/core/impl/InMemoryQueue.java | 84 ++++ .../qakka/core/impl/QueueManagerImpl.java | 125 ++++++ .../core/impl/QueueMessageManagerImpl.java | 299 +++++++++++++ .../distributed/DistributedQueueService.java | 60 +++ .../qakka/distributed/actors/QueueActor.java | 207 +++++++++ .../distributed/actors/QueueActorHelper.java | 167 +++++++ .../distributed/actors/QueueActorRouter.java | 95 ++++ .../distributed/actors/QueueRefresher.java | 124 ++++++ .../qakka/distributed/actors/QueueSender.java | 220 +++++++++ .../distributed/actors/QueueSenderRouter.java | 53 +++ .../distributed/actors/QueueTimeouter.java | 147 ++++++ .../qakka/distributed/actors/QueueWriter.java | 152 +++++++ .../distributed/actors/QueueWriterRouter.java | 53 +++ .../distributed/actors/ShardAllocator.java | 153 +++++++ .../impl/DistributedQueueServiceImpl.java | 296 +++++++++++++ .../impl/QueueActorRouterProducer.java | 141 ++++++ .../impl/QueueSenderRouterProducer.java | 134 ++++++ .../impl/QueueWriterRouterProducer.java | 134 ++++++ .../distributed/messages/QakkaMessage.java | 28 ++ .../distributed/messages/QueueAckRequest.java | 52 +++ .../distributed/messages/QueueAckResponse.java | 59 +++ .../distributed/messages/QueueGetRequest.java | 49 ++ .../distributed/messages/QueueGetResponse.java | 63 +++ .../distributed/messages/QueueInitRequest.java | 43 ++ .../messages/QueueRefreshRequest.java | 43 ++ .../distributed/messages/QueueSendRequest.java | 84 ++++ .../distributed/messages/QueueSendResponse.java | 43 ++ .../messages/QueueTimeoutRequest.java | 43 ++ .../distributed/messages/QueueWriteRequest.java | 84 ++++ .../messages/QueueWriteResponse.java | 43 ++ .../distributed/messages/ShardCheckRequest.java | 43 ++ .../qakka/exceptions/BadRequestException.java | 32 ++ 
.../qakka/exceptions/NotFoundException.java | 32 ++ .../qakka/exceptions/QakkaException.java | 32 ++ .../qakka/exceptions/QakkaRuntimeException.java | 32 ++ .../MultiShardMessageIterator.java | 181 ++++++++ .../persistence/qakka/serialization/Result.java | 34 ++ .../qakka/serialization/auditlog/AuditLog.java | 101 +++++ .../auditlog/AuditLogSerialization.java | 45 ++ .../impl/AuditLogSerializationImpl.java | 148 +++++++ .../queuemessages/DatabaseQueueMessage.java | 155 +++++++ .../queuemessages/DatabaseQueueMessageBody.java | 52 +++ .../MessageCounterSerialization.java | 31 ++ .../QueueMessageSerialization.java | 54 +++ .../impl/MessageCounterSerializationImpl.java | 204 +++++++++ .../impl/QueueMessageSerializationImpl.java | 320 ++++++++++++++ .../serialization/queues/DatabaseQueue.java | 114 +++++ .../queues/QueueSerialization.java | 36 ++ .../queues/impl/QueueSerializationImpl.java | 157 +++++++ .../qakka/serialization/sharding/Shard.java | 111 +++++ .../sharding/ShardCounterSerialization.java | 29 ++ .../serialization/sharding/ShardIterator.java | 142 ++++++ .../sharding/ShardSerialization.java | 35 ++ .../serialization/sharding/ShardStrategy.java | 35 ++ .../sharding/impl/PlaceholderShardStrategy.java | 44 ++ .../impl/ShardCounterSerializationImpl.java | 198 +++++++++ .../sharding/impl/ShardSerializationImpl.java | 200 +++++++++ .../sharding/impl/ShardStrategyImpl.java | 71 +++ .../serialization/transferlog/TransferLog.java | 84 ++++ .../transferlog/TransferLogSerialization.java | 62 +++ .../impl/TransferLogSerializationImpl.java | 165 +++++++ .../persistence/queue/guice/QueueModule.java | 20 +- .../queue/impl/QakkaQueueManager.java | 178 ++++++++ .../queue/impl/QueueManagerFactoryImpl.java | 4 +- .../queue/src/main/webapp/WEB-INF/web.xml | 51 +++ .../persistence/qakka/AbstractTest.java | 79 ++++ .../persistence/qakka/KeyspaceDropper.java | 70 +++ .../persistence/qakka/api/AbstractRestTest.java | 64 +++ .../persistence/qakka/api/PerformanceTest.java | 148 
+++++++ .../qakka/api/QueueResourceTest.java | 418 +++++++++++++++++ .../qakka/common/CassandraClientTest.java | 46 ++ .../qakka/core/QueueMessageManagerTest.java | 272 ++++++++++++ .../distributed/QueueActorServiceTest.java | 175 ++++++++ .../actors/QueueActorHelperTest.java | 284 ++++++++++++ .../distributed/actors/QueueReaderTest.java | 111 +++++ .../distributed/actors/QueueTimeouterTest.java | 152 +++++++ .../distributed/actors/ShardAllocatorTest.java | 212 +++++++++ ...tiShardDatabaseQueueMessageIteratorTest.java | 127 ++++++ .../auditlogs/AuditLogSerializationTest.java | 102 +++++ .../DatabaseQueueMessageSerializationTest.java | 267 +++++++++++ .../queues/DatabaseQueueSerializationTest.java | 85 ++++ .../sharding/ShardCounterSerializationTest.java | 65 +++ .../sharding/ShardIteratorTest.java | 139 ++++++ .../sharding/ShardSerializationTest.java | 106 +++++ .../sharding/ShardStrategyTest.java | 71 +++ .../qakka/serialization/sharding/ShardTest.java | 53 +++ .../TransferLogSerializationTest.java | 133 ++++++ .../queue/LegacyQueueManagerTest.java | 117 +++-- .../queue/guice/TestQueueModule.java | 2 - .../queue/src/test/resources/cassandra.yaml | 53 +++ .../queue/src/test/resources/log4j.properties | 29 ++ .../queue/src/test/resources/qakka-duck.jpg | Bin 0 -> 11188 bytes .../queue/src/test/resources/qakka.properties | 50 +++ stack/pom.xml | 6 +- .../notifications/apns/APNsAdapter.java | 28 +- .../services/notifications/gcm/GCMAdapter.java | 11 +- 126 files changed, 12116 insertions(+), 101 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/usergrid/blob/3075dce1/stack/core/pom.xml ---------------------------------------------------------------------- diff --git a/stack/core/pom.xml b/stack/core/pom.xml index 9f2dc88..7ee3b06 100644 --- a/stack/core/pom.xml +++ b/stack/core/pom.xml @@ -40,7 +40,6 @@ </plugins> </reporting> - <build> <resources> @@ -429,6 +428,7 @@ 
<groupId>org.apache.usergrid</groupId> <artifactId>queue</artifactId> <version>${project.version}</version> + <classifier>classes</classifier> <type>jar</type> </dependency> http://git-wip-us.apache.org/repos/asf/usergrid/blob/3075dce1/stack/core/src/main/java/org/apache/usergrid/persistence/entities/Receipt.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/persistence/entities/Receipt.java b/stack/core/src/main/java/org/apache/usergrid/persistence/entities/Receipt.java index 1e145ac..e7c90b5 100644 --- a/stack/core/src/main/java/org/apache/usergrid/persistence/entities/Receipt.java +++ b/stack/core/src/main/java/org/apache/usergrid/persistence/entities/Receipt.java @@ -16,14 +16,15 @@ */ package org.apache.usergrid.persistence.entities; +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.annotation.JsonSerialize; import org.apache.usergrid.persistence.TypedEntity; +import org.apache.usergrid.persistence.annotations.EntityProperty; import javax.xml.bind.annotation.XmlRootElement; import java.util.HashMap; import java.util.UUID; -import org.apache.usergrid.persistence.annotations.EntityProperty; -import org.mortbay.util.ajax.JSON; @XmlRootElement public class Receipt extends TypedEntity { @@ -31,6 +32,11 @@ public class Receipt extends TypedEntity { public static final String ENTITY_TYPE = "receipt"; public static final String NOTIFICATION_CONNECTION = "notification"; + private static ObjectMapper objectMapper = new ObjectMapper(); + static private final TypeReference<HashMap> hashMapTypeRef = new TypeReference<HashMap>() {}; + + + /** device id **/ @EntityProperty protected UUID deviceId; @@ -65,27 +71,28 @@ public class Receipt extends TypedEntity { public Receipt() { } - public Receipt(UUID notificationUUID, String notifierId, Object payload,UUID deviceId) { + public Receipt(UUID 
notificationUUID, String notifierId, Object payload, UUID deviceId) { this.notificationUUID = notificationUUID; this.notifierId = notifierId; HashMap receiptPayload; - if(! (payload instanceof HashMap) ){ - if(payload instanceof String){ + + if (!(payload instanceof HashMap)) { + if (payload instanceof String) { try { - receiptPayload = (HashMap) JSON.parse((String) payload); - }catch (Exception e){ + receiptPayload = (HashMap) objectMapper.readValue( (String)payload, hashMapTypeRef ); + } catch (Exception e) { receiptPayload = new HashMap<>(); - receiptPayload.put("payload", payload); + receiptPayload.put( "payload", payload ); } - }else { + } else { receiptPayload = new HashMap<>(); - receiptPayload.put("payload", payload); + receiptPayload.put( "payload", payload ); } - }else{ - receiptPayload = (HashMap)payload; + } else { + receiptPayload = (HashMap) payload; } this.payload = receiptPayload; - this.setDeviceId(deviceId); + this.setDeviceId( deviceId ); } @JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL) http://git-wip-us.apache.org/repos/asf/usergrid/blob/3075dce1/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/RxTest.java ---------------------------------------------------------------------- diff --git a/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/RxTest.java b/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/RxTest.java index f7d52d6..d4fd33d 100644 --- a/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/RxTest.java +++ b/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/RxTest.java @@ -20,18 +20,8 @@ package org.apache.usergrid.corepersistence.index; -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; -import java.util.stream.Collectors; -import java.util.stream.Stream; - -import org.apache.avro.generic.GenericData; import org.apache.usergrid.ExperimentalTest; -import 
org.junit.Ignore; import org.junit.Test; - import org.junit.experimental.categories.Category; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -40,6 +30,13 @@ import rx.Subscription; import rx.observables.ConnectableObservable; import rx.schedulers.Schedulers; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; +import java.util.stream.Stream; + import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; @@ -61,8 +58,10 @@ public class RxTest { final CountDownLatch latch = new CountDownLatch( count+1 ); final Subscription connectedObservable = - Observable.range( 0, count ).doOnNext( integer -> latch.countDown() ).doOnCompleted( () -> latch.countDown() ).subscribeOn( Schedulers.io() ) - .subscribe(); + Observable.range( 0, count ) + .doOnNext( integer -> latch.countDown() ) + .doOnCompleted( () -> latch.countDown() ).subscribeOn( Schedulers.io() ) + .subscribe(); final boolean completed = latch.await( 3, TimeUnit.SECONDS ); http://git-wip-us.apache.org/repos/asf/usergrid/blob/3075dce1/stack/corepersistence/queryindex/pom.xml ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queryindex/pom.xml b/stack/corepersistence/queryindex/pom.xml index 9b4eca4..a3f293d 100644 --- a/stack/corepersistence/queryindex/pom.xml +++ b/stack/corepersistence/queryindex/pom.xml @@ -97,6 +97,7 @@ <groupId>${project.parent.groupId}</groupId> <artifactId>queue</artifactId> <version>${project.version}</version> + <classifier>classes</classifier> <type>jar</type> </dependency> http://git-wip-us.apache.org/repos/asf/usergrid/blob/3075dce1/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/guice/IndexModule.java ---------------------------------------------------------------------- diff --git 
a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/guice/IndexModule.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/guice/IndexModule.java index b828934..47399c7 100644 --- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/guice/IndexModule.java +++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/guice/IndexModule.java @@ -23,7 +23,6 @@ import com.google.inject.TypeLiteral; import com.google.inject.multibindings.Multibinder; import org.apache.usergrid.persistence.core.migration.data.DataMigration; import org.apache.usergrid.persistence.core.migration.data.MigrationPlugin; -import org.apache.usergrid.persistence.core.scope.ApplicationScope; import org.apache.usergrid.persistence.index.*; import com.google.inject.AbstractModule; http://git-wip-us.apache.org/repos/asf/usergrid/blob/3075dce1/stack/corepersistence/queue/pom.xml ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/pom.xml b/stack/corepersistence/queue/pom.xml index c81e71c..c74d49c 100644 --- a/stack/corepersistence/queue/pom.xml +++ b/stack/corepersistence/queue/pom.xml @@ -21,6 +21,7 @@ <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <parent> <artifactId>persistence</artifactId> <groupId>org.apache.usergrid</groupId> @@ -30,6 +31,48 @@ <modelVersion>4.0.0</modelVersion> <artifactId>queue</artifactId> <name>Usergrid Queue</name> + <packaging>war</packaging> + + <properties> + <maven.compiler.source>1.8</maven.compiler.source> + <maven.compiler.target>1.8</maven.compiler.target> + <servlet.version>3.0.1</servlet.version> + <jersey.version>2.23.1</jersey.version> + <guice-bridge.version>2.4.0</guice-bridge.version> + </properties> + + + 
<build> + + <finalName>queue-${project.version}</finalName> + + <pluginManagement> + <plugins> + + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-war-plugin</artifactId> + <version>2.6</version> + <configuration> + <archiveClasses>true</archiveClasses> + <attachClasses>true</attachClasses> + </configuration> + </plugin> + + </plugins> + </pluginManagement> + + <testResources> + <testResource> + <directory>src/test/resources</directory> + <filtering>true</filtering> + <includes> + <include>**/**</include> + </includes> + </testResource> + </testResources> + + </build> <dependencies> @@ -90,6 +133,119 @@ <version>4.2</version> </dependency> + <dependency> + <groupId>org.apache.usergrid</groupId> + <artifactId>actorsystem</artifactId> + <version>${project.version}</version> + </dependency> + + <dependency> + <groupId>org.apache.usergrid</groupId> + <artifactId>common</artifactId> + <version>${project.version}</version> + <exclusions> + <exclusion> + <artifactId>cassandra-all</artifactId> + <groupId>org.apache.cassandra</groupId> + </exclusion> + </exclusions> + </dependency> + + <!-- + <dependency> + <groupId>com.datastax.cassandra</groupId> + <artifactId>cassandra-driver-core</artifactId> + <version>2.1.9</version> + <exclusions> + <exclusion> + <groupId>com.google.guava</groupId> + <artifactId>guava</artifactId> + </exclusion> + </exclusions> + </dependency> + + <dependency> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-log4j12</artifactId> + <version>1.7.13</version> + </dependency> + + <dependency> + <groupId>com.fasterxml.uuid</groupId> + <artifactId>java-uuid-generator</artifactId> + <version>3.1.4</version> + </dependency> + + <dependency> + <groupId>org.apache.commons</groupId> + <artifactId>commons-lang3</artifactId> + <version>3.3.2</version> + </dependency> + --> + + <!-- Java EE & Jersey --> + + <dependency> + <groupId>javax.servlet</groupId> + <artifactId>javax.servlet-api</artifactId> + 
<version>${servlet.version}</version> + <scope>provided</scope> + </dependency> + + <dependency> + <groupId>org.glassfish.jersey.media</groupId> + <artifactId>jersey-media-json-jackson</artifactId> + <version>${jersey.version}</version> + </dependency> + + <dependency> + <groupId>org.glassfish.jersey.core</groupId> + <artifactId>jersey-client</artifactId> + <version>${jersey.version}</version> + </dependency> + + <dependency> + <groupId>org.glassfish.jersey.containers</groupId> + <artifactId>jersey-container-servlet</artifactId> + <version>${jersey.version}</version> + </dependency> + + <!-- added for Guice Jersey integration --> + <dependency> + <groupId>org.glassfish.hk2</groupId> + <artifactId>guice-bridge</artifactId> + <version>${guice-bridge.version}</version> + </dependency> + + <!-- added for Guice Jersey integration --> + <dependency> + <groupId>com.google.inject.extensions</groupId> + <artifactId>guice-servlet</artifactId> + <version>${guice.version}</version> + </dependency> + + <!-- added to enable logging from within Jersey --> + <dependency> + <groupId>org.slf4j</groupId> + <artifactId>jul-to-slf4j</artifactId> + <version>1.7.5</version> + </dependency> + + <!-- Testing --> + + <dependency> + <groupId>junit</groupId> + <artifactId>junit</artifactId> + <version>4.12</version> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.glassfish.jersey.test-framework.providers</groupId> + <artifactId>jersey-test-framework-provider-jetty</artifactId> + <version>${jersey.version}</version> + </dependency> + </dependencies> <!-- http://git-wip-us.apache.org/repos/asf/usergrid/blob/3075dce1/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/App.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/App.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/App.java new file 
mode 100644 index 0000000..9d9c972 --- /dev/null +++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/App.java @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.usergrid.persistence.qakka; + +import com.codahale.metrics.MetricRegistry; +import com.google.inject.Inject; +import com.google.inject.Injector; +import org.apache.usergrid.persistence.actorsystem.ActorSystemFig; +import org.apache.usergrid.persistence.actorsystem.ActorSystemManager; +import org.apache.usergrid.persistence.core.migration.schema.MigrationException; +import org.apache.usergrid.persistence.core.migration.schema.MigrationManager; +import org.apache.usergrid.persistence.qakka.distributed.DistributedQueueService; +import org.apache.usergrid.persistence.qakka.exceptions.QakkaRuntimeException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * Akka queueing application + */ +public class App implements MetricsService { + private static final Logger logger = LoggerFactory.getLogger( App.class ); + + // TODO: can we avoid this kludge with better Akka-Guice integration? 
+ static public Injector INJECTOR; + + private final ActorSystemFig actorSystemFig; + private final ActorSystemManager actorSystemManager; + private final DistributedQueueService distributedQueueService; + private final MetricRegistry metrics = new MetricRegistry(); + + + @Inject + public App( + Injector injector, + ActorSystemFig actorSystemFig, + ActorSystemManager actorSystemManager, + DistributedQueueService distributedQueueService, + MigrationManager migrationManager) { + + this.INJECTOR = injector; + this.actorSystemFig = actorSystemFig; + this.actorSystemManager = actorSystemManager; + this.distributedQueueService = distributedQueueService; + + try { + migrationManager.migrate(); + } catch (MigrationException e) { + throw new QakkaRuntimeException( "Error running migration", e ); + } + } + + /** + * Init Akka ActorSystems and wait for request actors to init. + */ + public void start() { + start( + actorSystemFig.getHostname(), + Integer.parseInt(actorSystemFig.getPort()), // TODO: make port an int in Actor System module + actorSystemFig.getRegionLocal()); + } + + /** + * For testing purposes only; does not wait for request actors to init. 
+ */ + public void start( String h, Integer p, String r ) { + actorSystemManager.start( h, p, r ); + actorSystemManager.waitForClientActor(); + distributedQueueService.init(); + } + + + @Override + public MetricRegistry getMetricRegistry() { + return metrics; + } +} http://git-wip-us.apache.org/repos/asf/usergrid/blob/3075dce1/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/MetricsService.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/MetricsService.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/MetricsService.java new file mode 100644 index 0000000..378ba0d --- /dev/null +++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/MetricsService.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.usergrid.persistence.qakka; + +import com.codahale.metrics.MetricRegistry; + + +public interface MetricsService { + + String SEND_TIME_TOTAL = "org.apache.usergrid.persistence.qakka.send.time.total"; + String SEND_TIME_SEND = "org.apache.usergrid.persistence.qakka.send.time.send"; + String SEND_TIME_WRITE = "org.apache.usergrid.persistence.qakka.send.time.write"; + String GET_TIME_TOTAL = "org.apache.usergrid.persistence.qakka.get.time.total"; + String GET_TIME_GET = "org.apache.usergrid.persistence.qakka.get.time.get"; + String ACK_TIME_TOTAL = "org.apache.usergrid.persistence.qakka.ack.time.total"; + String ACK_TIME_ACK = "org.apache.usergrid.persistence.qakka.ack.time.ack"; + String TIMEOUT_TIME = "org.apache.usergrid.persistence.qakka.timeout.time"; + String REFRESH_TIME = "org.apache.usergrid.persistence.qakka.timeout.time"; + String ALLOCATE_TIME = "org.apache.usergrid.persistence.qakka.allocate.time"; + + MetricRegistry getMetricRegistry(); +} http://git-wip-us.apache.org/repos/asf/usergrid/blob/3075dce1/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/QakkaFig.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/QakkaFig.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/QakkaFig.java new file mode 100644 index 0000000..6f3df11 --- /dev/null +++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/QakkaFig.java @@ -0,0 +1,131 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.usergrid.persistence.qakka; + +import org.safehaus.guicyfig.Default; +import org.safehaus.guicyfig.FigSingleton; +import org.safehaus.guicyfig.GuicyFig; +import org.safehaus.guicyfig.Key; + +import java.io.Serializable; + + +@FigSingleton +public interface QakkaFig extends GuicyFig, Serializable { + + String QUEUE_NUM_ACTORS = "queue.num.actors"; + + String QUEUE_SENDER_NUM_ACTORS = "queue.sender.num.actors"; + + String QUEUE_WRITER_NUM_ACTORS = "queue.writer.num.actors"; + + String QUEUE_TIMEOUT_SECONDS = "queue.timeout.seconds"; + + String QUEUE_REFRESH_MILLISECONDS = "queue.refresh.milliseconds"; + + String QUEUE_INMEMORY_SIZE = "queue.inmemory.size"; + + String QUEUE_SEND_MAX_RETRIES = "queue.send.max.retries"; + + String QUEUE_SEND_TIMEOUT = "queue.send.timeout.seconds"; + + String QUEUE_GET_MAX_RETRIES = "queue.get.max.retries"; + + String QUEUE_GET_TIMEOUT = "queue.get.timeout.seconds"; + + String QUEUE_MAX_SHARD_COUNTER = "queue.max.inmemory.shard.counter"; + + String QUEUE_SHARD_ALLOCATION_CHECK_FREQUENCY = "queue.shard.allocation.check.frequency.millis"; + + String QUEUE_SHARD_ALLOCATION_ADVANCE_TIME = "queue.shard.allocation.advance.time.millis"; + + String QUEUE_SHARD_MAX_SIZE = "queue.shard.max.size"; + + + /** Queue senders send to queue writers */ + @Key(QUEUE_SENDER_NUM_ACTORS) + @Default("200") + int getNumQueueSenderActors(); + + /** Queue writers write to Cassandra */ + @Key(QUEUE_WRITER_NUM_ACTORS) + @Default("500") + int getNumQueueWriterActors(); + + /** Queue actors handle get, ack and manage 
scheduled timeout and refresh tasks */ + @Key(QUEUE_NUM_ACTORS) + @Default("500") + int getNumQueueActors(); + + /** Time for queue messages to timeout, if not set per queue */ + @Key(QUEUE_TIMEOUT_SECONDS) + @Default("10") + int getQueueTimeoutSeconds(); + + /** How often to refresh each queue's in-memory data */ + @Key(QUEUE_REFRESH_MILLISECONDS) + @Default("500") + int getQueueRefreshMilliseconds(); + + /** How many queue messages to keep in-memory */ + @Key(QUEUE_INMEMORY_SIZE) + @Default("1000") + int getQueueInMemorySize(); + + /** Max number of times to retry call to queue actor for queue get operation */ + @Key(QUEUE_GET_MAX_RETRIES) + @Default("5") + int getMaxGetRetries(); + + /** How long to wait for response from queue actor before timing out and trying again */ + @Key(QUEUE_GET_TIMEOUT) + @Default("2") + int getGetTimeoutSeconds(); + + /** Max number of times to retry call to queue writer for queue send operation */ + @Key(QUEUE_SEND_MAX_RETRIES) + @Default("5") + int getMaxSendRetries(); + + /** How long to wait for response from queue writer before timing out and trying again */ + @Key(QUEUE_SEND_TIMEOUT) + @Default("2") + int getSendTimeoutSeconds(); + + /** Once counter reaches this value, write it to permanent storage */ + @Key(QUEUE_MAX_SHARD_COUNTER) + @Default("100") + long getMaxInMemoryShardCounter(); + + /** How often to check whether new shard is needed for each queue */ + @Key(QUEUE_SHARD_ALLOCATION_CHECK_FREQUENCY) + @Default("5000") + long getShardAllocationCheckFrequencyMillis(); + + /** New shards are created in advance of the time they will be used */ + @Key(QUEUE_SHARD_ALLOCATION_ADVANCE_TIME) + @Default("5000") + long getShardAllocationAdvanceTimeMillis(); + + /** Max size to allow for a shard */ + @Key(QUEUE_SHARD_MAX_SIZE) + @Default("400000") + long getMaxShardSize(); +} http://git-wip-us.apache.org/repos/asf/usergrid/blob/3075dce1/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/QakkaModule.java
---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/QakkaModule.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/QakkaModule.java new file mode 100644 index 0000000..b7c977c --- /dev/null +++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/QakkaModule.java @@ -0,0 +1,120 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.usergrid.persistence.qakka; + +import com.google.inject.AbstractModule; +import com.google.inject.Key; +import com.google.inject.multibindings.Multibinder; +import com.netflix.config.ConfigurationManager; +import org.apache.usergrid.persistence.actorsystem.ActorSystemModule; +import org.apache.usergrid.persistence.core.guice.CommonModule; +import org.apache.usergrid.persistence.core.migration.schema.Migration; +import org.apache.usergrid.persistence.qakka.api.URIStrategy; +import org.apache.usergrid.persistence.qakka.api.impl.URIStrategyLocalhost; +import org.apache.usergrid.persistence.qakka.core.*; +import org.apache.usergrid.persistence.qakka.core.impl.QueueManagerImpl; +import org.apache.usergrid.persistence.qakka.core.impl.QueueMessageManagerImpl; +import org.apache.usergrid.persistence.qakka.distributed.DistributedQueueService; +import org.apache.usergrid.persistence.qakka.distributed.actors.QueueActorHelper; +import org.apache.usergrid.persistence.qakka.distributed.impl.DistributedQueueServiceImpl; +import org.apache.usergrid.persistence.qakka.distributed.impl.QueueActorRouterProducer; +import org.apache.usergrid.persistence.qakka.distributed.impl.QueueSenderRouterProducer; +import org.apache.usergrid.persistence.qakka.distributed.impl.QueueWriterRouterProducer; +import org.apache.usergrid.persistence.qakka.serialization.auditlog.AuditLogSerialization; +import org.apache.usergrid.persistence.qakka.serialization.auditlog.impl.AuditLogSerializationImpl; +import org.apache.usergrid.persistence.qakka.serialization.queuemessages.QueueMessageSerialization; +import org.apache.usergrid.persistence.qakka.serialization.queuemessages.impl.QueueMessageSerializationImpl; +import org.apache.usergrid.persistence.qakka.serialization.queues.QueueSerialization; +import org.apache.usergrid.persistence.qakka.serialization.queues.impl.QueueSerializationImpl; +import org.apache.usergrid.persistence.qakka.serialization.sharding.ShardCounterSerialization; 
+import org.apache.usergrid.persistence.qakka.serialization.sharding.ShardSerialization; +import org.apache.usergrid.persistence.qakka.serialization.sharding.ShardStrategy; +import org.apache.usergrid.persistence.qakka.serialization.sharding.impl.ShardCounterSerializationImpl; +import org.apache.usergrid.persistence.qakka.serialization.sharding.impl.ShardSerializationImpl; +import org.apache.usergrid.persistence.qakka.serialization.sharding.impl.ShardStrategyImpl; +import org.apache.usergrid.persistence.qakka.serialization.transferlog.TransferLogSerialization; +import org.apache.usergrid.persistence.qakka.serialization.transferlog.impl.TransferLogSerializationImpl; +import org.safehaus.guicyfig.GuicyFigModule; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; + + +public class QakkaModule extends AbstractModule { + private static final Logger logger = LoggerFactory.getLogger( QakkaModule.class ); + + static { + try { + // TODO: reconcile with usergrid props + // load properties from one properties file using Netflix Archaius so that GuicyFig will see them + ConfigurationManager.loadCascadedPropertiesFromResources( "qakka" ); + } catch (IOException e) { + throw new RuntimeException( "Cannot qakka.properties file", e ); + } + } + + @Override + protected void configure() { + + install( new CommonModule() ); + install( new ActorSystemModule() ); + install( new GuicyFigModule( QakkaFig.class ) ); + + bind( App.class ); + + bind( CassandraClient.class ).to( CassandraClientImpl.class ); + bind( MetricsService.class ).to( App.class ); + + bind( QueueManager.class ).to( QueueManagerImpl.class ); + bind( QueueSerialization.class ).to( QueueSerializationImpl.class ); + + bind( QueueMessageManager.class ).to( QueueMessageManagerImpl.class ); + bind( QueueMessageSerialization.class ).to( QueueMessageSerializationImpl.class ); + + bind( ShardSerialization.class ).to( ShardSerializationImpl.class ); + bind( ShardStrategy.class ).to( 
ShardStrategyImpl.class ); + + bind( ShardCounterSerialization.class ).to( ShardCounterSerializationImpl.class ); + + bind( TransferLogSerialization.class ).to( TransferLogSerializationImpl.class ); + bind( AuditLogSerialization.class ).to( AuditLogSerializationImpl.class ); + bind( DistributedQueueService.class ).to( DistributedQueueServiceImpl.class ); + + bind( QueueActorRouterProducer.class ); + bind( QueueWriterRouterProducer.class ); + bind( QueueSenderRouterProducer.class ); + bind( QueueActorHelper.class ); + + bind( Regions.class ); + bind( URIStrategy.class ).to( URIStrategyLocalhost.class ); + + Multibinder<Migration> migrationBinder = Multibinder.newSetBinder( binder(), Migration.class ); + + migrationBinder.addBinding().to( Key.get( AuditLogSerialization.class ) ); + //migrationBinder.addBinding().to( Key.get( MessageCounterSerialization.class ) ); + migrationBinder.addBinding().to( Key.get( QueueMessageSerialization.class ) ); + migrationBinder.addBinding().to( Key.get( QueueSerialization.class ) ); + migrationBinder.addBinding().to( Key.get( ShardCounterSerialization.class ) ); + migrationBinder.addBinding().to( Key.get( ShardSerialization.class ) ); + migrationBinder.addBinding().to( Key.get( TransferLogSerialization.class ) ); + + } +} http://git-wip-us.apache.org/repos/asf/usergrid/blob/3075dce1/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/api/ApiResponse.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/api/ApiResponse.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/api/ApiResponse.java new file mode 100644 index 0000000..c2c0910 --- /dev/null +++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/api/ApiResponse.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor 
package org.apache.usergrid.persistence.qakka.api;

import org.apache.usergrid.persistence.qakka.core.Queue;
import org.apache.usergrid.persistence.qakka.core.QueueMessage;

import javax.xml.bind.annotation.XmlRootElement;
import java.util.Collection;


/**
 * Response envelope returned by the Qakka REST resources.
 *
 * <p>A plain mutable bean with four optional properties: a message, a count, a collection of
 * queues and a collection of queue messages. Every property is {@code null} until its setter
 * is called.</p>
 */
@XmlRootElement
public class ApiResponse {

    private String message;
    private Integer count;
    private Collection<Queue> queues;
    private Collection<QueueMessage> queueMessages;

    /** @return the queues carried by this response, or {@code null} if none were set */
    public Collection<Queue> getQueues() {
        return this.queues;
    }

    /** @param queues the queues to carry in this response */
    public void setQueues(Collection<Queue> queues) {
        this.queues = queues;
    }

    /** @return the queue messages carried by this response, or {@code null} if none were set */
    public Collection<QueueMessage> getQueueMessages() {
        return this.queueMessages;
    }

    /** @param queueMessages the queue messages to carry in this response */
    public void setQueueMessages(Collection<QueueMessage> queueMessages) {
        this.queueMessages = queueMessages;
    }

    /** @return the count value, or {@code null} if none was set */
    public Integer getCount() {
        return this.count;
    }

    /** @param count the count value to report */
    public void setCount(Integer count) {
        this.count = count;
    }

    /** @return the message text, or {@code null} if none was set */
    public String getMessage() {
        return this.message;
    }

    /** @param message the message text to report */
    public void setMessage(String message) {
        this.message = message;
    }
}
---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/api/QueueResource.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/api/QueueResource.java new file mode 100644 index 0000000..f82661c --- /dev/null +++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/api/QueueResource.java @@ -0,0 +1,443 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
package org.apache.usergrid.persistence.qakka.api;

import com.codahale.metrics.Timer;
import com.fasterxml.jackson.databind.util.ByteBufferBackedInputStream;
import com.google.common.base.Preconditions;
import com.google.common.io.ByteStreams;
import org.apache.usergrid.persistence.qakka.MetricsService;
import org.apache.usergrid.persistence.qakka.core.*;
import org.apache.usergrid.persistence.qakka.serialization.sharding.ShardCounterSerialization;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.inject.Inject;
import javax.ws.rs.*;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.StreamingOutput;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.List;
import java.util.Locale;
import java.util.UUID;


/**
 * JAX-RS resource rooted at {@code queues}.
 *
 * <p>Exposes queue administration (create, update config, delete, read config, list) and queue
 * message operations (send JSON or binary, get next, ack, read message data). Work is delegated
 * to the injected {@link QueueManager} and {@link QueueMessageManager}; send, get and ack calls
 * are timed through the {@link MetricsService} metric registry.</p>
 *
 * <p>Invalid request parameters are reported by throwing {@link IllegalArgumentException}.</p>
 */
@Path("queues")
public class QueueResource {
    private static final Logger logger = LoggerFactory.getLogger( QueueResource.class );

    private final QueueManager              queueManager;
    private final QueueMessageManager       queueMessageManager;
    private final MetricsService            metricsService;
    private final URIStrategy               uriStrategy;
    private final Regions                   regions;
    private final ShardCounterSerialization shardCounterSerialization;


    @Inject
    public QueueResource(
            QueueManager              queueManager,
            QueueMessageManager       queueMessageManager,
            MetricsService            metricsService,
            URIStrategy               uriStrategy,
            Regions                   regions,
            ShardCounterSerialization shardCounterSerialization ) {

        this.queueManager = queueManager;
        this.queueMessageManager = queueMessageManager;
        this.metricsService = metricsService;
        this.uriStrategy = uriStrategy;
        this.regions = regions;
        this.shardCounterSerialization = shardCounterSerialization;
    }


    /**
     * Create a queue from the posted configuration.
     *
     * @param queue Queue configuration; must be present and carry a name
     * @return "created" response whose location is {@code uriStrategy.queueURI(name)} and whose
     *         body lists the queue
     * @throws IllegalArgumentException if the queue or its name is missing
     */
    @POST
    @Consumes({MediaType.APPLICATION_JSON})
    @Produces({MediaType.APPLICATION_JSON})
    public Response createQueue(Queue queue ) throws Exception {

        Preconditions.checkArgument(queue != null, "Queue configuration is required");
        Preconditions.checkArgument(!QakkaUtils.isNullOrEmpty(queue.getName()), "Queue name is required");

        queueManager.createQueue(queue);

        ApiResponse apiResponse = new ApiResponse();
        apiResponse.setQueues( Collections.singletonList(queue) );
        return Response.created( uriStrategy.queueURI( queue.getName() )).entity(apiResponse).build();
    }


    /**
     * Update the configuration of the named queue.
     *
     * @param queueName Name of queue taken from the path; it overwrites any name in the body
     * @param queue     New queue configuration
     * @return OK response whose body lists the updated queue
     * @throws IllegalArgumentException if the name or the configuration is missing
     */
    @PUT
    @Path( "{queueName}/config" )
    @Consumes({MediaType.APPLICATION_JSON})
    @Produces({MediaType.APPLICATION_JSON})
    public Response updateQueueConfig( @PathParam("queueName") String queueName, Queue queue) {

        Preconditions.checkArgument(!QakkaUtils.isNullOrEmpty(queueName), "Queue name is required");
        Preconditions.checkArgument(queue != null, "Queue configuration is required");

        queue.setName(queueName);
        queueManager.updateQueueConfig(queue);

        ApiResponse apiResponse = new ApiResponse();
        apiResponse.setQueues( Collections.singletonList(queue) );
        return Response.ok().entity(apiResponse).build();
    }


    /**
     * Delete the named queue, but only when the caller confirms.
     *
     * @param queueName      Name of queue to delete
     * @param confirmedParam Must be true for the delete to happen (defaults to false)
     * @return OK response after deleting, or BAD_REQUEST with an explanatory message when
     *         confirm is not true
     */
    @DELETE
    @Path( "{queueName}" )
    @Produces({MediaType.APPLICATION_JSON})
    public Response deleteQueue( @PathParam("queueName") String queueName,
                                 @QueryParam("confirm") @DefaultValue("false") Boolean confirmedParam) {

        Preconditions.checkArgument(!QakkaUtils.isNullOrEmpty(queueName), "Queue name is required");
        Preconditions.checkArgument(confirmedParam != null, "Confirm parameter is required");

        ApiResponse apiResponse = new ApiResponse();

        if ( confirmedParam ) {
            queueManager.deleteQueue( queueName );
            return Response.ok().entity( apiResponse ).build();
        }

        apiResponse.setMessage( "confirm parameter must be true" );
        return Response.status( Response.Status.BAD_REQUEST ).entity( apiResponse ).build();
    }


    /**
     * Read the configuration of the named queue.
     *
     * @param queueName Name of queue to look up
     * @return OK response listing the queue, or NOT_FOUND when the queue manager returns null
     */
    @GET
    @Path( "{queueName}/config" )
    @Produces({MediaType.APPLICATION_JSON})
    public Response getQueueConfig( @PathParam("queueName") String queueName) {

        Preconditions.checkArgument(!QakkaUtils.isNullOrEmpty(queueName), "Queue name is required");

        ApiResponse apiResponse = new ApiResponse();
        Queue queue = queueManager.getQueueConfig( queueName );
        if ( queue != null ) {
            apiResponse.setQueues( Collections.singletonList(queue) );
            return Response.ok().entity(apiResponse).build();
        }
        return Response.status( Response.Status.NOT_FOUND ).build();
    }


    /**
     * List the queues known to the queue manager.
     *
     * @return whatever {@code queueManager.getListOfQueues()} returns
     */
    @GET
    @Produces({MediaType.APPLICATION_JSON})
    public List<String> getListOfQueues() {

        // TODO: create design to handle large number of queues, e.g. paging and/or hierarchy of queues

        // TODO: create design to support multi-tenant usage, authentication, etc.

        return queueManager.getListOfQueues();
    }


    /**
     * Not implemented yet; always throws {@link UnsupportedOperationException}.
     */
    @GET
    @Path( "{queueName}/stats" )
    @Produces({MediaType.APPLICATION_JSON})
    public Response getQueueStats( @PathParam("queueName") String queueName) throws Exception {
        // TODO: implement GET /queues/{queueName}/stats
        throw new UnsupportedOperationException();
    }


    /**
     * Convert the "delay" request parameter to a number.
     *
     * @param delayParam Raw parameter value
     * @return 0 when the value is null/empty or "NONE" (any case), otherwise the parsed long
     * @throws IllegalArgumentException if the value is not a valid long
     */
    Long convertDelayParameter(String delayParam) {
        Long delayMs = 0L;
        if (!QakkaUtils.isNullOrEmpty(delayParam)) {
            // Locale.ROOT keeps the keyword match independent of the JVM default locale.
            switch (delayParam.toUpperCase(Locale.ROOT)) {
                case "NONE":
                case "":
                    delayMs = 0L;
                    break;
                default:
                    try {
                        delayMs = Long.parseLong(delayParam);
                    } catch (NumberFormatException e) {
                        throw new IllegalArgumentException("Invalid delay parameter", e);
                    }
                    break;
            }
        }
        return delayMs;
    }

    /**
     * Convert the "expiration" request parameter to a number.
     *
     * @param expirationParam Raw parameter value
     * @return null when the value is null/empty or "NEVER" (any case), otherwise the parsed long
     * @throws IllegalArgumentException if the value is not a valid long
     */
    Long convertExpirationParameter(String expirationParam) throws IllegalArgumentException {
        Long expirationSecs = null;
        if (!QakkaUtils.isNullOrEmpty(expirationParam)) {
            // Locale.ROOT keeps the keyword match independent of the JVM default locale.
            switch (expirationParam.toUpperCase(Locale.ROOT)) {
                case "NEVER":
                case "":
                    expirationSecs = null;
                    break;
                default:
                    try {
                        expirationSecs = Long.parseLong(expirationParam);
                    } catch (NumberFormatException e) {
                        throw new IllegalArgumentException("Invalid expiration parameter", e);
                    }
                    break;
            }
        }
        return expirationSecs;
    }


    /**
     * Send a queue message with a JSON payload.
     *
     * @param queueName       Name of queue to target (queue must exist)
     * @param regionsParam    Comma-separated list of regions to send to
     * @param delayParam      Delay (ms) before sending message (not yet supported)
     * @param expirationParam Time after which message will expire (not yet supported).
     *                        NOTE(review): earlier docs said ms but the value is stored in a
     *                        variable named expirationSecs; confirm the unit.
     * @param messageBody     JSON payload in string form; stored as its UTF-8 bytes
     */
    @POST
    @Path( "{queueName}/messages" )
    @Consumes(MediaType.APPLICATION_JSON)
    @Produces(MediaType.APPLICATION_JSON)
    public Response sendMessageJson(
            @PathParam("queueName")                     String queueName,
            @QueryParam("regions" )   @DefaultValue("") String regionsParam,
            @QueryParam("delay")      @DefaultValue("") String delayParam,
            @QueryParam("expiration") @DefaultValue("") String expirationParam,
                                                        String messageBody) throws Exception {

        // Encode explicitly as UTF-8 so the stored bytes do not depend on the platform charset.
        return sendMessage( queueName, regionsParam, delayParam, expirationParam,
                MediaType.APPLICATION_JSON, ByteBuffer.wrap( messageBody.getBytes( StandardCharsets.UTF_8 ) ) );
    }


    /**
     * Send a queue message with a binary data payload.
     *
     * @param queueName         Name of queue to target (queue must exist)
     * @param regionsParam      Comma-separated list of regions to send to
     * @param delayParam        Delay (ms) before sending message (not yet supported)
     * @param expirationParam   Time after which message will expire (not yet supported).
     *                          NOTE(review): earlier docs said ms but the value is stored in a
     *                          variable named expirationSecs; confirm the unit.
     * @param actualContentType Content type of messageBody data (if not application/octet-stream)
     * @param messageBody       Binary data that is the payload of the queue message
     */
    @POST
    @Path( "{queueName}/messages" )
    @Consumes(MediaType.APPLICATION_OCTET_STREAM)
    @Produces(MediaType.APPLICATION_JSON)
    public Response sendMessageBinary(
            @PathParam("queueName")                     String queueName,
            @QueryParam("regions" )   @DefaultValue("") String regionsParam,
            @QueryParam("delay")      @DefaultValue("") String delayParam,
            @QueryParam("expiration") @DefaultValue("") String expirationParam,
            @QueryParam("contentType")                  String actualContentType,
                                                        byte[] messageBody) throws Exception {

        String contentType = actualContentType != null ? actualContentType : MediaType.APPLICATION_OCTET_STREAM;

        return sendMessage( queueName, regionsParam, delayParam, expirationParam,
                contentType, ByteBuffer.wrap( messageBody ) );
    }


    /**
     * Shared implementation behind both send endpoints; timed under
     * {@code MetricsService.SEND_TIME_TOTAL}.
     *
     * @return OK response with a count of 1
     */
    private Response sendMessage( String queueName,
                                  String regionsParam,
                                  String delayParam,
                                  String expirationParam,
                                  String contentType,
                                  ByteBuffer byteBuffer) {

        Timer.Context timer = metricsService.getMetricRegistry().timer( MetricsService.SEND_TIME_TOTAL ).time();
        try {

            Preconditions.checkArgument( !QakkaUtils.isNullOrEmpty( queueName ), "Queue name is required" );

            // if regions, delay or expiration are empty string, would get the defaults from the queue
            // (a null regions value is treated like an empty one instead of raising an NPE)
            if (QakkaUtils.isNullOrEmpty( regionsParam )) {
                regionsParam = Regions.LOCAL;
            }

            Long delayMs = convertDelayParameter( delayParam );

            Long expirationSecs = convertExpirationParameter( expirationParam );

            List<String> regionList = regions.getRegions( regionsParam );

            queueMessageManager.sendMessages( queueName, regionList, delayMs, expirationSecs,
                    contentType, byteBuffer );

            ApiResponse apiResponse = new ApiResponse();
            apiResponse.setCount( 1 );
            return Response.ok().entity( apiResponse ).build();

        } finally {
            timer.close();
        }
    }


    /**
     * Get the next messages from the named queue; timed under
     * {@code MetricsService.GET_TIME_TOTAL}.
     *
     * @param queueName  Name of queue to read from
     * @param countParam Number of messages requested; must parse to an integer >= 1 (default 1)
     * @return OK response that always carries a queueMessages collection (empty when there are
     *         none) and its size as the count
     * @throws IllegalArgumentException if the count is not an integer or is less than 1
     */
    @GET
    @Path( "{queueName}/messages" )
    @Produces({MediaType.APPLICATION_JSON})
    public Response getNextMessages( @PathParam("queueName") String queueName,
                                     @QueryParam("count") @DefaultValue("1") String countParam) throws Exception {

        Timer.Context timer = metricsService.getMetricRegistry().timer( MetricsService.GET_TIME_TOTAL ).time();
        try {

            Preconditions.checkArgument( !QakkaUtils.isNullOrEmpty( queueName ), "Queue name is required" );

            final int count;
            try {
                count = Integer.parseInt( countParam );
            } catch (NumberFormatException e) {
                throw new IllegalArgumentException( "Invalid count parameter", e );
            }
            if (count <= 0) {
                // invalid count
                throw new IllegalArgumentException( "Count must be >= 1" );
            }

            List<QueueMessage> messages = queueMessageManager.getNextMessages( queueName, count );

            ApiResponse apiResponse = new ApiResponse();

            if (messages != null && !messages.isEmpty()) {
                apiResponse.setQueueMessages( messages );

            } else { // always return queueMessages field
                apiResponse.setQueueMessages( Collections.<QueueMessage>emptyList() );
            }
            apiResponse.setCount( apiResponse.getQueueMessages().size() );
            return Response.ok().entity( apiResponse ).build();

        } finally {
            timer.close();
        }
    }


    /**
     * Acknowledge a queue message; timed under {@code MetricsService.ACK_TIME_TOTAL}.
     *
     * @param queueName      Name of queue the message belongs to
     * @param queueMessageId Queue message id in UUID string form
     * @return OK response with an empty body object
     * @throws IllegalArgumentException if the id is not a valid UUID
     */
    @DELETE
    @Path( "{queueName}/messages/{queueMessageId}" )
    @Produces({MediaType.APPLICATION_JSON})
    public Response ackMessage( @PathParam("queueName") String queueName,
                                @PathParam("queueMessageId") String queueMessageId) throws Exception {

        Timer.Context timer = metricsService.getMetricRegistry().timer( MetricsService.ACK_TIME_TOTAL ).time();
        try {

            Preconditions.checkArgument( !QakkaUtils.isNullOrEmpty( queueName ), "Queue name is required" );

            UUID messageUuid = parseQueueMessageId( queueMessageId );
            queueMessageManager.ackMessage( queueName, messageUuid );

            ApiResponse apiResponse = new ApiResponse();
            return Response.ok().entity( apiResponse ).build();

        } finally {
            timer.close();
        }
    }


    /**
     * Stream the payload of a queue message back to the caller.
     *
     * @param queueName           Name of queue the message belongs to
     * @param queueMessageIdParam Queue message id in UUID string form
     * @return OK response streaming the message data, with Content-Type set to the message's
     *         content type
     * @throws IllegalArgumentException if the id is not a valid UUID
     * @throws NotFoundException        if the message or its data cannot be found
     */
    @GET
    @Path( "{queueName}/data/{queueMessageId}" )
    public Response getMessageData(
            @PathParam("queueName") String queueName,
            @PathParam("queueMessageId") String queueMessageIdParam ) {

        Preconditions.checkArgument(!QakkaUtils.isNullOrEmpty(queueName), "Queue name is required");

        UUID queueMessageId = parseQueueMessageId( queueMessageIdParam );

        QueueMessage message = queueMessageManager.getMessage( queueName, queueMessageId );
        if ( message == null ) {
            throw new NotFoundException(
                    "Message not found for queueName: " + queueName + " queue message id: " + queueMessageId );
        }

        ByteBuffer messageData = queueMessageManager.getMessageData( message.getMessageId() );
        if ( messageData == null ) {
            throw new NotFoundException( "Message data not found queueName: " + queueName
                    + " queue message id: " + queueMessageId + " message id: " + message.getMessageId() );
        }

        ByteBufferBackedInputStream input = new ByteBufferBackedInputStream( messageData );

        // The copy runs when the container writes the response; failures become a
        // WebApplicationException carrying the original cause.
        StreamingOutput stream = output -> {
            try {
                ByteStreams.copy(input, output);
            } catch (Exception e) {
                throw new WebApplicationException(e);
            }
        };

        return Response.ok( stream ).header( "Content-Type", message.getContentType() ).build();
    }


    /**
     * Parse a queue message id, reporting any malformed (or missing) value as an
     * {@link IllegalArgumentException} that keeps the underlying cause.
     */
    private static UUID parseQueueMessageId( String queueMessageId ) {
        try {
            return UUID.fromString( queueMessageId );
        } catch (IllegalArgumentException | NullPointerException e) {
            throw new IllegalArgumentException( "Invalid queue message UUID", e );
        }
    }


//    @PUT
//    @Path( "{queueName}/messages/{queueMessageId}" )
//    @Produces({MediaType.APPLICATION_JSON})
//    public Response requeueMessage( @PathParam("queueName") String queueName,
//                                    @PathParam("queueMessageId") String queueMessageIdParam,
//                                    @QueryParam("delay") @DefaultValue("") String delayParam) throws Exception {
//
//        Preconditions.checkArgument(!QakkaUtils.isNullOrEmpty(queueName), "Queue name is required");
//
//        UUID queueMessageId;
//        try {
//            queueMessageId = UUID.fromString(queueMessageIdParam);
//        }
//        catch (Exception e) {
//            throw new IllegalArgumentException("Invalid message UUID");
//        }
//        Long delayMs = convertDelayParameter(delayParam);
//
//        queueMessageManager.requeueMessage(queueName, queueMessageId, delayMs);
//
//        ApiResponse apiResponse = new ApiResponse();
//        return Response.ok().entity(apiResponse).build();
//    }
//
//
//    @DELETE
//    @Path( "{queueName}/messages" )
//    @Produces({MediaType.APPLICATION_JSON})
//    public Response clearMessages( @PathParam("queueName") String queueName,
//                                   @QueryParam("confirm") @DefaultValue("false") Boolean confirmed) throws Exception {
//
//        // TODO: implement DELETE /queues/{queueName}/messages"
//        throw new UnsupportedOperationException();
//    }

}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/3075dce1/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/api/StatusResource.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/api/StatusResource.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/api/StatusResource.java new file mode 100644 index 0000000..1d1c836 --- /dev/null +++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/api/StatusResource.java @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
package org.apache.usergrid.persistence.qakka.api;

import com.codahale.metrics.Snapshot;
import com.codahale.metrics.Timer;
import com.google.inject.servlet.RequestScoped;
import org.apache.usergrid.persistence.qakka.App;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.inject.Inject;
import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import javax.ws.rs.core.MediaType;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.text.DecimalFormat;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.SortedSet;


/**
 * JAX-RS resource rooted at {@code status} that reports the service name, the local host name
 * and a summary of every timer in the application's metric registry.
 */
@Path("status")
@RequestScoped
public class StatusResource {
    private static final Logger logger = LoggerFactory.getLogger( StatusResource.class );

    /** Divisor applied to timer durations before formatting (1e9). */
    private static final long NANO = 1000000000;

    private final App app;

    @Inject
    public StatusResource( App app ) {
        this.app = app;
    }

    /**
     * Build the status document.
     *
     * <p>The result maps "name" to "Qakka", "host" to the local host name ("unknown" when it
     * cannot be resolved), and each timer name to a map of formatted statistics: count,
     * mean_rate, one_minute_rate, five_minute_rate, mean, min and max.</p>
     *
     * <p>Only timers are reported. Looking every registered metric name up with
     * {@code timer(name)} fails with an IllegalArgumentException as soon as the registry holds
     * a metric of another type, so the registry's timer view is used instead.</p>
     *
     * @return map that is serialized as the JSON response body
     */
    @GET
    @Produces( MediaType.APPLICATION_JSON )
    public Object status() {

        // DecimalFormat is not thread-safe, so a fresh instance is used per request.
        final DecimalFormat format = new DecimalFormat("##.###");

        final Map<String, Object> status = new LinkedHashMap<>();
        status.put( "name", "Qakka" );
        status.put( "host", localHostName() );

        for ( Map.Entry<String, Timer> entry : app.getMetricRegistry().getTimers().entrySet() ) {
            status.put( entry.getKey(), describeTimer( entry.getValue(), format ) );
        }
        return status;
    }

    /** Returns the local host name, or "unknown" when it cannot be resolved. */
    private static String localHostName() {
        try {
            return InetAddress.getLocalHost().getHostName();
        } catch (UnknownHostException e) {
            return "unknown";
        }
    }

    /** Formats one timer's statistics; a single snapshot keeps mean, min and max consistent. */
    private static Map<String, Object> describeTimer( Timer timer, DecimalFormat format ) {
        final Snapshot snapshot = timer.getSnapshot();

        final Map<String, Object> stats = new LinkedHashMap<>();
        stats.put( "count", String.valueOf( timer.getCount() ) );
        stats.put( "mean_rate", format.format( timer.getMeanRate() ) );
        stats.put( "one_minute_rate", format.format( timer.getOneMinuteRate() ) );
        stats.put( "five_minute_rate", format.format( timer.getFiveMinuteRate() ) );
        stats.put( "mean", format.format( snapshot.getMean() / NANO ) );
        stats.put( "min", format.format( (double) snapshot.getMin() / NANO ) );
        stats.put( "max", format.format( (double) snapshot.getMax() / NANO ) );
        return stats;
    }
}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/3075dce1/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/api/URIStrategy.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/api/URIStrategy.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/api/URIStrategy.java new file mode 100644 index 0000000..2f0b65c --- /dev/null +++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/api/URIStrategy.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
package org.apache.usergrid.persistence.qakka.api;


import java.net.URI;
import java.net.URISyntaxException;
import java.util.UUID;

/**
 * Strategy for building the URIs that the Qakka REST API hands back to clients.
 */
public interface URIStrategy {

    /**
     * Builds the URI for a queue.
     *
     * @param queueName name of the queue
     * @return URI for the named queue
     * @throws URISyntaxException if a valid URI cannot be formed
     */
    URI queueURI(String queueName) throws URISyntaxException;

    /**
     * Builds the URI for a queue message's data.
     * NOTE(review): presumably this addresses the message payload; confirm against the
     * implementation.
     *
     * @param queueName      name of the queue
     * @param queueMessageId id of the queue message
     * @return URI for the message data
     * @throws URISyntaxException if a valid URI cannot be formed
     */
    URI queueMessageDataURI(String queueName, UUID queueMessageId) throws URISyntaxException;
}
+ */ + +package org.apache.usergrid.persistence.qakka.api.impl; + +import org.apache.usergrid.persistence.qakka.api.ApiResponse; +import org.apache.usergrid.persistence.qakka.exceptions.BadRequestException; + +import javax.ws.rs.core.MediaType; +import javax.ws.rs.core.Response; +import javax.ws.rs.ext.ExceptionMapper; +import javax.ws.rs.ext.Provider; + + +@Provider +public class BadRequestMapper implements ExceptionMapper<BadRequestException> { + + public Response toResponse( BadRequestException ex ) { + + ApiResponse apiResponse = new ApiResponse(); + apiResponse.setMessage( ex.getMessage() ); + + return Response.status(400).entity( apiResponse ).type( MediaType.APPLICATION_JSON ).build(); + } + +} + http://git-wip-us.apache.org/repos/asf/usergrid/blob/3075dce1/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/api/impl/GuiceFeature.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/api/impl/GuiceFeature.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/api/impl/GuiceFeature.java new file mode 100644 index 0000000..a3485d0 --- /dev/null +++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/api/impl/GuiceFeature.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.usergrid.persistence.qakka.api.impl; + +import org.glassfish.hk2.api.ServiceLocator; +import org.glassfish.jersey.ServiceLocatorProvider; +import org.jvnet.hk2.guice.bridge.api.GuiceBridge; +import org.jvnet.hk2.guice.bridge.api.GuiceIntoHK2Bridge; + +import javax.ws.rs.core.Feature; +import javax.ws.rs.core.FeatureContext; +import javax.ws.rs.ext.Provider; + + +@Provider +public class GuiceFeature implements Feature { + + @Override + public boolean configure(FeatureContext context) { + + ServiceLocator serviceLocator = ServiceLocatorProvider.getServiceLocator( context ); + GuiceBridge.getGuiceBridge().initializeGuiceBridge( serviceLocator ); + + GuiceIntoHK2Bridge guiceBridge = serviceLocator.getService( GuiceIntoHK2Bridge.class ); + guiceBridge.bridgeGuiceInjector( StartupListener.INJECTOR ); + + return true; + } +} http://git-wip-us.apache.org/repos/asf/usergrid/blob/3075dce1/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/api/impl/JacksonProvider.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/api/impl/JacksonProvider.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/api/impl/JacksonProvider.java new file mode 100644 index 0000000..b1400ca --- /dev/null +++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/api/impl/JacksonProvider.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation 
(ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.usergrid.persistence.qakka.api.impl; + +import com.fasterxml.jackson.annotation.JsonInclude; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.SerializationFeature; + +import javax.ws.rs.ext.ContextResolver; +import javax.ws.rs.ext.Provider; + + +@Provider +public class JacksonProvider implements ContextResolver<ObjectMapper> { + final ObjectMapper mapper; + + public JacksonProvider() { + mapper = new ObjectMapper(); + mapper.enable( SerializationFeature.INDENT_OUTPUT ); + mapper.setSerializationInclusion( JsonInclude.Include.NON_NULL ); + } + + @Override + public ObjectMapper getContext(final Class<?> type) { + return mapper; + } +} http://git-wip-us.apache.org/repos/asf/usergrid/blob/3075dce1/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/api/impl/JerseyResourceConfig.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/api/impl/JerseyResourceConfig.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/api/impl/JerseyResourceConfig.java new file mode 100644 index 
0000000..86cfe05 --- /dev/null +++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/api/impl/JerseyResourceConfig.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.usergrid.persistence.qakka.api.impl; + +import org.glassfish.jersey.server.ResourceConfig; + + +public class JerseyResourceConfig extends ResourceConfig { + + public JerseyResourceConfig() { + packages( "org.apache.usergrid.persistence.qakka.api" ); + } +} + + http://git-wip-us.apache.org/repos/asf/usergrid/blob/3075dce1/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/api/impl/NotFoundMapper.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/api/impl/NotFoundMapper.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/api/impl/NotFoundMapper.java new file mode 100644 index 0000000..2725766 --- /dev/null +++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/api/impl/NotFoundMapper.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under 
one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.usergrid.persistence.qakka.api.impl; + +import org.apache.usergrid.persistence.qakka.api.ApiResponse; +import org.apache.usergrid.persistence.qakka.exceptions.NotFoundException; + +import javax.ws.rs.core.MediaType; +import javax.ws.rs.core.Response; +import javax.ws.rs.ext.ExceptionMapper; +import javax.ws.rs.ext.Provider; + + +@Provider +public class NotFoundMapper implements ExceptionMapper<NotFoundException> { + + public Response toResponse( NotFoundException ex ) { + + ApiResponse apiResponse = new ApiResponse(); + apiResponse.setMessage( ex.getMessage() ); + + return Response.status(404).entity( apiResponse ).type( MediaType.APPLICATION_JSON ).build(); + } + +} + http://git-wip-us.apache.org/repos/asf/usergrid/blob/3075dce1/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/api/impl/StartupListener.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/api/impl/StartupListener.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/api/impl/StartupListener.java new file mode 100644 index 0000000..d97363a --- /dev/null +++ 
b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/api/impl/StartupListener.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.usergrid.persistence.qakka.api.impl; + +import com.google.inject.Guice; +import com.google.inject.Injector; +import com.google.inject.servlet.GuiceServletContextListener; +import com.google.inject.servlet.ServletModule; +import org.apache.usergrid.persistence.qakka.App; +import org.apache.usergrid.persistence.qakka.QakkaModule; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.slf4j.bridge.SLF4JBridgeHandler; + +import java.util.logging.LogManager; + + +public class StartupListener extends GuiceServletContextListener { + private static final Logger logger = LoggerFactory.getLogger( StartupListener.class ); + + public static App APP = null; + + public static Injector INJECTOR = null; + + static { + + try { + LogManager.getLogManager().reset(); + SLF4JBridgeHandler.install(); + + INJECTOR = Guice.createInjector( new ServletModule() { + @Override + protected void configureServlets() { + install( new QakkaModule() ); + } + } ); + + APP = INJECTOR.getInstance( App.class ); + + APP.start(); + + } 
catch ( Throwable t ) { + logger.error("Error static initializing StartupListener class", t); + } + + } + + @Override + protected Injector getInjector() { + return INJECTOR; + } +} + http://git-wip-us.apache.org/repos/asf/usergrid/blob/3075dce1/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/api/impl/URIStrategyLocalhost.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/api/impl/URIStrategyLocalhost.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/api/impl/URIStrategyLocalhost.java new file mode 100644 index 0000000..6f8dbb2 --- /dev/null +++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/api/impl/URIStrategyLocalhost.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.usergrid.persistence.qakka.api.impl; + +import com.google.inject.Inject; +import org.apache.usergrid.persistence.actorsystem.ActorSystemFig; +import org.apache.usergrid.persistence.qakka.api.URIStrategy; + +import java.net.URI; +import java.net.URISyntaxException; +import java.util.UUID; + + +/** TODO: implement a "real" URI strategy */ +public class URIStrategyLocalhost implements URIStrategy { + + final private String hostname; + + @Inject + public URIStrategyLocalhost( ActorSystemFig actorSystemFig ) { + this.hostname = actorSystemFig.getHostname(); + } + + @Override + public URI queueURI(String queueName) throws URISyntaxException { + return new URI("http://" + hostname + ":8080/api/queues/" + queueName); + } + + @Override + public URI queueMessageDataURI(String queueName, UUID queueMessageId) throws URISyntaxException { + return new URI("http://" + hostname + ":8080/api/queues/" + queueName + "/data/" + queueMessageId ); + } +} http://git-wip-us.apache.org/repos/asf/usergrid/blob/3075dce1/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/CassandraClient.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/CassandraClient.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/CassandraClient.java new file mode 100644 index 0000000..9f40b51 --- /dev/null +++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/CassandraClient.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.usergrid.persistence.qakka.core; + +import com.datastax.driver.core.Session; + +/** + * Created by Dave Johnson ([email protected]) on 9/9/16. + */ +public interface CassandraClient { + Session getSession(); +} http://git-wip-us.apache.org/repos/asf/usergrid/blob/3075dce1/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/CassandraClientImpl.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/CassandraClientImpl.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/CassandraClientImpl.java new file mode 100644 index 0000000..ed665c2 --- /dev/null +++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/CassandraClientImpl.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.usergrid.persistence.qakka.core; + +import com.datastax.driver.core.Session; +import com.google.inject.Inject; +import com.google.inject.Singleton; +import org.apache.usergrid.persistence.core.datastax.DataStaxCluster; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +@Singleton +public class CassandraClientImpl implements CassandraClient { + private static final Logger logger = LoggerFactory.getLogger( CassandraClientImpl.class ); + + private final Session session; + + @Inject + public CassandraClientImpl( DataStaxCluster dataStaxCluster) { + + logger.info("Constructing Cassandra client"); + + this.session = dataStaxCluster.getApplicationSession(); + } + + + @Override + public Session getSession() { + return session; + } +}
