This is an automated email from the ASF dual-hosted git repository.
yichi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 86ad8af4da1 [BEAM-14253] patch SubscriptionPartitionLoader to work
around a dataflow bug (#17523)
86ad8af4da1 is described below
commit 86ad8af4da189fdefc387d29a194bb6f6ddbf3f3
Author: dpcollins-google <[email protected]>
AuthorDate: Mon May 2 14:12:33 2022 -0400
[BEAM-14253] patch SubscriptionPartitionLoader to work around a dataflow
bug (#17523)
---
.../io/gcp/pubsublite/internal/SubscriptionPartitionLoader.java | 7 ++++---
.../java/org/apache/beam/sdk/io/gcp/pubsublite/ReadWriteIT.java | 2 --
2 files changed, 4 insertions(+), 5 deletions(-)
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/SubscriptionPartitionLoader.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/SubscriptionPartitionLoader.java
index 3a21f85be99..1d04cdd3575 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/SubscriptionPartitionLoader.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/SubscriptionPartitionLoader.java
@@ -34,7 +34,6 @@ import org.apache.beam.sdk.transforms.splittabledofn.ManualWatermarkEstimator;
import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
import org.apache.beam.sdk.transforms.splittabledofn.SplitResult;
import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimators;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
@@ -73,8 +72,10 @@ class SubscriptionPartitionLoader extends PTransform<PBegin, PCollection<Subscri
@GetInitialWatermarkEstimatorState
public Instant getInitialWatermarkEstimatorState(@Timestamp Instant initial) {
- checkArgument(initial.equals(BoundedWindow.TIMESTAMP_MIN_VALUE));
- return initial;
+ // TODO: Add back when dataflow is fixed.
+ // checkArgument(initial.equals(BoundedWindow.TIMESTAMP_MIN_VALUE));
+ // return initial;
+ return Instant.EPOCH;
}
@GetInitialRestriction
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsublite/ReadWriteIT.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsublite/ReadWriteIT.java
index 60d11cc712f..c02bd97241a 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsublite/ReadWriteIT.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsublite/ReadWriteIT.java
@@ -61,7 +61,6 @@ import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Supplier;
import org.joda.time.Duration;
import org.junit.After;
-import org.junit.Ignore;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
@@ -227,7 +226,6 @@ public class ReadWriteIT {
}
@Test
- @Ignore("BEAM-14253")
public void testReadWrite() throws Exception {
pipeline.getOptions().as(StreamingOptions.class).setStreaming(true);
pipeline.getOptions().as(TestPipelineOptions.class).setBlockOnRun(false);