Repository: incubator-usergrid Updated Branches: refs/heads/asyncqueue 51a9ffd92 -> dd37909fd
Removed Hystrix. Revert this commit to re-apply hystrix on this issue is resolved. https://github.com/Netflix/Hystrix/pull/209 Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/dd37909f Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/dd37909f Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/dd37909f Branch: refs/heads/asyncqueue Commit: dd37909fd2b04b07cce8bdebf3e9a801ad28da32 Parents: 51a9ffd Author: Todd Nine <[email protected]> Authored: Mon Mar 10 11:25:32 2014 -0700 Committer: Todd Nine <[email protected]> Committed: Mon Mar 10 11:25:32 2014 -0700 ---------------------------------------------------------------------- stack/corepersistence/collection/pom.xml | 12 +- .../collection/hystrix/CassandraCommand.java | 74 ------- .../collection/hystrix/CommandUtils.java | 28 --- .../persistence/collection/rx/ParallelTest.java | 219 ------------------- 4 files changed, 6 insertions(+), 327 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/dd37909f/stack/corepersistence/collection/pom.xml ---------------------------------------------------------------------- diff --git a/stack/corepersistence/collection/pom.xml b/stack/corepersistence/collection/pom.xml index 00db576..95aea45 100644 --- a/stack/corepersistence/collection/pom.xml +++ b/stack/corepersistence/collection/pom.xml @@ -201,14 +201,14 @@ <version>${log4j.version}</version> </dependency> - <!--Remove custom build once this patch is complete + <!-- Re-add once this is done https://github.com/Netflix/Hystrix/pull/209--> - <dependency> - <groupId>com.netflix.hystrix</groupId> - <artifactId>hystrix-core</artifactId> - <version>1.3.14-SNAPSHOT</version> - </dependency> + <!--<dependency>--> + <!--<groupId>com.netflix.hystrix</groupId>--> + <!--<artifactId>hystrix-core</artifactId>--> + <!--<version>1.3.13</version>--> + <!--</dependency>--> </dependencies> </project> http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/dd37909f/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/hystrix/CassandraCommand.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/hystrix/CassandraCommand.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/hystrix/CassandraCommand.java deleted file mode 100644 index 731933a..0000000 --- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/hystrix/CassandraCommand.java +++ /dev/null @@ -1,74 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.usergrid.persistence.collection.hystrix; - - -import com.netflix.hystrix.HystrixCommand; -import com.netflix.hystrix.HystrixCommandGroupKey; - -import rx.Observable; -import rx.schedulers.Schedulers; - - -/** - * Default command that just returns the value handed to it. Useful for creating observables that are subscribed on the - * correct underlying Hystrix thread pool - * - * TODO change this when this PR makes it into head to wrap our observables - * https://github.com/Netflix/Hystrix/pull/209 - */ -public class CassandraCommand<R> extends HystrixCommand<R> { - - public static final String NAME = "CassandraCommand"; - - public static final HystrixCommandGroupKey GROUP_KEY = HystrixCommandGroupKey.Factory.asKey( NAME ); - - public static final String THREAD_POOL_SIZE = CommandUtils.getThreadPoolCoreSize( NAME ); - - public static final String THREAD_POOL_QUEUE = CommandUtils.getThreadPoolMaxQueueSize( NAME ); - - - private final R value; - - - private CassandraCommand( final R value ) { - super( GROUP_KEY ); - this.value = value; - } - - - @Override - protected R run() throws Exception { - return value; - } - - - /** - * Get the write command - * - * @param readValue The value to observe on - * - * @return The value wrapped in a Hystrix observable - */ - private static <R> Observable<R> toObservable( R readValue ) { - //create a new command and ensure it's observed on the correct thread scheduler - return new CassandraCommand<R>( readValue ).toObservable( Schedulers.io() ); - } -} http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/dd37909f/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/hystrix/CommandUtils.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/hystrix/CommandUtils.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/hystrix/CommandUtils.java deleted file mode 100644 index b81f79b..0000000 --- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/hystrix/CommandUtils.java +++ /dev/null @@ -1,28 +0,0 @@ -package org.apache.usergrid.persistence.collection.hystrix; - - -/** - * - * - */ -public class CommandUtils { - - /** - * Get the name of the archiaus property for the core thread pool size - * @param threadPoolName - * @return - */ - public static String getThreadPoolCoreSize(String threadPoolName){ - return "hystrix.threadpool."+ threadPoolName + ".coreSize"; - } - - /** - * Get the name of the archiaus property for the max thread pool size - * @param threadPoolName - * @return - */ - public static String getThreadPoolMaxQueueSize(String threadPoolName){ - return "hystrix.threadpool."+ threadPoolName + ".maxQueueSize"; - } - -} http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/dd37909f/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/rx/ParallelTest.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/rx/ParallelTest.java b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/rx/ParallelTest.java deleted file mode 100644 index 434ea26..0000000 --- a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/rx/ParallelTest.java +++ /dev/null @@ -1,219 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.usergrid.persistence.collection.rx; - - -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.CountDownLatch; - -import org.junit.Test; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import org.apache.usergrid.persistence.collection.hystrix.CommandUtils; - -import com.google.common.collect.HashMultiset; -import com.google.common.collect.Multiset; -import com.netflix.config.ConfigurationManager; -import com.netflix.hystrix.HystrixCommand; -import com.netflix.hystrix.HystrixCommandGroupKey; - -import rx.Observable; -import rx.Scheduler; -import rx.functions.Func1; -import rx.functions.FuncN; -import rx.schedulers.Schedulers; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; - - -/** - * Tests that provides examples of how to perform more complex RX operations - */ -public class ParallelTest { - - private static final Logger logger = LoggerFactory.getLogger( ParallelTest.class ); - - - private static final HystrixCommandGroupKey GROUP_KEY = HystrixCommandGroupKey.Factory.asKey( "TEST_KEY" ); - - - public static final String THREAD_POOL_SIZE = CommandUtils.getThreadPoolCoreSize( GROUP_KEY.name() ); - - public static final String THREAD_POOL_QUEUE = CommandUtils.getThreadPoolMaxQueueSize( GROUP_KEY.name() ); - - - /** - * An example of how an observable that requires a "fan out" then join should execute. - */ - @Test(timeout = 5000) - public void concurrentFunctions() { - final String input = "input"; - - final int size = 100; - //since we start at index 0 - final int expected = size - 1; - - - /** - * QUESTION Using this thread blocks indefinitely. The execution of the Hystrix command happens on the - * computation - * Thread if this is used - */ - // final Scheduler scheduler = Schedulers.threadPoolForComputation(); - - //use the I/O scheduler to allow enough thread, otherwise our pool will be the same size as the # of cores - final Scheduler scheduler = Schedulers.io(); - - //set our size equal - ConfigurationManager.getConfigInstance().setProperty( THREAD_POOL_SIZE, size ); - // ConfigurationManager.getConfigInstance().setProperty( THREAD_POOL_SIZE, 10 ); - - //reject requests we have to queue - ConfigurationManager.getConfigInstance().setProperty( THREAD_POOL_QUEUE, -1 ); - - //latch used to make each thread block to prove correctness - final CountDownLatch latch = new CountDownLatch( size ); - - - final Multiset<String> set = HashMultiset.create(); - - - //create our observable and execute it in the I/O pool since we'll be doing I/O operations - - /** - * QUESTION: Should this use the computation scheduler since all operations (except the hystrix command) are - * non blocking? - */ - - final Observable<String> observable = Observable.from( input ).observeOn( scheduler ); - - - Observable<Integer> thing = observable.mapMany( new Func1<String, Observable<Integer>>() { - - @Override - public Observable<Integer> call( final String s ) { - List<Observable<Integer>> functions = new ArrayList<Observable<Integer>>(); - - logger.info( "Creating new set of observables in thread {}", Thread.currentThread().getName() ); - - for ( int i = 0; i < size; i++ ) { - - - final int index = i; - - //create a new observable and execute the function on it. These should happen in parallel when - // a subscription occurs - - /** - * QUESTION: Should this again be the process thread, not the I/O - */ - Observable<String> newObservable = Observable.from( input ).subscribeOn( scheduler ); - - Observable<Integer> transformed = newObservable.map( new Func1<String, Integer>() { - - @Override - public Integer call( final String s ) { - - final String threadName = Thread.currentThread().getName(); - - logger.info( "Invoking parallel task in thread {}", threadName ); - - /** - * Simulate a Hystrix command making a call to an external resource. Invokes - * the Hystrix command immediately as the function is invoked. This is currently - * how we have to call Cassandra. - * - * TODO This needs to be re-written and evaluated once this PR is released https://github.com/Netflix/Hystrix/pull/209 - */ - return new HystrixCommand<Integer>( GROUP_KEY ) { - @Override - protected Integer run() throws Exception { - - final String threadName = Thread.currentThread().getName(); - - logger.info( "Invoking hystrix task in thread {}", threadName ); - - - set.add( threadName ); - - latch.countDown(); - - try { - latch.await(); - } - catch ( InterruptedException e ) { - throw new RuntimeException( "Interrupted", e ); - } - - assertTrue( isExecutedInThread() ); - - return index; - } - }.execute(); - } - } ); - - functions.add( transformed ); - } - - /** - * Execute the functions above and zip the results together - */ - Observable<Integer> zipped = Observable.zip( functions, new FuncN<Integer>() { - - @Override - public Integer call( final Object... args ) { - - logger.info( "Invoking zip in thread {}", Thread.currentThread().getName() ); - - assertEquals( size, args.length ); - - for ( int i = 0; i < args.length; i++ ) { - assertEquals( "Indexes are returned in order", i, args[i] ); - } - - //just return our string - return ( Integer ) args[args.length - 1]; - } - } ); - - return zipped; - } - } ); - - - final Integer last = thing.toBlockingObservable().last(); - - - assertEquals( expected, last.intValue() ); - - assertEquals( size, set.size() ); - - /** - * Ensure only 1 entry per thread - */ - for ( String entry : set.elementSet() ) { - assertEquals( 1, set.count( entry ) ); - } - } -}
