This is an automated email from the ASF dual-hosted git repository.

mkevo pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git


The following commit(s) were added to refs/heads/develop by this push:
     new c0fbe30  GEODE-9881: Oplog not compacted after recovery (#7193)
c0fbe30 is described below

commit c0fbe309ded8e1b53b048ff80a1892eb6a1285ff
Author: Jakov Varenina <[email protected]>
AuthorDate: Tue Jan 11 12:27:42 2022 +0100

    GEODE-9881: Oplog not compacted after recovery (#7193)
    
    * GEODE-9881: Oplog not compacted after recovery
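    
    The Oplog.java hunk below reorders region-close handling so that the
    region is marked unrecovered (addUnrecoveredRegion) only after its
    DiskRegionInfo has been cleared and removed from the regionMap. A
    plausible reading, not spelled out in the commit message itself:
    marking the region unrecovered before the clear left dri.clear(null)
    with nothing to subtract, so the oplog's live-entry count stayed
    inflated and the compactor kept skipping the oplog even after
    recovery.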
---
 ...ctorClearOplogAfterRecoveryIntegrationTest.java | 182 +++++++++++++++++++++
 .../geode/internal/offheap/OffHeapRegionBase.java  |   5 +-
 .../org/apache/geode/internal/cache/Oplog.java     |   2 +-
 3 files changed, 187 insertions(+), 2 deletions(-)

diff --git a/geode-core/src/integrationTest/java/org/apache/geode/internal/cache/DiskRegionCompactorClearOplogAfterRecoveryIntegrationTest.java b/geode-core/src/integrationTest/java/org/apache/geode/internal/cache/DiskRegionCompactorClearOplogAfterRecoveryIntegrationTest.java
new file mode 100644
index 0000000..18357a7
--- /dev/null
+++ b/geode-core/src/integrationTest/java/org/apache/geode/internal/cache/DiskRegionCompactorClearOplogAfterRecoveryIntegrationTest.java
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.geode.internal.cache;
+
+import static org.apache.geode.test.awaitility.GeodeAwaitility.await;
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.io.File;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Properties;
+import java.util.Set;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.rules.TestName;
+
+import org.apache.geode.cache.Cache;
+import org.apache.geode.cache.CacheFactory;
+import org.apache.geode.cache.DataPolicy;
+import org.apache.geode.cache.Region;
+import org.apache.geode.cache.RegionFactory;
+import org.apache.geode.cache.RegionShortcut;
+
+/**
+ * Verifies that automatic compaction works after the cache has recovered from oplogs.
+ */
+public class DiskRegionCompactorClearOplogAfterRecoveryIntegrationTest {
+
+  private final Properties config = new Properties();
+  private Cache cache;
+
+  private File[] diskDirs;
+  private int[] diskDirSizes;
+
+  private String regionName;
+  private String diskStoreName;
+
+  @Rule
+  public TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+  @Rule
+  public TestName testName = new TestName();
+
+  private static final int ENTRY_RANGE = 350;
+
+  @Before
+  public void setUp() throws Exception {
+    String uniqueName = getClass().getSimpleName() + "_" + testName.getMethodName();
+    regionName = uniqueName + "_region";
+    diskStoreName = uniqueName + "_diskStore";
+
+    cache = new CacheFactory(config).create();
+
+    diskDirs = new File[1];
+    diskDirs[0] = createDirectory(temporaryFolder.getRoot(), testName.getMethodName());
+    diskDirSizes = new int[1];
+    Arrays.fill(diskDirSizes, Integer.MAX_VALUE);
+
+    DiskStoreImpl.SET_IGNORE_PREALLOCATE = true;
+    TombstoneService.EXPIRED_TOMBSTONE_LIMIT = 1;
+    TombstoneService.REPLICATE_TOMBSTONE_TIMEOUT = 1;
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    try {
+      cache.close();
+    } finally {
+      DiskStoreImpl.SET_IGNORE_PREALLOCATE = false;
+    }
+  }
+
+  /**
+   * Verifies that compaction works as expected after the region is recovered.
+   */
+  @Test
+  public void testThatCompactionWorksAfterRegionIsClosedAndThenRecovered()
+      throws InterruptedException {
+
+    createDiskStore(30, 10000);
+    Region<Object, Object> region = createRegion();
+    DiskStoreImpl diskStore = ((InternalRegion) region).getDiskStore();
+
+    // Create several oplog files (.crf and .drf) by executing put operations in the defined range.
+    executePutOperations(region);
+    await().untilAsserted(() -> assertThat(getCurrentNumberOfOplogs(diskStore)).isEqualTo(5));
+
+    Set<Long> oplogIds = getAllOplogIds(diskStore);
+
+    region.close();
+    region = createRegion();
+
+    // Execute destroy operations for all entries created in the previous step. The oplogs
+    // created in the previous step will then contain no live entries, so they must be compacted.
+    executeDestroyOperations(region);
+
+    await().untilAsserted(
+        () -> assertThat(areOplogsCompacted(oplogIds, diskStore)).isTrue());
+  }
+
+  boolean areOplogsCompacted(Set<Long> oplogIds, DiskStoreImpl diskStore) {
+    Set<Long> currentOplogId = getAllOplogIds(diskStore);
+    return currentOplogId.stream().noneMatch(oplogIds::contains);
+  }
+
+  Set<Long> getAllOplogIds(DiskStoreImpl diskStore) {
+    Set<Long> oplogIds = new HashSet<>();
+    for (Oplog oplog : diskStore.getAllOplogsForBackup()) {
+      oplogIds.add(oplog.getOplogId());
+    }
+    return oplogIds;
+  }
+
+
+  void executePutOperations(Region<Object, Object> region) {
+    for (int i = 0; i < ENTRY_RANGE; i++) {
+      region.put(i, new byte[100]);
+    }
+  }
+
+  void executeDestroyOperations(Region<Object, Object> region) throws InterruptedException {
+    TombstoneService tombstoneService = ((InternalCache) cache).getTombstoneService();
+    for (int i = 0; i < ENTRY_RANGE; i++) {
+      region.destroy(i);
+      assertThat(tombstoneService.forceBatchExpirationForTests(1)).isTrue();
+    }
+  }
+
+  void createDiskStore(int compactionThreshold, int maxOplogSizeInBytes) {
+    DiskStoreFactoryImpl diskStoreFactory = (DiskStoreFactoryImpl) cache.createDiskStoreFactory();
+    diskStoreFactory.setAutoCompact(true);
+    diskStoreFactory.setCompactionThreshold(compactionThreshold);
+    diskStoreFactory.setDiskDirsAndSizes(diskDirs, diskDirSizes);
+
+    createDiskStoreWithSizeInBytes(diskStoreName, diskStoreFactory, maxOplogSizeInBytes);
+  }
+
+  Region<Object, Object> createRegion() {
+    RegionFactory<Object, Object> regionFactory =
+        cache.createRegionFactory(RegionShortcut.PARTITION_PERSISTENT);
+    regionFactory.setDataPolicy(DataPolicy.PERSISTENT_PARTITION);
+    regionFactory.setDiskStoreName(diskStoreName);
+    regionFactory.setDiskSynchronous(true);
+    return regionFactory.create(regionName);
+  }
+
+  int getCurrentNumberOfOplogs(DiskStoreImpl ds) {
+    return ds.getAllOplogsForBackup().length;
+  }
+
+  private File createDirectory(File parentDirectory, String name) {
+    File file = new File(parentDirectory, name);
+    assertThat(file.mkdir()).isTrue();
+    return file;
+  }
+
+  private void createDiskStoreWithSizeInBytes(String diskStoreName,
+      DiskStoreFactoryImpl diskStoreFactory,
+      long maxOplogSizeInBytes) {
+    diskStoreFactory.setMaxOplogSizeInBytes(maxOplogSizeInBytes);
+    diskStoreFactory.setDiskDirSizesUnit(DiskDirSizesUnit.BYTES);
+    diskStoreFactory.create(diskStoreName);
+  }
+}
diff --git a/geode-core/src/integrationTest/java/org/apache/geode/internal/offheap/OffHeapRegionBase.java b/geode-core/src/integrationTest/java/org/apache/geode/internal/offheap/OffHeapRegionBase.java
index 13914bc..bb0ce74 100644
--- a/geode-core/src/integrationTest/java/org/apache/geode/internal/offheap/OffHeapRegionBase.java
+++ b/geode-core/src/integrationTest/java/org/apache/geode/internal/offheap/OffHeapRegionBase.java
@@ -17,6 +17,7 @@ package org.apache.geode.internal.offheap;
 import static org.apache.geode.distributed.ConfigurationProperties.LOCATORS;
 import static org.apache.geode.distributed.ConfigurationProperties.MCAST_PORT;
 import static org.apache.geode.test.awaitility.GeodeAwaitility.await;
+import static org.assertj.core.api.Assertions.assertThat;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
@@ -475,7 +476,9 @@ public abstract class OffHeapRegionBase {
           fail("identity of offHeapStore changed when cache was recreated");
         }
         r = gfc.createRegionFactory(rs).setOffHeap(true).create(rName);
-        assertTrue(ma.getUsedMemory() > 0);
+        await().untilAsserted(() -> {
+          assertThat(ma.getUsedMemory()).isGreaterThan(0);
+        });
         assertEquals(4, r.size());
         assertEquals(data, r.get("key1"));
         assertEquals(Integer.valueOf(1234567890), r.get("key2"));
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/Oplog.java b/geode-core/src/main/java/org/apache/geode/internal/cache/Oplog.java
index a69b6a3..7bf27c0 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/Oplog.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/Oplog.java
@@ -1302,7 +1302,6 @@ public class Oplog implements CompactableOplog, Flushable {
     // while a krf is being created can not close a region
     lockCompactor();
     try {
-      addUnrecoveredRegion(dr.getId());
       DiskRegionInfo dri = getDRI(dr);
       if (dri != null) {
         long clearCount = dri.clear(null);
@@ -1313,6 +1312,7 @@ public class Oplog implements CompactableOplog, Flushable {
         }
         regionMap.remove(dr.getId(), dri);
       }
+      addUnrecoveredRegion(dr.getId());
     } finally {
       unlockCompactor();
     }
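
For context, a minimal sketch (not part of the commit) of the public-API
configuration this fix matters for: a persistent region on a disk store
with automatic compaction enabled, mirroring what the new integration test
wires up through internal classes. The names "exampleDiskStore" and
"exampleRegion" and the sizes below are illustrative assumptions.

    import java.io.File;

    import org.apache.geode.cache.Cache;
    import org.apache.geode.cache.CacheFactory;
    import org.apache.geode.cache.DiskStoreFactory;
    import org.apache.geode.cache.Region;
    import org.apache.geode.cache.RegionShortcut;

    public class AutoCompactionExample {
      public static void main(String[] args) {
        Cache cache = new CacheFactory().create();

        // An oplog becomes eligible for auto-compaction once its share of
        // live data drops to the compaction threshold (a percentage).
        File diskDir = new File("diskDir");
        diskDir.mkdirs();
        DiskStoreFactory factory = cache.createDiskStoreFactory();
        factory.setAutoCompact(true);
        factory.setCompactionThreshold(30); // percent of live data
        factory.setMaxOplogSize(1); // megabytes; small, so oplogs roll over quickly
        factory.setDiskDirs(new File[] {diskDir});
        factory.create("exampleDiskStore");

        Region<Integer, byte[]> region = cache
            .<Integer, byte[]>createRegionFactory(RegionShortcut.PARTITION_PERSISTENT)
            .setDiskStoreName("exampleDiskStore")
            .setDiskSynchronous(true)
            .create("exampleRegion");

        // Destroys turn oplog entries into dead data. With the fix, oplogs
        // of a region that was closed and later recovered are considered by
        // the compactor again once their live data falls below the threshold.
        for (int i = 0; i < 100; i++) {
          region.put(i, new byte[100]);
        }
        for (int i = 0; i < 100; i++) {
          region.destroy(i);
        }

        cache.close();
      }
    }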
