Repository: beam Updated Branches: refs/heads/master faa2277b5 -> d13549334
Add a default bucket to GcpOptions. If tempLocation is not specified, construct a stable default bucket (dataflow-staging-<region>-<project number>) and use it as the gcpTempLocation. This bucket is used to stage GCP artifacts. Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/f29a4761 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/f29a4761 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/f29a4761 Branch: refs/heads/master Commit: f29a4761220d2e9fa5752af79c6bce690903c760 Parents: faa2277 Author: Sam McVeety <[email protected]> Authored: Wed Jan 18 12:20:10 2017 -0800 Committer: Thomas Groh <[email protected]> Committed: Wed Jan 18 12:56:04 2017 -0800 ---------------------------------------------------------------------- .../options/DataflowPipelineOptionsTest.java | 4 +- .../org/apache/beam/sdk/options/GcpOptions.java | 36 ++++-- .../org/apache/beam/sdk/util/DefaultBucket.java | 105 +++++++++++++++++ .../apache/beam/sdk/util/GcpProjectUtil.java | 2 +- .../java/org/apache/beam/sdk/util/GcsUtil.java | 4 +- .../apache/beam/sdk/options/GcpOptionsTest.java | 4 +- .../apache/beam/sdk/util/DefaultBucketTest.java | 112 +++++++++++++++++++ 7 files changed, 250 insertions(+), 17 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/f29a4761/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/options/DataflowPipelineOptionsTest.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/options/DataflowPipelineOptionsTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/options/DataflowPipelineOptionsTest.java index 9dacfb2..30eee0e 100644 --- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/options/DataflowPipelineOptionsTest.java +++ 
b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/options/DataflowPipelineOptionsTest.java @@ -183,9 +183,9 @@ public class DataflowPipelineOptionsTest { @Test public void testDefaultStagingLocationUnset() { DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class); + options.setProject(""); thrown.expect(IllegalArgumentException.class); - thrown.expectMessage("Error constructing default value for stagingLocation: " - + "failed to retrieve gcpTempLocation."); + thrown.expectMessage("Error constructing default value for stagingLocation"); options.getStagingLocation(); } } http://git-wip-us.apache.org/repos/asf/beam/blob/f29a4761/sdks/java/core/src/main/java/org/apache/beam/sdk/options/GcpOptions.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/GcpOptions.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/GcpOptions.java index 042f4b4..c04e4f0 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/GcpOptions.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/GcpOptions.java @@ -17,12 +17,11 @@ */ package org.apache.beam.sdk.options; -import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Strings.isNullOrEmpty; import com.fasterxml.jackson.annotation.JsonIgnore; import com.google.auth.Credentials; import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Strings; import com.google.common.io.Files; import java.io.File; import java.io.IOException; @@ -34,6 +33,7 @@ import java.util.regex.Matcher; import java.util.regex.Pattern; import javax.annotation.Nullable; import org.apache.beam.sdk.util.CredentialFactory; +import org.apache.beam.sdk.util.DefaultBucket; import org.apache.beam.sdk.util.GcpCredentialFactory; import org.apache.beam.sdk.util.InstanceBuilder; import 
org.apache.beam.sdk.util.PathValidator; @@ -62,6 +62,17 @@ public interface GcpOptions extends GoogleApiDebugOptions, PipelineOptions { void setProject(String value); /** + * GCP <a href="https://developers.google.com/compute/docs/zones" + * >availability zone</a> for operations. + * + * <p>Default is set on a per-service basis. + */ + @Description("GCP availability zone for running GCP operations. " + + "Default is up to the individual service.") + String getZone(); + void setZone(String value); + + /** * The class of the credential factory that should be created and used to create * credentials. If gcpCredential has not been set explicitly, an instance of this class will * be constructed and used as a credential factory. @@ -197,15 +208,18 @@ public interface GcpOptions extends GoogleApiDebugOptions, PipelineOptions { @Nullable public String create(PipelineOptions options) { String tempLocation = options.getTempLocation(); - checkArgument(!Strings.isNullOrEmpty(options.getTempLocation()), - "Error constructing default value for gcpTempLocation: tempLocation is not set"); - try { - PathValidator validator = options.as(GcsOptions.class).getPathValidator(); - validator.validateOutputFilePrefixSupported(tempLocation); - } catch (Exception e) { - throw new IllegalArgumentException(String.format( - "Error constructing default value for gcpTempLocation: tempLocation is not" - + " a valid GCS path, %s. ", tempLocation), e); + if (isNullOrEmpty(tempLocation)) { + tempLocation = DefaultBucket.tryCreateDefaultBucket(options); + options.setTempLocation(tempLocation); + } else { + try { + PathValidator validator = options.as(GcsOptions.class).getPathValidator(); + validator.validateOutputFilePrefixSupported(tempLocation); + } catch (Exception e) { + throw new IllegalArgumentException(String.format( + "Error constructing default value for gcpTempLocation: tempLocation is not" + + " a valid GCS path, %s. 
", tempLocation), e); + } } return tempLocation; } http://git-wip-us.apache.org/repos/asf/beam/blob/f29a4761/sdks/java/core/src/main/java/org/apache/beam/sdk/util/DefaultBucket.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/DefaultBucket.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/DefaultBucket.java new file mode 100644 index 0000000..75954c0 --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/DefaultBucket.java @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+package org.apache.beam.sdk.util;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Strings.isNullOrEmpty;
+
+import com.google.api.services.storage.model.Bucket;
+import com.google.common.annotations.VisibleForTesting;
+import java.io.IOException;
+import java.nio.file.FileAlreadyExistsException;
+import org.apache.beam.sdk.options.CloudResourceManagerOptions;
+import org.apache.beam.sdk.options.GcsOptions;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.util.gcsfs.GcsPath;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Utility class for handling default GCS buckets.
+ */
+public class DefaultBucket {
+  static final Logger LOG = LoggerFactory.getLogger(DefaultBucket.class);
+
+  static final String DEFAULT_REGION = "us-central1";
+
+  /**
+   * Creates the bucket {@code dataflow-staging-<region>-<projectNumber>} if absent, verifies
+   * it is owned by {@code --project}, and returns its {@code gs://} path.
+   */
+  public static String tryCreateDefaultBucket(PipelineOptions options) {
+    GcsOptions gcpOptions = options.as(GcsOptions.class);
+
+    final String projectId = gcpOptions.getProject();
+    checkArgument(!isNullOrEmpty(projectId),
+        "--project is a required option.");
+
+    // Look up the project number, to create a default bucket with a stable
+    // name with no special characters.
+    final long projectNumber;
+    try {
+      projectNumber = gcpOptions.as(CloudResourceManagerOptions.class)
+          .getGcpProjectUtil().getProjectNumber(projectId);
+    } catch (IOException e) {
+      throw new RuntimeException("Unable to verify project with ID " + projectId, e);
+    }
+    String region = DEFAULT_REGION;
+    if (!isNullOrEmpty(gcpOptions.getZone())) {
+      region = getRegionFromZone(gcpOptions.getZone());
+    }
+    final String bucketName =
+        "dataflow-staging-" + region + "-" + projectNumber;
+    LOG.info("No staging location provided, attempting to use default bucket: {}",
+        bucketName);
+    Bucket bucket = new Bucket()
+        .setName(bucketName)
+        .setLocation(region);
+    // Always try to create the bucket before checking access, so that we do not
+    // race with other pipelines that may be attempting to do the same thing.
+    try {
+      gcpOptions.getGcsUtil().createBucket(projectId, bucket);
+    } catch (FileAlreadyExistsException e) {
+      LOG.debug("Bucket '{}' already exists, verifying access.", bucketName);
+    } catch (IOException e) {
+      throw new RuntimeException("Unable create default bucket.", e);
+    }
+
+    // Once the bucket is expected to exist, verify that it is correctly owned
+    // by the project executing the job.
+    try {
+      long owner = gcpOptions.getGcsUtil().bucketOwner(
+          GcsPath.fromComponents(bucketName, ""));
+      checkArgument(
+          owner == projectNumber,
+          "Bucket owner does not match the project from --project: %s vs. %s",
+          owner, projectNumber);
+    } catch (IOException e) {
+      throw new RuntimeException(
+          "Unable to determine the owner of the default bucket at gs://" + bucketName, e);
+    }
+    return "gs://" + bucketName;
+  }
+
+  @VisibleForTesting
+  static String getRegionFromZone(String zone) {
+    String[] zoneParts = zone.split("-");
+    checkArgument(zoneParts.length >= 2, "Invalid zone provided: %s", zone);
+    return zoneParts[0] + "-" + zoneParts[1];
+  }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/f29a4761/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcpProjectUtil.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcpProjectUtil.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcpProjectUtil.java
index beac4e4..f73afe0 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcpProjectUtil.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcpProjectUtil.java
@@ -76,7 +76,7 @@ public class GcpProjectUtil {
    * Returns the project number or throws an exception if the project does not
    * exist or has other access exceptions.
*/ - long getProjectNumber(String projectId) throws IOException { + public long getProjectNumber(String projectId) throws IOException { return getProjectNumber( projectId, BACKOFF_FACTORY.backoff(), http://git-wip-us.apache.org/repos/asf/beam/blob/f29a4761/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcsUtil.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcsUtil.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcsUtil.java index 521673c..a10ea28 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcsUtil.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcsUtil.java @@ -451,7 +451,9 @@ public class GcsUtil { void createBucket(String projectId, Bucket bucket, BackOff backoff, Sleeper sleeper) throws IOException { Storage.Buckets.Insert insertBucket = - storageClient.buckets().insert(projectId, bucket); + storageClient.buckets().insert(projectId, bucket); + insertBucket.setPredefinedAcl("projectPrivate"); + insertBucket.setPredefinedDefaultObjectAcl("projectPrivate"); try { ResilientOperation.retry( http://git-wip-us.apache.org/repos/asf/beam/blob/f29a4761/sdks/java/core/src/test/java/org/apache/beam/sdk/options/GcpOptionsTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/options/GcpOptionsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/options/GcpOptionsTest.java index 7854d67..288383e 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/options/GcpOptionsTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/options/GcpOptionsTest.java @@ -109,9 +109,9 @@ public class GcpOptionsTest { @Test public void testEmptyGcpTempLocation() throws Exception { GcpOptions options = PipelineOptionsFactory.as(GcpOptions.class); + options.setProject(""); thrown.expect(IllegalArgumentException.class); - 
thrown.expectMessage( - "Error constructing default value for gcpTempLocation: tempLocation is not set"); + thrown.expectMessage("--project is a required option"); options.getGcpTempLocation(); } http://git-wip-us.apache.org/repos/asf/beam/blob/f29a4761/sdks/java/core/src/test/java/org/apache/beam/sdk/util/DefaultBucketTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/DefaultBucketTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/DefaultBucketTest.java new file mode 100644 index 0000000..395e1f3 --- /dev/null +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/DefaultBucketTest.java @@ -0,0 +1,112 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+package org.apache.beam.sdk.util;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.when;
+
+import com.google.api.services.storage.model.Bucket;
+import java.io.IOException;
+import org.apache.beam.sdk.options.CloudResourceManagerOptions;
+import org.apache.beam.sdk.options.GcpOptions;
+import org.apache.beam.sdk.options.GcsOptions;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.util.gcsfs.GcsPath;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+
+/** Tests for {@link DefaultBucket}. */
+@RunWith(JUnit4.class)
+public class DefaultBucketTest {
+  @Rule public ExpectedException thrown = ExpectedException.none();
+
+  private PipelineOptions options;
+  @Mock
+  private GcsUtil gcsUtil;
+  @Mock
+  private GcpProjectUtil gcpUtil;
+
+  @Before
+  public void setUp() {
+    MockitoAnnotations.initMocks(this);
+    options = PipelineOptionsFactory.create();
+    options.as(GcsOptions.class).setGcsUtil(gcsUtil);
+    options.as(CloudResourceManagerOptions.class).setGcpProjectUtil(gcpUtil);
+    options.as(GcpOptions.class).setProject("foo");
+    options.as(GcpOptions.class).setZone("us-north1-a");
+  }
+
+  @Test
+  public void testCreateBucket() {
+    String bucket = DefaultBucket.tryCreateDefaultBucket(options);
+    assertEquals("gs://dataflow-staging-us-north1-0", bucket);
+  }
+
+  @Test
+  public void testCreateBucketProjectLookupFails() throws IOException {
+    when(gcpUtil.getProjectNumber("foo")).thenThrow(new IOException("badness"));
+
+    thrown.expect(RuntimeException.class);
+    thrown.expectMessage("Unable to verify project");
+    DefaultBucket.tryCreateDefaultBucket(options);
+  }
+
+  @Test
+  public void testCreateBucketCreateBucketFails() throws IOException {
+    doThrow(new IOException("badness")).when(
+      gcsUtil).createBucket(any(String.class), any(Bucket.class));
+
+    thrown.expect(RuntimeException.class);
+    thrown.expectMessage("Unable create default bucket");
+    DefaultBucket.tryCreateDefaultBucket(options);
+  }
+
+  @Test
+  public void testCannotGetBucketOwner() throws IOException {
+    when(gcsUtil.bucketOwner(any(GcsPath.class)))
+      .thenThrow(new IOException("badness"));
+
+    thrown.expect(RuntimeException.class);
+    thrown.expectMessage("Unable to determine the owner");
+    DefaultBucket.tryCreateDefaultBucket(options);
+  }
+
+  @Test
+  public void testProjectMismatch() throws IOException {
+    when(gcsUtil.bucketOwner(any(GcsPath.class))).thenReturn(5L);
+
+    thrown.expect(IllegalArgumentException.class);
+    thrown.expectMessage("Bucket owner does not match the project");
+    DefaultBucket.tryCreateDefaultBucket(options);
+  }
+
+  @Test
+  public void regionFromZone() {
+    assertEquals("us-central1", DefaultBucket.getRegionFromZone("us-central1-a"));
+    assertEquals("asia-east", DefaultBucket.getRegionFromZone("asia-east-a"));
+  }
+}
