chenyuzhi459 opened a new pull request #10027:
URL: https://github.com/apache/druid/pull/10027
### Description
This PR fixes a bug where resources cannot be released when accumulating results from some segments fails.
### Reason
When an exception occurs in one of the futures of a
`com.google.common.util.concurrent.Futures.CombinedFuture`, we cannot immediately cancel all of
the other futures simply by calling `CombinedFuture.cancel(boolean mayInterruptIfRunning)`.
Let's see a code example:
```
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

public class GuavaFutureTest {
  public static void main(String[] args) {
    ExecutorService service = Executors.newFixedThreadPool(3);
    ListeningExecutorService exc = MoreExecutors.listeningDecorator(service);
    int tasks = 3;
    int cancelCount = 10;
    AtomicInteger index = new AtomicInteger(0);
    Function<Integer, List<ListenableFuture<Object>>> function = (c) -> {
      List<ListenableFuture<Object>> futures = new ArrayList<>();
      for (int i = 0; i < c; i++) {
        ListenableFuture<Object> future = exc.submit(new Callable<Object>() {
          @Override
          public Object call() throws Exception {
            AtomicBoolean interrupted = new AtomicBoolean(false);
            while (!interrupted.get()) {
              try {
                if (index.get() == cancelCount) {
                  // Simulate an exception occurring in one of the futures.
                  throw new RuntimeException("A big bug");
                }
                // Print something to show that the task is still active.
                System.out.println(String.format("Thread[id=%s] running. %s",
                    Thread.currentThread().getId(), System.currentTimeMillis()));
                Thread.sleep(1000);
                index.getAndIncrement();
              } catch (InterruptedException e) {
                interrupted.set(true);
                e.printStackTrace();
              }
            }
            return null;
          }
        });
        futures.add(future);
      }
      return futures;
    };
    List<ListenableFuture<Object>> futures = function.apply(tasks);
    ListenableFuture<List<Object>> future = Futures.allAsList(futures);
    try {
      future.get();
    } catch (Exception e) {
      System.err.println(e);
      // Try to cancel all tasks. However, the remaining tasks keep printing messages.
      future.cancel(true);
    }
  }
}
```
Here is the console output:
```
Thread[id=15] running. 1591945237078
Thread[id=13] running. 1591945237078
Thread[id=14] running. 1591945238078
Thread[id=13] running. 1591945241988
all task should be cancelled.
Thread[id=14] running. 1591945241988
java.util.concurrent.ExecutionException: java.lang.RuntimeException: A big bug
Thread[id=13] running. 1591945242988
Thread[id=14] running. 1591945242988
Thread[id=13] running. 1591945243988
Thread[id=14] running. 1591945243989
```
I tested this with every Guava version from 16.0.1 up to the latest, 29.0, and the result is
the same.
According to the source code of Guava 16.0.1, when one of the futures fails, the state of the
`CombinedFuture` is set from `RUNNING` to `COMPLETED` immediately (call stack:
`CombinedFuture.setOneValue() -> CombinedFuture.setExceptionAndMaybeLog() -> AbstractFuture.setException() -> AbstractFuture.Sync.setException() -> AbstractFuture.Sync.complete()`).
Therefore, when we then call `cancel()`, which tries to move the state from `RUNNING` to
`INTERRUPTED` (or `CANCELLED`) through a `CAS` operation in order to stop all the tasks, the
call fails, because the state is no longer `RUNNING`.
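To make the CAS argument concrete, here is a deliberately simplified model of the state field. It is a sketch under assumptions: `CasStateSketch` and its constants are hypothetical names with values chosen for illustration, not Guava's actual classes, and only the transition out of `RUNNING` is modeled.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical, simplified model of a future's state machine; NOT Guava's real code.
public class CasStateSketch {
  static final int RUNNING = 0;
  static final int COMPLETED = 2;
  static final int INTERRUPTED = 8;

  private final AtomicInteger state = new AtomicInteger(RUNNING);

  // Called when one input future fails: the combined future completes right away.
  boolean setException() {
    return state.compareAndSet(RUNNING, COMPLETED);
  }

  // cancel(true) succeeds only while the state is still RUNNING. In this model,
  // anything that should happen after a successful cancel (such as cancelling the
  // input futures) is skipped when this CAS loses.
  boolean cancel() {
    return state.compareAndSet(RUNNING, INTERRUPTED);
  }

  public static void main(String[] args) {
    CasStateSketch combined = new CasStateSketch();
    System.out.println("setException: " + combined.setException()); // prints "setException: true"
    System.out.println("cancel: " + combined.cancel());             // prints "cancel: false"
  }
}
```

Whichever transition wins the CAS first decides the final state, so a failure that completes the combined future leaves a later `cancel()` with nothing to do.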
### Solution
We cancel each of the underlying computation tasks manually instead of relying on
`CombinedFuture.cancel()`.
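A minimal sketch of the manual-cancellation idea, written against plain `java.util.concurrent` so it runs without Guava. The names `ManualCancelSketch`, `cancelAll`, and `runDemo` are hypothetical, and this is not the actual code change in this PR. An `ExecutorCompletionService` stands in for `Futures.allAsList` to detect the first failure; once it is seen, every input future is cancelled explicitly rather than only the combined one.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

public class ManualCancelSketch {
  // Hypothetical helper: cancel every input future explicitly. For futures that have
  // already completed (normally or exceptionally), cancel() is a no-op returning false.
  static void cancelAll(List<? extends Future<?>> futures, boolean mayInterruptIfRunning) {
    for (Future<?> future : futures) {
      future.cancel(mayInterruptIfRunning);
    }
  }

  // Submits two never-ending tasks and one failing task, cancels everything on the
  // first failure, and reports whether the pool then shuts down cleanly.
  static boolean runDemo(List<Future<Object>> futures) {
    ExecutorService service = Executors.newFixedThreadPool(3);
    ExecutorCompletionService<Object> completion = new ExecutorCompletionService<>(service);
    Callable<Object> looping = () -> {
      while (true) {
        Thread.sleep(50); // throws InterruptedException once cancel(true) interrupts the thread
      }
    };
    Callable<Object> failing = () -> {
      throw new RuntimeException("A big bug");
    };
    futures.add(completion.submit(looping));
    futures.add(completion.submit(looping));
    futures.add(completion.submit(failing));
    try {
      // Only the failing task can finish on its own, so take() returns it first
      // and get() rethrows its exception wrapped in an ExecutionException.
      for (int i = 0; i < futures.size(); i++) {
        completion.take().get();
      }
    } catch (ExecutionException e) {
      System.err.println(e);
      cancelAll(futures, true); // stop the remaining tasks ourselves
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      cancelAll(futures, true);
    }
    service.shutdown();
    try {
      boolean terminated = service.awaitTermination(5, TimeUnit.SECONDS);
      if (!terminated) {
        service.shutdownNow();
      }
      return terminated;
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      service.shutdownNow();
      return false;
    }
  }

  public static void main(String[] args) {
    List<Future<Object>> futures = new ArrayList<>();
    System.out.println("terminated: " + runDemo(futures));
  }
}
```

Running `main` should print `terminated: true`, because the looping tasks are either interrupted or never start. Since Guava's `ListenableFuture` extends `java.util.concurrent.Future`, the same `cancelAll` loop could in principle be applied to the `List<ListenableFuture<Object>>` passed to `Futures.allAsList` in the example above.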
<hr>
This PR has:
- [ ] been self-reviewed.
- [ ] using the [concurrency
checklist](https://github.com/apache/druid/blob/master/dev/code-review/concurrency.md)
(Remove this item if the PR doesn't have any relation to concurrency.)
- [ ] added documentation for new or modified features or behaviors.
- [ ] added Javadocs for most classes and all non-trivial methods. Linked
related entities via Javadoc links.
- [ ] added or updated version, license, or notice information in
[licenses.yaml](https://github.com/apache/druid/blob/master/licenses.yaml)
- [ ] added comments explaining the "why" and the intent of the code
wherever would not be obvious for an unfamiliar reader.
- [ ] added unit tests or modified existing tests to cover new code paths,
ensuring the threshold for [code
coverage](https://github.com/apache/druid/blob/master/dev/code-review/code-coverage.md)
is met.
- [ ] added integration tests.
- [ ] been tested in a test Druid cluster.
<!-- Check the items by putting "x" in the brackets for the done things. Not
all of these items apply to every PR. Remove the items which are not done or
not relevant to the PR. None of the items from the checklist above are strictly
necessary, but it would be very helpful if you at least self-review the PR. -->
<hr>