This is an automated email from the ASF dual-hosted git repository.

jkff pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git

commit 7564d822aa28a79f27bd0efa09a213ffd3635708
Author: Łukasz Gajowy <[email protected]>
AuthorDate: Thu Nov 30 15:59:59 2017 -0800

    add post-review updates
---
 .../beam/sdk/io/common/IOTestPipelineOptions.java  |  3 +-
 ...FileBasedIOIT.java => FileBasedIOITHelper.java} | 50 +++++++++-------------
 .../java/org/apache/beam/sdk/io/text/TextIOIT.java | 16 ++++---
 .../apache/beam/sdk/io/tfrecord/TFRecordIOIT.java  | 16 ++++---
 4 files changed, 43 insertions(+), 42 deletions(-)

diff --git 
a/sdks/java/io/common/src/test/java/org/apache/beam/sdk/io/common/IOTestPipelineOptions.java
 
b/sdks/java/io/common/src/test/java/org/apache/beam/sdk/io/common/IOTestPipelineOptions.java
index d919654..e7b475d 100644
--- 
a/sdks/java/io/common/src/test/java/org/apache/beam/sdk/io/common/IOTestPipelineOptions.java
+++ 
b/sdks/java/io/common/src/test/java/org/apache/beam/sdk/io/common/IOTestPipelineOptions.java
@@ -19,6 +19,7 @@ package org.apache.beam.sdk.io.common;
 
 import org.apache.beam.sdk.options.Default;
 import org.apache.beam.sdk.options.Description;
+import org.apache.beam.sdk.options.Validation;
 import org.apache.beam.sdk.testing.TestPipelineOptions;
 
 /**
@@ -96,7 +97,7 @@ public interface IOTestPipelineOptions extends 
TestPipelineOptions {
   void setNumberOfRecords(Long count);
 
   @Description("Destination prefix for files generated by the test")
-  @Default.String("FILEBASEDIOIT")
+  @Validation.Required
   String getFilenamePrefix();
 
   void setFilenamePrefix(String prefix);
diff --git 
a/sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/common/AbstractFileBasedIOIT.java
 
b/sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/common/FileBasedIOITHelper.java
similarity index 66%
rename from 
sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/common/AbstractFileBasedIOIT.java
rename to 
sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/common/FileBasedIOITHelper.java
index 9eb8aea..cf20d8e 100644
--- 
a/sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/common/AbstractFileBasedIOIT.java
+++ 
b/sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/common/FileBasedIOITHelper.java
@@ -18,47 +18,44 @@
 
 package org.apache.beam.sdk.io.common;
 
-import com.google.common.base.Function;
-import com.google.common.collect.FluentIterable;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Iterables;
 import java.io.IOException;
-import java.util.Collection;
 import java.util.Collections;
 import java.util.Date;
+import java.util.HashSet;
 import java.util.Map;
-import org.apache.beam.sdk.io.Compression;
+import java.util.Set;
 import org.apache.beam.sdk.io.FileSystems;
 import org.apache.beam.sdk.io.fs.MatchResult;
 import org.apache.beam.sdk.io.fs.ResourceId;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.options.PipelineOptionsValidator;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.DoFn;
 
 /**
- * Abstract class for file based IO Integration tests.
+ * Contains helper methods for file based IO Integration tests.
  */
