[
https://issues.apache.org/jira/browse/BEAM-6751?focusedWorklogId=208921&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-208921
]
ASF GitHub Bot logged work on BEAM-6751:
----------------------------------------
Author: ASF GitHub Bot
Created on: 06/Mar/19 16:10
Start Date: 06/Mar/19 16:10
Worklog Time Spent: 10m
Work Description: mxm commented on pull request #7991: [BEAM-6751] Add
KafkaIO EOS support to Flink via @RequiresStableInput
URL: https://github.com/apache/beam/pull/7991#discussion_r263012970
##########
File path:
runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkRequiresStableInputTest.java
##########
@@ -0,0 +1,252 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+
+import java.util.Collections;
+import java.util.Date;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.RequiresStableInputIT;
+import org.apache.beam.sdk.io.FileSystems;
+import org.apache.beam.sdk.io.fs.ResolveOptions;
+import org.apache.beam.sdk.io.fs.ResourceId;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.testing.FileChecksumMatcher;
+import org.apache.beam.sdk.testing.SerializableMatchers;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.util.FilePatternMatchingShardedFile;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.TupleTagList;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.CheckpointingOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.RestOptions;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
+import org.apache.flink.runtime.minicluster.MiniCluster;
+import org.apache.flink.runtime.minicluster.MiniClusterConfiguration;
+import org.apache.flink.streaming.util.TestStreamEnvironment;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+/** Tests {@link org.apache.beam.sdk.transforms.DoFn.RequiresStableInput} with
Flink. */
+public class FlinkRequiresStableInputTest {
+
+ @ClassRule public static TemporaryFolder tempFolder = new TemporaryFolder();
+
+ private static CountDownLatch latch;
+
+ private static final String VALUE = "value";
+ // SHA-1 hash of string "value"
+ private static final String VALUE_CHECKSUM =
"f32b67c7e26342af42efabc674d441dca0a281c5";
+
+ private static transient MiniCluster flinkCluster;
+
+ @BeforeClass
+ public static void beforeClass() throws Exception {
+ final int parallelism = 1;
+
+ Configuration config = new Configuration();
+ // Avoid port collision in parallel tests
+ config.setInteger(RestOptions.PORT, 0);
+ config.setString(CheckpointingOptions.STATE_BACKEND, "filesystem");
+ // It is necessary to configure the checkpoint directory for the state
backend,
+ // even though we only create savepoints in this test.
+ config.setString(
+ CheckpointingOptions.CHECKPOINTS_DIRECTORY,
+ "file://" + tempFolder.getRoot().getAbsolutePath());
+ // Checkpoints will go into a subdirectory of this directory
+ config.setString(
+ CheckpointingOptions.SAVEPOINT_DIRECTORY,
+ "file://" + tempFolder.getRoot().getAbsolutePath());
+
+ MiniClusterConfiguration clusterConfig =
+ new MiniClusterConfiguration.Builder()
+ .setConfiguration(config)
+ .setNumTaskManagers(1)
+ .setNumSlotsPerTaskManager(1)
+ .build();
+
+ flinkCluster = new MiniCluster(clusterConfig);
+ flinkCluster.start();
+
+ TestStreamEnvironment.setAsContext(flinkCluster, parallelism);
+ }
+
+ @AfterClass
+ public static void afterClass() throws Exception {
+ TestStreamEnvironment.unsetAsContext();
+ flinkCluster.close();
+ flinkCluster = null;
+ }
+
+ /**
+ * Test for the support of {@link
org.apache.beam.sdk.transforms.DoFn.RequiresStableInput} in both
+ * {@link ParDo.SingleOutput} and {@link ParDo.MultiOutput}.
+ *
+ * <p>In each test, a singleton string value is paired with a random key. In
the following
+ * transform, the value is written to a file, whose path is specified by the
random key, and then
+ * the transform fails. When the pipeline retries, the latter transform
should receive the same
+ * input from the former transform, because its {@link DoFn} is annotated
with {@link
+ * org.apache.beam.sdk.transforms.DoFn.RequiresStableInput}, and it will not
fail due to presence
+ * of the file. Therefore, only one file for each transform is expected.
+ *
+ * <p>A Savepoint is taken until the desired state in the operators has been
reached. We then
+ * restore the savepoint to check if we produce impotent results.
+ */
+ @Test(timeout = 30_000)
+ public void testParDoRequiresStableInput() throws Exception {
+ FlinkPipelineOptions options =
PipelineOptionsFactory.as(FlinkPipelineOptions.class);
+ options.setParallelism(1);
+ // We only want to trigger external savepoints but we require
+ // checkpointing to be enabled for @RequiresStableInput
+ options.setCheckpointingInterval(Long.MAX_VALUE);
+ options.setRunner(FlinkRunner.class);
+ options.setStreaming(true);
+
+ ResourceId outputDir =
+ FileSystems.matchNewResource(tempFolder.getRoot().getAbsolutePath(),
true)
+ .resolve(
+ String.format("requires-stable-input-%tF-%<tH-%<tM-%<tS-%<tL",
new Date()),
+ ResolveOptions.StandardResolveOptions.RESOLVE_DIRECTORY);
+ String singleOutputPrefix =
+ outputDir
+ .resolve("pardo-single-output",
ResolveOptions.StandardResolveOptions.RESOLVE_DIRECTORY)
+ .resolve("key-",
ResolveOptions.StandardResolveOptions.RESOLVE_FILE)
+ .toString();
+ String multiOutputPrefix =
+ outputDir
+ .resolve("pardo-multi-output",
ResolveOptions.StandardResolveOptions.RESOLVE_DIRECTORY)
+ .resolve("key-",
ResolveOptions.StandardResolveOptions.RESOLVE_FILE)
+ .toString();
+
+ Pipeline p = createPipeline(options, singleOutputPrefix,
multiOutputPrefix);
+
+ // a latch used by the transforms to signal completion
+ latch = new CountDownLatch(2);
+ JobID jobID = executePipeline(p);
+ String savepointDir;
+ do {
+ // Take a savepoint (checkpoint) which will trigger releasing the
buffered elements
+ // and trigger the latch
+ savepointDir = takeSavepoint(jobID);
+ } while (!latch.await(100, TimeUnit.MILLISECONDS));
+ flinkCluster.cancelJob(jobID).get();
+
+ options.setShutdownSourcesOnFinalWatermark(true);
+ restoreFromSavepoint(p, savepointDir);
+ waitUntilJobIsDone();
+
+ assertThat(
+ new FlinkRunnerResult(Collections.emptyMap(), 1L),
+ SerializableMatchers.allOf(
+ new FileChecksumMatcher(
+ VALUE_CHECKSUM, new
FilePatternMatchingShardedFile(singleOutputPrefix + "*")),
+ new FileChecksumMatcher(
+ VALUE_CHECKSUM, new
FilePatternMatchingShardedFile(multiOutputPrefix + "*"))));
+ }
+
+ private JobGraph getJobGraph(Pipeline pipeline) {
+ FlinkRunner flinkRunner = FlinkRunner.fromOptions(pipeline.getOptions());
+ return flinkRunner.getJobGraph(pipeline);
+ }
+
+ private JobID executePipeline(Pipeline pipeline) throws Exception {
+ JobGraph jobGraph = getJobGraph(pipeline);
+ flinkCluster.submitJob(jobGraph).get();
+ return jobGraph.getJobID();
+ }
+
+ private String takeSavepoint(JobID jobID) throws Exception {
+ Exception exception = null;
+ // try multiple times because the job might not be ready yet
+ for (int i = 0; i < 10; i++) {
+ try {
+ return flinkCluster.triggerSavepoint(jobID, null, false).get();
+ } catch (Exception e) {
+ exception = e;
+ Thread.sleep(100);
+ }
+ }
+ throw exception;
+ }
+
+ private JobID restoreFromSavepoint(Pipeline pipeline, String savepointDir)
+ throws ExecutionException, InterruptedException {
+ JobGraph jobGraph = getJobGraph(pipeline);
+ SavepointRestoreSettings savepointSettings =
SavepointRestoreSettings.forPath(savepointDir);
+ jobGraph.setSavepointRestoreSettings(savepointSettings);
+ return flinkCluster.submitJob(jobGraph).get().getJobID();
+ }
+
+ private void waitUntilJobIsDone() throws InterruptedException,
ExecutionException {
+ while (flinkCluster.listJobs().get().stream()
+ .anyMatch(message -> message.getJobState() == JobStatus.RUNNING)) {
+ Thread.sleep(100);
+ }
+ }
+
+ private static Pipeline createPipeline(
+ PipelineOptions options, String singleOutputPrefix, String
multiOutputPrefix) {
+ Pipeline p = Pipeline.create(options);
+
+ SerializableFunction<Void, Void> firstTime =
+ (SerializableFunction<Void, Void>)
+ value -> {
+ latch.countDown();
+ return null;
+ };
+
+ PCollection<String> impulse = p.apply("CreatePCollectionOfOneValue",
Create.of(VALUE));
+ impulse
+ .apply(
+ "Single-PairWithRandomKey",
+ MapElements.via(new RequiresStableInputIT.PairWithRandomKeyFn()))
+ .apply(
+ "Single-MakeSideEffectAndThenFail",
+ ParDo.of(
+ new RequiresStableInputIT.MakeSideEffectAndThenFailFn(
Review comment:
At least reusing these
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
Issue Time Tracking
-------------------
Worklog Id: (was: 208921)
Time Spent: 4h 40m (was: 4.5h)
> KafkaIO's EOS mode does not work with FlinkRunner
> -------------------------------------------------
>
> Key: BEAM-6751
> URL: https://issues.apache.org/jira/browse/BEAM-6751
> Project: Beam
> Issue Type: Bug
> Components: io-java-kafka, runner-flink
> Reporter: Maximilian Michels
> Assignee: Maximilian Michels
> Priority: Major
> Fix For: 2.12.0
>
> Time Spent: 4h 40m
> Remaining Estimate: 0h
>
> KafkaIO has a validation check which whitelists certain runners capable of
> provide exactly-once semantics:
> {noformat}
> if ("org.apache.beam.runners.direct.DirectRunner".equals(runner)
> || runner.startsWith("org.apache.beam.runners.dataflow.")
> || runner.startsWith("org.apache.beam.runners.spark.") {
> ...
> {noformat}
> The Flink supports exactly-once checkpointing but the Flink Runner can't
> utilize it in the way KafkaIO intends it.
> I think we should remove the check in favor of checking for translation of
> {{@RequiresStableInput}}. Changes to KafkaIO might have to be made to support
> EOS efficiently with the Flink Runner.
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)