This is an automated email from the ASF dual-hosted git repository. aljoscha pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git
commit 6148a6d063a0503ee435ab5084fcba3fb864b26f Author: Aljoscha Krettek <aljoscha.kret...@gmail.com> AuthorDate: Thu Feb 22 14:26:16 2018 +0100 [BEAM-3727] Never shut down sources in Flink Streaming execution mode This adds an option that controls whether or not to shut down sources when they reach the +Inf watermark. The reason for this is https://issues.apache.org/jira/browse/FLINK-2491, which causes checkpointing to stop once some source is shut down. --- .../beam/runners/flink/FlinkPipelineOptions.java | 12 +++++ .../apache/beam/runners/flink/TestFlinkRunner.java | 1 + .../streaming/io/BoundedSourceWrapper.java | 28 +++++++++++ .../streaming/io/UnboundedSourceWrapper.java | 56 ++++++++++++---------- 4 files changed, 71 insertions(+), 26 deletions(-) diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java index 01f7847..b2cbefb 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java @@ -137,4 +137,16 @@ public interface FlinkPipelineOptions Long getMaxBundleTimeMills(); void setMaxBundleTimeMills(Long time); + /** + * Whether to shutdown sources when their watermark reaches {@code +Inf}. For production use + * cases you want this to be disabled because Flink will currently (versions {@literal <=} 1.5) + * stop doing checkpoints when any operator (which includes sources) is finished. + * + * <p>Please see <a href="https://issues.apache.org/jira/browse/FLINK-2491">FLINK-2491</a> for + * progress on this issue. 
+ */ + @Description("If set, shutdown sources when their watermark reaches +Inf.") + @Default.Boolean(false) + Boolean isShutdownSourcesOnFinalWatermark(); + void setShutdownSourcesOnFinalWatermark(Boolean shutdownOnFinalWatermark); } diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/TestFlinkRunner.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/TestFlinkRunner.java index 01b67e5..47d4494 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/TestFlinkRunner.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/TestFlinkRunner.java @@ -36,6 +36,7 @@ public class TestFlinkRunner extends PipelineRunner<PipelineResult> { private TestFlinkRunner(FlinkPipelineOptions options) { // We use [auto] for testing since this will make it pick up the Testing ExecutionEnvironment options.setFlinkMaster("[auto]"); + options.setShutdownSourcesOnFinalWatermark(true); this.delegate = FlinkRunner.fromOptions(options); } diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/BoundedSourceWrapper.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/BoundedSourceWrapper.java index 5ddc46f..6db5426 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/BoundedSourceWrapper.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/BoundedSourceWrapper.java @@ -21,6 +21,7 @@ import com.google.common.annotations.VisibleForTesting; import java.util.ArrayList; import java.util.List; import org.apache.beam.runners.core.construction.SerializablePipelineOptions; +import org.apache.beam.runners.flink.FlinkPipelineOptions; import org.apache.beam.runners.flink.metrics.FlinkMetricContainer; import org.apache.beam.runners.flink.metrics.ReaderInvocationUtil; import org.apache.beam.sdk.io.BoundedSource; @@ -180,6 +181,33 @@ public class 
BoundedSourceWrapper<OutputT> // emit final Long.MAX_VALUE watermark, just to be sure ctx.emitWatermark(new Watermark(Long.MAX_VALUE)); + + FlinkPipelineOptions options = serializedOptions.get().as(FlinkPipelineOptions.class); + if (!options.isShutdownSourcesOnFinalWatermark()) { + // do nothing, but still look busy ... + // we can't return here since Flink requires that all operators stay up, + // otherwise checkpointing would not work correctly anymore + // + // See https://issues.apache.org/jira/browse/FLINK-2491 for progress on this issue + + // wait until this is canceled + final Object waitLock = new Object(); + while (isRunning) { + try { + // Flink will interrupt us at some point + //noinspection SynchronizationOnLocalVariableOrMethodParameter + synchronized (waitLock) { + // don't wait indefinitely, in case something goes horribly wrong + waitLock.wait(1000); + } + } catch (InterruptedException e) { + if (!isRunning) { + // restore the interrupted state, and fall through the loop + Thread.currentThread().interrupt(); + } + } + } + } } /** diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java index 817dd74..fc23c01 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java @@ -23,6 +23,7 @@ import java.util.Iterator; import java.util.LinkedHashMap; import java.util.List; import org.apache.beam.runners.core.construction.SerializablePipelineOptions; +import org.apache.beam.runners.flink.FlinkPipelineOptions; import org.apache.beam.runners.flink.metrics.FlinkMetricContainer; import org.apache.beam.runners.flink.metrics.ReaderInvocationUtil; import 
org.apache.beam.runners.flink.translation.types.CoderTypeInformation; @@ -224,32 +225,7 @@ public class UnboundedSourceWrapper< serializedOptions.get(), metricContainer); - if (localReaders.size() == 0) { - // do nothing, but still look busy ... - // also, output a Long.MAX_VALUE watermark since we know that we're not - // going to emit anything - // we can't return here since Flink requires that all operators stay up, - // otherwise checkpointing would not work correctly anymore - ctx.emitWatermark(new Watermark(Long.MAX_VALUE)); - - // wait until this is canceled - final Object waitLock = new Object(); - while (isRunning) { - try { - // Flink will interrupt us at some point - //noinspection SynchronizationOnLocalVariableOrMethodParameter - synchronized (waitLock) { - // don't wait indefinitely, in case something goes horribly wrong - waitLock.wait(1000); - } - } catch (InterruptedException e) { - if (!isRunning) { - // restore the interrupted state, and fall through the loop - Thread.currentThread().interrupt(); - } - } - } - } else if (localReaders.size() == 1) { + if (localReaders.size() == 1) { // the easy case, we just read from one reader UnboundedSource.UnboundedReader<OutputT> reader = localReaders.get(0); @@ -305,7 +281,35 @@ public class UnboundedSourceWrapper< hadData = false; } } + } + ctx.emitWatermark(new Watermark(Long.MAX_VALUE)); + + FlinkPipelineOptions options = serializedOptions.get().as(FlinkPipelineOptions.class); + if (!options.isShutdownSourcesOnFinalWatermark()) { + // do nothing, but still look busy ... 
+ // we can't return here since Flink requires that all operators stay up, + // otherwise checkpointing would not work correctly anymore + // + // See https://issues.apache.org/jira/browse/FLINK-2491 for progress on this issue + + // wait until this is canceled + final Object waitLock = new Object(); + while (isRunning) { + try { + // Flink will interrupt us at some point + //noinspection SynchronizationOnLocalVariableOrMethodParameter + synchronized (waitLock) { + // don't wait indefinitely, in case something goes horribly wrong + waitLock.wait(1000); + } + } catch (InterruptedException e) { + if (!isRunning) { + // restore the interrupted state, and fall through the loop + Thread.currentThread().interrupt(); + } + } + } } } -- To stop receiving notification emails like this one, please contact aljos...@apache.org.