This is an automated email from the ASF dual-hosted git repository. mxm pushed a commit to branch release-2.10.0 in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/release-2.10.0 by this push: new 8ab9b55 [BEAM-6440] Fix leakage of timer de-duplication map new 7807606 Merge pull request #7530: [BEAM-6440] Fix leakage of timer de-duplication map 8ab9b55 is described below commit 8ab9b557f15030ff905986036a2364f5dba5c8a2 Author: Maximilian Michels <m...@apache.org> AuthorDate: Tue Jan 15 12:56:24 2019 -0500 [BEAM-6440] Fix leakage of timer de-duplication map FlinkTimerInternals uses a keyed map of pending timers to work around a Flink limitation: a timer can only be deleted by supplying its original timestamp, not by its timer id. The map leaked memory because subclasses of DoFnOperator overrode `fireTimer`, which was responsible for removing a timer's entry from the map when that timer fired. --- .../wrappers/streaming/DoFnOperator.java | 5 ++- .../wrappers}/streaming/DedupingOperatorTest.java | 4 +- .../wrappers}/streaming/DoFnOperatorTest.java | 5 +-- .../wrappers}/streaming/StreamRecordStripper.java | 2 +- .../streaming/WindowDoFnOperatorTest.java | 48 +++++++++++++++++++--- 5 files changed, 50 insertions(+), 14 deletions(-) diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java index dcd0f0b..647d6cc 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java @@ -698,12 +698,14 @@ public class DoFnOperator<InputT, OutputT> extends AbstractStreamOperator<Window public void onEventTime(InternalTimer<Object, TimerData> timer) throws Exception { // We don't have to cal checkInvokeStartBundle() because it's already called in // processWatermark*().
+ timerInternals.cleanupPendingTimer(timer.getNamespace()); fireTimer(timer); } @Override public void onProcessingTime(InternalTimer<Object, TimerData> timer) throws Exception { checkInvokeStartBundle(); + timerInternals.cleanupPendingTimer(timer.getNamespace()); fireTimer(timer); } @@ -714,7 +716,6 @@ public class DoFnOperator<InputT, OutputT> extends AbstractStreamOperator<Window // This is a user timer, so namespace must be WindowNamespace checkArgument(namespace instanceof WindowNamespace); BoundedWindow window = ((WindowNamespace) namespace).getWindow(); - timerInternals.cleanupPendingTimer(timerData); pushbackDoFnRunner.onTimer( timerData.getTimerId(), window, timerData.getTimestamp(), timerData.getDomain()); } @@ -927,7 +928,7 @@ public class DoFnOperator<InputT, OutputT> extends AbstractStreamOperator<Window * namespace of the timer and the timer's id. Necessary for supporting removal of existing * timers. In Flink removal of timers can only be done by providing id and time of the timer. 
*/ - private final MapState<String, TimerData> pendingTimersById; + final MapState<String, TimerData> pendingTimersById; private FlinkTimerInternals() { MapStateDescriptor<String, TimerData> pendingTimersByIdStateDescriptor = diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/DedupingOperatorTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DedupingOperatorTest.java similarity index 95% rename from runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/DedupingOperatorTest.java rename to runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DedupingOperatorTest.java index a6fa3db..3a2c4a3 100644 --- a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/DedupingOperatorTest.java +++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DedupingOperatorTest.java @@ -15,9 +15,9 @@ * See the License for the specific language governing permissions and * limitations under the License. 
*/ -package org.apache.beam.runners.flink.streaming; +package org.apache.beam.runners.flink.translation.wrappers.streaming; -import static org.apache.beam.runners.flink.streaming.StreamRecordStripper.stripStreamRecordFromWindowedValue; +import static org.apache.beam.runners.flink.translation.wrappers.streaming.StreamRecordStripper.stripStreamRecordFromWindowedValue; import static org.hamcrest.collection.IsIterableContainingInOrder.contains; import static org.junit.Assert.assertThat; diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/DoFnOperatorTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperatorTest.java similarity index 99% rename from runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/DoFnOperatorTest.java rename to runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperatorTest.java index 416595c..ed1630c 100644 --- a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/DoFnOperatorTest.java +++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperatorTest.java @@ -15,9 +15,9 @@ * See the License for the specific language governing permissions and * limitations under the License. 
*/ -package org.apache.beam.runners.flink.streaming; +package org.apache.beam.runners.flink.translation.wrappers.streaming; -import static org.apache.beam.runners.flink.streaming.StreamRecordStripper.stripStreamRecordFromWindowedValue; +import static org.apache.beam.runners.flink.translation.wrappers.streaming.StreamRecordStripper.stripStreamRecordFromWindowedValue; import static org.hamcrest.Matchers.emptyIterable; import static org.hamcrest.collection.IsIterableContainingInOrder.contains; import static org.junit.Assert.assertEquals; @@ -31,7 +31,6 @@ import org.apache.beam.runners.core.StatefulDoFnRunner; import org.apache.beam.runners.flink.FlinkPipelineOptions; import org.apache.beam.runners.flink.translation.types.CoderTypeInformation; import org.apache.beam.runners.flink.translation.types.CoderTypeSerializer; -import org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.KvCoder; diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/StreamRecordStripper.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/StreamRecordStripper.java similarity index 96% rename from runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/StreamRecordStripper.java rename to runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/StreamRecordStripper.java index 26a86a4..c8c7b24 100644 --- a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/StreamRecordStripper.java +++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/StreamRecordStripper.java @@ -15,7 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. 
*/ -package org.apache.beam.runners.flink.streaming; +package org.apache.beam.runners.flink.translation.wrappers.streaming; import javax.annotation.Nullable; import org.apache.beam.sdk.util.WindowedValue; diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/WindowDoFnOperatorTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperatorTest.java similarity index 83% rename from runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/WindowDoFnOperatorTest.java rename to runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperatorTest.java index 56a056f..91114cc 100644 --- a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/WindowDoFnOperatorTest.java +++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperatorTest.java @@ -15,17 +15,18 @@ * See the License for the specific language governing permissions and * limitations under the License. 
*/ -package org.apache.beam.runners.flink.streaming; +package org.apache.beam.runners.flink.translation.wrappers.streaming; import static java.util.Collections.emptyList; import static java.util.Collections.emptyMap; -import static org.apache.beam.runners.flink.streaming.StreamRecordStripper.stripStreamRecordFromWindowedValue; +import static org.apache.beam.runners.flink.translation.wrappers.streaming.StreamRecordStripper.stripStreamRecordFromWindowedValue; import static org.apache.beam.sdk.transforms.windowing.PaneInfo.NO_FIRING; import static org.apache.beam.sdk.transforms.windowing.PaneInfo.Timing.ON_TIME; +import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.containsInAnyOrder; +import static org.hamcrest.core.Is.is; import static org.joda.time.Duration.standardMinutes; import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertThat; import java.io.ByteArrayOutputStream; import java.nio.ByteBuffer; @@ -33,9 +34,6 @@ import org.apache.beam.runners.core.KeyedWorkItem; import org.apache.beam.runners.core.SystemReduceFn; import org.apache.beam.runners.flink.FlinkPipelineOptions; import org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.MultiOutputOutputManagerFactory; -import org.apache.beam.runners.flink.translation.wrappers.streaming.SingletonKeyedWorkItem; -import org.apache.beam.runners.flink.translation.wrappers.streaming.SingletonKeyedWorkItemCoder; -import org.apache.beam.runners.flink.translation.wrappers.streaming.WindowDoFnOperator; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.CoderRegistry; import org.apache.beam.sdk.coders.KvCoder; @@ -119,6 +117,44 @@ public class WindowDoFnOperatorTest { testHarness.close(); } + @Test + public void testTimerCleanupOfPendingTimerList() throws Exception { + // test harness + WindowDoFnOperator<Long, Long, Long> windowDoFnOperator = getWindowDoFnOperator(); + KeyedOneInputStreamOperatorTestHarness< + 
ByteBuffer, WindowedValue<KeyedWorkItem<Long, Long>>, WindowedValue<KV<Long, Long>>> + testHarness = createTestHarness(windowDoFnOperator); + testHarness.open(); + + DoFnOperator<KeyedWorkItem<Long, Long>, KV<Long, Long>>.FlinkTimerInternals timerInternals = + windowDoFnOperator.timerInternals; + + // process elements + IntervalWindow window = new IntervalWindow(new Instant(0), Duration.millis(100)); + IntervalWindow window2 = new IntervalWindow(new Instant(100), Duration.millis(100)); + testHarness.processWatermark(0L); + testHarness.processElement( + Item.builder().key(1L).timestamp(1L).value(100L).window(window).build().toStreamRecord()); + testHarness.processElement( + Item.builder() + .key(1L) + .timestamp(150L) + .value(150L) + .window(window2) + .build() + .toStreamRecord()); + + assertThat(Iterables.size(timerInternals.pendingTimersById.keys()), is(2)); + + // close window + testHarness.processWatermark(200L); + + assertThat(Iterables.size(timerInternals.pendingTimersById.keys()), is(0)); + + // cleanup + testHarness.close(); + } + private WindowDoFnOperator<Long, Long, Long> getWindowDoFnOperator() { WindowingStrategy<Object, IntervalWindow> windowingStrategy = WindowingStrategy.of(FixedWindows.of(standardMinutes(1)));