This is an automated email from the ASF dual-hosted git repository.

altay pushed a commit to branch release-2.7.1
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/release-2.7.1 by this push:
     new 11d7e6d  [BEAM-7744] LTS backport: Temporary directory for 
WriteOperation may not be unique in FileBasedSink
     new a35de2f  Merge pull request #9071 from ihji/cherry-pick-7689-2.7.1
11d7e6d is described below

commit 11d7e6d8fa713e3865a808ed8b4888fc5822e08d
Author: Heejong Lee <[email protected]>
AuthorDate: Mon Jul 8 14:18:42 2019 -0700

    [BEAM-7744] LTS backport: Temporary directory for WriteOperation may not be 
unique in FileBasedSink
    
    Temporary directory for WriteOperation in FileBasedSink is generated
    from a second-granularity timestamp (yyyy-MM-dd_HH-mm-ss) and a unique
    increasing index. Such granularity is not enough to make temporary
    directories unique between different jobs. When two jobs share the same
    temporary directory, output files may not be produced in one job since
    the required temporary files can be deleted by another job.
    
    To avoid the aforementioned issue, this commit changes the temporary
    directory name from a second-granularity timestamp to a UUID.
---
 .../java/org/apache/beam/sdk/io/FileBasedSink.java   | 20 ++++----------------
 1 file changed, 4 insertions(+), 16 deletions(-)

diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
index 92b2382..22bee93 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
@@ -48,7 +48,6 @@ import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
 import java.util.UUID;
-import java.util.concurrent.atomic.AtomicLong;
 import javax.annotation.Nullable;
 import org.apache.beam.sdk.annotations.Experimental;
 import org.apache.beam.sdk.annotations.Experimental.Kind;
@@ -82,9 +81,6 @@ import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.TypeDescriptor;
 import org.apache.beam.sdk.values.TypeDescriptors.TypeVariableExtractor;
-import org.joda.time.Instant;
-import org.joda.time.format.DateTimeFormat;
-import org.joda.time.format.DateTimeFormatter;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -504,7 +500,7 @@ public abstract class FileBasedSink<UserT, DestinationT, 
OutputT>
      *
      * <p>Default is a uniquely named subdirectory of the provided 
tempDirectory, e.g. if
      * tempDirectory is /path/to/foo/, the temporary directory will be
-     * /path/to/foo/temp-beam-foo-$date.
+     * /path/to/foo/.temp-beam-$uuid.
      *
      * @param sink the FileBasedSink that will be used to configure this write 
operation.
      */
@@ -516,20 +512,12 @@ public abstract class FileBasedSink<UserT, DestinationT, 
OutputT>
 
     private static class TemporaryDirectoryBuilder
         implements SerializableFunction<ResourceId, ResourceId> {
-      private static final AtomicLong TEMP_COUNT = new AtomicLong(0);
-      private static final DateTimeFormatter TEMPDIR_TIMESTAMP =
-          DateTimeFormat.forPattern("yyyy-MM-dd_HH-mm-ss");
-      // The intent of the code is to have a consistent value of tempDirectory 
across
-      // all workers, which wouldn't happen if now() was called inline.
-      private final String timestamp = 
Instant.now().toString(TEMPDIR_TIMESTAMP);
-      // Multiple different sinks may be used in the same output directory; 
use tempId to create a
-      // separate temp directory for each.
-      private final Long tempId = TEMP_COUNT.getAndIncrement();
+      private final UUID tempUUID = UUID.randomUUID();
 
       @Override
       public ResourceId apply(ResourceId tempDirectory) {
-        // Temp directory has a timestamp and a unique ID
-        String tempDirName = String.format(TEMP_DIRECTORY_PREFIX + "-%s-%s", 
timestamp, tempId);
+        // Temp directory has a random UUID postfix (BEAM-7689)
+        String tempDirName = String.format(TEMP_DIRECTORY_PREFIX + "-%s", 
tempUUID);
         return tempDirectory
             .getCurrentDirectory()
             .resolve(tempDirName, StandardResolveOptions.RESOLVE_DIRECTORY);

Reply via email to