This is an automated email from the ASF dual-hosted git repository.

zhangduo pushed a commit to branch branch-2.4
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/branch-2.4 by this push:
     new bc6295e0a54 HBASE-27211 Data race in MonitoredTaskImpl could cause split wal failure (#4630)
bc6295e0a54 is described below

commit bc6295e0a547de1cc051c6d86938898fa08fa581
Author: Duo Zhang <[email protected]>
AuthorDate: Mon Jul 18 19:25:06 2022 +0800

    HBASE-27211 Data race in MonitoredTaskImpl could cause split wal failure (#4630)
    
    Signed-off-by: Guanghao Zhang <[email protected]>
    Signed-off-by: Xin Sun <[email protected]>
    (cherry picked from commit 9ab0b1504f589c78cd4e01895027d2ce954ba7f7)
    
    Conflicts:
            hbase-server/src/main/java/org/apache/hadoop/hbase/monitoring/TaskMonitor.java
            hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
---
 .../hbase/master/snapshot/TakeSnapshotHandler.java |  3 +-
 .../hbase/monitoring/MonitoredRPCHandlerImpl.java  |  4 +-
 .../hadoop/hbase/monitoring/MonitoredTask.java     | 11 +---
 .../hadoop/hbase/monitoring/MonitoredTaskImpl.java | 72 ++++++++--------------
 .../hadoop/hbase/monitoring/TaskMonitor.java       | 12 ++--
 .../apache/hadoop/hbase/regionserver/HRegion.java  | 10 +--
 .../org/apache/hadoop/hbase/wal/WALSplitter.java   |  3 +-
 .../hadoop/hbase/monitoring/TestTaskMonitor.java   | 14 ++---
 .../org/apache/hadoop/hbase/wal/TestWALSplit.java  |  4 ++
 .../wal/TestWALSplitBoundedLogWriterCreation.java  | 15 +----
 10 files changed, 53 insertions(+), 95 deletions(-)

diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/snapshot/TakeSnapshotHandler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/snapshot/TakeSnapshotHandler.java
index a36edf9d230..acd8ce972a6 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/snapshot/TakeSnapshotHandler.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/snapshot/TakeSnapshotHandler.java
@@ -130,8 +130,7 @@ public abstract class TakeSnapshotHandler extends 
EventHandler
     this.verifier = new MasterSnapshotVerifier(masterServices, snapshot, 
workingDirFs);
     // update the running tasks
     this.status = TaskMonitor.get()
-      .createStatus("Taking " + snapshot.getType() + " snapshot on table: " + 
snapshotTable);
-    this.status.enableStatusJournal(true);
+      .createStatus("Taking " + snapshot.getType() + " snapshot on table: " + 
snapshotTable, true);
     this.snapshotManifest =
       SnapshotManifest.create(conf, rootFs, workingDir, snapshot, monitor, 
status);
   }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/monitoring/MonitoredRPCHandlerImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/monitoring/MonitoredRPCHandlerImpl.java
index b1e50831ca1..3053b79fe98 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/monitoring/MonitoredRPCHandlerImpl.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/monitoring/MonitoredRPCHandlerImpl.java
@@ -44,7 +44,7 @@ public class MonitoredRPCHandlerImpl extends 
MonitoredTaskImpl implements Monito
   private Map<String, Object> callInfoMap = new HashMap<>();
 
   public MonitoredRPCHandlerImpl() {
-    super();
+    super(false);
     // in this implementation, WAITING indicates that the handler is not
     // actively servicing an RPC call.
     setState(State.WAITING);
@@ -234,7 +234,7 @@ public class MonitoredRPCHandlerImpl extends 
MonitoredTaskImpl implements Monito
       return map;
     }
     Map<String, Object> rpcJSON = new HashMap<>();
-    ArrayList paramList = new ArrayList();
+    ArrayList<Object> paramList = new ArrayList<>();
     map.put("rpcCall", rpcJSON);
     rpcJSON.put("queuetimems", getRPCQueueTime());
     rpcJSON.put("starttimems", getRPCStartTime());
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/monitoring/MonitoredTask.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/monitoring/MonitoredTask.java
index 5507e1607b6..08a82a3e9de 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/monitoring/MonitoredTask.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/monitoring/MonitoredTask.java
@@ -69,16 +69,11 @@ public interface MonitoredTask extends Cloneable {
 
   void setWarnTime(final long t);
 
-  List<StatusJournalEntry> getStatusJournal();
-
   /**
-   * Enable journal that will store all statuses that have been set along with 
the time stamps when
-   * they were set.
-   * @param includeCurrentStatus whether to include the current set status in 
the journal
+   * If journal is enabled, we will store all statuses that have been set 
along with the time stamps
+   * when they were set. This method will give you all the journals stored so 
far.
    */
-  void enableStatusJournal(boolean includeCurrentStatus);
-
-  void disableStatusJournal();
+  List<StatusJournalEntry> getStatusJournal();
 
   String prettyPrintJournal();
 
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/monitoring/MonitoredTaskImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/monitoring/MonitoredTaskImpl.java
index d0c86b81502..f92d5655726 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/monitoring/MonitoredTaskImpl.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/monitoring/MonitoredTaskImpl.java
@@ -18,14 +18,16 @@
 package org.apache.hadoop.hbase.monitoring;
 
 import java.io.IOException;
-import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.ConcurrentLinkedQueue;
 import org.apache.hadoop.hbase.util.GsonUtil;
 import org.apache.yetus.audience.InterfaceAudience;
 
+import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableList;
 import org.apache.hbase.thirdparty.com.google.gson.Gson;
 
 @InterfaceAudience.Private
@@ -39,22 +41,25 @@ class MonitoredTaskImpl implements MonitoredTask {
   private volatile String description;
 
   protected volatile State state = State.RUNNING;
-
-  private boolean journalEnabled = false;
-  private List<StatusJournalEntry> journal;
+  private final ConcurrentLinkedQueue<StatusJournalEntry> journal;
 
   private static final Gson GSON = GsonUtil.createGson().create();
 
-  public MonitoredTaskImpl() {
+  public MonitoredTaskImpl(boolean enableJournal) {
     startTime = System.currentTimeMillis();
     statusTime = startTime;
     stateTime = startTime;
     warnTime = startTime;
+    if (enableJournal) {
+      journal = new ConcurrentLinkedQueue<>();
+    } else {
+      journal = null;
+    }
   }
 
-  private static class StatusJournalEntryImpl implements StatusJournalEntry {
-    private long statusTime;
-    private String status;
+  private static final class StatusJournalEntryImpl implements 
StatusJournalEntry {
+    private final long statusTime;
+    private final String status;
 
     public StatusJournalEntryImpl(String status, long statusTime) {
       this.status = status;
@@ -73,11 +78,7 @@ class MonitoredTaskImpl implements MonitoredTask {
 
     @Override
     public String toString() {
-      StringBuilder sb = new StringBuilder();
-      sb.append(status);
-      sb.append(" at ");
-      sb.append(statusTime);
-      return sb.toString();
+      return status + " at " + statusTime;
     }
   }
 
@@ -161,7 +162,7 @@ class MonitoredTaskImpl implements MonitoredTask {
   public void setStatus(String status) {
     this.status = status;
     statusTime = System.currentTimeMillis();
-    if (journalEnabled) {
+    if (journal != null) {
       journal.add(new StatusJournalEntryImpl(this.status, statusTime));
     }
   }
@@ -239,52 +240,29 @@ class MonitoredTaskImpl implements MonitoredTask {
     if (journal == null) {
       return Collections.emptyList();
     } else {
-      return Collections.unmodifiableList(journal);
+      return ImmutableList.copyOf(journal);
     }
   }
 
-  /**
-   * Enables journaling of this monitored task, the first invocation will 
lazily initialize the
-   * journal. The journal implementation itself and this method are not thread 
safe
-   */
-  @Override
-  public void enableStatusJournal(boolean includeCurrentStatus) {
-    if (journalEnabled && journal != null) {
-      return;
-    }
-    journalEnabled = true;
-    if (journal == null) {
-      journal = new ArrayList<StatusJournalEntry>();
-    }
-    if (includeCurrentStatus && status != null) {
-      journal.add(new StatusJournalEntryImpl(status, statusTime));
-    }
-  }
-
-  @Override
-  public void disableStatusJournal() {
-    journalEnabled = false;
-  }
-
   @Override
   public String prettyPrintJournal() {
-    if (!journalEnabled) {
+    if (journal == null) {
       return "";
     }
     StringBuilder sb = new StringBuilder();
-    for (int i = 0; i < journal.size(); i++) {
-      StatusJournalEntry je = journal.get(i);
-      sb.append(je.toString());
-      if (i != 0) {
-        StatusJournalEntry jep = journal.get(i - 1);
-        long delta = je.getTimeStamp() - jep.getTimeStamp();
+    Iterator<StatusJournalEntry> iter = journal.iterator();
+    StatusJournalEntry previousEntry = null;
+    while (iter.hasNext()) {
+      StatusJournalEntry entry = iter.next();
+      sb.append(entry);
+      if (previousEntry != null) {
+        long delta = entry.getTimeStamp() - previousEntry.getTimeStamp();
         if (delta != 0) {
           sb.append(" (+" + delta + " ms)");
         }
       }
-      sb.append("\n");
+      previousEntry = entry;
     }
     return sb.toString();
   }
-
 }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/monitoring/TaskMonitor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/monitoring/TaskMonitor.java
index b18790c6176..c2425cbbc1f 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/monitoring/TaskMonitor.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/monitoring/TaskMonitor.java
@@ -58,7 +58,7 @@ public class TaskMonitor {
   private final int maxTasks;
   private final long rpcWarnTime;
   private final long expirationTime;
-  private final CircularFifoQueue tasks;
+  private final CircularFifoQueue<TaskAndWeakRefPair> tasks;
   private final List<TaskAndWeakRefPair> rpcTasks;
   private final long monitorInterval;
   private Thread monitorThread;
@@ -67,7 +67,7 @@ public class TaskMonitor {
     maxTasks = conf.getInt(MAX_TASKS_KEY, DEFAULT_MAX_TASKS);
     expirationTime = conf.getLong(EXPIRATION_TIME_KEY, 
DEFAULT_EXPIRATION_TIME);
     rpcWarnTime = conf.getLong(RPC_WARN_TIME_KEY, DEFAULT_RPC_WARN_TIME);
-    tasks = new CircularFifoQueue(maxTasks);
+    tasks = new CircularFifoQueue<>(maxTasks);
     rpcTasks = Lists.newArrayList();
     monitorInterval = conf.getLong(MONITOR_INTERVAL_KEY, 
DEFAULT_MONITOR_INTERVAL);
     monitorThread = new Thread(new MonitorRunnable());
@@ -84,8 +84,12 @@ public class TaskMonitor {
     return instance;
   }
 
-  public synchronized MonitoredTask createStatus(String description) {
-    MonitoredTask stat = new MonitoredTaskImpl();
+  public MonitoredTask createStatus(String description) {
+    return createStatus(description, false);
+  }
+
+  public synchronized MonitoredTask createStatus(String description, boolean 
enableJournal) {
+    MonitoredTask stat = new MonitoredTaskImpl(enableJournal);
     stat.setDescription(description);
     MonitoredTask proxy = (MonitoredTask) 
Proxy.newProxyInstance(stat.getClass().getClassLoader(),
       new Class<?>[] { MonitoredTask.class }, new 
PassthroughInvocationHandler<>(stat));
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index 04504afe39a..2398be5b5a3 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -928,7 +928,6 @@ public class HRegion implements HeapSize, 
PropagatingConfigurationObserver, Regi
    * Initialize this region.
    * @param reporter Tickle every so often if initialize is taking a while.
    * @return What the next sequence (edit) id should be.
-   * @throws IOException e
    */
   long initialize(final CancelableProgressable reporter) throws IOException {
 
@@ -938,8 +937,7 @@ public class HRegion implements HeapSize, 
PropagatingConfigurationObserver, Regi
         + " should have at least one column family.");
     }
 
-    MonitoredTask status = TaskMonitor.get().createStatus("Initializing region 
" + this);
-    status.enableStatusJournal(true);
+    MonitoredTask status = TaskMonitor.get().createStatus("Initializing region 
" + this, true);
     long nextSeqId = -1;
     try {
       nextSeqId = initializeRegionInternals(reporter, status);
@@ -1560,8 +1558,8 @@ public class HRegion implements HeapSize, 
PropagatingConfigurationObserver, Regi
     // Only allow one thread to close at a time. Serialize them so dual
     // threads attempting to close will run up against each other.
     MonitoredTask status = TaskMonitor.get().createStatus(
-      "Closing region " + this.getRegionInfo().getEncodedName() + (abort ? " 
due to abort" : ""));
-    status.enableStatusJournal(true);
+      "Closing region " + this.getRegionInfo().getEncodedName() + (abort ? " 
due to abort" : ""),
+      true);
     status.setStatus("Waiting for close lock");
     try {
       synchronized (closeLock) {
@@ -2255,7 +2253,6 @@ public class HRegion implements HeapSize, 
PropagatingConfigurationObserver, Regi
       }
 
       status = TaskMonitor.get().createStatus("Compacting " + store + " in " + 
this);
-      status.enableStatusJournal(false);
       if (this.closed.get()) {
         String msg = "Skipping compaction on " + this + " because closed";
         LOG.debug(msg);
@@ -2392,7 +2389,6 @@ public class HRegion implements HeapSize, 
PropagatingConfigurationObserver, Regi
       return new FlushResultImpl(FlushResult.Result.CANNOT_FLUSH, msg, false);
     }
     MonitoredTask status = TaskMonitor.get().createStatus("Flushing " + this);
-    status.enableStatusJournal(false);
     status.setStatus("Acquiring readlock on region");
     // block waiting for the lock for flushing cache
     lock.readLock().lock();
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java
index f1bc9c8dcf7..a6463094bea 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java
@@ -288,8 +288,7 @@ public class WALSplitter {
     int editsCount = 0;
     int editsSkipped = 0;
     MonitoredTask status =
-      TaskMonitor.get().createStatus("Splitting " + wal + " to temporary 
staging area.");
-    status.enableStatusJournal(true);
+      TaskMonitor.get().createStatus("Splitting " + wal + " to temporary 
staging area.", true);
     Reader walReader = null;
     this.fileBeingSplit = walStatus;
     long startTS = EnvironmentEdgeManager.currentTime();
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/monitoring/TestTaskMonitor.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/monitoring/TestTaskMonitor.java
index 00a3cd6431d..f7623c4d803 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/monitoring/TestTaskMonitor.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/monitoring/TestTaskMonitor.java
@@ -212,23 +212,17 @@ public class TestTaskMonitor {
     TaskMonitor tm = new TaskMonitor(new Configuration());
     MonitoredTask task = tm.createStatus("Test task");
     assertTrue(task.getStatusJournal().isEmpty());
-    task.disableStatusJournal();
     task.setStatus("status1");
     // journal should be empty since it is disabled
     assertTrue(task.getStatusJournal().isEmpty());
-    task.enableStatusJournal(true);
-    // check existing status entered in journal
-    assertEquals("status1", task.getStatusJournal().get(0).getStatus());
-    assertTrue(task.getStatusJournal().get(0).getTimeStamp() > 0);
-    task.disableStatusJournal();
+    task = tm.createStatus("Test task with journal", true);
     task.setStatus("status2");
-    // check status 2 not added since disabled
-    assertEquals(1, task.getStatusJournal().size());
-    task.enableStatusJournal(false);
-    // size should still be 1 since we didn't include current status
     assertEquals(1, task.getStatusJournal().size());
+    assertEquals("status2", task.getStatusJournal().get(0).getStatus());
     task.setStatus("status3");
+    assertEquals(2, task.getStatusJournal().size());
     assertEquals("status3", task.getStatusJournal().get(1).getStatus());
+    task.prettyPrintJournal();
     tm.shutdown();
   }
 
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALSplit.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALSplit.java
index 58159424e01..904f261a806 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALSplit.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALSplit.java
@@ -21,6 +21,7 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
+import static org.junit.Assume.assumeFalse;
 
 import java.io.FileNotFoundException;
 import java.io.IOException;
@@ -945,6 +946,9 @@ public class TestWALSplit {
    */
   @Test
   public void testThreadingSlowWriterSmallBuffer() throws Exception {
+    // The logic of this test has conflict with the limit writers split logic, 
skip this test for
+    // TestWALSplitBoundedLogWriterCreation
+    assumeFalse(this instanceof TestWALSplitBoundedLogWriterCreation);
     doTestThreading(200, 1024, 50);
   }
 
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALSplitBoundedLogWriterCreation.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALSplitBoundedLogWriterCreation.java
index 2a9e77ba60b..940248eb6f9 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALSplitBoundedLogWriterCreation.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALSplitBoundedLogWriterCreation.java
@@ -19,13 +19,12 @@ package org.apache.hadoop.hbase.wal;
 
 import org.apache.hadoop.hbase.HBaseClassTestRule;
 import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.apache.hadoop.hbase.testclassification.RegionServerTests;
 import org.junit.BeforeClass;
 import org.junit.ClassRule;
-import org.junit.Ignore;
-import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
-@Category(LargeTests.class)
+@Category({ RegionServerTests.class, LargeTests.class })
 public class TestWALSplitBoundedLogWriterCreation extends TestWALSplit {
 
   @ClassRule
@@ -37,14 +36,4 @@ public class TestWALSplitBoundedLogWriterCreation extends 
TestWALSplit {
     TestWALSplit.setUpBeforeClass();
     
TEST_UTIL.getConfiguration().setBoolean(WALSplitter.SPLIT_WRITER_CREATION_BOUNDED,
 true);
   }
-
-  /**
-   * The logic of this test has conflict with the limit writers split logic, 
skip this test
-   */
-  @Override
-  @Test
-  @Ignore
-  public void testThreadingSlowWriterSmallBuffer() throws Exception {
-    super.testThreadingSlowWriterSmallBuffer();
-  }
 }

Reply via email to