This is an automated email from the ASF dual-hosted git repository.
mmack pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 86eeb03536e [BEAM-14363] Fixes WatermarkParameters builder for Kinesis
new 64dc9c62dce Merge pull request #17458 from
nickcaballero/fix-kinesis-watermark-parameters
86eeb03536e is described below
commit 86eeb03536e4dc3a2855064103da09e2a2e79a63
Author: Nick Caballero <[email protected]>
AuthorDate: Mon Apr 25 12:17:13 2022 -0400
[BEAM-14363] Fixes WatermarkParameters builder for Kinesis
---
.../beam/sdk/io/aws2/kinesis/WatermarkParameters.java | 4 ++--
.../beam/sdk/io/aws2/kinesis/WatermarkPolicyTest.java | 14 ++++++++++++++
.../apache/beam/sdk/io/kinesis/WatermarkParameters.java | 4 ++--
.../apache/beam/sdk/io/kinesis/WatermarkPolicyTest.java | 14 ++++++++++++++
4 files changed, 32 insertions(+), 4 deletions(-)
diff --git
a/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/kinesis/WatermarkParameters.java
b/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/kinesis/WatermarkParameters.java
index 520fd374703..55c36d1128b 100644
---
a/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/kinesis/WatermarkParameters.java
+++
b/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/kinesis/WatermarkParameters.java
@@ -84,7 +84,7 @@ public abstract class WatermarkParameters implements
Serializable {
public WatermarkParameters withTimestampFn(
SerializableFunction<KinesisRecord, Instant> timestampFn) {
checkArgument(timestampFn != null, "timestampFn function is null");
- return builder().setTimestampFn(timestampFn).build();
+ return toBuilder().setTimestampFn(timestampFn).build();
}
/**
@@ -93,6 +93,6 @@ public abstract class WatermarkParameters implements
Serializable {
*/
public WatermarkParameters withWatermarkIdleDurationThreshold(Duration
idleDurationThreshold) {
checkArgument(idleDurationThreshold != null, "watermark idle duration
threshold is null");
- return
builder().setWatermarkIdleDurationThreshold(idleDurationThreshold).build();
+ return
toBuilder().setWatermarkIdleDurationThreshold(idleDurationThreshold).build();
}
}
diff --git
a/sdks/java/io/amazon-web-services2/src/test/java/org/apache/beam/sdk/io/aws2/kinesis/WatermarkPolicyTest.java
b/sdks/java/io/amazon-web-services2/src/test/java/org/apache/beam/sdk/io/aws2/kinesis/WatermarkPolicyTest.java
index 896e18e31ae..a1c8adffb3d 100644
---
a/sdks/java/io/amazon-web-services2/src/test/java/org/apache/beam/sdk/io/aws2/kinesis/WatermarkPolicyTest.java
+++
b/sdks/java/io/amazon-web-services2/src/test/java/org/apache/beam/sdk/io/aws2/kinesis/WatermarkPolicyTest.java
@@ -147,4 +147,18 @@ public class WatermarkPolicyTest {
policy.update(b);
assertThat(policy.getWatermark()).isEqualTo(time2.plus(Duration.standardMinutes(1)));
}
+
+ @Test
+ public void shouldUpdateWatermarkParameters() {
+ SerializableFunction<KinesisRecord, Instant> fn = input -> Instant.now();
+ Duration idleDurationThreshold = Duration.standardSeconds(30);
+
+ WatermarkParameters parameters =
+ WatermarkParameters.create()
+ .withTimestampFn(fn)
+ .withWatermarkIdleDurationThreshold(idleDurationThreshold);
+
+ assertThat(parameters.getTimestampFn()).isEqualTo(fn);
+
assertThat(parameters.getWatermarkIdleDurationThreshold()).isEqualTo(idleDurationThreshold);
+ }
}
diff --git
a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/WatermarkParameters.java
b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/WatermarkParameters.java
index 704e1ff0ef6..1e9ca2174a7 100644
---
a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/WatermarkParameters.java
+++
b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/WatermarkParameters.java
@@ -84,7 +84,7 @@ public abstract class WatermarkParameters implements
Serializable {
public WatermarkParameters withTimestampFn(
SerializableFunction<KinesisRecord, Instant> timestampFn) {
checkArgument(timestampFn != null, "timestampFn function is null");
- return builder().setTimestampFn(timestampFn).build();
+ return toBuilder().setTimestampFn(timestampFn).build();
}
/**
@@ -93,6 +93,6 @@ public abstract class WatermarkParameters implements
Serializable {
*/
public WatermarkParameters withWatermarkIdleDurationThreshold(Duration
idleDurationThreshold) {
checkArgument(idleDurationThreshold != null, "watermark idle duration
threshold is null");
- return
builder().setWatermarkIdleDurationThreshold(idleDurationThreshold).build();
+ return
toBuilder().setWatermarkIdleDurationThreshold(idleDurationThreshold).build();
}
}
diff --git
a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/WatermarkPolicyTest.java
b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/WatermarkPolicyTest.java
index 11dd07dc9a7..ce5c555a4df 100644
---
a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/WatermarkPolicyTest.java
+++
b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/WatermarkPolicyTest.java
@@ -149,4 +149,18 @@ public class WatermarkPolicyTest {
policy.update(b);
assertThat(policy.getWatermark()).isEqualTo(time2.plus(Duration.standardMinutes(1)));
}
+
+ @Test
+ public void shouldUpdateWatermarkParameters() {
+ SerializableFunction<KinesisRecord, Instant> fn = input -> Instant.now();
+ Duration idleDurationThreshold = Duration.standardSeconds(30);
+
+ WatermarkParameters parameters =
+ WatermarkParameters.create()
+ .withTimestampFn(fn)
+ .withWatermarkIdleDurationThreshold(idleDurationThreshold);
+
+ assertThat(parameters.getTimestampFn()).isEqualTo(fn);
+
assertThat(parameters.getWatermarkIdleDurationThreshold()).isEqualTo(idleDurationThreshold);
+ }
}