This is an automated email from the ASF dual-hosted git repository.
mivanac pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git
The following commit(s) were added to refs/heads/develop by this push:
new 9154cfc7ed GEODE-10226: Added monitoring of async writer (#7667)
9154cfc7ed is described below
commit 9154cfc7ed70f1accd695a7e99714d3886e29ba9
Author: Mario Ivanac <[email protected]>
AuthorDate: Wed May 25 07:32:44 2022 +0200
GEODE-10226: Added monitoring of async writer (#7667)
* GEODE-10226: Added monitoring of async writer
---
.../apache/geode/internal/cache/DiskStoreImpl.java | 121 ++++++++++++---------
.../internal/monitoring/ThreadsMonitoring.java | 4 +-
.../internal/monitoring/ThreadsMonitoringImpl.java | 4 +
.../monitoring/executor/AbstractExecutor.java | 4 +
...Executor.java => AsyncWriterExecutorGroup.java} | 28 +----
.../monitoring/executor/SuspendableExecutor.java | 9 ++
.../cache/DiskStoreImplValueRecoveryTest.java | 14 +++
.../geode/internal/cache/FlusherThreadTest.java | 17 +++
.../monitoring/ThreadsMonitoringImplJUnitTest.java | 1 +
.../monitoring/ThreadsMonitoringJUnitTest.java | 6 +-
.../executor/SuspendableExecutorTest.java | 11 ++
11 files changed, 142 insertions(+), 77 deletions(-)
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/DiskStoreImpl.java b/geode-core/src/main/java/org/apache/geode/internal/cache/DiskStoreImpl.java
index 9dee1c1c77..0f9865b720 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/DiskStoreImpl.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/DiskStoreImpl.java
@@ -19,6 +19,7 @@ import static org.apache.geode.distributed.ConfigurationProperties.CACHE_XML_FIL
import static org.apache.geode.distributed.ConfigurationProperties.LOCATORS;
import static org.apache.geode.distributed.ConfigurationProperties.MCAST_PORT;
import static org.apache.geode.internal.cache.entries.DiskEntry.Helper.readRawValue;
+import static org.apache.geode.internal.monitoring.ThreadsMonitoring.Mode.AsyncWriterExecutor;
import java.io.File;
import java.io.FileOutputStream;
@@ -113,6 +114,8 @@ import org.apache.geode.internal.cache.versions.RegionVersionVector;
import org.apache.geode.internal.cache.versions.VersionSource;
import org.apache.geode.internal.cache.versions.VersionStamp;
import org.apache.geode.internal.cache.versions.VersionTag;
+import org.apache.geode.internal.monitoring.ThreadsMonitoring;
+import org.apache.geode.internal.monitoring.executor.AbstractExecutor;
import org.apache.geode.internal.serialization.KnownVersion;
import org.apache.geode.internal.util.BlobHelper;
import org.apache.geode.logging.internal.executors.LoggingExecutors;
@@ -1649,6 +1652,10 @@ public class DiskStoreImpl implements DiskStore {
this.diskStore = diskStore;
}
+ private ThreadsMonitoring getThreadMonitoring() {
+ return diskStore.getCache().getInternalDistributedSystem().getDM().getThreadMonitoring();
+ }
+
private boolean waitUntilFlushIsReady() throws InterruptedException {
if (diskStore.maxAsyncItems > 0) {
final long time = diskStore.getTimeInterval();
@@ -1716,67 +1723,79 @@ public class DiskStoreImpl implements DiskStore {
logger.debug("Async writer thread started");
}
boolean doingFlush = false;
+ final ThreadsMonitoring threadMonitoring = getThreadMonitoring();
+ final AbstractExecutor threadMonitorExecutor =
+ threadMonitoring.createAbstractExecutor(AsyncWriterExecutor);
+ threadMonitorExecutor.suspendMonitoring();
+ threadMonitoring.register(threadMonitorExecutor);
+
try {
while (waitUntilFlushIsReady()) {
- int drainCount = diskStore.fillDrainList();
- if (drainCount > 0) {
- Iterator<Object> it = diskStore.getDrainList().iterator();
- while (it.hasNext()) {
- Object o = it.next();
- if (o instanceof FlushNotifier) {
- flushChild();
- if (LocalRegion.ISSUE_CALLBACKS_TO_CACHE_OBSERVER) {
- if (!it.hasNext()) {
- doingFlush = false;
- CacheObserverHolder.getInstance().afterWritingBytes();
+ threadMonitorExecutor.resumeMonitoring();
+ try {
+ int drainCount = diskStore.fillDrainList();
+ if (drainCount > 0) {
+ Iterator<Object> it = diskStore.getDrainList().iterator();
+ while (it.hasNext()) {
+ threadMonitorExecutor.reportProgress();
+ Object o = it.next();
+ if (o instanceof FlushNotifier) {
+ flushChild();
+ if (LocalRegion.ISSUE_CALLBACKS_TO_CACHE_OBSERVER) {
+ if (!it.hasNext()) {
+ doingFlush = false;
+ CacheObserverHolder.getInstance().afterWritingBytes();
+ }
}
- }
- ((FlushNotifier) o).doFlush();
- } else {
- try {
- AsyncDiskEntry ade = (AsyncDiskEntry) o;
- InternalRegion region = ade.region;
- VersionTag tag = ade.tag;
- if (ade.versionOnly) {
- DiskEntry.Helper.doAsyncFlush(tag, region);
- } else {
- DiskEntry entry = ade.de;
- // We check isPendingAsync
- if (entry.getDiskId().isPendingAsync()) {
- if (LocalRegion.ISSUE_CALLBACKS_TO_CACHE_OBSERVER) {
- if (!doingFlush) {
- doingFlush = true;
- CacheObserverHolder.getInstance().goingToFlush();
- }
- }
- DiskEntry.Helper.doAsyncFlush(entry, region, tag);
+ ((FlushNotifier) o).doFlush();
+ } else {
+ try {
+ AsyncDiskEntry ade = (AsyncDiskEntry) o;
+ InternalRegion region = ade.region;
+ VersionTag tag = ade.tag;
+ if (ade.versionOnly) {
+ DiskEntry.Helper.doAsyncFlush(tag, region);
} else {
- // If it is no longer pending someone called
- // unscheduleAsyncWrite
- // so we don't need to write the entry, but
- // if we have a version tag we need to record the
- // operation
- // to update the RVV
- if (tag != null) {
- DiskEntry.Helper.doAsyncFlush(tag, region);
+ DiskEntry entry = ade.de;
+ // We check isPendingAsync
+ if (entry.getDiskId().isPendingAsync()) {
+ if (LocalRegion.ISSUE_CALLBACKS_TO_CACHE_OBSERVER) {
+ if (!doingFlush) {
+ doingFlush = true;
+ CacheObserverHolder.getInstance().goingToFlush();
+ }
+ }
+ DiskEntry.Helper.doAsyncFlush(entry, region, tag);
+ } else {
+ // If it is no longer pending someone called
+ // unscheduleAsyncWrite
+ // so we don't need to write the entry, but
+ // if we have a version tag we need to record the
+ // operation
+ // to update the RVV
+ if (tag != null) {
+ DiskEntry.Helper.doAsyncFlush(tag, region);
+ }
}
}
+ } catch (RegionDestroyedException ignore) {
+ // Normally we flush before closing or destroying a region
+ // but in some cases it is closed w/o flushing.
+ // So just ignore it; see bug 41305.
}
- } catch (RegionDestroyedException ignore) {
- // Normally we flush before closing or destroying a region
- // but in some cases it is closed w/o flushing.
- // So just ignore it; see bug 41305.
}
}
- }
- flushChild();
- if (doingFlush) {
- doingFlush = false;
- if (LocalRegion.ISSUE_CALLBACKS_TO_CACHE_OBSERVER) {
- CacheObserverHolder.getInstance().afterWritingBytes();
+ flushChild();
+ if (doingFlush) {
+ doingFlush = false;
+ if (LocalRegion.ISSUE_CALLBACKS_TO_CACHE_OBSERVER) {
+ CacheObserverHolder.getInstance().afterWritingBytes();
+ }
}
+ diskStore.getStats().incQueueSize(-drainCount);
}
- diskStore.getStats().incQueueSize(-drainCount);
+ } finally {
+ threadMonitorExecutor.suspendMonitoring();
}
}
} catch (InterruptedException ie) {
@@ -1802,6 +1821,8 @@ public class DiskStoreImpl implements DiskStore {
}
diskStore.flusherThreadTerminated = true;
diskStore.stopFlusher = true; // set this before calling handleDiskAccessException
+ threadMonitoring.unregister(threadMonitorExecutor);
+
// or it will hang
if (fatalDae != null) {
diskStore.handleDiskAccessException(fatalDae);
diff --git a/geode-core/src/main/java/org/apache/geode/internal/monitoring/ThreadsMonitoring.java b/geode-core/src/main/java/org/apache/geode/internal/monitoring/ThreadsMonitoring.java
index d1f8d2b0b1..3a06449306 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/monitoring/ThreadsMonitoring.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/monitoring/ThreadsMonitoring.java
@@ -29,7 +29,9 @@ public interface ThreadsMonitoring {
ScheduledThreadExecutor,
AGSExecutor,
P2PReaderExecutor,
- ServerConnectionExecutor
+ ServerConnectionExecutor,
+
+ AsyncWriterExecutor
}
Map<Long, AbstractExecutor> getMonitorMap();
diff --git a/geode-core/src/main/java/org/apache/geode/internal/monitoring/ThreadsMonitoringImpl.java b/geode-core/src/main/java/org/apache/geode/internal/monitoring/ThreadsMonitoringImpl.java
index 3be816f3db..df66945f66 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/monitoring/ThreadsMonitoringImpl.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/monitoring/ThreadsMonitoringImpl.java
@@ -23,6 +23,7 @@ import java.util.concurrent.ConcurrentMap;
import org.apache.geode.annotations.VisibleForTesting;
import org.apache.geode.distributed.internal.InternalDistributedSystem;
import org.apache.geode.internal.monitoring.executor.AbstractExecutor;
+import org.apache.geode.internal.monitoring.executor.AsyncWriterExecutorGroup;
import org.apache.geode.internal.monitoring.executor.FunctionExecutionPooledExecutorGroup;
import org.apache.geode.internal.monitoring.executor.GatewaySenderEventProcessorGroup;
import org.apache.geode.internal.monitoring.executor.OneTaskOnlyExecutorGroup;
@@ -138,6 +139,9 @@ public class ThreadsMonitoringImpl implements ThreadsMonitoring {
return new P2PReaderExecutorGroup();
case ServerConnectionExecutor:
return new ServerConnectionExecutorGroup();
+ case AsyncWriterExecutor:
+ return new AsyncWriterExecutorGroup();
+
default:
throw new IllegalStateException("Unhandled mode=" + mode);
}
diff --git a/geode-core/src/main/java/org/apache/geode/internal/monitoring/executor/AbstractExecutor.java b/geode-core/src/main/java/org/apache/geode/internal/monitoring/executor/AbstractExecutor.java
index 3e8560d73b..6e3a36b862 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/monitoring/executor/AbstractExecutor.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/monitoring/executor/AbstractExecutor.java
@@ -178,6 +178,10 @@ public abstract class AbstractExecutor {
public void resumeMonitoring() {}
+ public void reportProgress() {
+ setStartTime(System.currentTimeMillis());
+ }
+
public boolean isMonitoringSuspended() {
return false;
}
diff --git a/geode-core/src/main/java/org/apache/geode/internal/monitoring/executor/SuspendableExecutor.java b/geode-core/src/main/java/org/apache/geode/internal/monitoring/executor/AsyncWriterExecutorGroup.java
similarity index 57%
copy from geode-core/src/main/java/org/apache/geode/internal/monitoring/executor/SuspendableExecutor.java
copy to geode-core/src/main/java/org/apache/geode/internal/monitoring/executor/AsyncWriterExecutorGroup.java
index b64b519a44..f930416ad4 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/monitoring/executor/SuspendableExecutor.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/monitoring/executor/AsyncWriterExecutorGroup.java
@@ -14,30 +14,10 @@
*/
package org.apache.geode.internal.monitoring.executor;
-public abstract class SuspendableExecutor extends AbstractExecutor {
- private volatile boolean suspended;
+public class AsyncWriterExecutorGroup extends SuspendableExecutor {
+ public static final String GROUP_NAME = "AsyncWriterExecutor";
- public SuspendableExecutor(String groupName) {
- super(groupName);
- }
-
- @Override
- public void suspendMonitoring() {
- suspended = true;
- }
-
- @Override
- public void resumeMonitoring() {
- setStartTime(0);
- // The ThreadMonitoringProcess will set the
- // startTime once it sees it set to 0.
- // This prevents the monitored thread from
- // constantly calling System.currentTimeMillis.
- suspended = false;
- }
-
- @Override
- public boolean isMonitoringSuspended() {
- return suspended;
+ public AsyncWriterExecutorGroup() {
+ super(GROUP_NAME);
}
}
diff --git a/geode-core/src/main/java/org/apache/geode/internal/monitoring/executor/SuspendableExecutor.java b/geode-core/src/main/java/org/apache/geode/internal/monitoring/executor/SuspendableExecutor.java
index b64b519a44..9f23609da4 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/monitoring/executor/SuspendableExecutor.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/monitoring/executor/SuspendableExecutor.java
@@ -36,6 +36,15 @@ public abstract class SuspendableExecutor extends AbstractExecutor {
suspended = false;
}
+ @Override
+ public void reportProgress() {
+ setStartTime(0);
+ // The ThreadMonitoringProcess will set the
+ // startTime once it sees it set to 0.
+ // This prevents the monitored thread from
+ // constantly calling System.currentTimeMillis.
+ }
+
@Override
public boolean isMonitoringSuspended() {
return suspended;
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/DiskStoreImplValueRecoveryTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/DiskStoreImplValueRecoveryTest.java
index fb612a8ac3..60d61b0730 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/DiskStoreImplValueRecoveryTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/DiskStoreImplValueRecoveryTest.java
@@ -32,7 +32,11 @@ import org.mockito.ArgumentCaptor;
import org.apache.geode.Statistics;
import org.apache.geode.StatisticsFactory;
+import org.apache.geode.distributed.internal.DistributionManager;
+import org.apache.geode.distributed.internal.InternalDistributedSystem;
import org.apache.geode.internal.cache.control.InternalResourceManager;
+import org.apache.geode.internal.monitoring.ThreadsMonitoring;
+import org.apache.geode.internal.monitoring.executor.AbstractExecutor;
public class DiskStoreImplValueRecoveryTest {
@@ -46,10 +50,20 @@ public class DiskStoreImplValueRecoveryTest {
StatisticsFactory statisticsFactory = mock(StatisticsFactory.class);
internalResourceManager = mock(InternalResourceManager.class);
+ InternalDistributedSystem ids = mock(InternalDistributedSystem.class);
+ DistributionManager dm = mock(DistributionManager.class);
+ ThreadsMonitoring threadsMonitoring = mock(ThreadsMonitoring.class);
+
when(statisticsFactory.createStatistics(any(),
any())).thenReturn(mock(Statistics.class));
when(cache.getCachePerfStats()).thenReturn(mock(CachePerfStats.class));
when(cache.getDiskStoreMonitor()).thenReturn(mock(DiskStoreMonitor.class));
+ when(cache.getInternalDistributedSystem()).thenReturn(ids);
+ when(ids.getDM()).thenReturn(dm);
+ when(dm.getThreadMonitoring()).thenReturn(threadsMonitoring);
+ when(threadsMonitoring.createAbstractExecutor(any())).thenReturn(mock(AbstractExecutor.class));
+
+
diskStore = new DiskStoreImpl(cache, "name", diskStoreAttributes, false,
null, false, false,
false, false, false, false, statisticsFactory,
internalResourceManager);
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/FlusherThreadTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/FlusherThreadTest.java
index ad81413a90..cb69fbf33f 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/FlusherThreadTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/FlusherThreadTest.java
@@ -14,6 +14,7 @@
*/
package org.apache.geode.internal.cache;
+import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
@@ -28,6 +29,10 @@ import org.junit.Before;
import org.junit.Test;
import org.apache.geode.cache.DiskAccessException;
+import org.apache.geode.distributed.internal.DistributionManager;
+import org.apache.geode.distributed.internal.InternalDistributedSystem;
+import org.apache.geode.internal.monitoring.ThreadsMonitoring;
+import org.apache.geode.internal.monitoring.executor.AbstractExecutor;
public class FlusherThreadTest {
@@ -42,6 +47,10 @@ public class FlusherThreadTest {
diskStoreImpl = mock(DiskStoreImpl.class);
diskStoreStats = mock(DiskStoreStats.class);
PersistentOplogSet persistentOpLogSet = mock(PersistentOplogSet.class);
+ InternalCache cache = mock(InternalCache.class);
+ InternalDistributedSystem ids = mock(InternalDistributedSystem.class);
+ DistributionManager dm = mock(DistributionManager.class);
+ ThreadsMonitoring threadsMonitoring = mock(ThreadsMonitoring.class);
when(diskStoreImpl.getAsyncMonitor()).thenReturn(new Object());
when(diskStoreImpl.getForceFlushCount()).thenReturn(new AtomicInteger(1));
@@ -52,6 +61,14 @@ public class FlusherThreadTest {
when(diskStoreImpl.checkAndClearForceFlush()).thenReturn(true);
when(diskStoreImpl.isStopFlusher()).thenReturn(false).thenReturn(true);
+ when(diskStoreImpl.getAsyncMonitor()).thenReturn(new Object());
+ when(diskStoreImpl.getCache()).thenReturn(cache);
+ when(cache.getInternalDistributedSystem()).thenReturn(ids);
+ when(ids.getDM()).thenReturn(dm);
+ when(dm.getThreadMonitoring()).thenReturn(threadsMonitoring);
+ when(threadsMonitoring.createAbstractExecutor(any())).thenReturn(mock(AbstractExecutor.class));
+
+
flusherThread = new DiskStoreImpl.FlusherThread(diskStoreImpl);
}
diff --git a/geode-core/src/test/java/org/apache/geode/internal/monitoring/ThreadsMonitoringImplJUnitTest.java b/geode-core/src/test/java/org/apache/geode/internal/monitoring/ThreadsMonitoringImplJUnitTest.java
index 78af92ab7f..88f73a17ed 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/monitoring/ThreadsMonitoringImplJUnitTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/monitoring/ThreadsMonitoringImplJUnitTest.java
@@ -63,6 +63,7 @@ public class ThreadsMonitoringImplJUnitTest {
assertTrue(threadsMonitoringImpl.startMonitor(Mode.AGSExecutor));
assertTrue(threadsMonitoringImpl.startMonitor(Mode.P2PReaderExecutor));
assertTrue(threadsMonitoringImpl.startMonitor(Mode.ServerConnectionExecutor));
+ assertTrue(threadsMonitoringImpl.startMonitor(Mode.AsyncWriterExecutor));
}
@Test
diff --git a/geode-core/src/test/java/org/apache/geode/internal/monitoring/ThreadsMonitoringJUnitTest.java b/geode-core/src/test/java/org/apache/geode/internal/monitoring/ThreadsMonitoringJUnitTest.java
index 7dd9d67c87..8d5ac0a711 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/monitoring/ThreadsMonitoringJUnitTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/monitoring/ThreadsMonitoringJUnitTest.java
@@ -36,11 +36,13 @@ public class ThreadsMonitoringJUnitTest {
ScheduledThreadExecutor,
AGSExecutor,
P2PReaderExecutor,
- ServerConnectionExecutor
+ ServerConnectionExecutor,
+
+ AsyncWriterExecutor
}
- public final int numberOfElements = 8;
+ public final int numberOfElements = 9;
private static final Logger logger = LogService.getLogger();
/**
diff --git a/geode-core/src/test/java/org/apache/geode/internal/monitoring/executor/SuspendableExecutorTest.java b/geode-core/src/test/java/org/apache/geode/internal/monitoring/executor/SuspendableExecutorTest.java
index 5cc9afbbce..ee22d52473 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/monitoring/executor/SuspendableExecutorTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/monitoring/executor/SuspendableExecutorTest.java
@@ -50,4 +50,15 @@ public class SuspendableExecutorTest {
executor.resumeMonitoring();
assertThat(executor.getStartTime()).isEqualTo(0);
}
+
+ @Test
+ public void verifyReportProgressAfterResume() {
+ SuspendableExecutor executor = new FakeSuspendableExecutor();
+ executor.resumeMonitoring();
+ assertThat(executor.getStartTime()).isEqualTo(0);
+ executor.setStartTime(11);
+ assertThat(executor.getStartTime()).isEqualTo(11);
+ executor.reportProgress();
+ assertThat(executor.getStartTime()).isEqualTo(0);
+ }
}