This is an automated email from the ASF dual-hosted git repository.
altay pushed a commit to branch release-2.7.1
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/release-2.7.1 by this push:
new 11d7e6d [BEAM-7744] LTS backport: Temporary directory for
WriteOperation may not be unique in FileBasedSink
new a35de2f Merge pull request #9071 from ihji/cherry-pick-7689-2.7.1
11d7e6d is described below
commit 11d7e6d8fa713e3865a808ed8b4888fc5822e08d
Author: Heejong Lee <[email protected]>
AuthorDate: Mon Jul 8 14:18:42 2019 -0700
[BEAM-7744] LTS backport: Temporary directory for WriteOperation may not be
unique in FileBasedSink
The temporary directory for a WriteOperation in FileBasedSink is generated
from a second-granularity timestamp (yyyy-MM-dd_HH-mm-ss) and an increasing
index that is unique only within a single process. That granularity is not
enough to keep temporary directories unique across different jobs. When two
jobs share the same temporary directory, one job may fail to produce its
output files because the temporary files it needs can be deleted by the
other job.
To avoid the aforementioned issue, this commit replaces the second-granularity
timestamp and index in the temporary directory name with a random UUID.
---
.../java/org/apache/beam/sdk/io/FileBasedSink.java | 20 ++++----------------
1 file changed, 4 insertions(+), 16 deletions(-)
diff --git
a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
index 92b2382..22bee93 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
@@ -48,7 +48,6 @@ import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.UUID;
-import java.util.concurrent.atomic.AtomicLong;
import javax.annotation.Nullable;
import org.apache.beam.sdk.annotations.Experimental;
import org.apache.beam.sdk.annotations.Experimental.Kind;
@@ -82,9 +81,6 @@ import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.TypeDescriptor;
import org.apache.beam.sdk.values.TypeDescriptors.TypeVariableExtractor;
-import org.joda.time.Instant;
-import org.joda.time.format.DateTimeFormat;
-import org.joda.time.format.DateTimeFormatter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -504,7 +500,7 @@ public abstract class FileBasedSink<UserT, DestinationT,
OutputT>
*
* <p>Default is a uniquely named subdirectory of the provided
tempDirectory, e.g. if
* tempDirectory is /path/to/foo/, the temporary directory will be
- * /path/to/foo/temp-beam-foo-$date.
+ * /path/to/foo/.temp-beam-$uuid.
*
* @param sink the FileBasedSink that will be used to configure this write
operation.
*/
@@ -516,20 +512,12 @@ public abstract class FileBasedSink<UserT, DestinationT,
OutputT>
private static class TemporaryDirectoryBuilder
implements SerializableFunction<ResourceId, ResourceId> {
- private static final AtomicLong TEMP_COUNT = new AtomicLong(0);
- private static final DateTimeFormatter TEMPDIR_TIMESTAMP =
- DateTimeFormat.forPattern("yyyy-MM-dd_HH-mm-ss");
- // The intent of the code is to have a consistent value of tempDirectory
across
- // all workers, which wouldn't happen if now() was called inline.
- private final String timestamp =
Instant.now().toString(TEMPDIR_TIMESTAMP);
- // Multiple different sinks may be used in the same output directory;
use tempId to create a
- // separate temp directory for each.
- private final Long tempId = TEMP_COUNT.getAndIncrement();
+ private final UUID tempUUID = UUID.randomUUID();
@Override
public ResourceId apply(ResourceId tempDirectory) {
- // Temp directory has a timestamp and a unique ID
- String tempDirName = String.format(TEMP_DIRECTORY_PREFIX + "-%s-%s",
timestamp, tempId);
+ // Temp directory has a random UUID postfix (BEAM-7689)
+ String tempDirName = String.format(TEMP_DIRECTORY_PREFIX + "-%s",
tempUUID);
return tempDirectory
.getCurrentDirectory()
.resolve(tempDirName, StandardResolveOptions.RESOLVE_DIRECTORY);