[ https://issues.apache.org/jira/browse/BEAM-12740?focusedWorklogId=650433&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-650433 ]

ASF GitHub Bot logged work on BEAM-12740:
-----------------------------------------

                Author: ASF GitHub Bot
            Created on: 14/Sep/21 09:05
            Start Date: 14/Sep/21 09:05
    Worklog Time Spent: 10m 
      Work Description: scwhittle commented on a change in pull request #15355:
URL: https://github.com/apache/beam/pull/15355#discussion_r708068776



##########
File path: sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtil.java
##########
@@ -458,38 +458,89 @@ SeekableByteChannel open(GcsPath path, GoogleCloudStorageReadOptions readOptions
         new StorageResourceId(path.getBucket(), path.getObject()), readOptions);
   }
 
-  /**
-   * Creates an object in GCS.
-   *
-   * <p>Returns a WritableByteChannel that can be used to write data to the object.
-   *
-   * @param path the GCS file to write to
-   * @param type the type of object, eg "text/plain".
-   * @return a Callable object that encloses the operation.
-   */
+  /** @deprecated Use {@link #create(GcsPath, CreateOptions)} instead. */
+  @Deprecated
   public WritableByteChannel create(GcsPath path, String type) throws IOException {
-    return create(path, type, uploadBufferSizeBytes);
+    CreateOptions.Builder builder = CreateOptions.builder().setContentType(type);
+    return create(path, builder.build());
   }
 
-  /**
-   * Same as {@link GcsUtil#create(GcsPath, String)} but allows overriding {@code
-   * uploadBufferSizeBytes}.
-   */
+  /** @deprecated Use {@link #create(GcsPath, CreateOptions)} instead. */
+  @Deprecated
   public WritableByteChannel create(GcsPath path, String type, Integer uploadBufferSizeBytes)
       throws IOException {
+    CreateOptions.Builder builder =
+        CreateOptions.builder()
+            .setContentType(type)
+            .setUploadBufferSizeBytes(uploadBufferSizeBytes);
+    return create(path, builder.build());
+  }
+
+  @AutoValue
+  public abstract static class CreateOptions {
+    /**
+     * If true, the created file is expected not to exist. Instead of checking for file presence
+     * before writing, a write exception may occur if the file does exist.
+     */
+    public abstract boolean getExpectFileToNotExist();
+
+    /**
+     * If non-null, the upload buffer size to be used. If null, the buffer size corresponds to
+     * {@code GcsUtil.getUploadBufferSizeBytes}.
+     */
+    public abstract @Nullable Integer getUploadBufferSizeBytes();
+
+    /** The content type for the created file, e.g. "text/plain". */
+    public abstract @Nullable String getContentType();
+
+    public static Builder builder() {
+      return new AutoValue_GcsUtil_CreateOptions.Builder().setExpectFileToNotExist(false);
+    }
+
+    @AutoValue.Builder
+    public abstract static class Builder {
+      public abstract Builder setContentType(String value);
+
+      public abstract Builder setUploadBufferSizeBytes(int value);
+
+      public abstract Builder setExpectFileToNotExist(boolean value);
+
+      public abstract CreateOptions build();
+    }
+  }
+  /**
+   * Creates an object in GCS and prepares for uploading its contents.
+   *
+   * @param path the GCS file to write to
+   * @param options to be used for creating and configuring the file upload
+   * @return a WritableByteChannel that can be used to write data to the object
+   */
+  public WritableByteChannel create(GcsPath path, CreateOptions options) throws IOException {
     AsyncWriteChannelOptions wcOptions = googleCloudStorageOptions.getWriteChannelOptions();
-    int uploadChunkSize =
-        (uploadBufferSizeBytes == null) ? wcOptions.getUploadChunkSize() : uploadBufferSizeBytes;
-    AsyncWriteChannelOptions newOptions =
-        wcOptions.toBuilder().setUploadChunkSize(uploadChunkSize).build();
+    @Nullable
+    Integer uploadBufferSizeBytes =
+        options.getUploadBufferSizeBytes() != null
+            ? options.getUploadBufferSizeBytes()
+            : getUploadBufferSizeBytes();
+    if (uploadBufferSizeBytes != null) {
+      wcOptions = wcOptions.toBuilder().setUploadChunkSize(uploadBufferSizeBytes).build();
+    }
     GoogleCloudStorageOptions newGoogleCloudStorageOptions =
-        googleCloudStorageOptions.toBuilder().setWriteChannelOptions(newOptions).build();
+        googleCloudStorageOptions.toBuilder().setWriteChannelOptions(wcOptions).build();
     GoogleCloudStorage gcpStorage =
         new GoogleCloudStorageImpl(
             newGoogleCloudStorageOptions, this.storageClient, this.credentials);
-    return gcpStorage.create(
-        new StorageResourceId(path.getBucket(), path.getObject()),
-        CreateObjectOptions.builder().setOverwriteExisting(true).setContentType(type).build());
+    StorageResourceId resourceId =
+        new StorageResourceId(
+            path.getBucket(),
+            path.getObject(),
+            options.getExpectFileToNotExist() ? 0L : StorageResourceId.UNKNOWN_GENERATION_ID);

Review comment:
       It is documented on the create method.
   
https://www.javadoc.io/doc/com.google.cloud.bigdataoss/gcsio/latest/com/google/cloud/hadoop/gcsio/GoogleCloudStorage.html#create-com.google.cloud.hadoop.gcsio.StorageResourceId-com.google.cloud.hadoop.gcsio.CreateObjectOptions-
   
   I added a comment. (Separately, maybe we should consider moving from the Hadoop GCS interface to raw GCS interfaces here, which may be more powerful.)
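
For illustration, the AutoValue `CreateOptions` class in the diff above relies on the annotation processor to generate its implementation; a hand-rolled sketch of roughly what gets generated (hypothetical and simplified, not the actual generated code) shows why `builder()` pre-setting `setExpectFileToNotExist(false)` gives callers a safe default:

```java
// Hand-written sketch of the builder pattern AutoValue generates for
// CreateOptions (hypothetical, simplified; the real class is emitted by
// the @AutoValue annotation processor at compile time).
class CreateOptions {
  private final boolean expectFileToNotExist;
  private final Integer uploadBufferSizeBytes; // nullable
  private final String contentType;            // nullable

  private CreateOptions(
      boolean expectFileToNotExist, Integer uploadBufferSizeBytes, String contentType) {
    this.expectFileToNotExist = expectFileToNotExist;
    this.uploadBufferSizeBytes = uploadBufferSizeBytes;
    this.contentType = contentType;
  }

  public boolean getExpectFileToNotExist() { return expectFileToNotExist; }
  public Integer getUploadBufferSizeBytes() { return uploadBufferSizeBytes; }
  public String getContentType() { return contentType; }

  // builder() pre-sets the default, so callers only override what they need.
  public static Builder builder() {
    return new Builder().setExpectFileToNotExist(false);
  }

  public static class Builder {
    private boolean expectFileToNotExist;
    private Integer uploadBufferSizeBytes;
    private String contentType;

    public Builder setContentType(String value) { this.contentType = value; return this; }
    public Builder setUploadBufferSizeBytes(int value) { this.uploadBufferSizeBytes = value; return this; }
    public Builder setExpectFileToNotExist(boolean value) { this.expectFileToNotExist = value; return this; }

    public CreateOptions build() {
      return new CreateOptions(expectFileToNotExist, uploadBufferSizeBytes, contentType);
    }
  }
}
```

A caller who only sets a content type still gets `expectFileToNotExist == false` and a null buffer size, matching the deprecated `create(path, type)` overload's behavior.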





Issue Time Tracking
-------------------

    Worklog Id:     (was: 650433)
    Time Spent: 8h  (was: 7h 50m)

> Reduce and backoff GCS metadata operations when writing to GCS files
> --------------------------------------------------------------------
>
>                 Key: BEAM-12740
>                 URL: https://issues.apache.org/jira/browse/BEAM-12740
>             Project: Beam
>          Issue Type: Bug
>          Components: io-java-gcp
>            Reporter: Sam Whittle
>            Assignee: Sam Whittle
>            Priority: P2
>          Time Spent: 8h
>  Remaining Estimate: 0h
>
> When issuing GCS operations affecting metadata (i.e. file-level operations, not
> read/write operations), GCS may return errors indicating backoff. See
> https://cloud.google.com/storage/docs/request-rate#ramp-up
> If such errors are encountered, currently the exception is not handled by
> GcsUtil.java and is propagated, causing retries and backoff of all operations
> at a higher level. Instead we should back off and retry only the files that
> require it.
> Additionally, FileBasedSink issues deletes for files that have been renamed.
> The rename itself should take care of removing the original file, and thus we
> can reduce some metadata operations.
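
The per-file backoff the issue proposes could look roughly like this (a minimal sketch under stated assumptions, not Beam's actual implementation; Beam would use its own `BackOff` utilities and inspect the actual GCS error code rather than catching a generic `RuntimeException`):

```java
import java.util.concurrent.ThreadLocalRandom;
import java.util.function.Supplier;

class BackoffSketch {
  // Retry a single metadata operation with jittered exponential backoff, so a
  // rate-limited file backs off on its own instead of propagating the error
  // and forcing retries of all operations at a higher level.
  static <T> T withBackoff(Supplier<T> op, int maxAttempts) {
    long sleepMillis = 100; // initial backoff; doubles per failed attempt, capped below
    RuntimeException last = null;
    for (int attempt = 0; attempt < maxAttempts; attempt++) {
      try {
        return op.get();
      } catch (RuntimeException e) { // stand-in for a GCS 429/503 "slow down" error
        last = e;
        try {
          // Sleep a random duration up to the current backoff (full jitter).
          Thread.sleep(ThreadLocalRandom.current().nextLong(1, sleepMillis + 1));
        } catch (InterruptedException ie) {
          Thread.currentThread().interrupt();
          throw e;
        }
        sleepMillis = Math.min(sleepMillis * 2, 60_000);
      }
    }
    throw last; // exhausted attempts; surface the most recent failure
  }
}
```

Because the backoff loop wraps a single operation, other files' operations proceed unaffected while one rate-limited file waits out its backoff.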



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
