This is an automated email from the ASF dual-hosted git repository.
lcwik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new ff7964c [BEAM-8006] Add retracting to windowing strategy translation.
ff7964c is described below
commit ff7964c7252c8a0c670f69bb4291230ca6136afd
Author: amaliujia <[email protected]>
AuthorDate: Tue Aug 20 15:29:29 2019 -0700
[BEAM-8006] Add retracting to windowing strategy translation.
---
.../core/construction/WindowingStrategyTranslation.java | 4 ++++
.../core/construction/WindowingStrategyTranslationTest.java | 7 +++++++
.../java/org/apache/beam/sdk/values/WindowingStrategy.java | 12 ++++++++++--
3 files changed, 21 insertions(+), 2 deletions(-)
diff --git
a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowingStrategyTranslation.java
b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowingStrategyTranslation.java
index 5fd0f33..a57aa9b 100644
---
a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowingStrategyTranslation.java
+++
b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowingStrategyTranslation.java
@@ -57,6 +57,8 @@ public class WindowingStrategyTranslation implements
Serializable {
return AccumulationMode.DISCARDING_FIRED_PANES;
case ACCUMULATING:
return AccumulationMode.ACCUMULATING_FIRED_PANES;
+ case RETRACTING:
+ return AccumulationMode.RETRACTING_FIRED_PANES;
case UNRECOGNIZED:
default:
// Whether or not it is proto that cannot recognize it (due to the
version of the
@@ -77,6 +79,8 @@ public class WindowingStrategyTranslation implements
Serializable {
return RunnerApi.AccumulationMode.Enum.DISCARDING;
case ACCUMULATING_FIRED_PANES:
return RunnerApi.AccumulationMode.Enum.ACCUMULATING;
+ case RETRACTING_FIRED_PANES:
+ return RunnerApi.AccumulationMode.Enum.RETRACTING;
default:
throw new IllegalArgumentException(
String.format(
diff --git
a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/WindowingStrategyTranslationTest.java
b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/WindowingStrategyTranslationTest.java
index eee9c3f..9f50c0b 100644
---
a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/WindowingStrategyTranslationTest.java
+++
b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/WindowingStrategyTranslationTest.java
@@ -85,6 +85,13 @@ public class WindowingStrategyTranslationTest {
.withMode(AccumulationMode.DISCARDING_FIRED_PANES)
.withTrigger(REPRESENTATIVE_TRIGGER)
.withAllowedLateness(Duration.millis(93))
+ .withTimestampCombiner(TimestampCombiner.LATEST)),
+ toProtoAndBackSpec(
+ WindowingStrategy.of(REPRESENTATIVE_WINDOW_FN)
+ .withClosingBehavior(ClosingBehavior.FIRE_IF_NON_EMPTY)
+ .withMode(AccumulationMode.RETRACTING_FIRED_PANES)
+ .withTrigger(REPRESENTATIVE_TRIGGER)
+ .withAllowedLateness(Duration.millis(100))
.withTimestampCombiner(TimestampCombiner.LATEST)));
}
diff --git
a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/WindowingStrategy.java
b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/WindowingStrategy.java
index dfd9562..6b2c4d6 100644
---
a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/WindowingStrategy.java
+++
b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/WindowingStrategy.java
@@ -44,10 +44,18 @@ import org.joda.time.Duration;
*/
public class WindowingStrategy<T, W extends BoundedWindow> implements
Serializable {
- /** The accumulation modes that can be used with windowing. */
+ /**
+ * The accumulation modes that can be used with windowing.
+ *
+ * <p>Experimental {@link AccumulationMode#RETRACTING_FIRED_PANES} for
enabling retractions in
+ * pipelines. There are no backwards-compatibility guarantees.
+ */
public enum AccumulationMode {
DISCARDING_FIRED_PANES,
- ACCUMULATING_FIRED_PANES
+ ACCUMULATING_FIRED_PANES,
+ // RETRACTING_FIRED_PANES is experimental. There are no
backwards-compatibility guarantees.
+ @Experimental
+ RETRACTING_FIRED_PANES,
}
private static final Duration DEFAULT_ALLOWED_LATENESS = Duration.ZERO;