fix indexcacheimpl
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/a7e08db8 Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/a7e08db8 Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/a7e08db8 Branch: refs/heads/two-dot-o-dev Commit: a7e08db85babdb1f4c75c38cee67ba6f03547e32 Parents: 9e8d506 Author: Shawn Feldman <[email protected]> Authored: Thu Mar 26 15:17:13 2015 -0600 Committer: Shawn Feldman <[email protected]> Committed: Thu Mar 26 15:17:13 2015 -0600 ---------------------------------------------------------------------- .../persistence/index/EsIndexCache.java | 29 ++++ .../persistence/index/guice/IndexModule.java | 4 +- .../persistence/index/impl/EsIndexCache.java | 143 ------------------- .../index/impl/EsIndexCacheImpl.java | 141 ++++++++++++++++++ .../migration/EsIndexDataMigrationImpl.java | 2 +- .../index/guice/TestIndexModule.java | 2 +- 6 files changed, 175 insertions(+), 146 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/a7e08db8/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/EsIndexCache.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/EsIndexCache.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/EsIndexCache.java new file mode 100644 index 0000000..6427459 --- /dev/null +++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/EsIndexCache.java @@ -0,0 +1,29 @@ +/* + * + * * Licensed to the Apache Software Foundation (ASF) under one or more + * * contributor license agreements. The ASF licenses this file to You + * * under the Apache License, Version 2.0 (the "License"); you may not + * * use this file except in compliance with the License. + * * You may obtain a copy of the License at + * * + * * http://www.apache.org/licenses/LICENSE-2.0 + * * + * * Unless required by applicable law or agreed to in writing, software + * * distributed under the License is distributed on an "AS IS" BASIS, + * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * * See the License for the specific language governing permissions and + * * limitations under the License. For additional information regarding + * * copyright in this work, please see the NOTICE file in the top level + * * directory of this distribution. + * + */ +package org.apache.usergrid.persistence.index; + +/** + * Classy class class. + */ +public interface EsIndexCache { + String[] getIndexes(IndexAlias alias, AliasedEntityIndex.AliasType aliasType); + + void invalidate(IndexAlias alias); +} http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/a7e08db8/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/guice/IndexModule.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/guice/IndexModule.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/guice/IndexModule.java index 922a916..bb5ec54 100644 --- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/guice/IndexModule.java +++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/guice/IndexModule.java @@ -48,9 +48,11 @@ public abstract class IndexModule extends AbstractModule { install(new MapModule()); install(new QueueModule()); - bind(EntityIndexFactory.class).to( EsEntityIndexFactoryImpl.class ); + bind(EntityIndexFactory.class).to(EsEntityIndexFactoryImpl.class); bind(AliasedEntityIndex.class).to(EsEntityIndexImpl.class); bind(EntityIndex.class).to(EsEntityIndexImpl.class); + bind(EsIndexCache.class).to(EsIndexCacheImpl.class); + bind(IndexIdentifier.class).to(IndexIdentifierImpl.class); http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/a7e08db8/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexCache.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexCache.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexCache.java deleted file mode 100644 index 9ba89ca..0000000 --- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexCache.java +++ /dev/null @@ -1,143 +0,0 @@ -/* - * - * * Licensed to the Apache Software Foundation (ASF) under one or more - * * contributor license agreements. The ASF licenses this file to You - * * under the Apache License, Version 2.0 (the "License"); you may not - * * use this file except in compliance with the License. - * * You may obtain a copy of the License at - * * - * * http://www.apache.org/licenses/LICENSE-2.0 - * * - * * Unless required by applicable law or agreed to in writing, software - * * distributed under the License is distributed on an "AS IS" BASIS, - * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * * See the License for the specific language governing permissions and - * * limitations under the License. For additional information regarding - * * copyright in this work, please see the NOTICE file in the top level - * * directory of this distribution. - * - */ - -package org.apache.usergrid.persistence.index.impl; - - -import java.util.List; -import java.util.concurrent.Callable; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.Executors; -import java.util.concurrent.TimeUnit; - -import org.apache.usergrid.persistence.index.IndexAlias; -import org.elasticsearch.action.admin.indices.alias.get.GetAliasesRequest; -import org.elasticsearch.client.AdminClient; -import org.elasticsearch.cluster.metadata.AliasMetaData; -import org.elasticsearch.common.collect.ImmutableOpenMap; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import org.apache.usergrid.persistence.index.AliasedEntityIndex; -import org.apache.usergrid.persistence.index.IndexFig; -import org.apache.usergrid.persistence.index.IndexIdentifier; - -import com.google.common.cache.CacheBuilder; -import com.google.common.cache.CacheLoader; -import com.google.common.cache.LoadingCache; -import com.google.common.util.concurrent.ListenableFuture; -import com.google.common.util.concurrent.ListenableFutureTask; -import com.google.common.util.concurrent.ListeningScheduledExecutorService; -import com.google.common.util.concurrent.MoreExecutors; -import com.google.inject.Inject; -import com.google.inject.Singleton; - - -/** - * Cache for Es index operations - */ -@Singleton -public class EsIndexCache { - - private static final Logger logger = LoggerFactory.getLogger( EsEntityIndexImpl.class ); - private final ListeningScheduledExecutorService refreshExecutors; - - private LoadingCache<String, String[]> aliasIndexCache; - private EsProvider provider; - - - @Inject - public EsIndexCache( final EsProvider provider, final IndexFig indexFig ) { - - this.refreshExecutors = - MoreExecutors.listeningDecorator( Executors.newScheduledThreadPool( indexFig.getIndexCacheMaxWorkers() ) ); - - this.provider = provider; - - aliasIndexCache = CacheBuilder.newBuilder().maximumSize( 1000 ).refreshAfterWrite( 5, TimeUnit.MINUTES ) - .build( new CacheLoader<String, String[]>() { - @Override - public ListenableFuture<String[]> reload( final String key, - String[] oldValue ) - throws Exception { - ListenableFutureTask<String[]> task = - ListenableFutureTask.create( new Callable<String[]>() { - public String[] call() { - return load( key ); - } - } ); - refreshExecutors.execute( task ); - return task; - } - - - @Override - public String[] load( final String aliasName ) { - return getIndexesFromEs(aliasName); - } - } ); - } - - - /** - * Get indexes for an alias - */ - public String[] getIndexes( IndexAlias alias, AliasedEntityIndex.AliasType aliasType ) { - String[] indexes; - try { - indexes = aliasIndexCache.get( getAliasName( alias, aliasType ) ); - } - catch ( ExecutionException ee ) { - logger.error( "Failed to retreive indexes", ee ); - throw new RuntimeException( ee ); - } - return indexes; - } - - - - private String[] getIndexesFromEs(final String aliasName){ - final AdminClient adminClient = this.provider.getClient().admin(); - //remove write alias, can only have one - ImmutableOpenMap<String, List<AliasMetaData>> aliasMap = - adminClient.indices().getAliases( new GetAliasesRequest( aliasName ) ).actionGet().getAliases(); - return aliasMap.keys().toArray( String.class ); - } - - - /** - * Get the name of the alias to use - * @param alias - * @param aliasType - * @return - */ - private String getAliasName( IndexAlias alias, AliasedEntityIndex.AliasType aliasType ) { - return aliasType == AliasedEntityIndex.AliasType.Read ? alias.getReadAlias() : alias.getWriteAlias(); - } - - - /** - * clean up cache - */ - public void invalidate( IndexAlias alias ) { - aliasIndexCache.invalidate( alias.getWriteAlias() ); - aliasIndexCache.invalidate( alias.getReadAlias() ); - } -} http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/a7e08db8/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexCacheImpl.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexCacheImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexCacheImpl.java new file mode 100644 index 0000000..5f5cc8b --- /dev/null +++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexCacheImpl.java @@ -0,0 +1,141 @@ +/* + * + * * Licensed to the Apache Software Foundation (ASF) under one or more + * * contributor license agreements. The ASF licenses this file to You + * * under the Apache License, Version 2.0 (the "License"); you may not + * * use this file except in compliance with the License. + * * You may obtain a copy of the License at + * * + * * http://www.apache.org/licenses/LICENSE-2.0 + * * + * * Unless required by applicable law or agreed to in writing, software + * * distributed under the License is distributed on an "AS IS" BASIS, + * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * * See the License for the specific language governing permissions and + * * limitations under the License. For additional information regarding + * * copyright in this work, please see the NOTICE file in the top level + * * directory of this distribution. + * + */ + +package org.apache.usergrid.persistence.index.impl; + + +import java.util.List; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; + +import org.apache.usergrid.persistence.index.*; +import org.elasticsearch.action.admin.indices.alias.get.GetAliasesRequest; +import org.elasticsearch.client.AdminClient; +import org.elasticsearch.cluster.metadata.AliasMetaData; +import org.elasticsearch.common.collect.ImmutableOpenMap; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.cache.CacheBuilder; +import com.google.common.cache.CacheLoader; +import com.google.common.cache.LoadingCache; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.ListenableFutureTask; +import com.google.common.util.concurrent.ListeningScheduledExecutorService; +import com.google.common.util.concurrent.MoreExecutors; +import com.google.inject.Inject; +import com.google.inject.Singleton; + + +/** + * Cache for Es index operations + */ +@Singleton +public class EsIndexCacheImpl implements EsIndexCache { + + private static final Logger logger = LoggerFactory.getLogger( EsEntityIndexImpl.class ); + private final ListeningScheduledExecutorService refreshExecutors; + + private LoadingCache<String, String[]> aliasIndexCache; + private EsProvider provider; + + + @Inject + public EsIndexCacheImpl( final EsProvider provider, final IndexFig indexFig ) { + + this.refreshExecutors = + MoreExecutors.listeningDecorator( Executors.newScheduledThreadPool( indexFig.getIndexCacheMaxWorkers() ) ); + + this.provider = provider; + + aliasIndexCache = CacheBuilder.newBuilder().maximumSize( 1000 ).refreshAfterWrite( 5, TimeUnit.MINUTES ) + .build( new CacheLoader<String, String[]>() { + @Override + public ListenableFuture<String[]> reload( final String key, + String[] oldValue ) + throws Exception { + ListenableFutureTask<String[]> task = + ListenableFutureTask.create( new Callable<String[]>() { + public String[] call() { + return load( key ); + } + } ); + refreshExecutors.execute( task ); + return task; + } + + + @Override + public String[] load( final String aliasName ) { + return getIndexesFromEs(aliasName); + } + } ); + } + + + /** + * Get indexes for an alias + */ + @Override + public String[] getIndexes(IndexAlias alias, AliasedEntityIndex.AliasType aliasType) { + String[] indexes; + try { + indexes = aliasIndexCache.get( getAliasName( alias, aliasType ) ); + } + catch ( ExecutionException ee ) { + logger.error( "Failed to retreive indexes", ee ); + throw new RuntimeException( ee ); + } + return indexes; + } + + + + private String[] getIndexesFromEs(final String aliasName){ + final AdminClient adminClient = this.provider.getClient().admin(); + //remove write alias, can only have one + ImmutableOpenMap<String, List<AliasMetaData>> aliasMap = + adminClient.indices().getAliases( new GetAliasesRequest( aliasName ) ).actionGet().getAliases(); + return aliasMap.keys().toArray( String.class ); + } + + + /** + * Get the name of the alias to use + * @param alias + * @param aliasType + * @return + */ + private String getAliasName( IndexAlias alias, AliasedEntityIndex.AliasType aliasType ) { + return aliasType == AliasedEntityIndex.AliasType.Read ? alias.getReadAlias() : alias.getWriteAlias(); + } + + + /** + * clean up cache + */ + @Override + public void invalidate(IndexAlias alias) { + aliasIndexCache.invalidate( alias.getWriteAlias() ); + aliasIndexCache.invalidate( alias.getReadAlias() ); + } +} http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/a7e08db8/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/migration/EsIndexDataMigrationImpl.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/migration/EsIndexDataMigrationImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/migration/EsIndexDataMigrationImpl.java index d6e3812..fe0cfa1 100644 --- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/migration/EsIndexDataMigrationImpl.java +++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/migration/EsIndexDataMigrationImpl.java @@ -26,7 +26,7 @@ import org.apache.usergrid.persistence.index.AliasedEntityIndex; import org.apache.usergrid.persistence.index.IndexAlias; import org.apache.usergrid.persistence.index.IndexFig; import org.apache.usergrid.persistence.index.IndexIdentifier; -import org.apache.usergrid.persistence.index.impl.EsIndexCache; +import org.apache.usergrid.persistence.index.EsIndexCache; import org.apache.usergrid.persistence.index.impl.EsProvider; import org.apache.usergrid.persistence.index.impl.IndexingUtils; import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequestBuilder; http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/a7e08db8/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/guice/TestIndexModule.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/guice/TestIndexModule.java b/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/guice/TestIndexModule.java index 6488b16..7a99d59 100644 --- a/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/guice/TestIndexModule.java +++ b/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/guice/TestIndexModule.java @@ -31,7 +31,7 @@ import org.apache.usergrid.persistence.index.IndexBufferProducer; import org.apache.usergrid.persistence.index.IndexFig; import org.apache.usergrid.persistence.index.IndexIdentifier; import org.apache.usergrid.persistence.index.impl.EsEntityIndexImpl; -import org.apache.usergrid.persistence.index.impl.EsIndexCache; +import org.apache.usergrid.persistence.index.EsIndexCache; import org.apache.usergrid.persistence.index.impl.EsProvider; import org.apache.usergrid.persistence.index.migration.EsIndexDataMigrationImpl; import org.apache.usergrid.persistence.index.migration.LegacyIndexIdentifier;
