chenyuzhi459 opened a new pull request #10027:
URL: https://github.com/apache/druid/pull/10027


   
   
   ### Description
   This PR fixes a bug where resources are not released when an exception occurs while accumulating the results of some segments.
   ### Reason
   When an exception occurs in one of the futures combined by `com.google.common.util.concurrent.Futures.CombinedFuture`, calling `CombinedFuture.cancel(boolean mayInterruptIfRunning)` does not immediately cancel the other futures.
   
   The following example reproduces the problem:
   ```java
   import com.google.common.util.concurrent.Futures;
   import com.google.common.util.concurrent.ListenableFuture;
   import com.google.common.util.concurrent.ListeningExecutorService;
   import com.google.common.util.concurrent.MoreExecutors;
   
   import java.util.ArrayList;
   import java.util.List;
   import java.util.concurrent.Callable;
   import java.util.concurrent.ExecutorService;
   import java.util.concurrent.Executors;
   import java.util.concurrent.atomic.AtomicBoolean;
   import java.util.concurrent.atomic.AtomicInteger;
   import java.util.function.Function;
   
   public class GuavaFutureTest {
   
       public static void main(String[] args) {
           ExecutorService service = Executors.newFixedThreadPool(3);
           ListeningExecutorService exc = MoreExecutors.listeningDecorator(service);
           int tasks = 3;
           int cancelCount = 10;
           AtomicInteger index = new AtomicInteger(0);
           Function<Integer, List<ListenableFuture<Object>>> function = (c) -> {
               List<ListenableFuture<Object>> futures = new ArrayList<>();
               for (int i = 0; i < c; i++) {
                   ListenableFuture<Object> future = exc.submit(new Callable<Object>() {
                       @Override
                       public Object call() throws Exception {
                           AtomicBoolean interrupted = new AtomicBoolean(false);
                           // Only an interrupt (e.g. from cancel(true)) ends this loop normally.
                           while (!interrupted.get()) {
                               try {
                                   if (index.get() == cancelCount) {
                                       // Simulate an exception occurring in one of the futures.
                                       throw new RuntimeException("A big bug");
                                   }
                                   // Print something to show that the task is still active.
                                   System.out.println(String.format("Thread[id=%s] running. %s",
                                           Thread.currentThread().getId(), System.currentTimeMillis()));
                                   Thread.sleep(1000);
                                   index.getAndIncrement();
                               } catch (InterruptedException e) {
                                   interrupted.set(true);
                                   e.printStackTrace();
                               }
                           }
                           return null;
                       }
                   });
                   futures.add(future);
               }
               return futures;
           };
   
           List<ListenableFuture<Object>> futures = function.apply(tasks);
   
           ListenableFuture<List<Object>> future = Futures.allAsList(futures);
           try {
               future.get();
           } catch (Exception e) {
               System.err.println(e);
               // Try to cancel all tasks here. However, the tasks keep printing messages.
               future.cancel(true);
           }
       }
   
   }
   ```
   Here is the console output:
   ```
   Thread[id=15] running. 1591945237078
   Thread[id=13] running. 1591945237078
   Thread[id=14] running. 1591945238078
   Thread[id=13] running. 1591945241988
   all task should be cancelled.
   Thread[id=14] running. 1591945241988
   java.util.concurrent.ExecutionException: java.lang.RuntimeException: A big bug
   Thread[id=13] running. 1591945242988
   Thread[id=14] running. 1591945242988
   Thread[id=13] running. 1591945243988
   Thread[id=14] running. 1591945243989
   ```
   I tested this with Guava versions ranging from 16.0.1 to the latest (29.0), and the result is the same.
   According to the Guava 16.0.1 source code, when one of the input futures fails, the state of the `CombinedFuture` is switched from `RUNNING` to `COMPLETED` immediately (call stack: `CombinedFuture.setOneValue() -> CombinedFuture.setExceptionAndMaybeLog() -> AbstractFuture.setException() -> AbstractFuture.Sync.setException() -> AbstractFuture.Sync.complete()`). When we later call `cancel()` to stop all tasks, it tries to move the state away from `RUNNING` with a `CAS` operation (to `INTERRUPTED` when `mayInterruptIfRunning` is `true`). That CAS fails because the state is no longer `RUNNING`, so `cancel()` returns `false`. Since the combined future only forwards cancellation to its input futures after it has itself been cancelled, the remaining tasks are never cancelled and keep running.
   
   
   
   
   ### Solution
   Instead of relying on `cancel()` of the combined future, we cancel every underlying computation task (input future) manually.
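   A minimal sketch of this approach using only `java.util.concurrent`, so it is not the exact Druid change: the hypothetical helper `cancelAll` calls `cancel()` on each task future directly, which still works after one of the tasks has failed. An `ExecutorCompletionService` stands in for `Futures.allAsList` here to detect the first failure, and the task bodies are made-up workloads. Because Guava's `ListenableFuture` extends `java.util.concurrent.Future`, the same helper also accepts the `List<ListenableFuture<Object>>` built in the reproduction above.

   ```java
   import java.util.ArrayList;
   import java.util.List;
   import java.util.concurrent.Callable;
   import java.util.concurrent.CompletionService;
   import java.util.concurrent.ExecutionException;
   import java.util.concurrent.ExecutorCompletionService;
   import java.util.concurrent.ExecutorService;
   import java.util.concurrent.Executors;
   import java.util.concurrent.Future;
   import java.util.concurrent.TimeUnit;

   public class CancelAllOnFailure {

       // Hypothetical helper: cancel each input future individually.
       // Returns how many futures this call actually moved to the cancelled state.
       static int cancelAll(List<? extends Future<?>> futures, boolean mayInterruptIfRunning) {
           int cancelled = 0;
           for (Future<?> f : futures) {
               if (f.cancel(mayInterruptIfRunning)) {
                   cancelled++;
               }
           }
           return cancelled;
       }

       public static void main(String[] args) throws InterruptedException {
           ExecutorService pool = Executors.newFixedThreadPool(3);
           CompletionService<Object> completion = new ExecutorCompletionService<>(pool);
           List<Future<Object>> futures = new ArrayList<>();

           // Made-up workload: runs until interrupted (sleep throws once cancel(true) interrupts it).
           Callable<Object> longRunning = () -> {
               while (true) {
                   Thread.sleep(100);
               }
           };
           // Made-up workload: fails after a short delay.
           Callable<Object> failing = () -> {
               Thread.sleep(300);
               throw new RuntimeException("A big bug");
           };

           futures.add(completion.submit(longRunning));
           futures.add(completion.submit(longRunning));
           futures.add(completion.submit(failing));

           try {
               for (int i = 0; i < futures.size(); i++) {
                   completion.take().get(); // rethrows the first task failure
               }
           } catch (ExecutionException e) {
               System.err.println(e);
               // The failed future is already done, so only the two running tasks get cancelled.
               int cancelled = cancelAll(futures, true);
               System.out.println("cancelled " + cancelled + " running tasks"); // expected: 2
           }
           pool.shutdown();
           System.out.println("terminated: " + pool.awaitTermination(5, TimeUnit.SECONDS));
       }
   }
   ```

   Applied to the reproduction above, the equivalent change would be to call `cancelAll(futures, true)` on the individual futures in the `catch` block rather than relying only on `future.cancel(true)`; the interrupted tasks then leave their loops and the thread pool can terminate.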
   
   
   
   
   <hr>
   
   This PR has:
   - [ ] been self-reviewed.
      - [ ] using the [concurrency checklist](https://github.com/apache/druid/blob/master/dev/code-review/concurrency.md) (Remove this item if the PR doesn't have any relation to concurrency.)
   - [ ] added documentation for new or modified features or behaviors.
   - [ ] added Javadocs for most classes and all non-trivial methods. Linked related entities via Javadoc links.
   - [ ] added or updated version, license, or notice information in [licenses.yaml](https://github.com/apache/druid/blob/master/licenses.yaml)
   - [ ] added comments explaining the "why" and the intent of the code wherever would not be obvious for an unfamiliar reader.
   - [ ] added unit tests or modified existing tests to cover new code paths, ensuring the threshold for [code coverage](https://github.com/apache/druid/blob/master/dev/code-review/code-coverage.md) is met.
   - [ ] added integration tests.
   - [ ] been tested in a test Druid cluster.
   
   <!-- Check the items by putting "x" in the brackets for the done things. Not 
all of these items apply to every PR. Remove the items which are not done or 
not relevant to the PR. None of the items from the checklist above are strictly 
necessary, but it would be very helpful if you at least self-review the PR. -->
   
   <hr>
   
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
