This is an automated email from the ASF dual-hosted git repository.

bbeaudreault pushed a commit to branch hubspot-2
in repository https://gitbox.apache.org/repos/asf/hbase.git

commit 954004af49f5578f0065082f2fec13229036878a
Author: Andrew Purtell <apurt...@apache.org>
AuthorDate: Thu Mar 10 11:55:38 2022 -0800

    HubSpot Backport: HBASE-26715 Blocked on SyncFuture in AsyncProtobufLogWriter#write (#4184)
    
    Co-authored-by: Bryan Beaudreault <bbeaudrea...@gmail.com>
    Signed-off-by: Duo Zhang <zhang...@apache.org>
    Signed-off-by: Viraj Jasani <virajjas...@apache.org>
    Signed-off-by: Xiaolin Ha <haxiao...@apache.org>
---
 .../regionserver/wal/AsyncProtobufLogWriter.java   | 23 +++++++++++++++++-----
 .../apache/hadoop/hbase/wal/AbstractWALRoller.java |  6 +++---
 2 files changed, 21 insertions(+), 8 deletions(-)

diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncProtobufLogWriter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncProtobufLogWriter.java
index e834d654310..ec7d3259e37 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncProtobufLogWriter.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncProtobufLogWriter.java
@@ -25,14 +25,18 @@ import java.io.OutputStream;
 import java.nio.ByteBuffer;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 import java.util.function.Consumer;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.io.ByteBufferWriter;
 import org.apache.hadoop.hbase.io.asyncfs.AsyncFSOutput;
 import org.apache.hadoop.hbase.io.asyncfs.AsyncFSOutputHelper;
 import org.apache.hadoop.hbase.util.CommonFSUtils.StreamLacksCapabilityException;
+import org.apache.hadoop.hbase.wal.AbstractWALRoller;
 import org.apache.hadoop.hbase.wal.AsyncFSWALProvider;
 import org.apache.hadoop.hbase.wal.WAL.Entry;
 import org.apache.yetus.audience.InterfaceAudience;
@@ -107,11 +111,20 @@ public class AsyncProtobufLogWriter extends AbstractProtobufLogWriter
   }
 
   private OutputStream asyncOutputWrapper;
+  private long waitTimeout;
 
   public AsyncProtobufLogWriter(EventLoopGroup eventLoopGroup,
       Class<? extends Channel> channelClass) {
     this.eventLoopGroup = eventLoopGroup;
     this.channelClass = channelClass;
+    // Reuse WAL_ROLL_WAIT_TIMEOUT here to avoid an infinite wait if somehow a wait on a future
+    // never completes. The objective is the same. We want to propagate an exception to trigger
+    // an abort if we seem to be hung.
+    if (this.conf == null) {
+      this.conf = HBaseConfiguration.create();
+    }
+    this.waitTimeout = this.conf.getLong(AbstractWALRoller.WAL_ROLL_WAIT_TIMEOUT,
+      AbstractWALRoller.DEFAULT_WAL_ROLL_WAIT_TIMEOUT);
   }
 
   /*
@@ -182,16 +195,16 @@ public class AsyncProtobufLogWriter extends AbstractProtobufLogWriter
     this.asyncOutputWrapper = new OutputStreamWrapper(output);
   }
 
-  private long write(Consumer<CompletableFuture<Long>> action) throws IOException {
+  private long writeWALMetadata(Consumer<CompletableFuture<Long>> action) throws IOException {
     CompletableFuture<Long> future = new CompletableFuture<>();
     action.accept(future);
     try {
-      return future.get().longValue();
+      return future.get(waitTimeout, TimeUnit.MILLISECONDS).longValue();
     } catch (InterruptedException e) {
       InterruptedIOException ioe = new InterruptedIOException();
       ioe.initCause(e);
       throw ioe;
-    } catch (ExecutionException e) {
+    } catch (ExecutionException | TimeoutException e) {
       Throwables.propagateIfPossible(e.getCause(), IOException.class);
       throw new RuntimeException(e.getCause());
     }
@@ -199,7 +212,7 @@ public class AsyncProtobufLogWriter extends AbstractProtobufLogWriter
 
   @Override
   protected long writeMagicAndWALHeader(byte[] magic, WALHeader header) throws IOException {
-    return write(future -> {
+    return writeWALMetadata(future -> {
       output.write(magic);
       try {
         header.writeDelimitedTo(asyncOutputWrapper);
@@ -219,7 +232,7 @@ public class AsyncProtobufLogWriter extends AbstractProtobufLogWriter
 
   @Override
   protected long writeWALTrailerAndMagic(WALTrailer trailer, byte[] magic) throws IOException {
-    return write(future -> {
+    return writeWALMetadata(future -> {
       try {
         trailer.writeTo(asyncOutputWrapper);
       } catch (IOException e) {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractWALRoller.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractWALRoller.java
index 45db3bfed7b..3209b11581c 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractWALRoller.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractWALRoller.java
@@ -62,8 +62,8 @@ public abstract class AbstractWALRoller<T extends Abortable> extends Thread
   /**
    * Configure for the timeout of log rolling retry.
    */
-  protected static final String WAL_ROLL_WAIT_TIMEOUT =
-    "hbase.regionserver.logroll.wait.timeout.ms";
+  public static final String WAL_ROLL_WAIT_TIMEOUT = "hbase.regionserver.logroll.wait.timeout.ms";
+  public static final long DEFAULT_WAL_ROLL_WAIT_TIMEOUT = 30000;
 
   /**
    * Configure for the max count of log rolling retry.
@@ -130,7 +130,7 @@ public abstract class AbstractWALRoller<T extends Abortable> extends Thread
     this.threadWakeFrequency = conf.getInt(HConstants.THREAD_WAKE_FREQUENCY, 10 * 1000);
     this.checkLowReplicationInterval =
       conf.getLong("hbase.regionserver.hlog.check.lowreplication.interval", 30 * 1000);
-    this.rollWaitTimeout = conf.getLong(WAL_ROLL_WAIT_TIMEOUT, 30000);
+    this.rollWaitTimeout = conf.getLong(WAL_ROLL_WAIT_TIMEOUT, DEFAULT_WAL_ROLL_WAIT_TIMEOUT);
     // retry rolling does not have to be the default behavior, so the default value is 0 here
     this.maxRollRetry = conf.getInt(WAL_ROLL_RETRIES, 0);
   }

Reply via email to