scwhittle commented on code in PR #36750:
URL: https://github.com/apache/beam/pull/36750#discussion_r2549741694
##########
sdks/java/core/src/test/java/org/apache/beam/sdk/fn/splittabledofn/RestrictionTrackersTest.java:
##########
@@ -123,6 +160,16 @@ public void checkDone() throws IllegalStateException {}
public IsBounded isBounded() {
return IsBounded.BOUNDED;
}
+
+ public synchronized void releaseLock() {
+ blockTrySplit = false;
+ blockTryClaim = false;
+ notifyAll();
+ }
+
+ public synchronized boolean isBlocked() {
Review Comment:
nit: waitUntilBlocking? can hide the poll/sleep from test to keep it simpler
##########
sdks/java/core/src/test/java/org/apache/beam/sdk/fn/splittabledofn/RestrictionTrackersTest.java:
##########
@@ -131,4 +178,47 @@ public void testClaimObserversMaintainBacklogInterfaces() {
RestrictionTrackers.observe(new RestrictionTrackerWithProgress(),
null);
assertThat(hasSize, instanceOf(HasProgress.class));
}
+
+ private void testBlocking(String testCase) throws InterruptedException {
Review Comment:
nit: I'd remove this helper, more code is spent on the differences than
would be repeated in two separate tests.
##########
sdks/java/core/src/main/java/org/apache/beam/sdk/fn/splittabledofn/RestrictionTrackers.java:
##########
@@ -101,8 +138,33 @@ protected RestrictionTrackerObserverWithProgress(
}
@Override
- public synchronized Progress getProgress() {
- return ((HasProgress) delegate).getProgress();
+ public Progress getProgress() {
+ return getProgress(FIRST_PROGRESS_TIMEOUT_SEC);
+ }
+
+ @VisibleForTesting
+ Progress getProgress(int timeOutSec) {
+ if (!hasInitialProgress) {
+ Progress progress = Progress.NONE;
+ try {
+ // lock can be held long by long-running tryClaim/trySplit. We
tolerate this scenario
Review Comment:
if we're just trying to avoid blocking on the initial tryClaim it seems that
this coudl be simpler, just a
volatile noProgress that defaults to true and that we we set to false after
tryClaim/trySplit is called on the delegate. I think you could keep all the
synchronization existing except this method to check the volatile first.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]