This is an automated email from the ASF dual-hosted git repository.

lcwik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new bb1104a  [BEAM-13456] Rollback #15890 to fix timeout in Java 
PostCommit (#16257)
bb1104a is described below

commit bb1104a82878d5e3b48210089f6d00e9d460dfea
Author: Brian Hulette <[email protected]>
AuthorDate: Thu Dec 16 13:30:51 2021 -0800

    [BEAM-13456] Rollback #15890 to fix timeout in Java PostCommit (#16257)
    
    * Revert "[BEAM-11936] Fix errorprone warnings (#15890)"
    
    This reverts commit 06a5e67332aae53ea90dedb4ef6421c2a7d65035.
    
    * spotless
---
 .../org/apache/beam/gradle/BeamModulePlugin.groovy |   6 +
 .../apache/beam/examples/snippets/Snippets.java    |   8 --
 .../core/construction/PTransformMatchersTest.java  |  21 ++++
 .../beam/runners/direct/NanosOffsetClock.java      |   6 +-
 .../runners/direct/TransformEvaluatorRegistry.java |  16 +++
 .../runners/direct/UnboundedReadDeduplicator.java  |   3 +-
 .../runners/direct/CloningBundleFactoryTest.java   |  67 +++++++++++
 .../flink/FlinkStreamingPipelineTranslator.java    |   4 +-
 .../flink/FlinkStreamingTransformTranslators.java  |  17 +++
 .../wrappers/streaming/DoFnOperator.java           |   2 +-
 .../state/FlinkBroadcastStateInternals.java        | 131 +++++++++++++++++++++
 .../streaming/ExecutableStageDoFnOperatorTest.java |   1 -
 .../beam/runners/dataflow/DataflowRunner.java      | 110 +++++++++++++++++
 .../runners/dataflow/DataflowPipelineJobTest.java  |  25 ++++
 .../beam/runners/dataflow/worker/ReaderCache.java  |   3 +-
 .../beam/runners/dataflow/worker/StateFetcher.java |   4 +-
 .../dataflow/worker/StreamingDataflowWorker.java   |   2 +-
 .../fn/data/RemoteGrpcPortWriteOperation.java      |   6 +-
 .../common/worker/CachingShuffleBatchReader.java   |   4 +-
 .../control/DefaultJobBundleFactory.java           |  16 +--
 .../fnexecution/control/RemoteExecutionTest.java   |   2 +
 .../beam/runners/spark/io/MicrobatchSource.java    |   2 +-
 .../translation/utils/SideInputStorage.java        |   4 +-
 .../runners/spark/util/GlobalWatermarkHolder.java  |   4 +-
 .../beam/runners/spark/util/SideInputStorage.java  |   4 +-
 .../src/main/java/org/apache/beam/sdk/io/Read.java |   3 +-
 .../org/apache/beam/sdk/schemas/SchemaCoder.java   |  20 ++++
 .../apache/beam/sdk/values/PCollectionViews.java   |  59 ++++++++++
 .../apache/beam/sdk/coders/CoderRegistryTest.java  |   4 +
 .../apache/beam/sdk/testing/ExpectedLogsTest.java  |  10 +-
 .../beam/sdk/testing/SystemNanoTimeSleeper.java    |   4 +-
 .../sdk/transforms/reflect/DoFnSignaturesTest.java |   9 ++
 .../GrowableOffsetRangeTrackerTest.java            |   2 +-
 .../core/translate/TimestampExtractTransform.java  |   8 ++
 .../sql/meta/provider/kafka/BeamKafkaTable.java    |   3 +-
 .../org/apache/beam/sdk/fn/CancellableQueue.java   |   4 +-
 .../harness/state/StateFetchingIteratorsTest.java  |   2 +-
 .../bigquery/StorageApiWriteUnshardedRecords.java  |   3 +-
 .../bigquery/StorageApiWritesShardedRecords.java   |   3 +-
 .../internal/LimitingTopicBacklogReader.java       |   6 +-
 .../beam/sdk/io/gcp/spanner/SpannerAccessor.java   |  24 ++++
 .../sdk/io/hadoop/format/TestRowDBWritable.java    |  10 ++
 .../beam/sdk/io/kafka/KafkaExactlyOnceSink.java    |   3 +-
 .../beam/sdk/io/kafka/KafkaUnboundedReader.java    |   4 +-
 .../org/apache/beam/sdk/io/xml/XmlSourceTest.java  |   4 +-
 45 files changed, 592 insertions(+), 61 deletions(-)

diff --git 
a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy 
b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy
index 60d76c9..5f1a2a0 100644
--- a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy
+++ b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy
@@ -1140,12 +1140,18 @@ class BeamModulePlugin implements Plugin<Project> {
         options.errorprone.errorproneArgs.add("-Xep:EqualsGetClass:OFF")
         options.errorprone.errorproneArgs.add("-Xep:EqualsUnsafeCast:OFF")
         options.errorprone.errorproneArgs.add("-Xep:ExtendsAutoValue:OFF")
+        
options.errorprone.errorproneArgs.add("-Xep:FloatingPointAssertionWithinEpsilon:OFF")
         
options.errorprone.errorproneArgs.add("-Xep:JavaTimeDefaultTimeZone:OFF")
+        options.errorprone.errorproneArgs.add("-Xep:LockNotBeforeTry:OFF")
         
options.errorprone.errorproneArgs.add("-Xep:MixedMutabilityReturnType:OFF")
+        
options.errorprone.errorproneArgs.add("-Xep:PreferJavaTimeOverload:OFF")
+        options.errorprone.errorproneArgs.add("-Xep:ModifiedButNotUsed:OFF")
         options.errorprone.errorproneArgs.add("-Xep:ThreadPriorityCheck:OFF")
         
options.errorprone.errorproneArgs.add("-Xep:TimeUnitConversionChecker:OFF")
         options.errorprone.errorproneArgs.add("-Xep:UndefinedEquals:OFF")
         options.errorprone.errorproneArgs.add("-Xep:UnnecessaryLambda:OFF")
+        options.errorprone.errorproneArgs.add("-Xep:UnusedVariable:OFF")
+        options.errorprone.errorproneArgs.add("-Xep:UnusedNestedClass:OFF")
         
options.errorprone.errorproneArgs.add("-Xep:UnsafeReflectiveConstructionCast:OFF")
       }
 
diff --git 
a/examples/java/src/main/java/org/apache/beam/examples/snippets/Snippets.java 
b/examples/java/src/main/java/org/apache/beam/examples/snippets/Snippets.java
index 1bde081..d2e78fe 100644
--- 
a/examples/java/src/main/java/org/apache/beam/examples/snippets/Snippets.java
+++ 
b/examples/java/src/main/java/org/apache/beam/examples/snippets/Snippets.java
@@ -191,7 +191,6 @@ public class Snippets {
     }
 
     {
-      @SuppressWarnings("ModifiedButNotUsed")
       // [START BigQueryDataTypes]
       TableRow row = new TableRow();
       row.set("string", "abc");
@@ -1175,7 +1174,6 @@ public class Snippets {
     }
   }
 
-  @SuppressWarnings("unused")
   private static class BundleFinalization {
     private static class BundleFinalizationDoFn extends DoFn<String, Integer> {
       // [START BundleFinalize]
@@ -1193,7 +1191,6 @@ public class Snippets {
     }
   }
 
-  @SuppressWarnings("unused")
   private static class SplittableDoFn {
 
     private static void seekToNextRecordBoundaryInFile(
@@ -1233,7 +1230,6 @@ public class Snippets {
     }
     // [END SDF_BasicExample]
 
-    @SuppressWarnings("unused")
     private static class BasicExampleWithInitialSplitting extends 
FileToWordsFn {
       // [START SDF_BasicExampleWithSplitting]
       void splitRestriction(
@@ -1252,7 +1248,6 @@ public class Snippets {
       // [END SDF_BasicExampleWithSplitting]
     }
 
-    @SuppressWarnings("unused")
     private static class BasicExampleWithBadTryClaimLoop extends DoFn<String, 
Integer> {
       // [START SDF_BadTryClaimLoop]
       @ProcessElement
@@ -1276,7 +1271,6 @@ public class Snippets {
       // [END SDF_BadTryClaimLoop]
     }
 
-    @SuppressWarnings("unused")
     private static class CustomWatermarkEstimatorExample extends DoFn<String, 
Integer> {
       private static Instant currentWatermark = Instant.now();
 
@@ -1342,7 +1336,6 @@ public class Snippets {
     }
     // [END SDF_CustomWatermarkEstimator]
 
-    @SuppressWarnings("unused")
     private static class UserInitiatedCheckpointExample extends DoFn<String, 
Integer> {
       public static class ThrottlingException extends Exception {}
 
@@ -1405,7 +1398,6 @@ public class Snippets {
       // [END SDF_Truncate]
     }
 
-    @SuppressWarnings("unused")
     private static class GetSizeExample extends DoFn<String, Integer> {
       // [START SDF_GetSize]
       @GetSize
diff --git 
a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformMatchersTest.java
 
b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformMatchersTest.java
index 185b52d..35762bb 100644
--- 
a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformMatchersTest.java
+++ 
b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformMatchersTest.java
@@ -57,7 +57,9 @@ import 
org.apache.beam.sdk.transforms.View.CreatePCollectionView;
 import org.apache.beam.sdk.transforms.ViewFn;
 import org.apache.beam.sdk.transforms.resourcehints.ResourceHints;
 import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
 import org.apache.beam.sdk.transforms.windowing.Window;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
@@ -72,6 +74,7 @@ import org.apache.beam.sdk.values.TypeDescriptors;
 import org.apache.beam.sdk.values.WindowingStrategy;
 import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.MoreObjects;
 import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
+import org.checkerframework.checker.nullness.qual.Nullable;
 import org.hamcrest.Matchers;
 import org.joda.time.Duration;
 import org.junit.Rule;
@@ -595,4 +598,22 @@ public class PTransformMatchersTest implements 
Serializable {
         ResourceHints.create(),
         p);
   }
+
+  private static class FakeFilenamePolicy extends FilenamePolicy {
+    @Override
+    public ResourceId windowedFilename(
+        int shardNumber,
+        int numShards,
+        BoundedWindow window,
+        PaneInfo paneInfo,
+        FileBasedSink.OutputFileHints outputFileHints) {
+      throw new UnsupportedOperationException("should not be called");
+    }
+
+    @Override
+    public @Nullable ResourceId unwindowedFilename(
+        int shardNumber, int numShards, FileBasedSink.OutputFileHints 
outputFileHints) {
+      throw new UnsupportedOperationException("should not be called");
+    }
+  }
 }
diff --git 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/NanosOffsetClock.java
 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/NanosOffsetClock.java
index 286d3d9..f26c907 100644
--- 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/NanosOffsetClock.java
+++ 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/NanosOffsetClock.java
@@ -17,7 +17,7 @@
  */
 package org.apache.beam.runners.direct;
 
-import java.time.Duration;
+import java.util.concurrent.TimeUnit;
 import org.joda.time.Instant;
 
 /** A {@link Clock} that uses {@link System#nanoTime()} to track the progress 
of time. */
@@ -37,6 +37,8 @@ class NanosOffsetClock implements Clock {
   @Override
   public Instant now() {
     return new Instant(
-        baseMillis + Duration.ofNanos(System.nanoTime() - 
nanosAtBaseMillis).toMillis());
+        baseMillis
+            + TimeUnit.MILLISECONDS.convert(
+                System.nanoTime() - nanosAtBaseMillis, TimeUnit.NANOSECONDS));
   }
 }
diff --git 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorRegistry.java
 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorRegistry.java
index 6f750d4..3d96fc7 100644
--- 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorRegistry.java
+++ 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorRegistry.java
@@ -39,6 +39,7 @@ import java.util.Collection;
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicBoolean;
 import org.apache.beam.runners.core.SplittableParDoViaKeyedWorkItems;
+import 
org.apache.beam.runners.core.SplittableParDoViaKeyedWorkItems.ProcessElements;
 import org.apache.beam.runners.core.construction.PTransformTranslation;
 import 
org.apache.beam.runners.core.construction.PTransformTranslation.TransformPayloadTranslator;
 import 
org.apache.beam.runners.core.construction.TransformPayloadTranslatorRegistrar;
@@ -139,6 +140,21 @@ class TransformEvaluatorRegistry {
     }
   }
 
+  /**
+   * A translator just to vend the URN. This will need to be moved to 
runners-core-construction-java
+   * once SDF is reorganized appropriately.
+   */
+  private static class SplittableParDoProcessElementsTranslator
+      extends TransformPayloadTranslator.NotSerializable<ProcessElements<?, ?, 
?, ?, ?>> {
+
+    private SplittableParDoProcessElementsTranslator() {}
+
+    @Override
+    public String getUrn(ProcessElements<?, ?, ?, ?, ?> transform) {
+      return SPLITTABLE_PROCESS_URN;
+    }
+  }
+
   // the TransformEvaluatorFactories can construct instances of all generic 
types of transform,
   // so all instances of a primitive can be handled with the same evaluator 
factory.
   private final Map<String, TransformEvaluatorFactory> factories;
diff --git 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadDeduplicator.java
 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadDeduplicator.java
index 125c026..b222698 100644
--- 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadDeduplicator.java
+++ 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadDeduplicator.java
@@ -17,6 +17,7 @@
  */
 package org.apache.beam.runners.direct;
 
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import 
org.apache.beam.runners.core.construction.SplittableParDo.PrimitiveUnboundedRead;
 import org.apache.beam.runners.local.StructuralKey;
@@ -73,7 +74,7 @@ interface UnboundedReadDeduplicator {
     private CachedIdDeduplicator() {
       ids =
           CacheBuilder.newBuilder()
-              
.expireAfterAccess(java.time.Duration.ofMillis(MAX_RETENTION_SINCE_ACCESS))
+              .expireAfterAccess(MAX_RETENTION_SINCE_ACCESS, 
TimeUnit.MILLISECONDS)
               .maximumSize(100_000L)
               .build(new TrueBooleanLoader());
     }
diff --git 
a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CloningBundleFactoryTest.java
 
b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CloningBundleFactoryTest.java
index 6d7a975..6a6d7df 100644
--- 
a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CloningBundleFactoryTest.java
+++ 
b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CloningBundleFactoryTest.java
@@ -36,7 +36,9 @@ import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.coders.VarIntCoder;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.GroupByKey;
+import org.apache.beam.sdk.transforms.SimpleFunction;
 import org.apache.beam.sdk.transforms.WithKeys;
 import org.apache.beam.sdk.util.UserCodeException;
 import org.apache.beam.sdk.util.WindowedValue;
@@ -189,4 +191,69 @@ public class CloningBundleFactoryTest {
       throw new CoderException("Decode not allowed");
     }
   }
+
+  private static class RecordStructuralValueCoder extends AtomicCoder<Record> {
+    @Override
+    public void encode(Record value, OutputStream outStream) throws 
CoderException, IOException {}
+
+    @Override
+    public Record decode(InputStream inStream) throws CoderException, 
IOException {
+      return new Record() {
+        @Override
+        public String toString() {
+          return "DecodedRecord";
+        }
+      };
+    }
+
+    @Override
+    public boolean consistentWithEquals() {
+      return true;
+    }
+
+    @Override
+    public Object structuralValue(Record value) {
+      return value;
+    }
+  }
+
+  private static class RecordNotConsistentWithEqualsStructuralValueCoder
+      extends AtomicCoder<Record> {
+    @Override
+    public void encode(Record value, OutputStream outStream) throws 
CoderException, IOException {}
+
+    @Override
+    public Record decode(InputStream inStream) throws CoderException, 
IOException {
+      return new Record() {
+        @Override
+        public String toString() {
+          return "DecodedRecord";
+        }
+      };
+    }
+
+    @Override
+    public boolean consistentWithEquals() {
+      return false;
+    }
+
+    @Override
+    public Object structuralValue(Record value) {
+      return value;
+    }
+  }
+
+  private static class IdentityDoFn extends DoFn<Record, Record> {
+    @ProcessElement
+    public void proc(ProcessContext ctxt) {
+      ctxt.output(ctxt.element());
+    }
+  }
+
+  private static class SimpleIdentity extends SimpleFunction<Record, Record> {
+    @Override
+    public Record apply(Record input) {
+      return input;
+    }
+  }
 }
diff --git 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPipelineTranslator.java
 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPipelineTranslator.java
index bcb3883..b7f99b8 100644
--- 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPipelineTranslator.java
+++ 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPipelineTranslator.java
@@ -21,13 +21,13 @@ import static 
org.apache.beam.runners.core.construction.PTransformTranslation.WR
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
-import java.time.Duration;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
 import org.apache.beam.runners.core.construction.PTransformReplacements;
 import org.apache.beam.runners.core.construction.PTransformTranslation;
 import org.apache.beam.runners.core.construction.ReplacementOutputs;
@@ -368,7 +368,7 @@ class FlinkStreamingPipelineTranslator extends 
FlinkPipelineTranslator {
         cache =
             CacheBuilder.newBuilder()
                 .maximumSize(CACHE_MAX_SIZE)
-                .expireAfterAccess(Duration.ofSeconds(CACHE_EXPIRE_SECONDS))
+                .expireAfterAccess(CACHE_EXPIRE_SECONDS, TimeUnit.SECONDS)
                 .build();
       }
 
diff --git 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java
 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java
index 18ece76..9379ef5 100644
--- 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java
+++ 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java
@@ -1417,6 +1417,23 @@ class FlinkStreamingTransformTranslators {
     }
   }
 
+  /**
+   * A translator just to vend the URN. This will need to be moved to 
runners-core-construction-java
+   * once SDF is reorganized appropriately.
+   */
+  private static class SplittableParDoProcessElementsTranslator
+      extends PTransformTranslation.TransformPayloadTranslator.NotSerializable<
+          SplittableParDoViaKeyedWorkItems.ProcessElements<?, ?, ?, ?, ?>> {
+
+    private SplittableParDoProcessElementsTranslator() {}
+
+    @Override
+    public String getUrn(
+        SplittableParDoViaKeyedWorkItems.ProcessElements<?, ?, ?, ?, ?> 
transform) {
+      return SPLITTABLE_PROCESS_URN;
+    }
+  }
+
   /** Registers classes specialized to the Flink runner. */
   @AutoService(TransformPayloadTranslatorRegistrar.class)
   public static class FlinkTransformsRegistrar implements 
TransformPayloadTranslatorRegistrar {
diff --git 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
index f78f621..9b2a8e5 100644
--- 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
+++ 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
@@ -1133,8 +1133,8 @@ public class DoFnOperator<InputT, OutputT>
     }
 
     private void buffer(KV<Integer, WindowedValue<?>> taggedValue) {
-      bufferLock.lock();
       try {
+        bufferLock.lock();
         pushedBackElementsHandler.pushBack(taggedValue);
       } catch (Exception e) {
         throw new RuntimeException("Couldn't pushback element.", e);
diff --git 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkBroadcastStateInternals.java
 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkBroadcastStateInternals.java
index 7b89e01..e6c47ed 100644
--- 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkBroadcastStateInternals.java
+++ 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkBroadcastStateInternals.java
@@ -553,6 +553,137 @@ public class FlinkBroadcastStateInternals<K> implements 
StateInternals {
     }
   }
 
+  private class FlinkKeyedCombiningState<K2, InputT, AccumT, OutputT>
+      extends AbstractBroadcastState<AccumT> implements CombiningState<InputT, 
AccumT, OutputT> {
+
+    private final StateNamespace namespace;
+    private final StateTag<CombiningState<InputT, AccumT, OutputT>> address;
+    private final Combine.CombineFn<InputT, AccumT, OutputT> combineFn;
+
+    FlinkKeyedCombiningState(
+        OperatorStateBackend flinkStateBackend,
+        StateTag<CombiningState<InputT, AccumT, OutputT>> address,
+        Combine.CombineFn<InputT, AccumT, OutputT> combineFn,
+        StateNamespace namespace,
+        Coder<AccumT> accumCoder,
+        FlinkBroadcastStateInternals<K2> flinkStateInternals,
+        PipelineOptions pipelineOptions) {
+      super(flinkStateBackend, address.getId(), namespace, accumCoder, 
pipelineOptions);
+
+      this.namespace = namespace;
+      this.address = address;
+      this.combineFn = combineFn;
+    }
+
+    @Override
+    public CombiningState<InputT, AccumT, OutputT> readLater() {
+      return this;
+    }
+
+    @Override
+    public void add(InputT value) {
+      try {
+        AccumT current = readInternal();
+        if (current == null) {
+          current = combineFn.createAccumulator();
+        }
+        current = combineFn.addInput(current, value);
+        writeInternal(current);
+      } catch (Exception e) {
+        throw new RuntimeException("Error adding to state.", e);
+      }
+    }
+
+    @Override
+    public void addAccum(AccumT accum) {
+      try {
+        AccumT current = readInternal();
+        if (current == null) {
+          writeInternal(accum);
+        } else {
+          current = combineFn.mergeAccumulators(Arrays.asList(current, accum));
+          writeInternal(current);
+        }
+      } catch (Exception e) {
+        throw new RuntimeException("Error adding to state.", e);
+      }
+    }
+
+    @Override
+    public AccumT getAccum() {
+      try {
+        AccumT accum = readInternal();
+        return accum != null ? accum : combineFn.createAccumulator();
+      } catch (Exception e) {
+        throw new RuntimeException("Error reading state.", e);
+      }
+    }
+
+    @Override
+    public AccumT mergeAccumulators(Iterable<AccumT> accumulators) {
+      return combineFn.mergeAccumulators(accumulators);
+    }
+
+    @Override
+    public OutputT read() {
+      try {
+        AccumT accum = readInternal();
+        if (accum != null) {
+          return combineFn.extractOutput(accum);
+        } else {
+          return combineFn.extractOutput(combineFn.createAccumulator());
+        }
+      } catch (Exception e) {
+        throw new RuntimeException("Error reading state.", e);
+      }
+    }
+
+    @Override
+    public ReadableState<Boolean> isEmpty() {
+      return new ReadableState<Boolean>() {
+        @Override
+        public Boolean read() {
+          try {
+            return readInternal() == null;
+          } catch (Exception e) {
+            throw new RuntimeException("Error reading state.", e);
+          }
+        }
+
+        @Override
+        public ReadableState<Boolean> readLater() {
+          return this;
+        }
+      };
+    }
+
+    @Override
+    public void clear() {
+      clearInternal();
+    }
+
+    @Override
+    public boolean equals(@Nullable Object o) {
+      if (this == o) {
+        return true;
+      }
+      if (o == null || getClass() != o.getClass()) {
+        return false;
+      }
+
+      FlinkKeyedCombiningState<?, ?, ?, ?> that = (FlinkKeyedCombiningState<?, 
?, ?, ?>) o;
+
+      return namespace.equals(that.namespace) && address.equals(that.address);
+    }
+
+    @Override
+    public int hashCode() {
+      int result = namespace.hashCode();
+      result = 31 * result + address.hashCode();
+      return result;
+    }
+  }
+
   private class FlinkCombiningStateWithContext<K2, InputT, AccumT, OutputT>
       extends AbstractBroadcastState<AccumT> implements CombiningState<InputT, 
AccumT, OutputT> {
 
diff --git 
a/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperatorTest.java
 
b/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperatorTest.java
index af609e9..4d27af6 100644
--- 
a/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperatorTest.java
+++ 
b/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperatorTest.java
@@ -637,7 +637,6 @@ public class ExecutableStageDoFnOperatorTest {
     assertThat(statefulDoFnRunner, instanceOf(StatefulDoFnRunner.class));
   }
 
-  @SuppressWarnings("LockNotBeforeTry")
   @Test
   public void testEnsureStateCleanupWithKeyedInputCleanupTimer() {
     InMemoryTimerInternals inMemoryTimerInternals = new 
InMemoryTimerInternals();
diff --git 
a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
 
b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
index 212e03a..0f23d61 100644
--- 
a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
+++ 
b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
@@ -19,6 +19,7 @@ package org.apache.beam.runners.dataflow;
 
 import static java.nio.charset.StandardCharsets.UTF_8;
 import static 
org.apache.beam.runners.core.construction.resources.PipelineResources.detectClassPathResourcesToStage;
+import static org.apache.beam.sdk.options.ExperimentalOptions.hasExperiment;
 import static org.apache.beam.sdk.util.CoderUtils.encodeToByteArray;
 import static org.apache.beam.sdk.util.SerializableUtils.serializeToByteArray;
 import static org.apache.beam.sdk.util.StringUtils.byteArrayToJsonString;
@@ -48,6 +49,7 @@ import java.io.PrintWriter;
 import java.nio.channels.Channels;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
@@ -61,6 +63,7 @@ import java.util.regex.Pattern;
 import java.util.stream.Collectors;
 import org.apache.beam.model.pipeline.v1.RunnerApi;
 import org.apache.beam.runners.core.construction.BeamUrns;
+import org.apache.beam.runners.core.construction.CoderTranslation;
 import org.apache.beam.runners.core.construction.DeduplicatedFlattenFactory;
 import org.apache.beam.runners.core.construction.EmptyFlattenAsCreateFactory;
 import org.apache.beam.runners.core.construction.Environments;
@@ -68,6 +71,7 @@ import org.apache.beam.runners.core.construction.External;
 import org.apache.beam.runners.core.construction.PTransformMatchers;
 import org.apache.beam.runners.core.construction.PTransformReplacements;
 import org.apache.beam.runners.core.construction.PipelineTranslation;
+import org.apache.beam.runners.core.construction.RehydratedComponents;
 import org.apache.beam.runners.core.construction.ReplacementOutputs;
 import org.apache.beam.runners.core.construction.SdkComponents;
 import 
org.apache.beam.runners.core.construction.SingleInputOutputOverrideFactory;
@@ -126,6 +130,7 @@ import org.apache.beam.sdk.state.SetState;
 import org.apache.beam.sdk.transforms.Combine;
 import org.apache.beam.sdk.transforms.Combine.CombineFn;
 import org.apache.beam.sdk.transforms.Combine.GroupedValues;
+import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.GroupIntoBatches;
 import org.apache.beam.sdk.transforms.Impulse;
@@ -140,6 +145,7 @@ import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.transforms.reflect.DoFnSignatures;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
+import org.apache.beam.sdk.util.CoderUtils;
 import org.apache.beam.sdk.util.InstanceBuilder;
 import org.apache.beam.sdk.util.MimeTypes;
 import org.apache.beam.sdk.util.NameUtils;
@@ -1777,6 +1783,110 @@ public class DataflowRunner extends 
PipelineRunner<DataflowPipelineJob> {
 
   // 
================================================================================
 
+  /**
+   * A PTranform override factory which maps Create.Values PTransforms for 
streaming pipelines into
+   * a Dataflow specific variant.
+   */
+  private static class StreamingFnApiCreateOverrideFactory<T>
+      implements PTransformOverrideFactory<PBegin, PCollection<T>, 
Create.Values<T>> {
+
+    @Override
+    public PTransformReplacement<PBegin, PCollection<T>> 
getReplacementTransform(
+        AppliedPTransform<PBegin, PCollection<T>, Create.Values<T>> transform) 
{
+      Create.Values<T> original = transform.getTransform();
+      PCollection<T> output =
+          (PCollection) 
Iterables.getOnlyElement(transform.getOutputs().values());
+      return PTransformReplacement.of(
+          transform.getPipeline().begin(), new 
StreamingFnApiCreate<>(original, output));
+    }
+
+    @Override
+    public Map<PCollection<?>, ReplacementOutput> mapOutputs(
+        Map<TupleTag<?>, PCollection<?>> outputs, PCollection<T> newOutput) {
+      return ReplacementOutputs.singleton(outputs, newOutput);
+    }
+  }
+
+  /**
+   * Specialized implementation for {@link 
org.apache.beam.sdk.transforms.Create.Values
+   * Create.Values} for the Dataflow runner in streaming mode over the Fn API.
+   */
+  private static class StreamingFnApiCreate<T> extends PTransform<PBegin, 
PCollection<T>> {
+
+    private final Create.Values<T> transform;
+    private final transient PCollection<T> originalOutput;
+
+    private StreamingFnApiCreate(Create.Values<T> transform, PCollection<T> 
originalOutput) {
+      this.transform = transform;
+      this.originalOutput = originalOutput;
+    }
+
+    @Override
+    public final PCollection<T> expand(PBegin input) {
+      try {
+        PCollection<T> pc =
+            Pipeline.applyTransform(input, Impulse.create())
+                .apply(
+                    ParDo.of(
+                        DecodeAndEmitDoFn.fromIterable(
+                            transform.getElements(), 
originalOutput.getCoder())));
+        pc.setCoder(originalOutput.getCoder());
+        return pc;
+      } catch (IOException e) {
+        throw new IllegalStateException("Unable to encode elements.", e);
+      }
+    }
+
+    /**
+     * A DoFn which stores encoded versions of elements and a representation 
of a Coder capable of
+     * decoding those elements.
+     *
+     * <p>TODO: BEAM-2422 - Make this a SplittableDoFn.
+     */
+    private static class DecodeAndEmitDoFn<T> extends DoFn<byte[], T> {
+
+      public static <T> DecodeAndEmitDoFn<T> fromIterable(Iterable<T> 
elements, Coder<T> elemCoder)
+          throws IOException {
+        ImmutableList.Builder<byte[]> allElementsBytes = 
ImmutableList.builder();
+        for (T element : elements) {
+          byte[] bytes = encodeToByteArray(elemCoder, element);
+          allElementsBytes.add(bytes);
+        }
+        return new DecodeAndEmitDoFn<>(allElementsBytes.build(), elemCoder);
+      }
+
+      private final Collection<byte[]> elements;
+      private final RunnerApi.MessageWithComponents coderSpec;
+
+      // lazily initialized by parsing coderSpec
+      private transient Coder<T> coder;
+
+      private Coder<T> getCoder() throws IOException {
+        if (coder == null) {
+          coder =
+              (Coder)
+                  CoderTranslation.fromProto(
+                      coderSpec.getCoder(),
+                      
RehydratedComponents.forComponents(coderSpec.getComponents()),
+                      CoderTranslation.TranslationContext.DEFAULT);
+        }
+        return coder;
+      }
+
+      private DecodeAndEmitDoFn(Collection<byte[]> elements, Coder<T> coder) 
throws IOException {
+        this.elements = elements;
+        this.coderSpec = CoderTranslation.toProto(coder);
+      }
+
+      @ProcessElement
+      public void processElement(ProcessContext context) throws IOException {
+        for (byte[] element : elements) {
+          context.output(CoderUtils.decodeFromByteArray(getCoder(), element));
+        }
+      }
+    }
+  }
+
   private static class SingleOutputExpandableTransformTranslator
       implements TransformTranslator<External.SingleOutputExpandableTransform> 
{
     @Override
diff --git 
a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineJobTest.java
 
b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineJobTest.java
index d45b3c6..a9e8976 100644
--- 
a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineJobTest.java
+++ 
b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineJobTest.java
@@ -40,6 +40,8 @@ import com.google.api.services.dataflow.model.Job;
 import com.google.api.services.dataflow.model.JobMessage;
 import java.io.IOException;
 import java.net.SocketTimeoutException;
+import java.util.List;
+import java.util.NavigableMap;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.TimeUnit;
 import org.apache.beam.runners.dataflow.util.MonitoringUtil;
@@ -53,6 +55,7 @@ import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.testing.ExpectedLogs;
 import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
 import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Maps;
 import org.joda.time.Duration;
 import org.joda.time.Instant;
 import org.junit.Before;
@@ -523,6 +526,28 @@ public class DataflowPipelineJobTest {
     return message;
   }
 
+  private class FakeMonitor extends MonitoringUtil {
+    // Messages in timestamp order
+    private final NavigableMap<Long, JobMessage> timestampedMessages;
+
+    public FakeMonitor(JobMessage... messages) {
+      // The client should never be used; this Fake is intended to intercept 
relevant methods
+      super(mockDataflowClient);
+
+      NavigableMap<Long, JobMessage> timestampedMessages = Maps.newTreeMap();
+      for (JobMessage message : messages) {
+        timestampedMessages.put(Long.parseLong(message.getTime()), message);
+      }
+
+      this.timestampedMessages = timestampedMessages;
+    }
+
+    @Override
+    public List<JobMessage> getJobMessages(String jobId, long 
startTimestampMs) {
+      return 
ImmutableList.copyOf(timestampedMessages.headMap(startTimestampMs).values());
+    }
+  }
+
   private static class ZeroSleeper implements Sleeper {
     @Override
     public void sleep(long l) throws InterruptedException {}
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/ReaderCache.java
 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/ReaderCache.java
index 3329c1e..58f3abf 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/ReaderCache.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/ReaderCache.java
@@ -19,6 +19,7 @@ package org.apache.beam.runners.dataflow.worker;
 
 import java.io.IOException;
 import java.util.concurrent.Executor;
+import java.util.concurrent.TimeUnit;
 import javax.annotation.concurrent.ThreadSafe;
 import org.apache.beam.sdk.io.UnboundedSource;
 import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions;
@@ -70,7 +71,7 @@ class ReaderCache {
     this.invalidationExecutor = invalidationExecutor;
     this.cache =
         CacheBuilder.newBuilder()
-            
.expireAfterWrite(java.time.Duration.ofMillis(cacheDuration.getMillis()))
+            .expireAfterWrite(cacheDuration.getMillis(), TimeUnit.MILLISECONDS)
             .removalListener(
                 (RemovalNotification<WindmillComputationKey, CacheEntry> 
notification) -> {
                   if (notification.getCause() != RemovalCause.EXPLICIT) {
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StateFetcher.java
 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StateFetcher.java
index aeb3f62..db5256a 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StateFetcher.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StateFetcher.java
@@ -20,11 +20,11 @@ package org.apache.beam.runners.dataflow.worker;
 import static 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState;
 
 import java.io.Closeable;
-import java.time.Duration;
 import java.util.Collections;
 import java.util.Objects;
 import java.util.Set;
 import java.util.concurrent.Callable;
+import java.util.concurrent.TimeUnit;
 import org.apache.beam.runners.core.InMemoryMultimapSideInputView;
 import org.apache.beam.runners.dataflow.worker.windmill.Windmill;
 import org.apache.beam.sdk.coders.Coder;
@@ -71,7 +71,7 @@ class StateFetcher {
         server,
         CacheBuilder.newBuilder()
             .maximumWeight(100000000 /* 100 MB */)
-            .expireAfterWrite(Duration.ofMinutes(1))
+            .expireAfterWrite(1, TimeUnit.MINUTES)
             .weigher((Weigher<SideInputId, SideInputCacheEntry>) (id, entry) 
-> entry.size())
             .build());
   }
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
index 5418e4f..72fc234 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
@@ -412,7 +412,7 @@ public class StreamingDataflowWorker {
   // Using Cache with time eviction policy helps us to prevent memory leak 
when callback ids are
   // discarded by Dataflow service and calling commitCallback is best-effort.
   private final Cache<Long, Runnable> commitCallbacks =
-      
CacheBuilder.newBuilder().expireAfterWrite(java.time.Duration.ofMinutes(5L)).build();
+      CacheBuilder.newBuilder().expireAfterWrite(5L, TimeUnit.MINUTES).build();
 
   // Map of user state names to system state names.
   // TODO(drieber): obsolete stateNameMap. Use transformUserNameToStateFamily 
in
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/fn/data/RemoteGrpcPortWriteOperation.java
 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/fn/data/RemoteGrpcPortWriteOperation.java
index ad8a071..aef324e 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/fn/data/RemoteGrpcPortWriteOperation.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/fn/data/RemoteGrpcPortWriteOperation.java
@@ -153,8 +153,8 @@ public class RemoteGrpcPortWriteOperation<T> extends 
ReceivingOperation {
   public Consumer<Integer> processedElementsConsumer() {
     usingElementsProcessed = true;
     return elementsProcessed -> {
-      lock.lock();
       try {
+        lock.lock();
         this.elementsProcessed.set(elementsProcessed);
         condition.signal();
       } finally {
@@ -168,8 +168,8 @@ public class RemoteGrpcPortWriteOperation<T> extends 
ReceivingOperation {
 
   private void maybeWait() throws Exception {
     if (shouldWait()) {
-      lock.lock();
       try {
+        lock.lock();
         while (shouldWait()) {
           LOG.debug(
               "Throttling elements at {} until more than {} elements been 
processed.",
@@ -185,8 +185,8 @@ public class RemoteGrpcPortWriteOperation<T> extends 
ReceivingOperation {
 
   public void abortWait() {
     usingElementsProcessed = false;
-    lock.lock();
     try {
+      lock.lock();
       condition.signal();
     } finally {
       lock.unlock();
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/common/worker/CachingShuffleBatchReader.java
 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/common/worker/CachingShuffleBatchReader.java
index 33dfbc2..4dd13c6 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/common/worker/CachingShuffleBatchReader.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/common/worker/CachingShuffleBatchReader.java
@@ -18,8 +18,8 @@
 package org.apache.beam.runners.dataflow.worker.util.common.worker;
 
 import java.io.IOException;
-import java.time.Duration;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
 import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Objects;
 import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Throwables;
@@ -49,7 +49,7 @@ public class CachingShuffleBatchReader implements 
ShuffleBatchReader {
     this.cache =
         CacheBuilder.newBuilder()
             .maximumSize(maximumBatches)
-            .expireAfterAccess(Duration.ofMillis(expireAfterAccessMillis))
+            .expireAfterAccess(expireAfterAccessMillis, TimeUnit.MILLISECONDS)
             .<BatchRange, Batch>build(
                 new CacheLoader<BatchRange, Batch>() {
                   @Override
diff --git 
a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/DefaultJobBundleFactory.java
 
b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/DefaultJobBundleFactory.java
index 6a3f9a7..b9851ca 100644
--- 
a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/DefaultJobBundleFactory.java
+++ 
b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/DefaultJobBundleFactory.java
@@ -19,7 +19,6 @@ package org.apache.beam.runners.fnexecution.control;
 
 import com.google.auto.value.AutoValue;
 import java.io.IOException;
-import java.time.Duration;
 import java.util.IdentityHashMap;
 import java.util.Map;
 import java.util.Set;
@@ -27,6 +26,7 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.LinkedBlockingDeque;
 import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
@@ -204,11 +204,11 @@ public class DefaultJobBundleFactory implements 
JobBundleFactory {
                   notification -> {
                     WrappedSdkHarnessClient client = notification.getValue();
                     final int refCount;
-                    // We need to use a lock here to ensure we are not causing 
the environment to
-                    // be removed if beforehand a StageBundleFactory has 
retrieved it but not yet
-                    // issued ref() on it.
-                    refLock.lock();
                     try {
+                      // We need to use a lock here to ensure we are not 
causing the environment to
+                      // be removed if beforehand a StageBundleFactory has 
retrieved it but not yet
+                      // issued ref() on it.
+                      refLock.lock();
                       refCount = client.unref();
                     } finally {
                       refLock.unlock();
@@ -223,7 +223,7 @@ public class DefaultJobBundleFactory implements 
JobBundleFactory {
                   });
 
       if (environmentExpirationMillis > 0) {
-        
cacheBuilder.expireAfterWrite(Duration.ofMillis(environmentExpirationMillis));
+        cacheBuilder.expireAfterWrite(environmentExpirationMillis, 
TimeUnit.MILLISECONDS);
       }
 
       LoadingCache<Environment, WrappedSdkHarnessClient> cache =
@@ -474,8 +474,8 @@ public class DefaultJobBundleFactory implements 
JobBundleFactory {
         currentCache = availableCaches.take();
         // Lock because the environment expiration can remove the ref for the 
client
         // which would close the underlying environment before we can ref it.
-        currentCache.lock.lock();
         try {
+          currentCache.lock.lock();
           client = 
currentCache.cache.getUnchecked(executableStage.getEnvironment());
           client.ref();
         } finally {
@@ -494,8 +494,8 @@ public class DefaultJobBundleFactory implements 
JobBundleFactory {
         currentCache = environmentCaches.get(environmentIndex);
         // Lock because the environment expiration can remove the ref for the 
client which would
         // close the underlying environment before we can ref it.
-        currentCache.lock.lock();
         try {
+          currentCache.lock.lock();
           client = 
currentCache.cache.getUnchecked(executableStage.getEnvironment());
           client.ref();
         } finally {
diff --git 
a/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/RemoteExecutionTest.java
 
b/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/RemoteExecutionTest.java
index 3cac2ce..8c9e150 100644
--- 
a/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/RemoteExecutionTest.java
+++ 
b/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/RemoteExecutionTest.java
@@ -801,9 +801,11 @@ public class RemoteExecutionTest implements Serializable {
             stateDelegator);
 
     Map<String, Coder> remoteOutputCoders = descriptor.getRemoteOutputCoders();
+    Map<String, Collection<WindowedValue<?>>> outputValues = new HashMap<>();
     Map<String, RemoteOutputReceiver<?>> outputReceivers = new HashMap<>();
     for (Entry<String, Coder> remoteOutputCoder : 
remoteOutputCoders.entrySet()) {
       List<WindowedValue<?>> outputContents = Collections.synchronizedList(new 
ArrayList<>());
+      outputValues.put(remoteOutputCoder.getKey(), outputContents);
       outputReceivers.put(
           remoteOutputCoder.getKey(),
           RemoteOutputReceiver.of(
diff --git 
a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/MicrobatchSource.java
 
b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/MicrobatchSource.java
index d07416e..685c1a7 100644
--- 
a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/MicrobatchSource.java
+++ 
b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/MicrobatchSource.java
@@ -88,7 +88,7 @@ public class MicrobatchSource<T, CheckpointMarkT extends 
UnboundedSource.Checkpo
       LOG.info("Creating reader cache. Cache interval = {} ms.", 
readerCacheInterval);
       readerCache =
           CacheBuilder.newBuilder()
-              
.expireAfterAccess(java.time.Duration.ofMillis(readerCacheInterval))
+              .expireAfterAccess(readerCacheInterval, TimeUnit.MILLISECONDS)
               .removalListener(new ReaderCacheRemovalListener())
               .build();
     }
diff --git 
a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/utils/SideInputStorage.java
 
b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/utils/SideInputStorage.java
index dadc18e..acf4e05 100644
--- 
a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/utils/SideInputStorage.java
+++ 
b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/utils/SideInputStorage.java
@@ -17,8 +17,8 @@
  */
 package org.apache.beam.runners.spark.structuredstreaming.translation.utils;
 
-import java.time.Duration;
 import java.util.Objects;
+import java.util.concurrent.TimeUnit;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.Cache;
@@ -36,7 +36,7 @@ class SideInputStorage {
 
   /** JVM deserialized side input cache. */
   private static final Cache<Key<?>, Value<?>> materializedSideInputs =
-      
CacheBuilder.newBuilder().expireAfterAccess(Duration.ofMinutes(5)).build();
+      CacheBuilder.newBuilder().expireAfterAccess(5, TimeUnit.MINUTES).build();
 
   static Cache<Key<?>, Value<?>> getMaterializedSideInputs() {
     return materializedSideInputs;
diff --git 
a/runners/spark/src/main/java/org/apache/beam/runners/spark/util/GlobalWatermarkHolder.java
 
b/runners/spark/src/main/java/org/apache/beam/runners/spark/util/GlobalWatermarkHolder.java
index f7c1eab..5468a27 100644
--- 
a/runners/spark/src/main/java/org/apache/beam/runners/spark/util/GlobalWatermarkHolder.java
+++ 
b/runners/spark/src/main/java/org/apache/beam/runners/spark/util/GlobalWatermarkHolder.java
@@ -20,12 +20,12 @@ package org.apache.beam.runners.spark.util;
 import static 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState;
 
 import java.io.Serializable;
-import java.time.Duration;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Queue;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
 import javax.annotation.Nonnull;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
@@ -137,7 +137,7 @@ public class GlobalWatermarkHolder {
       createWatermarkCache(final Long batchDuration) {
     return CacheBuilder.newBuilder()
         // expire watermarks every half batch duration to ensure they update 
in every batch.
-        .expireAfterWrite(Duration.ofMillis(batchDuration / 2))
+        .expireAfterWrite(batchDuration / 2, TimeUnit.MILLISECONDS)
         .build(new WatermarksLoader());
   }
 
diff --git 
a/runners/spark/src/main/java/org/apache/beam/runners/spark/util/SideInputStorage.java
 
b/runners/spark/src/main/java/org/apache/beam/runners/spark/util/SideInputStorage.java
index da796b5..08fefe2 100644
--- 
a/runners/spark/src/main/java/org/apache/beam/runners/spark/util/SideInputStorage.java
+++ 
b/runners/spark/src/main/java/org/apache/beam/runners/spark/util/SideInputStorage.java
@@ -17,8 +17,8 @@
  */
 package org.apache.beam.runners.spark.util;
 
-import java.time.Duration;
 import java.util.Objects;
+import java.util.concurrent.TimeUnit;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.Cache;
@@ -36,7 +36,7 @@ class SideInputStorage {
 
   /** JVM deserialized side input cache. */
   private static final Cache<Key<?>, Value<?>> materializedSideInputs =
-      
CacheBuilder.newBuilder().expireAfterAccess(Duration.ofMinutes(5)).build();
+      CacheBuilder.newBuilder().expireAfterAccess(5, TimeUnit.MINUTES).build();
 
   static Cache<Key<?>, Value<?>> getMaterializedSideInputs() {
     return materializedSideInputs;
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java
index 0be7626..52e87d5 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java
@@ -28,6 +28,7 @@ import java.io.Serializable;
 import java.util.Arrays;
 import java.util.List;
 import java.util.NoSuchElementException;
+import java.util.concurrent.TimeUnit;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.CoderException;
 import org.apache.beam.sdk.coders.InstantCoder;
@@ -470,7 +471,7 @@ public class Read {
       restrictionCoder = restrictionCoder();
       cachedReaders =
           CacheBuilder.newBuilder()
-              .expireAfterWrite(java.time.Duration.ofMinutes(1))
+              .expireAfterWrite(1, TimeUnit.MINUTES)
               .maximumSize(100)
               .removalListener(
                   (RemovalListener<Object, UnboundedReader>)
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/SchemaCoder.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/SchemaCoder.java
index 959c863..db64718 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/SchemaCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/SchemaCoder.java
@@ -219,6 +219,26 @@ public class SchemaCoder<T> extends CustomCoder<T> {
     return Objects.hash(schema, typeDescriptor, toRowFunction, 
fromRowFunction);
   }
 
+  private static class RowIdentity implements SerializableFunction<Row, Row> {
+    @Override
+    public Row apply(Row input) {
+      return input;
+    }
+
+    @Override
+    public int hashCode() {
+      return Objects.hash(getClass());
+    }
+
+    @Override
+    public boolean equals(@Nullable Object o) {
+      if (this == o) {
+        return true;
+      }
+      return o != null && getClass() == o.getClass();
+    }
+  }
+
   @Override
   public TypeDescriptor<T> getEncodedTypeDescriptor() {
     return this.typeDescriptor;
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollectionViews.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollectionViews.java
index ca2b85e..360c1af 100644
--- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollectionViews.java
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollectionViews.java
@@ -680,6 +680,65 @@ public class PCollectionViews {
       public ListIterator<T> listIterator() {
         return super.listIterator();
       }
+
+      /** A {@link ListIterator} over {@link MultimapView} adapter. */
+      private class ListIteratorOverMultimapView implements ListIterator<T> {
+        private int position;
+
+        @Override
+        public boolean hasNext() {
+          return position < size();
+        }
+
+        @Override
+        public T next() {
+          if (!hasNext()) {
+            throw new NoSuchElementException();
+          }
+          T rval = get(position);
+          position += 1;
+          return rval;
+        }
+
+        @Override
+        public boolean hasPrevious() {
+          return position > 0;
+        }
+
+        @Override
+        public T previous() {
+          if (!hasPrevious()) {
+            throw new NoSuchElementException();
+          }
+          position -= 1;
+          return get(position);
+        }
+
+        @Override
+        public int nextIndex() {
+          return position;
+        }
+
+        @Override
+        public int previousIndex() {
+          return position - 1;
+        }
+
+        @Override
+        public void remove() {
+          throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public void set(T e) {
+          throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public void add(T e) {
+          throw new UnsupportedOperationException();
+        }
+      }
     }
   }
 
diff --git 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/CoderRegistryTest.java
 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/CoderRegistryTest.java
index cd50db4..954eb7a 100644
--- 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/CoderRegistryTest.java
+++ 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/CoderRegistryTest.java
@@ -62,6 +62,10 @@ public class CoderRegistryTest {
 
   @Rule public ExpectedLogs expectedLogs = 
ExpectedLogs.none(CoderRegistry.class);
 
+  private static class SerializableClass implements Serializable {}
+
+  private static class NotSerializableClass {}
+
   @Test
   public void testRegisterInstantiatedCoder() throws Exception {
     CoderRegistry registry = CoderRegistry.createDefault();
diff --git 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/ExpectedLogsTest.java
 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/ExpectedLogsTest.java
index 14dffa2..896ca69 100644
--- 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/ExpectedLogsTest.java
+++ 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/ExpectedLogsTest.java
@@ -21,13 +21,13 @@ import static 
org.apache.beam.sdk.testing.SystemNanoTimeSleeper.sleepMillis;
 import static org.junit.Assert.assertTrue;
 
 import java.io.IOException;
-import java.time.Duration;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Random;
 import java.util.concurrent.CompletionService;
 import java.util.concurrent.ExecutorCompletionService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
 import java.util.logging.LogRecord;
 import org.hamcrest.TypeSafeMatcher;
 import org.junit.Rule;
@@ -143,7 +143,8 @@ public class ExpectedLogsTest {
   public void testThreadSafetyOfLogSaver() throws Throwable {
     CompletionService<Void> completionService =
         new ExecutorCompletionService<>(Executors.newCachedThreadPool());
-    final long scheduledLogTime = 
Duration.ofNanos(System.nanoTime()).toMillis() + 500L;
+    final long scheduledLogTime =
+        TimeUnit.MILLISECONDS.convert(System.nanoTime(), TimeUnit.NANOSECONDS) 
+ 500L;
 
     List<String> expectedStrings = new ArrayList<>();
     for (int i = 0; i < 100; i++) {
@@ -153,7 +154,10 @@ public class ExpectedLogsTest {
           () -> {
             // Have all threads started and waiting to log at about the same 
moment.
             sleepMillis(
-                Math.max(1, scheduledLogTime - 
Duration.ofNanos(System.nanoTime()).toMillis()));
+                Math.max(
+                    1,
+                    scheduledLogTime
+                        - TimeUnit.MILLISECONDS.convert(System.nanoTime(), 
TimeUnit.NANOSECONDS)));
             LOG.trace(expected);
             return null;
           });
diff --git 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/SystemNanoTimeSleeper.java
 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/SystemNanoTimeSleeper.java
index fd716b9..47a28ef 100644
--- 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/SystemNanoTimeSleeper.java
+++ 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/SystemNanoTimeSleeper.java
@@ -17,7 +17,7 @@
  */
 package org.apache.beam.sdk.testing;
 
-import java.time.Duration;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.LockSupport;
 import org.apache.beam.sdk.util.Sleeper;
 
@@ -41,7 +41,7 @@ public class SystemNanoTimeSleeper implements Sleeper {
   @Override
   public void sleep(long millis) throws InterruptedException {
     long currentTime;
-    long endTime = System.nanoTime() + Duration.ofMillis(millis).toNanos();
+    long endTime = System.nanoTime() + TimeUnit.NANOSECONDS.convert(millis, 
TimeUnit.MILLISECONDS);
     while ((currentTime = System.nanoTime()) < endTime) {
       if (Thread.interrupted()) {
         throw new InterruptedException();
diff --git 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesTest.java
 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesTest.java
index 9166a33..90e0106 100644
--- 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesTest.java
+++ 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesTest.java
@@ -1602,6 +1602,15 @@ public class DoFnSignaturesTest {
         ProcessContext context, @StateId(STATE_ID) ValueState<String> state);
   }
 
  /**
   * Fixture: a {@link DoFn} declaring an event-time timer via {@link TimerId}, for
   * {@code DoFnSignatures} analysis tests.
   */
  private abstract static class DoFnDeclaringMyTimerId extends DoFn<KV<String, Integer>, Long> {

    // Timer declaration that signature analysis should register under id "my-timer-id".
    @TimerId("my-timer-id")
    private final TimerSpec bizzle = TimerSpecs.timer(TimeDomain.EVENT_TIME);

    @ProcessElement
    public void foo(ProcessContext context) {}
  }
+
   private abstract static class DoFnDeclaringTimerAndCallback
       extends DoFn<KV<String, Integer>, Long> {
     public static final String TIMER_ID = "my-timer-id";
diff --git 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/splittabledofn/GrowableOffsetRangeTrackerTest.java
 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/splittabledofn/GrowableOffsetRangeTrackerTest.java
index eb7d8ca..084cb5c 100644
--- 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/splittabledofn/GrowableOffsetRangeTrackerTest.java
+++ 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/splittabledofn/GrowableOffsetRangeTrackerTest.java
@@ -183,7 +183,7 @@ public class GrowableOffsetRangeTrackerTest {
     tracker.checkDone();
     simpleEstimator.setEstimateRangeEnd(0L);
     Progress currentProgress = tracker.getProgress();
-    assertEquals(Long.MAX_VALUE - 10L, currentProgress.getWorkCompleted(), 0);
+    assertEquals(Long.MAX_VALUE - 10L, currentProgress.getWorkCompleted(), 
0.001);
     assertEquals(0, currentProgress.getWorkRemaining(), 0.001);
   }
 
diff --git 
a/sdks/java/extensions/euphoria/src/main/java/org/apache/beam/sdk/extensions/euphoria/core/translate/TimestampExtractTransform.java
 
b/sdks/java/extensions/euphoria/src/main/java/org/apache/beam/sdk/extensions/euphoria/core/translate/TimestampExtractTransform.java
index 999ece6..3ed5083 100644
--- 
a/sdks/java/extensions/euphoria/src/main/java/org/apache/beam/sdk/extensions/euphoria/core/translate/TimestampExtractTransform.java
+++ 
b/sdks/java/extensions/euphoria/src/main/java/org/apache/beam/sdk/extensions/euphoria/core/translate/TimestampExtractTransform.java
@@ -86,6 +86,14 @@ public class TimestampExtractTransform<InputT, OutputT>
     }
   }
 
+  private static class Unwrap<T> extends DoFn<KV<Long, T>, T> {
+
+    @ProcessElement
+    public void processElement(ProcessContext ctx) {
+      ctx.output(ctx.element().getValue());
+    }
+  }
+
   private final PCollectionTransform<InputT, OutputT> timestampedTransform;
 
   private TimestampExtractTransform(
diff --git 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/BeamKafkaTable.java
 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/BeamKafkaTable.java
index a333ba9..7d9d61b 100644
--- 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/BeamKafkaTable.java
+++ 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/BeamKafkaTable.java
@@ -19,7 +19,6 @@ package 
org.apache.beam.sdk.extensions.sql.meta.provider.kafka;
 
 import static 
org.apache.beam.vendor.calcite.v1_28_0.com.google.common.base.Preconditions.checkArgument;
 
-import java.time.Duration;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.List;
@@ -223,7 +222,7 @@ public abstract class BeamKafkaTable extends 
SchemaBaseBeamTable {
       throw new NoEstimationException("There is no partition with messages in 
it.");
     }
 
-    ConsumerRecords<T, T> records = consumer.poll(Duration.ofSeconds(1));
+    ConsumerRecords<T, T> records = consumer.poll(1000);
 
     // Kafka guarantees the delivery of messages in order they arrive to each 
partition.
     // Therefore the first message seen from each partition is the first 
message arrived to that.
diff --git 
a/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/CancellableQueue.java
 
b/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/CancellableQueue.java
index 3d1cded..e58f09b 100644
--- 
a/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/CancellableQueue.java
+++ 
b/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/CancellableQueue.java
@@ -112,8 +112,8 @@ public class CancellableQueue<T extends @NonNull Object> {
    * queue clears the exception.
    */
   public void cancel(Exception exception) {
-    lock.lock();
     try {
+      lock.lock();
       cancellationException = exception;
       notEmpty.signalAll();
       notFull.signalAll();
@@ -124,8 +124,8 @@ public class CancellableQueue<T extends @NonNull Object> {
 
   /** Enables the queue to be re-used after it has been cancelled. */
   public void reset() {
-    lock.lock();
     try {
+      lock.lock();
       cancellationException = null;
       addIndex = 0;
       takeIndex = 0;
diff --git 
a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/state/StateFetchingIteratorsTest.java
 
b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/state/StateFetchingIteratorsTest.java
index c9ab166..ae897cb 100644
--- 
a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/state/StateFetchingIteratorsTest.java
+++ 
b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/state/StateFetchingIteratorsTest.java
@@ -272,8 +272,8 @@ public class StateFetchingIteratorsTest {
       }
       assertFalse(valuesIter2.hasNext());
       assertTrue(valuesIter2.isReady());
+
       // The contents agree.
-      assertArrayEquals(expected, Iterables.toArray(results, Object.class));
       assertArrayEquals(expected, Iterables.toArray(values, Object.class));
     }
   }
diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java
index c214fda..21c7a46 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java
@@ -29,6 +29,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
 import javax.annotation.Nullable;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.KvCoder;
@@ -82,7 +83,7 @@ public class StorageApiWriteUnshardedRecords<DestinationT, 
ElementT>
 
   private static final Cache<String, StreamAppendClient> APPEND_CLIENTS =
       CacheBuilder.newBuilder()
-          .expireAfterAccess(java.time.Duration.ofMinutes(5))
+          .expireAfterAccess(5, TimeUnit.MINUTES)
           .removalListener(
               (RemovalNotification<String, StreamAppendClient> removal) -> {
                 @Nullable final StreamAppendClient streamAppendClient = 
removal.getValue();
diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java
index ede5129..3d6e33c 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java
@@ -33,6 +33,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
 import java.util.function.BiConsumer;
 import java.util.function.Consumer;
 import java.util.function.Function;
@@ -101,7 +102,7 @@ public class StorageApiWritesShardedRecords<DestinationT, 
ElementT>
 
   private static final Cache<String, StreamAppendClient> APPEND_CLIENTS =
       CacheBuilder.newBuilder()
-          .expireAfterAccess(java.time.Duration.ofMinutes(5))
+          .expireAfterAccess(5, TimeUnit.MINUTES)
           .removalListener(
               (RemovalNotification<String, StreamAppendClient> removal) -> {
                 @Nullable final StreamAppendClient streamAppendClient = 
removal.getValue();
diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/LimitingTopicBacklogReader.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/LimitingTopicBacklogReader.java
index cd2adcc1..8108d09 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/LimitingTopicBacklogReader.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/LimitingTopicBacklogReader.java
@@ -24,7 +24,7 @@ import com.google.api.gax.rpc.ApiException;
 import com.google.cloud.pubsublite.Offset;
 import com.google.cloud.pubsublite.proto.ComputeMessageStatsResponse;
 import com.google.errorprone.annotations.concurrent.GuardedBy;
-import java.time.Duration;
+import java.util.concurrent.TimeUnit;
 import javax.annotation.Nullable;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Ticker;
 import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.CacheBuilder;
@@ -46,8 +46,8 @@ final class LimitingTopicBacklogReader implements 
TopicBacklogReader {
         CacheBuilder.newBuilder()
             .ticker(ticker)
             .maximumSize(1)
-            .expireAfterWrite(Duration.ofMinutes(1))
-            .refreshAfterWrite(Duration.ofSeconds(10))
+            .expireAfterWrite(1, TimeUnit.MINUTES)
+            .refreshAfterWrite(10, TimeUnit.SECONDS)
             .build(
                 new CacheLoader<String, ComputeMessageStatsResponse>() {
                   @Override
diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerAccessor.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerAccessor.java
index e34863e..faff06e 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerAccessor.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerAccessor.java
@@ -33,7 +33,13 @@ import com.google.spanner.v1.CommitRequest;
 import com.google.spanner.v1.CommitResponse;
 import com.google.spanner.v1.ExecuteSqlRequest;
 import com.google.spanner.v1.PartialResultSet;
+import io.grpc.CallOptions;
+import io.grpc.Channel;
+import io.grpc.ClientCall;
+import io.grpc.ClientInterceptor;
+import io.grpc.MethodDescriptor;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.beam.sdk.options.ValueProvider;
 import org.apache.beam.sdk.util.ReleaseInfo;
@@ -200,4 +206,22 @@ public class SpannerAccessor implements AutoCloseable {
       }
     }
   }
+
+  private static class CommitDeadlineSettingInterceptor implements 
ClientInterceptor {
+    private final long commitDeadlineMilliseconds;
+
+    private CommitDeadlineSettingInterceptor(Duration commitDeadline) {
+      this.commitDeadlineMilliseconds = commitDeadline.getMillis();
+    }
+
+    @Override
+    public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
+        MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, Channel 
next) {
+      if 
(method.getFullMethodName().equals("google.spanner.v1.Spanner/Commit")) {
+        callOptions =
+            callOptions.withDeadlineAfter(commitDeadlineMilliseconds, 
TimeUnit.MILLISECONDS);
+      }
+      return next.newCall(method, callOptions);
+    }
+  }
 }
diff --git 
a/sdks/java/io/hadoop-format/src/test/java/org/apache/beam/sdk/io/hadoop/format/TestRowDBWritable.java
 
b/sdks/java/io/hadoop-format/src/test/java/org/apache/beam/sdk/io/hadoop/format/TestRowDBWritable.java
index 2d10bdb..f4e3677 100644
--- 
a/sdks/java/io/hadoop-format/src/test/java/org/apache/beam/sdk/io/hadoop/format/TestRowDBWritable.java
+++ 
b/sdks/java/io/hadoop-format/src/test/java/org/apache/beam/sdk/io/hadoop/format/TestRowDBWritable.java
@@ -26,6 +26,7 @@ import java.sql.SQLException;
 import org.apache.beam.sdk.coders.AvroCoder;
 import org.apache.beam.sdk.coders.DefaultCoder;
 import org.apache.beam.sdk.io.common.TestRow;
+import org.apache.beam.sdk.io.jdbc.JdbcIO;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.mapreduce.lib.db.DBWritable;
 
@@ -79,4 +80,13 @@ class TestRowDBWritable extends TestRow implements 
DBWritable, Writable {
     id = in.readInt();
     name = in.readUTF();
   }
+
+  private static class PrepareStatementFromTestRow
+      implements JdbcIO.PreparedStatementSetter<TestRow> {
+    @Override
+    public void setParameters(TestRow element, PreparedStatement statement) 
throws SQLException {
+      statement.setLong(1, element.id());
+      statement.setString(2, element.name());
+    }
+  }
 }
diff --git 
a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaExactlyOnceSink.java
 
b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaExactlyOnceSink.java
index 8ad0aff..75c8270 100644
--- 
a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaExactlyOnceSink.java
+++ 
b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaExactlyOnceSink.java
@@ -24,7 +24,6 @@ import static 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Prec
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import java.io.IOException;
-import java.time.Duration;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
@@ -642,7 +641,7 @@ class KafkaExactlyOnceSink<K, V>
       ShardWriterCache() {
         this.cache =
             CacheBuilder.newBuilder()
-                .expireAfterWrite(Duration.ofMillis(IDLE_TIMEOUT_MS))
+                .expireAfterWrite(IDLE_TIMEOUT_MS, TimeUnit.MILLISECONDS)
                 .<Integer, ShardWriter<K, V>>removalListener(
                     notification -> {
                       if (notification.getCause() != RemovalCause.EXPLICIT) {
diff --git 
a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java
 
b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java
index de70ac0..aaeb1b4 100644
--- 
a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java
+++ 
b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java
@@ -329,7 +329,7 @@ class KafkaUnboundedReader<K, V> extends 
UnboundedReader<KafkaRecord<K, V>> {
    * good to experiment. Often multiple marks would be finalized in a batch, 
it it reduce
    * finalization overhead to wait a short while and finalize only the last 
checkpoint mark.
    */
-  private static final java.time.Duration KAFKA_POLL_TIMEOUT = 
java.time.Duration.ofSeconds(1);
+  private static final Duration KAFKA_POLL_TIMEOUT = Duration.millis(1000);
 
   private static final Duration RECORDS_DEQUEUE_POLL_TIMEOUT = 
Duration.millis(10);
   private static final Duration RECORDS_ENQUEUE_POLL_TIMEOUT = 
Duration.millis(100);
@@ -520,7 +520,7 @@ class KafkaUnboundedReader<K, V> extends 
UnboundedReader<KafkaRecord<K, V>> {
       while (!closed.get()) {
         try {
           if (records.isEmpty()) {
-            records = consumer.poll(KAFKA_POLL_TIMEOUT);
+            records = consumer.poll(KAFKA_POLL_TIMEOUT.getMillis());
           } else if (availableRecordsQueue.offer(
               records, RECORDS_ENQUEUE_POLL_TIMEOUT.getMillis(), 
TimeUnit.MILLISECONDS)) {
             records = ConsumerRecords.empty();
diff --git 
a/sdks/java/io/xml/src/test/java/org/apache/beam/sdk/io/xml/XmlSourceTest.java 
b/sdks/java/io/xml/src/test/java/org/apache/beam/sdk/io/xml/XmlSourceTest.java
index 98af830..d45942e 100644
--- 
a/sdks/java/io/xml/src/test/java/org/apache/beam/sdk/io/xml/XmlSourceTest.java
+++ 
b/sdks/java/io/xml/src/test/java/org/apache/beam/sdk/io/xml/XmlSourceTest.java
@@ -540,8 +540,10 @@ public class XmlSourceTest {
     exception.expectMessage("MyCustomValidationEventHandler failure mesage");
     try (Reader<WrongTrainType> reader = source.createReader(null)) {
 
+      List<WrongTrainType> results = new ArrayList<>();
       for (boolean available = reader.start(); available; available = 
reader.advance()) {
-        reader.getCurrent();
+        WrongTrainType train = reader.getCurrent();
+        results.add(train);
       }
     }
   }

Reply via email to