Accept Region in Dataflow Monitoring Page URL. Update the Google Cloud Dataflow front-end (FE) URLs produced by the Dataflow runners to regionalized paths.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/111603a9 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/111603a9 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/111603a9 Branch: refs/heads/master Commit: 111603a9952f415fa1386046f7a2d3bde5b6532d Parents: 2d5b6d7 Author: Robert Burke <rob...@frantil.com> Authored: Tue Jun 27 15:41:56 2017 -0700 Committer: Thomas Groh <tg...@google.com> Committed: Tue Jul 18 14:49:56 2017 -0700 ---------------------------------------------------------------------- .../beam/runners/dataflow/DataflowPipelineJob.java | 14 ++++++++++++-- .../beam/runners/dataflow/DataflowRunner.java | 3 ++- .../beam/runners/dataflow/util/MonitoringUtil.java | 16 +++++++++++++--- .../dataflow/BatchStatefulParDoOverridesTest.java | 1 + .../dataflow/DataflowPipelineTranslatorTest.java | 1 + .../runners/dataflow/internal/apiclient.py | 7 +++++-- .../runners/dataflow/test_dataflow_runner.py | 5 +++-- 7 files changed, 37 insertions(+), 10 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/111603a9/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java index e30d426..e736373 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java @@ -169,6 +169,13 @@ public class DataflowPipelineJob implements PipelineResult { } /** + * Get the region this job exists in. 
+ */ + public String getRegion() { + return dataflowOptions.getRegion(); + } + + /** * Returns a new {@link DataflowPipelineJob} for the job that replaced this one, if applicable. * * @throws IllegalStateException if called before the job has terminated or if the job terminated @@ -344,7 +351,9 @@ public class DataflowPipelineJob implements PipelineResult { getJobId(), getReplacedByJob().getJobId(), MonitoringUtil.getJobMonitoringPageURL( - getReplacedByJob().getProjectId(), getReplacedByJob().getJobId())); + getReplacedByJob().getProjectId(), + getRegion(), + getReplacedByJob().getJobId())); break; default: LOG.info("Job {} failed with status {}.", getJobId(), state); @@ -422,7 +431,8 @@ public class DataflowPipelineJob implements PipelineResult { "Failed to cancel job in state %s, " + "please go to the Developers Console to cancel it manually: %s", state, - MonitoringUtil.getJobMonitoringPageURL(getProjectId(), getJobId())); + MonitoringUtil.getJobMonitoringPageURL( + getProjectId(), getRegion(), getJobId())); LOG.warn(errorMsg); throw new IOException(errorMsg, e); } http://git-wip-us.apache.org/repos/asf/beam/blob/111603a9/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java index 8935759..57a5ea5 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java @@ -679,7 +679,8 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> { } LOG.info("To access the Dataflow monitoring console, please navigate to {}", - 
MonitoringUtil.getJobMonitoringPageURL(options.getProject(), jobResult.getId())); + MonitoringUtil.getJobMonitoringPageURL( + options.getProject(), options.getRegion(), jobResult.getId())); System.out.println("Submitted job: " + jobResult.getId()); LOG.info("To cancel the job using the 'gcloud' tool, run:\n> {}", http://git-wip-us.apache.org/repos/asf/beam/blob/111603a9/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/MonitoringUtil.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/MonitoringUtil.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/MonitoringUtil.java index 759387c..780a979 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/MonitoringUtil.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/MonitoringUtil.java @@ -180,14 +180,24 @@ public class MonitoringUtil { return allMessages; } + /** + * @deprecated this method defaults the region to "us-central1". Prefer using the overload with + * an explicit regionId parameter. + */ + @Deprecated public static String getJobMonitoringPageURL(String projectName, String jobId) { + return getJobMonitoringPageURL(projectName, "us-central1", jobId); + } + + public static String getJobMonitoringPageURL(String projectName, String regionId, String jobId) { try { // Project name is allowed in place of the project id: the user will be redirected to a URL // that has the project name replaced with project id. 
return String.format( - "https://console.developers.google.com/project/%s/dataflow/job/%s", - URLEncoder.encode(projectName, "UTF-8"), - URLEncoder.encode(jobId, "UTF-8")); + "https://console.cloud.google.com/dataflow/jobsDetail/locations/%s/jobs/%s?project=%s", + URLEncoder.encode(regionId, "UTF-8"), + URLEncoder.encode(jobId, "UTF-8"), + URLEncoder.encode(projectName, "UTF-8")); } catch (UnsupportedEncodingException e) { // Should never happen. throw new AssertionError("UTF-8 encoding is not supported by the environment", e); http://git-wip-us.apache.org/repos/asf/beam/blob/111603a9/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/BatchStatefulParDoOverridesTest.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/BatchStatefulParDoOverridesTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/BatchStatefulParDoOverridesTest.java index d2ab357..e62a8b8 100644 --- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/BatchStatefulParDoOverridesTest.java +++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/BatchStatefulParDoOverridesTest.java @@ -161,6 +161,7 @@ public class BatchStatefulParDoOverridesTest implements Serializable { options.setGcpCredential(new TestCredential()); options.setJobName("some-job-name"); options.setProject("some-project"); + options.setRegion("some-region"); options.setTempLocation(GcsPath.fromComponents("somebucket", "some/path").toString()); options.setFilesToStage(new LinkedList<String>()); options.setGcsUtil(mockGcsUtil); http://git-wip-us.apache.org/repos/asf/beam/blob/111603a9/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java ---------------------------------------------------------------------- diff --git 
a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java index 43b2788..9a0bdf8 100644 --- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java +++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java @@ -200,6 +200,7 @@ public class DataflowPipelineTranslatorTest implements Serializable { options.setGcpCredential(new TestCredential()); options.setJobName("some-job-name"); options.setProject("some-project"); + options.setRegion("some-region"); options.setTempLocation(GcsPath.fromComponents("somebucket", "some/path").toString()); options.setFilesToStage(new LinkedList<String>()); options.setDataflowClient(buildMockDataflow(new IsValidCreateRequest())); http://git-wip-us.apache.org/repos/asf/beam/blob/111603a9/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py b/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py index 33dfe19..dcaf74e 100644 --- a/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py +++ b/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py @@ -500,8 +500,11 @@ class DataflowApplicationClient(object): logging.info('Created job with id: [%s]', response.id) logging.info( 'To access the Dataflow monitoring console, please navigate to ' - 'https://console.developers.google.com/project/%s/dataflow/job/%s', - self.google_cloud_options.project, response.id) + 'https://console.cloud.google.com/dataflow/jobsDetail' + '/locations/%s/jobs/%s?project=%s', + self.google_cloud_options.region, + response.id, + self.google_cloud_options.project) return response 
http://git-wip-us.apache.org/repos/asf/beam/blob/111603a9/sdks/python/apache_beam/runners/dataflow/test_dataflow_runner.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/runners/dataflow/test_dataflow_runner.py b/sdks/python/apache_beam/runners/dataflow/test_dataflow_runner.py index b339882..96e6a66 100644 --- a/sdks/python/apache_beam/runners/dataflow/test_dataflow_runner.py +++ b/sdks/python/apache_beam/runners/dataflow/test_dataflow_runner.py @@ -38,12 +38,13 @@ class TestDataflowRunner(DataflowRunner): self.result = super(TestDataflowRunner, self).run(pipeline) if self.result.has_job: project = pipeline._options.view_as(GoogleCloudOptions).project + region_id = pipeline._options.view_as(GoogleCloudOptions).region job_id = self.result.job_id() # TODO(markflyhigh)(BEAM-1890): Use print since Nose dosen't show logs # in some cases. print ( - 'Found: https://console.cloud.google.com/dataflow/job/%s?project=%s' % - (job_id, project)) + 'Found: https://console.cloud.google.com/dataflow/jobsDetail' + '/locations/%s/jobs/%s?project=%s' % (region_id, job_id, project)) self.result.wait_until_finish() if on_success_matcher: