This is an automated email from the ASF dual-hosted git repository.
yhu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 99e7bbf348c Fix Postcommit Java IO Performance tests workflow (#33915)
99e7bbf348c is described below
commit 99e7bbf348c9e2903773e21cd063625f21e84238
Author: akashorabek <[email protected]>
AuthorDate: Fri Feb 7 21:36:10 2025 +0500
Fix Postcommit Java IO Performance tests workflow (#33915)
---
.../java/org/apache/beam/it/gcp/bigquery/BigQueryIOST.java | 6 +++---
.../org/apache/beam/it/gcp/bigquery/BigQueryStreamingLT.java | 11 +++++------
2 files changed, 8 insertions(+), 9 deletions(-)
diff --git a/it/google-cloud-platform/src/test/java/org/apache/beam/it/gcp/bigquery/BigQueryIOST.java b/it/google-cloud-platform/src/test/java/org/apache/beam/it/gcp/bigquery/BigQueryIOST.java
index ddb300d74f6..eaa6f75dc0f 100644
--- a/it/google-cloud-platform/src/test/java/org/apache/beam/it/gcp/bigquery/BigQueryIOST.java
+++ b/it/google-cloud-platform/src/test/java/org/apache/beam/it/gcp/bigquery/BigQueryIOST.java
@@ -44,7 +44,6 @@ import org.apache.beam.it.common.TestProperties;
import org.apache.beam.it.common.utils.ResourceManagerUtils;
import org.apache.beam.it.gcp.IOStressTestBase;
import org.apache.beam.runners.dataflow.options.DataflowPipelineWorkerPoolOptions;
-import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
import org.apache.beam.sdk.io.Read;
import org.apache.beam.sdk.io.gcp.bigquery.AvroWriteRequest;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
@@ -297,10 +296,11 @@ public final class BigQueryIOST extends IOStressTestBase {
.withSchema(schema)
.withCustomGcsTempLocation(ValueProvider.StaticValueProvider.of(tempLocation)));
+ String runnerV2Experiment = "use_runner_v2";
String experiments =
configuration.writeMethod.equals(STORAGE_API_AT_LEAST_ONCE_METHOD)
- ? GcpOptions.STREAMING_ENGINE_EXPERIMENT + ",streaming_mode_at_least_once"
- : GcpOptions.STREAMING_ENGINE_EXPERIMENT;
+ ? runnerV2Experiment + ",streaming_mode_at_least_once"
+ : runnerV2Experiment;
PipelineLauncher.LaunchConfig options =
PipelineLauncher.LaunchConfig.builder("write-bigquery")
diff --git a/it/google-cloud-platform/src/test/java/org/apache/beam/it/gcp/bigquery/BigQueryStreamingLT.java b/it/google-cloud-platform/src/test/java/org/apache/beam/it/gcp/bigquery/BigQueryStreamingLT.java
index 44685a2381f..e89fe1dc852 100644
--- a/it/google-cloud-platform/src/test/java/org/apache/beam/it/gcp/bigquery/BigQueryStreamingLT.java
+++ b/it/google-cloud-platform/src/test/java/org/apache/beam/it/gcp/bigquery/BigQueryStreamingLT.java
@@ -43,7 +43,6 @@ import org.apache.beam.it.common.PipelineOperator;
import org.apache.beam.it.common.TestProperties;
import org.apache.beam.it.gcp.IOLoadTestBase;
import org.apache.beam.runners.dataflow.DataflowRunner;
-import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
import org.apache.beam.sdk.io.GenerateSequence;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryOptions;
@@ -361,7 +360,7 @@ public class BigQueryStreamingLT extends IOLoadTestBase {
.setPipeline(storageApiPipeline)
.addParameter("runner", config.getRunner())
.addParameter("streaming", "true")
- .addParameter("experiments", GcpOptions.STREAMING_ENGINE_EXPERIMENT)
+ .addParameter("experiments", "use_runner_v2")
.addParameter(
"maxNumWorkers",
TestProperties.getProperty("maxNumWorkers", "10", TestProperties.Type.PROPERTY))
@@ -376,7 +375,7 @@ public class BigQueryStreamingLT extends IOLoadTestBase {
.setJobId(storageApiInfo.jobId())
.setProject(project)
.setRegion(region)
- .setTimeoutAfter(java.time.Duration.ofMinutes(config.getMinutes() * 2L))
+ .setTimeoutAfter(java.time.Duration.ofMinutes(config.getMinutes() * 4L))
.setCheckAfter(java.time.Duration.ofSeconds(config.getMinutes() * 60 / 20))
.build());
// Check the initial launch didn't fail
@@ -493,7 +492,7 @@ public class BigQueryStreamingLT extends IOLoadTestBase {
"WITH \n"
+ "storage_api_table AS (SELECT %s FROM `%s`), \n"
+ "expected_table AS (SELECT %s FROM `%s`), \n"
- + "rows_mismatched AS (SELECT * FROM expected_table EXCEPT DISTINCT SELECT * FROM storage_api_table) \n"
+ + "rows_mismatched AS (SELECT * FROM storage_api_table EXCEPT DISTINCT SELECT * FROM expected_table) \n"
+ "SELECT COUNT(*) FROM rows_mismatched",
columnNames, destTable, columnNames, expectedTable);
@@ -501,7 +500,7 @@ public class BigQueryStreamingLT extends IOLoadTestBase {
TableRow queryResponse =
Iterables.getOnlyElement(
- BQ_CLIENT.queryUnflattened(checkCorrectnessQuery, "google.com:clouddfe", true, true));
+ BQ_CLIENT.queryUnflattened(checkCorrectnessQuery, project, true, true));
long result = Long.parseLong((String) queryResponse.get("f0_"));
LOG.info("Number of mismatched rows: {}", result);
@@ -522,7 +521,7 @@ public class BigQueryStreamingLT extends IOLoadTestBase {
TableRow queryResponse =
Iterables.getOnlyElement(
- BQ_CLIENT.queryUnflattened(checkDuplicationQuery, "google.com:clouddfe", true, true));
+ BQ_CLIENT.queryUnflattened(checkDuplicationQuery, project, true, true));
long actualCount = Long.parseLong((String) queryResponse.get("actualCount"));
long expectedCount = Long.parseLong((String) queryResponse.get("expectedCount"));
assertEquals(