This is an automated email from the ASF dual-hosted git repository.
yhu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 7dfc0c03b1b Sync it framework (#28541)
7dfc0c03b1b is described below
commit 7dfc0c03b1b793d9f798dc2dd93b0f81547568b3
Author: Yi Hu <[email protected]>
AuthorDate: Wed Sep 20 11:45:37 2023 -0400
Sync it framework (#28541)
* Sync it framework
* spotless and checkstyle; exclude not-sync file
* change to json extension to bypass RAT check
---
.../it/cassandra/matchers/CassandraAsserts.java | 2 +-
.../apache/beam/it/common/utils/PipelineUtils.java | 21 ++++++++-
.../beam/it/common/utils/PipelineUtilsTest.java | 8 +++-
.../it/elasticsearch/ElasticsearchUtilsTest.java | 2 +-
.../java/org/apache/beam/it/gcp/LoadTestBase.java | 53 ++++++++++++++-------
.../it/gcp/bigquery/BigQueryResourceManager.java | 1 +
.../it/gcp/bigtable/BigtableResourceManager.java | 33 +++++++++++++
.../gcp/bigtable/BigtableResourceManagerUtils.java | 2 +-
.../it/gcp/dataflow/AbstractPipelineLauncher.java | 13 +++++-
.../it/gcp/dataflow/DefaultPipelineLauncher.java | 6 +--
.../beam/it/gcp/dataflow/DirectRunnerClient.java | 6 +--
.../beam/it/gcp/datagenerator/DataGenerator.java | 54 +++++++++++-----------
.../gcp/datastore/matchers/DatastoreAsserts.java | 3 +-
.../apache/beam/it/gcp/dlp/DlpResourceManager.java | 5 +-
.../apache/beam/it/gcp/kms/KMSResourceManager.java | 5 +-
.../beam/it/gcp/monitoring/MonitoringClient.java | 12 ++---
.../beam/it/gcp/pubsub/PubsubResourceManager.java | 45 ++++++++++++++----
.../src/main/resources/test-artifact.json | 1 +
.../apache/beam/it/gcp/bigquery/BigQueryIOLT.java | 18 ++++----
.../apache/beam/it/gcp/bigtable/BigTableIOLT.java | 22 ++++-----
.../gcp/bigtable/BigtableResourceManagerTest.java | 1 +
.../it/gcp/dataflow/ClassicTemplateClientTest.java | 14 +++++-
.../it/gcp/dataflow/FlexTemplateClientTest.java | 14 +++++-
.../apache/beam/it/gcp/storage/FileBasedIOLT.java | 14 +++---
.../it/gcp/storage/GcsResourceManagerTest.java | 2 +-
.../src/test/resources/test-artifact.txt | 1 -
.../apache/beam/it/jdbc/MSSQLResourceManager.java | 7 +--
.../apache/beam/it/jdbc/MySQLResourceManager.java | 4 +-
.../apache/beam/it/jdbc/OracleResourceManager.java | 8 ++--
.../beam/it/jdbc/PostgresResourceManager.java | 9 ++--
.../apache/beam/it/kafka/KafkaResourceManager.java | 11 +++--
.../java/org/apache/beam/it/kafka/KafkaIOLT.java | 4 +-
.../beam/it/mongodb/MongoDBResourceManager.java | 10 ++--
.../beam/it/mongodb/matchers/MongoDBAsserts.java | 2 +-
.../apache/beam/it/neo4j/Neo4jResourceManager.java | 11 +++--
.../beam/it/splunk/SplunkResourceManager.java | 10 ++--
.../TestContainerResourceManager.java | 15 +++---
.../beam/it/truthmatchers/LaunchInfoSubject.java | 2 +-
.../beam/it/truthmatchers/RecordsSubject.java | 2 +-
39 files changed, 303 insertions(+), 150 deletions(-)
diff --git a/it/cassandra/src/main/java/org/apache/beam/it/cassandra/matchers/CassandraAsserts.java b/it/cassandra/src/main/java/org/apache/beam/it/cassandra/matchers/CassandraAsserts.java
index 61f730bf357..6aecc6609cf 100644
--- a/it/cassandra/src/main/java/org/apache/beam/it/cassandra/matchers/CassandraAsserts.java
+++ b/it/cassandra/src/main/java/org/apache/beam/it/cassandra/matchers/CassandraAsserts.java
@@ -31,7 +31,7 @@ import org.apache.beam.it.truthmatchers.RecordsSubject;
public class CassandraAsserts {
/**
- * Convert Cassandra {@link Row} list to a list of maps.
+ * Convert Cassandra {@link com.datastax.oss.driver.api.core.cql.Row} list to a list of maps.
*
* @param rows Rows to parse.
* @return List of maps to use in {@link RecordsSubject}.
diff --git a/it/common/src/main/java/org/apache/beam/it/common/utils/PipelineUtils.java b/it/common/src/main/java/org/apache/beam/it/common/utils/PipelineUtils.java
index c696457bbdd..d249d43d378 100644
--- a/it/common/src/main/java/org/apache/beam/it/common/utils/PipelineUtils.java
+++ b/it/common/src/main/java/org/apache/beam/it/common/utils/PipelineUtils.java
@@ -27,6 +27,7 @@ import org.apache.beam.sdk.PipelineRunner;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.CaseFormat;
+import org.apache.commons.lang3.RandomStringUtils;
/** Utilities to make working with Dataflow easier. */
public class PipelineUtils {
@@ -73,6 +74,15 @@ public class PipelineUtils {
}
}
+ /**
+ * Creates a job name. Method uses {@link #createJobName(String, int)}} without a random suffix.
+ *
+ * @see #createJobName(String, int)
+ */
+ public static String createJobName(String prefix) {
+ return createJobName(prefix, 0);
+ }
+
/**
* Creates a job name.
*
@@ -83,17 +93,24 @@ public class PipelineUtils {
* same prefix are requested in a short period of time.
*
* @param prefix a prefix for the job
+ * @param randomChars if the string should contain random chars at the end, to increase the
+ * likelihood of being unique.
* @return the prefix plus some way of identifying it separate from other jobs with the same
* prefix
*/
- public static String createJobName(String prefix) {
+ public static String createJobName(String prefix, int randomChars) {
String convertedPrefix =
CaseFormat.UPPER_CAMEL.converterTo(CaseFormat.LOWER_HYPHEN).convert(prefix);
String formattedTimestamp =
DateTimeFormatter.ofPattern("yyyyMMddHHmmssSSS")
.withZone(ZoneId.of("UTC"))
.format(Instant.now());
- return String.format("%s-%s", convertedPrefix, formattedTimestamp);
+
+ String suffix = "";
+ if (randomChars > 0) {
+ suffix = "-" + RandomStringUtils.randomAlphanumeric(randomChars).toLowerCase();
+ }
+ return String.format("%s-%s%s", convertedPrefix, formattedTimestamp, suffix);
}
/** Get raw job name (without prefix) from a jobName generated by createJobName. */
diff --git a/it/common/src/test/java/org/apache/beam/it/common/utils/PipelineUtilsTest.java b/it/common/src/test/java/org/apache/beam/it/common/utils/PipelineUtilsTest.java
index acf203b06e6..316283cdf7d 100644
--- a/it/common/src/test/java/org/apache/beam/it/common/utils/PipelineUtilsTest.java
+++ b/it/common/src/test/java/org/apache/beam/it/common/utils/PipelineUtilsTest.java
@@ -37,7 +37,13 @@ public class PipelineUtilsTest {
@Test
public void testCreateJobNameWithUppercase() {
- assertThat(createJobName("testWithUpperCase")).matches("test-with-upper-case" + "-\\d{17}");
+ assertThat(createJobName("testWithUpperCase")).matches("test-with-upper-case-\\d{17}");
+ }
+
+ @Test
+ public void testCreateJobNameWithUppercaseSuffix() {
+ assertThat(createJobName("testWithUpperCase", 8))
+ .matches("test-with-upper-case-\\d{17}-[a-z0-9]{8}");
}
@Test
diff --git a/it/elasticsearch/src/test/java/org/apache/beam/it/elasticsearch/ElasticsearchUtilsTest.java b/it/elasticsearch/src/test/java/org/apache/beam/it/elasticsearch/ElasticsearchUtilsTest.java
index eb250a1c5f8..61d6b5d57c2 100644
--- a/it/elasticsearch/src/test/java/org/apache/beam/it/elasticsearch/ElasticsearchUtilsTest.java
+++ b/it/elasticsearch/src/test/java/org/apache/beam/it/elasticsearch/ElasticsearchUtilsTest.java
@@ -34,7 +34,7 @@ public class ElasticsearchUtilsTest {
@Test
public void testGenerateIndexNameShouldReplaceForwardSlash() {
String testBaseString = "Test/DB/Name";
- String actual = generateIndexName(testBaseString);
+ String actual = ElasticsearchUtils.generateIndexName(testBaseString);
assertThat(actual).matches("test-db-name-\\d{8}-\\d{6}-\\d{6}");
}
diff --git a/it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/LoadTestBase.java b/it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/LoadTestBase.java
index f6e359fed96..d9c1990ef07 100644
--- a/it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/LoadTestBase.java
+++ b/it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/LoadTestBase.java
@@ -18,6 +18,7 @@
package org.apache.beam.it.gcp;
import static org.apache.beam.it.common.logging.LogStrings.formatForLogging;
+import static org.apache.beam.it.gcp.dataflow.AbstractPipelineLauncher.RUNNER_V2;
import com.google.api.gax.core.CredentialsProvider;
import com.google.api.gax.core.FixedCredentialsProvider;
@@ -49,6 +50,7 @@ import org.apache.beam.it.gcp.monitoring.MonitoringClient;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.MoreObjects;
import org.junit.After;
import org.junit.Before;
+import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.rules.TestRule;
import org.junit.rules.TestWatcher;
@@ -107,11 +109,14 @@ public abstract class LoadTestBase {
}
};
- @Before
- @SuppressFBWarnings("ST_WRITE_TO_STATIC_FROM_INSTANCE_METHOD")
- public void setUp() throws IOException {
+ @BeforeClass
+ public static void setUpClass() {
project = TestProperties.project();
region = TestProperties.region();
+ }
+
+ @Before
+ public void setUp() throws IOException {
monitoringClient = MonitoringClient.builder(CREDENTIALS_PROVIDER).build();
pipelineLauncher = launcher();
pipelineOperator = new PipelineOperator(pipelineLauncher);
@@ -239,7 +244,7 @@ public abstract class LoadTestBase {
metrics.put("EstimatedDataProcessedGB", dataProcessed / 1e9d);
}
metrics.putAll(getCpuUtilizationMetrics(launchInfo.jobId(), workerTimeInterval));
- metrics.putAll(getThroughputMetrics(launchInfo.jobId(), config, workerTimeInterval));
+ metrics.putAll(getThroughputMetrics(launchInfo, config, workerTimeInterval));
}
/**
@@ -349,25 +354,30 @@ public abstract class LoadTestBase {
/**
* Computes throughput metrics of the given pcollection in dataflow job.
*
- * @param jobId dataflow job id
+ * @param jobInfo dataflow job LaunchInfo
* @param config the {@class MetricsConfiguration}
* @param timeInterval interval for the monitoring query
* @return throughput metrics of the pcollection
*/
protected Map<String, Double> getThroughputMetrics(
- String jobId, MetricsConfiguration config, TimeInterval timeInterval) {
+ LaunchInfo jobInfo, MetricsConfiguration config, TimeInterval timeInterval) {
+ String jobId = jobInfo.jobId();
+ String iColl =
+ RUNNER_V2.equals(jobInfo.runner())
+ ? config.inputPCollectionV2()
+ : config.inputPCollection();
+ String oColl =
+ RUNNER_V2.equals(jobInfo.runner())
+ ? config.outputPCollectionV2()
+ : config.outputPCollection();
List<Double> inputThroughputBytesPerSec =
- monitoringClient.getThroughputBytesPerSecond(
- project, jobId, config.inputPCollection(), timeInterval);
+ monitoringClient.getThroughputBytesPerSecond(project, jobId, iColl, timeInterval);
List<Double> inputThroughputElementsPerSec =
- monitoringClient.getThroughputElementsPerSecond(
- project, jobId, config.inputPCollection(), timeInterval);
+ monitoringClient.getThroughputElementsPerSecond(project, jobId, iColl, timeInterval);
List<Double> outputThroughputBytesPerSec =
- monitoringClient.getThroughputBytesPerSecond(
- project, jobId, config.outputPCollection(), timeInterval);
+ monitoringClient.getThroughputBytesPerSecond(project, jobId, oColl, timeInterval);
List<Double> outputThroughputElementsPerSec =
- monitoringClient.getThroughputElementsPerSecond(
- project, jobId, config.outputPCollection(), timeInterval);
+ monitoringClient.getThroughputElementsPerSecond(project, jobId, oColl, timeInterval);
return getThroughputMetrics(
inputThroughputBytesPerSec,
inputThroughputElementsPerSec,
@@ -495,22 +505,31 @@ public abstract class LoadTestBase {
*/
public abstract @Nullable String inputPCollection();
+ /** Input PCollection name under Dataflow runner v2. */
+ public abstract @Nullable String inputPCollectionV2();
+
/**
* Input PCollection of the Dataflow job to query additional metrics. If not provided, the
* metrics for inputPCollection will not be calculated.
*/
public abstract @Nullable String outputPCollection();
- public static Builder builder() {
+ public abstract @Nullable String outputPCollectionV2();
+
+ public static MetricsConfiguration.Builder builder() {
return new AutoValue_LoadTestBase_MetricsConfiguration.Builder();
}
@AutoValue.Builder
public abstract static class Builder {
- public abstract Builder setInputPCollection(@Nullable String value);
+ public abstract MetricsConfiguration.Builder setInputPCollection(@Nullable String value);
+
+ public abstract MetricsConfiguration.Builder setInputPCollectionV2(@Nullable String value);
+
+ public abstract MetricsConfiguration.Builder setOutputPCollection(@Nullable String value);
- public abstract Builder setOutputPCollection(@Nullable String value);
+ public abstract MetricsConfiguration.Builder setOutputPCollectionV2(@Nullable String value);
public abstract MetricsConfiguration build();
}
diff --git a/it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/bigquery/BigQueryResourceManager.java b/it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/bigquery/BigQueryResourceManager.java
index 80bf5cfd938..d6d348f524b 100644
--- a/it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/bigquery/BigQueryResourceManager.java
+++ b/it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/bigquery/BigQueryResourceManager.java
@@ -461,6 +461,7 @@ public final class BigQueryResourceManager implements ResourceManager {
projectId, dataset.getDatasetId().getDataset(), table.getTableId().getTable()));
}
bigQuery.delete(dataset.getDatasetId());
+ dataset = null;
}
} catch (Exception e) {
throw new BigQueryResourceManagerException("Failed to delete resources.", e);
diff --git a/it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/bigtable/BigtableResourceManager.java b/it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/bigtable/BigtableResourceManager.java
index 1e6750cc81e..71388022928 100644
--- a/it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/bigtable/BigtableResourceManager.java
+++ b/it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/bigtable/BigtableResourceManager.java
@@ -33,6 +33,7 @@ import com.google.cloud.bigtable.admin.v2.models.AppProfile;
import com.google.cloud.bigtable.admin.v2.models.AppProfile.MultiClusterRoutingPolicy;
import com.google.cloud.bigtable.admin.v2.models.AppProfile.RoutingPolicy;
import com.google.cloud.bigtable.admin.v2.models.AppProfile.SingleClusterRoutingPolicy;
+import com.google.cloud.bigtable.admin.v2.models.Cluster;
import com.google.cloud.bigtable.admin.v2.models.CreateAppProfileRequest;
import com.google.cloud.bigtable.admin.v2.models.CreateInstanceRequest;
import com.google.cloud.bigtable.admin.v2.models.CreateTableRequest;
@@ -54,6 +55,8 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
import javax.annotation.Nullable;
import org.apache.beam.it.common.ResourceManager;
import org.apache.commons.lang3.StringUtils;
@@ -93,6 +96,8 @@ public class BigtableResourceManager implements ResourceManager {
private final Set<String> cdcEnabledTables;
private boolean hasInstance;
+ private Iterable<BigtableResourceManagerCluster> clusters;
+
private final boolean usingStaticInstance;
private BigtableResourceManager(Builder builder) throws IOException {
@@ -111,6 +116,7 @@ public class BigtableResourceManager implements ResourceManager {
this.createdTables = new ArrayList<>();
this.createdAppProfiles = new ArrayList<>();
this.cdcEnabledTables = new HashSet<>();
+ this.clusters = new ArrayList<>();
// Check if RM was configured to use static Bigtable instance.
if (builder.useStaticInstance) {
@@ -223,6 +229,7 @@ public class BigtableResourceManager implements ResourceManager {
"Failed to create instance " + instanceId + ".", e);
}
hasInstance = true;
+ this.clusters = clusters;
LOG.info("Successfully created instance {}.", instanceId);
}
@@ -544,6 +551,32 @@ public class BigtableResourceManager implements ResourceManager {
return tableRows;
}
+ /** Get all the cluster names of the current instance. */
+ public List<String> getClusterNames() {
+ return StreamSupport.stream(getClusters().spliterator(), false)
+ .map(BigtableResourceManagerCluster::clusterId)
+ .collect(Collectors.toList());
+ }
+
+ private Iterable<BigtableResourceManagerCluster> getClusters() {
+ if (usingStaticInstance && this.clusters == null) {
+ try (BigtableInstanceAdminClient instanceAdminClient =
+ bigtableResourceManagerClientFactory.bigtableInstanceAdminClient()) {
+ List<BigtableResourceManagerCluster> managedClusters = new ArrayList<>();
+ for (Cluster cluster : instanceAdminClient.listClusters(instanceId)) {
+ managedClusters.add(
+ BigtableResourceManagerCluster.create(
+ cluster.getId(),
+ cluster.getZone(),
+ cluster.getServeNodes(),
+ cluster.getStorageType()));
+ }
+ this.clusters = managedClusters;
+ }
+ }
+ return this.clusters;
+ }
+
/**
* Deletes all created resources (instance and tables) and cleans up all Bigtable clients, making
* the manager object unusable.
diff --git a/it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/bigtable/BigtableResourceManagerUtils.java b/it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/bigtable/BigtableResourceManagerUtils.java
index eb2323e5297..a893493d766 100644
--- a/it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/bigtable/BigtableResourceManagerUtils.java
+++ b/it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/bigtable/BigtableResourceManagerUtils.java
@@ -34,7 +34,7 @@ public final class BigtableResourceManagerUtils {
private static final Pattern ILLEGAL_INSTANCE_ID_CHARS = Pattern.compile("[^a-z0-9-]");
private static final String REPLACE_INSTANCE_ID_CHAR = "-";
private static final int MIN_TABLE_ID_LENGTH = 1;
- private static final int MAX_TABLE_ID_LENGTH = 30;
+ private static final int MAX_TABLE_ID_LENGTH = 40;
private static final Pattern ILLEGAL_TABLE_CHARS = Pattern.compile("[^a-zA-Z0-9-_.]");
private static final String REPLACE_TABLE_ID_CHAR = "-";
diff --git a/it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/dataflow/AbstractPipelineLauncher.java b/it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/dataflow/AbstractPipelineLauncher.java
index 08688d88b10..b5c9535953b 100644
--- a/it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/dataflow/AbstractPipelineLauncher.java
+++ b/it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/dataflow/AbstractPipelineLauncher.java
@@ -58,6 +58,11 @@ public abstract class AbstractPipelineLauncher implements PipelineLauncher {
private static final Logger LOG = LoggerFactory.getLogger(AbstractPipelineLauncher.class);
private static final Pattern CURRENT_METRICS = Pattern.compile(".*Current.*");
+ public static final String LEGACY_RUNNER = "Dataflow Legacy Runner";
+ public static final String RUNNER_V2 = "Dataflow Runner V2";
+ public static final String PARAM_RUNNER = "runner";
+ public static final String PARAM_JOB_TYPE = "jobType";
+ public static final String PARAM_JOB_ID = "jobId";
protected final List<String> launchedJobs = new ArrayList<>();
@@ -244,12 +249,12 @@ public abstract class AbstractPipelineLauncher implements PipelineLauncher {
*/
protected LaunchInfo.Builder getJobInfoBuilder(LaunchConfig options, JobState state, Job job) {
Map<String, String> labels = job.getLabels();
- String runner = "Dataflow Legacy Runner";
+ String runner = LEGACY_RUNNER;
Environment environment = job.getEnvironment();
if (environment != null
&& environment.getExperiments() != null
&& environment.getExperiments().contains("use_runner_v2")) {
- runner = "Dataflow Runner V2";
+ runner = RUNNER_V2;
}
LaunchInfo.Builder builder =
LaunchInfo.builder()
@@ -266,6 +271,10 @@ public abstract class AbstractPipelineLauncher implements PipelineLauncher {
// tests
Map<String, String> parameters = new HashMap<>(options.parameters());
options.environment().forEach((key, val) -> parameters.put(key, val.toString()));
+ // attach basic job info to parameters so that these are exported for load tests
+ parameters.put(PARAM_RUNNER, runner);
+ parameters.put(PARAM_JOB_TYPE, job.getType());
+ parameters.put(PARAM_JOB_ID, job.getId());
builder.setParameters(ImmutableMap.copyOf(parameters));
if (labels != null && !labels.isEmpty()) {
// template job
diff --git a/it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/dataflow/DefaultPipelineLauncher.java b/it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/dataflow/DefaultPipelineLauncher.java
index 7918dd6227d..ad2dcafc007 100644
--- a/it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/dataflow/DefaultPipelineLauncher.java
+++ b/it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/dataflow/DefaultPipelineLauncher.java
@@ -99,7 +99,7 @@ public class DefaultPipelineLauncher extends AbstractPipelineLauncher {
.put(PipelineResult.State.UNRECOGNIZED, JobState.UNKNOWN)
.build();
- private DefaultPipelineLauncher(Builder builder) {
+ private DefaultPipelineLauncher(DefaultPipelineLauncher.Builder builder) {
super(
new Dataflow(
Utils.getDefaultTransport(),
@@ -109,8 +109,8 @@ public class DefaultPipelineLauncher extends AbstractPipelineLauncher {
: new HttpCredentialsAdapter(builder.getCredentials())));
}
- public static Builder builder(Credentials credentials) {
- return new Builder(credentials);
+ public static DefaultPipelineLauncher.Builder builder(Credentials credentials) {
+ return new DefaultPipelineLauncher.Builder(credentials);
}
@Override
diff --git a/it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/dataflow/DirectRunnerClient.java b/it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/dataflow/DirectRunnerClient.java
index 8017009ff37..57f8ad40c1b 100644
--- a/it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/dataflow/DirectRunnerClient.java
+++ b/it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/dataflow/DirectRunnerClient.java
@@ -53,8 +53,8 @@ public class DirectRunnerClient implements PipelineLauncher {
this.mainClass = builder.getMainClass();
}
- public static Builder builder(Class<?> mainClass) {
- return new Builder(mainClass);
+ public static DirectRunnerClient.Builder builder(Class<?> mainClass) {
+ return new DirectRunnerClient.Builder(mainClass);
}
@Override
@@ -172,7 +172,7 @@ public class DirectRunnerClient implements PipelineLauncher {
return mainClass;
}
- public Builder setCredentials(Credentials value) {
+ public DirectRunnerClient.Builder setCredentials(Credentials value) {
credentials = value;
return this;
}
diff --git a/it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/datagenerator/DataGenerator.java b/it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/datagenerator/DataGenerator.java
index 99016b5dd3a..832a75defd9 100644
--- a/it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/datagenerator/DataGenerator.java
+++ b/it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/datagenerator/DataGenerator.java
@@ -61,14 +61,16 @@ public class DataGenerator {
.build();
}
- public static Builder builderWithSchemaLocation(String testName, String schemaLocation) {
- return new Builder(testName + "-data-generator")
+ public static DataGenerator.Builder builderWithSchemaLocation(
+ String testName, String schemaLocation) {
+ return new DataGenerator.Builder(testName + "-data-generator")
.setSchemaLocation(schemaLocation)
.setAutoscalingAlgorithm(AutoscalingAlgorithmType.THROUGHPUT_BASED);
}
- public static Builder builderWithSchemaTemplate(String testName, String schemaTemplate) {
- return new Builder(testName + "-data-generator")
+ public static DataGenerator.Builder builderWithSchemaTemplate(
+ String testName, String schemaTemplate) {
+ return new DataGenerator.Builder(testName + "-data-generator")
.setSchemaTemplate(schemaTemplate)
.setAutoscalingAlgorithm(AutoscalingAlgorithmType.THROUGHPUT_BASED);
}
@@ -129,27 +131,27 @@ public class DataGenerator {
return parameters;
}
- public Builder setSchemaTemplate(String value) {
+ public DataGenerator.Builder setSchemaTemplate(String value) {
parameters.put("schemaTemplate", value);
return this;
}
- public Builder setSchemaLocation(String value) {
+ public DataGenerator.Builder setSchemaLocation(String value) {
parameters.put("schemaLocation", value);
return this;
}
- public Builder setMessagesLimit(String value) {
+ public DataGenerator.Builder setMessagesLimit(String value) {
parameters.put(MESSAGES_LIMIT, value);
return this;
}
- public Builder setQPS(String value) {
+ public DataGenerator.Builder setQPS(String value) {
parameters.put("qps", value);
return this;
}
- public Builder setSinkType(String value) {
+ public DataGenerator.Builder setSinkType(String value) {
parameters.put("sinkType", value);
return this;
}
@@ -164,87 +166,87 @@ public class DataGenerator {
return this;
}
- public Builder setMaxNumWorkers(String value) {
+ public DataGenerator.Builder setMaxNumWorkers(String value) {
parameters.put("maxNumWorkers", value);
return this;
}
- public Builder setAutoscalingAlgorithm(AutoscalingAlgorithmType value) {
+ public DataGenerator.Builder setAutoscalingAlgorithm(AutoscalingAlgorithmType value) {
parameters.put("autoscalingAlgorithm", value.toString());
return this;
}
- public Builder setOutputDirectory(String value) {
+ public DataGenerator.Builder setOutputDirectory(String value) {
parameters.put("outputDirectory", value);
return this;
}
- public Builder setOutputType(String value) {
+ public DataGenerator.Builder setOutputType(String value) {
parameters.put("outputType", value);
return this;
}
- public Builder setNumShards(String value) {
+ public DataGenerator.Builder setNumShards(String value) {
parameters.put("numShards", value);
return this;
}
- public Builder setAvroSchemaLocation(String value) {
+ public DataGenerator.Builder setAvroSchemaLocation(String value) {
parameters.put("avroSchemaLocation", value);
return this;
}
- public Builder setTopic(String value) {
+ public DataGenerator.Builder setTopic(String value) {
parameters.put("topic", value);
return this;
}
- public Builder setProjectId(String value) {
+ public DataGenerator.Builder setProjectId(String value) {
parameters.put("projectId", value);
return this;
}
- public Builder setSpannerInstanceName(String value) {
+ public DataGenerator.Builder setSpannerInstanceName(String value) {
parameters.put("spannerInstanceName", value);
return this;
}
- public Builder setSpannerDatabaseName(String value) {
+ public DataGenerator.Builder setSpannerDatabaseName(String value) {
parameters.put("spannerDatabaseName", value);
return this;
}
- public Builder setSpannerTableName(String value) {
+ public DataGenerator.Builder setSpannerTableName(String value) {
parameters.put("spannerTableName", value);
return this;
}
- public Builder setDriverClassName(String value) {
+ public DataGenerator.Builder setDriverClassName(String value) {
parameters.put("driverClassName", value);
return this;
}
- public Builder setConnectionUrl(String value) {
+ public DataGenerator.Builder setConnectionUrl(String value) {
parameters.put("connectionUrl", value);
return this;
}
- public Builder setUsername(String value) {
+ public DataGenerator.Builder setUsername(String value) {
parameters.put("username", value);
return this;
}
- public Builder setPassword(String value) {
+ public DataGenerator.Builder setPassword(String value) {
parameters.put("password", value);
return this;
}
- public Builder setConnectionProperties(String value) {
+ public DataGenerator.Builder setConnectionProperties(String value) {
parameters.put("connectionProperties", value);
return this;
}
- public Builder setStatement(String value) {
+ public DataGenerator.Builder setStatement(String value) {
parameters.put("statement", value);
return this;
}
diff --git a/it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/datastore/matchers/DatastoreAsserts.java b/it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/datastore/matchers/DatastoreAsserts.java
index ef67a5a5c4f..78fa7543150 100644
--- a/it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/datastore/matchers/DatastoreAsserts.java
+++ b/it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/datastore/matchers/DatastoreAsserts.java
@@ -61,7 +61,8 @@ public class DatastoreAsserts {
/**
* Creates a {@link RecordsSubject} to assert information within a list of records.
*
- * @param results Records in Datastore {@link Entity} format to use in the comparison.
+ * @param results Records in Datastore {@link com.google.cloud.datastore.Entity} format to use in
+ * the comparison.
* @return Truth subject to chain assertions.
*/
public static RecordsSubject assertThatDatastoreRecords(Collection<Entity> results) {
diff --git a/it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/dlp/DlpResourceManager.java b/it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/dlp/DlpResourceManager.java
index f59794af3e1..de818a1bbff 100644
--- a/it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/dlp/DlpResourceManager.java
+++ b/it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/dlp/DlpResourceManager.java
@@ -113,8 +113,9 @@ public class DlpResourceManager implements ResourceManager {
* @param project the GCP project ID
* @return a new instance of Builder
*/
- public static Builder builder(String project, CredentialsProvider credentialsProvider) {
- return new Builder(project, credentialsProvider);
+ public static DlpResourceManager.Builder builder(
+ String project, CredentialsProvider credentialsProvider) {
+ return new DlpResourceManager.Builder(project, credentialsProvider);
}
/** A builder class for creating instances of {@link DlpResourceManager}. */
diff --git a/it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/kms/KMSResourceManager.java b/it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/kms/KMSResourceManager.java
index 7e1a403c735..2cad6d0b9fa 100644
--- a/it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/kms/KMSResourceManager.java
+++ b/it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/kms/KMSResourceManager.java
@@ -72,8 +72,9 @@ public class KMSResourceManager implements ResourceManager {
this.keyRing = null;
}
- public static Builder builder(String projectId, CredentialsProvider credentialsProvider) {
- return new Builder(projectId, credentialsProvider);
+ public static KMSResourceManager.Builder builder(
+ String projectId, CredentialsProvider credentialsProvider) {
+ return new KMSResourceManager.Builder(projectId, credentialsProvider);
}
/**
diff --git a/it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/monitoring/MonitoringClient.java b/it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/monitoring/MonitoringClient.java
index 0fc5614a363..06591ea4fe0 100644
--- a/it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/monitoring/MonitoringClient.java
+++ b/it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/monitoring/MonitoringClient.java
@@ -150,8 +150,8 @@ public final class MonitoringClient {
Aggregation aggregation =
Aggregation.newBuilder()
.setAlignmentPeriod(Duration.newBuilder().setSeconds(60).build())
- .setPerSeriesAligner(Aligner.ALIGN_MEAN)
- .setCrossSeriesReducer(Reducer.REDUCE_MEAN)
+ .setPerSeriesAligner(Aggregation.Aligner.ALIGN_MEAN)
+ .setCrossSeriesReducer(Aggregation.Reducer.REDUCE_MEAN)
.addGroupByFields("resource.instance_id")
.build();
ListTimeSeriesRequest request =
@@ -188,7 +188,7 @@ public final class MonitoringClient {
Aggregation aggregation =
Aggregation.newBuilder()
.setAlignmentPeriod(Duration.newBuilder().setSeconds(60).build())
- .setPerSeriesAligner(Aligner.ALIGN_MEAN)
+ .setPerSeriesAligner(Aggregation.Aligner.ALIGN_MEAN)
.setCrossSeriesReducer(Reducer.REDUCE_MAX)
.build();
ListTimeSeriesRequest request =
@@ -225,7 +225,7 @@ public final class MonitoringClient {
Aggregation aggregation =
Aggregation.newBuilder()
.setAlignmentPeriod(Duration.newBuilder().setSeconds(60).build())
- .setPerSeriesAligner(Aligner.ALIGN_MEAN)
+ .setPerSeriesAligner(Aggregation.Aligner.ALIGN_MEAN)
.setCrossSeriesReducer(Reducer.REDUCE_MAX)
.build();
ListTimeSeriesRequest request =
@@ -269,7 +269,7 @@ public final class MonitoringClient {
Aggregation aggregation =
Aggregation.newBuilder()
.setAlignmentPeriod(Duration.newBuilder().setSeconds(60).build())
- .setPerSeriesAligner(Aligner.ALIGN_RATE)
+ .setPerSeriesAligner(Aggregation.Aligner.ALIGN_RATE)
.build();
ListTimeSeriesRequest request =
ListTimeSeriesRequest.newBuilder()
@@ -312,7 +312,7 @@ public final class MonitoringClient {
Aggregation aggregation =
Aggregation.newBuilder()
.setAlignmentPeriod(Duration.newBuilder().setSeconds(60).build())
- .setPerSeriesAligner(Aligner.ALIGN_RATE)
+ .setPerSeriesAligner(Aggregation.Aligner.ALIGN_RATE)
.build();
ListTimeSeriesRequest request =
ListTimeSeriesRequest.newBuilder()
diff --git
a/it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/pubsub/PubsubResourceManager.java
b/it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/pubsub/PubsubResourceManager.java
index 3a684d34c04..738620c15b7 100644
---
a/it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/pubsub/PubsubResourceManager.java
+++
b/it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/pubsub/PubsubResourceManager.java
@@ -20,6 +20,7 @@ package org.apache.beam.it.gcp.pubsub;
import static
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument;
import com.google.api.gax.core.CredentialsProvider;
+import com.google.api.gax.rpc.DeadlineExceededException;
import com.google.cloud.pubsub.v1.Publisher;
import com.google.cloud.pubsub.v1.SchemaServiceClient;
import com.google.cloud.pubsub.v1.SchemaServiceSettings;
@@ -42,12 +43,16 @@ import com.google.pubsub.v1.SubscriptionName;
import com.google.pubsub.v1.Topic;
import com.google.pubsub.v1.TopicName;
import com.google.pubsub.v1.UpdateTopicRequest;
+import dev.failsafe.Failsafe;
+import dev.failsafe.RetryPolicy;
import java.io.IOException;
+import java.time.Duration;
import java.util.Collections;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import org.apache.beam.it.common.ResourceManager;
+import org.apache.beam.it.common.utils.ExceptionUtils;
import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Strings;
import org.slf4j.Logger;
@@ -66,6 +71,12 @@ public final class PubsubResourceManager implements
ResourceManager {
private static final int DEFAULT_ACK_DEADLINE_SECONDS = 600;
private static final String RESOURCE_NAME_SEPARATOR = "-";
+ // Retry settings for client operations
+ private static final int FAILSAFE_MAX_RETRIES = 5;
+ private static final Duration FAILSAFE_RETRY_DELAY = Duration.ofSeconds(10);
+ private static final Duration FAILSAFE_RETRY_MAX_DELAY =
Duration.ofSeconds(60);
+ private static final double FAILSAFE_RETRY_JITTER = 0.1;
+
private final String testId;
private final String projectId;
private final PubsubPublisherFactory publisherFactory;
@@ -184,11 +195,14 @@ public final class PubsubResourceManager implements
ResourceManager {
LOG.info("Creating subscription '{}' for topic '{}'", subscriptionName,
topicName);
Subscription subscription =
- subscriptionAdminClient.createSubscription(
- getSubscriptionName(subscriptionName),
- topicName,
- PushConfig.getDefaultInstance(),
- DEFAULT_ACK_DEADLINE_SECONDS);
+ Failsafe.with(retryOnDeadlineExceeded())
+ .get(
+ () ->
+ subscriptionAdminClient.createSubscription(
+ getSubscriptionName(subscriptionName),
+ topicName,
+ PushConfig.getDefaultInstance(),
+ DEFAULT_ACK_DEADLINE_SECONDS));
SubscriptionName reference = PubsubUtils.toSubscriptionName(subscription);
createdSubscriptions.add(getSubscriptionName(subscriptionName));
@@ -299,17 +313,19 @@ public final class PubsubResourceManager implements
ResourceManager {
try {
for (SubscriptionName subscription : createdSubscriptions) {
LOG.info("Deleting subscription '{}'", subscription);
- subscriptionAdminClient.deleteSubscription(subscription);
+ Failsafe.with(retryOnDeadlineExceeded())
+ .run(() ->
subscriptionAdminClient.deleteSubscription(subscription));
}
for (TopicName topic : createdTopics) {
LOG.info("Deleting topic '{}'", topic);
- topicAdminClient.deleteTopic(topic);
+ Failsafe.with(retryOnDeadlineExceeded()).run(() ->
topicAdminClient.deleteTopic(topic));
}
for (SchemaName schemaName : createdSchemas) {
LOG.info("Deleting schema '{}'", schemaName);
- schemaServiceClient.deleteSchema(schemaName);
+ Failsafe.with(retryOnDeadlineExceeded())
+ .run(() -> schemaServiceClient.deleteSchema(schemaName));
}
} finally {
subscriptionAdminClient.close();
@@ -342,7 +358,8 @@ public final class PubsubResourceManager implements
ResourceManager {
private TopicName createTopicInternal(TopicName topicName) {
LOG.info("Creating topic '{}'...", topicName.toString());
- Topic topic = topicAdminClient.createTopic(topicName);
+ Topic topic =
+ Failsafe.with(retryOnDeadlineExceeded()).get(() ->
topicAdminClient.createTopic(topicName));
TopicName reference = PubsubUtils.toTopicName(topic);
createdTopics.add(reference);
@@ -355,6 +372,16 @@ public final class PubsubResourceManager implements
ResourceManager {
return topicAdminClient.isShutdown() ||
subscriptionAdminClient.isShutdown();
}
+ private static <T> RetryPolicy<T> retryOnDeadlineExceeded() {
+ return RetryPolicy.<T>builder()
+ .handleIf(
+ exception -> ExceptionUtils.containsType(exception,
DeadlineExceededException.class))
+ .withMaxRetries(FAILSAFE_MAX_RETRIES)
+ .withBackoff(FAILSAFE_RETRY_DELAY, FAILSAFE_RETRY_MAX_DELAY)
+ .withJitter(FAILSAFE_RETRY_JITTER)
+ .build();
+ }
+
/** Builder for {@link PubsubResourceManager}. */
public static final class Builder {
diff --git a/it/google-cloud-platform/src/main/resources/test-artifact.json
b/it/google-cloud-platform/src/main/resources/test-artifact.json
new file mode 100644
index 00000000000..551c80d14a6
--- /dev/null
+++ b/it/google-cloud-platform/src/main/resources/test-artifact.json
@@ -0,0 +1 @@
+["This is a test artifact."]
\ No newline at end of file
diff --git
a/it/google-cloud-platform/src/test/java/org/apache/beam/it/gcp/bigquery/BigQueryIOLT.java
b/it/google-cloud-platform/src/test/java/org/apache/beam/it/gcp/bigquery/BigQueryIOLT.java
index 03f6e8abfd4..a9ae6814277 100644
---
a/it/google-cloud-platform/src/test/java/org/apache/beam/it/gcp/bigquery/BigQueryIOLT.java
+++
b/it/google-cloud-platform/src/test/java/org/apache/beam/it/gcp/bigquery/BigQueryIOLT.java
@@ -99,12 +99,8 @@ public final class BigQueryIOLT extends IOLoadTestBase {
private static final String READ_ELEMENT_METRIC_NAME = "read_count";
private Configuration configuration;
private String tempLocation;
-
private TableSchema schema;
- private static final String READ_PCOLLECTION = "Counting element.out0";
- private static final String WRITE_PCOLLECTION = "Map records.out0";
-
@Rule public TestPipeline writePipeline = TestPipeline.create();
@Rule public TestPipeline readPipeline = TestPipeline.create();
@@ -268,7 +264,7 @@ public final class BigQueryIOLT extends IOLoadTestBase {
.withCustomGcsTempLocation(ValueProvider.StaticValueProvider.of(tempLocation)));
PipelineLauncher.LaunchConfig options =
- PipelineLauncher.LaunchConfig.builder("test-bigquery-write")
+ PipelineLauncher.LaunchConfig.builder("write-bigquery")
.setSdk(PipelineLauncher.Sdk.JAVA)
.setPipeline(writePipeline)
.addParameter("runner", configuration.runner)
@@ -284,7 +280,10 @@ public final class BigQueryIOLT extends IOLoadTestBase {
// export metrics
MetricsConfiguration metricsConfig =
-
MetricsConfiguration.builder().setInputPCollection(WRITE_PCOLLECTION).build();
+ MetricsConfiguration.builder()
+ .setInputPCollection("Map records.out0")
+ .setInputPCollectionV2("Map records/ParMultiDo(MapKVToV).out0")
+ .build();
try {
exportMetricsToBigQuery(launchInfo, getMetrics(launchInfo,
metricsConfig));
} catch (ParseException | InterruptedException e) {
@@ -301,7 +300,7 @@ public final class BigQueryIOLT extends IOLoadTestBase {
.apply("Counting element", ParDo.of(new
CountingFn<>(READ_ELEMENT_METRIC_NAME)));
PipelineLauncher.LaunchConfig options =
- PipelineLauncher.LaunchConfig.builder("test-bigquery-read")
+ PipelineLauncher.LaunchConfig.builder("read-bigquery")
.setSdk(PipelineLauncher.Sdk.JAVA)
.setPipeline(readPipeline)
.addParameter("runner", configuration.runner)
@@ -326,7 +325,10 @@ public final class BigQueryIOLT extends IOLoadTestBase {
// export metrics
MetricsConfiguration metricsConfig =
-
MetricsConfiguration.builder().setOutputPCollection(READ_PCOLLECTION).build();
+ MetricsConfiguration.builder()
+ .setOutputPCollection("Counting element.out0")
+ .setOutputPCollectionV2("Counting
element/ParMultiDo(Counting).out0")
+ .build();
try {
exportMetricsToBigQuery(launchInfo, getMetrics(launchInfo,
metricsConfig));
} catch (ParseException | InterruptedException e) {
diff --git
a/it/google-cloud-platform/src/test/java/org/apache/beam/it/gcp/bigtable/BigTableIOLT.java
b/it/google-cloud-platform/src/test/java/org/apache/beam/it/gcp/bigtable/BigTableIOLT.java
index fc7bd87707f..e232ed31cb5 100644
---
a/it/google-cloud-platform/src/test/java/org/apache/beam/it/gcp/bigtable/BigTableIOLT.java
+++
b/it/google-cloud-platform/src/test/java/org/apache/beam/it/gcp/bigtable/BigTableIOLT.java
@@ -115,8 +115,6 @@ public class BigTableIOLT extends IOLoadTestBase {
/** Run integration test with configurations specified by TestProperties. */
@Test
public void testWriteAndRead() throws IOException {
- final String readPCollection = "Counting element.out0";
- final String writePCollection = "Map records.out0";
tableId = generateTableId(testName);
resourceManager.createTable(
@@ -149,8 +147,10 @@ public class BigTableIOLT extends IOLoadTestBase {
// export metrics
MetricsConfiguration metricsConfig =
MetricsConfiguration.builder()
- .setInputPCollection(writePCollection)
- .setOutputPCollection(readPCollection)
+ .setInputPCollection("Map records.out0")
+ .setInputPCollectionV2("Map
records/ParMultiDo(MapToBigTableFormat).out0")
+ .setOutputPCollection("Counting element.out0")
+ .setOutputPCollectionV2("Counting
element/ParMultiDo(Counting).out0")
.build();
try {
exportMetricsToBigQuery(writeInfo, getMetrics(writeInfo, metricsConfig));
@@ -174,7 +174,7 @@ public class BigTableIOLT extends IOLoadTestBase {
.apply("Write to BigTable", writeIO);
PipelineLauncher.LaunchConfig options =
- PipelineLauncher.LaunchConfig.builder("test-bigtable-write")
+ PipelineLauncher.LaunchConfig.builder("write-bigtable")
.setSdk(PipelineLauncher.Sdk.JAVA)
.setPipeline(writePipeline)
.addParameter("runner", configuration.getRunner())
@@ -196,7 +196,7 @@ public class BigTableIOLT extends IOLoadTestBase {
.apply("Counting element", ParDo.of(new
CountingFn<>(READ_ELEMENT_METRIC_NAME)));
PipelineLauncher.LaunchConfig options =
- PipelineLauncher.LaunchConfig.builder("test-bigtable-read")
+ PipelineLauncher.LaunchConfig.builder("read-bigtable")
.setSdk(PipelineLauncher.Sdk.JAVA)
.setPipeline(readPipeline)
.addParameter("runner", configuration.getRunner())
@@ -227,18 +227,18 @@ public class BigTableIOLT extends IOLoadTestBase {
@AutoValue.Builder
abstract static class Builder {
- abstract Builder setNumRows(long numRows);
+ abstract Configuration.Builder setNumRows(long numRows);
- abstract Builder setPipelineTimeout(int timeOutMinutes);
+ abstract Configuration.Builder setPipelineTimeout(int timeOutMinutes);
- abstract Builder setRunner(String runner);
+ abstract Configuration.Builder setRunner(String runner);
- abstract Builder setValueSizeBytes(int valueSizeBytes);
+ abstract Configuration.Builder setValueSizeBytes(int valueSizeBytes);
abstract Configuration build();
}
- abstract Builder toBuilder();
+ abstract Configuration.Builder toBuilder();
}
/** Maps long number to the BigTable format record. */
diff --git
a/it/google-cloud-platform/src/test/java/org/apache/beam/it/gcp/bigtable/BigtableResourceManagerTest.java
b/it/google-cloud-platform/src/test/java/org/apache/beam/it/gcp/bigtable/BigtableResourceManagerTest.java
index 65745aea49b..f8673ed696c 100644
---
a/it/google-cloud-platform/src/test/java/org/apache/beam/it/gcp/bigtable/BigtableResourceManagerTest.java
+++
b/it/google-cloud-platform/src/test/java/org/apache/beam/it/gcp/bigtable/BigtableResourceManagerTest.java
@@ -442,6 +442,7 @@ public class BigtableResourceManagerTest {
setupReadyTable();
testManager.createTable(TABLE_ID, ImmutableList.of("cf1"));
+
when(bigtableResourceManagerClientFactory.bigtableTableAdminClient().exists(anyString()))
.thenReturn(true);
testManager.readTable(TABLE_ID);
diff --git
a/it/google-cloud-platform/src/test/java/org/apache/beam/it/gcp/dataflow/ClassicTemplateClientTest.java
b/it/google-cloud-platform/src/test/java/org/apache/beam/it/gcp/dataflow/ClassicTemplateClientTest.java
index cfd56e596e5..88c35589f2b 100644
---
a/it/google-cloud-platform/src/test/java/org/apache/beam/it/gcp/dataflow/ClassicTemplateClientTest.java
+++
b/it/google-cloud-platform/src/test/java/org/apache/beam/it/gcp/dataflow/ClassicTemplateClientTest.java
@@ -18,6 +18,10 @@
package org.apache.beam.it.gcp.dataflow;
import static com.google.common.truth.Truth.assertThat;
+import static
org.apache.beam.it.gcp.dataflow.AbstractPipelineLauncher.LEGACY_RUNNER;
+import static
org.apache.beam.it.gcp.dataflow.AbstractPipelineLauncher.PARAM_JOB_ID;
+import static
org.apache.beam.it.gcp.dataflow.AbstractPipelineLauncher.PARAM_JOB_TYPE;
+import static
org.apache.beam.it.gcp.dataflow.AbstractPipelineLauncher.PARAM_RUNNER;
import static org.junit.Assert.assertThrows;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
@@ -153,8 +157,14 @@ public final class ClassicTemplateClientTest {
.setSdk("Apache Beam Java")
.setVersion("2.42.0")
.setJobType("JOB_TYPE_BATCH")
- .setRunner("Dataflow Legacy Runner")
- .setParameters(ImmutableMap.of(PARAM_KEY, PARAM_VALUE))
+ .setRunner(AbstractPipelineLauncher.LEGACY_RUNNER)
+ .setParameters(
+ ImmutableMap.<String, String>builder()
+ .put(PARAM_KEY, PARAM_VALUE)
+ .put(PARAM_JOB_ID, JOB_ID)
+ .put(PARAM_RUNNER, LEGACY_RUNNER)
+ .put(PARAM_JOB_TYPE, "JOB_TYPE_BATCH")
+ .build())
.build();
assertThat(actual).isEqualTo(expected);
}
diff --git
a/it/google-cloud-platform/src/test/java/org/apache/beam/it/gcp/dataflow/FlexTemplateClientTest.java
b/it/google-cloud-platform/src/test/java/org/apache/beam/it/gcp/dataflow/FlexTemplateClientTest.java
index 4088efe6751..06f44437414 100644
---
a/it/google-cloud-platform/src/test/java/org/apache/beam/it/gcp/dataflow/FlexTemplateClientTest.java
+++
b/it/google-cloud-platform/src/test/java/org/apache/beam/it/gcp/dataflow/FlexTemplateClientTest.java
@@ -18,6 +18,10 @@
package org.apache.beam.it.gcp.dataflow;
import static com.google.common.truth.Truth.assertThat;
+import static
org.apache.beam.it.gcp.dataflow.AbstractPipelineLauncher.LEGACY_RUNNER;
+import static
org.apache.beam.it.gcp.dataflow.AbstractPipelineLauncher.PARAM_JOB_ID;
+import static
org.apache.beam.it.gcp.dataflow.AbstractPipelineLauncher.PARAM_JOB_TYPE;
+import static
org.apache.beam.it.gcp.dataflow.AbstractPipelineLauncher.PARAM_RUNNER;
import static org.junit.Assert.assertThrows;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
@@ -158,8 +162,14 @@ public final class FlexTemplateClientTest {
.setSdk("Apache Beam Java")
.setVersion("2.42.0")
.setJobType("JOB_TYPE_BATCH")
- .setRunner("Dataflow Legacy Runner")
- .setParameters(ImmutableMap.of(PARAM_KEY, PARAM_VALUE))
+ .setRunner(LEGACY_RUNNER)
+ .setParameters(
+ ImmutableMap.<String, String>builder()
+ .put(PARAM_KEY, PARAM_VALUE)
+ .put(PARAM_JOB_ID, JOB_ID)
+ .put(PARAM_RUNNER, LEGACY_RUNNER)
+ .put(PARAM_JOB_TYPE, "JOB_TYPE_BATCH")
+ .build())
.build();
assertThat(actual).isEqualTo(expected);
}
diff --git
a/it/google-cloud-platform/src/test/java/org/apache/beam/it/gcp/storage/FileBasedIOLT.java
b/it/google-cloud-platform/src/test/java/org/apache/beam/it/gcp/storage/FileBasedIOLT.java
index fd1bc1772f2..704f8337c66 100644
---
a/it/google-cloud-platform/src/test/java/org/apache/beam/it/gcp/storage/FileBasedIOLT.java
+++
b/it/google-cloud-platform/src/test/java/org/apache/beam/it/gcp/storage/FileBasedIOLT.java
@@ -90,7 +90,7 @@ public class FileBasedIOLT extends IOLoadTestBase {
@Rule public TestPipeline readPipeline = TestPipeline.create();
- private static final Map<String, Configuration> TEST_CONFIGS_PRESET;
+ private static final Map<String, FileBasedIOLT.Configuration>
TEST_CONFIGS_PRESET;
static {
try {
@@ -160,8 +160,6 @@ public class FileBasedIOLT extends IOLoadTestBase {
@Test
public void testTextIOWriteThenRead() throws IOException {
- final String readPCollection = "Counting element.out0";
- final String writePCollection = "Map records.out0";
TextIO.TypedWrite<String, Object> write =
TextIO.write()
@@ -182,7 +180,7 @@ public class FileBasedIOLT extends IOLoadTestBase {
.apply("Counting element", ParDo.of(new
CountingFn<>(READ_ELEMENT_METRIC_NAME)));
PipelineLauncher.LaunchConfig writeOptions =
- PipelineLauncher.LaunchConfig.builder("test-textio-write")
+ PipelineLauncher.LaunchConfig.builder("write-textio")
.setSdk(PipelineLauncher.Sdk.JAVA)
.setPipeline(writePipeline)
.addParameter("runner", configuration.runner)
@@ -196,7 +194,7 @@ public class FileBasedIOLT extends IOLoadTestBase {
assertThatResult(writeResult).isLaunchFinished();
PipelineLauncher.LaunchConfig readOptions =
- PipelineLauncher.LaunchConfig.builder("test-textio-read")
+ PipelineLauncher.LaunchConfig.builder("read-textio")
.setSdk(PipelineLauncher.Sdk.JAVA)
.setPipeline(readPipeline)
.addParameter("runner", configuration.runner)
@@ -222,8 +220,10 @@ public class FileBasedIOLT extends IOLoadTestBase {
// export metrics
MetricsConfiguration metricsConfig =
MetricsConfiguration.builder()
- .setInputPCollection(writePCollection)
- .setOutputPCollection(readPCollection)
+ .setInputPCollection("Map records.out0")
+ .setInputPCollectionV2("Map
records/ParMultiDo(MapKVToString).out0")
+ .setOutputPCollection("Counting element.out0")
+ .setOutputPCollectionV2("Counting
element/ParMultiDo(Counting).out0")
.build();
try {
exportMetricsToBigQuery(writeInfo, getMetrics(writeInfo, metricsConfig));
diff --git
a/it/google-cloud-platform/src/test/java/org/apache/beam/it/gcp/storage/GcsResourceManagerTest.java
b/it/google-cloud-platform/src/test/java/org/apache/beam/it/gcp/storage/GcsResourceManagerTest.java
index 3ec96da8100..0153573feae 100644
---
a/it/google-cloud-platform/src/test/java/org/apache/beam/it/gcp/storage/GcsResourceManagerTest.java
+++
b/it/google-cloud-platform/src/test/java/org/apache/beam/it/gcp/storage/GcsResourceManagerTest.java
@@ -71,7 +71,7 @@ public final class GcsResourceManagerTest {
@Mock private Blob blob;
private GcsResourceManager gcsClient;
- private static final String ARTIFACT_NAME = "test-artifact.txt";
+ private static final String ARTIFACT_NAME = "test-artifact.json";
private static final Path LOCAL_PATH;
private static final byte[] TEST_ARTIFACT_CONTENTS;
diff --git a/it/google-cloud-platform/src/test/resources/test-artifact.txt
b/it/google-cloud-platform/src/test/resources/test-artifact.txt
deleted file mode 100644
index 22c4e1d122a..00000000000
--- a/it/google-cloud-platform/src/test/resources/test-artifact.txt
+++ /dev/null
@@ -1 +0,0 @@
-This is a test artifact.
\ No newline at end of file
diff --git
a/it/jdbc/src/main/java/org/apache/beam/it/jdbc/MSSQLResourceManager.java
b/it/jdbc/src/main/java/org/apache/beam/it/jdbc/MSSQLResourceManager.java
index 0bcb16c6109..c515b2c4844 100644
--- a/it/jdbc/src/main/java/org/apache/beam/it/jdbc/MSSQLResourceManager.java
+++ b/it/jdbc/src/main/java/org/apache/beam/it/jdbc/MSSQLResourceManager.java
@@ -61,13 +61,14 @@ public class MSSQLResourceManager
}
@VisibleForTesting
- <T extends DefaultMSSQLServerContainer<T>> MSSQLResourceManager(T container,
Builder builder) {
+ <T extends MSSQLResourceManager.DefaultMSSQLServerContainer<T>>
MSSQLResourceManager(
+ T container, Builder builder) {
super(container, builder);
initialized = true;
}
- public static Builder builder(String testId) {
- return new Builder(testId);
+ public static MSSQLResourceManager.Builder builder(String testId) {
+ return new MSSQLResourceManager.Builder(testId);
}
private synchronized void createDatabase(String databaseName) {
diff --git
a/it/jdbc/src/main/java/org/apache/beam/it/jdbc/MySQLResourceManager.java
b/it/jdbc/src/main/java/org/apache/beam/it/jdbc/MySQLResourceManager.java
index e1bf3640b53..688c26dfb56 100644
--- a/it/jdbc/src/main/java/org/apache/beam/it/jdbc/MySQLResourceManager.java
+++ b/it/jdbc/src/main/java/org/apache/beam/it/jdbc/MySQLResourceManager.java
@@ -49,8 +49,8 @@ public class MySQLResourceManager extends
AbstractJDBCResourceManager<MySQLConta
super(container, builder);
}
- public static Builder builder(String testId) {
- return new Builder(testId);
+ public static MySQLResourceManager.Builder builder(String testId) {
+ return new MySQLResourceManager.Builder(testId);
}
@Override
diff --git
a/it/jdbc/src/main/java/org/apache/beam/it/jdbc/OracleResourceManager.java
b/it/jdbc/src/main/java/org/apache/beam/it/jdbc/OracleResourceManager.java
index f44e939936d..8054d26c33f 100644
--- a/it/jdbc/src/main/java/org/apache/beam/it/jdbc/OracleResourceManager.java
+++ b/it/jdbc/src/main/java/org/apache/beam/it/jdbc/OracleResourceManager.java
@@ -45,7 +45,7 @@ public class OracleResourceManager extends
AbstractJDBCResourceManager<OracleCon
private static final String DEFAULT_ORACLE_USERNAME = "testUser";
private static final String DEFAULT_ORACLE_PASSWORD = "testPassword";
- private OracleResourceManager(Builder builder) {
+ private OracleResourceManager(OracleResourceManager.Builder builder) {
this(
new OracleContainer(
DockerImageName.parse(builder.containerImageName).withTag(builder.containerImageTag)),
@@ -53,12 +53,12 @@ public class OracleResourceManager extends
AbstractJDBCResourceManager<OracleCon
}
@VisibleForTesting
- OracleResourceManager(OracleContainer container, Builder builder) {
+ OracleResourceManager(OracleContainer container,
OracleResourceManager.Builder builder) {
super(container, builder);
}
- public static Builder builder(String testId) {
- return new Builder(testId);
+ public static OracleResourceManager.Builder builder(String testId) {
+ return new OracleResourceManager.Builder(testId);
}
@Override
diff --git
a/it/jdbc/src/main/java/org/apache/beam/it/jdbc/PostgresResourceManager.java
b/it/jdbc/src/main/java/org/apache/beam/it/jdbc/PostgresResourceManager.java
index e64bc596fc5..7f054dfbc5d 100644
--- a/it/jdbc/src/main/java/org/apache/beam/it/jdbc/PostgresResourceManager.java
+++ b/it/jdbc/src/main/java/org/apache/beam/it/jdbc/PostgresResourceManager.java
@@ -38,7 +38,7 @@ public class PostgresResourceManager extends
AbstractJDBCResourceManager<Postgre
// https://hub.docker.com/_/postgres/tags?tab=tags
private static final String DEFAULT_POSTGRES_CONTAINER_TAG = "15.1";
- private PostgresResourceManager(Builder builder) {
+ private PostgresResourceManager(PostgresResourceManager.Builder builder) {
this(
new PostgreSQLContainer<>(
DockerImageName.parse(builder.containerImageName).withTag(builder.containerImageTag)),
@@ -46,12 +46,13 @@ public class PostgresResourceManager extends
AbstractJDBCResourceManager<Postgre
}
@VisibleForTesting
- PostgresResourceManager(PostgreSQLContainer<?> container, Builder builder) {
+ PostgresResourceManager(
+ PostgreSQLContainer<?> container, PostgresResourceManager.Builder
builder) {
super(container, builder);
}
- public static Builder builder(String testId) {
- return new Builder(testId);
+ public static PostgresResourceManager.Builder builder(String testId) {
+ return new PostgresResourceManager.Builder(testId);
}
@Override
diff --git
a/it/kafka/src/main/java/org/apache/beam/it/kafka/KafkaResourceManager.java
b/it/kafka/src/main/java/org/apache/beam/it/kafka/KafkaResourceManager.java
index d9a647dbeeb..7f7fb5b6956 100644
--- a/it/kafka/src/main/java/org/apache/beam/it/kafka/KafkaResourceManager.java
+++ b/it/kafka/src/main/java/org/apache/beam/it/kafka/KafkaResourceManager.java
@@ -71,13 +71,16 @@ public class KafkaResourceManager extends
TestContainerResourceManager<GenericCo
private final String connectionString;
private final boolean usingStaticTopic;
- private KafkaResourceManager(Builder builder) {
+ private KafkaResourceManager(KafkaResourceManager.Builder builder) {
this(null, new DefaultKafkaContainer(builder), builder);
}
@VisibleForTesting
@SuppressWarnings("nullness")
- KafkaResourceManager(@Nullable AdminClient client, KafkaContainer container,
Builder builder) {
+ KafkaResourceManager(
+ @Nullable AdminClient client,
+ KafkaContainer container,
+ KafkaResourceManager.Builder builder) {
super(container, builder);
this.usingStaticTopic = builder.topicNames.size() > 0;
@@ -102,8 +105,8 @@ public class KafkaResourceManager extends
TestContainerResourceManager<GenericCo
: AdminClient.create(ImmutableMap.of("bootstrap.servers",
this.connectionString));
}
- public static Builder builder(String testId) {
- return new Builder(testId);
+ public static KafkaResourceManager.Builder builder(String testId) {
+ return new KafkaResourceManager.Builder(testId);
}
/** Returns the kafka bootstrap server connection string. */
diff --git a/it/kafka/src/test/java/org/apache/beam/it/kafka/KafkaIOLT.java
b/it/kafka/src/test/java/org/apache/beam/it/kafka/KafkaIOLT.java
index a03030664de..ce6ad877c37 100644
--- a/it/kafka/src/test/java/org/apache/beam/it/kafka/KafkaIOLT.java
+++ b/it/kafka/src/test/java/org/apache/beam/it/kafka/KafkaIOLT.java
@@ -175,7 +175,7 @@ public final class KafkaIOLT extends IOLoadTestBase {
.apply("Write to Kafka", writeIO.withTopic(kafkaTopic));
PipelineLauncher.LaunchConfig options =
- PipelineLauncher.LaunchConfig.builder("test-kafka-write")
+ PipelineLauncher.LaunchConfig.builder("write-kafka")
.setSdk(PipelineLauncher.Sdk.JAVA)
.setPipeline(writePipeline)
.addParameter("runner", configuration.getRunner())
@@ -195,7 +195,7 @@ public final class KafkaIOLT extends IOLoadTestBase {
.apply("Counting element", ParDo.of(new
CountingFn<>(READ_ELEMENT_METRIC_NAME)));
PipelineLauncher.LaunchConfig options =
- PipelineLauncher.LaunchConfig.builder("test-kafka-read")
+ PipelineLauncher.LaunchConfig.builder("read-kafka")
.setSdk(PipelineLauncher.Sdk.JAVA)
.setPipeline(readPipeline)
.addParameter("runner", configuration.getRunner())
diff --git
a/it/mongodb/src/main/java/org/apache/beam/it/mongodb/MongoDBResourceManager.java
b/it/mongodb/src/main/java/org/apache/beam/it/mongodb/MongoDBResourceManager.java
index ed0e556bf0d..80216b14ac0 100644
---
a/it/mongodb/src/main/java/org/apache/beam/it/mongodb/MongoDBResourceManager.java
+++
b/it/mongodb/src/main/java/org/apache/beam/it/mongodb/MongoDBResourceManager.java
@@ -69,7 +69,7 @@ public class MongoDBResourceManager extends
TestContainerResourceManager<MongoDB
private final String connectionString;
private final boolean usingStaticDatabase;
- private MongoDBResourceManager(Builder builder) {
+ private MongoDBResourceManager(MongoDBResourceManager.Builder builder) {
this(
/* mongoClient= */ null,
new MongoDBContainer(
@@ -80,7 +80,9 @@ public class MongoDBResourceManager extends
TestContainerResourceManager<MongoDB
@VisibleForTesting
@SuppressWarnings("nullness")
MongoDBResourceManager(
- @Nullable MongoClient mongoClient, MongoDBContainer container, Builder
builder) {
+ @Nullable MongoClient mongoClient,
+ MongoDBContainer container,
+ MongoDBResourceManager.Builder builder) {
super(container, builder);
this.usingStaticDatabase = builder.databaseName != null;
@@ -91,8 +93,8 @@ public class MongoDBResourceManager extends
TestContainerResourceManager<MongoDB
this.mongoClient = mongoClient == null ?
MongoClients.create(connectionString) : mongoClient;
}
- public static Builder builder(String testId) {
- return new Builder(testId);
+ public static MongoDBResourceManager.Builder builder(String testId) {
+ return new MongoDBResourceManager.Builder(testId);
}
/** Returns the URI connection string to the MongoDB Database. */
diff --git
a/it/mongodb/src/main/java/org/apache/beam/it/mongodb/matchers/MongoDBAsserts.java
b/it/mongodb/src/main/java/org/apache/beam/it/mongodb/matchers/MongoDBAsserts.java
index ec08854b5c6..1a1b86acf56 100644
---
a/it/mongodb/src/main/java/org/apache/beam/it/mongodb/matchers/MongoDBAsserts.java
+++
b/it/mongodb/src/main/java/org/apache/beam/it/mongodb/matchers/MongoDBAsserts.java
@@ -31,7 +31,7 @@ import org.bson.Document;
public class MongoDBAsserts {
/**
- * Convert MongoDB {@link Document} to a list of maps.
+ * Convert MongoDB {@link org.bson.Document} to a list of maps.
*
* @param documents List of Documents to parse
* @return List of maps to use in {@link RecordsSubject}
diff --git
a/it/neo4j/src/main/java/org/apache/beam/it/neo4j/Neo4jResourceManager.java
b/it/neo4j/src/main/java/org/apache/beam/it/neo4j/Neo4jResourceManager.java
index 835be71ce0f..97bcca9e84b 100644
--- a/it/neo4j/src/main/java/org/apache/beam/it/neo4j/Neo4jResourceManager.java
+++ b/it/neo4j/src/main/java/org/apache/beam/it/neo4j/Neo4jResourceManager.java
@@ -66,7 +66,7 @@ public class Neo4jResourceManager extends
TestContainerResourceManager<Neo4jCont
private final String adminPassword;
- private Neo4jResourceManager(Builder builder) {
+ private Neo4jResourceManager(Neo4jResourceManager.Builder builder) {
this(
builder.driver,
new Neo4jContainer<>(
@@ -79,7 +79,10 @@ public class Neo4jResourceManager extends
TestContainerResourceManager<Neo4jCont
@VisibleForTesting
@SuppressWarnings("nullness")
- Neo4jResourceManager(@Nullable Driver neo4jDriver, Neo4jContainer<?>
container, Builder builder) {
+ Neo4jResourceManager(
+ @Nullable Driver neo4jDriver,
+ Neo4jContainer<?> container,
+ Neo4jResourceManager.Builder builder) {
super(container, builder);
this.adminPassword = builder.adminPassword;
@@ -98,8 +101,8 @@ public class Neo4jResourceManager extends
TestContainerResourceManager<Neo4jCont
}
}
- public static Builder builder(String testId) {
- return new Builder(testId);
+ public static Neo4jResourceManager.Builder builder(String testId) {
+ return new Neo4jResourceManager.Builder(testId);
}
/** Returns the URI connection string to the Neo4j Database. */
diff --git
a/it/splunk/src/main/java/org/apache/beam/it/splunk/SplunkResourceManager.java
b/it/splunk/src/main/java/org/apache/beam/it/splunk/SplunkResourceManager.java
index 0115a791eef..1ef4726df43 100644
---
a/it/splunk/src/main/java/org/apache/beam/it/splunk/SplunkResourceManager.java
+++
b/it/splunk/src/main/java/org/apache/beam/it/splunk/SplunkResourceManager.java
@@ -85,7 +85,7 @@ public class SplunkResourceManager extends
TestContainerResourceManager<SplunkCo
private final SplunkClientFactory clientFactory;
@SuppressWarnings("resource")
- private SplunkResourceManager(Builder builder) {
+ private SplunkResourceManager(SplunkResourceManager.Builder builder) {
this(
new SplunkClientFactory(),
new SplunkContainer(
@@ -98,7 +98,9 @@ public class SplunkResourceManager extends
TestContainerResourceManager<SplunkCo
@VisibleForTesting
@SuppressWarnings("nullness")
SplunkResourceManager(
- SplunkClientFactory clientFactory, SplunkContainer container, Builder
builder) {
+ SplunkClientFactory clientFactory,
+ SplunkContainer container,
+ SplunkResourceManager.Builder builder) {
super(setup(container, builder), builder);
String username = DEFAULT_SPLUNK_USERNAME;
@@ -167,8 +169,8 @@ public class SplunkResourceManager extends
TestContainerResourceManager<SplunkCo
.withPassword(builder.password);
}
- public static Builder builder(String testId) {
- return new Builder(testId);
+ public static SplunkResourceManager.Builder builder(String testId) {
+ return new SplunkResourceManager.Builder(testId);
}
/**
diff --git
a/it/testcontainers/src/main/java/org/apache/beam/it/testcontainers/TestContainerResourceManager.java
b/it/testcontainers/src/main/java/org/apache/beam/it/testcontainers/TestContainerResourceManager.java
index 098938a291d..77dd6da5805 100644
---
a/it/testcontainers/src/main/java/org/apache/beam/it/testcontainers/TestContainerResourceManager.java
+++
b/it/testcontainers/src/main/java/org/apache/beam/it/testcontainers/TestContainerResourceManager.java
@@ -32,11 +32,11 @@ import org.testcontainers.containers.GenericContainer;
* resources.
*
* <p>Optionally, a static resource can be specified by calling the useStaticContainer() method in
- * the {@link Builder} class. A static resource is a pre-configured database or other resource that
- * is ready to be connected to by the resource manager. This could be a pre-existing TestContainer
- * that has not been closed, a local database instance, a remote VM, or any other source that can be
- * connected to. If a static container is used, the host and port must also be configured using the
- * Builder's setHost() and setPort() methods, respectively.
+ * the {@link TestContainerResourceManager.Builder} class. A static resource is a pre-configured
+ * database or other resource that is ready to be connected to by the resource manager. This could
+ * be a pre-existing TestContainer that has not been closed, a local database instance, a remote VM,
+ * or any other source that can be connected to. If a static container is used, the host and port
+ * must also be configured using the Builder's setHost() and setPort() methods, respectively.
*/
public abstract class TestContainerResourceManager<T extends GenericContainer<?>>
implements ResourceManager {
@@ -48,11 +48,12 @@ public abstract class TestContainerResourceManager<T extends GenericContainer<?>
private final String host;
protected int port;
- protected <B extends Builder<?>> TestContainerResourceManager(T container, B builder) {
+ protected <B extends TestContainerResourceManager.Builder<?>> TestContainerResourceManager(
+ T container, B builder) {
this(container, builder, null);
}
- protected <B extends Builder<?>> TestContainerResourceManager(
+ protected <B extends TestContainerResourceManager.Builder<?>> TestContainerResourceManager(
T container, B builder, @Nullable Callable<Void> setup) {
this.container = container;
this.usingStaticContainer = builder.useStaticContainer;
diff --git a/it/truthmatchers/src/main/java/org/apache/beam/it/truthmatchers/LaunchInfoSubject.java b/it/truthmatchers/src/main/java/org/apache/beam/it/truthmatchers/LaunchInfoSubject.java
index a496ecce944..30a27c9ad25 100644
--- a/it/truthmatchers/src/main/java/org/apache/beam/it/truthmatchers/LaunchInfoSubject.java
+++ b/it/truthmatchers/src/main/java/org/apache/beam/it/truthmatchers/LaunchInfoSubject.java
@@ -43,7 +43,7 @@ public final class LaunchInfoSubject extends Subject {
}
/**
- * Check if the subject reflects succeeded states. A successfully {@link LaunchInfo} does not mean
+ * Check if the subject reflects succeeded states. A successful {@link LaunchInfo} does not mean
* that the pipeline finished and no errors happened, it just means that the job was able to get
* itself into an active state (RUNNING, UPDATED).
*/
diff --git a/it/truthmatchers/src/main/java/org/apache/beam/it/truthmatchers/RecordsSubject.java b/it/truthmatchers/src/main/java/org/apache/beam/it/truthmatchers/RecordsSubject.java
index 75d5ce3a67c..39a0c0cebed 100644
--- a/it/truthmatchers/src/main/java/org/apache/beam/it/truthmatchers/RecordsSubject.java
+++ b/it/truthmatchers/src/main/java/org/apache/beam/it/truthmatchers/RecordsSubject.java
@@ -81,7 +81,7 @@ public class RecordsSubject extends Subject {
Map<String, Object> expected = convertMapToTreeMap(subset);
for (Map<String, Object> candidate : actual) {
boolean match = true;
- for (Entry<String, Object> entry : subset.entrySet()) {
+ for (Map.Entry<String, Object> entry : subset.entrySet()) {
if (!candidate.containsKey(entry.getKey())
|| !candidate.get(entry.getKey()).equals(entry.getValue())) {
match = false;