ARTEMIS-822 Add executor service to JournalImpl for append operations and remove synchronization
https://issues.apache.org/jira/browse/ARTEMIS-822 Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/4b47461f Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/4b47461f Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/4b47461f Branch: refs/heads/master Commit: 4b47461f03a607b9ef517beb2a1666ffae43a2a7 Parents: bfb9bed Author: barreiro <[email protected]> Authored: Fri Jan 22 03:23:26 2016 +0000 Committer: Clebert Suconic <[email protected]> Committed: Fri Oct 28 16:54:59 2016 -0400 ---------------------------------------------------------------------- .../cli/commands/tools/DecodeJournal.java | 20 +- .../activemq/artemis/utils/ExecutorFactory.java | 24 + .../artemis/utils/OrderedExecutorFactory.java | 127 ++++ .../activemq/artemis/utils/SimpleFuture.java | 79 +++ .../artemis/utils/SimpleFutureTest.java | 69 ++ .../activemq/artemis/utils/ExecutorFactory.java | 24 - .../artemis/utils/OrderedExecutorFactory.java | 128 ---- .../artemis/core/journal/impl/JournalImpl.java | 662 +++++++++++-------- .../core/journal/impl/JournalTransaction.java | 46 +- .../artemis/journal/ActiveMQJournalLogger.java | 12 +- .../journal/impl/AlignedJournalImplTest.java | 39 +- .../core/journal/impl/JournalAsyncTest.java | 15 +- 12 files changed, 761 insertions(+), 484 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4b47461f/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/DecodeJournal.java ---------------------------------------------------------------------- diff --git a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/DecodeJournal.java b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/DecodeJournal.java index b392f6f..f290eba 100644 --- a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/DecodeJournal.java +++ b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/DecodeJournal.java @@ -33,7 +33,6 @@ import org.apache.activemq.artemis.cli.commands.ActionContext; import org.apache.activemq.artemis.core.io.nio.NIOSequentialFileFactory; import org.apache.activemq.artemis.core.journal.RecordInfo; import org.apache.activemq.artemis.core.journal.impl.JournalImpl; -import org.apache.activemq.artemis.core.journal.impl.JournalRecord; import org.apache.activemq.artemis.utils.Base64; @Command(name = "decode", description = "Decode a journal's internal format into a new journal set of files") @@ -125,8 +124,6 @@ public class DecodeJournal extends LockAbstract { long lineNumber = 0; - Map<Long, JournalRecord> journalRecords = journal.getRecords(); - while ((line = buffReader.readLine()) != null) { lineNumber++; String[] splitLine = line.split(","); @@ -150,12 +147,6 @@ public class DecodeJournal extends LockAbstract { counter.incrementAndGet(); RecordInfo info = parseRecord(lineProperties); journal.appendAddRecordTransactional(txID, info.id, info.userRecordType, info.data); - } else if (operation.equals("AddRecordTX")) { - long txID = parseLong("txID", lineProperties); - AtomicInteger counter = getCounter(txID, txCounters); - counter.incrementAndGet(); - RecordInfo info = parseRecord(lineProperties); - journal.appendAddRecordTransactional(txID, info.id, info.userRecordType, info.data); } else if (operation.equals("UpdateTX")) { long txID = parseLong("txID", lineProperties); AtomicInteger counter = getCounter(txID, txCounters); @@ -168,20 +159,17 @@ public class DecodeJournal extends LockAbstract { } else if (operation.equals("DeleteRecord")) { long id = parseLong("id", lineProperties); - // If not found it means the append/update records were reclaimed already - if (journalRecords.get(id) != null) { + try { journal.appendDeleteRecord(id, false); + } catch (IllegalStateException ignored) { + // If not found it means the append/update records were reclaimed already } } else if (operation.equals("DeleteRecordTX")) { long txID = parseLong("txID", lineProperties); long id = parseLong("id", lineProperties); AtomicInteger counter = getCounter(txID, txCounters); counter.incrementAndGet(); - - // If not found it means the append/update records were reclaimed already - if (journalRecords.get(id) != null) { - journal.appendDeleteRecordTransactional(txID, id); - } + journal.appendDeleteRecordTransactional(txID, id); } else if (operation.equals("Prepare")) { long txID = parseLong("txID", lineProperties); int numberOfRecords = parseInt("numberOfRecords", lineProperties); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4b47461f/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/ExecutorFactory.java ---------------------------------------------------------------------- diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/ExecutorFactory.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/ExecutorFactory.java new file mode 100644 index 0000000..dd0209b --- /dev/null +++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/ExecutorFactory.java @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.utils; + +import java.util.concurrent.Executor; + +public interface ExecutorFactory { + + Executor getExecutor(); +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4b47461f/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/OrderedExecutorFactory.java ---------------------------------------------------------------------- diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/OrderedExecutorFactory.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/OrderedExecutorFactory.java new file mode 100644 index 0000000..c7d5c03 --- /dev/null +++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/OrderedExecutorFactory.java @@ -0,0 +1,127 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.utils; + +import java.util.Queue; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.Executor; +import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; + +import org.apache.activemq.artemis.api.core.ActiveMQInterruptedException; +import org.jboss.logging.Logger; + +/** + * A factory for producing executors that run all tasks in order, which delegate to a single common executor instance. + */ +public final class OrderedExecutorFactory implements ExecutorFactory { + + private static final Logger logger = Logger.getLogger(OrderedExecutorFactory.class); + + private final Executor parent; + + /** + * Construct a new instance delegating to the given parent executor. + * + * @param parent the parent executor + */ + public OrderedExecutorFactory(final Executor parent) { + this.parent = parent; + } + + /** + * Get an executor that always executes tasks in order. + * + * @return an ordered executor + */ + @Override + public Executor getExecutor() { + return new OrderedExecutor(parent); + } + + /** + * An executor that always runs all tasks in order, using a delegate executor to run the tasks. + * <br> + * More specifically, any call B to the {@link #execute(Runnable)} method that happens-after another call A to the + * same method, will result in B's task running after A's. + */ + private static class OrderedExecutor implements Executor { + + private final Queue<Runnable> tasks = new ConcurrentLinkedQueue<>(); + private final Executor delegate; + private final ExecutorTask task = new ExecutorTask(); + + // used by stateUpdater + @SuppressWarnings("unused") + private volatile int state = 0; + + private static final AtomicIntegerFieldUpdater<OrderedExecutor> stateUpdater = AtomicIntegerFieldUpdater.newUpdater(OrderedExecutor.class, "state"); + + private static final int STATE_NOT_RUNNING = 0; + private static final int STATE_RUNNING = 1; + + private OrderedExecutor(Executor delegate) { + this.delegate = delegate; + } + + @Override + public void execute(Runnable command) { + tasks.add(command); + if (stateUpdater.get(this) == STATE_NOT_RUNNING) { + //note that this can result in multiple tasks being queued + //this is not an issue as the CAS will mean that the second (and subsequent) execution is ignored + delegate.execute(task); + } + } + + private final class ExecutorTask implements Runnable { + + @Override + public void run() { + do { + //if there is no thread active then we run + if (stateUpdater.compareAndSet(OrderedExecutor.this, STATE_NOT_RUNNING, STATE_RUNNING)) { + Runnable task = tasks.poll(); + //while the queue is not empty we process in order + while (task != null) { + try { + task.run(); + } catch (ActiveMQInterruptedException e) { + // This could happen during shutdowns. Nothing to be concerned about here + logger.debug("Interrupted Thread", e); + } catch (Throwable t) { + logger.warn(t.getMessage(), t); + } + task = tasks.poll(); + } + //set state back to not running. + stateUpdater.set(OrderedExecutor.this, STATE_NOT_RUNNING); + } else { + return; + } + //we loop again based on tasks not being empty. Otherwise there is a window where the state is running, + //but poll() has returned null, so a submitting thread will believe that it does not need re-execute. + //this check fixes the issue + } while (!tasks.isEmpty()); + } + } + + @Override + public String toString() { + return "OrderedExecutor(tasks=" + tasks + ")"; + } + } +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4b47461f/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/SimpleFuture.java ---------------------------------------------------------------------- diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/SimpleFuture.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/SimpleFuture.java new file mode 100644 index 0000000..eedfef4 --- /dev/null +++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/SimpleFuture.java @@ -0,0 +1,79 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.artemis.utils; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +public class SimpleFuture<V> implements Future<V> { + + public SimpleFuture() { + } + + V value; + Exception exception; + + private final CountDownLatch latch = new CountDownLatch(1); + + boolean canceled = false; + + @Override + public boolean cancel(boolean mayInterruptIfRunning) { + canceled = true; + latch.countDown(); + return true; + } + + @Override + public boolean isCancelled() { + return canceled; + } + + @Override + public boolean isDone() { + return latch.getCount() <= 0; + } + + public void fail(Exception e) { + this.exception = e; + latch.countDown(); + } + + @Override + public V get() throws InterruptedException, ExecutionException { + latch.await(); + if (this.exception != null) { + throw new ExecutionException(this.exception); + } + return value; + } + + public void set(V v) { + this.value = v; + latch.countDown(); + } + + @Override + public V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { + latch.await(timeout, unit); + return value; + } +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4b47461f/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/SimpleFutureTest.java ---------------------------------------------------------------------- diff --git a/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/SimpleFutureTest.java b/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/SimpleFutureTest.java new file mode 100644 index 0000000..00fd5d7 --- /dev/null +++ b/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/SimpleFutureTest.java @@ -0,0 +1,69 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.artemis.utils; + +import org.junit.Assert; +import org.junit.Rule; +import org.junit.Test; + +public class SimpleFutureTest { + + @Rule + public ThreadLeakCheckRule threadLeakCheckRule = new ThreadLeakCheckRule(); + + @Test + public void testFuture() throws Exception { + final long randomStart = System.currentTimeMillis(); + final SimpleFuture<Long> simpleFuture = new SimpleFuture<>(); + Thread t = new Thread() { + @Override + public void run() { + simpleFuture.set(randomStart); + } + }; + t.start(); + + Assert.assertEquals(randomStart, simpleFuture.get().longValue()); + } + + + @Test + public void testException() throws Exception { + final SimpleFuture<Long> simpleFuture = new SimpleFuture<>(); + Thread t = new Thread() { + @Override + public void run() { + simpleFuture.fail(new Exception("hello")); + } + }; + t.start(); + + boolean failed = false; + try { + simpleFuture.get(); + } catch (Exception e) { + failed = true; + } + + + Assert.assertTrue(failed); + } + + + +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4b47461f/artemis-core-client/src/main/java/org/apache/activemq/artemis/utils/ExecutorFactory.java ---------------------------------------------------------------------- diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/utils/ExecutorFactory.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/utils/ExecutorFactory.java deleted file mode 100644 index dd0209b..0000000 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/utils/ExecutorFactory.java +++ /dev/null @@ -1,24 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.artemis.utils; - -import java.util.concurrent.Executor; - -public interface ExecutorFactory { - - Executor getExecutor(); -} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4b47461f/artemis-core-client/src/main/java/org/apache/activemq/artemis/utils/OrderedExecutorFactory.java ---------------------------------------------------------------------- diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/utils/OrderedExecutorFactory.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/utils/OrderedExecutorFactory.java deleted file mode 100644 index 609af8e..0000000 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/utils/OrderedExecutorFactory.java +++ /dev/null @@ -1,128 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.artemis.utils; - -import java.util.Queue; -import java.util.concurrent.ConcurrentLinkedQueue; -import java.util.concurrent.Executor; -import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; - -import org.apache.activemq.artemis.api.core.ActiveMQInterruptedException; -import org.apache.activemq.artemis.core.client.ActiveMQClientLogger; -import org.jboss.logging.Logger; - -/** - * A factory for producing executors that run all tasks in order, which delegate to a single common executor instance. - */ -public final class OrderedExecutorFactory implements ExecutorFactory { - - private static final Logger logger = Logger.getLogger(OrderedExecutorFactory.class); - - private final Executor parent; - - /** - * Construct a new instance delegating to the given parent executor. - * - * @param parent the parent executor - */ - public OrderedExecutorFactory(final Executor parent) { - this.parent = parent; - } - - /** - * Get an executor that always executes tasks in order. - * - * @return an ordered executor - */ - @Override - public Executor getExecutor() { - return new OrderedExecutor(parent); - } - - /** - * An executor that always runs all tasks in order, using a delegate executor to run the tasks. - * <br> - * More specifically, any call B to the {@link #execute(Runnable)} method that happens-after another call A to the - * same method, will result in B's task running after A's. - */ - private static class OrderedExecutor implements Executor { - - private final Queue<Runnable> tasks = new ConcurrentLinkedQueue<>(); - private final Executor delegate; - private final ExecutorTask task = new ExecutorTask(); - - // used by stateUpdater - @SuppressWarnings("unused") - private volatile int state = 0; - - private static final AtomicIntegerFieldUpdater<OrderedExecutor> stateUpdater = AtomicIntegerFieldUpdater.newUpdater(OrderedExecutor.class, "state"); - - private static final int STATE_NOT_RUNNING = 0; - private static final int STATE_RUNNING = 1; - - private OrderedExecutor(Executor delegate) { - this.delegate = delegate; - } - - @Override - public void execute(Runnable command) { - tasks.add(command); - if (stateUpdater.get(this) == STATE_NOT_RUNNING) { - //note that this can result in multiple tasks being queued - //this is not an issue as the CAS will mean that the second (and subsequent) execution is ignored - delegate.execute(task); - } - } - - private final class ExecutorTask implements Runnable { - - @Override - public void run() { - do { - //if there is no thread active then we run - if (stateUpdater.compareAndSet(OrderedExecutor.this, STATE_NOT_RUNNING, STATE_RUNNING)) { - Runnable task = tasks.poll(); - //while the queue is not empty we process in order - while (task != null) { - try { - task.run(); - } catch (ActiveMQInterruptedException e) { - // This could happen during shutdowns. Nothing to be concerned about here - logger.debug("Interrupted Thread", e); - } catch (Throwable t) { - ActiveMQClientLogger.LOGGER.caughtunexpectedThrowable(t); - } - task = tasks.poll(); - } - //set state back to not running. - stateUpdater.set(OrderedExecutor.this, STATE_NOT_RUNNING); - } else { - return; - } - //we loop again based on tasks not being empty. Otherwise there is a window where the state is running, - //but poll() has returned null, so a submitting thread will believe that it does not need re-execute. - //this check fixes the issue - } while (!tasks.isEmpty()); - } - } - - @Override - public String toString() { - return "OrderedExecutor(tasks=" + tasks + ")"; - } - } -} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4b47461f/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalImpl.java ---------------------------------------------------------------------- diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalImpl.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalImpl.java index b6d5e62..43db1f7 100644 --- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalImpl.java +++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalImpl.java @@ -29,11 +29,13 @@ import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.Future; import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; @@ -45,6 +47,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock; import org.apache.activemq.artemis.api.core.ActiveMQBuffer; import org.apache.activemq.artemis.api.core.ActiveMQBuffers; import org.apache.activemq.artemis.api.core.Pair; +import org.apache.activemq.artemis.api.core.ActiveMQExceptionType; import org.apache.activemq.artemis.core.io.IOCallback; import org.apache.activemq.artemis.core.io.SequentialFile; import org.apache.activemq.artemis.core.io.SequentialFileFactory; @@ -160,6 +163,8 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal // Compacting may replace this structure private final ConcurrentMap<Long, JournalRecord> records = new ConcurrentHashMap<>(); + private final Set<Long> pendingRecords = Collections.newSetFromMap(new ConcurrentHashMap<Long, Boolean>()); + // Compacting may replace this structure private final ConcurrentMap<Long, JournalTransaction> transactions = new ConcurrentHashMap<>(); @@ -172,12 +177,9 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal private ExecutorService compactorExecutor = null; - private ConcurrentHashSet<CountDownLatch> latches = new ConcurrentHashSet<>(); + private ExecutorService appendExecutor = null; - // Lock used during the append of records - // This lock doesn't represent a global lock. - // After a record is appended, the usedFile can't be changed until the positives and negatives are updated - private final Object lockAppend = new Object(); + private ConcurrentHashSet<CountDownLatch> latches = new ConcurrentHashSet<>(); /** * We don't lock the journal during the whole compacting operation. During compacting we only @@ -688,32 +690,37 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal final boolean sync, final IOCompletion callback) throws Exception { checkJournalIsLoaded(); + lineUpContext(callback); + pendingRecords.add(id); - journalLock.readLock().lock(); - - try { - JournalInternalRecord addRecord = new JournalAddRecord(true, id, recordType, record); - - if (callback != null) { - callback.storeLineUp(); - } - - synchronized (lockAppend) { - JournalFile usedFile = appendRecord(addRecord, false, sync, null, callback); + Future<?> result = appendExecutor.submit(new Runnable() { + @Override + public void run() { + journalLock.readLock().lock(); + try { + JournalInternalRecord addRecord = new JournalAddRecord(true, id, recordType, record); + JournalFile usedFile = appendRecord(addRecord, false, sync, null, callback); + records.put(id, new JournalRecord(usedFile, addRecord.getEncodeSize())); - if (logger.isTraceEnabled()) { - logger.trace("appendAddRecord::id=" + id + - ", userRecordType=" + - recordType + - ", record = " + record + - ", usedFile = " + - usedFile); + if (logger.isTraceEnabled()) { + logger.trace("appendAddRecord::id=" + id + + ", userRecordType=" + + recordType + + ", record = " + record + + ", usedFile = " + + usedFile); + } + } catch (Exception e) { + ActiveMQJournalLogger.LOGGER.error(e.getMessage(), e); + } finally { + pendingRecords.remove(id); + journalLock.readLock().unlock(); } - - records.put(id, new JournalRecord(usedFile, addRecord.getEncodeSize())); } - } finally { - journalLock.readLock().unlock(); + }); + + if (sync && callback == null) { + result.get(); } } @@ -724,94 +731,86 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal final boolean sync, final IOCompletion callback) throws Exception { checkJournalIsLoaded(); + lineUpContext(callback); + checkKnownRecordID(id); - journalLock.readLock().lock(); + Future<?> result = appendExecutor.submit(new Runnable() { + @Override + public void run() { + journalLock.readLock().lock(); + try { + JournalRecord jrnRecord = records.get(id); + JournalInternalRecord updateRecord = new JournalAddRecord(false, id, recordType, record); + JournalFile usedFile = appendRecord(updateRecord, false, sync, null, callback); - try { - JournalRecord jrnRecord = records.get(id); + if (logger.isTraceEnabled()) { + logger.trace("appendUpdateRecord::id=" + id + + ", userRecordType=" + + recordType + + ", usedFile = " + + usedFile); + } - if (jrnRecord == null) { - if (!(compactor != null && compactor.lookupRecord(id))) { - throw new IllegalStateException("Cannot find add info " + id); + // record==null here could only mean there is a compactor + // computing the delete should be done after compacting is done + if (jrnRecord == null) { + compactor.addCommandUpdate(id, usedFile, updateRecord.getEncodeSize()); + } else { + jrnRecord.addUpdateFile(usedFile, updateRecord.getEncodeSize()); + } + } catch (Exception e) { + ActiveMQJournalLogger.LOGGER.error(e.getMessage(), e); + } finally { + journalLock.readLock().unlock(); } } + }); - JournalInternalRecord updateRecord = new JournalAddRecord(false, id, recordType, record); - - if (callback != null) { - callback.storeLineUp(); - } - - synchronized (lockAppend) { - JournalFile usedFile = appendRecord(updateRecord, false, sync, null, callback); - - if (logger.isTraceEnabled()) { - logger.trace("appendUpdateRecord::id=" + id + - ", userRecordType=" + - recordType + - ", record = " + record + - ", usedFile = " + - usedFile); - } - - // record== null here could only mean there is a compactor, and computing the delete should be done after - // compacting is done - if (jrnRecord == null) { - compactor.addCommandUpdate(id, usedFile, updateRecord.getEncodeSize()); - } else { - jrnRecord.addUpdateFile(usedFile, updateRecord.getEncodeSize()); - } - } - } finally { - journalLock.readLock().unlock(); + if (sync && callback == null) { + result.get(); } } @Override public void appendDeleteRecord(final long id, final boolean sync, final IOCompletion callback) throws Exception { checkJournalIsLoaded(); + lineUpContext(callback); + checkKnownRecordID(id); - journalLock.readLock().lock(); - try { + Future<?> result = appendExecutor.submit(new Runnable() { + @Override + public void run() { + journalLock.readLock().lock(); + try { + JournalRecord record = null; + if (compactor == null) { + record = records.remove(id); + } - JournalRecord record = null; + JournalInternalRecord deleteRecord = new JournalDeleteRecord(id); + JournalFile usedFile = appendRecord(deleteRecord, false, sync, null, callback); - if (compactor == null) { - record = records.remove(id); + if (logger.isTraceEnabled()) { + logger.trace("appendDeleteRecord::id=" + id + ", usedFile = " + usedFile); + } - if (record == null) { - throw new IllegalStateException("Cannot find add info " + id); - } - } else { - if (!records.containsKey(id) && !compactor.lookupRecord(id)) { - throw new IllegalStateException("Cannot find add info " + id + " on compactor or current records"); + // record==null here could only mean there is a compactor + // computing the delete should be done after compacting is done + if (record == null) { + compactor.addCommandDelete(id, usedFile); + } else { + record.delete(usedFile); + } + } catch (Exception e) { + ActiveMQJournalLogger.LOGGER.error(e.getMessage(), e); + } finally { + journalLock.readLock().unlock(); } } + }); - JournalInternalRecord deleteRecord = new JournalDeleteRecord(id); - - if (callback != null) { - callback.storeLineUp(); - } - - synchronized (lockAppend) { - JournalFile usedFile = appendRecord(deleteRecord, false, sync, null, callback); - - if (logger.isTraceEnabled()) { - logger.trace("appendDeleteRecord::id=" + id + ", usedFile = " + usedFile); - } - - // record== null here could only mean there is a compactor, and computing the delete should be done after - // compacting is done - if (record == null) { - compactor.addCommandDelete(id, usedFile); - } else { - record.delete(usedFile); - } - - } - } finally { - journalLock.readLock().unlock(); + if (sync && callback == null) { + result.get(); } } @@ -822,31 +821,62 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal final EncodingSupport record) throws Exception { checkJournalIsLoaded(); - journalLock.readLock().lock(); + final JournalTransaction tx = getTransactionInfo(txID); + tx.checkErrorCondition(); - try { - JournalInternalRecord addRecord = new JournalAddRecordTX(true, txID, id, recordType, record); + appendExecutor.submit(new Runnable() { - JournalTransaction tx = getTransactionInfo(txID); + @Override + public void run() { + journalLock.readLock().lock(); + try { + JournalInternalRecord addRecord = new JournalAddRecordTX(true, txID, id, recordType, record); + JournalFile usedFile = appendRecord(addRecord, false, false, tx, null); - synchronized (lockAppend) { - JournalFile usedFile = appendRecord(addRecord, false, false, tx, null); + if (logger.isTraceEnabled()) { + logger.trace("appendAddRecordTransactional:txID=" + txID + + ",id=" + + id + + ", userRecordType=" + + recordType + + ", record = " + record + + ", usedFile = " + + usedFile); + } - if (logger.isTraceEnabled()) { - logger.trace("appendAddRecordTransactional:txID=" + txID + - ",id=" + - id + - ", userRecordType=" + - recordType + - ", record = " + record + - ", usedFile = " + - usedFile); + tx.addPositive(usedFile, id, addRecord.getEncodeSize()); + } catch (Exception e) { + ActiveMQJournalLogger.LOGGER.error(e.getMessage(), e); + setErrorCondition(tx, e); + } finally { + journalLock.readLock().unlock(); } + } + }); + } + + private void checkKnownRecordID(final long id) throws Exception { + if (records.containsKey(id) || pendingRecords.contains(id) || (compactor != null && compactor.lookupRecord(id))) { + return; + } - tx.addPositive(usedFile, id, addRecord.getEncodeSize()); + // retry on the append thread. maybe the appender thread is not keeping up. + Future<Boolean> known = appendExecutor.submit(new Callable<Boolean>() { + @Override + public Boolean call() throws Exception { + journalLock.readLock().lock(); + try { + return records.containsKey(id) + || pendingRecords.contains(id) + || (compactor != null && compactor.lookupRecord(id)); + } finally { + journalLock.readLock().unlock(); + } } - } finally { - journalLock.readLock().unlock(); + }); + + if (!known.get()) { + throw new IllegalStateException("Cannot find add info " + id + " on compactor or current records"); } } @@ -867,32 +897,39 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal final EncodingSupport record) throws Exception { checkJournalIsLoaded(); - journalLock.readLock().lock(); + final JournalTransaction tx = getTransactionInfo(txID); + tx.checkErrorCondition(); - try { - JournalInternalRecord updateRecordTX = new JournalAddRecordTX(false, txID, id, recordType, record); + appendExecutor.submit(new Runnable() { - JournalTransaction tx = getTransactionInfo(txID); + @Override + public void run() { + journalLock.readLock().lock(); + try { - synchronized (lockAppend) { - JournalFile usedFile = appendRecord(updateRecordTX, false, false, tx, null); + JournalInternalRecord updateRecordTX = new JournalAddRecordTX( false, txID, id, recordType, record ); + JournalFile usedFile = appendRecord( updateRecordTX, false, false, tx, null ); + + if ( logger.isTraceEnabled() ) { + logger.trace( "appendUpdateRecordTransactional::txID=" + txID + + ",id=" + + id + + ", userRecordType=" + + recordType + + ", record = " + record + + ", usedFile = " + + usedFile ); + } - if (logger.isTraceEnabled()) { - logger.trace("appendUpdateRecordTransactional::txID=" + txID + - ",id=" + - id + - ", userRecordType=" + - recordType + - ", record = " + record + - ", usedFile = " + - usedFile); + tx.addPositive( usedFile, id, updateRecordTX.getEncodeSize() ); + } catch ( Exception e ) { + ActiveMQJournalLogger.LOGGER.error( e.getMessage(), e ); + setErrorCondition( tx, e ); + } finally { + journalLock.readLock().unlock(); } - - tx.addPositive(usedFile, id, updateRecordTX.getEncodeSize()); } - } finally { - journalLock.readLock().unlock(); - } + }); } @Override @@ -901,29 +938,35 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal final EncodingSupport record) throws Exception { checkJournalIsLoaded(); - journalLock.readLock().lock(); + final JournalTransaction tx = getTransactionInfo(txID); + tx.checkErrorCondition(); - try { - JournalInternalRecord deleteRecordTX = new JournalDeleteRecordTX(txID, id, record); + appendExecutor.submit(new Runnable() { + @Override + public void run() { + journalLock.readLock().lock(); + try { - JournalTransaction tx = getTransactionInfo(txID); + JournalInternalRecord deleteRecordTX = new JournalDeleteRecordTX(txID, id, record); + JournalFile usedFile = appendRecord(deleteRecordTX, false, false, tx, null); - synchronized (lockAppend) { - JournalFile usedFile = appendRecord(deleteRecordTX, false, false, tx, null); + if (logger.isTraceEnabled()) { + logger.trace("appendDeleteRecordTransactional::txID=" + txID + + ", id=" + + id + + ", usedFile = " + + usedFile); + } - if (logger.isTraceEnabled()) { - logger.trace("appendDeleteRecordTransactional::txID=" + txID + - ", id=" + - id + - ", usedFile = " + - usedFile); + tx.addNegative(usedFile, id); + } catch (Exception e) { + ActiveMQJournalLogger.LOGGER.error(e.getMessage(), e); + setErrorCondition(tx, e); + } finally { + journalLock.readLock().unlock(); } - - tx.addNegative(usedFile, id); } - } finally { - journalLock.readLock().unlock(); - } + }); } /** @@ -943,36 +986,53 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal final IOCompletion callback) throws Exception { checkJournalIsLoaded(); + lineUpContext(callback); - journalLock.readLock().lock(); - - try { - JournalTransaction tx = getTransactionInfo(txID); - - JournalInternalRecord prepareRecord = new JournalCompleteRecordTX(TX_RECORD_TYPE.PREPARE, txID, transactionData); + final JournalTransaction tx = getTransactionInfo(txID); + tx.checkErrorCondition(); - if (callback != null) { - callback.storeLineUp(); - } + Future<?> result = appendExecutor.submit(new Runnable() { + @Override + public void run() { + journalLock.readLock().lock(); + try { + JournalInternalRecord prepareRecord = new JournalCompleteRecordTX(TX_RECORD_TYPE.PREPARE, txID, transactionData); + JournalFile usedFile = appendRecord(prepareRecord, true, sync, tx, callback); - synchronized (lockAppend) { - JournalFile usedFile = appendRecord(prepareRecord, true, sync, tx, callback); + if (logger.isTraceEnabled()) { + logger.trace("appendPrepareRecord::txID=" + txID + ", usedFile = " + usedFile); + } - if (logger.isTraceEnabled()) { - logger.trace("appendPrepareRecord::txID=" + txID + ", usedFile = " + usedFile); + tx.prepare(usedFile); + } catch (Exception e) { + ActiveMQJournalLogger.LOGGER.error(e.getMessage(), e); + setErrorCondition(tx, e); + } finally { + journalLock.readLock().unlock(); } - - tx.prepare(usedFile); } + }); - } finally { - journalLock.readLock().unlock(); + if (sync && callback == null) { + result.get(); + tx.checkErrorCondition(); } } @Override public void lineUpContext(IOCompletion callback) { - callback.storeLineUp(); + if (callback != null) { + callback.storeLineUp(); + } + } + + private void setErrorCondition(JournalTransaction jt, Throwable t) { + if (jt != null) { + TransactionCallback callback = jt.getCurrentCallback(); + if (callback != null && callback.getErrorMessage() != null) { + callback.onError(ActiveMQExceptionType.IO_ERROR.getCode(), t.getMessage()); + } + } } /** @@ -982,68 +1042,83 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal public void appendCommitRecord(final long txID, final boolean sync, final IOCompletion callback, - boolean lineUpContext) throws Exception { + final boolean lineUpContext) throws Exception { checkJournalIsLoaded(); + if (lineUpContext) { + lineUpContext(callback); + } - journalLock.readLock().lock(); + final JournalTransaction tx = transactions.remove(txID); - try { - JournalTransaction tx = transactions.remove(txID); + if (tx == null) { + throw new IllegalStateException("Cannot find tx with id " + txID); + } - if (tx == null) { - throw new IllegalStateException("Cannot find tx with id " + txID); - } + tx.checkErrorCondition(); - JournalInternalRecord commitRecord = new JournalCompleteRecordTX(TX_RECORD_TYPE.COMMIT, txID, null); + Future<?> result = appendExecutor.submit(new Runnable() { + @Override + public void run() { + journalLock.readLock().lock(); + try { + JournalInternalRecord commitRecord = new JournalCompleteRecordTX(TX_RECORD_TYPE.COMMIT, txID, null); + JournalFile usedFile = appendRecord(commitRecord, true, sync, tx, callback); - if (callback != null && lineUpContext) { - callback.storeLineUp(); - } - synchronized (lockAppend) { - JournalFile usedFile = appendRecord(commitRecord, true, sync, tx, callback); + if (logger.isTraceEnabled()) { + logger.trace("appendCommitRecord::txID=" + txID + ", usedFile = " + usedFile); + } - if (logger.isTraceEnabled()) { - logger.trace("appendCommitRecord::txID=" + txID + ", usedFile = " + usedFile); + tx.commit(usedFile); + } catch (Exception e) { + ActiveMQJournalLogger.LOGGER.error(e.getMessage(), e); + setErrorCondition(tx, e); + } finally { + journalLock.readLock().unlock(); } - - tx.commit(usedFile); } + }); - } finally { - journalLock.readLock().unlock(); + if (sync && callback == null) { + result.get(); + tx.checkErrorCondition(); } } @Override public void appendRollbackRecord(final long txID, final boolean sync, final IOCompletion callback) throws Exception { checkJournalIsLoaded(); + lineUpContext(callback); - journalLock.readLock().lock(); - - JournalTransaction tx = null; - - try { - tx = transactions.remove(txID); - - if (tx == null) { - throw new IllegalStateException("Cannot find tx with id " + txID); - } + final JournalTransaction tx = transactions.remove(txID); - JournalInternalRecord rollbackRecord = new JournalRollbackRecordTX(txID); + if (tx == null) { + throw new IllegalStateException("Cannot find tx with id " + txID); + } - if (callback != null) { - callback.storeLineUp(); - } + tx.checkErrorCondition(); - synchronized (lockAppend) { - JournalFile usedFile = appendRecord(rollbackRecord, false, sync, tx, callback); + Future<?> result = appendExecutor.submit(new Runnable() { + @Override + public void run() { + journalLock.readLock().lock(); + try { + JournalInternalRecord rollbackRecord = new JournalRollbackRecordTX(txID); + JournalFile usedFile = appendRecord(rollbackRecord, false, sync, tx, callback); - tx.rollback(usedFile); + tx.rollback(usedFile); + } catch (Exception e) { + ActiveMQJournalLogger.LOGGER.error(e.getMessage(), e); + setErrorCondition(tx, e); + } finally { + journalLock.readLock().unlock(); + } } + }); - } finally { - journalLock.readLock().unlock(); + if (sync && callback == null) { + result.get(); + tx.checkErrorCondition(); } } @@ -1906,13 +1981,23 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal public void debugWait() throws InterruptedException { fileFactory.flush(); - for (JournalTransaction tx : transactions.values()) { - tx.waitCallbacks(); + if (appendExecutor != null && !appendExecutor.isShutdown()) { + // Send something to the closingExecutor, just to make sure we went until its end + final CountDownLatch latch = newLatch(1); + + appendExecutor.execute(new Runnable() { + + @Override + public void run() { + latch.countDown(); + } + + }); + awaitLatch(latch, -1); } if (filesExecutor != null && !filesExecutor.isShutdown()) { - // Send something to the closingExecutor, just to make sure we went - // until its end + // Send something to the closingExecutor, just to make sure we went until its end final CountDownLatch latch = newLatch(1); filesExecutor.execute(new Runnable() { @@ -1985,20 +2070,52 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal // In some tests we need to force the journal to move to a next file @Override public void forceMoveNextFile() throws Exception { - journalLock.readLock().lock(); + debugWait(); + journalLock.writeLock().lock(); try { - synchronized (lockAppend) { - moveNextFile(false); - debugWait(); - } + moveNextFile(false); } finally { - journalLock.readLock().unlock(); + journalLock.writeLock().unlock(); } } @Override public void perfBlast(final int pages) { - new PerfBlast(pages).start(); + + checkJournalIsLoaded(); + + final ByteArrayEncoding byteEncoder = new ByteArrayEncoding(new byte[128 * 1024]); + + final JournalInternalRecord blastRecord = new JournalInternalRecord() { + + @Override + public int getEncodeSize() { + return byteEncoder.getEncodeSize(); + } + + @Override + public void encode(final ActiveMQBuffer buffer) { + byteEncoder.encode(buffer); + } + }; + + appendExecutor.submit(new Runnable() { + @Override + public void run() { + journalLock.readLock().lock(); + try { + + for (int i = 0; i < pages; i++) { + appendRecord(blastRecord, false, false, null, null); + } + + } catch (Exception e) { + ActiveMQJournalLogger.LOGGER.failedToPerfBlast(e); + } finally { + journalLock.readLock().unlock(); + } + } + }); } // ActiveMQComponent implementation @@ -2031,6 +2148,14 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal } }); + appendExecutor = Executors.newSingleThreadExecutor(new ThreadFactory() { + + @Override + public Thread newThread(final Runnable r) { + return new Thread(r, "JournalImpl::appendExecutor"); + } + }); + filesRepository.setExecutor(filesExecutor); fileFactory.start(); @@ -2044,46 +2169,50 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal throw new IllegalStateException("Journal is already stopped"); } - journalLock.writeLock().lock(); - try { - synchronized (lockAppend) { + setJournalState(JournalState.STOPPED); - setJournalState(JournalState.STOPPED); + // appendExecutor must be shut down first + appendExecutor.shutdown(); - compactorExecutor.shutdown(); + if (!appendExecutor.awaitTermination(60, TimeUnit.SECONDS)) { + ActiveMQJournalLogger.LOGGER.couldNotStopJournalAppendExecutor(); + } - if (!compactorExecutor.awaitTermination(120, TimeUnit.SECONDS)) { - ActiveMQJournalLogger.LOGGER.couldNotStopCompactor(); - } + journalLock.writeLock().lock(); + try { + compactorExecutor.shutdown(); - filesExecutor.shutdown(); + if (!compactorExecutor.awaitTermination(120, TimeUnit.SECONDS)) { + ActiveMQJournalLogger.LOGGER.couldNotStopCompactor(); + } - filesRepository.setExecutor(null); + filesExecutor.shutdown(); - if (!filesExecutor.awaitTermination(60, TimeUnit.SECONDS)) { - ActiveMQJournalLogger.LOGGER.couldNotStopJournalExecutor(); - } + filesRepository.setExecutor(null); - try { - for (CountDownLatch latch : latches) { - latch.countDown(); - } - } catch (Throwable e) { - ActiveMQJournalLogger.LOGGER.warn(e.getMessage(), e); + if (!filesExecutor.awaitTermination(60, TimeUnit.SECONDS)) { + ActiveMQJournalLogger.LOGGER.couldNotStopJournalExecutor(); + } + + try { + for (CountDownLatch latch : latches) { + latch.countDown(); } + } catch (Throwable e) { + ActiveMQJournalLogger.LOGGER.warn(e.getMessage(), e); + } - fileFactory.deactivateBuffer(); + fileFactory.deactivateBuffer(); - if (currentFile != null && currentFile.getFile().isOpen()) { - currentFile.getFile().close(); - } + if (currentFile != null && currentFile.getFile().isOpen()) { + currentFile.getFile().close(); + } - filesRepository.clear(); + filesRepository.clear(); - fileFactory.stop(); + fileFactory.stop(); - currentFile = null; - } + currentFile = null; } finally { journalLock.writeLock().unlock(); } @@ -2358,7 +2487,6 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal final boolean sync, final JournalTransaction tx, final IOCallback parameterCallback) throws Exception { - checkJournalIsLoaded(); final IOCallback callback; @@ -2552,46 +2680,6 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal } } - private final class PerfBlast extends Thread { - - private final int pages; - - private PerfBlast(final int pages) { - super("activemq-perfblast-thread"); - - this.pages = pages; - } - - @Override - public void run() { - synchronized (lockAppend) { - try { - - final ByteArrayEncoding byteEncoder = new ByteArrayEncoding(new byte[128 * 1024]); - - JournalInternalRecord blastRecord = new JournalInternalRecord() { - - @Override - public int getEncodeSize() { - return byteEncoder.getEncodeSize(); - } - - @Override - public void encode(final ActiveMQBuffer buffer) { - byteEncoder.encode(buffer); - } - }; - - for (int i = 0; i < pages; i++) { - appendRecord(blastRecord, false, false, null, null); - } - } catch (Exception e) { - ActiveMQJournalLogger.LOGGER.failedToPerfBlast(e); - } - } - } - } - @Override public final void synchronizationLock() { compactorLock.writeLock().lock(); @@ -2624,7 +2712,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal long maxID = -1; for (long id : fileIds) { maxID = Math.max(maxID, id); - map.put(Long.valueOf(id), filesRepository.createRemoteBackupSyncFile(id)); + map.put(id, filesRepository.createRemoteBackupSyncFile(id)); } filesRepository.setNextFileID(maxID); return map; http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4b47461f/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalTransaction.java ---------------------------------------------------------------------- diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalTransaction.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalTransaction.java index 6e41c17..1542bd4 100644 --- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalTransaction.java +++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalTransaction.java @@ -17,11 +17,13 @@ package org.apache.activemq.artemis.core.journal.impl; import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicInteger; import org.apache.activemq.artemis.api.core.ActiveMQExceptionType; @@ -45,12 +47,14 @@ public class JournalTransaction { private boolean compacting = false; - private Map<JournalFile, TransactionCallback> callbackList; + private final Map<JournalFile, TransactionCallback> callbackList = Collections.synchronizedMap(new HashMap<JournalFile, TransactionCallback>()); private JournalFile lastFile = null; private final AtomicInteger counter = new AtomicInteger(); + private CountDownLatch firstCallbackLatch; + public JournalTransaction(final long id, final JournalRecordProvider journal) { this.id = id; this.journal = journal; @@ -139,9 +143,7 @@ public class JournalTransaction { pendingFiles.clear(); } - if (callbackList != null) { - callbackList.clear(); - } + callbackList.clear(); if (pos != null) { pos.clear(); @@ -156,6 +158,8 @@ public class JournalTransaction { lastFile = null; currentCallback = null; + + firstCallbackLatch = null; } /** @@ -166,9 +170,13 @@ public class JournalTransaction { data.setNumberOfRecords(getCounter(currentFile)); } + public TransactionCallback getCurrentCallback() { + return currentCallback; + } + public TransactionCallback getCallback(final JournalFile file) throws Exception { - if (callbackList == null) { - callbackList = new HashMap<>(); + if (firstCallbackLatch != null && callbackList.isEmpty()) { + firstCallbackLatch.countDown(); } currentCallback = callbackList.get(file); @@ -178,15 +186,19 @@ public class JournalTransaction { callbackList.put(file, currentCallback); } - if (currentCallback.getErrorMessage() != null) { - throw ActiveMQExceptionType.createException(currentCallback.getErrorCode(), currentCallback.getErrorMessage()); - } - currentCallback.countUp(); return currentCallback; } + public void checkErrorCondition() throws Exception { + if (currentCallback != null) { + if (currentCallback.getErrorMessage() != null) { + throw ActiveMQExceptionType.createException(currentCallback.getErrorCode(), currentCallback.getErrorMessage()); + } + } + } + public void addPositive(final JournalFile file, final long id, final int size) { incCounter(file); @@ -264,7 +276,8 @@ public class JournalTransaction { } public void waitCallbacks() throws InterruptedException { - if (callbackList != null) { + waitFirstCallback(); + synchronized (callbackList) { for (TransactionCallback callback : callbackList.values()) { callback.waitCompletion(); } @@ -275,8 +288,15 @@ public class JournalTransaction { * Wait completion at the latest file only */ public void waitCompletion() throws Exception { - if (currentCallback != null) { - currentCallback.waitCompletion(); + waitFirstCallback(); + currentCallback.waitCompletion(); + } + + private void waitFirstCallback() throws InterruptedException { + if (currentCallback == null) { + firstCallbackLatch = new CountDownLatch(1); + firstCallbackLatch.await(); + firstCallbackLatch = null; } } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4b47461f/artemis-journal/src/main/java/org/apache/activemq/artemis/journal/ActiveMQJournalLogger.java ---------------------------------------------------------------------- diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/journal/ActiveMQJournalLogger.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/journal/ActiveMQJournalLogger.java index 198185c..6758c64 100644 --- a/artemis-journal/src/main/java/org/apache/activemq/artemis/journal/ActiveMQJournalLogger.java +++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/journal/ActiveMQJournalLogger.java @@ -143,7 +143,7 @@ public interface ActiveMQJournalLogger extends BasicLogger { void compactReadError(JournalFile file); @LogMessage(level = Logger.Level.WARN) - @Message(id = 142012, value = "Couldn''t find tx={0} to merge after compacting", + @Message(id = 142012, value = "Couldn't find tx={0} to merge after compacting", format = Message.Format.MESSAGE_FORMAT) void compactMergeError(Long id); @@ -163,12 +163,12 @@ public interface ActiveMQJournalLogger extends BasicLogger { void uncomittedTxFound(Long id); @LogMessage(level = Logger.Level.WARN) - @Message(id = 142016, value = "Couldn''t stop compactor executor after 120 seconds", + @Message(id = 142016, value = "Could not stop compactor executor after 120 seconds", format = Message.Format.MESSAGE_FORMAT) void couldNotStopCompactor(); @LogMessage(level = Logger.Level.WARN) - @Message(id = 142017, value = "Couldn''t stop journal executor after 60 seconds", + @Message(id = 142017, value = "Could not stop journal executor after 60 seconds", format = Message.Format.MESSAGE_FORMAT) void couldNotStopJournalExecutor(); @@ -182,7 +182,7 @@ public interface ActiveMQJournalLogger extends BasicLogger { void deletingOrphanedFile(String fileToDelete); @LogMessage(level = Logger.Level.WARN) - @Message(id = 142020, value = "Couldn''t get lock after 60 seconds on closing Asynchronous File: {0}", format = Message.Format.MESSAGE_FORMAT) + @Message(id = 142020, value = "Could not get lock after 60 seconds on closing Asynchronous File: {0}", format = Message.Format.MESSAGE_FORMAT) void errorClosingFile(String fileToDelete); @LogMessage(level = Logger.Level.WARN) @@ -241,6 +241,10 @@ public interface ActiveMQJournalLogger extends BasicLogger { @Message(id = 142034, value = "Exception on submitting write", format = Message.Format.MESSAGE_FORMAT) void errorSubmittingWrite(@Cause Throwable e); + @LogMessage(level = Logger.Level.WARN) + @Message(id = 142035, value = "Could not stop journal append executor after 60 seconds", format = Message.Format.MESSAGE_FORMAT) + void couldNotStopJournalAppendExecutor(); + @LogMessage(level = Logger.Level.ERROR) @Message(id = 144000, value = "Failed to delete file {0}", format = Message.Format.MESSAGE_FORMAT) void errorDeletingFile(Object e); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4b47461f/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/AlignedJournalImplTest.java ---------------------------------------------------------------------- diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/AlignedJournalImplTest.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/AlignedJournalImplTest.java index 080db78..5e27b36 100644 --- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/AlignedJournalImplTest.java +++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/AlignedJournalImplTest.java @@ -532,6 +532,8 @@ public class AlignedJournalImplTest extends ActiveMQTestBase { journalImpl.appendCommitRecord(1L, false); + journalImpl.debugWait(); + System.out.println("Files = " + factory.listFiles("tt")); SequentialFile file = factory.createSequentialFile("tt-1.tt"); @@ -598,6 +600,8 @@ public class AlignedJournalImplTest extends ActiveMQTestBase { journalImpl.appendCommitRecord(2L, false); + journalImpl.debugWait(); + SequentialFile file = factory.createSequentialFile("tt-1.tt"); file.open(); @@ -697,6 +701,8 @@ public class AlignedJournalImplTest extends ActiveMQTestBase { journalImpl.appendCommitRecord(1L, false); + journalImpl.debugWait(); + SequentialFile file = factory.createSequentialFile("tt-1.tt"); file.open(); @@ -936,8 +942,7 @@ public class AlignedJournalImplTest extends ActiveMQTestBase { journalImpl.forceMoveNextFile(); - // Reclaiming should still be able to reclaim a file if a transaction was - // ignored + // Reclaiming should still be able to reclaim a file if a transaction was ignored journalImpl.checkReclaimStatus(); Assert.assertEquals(2, factory.listFiles("tt").size()); @@ -1109,7 +1114,16 @@ public class AlignedJournalImplTest extends ActiveMQTestBase { } @Test - public void testReclaimingAfterConcurrentAddsAndDeletes() throws Exception { + public void testReclaimingAfterConcurrentAddsAndDeletesTx() throws Exception { + testReclaimingAfterConcurrentAddsAndDeletes(true); + } + + @Test + public void testReclaimingAfterConcurrentAddsAndDeletesNonTx() throws Exception { + testReclaimingAfterConcurrentAddsAndDeletes(false); + } + + public void testReclaimingAfterConcurrentAddsAndDeletes(final boolean transactional) throws Exception { final int JOURNAL_SIZE = 10 * 1024; setupAndLoadJournal(JOURNAL_SIZE, 1); @@ -1131,8 +1145,14 @@ public class AlignedJournalImplTest extends ActiveMQTestBase { latchReady.countDown(); ActiveMQTestBase.waitForLatch(latchStart); for (int i = 0; i < NUMBER_OF_ELEMENTS; i++) { - journalImpl.appendAddRecordTransactional(i, i, (byte) 1, new SimpleEncoding(50, (byte) 1)); - journalImpl.appendCommitRecord(i, false); + + if (transactional) { + journalImpl.appendAddRecordTransactional(i, i, (byte) 1, new SimpleEncoding(50, (byte) 1)); + journalImpl.appendCommitRecord(i, false); + } else { + journalImpl.appendAddRecord(i, (byte) 1, new SimpleEncoding(50, (byte) 1), false); + } + queueDelete.offer(i); } finishedOK.incrementAndGet(); @@ -1153,7 +1173,14 @@ public class AlignedJournalImplTest extends ActiveMQTestBase { if (toDelete == null) { break; } - journalImpl.appendDeleteRecord(toDelete, false); + + if (transactional) { + journalImpl.appendDeleteRecordTransactional(toDelete, toDelete, new SimpleEncoding(50, (byte) 1)); + journalImpl.appendCommitRecord(i, false); + } else { + journalImpl.appendDeleteRecord(toDelete, false); + } + } finishedOK.incrementAndGet(); } catch (Exception e) { http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4b47461f/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/JournalAsyncTest.java ---------------------------------------------------------------------- diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/JournalAsyncTest.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/JournalAsyncTest.java index 41058c6..204600e 100644 --- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/JournalAsyncTest.java +++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/JournalAsyncTest.java @@ -81,6 +81,8 @@ public class JournalAsyncTest extends ActiveMQTestBase { journalImpl.appendAddRecordTransactional(1L, i, (byte) 1, new SimpleEncoding(1, (byte) 0)); } + journalImpl.debugWait(); + latch.countDown(); factory.setHoldCallbacks(false, null); if (isCommit) { @@ -115,8 +117,7 @@ public class JournalAsyncTest extends ActiveMQTestBase { } } - // If a callback error already arrived, we should just throw the exception - // right away + // If a callback error already arrived, we should just throw the exception right away @Test public void testPreviousError() throws Exception { final int JOURNAL_SIZE = 20000; @@ -128,6 +129,8 @@ public class JournalAsyncTest extends ActiveMQTestBase { journalImpl.appendAddRecordTransactional(1L, 1, (byte) 1, new SimpleEncoding(1, (byte) 0)); + journalImpl.debugWait(); + factory.flushAllCallbacks(); factory.setGenerateErrors(false); @@ -135,11 +138,11 @@ public class JournalAsyncTest extends ActiveMQTestBase { try { journalImpl.appendAddRecordTransactional(1L, 2, (byte) 1, new SimpleEncoding(1, (byte) 0)); - Assert.fail("Exception expected"); // An exception already happened in one - // of the elements on this transaction. - // We can't accept any more elements on - // the transaction + Assert.fail("Exception expected"); + // An exception already happened in one of the elements on this transaction. + // We can't accept any more elements on the transaction } catch (Exception ignored) { + } }
