This is an automated email from the ASF dual-hosted git repository.
altay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new c4de94e [BEAM-13430] Clean up tests that override the time of the JVM
new 6be2090 Merge pull request #16326 from
dpcollins-google/dontPolluteStatic
c4de94e is described below
commit c4de94ebfa80678af873d0a1d58ce6741753df23
Author: Daniel Collins <[email protected]>
AuthorDate: Wed Dec 22 15:49:28 2021 -0500
[BEAM-13430] Clean up tests that override the time of the JVM
---
.../fnexecution/control/RemoteExecutionTest.java | 88 +++++++++++-----------
.../splittabledofn/WatermarkEstimatorsTest.java | 56 ++++++++------
.../io/gcp/datastore/RampupThrottlingFnTest.java | 6 ++
3 files changed, 83 insertions(+), 67 deletions(-)
diff --git
a/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/RemoteExecutionTest.java
b/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/RemoteExecutionTest.java
index 8c9e150..5aa4568 100644
---
a/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/RemoteExecutionTest.java
+++
b/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/RemoteExecutionTest.java
@@ -1598,49 +1598,53 @@ public class RemoteExecutionTest implements
Serializable {
// Set the current system time to a fixed value to get stable values for
processing time timer
// output.
DateTimeUtils.setCurrentMillisFixed(BoundedWindow.TIMESTAMP_MIN_VALUE.getMillis()
+ 10000L);
-
- try (RemoteBundle bundle =
- processor.newBundle(
- outputReceivers,
- timerReceivers,
- StateRequestHandler.unsupported(),
- BundleProgressHandler.ignored(),
- null,
- null)) {
- Iterables.getOnlyElement(bundle.getInputReceivers().values())
- .accept(valueInGlobalWindow(KV.of("X", "X")));
- bundle
- .getTimerReceivers()
- .get(KV.of(eventTimerSpec.transformId(), eventTimerSpec.timerId()))
- .accept(timerForTest("Y", 1000L, 100L));
- bundle
- .getTimerReceivers()
- .get(KV.of(processingTimerSpec.transformId(),
processingTimerSpec.timerId()))
- .accept(timerForTest("Z", 2000L, 200L));
+ try {
+ try (RemoteBundle bundle =
+ processor.newBundle(
+ outputReceivers,
+ timerReceivers,
+ StateRequestHandler.unsupported(),
+ BundleProgressHandler.ignored(),
+ null,
+ null)) {
+ Iterables.getOnlyElement(bundle.getInputReceivers().values())
+ .accept(valueInGlobalWindow(KV.of("X", "X")));
+ bundle
+ .getTimerReceivers()
+ .get(KV.of(eventTimerSpec.transformId(), eventTimerSpec.timerId()))
+ .accept(timerForTest("Y", 1000L, 100L));
+ bundle
+ .getTimerReceivers()
+ .get(KV.of(processingTimerSpec.transformId(),
processingTimerSpec.timerId()))
+ .accept(timerForTest("Z", 2000L, 200L));
+ }
+ String mainOutputTransform =
+
Iterables.getOnlyElement(descriptor.getRemoteOutputCoders().keySet());
+ assertThat(
+ outputValues.get(mainOutputTransform),
+ containsInAnyOrder(
+ valueInGlobalWindow(KV.of("mainX", "")),
+ WindowedValue.timestampedValueInGlobalWindow(
+ KV.of("event", ""),
+
BoundedWindow.TIMESTAMP_MIN_VALUE.plus(Duration.millis(100L))),
+ WindowedValue.timestampedValueInGlobalWindow(
+ KV.of("processing", ""),
+
BoundedWindow.TIMESTAMP_MIN_VALUE.plus(Duration.millis(200L)))));
+ assertThat(
+ timerValues.get(KV.of(eventTimerSpec.transformId(),
eventTimerSpec.timerId())),
+ containsInAnyOrder(
+ timerForTest("X", 1L, 0L),
+ timerForTest("Y", 1011L, 100L),
+ timerForTest("Z", 2021L, 200L)));
+ assertThat(
+ timerValues.get(KV.of(processingTimerSpec.transformId(),
processingTimerSpec.timerId())),
+ containsInAnyOrder(
+ timerForTest("X", 10002L, 0L),
+ timerForTest("Y", 10012L, 100L),
+ timerForTest("Z", 10022L, 200L)));
+ } finally {
+ DateTimeUtils.setCurrentMillisSystem();
}
- String mainOutputTransform =
- Iterables.getOnlyElement(descriptor.getRemoteOutputCoders().keySet());
- assertThat(
- outputValues.get(mainOutputTransform),
- containsInAnyOrder(
- valueInGlobalWindow(KV.of("mainX", "")),
- WindowedValue.timestampedValueInGlobalWindow(
- KV.of("event", ""),
BoundedWindow.TIMESTAMP_MIN_VALUE.plus(Duration.millis(100L))),
- WindowedValue.timestampedValueInGlobalWindow(
- KV.of("processing", ""),
-
BoundedWindow.TIMESTAMP_MIN_VALUE.plus(Duration.millis(200L)))));
- assertThat(
- timerValues.get(KV.of(eventTimerSpec.transformId(),
eventTimerSpec.timerId())),
- containsInAnyOrder(
- timerForTest("X", 1L, 0L),
- timerForTest("Y", 1011L, 100L),
- timerForTest("Z", 2021L, 200L)));
- assertThat(
- timerValues.get(KV.of(processingTimerSpec.transformId(),
processingTimerSpec.timerId())),
- containsInAnyOrder(
- timerForTest("X", 10002L, 0L),
- timerForTest("Y", 10012L, 100L),
- timerForTest("Z", 10022L, 200L)));
}
@Test
diff --git
a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/splittabledofn/WatermarkEstimatorsTest.java
b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/splittabledofn/WatermarkEstimatorsTest.java
index 762579d..b028f4d 100644
---
a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/splittabledofn/WatermarkEstimatorsTest.java
+++
b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/splittabledofn/WatermarkEstimatorsTest.java
@@ -59,33 +59,39 @@ public class WatermarkEstimatorsTest {
@Test
public void testWallTimeWatermarkEstimator() {
-
DateTimeUtils.setCurrentMillisFixed(BoundedWindow.TIMESTAMP_MIN_VALUE.getMillis());
- WatermarkEstimator<Instant> watermarkEstimator =
- new WatermarkEstimators.WallTime(new Instant());
- DateTimeUtils.setCurrentMillisFixed(
-
BoundedWindow.TIMESTAMP_MIN_VALUE.plus(Duration.millis(1)).getMillis());
- assertEquals(
- BoundedWindow.TIMESTAMP_MIN_VALUE.plus(Duration.millis(1)),
- watermarkEstimator.currentWatermark());
+ try {
+
DateTimeUtils.setCurrentMillisFixed(BoundedWindow.TIMESTAMP_MIN_VALUE.getMillis());
+ WatermarkEstimator<Instant> watermarkEstimator =
+ new WatermarkEstimators.WallTime(new Instant());
+ DateTimeUtils.setCurrentMillisFixed(
+
BoundedWindow.TIMESTAMP_MIN_VALUE.plus(Duration.millis(1)).getMillis());
+ assertEquals(
+ BoundedWindow.TIMESTAMP_MIN_VALUE.plus(Duration.millis(1)),
+ watermarkEstimator.currentWatermark());
- DateTimeUtils.setCurrentMillisFixed(
-
BoundedWindow.TIMESTAMP_MIN_VALUE.plus(Duration.millis(2)).getMillis());
- // Make sure that we don't mutate state even if the clock advanced
- assertEquals(
- BoundedWindow.TIMESTAMP_MIN_VALUE.plus(Duration.millis(1)),
watermarkEstimator.getState());
- assertEquals(
- BoundedWindow.TIMESTAMP_MIN_VALUE.plus(Duration.millis(2)),
- watermarkEstimator.currentWatermark());
- assertEquals(
- BoundedWindow.TIMESTAMP_MIN_VALUE.plus(Duration.millis(2)),
watermarkEstimator.getState());
+ DateTimeUtils.setCurrentMillisFixed(
+
BoundedWindow.TIMESTAMP_MIN_VALUE.plus(Duration.millis(2)).getMillis());
+ // Make sure that we don't mutate state even if the clock advanced
+ assertEquals(
+ BoundedWindow.TIMESTAMP_MIN_VALUE.plus(Duration.millis(1)),
+ watermarkEstimator.getState());
+ assertEquals(
+ BoundedWindow.TIMESTAMP_MIN_VALUE.plus(Duration.millis(2)),
+ watermarkEstimator.currentWatermark());
+ assertEquals(
+ BoundedWindow.TIMESTAMP_MIN_VALUE.plus(Duration.millis(2)),
+ watermarkEstimator.getState());
- // Handle the case if the clock ever goes backwards. Could happen if we
resumed processing
- // on a machine that had misconfigured clock or due to clock skew.
- DateTimeUtils.setCurrentMillisFixed(
-
BoundedWindow.TIMESTAMP_MIN_VALUE.plus(Duration.millis(1)).getMillis());
- assertEquals(
- BoundedWindow.TIMESTAMP_MIN_VALUE.plus(Duration.millis(2)),
- watermarkEstimator.currentWatermark());
+ // Handle the case if the clock ever goes backwards. Could happen if we
resumed processing
+ // on a machine that had misconfigured clock or due to clock skew.
+ DateTimeUtils.setCurrentMillisFixed(
+
BoundedWindow.TIMESTAMP_MIN_VALUE.plus(Duration.millis(1)).getMillis());
+ assertEquals(
+ BoundedWindow.TIMESTAMP_MIN_VALUE.plus(Duration.millis(2)),
+ watermarkEstimator.currentWatermark());
+ } finally {
+ DateTimeUtils.setCurrentMillisSystem();
+ }
}
@Test
diff --git
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/RampupThrottlingFnTest.java
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/RampupThrottlingFnTest.java
index 940884b..bb1377c 100644
---
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/RampupThrottlingFnTest.java
+++
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/RampupThrottlingFnTest.java
@@ -34,6 +34,7 @@ import
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Immutabl
import org.joda.time.DateTimeUtils;
import org.joda.time.Duration;
import org.joda.time.Instant;
+import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
@@ -77,6 +78,11 @@ public class RampupThrottlingFnTest {
rampupThrottlingFn.throttlingMsecs = mockCounter;
}
+ @After
+ public void tearDown() {
+ DateTimeUtils.setCurrentMillisSystem();
+ }
+
@Test
public void testRampupThrottler() throws Exception {
Map<Duration, Integer> rampupSchedule =