This is an automated email from the ASF dual-hosted git repository.

mxm pushed a commit to branch release-2.10.0
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/release-2.10.0 by this push:
     new 8ab9b55  [BEAM-6440] Fix leakage of timer de-duplication map
     new 7807606  Merge pull request #7530: [BEAM-6440] Fix leakage of timer de-duplication map
8ab9b55 is described below

commit 8ab9b557f15030ff905986036a2364f5dba5c8a2
Author: Maximilian Michels <m...@apache.org>
AuthorDate: Tue Jan 15 12:56:24 2019 -0500

    [BEAM-6440] Fix leakage of timer de-duplication map
    
    The FlinkTimerInternals use a keyed map of pending timers to work around
    Flink's limitation that a timer can only be deleted by its original
    timestamp, not by its timer id.
    
    The map leaked memory because subclasses of DoFnOperator overrode `fireTimer`,
    which was responsible for cleaning up the map when a timer fired.
---
 .../wrappers/streaming/DoFnOperator.java           |  5 ++-
 .../wrappers}/streaming/DedupingOperatorTest.java  |  4 +-
 .../wrappers}/streaming/DoFnOperatorTest.java      |  5 +--
 .../wrappers}/streaming/StreamRecordStripper.java  |  2 +-
 .../streaming/WindowDoFnOperatorTest.java          | 48 +++++++++++++++++++---
 5 files changed, 50 insertions(+), 14 deletions(-)

diff --git 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
index dcd0f0b..647d6cc 100644
--- 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
+++ 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
@@ -698,12 +698,14 @@ public class DoFnOperator<InputT, OutputT> extends 
AbstractStreamOperator<Window
   public void onEventTime(InternalTimer<Object, TimerData> timer) throws 
Exception {
     // We don't have to cal checkInvokeStartBundle() because it's already 
called in
     // processWatermark*().
+    timerInternals.cleanupPendingTimer(timer.getNamespace());
     fireTimer(timer);
   }
 
   @Override
   public void onProcessingTime(InternalTimer<Object, TimerData> timer) throws 
Exception {
     checkInvokeStartBundle();
+    timerInternals.cleanupPendingTimer(timer.getNamespace());
     fireTimer(timer);
   }
 
@@ -714,7 +716,6 @@ public class DoFnOperator<InputT, OutputT> extends 
AbstractStreamOperator<Window
     // This is a user timer, so namespace must be WindowNamespace
     checkArgument(namespace instanceof WindowNamespace);
     BoundedWindow window = ((WindowNamespace) namespace).getWindow();
-    timerInternals.cleanupPendingTimer(timerData);
     pushbackDoFnRunner.onTimer(
         timerData.getTimerId(), window, timerData.getTimestamp(), 
timerData.getDomain());
   }
@@ -927,7 +928,7 @@ public class DoFnOperator<InputT, OutputT> extends 
AbstractStreamOperator<Window
      * namespace of the timer and the timer's id. Necessary for supporting 
removal of existing
      * timers. In Flink removal of timers can only be done by providing id and 
time of the timer.
      */
