Repository: incubator-beam Updated Branches: refs/heads/master 8390a2212 -> 1283308e2
Remove BlockingDataflowRunner Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/4c0fab0b Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/4c0fab0b Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/4c0fab0b Branch: refs/heads/master Commit: 4c0fab0b3a36d184c6d1fe060d60dd9b6678daf1 Parents: 8390a22 Author: Pei He <pe...@google.com> Authored: Fri Jul 29 15:40:07 2016 -0700 Committer: Davor Bonaci <da...@google.com> Committed: Tue Nov 1 12:52:09 2016 -0700 ---------------------------------------------------------------------- .../beam/examples/common/ExampleUtils.java | 3 +- .../dataflow/BlockingDataflowRunner.java | 170 ----------- .../dataflow/DataflowJobCancelledException.java | 39 --- .../dataflow/DataflowJobExecutionException.java | 35 --- .../dataflow/DataflowJobUpdatedException.java | 51 ---- .../runners/dataflow/DataflowPipelineJob.java | 43 ++- .../dataflow/DataflowPipelineRegistrar.java | 11 +- .../BlockingDataflowPipelineOptions.java | 28 -- .../testing/TestDataflowPipelineOptions.java | 6 +- .../dataflow/testing/TestDataflowRunner.java | 7 +- .../dataflow/BlockingDataflowRunnerTest.java | 300 ------------------- .../dataflow/DataflowPipelineJobTest.java | 30 +- .../dataflow/DataflowPipelineRegistrarTest.java | 7 +- .../apache/beam/sdk/transforms/Aggregator.java | 9 +- 14 files changed, 80 insertions(+), 659 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4c0fab0b/examples/java/src/main/java/org/apache/beam/examples/common/ExampleUtils.java ---------------------------------------------------------------------- diff --git a/examples/java/src/main/java/org/apache/beam/examples/common/ExampleUtils.java b/examples/java/src/main/java/org/apache/beam/examples/common/ExampleUtils.java index 1209a67..6962571 100644 --- a/examples/java/src/main/java/org/apache/beam/examples/common/ExampleUtils.java +++ b/examples/java/src/main/java/org/apache/beam/examples/common/ExampleUtils.java @@ -272,8 +272,7 @@ public class ExampleUtils { } /** - * If {@literal DataflowRunner} or {@literal BlockingDataflowRunner} is used, - * waits for the pipeline to finish and cancels it (and the injector) before the program exists. + * Waits for the pipeline to finish and cancels it before the program exists. */ public void waitToFinish(PipelineResult result) { pipelinesToCancel.add(result); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4c0fab0b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BlockingDataflowRunner.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BlockingDataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BlockingDataflowRunner.java deleted file mode 100644 index 5285ade..0000000 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BlockingDataflowRunner.java +++ /dev/null @@ -1,170 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.runners.dataflow; - -import javax.annotation.Nullable; -import org.apache.beam.runners.dataflow.options.BlockingDataflowPipelineOptions; -import org.apache.beam.runners.dataflow.util.MonitoringUtil; -import org.apache.beam.sdk.Pipeline; -import org.apache.beam.sdk.PipelineResult.State; -import org.apache.beam.sdk.annotations.Experimental; -import org.apache.beam.sdk.options.PipelineOptions; -import org.apache.beam.sdk.options.PipelineOptionsValidator; -import org.apache.beam.sdk.runners.PipelineRunner; -import org.apache.beam.sdk.transforms.PTransform; -import org.apache.beam.sdk.values.PInput; -import org.apache.beam.sdk.values.POutput; -import org.joda.time.Duration; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * A {@link PipelineRunner} that's like {@link DataflowRunner} - * but that waits for the launched job to finish. - * - * <p>Logs job status updates and console messages while it waits. - * - * <p>Returns the final job state, or throws an exception if the job - * fails or cannot be monitored. - * - * <p><h3>Permissions</h3> - * When reading from a Dataflow source or writing to a Dataflow sink using - * {@code BlockingDataflowRunner}, the Google cloud services account and the Google compute - * engine service account of the GCP project running the Dataflow Job will need access to the - * corresponding source/sink. - * - * <p>Please see <a href="https://cloud.google.com/dataflow/security-and-permissions">Google Cloud - * Dataflow Security and Permissions</a> for more details. - */ -public class BlockingDataflowRunner extends - PipelineRunner<DataflowPipelineJob> { - private static final Logger LOG = LoggerFactory.getLogger(BlockingDataflowRunner.class); - - // Defaults to an infinite wait period. - // TODO: make this configurable after removal of option map. - private static final long BUILTIN_JOB_TIMEOUT_SEC = -1L; - - private final DataflowRunner dataflowRunner; - private final BlockingDataflowPipelineOptions options; - - protected BlockingDataflowRunner( - DataflowRunner internalRunner, - BlockingDataflowPipelineOptions options) { - this.dataflowRunner = internalRunner; - this.options = options; - } - - /** - * Constructs a runner from the provided options. - */ - public static BlockingDataflowRunner fromOptions( - PipelineOptions options) { - BlockingDataflowPipelineOptions dataflowOptions = - PipelineOptionsValidator.validate(BlockingDataflowPipelineOptions.class, options); - DataflowRunner dataflowRunner = - DataflowRunner.fromOptions(dataflowOptions); - - return new BlockingDataflowRunner(dataflowRunner, dataflowOptions); - } - - /** - * {@inheritDoc} - * - * @throws DataflowJobExecutionException if there is an exception during job execution. - * @throws DataflowServiceException if there is an exception retrieving information about the job. - */ - @Override - public DataflowPipelineJob run(Pipeline p) { - final DataflowPipelineJob job = dataflowRunner.run(p); - - // We ignore the potential race condition here (Ctrl-C after job submission but before the - // shutdown hook is registered). Even if we tried to do something smarter (eg., SettableFuture) - // the run method (which produces the job) could fail or be Ctrl-C'd before it had returned a - // job. The display of the command to cancel the job is best-effort anyways -- RPC's could fail, - // etc. If the user wants to verify the job was cancelled they should look at the job status. - Thread shutdownHook = new Thread() { - @Override - public void run() { - LOG.warn("Job is already running in Google Cloud Platform, Ctrl-C will not cancel it.\n" - + "To cancel the job in the cloud, run:\n> {}", - MonitoringUtil.getGcloudCancelCommand(options, job.getJobId())); - } - }; - - try { - Runtime.getRuntime().addShutdownHook(shutdownHook); - - @Nullable - State result = job.waitUntilFinish(Duration.standardSeconds(BUILTIN_JOB_TIMEOUT_SEC)); - - if (result == null) { - throw new DataflowServiceException( - job, "Timed out while retrieving status for job " + job.getJobId()); - } - - LOG.info("Job finished with status {}", result); - if (!result.isTerminal()) { - throw new IllegalStateException("Expected terminal state for job " + job.getJobId() - + ", got " + result); - } - - if (result == State.DONE) { - return job; - } else if (result == State.UPDATED) { - DataflowPipelineJob newJob = job.getReplacedByJob(); - LOG.info("Job {} has been updated and is running as the new job with id {}." - + "To access the updated job on the Dataflow monitoring console, please navigate to {}", - job.getJobId(), - newJob.getJobId(), - MonitoringUtil.getJobMonitoringPageURL(newJob.getProjectId(), newJob.getJobId())); - throw new DataflowJobUpdatedException( - job, - String.format("Job %s updated; new job is %s.", job.getJobId(), newJob.getJobId()), - newJob); - } else if (result == State.CANCELLED) { - String message = String.format("Job %s cancelled by user", job.getJobId()); - LOG.info(message); - throw new DataflowJobCancelledException(job, message); - } else { - throw new DataflowJobExecutionException(job, "Job " + job.getJobId() - + " failed with status " + result); - } - } finally { - Runtime.getRuntime().removeShutdownHook(shutdownHook); - } - } - - @Override - public <OutputT extends POutput, InputT extends PInput> OutputT apply( - PTransform<InputT, OutputT> transform, InputT input) { - return dataflowRunner.apply(transform, input); - } - - /** - * Sets callbacks to invoke during execution. See {@link DataflowRunnerHooks}. - */ - @Experimental - public void setHooks(DataflowRunnerHooks hooks) { - this.dataflowRunner.setHooks(hooks); - } - - @Override - public String toString() { - return "BlockingDataflowRunner#" + options.getJobName(); - } -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4c0fab0b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowJobCancelledException.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowJobCancelledException.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowJobCancelledException.java deleted file mode 100644 index e2edb6a..0000000 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowJobCancelledException.java +++ /dev/null @@ -1,39 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.runners.dataflow; - -/** - * Signals that a job run by a {@link BlockingDataflowRunner} was updated during execution. - */ -public class DataflowJobCancelledException extends DataflowJobException { - /** - * Create a new {@code DataflowJobAlreadyUpdatedException} with the specified {@link - * DataflowPipelineJob} and message. - */ - public DataflowJobCancelledException(DataflowPipelineJob job, String message) { - super(job, message, null); - } - - /** - * Create a new {@code DataflowJobAlreadyUpdatedException} with the specified {@link - * DataflowPipelineJob}, message, and cause. - */ - public DataflowJobCancelledException(DataflowPipelineJob job, String message, Throwable cause) { - super(job, message, cause); - } -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4c0fab0b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowJobExecutionException.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowJobExecutionException.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowJobExecutionException.java deleted file mode 100644 index ccf8057..0000000 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowJobExecutionException.java +++ /dev/null @@ -1,35 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.runners.dataflow; - -import javax.annotation.Nullable; - -/** - * Signals that a job run by a {@link BlockingDataflowRunner} fails during execution, and - * provides access to the failed job. - */ -public class DataflowJobExecutionException extends DataflowJobException { - DataflowJobExecutionException(DataflowPipelineJob job, String message) { - this(job, message, null); - } - - DataflowJobExecutionException( - DataflowPipelineJob job, String message, @Nullable Throwable cause) { - super(job, message, cause); - } -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4c0fab0b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowJobUpdatedException.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowJobUpdatedException.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowJobUpdatedException.java deleted file mode 100644 index 39d1d47..0000000 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowJobUpdatedException.java +++ /dev/null @@ -1,51 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.runners.dataflow; - -/** - * Signals that a job run by a {@link BlockingDataflowRunner} was updated during execution. - */ -public class DataflowJobUpdatedException extends DataflowJobException { - private DataflowPipelineJob replacedByJob; - - /** - * Create a new {@code DataflowJobUpdatedException} with the specified original {@link - * DataflowPipelineJob}, message, and replacement {@link DataflowPipelineJob}. - */ - public DataflowJobUpdatedException( - DataflowPipelineJob job, String message, DataflowPipelineJob replacedByJob) { - this(job, message, replacedByJob, null); - } - - /** - * Create a new {@code DataflowJobUpdatedException} with the specified original {@link - * DataflowPipelineJob}, message, replacement {@link DataflowPipelineJob}, and cause. - */ - public DataflowJobUpdatedException( - DataflowPipelineJob job, String message, DataflowPipelineJob replacedByJob, Throwable cause) { - super(job, message, cause); - this.replacedByJob = replacedByJob; - } - - /** - * The new job that replaces the job terminated with this exception. - */ - public DataflowPipelineJob getReplacedByJob() { - return replacedByJob; - } -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4c0fab0b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java index c3be192..27006a4 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java @@ -206,7 +206,26 @@ public class DataflowPipelineJob implements PipelineResult { public State waitUntilFinish( Duration duration, MonitoringUtil.JobMessagesHandler messageHandler) throws IOException, InterruptedException { - return waitUntilFinish(duration, messageHandler, Sleeper.DEFAULT, NanoClock.SYSTEM); + // We ignore the potential race condition here (Ctrl-C after job submission but before the + // shutdown hook is registered). Even if we tried to do something smarter (eg., SettableFuture) + // the run method (which produces the job) could fail or be Ctrl-C'd before it had returned a + // job. The display of the command to cancel the job is best-effort anyways -- RPC's could fail, + // etc. If the user wants to verify the job was cancelled they should look at the job status. + Thread shutdownHook = new Thread() { + @Override + public void run() { + LOG.warn("Job is already running in Google Cloud Platform, Ctrl-C will not cancel it.\n" + + "To cancel the job in the cloud, run:\n> {}", + MonitoringUtil.getGcloudCancelCommand(dataflowOptions, getJobId())); + } + }; + + try { + Runtime.getRuntime().addShutdownHook(shutdownHook); + return waitUntilFinish(duration, messageHandler, Sleeper.DEFAULT, NanoClock.SYSTEM); + } finally { + Runtime.getRuntime().removeShutdownHook(shutdownHook); + } } /** @@ -230,8 +249,7 @@ public class DataflowPipelineJob implements PipelineResult { Duration duration, MonitoringUtil.JobMessagesHandler messageHandler, Sleeper sleeper, - NanoClock nanoClock) - throws IOException, InterruptedException { + NanoClock nanoClock) throws IOException, InterruptedException { MonitoringUtil monitor = new MonitoringUtil(projectId, dataflowOptions.getDataflowClient()); long lastTimestamp = 0; @@ -275,6 +293,23 @@ public class DataflowPipelineJob implements PipelineResult { if (!hasError) { // We can stop if the job is done. if (state.isTerminal()) { + switch (state) { + case DONE: + case CANCELLED: + LOG.info("Job {} finished with status {}.", getJobId(), state); + break; + case UPDATED: + LOG.info("Job {} has been updated and is running as the new job with id {}. " + + "To access the updated job on the Dataflow monitoring console, " + + "please navigate to {}", + getJobId(), + getReplacedByJob().getJobId(), + MonitoringUtil.getJobMonitoringPageURL( + getReplacedByJob().getProjectId(), getReplacedByJob().getJobId())); + break; + default: + LOG.info("Job {} failed with status {}.", getJobId(), state); + } return state; } @@ -297,7 +332,7 @@ public class DataflowPipelineJob implements PipelineResult { } } } while(BackOffUtils.next(sleeper, backoff)); - LOG.warn("No terminal state was returned. State value {}", state); + LOG.warn("No terminal state was returned. State value {}", state); return null; // Timed out. } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4c0fab0b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineRegistrar.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineRegistrar.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineRegistrar.java index 5090a8a..5bd3bcd 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineRegistrar.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineRegistrar.java @@ -19,7 +19,6 @@ package org.apache.beam.runners.dataflow; import com.google.auto.service.AutoService; import com.google.common.collect.ImmutableList; -import org.apache.beam.runners.dataflow.options.BlockingDataflowPipelineOptions; import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.PipelineOptionsRegistrar; @@ -34,28 +33,26 @@ public class DataflowPipelineRegistrar { private DataflowPipelineRegistrar() { } /** - * Register the {@link DataflowPipelineOptions} and {@link BlockingDataflowPipelineOptions}. + * Register the {@link DataflowPipelineOptions}. */ @AutoService(PipelineOptionsRegistrar.class) public static class Options implements PipelineOptionsRegistrar { @Override public Iterable<Class<? extends PipelineOptions>> getPipelineOptions() { return ImmutableList.<Class<? extends PipelineOptions>>of( - DataflowPipelineOptions.class, - BlockingDataflowPipelineOptions.class); + DataflowPipelineOptions.class); } } /** - * Register the {@link DataflowRunner} and {@link BlockingDataflowRunner}. + * Register the {@link DataflowRunner}. */ @AutoService(PipelineRunnerRegistrar.class) public static class Runner implements PipelineRunnerRegistrar { @Override public Iterable<Class<? extends PipelineRunner<?>>> getPipelineRunners() { return ImmutableList.<Class<? extends PipelineRunner<?>>>of( - DataflowRunner.class, - BlockingDataflowRunner.class); + DataflowRunner.class); } } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4c0fab0b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/BlockingDataflowPipelineOptions.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/BlockingDataflowPipelineOptions.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/BlockingDataflowPipelineOptions.java deleted file mode 100644 index 5d8d1a1..0000000 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/BlockingDataflowPipelineOptions.java +++ /dev/null @@ -1,28 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.runners.dataflow.options; - -import org.apache.beam.runners.dataflow.BlockingDataflowRunner; -import org.apache.beam.sdk.options.Description; - -/** - * Options that are used to configure the {@link BlockingDataflowRunner}. - */ -@Description("Configure options on the BlockingDataflowRunner.") -public interface BlockingDataflowPipelineOptions extends DataflowPipelineOptions { -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4c0fab0b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/testing/TestDataflowPipelineOptions.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/testing/TestDataflowPipelineOptions.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/testing/TestDataflowPipelineOptions.java index e66ffc9..12f7b39 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/testing/TestDataflowPipelineOptions.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/testing/TestDataflowPipelineOptions.java @@ -17,14 +17,12 @@ */ package org.apache.beam.runners.dataflow.testing; -import org.apache.beam.runners.dataflow.options.BlockingDataflowPipelineOptions; +import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.testing.TestPipelineOptions; /** * A set of options used to configure the {@link TestPipeline}. */ -public interface TestDataflowPipelineOptions extends TestPipelineOptions, - BlockingDataflowPipelineOptions { - +public interface TestDataflowPipelineOptions extends TestPipelineOptions, DataflowPipelineOptions { } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4c0fab0b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunner.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunner.java index a152505..0f141d2 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunner.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunner.java @@ -33,7 +33,6 @@ import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; import javax.annotation.Nullable; -import org.apache.beam.runners.dataflow.DataflowJobExecutionException; import org.apache.beam.runners.dataflow.DataflowPipelineJob; import org.apache.beam.runners.dataflow.DataflowRunner; import org.apache.beam.runners.dataflow.util.MonitoringUtil; @@ -97,11 +96,7 @@ public class TestDataflowRunner extends PipelineRunner<DataflowPipelineJob> { TestPipelineOptions testPipelineOptions = pipeline.getOptions().as(TestPipelineOptions.class); final DataflowPipelineJob job; - try { - job = runner.run(pipeline); - } catch (DataflowJobExecutionException ex) { - throw new IllegalStateException("The dataflow failed."); - } + job = runner.run(pipeline); LOG.info("Running Dataflow job {} with {} expected assertions.", job.getJobId(), expectedNumberOfAssertions); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4c0fab0b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/BlockingDataflowRunnerTest.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/BlockingDataflowRunnerTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/BlockingDataflowRunnerTest.java deleted file mode 100644 index 4572a64..0000000 --- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/BlockingDataflowRunnerTest.java +++ /dev/null @@ -1,300 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.runners.dataflow; - -import static org.hamcrest.CoreMatchers.equalTo; -import static org.junit.Assert.assertEquals; -import static org.mockito.Matchers.any; -import static org.mockito.Matchers.isA; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; - -import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions; -import org.apache.beam.runners.dataflow.testing.TestDataflowPipelineOptions; -import org.apache.beam.sdk.Pipeline; -import org.apache.beam.sdk.PipelineResult.State; -import org.apache.beam.sdk.options.PipelineOptions; -import org.apache.beam.sdk.options.PipelineOptionsFactory; -import org.apache.beam.sdk.testing.ExpectedLogs; -import org.apache.beam.sdk.testing.TestPipeline; -import org.apache.beam.sdk.util.NoopPathValidator; -import org.apache.beam.sdk.util.TestCredential; -import org.hamcrest.Description; -import org.hamcrest.Factory; -import org.hamcrest.Matcher; -import org.hamcrest.TypeSafeMatcher; -import org.joda.time.Duration; -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.ExpectedException; -import org.junit.runner.RunWith; -import org.junit.runners.JUnit4; - -/** - * Tests for BlockingDataflowRunner. - */ -@RunWith(JUnit4.class) -public class BlockingDataflowRunnerTest { - - @Rule - public ExpectedLogs expectedLogs = ExpectedLogs.none(BlockingDataflowRunner.class); - - @Rule - public ExpectedException expectedThrown = ExpectedException.none(); - - /** - * A {@link Matcher} for a {@link DataflowJobException} that applies an underlying {@link Matcher} - * to the {@link DataflowPipelineJob} returned by {@link DataflowJobException#getJob()}. - */ - private static class DataflowJobExceptionMatcher<T extends DataflowJobException> - extends TypeSafeMatcher<T> { - - private final Matcher<DataflowPipelineJob> matcher; - - public DataflowJobExceptionMatcher(Matcher<DataflowPipelineJob> matcher) { - this.matcher = matcher; - } - - @Override - public boolean matchesSafely(T ex) { - return matcher.matches(ex.getJob()); - } - - @Override - protected void describeMismatchSafely(T item, Description description) { - description.appendText("job "); - matcher.describeMismatch(item.getMessage(), description); - } - - @Override - public void describeTo(Description description) { - description.appendText("exception with job matching "); - description.appendDescriptionOf(matcher); - } - - @Factory - public static <T extends DataflowJobException> Matcher<T> expectJob( - Matcher<DataflowPipelineJob> matcher) { - return new DataflowJobExceptionMatcher<T>(matcher); - } - } - - /** - * A {@link Matcher} for a {@link DataflowPipelineJob} that applies an underlying {@link Matcher} - * to the return value of {@link DataflowPipelineJob#getJobId()}. - */ - private static class JobIdMatcher<T extends DataflowPipelineJob> extends TypeSafeMatcher<T> { - - private final Matcher<String> matcher; - - public JobIdMatcher(Matcher<String> matcher) { - this.matcher = matcher; - } - - @Override - public boolean matchesSafely(T job) { - return matcher.matches(job.getJobId()); - } - - @Override - protected void describeMismatchSafely(T item, Description description) { - description.appendText("jobId "); - matcher.describeMismatch(item.getJobId(), description); - } - - @Override - public void describeTo(Description description) { - description.appendText("job with jobId "); - description.appendDescriptionOf(matcher); - } - - @Factory - public static <T extends DataflowPipelineJob> Matcher<T> expectJobId(final String jobId) { - return new JobIdMatcher<T>(equalTo(jobId)); - } - } - - /** - * A {@link Matcher} for a {@link DataflowJobUpdatedException} that applies an underlying - * {@link Matcher} to the {@link DataflowPipelineJob} returned by - * {@link DataflowJobUpdatedException#getReplacedByJob()}. - */ - private static class ReplacedByJobMatcher<T extends DataflowJobUpdatedException> - extends TypeSafeMatcher<T> { - - private final Matcher<DataflowPipelineJob> matcher; - - public ReplacedByJobMatcher(Matcher<DataflowPipelineJob> matcher) { - this.matcher = matcher; - } - - @Override - public boolean matchesSafely(T ex) { - return matcher.matches(ex.getReplacedByJob()); - } - - @Override - protected void describeMismatchSafely(T item, Description description) { - description.appendText("job "); - matcher.describeMismatch(item.getMessage(), description); - } - - @Override - public void describeTo(Description description) { - description.appendText("exception with replacedByJob() "); - description.appendDescriptionOf(matcher); - } - - @Factory - public static <T extends DataflowJobUpdatedException> Matcher<T> expectReplacedBy( - Matcher<DataflowPipelineJob> matcher) { - return new ReplacedByJobMatcher<T>(matcher); - } - } - - /** - * Creates a mocked {@link DataflowPipelineJob} with the given {@code projectId} and {@code jobId} - * that will immediately terminate in the provided {@code terminalState}. - * - * <p>The return value may be further mocked. - */ - private DataflowPipelineJob createMockJob( - String projectId, String jobId, State terminalState) throws Exception { - DataflowPipelineJob mockJob = mock(DataflowPipelineJob.class); - when(mockJob.getProjectId()).thenReturn(projectId); - when(mockJob.getJobId()).thenReturn(jobId); - when(mockJob.waitUntilFinish(any(Duration.class))) - .thenReturn(terminalState); - return mockJob; - } - - /** - * Returns a {@link BlockingDataflowRunner} that will return the provided a job to return. - * Some {@link PipelineOptions} will be extracted from the job, such as the project ID. - */ - private BlockingDataflowRunner createMockRunner(DataflowPipelineJob job) - throws Exception { - DataflowRunner mockRunner = mock(DataflowRunner.class); - TestDataflowPipelineOptions options = - PipelineOptionsFactory.as(TestDataflowPipelineOptions.class); - options.setRunner(BlockingDataflowRunner.class); - options.setProject(job.getProjectId()); - - when(mockRunner.run(isA(Pipeline.class))).thenReturn(job); - - return new BlockingDataflowRunner(mockRunner, options); - } - - /** - * Tests that the {@link BlockingDataflowRunner} returns normally when a job terminates in - * the {@link State#DONE DONE} state. - */ - @Test - public void testJobDoneComplete() throws Exception { - createMockRunner(createMockJob("testJobDone-projectId", "testJobDone-jobId", State.DONE)) - .run(TestPipeline.create()); - expectedLogs.verifyInfo("Job finished with status DONE"); - } - - /** - * Tests that the {@link BlockingDataflowRunner} throws the appropriate exception - * when a job terminates in the {@link State#FAILED FAILED} state. - */ - @Test - public void testFailedJobThrowsException() throws Exception { - expectedThrown.expect(DataflowJobExecutionException.class); - expectedThrown.expect(DataflowJobExceptionMatcher.expectJob( - JobIdMatcher.expectJobId("testFailedJob-jobId"))); - createMockRunner(createMockJob("testFailedJob-projectId", "testFailedJob-jobId", State.FAILED)) - .run(TestPipeline.create()); - } - - /** - * Tests that the {@link BlockingDataflowRunner} throws the appropriate exception - * when a job terminates in the {@link State#CANCELLED CANCELLED} state. - */ - @Test - public void testCancelledJobThrowsException() throws Exception { - expectedThrown.expect(DataflowJobCancelledException.class); - expectedThrown.expect(DataflowJobExceptionMatcher.expectJob( - JobIdMatcher.expectJobId("testCancelledJob-jobId"))); - createMockRunner( - createMockJob("testCancelledJob-projectId", "testCancelledJob-jobId", State.CANCELLED)) - .run(TestPipeline.create()); - } - - /** - * Tests that the {@link BlockingDataflowRunner} throws the appropriate exception - * when a job terminates in the {@link State#UPDATED UPDATED} state. - */ - @Test - public void testUpdatedJobThrowsException() throws Exception { - expectedThrown.expect(DataflowJobUpdatedException.class); - expectedThrown.expect(DataflowJobExceptionMatcher.expectJob( - JobIdMatcher.expectJobId("testUpdatedJob-jobId"))); - expectedThrown.expect(ReplacedByJobMatcher.expectReplacedBy( - JobIdMatcher.expectJobId("testUpdatedJob-replacedByJobId"))); - DataflowPipelineJob job = - createMockJob("testUpdatedJob-projectId", "testUpdatedJob-jobId", State.UPDATED); - DataflowPipelineJob replacedByJob = - createMockJob("testUpdatedJob-projectId", "testUpdatedJob-replacedByJobId", State.DONE); - when(job.getReplacedByJob()).thenReturn(replacedByJob); - createMockRunner(job).run(TestPipeline.create()); - } - - /** - * Tests that the {@link BlockingDataflowRunner} throws the appropriate exception - * when a job terminates in the {@link State#UNKNOWN UNKNOWN} state, indicating that the - * Dataflow service returned a state that the SDK is unfamiliar with (possibly because it - * is an old SDK relative the service). - */ - @Test - public void testUnknownJobThrowsException() throws Exception { - expectedThrown.expect(IllegalStateException.class); - createMockRunner( - createMockJob("testUnknownJob-projectId", "testUnknownJob-jobId", State.UNKNOWN)) - .run(TestPipeline.create()); - } - - /** - * Tests that the {@link BlockingDataflowRunner} throws the appropriate exception - * when a job returns a {@code null} state, indicating that it failed to contact the service, - * including all of its built-in resilience logic. - */ - @Test - public void testNullJobThrowsException() throws Exception { - expectedThrown.expect(DataflowServiceException.class); - expectedThrown.expect(DataflowJobExceptionMatcher.expectJob( - JobIdMatcher.expectJobId("testNullJob-jobId"))); - createMockRunner(createMockJob("testNullJob-projectId", "testNullJob-jobId", null)) - .run(TestPipeline.create()); - } - - @Test - public void testToString() { - DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class); - options.setJobName("TestJobName"); - options.setProject("test-project"); - options.setTempLocation("gs://test/temp/location"); - options.setGcpCredential(new TestCredential()); - options.setPathValidatorClass(NoopPathValidator.class); - options.setRunner(BlockingDataflowRunner.class); - assertEquals("BlockingDataflowRunner#testjobname", - BlockingDataflowRunner.fromOptions(options).toString()); - } -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4c0fab0b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineJobTest.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineJobTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineJobTest.java index 2af95e2..0527b7c 100644 --- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineJobTest.java +++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineJobTest.java @@ -61,6 +61,7 @@ import org.apache.beam.sdk.AggregatorRetrievalException; import org.apache.beam.sdk.AggregatorValues; import org.apache.beam.sdk.PipelineResult.State; import org.apache.beam.sdk.options.PipelineOptionsFactory; +import org.apache.beam.sdk.testing.ExpectedLogs; import org.apache.beam.sdk.testing.FastNanoClockAndSleeper; import org.apache.beam.sdk.transforms.Aggregator; import org.apache.beam.sdk.transforms.AppliedPTransform; @@ -86,6 +87,7 @@ import org.mockito.MockitoAnnotations; public class DataflowPipelineJobTest { private static final String PROJECT_ID = "someProject"; private static final String JOB_ID = "1234"; + private static final String REPLACEMENT_JOB_ID = "replacementJobId"; @Mock private Dataflow mockWorkflowClient; @@ -99,6 +101,9 @@ public class DataflowPipelineJobTest { @Rule public ExpectedException thrown = ExpectedException.none(); + @Rule + public ExpectedLogs expectedLogs = ExpectedLogs.none(DataflowPipelineJob.class); + private TestDataflowPipelineOptions options; @Before @@ -168,6 +173,9 @@ public class DataflowPipelineJobTest { Job statusResponse = new Job(); statusResponse.setCurrentState("JOB_STATE_" + state.name()); + if (state == State.UPDATED) { + statusResponse.setReplacedByJobId(REPLACEMENT_JOB_ID); + } when(mockJobs.get(eq(PROJECT_ID), eq(JOB_ID))).thenReturn(statusRequest); when(statusRequest.execute()).thenReturn(statusResponse); @@ -187,6 +195,7 @@ public class DataflowPipelineJobTest { @Test public void testWaitToFinishDone() throws Exception { assertEquals(State.DONE, mockWaitToFinishInState(State.DONE)); + expectedLogs.verifyInfo(String.format("Job %s finished with status DONE.", JOB_ID)); } /** @@ -196,24 +205,39 @@ public class DataflowPipelineJobTest { @Test public void testWaitToFinishFailed() throws Exception { assertEquals(State.FAILED, mockWaitToFinishInState(State.FAILED)); + expectedLogs.verifyInfo(String.format("Job %s failed with status FAILED.", JOB_ID)); } /** - * Tests that the {@link DataflowPipelineJob} understands that the {@link State#FAILED FAILED} - * state is terminal. + * Tests that the {@link DataflowPipelineJob} understands that the + * {@link State#CANCELLED CANCELLED} state is terminal. */ @Test public void testWaitToFinishCancelled() throws Exception { assertEquals(State.CANCELLED, mockWaitToFinishInState(State.CANCELLED)); + expectedLogs.verifyInfo(String.format("Job %s finished with status CANCELLED", JOB_ID)); } /** - * Tests that the {@link DataflowPipelineJob} understands that the {@link State#FAILED FAILED} + * Tests that the {@link DataflowPipelineJob} understands that the {@link State#UPDATED UPDATED} * state is terminal. */ @Test public void testWaitToFinishUpdated() throws Exception { assertEquals(State.UPDATED, mockWaitToFinishInState(State.UPDATED)); + expectedLogs.verifyInfo(String.format( + "Job %s has been updated and is running as the new job with id %s.", + JOB_ID, REPLACEMENT_JOB_ID)); + } + + /** + * Tests that the {@link DataflowPipelineJob} understands that the {@link State#UNKNOWN UNKNOWN} + * state is terminal. + */ + @Test + public void testWaitToFinishUnknown() throws Exception { + assertEquals(null, mockWaitToFinishInState(State.UNKNOWN)); + expectedLogs.verifyWarn("No terminal state was returned. State value UNKNOWN"); } @Test http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4c0fab0b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineRegistrarTest.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineRegistrarTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineRegistrarTest.java index f084757..31f9281 100644 --- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineRegistrarTest.java +++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineRegistrarTest.java @@ -23,7 +23,6 @@ import static org.junit.Assert.fail; import com.google.common.collect.ImmutableList; import com.google.common.collect.Lists; import java.util.ServiceLoader; -import org.apache.beam.runners.dataflow.options.BlockingDataflowPipelineOptions; import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions; import org.apache.beam.sdk.options.PipelineOptionsRegistrar; import org.apache.beam.sdk.runners.PipelineRunnerRegistrar; @@ -36,15 +35,13 @@ import org.junit.runners.JUnit4; public class DataflowPipelineRegistrarTest { @Test public void testCorrectOptionsAreReturned() { - assertEquals(ImmutableList.of(DataflowPipelineOptions.class, - BlockingDataflowPipelineOptions.class), + assertEquals(ImmutableList.of(DataflowPipelineOptions.class), new DataflowPipelineRegistrar.Options().getPipelineOptions()); } @Test public void testCorrectRunnersAreReturned() { - assertEquals(ImmutableList.of(DataflowRunner.class, - BlockingDataflowRunner.class), + assertEquals(ImmutableList.of(DataflowRunner.class), new DataflowPipelineRegistrar.Runner().getPipelineRunners()); } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4c0fab0b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Aggregator.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Aggregator.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Aggregator.java index 13bf322..427ecfc 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Aggregator.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Aggregator.java @@ -29,11 +29,10 @@ import org.apache.beam.sdk.util.ExecutionContext; * typically from the {@link DoFn} constructor. Elements can be added to the * {@code Aggregator} by calling {@link Aggregator#addValue}. * - * <p>Aggregators are visible in the monitoring UI, when the pipeline is run - * using DataflowRunner or BlockingDataflowRunner, along with - * their current value. Aggregators may not become visible until the system - * begins executing the ParDo transform that created them and/or their initial - * value is changed. + * <p>Aggregators are visible in the monitoring UI, when the pipeline is run using + * {@link DataflowRunner} along with their current value. + * Aggregators may not become visible until the system begins executing the ParDo transform + * that created them and/or their initial value is changed. * * <p>Example: * <pre> {@code