This is an automated email from the ASF dual-hosted git repository.

altay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new c4de94e  [BEAM-13430] Clean up tests that override the time of the JVM
     new 6be2090  Merge pull request #16326 from 
dpcollins-google/dontPolluteStatic
c4de94e is described below

commit c4de94ebfa80678af873d0a1d58ce6741753df23
Author: Daniel Collins <[email protected]>
AuthorDate: Wed Dec 22 15:49:28 2021 -0500

    [BEAM-13430] Clean up tests that override the time of the JVM
---
 .../fnexecution/control/RemoteExecutionTest.java   | 88 +++++++++++-----------
 .../splittabledofn/WatermarkEstimatorsTest.java    | 56 ++++++++------
 .../io/gcp/datastore/RampupThrottlingFnTest.java   |  6 ++
 3 files changed, 83 insertions(+), 67 deletions(-)

diff --git 
a/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/RemoteExecutionTest.java
 
b/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/RemoteExecutionTest.java
index 8c9e150..5aa4568 100644
--- 
a/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/RemoteExecutionTest.java
+++ 
b/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/RemoteExecutionTest.java
@@ -1598,49 +1598,53 @@ public class RemoteExecutionTest implements 
Serializable {
     // Set the current system time to a fixed value to get stable values for 
processing time timer
     // output.
     
DateTimeUtils.setCurrentMillisFixed(BoundedWindow.TIMESTAMP_MIN_VALUE.getMillis()
 + 10000L);
-
-    try (RemoteBundle bundle =
-        processor.newBundle(
-            outputReceivers,
-            timerReceivers,
-            StateRequestHandler.unsupported(),
-            BundleProgressHandler.ignored(),
-            null,
-            null)) {
-      Iterables.getOnlyElement(bundle.getInputReceivers().values())
-          .accept(valueInGlobalWindow(KV.of("X", "X")));
-      bundle
-          .getTimerReceivers()
-          .get(KV.of(eventTimerSpec.transformId(), eventTimerSpec.timerId()))
-          .accept(timerForTest("Y", 1000L, 100L));
-      bundle
-          .getTimerReceivers()
-          .get(KV.of(processingTimerSpec.transformId(), 
processingTimerSpec.timerId()))
-          .accept(timerForTest("Z", 2000L, 200L));
+    try {
+      try (RemoteBundle bundle =
+          processor.newBundle(
+              outputReceivers,
+              timerReceivers,
+              StateRequestHandler.unsupported(),
+              BundleProgressHandler.ignored(),
+              null,
+              null)) {
+        Iterables.getOnlyElement(bundle.getInputReceivers().values())
+            .accept(valueInGlobalWindow(KV.of("X", "X")));
+        bundle
+            .getTimerReceivers()
+            .get(KV.of(eventTimerSpec.transformId(), eventTimerSpec.timerId()))
+            .accept(timerForTest("Y", 1000L, 100L));
+        bundle
+            .getTimerReceivers()
+            .get(KV.of(processingTimerSpec.transformId(), 
processingTimerSpec.timerId()))
+            .accept(timerForTest("Z", 2000L, 200L));
+      }
+      String mainOutputTransform =
+          
Iterables.getOnlyElement(descriptor.getRemoteOutputCoders().keySet());
+      assertThat(
+          outputValues.get(mainOutputTransform),
+          containsInAnyOrder(
+              valueInGlobalWindow(KV.of("mainX", "")),
+              WindowedValue.timestampedValueInGlobalWindow(
+                  KV.of("event", ""),
+                  
BoundedWindow.TIMESTAMP_MIN_VALUE.plus(Duration.millis(100L))),
+              WindowedValue.timestampedValueInGlobalWindow(
+                  KV.of("processing", ""),
+                  
BoundedWindow.TIMESTAMP_MIN_VALUE.plus(Duration.millis(200L)))));
+      assertThat(
+          timerValues.get(KV.of(eventTimerSpec.transformId(), 
eventTimerSpec.timerId())),
+          containsInAnyOrder(
+              timerForTest("X", 1L, 0L),
+              timerForTest("Y", 1011L, 100L),
+              timerForTest("Z", 2021L, 200L)));
+      assertThat(
+          timerValues.get(KV.of(processingTimerSpec.transformId(), 
processingTimerSpec.timerId())),
+          containsInAnyOrder(
+              timerForTest("X", 10002L, 0L),
+              timerForTest("Y", 10012L, 100L),
+              timerForTest("Z", 10022L, 200L)));
+    } finally {
+      DateTimeUtils.setCurrentMillisSystem();
     }
-    String mainOutputTransform =
-        Iterables.getOnlyElement(descriptor.getRemoteOutputCoders().keySet());
-    assertThat(
-        outputValues.get(mainOutputTransform),
-        containsInAnyOrder(
-            valueInGlobalWindow(KV.of("mainX", "")),
-            WindowedValue.timestampedValueInGlobalWindow(
-                KV.of("event", ""), 
BoundedWindow.TIMESTAMP_MIN_VALUE.plus(Duration.millis(100L))),
-            WindowedValue.timestampedValueInGlobalWindow(
-                KV.of("processing", ""),
-                
BoundedWindow.TIMESTAMP_MIN_VALUE.plus(Duration.millis(200L)))));
-    assertThat(
-        timerValues.get(KV.of(eventTimerSpec.transformId(), 
eventTimerSpec.timerId())),
-        containsInAnyOrder(
-            timerForTest("X", 1L, 0L),
-            timerForTest("Y", 1011L, 100L),
-            timerForTest("Z", 2021L, 200L)));
-    assertThat(
-        timerValues.get(KV.of(processingTimerSpec.transformId(), 
processingTimerSpec.timerId())),
-        containsInAnyOrder(
-            timerForTest("X", 10002L, 0L),
-            timerForTest("Y", 10012L, 100L),
-            timerForTest("Z", 10022L, 200L)));
   }
 
   @Test
diff --git 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/splittabledofn/WatermarkEstimatorsTest.java
 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/splittabledofn/WatermarkEstimatorsTest.java
index 762579d..b028f4d 100644
--- 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/splittabledofn/WatermarkEstimatorsTest.java
+++ 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/splittabledofn/WatermarkEstimatorsTest.java
@@ -59,33 +59,39 @@ public class WatermarkEstimatorsTest {
 
   @Test
   public void testWallTimeWatermarkEstimator() {
-    
DateTimeUtils.setCurrentMillisFixed(BoundedWindow.TIMESTAMP_MIN_VALUE.getMillis());
-    WatermarkEstimator<Instant> watermarkEstimator =
-        new WatermarkEstimators.WallTime(new Instant());
-    DateTimeUtils.setCurrentMillisFixed(
-        
BoundedWindow.TIMESTAMP_MIN_VALUE.plus(Duration.millis(1)).getMillis());
-    assertEquals(
-        BoundedWindow.TIMESTAMP_MIN_VALUE.plus(Duration.millis(1)),
-        watermarkEstimator.currentWatermark());
+    try {
+      
DateTimeUtils.setCurrentMillisFixed(BoundedWindow.TIMESTAMP_MIN_VALUE.getMillis());
+      WatermarkEstimator<Instant> watermarkEstimator =
+          new WatermarkEstimators.WallTime(new Instant());
+      DateTimeUtils.setCurrentMillisFixed(
+          
BoundedWindow.TIMESTAMP_MIN_VALUE.plus(Duration.millis(1)).getMillis());
+      assertEquals(
+          BoundedWindow.TIMESTAMP_MIN_VALUE.plus(Duration.millis(1)),
+          watermarkEstimator.currentWatermark());
 
-    DateTimeUtils.setCurrentMillisFixed(
-        
BoundedWindow.TIMESTAMP_MIN_VALUE.plus(Duration.millis(2)).getMillis());
-    // Make sure that we don't mutate state even if the clock advanced
-    assertEquals(
-        BoundedWindow.TIMESTAMP_MIN_VALUE.plus(Duration.millis(1)), 
watermarkEstimator.getState());
-    assertEquals(
-        BoundedWindow.TIMESTAMP_MIN_VALUE.plus(Duration.millis(2)),
-        watermarkEstimator.currentWatermark());
-    assertEquals(
-        BoundedWindow.TIMESTAMP_MIN_VALUE.plus(Duration.millis(2)), 
watermarkEstimator.getState());
+      DateTimeUtils.setCurrentMillisFixed(
+          
BoundedWindow.TIMESTAMP_MIN_VALUE.plus(Duration.millis(2)).getMillis());
+      // Make sure that we don't mutate state even if the clock advanced
+      assertEquals(
+          BoundedWindow.TIMESTAMP_MIN_VALUE.plus(Duration.millis(1)),
+          watermarkEstimator.getState());
+      assertEquals(
+          BoundedWindow.TIMESTAMP_MIN_VALUE.plus(Duration.millis(2)),
+          watermarkEstimator.currentWatermark());
+      assertEquals(
+          BoundedWindow.TIMESTAMP_MIN_VALUE.plus(Duration.millis(2)),
+          watermarkEstimator.getState());
 
-    // Handle the case if the clock ever goes backwards. Could happen if we 
resumed processing
-    // on a machine that had misconfigured clock or due to clock skew.
-    DateTimeUtils.setCurrentMillisFixed(
-        
BoundedWindow.TIMESTAMP_MIN_VALUE.plus(Duration.millis(1)).getMillis());
-    assertEquals(
-        BoundedWindow.TIMESTAMP_MIN_VALUE.plus(Duration.millis(2)),
-        watermarkEstimator.currentWatermark());
+      // Handle the case if the clock ever goes backwards. Could happen if we 
resumed processing
+      // on a machine that had misconfigured clock or due to clock skew.
+      DateTimeUtils.setCurrentMillisFixed(
+          
BoundedWindow.TIMESTAMP_MIN_VALUE.plus(Duration.millis(1)).getMillis());
+      assertEquals(
+          BoundedWindow.TIMESTAMP_MIN_VALUE.plus(Duration.millis(2)),
+          watermarkEstimator.currentWatermark());
+    } finally {
+      DateTimeUtils.setCurrentMillisSystem();
+    }
   }
 
   @Test
diff --git 
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/RampupThrottlingFnTest.java
 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/RampupThrottlingFnTest.java
index 940884b..bb1377c 100644
--- 
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/RampupThrottlingFnTest.java
+++ 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/RampupThrottlingFnTest.java
@@ -34,6 +34,7 @@ import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Immutabl
 import org.joda.time.DateTimeUtils;
 import org.joda.time.Duration;
 import org.joda.time.Instant;
+import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -77,6 +78,11 @@ public class RampupThrottlingFnTest {
     rampupThrottlingFn.throttlingMsecs = mockCounter;
   }
 
+  @After
+  public void tearDown() {
+    DateTimeUtils.setCurrentMillisSystem();
+  }
+
   @Test
   public void testRampupThrottler() throws Exception {
     Map<Duration, Integer> rampupSchedule =

Reply via email to