This is an automated email from the ASF dual-hosted git repository.

clebertsuconic pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git


The following commit(s) were added to refs/heads/main by this push:
     new 3e4013d8ca ARTEMIS-3868 Journal compactor split improvement
3e4013d8ca is described below

commit 3e4013d8ca2308cb99882b5655693c9e0e65a187
Author: Clebert Suconic <[email protected]>
AuthorDate: Mon Jun 27 12:45:15 2022 -0400

    ARTEMIS-3868 Journal compactor split improvement
    
    The condition fixed by this commit should not really happen in production,
    as the compacting counts should always be ordered (records that were
    compacted earlier will always be at the top of the journal).
    
    However, it highlights an improvement that could be made to journal
    compacting.
---
 .../core/journal/impl/JournalCompactor.java        |  55 +++------
 .../journal/JournalCompactSplitTest.java           | 134 +++++++++++++++++++++
 2 files changed, 152 insertions(+), 37 deletions(-)

diff --git 
a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalCompactor.java
 
b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalCompactor.java
index fb8e0f7860..e2aafc393c 100644
--- 
a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalCompactor.java
+++ 
b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalCompactor.java
@@ -52,6 +52,9 @@ public class JournalCompactor extends 
AbstractJournalUpdateTask implements Journ
    // We will force a moveNextFiles when the compactCount is bellow than 
COMPACT_SPLIT_LINE
    private static final short COMPACT_SPLIT_LINE = 2;
 
+   // Compacting should split the compacting counts only once
+   boolean split = false;
+
    // Snapshot of transactions that were pending when the compactor started
    private final ConcurrentLongHashMap<PendingTransaction> pendingTransactions 
= new ConcurrentLongHashMap<>();
 
@@ -156,22 +159,22 @@ public class JournalCompactor extends 
AbstractJournalUpdateTask implements Journ
    }
 
    private void checkSize(final int size) throws Exception {
-      checkSize(size, -1);
+      checkSizeAndCompactSplit(size, -1);
    }
 
