This is an automated email from the ASF dual-hosted git repository.
chamikara pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 381f434 [BEAM-3060] Add Compressed TextIOIT
new 3b79b62 This closes #4149
381f434 is described below
commit 381f4348c1f97fe5d3c31469fe856e575150ad2d
Author: Łukasz Gajowy <[email protected]>
AuthorDate: Mon Nov 20 17:00:54 2017 +0100
[BEAM-3060] Add Compressed TextIOIT
---
.../beam/sdk/io/common/IOTestPipelineOptions.java | 6 ++++
.../java/org/apache/beam/sdk/io/text/TextIOIT.java | 33 ++++++++++++++++++++--
2 files changed, 36 insertions(+), 3 deletions(-)
diff --git
a/sdks/java/io/common/src/test/java/org/apache/beam/sdk/io/common/IOTestPipelineOptions.java
b/sdks/java/io/common/src/test/java/org/apache/beam/sdk/io/common/IOTestPipelineOptions.java
index 91b3aa6..5a29d4f 100644
---
a/sdks/java/io/common/src/test/java/org/apache/beam/sdk/io/common/IOTestPipelineOptions.java
+++
b/sdks/java/io/common/src/test/java/org/apache/beam/sdk/io/common/IOTestPipelineOptions.java
@@ -100,4 +100,10 @@ public interface IOTestPipelineOptions extends
TestPipelineOptions {
String getFilenamePrefix();
void setFilenamePrefix(String prefix);
+
+ @Description("File compression type for writing and reading test files")
+ @Default.String("UNCOMPRESSED")
+ String getCompressionType();
+
+ void setCompressionType(String compressionType);
}
diff --git
a/sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/text/TextIOIT.java
b/sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/text/TextIOIT.java
index d741f95..e9aac80 100644
---
a/sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/text/TextIOIT.java
+++
b/sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/text/TextIOIT.java
@@ -15,18 +15,24 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.beam.sdk.io.text;
+import static org.apache.beam.sdk.io.Compression.AUTO;
+
import com.google.common.base.Function;
import com.google.common.collect.FluentIterable;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
+
import java.io.IOException;
import java.text.ParseException;
import java.util.Collection;
import java.util.Collections;
import java.util.Date;
import java.util.Map;
+
+import org.apache.beam.sdk.io.Compression;
import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.io.GenerateSequence;
import org.apache.beam.sdk.io.TextIO;
@@ -50,7 +56,7 @@ import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
/**
- * An integration test for {@link org.apache.beam.sdk.io.TextIO}.
+ * Integration tests for {@link org.apache.beam.sdk.io.TextIO}.
*
* <p>Run this test using the command below. Pass in connection information
via PipelineOptions:
* <pre>
@@ -59,6 +65,7 @@ import org.junit.runners.JUnit4;
* -DintegrationTestPipelineOptions='[
* "--numberOfRecords=100000",
* "--filenamePrefix=TEXTIOIT",
+ * "--compressionType=GZIP"
* ]'
* </pre>
* </p>
@@ -70,6 +77,7 @@ public class TextIOIT {
private static String filenamePrefix;
private static Long numberOfTextLines;
+ private static Compression compressionType;
@Rule
public TestPipeline pipeline = TestPipeline.create();
@@ -82,6 +90,16 @@ public class TextIOIT {
numberOfTextLines = options.getNumberOfRecords();
filenamePrefix = appendTimestamp(options.getFilenamePrefix());
+ compressionType = parseCompressionType(options.getCompressionType());
+ }
+
+ /**
+ * Converts the {@code compressionType} pipeline option into a {@link Compression} value.
+ *
+ * <p>Matching is case-insensitive. The name is upper-cased with {@link java.util.Locale#ROOT}
+ * so the result does not depend on the JVM default locale (under a Turkish locale, for
+ * example, "gzip" would otherwise be upper-cased with a dotted capital I and fail to match).
+ *
+ * @param compressionType name of a {@link Compression} value, in any letter case
+ * @return the {@link Compression} value with that name
+ * @throws IllegalArgumentException if no {@link Compression} value has that name; the
+ *     exception raised by {@code Compression.valueOf} is attached as the cause
+ */
+ private static Compression parseCompressionType(String compressionType) {
+ try {
+ return Compression.valueOf(compressionType.toUpperCase(java.util.Locale.ROOT));
+ } catch (IllegalArgumentException ex) {
+ // Keep the original exception as the cause so the failing lookup stays in the stack trace.
+ throw new IllegalArgumentException(
+ String.format("Unsupported compression type: %s", compressionType), ex);
+ }
}
private static String appendTimestamp(String filenamePrefix) {
@@ -90,14 +108,20 @@ public class TextIOIT {
@Test
public void writeThenReadAll() {
+ TextIO.TypedWrite<String, Object> write = TextIO
+ .write()
+ .to(filenamePrefix)
+ .withOutputFilenames()
+ .withCompression(compressionType);
+
PCollection<String> testFilenames = pipeline
.apply("Generate sequence",
GenerateSequence.from(0).to(numberOfTextLines))
.apply("Produce text lines", ParDo.of(new
DeterministicallyConstructTestTextLineFn()))
- .apply("Write content to files",
TextIO.write().to(filenamePrefix).withOutputFilenames())
+ .apply("Write content to files", write)
.getPerDestinationOutputFilenames().apply(Values.<String>create());
PCollection<String> consolidatedHashcode = testFilenames
- .apply("Read all files", TextIO.readAll())
+ .apply("Read all files", TextIO.readAll().withCompression(AUTO))
.apply("Calculate hashcode", Combine.globally(new HashingFn()));
String expectedHash = getExpectedHashForLineCount(numberOfTextLines);
@@ -125,6 +149,7 @@ public class TextIOIT {
}
private static class DeterministicallyConstructTestTextLineFn extends
DoFn<Long, String> {
+
@ProcessElement
public void processElement(ProcessContext c) {
c.output(String.format("IO IT Test line of text. Line seed: %s",
c.element()));
@@ -132,6 +157,7 @@ public class TextIOIT {
}
private static class DeleteFileFn extends DoFn<String, Void> {
+
@ProcessElement
public void processElement(ProcessContext c) throws IOException {
MatchResult match = Iterables
@@ -142,6 +168,7 @@ public class TextIOIT {
private Collection<ResourceId> toResourceIds(MatchResult match) throws
IOException {
return FluentIterable.from(match.metadata())
.transform(new Function<MatchResult.Metadata, ResourceId>() {
+
@Override
public ResourceId apply(MatchResult.Metadata metadata) {
return metadata.resourceId();
--
To stop receiving notification emails like this one, please contact
['"[email protected]" <[email protected]>'].