[
https://issues.apache.org/jira/browse/BEAM-3727?focusedWorklogId=154324&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-154324
]
ASF GitHub Bot logged work on BEAM-3727:
----------------------------------------
Author: ASF GitHub Bot
Created on: 15/Oct/18 14:46
Start Date: 15/Oct/18 14:46
Worklog Time Spent: 10m
Work Description: mxm closed pull request #6690: [BEAM-3727] Add tests
for ImpulseSourceFunction
URL: https://github.com/apache/beam/pull/6690
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):
diff --git
a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/ImpulseSourceFunction.java
b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/ImpulseSourceFunction.java
index 420d90e749a..ced3cbd8b49 100644
---
a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/ImpulseSourceFunction.java
+++
b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/ImpulseSourceFunction.java
@@ -22,8 +22,9 @@
import org.apache.flink.streaming.api.functions.source.SourceFunction;
/**
- * Source function which sends an impulse to a downstream operator. It may
keep the source alive
- * although its work is already done. It will only shutdown when requested by
the JobManager.
+ * Source function which sends a single global impulse to a downstream
operator. It may keep the
+ * source alive although its work is already done. It will only shutdown when
the streaming job is
+ * cancelled.
*/
public class ImpulseSourceFunction implements
SourceFunction<WindowedValue<byte[]>> {
diff --git
a/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/functions/ImpulseSourceFunctionTest.java
b/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/functions/ImpulseSourceFunctionTest.java
new file mode 100644
index 00000000000..3a6ccdc1d57
--- /dev/null
+++
b/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/functions/ImpulseSourceFunctionTest.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.flink.translation.functions;
+
+import static org.hamcrest.core.Is.is;
+import static org.hamcrest.core.IsInstanceOf.instanceOf;
+import static org.junit.Assert.assertThat;
+import static org.mockito.Matchers.argThat;
+import static org.mockito.Mockito.verify;
+
+import java.util.Arrays;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestName;
+import org.mockito.ArgumentMatcher;
+import org.mockito.Mockito;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Tests for {@link ImpulseSourceFunction}. */
+public class ImpulseSourceFunctionTest {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(ImpulseSourceFunctionTest.class);
+
+ @Rule public TestName testName = new TestName();
+
+ private final SourceFunction.SourceContext<WindowedValue<byte[]>>
sourceContext;
+ private final ImpulseElementMatcher elementMatcher = new
ImpulseElementMatcher();
+
+ public ImpulseSourceFunctionTest() {
+ this.sourceContext = Mockito.mock(SourceFunction.SourceContext.class);
+ }
+
+ @Test
+ public void testInstanceOfSourceFunction() {
+ // should be a non-parallel source function
+ assertThat(new ImpulseSourceFunction(false),
instanceOf(SourceFunction.class));
+ }
+
+ @Test(timeout = 10_000)
+ public void testImpulse() throws Exception {
+ ImpulseSourceFunction source = new ImpulseSourceFunction(false);
+ source.run(sourceContext);
+ // should finish
+ verify(sourceContext).collect(argThat(elementMatcher));
+ }
+
+ @Test(timeout = 10_000)
+ public void testKeepAlive() throws Exception {
+ ImpulseSourceFunction source = new ImpulseSourceFunction(true);
+ Thread sourceThread =
+ new Thread(
+ () -> {
+ try {
+ source.run(sourceContext);
+ // should not finish
+ } catch (Exception e) {
+ LOG.error("Exception while executing ImpulseSourceFunction",
e);
+ }
+ });
+ try {
+ sourceThread.start();
+ source.cancel();
+ // should finish
+ sourceThread.join();
+ } finally {
+ sourceThread.interrupt();
+ sourceThread.join();
+ }
+ verify(sourceContext).collect(argThat(elementMatcher));
+ }
+
+ @Test(timeout = 10_000)
+ public void testKeepAliveDuringInterrupt() throws Exception {
+ ImpulseSourceFunction source = new ImpulseSourceFunction(true);
+ Thread sourceThread =
+ new Thread(
+ () -> {
+ try {
+ source.run(sourceContext);
+ // should not finish
+ } catch (Exception e) {
+ LOG.error("Exception while executing ImpulseSourceFunction",
e);
+ }
+ });
+
+ sourceThread.start();
+ sourceThread.interrupt();
+ Thread.sleep(200);
+ assertThat(sourceThread.isAlive(), is(true));
+ // should quit
+ source.cancel();
+ sourceThread.interrupt();
+ sourceThread.join();
+ verify(sourceContext).collect(argThat(elementMatcher));
+ }
+
+ private static class ImpulseElementMatcher extends
ArgumentMatcher<WindowedValue<byte[]>> {
+
+ @Override
+ public boolean matches(Object o) {
+ return o instanceof WindowedValue
+ && Arrays.equals((byte[]) ((WindowedValue) o).getValue(), new byte[]
{});
+ }
+ }
+}
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
Issue Time Tracking
-------------------
Worklog Id: (was: 154324)
Time Spent: 20m (was: 10m)
> Never shutdown sources in Flink Streaming execution mode
> --------------------------------------------------------
>
> Key: BEAM-3727
> URL: https://issues.apache.org/jira/browse/BEAM-3727
> Project: Beam
> Issue Type: Bug
> Components: runner-flink
> Reporter: Aljoscha Krettek
> Assignee: Maximilian Michels
> Priority: Major
> Fix For: 2.8.0
>
> Time Spent: 20m
> Remaining Estimate: 0h
>
> Because of FLINK-2491 checkpointing will not work for a streaming pipeline as
> soon as at least one source has finished. With the move to SDF IOs and the
> impulse primitive this is very problematic because essentially all jobs
> running in Flink Streaming mode would run without checkpointing.
> We should, by default, never stop sources in streaming mode but add an option
> that allows doing so. Some people might want this and we also need it to make
> sure that the {{@ValidatesRunner}} tests for Flink in streaming mode finish.
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)