Apache9 commented on a change in pull request #305: HBASE-22577
BufferedMutatorOverAsyncBufferedMutator.tryCompleteFuture…
URL: https://github.com/apache/hbase/pull/305#discussion_r293854876
##########
File path:
hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorOverAsyncBufferedMutator.java
##########
@@ -100,62 +107,64 @@ private RetriesExhaustedWithDetailsException makeError()
{
return new RetriesExhaustedWithDetailsException(throwables, rows,
hostnameAndPorts);
}
+ private void internalFlush() throws RetriesExhaustedWithDetailsException {
+ // should get the future array before calling mutator.flush, otherwise we
may hit an infinite
+ // wait, since someone may add new future to the map after we calling the
flush.
+ CompletableFuture<?>[] toWait = futures.keySet().toArray(new
CompletableFuture<?>[0]);
+ mutator.flush();
+ try {
+ CompletableFuture.allOf(toWait).join();
+ } catch (CompletionException e) {
+ // just ignore, we will record the actual error in the errors field
+ LOG.debug("Flush failed, you should get an exception thrown to your
code", e);
+ }
+ if (!errors.isEmpty()) {
+ RetriesExhaustedWithDetailsException error = makeError();
+ listener.onException(error, this);
+ }
+ }
+
@Override
public void mutate(List<? extends Mutation> mutations) throws IOException {
- List<CompletableFuture<Void>> toBuffered = new ArrayList<>();
List<CompletableFuture<Void>> fs = mutator.mutate(mutations);
for (int i = 0, n = fs.size(); i < n; i++) {
CompletableFuture<Void> toComplete = new CompletableFuture<>();
- final int index = i;
- addListener(fs.get(index), (r, e) -> {
+ futures.put(toComplete, toComplete);
+ Mutation mutation = mutations.get(i);
+ long heapSize = mutation.heapSize();
+ bufferedSize.addAndGet(heapSize);
+ addListener(fs.get(i), (r, e) -> {
+ futures.remove(toComplete);
+ bufferedSize.addAndGet(-heapSize);
if (e != null) {
- errors.add(Pair.newPair(mutations.get(index), e));
+ errors.add(Pair.newPair(mutation, e));
toComplete.completeExceptionally(e);
} else {
toComplete.complete(r);
}
});
- toBuffered.add(toComplete);
}
synchronized (this) {
- futures.addAll(toBuffered);
- if (futures.size() > BUFFERED_FUTURES_THRESHOLD) {
- tryCompleteFuture();
- }
- if (!errors.isEmpty()) {
+ if (bufferedSize.get() > mutator.getWriteBufferSize() * 2) {
Review comment:
The comment is just below this line. It is not a good idea to just check for
writeBufferSize as we will likely to always enter the branch, so a * 2 is used.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services