This is an automated email from the ASF dual-hosted git repository. mblow pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/asterixdb.git
commit d2aefdccbac0ca4dc2ad50f45480c6c57f94a50e Author: Michael Blow <[email protected]> AuthorDate: Wed Sep 30 20:12:36 2020 -0400 [NO ISSUE][*DB][ACT] Active stats synchronization Avoid locks on stats refresh requests on non-running active entities Change-Id: I458f15cd4b199576b3236762c6da904f086147fd Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/8185 Reviewed-by: Murtadha Hubail <[email protected]> Tested-by: Michael Blow <[email protected]> --- .../active/IActiveEntityEventSubscriber.java | 2 +- .../active/IActiveEntityEventsListener.java | 2 +- .../active/message/ActiveManagerMessage.java | 2 +- .../active/message/ActiveStatsRequestMessage.java | 10 ++++++++ .../app/active/ActiveEntityEventsListener.java | 25 ++++++++++++++----- .../asterix/test/active/ActiveStatsTest.java | 19 +++++++++++--- .../asterix/common/exceptions/ErrorCode.java | 1 + .../src/main/resources/asx_errormsg/en.properties | 1 + .../external/feed/watch/AbstractSubscriber.java | 29 +++++++++++++++++----- .../feed/watch/WaitForStateSubscriber.java | 2 +- 10 files changed, 74 insertions(+), 19 deletions(-) diff --git a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IActiveEntityEventSubscriber.java b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IActiveEntityEventSubscriber.java index e01d0a7..3c2f8e8 100644 --- a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IActiveEntityEventSubscriber.java +++ b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IActiveEntityEventSubscriber.java @@ -34,7 +34,7 @@ public interface IActiveEntityEventSubscriber { void notify(ActiveEvent event); /** - * Checkcs whether the subscriber is done receiving events + * Checks whether the subscriber is done receiving events * * @return */ diff --git a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IActiveEntityEventsListener.java b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IActiveEntityEventsListener.java index ca610aa..8338b2b 100644 --- a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IActiveEntityEventsListener.java +++ b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IActiveEntityEventsListener.java @@ -75,7 +75,7 @@ public interface IActiveEntityEventsListener { * refresh the stats * * @param timeout - * @throws HyracksDataException + * @throws HyracksDataException throws ASX3118 if active entity is not currently running */ void refreshStats(long timeout) throws HyracksDataException; diff --git a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/ActiveManagerMessage.java b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/ActiveManagerMessage.java index b8c44a6..1a2af13 100644 --- a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/ActiveManagerMessage.java +++ b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/ActiveManagerMessage.java @@ -56,6 +56,6 @@ public class ActiveManagerMessage extends CcIdentifiedMessage implements INcAddr @Override public String toString() { - return ActiveManagerMessage.class.getSimpleName(); + return getClass().getSimpleName(); } } diff --git a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/ActiveStatsRequestMessage.java b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/ActiveStatsRequestMessage.java index 0dbba52..117a68c 100644 --- a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/ActiveStatsRequestMessage.java +++ b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/ActiveStatsRequestMessage.java @@ -29,7 +29,17 @@ public class ActiveStatsRequestMessage extends ActiveManagerMessage { this.reqId = reqId; } + @Override + public boolean isWhispered() { + return true; + } + public long getReqId() { return reqId; } + + @Override + public String toString() { + return "ActiveStatsRequestMessage{" + "reqId=" + reqId + '}'; + } } diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveEntityEventsListener.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveEntityEventsListener.java index 882afc5..39ebdf7 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveEntityEventsListener.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveEntityEventsListener.java @@ -18,6 +18,8 @@ */ package org.apache.asterix.app.active; +import static org.apache.asterix.common.exceptions.ErrorCode.ACTIVE_ENTITY_NOT_RUNNING; + import java.util.ArrayList; import java.util.Arrays; import java.util.EnumSet; @@ -93,7 +95,7 @@ public abstract class ActiveEntityEventsListener implements IActiveEntityControl protected ActivityState prevState; protected JobId jobId; protected volatile long statsTimestamp; - protected String stats; + protected volatile String stats; protected volatile boolean isFetchingStats; protected int numRegistered; protected int numDeRegistered; @@ -292,12 +294,17 @@ public abstract class ActiveEntityEventsListener implements IActiveEntityControl @Override public void refreshStats(long timeout) throws HyracksDataException { LOGGER.log(level, "refreshStats called"); + // first check state & if we are fetching outside of the lock- in the event we are recovering it may take some + // time to obtain the lock... + ensureRunning(); + if (isFetchingStats) { + LOGGER.log(level, "returning immediately since fetchingStats = " + isFetchingStats); + return; + } synchronized (this) { - if (state != ActivityState.RUNNING) { - LOGGER.log(level, "returning immediately since state = " + state); - notifySubscribers(statsUpdatedEvent); - return; - } else if (isFetchingStats) { + // now that we have the lock, again verify the state & ensure we are not already fetching new stats + ensureRunning(); + if (isFetchingStats) { LOGGER.log(level, "returning immediately since fetchingStats = " + isFetchingStats); return; } else { @@ -323,6 +330,12 @@ public abstract class ActiveEntityEventsListener implements IActiveEntityControl isFetchingStats = false; } + protected void ensureRunning() throws RuntimeDataException { + if (state != ActivityState.RUNNING) { + throw new RuntimeDataException(ACTIVE_ENTITY_NOT_RUNNING, runtimeName, String.valueOf(state).toLowerCase()); + } + } + protected synchronized void notifySubscribers(ActiveEvent event) { notifyAll(); Iterator<IActiveEntityEventSubscriber> it = subscribers.iterator(); diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/ActiveStatsTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/ActiveStatsTest.java index 80dde8a..ca5e1e6 100644 --- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/ActiveStatsTest.java +++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/ActiveStatsTest.java @@ -19,6 +19,9 @@ package org.apache.asterix.test.active; +import static org.apache.asterix.common.exceptions.ErrorCode.ACTIVE_ENTITY_NOT_RUNNING; +import static org.apache.asterix.common.exceptions.ErrorCode.ASTERIX; + import java.io.IOException; import java.util.ArrayList; import java.util.Collections; @@ -116,7 +119,13 @@ public class ActiveStatsTest { Assert.assertTrue(requestedStats.contains("N/A")); // Update stats of not-started job - eventsListener.refreshStats(1000); + try { + eventsListener.refreshStats(1000); + Assert.fail("expected exception on refresh stats on not-started job"); + } catch (HyracksDataException e) { + Assert.assertTrue("incorrect exception thrown (expected: ACTIVE_ENTITY_NOT_RUNNING, was: " + e, + e.matches(ASTERIX, ACTIVE_ENTITY_NOT_RUNNING)); + } requestedStats = eventsListener.getStats(); Assert.assertTrue(requestedStats.contains("N/A")); WaitForStateSubscriber startingSubscriber = @@ -127,8 +136,12 @@ public class ActiveStatsTest { startingSubscriber.sync(); activeJobNotificationHandler.notifyJobCreation(jobId, jobSpec); activeJobNotificationHandler.notifyJobStart(jobId); - eventsListener.refreshStats(1000); - requestedStats = eventsListener.getStats(); + try { + eventsListener.refreshStats(1000); + } catch (HyracksDataException e) { + Assert.assertTrue("incorrect exception thrown (expected: ACTIVE_ENTITY_NOT_RUNNING, was: " + e, + e.matches(ASTERIX, ACTIVE_ENTITY_NOT_RUNNING)); + } Assert.assertTrue(requestedStats.contains("N/A")); // Fake partition message and notify eventListener ActivePartitionMessage partitionMessage = diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java index d57c433..1d18cf6 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java @@ -316,6 +316,7 @@ public class ErrorCode { public static final int FAILED_TO_PARSE_METADATA = 3115; public static final int INPUT_DECODE_FAILURE = 3116; public static final int FAILED_TO_PARSE_MALFORMED_LOG_RECORD = 3117; + public static final int ACTIVE_ENTITY_NOT_RUNNING = 3118; // Lifecycle management errors public static final int DUPLICATE_PARTITION_ID = 4000; diff --git a/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties b/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties index 5cf3968..a2780ba 100644 --- a/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties +++ b/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties @@ -314,6 +314,7 @@ 3115 = Failed to parse record metadata 3116 = Failed to decode input 3117 = Failed to parse record, malformed log record +3118 = Active Entity %1$s is not running (it is %2$s) # Lifecycle management errors 4000 = Partition id %1$s for node %2$s already in use by node %3$s diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/AbstractSubscriber.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/AbstractSubscriber.java index 37b157e..726880d 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/AbstractSubscriber.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/AbstractSubscriber.java @@ -18,6 +18,8 @@ */ package org.apache.asterix.external.feed.watch; +import java.util.Objects; + import org.apache.asterix.active.IActiveEntityEventSubscriber; import org.apache.asterix.active.IActiveEntityEventsListener; import org.apache.hyracks.util.Span; @@ -25,11 +27,19 @@ import org.apache.hyracks.util.Span; public abstract class AbstractSubscriber implements IActiveEntityEventSubscriber { protected final IActiveEntityEventsListener listener; + private final Object lockObject; private volatile boolean done = false; private volatile Exception failure = null; + public AbstractSubscriber(IActiveEntityEventsListener listener, Object lockObject) { + Objects.requireNonNull(lockObject); + this.listener = listener; + this.lockObject = lockObject; + } + public AbstractSubscriber(IActiveEntityEventsListener listener) { this.listener = listener; + this.lockObject = this; } @Override @@ -38,28 +48,28 @@ public abstract class AbstractSubscriber implements IActiveEntityEventSubscriber } public void complete(Exception failure) { - synchronized (listener) { + synchronized (lockObject) { if (failure != null) { this.failure = failure; } done = true; - listener.notifyAll(); + lockObject.notifyAll(); } } @Override public void sync() throws InterruptedException { - synchronized (listener) { + synchronized (lockObject) { while (!done) { - listener.wait(); + lockObject.wait(); } } } public boolean sync(Span span) throws InterruptedException { - synchronized (listener) { + synchronized (lockObject) { while (!done) { - span.wait(listener); + span.wait(lockObject); if (done || span.elapsed()) { return done; } @@ -71,4 +81,11 @@ public abstract class AbstractSubscriber implements IActiveEntityEventSubscriber public Exception getFailure() { return failure; } + + protected void reset() { + synchronized (lockObject) { + done = false; + failure = null; + } + } } diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/WaitForStateSubscriber.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/WaitForStateSubscriber.java index 818d826..4dc86ac 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/WaitForStateSubscriber.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/WaitForStateSubscriber.java @@ -29,7 +29,7 @@ public class WaitForStateSubscriber extends AbstractSubscriber { private final Set<ActivityState> targetStates; public WaitForStateSubscriber(IActiveEntityEventsListener listener, Set<ActivityState> targetStates) { - super(listener); + super(listener, listener); this.targetStates = targetStates; listener.subscribe(this); }
