This is an automated email from the ASF dual-hosted git repository. jkff pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git
commit 1a6c25c7c547de882b7ec4da80a5ace1d01f397d Author: Łukasz Gajowy <[email protected]> AuthorDate: Tue Nov 28 13:23:28 2017 -0800 [BEAM-3060] add TFRecordIOIT --- .../beam/sdk/io/common/IOTestPipelineOptions.java | 2 +- .../beam/sdk/io/common/AbstractFileBasedIOIT.java | 111 +++++++++++++++++ .../java/org/apache/beam/sdk/io/text/TextIOIT.java | 86 +------------ .../apache/beam/sdk/io/tfrecord/TFRecordIOIT.java | 133 +++++++++++++++++++++ 4 files changed, 251 insertions(+), 81 deletions(-) diff --git a/sdks/java/io/common/src/test/java/org/apache/beam/sdk/io/common/IOTestPipelineOptions.java b/sdks/java/io/common/src/test/java/org/apache/beam/sdk/io/common/IOTestPipelineOptions.java index 5a29d4f..d919654 100644 --- a/sdks/java/io/common/src/test/java/org/apache/beam/sdk/io/common/IOTestPipelineOptions.java +++ b/sdks/java/io/common/src/test/java/org/apache/beam/sdk/io/common/IOTestPipelineOptions.java @@ -96,7 +96,7 @@ public interface IOTestPipelineOptions extends TestPipelineOptions { void setNumberOfRecords(Long count); @Description("Destination prefix for files generated by the test") - @Default.String("TEXTIOIT") + @Default.String("FILEBASEDIOIT") String getFilenamePrefix(); void setFilenamePrefix(String prefix); diff --git a/sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/common/AbstractFileBasedIOIT.java b/sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/common/AbstractFileBasedIOIT.java new file mode 100644 index 0000000..9eb8aea --- /dev/null +++ b/sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/common/AbstractFileBasedIOIT.java @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.sdk.io.common; + +import com.google.common.base.Function; +import com.google.common.collect.FluentIterable; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Iterables; +import java.io.IOException; +import java.util.Collection; +import java.util.Collections; +import java.util.Date; +import java.util.Map; +import org.apache.beam.sdk.io.Compression; +import org.apache.beam.sdk.io.FileSystems; +import org.apache.beam.sdk.io.fs.MatchResult; +import org.apache.beam.sdk.io.fs.ResourceId; +import org.apache.beam.sdk.options.PipelineOptionsFactory; +import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.transforms.DoFn; + +/** + * Abstract class for file based IO Integration tests.
+ */ +public abstract class AbstractFileBasedIOIT { + + protected static IOTestPipelineOptions readTestPipelineOptions() { + PipelineOptionsFactory.register(IOTestPipelineOptions.class); + return TestPipeline.testingPipelineOptions().as(IOTestPipelineOptions.class); + } + + protected static String appendTimestampToPrefix(String filenamePrefix) { + return String.format("%s_%s", filenamePrefix, new Date().getTime()); + } + + protected static Compression parseCompressionType(String compressionType) { + try { + return Compression.valueOf(compressionType.toUpperCase(java.util.Locale.ROOT)); + } catch (IllegalArgumentException ex) { + throw new IllegalArgumentException( + String.format("Unsupported compression type: %s", compressionType), ex); + } + } + + protected String getExpectedHashForLineCount(Long lineCount) { + Map<Long, String> expectedHashes = ImmutableMap.of( + 100_000L, "4c8bb3b99dcc59459b20fefba400d446", + 1_000_000L, "9796db06e7a7960f974d5a91164afff1", + 100_000_000L, "6ce05f456e2fdc846ded2abd0ec1de95" + ); + + String hash = expectedHashes.get(lineCount); + if (hash == null) { + throw new UnsupportedOperationException( + String.format("No hash for that line count: %s", lineCount) + ); + } + return hash; + } + + /** + * Constructs text lines in files used for testing. + */ + public static class DeterministicallyConstructTestTextLineFn extends DoFn<Long, String> { + @ProcessElement + public void processElement(ProcessContext c) { + c.output(String.format("IO IT Test line of text. Line seed: %s", c.element())); + } + } + + /** + * Deletes matching files using the FileSystems API.
+ */ + public static class DeleteFileFn extends DoFn<String, Void> { + + @ProcessElement + public void processElement(ProcessContext c) throws IOException { + MatchResult match = Iterables + .getOnlyElement(FileSystems.match(Collections.singletonList(c.element()))); + + Collection<ResourceId> resourceIds = toResourceIds(match); + + FileSystems.delete(resourceIds); + } + private Collection<ResourceId> toResourceIds(MatchResult match) throws IOException { + return FluentIterable.from(match.metadata()) + .transform(new Function<MatchResult.Metadata, ResourceId>() { + @Override + public ResourceId apply(MatchResult.Metadata metadata) { + return metadata.resourceId(); + } + }).toList(); + } + } +} diff --git a/sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/text/TextIOIT.java b/sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/text/TextIOIT.java index e9aac80..7593f85 100644 --- a/sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/text/TextIOIT.java +++ b/sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/text/TextIOIT.java @@ -20,31 +20,16 @@ package org.apache.beam.sdk.io.text; import static org.apache.beam.sdk.io.Compression.AUTO; -import com.google.common.base.Function; -import com.google.common.collect.FluentIterable; -import com.google.common.collect.ImmutableMap; -import com.google.common.collect.Iterables; - -import java.io.IOException; import java.text.ParseException; -import java.util.Collection; -import java.util.Collections; -import java.util.Date; -import java.util.Map; - import org.apache.beam.sdk.io.Compression; -import org.apache.beam.sdk.io.FileSystems; import org.apache.beam.sdk.io.GenerateSequence; import org.apache.beam.sdk.io.TextIO; +import org.apache.beam.sdk.io.common.AbstractFileBasedIOIT; import org.apache.beam.sdk.io.common.HashingFn; import org.apache.beam.sdk.io.common.IOTestPipelineOptions; -import org.apache.beam.sdk.io.fs.MatchResult; -import
org.apache.beam.sdk.io.fs.ResourceId; -import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.transforms.Combine; -import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.Values; import org.apache.beam.sdk.transforms.View; @@ -64,16 +49,16 @@ import org.junit.runners.JUnit4; * -Dit.test=org.apache.beam.sdk.io.text.TextIOIT * -DintegrationTestPipelineOptions='[ * "--numberOfRecords=100000", - * "--filenamePrefix=TEXTIOIT" + * "--filenamePrefix=FILEBASEDIOIT", * "--compressionType=GZIP" * ]' * </pre> * </p> * <p>Please see 'sdks/java/io/file-based-io-tests/pom.xml' for instructions regarding * running this test using Beam performance testing framework.</p> - * */ + */ @RunWith(JUnit4.class) -public class TextIOIT { +public class TextIOIT extends AbstractFileBasedIOIT { private static String filenamePrefix; private static Long numberOfTextLines; @@ -84,28 +69,13 @@ public class TextIOIT { @BeforeClass public static void setup() throws ParseException { - PipelineOptionsFactory.register(IOTestPipelineOptions.class); - IOTestPipelineOptions options = TestPipeline.testingPipelineOptions() - .as(IOTestPipelineOptions.class); + IOTestPipelineOptions options = readTestPipelineOptions(); numberOfTextLines = options.getNumberOfRecords(); - filenamePrefix = appendTimestamp(options.getFilenamePrefix()); + filenamePrefix = appendTimestampToPrefix(options.getFilenamePrefix()); compressionType = parseCompressionType(options.getCompressionType()); } - private static Compression parseCompressionType(String compressionType) { - try { - return Compression.valueOf(compressionType.toUpperCase()); - } catch (IllegalArgumentException ex) { - throw new IllegalArgumentException( - String.format("Unsupported compression type: %s", compressionType)); - } - } - - private static String
appendTimestamp(String filenamePrefix) { - return String.format("%s_%s", filenamePrefix, new Date().getTime()); - } - @Test public void writeThenReadAll() { TextIO.TypedWrite<String, Object> write = TextIO @@ -132,48 +102,4 @@ public class TextIOIT { pipeline.run().waitUntilFinish(); } - - private static String getExpectedHashForLineCount(Long lineCount) { - Map<Long, String> expectedHashes = ImmutableMap.of( - 100_000L, "4c8bb3b99dcc59459b20fefba400d446", - 1_000_000L, "9796db06e7a7960f974d5a91164afff1", - 100_000_000L, "6ce05f456e2fdc846ded2abd0ec1de95" - ); - - String hash = expectedHashes.get(lineCount); - if (hash == null) { - throw new UnsupportedOperationException( - String.format("No hash for that line count: %s", lineCount)); - } - return hash; - } - - private static class DeterministicallyConstructTestTextLineFn extends DoFn<Long, String> { - - @ProcessElement - public void processElement(ProcessContext c) { - c.output(String.format("IO IT Test line of text. Line seed: %s", c.element())); - } - } - - private static class DeleteFileFn extends DoFn<String, Void> { - - @ProcessElement - public void processElement(ProcessContext c) throws IOException { - MatchResult match = Iterables - .getOnlyElement(FileSystems.match(Collections.singletonList(c.element()))); - FileSystems.delete(toResourceIds(match)); - } - - private Collection<ResourceId> toResourceIds(MatchResult match) throws IOException { - return FluentIterable.from(match.metadata()) - .transform(new Function<MatchResult.Metadata, ResourceId>() { - - @Override - public ResourceId apply(MatchResult.Metadata metadata) { - return metadata.resourceId(); - } - }).toList(); - } - } } diff --git a/sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/tfrecord/TFRecordIOIT.java b/sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/tfrecord/TFRecordIOIT.java new file mode 100644 index 0000000..4589942 --- /dev/null +++
b/sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/tfrecord/TFRecordIOIT.java @@ -0,0 +1,133 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.sdk.io.tfrecord; + +import static org.apache.beam.sdk.io.Compression.AUTO; + +import java.text.ParseException; +import org.apache.beam.sdk.io.Compression; +import org.apache.beam.sdk.io.GenerateSequence; +import org.apache.beam.sdk.io.TFRecordIO; +import org.apache.beam.sdk.io.common.AbstractFileBasedIOIT; +import org.apache.beam.sdk.io.common.HashingFn; +import org.apache.beam.sdk.io.common.IOTestPipelineOptions; +import org.apache.beam.sdk.testing.PAssert; +import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.transforms.Combine; +import org.apache.beam.sdk.transforms.Create; +import org.apache.beam.sdk.transforms.MapElements; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.SimpleFunction; +import org.apache.beam.sdk.transforms.View; +import org.apache.beam.sdk.values.PCollection; +import org.junit.BeforeClass; +import org.junit.Rule; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** + * Integration tests for
{@link org.apache.beam.sdk.io.TFRecordIO}. + * + * <p>Run this test using the command below. Pass in connection information via PipelineOptions: + * <pre> + * mvn -e -Pio-it verify -pl sdks/java/io/file-based-io-tests + * -Dit.test=org.apache.beam.sdk.io.tfrecord.TFRecordIOIT + * -DintegrationTestPipelineOptions='[ + * "--numberOfRecords=100000", + * "--filenamePrefix=FILEBASEDIOIT", + * "--compressionType=GZIP" + * ]' + * </pre> + * </p> + * <p>Please see 'sdks/java/io/file-based-io-tests/pom.xml' for instructions regarding + * running this test using Beam performance testing framework.</p> + */ +@RunWith(JUnit4.class) +public class TFRecordIOIT extends AbstractFileBasedIOIT { + + private static String filenamePrefix; + private static Long numberOfTextLines; + private static Compression compressionType; + + @Rule + public TestPipeline writePipeline = TestPipeline.create(); + + @Rule + public TestPipeline readPipeline = TestPipeline.create(); + + @BeforeClass + public static void setup() throws ParseException { + IOTestPipelineOptions options = readTestPipelineOptions(); + + numberOfTextLines = options.getNumberOfRecords(); + filenamePrefix = appendTimestampToPrefix(options.getFilenamePrefix()); + compressionType = parseCompressionType(options.getCompressionType()); + } + + private static String createFilenamePattern() { + return filenamePrefix + "*"; + } + + // TODO: There are two pipelines due to: https://issues.apache.org/jira/browse/BEAM-3267 + @Test + public void writeThenReadAll() { + TFRecordIO.Write writeTransform = TFRecordIO + .write() + .to(filenamePrefix) + .withCompression(compressionType) + .withSuffix(".tfrecord"); + + writePipeline + .apply("Generate sequence", GenerateSequence.from(0).to(numberOfTextLines)) + .apply("Produce text lines", ParDo.of(new DeterministicallyConstructTestTextLineFn())) + .apply("Transform strings to bytes", MapElements.via(new StringToByteArray())) + .apply("Write content to files", writeTransform); +
writePipeline.run().waitUntilFinish(); + + String filenamePattern = createFilenamePattern(); + PCollection<String> consolidatedHashcode = readPipeline + .apply(TFRecordIO.read().from(filenamePattern).withCompression(AUTO)) + .apply("Transform bytes to strings", MapElements.via(new ByteArrayToString())) + .apply("Calculate hashcode", Combine.globally(new HashingFn())); + + String expectedHash = getExpectedHashForLineCount(numberOfTextLines); + PAssert.thatSingleton(consolidatedHashcode).isEqualTo(expectedHash); + + readPipeline.apply(Create.of(filenamePattern)) + .apply("Delete test files", ParDo.of(new DeleteFileFn()) + .withSideInputs(consolidatedHashcode.apply(View.<String>asSingleton()))); + readPipeline.run().waitUntilFinish(); + } + + static class StringToByteArray extends SimpleFunction<String, byte[]> { + @Override + public byte[] apply(String input) { + return input.getBytes(java.nio.charset.StandardCharsets.UTF_8); + } + } + + static class ByteArrayToString extends SimpleFunction<byte[], String> { + @Override + public String apply(byte[] input) { + return new String(input, java.nio.charset.StandardCharsets.UTF_8); + } + } +} -- To stop receiving notification emails like this one, please contact "[email protected]" <[email protected]>.