-public abstract class AbstractFileBasedIOIT {
+public class FileBasedIOITHelper {
 
-  protected static IOTestPipelineOptions readTestPipelineOptions() {
-    PipelineOptionsFactory.register(IOTestPipelineOptions.class);
-    return 
TestPipeline.testingPipelineOptions().as(IOTestPipelineOptions.class);
+  private FileBasedIOITHelper() {
   }
 
-  protected static String appendTimestampToPrefix(String filenamePrefix) {
-    return String.format("%s_%s", filenamePrefix, new Date().getTime());
+  public static IOTestPipelineOptions readTestPipelineOptions() {
+    PipelineOptionsFactory.register(IOTestPipelineOptions.class);
+    IOTestPipelineOptions options = TestPipeline
+        .testingPipelineOptions()
+        .as(IOTestPipelineOptions.class);
+
+    return PipelineOptionsValidator.validate(IOTestPipelineOptions.class, 
options);
   }
 
-  protected static Compression parseCompressionType(String compressionType) {
-    try {
-      return Compression.valueOf(compressionType.toUpperCase());
-    } catch (IllegalArgumentException ex) {
-      throw new IllegalArgumentException(
-          String.format("Unsupported compression type: %s", compressionType));
-    }
+  public static String appendTimestampToPrefix(String filenamePrefix) {
+    return String.format("%s_%s", filenamePrefix, new Date().getTime());
   }
 
-  protected String getExpectedHashForLineCount(Long lineCount) {
+  public static String getExpectedHashForLineCount(Long lineCount) {
     Map<Long, String> expectedHashes = ImmutableMap.of(
         100_000L, "4c8bb3b99dcc59459b20fefba400d446",
         1_000_000L, "9796db06e7a7960f974d5a91164afff1",
@@ -78,6 +75,7 @@ public abstract class AbstractFileBasedIOIT {
    * Constructs text lines in files used for testing.
    */
   public static class DeterministicallyConstructTestTextLineFn extends 
DoFn<Long, String> {
+
     @ProcessElement
     public void processElement(ProcessContext c) {
       c.output(String.format("IO IT Test line of text. Line seed: %s", 
c.element()));
@@ -94,18 +92,12 @@ public abstract class AbstractFileBasedIOIT {
       MatchResult match = Iterables
           
.getOnlyElement(FileSystems.match(Collections.singletonList(c.element())));
 
-      Collection<ResourceId> resourceIds = toResourceIds(match);
+      Set<ResourceId> resourceIds = new HashSet<>();
+      for (MatchResult.Metadata metadataElem : match.metadata()) {
+        resourceIds.add(metadataElem.resourceId());
+      }
 
       FileSystems.delete(resourceIds);
     }
-    private Collection<ResourceId> toResourceIds(MatchResult match) throws 
IOException {
-      return FluentIterable.from(match.metadata())
-          .transform(new Function<MatchResult.Metadata, ResourceId>() {
-            @Override
-            public ResourceId apply(MatchResult.Metadata metadata) {
-              return metadata.resourceId();
-            }
-          }).toList();
-    }
   }
 }
diff --git 
a/sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/text/TextIOIT.java
 
b/sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/text/TextIOIT.java
index 7593f85..f9fad80 100644
--- 
a/sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/text/TextIOIT.java
+++ 
b/sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/text/TextIOIT.java
@@ -19,12 +19,15 @@
 package org.apache.beam.sdk.io.text;
 
 import static org.apache.beam.sdk.io.Compression.AUTO;
+import static 
org.apache.beam.sdk.io.common.FileBasedIOITHelper.appendTimestampToPrefix;
+import static 
org.apache.beam.sdk.io.common.FileBasedIOITHelper.getExpectedHashForLineCount;
+import static 
org.apache.beam.sdk.io.common.FileBasedIOITHelper.readTestPipelineOptions;
 
 import java.text.ParseException;
 import org.apache.beam.sdk.io.Compression;
 import org.apache.beam.sdk.io.GenerateSequence;
 import org.apache.beam.sdk.io.TextIO;
-import org.apache.beam.sdk.io.common.AbstractFileBasedIOIT;
+import org.apache.beam.sdk.io.common.FileBasedIOITHelper;
 import org.apache.beam.sdk.io.common.HashingFn;
 import org.apache.beam.sdk.io.common.IOTestPipelineOptions;
 import org.apache.beam.sdk.testing.PAssert;
@@ -49,7 +52,7 @@ import org.junit.runners.JUnit4;
  *  -Dit.test=org.apache.beam.sdk.io.text.TextIOIT
  *  -DintegrationTestPipelineOptions='[
  *  "--numberOfRecords=100000",
- *  "--filenamePrefix=FILEBASEDIOIT"
+ *  "--filenamePrefix=output_file_path",
  *  "--compressionType=GZIP"
  *  ]'
  * </pre>
@@ -58,7 +61,7 @@ import org.junit.runners.JUnit4;
  * running this test using Beam performance testing framework.</p>
  */
 @RunWith(JUnit4.class)
-public class TextIOIT extends AbstractFileBasedIOIT {
+public class TextIOIT {
 
   private static String filenamePrefix;
   private static Long numberOfTextLines;
@@ -73,7 +76,7 @@ public class TextIOIT extends AbstractFileBasedIOIT {
 
     numberOfTextLines = options.getNumberOfRecords();
     filenamePrefix = appendTimestampToPrefix(options.getFilenamePrefix());
-    compressionType = parseCompressionType(options.getCompressionType());
+    compressionType = Compression.valueOf(options.getCompressionType());
   }
 
   @Test
@@ -86,7 +89,8 @@ public class TextIOIT extends AbstractFileBasedIOIT {
 
     PCollection<String> testFilenames = pipeline
         .apply("Generate sequence", 
GenerateSequence.from(0).to(numberOfTextLines))
-        .apply("Produce text lines", ParDo.of(new 
DeterministicallyConstructTestTextLineFn()))
+        .apply("Produce text lines",
+            ParDo.of(new 
FileBasedIOITHelper.DeterministicallyConstructTestTextLineFn()))
         .apply("Write content to files", write)
         .getPerDestinationOutputFilenames().apply(Values.<String>create());
 
@@ -97,7 +101,7 @@ public class TextIOIT extends AbstractFileBasedIOIT {
     String expectedHash = getExpectedHashForLineCount(numberOfTextLines);
     PAssert.thatSingleton(consolidatedHashcode).isEqualTo(expectedHash);
 
-    testFilenames.apply("Delete test files", ParDo.of(new DeleteFileFn())
+    testFilenames.apply("Delete test files", ParDo.of(new 
FileBasedIOITHelper.DeleteFileFn())
         
.withSideInputs(consolidatedHashcode.apply(View.<String>asSingleton())));
 
     pipeline.run().waitUntilFinish();
diff --git 
a/sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/tfrecord/TFRecordIOIT.java
 
b/sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/tfrecord/TFRecordIOIT.java
index 4589942..b887316 100644
--- 
a/sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/tfrecord/TFRecordIOIT.java
+++ 
b/sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/tfrecord/TFRecordIOIT.java
@@ -19,12 +19,15 @@
 package org.apache.beam.sdk.io.tfrecord;
 
 import static org.apache.beam.sdk.io.Compression.AUTO;
+import static 
org.apache.beam.sdk.io.common.FileBasedIOITHelper.appendTimestampToPrefix;
+import static 
org.apache.beam.sdk.io.common.FileBasedIOITHelper.getExpectedHashForLineCount;
+import static 
org.apache.beam.sdk.io.common.FileBasedIOITHelper.readTestPipelineOptions;
 
 import java.text.ParseException;
 import org.apache.beam.sdk.io.Compression;
 import org.apache.beam.sdk.io.GenerateSequence;
 import org.apache.beam.sdk.io.TFRecordIO;
-import org.apache.beam.sdk.io.common.AbstractFileBasedIOIT;
+import org.apache.beam.sdk.io.common.FileBasedIOITHelper;
 import org.apache.beam.sdk.io.common.HashingFn;
 import org.apache.beam.sdk.io.common.IOTestPipelineOptions;
 import org.apache.beam.sdk.testing.PAssert;
@@ -51,7 +54,7 @@ import org.junit.runners.JUnit4;
  *  -Dit.test=org.apache.beam.sdk.io.tfrecord.TFRecordIOIT
  *  -DintegrationTestPipelineOptions='[
  *  "--numberOfRecords=100000",
- *  "--filenamePrefix=FILEBASEDIOIT"
+ *  "--filenamePrefix=output_file_path",
  *  "--compressionType=GZIP"
  *  ]'
  * </pre>
@@ -60,7 +63,7 @@ import org.junit.runners.JUnit4;
  * running this test using Beam performance testing framework.</p>
  */
 @RunWith(JUnit4.class)
-public class TFRecordIOIT extends AbstractFileBasedIOIT {
+public class TFRecordIOIT {
 
   private static String filenamePrefix;
   private static Long numberOfTextLines;
@@ -78,7 +81,7 @@ public class TFRecordIOIT extends AbstractFileBasedIOIT {
 
     numberOfTextLines = options.getNumberOfRecords();
     filenamePrefix = appendTimestampToPrefix(options.getFilenamePrefix());
-    compressionType = parseCompressionType(options.getCompressionType());
+    compressionType = Compression.valueOf(options.getCompressionType());
   }
 
   private static String createFilenamePattern() {
@@ -96,7 +99,8 @@ public class TFRecordIOIT extends AbstractFileBasedIOIT {
 
     writePipeline
         .apply("Generate sequence", 
GenerateSequence.from(0).to(numberOfTextLines))
-        .apply("Produce text lines", ParDo.of(new 
DeterministicallyConstructTestTextLineFn()))
+        .apply("Produce text lines",
+            ParDo.of(new 
FileBasedIOITHelper.DeterministicallyConstructTestTextLineFn()))
         .apply("Transform strings to bytes", MapElements.via(new 
StringToByteArray()))
         .apply("Write content to files", writeTransform);
 
@@ -112,7 +116,7 @@ public class TFRecordIOIT extends AbstractFileBasedIOIT {
     PAssert.thatSingleton(consolidatedHashcode).isEqualTo(expectedHash);
 
     readPipeline.apply(Create.of(filenamePattern))
-        .apply("Delete test files", ParDo.of(new DeleteFileFn())
+        .apply("Delete test files", ParDo.of(new 
FileBasedIOITHelper.DeleteFileFn())
         
.withSideInputs(consolidatedHashcode.apply(View.<String>asSingleton())));
     readPipeline.run().waitUntilFinish();
   }

-- 
To stop receiving notification emails like this one, please contact
"[email protected]" <[email protected]>.

Reply via email to