This is an automated email from the ASF dual-hosted git repository.
zhangduo pushed a commit to branch branch-2.4
in repository https://gitbox.apache.org/repos/asf/hbase.git
The following commit(s) were added to refs/heads/branch-2.4 by this push:
new bc6295e0a54 HBASE-27211 Data race in MonitoredTaskImpl could cause
split wal failure (#4630)
bc6295e0a54 is described below
commit bc6295e0a547de1cc051c6d86938898fa08fa581
Author: Duo Zhang <[email protected]>
AuthorDate: Mon Jul 18 19:25:06 2022 +0800
HBASE-27211 Data race in MonitoredTaskImpl could cause split wal failure
(#4630)
Signed-off-by: Guanghao Zhang <[email protected]>
Signed-off-by: Xin Sun <[email protected]>
(cherry picked from commit 9ab0b1504f589c78cd4e01895027d2ce954ba7f7)
Conflicts:
hbase-server/src/main/java/org/apache/hadoop/hbase/monitoring/TaskMonitor.java
hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
---
.../hbase/master/snapshot/TakeSnapshotHandler.java | 3 +-
.../hbase/monitoring/MonitoredRPCHandlerImpl.java | 4 +-
.../hadoop/hbase/monitoring/MonitoredTask.java | 11 +---
.../hadoop/hbase/monitoring/MonitoredTaskImpl.java | 72 ++++++++--------------
.../hadoop/hbase/monitoring/TaskMonitor.java | 12 ++--
.../apache/hadoop/hbase/regionserver/HRegion.java | 10 +--
.../org/apache/hadoop/hbase/wal/WALSplitter.java | 3 +-
.../hadoop/hbase/monitoring/TestTaskMonitor.java | 14 ++---
.../org/apache/hadoop/hbase/wal/TestWALSplit.java | 4 ++
.../wal/TestWALSplitBoundedLogWriterCreation.java | 15 +----
10 files changed, 53 insertions(+), 95 deletions(-)
diff --git
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/snapshot/TakeSnapshotHandler.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/snapshot/TakeSnapshotHandler.java
index a36edf9d230..acd8ce972a6 100644
---
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/snapshot/TakeSnapshotHandler.java
+++
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/snapshot/TakeSnapshotHandler.java
@@ -130,8 +130,7 @@ public abstract class TakeSnapshotHandler extends
EventHandler
this.verifier = new MasterSnapshotVerifier(masterServices, snapshot,
workingDirFs);
// update the running tasks
this.status = TaskMonitor.get()
- .createStatus("Taking " + snapshot.getType() + " snapshot on table: " +
snapshotTable);
- this.status.enableStatusJournal(true);
+ .createStatus("Taking " + snapshot.getType() + " snapshot on table: " +
snapshotTable, true);
this.snapshotManifest =
SnapshotManifest.create(conf, rootFs, workingDir, snapshot, monitor,
status);
}
diff --git
a/hbase-server/src/main/java/org/apache/hadoop/hbase/monitoring/MonitoredRPCHandlerImpl.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/monitoring/MonitoredRPCHandlerImpl.java
index b1e50831ca1..3053b79fe98 100644
---
a/hbase-server/src/main/java/org/apache/hadoop/hbase/monitoring/MonitoredRPCHandlerImpl.java
+++
b/hbase-server/src/main/java/org/apache/hadoop/hbase/monitoring/MonitoredRPCHandlerImpl.java
@@ -44,7 +44,7 @@ public class MonitoredRPCHandlerImpl extends
MonitoredTaskImpl implements Monito
private Map<String, Object> callInfoMap = new HashMap<>();
public MonitoredRPCHandlerImpl() {
- super();
+ super(false);
// in this implementation, WAITING indicates that the handler is not
// actively servicing an RPC call.
setState(State.WAITING);
@@ -234,7 +234,7 @@ public class MonitoredRPCHandlerImpl extends
MonitoredTaskImpl implements Monito
return map;
}
Map<String, Object> rpcJSON = new HashMap<>();
- ArrayList paramList = new ArrayList();
+ ArrayList<Object> paramList = new ArrayList<>();
map.put("rpcCall", rpcJSON);
rpcJSON.put("queuetimems", getRPCQueueTime());
rpcJSON.put("starttimems", getRPCStartTime());
diff --git
a/hbase-server/src/main/java/org/apache/hadoop/hbase/monitoring/MonitoredTask.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/monitoring/MonitoredTask.java
index 5507e1607b6..08a82a3e9de 100644
---
a/hbase-server/src/main/java/org/apache/hadoop/hbase/monitoring/MonitoredTask.java
+++
b/hbase-server/src/main/java/org/apache/hadoop/hbase/monitoring/MonitoredTask.java
@@ -69,16 +69,11 @@ public interface MonitoredTask extends Cloneable {
void setWarnTime(final long t);
- List<StatusJournalEntry> getStatusJournal();
-
/**
- * Enable journal that will store all statuses that have been set along with
the time stamps when
- * they were set.
- * @param includeCurrentStatus whether to include the current set status in
the journal
+ * If journal is enabled, we will store all statuses that have been set
along with the time stamps
+ * when they were set. This method will give you all the journals stored so
far.
*/
- void enableStatusJournal(boolean includeCurrentStatus);
-
- void disableStatusJournal();
+ List<StatusJournalEntry> getStatusJournal();
String prettyPrintJournal();
diff --git
a/hbase-server/src/main/java/org/apache/hadoop/hbase/monitoring/MonitoredTaskImpl.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/monitoring/MonitoredTaskImpl.java
index d0c86b81502..f92d5655726 100644
---
a/hbase-server/src/main/java/org/apache/hadoop/hbase/monitoring/MonitoredTaskImpl.java
+++
b/hbase-server/src/main/java/org/apache/hadoop/hbase/monitoring/MonitoredTaskImpl.java
@@ -18,14 +18,16 @@
package org.apache.hadoop.hbase.monitoring;
import java.io.IOException;
-import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.ConcurrentLinkedQueue;
import org.apache.hadoop.hbase.util.GsonUtil;
import org.apache.yetus.audience.InterfaceAudience;
+import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableList;
import org.apache.hbase.thirdparty.com.google.gson.Gson;
@InterfaceAudience.Private
@@ -39,22 +41,25 @@ class MonitoredTaskImpl implements MonitoredTask {
private volatile String description;
protected volatile State state = State.RUNNING;
-
- private boolean journalEnabled = false;
- private List<StatusJournalEntry> journal;
+ private final ConcurrentLinkedQueue<StatusJournalEntry> journal;
private static final Gson GSON = GsonUtil.createGson().create();
- public MonitoredTaskImpl() {
+ public MonitoredTaskImpl(boolean enableJournal) {
startTime = System.currentTimeMillis();
statusTime = startTime;
stateTime = startTime;
warnTime = startTime;
+ if (enableJournal) {
+ journal = new ConcurrentLinkedQueue<>();
+ } else {
+ journal = null;
+ }
}
- private static class StatusJournalEntryImpl implements StatusJournalEntry {
- private long statusTime;
- private String status;
+ private static final class StatusJournalEntryImpl implements
StatusJournalEntry {
+ private final long statusTime;
+ private final String status;
public StatusJournalEntryImpl(String status, long statusTime) {
this.status = status;
@@ -73,11 +78,7 @@ class MonitoredTaskImpl implements MonitoredTask {
@Override
public String toString() {
- StringBuilder sb = new StringBuilder();
- sb.append(status);
- sb.append(" at ");
- sb.append(statusTime);
- return sb.toString();
+ return status + " at " + statusTime;
}
}
@@ -161,7 +162,7 @@ class MonitoredTaskImpl implements MonitoredTask {
public void setStatus(String status) {
this.status = status;
statusTime = System.currentTimeMillis();
- if (journalEnabled) {
+ if (journal != null) {
journal.add(new StatusJournalEntryImpl(this.status, statusTime));
}
}
@@ -239,52 +240,29 @@ class MonitoredTaskImpl implements MonitoredTask {
if (journal == null) {
return Collections.emptyList();
} else {
- return Collections.unmodifiableList(journal);
+ return ImmutableList.copyOf(journal);
}
}
- /**
- * Enables journaling of this monitored task, the first invocation will
lazily initialize the
- * journal. The journal implementation itself and this method are not thread
safe
- */
- @Override
- public void enableStatusJournal(boolean includeCurrentStatus) {
- if (journalEnabled && journal != null) {
- return;
- }
- journalEnabled = true;
- if (journal == null) {
- journal = new ArrayList<StatusJournalEntry>();
- }
- if (includeCurrentStatus && status != null) {
- journal.add(new StatusJournalEntryImpl(status, statusTime));
- }
- }
-
- @Override
- public void disableStatusJournal() {
- journalEnabled = false;
- }
-
@Override
public String prettyPrintJournal() {
- if (!journalEnabled) {
+ if (journal == null) {
return "";
}
StringBuilder sb = new StringBuilder();
- for (int i = 0; i < journal.size(); i++) {
- StatusJournalEntry je = journal.get(i);
- sb.append(je.toString());
- if (i != 0) {
- StatusJournalEntry jep = journal.get(i - 1);
- long delta = je.getTimeStamp() - jep.getTimeStamp();
+ Iterator<StatusJournalEntry> iter = journal.iterator();
+ StatusJournalEntry previousEntry = null;
+ while (iter.hasNext()) {
+ StatusJournalEntry entry = iter.next();
+ sb.append(entry);
+ if (previousEntry != null) {
+ long delta = entry.getTimeStamp() - previousEntry.getTimeStamp();
if (delta != 0) {
sb.append(" (+" + delta + " ms)");
}
}
- sb.append("\n");
+ previousEntry = entry;
}
return sb.toString();
}
-
}
diff --git
a/hbase-server/src/main/java/org/apache/hadoop/hbase/monitoring/TaskMonitor.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/monitoring/TaskMonitor.java
index b18790c6176..c2425cbbc1f 100644
---
a/hbase-server/src/main/java/org/apache/hadoop/hbase/monitoring/TaskMonitor.java
+++
b/hbase-server/src/main/java/org/apache/hadoop/hbase/monitoring/TaskMonitor.java
@@ -58,7 +58,7 @@ public class TaskMonitor {
private final int maxTasks;
private final long rpcWarnTime;
private final long expirationTime;
- private final CircularFifoQueue tasks;
+ private final CircularFifoQueue<TaskAndWeakRefPair> tasks;
private final List<TaskAndWeakRefPair> rpcTasks;
private final long monitorInterval;
private Thread monitorThread;
@@ -67,7 +67,7 @@ public class TaskMonitor {
maxTasks = conf.getInt(MAX_TASKS_KEY, DEFAULT_MAX_TASKS);
expirationTime = conf.getLong(EXPIRATION_TIME_KEY,
DEFAULT_EXPIRATION_TIME);
rpcWarnTime = conf.getLong(RPC_WARN_TIME_KEY, DEFAULT_RPC_WARN_TIME);
- tasks = new CircularFifoQueue(maxTasks);
+ tasks = new CircularFifoQueue<>(maxTasks);
rpcTasks = Lists.newArrayList();
monitorInterval = conf.getLong(MONITOR_INTERVAL_KEY,
DEFAULT_MONITOR_INTERVAL);
monitorThread = new Thread(new MonitorRunnable());
@@ -84,8 +84,12 @@ public class TaskMonitor {
return instance;
}
- public synchronized MonitoredTask createStatus(String description) {
- MonitoredTask stat = new MonitoredTaskImpl();
+ public MonitoredTask createStatus(String description) {
+ return createStatus(description, false);
+ }
+
+ public synchronized MonitoredTask createStatus(String description, boolean
enableJournal) {
+ MonitoredTask stat = new MonitoredTaskImpl(enableJournal);
stat.setDescription(description);
MonitoredTask proxy = (MonitoredTask)
Proxy.newProxyInstance(stat.getClass().getClassLoader(),
new Class<?>[] { MonitoredTask.class }, new
PassthroughInvocationHandler<>(stat));
diff --git
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index 04504afe39a..2398be5b5a3 100644
---
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -928,7 +928,6 @@ public class HRegion implements HeapSize,
PropagatingConfigurationObserver, Regi
* Initialize this region.
* @param reporter Tickle every so often if initialize is taking a while.
* @return What the next sequence (edit) id should be.
- * @throws IOException e
*/
long initialize(final CancelableProgressable reporter) throws IOException {
@@ -938,8 +937,7 @@ public class HRegion implements HeapSize,
PropagatingConfigurationObserver, Regi
+ " should have at least one column family.");
}
- MonitoredTask status = TaskMonitor.get().createStatus("Initializing region
" + this);
- status.enableStatusJournal(true);
+ MonitoredTask status = TaskMonitor.get().createStatus("Initializing region
" + this, true);
long nextSeqId = -1;
try {
nextSeqId = initializeRegionInternals(reporter, status);
@@ -1560,8 +1558,8 @@ public class HRegion implements HeapSize,
PropagatingConfigurationObserver, Regi
// Only allow one thread to close at a time. Serialize them so dual
// threads attempting to close will run up against each other.
MonitoredTask status = TaskMonitor.get().createStatus(
- "Closing region " + this.getRegionInfo().getEncodedName() + (abort ? "
due to abort" : ""));
- status.enableStatusJournal(true);
+ "Closing region " + this.getRegionInfo().getEncodedName() + (abort ? "
due to abort" : ""),
+ true);
status.setStatus("Waiting for close lock");
try {
synchronized (closeLock) {
@@ -2255,7 +2253,6 @@ public class HRegion implements HeapSize,
PropagatingConfigurationObserver, Regi
}
status = TaskMonitor.get().createStatus("Compacting " + store + " in " +
this);
- status.enableStatusJournal(false);
if (this.closed.get()) {
String msg = "Skipping compaction on " + this + " because closed";
LOG.debug(msg);
@@ -2392,7 +2389,6 @@ public class HRegion implements HeapSize,
PropagatingConfigurationObserver, Regi
return new FlushResultImpl(FlushResult.Result.CANNOT_FLUSH, msg, false);
}
MonitoredTask status = TaskMonitor.get().createStatus("Flushing " + this);
- status.enableStatusJournal(false);
status.setStatus("Acquiring readlock on region");
// block waiting for the lock for flushing cache
lock.readLock().lock();
diff --git
a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java
index f1bc9c8dcf7..a6463094bea 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java
@@ -288,8 +288,7 @@ public class WALSplitter {
int editsCount = 0;
int editsSkipped = 0;
MonitoredTask status =
- TaskMonitor.get().createStatus("Splitting " + wal + " to temporary
staging area.");
- status.enableStatusJournal(true);
+ TaskMonitor.get().createStatus("Splitting " + wal + " to temporary
staging area.", true);
Reader walReader = null;
this.fileBeingSplit = walStatus;
long startTS = EnvironmentEdgeManager.currentTime();
diff --git
a/hbase-server/src/test/java/org/apache/hadoop/hbase/monitoring/TestTaskMonitor.java
b/hbase-server/src/test/java/org/apache/hadoop/hbase/monitoring/TestTaskMonitor.java
index 00a3cd6431d..f7623c4d803 100644
---
a/hbase-server/src/test/java/org/apache/hadoop/hbase/monitoring/TestTaskMonitor.java
+++
b/hbase-server/src/test/java/org/apache/hadoop/hbase/monitoring/TestTaskMonitor.java
@@ -212,23 +212,17 @@ public class TestTaskMonitor {
TaskMonitor tm = new TaskMonitor(new Configuration());
MonitoredTask task = tm.createStatus("Test task");
assertTrue(task.getStatusJournal().isEmpty());
- task.disableStatusJournal();
task.setStatus("status1");
// journal should be empty since it is disabled
assertTrue(task.getStatusJournal().isEmpty());
- task.enableStatusJournal(true);
- // check existing status entered in journal
- assertEquals("status1", task.getStatusJournal().get(0).getStatus());
- assertTrue(task.getStatusJournal().get(0).getTimeStamp() > 0);
- task.disableStatusJournal();
+ task = tm.createStatus("Test task with journal", true);
task.setStatus("status2");
- // check status 2 not added since disabled
- assertEquals(1, task.getStatusJournal().size());
- task.enableStatusJournal(false);
- // size should still be 1 since we didn't include current status
assertEquals(1, task.getStatusJournal().size());
+ assertEquals("status2", task.getStatusJournal().get(0).getStatus());
task.setStatus("status3");
+ assertEquals(2, task.getStatusJournal().size());
assertEquals("status3", task.getStatusJournal().get(1).getStatus());
+ task.prettyPrintJournal();
tm.shutdown();
}
diff --git
a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALSplit.java
b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALSplit.java
index 58159424e01..904f261a806 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALSplit.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALSplit.java
@@ -21,6 +21,7 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
+import static org.junit.Assume.assumeFalse;
import java.io.FileNotFoundException;
import java.io.IOException;
@@ -945,6 +946,9 @@ public class TestWALSplit {
*/
@Test
public void testThreadingSlowWriterSmallBuffer() throws Exception {
+ // The logic of this test has conflict with the limit writers split logic,
skip this test for
+ // TestWALSplitBoundedLogWriterCreation
+ assumeFalse(this instanceof TestWALSplitBoundedLogWriterCreation);
doTestThreading(200, 1024, 50);
}
diff --git
a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALSplitBoundedLogWriterCreation.java
b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALSplitBoundedLogWriterCreation.java
index 2a9e77ba60b..940248eb6f9 100644
---
a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALSplitBoundedLogWriterCreation.java
+++
b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALSplitBoundedLogWriterCreation.java
@@ -19,13 +19,12 @@ package org.apache.hadoop.hbase.wal;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.junit.BeforeClass;
import org.junit.ClassRule;
-import org.junit.Ignore;
-import org.junit.Test;
import org.junit.experimental.categories.Category;
-@Category(LargeTests.class)
+@Category({ RegionServerTests.class, LargeTests.class })
public class TestWALSplitBoundedLogWriterCreation extends TestWALSplit {
@ClassRule
@@ -37,14 +36,4 @@ public class TestWALSplitBoundedLogWriterCreation extends
TestWALSplit {
TestWALSplit.setUpBeforeClass();
TEST_UTIL.getConfiguration().setBoolean(WALSplitter.SPLIT_WRITER_CREATION_BOUNDED,
true);
}
-
- /**
- * The logic of this test has conflict with the limit writers split logic,
skip this test
- */
- @Override
- @Test
- @Ignore
- public void testThreadingSlowWriterSmallBuffer() throws Exception {
- super.testThreadingSlowWriterSmallBuffer();
- }
}