diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java index dcd0f0bd7dfe..647d6ccacb0a 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java @@ -698,12 +698,14 @@ public void snapshotState(StateSnapshotContext context) throws Exception { public void onEventTime(InternalTimer<Object, TimerData> timer) throws Exception { // We don't have to cal checkInvokeStartBundle() because it's already called in // processWatermark*(). + timerInternals.cleanupPendingTimer(timer.getNamespace()); fireTimer(timer); } @Override public void onProcessingTime(InternalTimer<Object, TimerData> timer) throws Exception { checkInvokeStartBundle(); + timerInternals.cleanupPendingTimer(timer.getNamespace()); fireTimer(timer); } @@ -714,7 +716,6 @@ public void fireTimer(InternalTimer<?, TimerData> timer) { // This is a user timer, so namespace must be WindowNamespace checkArgument(namespace instanceof WindowNamespace); BoundedWindow window = ((WindowNamespace) namespace).getWindow(); - timerInternals.cleanupPendingTimer(timerData); pushbackDoFnRunner.onTimer( timerData.getTimerId(), window, timerData.getTimestamp(), timerData.getDomain()); } @@ -927,7 +928,7 @@ public TimerInternals timerInternals() { * namespace of the timer and the timer's id. Necessary for supporting removal of existing * timers. In Flink removal of timers can only be done by providing id and time of the timer. */ - private final MapState<String, TimerData> pendingTimersById; + final MapState<String, TimerData> pendingTimersById; private FlinkTimerInternals() { MapStateDescriptor<String, TimerData> pendingTimersByIdStateDescriptor = diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/DedupingOperatorTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DedupingOperatorTest.java similarity index 95% rename from runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/DedupingOperatorTest.java rename to runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DedupingOperatorTest.java index a6fa3dba6667..3a2c4a376530 100644 --- a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/DedupingOperatorTest.java +++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DedupingOperatorTest.java @@ -15,9 +15,9 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.beam.runners.flink.streaming; +package org.apache.beam.runners.flink.translation.wrappers.streaming; -import static org.apache.beam.runners.flink.streaming.StreamRecordStripper.stripStreamRecordFromWindowedValue; +import static org.apache.beam.runners.flink.translation.wrappers.streaming.StreamRecordStripper.stripStreamRecordFromWindowedValue; import static org.hamcrest.collection.IsIterableContainingInOrder.contains; import static org.junit.Assert.assertThat; diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/DoFnOperatorTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperatorTest.java similarity index 99% rename from runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/DoFnOperatorTest.java rename to runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperatorTest.java index 416595cb045d..ed1630c0472a 100644 --- a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/DoFnOperatorTest.java +++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperatorTest.java @@ -15,9 +15,9 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.beam.runners.flink.streaming; +package org.apache.beam.runners.flink.translation.wrappers.streaming; -import static org.apache.beam.runners.flink.streaming.StreamRecordStripper.stripStreamRecordFromWindowedValue; +import static org.apache.beam.runners.flink.translation.wrappers.streaming.StreamRecordStripper.stripStreamRecordFromWindowedValue; import static org.hamcrest.Matchers.emptyIterable; import static org.hamcrest.collection.IsIterableContainingInOrder.contains; import static org.junit.Assert.assertEquals; @@ -31,7 +31,6 @@ import org.apache.beam.runners.flink.FlinkPipelineOptions; import org.apache.beam.runners.flink.translation.types.CoderTypeInformation; import org.apache.beam.runners.flink.translation.types.CoderTypeSerializer; -import org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.KvCoder; diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/StreamRecordStripper.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/StreamRecordStripper.java similarity index 96% rename from runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/StreamRecordStripper.java rename to runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/StreamRecordStripper.java index 26a86a4b9f24..c8c7b24924b8 100644 --- a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/StreamRecordStripper.java +++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/StreamRecordStripper.java @@ -15,7 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.beam.runners.flink.streaming; +package org.apache.beam.runners.flink.translation.wrappers.streaming; import javax.annotation.Nullable; import org.apache.beam.sdk.util.WindowedValue; diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/WindowDoFnOperatorTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperatorTest.java similarity index 83% rename from runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/WindowDoFnOperatorTest.java rename to runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperatorTest.java index 56a056f0498c..91114cc90a1a 100644 --- a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/WindowDoFnOperatorTest.java +++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperatorTest.java @@ -15,17 +15,18 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.beam.runners.flink.streaming; +package org.apache.beam.runners.flink.translation.wrappers.streaming; import static java.util.Collections.emptyList; import static java.util.Collections.emptyMap; -import static org.apache.beam.runners.flink.streaming.StreamRecordStripper.stripStreamRecordFromWindowedValue; +import static org.apache.beam.runners.flink.translation.wrappers.streaming.StreamRecordStripper.stripStreamRecordFromWindowedValue; import static org.apache.beam.sdk.transforms.windowing.PaneInfo.NO_FIRING; import static org.apache.beam.sdk.transforms.windowing.PaneInfo.Timing.ON_TIME; +import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.containsInAnyOrder; +import static org.hamcrest.core.Is.is; import static org.joda.time.Duration.standardMinutes; import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertThat; import java.io.ByteArrayOutputStream; import java.nio.ByteBuffer; @@ -33,9 +34,6 @@ import org.apache.beam.runners.core.SystemReduceFn; import org.apache.beam.runners.flink.FlinkPipelineOptions; import org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.MultiOutputOutputManagerFactory; -import org.apache.beam.runners.flink.translation.wrappers.streaming.SingletonKeyedWorkItem; -import org.apache.beam.runners.flink.translation.wrappers.streaming.SingletonKeyedWorkItemCoder; -import org.apache.beam.runners.flink.translation.wrappers.streaming.WindowDoFnOperator; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.CoderRegistry; import org.apache.beam.sdk.coders.KvCoder; @@ -119,6 +117,44 @@ public void testRestore() throws Exception { testHarness.close(); } + @Test + public void testTimerCleanupOfPendingTimerList() throws Exception { + // test harness + WindowDoFnOperator<Long, Long, Long> windowDoFnOperator = getWindowDoFnOperator(); + KeyedOneInputStreamOperatorTestHarness< + ByteBuffer, WindowedValue<KeyedWorkItem<Long, Long>>, WindowedValue<KV<Long, Long>>> + testHarness = createTestHarness(windowDoFnOperator); + testHarness.open(); + + DoFnOperator<KeyedWorkItem<Long, Long>, KV<Long, Long>>.FlinkTimerInternals timerInternals = + windowDoFnOperator.timerInternals; + + // process elements + IntervalWindow window = new IntervalWindow(new Instant(0), Duration.millis(100)); + IntervalWindow window2 = new IntervalWindow(new Instant(100), Duration.millis(100)); + testHarness.processWatermark(0L); + testHarness.processElement( + Item.builder().key(1L).timestamp(1L).value(100L).window(window).build().toStreamRecord()); + testHarness.processElement( + Item.builder() + .key(1L) + .timestamp(150L) + .value(150L) + .window(window2) + .build() + .toStreamRecord()); + + assertThat(Iterables.size(timerInternals.pendingTimersById.keys()), is(2)); + + // close window + testHarness.processWatermark(200L); + + assertThat(Iterables.size(timerInternals.pendingTimersById.keys()), is(0)); + + // cleanup + testHarness.close(); + } + private WindowDoFnOperator<Long, Long, Long> getWindowDoFnOperator() { WindowingStrategy<Object, IntervalWindow> windowingStrategy = WindowingStrategy.of(FixedWindows.of(standardMinutes(1)));
With regards, Apache Git Services