This is an automated email from the ASF dual-hosted git repository.

zhangduo pushed a commit to branch branch-2.2
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/branch-2.2 by this push:
     new 15a9c88  HBASE-22684 The log rolling request maybe canceled immediately in LogRoller due to a race (#378)
15a9c88 is described below

commit 15a9c882b978bb326dfaee14a20bf58392ea1ecd
Author: Duo Zhang <[email protected]>
AuthorDate: Mon Jul 15 21:35:06 2019 +0800

    HBASE-22684 The log rolling request maybe canceled immediately in LogRoller due to a race (#378)
    
    Signed-off-by: Michael Stack <[email protected]>
---
 .../hadoop/hbase/regionserver/LogRoller.java       | 165 ++++++++++-----------
 .../hbase/regionserver/wal/AbstractFSWAL.java      |  25 +++-
 .../hadoop/hbase/regionserver/wal/AsyncFSWAL.java  |   9 +-
 .../hadoop/hbase/regionserver/wal/FSHLog.java      |   2 +
 .../hadoop/hbase/regionserver/TestWALLockup.java   |   2 +-
 .../hbase/regionserver/wal/TestAsyncFSWAL.java     | 132 ++++++++++++++++-
 6 files changed, 230 insertions(+), 105 deletions(-)

diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/LogRoller.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/LogRoller.java
index 55c5219..464f51b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/LogRoller.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/LogRoller.java
@@ -20,11 +20,13 @@ package org.apache.hadoop.hbase.regionserver;
 
 import java.io.Closeable;
 import java.io.IOException;
+import java.net.ConnectException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
 import java.util.Map.Entry;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.locks.ReentrantLock;
-
+import java.util.concurrent.ConcurrentMap;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.Server;
 import org.apache.hadoop.hbase.regionserver.wal.AbstractFSWAL;
@@ -37,6 +39,7 @@ import org.apache.hadoop.ipc.RemoteException;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+
 import 
org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
 
 /**
@@ -52,43 +55,47 @@ import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesti
 @VisibleForTesting
 public class LogRoller extends HasThread implements Closeable {
   private static final Logger LOG = LoggerFactory.getLogger(LogRoller.class);
-  private final ReentrantLock rollLock = new ReentrantLock();
-  private final AtomicBoolean rollLog = new AtomicBoolean(false);
-  private final ConcurrentHashMap<WAL, Boolean> walNeedsRoll = new 
ConcurrentHashMap<>();
+  private final ConcurrentMap<WAL, Boolean> walNeedsRoll = new 
ConcurrentHashMap<>();
   private final Server server;
   protected final RegionServerServices services;
-  private volatile long lastrolltime = System.currentTimeMillis();
+  private volatile long lastRollTime = System.currentTimeMillis();
   // Period to roll log.
-  private final long rollperiod;
+  private final long rollPeriod;
   private final int threadWakeFrequency;
   // The interval to check low replication on hlog's pipeline
   private long checkLowReplicationInterval;
 
   private volatile boolean running = true;
 
-  public void addWAL(final WAL wal) {
-    if (null == walNeedsRoll.putIfAbsent(wal, Boolean.FALSE)) {
-      wal.registerWALActionsListener(new WALActionsListener() {
-        @Override
-        public void logRollRequested(boolean lowReplicas) {
-          walNeedsRoll.put(wal, Boolean.TRUE);
-          // TODO logs will contend with each other here, replace with e.g. 
DelayedQueue
-          synchronized(rollLog) {
-            rollLog.set(true);
-            rollLog.notifyAll();
+  public void addWAL(WAL wal) {
+    // check without lock first
+    if (walNeedsRoll.containsKey(wal)) {
+      return;
+    }
+    // this is to avoid race between addWAL and requestRollAll.
+    synchronized (this) {
+      if (walNeedsRoll.putIfAbsent(wal, Boolean.FALSE) == null) {
+        wal.registerWALActionsListener(new WALActionsListener() {
+          @Override
+          public void logRollRequested(boolean lowReplicas) {
+            // TODO logs will contend with each other here, replace with e.g. 
DelayedQueue
+            synchronized (LogRoller.this) {
+              walNeedsRoll.put(wal, Boolean.TRUE);
+              LogRoller.this.notifyAll();
+            }
           }
-        }
-      });
+        });
+      }
     }
   }
 
   public void requestRollAll() {
-    for (WAL wal : walNeedsRoll.keySet()) {
-      walNeedsRoll.put(wal, Boolean.TRUE);
-    }
-    synchronized(rollLog) {
-      rollLog.set(true);
-      rollLog.notifyAll();
+    synchronized (this) {
+      List<WAL> wals = new ArrayList<WAL>(walNeedsRoll.keySet());
+      for (WAL wal : wals) {
+        walNeedsRoll.put(wal, Boolean.TRUE);
+      }
+      notifyAll();
     }
   }
 
@@ -97,7 +104,7 @@ public class LogRoller extends HasThread implements Closeable {
     super("LogRoller");
     this.server = server;
     this.services = services;
-    this.rollperiod = this.server.getConfiguration().
+    this.rollPeriod = this.server.getConfiguration().
       getLong("hbase.regionserver.logroll.period", 3600000);
     this.threadWakeFrequency = this.server.getConfiguration().
       getInt(HConstants.THREAD_WAKE_FREQUENCY, 10 * 1000);
@@ -105,19 +112,10 @@ public class LogRoller extends HasThread implements Closeable {
         "hbase.regionserver.hlog.check.lowreplication.interval", 30 * 1000);
   }
 
-  @Override
-  public void interrupt() {
-    // Wake up if we are waiting on rollLog. For tests.
-    synchronized (rollLog) {
-      this.rollLog.notify();
-    }
-    super.interrupt();
-  }
-
   /**
    * we need to check low replication in period, see HBASE-18132
    */
-  void checkLowReplication(long now) {
+  private void checkLowReplication(long now) {
     try {
       for (Entry<WAL, Boolean> entry : walNeedsRoll.entrySet()) {
         WAL wal = entry.getKey();
@@ -152,47 +150,49 @@ public class LogRoller extends HasThread implements Closeable {
   @Override
   public void run() {
     while (running) {
+      boolean periodic = false;
       long now = System.currentTimeMillis();
       checkLowReplication(now);
-      boolean periodic = false;
-      if (!rollLog.get()) {
-        periodic = (now - this.lastrolltime) > this.rollperiod;
-        if (!periodic) {
-          synchronized (rollLog) {
+      periodic = (now - this.lastRollTime) > this.rollPeriod;
+      if (periodic) {
+        // Time for periodic roll, fall through
+        LOG.debug("Wal roll period {} ms elapsed", this.rollPeriod);
+      } else {
+        synchronized (this) {
+          if (walNeedsRoll.values().stream().anyMatch(Boolean::booleanValue)) {
+            // WAL roll requested, fall through
+            LOG.debug("WAL roll requested");
+          } else {
             try {
-              if (!rollLog.get()) {
-                rollLog.wait(this.threadWakeFrequency);
-              }
+              wait(this.threadWakeFrequency);
             } catch (InterruptedException e) {
-              // Fall through
+              // restore the interrupt state
+              Thread.currentThread().interrupt();
             }
+            // goto the beginning to check whether again whether we should 
fall through to roll
+            // several WALs, and also check whether we should quit.
+            continue;
           }
-          continue;
         }
-        // Time for periodic roll
-        LOG.debug("Wal roll period {} ms elapsed", this.rollperiod);
-      } else {
-        LOG.debug("WAL roll requested");
       }
-      rollLock.lock(); // FindBugs UL_UNRELEASED_LOCK_EXCEPTION_PATH
       try {
-        this.lastrolltime = now;
-        for (Entry<WAL, Boolean> entry : walNeedsRoll.entrySet()) {
-          final WAL wal = entry.getKey();
-          // Force the roll if the logroll.period is elapsed or if a roll was 
requested.
-          // The returned value is an array of actual region names.
-          final byte [][] regionsToFlush = wal.rollWriter(periodic ||
-              entry.getValue().booleanValue());
+        this.lastRollTime = System.currentTimeMillis();
+        for (Iterator<Entry<WAL, Boolean>> iter = 
walNeedsRoll.entrySet().iterator(); iter
+            .hasNext();) {
+          Entry<WAL, Boolean> entry = iter.next();
+          WAL wal = entry.getKey();
+          // reset the flag in front to avoid missing roll request before we 
return from rollWriter.
           walNeedsRoll.put(wal, Boolean.FALSE);
+            // Force the roll if the logroll.period is elapsed or if a roll 
was requested.
+            // The returned value is an array of actual region names.
+            byte[][]   regionsToFlush = wal.rollWriter(periodic || 
entry.getValue().booleanValue());
           if (regionsToFlush != null) {
             for (byte[] r : regionsToFlush) {
-              scheduleFlush(r);
+              scheduleFlush(Bytes.toString(r));
             }
           }
         }
-      } catch (FailedLogCloseException e) {
-        abort("Failed log close in log roller", e);
-      } catch (java.net.ConnectException e) {
+      } catch (FailedLogCloseException | ConnectException e) {
         abort("Failed log close in log roller", e);
       } catch (IOException ex) {
         // Abort if we get here.  We probably won't recover an IOE. HBASE-1132
@@ -201,12 +201,6 @@ public class LogRoller extends HasThread implements Closeable {
       } catch (Exception ex) {
         LOG.error("Log rolling failed", ex);
         abort("Log rolling failed", ex);
-      } finally {
-        try {
-          rollLog.set(false);
-        } finally {
-          rollLock.unlock();
-        }
       }
     }
     LOG.info("LogRoller exiting.");
@@ -215,22 +209,20 @@ public class LogRoller extends HasThread implements Closeable {
   /**
    * @param encodedRegionName Encoded name of region to flush.
    */
-  private void scheduleFlush(final byte [] encodedRegionName) {
-    boolean scheduled = false;
-    HRegion r = (HRegion) 
this.services.getRegion(Bytes.toString(encodedRegionName));
-    FlushRequester requester = null;
-    if (r != null) {
-      requester = this.services.getFlushRequester();
-      if (requester != null) {
-        // force flushing all stores to clean old logs
-        requester.requestFlush(r, true, FlushLifeCycleTracker.DUMMY);
-        scheduled = true;
-      }
+  private void scheduleFlush(String encodedRegionName) {
+    HRegion r = (HRegion) this.services.getRegion(encodedRegionName);
+    if (r == null) {
+      LOG.warn("Failed to schedule flush of {}, because it is not online on 
us", encodedRegionName);
+      return;
     }
-    if (!scheduled) {
-      LOG.warn("Failed to schedule flush of {}, region={}, requester={}",
-        Bytes.toString(encodedRegionName), r, requester);
+    FlushRequester requester = this.services.getFlushRequester();
+    if (requester == null) {
+      LOG.warn("Failed to schedule flush of {}, region={}, because 
FlushRequester is null",
+        encodedRegionName, r);
+      return;
     }
+    // force flushing all stores to clean old logs
+    requester.requestFlush(r, true, FlushLifeCycleTracker.DUMMY);
   }
 
   /**
@@ -239,12 +231,7 @@ public class LogRoller extends HasThread implements Closeable {
    */
   @VisibleForTesting
   public boolean walRollFinished() {
-    for (boolean needRoll : walNeedsRoll.values()) {
-      if (needRoll) {
-        return false;
-      }
-    }
-    return true;
+    return walNeedsRoll.values().stream().allMatch(needRoll -> !needRoll);
   }
 
   @Override
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java
index c9c17ea..43f1512 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java
@@ -280,6 +280,8 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
    */
   protected final String implClassName;
 
+  protected final AtomicBoolean rollRequested = new AtomicBoolean(false);
+
   public long getFilenum() {
     return this.filenum.get();
   }
@@ -681,11 +683,8 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
   }
 
   /**
-   * <p>
-   * Cleans up current writer closing it and then puts in place the passed in
-   * <code>nextWriter</code>.
-   * </p>
-   * <p>
+   * Cleans up current writer closing it and then puts in place the passed in 
{@code nextWriter}.
+   * <p/>
    * <ul>
    * <li>In the case of creating a new WAL, oldPath will be null.</li>
    * <li>In the case of rolling over from one file to the next, none of the 
parameters will be null.
@@ -693,7 +692,6 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
    * <li>In the case of closing out this FSHLog with no further use newPath 
and nextWriter will be
    * null.</li>
    * </ul>
-   * </p>
    * @param oldPath may be null
    * @param newPath may be null
    * @param nextWriter may be null
@@ -875,8 +873,14 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
     return cachedSyncFutures.get().reset(sequence);
   }
 
+  protected boolean isLogRollRequested() {
+    return rollRequested.get();
+  }
+
   protected final void requestLogRoll(boolean tooFewReplicas) {
-    if (!this.listeners.isEmpty()) {
+    // If we have already requested a roll, don't do it again
+    // And only set rollRequested to true when there is a registered listener
+    if (!this.listeners.isEmpty() && rollRequested.compareAndSet(false, true)) 
{
       for (WALActionsListener i : this.listeners) {
         i.logRollRequested(tooFewReplicas);
       }
@@ -1031,6 +1035,13 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
   protected abstract W createWriterInstance(Path path)
       throws IOException, CommonFSUtils.StreamLacksCapabilityException;
 
+  /**
+   * Notice that you need to clear the {@link #rollRequested} flag in this 
method, as the new writer
+   * will begin to work before returning from this method. If we clear the 
flag after returning from
+   * this call, we may miss a roll request. The implementation class should 
choose a proper place to
+   * clear the {@link #rollRequested} flag so we do not miss a roll request, 
typically before you
+   * start writing to the new writer.
+   */
   protected abstract void doReplaceWriter(Path oldPath, Path newPath, W 
nextWriter)
       throws IOException;
 
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java
index 5699b3d..79409a0 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java
@@ -167,9 +167,6 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> {
   // notice that, modification to this field is only allowed under the 
protection of consumeLock.
   private volatile int epochAndState;
 
-  // used to guard the log roll request when we exceed the log roll size.
-  private boolean rollRequested;
-
   private boolean readyForRolling;
 
   private final Condition readyForRollingCond = consumeLock.newCondition();
@@ -336,10 +333,9 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> {
       // closed soon.
       return;
     }
-    if (writer.getLength() < logrollsize || rollRequested) {
+    if (writer.getLength() < logrollsize || isLogRollRequested()) {
       return;
     }
-    rollRequested = true;
     requestLogRoll();
   }
 
@@ -666,7 +662,6 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> {
       this.fsOut = ((AsyncProtobufLogWriter) nextWriter).getOutput();
     }
     this.fileLengthAtLastSync = nextWriter.getLength();
-    this.rollRequested = false;
     this.highestProcessedAppendTxidAtLastSync = 0L;
     consumeLock.lock();
     try {
@@ -675,6 +670,8 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> {
       int nextEpoch = currentEpoch == MAX_EPOCH ? 0 : currentEpoch + 1;
       // set a new epoch and also clear waitingRoll and writerBroken
       this.epochAndState = nextEpoch << 2;
+      // Reset rollRequested status
+      rollRequested.set(false);
       consumeExecutor.execute(consumer);
     } finally {
       consumeLock.unlock();
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
index 4681c29..b77210e 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
@@ -381,6 +381,8 @@ public class FSHLog extends AbstractFSWAL<Writer> {
     } finally {
       // Let the writer thread go regardless, whether error or not.
       if (zigzagLatch != null) {
+        // Reset rollRequested status
+        rollRequested.set(false);
         zigzagLatch.releaseSafePoint();
         // syncFuture will be null if we failed our wait on safe point above. 
Otherwise, if
         // latch was obtained successfully, the sync we threw in either 
trigger the latch or it
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWALLockup.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWALLockup.java
index 5c2bd06..1bf9c6f 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWALLockup.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWALLockup.java
@@ -435,7 +435,7 @@ public class TestWALLockup {
       // To stop logRoller, its server has to say it is stopped.
       Mockito.when(server.isStopped()).thenReturn(true);
       if (logRoller != null) {
-        logRoller.interrupt();
+        logRoller.close();
       }
       if (dodgyWAL != null) {
         dodgyWAL.close();
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestAsyncFSWAL.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestAsyncFSWAL.java
index 399cdc4..9302428 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestAsyncFSWAL.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestAsyncFSWAL.java
@@ -17,18 +17,49 @@
  */
 package org.apache.hadoop.hbase.regionserver.wal;
 
+import static org.junit.Assert.assertNull;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
 import java.io.IOException;
+import java.io.UncheckedIOException;
 import java.util.List;
+import java.util.NavigableMap;
+import java.util.TreeMap;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HBaseClassTestRule;
-import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.Server;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
+import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.client.RegionInfoBuilder;
+import org.apache.hadoop.hbase.client.TableDescriptor;
+import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
+import org.apache.hadoop.hbase.regionserver.LogRoller;
+import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
+import org.apache.hadoop.hbase.regionserver.RegionServerServices;
+import org.apache.hadoop.hbase.regionserver.SequenceId;
+import org.apache.hadoop.hbase.testclassification.LargeTests;
 import org.apache.hadoop.hbase.testclassification.RegionServerTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.CommonFSUtils;
+import org.apache.hadoop.hbase.util.FutureUtils;
 import org.apache.hadoop.hbase.util.Threads;
+import org.apache.hadoop.hbase.wal.WALEdit;
+import org.apache.hadoop.hbase.wal.WALKey;
+import org.apache.hadoop.hbase.wal.WALKeyImpl;
+import org.apache.hadoop.hbase.wal.WALProvider.AsyncWriter;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.ClassRule;
+import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
 import org.apache.hbase.thirdparty.io.netty.channel.Channel;
@@ -39,7 +70,7 @@ import org.apache.hbase.thirdparty.io.netty.channel.socket.nio.NioSocketChannel;
 /**
  * Provides AsyncFSWAL test cases.
  */
-@Category({ RegionServerTests.class, MediumTests.class })
+@Category({ RegionServerTests.class, LargeTests.class })
 public class TestAsyncFSWAL extends AbstractTestFSWAL {
 
   @ClassRule
@@ -90,4 +121,101 @@ public class TestAsyncFSWAL extends AbstractTestFSWAL {
     asyncFSWAL.init();
     return asyncFSWAL;
   }
+
+  @Test
+  public void testBrokenWriter() throws Exception {
+    Server server = mock(Server.class);
+    when(server.getConfiguration()).thenReturn(CONF);
+    RegionServerServices services = mock(RegionServerServices.class);
+    TableDescriptor td = 
TableDescriptorBuilder.newBuilder(TableName.valueOf("table"))
+        .setColumnFamily(ColumnFamilyDescriptorBuilder.of("row")).build();
+    RegionInfo ri = RegionInfoBuilder.newBuilder(td.getTableName()).build();
+    MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl();
+    NavigableMap<byte[], Integer> scopes = new 
TreeMap<>(Bytes.BYTES_COMPARATOR);
+    for (byte[] fam : td.getColumnFamilyNames()) {
+      scopes.put(fam, 0);
+    }
+    long timestamp = System.currentTimeMillis();
+    String testName = currentTest.getMethodName();
+    AtomicInteger failedCount = new AtomicInteger(0);
+    try (LogRoller roller = new LogRoller(server, services);
+        AsyncFSWAL wal = new AsyncFSWAL(FS, CommonFSUtils.getWALRootDir(CONF), 
DIR.toString(),
+            testName, CONF, null, true, null, null, GROUP, CHANNEL_CLASS) {
+
+          @Override
+          protected AsyncWriter createWriterInstance(Path path) throws 
IOException {
+            AsyncWriter writer = super.createWriterInstance(path);
+            return new AsyncWriter() {
+
+              @Override
+              public void close() throws IOException {
+                writer.close();
+              }
+
+              @Override
+              public long getLength() {
+                return writer.getLength();
+              }
+
+              @Override
+              public CompletableFuture<Long> sync() {
+                CompletableFuture<Long> result = writer.sync();
+                if (failedCount.incrementAndGet() < 1000) {
+                  CompletableFuture<Long> future = new CompletableFuture<>();
+                  FutureUtils.addListener(result,
+                    (r, e) -> future.completeExceptionally(new 
IOException("Inject Error")));
+                  return future;
+                } else {
+                  return result;
+                }
+              }
+
+              @Override
+              public void append(Entry entry) {
+                writer.append(entry);
+              }
+            };
+          }
+        }) {
+      wal.init();
+      roller.addWAL(wal);
+      roller.start();
+      int numThreads = 10;
+      AtomicReference<Exception> error = new AtomicReference<>();
+      Thread[] threads = new Thread[numThreads];
+      for (int i = 0; i < 10; i++) {
+        final int index = i;
+        threads[index] = new Thread("Write-Thread-" + index) {
+
+          @Override
+          public void run() {
+            byte[] row = Bytes.toBytes("row" + index);
+            WALEdit cols = new WALEdit();
+            cols.add(new KeyValue(row, row, row, timestamp + index, row));
+            WALKeyImpl key = new WALKeyImpl(ri.getEncodedNameAsBytes(), 
td.getTableName(),
+                SequenceId.NO_SEQUENCE_ID, timestamp, WALKey.EMPTY_UUIDS, 
HConstants.NO_NONCE,
+                HConstants.NO_NONCE, mvcc, scopes);
+            try {
+              wal.append(ri, key, cols, true);
+            } catch (IOException e) {
+              // should not happen
+              throw new UncheckedIOException(e);
+            }
+            try {
+              wal.sync();
+            } catch (IOException e) {
+              error.set(e);
+            }
+          }
+        };
+      }
+      for (Thread t : threads) {
+        t.start();
+      }
+      for (Thread t : threads) {
+        t.join();
+      }
+      assertNull(error.get());
+    }
+  }
 }

Reply via email to