[ 
https://issues.apache.org/jira/browse/BEAM-1507?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16293568#comment-16293568
 ] 

ASF GitHub Bot commented on BEAM-1507:
--------------------------------------

jkff closed pull request #3167: [BEAM-1507] adding TTL check for staging 
location
URL: https://github.com/apache/beam/pull/3167
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/GcsStager.java
 
b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/GcsStager.java
index d18e306cfe8..a510f802c42 100644
--- 
a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/GcsStager.java
+++ 
b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/GcsStager.java
@@ -22,17 +22,25 @@
 import static com.google.common.base.Preconditions.checkNotNull;
 
 import com.google.api.services.dataflow.model.DataflowPackage;
+
 import java.util.List;
 import org.apache.beam.runners.dataflow.options.DataflowPipelineDebugOptions;
 import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
+import org.apache.beam.sdk.extensions.gcp.options.GcsOptions;
 import org.apache.beam.sdk.extensions.gcp.storage.GcsCreateOptions;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.util.MimeTypes;
+import org.apache.beam.sdk.util.gcsfs.GcsPath;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Utility class for staging files to GCS.
  */
 public class GcsStager implements Stager {
+
+  private static final Logger LOG = LoggerFactory.getLogger(GcsStager.class);
+
   private DataflowPipelineOptions options;
 
   private GcsStager(DataflowPipelineOptions options) {
@@ -46,7 +54,10 @@ public static GcsStager fromOptions(PipelineOptions options) 
{
 
   @Override
   public List<DataflowPackage> stageFiles() {
+
     checkNotNull(options.getStagingLocation());
+    warnIfStagingHasTTL();
+
     String windmillBinary =
         
options.as(DataflowPipelineDebugOptions.class).getOverrideWindmillBinary();
     if (windmillBinary != null) {
@@ -67,4 +78,19 @@ public static GcsStager fromOptions(PipelineOptions options) 
{
         options.getStagingLocation(),
         createOptions);
   }
+
+  private void warnIfStagingHasTTL() {
+    try {
+      LOG.debug("Checking if staging location {} has TTL assigned", 
options.getStagingLocation());
+      boolean stagingHasTTL = options.as(GcsOptions.class).getGcsUtil()
+          .bucketHasTTL(GcsPath.fromUri(options.getStagingLocation()));
+      if (stagingHasTTL) {
+        LOG.warn("Staging location {} has TTL assigned. This might cause 
unpredictable bugs."
+            , options.getStagingLocation());
+      }
+    } catch (Exception ie) {
+      LOG.warn("Exception while trying to determine if staging location has 
TTL assigned", ie);
+    }
+  }
+
 }
diff --git 
a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/util/GcsUtil.java
 
b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/util/GcsUtil.java
index 94b733a7f8d..63b43a46ba4 100644
--- 
a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/util/GcsUtil.java
+++ 
b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/util/GcsUtil.java
@@ -30,6 +30,7 @@
 import com.google.api.client.util.Sleeper;
 import com.google.api.services.storage.Storage;
 import com.google.api.services.storage.model.Bucket;
+import com.google.api.services.storage.model.Bucket.Lifecycle.Rule;
 import com.google.api.services.storage.model.Objects;
 import com.google.api.services.storage.model.StorageObject;
 import com.google.auto.value.AutoValue;
@@ -454,6 +455,26 @@ public boolean bucketAccessible(GcsPath path) throws 
IOException {
         Sleeper.DEFAULT);
   }
 
+  /**
+   * Returns whether the GCS bucket has TTL assigned.
+   *
+   * @param path the path to GCS bucket
+   * @return true if TTL is assigned to bucket, false otherwise
+   */
+  public boolean bucketHasTTL(GcsPath path) throws IOException {
+    Bucket bucket = getBucket(path, createBackOff(), Sleeper.DEFAULT);
+    if (bucket != null && bucket.getLifecycle() != null
+        && bucket.getLifecycle().getRule() != null) {
+      for (Rule r : bucket.getLifecycle().getRule()) {
+        if ("Delete".equalsIgnoreCase(r.getAction().getType())) {
+          LOG.debug("Bucket {} has TTL assigned", path);
+          return true;
+        }
+      }
+    }
+    return false;
+  }
+
   /**
    * Returns the project number of the project which owns this bucket.
    * If the bucket exists, it must be accessible otherwise the permissions
diff --git 
a/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/util/GcsUtilTest.java
 
b/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/util/GcsUtilTest.java
index 532645047d5..c582bc9b432 100644
--- 
a/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/util/GcsUtilTest.java
+++ 
b/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/util/GcsUtilTest.java
@@ -50,6 +50,8 @@
 import com.google.api.client.util.BackOff;
 import com.google.api.services.storage.Storage;
 import com.google.api.services.storage.model.Bucket;
+import com.google.api.services.storage.model.Bucket.Lifecycle;
+import com.google.api.services.storage.model.Bucket.Lifecycle.Rule.Action;
 import com.google.api.services.storage.model.Objects;
 import com.google.api.services.storage.model.StorageObject;
 import com.google.cloud.hadoop.gcsio.GoogleCloudStorageReadChannel;
@@ -68,6 +70,7 @@
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
@@ -628,8 +631,51 @@ public void testBucketDoesNotExist() throws IOException {
     Storage.Buckets mockStorageObjects = Mockito.mock(Storage.Buckets.class);
     Storage.Buckets.Get mockStorageGet = 
Mockito.mock(Storage.Buckets.Get.class);
 
-    BackOff mockBackOff = BackOffAdapter.toGcpBackOff(
-        FluentBackoff.DEFAULT.backoff());
+    BackOff mockBackOff = 
BackOffAdapter.toGcpBackOff(FluentBackoff.DEFAULT.backoff());
+
+    when(mockStorage.buckets()).thenReturn(mockStorageObjects);
+    when(mockStorageObjects.get("testbucket")).thenReturn(mockStorageGet);
+    when(mockStorageGet.execute()).thenThrow(googleJsonResponseException(
+        HttpStatusCodes.STATUS_CODE_NOT_FOUND, "It don't exist", "Nothing here 
to see"));
+
+    assertFalse(gcsUtil.bucketAccessible(GcsPath.fromComponents("testbucket", 
"testobject"),
+        mockBackOff, new FastNanoClockAndSleeper()));
+  }
+
+  @Test
+  public void testBucketAccessErrorNoTTLAttached() throws IOException {
+    GcsOptions pipelineOptions = gcsOptionsWithTestCredential();
+    GcsUtil gcsUtil = pipelineOptions.getGcsUtil();
+
+    Storage mockStorage = Mockito.mock(Storage.class);
+    gcsUtil.setStorageClient(mockStorage);
+
+    Storage.Buckets mockStorageObjects = Mockito.mock(Storage.Buckets.class);
+    Storage.Buckets.Get mockStorageGet = 
Mockito.mock(Storage.Buckets.Get.class);
+
+    GoogleJsonResponseException expectedException = 
googleJsonResponseException(
+        HttpStatusCodes.STATUS_CODE_FORBIDDEN, "Waves hand mysteriously",
+        "These aren't the buckets you're looking for");
+
+    when(mockStorage.buckets()).thenReturn(mockStorageObjects);
+    when(mockStorageObjects.get("testbucket")).thenReturn(mockStorageGet);
+    when(mockStorageGet.execute()).thenThrow(expectedException);
+
+    thrown.expect(AccessDeniedException.class);
+
+    assertFalse(gcsUtil.bucketHasTTL(GcsPath.fromComponents("testbucket", 
"testobject")));
+  }
+
+  @Test
+  public void testBucketDoesNotExistNoTTLAttached() throws IOException {
+    GcsOptions pipelineOptions = gcsOptionsWithTestCredential();
+    GcsUtil gcsUtil = pipelineOptions.getGcsUtil();
+
+    Storage mockStorage = Mockito.mock(Storage.class);
+    gcsUtil.setStorageClient(mockStorage);
+
+    Storage.Buckets mockStorageObjects = Mockito.mock(Storage.Buckets.class);
+    Storage.Buckets.Get mockStorageGet = 
Mockito.mock(Storage.Buckets.Get.class);
 
     when(mockStorage.buckets()).thenReturn(mockStorageObjects);
     when(mockStorageObjects.get("testbucket")).thenReturn(mockStorageGet);
@@ -637,8 +683,69 @@ public void testBucketDoesNotExist() throws IOException {
         
.thenThrow(googleJsonResponseException(HttpStatusCodes.STATUS_CODE_NOT_FOUND,
             "It don't exist", "Nothing here to see"));
 
-    assertFalse(gcsUtil.bucketAccessible(GcsPath.fromComponents("testbucket", 
"testobject"),
-        mockBackOff, new FastNanoClockAndSleeper()));
+    thrown.expect(FileNotFoundException.class);
+
+    assertFalse(gcsUtil.bucketHasTTL(GcsPath.fromComponents("testbucket", 
"testobject")));
+  }
+
+  @Test
+  public void testGetBucketWithTTLAttached() throws IOException {
+    GcsOptions pipelineOptions = gcsOptionsWithTestCredential();
+    GcsUtil gcsUtil = pipelineOptions.getGcsUtil();
+
+    Storage mockStorage = Mockito.mock(Storage.class);
+    gcsUtil.setStorageClient(mockStorage);
+
+    Storage.Buckets mockStorageObjects = Mockito.mock(Storage.Buckets.class);
+    Storage.Buckets.Get mockStorageGet = 
Mockito.mock(Storage.Buckets.Get.class);
+
+    Bucket b = new Bucket();
+    Lifecycle lc = new Lifecycle();
+    com.google.api.services.storage.model.Bucket.Lifecycle.Rule deleteRule =
+        new com.google.api.services.storage.model.Bucket.Lifecycle.Rule();
+    Action deleteAction = new Action();
+    deleteAction.setType("Delete");
+    deleteRule.setAction(deleteAction);
+    List<com.google.api.services.storage.model.Bucket.Lifecycle.Rule> rules = 
new LinkedList<>();
+    rules.add(deleteRule);
+    lc.setRule(rules);
+    b.setLifecycle(lc);
+    when(mockStorage.buckets()).thenReturn(mockStorageObjects);
+    when(mockStorageObjects.get("testbucket")).thenReturn(mockStorageGet);
+    when(mockStorageGet.execute()).thenThrow(new 
SocketTimeoutException("SocketException"))
+        .thenReturn(b);
+
+    assertTrue(gcsUtil.bucketHasTTL(GcsPath.fromComponents("testbucket", 
"testobject")));
+  }
+
+  @Test
+  public void testGetBucketWithNoTTLAttached() throws IOException {
+    GcsOptions pipelineOptions = gcsOptionsWithTestCredential();
+    GcsUtil gcsUtil = pipelineOptions.getGcsUtil();
+
+    Storage mockStorage = Mockito.mock(Storage.class);
+    gcsUtil.setStorageClient(mockStorage);
+
+    Storage.Buckets mockStorageObjects = Mockito.mock(Storage.Buckets.class);
+    Storage.Buckets.Get mockStorageGet = 
Mockito.mock(Storage.Buckets.Get.class);
+
+    Bucket b = new Bucket();
+    Lifecycle lc = new Lifecycle();
+    com.google.api.services.storage.model.Bucket.Lifecycle.Rule deleteRule =
+        new com.google.api.services.storage.model.Bucket.Lifecycle.Rule();
+    Action deleteAction = new Action();
+    deleteAction.setType("SomeOtherType");
+    deleteRule.setAction(deleteAction);
+    List<com.google.api.services.storage.model.Bucket.Lifecycle.Rule> rules = 
new LinkedList<>();
+    rules.add(deleteRule);
+    lc.setRule(rules);
+    b.setLifecycle(lc);
+    when(mockStorage.buckets()).thenReturn(mockStorageObjects);
+    when(mockStorageObjects.get("testbucket")).thenReturn(mockStorageGet);
+    when(mockStorageGet.execute()).thenThrow(new 
SocketTimeoutException("SocketException"))
+        .thenReturn(b);
+
+    assertFalse(gcsUtil.bucketHasTTL(GcsPath.fromComponents("testbucket", 
"testobject")));
   }
 
   @Test


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


> java DataflowRunner should warn if the stagingLocation has a TTL
> ----------------------------------------------------------------
>
>                 Key: BEAM-1507
>                 URL: https://issues.apache.org/jira/browse/BEAM-1507
>             Project: Beam
>          Issue Type: Improvement
>          Components: runner-dataflow
>            Reporter: Daniel Halperin
>            Priority: Minor
>              Labels: newbie, starter
>
> We have seen a few customers run into a hard-to-track-down bug where the 
> staging bucket has a TTL, but files get TTL-deleted when they are still 
> needed.
> This might be because of:
> 1. Long lived batch jobs / streaming jobs can reference staged files 
> arbitrarily later and will fail in bad ways if they have been deleted.
> 2. Some customers even hit issues where the "check file already exists" 
> succeeds when starting a job, but then the file is TTL-deleted before the job 
> actually starts. (This sounds surprising, but it may happen if the TTL is 7 days and 
> jobs run every 7 days, for example — a race condition.)
> I'm hoping it's not hard to check whether the staged files would have TTLs applied, and to warn the user if so.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

Reply via email to