-   private void checkSize(final int size, final int compactCount) throws 
Exception {
+   private void checkSizeAndCompactSplit(final int size, final int 
compactCount) throws Exception {
       if (getWritingChannel() == null) {
-         if (!checkCompact(compactCount)) {
-            // will need to open a file either way
-            openFile();
+         if (compactCount < COMPACT_SPLIT_LINE) {
+            // in case the very first record is already bellog SPLIT-LINE we 
need to set split = true already
+            // so no further checks will be done
+            // otherwise we would endup with more files than needed
+            split = true;
          }
+         openFile();
       } else {
-         if (compactCount >= 0) {
-            if (checkCompact(compactCount)) {
-               // The file was already moved on this case, no need to check 
for the size.
-               // otherwise we will also need to check for the size
-               return;
-            }
+         if (compactCount >= 0 && compactCount < COMPACT_SPLIT_LINE && !split) 
{
+            split = true;
+            openFile();
          }
 
          if (getWritingChannel().writerIndex() + size > 
getWritingChannel().capacity()) {
@@ -180,28 +183,6 @@ public class JournalCompactor extends 
AbstractJournalUpdateTask implements Journ
       }
    }
 
-   int currentCount;
-
-   // This means we will need to split when the compactCount is bellow the 
watermark
-   boolean willNeedToSplit = false;
-
-   boolean splitted = false;
-
-   private boolean checkCompact(final int compactCount) throws Exception {
-      if (compactCount >= COMPACT_SPLIT_LINE && !splitted) {
-         willNeedToSplit = true;
-      }
-
-      if (willNeedToSplit && compactCount < COMPACT_SPLIT_LINE) {
-         willNeedToSplit = false;
-         splitted = false;
-         openFile();
-         return true;
-      } else {
-         return false;
-      }
-   }
-
    /**
     * Replay pending counts that happened during compacting
     */
@@ -244,7 +225,7 @@ public class JournalCompactor extends 
AbstractJournalUpdateTask implements Journ
          JournalInternalRecord addRecord = new JournalAddRecord(true, info.id, 
info.getUserRecordType(), EncoderPersister.getInstance(), new 
ByteArrayEncoding(info.data));
          addRecord.setCompactCount((short) (info.compactCount + 1));
 
-         checkSize(addRecord.getEncodeSize(), info.compactCount);
+         checkSizeAndCompactSplit(addRecord.getEncodeSize(), 
info.compactCount);
 
          writeEncoder(addRecord);
 
@@ -271,7 +252,7 @@ public class JournalCompactor extends 
AbstractJournalUpdateTask implements Journ
 
       record.setCompactCount((short) (info.compactCount + 1));
 
-      checkSize(record.getEncodeSize(), info.compactCount);
+      checkSizeAndCompactSplit(record.getEncodeSize(), info.compactCount);
 
       newTransaction.addPositive(currentFile, info.id, record.getEncodeSize(), 
info.replaceableUpdate);
 
@@ -460,7 +441,7 @@ public class JournalCompactor extends 
AbstractJournalUpdateTask implements Journ
 
       updateRecord.setCompactCount((short) (info.compactCount + 1));
 
-      checkSize(updateRecord.getEncodeSize(), info.compactCount);
+      checkSizeAndCompactSplit(updateRecord.getEncodeSize(), 
info.compactCount);
 
       JournalRecord newRecord = newRecords.get(info.id);
 
@@ -493,7 +474,7 @@ public class JournalCompactor extends 
AbstractJournalUpdateTask implements Journ
 
       updateRecordTX.setCompactCount((short) (info.compactCount + 1));
 
-      checkSize(updateRecordTX.getEncodeSize(), info.compactCount);
+      checkSizeAndCompactSplit(updateRecordTX.getEncodeSize(), 
info.compactCount);
 
       writeEncoder(updateRecordTX);
 
diff --git 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/journal/JournalCompactSplitTest.java
 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/journal/JournalCompactSplitTest.java
new file mode 100644
index 0000000000..32fc3ce68e
--- /dev/null
+++ 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/journal/JournalCompactSplitTest.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.tests.integration.journal;
+
+import java.io.File;
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
+import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
+import org.apache.activemq.artemis.core.io.SequentialFile;
+import org.apache.activemq.artemis.core.io.SequentialFileFactory;
+import org.apache.activemq.artemis.core.io.nio.NIOSequentialFileFactory;
+import org.apache.activemq.artemis.core.journal.EncoderPersister;
+import org.apache.activemq.artemis.core.journal.LoaderCallback;
+import org.apache.activemq.artemis.core.journal.PreparedTransactionInfo;
+import org.apache.activemq.artemis.core.journal.RecordInfo;
+import org.apache.activemq.artemis.core.journal.impl.JournalImpl;
+import 
org.apache.activemq.artemis.core.journal.impl.dataformat.ByteArrayEncoding;
+import 
org.apache.activemq.artemis.core.journal.impl.dataformat.JournalAddRecord;
+import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Authored by Fabio Nascimento Brandao through 
https://issues.apache.org/jira/browse/ARTEMIS-3868
+ * Clebert added some refactoring to make it a Unit Test
+ * */
+public class JournalCompactSplitTest extends ActiveMQTestBase {
+   private static final long RECORDS_TO_CREATE = 100;
+
+   private static final int JOURNAL_FILE_SIZE = 100 * 1024;
+
+   private static final int BODY_SIZE = 124;
+
+   @Test
+   public void testJournalSplit() throws Exception {
+
+      SequentialFileFactory fileFactory = new NIOSequentialFileFactory(new 
File(getTemporaryDir()), 1);
+
+      createFileWithRecords(fileFactory);
+
+      createAndCompactJournal(fileFactory);
+   }
+
+   private void createAndCompactJournal(SequentialFileFactory fileFactory) 
throws Exception {
+      final int minFiles = 2;
+      final int compactMinFiles = 2;
+      final int compactPercentage = 30;
+      final String filePrefix = "activemq-data";
+      final String fileExtension = "amq";
+      final int maxAIO = 1024;
+      JournalImpl journalImpl = new JournalImpl(JOURNAL_FILE_SIZE, minFiles, 
2, compactMinFiles, compactPercentage, fileFactory, filePrefix, fileExtension, 
maxAIO, 1);
+
+      AtomicInteger recordCount = new AtomicInteger(0);
+      journalImpl.start();
+      runAfter(journalImpl::stop);
+      journalImpl.load(new LoaderCallback() {
+         @Override
+         public void addPreparedTransaction(PreparedTransactionInfo 
preparedTransaction) {
+
+         }
+
+         @Override
+         public void addRecord(RecordInfo info) {
+            recordCount.incrementAndGet();
+         }
+
+         @Override
+         public void deleteRecord(long id) {
+         }
+
+         @Override
+         public void updateRecord(RecordInfo info) {
+         }
+
+         @Override
+         public void failedTransaction(long transactionID, List<RecordInfo> 
records, List<RecordInfo> recordsToDelete) {
+         }
+      });
+
+      Assert.assertEquals(RECORDS_TO_CREATE, recordCount.get());
+
+      journalImpl.compact();
+      Assert.assertEquals(2, journalImpl.getDataFilesCount());
+   }
+
+   private void createFileWithRecords(SequentialFileFactory fileFactory) 
throws Exception {
+      SequentialFile file = 
fileFactory.createSequentialFile("activemq-data-1.amq", JOURNAL_FILE_SIZE);
+      file.open();
+      file.fill(JOURNAL_FILE_SIZE);
+      file.position(0);
+      ByteBuffer bb = fileFactory.newBuffer(JournalImpl.SIZE_HEADER);
+      ActiveMQBuffer buffer = ActiveMQBuffers.wrappedBuffer(bb);
+      JournalImpl.writeHeader(buffer, 1, 1);
+      file.write(buffer, true);
+
+      byte[] z = new byte[BODY_SIZE];
+      for (int i = 0; i < BODY_SIZE; i++) {
+         z[i] = 'z';
+      }
+      short compactCount = 10;
+
+      for (long i = 0; i < RECORDS_TO_CREATE; i++) {
+         JournalAddRecord record = new JournalAddRecord(true, i, 
JournalImpl.ADD_RECORD, EncoderPersister.getInstance(), new 
ByteArrayEncoding(z));
+         record.setFileID(1);
+         compactCount--;
+         if (compactCount < 0) compactCount = 10;
+
+         // this test is playing with the order of compact count
+         record.setCompactCount(compactCount);
+         file.write(record, false);
+      }
+      file.close();
+   }
+
+
+}

Reply via email to