This is an automated email from the ASF dual-hosted git repository.
sergeychugunov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new 49f4086 IGNITE-13569 disable archiving + walCompactionEnabled
probably broke reading from wal on server restart - Fixes #8344.
49f4086 is described below
commit 49f4086a2b97857bd45bea107510210fcab72cdb
Author: Anton Kalashnikov <[email protected]>
AuthorDate: Fri Oct 16 16:56:59 2020 +0300
IGNITE-13569 disable archiving + walCompactionEnabled probably broke
reading from wal on server restart - Fixes #8344.
Signed-off-by: Sergey Chugunov <[email protected]>
---
.../persistence/wal/FileWriteAheadLogManager.java | 36 ++++--
.../db/wal/WalCompactionNoArchiverTest.java | 135 +++++++++++++++++++++
.../ignite/testsuites/IgnitePdsMvccTestSuite2.java | 2 +
.../ignite/testsuites/IgnitePdsTestSuite2.java | 2 +
4 files changed, 164 insertions(+), 11 deletions(-)
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java
index 6e73141..b79d637 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java
@@ -484,17 +484,19 @@ public class FileWriteAheadLogManager extends
GridCacheSharedManagerAdapter impl
segmentAware = new SegmentAware(dsCfg.getWalSegments(),
dsCfg.isWalCompactionEnabled());
- if (isArchiverEnabled())
- archiver = new FileArchiver(segmentAware, log);
- else
- archiver = null;
-
+ // We have to initialize compressor before archiver in order to
setup already compressed segments.
+ // Otherwise, FileArchiver initialization will trigger redundant
work for FileCompressor.
if (dsCfg.isWalCompactionEnabled()) {
compressor = new FileCompressor(log);
decompressor = new FileDecompressor(log);
}
+ if (isArchiverEnabled())
+ archiver = new FileArchiver(segmentAware, log);
+ else
+ archiver = null;
+
segmentRouter = new SegmentRouter(walWorkDir, walArchiveDir,
segmentAware, dsCfg);
fileHandleManager = fileHandleManagerFactory.build(
@@ -2072,6 +2074,8 @@ public class FileWriteAheadLogManager extends
GridCacheSharedManagerAdapter impl
/** */
FileCompressor(IgniteLogger log) {
super(0, log);
+
+ initAlreadyCompressedSegments();
}
/** */
@@ -2085,11 +2089,6 @@ public class FileWriteAheadLogManager extends
GridCacheSharedManagerAdapter impl
f.delete();
}
- FileDescriptor[] alreadyCompressed =
scan(walArchiveDir.listFiles(WAL_SEGMENT_FILE_COMPACTED_FILTER));
-
- if (alreadyCompressed.length > 0)
-
segmentAware.onSegmentCompressed(alreadyCompressed[alreadyCompressed.length -
1].idx());
-
for (int i = 1; i < calculateThreadCount(); i++) {
FileCompressorWorker worker = new FileCompressorWorker(i, log);
@@ -2102,6 +2101,16 @@ public class FileWriteAheadLogManager extends
GridCacheSharedManagerAdapter impl
}
/**
+ * Checks if there are already compressed segments and assigns
counters if needed.
+ */
+ private void initAlreadyCompressedSegments() {
+ FileDescriptor[] alreadyCompressed =
scan(walArchiveDir.listFiles(WAL_SEGMENT_FILE_COMPACTED_FILTER));
+
+ if (alreadyCompressed.length > 0)
+
segmentAware.onSegmentCompressed(alreadyCompressed[alreadyCompressed.length -
1].idx());
+ }
+
+ /**
* Calculate optimal additional compressor worker threads count. If
quarter of proc threads greater
* than WAL_COMPRESSOR_WORKER_THREAD_CNT, use this value. Otherwise,
reduce number of threads.
*
@@ -2148,6 +2157,9 @@ public class FileWriteAheadLogManager extends
GridCacheSharedManagerAdapter impl
/** */
private class FileCompressorWorker extends GridWorker {
+ /** Last compression error. */
+ private volatile Throwable lastCompressionError;
+
/** */
FileCompressorWorker(int idx, IgniteLogger log) {
super(cctx.igniteInstanceName(), "wal-file-compressor-%" +
cctx.igniteInstanceName() + "%-" + idx, log);
@@ -2228,8 +2240,10 @@ public class FileWriteAheadLogManager extends
GridCacheSharedManagerAdapter impl
Thread.currentThread().interrupt();
}
catch (IgniteCheckedException | IOException e) {
+ lastCompressionError = e;
+
U.error(log, "Compression of WAL segment [idx=" + segIdx +
- "] was skipped due to unexpected error", e);
+ "] was skipped due to unexpected error",
lastCompressionError);
segmentAware.onSegmentCompressed(segIdx);
}
diff --git
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/WalCompactionNoArchiverTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/WalCompactionNoArchiverTest.java
new file mode 100644
index 0000000..4c50e25
--- /dev/null
+++
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/WalCompactionNoArchiverTest.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ignite.internal.processors.cache.persistence.db.wal;
+
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.DataRegionConfiguration;
+import org.apache.ignite.configuration.DataStorageConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.pagemem.wal.IgniteWriteAheadLogManager;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.testframework.junits.WithSystemProperty;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.junit.Test;
+
+/**
+ * Checks that WAL compaction works correctly in no-archiver mode.
+ */
+public class WalCompactionNoArchiverTest extends GridCommonAbstractTest {
+ /** Wal segment size. */
+ private static final int WAL_SEGMENT_SIZE = 4 * 1024 * 1024;
+
+ /** Cache name. */
+ public static final String CACHE_NAME = "cache";
+
+ /** Entries count. */
+ public static final int ENTRIES = 1000;
+
+ /** WAL path. */
+ public static final String WAL_PATH = "no-arch";
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String name)
throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(name);
+
+ cfg.setDataStorageConfiguration(new DataStorageConfiguration()
+ .setDefaultDataRegionConfiguration(new DataRegionConfiguration()
+ .setPersistenceEnabled(true)
+ .setMaxSize(200L * 1024 * 1024))
+ .setWalSegmentSize(WAL_SEGMENT_SIZE)
+ .setWalCompactionEnabled(true)
+ .setWalArchivePath(WAL_PATH)
+ .setWalPath(WAL_PATH)
+ .setCheckpointFrequency(1000));
+
+ CacheConfiguration ccfg = new CacheConfiguration();
+
+ ccfg.setName(CACHE_NAME);
+ ccfg.setAffinity(new RendezvousAffinityFunction(false, 16));
+
+ cfg.setCacheConfiguration(ccfg);
+ cfg.setConsistentId(name);
+
+ return cfg;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void beforeTest() throws Exception {
+ stopAllGrids();
+
+ cleanPersistenceDir();
+
+ U.delete(U.resolveWorkDirectory(U.defaultWorkDirectory(), WAL_PATH,
true));
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTest() throws Exception {
+ stopAllGrids();
+
+ cleanPersistenceDir();
+
+ U.delete(U.resolveWorkDirectory(U.defaultWorkDirectory(), WAL_PATH,
true));
+ }
+
+ /**
+ * Tests that attempts to compress WAL segment don't result with error.
+ */
+ @Test
+ @WithSystemProperty(key = "IGNITE_WAL_COMPRESSOR_WORKER_THREAD_CNT", value
= "1")
+ public void testNoCompressionErrors() throws Exception {
+ IgniteEx ig = startGrid(0);
+ ig.cluster().active(true);
+
+ IgniteCache<Integer, byte[]> cache = ig.cache(CACHE_NAME);
+
+ for (int i = 0; i < ENTRIES; i++) { // At least 20MB of raw data in
total.
+ final byte[] val = new byte[40000];
+
+ val[i] = 1;
+
+ cache.put(i, val);
+ }
+
+ stopAllGrids();
+
+ ig = startGrid(0);
+
+ cache = ig.cache(CACHE_NAME);
+
+ for (int i = 0; i < ENTRIES; i++) { // At least 20MB of raw data in
total.
+ final byte[] val = new byte[40000];
+
+ val[i] = 1;
+
+ cache.put(i, val);
+ }
+
+ IgniteWriteAheadLogManager wal = ig.context().cache().context().wal();
+
+ Object compressor = U.field(wal, "compressor");
+
+ assertNotNull(compressor);
+
+ Object error = U.field(compressor, "lastCompressionError");
+
+ if (error != null)
+ fail("Unexpected error in FileCompressor: " + error);
+ }
+}
diff --git
a/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsMvccTestSuite2.java
b/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsMvccTestSuite2.java
index a17e889..3c92319 100644
---
a/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsMvccTestSuite2.java
+++
b/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsMvccTestSuite2.java
@@ -38,6 +38,7 @@ import
org.apache.ignite.internal.processors.cache.persistence.db.wal.IgniteWalI
import
org.apache.ignite.internal.processors.cache.persistence.db.wal.IgniteWalIteratorSwitchSegmentTest;
import
org.apache.ignite.internal.processors.cache.persistence.db.wal.IgniteWalRebalanceLoggingTest;
import
org.apache.ignite.internal.processors.cache.persistence.db.wal.IgniteWalSerializerVersionTest;
+import
org.apache.ignite.internal.processors.cache.persistence.db.wal.WalCompactionNoArchiverTest;
import
org.apache.ignite.internal.processors.cache.persistence.db.wal.WalCompactionSwitchOnTest;
import
org.apache.ignite.internal.processors.cache.persistence.db.wal.WalCompactionTest;
import
org.apache.ignite.internal.processors.cache.persistence.db.wal.WalRolloverTypesTest;
@@ -88,6 +89,7 @@ public class IgnitePdsMvccTestSuite2 {
ignoredTests.add(IgniteUidAsConsistentIdMigrationTest.class);
ignoredTests.add(IgniteWalSerializerVersionTest.class);
ignoredTests.add(WalCompactionTest.class);
+ ignoredTests.add(WalCompactionNoArchiverTest.class);
ignoredTests.add(WalCompactionSwitchOnTest.class);
ignoredTests.add(IgniteWalIteratorSwitchSegmentTest.class);
ignoredTests.add(IgniteWalIteratorExceptionDuringReadTest.class);
diff --git
a/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite2.java
b/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite2.java
index 00fc33a..8ff45e4 100644
---
a/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite2.java
+++
b/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite2.java
@@ -74,6 +74,7 @@ import
org.apache.ignite.internal.processors.cache.persistence.db.wal.IgniteWalR
import
org.apache.ignite.internal.processors.cache.persistence.db.wal.IgniteWalRecoverySeveralRestartsTest;
import
org.apache.ignite.internal.processors.cache.persistence.db.wal.IgniteWalReplayingAfterRestartTest;
import
org.apache.ignite.internal.processors.cache.persistence.db.wal.IgniteWalSerializerVersionTest;
+import
org.apache.ignite.internal.processors.cache.persistence.db.wal.WalCompactionNoArchiverTest;
import
org.apache.ignite.internal.processors.cache.persistence.db.wal.WalCompactionSwitchOnTest;
import
org.apache.ignite.internal.processors.cache.persistence.db.wal.WalCompactionTest;
import
org.apache.ignite.internal.processors.cache.persistence.db.wal.WalDeletionArchiveFsyncTest;
@@ -201,6 +202,7 @@ public class IgnitePdsTestSuite2 {
GridTestUtils.addTestIfNeeded(suite,
IgniteUidAsConsistentIdMigrationTest.class, ignoredTests);
GridTestUtils.addTestIfNeeded(suite,
IgniteWalSerializerVersionTest.class, ignoredTests);
GridTestUtils.addTestIfNeeded(suite, WalCompactionTest.class,
ignoredTests);
+ GridTestUtils.addTestIfNeeded(suite,
WalCompactionNoArchiverTest.class, ignoredTests);
GridTestUtils.addTestIfNeeded(suite, WalCompactionSwitchOnTest.class,
ignoredTests);
GridTestUtils.addTestIfNeeded(suite,
WalDeletionArchiveFsyncTest.class, ignoredTests);
GridTestUtils.addTestIfNeeded(suite,
WalDeletionArchiveLogOnlyTest.class, ignoredTests);