[
https://issues.apache.org/jira/browse/BEAM-4137?focusedWorklogId=110117&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-110117
]
ASF GitHub Bot logged work on BEAM-4137:
----------------------------------------
Author: ASF GitHub Bot
Created on: 08/Jun/18 13:46
Start Date: 08/Jun/18 13:46
Worklog Time Spent: 10m
Work Description: iemejia closed pull request #5569: [BEAM-4137] split io
test pipeline options
URL: https://github.com/apache/beam/pull/5569
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
As this is a foreign pull request (from a fork), GitHub does not display the
original diff after merge, so it is reproduced below for provenance:
diff --git a/.test-infra/jenkins/job_PerformanceTests_FileBasedIO_IT.groovy
b/.test-infra/jenkins/job_PerformanceTests_FileBasedIO_IT.groovy
index 7c4ebf63b07..6edf489de87 100644
--- a/.test-infra/jenkins/job_PerformanceTests_FileBasedIO_IT.groovy
+++ b/.test-infra/jenkins/job_PerformanceTests_FileBasedIO_IT.groovy
@@ -85,8 +85,7 @@ def testsConfigurations = [
prCommitStatusName: 'Java ParquetIOPerformance Test',
prTriggerPhase : 'Run Java ParquetIO Performance Test',
extraPipelineArgs: [
- numberOfRecords: '100000000',
- charset: 'UTF-8'
+ numberOfRecords: '100000000'
]
]
]
diff --git
a/sdks/java/io/common/src/test/java/org/apache/beam/sdk/io/common/DatabaseTestHelper.java
b/sdks/java/io/common/src/test/java/org/apache/beam/sdk/io/common/DatabaseTestHelper.java
index d69654a25fa..101eace07a0 100644
---
a/sdks/java/io/common/src/test/java/org/apache/beam/sdk/io/common/DatabaseTestHelper.java
+++
b/sdks/java/io/common/src/test/java/org/apache/beam/sdk/io/common/DatabaseTestHelper.java
@@ -30,7 +30,7 @@
*/
public class DatabaseTestHelper {
- public static PGSimpleDataSource getPostgresDataSource(IOTestPipelineOptions
options) {
+ public static PGSimpleDataSource
getPostgresDataSource(PostgresIOTestPipelineOptions options) {
PGSimpleDataSource dataSource = new PGSimpleDataSource();
dataSource.setDatabaseName(options.getPostgresDatabaseName());
dataSource.setServerName(options.getPostgresServerName());
@@ -67,7 +67,7 @@ public static String getTestTableName(String testIdentifier) {
return String.format("BEAMTEST_%s_%s", testIdentifier,
formatter.format(new Date()));
}
- public static String getPostgresDBUrl(IOTestPipelineOptions options) {
+ public static String getPostgresDBUrl(PostgresIOTestPipelineOptions options)
{
return String.format(
"jdbc:postgresql://%s:%s/%s",
options.getPostgresServerName(),
diff --git
a/sdks/java/io/common/src/test/java/org/apache/beam/sdk/io/common/IOITHelper.java
b/sdks/java/io/common/src/test/java/org/apache/beam/sdk/io/common/IOITHelper.java
index 819bc9a171c..5d0971fab02 100644
---
a/sdks/java/io/common/src/test/java/org/apache/beam/sdk/io/common/IOITHelper.java
+++
b/sdks/java/io/common/src/test/java/org/apache/beam/sdk/io/common/IOITHelper.java
@@ -18,6 +18,9 @@
package org.apache.beam.sdk.io.common;
import java.util.Map;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.options.PipelineOptionsValidator;
+import org.apache.beam.sdk.testing.TestPipeline;
/**
* Methods common to all types of IOITs.
@@ -36,4 +39,15 @@ public static String getHashForRecordCount(int recordCount,
Map<Integer, String>
}
return hash;
}
+
+ public static <T extends IOTestPipelineOptions> T readIOTestPipelineOptions(
+ Class<T> optionsType) {
+
+ PipelineOptionsFactory.register(optionsType);
+ IOTestPipelineOptions options = TestPipeline
+ .testingPipelineOptions()
+ .as(optionsType);
+
+ return PipelineOptionsValidator.validate(optionsType, options);
+ }
}
diff --git
a/sdks/java/io/common/src/test/java/org/apache/beam/sdk/io/common/IOTestPipelineOptions.java
b/sdks/java/io/common/src/test/java/org/apache/beam/sdk/io/common/IOTestPipelineOptions.java
index 89b7ae81bc5..ab23ca40cd9 100644
---
a/sdks/java/io/common/src/test/java/org/apache/beam/sdk/io/common/IOTestPipelineOptions.java
+++
b/sdks/java/io/common/src/test/java/org/apache/beam/sdk/io/common/IOTestPipelineOptions.java
@@ -19,7 +19,6 @@
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
-import org.apache.beam.sdk.options.Validation;
import org.apache.beam.sdk.testing.TestPipelineOptions;
/**
@@ -28,39 +27,6 @@
* that were passed on the command line.
*/
public interface IOTestPipelineOptions extends TestPipelineOptions {
- /* Postgres */
- @Description("Server name for postgres server (host name/ip address)")
- @Default.String("postgres-server-name")
- String getPostgresServerName();
- void setPostgresServerName(String value);
-
- @Description("Username for postgres server")
- @Default.String("postgres-username")
- String getPostgresUsername();
- void setPostgresUsername(String value);
-
- // Note that passwords are not as secure an authentication as other methods,
and used here for
- // a test environment only.
- @Description("Password for postgres server")
- @Default.String("postgres-password")
- String getPostgresPassword();
- void setPostgresPassword(String value);
-
- @Description("Database name for postgres server")
- @Default.String("postgres-database-name")
- String getPostgresDatabaseName();
- void setPostgresDatabaseName(String value);
-
- @Description("Port for postgres server")
- @Default.Integer(0)
- Integer getPostgresPort();
- void setPostgresPort(Integer value);
-
- @Description("Whether the postgres server uses SSL")
- @Default.Boolean(true)
- Boolean getPostgresSsl();
- void setPostgresSsl(Boolean value);
-
/* Elasticsearch */
@Description("Server name for Elasticsearch server (host name/ip address)")
@Default.String("elasticsearch-server-name")
@@ -89,29 +55,10 @@
Integer getCassandraPort();
void setCassandraPort(Integer port);
- /* Options for test pipeline for file-based I/O in
'sdks/java/io/file-based-io-tests/'. */
+ /* Used by most IOIT */
@Description("Number records that will be written and read by the test")
@Default.Integer(100000)
Integer getNumberOfRecords();
void setNumberOfRecords(Integer count);
-
- @Description("Destination prefix for files generated by the test")
- @Validation.Required
- String getFilenamePrefix();
-
- void setFilenamePrefix(String prefix);
-
- @Description("File compression type for writing and reading test files")
- @Default.String("UNCOMPRESSED")
- String getCompressionType();
-
- void setCompressionType(String compressionType);
-
- /* Used by XmlIOIT */
- @Description("Xml file charset name")
- @Default.String("UTF-8")
- String getCharset();
-
- void setCharset(String charset);
}
diff --git
a/sdks/java/io/common/src/test/java/org/apache/beam/sdk/io/common/PostgresIOTestPipelineOptions.java
b/sdks/java/io/common/src/test/java/org/apache/beam/sdk/io/common/PostgresIOTestPipelineOptions.java
new file mode 100644
index 00000000000..f781f60f5cf
--- /dev/null
+++
b/sdks/java/io/common/src/test/java/org/apache/beam/sdk/io/common/PostgresIOTestPipelineOptions.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.common;
+
+import org.apache.beam.sdk.options.Default;
+import org.apache.beam.sdk.options.Description;
+import org.apache.beam.sdk.options.Validation;
+
+/** Postgres related IO testing pipeline options. */
+public interface PostgresIOTestPipelineOptions extends IOTestPipelineOptions {
+
+ @Description("Server name for postgres server (host name/ip address)")
+ @Validation.Required
+ String getPostgresServerName();
+ void setPostgresServerName(String value);
+
+ @Description("Username for postgres server")
+ @Validation.Required
+ String getPostgresUsername();
+ void setPostgresUsername(String value);
+
+ // Note that passwords are not as secure an authentication as other methods,
and used here for
+ // a test environment only.
+ @Description("Password for postgres server")
+ @Validation.Required
+ String getPostgresPassword();
+ void setPostgresPassword(String value);
+
+ @Description("Database name for postgres server")
+ @Validation.Required
+ String getPostgresDatabaseName();
+ void setPostgresDatabaseName(String value);
+
+ @Description("Port for postgres server")
+ @Default.Integer(5432)
+ Integer getPostgresPort();
+ void setPostgresPort(Integer value);
+
+ @Description("Whether the postgres server uses SSL")
+ @Default.Boolean(true)
+ Boolean getPostgresSsl();
+ void setPostgresSsl(Boolean value);
+}
diff --git
a/sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/avro/AvroIOIT.java
b/sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/avro/AvroIOIT.java
index bcc239c991e..076af31bbfa 100644
---
a/sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/avro/AvroIOIT.java
+++
b/sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/avro/AvroIOIT.java
@@ -19,7 +19,7 @@
import static
org.apache.beam.sdk.io.common.FileBasedIOITHelper.appendTimestampSuffix;
import static
org.apache.beam.sdk.io.common.FileBasedIOITHelper.getExpectedHashForLineCount;
-import static
org.apache.beam.sdk.io.common.FileBasedIOITHelper.readTestPipelineOptions;
+import static
org.apache.beam.sdk.io.common.FileBasedIOITHelper.readFileBasedIOITPipelineOptions;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
@@ -29,8 +29,8 @@
import org.apache.beam.sdk.io.GenerateSequence;
import org.apache.beam.sdk.io.common.FileBasedIOITHelper;
import org.apache.beam.sdk.io.common.FileBasedIOITHelper.DeleteFileFn;
+import org.apache.beam.sdk.io.common.FileBasedIOTestPipelineOptions;
import org.apache.beam.sdk.io.common.HashingFn;
-import org.apache.beam.sdk.io.common.IOTestPipelineOptions;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Combine;
@@ -66,7 +66,6 @@
@RunWith(JUnit4.class)
public class AvroIOIT {
-
private static final Schema AVRO_SCHEMA = new Schema.Parser().parse("{\n"
+ " \"namespace\": \"ioitavro\",\n"
+ " \"type\": \"record\",\n"
@@ -84,7 +83,7 @@
@BeforeClass
public static void setup() {
- IOTestPipelineOptions options = readTestPipelineOptions();
+ FileBasedIOTestPipelineOptions options =
readFileBasedIOITPipelineOptions();
numberOfTextLines = options.getNumberOfRecords();
filenamePrefix = appendTimestampSuffix(options.getFilenamePrefix());
@@ -93,35 +92,27 @@ public static void setup() {
@Test
public void writeThenReadAll() {
- PCollection<String> testFilenames =
- pipeline
- .apply("Generate sequence",
GenerateSequence.from(0).to(numberOfTextLines))
- .apply(
- "Produce text lines",
- ParDo.of(new
FileBasedIOITHelper.DeterministicallyConstructTestTextLineFn()))
- .apply("Produce Avro records", ParDo.of(new
DeterministicallyConstructAvroRecordsFn()))
- .setCoder(AvroCoder.of(AVRO_SCHEMA))
- .apply(
- "Write Avro records to files",
- AvroIO.writeGenericRecords(AVRO_SCHEMA)
- .to(filenamePrefix)
- .withOutputFilenames()
- .withSuffix(".avro"))
- .getPerDestinationOutputFilenames()
- .apply(Values.create());
+ PCollection<String> testFilenames = pipeline
+ .apply("Generate sequence",
GenerateSequence.from(0).to(numberOfTextLines))
+ .apply("Produce text lines",
+ ParDo.of(new
FileBasedIOITHelper.DeterministicallyConstructTestTextLineFn()))
+ .apply("Produce Avro records", ParDo.of(new
DeterministicallyConstructAvroRecordsFn()))
+ .setCoder(AvroCoder.of(AVRO_SCHEMA))
+ .apply("Write Avro records to files",
+
AvroIO.writeGenericRecords(AVRO_SCHEMA).to(filenamePrefix).withOutputFilenames()
+ .withSuffix(".avro")).getPerDestinationOutputFilenames()
+ .apply(Values.create());
PCollection<String> consolidatedHashcode = testFilenames
- .apply("Read all files", AvroIO.readAllGenericRecords(AVRO_SCHEMA))
- .apply("Parse Avro records to Strings", ParDo.of(new
ParseAvroRecordsFn()))
- .apply("Calculate hashcode", Combine.globally(new HashingFn()));
+ .apply("Read all files", AvroIO.readAllGenericRecords(AVRO_SCHEMA))
+ .apply("Parse Avro records to Strings", ParDo.of(new
ParseAvroRecordsFn()))
+ .apply("Calculate hashcode", Combine.globally(new HashingFn()));
String expectedHash = getExpectedHashForLineCount(numberOfTextLines);
PAssert.thatSingleton(consolidatedHashcode).isEqualTo(expectedHash);
- testFilenames.apply(
- "Delete test files",
- ParDo.of(new DeleteFileFn())
- .withSideInputs(consolidatedHashcode.apply(View.asSingleton())));
+ testFilenames.apply("Delete test files",
+ ParDo.of(new
DeleteFileFn()).withSideInputs(consolidatedHashcode.apply(View.asSingleton())));
pipeline.run().waitUntilFinish();
}
@@ -129,9 +120,7 @@ public void writeThenReadAll() {
private static class DeterministicallyConstructAvroRecordsFn extends
DoFn<String, GenericRecord> {
@ProcessElement
public void processElement(ProcessContext c) {
- c.output(
- new GenericRecordBuilder(AVRO_SCHEMA).set("row", c.element()).build()
- );
+ c.output(new GenericRecordBuilder(AVRO_SCHEMA).set("row",
c.element()).build());
}
}
@@ -141,5 +130,4 @@ public void processElement(ProcessContext c) {
c.output(String.valueOf(c.element().get("row")));
}
}
-
}
diff --git
a/sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/common/FileBasedIOITHelper.java
b/sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/common/FileBasedIOITHelper.java
index 13694c9b996..8cf086e69c2 100644
---
a/sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/common/FileBasedIOITHelper.java
+++
b/sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/common/FileBasedIOITHelper.java
@@ -30,9 +30,6 @@
import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.io.fs.MatchResult;
import org.apache.beam.sdk.io.fs.ResourceId;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import org.apache.beam.sdk.options.PipelineOptionsValidator;
-import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.DoFn;
@@ -44,13 +41,8 @@
private FileBasedIOITHelper() {
}
- public static IOTestPipelineOptions readTestPipelineOptions() {
- PipelineOptionsFactory.register(IOTestPipelineOptions.class);
- IOTestPipelineOptions options = TestPipeline
- .testingPipelineOptions()
- .as(IOTestPipelineOptions.class);
-
- return PipelineOptionsValidator.validate(IOTestPipelineOptions.class,
options);
+ public static FileBasedIOTestPipelineOptions
readFileBasedIOITPipelineOptions() {
+ return
IOITHelper.readIOTestPipelineOptions(FileBasedIOTestPipelineOptions.class);
}
public static String appendTimestampSuffix(String text) {
diff --git
a/sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/common/FileBasedIOTestPipelineOptions.java
b/sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/common/FileBasedIOTestPipelineOptions.java
new file mode 100644
index 00000000000..f78ab6fd6c3
--- /dev/null
+++
b/sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/common/FileBasedIOTestPipelineOptions.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.common;
+
+import org.apache.beam.sdk.options.Default;
+import org.apache.beam.sdk.options.Description;
+import org.apache.beam.sdk.options.Validation;
+
+/** Pipeline options for all file based IOITs. */
+public interface FileBasedIOTestPipelineOptions extends IOTestPipelineOptions {
+
+ @Description("Destination prefix for files generated by the test")
+ @Validation.Required
+ String getFilenamePrefix();
+
+ void setFilenamePrefix(String prefix);
+
+ @Description("File compression type for writing and reading test files")
+ @Default.String("UNCOMPRESSED")
+ String getCompressionType();
+
+ void setCompressionType(String compressionType);
+}
diff --git
a/sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/parquet/ParquetIOIT.java
b/sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/parquet/ParquetIOIT.java
index bc9b67c4bde..e4748ccd9cd 100644
---
a/sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/parquet/ParquetIOIT.java
+++
b/sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/parquet/ParquetIOIT.java
@@ -19,7 +19,7 @@
import static
org.apache.beam.sdk.io.common.FileBasedIOITHelper.appendTimestampSuffix;
import static
org.apache.beam.sdk.io.common.FileBasedIOITHelper.getExpectedHashForLineCount;
-import static
org.apache.beam.sdk.io.common.FileBasedIOITHelper.readTestPipelineOptions;
+import static
org.apache.beam.sdk.io.common.FileBasedIOITHelper.readFileBasedIOITPipelineOptions;
import static org.apache.beam.sdk.values.TypeDescriptors.strings;
import org.apache.avro.Schema;
@@ -29,8 +29,8 @@
import org.apache.beam.sdk.io.FileIO;
import org.apache.beam.sdk.io.GenerateSequence;
import org.apache.beam.sdk.io.common.FileBasedIOITHelper;
+import org.apache.beam.sdk.io.common.FileBasedIOTestPipelineOptions;
import org.apache.beam.sdk.io.common.HashingFn;
-import org.apache.beam.sdk.io.common.IOTestPipelineOptions;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Combine;
@@ -85,7 +85,7 @@
@BeforeClass
public static void setup() {
- IOTestPipelineOptions options = readTestPipelineOptions();
+ FileBasedIOTestPipelineOptions options =
readFileBasedIOITPipelineOptions();
numberOfRecords = options.getNumberOfRecords();
filenamePrefix = appendTimestampSuffix(options.getFilenamePrefix());
@@ -93,12 +93,12 @@ public static void setup() {
@Test
public void writeThenReadAll() {
- PCollection<String> testFiles =
- pipeline.apply("Generate sequence",
GenerateSequence.from(0).to(numberOfRecords))
+ PCollection<String> testFiles = pipeline
+ .apply("Generate sequence", GenerateSequence.from(0).to(numberOfRecords))
.apply("Produce text lines",
ParDo.of(new
FileBasedIOITHelper.DeterministicallyConstructTestTextLineFn()))
.apply("Produce Avro records", ParDo.of(new
DeterministicallyConstructAvroRecordsFn()))
- .setCoder(AvroCoder.of(SCHEMA))
+ .setCoder(AvroCoder.of(SCHEMA))
.apply("Write Parquet files",
FileIO.<GenericRecord>write().via(ParquetIO.sink(SCHEMA)).to(filenamePrefix))
.getPerDestinationOutputFilenames()
@@ -107,9 +107,10 @@ public void writeThenReadAll() {
PCollection<String> consolidatedHashcode = testFiles.apply("Find files",
FileIO.matchAll())
.apply("Read matched files", FileIO.readMatches())
.apply("Read parquet files", ParquetIO.readFiles(SCHEMA))
- .apply("Map records to strings", MapElements.into(strings())
- .via((SerializableFunction<GenericRecord, String>) record -> String
- .valueOf(record.get("row"))))
+ .apply("Map records to strings",
+ MapElements.into(strings()).via(
+ (SerializableFunction<GenericRecord, String>) record -> String
+ .valueOf(record.get("row"))))
.apply("Calculate hashcode", Combine.globally(new HashingFn()));
String expectedHash = getExpectedHashForLineCount(numberOfRecords);
diff --git
a/sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/text/TextIOIT.java
b/sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/text/TextIOIT.java
index a7a55730de3..0894ab04203 100644
---
a/sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/text/TextIOIT.java
+++
b/sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/text/TextIOIT.java
@@ -21,15 +21,15 @@
import static org.apache.beam.sdk.io.Compression.AUTO;
import static
org.apache.beam.sdk.io.common.FileBasedIOITHelper.appendTimestampSuffix;
import static
org.apache.beam.sdk.io.common.FileBasedIOITHelper.getExpectedHashForLineCount;
-import static
org.apache.beam.sdk.io.common.FileBasedIOITHelper.readTestPipelineOptions;
+import static
org.apache.beam.sdk.io.common.FileBasedIOITHelper.readFileBasedIOITPipelineOptions;
import org.apache.beam.sdk.io.Compression;
import org.apache.beam.sdk.io.GenerateSequence;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.io.common.FileBasedIOITHelper;
import org.apache.beam.sdk.io.common.FileBasedIOITHelper.DeleteFileFn;
+import org.apache.beam.sdk.io.common.FileBasedIOTestPipelineOptions;
import org.apache.beam.sdk.io.common.HashingFn;
-import org.apache.beam.sdk.io.common.IOTestPipelineOptions;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Combine;
@@ -74,7 +74,7 @@
@BeforeClass
public static void setup() {
- IOTestPipelineOptions options = readTestPipelineOptions();
+ FileBasedIOTestPipelineOptions options =
readFileBasedIOITPipelineOptions();
numberOfTextLines = options.getNumberOfRecords();
filenamePrefix = appendTimestampSuffix(options.getFilenamePrefix());
@@ -84,32 +84,27 @@ public static void setup() {
@Test
public void writeThenReadAll() {
TextIO.TypedWrite<String, Object> write = TextIO
- .write()
- .to(filenamePrefix)
- .withOutputFilenames()
- .withCompression(compressionType);
+ .write()
+ .to(filenamePrefix)
+ .withOutputFilenames()
+ .withCompression(compressionType);
- PCollection<String> testFilenames =
- pipeline
- .apply("Generate sequence",
GenerateSequence.from(0).to(numberOfTextLines))
- .apply(
- "Produce text lines",
- ParDo.of(new
FileBasedIOITHelper.DeterministicallyConstructTestTextLineFn()))
- .apply("Write content to files", write)
- .getPerDestinationOutputFilenames()
- .apply(Values.create());
+ PCollection<String> testFilenames = pipeline
+ .apply("Generate sequence",
GenerateSequence.from(0).to(numberOfTextLines))
+ .apply("Produce text lines",
+ ParDo.of(new
FileBasedIOITHelper.DeterministicallyConstructTestTextLineFn()))
+ .apply("Write content to files",
write).getPerDestinationOutputFilenames()
+ .apply(Values.create());
PCollection<String> consolidatedHashcode = testFilenames
- .apply("Read all files", TextIO.readAll().withCompression(AUTO))
- .apply("Calculate hashcode", Combine.globally(new HashingFn()));
+ .apply("Read all files", TextIO.readAll().withCompression(AUTO))
+ .apply("Calculate hashcode", Combine.globally(new HashingFn()));
String expectedHash = getExpectedHashForLineCount(numberOfTextLines);
PAssert.thatSingleton(consolidatedHashcode).isEqualTo(expectedHash);
- testFilenames.apply(
- "Delete test files",
- ParDo.of(new DeleteFileFn())
- .withSideInputs(consolidatedHashcode.apply(View.asSingleton())));
+ testFilenames.apply("Delete test files",
+ ParDo.of(new
DeleteFileFn()).withSideInputs(consolidatedHashcode.apply(View.asSingleton())));
pipeline.run().waitUntilFinish();
}
diff --git
a/sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/tfrecord/TFRecordIOIT.java
b/sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/tfrecord/TFRecordIOIT.java
index 56c3e03be6f..619884b65e3 100644
---
a/sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/tfrecord/TFRecordIOIT.java
+++
b/sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/tfrecord/TFRecordIOIT.java
@@ -15,13 +15,12 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
package org.apache.beam.sdk.io.tfrecord;
import static org.apache.beam.sdk.io.Compression.AUTO;
import static
org.apache.beam.sdk.io.common.FileBasedIOITHelper.appendTimestampSuffix;
import static
org.apache.beam.sdk.io.common.FileBasedIOITHelper.getExpectedHashForLineCount;
-import static
org.apache.beam.sdk.io.common.FileBasedIOITHelper.readTestPipelineOptions;
+import static
org.apache.beam.sdk.io.common.FileBasedIOITHelper.readFileBasedIOITPipelineOptions;
import java.nio.charset.StandardCharsets;
import org.apache.beam.sdk.io.Compression;
@@ -29,8 +28,8 @@
import org.apache.beam.sdk.io.TFRecordIO;
import org.apache.beam.sdk.io.common.FileBasedIOITHelper;
import org.apache.beam.sdk.io.common.FileBasedIOITHelper.DeleteFileFn;
+import org.apache.beam.sdk.io.common.FileBasedIOTestPipelineOptions;
import org.apache.beam.sdk.io.common.HashingFn;
-import org.apache.beam.sdk.io.common.IOTestPipelineOptions;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Combine;
@@ -47,7 +46,6 @@
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
-
/**
* Integration tests for {@link org.apache.beam.sdk.io.TFRecordIO}.
*
@@ -82,7 +80,7 @@
@BeforeClass
public static void setup() {
- IOTestPipelineOptions options = readTestPipelineOptions();
+ FileBasedIOTestPipelineOptions options =
readFileBasedIOITPipelineOptions();
numberOfTextLines = options.getNumberOfRecords();
filenamePrefix = appendTimestampSuffix(options.getFilenamePrefix());
@@ -97,37 +95,34 @@ private static String createFilenamePattern() {
@Test
public void writeThenReadAll() {
TFRecordIO.Write writeTransform = TFRecordIO
- .write()
- .to(filenamePrefix)
- .withCompression(compressionType)
- .withSuffix(".tfrecord");
+ .write()
+ .to(filenamePrefix)
+ .withCompression(compressionType)
+ .withSuffix(".tfrecord");
writePipeline
- .apply("Generate sequence",
GenerateSequence.from(0).to(numberOfTextLines))
- .apply("Produce text lines",
- ParDo.of(new
FileBasedIOITHelper.DeterministicallyConstructTestTextLineFn()))
- .apply("Transform strings to bytes", MapElements.via(new
StringToByteArray()))
- .apply("Write content to files", writeTransform);
+ .apply("Generate sequence",
GenerateSequence.from(0).to(numberOfTextLines))
+ .apply("Produce text lines",
+ ParDo.of(new
FileBasedIOITHelper.DeterministicallyConstructTestTextLineFn()))
+ .apply("Transform strings to bytes", MapElements.via(new
StringToByteArray()))
+ .apply("Write content to files", writeTransform);
writePipeline.run().waitUntilFinish();
String filenamePattern = createFilenamePattern();
- PCollection<String> consolidatedHashcode =
- readPipeline
-
.apply(TFRecordIO.read().from(filenamePattern).withCompression(AUTO))
- .apply("Transform bytes to strings", MapElements.via(new
ByteArrayToString()))
- .apply("Calculate hashcode", Combine.globally(new HashingFn()))
- .apply(Reshuffle.viaRandomKey());
+ PCollection<String> consolidatedHashcode = readPipeline
+ .apply(TFRecordIO.read().from(filenamePattern).withCompression(AUTO))
+ .apply("Transform bytes to strings", MapElements.via(new
ByteArrayToString()))
+ .apply("Calculate hashcode", Combine.globally(new HashingFn()))
+ .apply(Reshuffle.viaRandomKey());
String expectedHash = getExpectedHashForLineCount(numberOfTextLines);
PAssert.thatSingleton(consolidatedHashcode).isEqualTo(expectedHash);
readPipeline
- .apply(Create.of(filenamePattern))
- .apply(
- "Delete test files",
- ParDo.of(new DeleteFileFn())
-
.withSideInputs(consolidatedHashcode.apply(View.asSingleton())));
+ .apply(Create.of(filenamePattern))
+ .apply("Delete test files", ParDo.of(new DeleteFileFn())
+ .withSideInputs(consolidatedHashcode.apply(View.asSingleton())));
readPipeline.run().waitUntilFinish();
}
diff --git
a/sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/xml/XmlIOIT.java
b/sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/xml/XmlIOIT.java
index f5d61462e9f..580d290dd04 100644
---
a/sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/xml/XmlIOIT.java
+++
b/sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/xml/XmlIOIT.java
@@ -19,6 +19,7 @@
import static
org.apache.beam.sdk.io.common.FileBasedIOITHelper.appendTimestampSuffix;
import static org.apache.beam.sdk.io.common.IOITHelper.getHashForRecordCount;
+import static
org.apache.beam.sdk.io.common.IOITHelper.readIOTestPipelineOptions;
import com.google.common.collect.ImmutableMap;
import java.io.Serializable;
@@ -29,9 +30,10 @@
import org.apache.beam.sdk.io.FileIO;
import org.apache.beam.sdk.io.GenerateSequence;
import org.apache.beam.sdk.io.common.FileBasedIOITHelper;
+import org.apache.beam.sdk.io.common.FileBasedIOTestPipelineOptions;
import org.apache.beam.sdk.io.common.HashingFn;
-import org.apache.beam.sdk.io.common.IOTestPipelineOptions;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.options.Default;
+import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Combine;
@@ -69,6 +71,15 @@
@RunWith(JUnit4.class)
public class XmlIOIT {
+ /** XmlIOIT options. */
+ public interface XmlIOITPipelineOptions extends
FileBasedIOTestPipelineOptions {
+ @Description("Xml file charset name")
+ @Default.String("UTF-8")
+ String getCharset();
+
+ void setCharset(String charset);
+ }
+
private static final ImmutableMap<Integer, String> EXPECTED_HASHES =
ImmutableMap.of(
1000, "7f51adaf701441ee83459a3f705c1b86",
100_000, "af7775de90d0b0c8bbc36273fbca26fe",
@@ -86,10 +97,7 @@
@BeforeClass
public static void setup() {
- PipelineOptionsFactory.register(IOTestPipelineOptions.class);
- IOTestPipelineOptions options = TestPipeline
- .testingPipelineOptions()
- .as(IOTestPipelineOptions.class);
+ XmlIOITPipelineOptions options =
readIOTestPipelineOptions(XmlIOITPipelineOptions.class);
filenamePrefix = appendTimestampSuffix(options.getFilenamePrefix());
numberOfRecords = options.getNumberOfRecords();
@@ -100,13 +108,13 @@ public static void setup() {
public void writeThenReadAll() {
PCollection<String> testFileNames = pipeline
.apply("Generate sequence", GenerateSequence.from(0).to(numberOfRecords))
- .apply("Create xml records", MapElements.via(new LongToBird()))
- .apply("Write xml files", FileIO.<Bird>write()
- .via(XmlIO.sink(Bird.class)
+ .apply("Create xml records", MapElements.via(new
LongToBird())).apply("Write xml files",
+ FileIO.<Bird>write()
+ .via(XmlIO
+ .sink(Bird.class)
.withRootElement("birds")
.withCharset(charset))
- .to(filenamePrefix)
- .withPrefix("birds")
+ .to(filenamePrefix).withPrefix("birds")
.withSuffix(".xml"))
.getPerDestinationOutputFilenames()
.apply("Get file names", Values.create());
@@ -114,10 +122,12 @@ public void writeThenReadAll() {
PCollection<Bird> birds = testFileNames
.apply("Find files", FileIO.matchAll())
.apply("Read matched files", FileIO.readMatches())
- .apply("Read xml files", XmlIO.<Bird>readFiles()
- .withRecordClass(Bird.class).withRootElement("birds")
- .withRecordElement("bird")
- .withCharset(charset));
+ .apply("Read xml files",
+ XmlIO.<Bird>readFiles()
+ .withRecordClass(Bird.class)
+ .withRootElement("birds")
+ .withRecordElement("bird")
+ .withCharset(charset));
PCollection<String> consolidatedHashcode = birds
.apply("Map xml records to strings", MapElements.via(new BirdToString()))
@@ -127,7 +137,7 @@ public void writeThenReadAll() {
PAssert.thatSingleton(consolidatedHashcode).isEqualTo(expectedHash);
testFileNames.apply("Delete test files", ParDo.of(new
FileBasedIOITHelper.DeleteFileFn())
- .withSideInputs(consolidatedHashcode.apply(View.asSingleton())));
+ .withSideInputs(consolidatedHashcode.apply(View.asSingleton())));
pipeline.run().waitUntilFinish();
}
diff --git
a/sdks/java/io/hadoop-input-format/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/HadoopInputFormatIOIT.java
b/sdks/java/io/hadoop-input-format/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/HadoopInputFormatIOIT.java
index e24dd68dd2c..b1ffec4ddad 100644
---
a/sdks/java/io/hadoop-input-format/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/HadoopInputFormatIOIT.java
+++
b/sdks/java/io/hadoop-input-format/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/HadoopInputFormatIOIT.java
@@ -17,6 +17,7 @@
*/
package org.apache.beam.sdk.io.hadoop.inputformat;
+import static
org.apache.beam.sdk.io.common.IOITHelper.readIOTestPipelineOptions;
import static
org.apache.beam.sdk.io.common.TestRow.DeterministicallyConstructTestRowFn;
import static org.apache.beam.sdk.io.common.TestRow.SelectNameFn;
import static org.apache.beam.sdk.io.common.TestRow.getExpectedHashForRowCount;
@@ -26,11 +27,10 @@
import org.apache.beam.sdk.io.GenerateSequence;
import org.apache.beam.sdk.io.common.DatabaseTestHelper;
import org.apache.beam.sdk.io.common.HashingFn;
-import org.apache.beam.sdk.io.common.IOTestPipelineOptions;
+import org.apache.beam.sdk.io.common.PostgresIOTestPipelineOptions;
import org.apache.beam.sdk.io.common.TestRow;
import org.apache.beam.sdk.io.hadoop.SerializableConfiguration;
import org.apache.beam.sdk.io.jdbc.JdbcIO;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Combine;
@@ -87,9 +87,8 @@
@BeforeClass
public static void setUp() throws SQLException {
- PipelineOptionsFactory.register(IOTestPipelineOptions.class);
- IOTestPipelineOptions options = TestPipeline.testingPipelineOptions()
- .as(IOTestPipelineOptions.class);
+ PostgresIOTestPipelineOptions options = readIOTestPipelineOptions(
+ PostgresIOTestPipelineOptions.class);
dataSource = DatabaseTestHelper.getPostgresDataSource(options);
numberOfRows = options.getNumberOfRecords();
@@ -99,7 +98,7 @@ public static void setUp() throws SQLException {
setupHadoopConfiguration(options);
}
- private static void setupHadoopConfiguration(IOTestPipelineOptions options) {
+ private static void setupHadoopConfiguration(PostgresIOTestPipelineOptions
options) {
Configuration conf = new Configuration();
DBConfiguration.configureDB(
conf,
@@ -127,31 +126,25 @@ public static void tearDown() throws SQLException {
@Test
public void readUsingHadoopInputFormat() {
- writePipeline
- .apply("Generate sequence", GenerateSequence.from(0).to(numberOfRows))
- .apply("Produce db rows", ParDo.of(new
DeterministicallyConstructTestRowFn()))
- .apply("Prevent fusion before writing", Reshuffle.viaRandomKey())
- .apply(
- "Write using JDBCIO",
- JdbcIO.<TestRow>write()
-
.withDataSourceConfiguration(JdbcIO.DataSourceConfiguration.create(dataSource))
- .withStatement(String.format("insert into %s values(?, ?)",
tableName))
- .withPreparedStatementSetter(new
PrepareStatementFromTestRow()));
+ writePipeline.apply("Generate sequence",
GenerateSequence.from(0).to(numberOfRows))
+ .apply("Produce db rows", ParDo.of(new
DeterministicallyConstructTestRowFn()))
+ .apply("Prevent fusion before writing", Reshuffle.viaRandomKey())
+ .apply("Write using JDBCIO", JdbcIO.<TestRow>write()
+
.withDataSourceConfiguration(JdbcIO.DataSourceConfiguration.create(dataSource))
+ .withStatement(String.format("insert into %s values(?, ?)", tableName))
+ .withPreparedStatementSetter(new PrepareStatementFromTestRow()));
writePipeline.run().waitUntilFinish();
- PCollection<String> consolidatedHashcode =
- readPipeline
- .apply(
- "Read using HadoopInputFormat",
- HadoopInputFormatIO.<LongWritable, TestRowDBWritable>read()
- .withConfiguration(hadoopConfiguration.get()))
- .apply("Get values only", Values.create())
- .apply("Values as string", ParDo.of(new SelectNameFn()))
- .apply("Calculate hashcode", Combine.globally(new HashingFn()));
-
- PAssert.thatSingleton(consolidatedHashcode)
- .isEqualTo(getExpectedHashForRowCount(numberOfRows));
+ PCollection<String> consolidatedHashcode = readPipeline
+ .apply("Read using HadoopInputFormat", HadoopInputFormatIO.
+ <LongWritable, TestRowDBWritable>read()
+ .withConfiguration(hadoopConfiguration.get()))
+ .apply("Get values only", Values.create())
+ .apply("Values as string", ParDo.of(new SelectNameFn()))
+ .apply("Calculate hashcode", Combine.globally(new HashingFn()));
+
+
PAssert.thatSingleton(consolidatedHashcode).isEqualTo(getExpectedHashForRowCount(numberOfRows));
readPipeline.run().waitUntilFinish();
}
diff --git
a/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcIOIT.java
b/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcIOIT.java
index 7c4ef4012d8..7123ed4ebab 100644
--- a/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcIOIT.java
+++ b/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcIOIT.java
@@ -17,15 +17,16 @@
*/
package org.apache.beam.sdk.io.jdbc;
+import static
org.apache.beam.sdk.io.common.IOITHelper.readIOTestPipelineOptions;
+
import java.sql.SQLException;
import java.util.List;
import org.apache.beam.sdk.coders.SerializableCoder;
import org.apache.beam.sdk.io.GenerateSequence;
import org.apache.beam.sdk.io.common.DatabaseTestHelper;
import org.apache.beam.sdk.io.common.HashingFn;
-import org.apache.beam.sdk.io.common.IOTestPipelineOptions;
+import org.apache.beam.sdk.io.common.PostgresIOTestPipelineOptions;
import org.apache.beam.sdk.io.common.TestRow;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Combine;
@@ -75,9 +76,8 @@
@BeforeClass
public static void setup() throws SQLException {
- PipelineOptionsFactory.register(IOTestPipelineOptions.class);
- IOTestPipelineOptions options = TestPipeline.testingPipelineOptions()
- .as(IOTestPipelineOptions.class);
+ PostgresIOTestPipelineOptions options = readIOTestPipelineOptions(
+ PostgresIOTestPipelineOptions.class);
numberOfRows = options.getNumberOfRecords();
dataSource = DatabaseTestHelper.getPostgresDataSource(options);
@@ -109,11 +109,11 @@ public void testWriteThenRead() {
*/
private void runWrite() {
pipelineWrite.apply(GenerateSequence.from(0).to(numberOfRows))
- .apply(ParDo.of(new TestRow.DeterministicallyConstructTestRowFn()))
- .apply(JdbcIO.<TestRow>write()
-
.withDataSourceConfiguration(JdbcIO.DataSourceConfiguration.create(dataSource))
- .withStatement(String.format("insert into %s values(?, ?)",
tableName))
- .withPreparedStatementSetter(new
JdbcTestHelper.PrepareStatementFromTestRow()));
+ .apply(ParDo.of(new TestRow.DeterministicallyConstructTestRowFn()))
+ .apply(JdbcIO.<TestRow>write()
+
.withDataSourceConfiguration(JdbcIO.DataSourceConfiguration.create(dataSource))
+ .withStatement(String.format("insert into %s values(?, ?)", tableName))
+ .withPreparedStatementSetter(new
JdbcTestHelper.PrepareStatementFromTestRow()));
pipelineWrite.run().waitUntilFinish();
}
@@ -137,8 +137,8 @@ private void runWrite() {
* can use the natural ordering of that key.
*/
private void runRead() {
- PCollection<TestRow> namesAndIds =
- pipelineRead.apply(JdbcIO.<TestRow>read()
+ PCollection<TestRow> namesAndIds = pipelineRead
+ .apply(JdbcIO.<TestRow>read()
.withDataSourceConfiguration(JdbcIO.DataSourceConfiguration.create(dataSource))
.withQuery(String.format("select name,id from %s;", tableName))
.withRowMapper(new JdbcTestHelper.CreateTestRowOfNameAndId())
@@ -148,18 +148,18 @@ private void runRead() {
.isEqualTo((long) numberOfRows);
PCollection<String> consolidatedHashcode = namesAndIds
- .apply(ParDo.of(new TestRow.SelectNameFn()))
- .apply("Hash row contents", Combine.globally(new
HashingFn()).withoutDefaults());
+ .apply(ParDo.of(new TestRow.SelectNameFn()))
+ .apply("Hash row contents", Combine.globally(new
HashingFn()).withoutDefaults());
PAssert.that(consolidatedHashcode)
- .containsInAnyOrder(TestRow.getExpectedHashForRowCount(numberOfRows));
+ .containsInAnyOrder(TestRow.getExpectedHashForRowCount(numberOfRows));
PCollection<List<TestRow>> frontOfList =
namesAndIds.apply(Top.smallest(500));
Iterable<TestRow> expectedFrontOfList = TestRow.getExpectedValues(0, 500);
PAssert.thatSingletonIterable(frontOfList).containsInAnyOrder(expectedFrontOfList);
PCollection<List<TestRow>> backOfList =
namesAndIds.apply(Top.largest(500));
- Iterable<TestRow> expectedBackOfList =
- TestRow.getExpectedValues(numberOfRows - 500, numberOfRows);
+ Iterable<TestRow> expectedBackOfList = TestRow
+ .getExpectedValues(numberOfRows - 500, numberOfRows);
PAssert.thatSingletonIterable(backOfList).containsInAnyOrder(expectedBackOfList);
pipelineRead.run().waitUntilFinish();
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
Issue Time Tracking
-------------------
Worklog Id: (was: 110117)
Time Spent: 7h 20m (was: 7h 10m)
> Split IOTestPipelineOptions to multiple, test-specific files
> ------------------------------------------------------------
>
> Key: BEAM-4137
> URL: https://issues.apache.org/jira/browse/BEAM-4137
> Project: Beam
> Issue Type: Improvement
> Components: testing
> Reporter: Łukasz Gajowy
> Assignee: Łukasz Gajowy
> Priority: Minor
> Time Spent: 7h 20m
> Remaining Estimate: 0h
>
> Currently we have one big IOTestPipelineOptions interface that is used in
> many IOITs. It contains test-specific options that should be located next
> to the testing classes instead, not in a generic file. Let's split this.
> Additionally, besides separation of concerns, this will allow adding
> test-specific @Default and @Required annotations and validating the
> options better.
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)