gemmellr commented on code in PR #5498:
URL: https://github.com/apache/activemq-artemis/pull/5498#discussion_r1958454882
##########
artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java:
##########
@@ -392,20 +398,82 @@ private void pageLimitReleased() {
}
@Override
- public boolean lock(long timeout) {
+ public boolean readLock(long timeout) {
if (timeout == -1) {
- lock.writeLock().lock();
- return true;
+ while (true) {
+ try {
+ if (lock.readLock().tryLock(1, TimeUnit.SECONDS)) {
+ return true;
+ } else {
+ if (logger.isTraceEnabled()) {
+ logger.trace("Not able to read lock", new
Exception("lock"));
+ }
+ }
+ } catch (InterruptedException e) {
+ logger.warn(e.getMessage(), e);
+ Thread.currentThread().interrupt();
+ return false;
+ }
+ }
+ }
+ try {
+ if (lock.readLock().tryLock(timeout, TimeUnit.MILLISECONDS)) {
+ return true;
+ } else {
+ if (logger.isTraceEnabled()) {
+ logger.trace("Not able to read lock", new Exception("lock"));
+ }
+ return false;
+ }
+ } catch (InterruptedException e) {
+ logger.warn(e.getMessage(), e);
+ Thread.currentThread().interrupt();
+ return false;
+ }
+ }
+
+ @Override
+ public void readUnlock() {
+ lock.readLock().unlock();
+ }
+
+ @Override
+ public boolean writeLock(long timeout) {
+ if (timeout == -1) {
+ while (true) {
+ try {
+ if (lock.writeLock().tryLock(1, TimeUnit.SECONDS)) {
+ return true;
Review Comment:
There is an extra space before true
##########
artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java:
##########
@@ -96,7 +97,7 @@ public class PagingStoreImpl implements PagingStore {
private final PagingStoreFactory storeFactory;
// Used to schedule sync threads
- private final PageSyncTimer syncTimer;
+ private final PageTimedWriter timedWriter;
Review Comment:
The comment above this is now inaccurate.
As is another comment 10 lines further up:
>//it's being guarded by lock.writeLock().lock() and never read concurrently
>private long currentPageSize = 0;
Its seemingly typically not protected by a write lock at all anymore (unless
the 'timedWriter is null'), and the read lock usage (without any write lock,
when 'timedWriter is not null') seems to suggest it can be read concurrently.
##########
artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java:
##########
@@ -1262,8 +1324,16 @@ private boolean writePage(Message message,
Transaction tx,
RouteContextList listCtx,
Function<Message, Message> pageDecorator) throws
Exception {
- lock.writeLock().lock();
-
+ if (timedWriter == null) {
+ // in case timedWriter is not being used, we need to guarantee a
writeLock.
+ // because there's no way to upgrade from reading to writeLock
+ writeLock();
+ } else {
+ // if we are using a timedWritter, we can just use a readLock to
guarantee paging is not changed
+ // this will issue the writer on a different thread, and we can just
use a readLock
+ readLock();
Review Comment:
I dont understand this. When a timedWriter is around it _doesnt obviously
take the write lock anywhere_, and if it doesnt do so then how does taking the
readLock here stop it changing paging? The only thing obviously taking the
writeLock when adding new stuff is this writePage method, _if timedWriter is
null_.
What am I missing?
##########
artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java:
##########
@@ -392,20 +398,82 @@ private void pageLimitReleased() {
}
@Override
- public boolean lock(long timeout) {
+ public boolean readLock(long timeout) {
if (timeout == -1) {
- lock.writeLock().lock();
- return true;
+ while (true) {
+ try {
+ if (lock.readLock().tryLock(1, TimeUnit.SECONDS)) {
+ return true;
+ } else {
+ if (logger.isTraceEnabled()) {
+ logger.trace("Not able to read lock", new
Exception("lock"));
+ }
+ }
+ } catch (InterruptedException e) {
+ logger.warn(e.getMessage(), e);
+ Thread.currentThread().interrupt();
+ return false;
+ }
+ }
+ }
+ try {
Review Comment:
Using an _else_ branch would make this more readable.
As might extracting a sub-method and passing in a timeout (both branches
would do the same thing other than the while loop and the difference in
incremental vs actual timeout)
##########
artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreFactoryDatabase.java:
##########
@@ -153,7 +153,7 @@ public PageCursorProvider newCursorProvider(PagingStore
store,
@Override
public synchronized PagingStore newStore(final SimpleString address, final
AddressSettings settings) {
- return new PagingStoreImpl(address, scheduledExecutor, syncTimeout,
pagingManager, storageManager, null, this, address, settings,
executorFactory.getExecutor().setFair(true), ioExecutorFactory.getExecutor(),
syncNonTransactional);
Review Comment:
Is ioExecutorFactory used anymore?
##########
artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java:
##########
@@ -1413,8 +1500,18 @@ private void installPageTransaction(final Transaction
tx, final RouteContextList
return;
}
+ @Override
+ public boolean hasPendingIO() {
+ return timedWriter != null && timedWriter.hasPendingIO();
Review Comment:
When timedWriter is null, could there be writes _happening presently_ which
this doesnt account for?
##########
artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java:
##########
@@ -1413,8 +1500,18 @@ private void installPageTransaction(final Transaction
tx, final RouteContextList
return;
}
+ @Override
+ public boolean hasPendingIO() {
+ return timedWriter != null && timedWriter.hasPendingIO();
+ }
+
+ public PageTimedWriter getPageTimedWriter() {
+ return timedWriter;
+ }
+
@Override
public void destroy() throws Exception {
+ this.timedWriter.stop();
Review Comment:
Since timedWriter can be null, would seem like this should have a null check.
The "this." is inconsistent with the method above.
##########
artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagingStore.java:
##########
@@ -204,19 +202,32 @@ default void addSize(int size) {
*/
boolean checkReleasedMemory();
+ default void writeLock() {
+ writeLock(-1);
+ }
/**
* Write lock the PagingStore.
*
* @param timeout milliseconds to wait for the lock. If value is {@literal
-1} then wait
* indefinitely.
* @return {@code true} if the lock was obtained, {@code false} otherwise
*/
- boolean lock(long timeout);
+ boolean writeLock(long timeout);
+
+ default void readLock() {
+ readLock(-1);
+ }
+ default boolean readLock(long timeout) {
+ return true;
+ }
+
+ default void readUnlock() {
+ }
Review Comment:
Are all these defaults just here to avoid needing to implement them on the
test impl?
It doesnt seem like its for compatibility given the breaking changes.
Given the safety implications of ever using these defaults, it seems like
they would be better just not defaulted and instead actually [not-] implemented
in any test bits as needed.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
For further information, visit: https://activemq.apache.org/contact