-    private final MapState<String, TimerData> pendingTimersById;
+    final MapState<String, TimerData> pendingTimersById;
 
     private FlinkTimerInternals() {
       MapStateDescriptor<String, TimerData> pendingTimersByIdStateDescriptor =
diff --git 
a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/DedupingOperatorTest.java
 
b/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DedupingOperatorTest.java
similarity index 95%
rename from 
runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/DedupingOperatorTest.java
rename to 
runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DedupingOperatorTest.java
index a6fa3db..3a2c4a3 100644
--- 
a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/DedupingOperatorTest.java
+++ 
b/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DedupingOperatorTest.java
@@ -15,9 +15,9 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.beam.runners.flink.streaming;
+package org.apache.beam.runners.flink.translation.wrappers.streaming;
 
-import static 
org.apache.beam.runners.flink.streaming.StreamRecordStripper.stripStreamRecordFromWindowedValue;
+import static 
org.apache.beam.runners.flink.translation.wrappers.streaming.StreamRecordStripper.stripStreamRecordFromWindowedValue;
 import static org.hamcrest.collection.IsIterableContainingInOrder.contains;
 import static org.junit.Assert.assertThat;
 
diff --git 
a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/DoFnOperatorTest.java
 
b/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperatorTest.java
similarity index 99%
rename from 
runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/DoFnOperatorTest.java
rename to 
runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperatorTest.java
index 416595c..ed1630c 100644
--- 
a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/DoFnOperatorTest.java
+++ 
b/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperatorTest.java
@@ -15,9 +15,9 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.beam.runners.flink.streaming;
+package org.apache.beam.runners.flink.translation.wrappers.streaming;
 
-import static 
org.apache.beam.runners.flink.streaming.StreamRecordStripper.stripStreamRecordFromWindowedValue;
+import static 
org.apache.beam.runners.flink.translation.wrappers.streaming.StreamRecordStripper.stripStreamRecordFromWindowedValue;
 import static org.hamcrest.Matchers.emptyIterable;
 import static org.hamcrest.collection.IsIterableContainingInOrder.contains;
 import static org.junit.Assert.assertEquals;
@@ -31,7 +31,6 @@ import org.apache.beam.runners.core.StatefulDoFnRunner;
 import org.apache.beam.runners.flink.FlinkPipelineOptions;
 import org.apache.beam.runners.flink.translation.types.CoderTypeInformation;
 import org.apache.beam.runners.flink.translation.types.CoderTypeSerializer;
-import 
org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.KvCoder;
diff --git 
a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/StreamRecordStripper.java
 
b/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/StreamRecordStripper.java
similarity index 96%
rename from 
runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/StreamRecordStripper.java
rename to 
runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/StreamRecordStripper.java
index 26a86a4..c8c7b24 100644
--- 
a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/StreamRecordStripper.java
+++ 
b/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/StreamRecordStripper.java
@@ -15,7 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.beam.runners.flink.streaming;
+package org.apache.beam.runners.flink.translation.wrappers.streaming;
 
 import javax.annotation.Nullable;
 import org.apache.beam.sdk.util.WindowedValue;
diff --git 
a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/WindowDoFnOperatorTest.java
 
b/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperatorTest.java
similarity index 83%
rename from 
runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/WindowDoFnOperatorTest.java
rename to 
runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperatorTest.java
index 56a056f..91114cc 100644
--- 
a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/WindowDoFnOperatorTest.java
+++ 
b/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperatorTest.java
@@ -15,17 +15,18 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.beam.runners.flink.streaming;
+package org.apache.beam.runners.flink.translation.wrappers.streaming;
 
 import static java.util.Collections.emptyList;
 import static java.util.Collections.emptyMap;
-import static 
org.apache.beam.runners.flink.streaming.StreamRecordStripper.stripStreamRecordFromWindowedValue;
+import static 
org.apache.beam.runners.flink.translation.wrappers.streaming.StreamRecordStripper.stripStreamRecordFromWindowedValue;
 import static org.apache.beam.sdk.transforms.windowing.PaneInfo.NO_FIRING;
 import static org.apache.beam.sdk.transforms.windowing.PaneInfo.Timing.ON_TIME;
+import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.hamcrest.core.Is.is;
 import static org.joda.time.Duration.standardMinutes;
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertThat;
 
 import java.io.ByteArrayOutputStream;
 import java.nio.ByteBuffer;
@@ -33,9 +34,6 @@ import org.apache.beam.runners.core.KeyedWorkItem;
 import org.apache.beam.runners.core.SystemReduceFn;
 import org.apache.beam.runners.flink.FlinkPipelineOptions;
 import 
org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.MultiOutputOutputManagerFactory;
-import 
org.apache.beam.runners.flink.translation.wrappers.streaming.SingletonKeyedWorkItem;
-import 
org.apache.beam.runners.flink.translation.wrappers.streaming.SingletonKeyedWorkItemCoder;
-import 
org.apache.beam.runners.flink.translation.wrappers.streaming.WindowDoFnOperator;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.CoderRegistry;
 import org.apache.beam.sdk.coders.KvCoder;
@@ -119,6 +117,44 @@ public class WindowDoFnOperatorTest {
     testHarness.close();
   }
 
+  @Test
+  public void testTimerCleanupOfPendingTimerList() throws Exception {
+    // test harness
+    WindowDoFnOperator<Long, Long, Long> windowDoFnOperator = 
getWindowDoFnOperator();
+    KeyedOneInputStreamOperatorTestHarness<
+            ByteBuffer, WindowedValue<KeyedWorkItem<Long, Long>>, 
WindowedValue<KV<Long, Long>>>
+        testHarness = createTestHarness(windowDoFnOperator);
+    testHarness.open();
+
+    DoFnOperator<KeyedWorkItem<Long, Long>, KV<Long, 
Long>>.FlinkTimerInternals timerInternals =
+        windowDoFnOperator.timerInternals;
+
+    // process elements
+    IntervalWindow window = new IntervalWindow(new Instant(0), 
Duration.millis(100));
+    IntervalWindow window2 = new IntervalWindow(new Instant(100), 
Duration.millis(100));
+    testHarness.processWatermark(0L);
+    testHarness.processElement(
+        
Item.builder().key(1L).timestamp(1L).value(100L).window(window).build().toStreamRecord());
+    testHarness.processElement(
+        Item.builder()
+            .key(1L)
+            .timestamp(150L)
+            .value(150L)
+            .window(window2)
+            .build()
+            .toStreamRecord());
+
+    assertThat(Iterables.size(timerInternals.pendingTimersById.keys()), is(2));
+
+    // close window
+    testHarness.processWatermark(200L);
+
+    assertThat(Iterables.size(timerInternals.pendingTimersById.keys()), is(0));
+
+    // cleanup
+    testHarness.close();
+  }
+
   private WindowDoFnOperator<Long, Long, Long> getWindowDoFnOperator() {
     WindowingStrategy<Object, IntervalWindow> windowingStrategy =
         WindowingStrategy.of(FixedWindows.of(standardMinutes(1)));

Reply via email to