chamikaramj closed pull request #4149: [BEAM-3060] Add Compressed TextIOIT
URL: https://github.com/apache/beam/pull/4149
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):
diff --git a/sdks/java/io/common/src/test/java/org/apache/beam/sdk/io/common/IOTestPipelineOptions.java b/sdks/java/io/common/src/test/java/org/apache/beam/sdk/io/common/IOTestPipelineOptions.java
index 91b3aa6d344..5a29d4f8126 100644
--- a/sdks/java/io/common/src/test/java/org/apache/beam/sdk/io/common/IOTestPipelineOptions.java
+++ b/sdks/java/io/common/src/test/java/org/apache/beam/sdk/io/common/IOTestPipelineOptions.java
@@ -100,4 +100,10 @@
String getFilenamePrefix();
void setFilenamePrefix(String prefix);
+
+ @Description("File compression type for writing and reading test files")
+ @Default.String("UNCOMPRESSED")
+ String getCompressionType();
+
+ void setCompressionType(String compressionType);
}
diff --git a/sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/text/TextIOIT.java b/sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/text/TextIOIT.java
index ecab1d86497..1b9b385a1ff 100644
--- a/sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/text/TextIOIT.java
+++ b/sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/text/TextIOIT.java
@@ -15,18 +15,24 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.beam.sdk.io.text;
+import static org.apache.beam.sdk.io.Compression.AUTO;
+
import com.google.common.base.Function;
import com.google.common.collect.FluentIterable;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
+
import java.io.IOException;
import java.text.ParseException;
import java.util.Collection;
import java.util.Collections;
import java.util.Date;
import java.util.Map;
+
+import org.apache.beam.sdk.io.Compression;
import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.io.GenerateSequence;
import org.apache.beam.sdk.io.TextIO;
@@ -50,13 +56,16 @@
import org.junit.runners.JUnit4;
/**
- * An integration test for {@link org.apache.beam.sdk.io.TextIO}.
+ * Integration tests for {@link org.apache.beam.sdk.io.TextIO}.
*
* <p>Run this test using the command below. Pass in connection information via PipelineOptions:
* <pre>
- * mvn -e -Pio-it verify -pl sdks/java/io/text -DintegrationTestPipelineOptions='[
+ * mvn -e -Pio-it verify -pl sdks/java/io/file-based-io-tests
+ * -Dit.test=org.apache.beam.sdk.io.text.TextIOIT
+ * -DintegrationTestPipelineOptions='[
* "--numberOfRecords=100000",
* "--filenamePrefix=TEXTIOIT"
+ * "--compressionType=GZIP"
* ]'
* </pre>
* */
@@ -65,6 +74,7 @@
private static String filenamePrefix;
private static Long numberOfTextLines;
+ private static Compression compressionType;
@Rule
public TestPipeline pipeline = TestPipeline.create();
@@ -77,6 +87,16 @@ public static void setup() throws ParseException {
numberOfTextLines = options.getNumberOfRecords();
filenamePrefix = appendTimestamp(options.getFilenamePrefix());
+ compressionType = parseCompressionType(options.getCompressionType());
+ }
+
+ private static Compression parseCompressionType(String compressionType) {
+ try {
+ return Compression.valueOf(compressionType.toUpperCase());
+ } catch (IllegalArgumentException ex) {
+ throw new IllegalArgumentException(
+ String.format("Unsupported compression type: %s", compressionType));
+ }
}
private static String appendTimestamp(String filenamePrefix) {
@@ -85,14 +105,20 @@ private static String appendTimestamp(String filenamePrefix) {
@Test
public void writeThenReadAll() {
+ TextIO.TypedWrite<String, Object> write = TextIO
+ .write()
+ .to(filenamePrefix)
+ .withOutputFilenames()
+ .withCompression(compressionType);
+
PCollection<String> testFilenames = pipeline
.apply("Generate sequence",
GenerateSequence.from(0).to(numberOfTextLines))
.apply("Produce text lines", ParDo.of(new
DeterministicallyConstructTestTextLineFn()))
- .apply("Write content to files",
TextIO.write().to(filenamePrefix).withOutputFilenames())
+ .apply("Write content to files", write)
.getPerDestinationOutputFilenames().apply(Values.<String>create());
PCollection<String> consolidatedHashcode = testFilenames
- .apply("Read all files", TextIO.readAll())
+ .apply("Read all files", TextIO.readAll().withCompression(AUTO))
.apply("Calculate hashcode", Combine.globally(new HashingFn()));
String expectedHash = getExpectedHashForLineCount(numberOfTextLines);
@@ -107,7 +133,8 @@ public void writeThenReadAll() {
private static String getExpectedHashForLineCount(Long lineCount) {
Map<Long, String> expectedHashes = ImmutableMap.of(
100_000L, "4c8bb3b99dcc59459b20fefba400d446",
- 1_000_000L, "9796db06e7a7960f974d5a91164afff1"
+ 1_000_000L, "9796db06e7a7960f974d5a91164afff1",
+ 100_000_000L, "6ce05f456e2fdc846ded2abd0ec1de95"
);
String hash = expectedHashes.get(lineCount);
@@ -119,6 +146,7 @@ private static String getExpectedHashForLineCount(Long lineCount) {
}
private static class DeterministicallyConstructTestTextLineFn extends
DoFn<Long, String> {
+
@ProcessElement
public void processElement(ProcessContext c) {
c.output(String.format("IO IT Test line of text. Line seed: %s",
c.element()));
@@ -126,6 +154,7 @@ public void processElement(ProcessContext c) {
}
private static class DeleteFileFn extends DoFn<String, Void> {
+
@ProcessElement
public void processElement(ProcessContext c) throws IOException {
MatchResult match = Iterables
@@ -136,6 +165,7 @@ public void processElement(ProcessContext c) throws IOException {
private Collection<ResourceId> toResourceIds(MatchResult match) throws
IOException {
return FluentIterable.from(match.metadata())
.transform(new Function<MatchResult.Metadata, ResourceId>() {
+
@Override
public ResourceId apply(MatchResult.Metadata metadata) {
return metadata.resourceId();
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
With regards,
Apache Git Services