[ 
https://issues.apache.org/jira/browse/BEAM-3060?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16264953#comment-16264953
 ] 

ASF GitHub Bot commented on BEAM-3060:
--------------------------------------

chamikaramj commented on a change in pull request #4149: [BEAM-3060] Add 
Compressed TextIOIT
URL: https://github.com/apache/beam/pull/4149#discussion_r152900718
 
 

 ##########
 File path: 
sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/text/TextIOIT.java
 ##########
 @@ -83,25 +90,82 @@ private static String appendTimestamp(String 
filenamePrefix) {
     return String.format("%s_%s", filenamePrefix, new Date().getTime());
   }
 
-  @Test
-  public void writeThenReadAll() {
-    PCollection<String> testFilenames = pipeline
-        .apply("Generate sequence", 
GenerateSequence.from(0).to(numberOfTextLines))
-        .apply("Produce text lines", ParDo.of(new 
DeterministicallyConstructTestTextLineFn()))
-        .apply("Write content to files", 
TextIO.write().to(filenamePrefix).withOutputFilenames())
-        .getPerDestinationOutputFilenames().apply(Values.<String>create());
+  /** IO IT with no compression. */
+  @RunWith(JUnit4.class)
+  public static class UncompressedTextIOIT {
+
+    @Rule
+    public TestPipeline pipeline = TestPipeline.create();
+
+    @Test
+    public void writeThenReadAll() {
+      PCollection<String> testFilenames = pipeline
+          .apply("Generate sequence", 
GenerateSequence.from(0).to(numberOfTextLines))
+          .apply("Produce text lines", ParDo.of(new 
DeterministicallyConstructTestTextLineFn()))
+          .apply("Write content to files", 
TextIO.write().to(filenamePrefix).withOutputFilenames())
+          .getPerDestinationOutputFilenames().apply(Values.<String>create());
+
+      PCollection<String> consolidatedHashcode = testFilenames
+          .apply("Read all files", TextIO.readAll())
+          .apply("Calculate hashcode", Combine.globally(new HashingFn()));
+
+      String expectedHash = getExpectedHashForLineCount(numberOfTextLines);
+      PAssert.thatSingleton(consolidatedHashcode).isEqualTo(expectedHash);
+
+      testFilenames.apply("Delete test files", ParDo.of(new DeleteFileFn())
+          
.withSideInputs(consolidatedHashcode.apply(View.<String>asSingleton())));
+
+      pipeline.run().waitUntilFinish();
+    }
+  }
+
+  /** IO IT with various compression types. */
+  @RunWith(Parameterized.class)
+  public static class CompressedTextIOIT {
+
+    @Rule
+    public TestPipeline pipeline = TestPipeline.create();
+
+    @Parameterized.Parameters()
+    public static Iterable<Compression> data() {
+      return ImmutableList.<Compression>builder()
+          .add(GZIP)
+          .add(DEFLATE)
+          .add(BZIP2)
+          .build();
+    }
+
+    @Parameterized.Parameter()
+    public Compression compression;
+
+    @Test
+    public void writeThenReadAllWithCompression() {
+      TextIO.TypedWrite<String, Object> write = TextIO
+          .write()
+          .to(filenamePrefix)
+          .withOutputFilenames()
+          .withCompression(compression);
+
+      TextIO.ReadAll read = TextIO.readAll().withCompression(AUTO);
 
-    PCollection<String> consolidatedHashcode = testFilenames
-        .apply("Read all files", TextIO.readAll())
-        .apply("Calculate hashcode", Combine.globally(new HashingFn()));
+      PCollection<String> testFilenames = pipeline
 
 Review comment:
   This and uncompressed version have the same pipeline. Can't we share to code 
between tests (and keep the same test class TextIOIT) and add "compression 
type" as a parameter to the test (a Maven -D parameter for the perfkit based 
runs) ?

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Add performance tests for commonly used file-based I/O PTransforms
> ------------------------------------------------------------------
>
>                 Key: BEAM-3060
>                 URL: https://issues.apache.org/jira/browse/BEAM-3060
>             Project: Beam
>          Issue Type: Test
>          Components: sdk-java-core
>            Reporter: Chamikara Jayalath
>            Assignee: Szymon Nieradka
>
> We recently added a performance testing framework [1] that can be used to do 
> following.
> (1) Execute Beam tests using PerfkitBenchmarker
> (2) Manage Kubernetes-based deployments of data stores.
> (3) Easily publish benchmark results. 
> I think it will be useful to add performance tests for commonly used 
> file-based I/O PTransforms using this framework. I suggest looking into 
> following formats initially.
> (1) AvroIO
> (2) TextIO
> (3) Compressed text using TextIO
> (4) TFRecordIO
> It should be possibly to run these tests for various Beam runners (Direct, 
> Dataflow, Flink, Spark, etc.) and file-systems (GCS, local, HDFS, etc.) 
> easily.
> In the initial version, tests can be made manually triggerable for PRs 
> through Jenkins. Later, we could make some of these tests run periodically 
> and publish benchmark results (to BigQuery) through PerfkitBenchmarker.
> [1] https://beam.apache.org/documentation/io/testing/



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

Reply via email to