This is an automated email from the ASF dual-hosted git repository. jkff pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git
commit 7564d822aa28a79f27bd0efa09a213ffd3635708 Author: Łukasz Gajowy <[email protected]> AuthorDate: Thu Nov 30 15:59:59 2017 -0800 add post-review updates --- .../beam/sdk/io/common/IOTestPipelineOptions.java | 3 +- ...FileBasedIOIT.java => FileBasedIOITHelper.java} | 50 +++++++++------------- .../java/org/apache/beam/sdk/io/text/TextIOIT.java | 16 ++++--- .../apache/beam/sdk/io/tfrecord/TFRecordIOIT.java | 16 ++++--- 4 files changed, 43 insertions(+), 42 deletions(-) diff --git a/sdks/java/io/common/src/test/java/org/apache/beam/sdk/io/common/IOTestPipelineOptions.java b/sdks/java/io/common/src/test/java/org/apache/beam/sdk/io/common/IOTestPipelineOptions.java index d919654..e7b475d 100644 --- a/sdks/java/io/common/src/test/java/org/apache/beam/sdk/io/common/IOTestPipelineOptions.java +++ b/sdks/java/io/common/src/test/java/org/apache/beam/sdk/io/common/IOTestPipelineOptions.java @@ -19,6 +19,7 @@ package org.apache.beam.sdk.io.common; import org.apache.beam.sdk.options.Default; import org.apache.beam.sdk.options.Description; +import org.apache.beam.sdk.options.Validation; import org.apache.beam.sdk.testing.TestPipelineOptions; /** @@ -96,7 +97,7 @@ public interface IOTestPipelineOptions extends TestPipelineOptions { void setNumberOfRecords(Long count); @Description("Destination prefix for files generated by the test") - @Default.String("FILEBASEDIOIT") + @Validation.Required String getFilenamePrefix(); void setFilenamePrefix(String prefix); diff --git a/sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/common/AbstractFileBasedIOIT.java b/sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/common/FileBasedIOITHelper.java similarity index 66% rename from sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/common/AbstractFileBasedIOIT.java rename to sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/common/FileBasedIOITHelper.java index 9eb8aea..cf20d8e 100644 --- 
a/sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/common/AbstractFileBasedIOIT.java +++ b/sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/common/FileBasedIOITHelper.java @@ -18,47 +18,44 @@ package org.apache.beam.sdk.io.common; -import com.google.common.base.Function; -import com.google.common.collect.FluentIterable; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Iterables; import java.io.IOException; -import java.util.Collection; import java.util.Collections; import java.util.Date; +import java.util.HashSet; import java.util.Map; -import org.apache.beam.sdk.io.Compression; +import java.util.Set; import org.apache.beam.sdk.io.FileSystems; import org.apache.beam.sdk.io.fs.MatchResult; import org.apache.beam.sdk.io.fs.ResourceId; import org.apache.beam.sdk.options.PipelineOptionsFactory; +import org.apache.beam.sdk.options.PipelineOptionsValidator; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.transforms.DoFn; /** - * Abstract class for file based IO Integration tests. + * Contains helper methods for file based IO Integration tests. 
*/ -public abstract class AbstractFileBasedIOIT { +public class FileBasedIOITHelper { - protected static IOTestPipelineOptions readTestPipelineOptions() { - PipelineOptionsFactory.register(IOTestPipelineOptions.class); - return TestPipeline.testingPipelineOptions().as(IOTestPipelineOptions.class); + private FileBasedIOITHelper() { } - protected static String appendTimestampToPrefix(String filenamePrefix) { - return String.format("%s_%s", filenamePrefix, new Date().getTime()); + public static IOTestPipelineOptions readTestPipelineOptions() { + PipelineOptionsFactory.register(IOTestPipelineOptions.class); + IOTestPipelineOptions options = TestPipeline + .testingPipelineOptions() + .as(IOTestPipelineOptions.class); + + return PipelineOptionsValidator.validate(IOTestPipelineOptions.class, options); } - protected static Compression parseCompressionType(String compressionType) { - try { - return Compression.valueOf(compressionType.toUpperCase()); - } catch (IllegalArgumentException ex) { - throw new IllegalArgumentException( - String.format("Unsupported compression type: %s", compressionType)); - } + public static String appendTimestampToPrefix(String filenamePrefix) { + return String.format("%s_%s", filenamePrefix, new Date().getTime()); } - protected String getExpectedHashForLineCount(Long lineCount) { + public static String getExpectedHashForLineCount(Long lineCount) { Map<Long, String> expectedHashes = ImmutableMap.of( 100_000L, "4c8bb3b99dcc59459b20fefba400d446", 1_000_000L, "9796db06e7a7960f974d5a91164afff1", @@ -78,6 +75,7 @@ public abstract class AbstractFileBasedIOIT { * Constructs text lines in files used for testing. */ public static class DeterministicallyConstructTestTextLineFn extends DoFn<Long, String> { + @ProcessElement public void processElement(ProcessContext c) { c.output(String.format("IO IT Test line of text. 
Line seed: %s", c.element())); @@ -94,18 +92,12 @@ public abstract class AbstractFileBasedIOIT { MatchResult match = Iterables .getOnlyElement(FileSystems.match(Collections.singletonList(c.element()))); - Collection<ResourceId> resourceIds = toResourceIds(match); + Set<ResourceId> resourceIds = new HashSet<>(); + for (MatchResult.Metadata metadataElem : match.metadata()) { + resourceIds.add(metadataElem.resourceId()); + } FileSystems.delete(resourceIds); } - private Collection<ResourceId> toResourceIds(MatchResult match) throws IOException { - return FluentIterable.from(match.metadata()) - .transform(new Function<MatchResult.Metadata, ResourceId>() { - @Override - public ResourceId apply(MatchResult.Metadata metadata) { - return metadata.resourceId(); - } - }).toList(); - } } } diff --git a/sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/text/TextIOIT.java b/sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/text/TextIOIT.java index 7593f85..f9fad80 100644 --- a/sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/text/TextIOIT.java +++ b/sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/text/TextIOIT.java @@ -19,12 +19,15 @@ package org.apache.beam.sdk.io.text; import static org.apache.beam.sdk.io.Compression.AUTO; +import static org.apache.beam.sdk.io.common.FileBasedIOITHelper.appendTimestampToPrefix; +import static org.apache.beam.sdk.io.common.FileBasedIOITHelper.getExpectedHashForLineCount; +import static org.apache.beam.sdk.io.common.FileBasedIOITHelper.readTestPipelineOptions; import java.text.ParseException; import org.apache.beam.sdk.io.Compression; import org.apache.beam.sdk.io.GenerateSequence; import org.apache.beam.sdk.io.TextIO; -import org.apache.beam.sdk.io.common.AbstractFileBasedIOIT; +import org.apache.beam.sdk.io.common.FileBasedIOITHelper; import org.apache.beam.sdk.io.common.HashingFn; import org.apache.beam.sdk.io.common.IOTestPipelineOptions; import 
org.apache.beam.sdk.testing.PAssert; @@ -49,7 +52,7 @@ import org.junit.runners.JUnit4; * -Dit.test=org.apache.beam.sdk.io.text.TextIOIT * -DintegrationTestPipelineOptions='[ * "--numberOfRecords=100000", - * "--filenamePrefix=FILEBASEDIOIT" + * "--filenamePrefix=output_file_path", * "--compressionType=GZIP" * ]' * </pre> @@ -58,7 +61,7 @@ import org.junit.runners.JUnit4; * running this test using Beam performance testing framework.</p> */ @RunWith(JUnit4.class) -public class TextIOIT extends AbstractFileBasedIOIT { +public class TextIOIT { private static String filenamePrefix; private static Long numberOfTextLines; @@ -73,7 +76,7 @@ public class TextIOIT extends AbstractFileBasedIOIT { numberOfTextLines = options.getNumberOfRecords(); filenamePrefix = appendTimestampToPrefix(options.getFilenamePrefix()); - compressionType = parseCompressionType(options.getCompressionType()); + compressionType = Compression.valueOf(options.getCompressionType()); } @Test @@ -86,7 +89,8 @@ public class TextIOIT extends AbstractFileBasedIOIT { PCollection<String> testFilenames = pipeline .apply("Generate sequence", GenerateSequence.from(0).to(numberOfTextLines)) - .apply("Produce text lines", ParDo.of(new DeterministicallyConstructTestTextLineFn())) + .apply("Produce text lines", + ParDo.of(new FileBasedIOITHelper.DeterministicallyConstructTestTextLineFn())) .apply("Write content to files", write) .getPerDestinationOutputFilenames().apply(Values.<String>create()); @@ -97,7 +101,7 @@ public class TextIOIT extends AbstractFileBasedIOIT { String expectedHash = getExpectedHashForLineCount(numberOfTextLines); PAssert.thatSingleton(consolidatedHashcode).isEqualTo(expectedHash); - testFilenames.apply("Delete test files", ParDo.of(new DeleteFileFn()) + testFilenames.apply("Delete test files", ParDo.of(new FileBasedIOITHelper.DeleteFileFn()) .withSideInputs(consolidatedHashcode.apply(View.<String>asSingleton()))); pipeline.run().waitUntilFinish(); diff --git 
a/sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/tfrecord/TFRecordIOIT.java b/sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/tfrecord/TFRecordIOIT.java index 4589942..b887316 100644 --- a/sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/tfrecord/TFRecordIOIT.java +++ b/sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/tfrecord/TFRecordIOIT.java @@ -19,12 +19,15 @@ package org.apache.beam.sdk.io.tfrecord; import static org.apache.beam.sdk.io.Compression.AUTO; +import static org.apache.beam.sdk.io.common.FileBasedIOITHelper.appendTimestampToPrefix; +import static org.apache.beam.sdk.io.common.FileBasedIOITHelper.getExpectedHashForLineCount; +import static org.apache.beam.sdk.io.common.FileBasedIOITHelper.readTestPipelineOptions; import java.text.ParseException; import org.apache.beam.sdk.io.Compression; import org.apache.beam.sdk.io.GenerateSequence; import org.apache.beam.sdk.io.TFRecordIO; -import org.apache.beam.sdk.io.common.AbstractFileBasedIOIT; +import org.apache.beam.sdk.io.common.FileBasedIOITHelper; import org.apache.beam.sdk.io.common.HashingFn; import org.apache.beam.sdk.io.common.IOTestPipelineOptions; import org.apache.beam.sdk.testing.PAssert; @@ -51,7 +54,7 @@ import org.junit.runners.JUnit4; * -Dit.test=org.apache.beam.sdk.io.tfrecord.TFRecordIOIT * -DintegrationTestPipelineOptions='[ * "--numberOfRecords=100000", - * "--filenamePrefix=FILEBASEDIOIT" + * "--filenamePrefix=output_file_path", * "--compressionType=GZIP" * ]' * </pre> @@ -60,7 +63,7 @@ import org.junit.runners.JUnit4; * running this test using Beam performance testing framework.</p> */ @RunWith(JUnit4.class) -public class TFRecordIOIT extends AbstractFileBasedIOIT { +public class TFRecordIOIT { private static String filenamePrefix; private static Long numberOfTextLines; @@ -78,7 +81,7 @@ public class TFRecordIOIT extends AbstractFileBasedIOIT { numberOfTextLines = options.getNumberOfRecords(); 
filenamePrefix = appendTimestampToPrefix(options.getFilenamePrefix()); - compressionType = parseCompressionType(options.getCompressionType()); + compressionType = Compression.valueOf(options.getCompressionType()); } private static String createFilenamePattern() { @@ -96,7 +99,8 @@ public class TFRecordIOIT extends AbstractFileBasedIOIT { writePipeline .apply("Generate sequence", GenerateSequence.from(0).to(numberOfTextLines)) - .apply("Produce text lines", ParDo.of(new DeterministicallyConstructTestTextLineFn())) + .apply("Produce text lines", + ParDo.of(new FileBasedIOITHelper.DeterministicallyConstructTestTextLineFn())) .apply("Transform strings to bytes", MapElements.via(new StringToByteArray())) .apply("Write content to files", writeTransform); @@ -112,7 +116,7 @@ public class TFRecordIOIT extends AbstractFileBasedIOIT { PAssert.thatSingleton(consolidatedHashcode).isEqualTo(expectedHash); readPipeline.apply(Create.of(filenamePattern)) - .apply("Delete test files", ParDo.of(new DeleteFileFn()) + .apply("Delete test files", ParDo.of(new FileBasedIOITHelper.DeleteFileFn()) .withSideInputs(consolidatedHashcode.apply(View.<String>asSingleton()))); readPipeline.run().waitUntilFinish(); } -- To stop receiving notification emails like this one, please contact "[email protected]" <[email protected]>.
