This is an automated email from the ASF dual-hosted git repository.
clebertsuconic pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
The following commit(s) were added to refs/heads/main by this push:
new 3e4013d8ca ARTEMIS-3868 Journal compactor split improvement
3e4013d8ca is described below
commit 3e4013d8ca2308cb99882b5655693c9e0e65a187
Author: Clebert Suconic <[email protected]>
AuthorDate: Mon Jun 27 12:45:15 2022 -0400
ARTEMIS-3868 Journal compactor split improvement
The condition fixed in this commit should not really happen in production,
as the compacting counts should always be ordered (records that were
compacted earlier will always be at the top of the journal).
However, it highlights an improvement that could be made to the journal
compacting.
---
.../core/journal/impl/JournalCompactor.java | 55 +++------
.../journal/JournalCompactSplitTest.java | 134 +++++++++++++++++++++
2 files changed, 152 insertions(+), 37 deletions(-)
diff --git
a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalCompactor.java
b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalCompactor.java
index fb8e0f7860..e2aafc393c 100644
---
a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalCompactor.java
+++
b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalCompactor.java
@@ -52,6 +52,9 @@ public class JournalCompactor extends
AbstractJournalUpdateTask implements Journ
// We will force a moveNextFiles when the compactCount is bellow than
COMPACT_SPLIT_LINE
private static final short COMPACT_SPLIT_LINE = 2;
+ // Compacting should split the compacting counts only once
+ boolean split = false;
+
// Snapshot of transactions that were pending when the compactor started
private final ConcurrentLongHashMap<PendingTransaction> pendingTransactions
= new ConcurrentLongHashMap<>();
@@ -156,22 +159,22 @@ public class JournalCompactor extends
AbstractJournalUpdateTask implements Journ
}
private void checkSize(final int size) throws Exception {
- checkSize(size, -1);
+ checkSizeAndCompactSplit(size, -1);
}
- private void checkSize(final int size, final int compactCount) throws
Exception {
+ private void checkSizeAndCompactSplit(final int size, final int
compactCount) throws Exception {
if (getWritingChannel() == null) {
- if (!checkCompact(compactCount)) {
- // will need to open a file either way
- openFile();
+ if (compactCount < COMPACT_SPLIT_LINE) {
+ // in case the very first record is already below SPLIT-LINE we
need to set split = true already
+ // so no further checks will be done
+ // otherwise we would end up with more files than needed
+ split = true;
}
+ openFile();
} else {
- if (compactCount >= 0) {
- if (checkCompact(compactCount)) {
- // The file was already moved on this case, no need to check
for the size.
- // otherwise we will also need to check for the size
- return;
- }
+ if (compactCount >= 0 && compactCount < COMPACT_SPLIT_LINE && !split)
{
+ split = true;
+ openFile();
}
if (getWritingChannel().writerIndex() + size >
getWritingChannel().capacity()) {
@@ -180,28 +183,6 @@ public class JournalCompactor extends
AbstractJournalUpdateTask implements Journ
}
}
- int currentCount;
-
- // This means we will need to split when the compactCount is bellow the
watermark
- boolean willNeedToSplit = false;
-
- boolean splitted = false;
-
- private boolean checkCompact(final int compactCount) throws Exception {
- if (compactCount >= COMPACT_SPLIT_LINE && !splitted) {
- willNeedToSplit = true;
- }
-
- if (willNeedToSplit && compactCount < COMPACT_SPLIT_LINE) {
- willNeedToSplit = false;
- splitted = false;
- openFile();
- return true;
- } else {
- return false;
- }
- }
-
/**
* Replay pending counts that happened during compacting
*/
@@ -244,7 +225,7 @@ public class JournalCompactor extends
AbstractJournalUpdateTask implements Journ
JournalInternalRecord addRecord = new JournalAddRecord(true, info.id,
info.getUserRecordType(), EncoderPersister.getInstance(), new
ByteArrayEncoding(info.data));
addRecord.setCompactCount((short) (info.compactCount + 1));
- checkSize(addRecord.getEncodeSize(), info.compactCount);
+ checkSizeAndCompactSplit(addRecord.getEncodeSize(),
info.compactCount);
writeEncoder(addRecord);
@@ -271,7 +252,7 @@ public class JournalCompactor extends
AbstractJournalUpdateTask implements Journ
record.setCompactCount((short) (info.compactCount + 1));
- checkSize(record.getEncodeSize(), info.compactCount);
+ checkSizeAndCompactSplit(record.getEncodeSize(), info.compactCount);
newTransaction.addPositive(currentFile, info.id, record.getEncodeSize(),
info.replaceableUpdate);
@@ -460,7 +441,7 @@ public class JournalCompactor extends
AbstractJournalUpdateTask implements Journ
updateRecord.setCompactCount((short) (info.compactCount + 1));
- checkSize(updateRecord.getEncodeSize(), info.compactCount);
+ checkSizeAndCompactSplit(updateRecord.getEncodeSize(),
info.compactCount);
JournalRecord newRecord = newRecords.get(info.id);
@@ -493,7 +474,7 @@ public class JournalCompactor extends
AbstractJournalUpdateTask implements Journ
updateRecordTX.setCompactCount((short) (info.compactCount + 1));
- checkSize(updateRecordTX.getEncodeSize(), info.compactCount);
+ checkSizeAndCompactSplit(updateRecordTX.getEncodeSize(),
info.compactCount);
writeEncoder(updateRecordTX);
diff --git
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/journal/JournalCompactSplitTest.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/journal/JournalCompactSplitTest.java
new file mode 100644
index 0000000000..32fc3ce68e
--- /dev/null
+++
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/journal/JournalCompactSplitTest.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.tests.integration.journal;
+
+import java.io.File;
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
+import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
+import org.apache.activemq.artemis.core.io.SequentialFile;
+import org.apache.activemq.artemis.core.io.SequentialFileFactory;
+import org.apache.activemq.artemis.core.io.nio.NIOSequentialFileFactory;
+import org.apache.activemq.artemis.core.journal.EncoderPersister;
+import org.apache.activemq.artemis.core.journal.LoaderCallback;
+import org.apache.activemq.artemis.core.journal.PreparedTransactionInfo;
+import org.apache.activemq.artemis.core.journal.RecordInfo;
+import org.apache.activemq.artemis.core.journal.impl.JournalImpl;
+import
org.apache.activemq.artemis.core.journal.impl.dataformat.ByteArrayEncoding;
+import
org.apache.activemq.artemis.core.journal.impl.dataformat.JournalAddRecord;
+import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Authored by Fabio Nascimento Brandao through
https://issues.apache.org/jira/browse/ARTEMIS-3868
+ * Clebert added some refactoring to make it a Unit Test
+ * */
+public class JournalCompactSplitTest extends ActiveMQTestBase {
+ private static final long RECORDS_TO_CREATE = 100;
+
+ private static final int JOURNAL_FILE_SIZE = 100 * 1024;
+
+ private static final int BODY_SIZE = 124;
+
+ @Test
+ public void testJournalSplit() throws Exception {
+
+ SequentialFileFactory fileFactory = new NIOSequentialFileFactory(new
File(getTemporaryDir()), 1);
+
+ createFileWithRecords(fileFactory);
+
+ createAndCompactJournal(fileFactory);
+ }
+
+ private void createAndCompactJournal(SequentialFileFactory fileFactory)
throws Exception {
+ final int minFiles = 2;
+ final int compactMinFiles = 2;
+ final int compactPercentage = 30;
+ final String filePrefix = "activemq-data";
+ final String fileExtension = "amq";
+ final int maxAIO = 1024;
+ JournalImpl journalImpl = new JournalImpl(JOURNAL_FILE_SIZE, minFiles,
2, compactMinFiles, compactPercentage, fileFactory, filePrefix, fileExtension,
maxAIO, 1);
+
+ AtomicInteger recordCount = new AtomicInteger(0);
+ journalImpl.start();
+ runAfter(journalImpl::stop);
+ journalImpl.load(new LoaderCallback() {
+ @Override
+ public void addPreparedTransaction(PreparedTransactionInfo
preparedTransaction) {
+
+ }
+
+ @Override
+ public void addRecord(RecordInfo info) {
+ recordCount.incrementAndGet();
+ }
+
+ @Override
+ public void deleteRecord(long id) {
+ }
+
+ @Override
+ public void updateRecord(RecordInfo info) {
+ }
+
+ @Override
+ public void failedTransaction(long transactionID, List<RecordInfo>
records, List<RecordInfo> recordsToDelete) {
+ }
+ });
+
+ Assert.assertEquals(RECORDS_TO_CREATE, recordCount.get());
+
+ journalImpl.compact();
+ Assert.assertEquals(2, journalImpl.getDataFilesCount());
+ }
+
+ private void createFileWithRecords(SequentialFileFactory fileFactory)
throws Exception {
+ SequentialFile file =
fileFactory.createSequentialFile("activemq-data-1.amq", JOURNAL_FILE_SIZE);
+ file.open();
+ file.fill(JOURNAL_FILE_SIZE);
+ file.position(0);
+ ByteBuffer bb = fileFactory.newBuffer(JournalImpl.SIZE_HEADER);
+ ActiveMQBuffer buffer = ActiveMQBuffers.wrappedBuffer(bb);
+ JournalImpl.writeHeader(buffer, 1, 1);
+ file.write(buffer, true);
+
+ byte[] z = new byte[BODY_SIZE];
+ for (int i = 0; i < BODY_SIZE; i++) {
+ z[i] = 'z';
+ }
+ short compactCount = 10;
+
+ for (long i = 0; i < RECORDS_TO_CREATE; i++) {
+ JournalAddRecord record = new JournalAddRecord(true, i,
JournalImpl.ADD_RECORD, EncoderPersister.getInstance(), new
ByteArrayEncoding(z));
+ record.setFileID(1);
+ compactCount--;
+ if (compactCount < 0) compactCount = 10;
+
+ // this test is playing with the order of compact count
+ record.setCompactCount(compactCount);
+ file.write(record, false);
+ }
+ file.close();
+ }
+
+
+}