This is an automated email from the ASF dual-hosted git repository.
mhubail pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/asterixdb.git
The following commit(s) were added to refs/heads/master by this push:
new 199398a [ASTERIXDB-2730][STO] Avoid Double Flushes in GVBC
199398a is described below
commit 199398a13f631a8fe848c833711ed09e49509c70
Author: Murtadha Hubail <[email protected]>
AuthorDate: Wed May 13 04:13:27 2020 +0300
[ASTERIXDB-2730][STO] Avoid Double Flushes in GVBC
- user model changes: no
- storage format changes: no
- interface changes: no
Details:
- Ensure the GlobalVirtualBufferCache (GVBC) only flushes a primary index
if it has a modified memory component and there is no pending flush request.
Change-Id: Ib4c3c632c43d83c5e60960c2cdcce54f1216b851
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/6305
Tested-by: Jenkins <[email protected]>
Integration-Tests: Jenkins <[email protected]>
Reviewed-by: Murtadha Hubail <[email protected]>
---
.../dataflow/GlobalVirtualBufferCacheTest.java | 112 ++++++++++++---------
.../common/context/GlobalVirtualBufferCache.java | 43 ++++----
2 files changed, 89 insertions(+), 66 deletions(-)
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/GlobalVirtualBufferCacheTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/GlobalVirtualBufferCacheTest.java
index 0b07bc4..0e51f28 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/GlobalVirtualBufferCacheTest.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/GlobalVirtualBufferCacheTest.java
@@ -91,65 +91,85 @@ public class GlobalVirtualBufferCacheTest {
private static final long FILTERED_MEMORY_COMPONENT_SIZE = 16 * 1024l;
@BeforeClass
- public static void setUp() throws Exception {
- System.out.println("SetUp: ");
- TestHelper.deleteExistingInstanceFiles();
- String configPath = System.getProperty("user.dir") + File.separator +
"src" + File.separator + "test"
- + File.separator + "resources" + File.separator + "cc.conf";
- nc = new TestNodeController(configPath, false);
+ public static void setUp() {
+ try {
+ System.out.println("SetUp: ");
+ TestHelper.deleteExistingInstanceFiles();
+ String configPath = System.getProperty("user.dir") +
File.separator + "src" + File.separator + "test"
+ + File.separator + "resources" + File.separator +
"cc.conf";
+ nc = new TestNodeController(configPath, false);
+ } catch (Throwable e) {
+ LOGGER.error(e);
+ Assert.fail(e.getMessage());
+ }
}
@Before
- public void initializeTest() throws Exception {
+ public void initializeTest() {
// initialize NC before each test
- initializeNc();
- initializeTestCtx();
- createIndex();
- readIndex();
- tupleGenerator = StorageTestUtils.getTupleGenerator();
+ try {
+ initializeNc();
+ initializeTestCtx();
+ createIndex();
+ readIndex();
+ tupleGenerator = StorageTestUtils.getTupleGenerator();
+ } catch (Throwable e) {
+ LOGGER.error(e);
+ Assert.fail(e.getMessage());
+ }
}
@After
- public void deinitializeTest() throws Exception {
- dropIndex();
- // cleanup after each test case
- nc.deInit(true);
- nc.clearOpts();
+ public void deinitializeTest() {
+ try {
+ dropIndex();
+ // cleanup after each test case
+ nc.deInit(true);
+ nc.clearOpts();
+ } catch (Throwable e) {
+ LOGGER.error(e);
+ Assert.fail(e.getMessage());
+ }
}
@Test
- public void testFlushes() throws Exception {
- List<Thread> threads = new ArrayList<>();
- int records = 16 * 1024;
- int threadsPerPartition = 2;
- AtomicReference<Exception> exceptionRef = new AtomicReference<>();
- for (int p = 0; p < NUM_PARTITIONS; p++) {
- for (int t = 0; t < threadsPerPartition; t++) {
- threads.add(insertRecords(records, p, false, exceptionRef));
- threads.add(insertRecords(records, p, true, exceptionRef));
+ public void testFlushes() {
+ try {
+ List<Thread> threads = new ArrayList<>();
+ int records = 16 * 1024;
+ int threadsPerPartition = 2;
+ AtomicReference<Exception> exceptionRef = new AtomicReference<>();
+ for (int p = 0; p < NUM_PARTITIONS; p++) {
+ for (int t = 0; t < threadsPerPartition; t++) {
+ threads.add(insertRecords(records, p, false,
exceptionRef));
+ threads.add(insertRecords(records, p, true, exceptionRef));
+ }
}
- }
- for (Thread thread : threads) {
- thread.join();
- }
- if (exceptionRef.get() != null) {
- exceptionRef.get().printStackTrace();
- Assert.fail();
- }
- for (int i = 0; i < NUM_PARTITIONS; i++) {
-
Assert.assertFalse(primaryIndexes[i].getDiskComponents().isEmpty());
- Assert.assertTrue(
- primaryIndexes[i].getDiskComponents().stream().anyMatch(c
-> ((AbstractTreeIndex) c.getIndex())
- .getFileReference().getFile().length() >
FILTERED_MEMORY_COMPONENT_SIZE));
+ for (Thread thread : threads) {
+ thread.join();
+ }
+ if (exceptionRef.get() != null) {
+ exceptionRef.get().printStackTrace();
+ Assert.fail();
+ }
+ for (int i = 0; i < NUM_PARTITIONS; i++) {
+
Assert.assertFalse(primaryIndexes[i].getDiskComponents().isEmpty());
+ Assert.assertTrue(
+
primaryIndexes[i].getDiskComponents().stream().anyMatch(c ->
((AbstractTreeIndex) c.getIndex())
+ .getFileReference().getFile().length() >
FILTERED_MEMORY_COMPONENT_SIZE));
-
Assert.assertFalse(filteredPrimaryIndexes[i].getDiskComponents().isEmpty());
-
Assert.assertTrue(filteredPrimaryIndexes[i].getDiskComponents().stream()
- .allMatch(c -> ((AbstractTreeIndex)
c.getIndex()).getFileReference().getFile()
- .length() <= FILTERED_MEMORY_COMPONENT_SIZE));
- }
+
Assert.assertFalse(filteredPrimaryIndexes[i].getDiskComponents().isEmpty());
+
Assert.assertTrue(filteredPrimaryIndexes[i].getDiskComponents().stream()
+ .allMatch(c -> ((AbstractTreeIndex)
c.getIndex()).getFileReference().getFile()
+ .length() <= FILTERED_MEMORY_COMPONENT_SIZE));
+ }
- nc.getTransactionManager().commitTransaction(txnCtx.getTxnId());
-
nc.getTransactionManager().commitTransaction(filteredTxnCtx.getTxnId());
+ nc.getTransactionManager().commitTransaction(txnCtx.getTxnId());
+
nc.getTransactionManager().commitTransaction(filteredTxnCtx.getTxnId());
+ } catch (Throwable e) {
+ LOGGER.error(e);
+ Assert.fail(e.getMessage());
+ }
}
private void initializeNc() throws Exception {
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/GlobalVirtualBufferCache.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/GlobalVirtualBufferCache.java
index 5177b25..c0197b0 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/GlobalVirtualBufferCache.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/GlobalVirtualBufferCache.java
@@ -452,33 +452,36 @@ public class GlobalVirtualBufferCache implements IVirtualBufferCache, ILifeCycle
private void scheduleFlush() throws HyracksDataException {
synchronized (GlobalVirtualBufferCache.this) {
- if (vbc.getUsage() < flushPageBudget || flushingIndex != null)
{
- return;
- }
int cycles = 0;
- // find the first modified memory component while avoiding
infinite loops
- while (cycles <= primaryIndexes.size()
- &&
!primaryIndexes.get(flushPtr).getCurrentMemoryComponent().isModified()) {
- flushPtr = (flushPtr + 1) % primaryIndexes.size();
- cycles++;
- }
- if
(primaryIndexes.get(flushPtr).getCurrentMemoryComponent().isModified()) {
- // flush the current memory component
- flushingIndex = primaryIndexes.get(flushPtr);
+ while (vbc.getUsage() >= flushPageBudget && flushingIndex ==
null && cycles <= primaryIndexes.size()) {
+ // find the first modified memory component while avoiding
infinite loops
+ while (cycles <= primaryIndexes.size()
+ &&
primaryIndexes.get(flushPtr).isCurrentMutableComponentEmpty()) {
+ flushPtr = (flushPtr + 1) % primaryIndexes.size();
+ cycles++;
+ }
+
+ ILSMIndex primaryIndex = primaryIndexes.get(flushPtr);
flushPtr = (flushPtr + 1) % primaryIndexes.size();
// we need to manually flush this memory component because
it may be idle at this point
// note that this is different from flushing a filtered
memory component
PrimaryIndexOperationTracker opTracker =
- (PrimaryIndexOperationTracker)
flushingIndex.getOperationTracker();
+ (PrimaryIndexOperationTracker)
primaryIndex.getOperationTracker();
synchronized (opTracker) {
- opTracker.setFlushOnExit(true);
- opTracker.flushIfNeeded();
- // If the flush cannot be scheduled at this time, then
there must be active writers.
- // The flush will be eventually scheduled when writers
exit
+ boolean flushable =
!primaryIndex.isCurrentMutableComponentEmpty();
+ if (flushable && !opTracker.isFlushLogCreated()) {
+ // if the flush log has already been created, then
we can simply wait for
+ // that flush to complete
+ opTracker.setFlushOnExit(true);
+ opTracker.flushIfNeeded();
+ // If the flush cannot be scheduled at this time,
then there must be active writers.
+ // The flush will be eventually scheduled when
writers exit
+ }
+ if (flushable || opTracker.isFlushLogCreated()) {
+ flushingIndex = primaryIndex;
+ break;
+ }
}
- } else {
- throw new IllegalStateException(
- "Cannot find modified memory component after
checking all primary indexes");
}
}
}