http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/84d779fc/stack/core/src/main/java/org/apache/usergrid/persistence/GuiceAdapterBeanFactory.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/persistence/GuiceAdapterBeanFactory.java b/stack/core/src/main/java/org/apache/usergrid/persistence/GuiceAdapterBeanFactory.java new file mode 100644 index 0000000..8bda3b0 --- /dev/null +++ b/stack/core/src/main/java/org/apache/usergrid/persistence/GuiceAdapterBeanFactory.java @@ -0,0 +1,114 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */
+
+package org.apache.usergrid.persistence;
+
+
+import org.springframework.beans.BeansException;
+import org.springframework.beans.factory.NoSuchBeanDefinitionException;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.support.DefaultListableBeanFactory;
+import org.springframework.context.annotation.Bean;
+import org.springframework.stereotype.Component;
+
+import com.google.inject.ConfigurationException;
+import com.google.inject.Injector;
+
+
+/**
+ * Bean factory that looks a bean up in spring first and, for lookups by type only, falls back to guice
+ * when spring has no bean of that type. Lookups by name are spring-only.
+ */
+@Component
+public class GuiceAdapterBeanFactory extends DefaultListableBeanFactory {
+
+    /**
+     * Wire our injector into this so we can use it to get beans
+     */
+    @Autowired
+    private Injector injector;
+
+
+    /**
+     * Look the bean up by type in spring, falling back to guice when spring does not define one.
+     *
+     * @param requiredType the type of bean to return
+     * @return the spring bean if spring defines one, otherwise the guice instance
+     * @throws BeansException if spring fails for any reason other than a missing definition
+     * @throws NoGuiceBean if neither spring nor guice can supply the bean
+     */
+    @Override
+    public <T> T getBean( final Class<T> requiredType ) throws BeansException {
+        try {
+            return super.getBean( requiredType );
+        }
+        catch ( NoSuchBeanDefinitionException springMiss ) {
+            // Spring signals a missing bean by throwing, not by returning null. If spring does know
+            // candidates of this type, the failure is an ambiguity rather than an absence, so don't
+            // hide it behind a guice lookup.
+            if ( getBeanNamesForType( requiredType ).length > 0 ) {
+                throw springMiss;
+            }
+        }
+
+        return getGuiceBean( requiredType );
+    }
+
+
+    /**
+     * Resolve the type from guice, translating guice lookup failures into {@link NoGuiceBean}.
+     */
+    private <T> T getGuiceBean( final Class<T> requiredType ) {
+        final T guiceBean;
+
+        try {
+            guiceBean = injector.getInstance( requiredType );
+        }
+        catch ( ConfigurationException e ) {
+            // guice reports an unresolvable type by throwing; keep the cause for diagnosis
+            throw new NoGuiceBean( "Could not find bean for " + requiredType, e );
+        }
+
+        if ( guiceBean == null ) {
+            throw new NoGuiceBean( "Could not find bean for " + requiredType );
+        }
+
+        return guiceBean;
+    }
+
+
+    /**
+     * Spring-only lookup by name; guice is never consulted for named beans.
+     */
+    @Override
+    public Object getBean( final String name ) throws BeansException {
+        final Object springBean = super.getBean( name );
+
+        return validateBean( springBean, name );
+    }
+
+
+    /**
+     * Spring-only lookup by name and type; guice is never consulted for named beans.
+     */
+    @Override
+    public <T> T getBean( final String name, final Class<T> requiredType ) throws BeansException {
+        final T springBean = super.getBean( name, requiredType );
+
+        return validateBean( springBean, name );
+    }
+
+
+    /**
+     * Spring-only lookup by name, type and explicit constructor/factory arguments.
+     */
+    @Override
+    public <T> T getBean( final String name, final Class<T> requiredType, final Object... args )
+            throws BeansException {
+        final T springBean = super.getBean( name, requiredType, args );
+
+        return validateBean( springBean, name );
+    }
+
+
+    /**
+     * If we can't find the spring bean, we should blow up
+     *
+     * @param springBean the result of the spring lookup
+     * @param name the bean name that was requested, used in the error message
+     * @param <T> the bean type
+     * @return the spring bean, never null
+     * @throws NoGuiceBean if the spring bean is null
+     */
+    private <T> T validateBean( final T springBean, final String name ) {
+        if ( springBean == null ) {
+            throw new NoGuiceBean( String.format(
+                    "Guice beans by name is unsupported, and could not find a spring bean with the name '%s'",
+                    name ) );
+        }
+
+        return springBean;
+    }
+
+
+    /**
+     * Exception class to throw when we can't find a bean in spring, and can't find it in guice
+     */
+    public static final class NoGuiceBean extends BeansException {
+
+        public NoGuiceBean( final String msg ) {
+            super( msg );
+        }
+
+
+        public NoGuiceBean( final String msg, final Throwable cause ) {
+            super( msg, cause );
+        }
+    }
+}
+
+
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/84d779fc/stack/core/src/main/java/org/apache/usergrid/persistence/PersistenceModule.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/persistence/PersistenceModule.java b/stack/core/src/main/java/org/apache/usergrid/persistence/PersistenceModule.java new file mode 100644 index 0000000..6e58676 --- /dev/null +++ b/stack/core/src/main/java/org/apache/usergrid/persistence/PersistenceModule.java @@ -0,0 +1,162 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */
+
+package org.apache.usergrid.persistence;
+
+
+import java.io.IOException;
+import java.net.URL;
+import java.nio.charset.Charset;
+import java.util.Collection;
+import java.util.Properties;
+
+import org.springframework.context.ApplicationContext;
+
+import org.apache.usergrid.locking.cassandra.HectorLockManagerImpl;
+
+import com.google.common.base.Preconditions;
+import com.google.common.io.CharSource;
+import com.google.common.io.Resources;
+import com.google.inject.AbstractModule;
+import com.google.inject.Inject;
+import com.google.inject.Provides;
+import com.google.inject.Singleton;
+import com.google.inject.name.Named;
+import com.google.inject.name.Names;
+import com.google.inject.spring.SpringIntegration;
+
+import me.prettyprint.cassandra.connection.RoundRobinBalancingPolicy;
+import me.prettyprint.cassandra.service.CassandraHostConfigurator;
+import me.prettyprint.cassandra.service.ThriftCluster;
+import me.prettyprint.hector.api.Cluster;
+
+
+/**
+ * Replacement for configuration of our spring modules with guice.
+ *
+ * The only live behavior is {@link #configure()}, which hands the spring application context to
+ * {@code SpringIntegration.bindAll}. Everything below it is commented-out provider code kept next to
+ * the spring XML it was meant to replace; none of it is compiled.
+ */
+public class PersistenceModule extends AbstractModule {
+
+
+
+    /** The spring context whose beans are handed to guice in {@link #configure()}. */
+    private final ApplicationContext applicationContext;
+
+    /**
+     * @param applicationContext the spring context passed to {@code SpringIntegration.bindAll}
+     */
+    public PersistenceModule( final ApplicationContext applicationContext ) {
+
+        this.applicationContext = applicationContext;
+    }
+
+
+
+
+    /**
+     * Delegates all binding to {@code SpringIntegration.bindAll}, passing this module's binder and the
+     * application context supplied to the constructor.
+     */
+    @Override
+    protected void configure() {
+        SpringIntegration.bindAll( binder(), applicationContext );
+    }
+
+
+
+// NOTE(review): the block below is disabled provider code kept as migration notes; most of the
+// imports above are referenced only from here.
+//    <bean id="cassandraCluster" class="me.prettyprint.cassandra.service.ThriftCluster">
+//        <constructor-arg value="${cassandra.cluster}" />
+//        <constructor-arg ref="cassandraHostConfigurator" />
+//    </bean>
+//    @Provides
+//    @Singleton
+//    @Inject
+//    public Cluster configureThrift( @Named( "cassandra.cluster" ) final String cassCluster,
+//                                    @Named( "cassandra.connections" ) final int cassandraConnections ){
+//
+//        final int setSize = cassandraConnections == 0 ? 50: cassandraConnections;
+//
+//        CassandraHostConfigurator hostConfigurator = new CassandraHostConfigurator( cassCluster );
+//
+//        hostConfigurator.setMaxActive( setSize );
+//        hostConfigurator.setLoadBalancingPolicy( new RoundRobinBalancingPolicy() );
+//
+//
+//        ThriftCluster thriftCluster = new ThriftCluster(cassCluster, hostConfigurator);
+//
+//        return thriftCluster;
+//
+//    }
+//
+//
+//    @Provides
+//    @Singleton
+//    @Inject
+//    public Properties configureProps(final PropertiesProvider propertiesProvider ){
+//
+//        final Properties props = new Properties( );
+//
+//        for(final String propFile: propertiesProvider.getPropertiesFiles()){
+//
+//            final URL url = Resources.getResource( propFile );
+//
+//            Preconditions.checkNotNull( url, "Could not find properties file '" + propFile + "' on the classpath" );
+//
+//
+//            final CharSource propsInput = Resources.asCharSource( url, Charset.defaultCharset() );
+//            try {
+//                props.load( propsInput.openStream() );
+//            }
+//            catch ( IOException e ) {
+//                throw new RuntimeException( "Unable to load properties file '" + propFile + "'", e );
+//            }
+//        }
+//
+//        //bind these properties
+//        Names.bindProperties( binder(), props );
+//
+//        return props;
+//    }
+//
+// NOTE(review): in configureLocks below, writeCl reuses the "cassandra.lock.keyspace" name and readCl
+// has no @Named at all - looks like a copy/paste slip; confirm before re-enabling.
+//    @Provides
+//    @Singleton
+//    @Inject
+//    public void configureLocks(final Cluster hectorCluster, @Named("cassandra.lock.keyspace") final String lockKeyspace, @Named("cassandra.lock.keyspace") final String writeCl, final String readCl ){
+//
+//
+//        final HectorLockManagerImpl hectorLockManager = new HectorLockManagerImpl();
+//
+//
+////
+////        <bean name="consistencyLevelPolicy" class="me.prettyprint.cassandra.model.ConfigurableConsistencyLevel">
+////            <property name="defaultReadConsistencyLevel" value="${cassandra.readcl}"/>
+////            <property name="defaultWriteConsistencyLevel" value="${cassandra.writecl}"/>
+////        </bean>
+//
+////    <bean name="lockManager" class="org.apache.usergrid.locking.cassandra.HectorLockManagerImpl" >
+////        <property name="cluster" ref="cassandraCluster"/>
+////        <property name="keyspaceName" value="${cassandra.lock.keyspace}"/>
+////        <property name="consistencyLevelPolicy" ref="consistencyLevelPolicy"/>
+////    </bean>
+//
+//    }
+//
+//
+//    /**
+//     * Interface to allow users to provide and inject properties
+//     */
+//    public interface PropertiesProvider{
+//        /**
+//         * Get the properties files to load
+//         * @return
+//         */
+//        String[] getPropertiesFiles();
+//    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/84d779fc/stack/core/src/test/java/org/apache/usergrid/corepersistence/TestIndexModule.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/corepersistence/TestIndexModule.java b/stack/core/src/test/java/org/apache/usergrid/corepersistence/TestIndexModule.java
new file mode 100644
index 0000000..e5254d1
--- /dev/null
+++ b/stack/core/src/test/java/org/apache/usergrid/corepersistence/TestIndexModule.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.usergrid.corepersistence;
+
+
+import org.apache.usergrid.persistence.core.guice.TestModule;
+
+
+/**
+ * Test guice module that installs {@code CoreModule}.
+ */
+public class TestIndexModule extends TestModule {
+
+    /**
+     * Installs a no-arg {@code CoreModule}; see the inline note about its known fragility.
+     */
+    @Override
+    protected void configure() {
+
+        //this will break, we need to untangle this and move to guice in core completely
+        install( new CoreModule( ));
+    }
+
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/84d779fc/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/BufferQueueSQSImplTest.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/BufferQueueSQSImplTest.java b/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/BufferQueueSQSImplTest.java
new file mode 100644
index 0000000..a76a589
--- /dev/null
+++ b/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/BufferQueueSQSImplTest.java
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.index;
+
+
+import java.util.*;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.usergrid.corepersistence.TestIndexModule;
+import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+import org.apache.usergrid.persistence.core.scope.ApplicationScopeImpl;
+import org.apache.usergrid.persistence.index.SearchEdge;
+import org.apache.usergrid.persistence.index.impl.DeIndexRequest;
+import org.apache.usergrid.persistence.index.impl.EsRunner;
+import org.apache.usergrid.persistence.index.impl.IndexOperationMessage;
+import org.apache.usergrid.persistence.index.impl.IndexRequest;
+import org.apache.usergrid.persistence.index.impl.SearchEdgeImpl;
+import org.apache.usergrid.persistence.model.entity.SimpleId;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+
+import org.apache.usergrid.persistence.core.guice.MigrationManagerRule;
+import org.apache.usergrid.persistence.core.metrics.MetricsFactory;
+import org.apache.usergrid.persistence.core.test.UseModules;
+import org.apache.usergrid.persistence.map.MapManagerFactory;
+import org.apache.usergrid.persistence.core.aws.NoAWSCredsRule;
+import org.apache.usergrid.persistence.queue.QueueManagerFactory;
+import org.apache.usergrid.persistence.queue.impl.UsergridAwsCredentialsProvider;
+
+import com.google.inject.Inject;
+
+import net.jcip.annotations.NotThreadSafe;
+
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assume.assumeTrue;
+
+
+/**
+ * Round-trips an {@link IndexOperationMessage} through {@link BufferQueueSQSImpl}: offer it, take it
+ * back, check that every request is present, then ack it. The test is skipped when no AWS credentials
+ * are available.
+ */
+@RunWith(EsRunner.class)
+@UseModules({ TestIndexModule.class })
+@NotThreadSafe
+public class BufferQueueSQSImplTest {
+
+
+    @Inject
+    @Rule
+    public MigrationManagerRule migrationManagerRule;
+
+
+    @Rule
+    public NoAWSCredsRule noAwsCredsRule = new NoAWSCredsRule();
+
+    @Inject
+    public QueueManagerFactory queueManagerFactory;
+
+    @Inject
+    public QueryFig queryFig;
+
+    @Inject
+    public MapManagerFactory mapManagerFactory;
+
+    @Inject
+    public MetricsFactory metricsFactory;
+
+
+    private BufferQueueSQSImpl bufferQueueSQS;
+
+    @Before
+    public void setup(){
+        bufferQueueSQS = new BufferQueueSQSImpl( queueManagerFactory, queryFig, mapManagerFactory, metricsFactory );
+    }
+
+
+    @Test
+    public void testMessageIndexing(){
+
+        ApplicationScope applicationScope = new ApplicationScopeImpl(new SimpleId(UUID.randomUUID(),"application"));
+        final UsergridAwsCredentialsProvider ugProvider = new UsergridAwsCredentialsProvider();
+        assumeTrue( ugProvider.getCredentials().getAWSAccessKeyId() != null );
+        assumeTrue( ugProvider.getCredentials().getAWSSecretKey() != null );
+
+        // plain maps rather than double-brace initialisation, which creates anonymous HashMap subclasses
+        final Map<String, Object> request1Data = new HashMap<String, Object>();
+        request1Data.put( "test", "testval1" );
+        final IndexRequest indexRequest1 = new IndexRequest( "testAlias1", "testDoc1", request1Data );
+
+
+        final Map<String, Object> request2Data = new HashMap<String, Object>();
+        request2Data.put( "test", "testval2" );
+        final IndexRequest indexRequest2 = new IndexRequest( "testAlias2", "testDoc2", request2Data );
+
+
+        //de-index requests, each targeting two distinct index names
+        final DeIndexRequest deIndexRequest1 =
+                new DeIndexRequest( new String[] { "index1.1", "index1.2" }, applicationScope,
+                        new SearchEdgeImpl( new SimpleId( "testId3" ), "name3", SearchEdge.NodeType.SOURCE ),
+                        new SimpleId( "id3" ), UUID.randomUUID() );
+
+        final DeIndexRequest deIndexRequest2 =
+                new DeIndexRequest( new String[] { "index2.1", "index2.2" }, applicationScope,
+                        new SearchEdgeImpl( new SimpleId( "testId4" ), "name4", SearchEdge.NodeType.SOURCE ),
+                        new SimpleId( "id4" ), UUID.randomUUID() );
+
+
+        IndexOperationMessage indexOperationMessage = new IndexOperationMessage();
+        indexOperationMessage.addIndexRequest( indexRequest1);
+        indexOperationMessage.addIndexRequest( indexRequest2);
+
+        indexOperationMessage.addDeIndexRequest( deIndexRequest1 );
+        indexOperationMessage.addDeIndexRequest( deIndexRequest2 );
+
+        bufferQueueSQS.offer( indexOperationMessage );
+
+        //wait for it to send to SQS
+        indexOperationMessage.getFuture().get();
+
+        //now get it back
+
+        final List<IndexOperationMessage> ops = getResults( 20, TimeUnit.SECONDS );
+
+        // getResults may hand back null on timeout; fail the assertion instead of throwing an NPE
+        assertTrue( "No messages were returned from the queue before the timeout",
+                ops != null && !ops.isEmpty() );
+
+        final IndexOperationMessage returnedOperation = ops.get( 0 );
+
+        //get the operations out
+
+        final Set<IndexRequest> indexRequestSet = returnedOperation.getIndexRequests();
+
+        assertTrue(indexRequestSet.contains(indexRequest1));
+        assertTrue(indexRequestSet.contains(indexRequest2));
+
+
+        final Set<DeIndexRequest> deIndexRequests = returnedOperation.getDeIndexRequests();
+
+        assertTrue( deIndexRequests.contains( deIndexRequest1 ) );
+        assertTrue( deIndexRequests.contains( deIndexRequest2 ) );
+
+
+        //now ack the message
+
+        bufferQueueSQS.ack( ops );
+
+    }
+
+
+    /**
+     * Polls the queue until it yields at least one message or the deadline passes.
+     *
+     * @param timeout how long to keep polling
+     * @param timeUnit unit of {@code timeout}
+     * @return the last result of {@code take}; may be null or empty if nothing arrived in time
+     */
+    private List<IndexOperationMessage> getResults(final long timeout, final TimeUnit timeUnit){
+        final long endTime = System.currentTimeMillis() + timeUnit.toMillis( timeout );
+
+        List<IndexOperationMessage> ops;
+
+        do{
+            ops = bufferQueueSQS.take( 10,  20, TimeUnit.SECONDS );
+        }while((ops == null || ops.size() == 0 ) &&  System.currentTimeMillis() < endTime);
+
+        return ops;
+    }
+
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/84d779fc/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/aws/NoAWSCredsRule.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/aws/NoAWSCredsRule.java b/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/aws/NoAWSCredsRule.java
new file mode 100644
index 0000000..1b0f538
--- /dev/null
+++ b/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/aws/NoAWSCredsRule.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.
See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.persistence.core.aws;
+
+
+import org.junit.Assume;
+import org.junit.rules.TestRule;
+import org.junit.runner.Description;
+import org.junit.runners.model.MultipleFailureException;
+import org.junit.runners.model.Statement;
+
+import com.amazonaws.AmazonClientException;
+
+
+/**
+ * Rule that turns a test failure caused by missing AWS credentials into a failed assumption, so the
+ * test is not reported as a failure. Any other failure is rethrown untouched.
+ *
+ * Created in an attempt to mark no aws cred tests as ignored. See
+ * https://github.com/junit-team/junit/issues/116
+ */
+public class NoAWSCredsRule implements TestRule {
+
+    /**
+     * Wraps the statement so that missing-credential failures become a failed assumption.
+     *
+     * @param base the statement to run
+     * @param description unused
+     * @return the wrapping statement
+     */
+    @Override
+    public Statement apply( final Statement base, final Description description ) {
+        return new Statement() {
+            @Override
+            public void evaluate() throws Throwable {
+
+                try {
+                    base.evaluate();
+                }
+                catch ( Throwable t ) {
+
+                    if ( !isMissingCredsException( t ) ) {
+                        throw t;
+                    }
+
+                    //do this so our test gets marked as ignored. Not pretty, but it works
+                    Assume.assumeTrue( false );
+                }
+            }
+        };
+    }
+
+
+    /**
+     * Returns true if the throwable, any failure bundled inside it, or anything in its cause chain is
+     * an {@link AmazonClientException} whose message reports a missing access key or secret key.
+     */
+    private boolean isMissingCredsException( final Throwable t ) {
+
+        if ( t instanceof AmazonClientException ) {
+
+            // a throwable's message may be null; that is simply "not a credentials problem"
+            final String message = t.getMessage();
+
+            if ( message != null && ( message.contains( "could not get aws access key" )
+                    || message.contains( "could not get aws secret key from system properties" ) ) ) {
+                return true;
+            }
+        }
+
+        // Handle the multiple failure junit trace. Match on the public type so that both it and the
+        // deprecated internal subclass are covered.
+        if ( t instanceof MultipleFailureException ) {
+            for ( final Throwable failure : ( ( MultipleFailureException ) t ).getFailures() ) {
+                if ( isMissingCredsException( failure ) ) {
+                    return true;
+                }
+            }
+        }
+
+        final Throwable cause = t.getCause();
+
+        return cause != null && isMissingCredsException( cause );
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/84d779fc/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/EdgesObservable.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/EdgesObservable.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/EdgesObservable.java
index 2206953..fa73991 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/EdgesObservable.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/EdgesObservable.java
@@ -28,6 +28,29 @@ import rx.Observable;
  * Get all edges from source
  */
 public interface EdgesObservable {
+
+    /**
+     * Return an observable of all edges from a source
+     * @param gm
+     * @param sourceNode
+     * @return
+     */
     Observable<Edge> edgesFromSource( final GraphManager gm, final Id sourceNode);
+
+    /**
+     * Get all edges 
from the source node with the target type + * @param gm + * @param sourceNode + * @param targetType + * @return + */ + Observable<Edge> getEdgesFromSource(final GraphManager gm, final Id sourceNode, final String targetType ); + + /** + * Return an observable of all edges to a target + * @param gm + * @param targetNode + * @return + */ Observable<Edge> edgesToTarget(final GraphManager gm, final Id targetNode); } http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/84d779fc/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgesObservableImpl.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgesObservableImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgesObservableImpl.java index 371cf1d..2264cbd 100644 --- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgesObservableImpl.java +++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgesObservableImpl.java @@ -19,62 +19,88 @@ */ package org.apache.usergrid.persistence.graph.serialization.impl; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import org.apache.usergrid.persistence.graph.Edge; import org.apache.usergrid.persistence.graph.GraphManager; import org.apache.usergrid.persistence.graph.SearchByEdgeType; import org.apache.usergrid.persistence.graph.impl.SimpleSearchByEdgeType; +import org.apache.usergrid.persistence.graph.impl.SimpleSearchByIdType; import org.apache.usergrid.persistence.graph.impl.SimpleSearchEdgeType; import org.apache.usergrid.persistence.graph.serialization.EdgesObservable; import org.apache.usergrid.persistence.model.entity.Id; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; + import rx.Observable; 
import rx.functions.Func1; + /** * Emits the edges that are edges from the specified source node */ public class EdgesObservableImpl implements EdgesObservable { - private static final Logger logger = LoggerFactory.getLogger(EdgesObservableImpl.class); - public EdgesObservableImpl(){ + private static final Logger logger = LoggerFactory.getLogger( EdgesObservableImpl.class ); + + + public EdgesObservableImpl() { } + /** * Get all edges from the source */ @Override - public Observable<Edge> edgesFromSource( final GraphManager gm, final Id sourceNode){ - Observable<String> edgeTypes = gm.getEdgeTypesFromSource( new SimpleSearchEdgeType( sourceNode, null, null ) ); + public Observable<Edge> edgesFromSource( final GraphManager gm, final Id sourceNode ) { + final Observable<String> edgeTypes = + gm.getEdgeTypesFromSource( new SimpleSearchEdgeType( sourceNode, null, null ) ); - return edgeTypes.flatMap( new Func1<String, Observable<Edge>>() { - @Override - public Observable<Edge> call( final String edgeType ) { + return edgeTypes.flatMap( edgeType -> { logger.debug( "Loading edges of edgeType {} from {}", edgeType, sourceNode ); - return gm.loadEdgesFromSource( new SimpleSearchByEdgeType( sourceNode, edgeType, Long.MAX_VALUE, - SearchByEdgeType.Order.DESCENDING, null ) ); - } + return gm.loadEdgesFromSource( + new SimpleSearchByEdgeType( sourceNode, edgeType, Long.MAX_VALUE, SearchByEdgeType.Order.DESCENDING, + null ) ); } ); } + + + @Override + public Observable<Edge> getEdgesFromSource( final GraphManager gm, final Id sourceNode, final String targetType ) { + + final Observable<String> edgeTypes = + gm.getEdgeTypesFromSource( new SimpleSearchEdgeType( sourceNode, null, null ) ); + + + return edgeTypes.flatMap( edgeType -> { + + logger.debug( "Loading edges of edgeType {} from {}", edgeType, sourceNode ); + + return gm.loadEdgesFromSourceByType( + new SimpleSearchByIdType( sourceNode, edgeType, Long.MAX_VALUE, SearchByEdgeType.Order.DESCENDING, + targetType, null ) ); + 
} ); + } + + /** * Get all edges from the source */ @Override - public Observable<Edge> edgesToTarget(final GraphManager gm, final Id targetNode) { - Observable<String> edgeTypes = gm.getEdgeTypesToTarget( new SimpleSearchEdgeType( targetNode, null, null ) ); + public Observable<Edge> edgesToTarget( final GraphManager gm, final Id targetNode ) { + final Observable<String> edgeTypes = + gm.getEdgeTypesToTarget( new SimpleSearchEdgeType( targetNode, null, null ) ); - return edgeTypes.flatMap( new Func1<String, Observable<Edge>>() { - @Override - public Observable<Edge> call( final String edgeType ) { + return edgeTypes.flatMap( edgeType -> { - logger.debug( "Loading edges of edgeType {} to {}", edgeType, targetNode); + logger.debug( "Loading edges of edgeType {} to {}", edgeType, targetNode ); - return gm.loadEdgesToTarget( new SimpleSearchByEdgeType( targetNode, edgeType, Long.MAX_VALUE, - SearchByEdgeType.Order.DESCENDING, null ) ); - } + return gm.loadEdgesToTarget( + new SimpleSearchByEdgeType( targetNode, edgeType, Long.MAX_VALUE, SearchByEdgeType.Order.DESCENDING, + null ) ); } ); } } http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/84d779fc/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/EntityIndexBatch.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/EntityIndexBatch.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/EntityIndexBatch.java index da05d39..39c686c 100644 --- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/EntityIndexBatch.java +++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/EntityIndexBatch.java @@ -21,6 +21,7 @@ package org.apache.usergrid.persistence.index;/* import java.util.UUID; import org.apache.usergrid.persistence.core.future.BetterFuture; 
+import org.apache.usergrid.persistence.index.impl.IndexOperationMessage; import org.apache.usergrid.persistence.model.entity.Entity; import org.apache.usergrid.persistence.model.entity.Id; @@ -65,7 +66,7 @@ public interface EntityIndexBatch { * Execute the batch * @return future to guarantee execution */ - BetterFuture execute(); + BetterFuture<IndexOperationMessage> execute(); /** * Get the number of operations in the batch http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/84d779fc/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexFig.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexFig.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexFig.java index 8b9a03e..d13d055 100644 --- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexFig.java +++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexFig.java @@ -30,84 +30,52 @@ import org.apache.usergrid.persistence.index.impl.EsProvider; @FigSingleton public interface IndexFig extends GuicyFig { - public static final String ELASTICSEARCH_HOSTS = "elasticsearch.hosts"; + String ELASTICSEARCH_HOSTS = "elasticsearch.hosts"; - public static final String ELASTICSEARCH_PORT = "elasticsearch.port"; + String ELASTICSEARCH_PORT = "elasticsearch.port"; - public static final String ELASTICSEARCH_CLUSTER_NAME = "elasticsearch.cluster_name"; + String ELASTICSEARCH_CLUSTER_NAME = "elasticsearch.cluster_name"; - public static final String ELASTICSEARCH_NODENAME = "elasticsearch.node_name"; + String ELASTICSEARCH_NODENAME = "elasticsearch.node_name"; - public static final String ELASTICSEARCH_INDEX_PREFIX = "elasticsearch.index_prefix"; + String ELASTICSEARCH_INDEX_PREFIX = "elasticsearch.index_prefix"; - public static final 
String ELASTICSEARCH_ALIAS_POSTFIX = "elasticsearch.alias_postfix"; + String ELASTICSEARCH_ALIAS_POSTFIX = "elasticsearch.alias_postfix"; - public static final String ELASTICSEARCH_STARTUP = "elasticsearch.startup"; + String ELASTICSEARCH_STARTUP = "elasticsearch.startup"; - public static final String ELASTICSEARCH_NUMBER_OF_SHARDS = "elasticsearch.number_shards"; + String ELASTICSEARCH_NUMBER_OF_SHARDS = "elasticsearch.number_shards"; - public static final String ELASTICSEARCH_NUMBER_OF_REPLICAS = "elasticsearch.number_replicas"; + String ELASTICSEARCH_NUMBER_OF_REPLICAS = "elasticsearch.number_replicas"; - public static final String QUERY_CURSOR_TIMEOUT_MINUTES = "elasticsearch.cursor_timeout.minutes"; + String QUERY_CURSOR_TIMEOUT_MINUTES = "elasticsearch.cursor_timeout.minutes"; - public static final String ELASTICSEARCH_FORCE_REFRESH = "elasticsearch.force_refresh"; + String ELASTICSEARCH_FORCE_REFRESH = "elasticsearch.force_refresh"; - public static final String INDEX_BUFFER_SIZE = "elasticsearch.buffer_size"; + String INDEX_BUFFER_SIZE = "elasticsearch.buffer_size"; - public static final String INDEX_QUEUE_SIZE = "elasticsearch.queue_size"; + String INDEX_BUFFER_TIMEOUT = "elasticsearch.buffer_timeout"; - public static final String INDEX_BUFFER_TIMEOUT = "elasticsearch.buffer_timeout"; + String INDEX_BATCH_SIZE = "elasticsearch.batch_size"; - /** - * Amount of time to wait when reading from the queue - */ - public static final String INDEX_QUEUE_READ_TIMEOUT = "elasticsearch.queue_read_timeout"; - - /** - * Amount of time to wait when reading from the queue in milliseconds - */ - public static final String INDEX_QUEUE_TRANSACTION_TIMEOUT = "elasticsearch.queue_transaction_timeout"; - - public static final String INDEX_BATCH_SIZE = "elasticsearch.batch_size"; - - public static final String INDEX_WRITE_CONSISTENCY_LEVEL = "elasticsearch.write_consistency_level"; + String INDEX_WRITE_CONSISTENCY_LEVEL = "elasticsearch.write_consistency_level"; /** * the number 
of times we can fail before we refresh the client */ - public static final String ELASTICSEARCH_FAIL_REFRESH = "elasticsearch.fail_refresh"; - - /** - * Amount of time in milliseconds to wait when ES rejects our request before retrying. Provides simple - * backpressure - */ - public static final String FAILURE_REJECTED_RETRY_WAIT_TIME = "elasticsearch.rejected_retry_wait"; + String ELASTICSEARCH_FAIL_REFRESH = "elasticsearch.fail_refresh"; - /** - * The number of worker threads to consume from the queue - */ - public static final String ELASTICSEARCH_WORKER_COUNT = "elasticsearch.worker_count"; - /** - * The queue implementation to use. Values come from <class>QueueProvider.Implementations</class> - */ - public static final String ELASTICSEARCH_QUEUE_IMPL = "elasticsearch.queue_impl"; - /** - * The queue implementation to use. Values come from <class>QueueProvider.Implementations</class> - */ - public static final String ELASTICSEARCH_QUEUE_OFFER_TIMEOUT = "elasticsearch.queue.offer_timeout"; - - - public static final String QUERY_LIMIT_DEFAULT = "index.query.limit.default"; + String QUERY_LIMIT_DEFAULT = "index.query.limit.default"; /** * The client type to use. Valid values are NODE or TRANSPORT */ - public static final String ELASTICSEARCH_CLIENT_TYPE = "elasticsearch.client.type"; + String ELASTICSEARCH_CLIENT_TYPE = "elasticsearch.client.type"; @Default( "127.0.0.1" ) @@ -145,20 +113,20 @@ public interface IndexFig extends GuicyFig { @Default( "false" ) @Key( ELASTICSEARCH_FORCE_REFRESH ) - public boolean isForcedRefresh(); + boolean isForcedRefresh(); /** Identify the client node with a unique name. 
*/ @Default( "default" ) @Key( ELASTICSEARCH_NODENAME ) - public String getNodeName(); + String getNodeName(); @Default( "6" ) @Key( ELASTICSEARCH_NUMBER_OF_SHARDS ) - public int getNumberOfShards(); + int getNumberOfShards(); @Default( "1" ) @Key( ELASTICSEARCH_NUMBER_OF_REPLICAS ) - public int getNumberOfReplicas(); + int getNumberOfReplicas(); @Default( "20" ) @Key( ELASTICSEARCH_FAIL_REFRESH ) @@ -181,12 +149,7 @@ public interface IndexFig extends GuicyFig { @Key( INDEX_BUFFER_SIZE ) int getIndexBufferSize(); - /** - * size of the buffer to build up before you send results - */ - @Default( "1000" ) - @Key( INDEX_QUEUE_SIZE ) - int getIndexQueueSize(); + /** * Request batch size for ES @@ -199,27 +162,6 @@ public interface IndexFig extends GuicyFig { @Key( INDEX_WRITE_CONSISTENCY_LEVEL ) String getWriteConsistencyLevel(); - @Default( "1000" ) - @Key( FAILURE_REJECTED_RETRY_WAIT_TIME ) - long getFailureRetryTime(); - - //give us 60 seconds to process the message - @Default( "60" ) - @Key( INDEX_QUEUE_READ_TIMEOUT ) - int getIndexQueueTimeout(); - - @Default( "2" ) - @Key( ELASTICSEARCH_WORKER_COUNT ) - int getWorkerCount(); - - @Default( "LOCAL" ) - @Key( ELASTICSEARCH_QUEUE_IMPL ) - String getQueueImplementation(); - - @Default( "1000" ) - @Key( ELASTICSEARCH_QUEUE_OFFER_TIMEOUT ) - long getQueueOfferTimeout(); - /** * Return the type of client. 
Valid values or NODE or TRANSPORT * @return http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/84d779fc/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/guice/IndexModule.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/guice/IndexModule.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/guice/IndexModule.java index 10cbde0..b6a1f09 100644 --- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/guice/IndexModule.java +++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/guice/IndexModule.java @@ -52,15 +52,12 @@ public abstract class IndexModule extends AbstractModule { bind(EntityIndex.class).to(EsEntityIndexImpl.class).asEagerSingleton(); bind(IndexCache.class).to(EsIndexCacheImpl.class); bind(IndexRefreshCommand.class).to(IndexRefreshCommandImpl.class); - bind(FailureMonitorImpl.IndexIdentifier.class).to(IndexIdentifierImpl.class); + bind(IndexIdentifier.class).to(IndexIdentifierImpl.class); - bind(IndexBufferProducer.class).to(EsIndexBufferProducerImpl.class); bind(IndexBufferConsumer.class).to(EsIndexBufferConsumerImpl.class).asEagerSingleton(); - bind( BufferQueue.class).toProvider( QueueProvider.class ); - //wire up the edg migration. 
A no-op ATM, but retained for future development Multibinder<DataMigration<ApplicationScope>> dataMigrationMultibinder = Multibinder.newSetBinder( binder(), new TypeLiteral<DataMigration<ApplicationScope>>() {}, IndexMigration.class ); http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/84d779fc/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/guice/QueueProvider.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/guice/QueueProvider.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/guice/QueueProvider.java deleted file mode 100644 index ea3e046..0000000 --- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/guice/QueueProvider.java +++ /dev/null @@ -1,116 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ - -package org.apache.usergrid.persistence.index.guice; - - -import org.apache.usergrid.persistence.core.metrics.MetricsFactory; -import org.apache.usergrid.persistence.index.IndexFig; -import org.apache.usergrid.persistence.index.impl.BufferQueue; -import org.apache.usergrid.persistence.index.impl.BufferQueueInMemoryImpl; -import org.apache.usergrid.persistence.index.impl.BufferQueueSQSImpl; -import org.apache.usergrid.persistence.map.MapManagerFactory; -import org.apache.usergrid.persistence.queue.QueueManagerFactory; - -import com.google.inject.Inject; -import com.google.inject.Provider; -import com.google.inject.Singleton; - - -/** - * A provider to allow users to configure their queue impl via properties - */ -@Singleton -public class QueueProvider implements Provider<BufferQueue> { - - private final IndexFig indexFig; - - private final QueueManagerFactory queueManagerFactory; - private final MapManagerFactory mapManagerFactory; - private final MetricsFactory metricsFactory; - - private BufferQueue bufferQueue; - - - @Inject - public QueueProvider( final IndexFig indexFig, final QueueManagerFactory queueManagerFactory, - final MapManagerFactory mapManagerFactory, final MetricsFactory metricsFactory ) { - this.indexFig = indexFig; - - - this.queueManagerFactory = queueManagerFactory; - this.mapManagerFactory = mapManagerFactory; - this.metricsFactory = metricsFactory; - } - - - @Override - @Singleton - public BufferQueue get() { - if ( bufferQueue == null ) { - bufferQueue = getQueue(); - } - - - return bufferQueue; - } - - - private BufferQueue getQueue() { - final String value = indexFig.getQueueImplementation(); - - final Implementations impl = Implementations.valueOf( value ); - - switch ( impl ) { - case LOCAL: - return new BufferQueueInMemoryImpl( indexFig ); - case SQS: - return new BufferQueueSQSImpl( queueManagerFactory, indexFig, mapManagerFactory, metricsFactory ); - default: - throw new IllegalArgumentException( "Configuration value of " + 
getErrorValues() + " are allowed" ); - } - } - - - private String getErrorValues() { - String values = ""; - - for ( final Implementations impl : Implementations.values() ) { - values += impl + ", "; - } - - values = values.substring( 0, values.length() - 2 ); - - return values; - } - - - /** - * Different implementations - */ - public static enum Implementations { - LOCAL, - SQS; - - - public String asString() { - return toString(); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/84d779fc/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/BufferQueue.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/BufferQueue.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/BufferQueue.java deleted file mode 100644 index cb89f64..0000000 --- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/BufferQueue.java +++ /dev/null @@ -1,66 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ - -package org.apache.usergrid.persistence.index.impl; - - -import java.util.List; -import java.util.concurrent.TimeUnit; - - -/** - * A temporary interface of our buffer Q to decouple of producer and consumer; - */ -public interface BufferQueue { - - /** - * Offer the indexoperation message. Some queues may support not returning the future until ack or fail. - * Other queues may return the future after ack on the offer. See the implementation documentation for details. - * @param operation - */ - public void offer(final IndexIdentifierImpl.IndexOperationMessage operation); - - - /** - * Perform a take, potentially blocking until up to takesize is available, or timeout has elapsed. - * May return less than the take size, but will never return null - * - * @param takeSize - * @param timeout - * @param timeUnit - * @return A null safe lid - */ - public List<IndexIdentifierImpl.IndexOperationMessage> take(final int takeSize, final long timeout, final TimeUnit timeUnit ); - - - /** - * Ack all messages so they do not appear again. Meant for transactional queues, and may or may not be implemented. - * This will set the future as done in in memory operations - * - * @param messages - */ - public void ack(final List<IndexIdentifierImpl.IndexOperationMessage> messages); - - /** - * Mark these message as failed. 
Set the exception in the future on local operation - * - * @param messages - */ - public void fail(final List<IndexIdentifierImpl.IndexOperationMessage> messages, final Throwable t); -} http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/84d779fc/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/BufferQueueInMemoryImpl.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/BufferQueueInMemoryImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/BufferQueueInMemoryImpl.java deleted file mode 100644 index bfaed3d..0000000 --- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/BufferQueueInMemoryImpl.java +++ /dev/null @@ -1,115 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ - -package org.apache.usergrid.persistence.index.impl; - - -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.ArrayBlockingQueue; -import java.util.concurrent.TimeUnit; - -import org.apache.usergrid.persistence.core.future.BetterFuture; -import org.apache.usergrid.persistence.index.IndexFig; - -import com.google.inject.Inject; -import com.google.inject.Singleton; - - -@Singleton -public class BufferQueueInMemoryImpl implements BufferQueue { - - - private final IndexFig fig; - private final ArrayBlockingQueue<IndexIdentifierImpl.IndexOperationMessage> messages; - - - @Inject - public BufferQueueInMemoryImpl( final IndexFig fig ) { - this.fig = fig; - messages = new ArrayBlockingQueue<>( fig.getIndexQueueSize() ); - } - - - @Override - public void offer( final IndexIdentifierImpl.IndexOperationMessage operation ) { - try { - messages.offer( operation, fig.getQueueOfferTimeout(), TimeUnit.MILLISECONDS ); - } - catch ( InterruptedException e ) { - throw new RuntimeException("Unable to offer message to queue", e); - } - } - - - @Override - public List<IndexIdentifierImpl.IndexOperationMessage> take( final int takeSize, final long timeout, final TimeUnit timeUnit ) { - - final List<IndexIdentifierImpl.IndexOperationMessage> response = new ArrayList<>( takeSize ); - try { - - - messages.drainTo( response, takeSize ); - - //we got something, go process it - if ( response.size() > 0 ) { - return response; - } - - - final IndexIdentifierImpl.IndexOperationMessage polled = messages.poll( timeout, timeUnit ); - - if ( polled != null ) { - response.add( polled ); - - //try to add more - messages.drainTo( response, takeSize - 1 ); - } - } - catch ( InterruptedException e ) { - //swallow - } - - - return response; - } - - - @Override - public void ack( final List<IndexIdentifierImpl.IndexOperationMessage> messages ) { - //if we have a future ack it - for ( final IndexIdentifierImpl.IndexOperationMessage op : messages ) { - op.done(); - } - } - 
- - @Override - public void fail( final List<IndexIdentifierImpl.IndexOperationMessage> messages, final Throwable t ) { - - - for ( final IndexIdentifierImpl.IndexOperationMessage op : messages ) { - final BetterFuture<IndexIdentifierImpl.IndexOperationMessage> future = op.getFuture(); - - if ( future != null ) { - future.setError( t ); - } - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/84d779fc/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/BufferQueueSQSImpl.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/BufferQueueSQSImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/BufferQueueSQSImpl.java deleted file mode 100644 index fee828f..0000000 --- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/BufferQueueSQSImpl.java +++ /dev/null @@ -1,306 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ - -package org.apache.usergrid.persistence.index.impl; - - -import java.io.IOException; -import java.util.ArrayList; -import java.util.List; -import java.util.Map; -import java.util.UUID; -import java.util.concurrent.TimeUnit; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import org.apache.usergrid.persistence.core.metrics.MetricsFactory; -import org.apache.usergrid.persistence.index.IndexFig; -import org.apache.usergrid.persistence.map.MapManager; -import org.apache.usergrid.persistence.map.MapManagerFactory; -import org.apache.usergrid.persistence.map.MapScope; -import org.apache.usergrid.persistence.map.impl.MapScopeImpl; -import org.apache.usergrid.persistence.model.entity.SimpleId; -import org.apache.usergrid.persistence.model.util.UUIDGenerator; -import org.apache.usergrid.persistence.queue.QueueManager; -import org.apache.usergrid.persistence.queue.QueueManagerFactory; -import org.apache.usergrid.persistence.queue.QueueMessage; -import org.apache.usergrid.persistence.queue.QueueScope; -import org.apache.usergrid.persistence.queue.impl.QueueScopeImpl; - -import com.codahale.metrics.Meter; -import com.codahale.metrics.Timer; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.dataformat.smile.SmileFactory; -import com.google.inject.Inject; -import com.google.inject.Singleton; - - -/** - * This is experimental at best. Our SQS size limit is a problem. We shouldn't use this for index operation. Only for - * performing - */ -@Singleton -public class BufferQueueSQSImpl implements BufferQueue { - - private static final Logger logger = LoggerFactory.getLogger( BufferQueueSQSImpl.class ); - - /** Hacky, copied from CPEntityManager b/c we can't access it here */ - public static final UUID MANAGEMENT_APPLICATION_ID = UUID.fromString( "b6768a08-b5d5-11e3-a495-11ddb1de66c8" ); - - - /** - * Set our TTL to 1 month. 
This is high, but in the event of a bug, we want these entries to get removed - */ - public static final int TTL = 60 * 60 * 24 * 30; - - /** - * The name to put in the map - */ - public static final String MAP_NAME = "esqueuedata"; - - - private static final String QUEUE_NAME = "es_queue"; - - private static SmileFactory SMILE_FACTORY = new SmileFactory(); - - - static { - SMILE_FACTORY.delegateToTextual( true ); - } - - - private final QueueManager queue; - private final MapManager mapManager; - private final IndexFig indexFig; - private final ObjectMapper mapper; - private final Meter readMeter; - private final Timer readTimer; - private final Meter writeMeter; - private final Timer writeTimer; - - - @Inject - public BufferQueueSQSImpl( final QueueManagerFactory queueManagerFactory, final IndexFig indexFig, - final MapManagerFactory mapManagerFactory, final MetricsFactory metricsFactory ) { - final QueueScope queueScope = - new QueueScopeImpl( QUEUE_NAME ); - - this.queue = queueManagerFactory.getQueueManager( queueScope ); - this.indexFig = indexFig; - - final MapScope scope = new MapScopeImpl( new SimpleId( MANAGEMENT_APPLICATION_ID, "application" ), MAP_NAME ); - - this.mapManager = mapManagerFactory.createMapManager( scope ); - - - this.writeTimer = metricsFactory.getTimer( BufferQueueSQSImpl.class, "write.timer" ); - this.writeMeter = metricsFactory.getMeter( BufferQueueSQSImpl.class, "write.meter" ); - - this.readTimer = metricsFactory.getTimer( BufferQueueSQSImpl.class, "read.timer" ); - this.readMeter = metricsFactory.getMeter( BufferQueueSQSImpl.class, "read.meter" ); - - this.mapper = new ObjectMapper( SMILE_FACTORY ); - //pretty print, disabling for speed - // mapper.enable(SerializationFeature.INDENT_OUTPUT); - - } - - - @Override - public void offer( final IndexIdentifierImpl.IndexOperationMessage operation ) { - - //no op - if(operation.isEmpty()){ - operation.getFuture().done(); - return; - } - - final Timer.Context timer = this.writeTimer.time(); 
- this.writeMeter.mark(); - - final UUID identifier = UUIDGenerator.newTimeUUID(); - - try { - - final String payLoad = toString( operation ); - - //write to cassandra - this.mapManager.putString( identifier.toString(), payLoad, TTL ); - - //signal to SQS - this.queue.sendMessage( identifier ); - operation.done(); - } - catch ( IOException e ) { - throw new RuntimeException( "Unable to queue message", e ); - } - finally { - timer.stop(); - } - } - - - @Override - public List<IndexIdentifierImpl.IndexOperationMessage> take( final int takeSize, final long timeout, final TimeUnit timeUnit ) { - - //SQS doesn't support more than 10 - - final int actualTake = Math.min( 10, takeSize ); - - final Timer.Context timer = this.readTimer.time(); - - try { - - List<QueueMessage> messages = queue - .getMessages( actualTake, indexFig.getIndexQueueTimeout(), ( int ) timeUnit.toMillis( timeout ), - String.class ); - - - - final List<IndexIdentifierImpl.IndexOperationMessage> response = new ArrayList<>( messages.size() ); - - final List<String> mapEntries = new ArrayList<>( messages.size() ); - - - if(messages.size() == 0){ - return response; - } - - //add all our keys for a single round trip - for ( final QueueMessage message : messages ) { - mapEntries.add( message.getBody().toString() ); - } - - //look up the values - final Map<String, String> storedCommands = mapManager.getStrings( mapEntries ); - - - //load them into our response - for ( final QueueMessage message : messages ) { - - final String key = getMessageKey( message ); - - //now see if the key was there - final String payload = storedCommands.get( key ); - - //the entry was not present in cassandra, ignore this message. Failure should eventually kick it to - // a DLQ - - if ( payload == null ) { - continue; - } - - final IndexIdentifierImpl.IndexOperationMessage messageBody; - - try { - messageBody = fromString( payload ); - } - catch ( IOException e ) { - logger.error( "Unable to deserialize message from string. 
This is a bug", e ); - throw new RuntimeException( "Unable to deserialize message from string. This is a bug", e ); - } - - SqsIndexOperationMessage operation = new SqsIndexOperationMessage( message, messageBody ); - - response.add( operation ); - } - - readMeter.mark( response.size() ); - return response; - } - //stop our timer - finally { - timer.stop(); - } - } - - - @Override - public void ack( final List<IndexIdentifierImpl.IndexOperationMessage> messages ) { - - //nothing to do - if ( messages.size() == 0 ) { - return; - } - - List<QueueMessage> toAck = new ArrayList<>( messages.size() ); - - for ( IndexIdentifierImpl.IndexOperationMessage ioe : messages ) { - - - final SqsIndexOperationMessage sqsIndexOperationMessage = ( SqsIndexOperationMessage ) ioe; - - final String key = getMessageKey( sqsIndexOperationMessage.getMessage() ); - - //remove it from the map - mapManager.delete( key ); - - toAck.add( ( ( SqsIndexOperationMessage ) ioe ).getMessage() ); - } - - queue.commitMessages( toAck ); - } - - - @Override - public void fail( final List<IndexIdentifierImpl.IndexOperationMessage> messages, final Throwable t ) { - //no op, just let it retry after the queue timeout - } - - - /** Read the object from Base64 string. */ - private IndexIdentifierImpl.IndexOperationMessage fromString( String s ) throws IOException { - IndexIdentifierImpl.IndexOperationMessage o = mapper.readValue( s, IndexIdentifierImpl.IndexOperationMessage.class ); - return o; - } - - - /** Write the object to a Base64 string. */ - private String toString( IndexIdentifierImpl.IndexOperationMessage o ) throws IOException { - return mapper.writeValueAsString( o ); - } - - private String getMessageKey(final QueueMessage message){ - return message.getBody().toString(); - } - - /** - * The message that subclasses our IndexOperationMessage. 
holds a pointer to the original message - */ - public class SqsIndexOperationMessage extends IndexIdentifierImpl.IndexOperationMessage { - - private final QueueMessage message; - - - public SqsIndexOperationMessage( final QueueMessage message, final IndexIdentifierImpl.IndexOperationMessage source ) { - this.message = message; - this.addAllDeIndexRequest( source.getDeIndexRequests() ); - this.addAllIndexRequest( source.getIndexRequests() ); - } - - - /** - * Get the message from our queue - */ - public QueueMessage getMessage() { - return message; - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/84d779fc/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsApplicationEntityIndexImpl.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsApplicationEntityIndexImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsApplicationEntityIndexImpl.java index b509ad9..05c84b9 100644 --- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsApplicationEntityIndexImpl.java +++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsApplicationEntityIndexImpl.java @@ -23,7 +23,6 @@ package org.apache.usergrid.persistence.index.impl; import java.io.Serializable; import java.util.ArrayList; import java.util.List; -import java.util.UUID; import java.util.concurrent.TimeUnit; import org.elasticsearch.action.ActionListener; @@ -61,14 +60,11 @@ import org.apache.usergrid.persistence.map.MapManager; import org.apache.usergrid.persistence.map.MapManagerFactory; import org.apache.usergrid.persistence.map.MapScope; import org.apache.usergrid.persistence.map.impl.MapScopeImpl; -import org.apache.usergrid.persistence.model.entity.Id; -import 
org.apache.usergrid.persistence.model.entity.SimpleId; import com.codahale.metrics.Meter; import com.codahale.metrics.Timer; import com.google.common.base.Preconditions; import com.google.inject.Inject; -import com.google.inject.assistedinject.Assisted; import rx.Observable; @@ -85,12 +81,12 @@ public class EsApplicationEntityIndexImpl implements ApplicationEntityIndex { private final ApplicationScope applicationScope; - private final FailureMonitorImpl.IndexIdentifier indexIdentifier; + private final IndexIdentifier indexIdentifier; private final Timer searchTimer; private final Timer cursorTimer; private final MapManager mapManager; private final AliasedEntityIndex entityIndex; - private final IndexBufferProducer indexBatchBufferProducer; + private final IndexBufferConsumer indexBatchBufferProducer; private final IndexFig indexFig; private final EsProvider esProvider; private final IndexAlias alias; @@ -103,11 +99,11 @@ public class EsApplicationEntityIndexImpl implements ApplicationEntityIndex { @Inject public EsApplicationEntityIndexImpl( ApplicationScope appScope, final AliasedEntityIndex entityIndex, - final IndexFig config, final IndexBufferProducer indexBatchBufferProducer, + final IndexFig config, final IndexBufferConsumer indexBatchBufferProducer, final EsProvider provider, final MetricsFactory metricsFactory, final MapManagerFactory mapManagerFactory, final IndexFig indexFig, - final FailureMonitorImpl.IndexIdentifier indexIdentifier ) { + final IndexIdentifier indexIdentifier ) { this.entityIndex = entityIndex; this.indexBatchBufferProducer = indexBatchBufferProducer; this.indexFig = indexFig; http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/84d779fc/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java ---------------------------------------------------------------------- diff --git 
a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java index d22f000..32a0f02 100644 --- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java +++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java @@ -44,18 +44,18 @@ public class EsEntityIndexBatchImpl implements EntityIndexBatch { private final ApplicationScope applicationScope; private final IndexAlias alias; - private final FailureMonitorImpl.IndexIdentifier indexIdentifier; + private final IndexIdentifier indexIdentifier; - private final IndexBufferProducer indexBatchBufferProducer; + private final IndexBufferConsumer indexBatchBufferProducer; private final AliasedEntityIndex entityIndex; - private IndexIdentifierImpl.IndexOperationMessage container; + private IndexOperationMessage container; public EsEntityIndexBatchImpl( final ApplicationScope applicationScope, - final IndexBufferProducer indexBatchBufferProducer, + final IndexBufferConsumer indexBatchBufferProducer, final AliasedEntityIndex entityIndex, - FailureMonitorImpl.IndexIdentifier indexIdentifier ) { + IndexIdentifier indexIdentifier ) { this.applicationScope = applicationScope; this.indexBatchBufferProducer = indexBatchBufferProducer; @@ -63,7 +63,7 @@ public class EsEntityIndexBatchImpl implements EntityIndexBatch { this.indexIdentifier = indexIdentifier; this.alias = indexIdentifier.getAlias(); //constrained - this.container = new IndexIdentifierImpl.IndexOperationMessage(); + this.container = new IndexOperationMessage(); } @@ -126,8 +126,8 @@ public class EsEntityIndexBatchImpl implements EntityIndexBatch { @Override public BetterFuture execute() { - IndexIdentifierImpl.IndexOperationMessage tempContainer = container; - container 
= new IndexIdentifierImpl.IndexOperationMessage(); + IndexOperationMessage tempContainer = container; + container = new IndexOperationMessage(); /** * No-op, just disregard it http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/84d779fc/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexFactoryImpl.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexFactoryImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexFactoryImpl.java index 6145069..e41bcf8 100644 --- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexFactoryImpl.java +++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexFactoryImpl.java @@ -42,12 +42,12 @@ public class EsEntityIndexFactoryImpl implements EntityIndexFactory{ private final IndexFig config; private final EsProvider provider; - private final IndexBufferProducer indexBatchBufferProducer; + private final IndexBufferConsumer indexBatchBufferProducer; private final MetricsFactory metricsFactory; private final MapManagerFactory mapManagerFactory; private final IndexFig indexFig; private final AliasedEntityIndex entityIndex; - private final FailureMonitorImpl.IndexIdentifier indexIdentifier; + private final IndexIdentifier indexIdentifier; private LoadingCache<ApplicationScope, ApplicationEntityIndex> eiCache = CacheBuilder.newBuilder().maximumSize( 1000 ).build( new CacheLoader<ApplicationScope, ApplicationEntityIndex>() { @@ -60,9 +60,9 @@ public class EsEntityIndexFactoryImpl implements EntityIndexFactory{ @Inject public EsEntityIndexFactoryImpl( final IndexFig config, final EsProvider provider, - final IndexBufferProducer indexBatchBufferProducer, + final IndexBufferConsumer 
indexBatchBufferProducer, final MetricsFactory metricsFactory, final MapManagerFactory mapManagerFactory, - final IndexFig indexFig, final AliasedEntityIndex entityIndex, final FailureMonitorImpl.IndexIdentifier indexIdentifier ){ + final IndexFig indexFig, final AliasedEntityIndex entityIndex, final IndexIdentifier indexIdentifier ){ this.config = config; this.provider = provider; this.indexBatchBufferProducer = indexBatchBufferProducer; http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/84d779fc/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java index 456bd15..961ddd2 100644 --- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java +++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java @@ -25,57 +25,33 @@ import com.google.common.collect.ImmutableMap; import com.google.common.io.Resources; import com.google.inject.Inject; import com.google.inject.Singleton; -import com.sun.org.apache.xpath.internal.operations.Bool; -import org.apache.commons.lang.StringUtils; + import org.apache.usergrid.persistence.core.future.BetterFuture; import org.apache.usergrid.persistence.core.metrics.MetricsFactory; import org.apache.usergrid.persistence.core.migration.data.VersionedData; -import org.apache.usergrid.persistence.core.scope.ApplicationScope; -import org.apache.usergrid.persistence.core.scope.ApplicationScopeImpl; import org.apache.usergrid.persistence.core.util.Health; import org.apache.usergrid.persistence.index.*; import 
org.apache.usergrid.persistence.index.exceptions.IndexException; import org.apache.usergrid.persistence.index.migration.IndexDataVersions; -import org.apache.usergrid.persistence.index.query.ParsedQuery; -import org.apache.usergrid.persistence.index.utils.UUIDUtils; -import org.apache.usergrid.persistence.model.entity.Entity; -import org.apache.usergrid.persistence.model.entity.Id; -import org.apache.usergrid.persistence.model.entity.SimpleId; -import org.apache.usergrid.persistence.model.util.EntityUtils; import org.apache.usergrid.persistence.model.util.UUIDGenerator; import org.elasticsearch.action.ActionFuture; -import org.elasticsearch.action.ActionListener; -import org.elasticsearch.action.ListenableActionFuture; -import org.elasticsearch.action.ShardOperationFailedException; import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest; import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse; import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequestBuilder; import org.elasticsearch.action.admin.indices.create.CreateIndexResponse; -import org.elasticsearch.action.admin.indices.delete.DeleteIndexResponse; import org.elasticsearch.action.admin.indices.mapping.put.PutMappingResponse; -import org.elasticsearch.action.admin.indices.refresh.RefreshResponse; -import org.elasticsearch.action.index.*; -import org.elasticsearch.action.index.IndexRequest; -import org.elasticsearch.action.search.SearchRequestBuilder; -import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.client.AdminClient; import org.elasticsearch.common.settings.ImmutableSettings; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.index.query.MatchAllQueryBuilder; -import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.indices.IndexAlreadyExistsException; -import org.elasticsearch.indices.IndexMissingException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import rx.*; + 
import rx.Observable; -import rx.functions.Action1; import java.io.IOException; import java.net.URL; import java.util.*; -import java.util.concurrent.*; -import java.util.concurrent.atomic.AtomicBoolean; /** @@ -87,8 +63,7 @@ public class EsEntityIndexImpl implements AliasedEntityIndex,VersionedData { private static final Logger logger = LoggerFactory.getLogger( EsEntityIndexImpl.class ); private final IndexAlias alias; - private final IndexBufferProducer indexBatchBufferProducer; - private final MetricsFactory metricsFactory; + private final IndexBufferConsumer producer; private final IndexFig indexFig; private final Timer addTimer; private final Timer updateAliasTimer; @@ -97,7 +72,6 @@ public class EsEntityIndexImpl implements AliasedEntityIndex,VersionedData { * We purposefully make this per instance. Some indexes may work, while others may fail */ private final EsProvider esProvider; - private final IndexBufferProducer producer; private final IndexRefreshCommand indexRefreshCommand; //number of times to wait for the index to refresh properly. 
@@ -110,8 +84,8 @@ public class EsEntityIndexImpl implements AliasedEntityIndex,VersionedData { private static final ImmutableMap<String, Object> DEFAULT_PAYLOAD = ImmutableMap.<String, Object>builder().put(IndexingUtils.ENTITY_ID_FIELDNAME, UUIDGenerator.newTimeUUID().toString()).build(); - private static final MatchAllQueryBuilder MATCH_ALL_QUERY_BUILDER = QueryBuilders.matchAllQuery(); - private final FailureMonitorImpl.IndexIdentifier indexIdentifier; + + private final IndexIdentifier indexIdentifier; private IndexCache aliasCache; private Timer mappingTimer; @@ -121,12 +95,12 @@ public class EsEntityIndexImpl implements AliasedEntityIndex,VersionedData { @Inject - public EsEntityIndexImpl(final IndexBufferProducer indexBatchBufferProducer, final EsProvider provider, - final IndexCache indexCache, final MetricsFactory metricsFactory, - final IndexFig indexFig, final FailureMonitorImpl.IndexIdentifier indexIdentifier, - final IndexBufferProducer producer, IndexRefreshCommand indexRefreshCommand) { - this.indexBatchBufferProducer = indexBatchBufferProducer; - this.metricsFactory = metricsFactory; + public EsEntityIndexImpl( final EsProvider provider, + final IndexCache indexCache, + final IndexFig indexFig, final IndexIdentifier indexIdentifier, + final IndexBufferConsumer producer, final IndexRefreshCommand indexRefreshCommand, + final MetricsFactory metricsFactory) { + this.indexFig = indexFig; this.indexIdentifier = indexIdentifier; @@ -349,7 +323,7 @@ public class EsEntityIndexImpl implements AliasedEntityIndex,VersionedData { public Observable<Boolean> refreshAsync() { refreshIndexMeter.mark(); - BetterFuture future = indexBatchBufferProducer.put(new IndexIdentifierImpl.IndexOperationMessage()); + BetterFuture future = producer.put(new IndexOperationMessage()); future.get(); return indexRefreshCommand.execute(); }