[
https://issues.apache.org/jira/browse/BEAM-3456?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16330176#comment-16330176
]
ASF GitHub Bot commented on BEAM-3456:
--------------------------------------
chamikaramj closed pull request #4392: [BEAM-3456] Enable jenkins and large
scale scenario in JDBC
URL: https://github.com/apache/beam/pull/4392
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):
diff --git
a/sdks/java/io/common/src/test/java/org/apache/beam/sdk/io/common/IOTestPipelineOptions.java
b/sdks/java/io/common/src/test/java/org/apache/beam/sdk/io/common/IOTestPipelineOptions.java
index e7b475d4caa..b86020ec278 100644
---
a/sdks/java/io/common/src/test/java/org/apache/beam/sdk/io/common/IOTestPipelineOptions.java
+++
b/sdks/java/io/common/src/test/java/org/apache/beam/sdk/io/common/IOTestPipelineOptions.java
@@ -91,10 +91,10 @@
/* Options for test pipeline for file-based I/O in
'sdks/java/io/file-based-io-tests/'. */
@Description("Number records that will be written and read by the test")
- @Default.Long(100000)
- Long getNumberOfRecords();
+ @Default.Integer(100000)
+ Integer getNumberOfRecords();
- void setNumberOfRecords(Long count);
+ void setNumberOfRecords(Integer count);
@Description("Destination prefix for files generated by the test")
@Validation.Required
diff --git
a/sdks/java/io/common/src/test/java/org/apache/beam/sdk/io/common/TestRow.java
b/sdks/java/io/common/src/test/java/org/apache/beam/sdk/io/common/TestRow.java
index 5f0a2fb00b2..79a144d144d 100644
---
a/sdks/java/io/common/src/test/java/org/apache/beam/sdk/io/common/TestRow.java
+++
b/sdks/java/io/common/src/test/java/org/apache/beam/sdk/io/common/TestRow.java
@@ -95,7 +95,9 @@ public void processElement(ProcessContext c) {
* the name() for the rows generated from seeds in [0, n).
*/
private static final Map<Integer, String> EXPECTED_HASHES = ImmutableMap.of(
- 1000, "7d94d63a41164be058a9680002914358"
+ 1000, "7d94d63a41164be058a9680002914358",
+ 100_000, "c7cbddb319209e200f1c5eebef8fe960",
+ 5_000_000, "c44f8a5648cd9207c9c6f77395a998dc"
);
/**
diff --git
a/sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/avro/AvroIOIT.java
b/sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/avro/AvroIOIT.java
index be0d6df2eb7..07562f38ca3 100644
---
a/sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/avro/AvroIOIT.java
+++
b/sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/avro/AvroIOIT.java
@@ -75,7 +75,7 @@
+ "}");
private static String filenamePrefix;
- private static Long numberOfTextLines;
+ private static Integer numberOfTextLines;
@Rule
public TestPipeline pipeline = TestPipeline.create();
diff --git
a/sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/common/FileBasedIOITHelper.java
b/sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/common/FileBasedIOITHelper.java
index cf20d8e5954..40b04617d8a 100644
---
a/sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/common/FileBasedIOITHelper.java
+++
b/sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/common/FileBasedIOITHelper.java
@@ -55,11 +55,11 @@ public static String appendTimestampToPrefix(String
filenamePrefix) {
return String.format("%s_%s", filenamePrefix, new Date().getTime());
}
- public static String getExpectedHashForLineCount(Long lineCount) {
- Map<Long, String> expectedHashes = ImmutableMap.of(
- 100_000L, "4c8bb3b99dcc59459b20fefba400d446",
- 1_000_000L, "9796db06e7a7960f974d5a91164afff1",
- 100_000_000L, "6ce05f456e2fdc846ded2abd0ec1de95"
+ public static String getExpectedHashForLineCount(int lineCount) {
+ Map<Integer, String> expectedHashes = ImmutableMap.of(
+ 100_000, "4c8bb3b99dcc59459b20fefba400d446",
+ 1_000_000, "9796db06e7a7960f974d5a91164afff1",
+ 100_000_000, "6ce05f456e2fdc846ded2abd0ec1de95"
);
String hash = expectedHashes.get(lineCount);
diff --git
a/sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/text/TextIOIT.java
b/sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/text/TextIOIT.java
index 1a4ecccc0ef..b611a5746d3 100644
---
a/sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/text/TextIOIT.java
+++
b/sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/text/TextIOIT.java
@@ -23,7 +23,6 @@
import static
org.apache.beam.sdk.io.common.FileBasedIOITHelper.getExpectedHashForLineCount;
import static
org.apache.beam.sdk.io.common.FileBasedIOITHelper.readTestPipelineOptions;
-import java.text.ParseException;
import org.apache.beam.sdk.io.Compression;
import org.apache.beam.sdk.io.GenerateSequence;
import org.apache.beam.sdk.io.TextIO;
@@ -65,14 +64,14 @@
public class TextIOIT {
private static String filenamePrefix;
- private static Long numberOfTextLines;
+ private static Integer numberOfTextLines;
private static Compression compressionType;
@Rule
public TestPipeline pipeline = TestPipeline.create();
@BeforeClass
- public static void setup() throws ParseException {
+ public static void setup() {
IOTestPipelineOptions options = readTestPipelineOptions();
numberOfTextLines = options.getNumberOfRecords();
diff --git
a/sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/tfrecord/TFRecordIOIT.java
b/sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/tfrecord/TFRecordIOIT.java
index 3f08d76750c..2908c8c2b71 100644
---
a/sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/tfrecord/TFRecordIOIT.java
+++
b/sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/tfrecord/TFRecordIOIT.java
@@ -23,7 +23,6 @@
import static
org.apache.beam.sdk.io.common.FileBasedIOITHelper.getExpectedHashForLineCount;
import static
org.apache.beam.sdk.io.common.FileBasedIOITHelper.readTestPipelineOptions;
-import java.text.ParseException;
import org.apache.beam.sdk.io.Compression;
import org.apache.beam.sdk.io.GenerateSequence;
import org.apache.beam.sdk.io.TFRecordIO;
@@ -67,7 +66,7 @@
public class TFRecordIOIT {
private static String filenamePrefix;
- private static Long numberOfTextLines;
+ private static Integer numberOfTextLines;
private static Compression compressionType;
@Rule
@@ -77,7 +76,7 @@
public TestPipeline readPipeline = TestPipeline.create();
@BeforeClass
- public static void setup() throws ParseException {
+ public static void setup() {
IOTestPipelineOptions options = readTestPipelineOptions();
numberOfTextLines = options.getNumberOfRecords();
diff --git a/sdks/java/io/jdbc/pom.xml b/sdks/java/io/jdbc/pom.xml
index e6bb357a089..73fbc52590a 100644
--- a/sdks/java/io/jdbc/pom.xml
+++ b/sdks/java/io/jdbc/pom.xml
@@ -122,6 +122,7 @@
<executable>${python.interpreter.bin}</executable>
<arguments>
<argument>${pkbLocation}</argument>
+ <argument>-beam_it_timeout=1800</argument>
<argument>-benchmarks=beam_integration_benchmark</argument>
<argument>-beam_it_profile=io-it</argument>
<argument>-beam_location=${beamRootProjectDir}</argument>
@@ -204,6 +205,7 @@
<executable>${python.interpreter.bin}</executable>
<arguments>
<argument>${pkbLocation}</argument>
+ <argument>-beam_it_timeout=1800</argument>
<argument>-benchmarks=beam_integration_benchmark</argument>
<argument>-beam_it_profile=io-it</argument>
<argument>-beam_location=${beamRootProjectDir}</argument>
diff --git
a/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcIOIT.java
b/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcIOIT.java
index 32d6d9e80b4..941a77543d4 100644
--- a/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcIOIT.java
+++ b/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcIOIT.java
@@ -41,9 +41,6 @@
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.postgresql.ds.PGSimpleDataSource;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
/**
* A test of {@link org.apache.beam.sdk.io.jdbc.JdbcIO} on an independent
Postgres instance.
@@ -56,7 +53,8 @@
* "--postgresUsername=postgres",
* "--postgresDatabaseName=myfancydb",
* "--postgresPassword=mypass",
- * "--postgresSsl=false" ]'
+ * "--postgresSsl=false",
+ * "--numberOfRecords=1000" ]'
* </pre>
*
* <p>If you want to run this with a runner besides directrunner, there are
profiles for dataflow
@@ -65,8 +63,8 @@
*/
@RunWith(JUnit4.class)
public class JdbcIOIT {
- private static final Logger LOG = LoggerFactory.getLogger(JdbcIOIT.class);
- public static final int EXPECTED_ROW_COUNT = 1000;
+
+ private static int numberOfRows;
private static PGSimpleDataSource dataSource;
private static String tableName;
@@ -81,14 +79,14 @@ public static void setup() throws SQLException,
ParseException {
IOTestPipelineOptions options = TestPipeline.testingPipelineOptions()
.as(IOTestPipelineOptions.class);
+ numberOfRows = options.getNumberOfRecords();
dataSource = getDataSource(options);
tableName = JdbcTestHelper.getTableName("IT");
JdbcTestHelper.createDataTable(dataSource, tableName);
}
- private static PGSimpleDataSource getDataSource(IOTestPipelineOptions
options)
- throws SQLException {
+ private static PGSimpleDataSource getDataSource(IOTestPipelineOptions
options) {
PGSimpleDataSource dataSource = new PGSimpleDataSource();
dataSource.setDatabaseName(options.getPostgresDatabaseName());
@@ -124,7 +122,7 @@ public void testWriteThenRead() {
* the database.)
*/
private void runWrite() {
- pipelineWrite.apply(GenerateSequence.from(0).to((long) EXPECTED_ROW_COUNT))
+ pipelineWrite.apply(GenerateSequence.from(0).to(numberOfRows))
.apply(ParDo.of(new TestRow.DeterministicallyConstructTestRowFn()))
.apply(JdbcIO.<TestRow>write()
.withDataSourceConfiguration(JdbcIO.DataSourceConfiguration.create(dataSource))
@@ -162,13 +160,13 @@ private void runRead() {
PAssert.thatSingleton(
namesAndIds.apply("Count All", Count.<TestRow>globally()))
- .isEqualTo((long) EXPECTED_ROW_COUNT);
+ .isEqualTo((long) numberOfRows);
PCollection<String> consolidatedHashcode = namesAndIds
.apply(ParDo.of(new TestRow.SelectNameFn()))
.apply("Hash row contents", Combine.globally(new
HashingFn()).withoutDefaults());
PAssert.that(consolidatedHashcode)
-
.containsInAnyOrder(TestRow.getExpectedHashForRowCount(EXPECTED_ROW_COUNT));
+ .containsInAnyOrder(TestRow.getExpectedHashForRowCount(numberOfRows));
PCollection<List<TestRow>> frontOfList =
namesAndIds.apply(Top.<TestRow>smallest(500));
@@ -178,8 +176,7 @@ private void runRead() {
PCollection<List<TestRow>> backOfList =
namesAndIds.apply(Top.<TestRow>largest(500));
Iterable<TestRow> expectedBackOfList =
- TestRow.getExpectedValues(EXPECTED_ROW_COUNT - 500,
- EXPECTED_ROW_COUNT);
+ TestRow.getExpectedValues(numberOfRows - 500, numberOfRows);
PAssert.thatSingletonIterable(backOfList).containsInAnyOrder(expectedBackOfList);
pipelineRead.run().waitUntilFinish();
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
> Enable large scale JdbcIOIT Performance Test
> --------------------------------------------
>
> Key: BEAM-3456
> URL: https://issues.apache.org/jira/browse/BEAM-3456
> Project: Beam
> Issue Type: Bug
> Components: testing
> Reporter: Łukasz Gajowy
> Assignee: Łukasz Gajowy
> Priority: Major
>
> We should use numberOfRecords pipelineOption in Jdbc performance test and
> enable jenkins for it.
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)