This is an automated email from the ASF dual-hosted git repository.
altay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new dfd46d8 [BEAM-7846] add test for BEAM-7689
new a2b57e3 Merge pull request #9228 from ihji/BEAM-7846
dfd46d8 is described below
commit dfd46d8e02e8f18dc8504931cb27558b9d8d75ab
Author: Heejong Lee <[email protected]>
AuthorDate: Thu Aug 1 18:10:31 2019 -0700
[BEAM-7846] add test for BEAM-7689
Check whether FileBasedSink can create a unique temporary directory.
---
.../java/org/apache/beam/sdk/io/FileBasedSinkTest.java | 16 ++++++++++++++++
.../src/test/java/org/apache/beam/sdk/io/SimpleSink.java | 4 ++++
sdks/python/apache_beam/io/filebasedsink_test.py | 8 ++++++++
3 files changed, 28 insertions(+)
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSinkTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSinkTest.java
index 8a4733f..5da3290 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSinkTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSinkTest.java
@@ -47,6 +47,7 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
+import java.util.Set;
import java.util.zip.GZIPInputStream;
import org.apache.beam.sdk.io.FileBasedSink.CompressionType;
import org.apache.beam.sdk.io.FileBasedSink.FileResult;
@@ -59,6 +60,7 @@ import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Sets;
import org.apache.commons.compress.compressors.bzip2.BZip2CompressorInputStream;
import org.apache.commons.compress.compressors.deflate.DeflateCompressorInputStream;
import org.junit.Rule;
@@ -141,6 +143,20 @@ public class FileBasedSinkTest {
}
}
+ /** Test whether WriteOperation can create a unique temporary directory. */
+ @Test
+ public void testTemporaryDirectoryUniqueness() {
+ List<SimpleSink.SimpleWriteOperation<Void>> writeOps = Lists.newArrayListWithCapacity(1000);
+ for (int i = 0; i < 1000; i++) {
+ writeOps.add(buildWriteOperation());
+ }
+ Set<String> tempDirectorySet = Sets.newHashSetWithExpectedSize(1000);
+ for (SimpleSink.SimpleWriteOperation<Void> op : writeOps) {
+ tempDirectorySet.add(op.getTempDirectory().toString());
+ }
+ assertEquals(1000, tempDirectorySet.size());
+ }
+
/** Removes temporary files when temporary and output directories differ. */
@Test
public void testRemoveWithTempFilename() throws Exception {
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/SimpleSink.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/SimpleSink.java
index 8ad5ace..1501a81 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/SimpleSink.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/SimpleSink.java
@@ -100,6 +100,10 @@ class SimpleSink<DestinationT> extends FileBasedSink<String, DestinationT, Strin
public SimpleWriter<DestinationT> createWriter() {
return new SimpleWriter<>(this);
}
+
+ public ResourceId getTempDirectory() {
+ return tempDirectory.get();
+ }
}
static final class SimpleWriter<DestinationT> extends Writer<DestinationT, String> {
diff --git a/sdks/python/apache_beam/io/filebasedsink_test.py b/sdks/python/apache_beam/io/filebasedsink_test.py
index a934381..666bc9c 100644
--- a/sdks/python/apache_beam/io/filebasedsink_test.py
+++ b/sdks/python/apache_beam/io/filebasedsink_test.py
@@ -231,6 +231,14 @@ class TestFileBasedSink(_TestCaseWithTempDirCleanUp):
with self.assertRaises(ValueError):
_get_temp_dir(dir_root_path)
+ def test_temp_dir_uniqueness(self):
+ temp_path = os.path.join(self._new_tempdir(), 'unique')
+ sink = MyFileBasedSink(temp_path, coder=coders.ToStringCoder())
+ init_list = [''] * 1000
+ temp_dir_list = [sink._create_temp_dir(temp_path) for _ in init_list]
+ temp_dir_set = set(temp_dir_list)
+ self.assertEqual(len(temp_dir_list), len(temp_dir_set))
+
def test_temp_dir_gcs(self):
try:
self.run_temp_dir_check(