http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/options/DataflowWorkerLoggingOptions.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/options/DataflowWorkerLoggingOptions.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/options/DataflowWorkerLoggingOptions.java new file mode 100644 index 0000000..ebd42d9 --- /dev/null +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/options/DataflowWorkerLoggingOptions.java @@ -0,0 +1,155 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.google.cloud.dataflow.sdk.options; + +import com.google.common.base.Preconditions; + +import com.fasterxml.jackson.annotation.JsonCreator; + +import java.util.Arrays; +import java.util.HashMap; +import java.util.Map; + +/** + * Options that are used to control logging configuration on the Dataflow worker. + */ +@Description("Options that are used to control logging configuration on the Dataflow worker.") +public interface DataflowWorkerLoggingOptions extends PipelineOptions { + /** + * The set of log levels that can be used on the Dataflow worker. + */ + public enum Level { + DEBUG, ERROR, INFO, TRACE, WARN + } + + /** + * This option controls the default log level of all loggers without a log level override. + */ + @Description("Controls the default log level of all loggers without a log level override.") + @Default.Enum("INFO") + Level getDefaultWorkerLogLevel(); + void setDefaultWorkerLogLevel(Level level); + + /** + * This option controls the log levels for specifically named loggers. + * + * <p>Later options with equivalent names override earlier options. + * + * <p>See {@link WorkerLogLevelOverrides} for more information on how to configure logging + * on a per {@link Class}, {@link Package}, or name basis. If used from the command line, + * the expected format is {"Name":"Level",...}, further details on + * {@link WorkerLogLevelOverrides#from}. + */ + @Description("This option controls the log levels for specifically named loggers. " + + "The expected format is {\"Name\":\"Level\",...}. The Dataflow worker uses " + + "java.util.logging, which supports a logging hierarchy based off of names that are '.' " + + "separated. For example, by specifying the value {\"a.b.c.Foo\":\"DEBUG\"}, the logger " + + "for the class 'a.b.c.Foo' will be configured to output logs at the DEBUG level. " + + "Similarly, by specifying the value {\"a.b.c\":\"WARN\"}, all loggers underneath the " + + "'a.b.c' package will be configured to output logs at the WARN level. 
Also, note that " + + "when multiple overrides are specified, the exact name followed by the closest parent " + + "takes precedence.") + WorkerLogLevelOverrides getWorkerLogLevelOverrides(); + void setWorkerLogLevelOverrides(WorkerLogLevelOverrides value); + + /** + * Defines a log level override for a specific class, package, or name. + * + * <p>{@code java.util.logging} is used on the Dataflow worker harness and supports + * a logging hierarchy based off of names that are "." separated. It is a common + * pattern to have the logger for a given class share the same name as the class itself. + * Given the classes {@code a.b.c.Foo}, {@code a.b.c.Xyz}, and {@code a.b.Bar}, with + * loggers named {@code "a.b.c.Foo"}, {@code "a.b.c.Xyz"}, and {@code "a.b.Bar"} respectively, + * we can override the log levels: + * <ul> + * <li>for {@code Foo} by specifying the name {@code "a.b.c.Foo"} or the {@link Class} + * representing {@code a.b.c.Foo}. + * <li>for {@code Foo}, {@code Xyz}, and {@code Bar} by specifying the name {@code "a.b"} or + * the {@link Package} representing {@code a.b}. + * <li>for {@code Foo} and {@code Bar} by specifying both of their names or classes. + * </ul> + * Note that by specifying multiple overrides, the exact name followed by the closest parent + * takes precedence. + */ + public static class WorkerLogLevelOverrides extends HashMap<String, Level> { + /** + * Overrides the default log level for the passed in class. + * + * <p>This is equivalent to calling + * {@link #addOverrideForName(String, DataflowWorkerLoggingOptions.Level)} + * and passing in the {@link Class#getName() class name}. + */ + public WorkerLogLevelOverrides addOverrideForClass(Class<?> klass, Level level) { + Preconditions.checkNotNull(klass, "Expected class to be not null."); + addOverrideForName(klass.getName(), level); + return this; + } + + /** + * Overrides the default log level for the passed in package. + * + * <p>This is equivalent to calling + * {@link #addOverrideForName(String, DataflowWorkerLoggingOptions.Level)} + * and passing in the {@link Package#getName() package name}. + */ + public WorkerLogLevelOverrides addOverrideForPackage(Package pkg, Level level) { + Preconditions.checkNotNull(pkg, "Expected package to be not null."); + addOverrideForName(pkg.getName(), level); + return this; + } + + /** + * Overrides the default log level for the passed in name. + * + * <p>Note that because of the hierarchical nature of logger names, this will + * override the log level of all loggers that have the passed in name or + * a parent logger that has the passed in name. + */ + public WorkerLogLevelOverrides addOverrideForName(String name, Level level) { + Preconditions.checkNotNull(name, "Expected name to be not null."); + Preconditions.checkNotNull(level, + "Expected level to be one of %s.", Arrays.toString(Level.values())); + put(name, level); + return this; + } + + /** + * Expects a map keyed by logger {@code Name}s with values representing {@code Level}s. + * The {@code Name} generally represents the fully qualified Java + * {@link Class#getName() class name}, or fully qualified Java + * {@link Package#getName() package name}, or custom logger name. The {@code Level} + * represents the log level and must be one of {@link Level}. 
+ */ + @JsonCreator + public static WorkerLogLevelOverrides from(Map<String, String> values) { + Preconditions.checkNotNull(values, "Expected values to be not null."); + WorkerLogLevelOverrides overrides = new WorkerLogLevelOverrides(); + for (Map.Entry<String, String> entry : values.entrySet()) { + try { + overrides.addOverrideForName(entry.getKey(), Level.valueOf(entry.getValue())); + } catch (IllegalArgumentException e) { + throw new IllegalArgumentException(String.format( + "Unsupported log level '%s' requested for %s. Must be one of %s.", + entry.getValue(), entry.getKey(), Arrays.toString(Level.values()))); + } + + } + return overrides; + } + } +}
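[Editor's note, not part of this patch] The override semantics above (exact logger name first, then the closest parent) are easiest to see in a short usage sketch. The following is illustrative only; it assumes the SDK's standard PipelineOptionsFactory entry point, while everything else comes from DataflowWorkerLoggingOptions as defined above.

import com.google.cloud.dataflow.sdk.options.DataflowWorkerLoggingOptions;
import com.google.cloud.dataflow.sdk.options.DataflowWorkerLoggingOptions.Level;
import com.google.cloud.dataflow.sdk.options.DataflowWorkerLoggingOptions.WorkerLogLevelOverrides;
import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;

public class WorkerLoggingExample {
  public static void main(String[] args) {
    DataflowWorkerLoggingOptions options =
        PipelineOptionsFactory.fromArgs(args).as(DataflowWorkerLoggingOptions.class);

    // Loggers with no explicit override fall back to this default level.
    options.setDefaultWorkerLogLevel(Level.WARN);

    // The exact name wins over the closest parent: the logger "a.b.c.Foo" logs at
    // DEBUG even though the rest of the "a.b.c" hierarchy is capped at ERROR.
    // addOverrideForClass/addOverrideForPackage are equivalent when a Class or
    // Package object is at hand.
    options.setWorkerLogLevelOverrides(
        new WorkerLogLevelOverrides()
            .addOverrideForName("a.b.c", Level.ERROR)
            .addOverrideForName("a.b.c.Foo", Level.DEBUG));
  }
}

The same configuration should be expressible on the command line with the getter-derived flag names, e.g. --defaultWorkerLogLevel=WARN and --workerLogLevelOverrides={"a.b.c":"ERROR","a.b.c.Foo":"DEBUG"}, the latter being parsed by WorkerLogLevelOverrides.from.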
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/runners/BlockingDataflowPipelineRunner.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/runners/BlockingDataflowPipelineRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/runners/BlockingDataflowPipelineRunner.java new file mode 100644 index 0000000..9096c2b --- /dev/null +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/runners/BlockingDataflowPipelineRunner.java @@ -0,0 +1,186 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.google.cloud.dataflow.sdk.runners; + +import org.apache.beam.sdk.annotations.Experimental; + +import com.google.cloud.dataflow.sdk.Pipeline; +import com.google.cloud.dataflow.sdk.PipelineResult.State; +import com.google.cloud.dataflow.sdk.options.BlockingDataflowPipelineOptions; +import com.google.cloud.dataflow.sdk.options.PipelineOptions; +import com.google.cloud.dataflow.sdk.options.PipelineOptionsValidator; +import com.google.cloud.dataflow.sdk.transforms.PTransform; +import com.google.cloud.dataflow.sdk.util.MonitoringUtil; +import com.google.cloud.dataflow.sdk.values.PInput; +import com.google.cloud.dataflow.sdk.values.POutput; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.concurrent.TimeUnit; + +import javax.annotation.Nullable; + +/** + * A {@link PipelineRunner} that's like {@link DataflowPipelineRunner} + * but that waits for the launched job to finish. + * + * <p>Prints out job status updates and console messages while it waits. + * + * <p>Returns the final job state, or throws an exception if the job + * fails or cannot be monitored. + * + * <p><h3>Permissions</h3> + * When reading from a Dataflow source or writing to a Dataflow sink using + * {@code BlockingDataflowPipelineRunner}, the Google cloud services account and the Google compute + * engine service account of the GCP project running the Dataflow Job will need access to the + * corresponding source/sink. + * + * <p>Please see <a href="https://cloud.google.com/dataflow/security-and-permissions">Google Cloud + * Dataflow Security and Permissions</a> for more details. + */ +public class BlockingDataflowPipelineRunner extends + PipelineRunner<DataflowPipelineJob> { + private static final Logger LOG = LoggerFactory.getLogger(BlockingDataflowPipelineRunner.class); + + // Defaults to an infinite wait period. + // TODO: make this configurable after removal of option map. 
+ private static final long BUILTIN_JOB_TIMEOUT_SEC = -1L; + + private final DataflowPipelineRunner dataflowPipelineRunner; + private final BlockingDataflowPipelineOptions options; + + protected BlockingDataflowPipelineRunner( + DataflowPipelineRunner internalRunner, + BlockingDataflowPipelineOptions options) { + this.dataflowPipelineRunner = internalRunner; + this.options = options; + } + + /** + * Constructs a runner from the provided options. + */ + public static BlockingDataflowPipelineRunner fromOptions( + PipelineOptions options) { + BlockingDataflowPipelineOptions dataflowOptions = + PipelineOptionsValidator.validate(BlockingDataflowPipelineOptions.class, options); + DataflowPipelineRunner dataflowPipelineRunner = + DataflowPipelineRunner.fromOptions(dataflowOptions); + + return new BlockingDataflowPipelineRunner(dataflowPipelineRunner, dataflowOptions); + } + + /** + * {@inheritDoc} + * + * @throws DataflowJobExecutionException if there is an exception during job execution. + * @throws DataflowServiceException if there is an exception retrieving information about the job. + */ + @Override + public DataflowPipelineJob run(Pipeline p) { + final DataflowPipelineJob job = dataflowPipelineRunner.run(p); + + // We ignore the potential race condition here (Ctrl-C after job submission but before the + // shutdown hook is registered). Even if we tried to do something smarter (eg., SettableFuture) + // the run method (which produces the job) could fail or be Ctrl-C'd before it had returned a + // job. The display of the command to cancel the job is best-effort anyways -- RPC's could fail, + // etc. If the user wants to verify the job was cancelled they should look at the job status. + Thread shutdownHook = new Thread() { + @Override + public void run() { + LOG.warn("Job is already running in Google Cloud Platform, Ctrl-C will not cancel it.\n" + + "To cancel the job in the cloud, run:\n> {}", + MonitoringUtil.getGcloudCancelCommand(options, job.getJobId())); + } + }; + + try { + Runtime.getRuntime().addShutdownHook(shutdownHook); + + @Nullable + State result; + try { + result = job.waitToFinish( + BUILTIN_JOB_TIMEOUT_SEC, TimeUnit.SECONDS, + new MonitoringUtil.PrintHandler(options.getJobMessageOutput())); + } catch (IOException | InterruptedException ex) { + if (ex instanceof InterruptedException) { + Thread.currentThread().interrupt(); + } + LOG.debug("Exception caught while retrieving status for job {}", job.getJobId(), ex); + throw new DataflowServiceException( + job, "Exception caught while retrieving status for job " + job.getJobId(), ex); + } + + if (result == null) { + throw new DataflowServiceException( + job, "Timed out while retrieving status for job " + job.getJobId()); + } + + LOG.info("Job finished with status {}", result); + if (!result.isTerminal()) { + throw new IllegalStateException("Expected terminal state for job " + job.getJobId() + + ", got " + result); + } + + if (result == State.DONE) { + return job; + } else if (result == State.UPDATED) { + DataflowPipelineJob newJob = job.getReplacedByJob(); + LOG.info("Job {} has been updated and is running as the new job with id {}." 
+ + "To access the updated job on the Dataflow monitoring console, please navigate to {}", + job.getJobId(), + newJob.getJobId(), + MonitoringUtil.getJobMonitoringPageURL(newJob.getProjectId(), newJob.getJobId())); + throw new DataflowJobUpdatedException( + job, + String.format("Job %s updated; new job is %s.", job.getJobId(), newJob.getJobId()), + newJob); + } else if (result == State.CANCELLED) { + String message = String.format("Job %s cancelled by user", job.getJobId()); + LOG.info(message); + throw new DataflowJobCancelledException(job, message); + } else { + throw new DataflowJobExecutionException(job, "Job " + job.getJobId() + + " failed with status " + result); + } + } finally { + Runtime.getRuntime().removeShutdownHook(shutdownHook); + } + } + + @Override + public <OutputT extends POutput, InputT extends PInput> OutputT apply( + PTransform<InputT, OutputT> transform, InputT input) { + return dataflowPipelineRunner.apply(transform, input); + } + + /** + * Sets callbacks to invoke during execution. See {@link DataflowPipelineRunnerHooks}. + */ + @Experimental + public void setHooks(DataflowPipelineRunnerHooks hooks) { + this.dataflowPipelineRunner.setHooks(hooks); + } + + @Override + public String toString() { + return "BlockingDataflowPipelineRunner#" + options.getJobName(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/runners/DataflowJobAlreadyExistsException.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/runners/DataflowJobAlreadyExistsException.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/runners/DataflowJobAlreadyExistsException.java new file mode 100644 index 0000000..4a4f100 --- /dev/null +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/runners/DataflowJobAlreadyExistsException.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.google.cloud.dataflow.sdk.runners; + +/** + * An exception that is thrown if the unique job name constraint of the Dataflow + * service is broken because an existing job with the same job name is currently active. + * The {@link DataflowPipelineJob} contained within this exception contains information + * about the pre-existing job. + */ +public class DataflowJobAlreadyExistsException extends DataflowJobException { + /** + * Create a new {@code DataflowJobAlreadyExistsException} with the specified {@link + * DataflowPipelineJob} and message. 
+ */ + public DataflowJobAlreadyExistsException( + DataflowPipelineJob job, String message) { + super(job, message, null); + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/runners/DataflowJobAlreadyUpdatedException.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/runners/DataflowJobAlreadyUpdatedException.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/runners/DataflowJobAlreadyUpdatedException.java new file mode 100644 index 0000000..1f52c6a --- /dev/null +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/runners/DataflowJobAlreadyUpdatedException.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.google.cloud.dataflow.sdk.runners; + +/** + * An exception that is thrown if the existing job has already been updated within the Dataflow + * service and is no longer able to be updated. The {@link DataflowPipelineJob} contained within + * this exception contains information about the pre-existing updated job. + */ +public class DataflowJobAlreadyUpdatedException extends DataflowJobException { + /** + * Create a new {@code DataflowJobAlreadyUpdatedException} with the specified {@link + * DataflowPipelineJob} and message. + */ + public DataflowJobAlreadyUpdatedException( + DataflowPipelineJob job, String message) { + super(job, message, null); + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/runners/DataflowJobCancelledException.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/runners/DataflowJobCancelledException.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/runners/DataflowJobCancelledException.java new file mode 100644 index 0000000..495ca5a --- /dev/null +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/runners/DataflowJobCancelledException.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.google.cloud.dataflow.sdk.runners; + +/** + * Signals that a job run by a {@link BlockingDataflowPipelineRunner} was cancelled during execution. + */ +public class DataflowJobCancelledException extends DataflowJobException { + /** + * Create a new {@code DataflowJobCancelledException} with the specified {@link + * DataflowPipelineJob} and message. + */ + public DataflowJobCancelledException(DataflowPipelineJob job, String message) { + super(job, message, null); + } + + /** + * Create a new {@code DataflowJobCancelledException} with the specified {@link + * DataflowPipelineJob}, message, and cause. + */ + public DataflowJobCancelledException(DataflowPipelineJob job, String message, Throwable cause) { + super(job, message, cause); + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/runners/DataflowJobException.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/runners/DataflowJobException.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/runners/DataflowJobException.java new file mode 100644 index 0000000..a22d13c --- /dev/null +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/runners/DataflowJobException.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.google.cloud.dataflow.sdk.runners; + +import java.util.Objects; + +import javax.annotation.Nullable; + +/** + * A {@link RuntimeException} that contains information about a {@link DataflowPipelineJob}. + */ +public abstract class DataflowJobException extends RuntimeException { + private final DataflowPipelineJob job; + + DataflowJobException(DataflowPipelineJob job, String message, @Nullable Throwable cause) { + super(message, cause); + this.job = Objects.requireNonNull(job); + } + + /** + * Returns the failed job.
+ */ + public DataflowPipelineJob getJob() { + return job; + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/runners/DataflowJobExecutionException.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/runners/DataflowJobExecutionException.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/runners/DataflowJobExecutionException.java new file mode 100644 index 0000000..3dbb007 --- /dev/null +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/runners/DataflowJobExecutionException.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.google.cloud.dataflow.sdk.runners; + +import javax.annotation.Nullable; + +/** + * Signals that a job run by a {@link BlockingDataflowPipelineRunner} fails during execution, and + * provides access to the failed job. + */ +public class DataflowJobExecutionException extends DataflowJobException { + DataflowJobExecutionException(DataflowPipelineJob job, String message) { + this(job, message, null); + } + + DataflowJobExecutionException( + DataflowPipelineJob job, String message, @Nullable Throwable cause) { + super(job, message, cause); + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/runners/DataflowJobUpdatedException.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/runners/DataflowJobUpdatedException.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/runners/DataflowJobUpdatedException.java new file mode 100644 index 0000000..5792941 --- /dev/null +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/runners/DataflowJobUpdatedException.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.google.cloud.dataflow.sdk.runners; + +/** + * Signals that a job run by a {@link BlockingDataflowPipelineRunner} was updated during execution. + */ +public class DataflowJobUpdatedException extends DataflowJobException { + private DataflowPipelineJob replacedByJob; + + /** + * Create a new {@code DataflowJobUpdatedException} with the specified original {@link + * DataflowPipelineJob}, message, and replacement {@link DataflowPipelineJob}. + */ + public DataflowJobUpdatedException( + DataflowPipelineJob job, String message, DataflowPipelineJob replacedByJob) { + this(job, message, replacedByJob, null); + } + + /** + * Create a new {@code DataflowJobUpdatedException} with the specified original {@link + * DataflowPipelineJob}, message, replacement {@link DataflowPipelineJob}, and cause. + */ + public DataflowJobUpdatedException( + DataflowPipelineJob job, String message, DataflowPipelineJob replacedByJob, Throwable cause) { + super(job, message, cause); + this.replacedByJob = replacedByJob; + } + + /** + * The new job that replaces the job terminated with this exception. + */ + public DataflowPipelineJob getReplacedByJob() { + return replacedByJob; + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/runners/DataflowPipeline.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/runners/DataflowPipeline.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/runners/DataflowPipeline.java new file mode 100644 index 0000000..7b97c7d --- /dev/null +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/runners/DataflowPipeline.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.google.cloud.dataflow.sdk.runners; + +import com.google.cloud.dataflow.sdk.Pipeline; +import com.google.cloud.dataflow.sdk.options.DataflowPipelineOptions; +import com.google.cloud.dataflow.sdk.options.PipelineOptions; + +/** + * A {@link DataflowPipeline} is a {@link Pipeline} that returns a + * {@link DataflowPipelineJob} when it is + * {@link com.google.cloud.dataflow.sdk.Pipeline#run()}. + * + * <p>This is not intended for use by users of Cloud Dataflow. + * Instead, use {@link Pipeline#create(PipelineOptions)} to initialize a + * {@link Pipeline}. + */ +public class DataflowPipeline extends Pipeline { + + /** + * Creates and returns a new {@link DataflowPipeline} instance for tests. 
+ */ + public static DataflowPipeline create(DataflowPipelineOptions options) { + return new DataflowPipeline(options); + } + + private DataflowPipeline(DataflowPipelineOptions options) { + super(DataflowPipelineRunner.fromOptions(options), options); + } + + @Override + public DataflowPipelineJob run() { + return (DataflowPipelineJob) super.run(); + } + + @Override + public DataflowPipelineRunner getRunner() { + return (DataflowPipelineRunner) super.getRunner(); + } + + @Override + public String toString() { + return "DataflowPipeline#" + getOptions().as(DataflowPipelineOptions.class).getJobName(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/runners/DataflowPipelineJob.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/runners/DataflowPipelineJob.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/runners/DataflowPipelineJob.java new file mode 100644 index 0000000..632be6d --- /dev/null +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/runners/DataflowPipelineJob.java @@ -0,0 +1,394 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package com.google.cloud.dataflow.sdk.runners; + +import static com.google.cloud.dataflow.sdk.util.TimeUtil.fromCloudTime; + +import com.google.api.client.googleapis.json.GoogleJsonResponseException; +import com.google.api.client.util.BackOff; +import com.google.api.client.util.BackOffUtils; +import com.google.api.client.util.NanoClock; +import com.google.api.client.util.Sleeper; +import com.google.api.services.dataflow.Dataflow; +import com.google.api.services.dataflow.model.Job; +import com.google.api.services.dataflow.model.JobMessage; +import com.google.api.services.dataflow.model.JobMetrics; +import com.google.api.services.dataflow.model.MetricUpdate; +import com.google.cloud.dataflow.sdk.PipelineResult; +import com.google.cloud.dataflow.sdk.runners.dataflow.DataflowAggregatorTransforms; +import com.google.cloud.dataflow.sdk.runners.dataflow.DataflowMetricUpdateExtractor; +import com.google.cloud.dataflow.sdk.transforms.Aggregator; +import com.google.cloud.dataflow.sdk.util.AttemptAndTimeBoundedExponentialBackOff; +import com.google.cloud.dataflow.sdk.util.AttemptBoundedExponentialBackOff; +import com.google.cloud.dataflow.sdk.util.MapAggregatorValues; +import com.google.cloud.dataflow.sdk.util.MonitoringUtil; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Throwables; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.net.SocketTimeoutException; +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +import javax.annotation.Nullable; + +/** + * A DataflowPipelineJob represents a job submitted to Dataflow using + * {@link DataflowPipelineRunner}. + */ +public class DataflowPipelineJob implements PipelineResult { + private static final Logger LOG = LoggerFactory.getLogger(DataflowPipelineJob.class); + + /** + * The id for the job. + */ + private String jobId; + + /** + * Google cloud project to associate this pipeline with. + */ + private String projectId; + + /** + * Client for the Dataflow service. This can be used to query the service + * for information about the job. + */ + private Dataflow dataflowClient; + + /** + * The state the job terminated in or {@code null} if the job has not terminated. + */ + @Nullable + private State terminalState = null; + + /** + * The job that replaced this one or {@code null} if the job has not been replaced. + */ + @Nullable + private DataflowPipelineJob replacedByJob = null; + + private DataflowAggregatorTransforms aggregatorTransforms; + + /** + * The Metric Updates retrieved after the job was in a terminal state. + */ + private List<MetricUpdate> terminalMetricUpdates; + + /** + * The polling interval for job status and messages information. + */ + static final long MESSAGES_POLLING_INTERVAL = TimeUnit.SECONDS.toMillis(2); + static final long STATUS_POLLING_INTERVAL = TimeUnit.SECONDS.toMillis(2); + + /** + * The amount of polling attempts for job status and messages information. + */ + static final int MESSAGES_POLLING_ATTEMPTS = 10; + static final int STATUS_POLLING_ATTEMPTS = 5; + + /** + * Constructs the job. 
+ * + * @param projectId the project id + * @param jobId the job id + * @param dataflowClient the client for the Dataflow Service + */ + public DataflowPipelineJob(String projectId, String jobId, Dataflow dataflowClient, + DataflowAggregatorTransforms aggregatorTransforms) { + this.projectId = projectId; + this.jobId = jobId; + this.dataflowClient = dataflowClient; + this.aggregatorTransforms = aggregatorTransforms; + } + + /** + * Get the id of this job. + */ + public String getJobId() { + return jobId; + } + + /** + * Get the project this job exists in. + */ + public String getProjectId() { + return projectId; + } + + /** + * Returns a new {@link DataflowPipelineJob} for the job that replaced this one, if applicable. + * + * @throws IllegalStateException if called before the job has terminated or if the job terminated + * but was not updated + */ + public DataflowPipelineJob getReplacedByJob() { + if (terminalState == null) { + throw new IllegalStateException("getReplacedByJob() called before job terminated"); + } + if (replacedByJob == null) { + throw new IllegalStateException("getReplacedByJob() called for job that was not replaced"); + } + return replacedByJob; + } + + /** + * Get the Cloud Dataflow API Client used by this job. + */ + public Dataflow getDataflowClient() { + return dataflowClient; + } + + /** + * Waits for the job to finish and return the final status. + * + * @param timeToWait The time to wait in units timeUnit for the job to finish. + * Provide a value less than 1 ms for an infinite wait. + * @param timeUnit The unit of time for timeToWait. + * @param messageHandler If non null this handler will be invoked for each + * batch of messages received. + * @return The final state of the job or null on timeout or if the + * thread is interrupted. + * @throws IOException If there is a persistent problem getting job + * information. + * @throws InterruptedException + */ + @Nullable + public State waitToFinish( + long timeToWait, + TimeUnit timeUnit, + MonitoringUtil.JobMessagesHandler messageHandler) + throws IOException, InterruptedException { + return waitToFinish(timeToWait, timeUnit, messageHandler, Sleeper.DEFAULT, NanoClock.SYSTEM); + } + + /** + * Wait for the job to finish and return the final status. + * + * @param timeToWait The time to wait in units timeUnit for the job to finish. + * Provide a value less than 1 ms for an infinite wait. + * @param timeUnit The unit of time for timeToWait. + * @param messageHandler If non null this handler will be invoked for each + * batch of messages received. + * @param sleeper A sleeper to use to sleep between attempts. + * @param nanoClock A nanoClock used to time the total time taken. + * @return The final state of the job or null on timeout or if the + * thread is interrupted. + * @throws IOException If there is a persistent problem getting job + * information. + * @throws InterruptedException + */ + @Nullable + @VisibleForTesting + State waitToFinish( + long timeToWait, + TimeUnit timeUnit, + MonitoringUtil.JobMessagesHandler messageHandler, + Sleeper sleeper, + NanoClock nanoClock) + throws IOException, InterruptedException { + MonitoringUtil monitor = new MonitoringUtil(projectId, dataflowClient); + + long lastTimestamp = 0; + BackOff backoff = + timeUnit.toMillis(timeToWait) > 0 + ? 
new AttemptAndTimeBoundedExponentialBackOff( + MESSAGES_POLLING_ATTEMPTS, + MESSAGES_POLLING_INTERVAL, + timeUnit.toMillis(timeToWait), + AttemptAndTimeBoundedExponentialBackOff.ResetPolicy.ATTEMPTS, + nanoClock) + : new AttemptBoundedExponentialBackOff( + MESSAGES_POLLING_ATTEMPTS, MESSAGES_POLLING_INTERVAL); + State state; + do { + // Get the state of the job before listing messages. This ensures we always fetch job + // messages after the job finishes to ensure we have all them. + state = getStateWithRetries(1, sleeper); + boolean hasError = state == State.UNKNOWN; + + if (messageHandler != null && !hasError) { + // Process all the job messages that have accumulated so far. + try { + List<JobMessage> allMessages = monitor.getJobMessages( + jobId, lastTimestamp); + + if (!allMessages.isEmpty()) { + lastTimestamp = + fromCloudTime(allMessages.get(allMessages.size() - 1).getTime()).getMillis(); + messageHandler.process(allMessages); + } + } catch (GoogleJsonResponseException | SocketTimeoutException e) { + hasError = true; + LOG.warn("There were problems getting current job messages: {}.", e.getMessage()); + LOG.debug("Exception information:", e); + } + } + + if (!hasError) { + backoff.reset(); + // Check if the job is done. + if (state.isTerminal()) { + return state; + } + } + } while(BackOffUtils.next(sleeper, backoff)); + LOG.warn("No terminal state was returned. State value {}", state); + return null; // Timed out. + } + + /** + * Cancels the job. + * @throws IOException if there is a problem executing the cancel request. + */ + public void cancel() throws IOException { + Job content = new Job(); + content.setProjectId(projectId); + content.setId(jobId); + content.setRequestedState("JOB_STATE_CANCELLED"); + dataflowClient.projects().jobs() + .update(projectId, jobId, content) + .execute(); + } + + @Override + public State getState() { + if (terminalState != null) { + return terminalState; + } + + return getStateWithRetries(STATUS_POLLING_ATTEMPTS, Sleeper.DEFAULT); + } + + /** + * Attempts to get the state. Uses exponential backoff on failure up to the maximum number + * of passed in attempts. + * + * @param attempts The amount of attempts to make. + * @param sleeper Object used to do the sleeps between attempts. + * @return The state of the job or State.UNKNOWN in case of failure. + */ + @VisibleForTesting + State getStateWithRetries(int attempts, Sleeper sleeper) { + if (terminalState != null) { + return terminalState; + } + try { + Job job = getJobWithRetries(attempts, sleeper); + return MonitoringUtil.toState(job.getCurrentState()); + } catch (IOException exn) { + // The only IOException that getJobWithRetries is permitted to throw is the final IOException + // that caused the failure of retry. Other exceptions are wrapped in an unchecked exceptions + // and will propagate. + return State.UNKNOWN; + } + } + + /** + * Attempts to get the underlying {@link Job}. Uses exponential backoff on failure up to the + * maximum number of passed in attempts. + * + * @param attempts The amount of attempts to make. + * @param sleeper Object used to do the sleeps between attempts. + * @return The underlying {@link Job} object. + * @throws IOException When the maximum number of retries is exhausted, the last exception is + * thrown. 
+ */ + @VisibleForTesting + Job getJobWithRetries(int attempts, Sleeper sleeper) throws IOException { + AttemptBoundedExponentialBackOff backoff = + new AttemptBoundedExponentialBackOff(attempts, STATUS_POLLING_INTERVAL); + + // Retry loop ends in return or throw + while (true) { + try { + Job job = dataflowClient + .projects() + .jobs() + .get(projectId, jobId) + .execute(); + State currentState = MonitoringUtil.toState(job.getCurrentState()); + if (currentState.isTerminal()) { + terminalState = currentState; + replacedByJob = new DataflowPipelineJob( + getProjectId(), job.getReplacedByJobId(), dataflowClient, aggregatorTransforms); + } + return job; + } catch (IOException exn) { + LOG.warn("There were problems getting current job status: {}.", exn.getMessage()); + LOG.debug("Exception information:", exn); + + if (!nextBackOff(sleeper, backoff)) { + throw exn; + } + } + } + } + + /** + * Identical to {@link BackOffUtils#next} but without checked exceptions. + */ + private boolean nextBackOff(Sleeper sleeper, BackOff backoff) { + try { + return BackOffUtils.next(sleeper, backoff); + } catch (InterruptedException | IOException e) { + if (e instanceof InterruptedException) { + Thread.currentThread().interrupt(); + } + throw Throwables.propagate(e); + } + } + + @Override + public <OutputT> AggregatorValues<OutputT> getAggregatorValues(Aggregator<?, OutputT> aggregator) + throws AggregatorRetrievalException { + try { + return new MapAggregatorValues<>(fromMetricUpdates(aggregator)); + } catch (IOException e) { + throw new AggregatorRetrievalException( + "IOException when retrieving Aggregator values for Aggregator " + aggregator, e); + } + } + + private <OutputT> Map<String, OutputT> fromMetricUpdates(Aggregator<?, OutputT> aggregator) + throws IOException { + if (aggregatorTransforms.contains(aggregator)) { + List<MetricUpdate> metricUpdates; + if (terminalMetricUpdates != null) { + metricUpdates = terminalMetricUpdates; + } else { + boolean terminal = getState().isTerminal(); + JobMetrics jobMetrics = + dataflowClient.projects().jobs().getMetrics(projectId, jobId).execute(); + metricUpdates = jobMetrics.getMetrics(); + if (terminal && jobMetrics.getMetrics() != null) { + terminalMetricUpdates = metricUpdates; + } + } + + return DataflowMetricUpdateExtractor.fromMetricUpdates( + aggregator, aggregatorTransforms, metricUpdates); + } else { + throw new IllegalArgumentException( + "Aggregator " + aggregator + " is not used in this pipeline"); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/runners/DataflowPipelineRegistrar.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/runners/DataflowPipelineRegistrar.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/runners/DataflowPipelineRegistrar.java new file mode 100644 index 0000000..8aaa7cc --- /dev/null +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/runners/DataflowPipelineRegistrar.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.google.cloud.dataflow.sdk.runners; + +import com.google.auto.service.AutoService; +import com.google.cloud.dataflow.sdk.options.BlockingDataflowPipelineOptions; +import com.google.cloud.dataflow.sdk.options.DataflowPipelineOptions; +import com.google.cloud.dataflow.sdk.options.PipelineOptions; +import com.google.cloud.dataflow.sdk.options.PipelineOptionsRegistrar; +import com.google.common.collect.ImmutableList; + +/** + * Contains the {@link PipelineOptionsRegistrar} and {@link PipelineRunnerRegistrar} for + * the {@link DataflowPipeline}. + */ +public class DataflowPipelineRegistrar { + private DataflowPipelineRegistrar() { } + + /** + * Register the {@link DataflowPipelineOptions} and {@link BlockingDataflowPipelineOptions}. + */ + @AutoService(PipelineOptionsRegistrar.class) + public static class Options implements PipelineOptionsRegistrar { + @Override + public Iterable<Class<? extends PipelineOptions>> getPipelineOptions() { + return ImmutableList.<Class<? extends PipelineOptions>>of( + DataflowPipelineOptions.class, + BlockingDataflowPipelineOptions.class); + } + } + + /** + * Register the {@link DataflowPipelineRunner} and {@link BlockingDataflowPipelineRunner}. + */ + @AutoService(PipelineRunnerRegistrar.class) + public static class Runner implements PipelineRunnerRegistrar { + @Override + public Iterable<Class<? extends PipelineRunner<?>>> getPipelineRunners() { + return ImmutableList.<Class<? extends PipelineRunner<?>>>of( + DataflowPipelineRunner.class, + BlockingDataflowPipelineRunner.class); + } + } +}
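[Editor's note, not part of this patch] Taken together, the registrar above should make both runners discoverable by name (for example via --runner=BlockingDataflowPipelineRunner), and the new exception hierarchy gives callers a typed view of how a blocking run ended. A minimal sketch of driving a job this way follows; it assumes the SDK's standard PipelineOptionsFactory and Pipeline.create entry points, with the pipeline body elided.

import com.google.cloud.dataflow.sdk.Pipeline;
import com.google.cloud.dataflow.sdk.options.BlockingDataflowPipelineOptions;
import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
import com.google.cloud.dataflow.sdk.runners.BlockingDataflowPipelineRunner;
import com.google.cloud.dataflow.sdk.runners.DataflowJobCancelledException;
import com.google.cloud.dataflow.sdk.runners.DataflowJobExecutionException;
import com.google.cloud.dataflow.sdk.runners.DataflowJobUpdatedException;
import com.google.cloud.dataflow.sdk.runners.DataflowPipelineJob;

public class BlockingRunnerExample {
  public static void main(String[] args) {
    BlockingDataflowPipelineOptions options =
        PipelineOptionsFactory.fromArgs(args).as(BlockingDataflowPipelineOptions.class);
    options.setRunner(BlockingDataflowPipelineRunner.class);

    Pipeline p = Pipeline.create(options);
    // ... apply transforms to p here ...

    try {
      // Blocks until the service reports a terminal state, printing job messages while waiting.
      DataflowPipelineJob job = (DataflowPipelineJob) p.run();
      System.out.println("Job " + job.getJobId() + " completed with state DONE.");
    } catch (DataflowJobUpdatedException e) {
      // The job was replaced; the replacement keeps running in the service.
      System.out.println("Job updated; replacement job id: " + e.getReplacedByJob().getJobId());
    } catch (DataflowJobCancelledException e) {
      System.out.println("Job cancelled: " + e.getJob().getJobId());
    } catch (DataflowJobExecutionException e) {
      System.err.println("Job failed: " + e.getJob().getJobId());
      throw e;
    }
  }
}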