This is an automated email from the ASF dual-hosted git repository.

altay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new dfd46d8  [BEAM-7846] add test for BEAM-7689
     new a2b57e3  Merge pull request #9228 from ihji/BEAM-7846
dfd46d8 is described below

commit dfd46d8e02e8f18dc8504931cb27558b9d8d75ab
Author: Heejong Lee <[email protected]>
AuthorDate: Thu Aug 1 18:10:31 2019 -0700

    [BEAM-7846] add test for BEAM-7689
    
    Check whether FileBasedSink can create a unique temporary directory.
---
 .../java/org/apache/beam/sdk/io/FileBasedSinkTest.java   | 16 ++++++++++++++++
 .../src/test/java/org/apache/beam/sdk/io/SimpleSink.java |  4 ++++
 sdks/python/apache_beam/io/filebasedsink_test.py         |  8 ++++++++
 3 files changed, 28 insertions(+)

diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSinkTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSinkTest.java
index 8a4733f..5da3290 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSinkTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSinkTest.java
@@ -47,6 +47,7 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
+import java.util.Set;
 import java.util.zip.GZIPInputStream;
 import org.apache.beam.sdk.io.FileBasedSink.CompressionType;
 import org.apache.beam.sdk.io.FileBasedSink.FileResult;
@@ -59,6 +60,7 @@ import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Sets;
 import 
org.apache.commons.compress.compressors.bzip2.BZip2CompressorInputStream;
 import 
org.apache.commons.compress.compressors.deflate.DeflateCompressorInputStream;
 import org.junit.Rule;
@@ -141,6 +143,20 @@ public class FileBasedSinkTest {
     }
   }
 
+  /** Test whether WriteOperation can create a unique temporary directory. */
+  @Test
+  public void testTemporaryDirectoryUniqueness() {
+    List<SimpleSink.SimpleWriteOperation<Void>> writeOps = 
Lists.newArrayListWithCapacity(1000);
+    for (int i = 0; i < 1000; i++) {
+      writeOps.add(buildWriteOperation());
+    }
+    Set<String> tempDirectorySet = Sets.newHashSetWithExpectedSize(1000);
+    for (SimpleSink.SimpleWriteOperation<Void> op : writeOps) {
+      tempDirectorySet.add(op.getTempDirectory().toString());
+    }
+    assertEquals(1000, tempDirectorySet.size());
+  }
+
   /** Removes temporary files when temporary and output directories differ. */
   @Test
   public void testRemoveWithTempFilename() throws Exception {
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/SimpleSink.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/SimpleSink.java
index 8ad5ace..1501a81 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/SimpleSink.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/SimpleSink.java
@@ -100,6 +100,10 @@ class SimpleSink<DestinationT> extends FileBasedSink<String, DestinationT, Strin
     public SimpleWriter<DestinationT> createWriter() {
       return new SimpleWriter<>(this);
     }
+
+    public ResourceId getTempDirectory() {
+      return tempDirectory.get();
+    }
   }
 
   static final class SimpleWriter<DestinationT> extends Writer<DestinationT, 
String> {
diff --git a/sdks/python/apache_beam/io/filebasedsink_test.py b/sdks/python/apache_beam/io/filebasedsink_test.py
index a934381..666bc9c 100644
--- a/sdks/python/apache_beam/io/filebasedsink_test.py
+++ b/sdks/python/apache_beam/io/filebasedsink_test.py
@@ -231,6 +231,14 @@ class TestFileBasedSink(_TestCaseWithTempDirCleanUp):
     with self.assertRaises(ValueError):
       _get_temp_dir(dir_root_path)
 
+  def test_temp_dir_uniqueness(self):
+    temp_path = os.path.join(self._new_tempdir(), 'unique')
+    sink = MyFileBasedSink(temp_path, coder=coders.ToStringCoder())
+    init_list = [''] * 1000
+    temp_dir_list = [sink._create_temp_dir(temp_path) for _ in init_list]
+    temp_dir_set = set(temp_dir_list)
+    self.assertEqual(len(temp_dir_list), len(temp_dir_set))
+
   def test_temp_dir_gcs(self):
     try:
       self.run_temp_dir_check(

Reply via email to