Apache9 commented on a change in pull request #305: HBASE-22577
BufferedMutatorOverAsyncBufferedMutator.tryCompleteFuture…
URL: https://github.com/apache/hbase/pull/305#discussion_r293868097
##########
File path:
hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorOverAsyncBufferedMutator.java
##########
@@ -100,62 +107,64 @@ private RetriesExhaustedWithDetailsException makeError()
{
return new RetriesExhaustedWithDetailsException(throwables, rows,
hostnameAndPorts);
}
+ private void internalFlush() throws RetriesExhaustedWithDetailsException {
+ // should get the future array before calling mutator.flush, otherwise we
may hit an infinite
+ // wait, since someone may add new future to the map after we calling the
flush.
+ CompletableFuture<?>[] toWait = futures.keySet().toArray(new
CompletableFuture<?>[0]);
+ mutator.flush();
+ try {
+ CompletableFuture.allOf(toWait).join();
+ } catch (CompletionException e) {
+ // just ignore, we will record the actual error in the errors field
+ LOG.debug("Flush failed, you should get an exception thrown to your
code", e);
+ }
+ if (!errors.isEmpty()) {
+ RetriesExhaustedWithDetailsException error = makeError();
+ listener.onException(error, this);
+ }
+ }
+
@Override
public void mutate(List<? extends Mutation> mutations) throws IOException {
- List<CompletableFuture<Void>> toBuffered = new ArrayList<>();
List<CompletableFuture<Void>> fs = mutator.mutate(mutations);
for (int i = 0, n = fs.size(); i < n; i++) {
CompletableFuture<Void> toComplete = new CompletableFuture<>();
- final int index = i;
- addListener(fs.get(index), (r, e) -> {
+ futures.put(toComplete, toComplete);
+ Mutation mutation = mutations.get(i);
+ long heapSize = mutation.heapSize();
+ bufferedSize.addAndGet(heapSize);
+ addListener(fs.get(i), (r, e) -> {
+ futures.remove(toComplete);
+ bufferedSize.addAndGet(-heapSize);
if (e != null) {
- errors.add(Pair.newPair(mutations.get(index), e));
+ errors.add(Pair.newPair(mutation, e));
toComplete.completeExceptionally(e);
} else {
toComplete.complete(r);
}
});
- toBuffered.add(toComplete);
}
synchronized (this) {
- futures.addAll(toBuffered);
- if (futures.size() > BUFFERED_FUTURES_THRESHOLD) {
- tryCompleteFuture();
- }
- if (!errors.isEmpty()) {
+ if (bufferedSize.get() > mutator.getWriteBufferSize() * 2) {
Review comment:
Add more comments about why *2 here.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services