This is an automated email from the ASF dual-hosted git repository.

yhu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 7dfc0c03b1b Sync it framework (#28541)
7dfc0c03b1b is described below

commit 7dfc0c03b1b793d9f798dc2dd93b0f81547568b3
Author: Yi Hu <[email protected]>
AuthorDate: Wed Sep 20 11:45:37 2023 -0400

    Sync it framework (#28541)
    
    * Sync it framework
    
    * spotless and checkstyle; exclude not-sync file
    
    * change to json extension to bypass RAT check
---
 .../it/cassandra/matchers/CassandraAsserts.java    |  2 +-
 .../apache/beam/it/common/utils/PipelineUtils.java | 21 ++++++++-
 .../beam/it/common/utils/PipelineUtilsTest.java    |  8 +++-
 .../it/elasticsearch/ElasticsearchUtilsTest.java   |  2 +-
 .../java/org/apache/beam/it/gcp/LoadTestBase.java  | 53 ++++++++++++++-------
 .../it/gcp/bigquery/BigQueryResourceManager.java   |  1 +
 .../it/gcp/bigtable/BigtableResourceManager.java   | 33 +++++++++++++
 .../gcp/bigtable/BigtableResourceManagerUtils.java |  2 +-
 .../it/gcp/dataflow/AbstractPipelineLauncher.java  | 13 +++++-
 .../it/gcp/dataflow/DefaultPipelineLauncher.java   |  6 +--
 .../beam/it/gcp/dataflow/DirectRunnerClient.java   |  6 +--
 .../beam/it/gcp/datagenerator/DataGenerator.java   | 54 +++++++++++-----------
 .../gcp/datastore/matchers/DatastoreAsserts.java   |  3 +-
 .../apache/beam/it/gcp/dlp/DlpResourceManager.java |  5 +-
 .../apache/beam/it/gcp/kms/KMSResourceManager.java |  5 +-
 .../beam/it/gcp/monitoring/MonitoringClient.java   | 12 ++---
 .../beam/it/gcp/pubsub/PubsubResourceManager.java  | 45 ++++++++++++++----
 .../src/main/resources/test-artifact.json          |  1 +
 .../apache/beam/it/gcp/bigquery/BigQueryIOLT.java  | 18 ++++----
 .../apache/beam/it/gcp/bigtable/BigTableIOLT.java  | 22 ++++-----
 .../gcp/bigtable/BigtableResourceManagerTest.java  |  1 +
 .../it/gcp/dataflow/ClassicTemplateClientTest.java | 14 +++++-
 .../it/gcp/dataflow/FlexTemplateClientTest.java    | 14 +++++-
 .../apache/beam/it/gcp/storage/FileBasedIOLT.java  | 14 +++---
 .../it/gcp/storage/GcsResourceManagerTest.java     |  2 +-
 .../src/test/resources/test-artifact.txt           |  1 -
 .../apache/beam/it/jdbc/MSSQLResourceManager.java  |  7 +--
 .../apache/beam/it/jdbc/MySQLResourceManager.java  |  4 +-
 .../apache/beam/it/jdbc/OracleResourceManager.java |  8 ++--
 .../beam/it/jdbc/PostgresResourceManager.java      |  9 ++--
 .../apache/beam/it/kafka/KafkaResourceManager.java | 11 +++--
 .../java/org/apache/beam/it/kafka/KafkaIOLT.java   |  4 +-
 .../beam/it/mongodb/MongoDBResourceManager.java    | 10 ++--
 .../beam/it/mongodb/matchers/MongoDBAsserts.java   |  2 +-
 .../apache/beam/it/neo4j/Neo4jResourceManager.java | 11 +++--
 .../beam/it/splunk/SplunkResourceManager.java      | 10 ++--
 .../TestContainerResourceManager.java              | 15 +++---
 .../beam/it/truthmatchers/LaunchInfoSubject.java   |  2 +-
 .../beam/it/truthmatchers/RecordsSubject.java      |  2 +-
 39 files changed, 303 insertions(+), 150 deletions(-)

diff --git 
a/it/cassandra/src/main/java/org/apache/beam/it/cassandra/matchers/CassandraAsserts.java
 
b/it/cassandra/src/main/java/org/apache/beam/it/cassandra/matchers/CassandraAsserts.java
index 61f730bf357..6aecc6609cf 100644
--- 
a/it/cassandra/src/main/java/org/apache/beam/it/cassandra/matchers/CassandraAsserts.java
+++ 
b/it/cassandra/src/main/java/org/apache/beam/it/cassandra/matchers/CassandraAsserts.java
@@ -31,7 +31,7 @@ import org.apache.beam.it.truthmatchers.RecordsSubject;
 public class CassandraAsserts {
 
   /**
-   * Convert Cassandra {@link Row} list to a list of maps.
+   * Convert Cassandra {@link com.datastax.oss.driver.api.core.cql.Row} list 
to a list of maps.
    *
    * @param rows Rows to parse.
    * @return List of maps to use in {@link RecordsSubject}.
diff --git 
a/it/common/src/main/java/org/apache/beam/it/common/utils/PipelineUtils.java 
b/it/common/src/main/java/org/apache/beam/it/common/utils/PipelineUtils.java
index c696457bbdd..d249d43d378 100644
--- a/it/common/src/main/java/org/apache/beam/it/common/utils/PipelineUtils.java
+++ b/it/common/src/main/java/org/apache/beam/it/common/utils/PipelineUtils.java
@@ -27,6 +27,7 @@ import org.apache.beam.sdk.PipelineRunner;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.CaseFormat;
+import org.apache.commons.lang3.RandomStringUtils;
 
 /** Utilities to make working with Dataflow easier. */
 public class PipelineUtils {
@@ -73,6 +74,15 @@ public class PipelineUtils {
     }
   }
 
+  /**
+   * Creates a job name. Method uses {@link #createJobName(String, int)}} 
without a random suffix.
+   *
+   * @see #createJobName(String, int)
+   */
+  public static String createJobName(String prefix) {
+    return createJobName(prefix, 0);
+  }
+
   /**
    * Creates a job name.
    *
@@ -83,17 +93,24 @@ public class PipelineUtils {
    * same prefix are requested in a short period of time.
    *
    * @param prefix a prefix for the job
+   * @param randomChars if the string should contain random chars at the end, 
to increase the
+   *     likelihood of being unique.
    * @return the prefix plus some way of identifying it separate from other 
jobs with the same
    *     prefix
    */
-  public static String createJobName(String prefix) {
+  public static String createJobName(String prefix, int randomChars) {
     String convertedPrefix =
         
CaseFormat.UPPER_CAMEL.converterTo(CaseFormat.LOWER_HYPHEN).convert(prefix);
     String formattedTimestamp =
         DateTimeFormatter.ofPattern("yyyyMMddHHmmssSSS")
             .withZone(ZoneId.of("UTC"))
             .format(Instant.now());
-    return String.format("%s-%s", convertedPrefix, formattedTimestamp);
+
+    String suffix = "";
+    if (randomChars > 0) {
+      suffix = "-" + 
RandomStringUtils.randomAlphanumeric(randomChars).toLowerCase();
+    }
+    return String.format("%s-%s%s", convertedPrefix, formattedTimestamp, 
suffix);
   }
 
   /** Get raw job name (without prefix) from a jobName generated by 
createJobName. */
diff --git 
a/it/common/src/test/java/org/apache/beam/it/common/utils/PipelineUtilsTest.java
 
b/it/common/src/test/java/org/apache/beam/it/common/utils/PipelineUtilsTest.java
index acf203b06e6..316283cdf7d 100644
--- 
a/it/common/src/test/java/org/apache/beam/it/common/utils/PipelineUtilsTest.java
+++ 
b/it/common/src/test/java/org/apache/beam/it/common/utils/PipelineUtilsTest.java
@@ -37,7 +37,13 @@ public class PipelineUtilsTest {
 
   @Test
   public void testCreateJobNameWithUppercase() {
-    
assertThat(createJobName("testWithUpperCase")).matches("test-with-upper-case" + 
"-\\d{17}");
+    
assertThat(createJobName("testWithUpperCase")).matches("test-with-upper-case-\\d{17}");
+  }
+
+  @Test
+  public void testCreateJobNameWithUppercaseSuffix() {
+    assertThat(createJobName("testWithUpperCase", 8))
+        .matches("test-with-upper-case-\\d{17}-[a-z0-9]{8}");
   }
 
   @Test
diff --git 
a/it/elasticsearch/src/test/java/org/apache/beam/it/elasticsearch/ElasticsearchUtilsTest.java
 
b/it/elasticsearch/src/test/java/org/apache/beam/it/elasticsearch/ElasticsearchUtilsTest.java
index eb250a1c5f8..61d6b5d57c2 100644
--- 
a/it/elasticsearch/src/test/java/org/apache/beam/it/elasticsearch/ElasticsearchUtilsTest.java
+++ 
b/it/elasticsearch/src/test/java/org/apache/beam/it/elasticsearch/ElasticsearchUtilsTest.java
@@ -34,7 +34,7 @@ public class ElasticsearchUtilsTest {
   @Test
   public void testGenerateIndexNameShouldReplaceForwardSlash() {
     String testBaseString = "Test/DB/Name";
-    String actual = generateIndexName(testBaseString);
+    String actual = ElasticsearchUtils.generateIndexName(testBaseString);
     assertThat(actual).matches("test-db-name-\\d{8}-\\d{6}-\\d{6}");
   }
 
diff --git 
a/it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/LoadTestBase.java
 
b/it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/LoadTestBase.java
index f6e359fed96..d9c1990ef07 100644
--- 
a/it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/LoadTestBase.java
+++ 
b/it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/LoadTestBase.java
@@ -18,6 +18,7 @@
 package org.apache.beam.it.gcp;
 
 import static org.apache.beam.it.common.logging.LogStrings.formatForLogging;
+import static 
org.apache.beam.it.gcp.dataflow.AbstractPipelineLauncher.RUNNER_V2;
 
 import com.google.api.gax.core.CredentialsProvider;
 import com.google.api.gax.core.FixedCredentialsProvider;
@@ -49,6 +50,7 @@ import org.apache.beam.it.gcp.monitoring.MonitoringClient;
 import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.MoreObjects;
 import org.junit.After;
 import org.junit.Before;
+import org.junit.BeforeClass;
 import org.junit.Rule;
 import org.junit.rules.TestRule;
 import org.junit.rules.TestWatcher;
@@ -107,11 +109,14 @@ public abstract class LoadTestBase {
         }
       };
 
-  @Before
-  @SuppressFBWarnings("ST_WRITE_TO_STATIC_FROM_INSTANCE_METHOD")
-  public void setUp() throws IOException {
+  @BeforeClass
+  public static void setUpClass() {
     project = TestProperties.project();
     region = TestProperties.region();
+  }
+
+  @Before
+  public void setUp() throws IOException {
     monitoringClient = MonitoringClient.builder(CREDENTIALS_PROVIDER).build();
     pipelineLauncher = launcher();
     pipelineOperator = new PipelineOperator(pipelineLauncher);
@@ -239,7 +244,7 @@ public abstract class LoadTestBase {
       metrics.put("EstimatedDataProcessedGB", dataProcessed / 1e9d);
     }
     metrics.putAll(getCpuUtilizationMetrics(launchInfo.jobId(), 
workerTimeInterval));
-    metrics.putAll(getThroughputMetrics(launchInfo.jobId(), config, 
workerTimeInterval));
+    metrics.putAll(getThroughputMetrics(launchInfo, config, 
workerTimeInterval));
   }
 
   /**
@@ -349,25 +354,30 @@ public abstract class LoadTestBase {
   /**
    * Computes throughput metrics of the given pcollection in dataflow job.
    *
-   * @param jobId dataflow job id
+   * @param jobInfo dataflow job LaunchInfo
    * @param config the {@class MetricsConfiguration}
    * @param timeInterval interval for the monitoring query
    * @return throughput metrics of the pcollection
    */
   protected Map<String, Double> getThroughputMetrics(
-      String jobId, MetricsConfiguration config, TimeInterval timeInterval) {
+      LaunchInfo jobInfo, MetricsConfiguration config, TimeInterval 
timeInterval) {
+    String jobId = jobInfo.jobId();
+    String iColl =
+        RUNNER_V2.equals(jobInfo.runner())
+            ? config.inputPCollectionV2()
+            : config.inputPCollection();
+    String oColl =
+        RUNNER_V2.equals(jobInfo.runner())
+            ? config.outputPCollectionV2()
+            : config.outputPCollection();
     List<Double> inputThroughputBytesPerSec =
-        monitoringClient.getThroughputBytesPerSecond(
-            project, jobId, config.inputPCollection(), timeInterval);
+        monitoringClient.getThroughputBytesPerSecond(project, jobId, iColl, 
timeInterval);
     List<Double> inputThroughputElementsPerSec =
-        monitoringClient.getThroughputElementsPerSecond(
-            project, jobId, config.inputPCollection(), timeInterval);
+        monitoringClient.getThroughputElementsPerSecond(project, jobId, iColl, 
timeInterval);
     List<Double> outputThroughputBytesPerSec =
-        monitoringClient.getThroughputBytesPerSecond(
-            project, jobId, config.outputPCollection(), timeInterval);
+        monitoringClient.getThroughputBytesPerSecond(project, jobId, oColl, 
timeInterval);
     List<Double> outputThroughputElementsPerSec =
-        monitoringClient.getThroughputElementsPerSecond(
-            project, jobId, config.outputPCollection(), timeInterval);
+        monitoringClient.getThroughputElementsPerSecond(project, jobId, oColl, 
timeInterval);
     return getThroughputMetrics(
         inputThroughputBytesPerSec,
         inputThroughputElementsPerSec,
@@ -495,22 +505,31 @@ public abstract class LoadTestBase {
      */
     public abstract @Nullable String inputPCollection();
 
+    /** Input PCollection name under Dataflow runner v2. */
+    public abstract @Nullable String inputPCollectionV2();
+
     /**
      * Input PCollection of the Dataflow job to query additional metrics. If 
not provided, the
      * metrics for inputPCollection will not be calculated.
      */
     public abstract @Nullable String outputPCollection();
 
-    public static Builder builder() {
+    public abstract @Nullable String outputPCollectionV2();
+
+    public static MetricsConfiguration.Builder builder() {
       return new AutoValue_LoadTestBase_MetricsConfiguration.Builder();
     }
 
     @AutoValue.Builder
     public abstract static class Builder {
 
-      public abstract Builder setInputPCollection(@Nullable String value);
+      public abstract MetricsConfiguration.Builder 
setInputPCollection(@Nullable String value);
+
+      public abstract MetricsConfiguration.Builder 
setInputPCollectionV2(@Nullable String value);
+
+      public abstract MetricsConfiguration.Builder 
setOutputPCollection(@Nullable String value);
 
-      public abstract Builder setOutputPCollection(@Nullable String value);
+      public abstract MetricsConfiguration.Builder 
setOutputPCollectionV2(@Nullable String value);
 
       public abstract MetricsConfiguration build();
     }
diff --git 
a/it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/bigquery/BigQueryResourceManager.java
 
b/it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/bigquery/BigQueryResourceManager.java
index 80bf5cfd938..d6d348f524b 100644
--- 
a/it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/bigquery/BigQueryResourceManager.java
+++ 
b/it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/bigquery/BigQueryResourceManager.java
@@ -461,6 +461,7 @@ public final class BigQueryResourceManager implements 
ResourceManager {
                   projectId, dataset.getDatasetId().getDataset(), 
table.getTableId().getTable()));
         }
         bigQuery.delete(dataset.getDatasetId());
+        dataset = null;
       }
     } catch (Exception e) {
       throw new BigQueryResourceManagerException("Failed to delete 
resources.", e);
diff --git 
a/it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/bigtable/BigtableResourceManager.java
 
b/it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/bigtable/BigtableResourceManager.java
index 1e6750cc81e..71388022928 100644
--- 
a/it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/bigtable/BigtableResourceManager.java
+++ 
b/it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/bigtable/BigtableResourceManager.java
@@ -33,6 +33,7 @@ import com.google.cloud.bigtable.admin.v2.models.AppProfile;
 import 
com.google.cloud.bigtable.admin.v2.models.AppProfile.MultiClusterRoutingPolicy;
 import com.google.cloud.bigtable.admin.v2.models.AppProfile.RoutingPolicy;
 import 
com.google.cloud.bigtable.admin.v2.models.AppProfile.SingleClusterRoutingPolicy;
+import com.google.cloud.bigtable.admin.v2.models.Cluster;
 import com.google.cloud.bigtable.admin.v2.models.CreateAppProfileRequest;
 import com.google.cloud.bigtable.admin.v2.models.CreateInstanceRequest;
 import com.google.cloud.bigtable.admin.v2.models.CreateTableRequest;
@@ -54,6 +55,8 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
 import javax.annotation.Nullable;
 import org.apache.beam.it.common.ResourceManager;
 import org.apache.commons.lang3.StringUtils;
@@ -93,6 +96,8 @@ public class BigtableResourceManager implements 
ResourceManager {
   private final Set<String> cdcEnabledTables;
 
   private boolean hasInstance;
+  private Iterable<BigtableResourceManagerCluster> clusters;
+
   private final boolean usingStaticInstance;
 
   private BigtableResourceManager(Builder builder) throws IOException {
@@ -111,6 +116,7 @@ public class BigtableResourceManager implements 
ResourceManager {
     this.createdTables = new ArrayList<>();
     this.createdAppProfiles = new ArrayList<>();
     this.cdcEnabledTables = new HashSet<>();
+    this.clusters = new ArrayList<>();
 
     // Check if RM was configured to use static Bigtable instance.
     if (builder.useStaticInstance) {
@@ -223,6 +229,7 @@ public class BigtableResourceManager implements 
ResourceManager {
           "Failed to create instance " + instanceId + ".", e);
     }
     hasInstance = true;
+    this.clusters = clusters;
 
     LOG.info("Successfully created instance {}.", instanceId);
   }
@@ -544,6 +551,32 @@ public class BigtableResourceManager implements 
ResourceManager {
     return tableRows;
   }
 
+  /** Get all the cluster names of the current instance. */
+  public List<String> getClusterNames() {
+    return StreamSupport.stream(getClusters().spliterator(), false)
+        .map(BigtableResourceManagerCluster::clusterId)
+        .collect(Collectors.toList());
+  }
+
+  private Iterable<BigtableResourceManagerCluster> getClusters() {
+    if (usingStaticInstance && this.clusters == null) {
+      try (BigtableInstanceAdminClient instanceAdminClient =
+          bigtableResourceManagerClientFactory.bigtableInstanceAdminClient()) {
+        List<BigtableResourceManagerCluster> managedClusters = new 
ArrayList<>();
+        for (Cluster cluster : instanceAdminClient.listClusters(instanceId)) {
+          managedClusters.add(
+              BigtableResourceManagerCluster.create(
+                  cluster.getId(),
+                  cluster.getZone(),
+                  cluster.getServeNodes(),
+                  cluster.getStorageType()));
+        }
+        this.clusters = managedClusters;
+      }
+    }
+    return this.clusters;
+  }
+
   /**
    * Deletes all created resources (instance and tables) and cleans up all 
Bigtable clients, making
    * the manager object unusable.
diff --git 
a/it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/bigtable/BigtableResourceManagerUtils.java
 
b/it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/bigtable/BigtableResourceManagerUtils.java
index eb2323e5297..a893493d766 100644
--- 
a/it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/bigtable/BigtableResourceManagerUtils.java
+++ 
b/it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/bigtable/BigtableResourceManagerUtils.java
@@ -34,7 +34,7 @@ public final class BigtableResourceManagerUtils {
   private static final Pattern ILLEGAL_INSTANCE_ID_CHARS = 
Pattern.compile("[^a-z0-9-]");
   private static final String REPLACE_INSTANCE_ID_CHAR = "-";
   private static final int MIN_TABLE_ID_LENGTH = 1;
-  private static final int MAX_TABLE_ID_LENGTH = 30;
+  private static final int MAX_TABLE_ID_LENGTH = 40;
   private static final Pattern ILLEGAL_TABLE_CHARS = 
Pattern.compile("[^a-zA-Z0-9-_.]");
   private static final String REPLACE_TABLE_ID_CHAR = "-";
 
diff --git 
a/it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/dataflow/AbstractPipelineLauncher.java
 
b/it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/dataflow/AbstractPipelineLauncher.java
index 08688d88b10..b5c9535953b 100644
--- 
a/it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/dataflow/AbstractPipelineLauncher.java
+++ 
b/it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/dataflow/AbstractPipelineLauncher.java
@@ -58,6 +58,11 @@ public abstract class AbstractPipelineLauncher implements 
PipelineLauncher {
 
   private static final Logger LOG = 
LoggerFactory.getLogger(AbstractPipelineLauncher.class);
   private static final Pattern CURRENT_METRICS = 
Pattern.compile(".*Current.*");
+  public static final String LEGACY_RUNNER = "Dataflow Legacy Runner";
+  public static final String RUNNER_V2 = "Dataflow Runner V2";
+  public static final String PARAM_RUNNER = "runner";
+  public static final String PARAM_JOB_TYPE = "jobType";
+  public static final String PARAM_JOB_ID = "jobId";
 
   protected final List<String> launchedJobs = new ArrayList<>();
 
@@ -244,12 +249,12 @@ public abstract class AbstractPipelineLauncher implements 
PipelineLauncher {
    */
   protected LaunchInfo.Builder getJobInfoBuilder(LaunchConfig options, 
JobState state, Job job) {
     Map<String, String> labels = job.getLabels();
-    String runner = "Dataflow Legacy Runner";
+    String runner = LEGACY_RUNNER;
     Environment environment = job.getEnvironment();
     if (environment != null
         && environment.getExperiments() != null
         && environment.getExperiments().contains("use_runner_v2")) {
-      runner = "Dataflow Runner V2";
+      runner = RUNNER_V2;
     }
     LaunchInfo.Builder builder =
         LaunchInfo.builder()
@@ -266,6 +271,10 @@ public abstract class AbstractPipelineLauncher implements 
PipelineLauncher {
     // tests
     Map<String, String> parameters = new HashMap<>(options.parameters());
     options.environment().forEach((key, val) -> parameters.put(key, 
val.toString()));
+    // attach basic job info to parameters so that these are exported for load 
tests
+    parameters.put(PARAM_RUNNER, runner);
+    parameters.put(PARAM_JOB_TYPE, job.getType());
+    parameters.put(PARAM_JOB_ID, job.getId());
     builder.setParameters(ImmutableMap.copyOf(parameters));
     if (labels != null && !labels.isEmpty()) {
       // template job
diff --git 
a/it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/dataflow/DefaultPipelineLauncher.java
 
b/it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/dataflow/DefaultPipelineLauncher.java
index 7918dd6227d..ad2dcafc007 100644
--- 
a/it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/dataflow/DefaultPipelineLauncher.java
+++ 
b/it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/dataflow/DefaultPipelineLauncher.java
@@ -99,7 +99,7 @@ public class DefaultPipelineLauncher extends 
AbstractPipelineLauncher {
           .put(PipelineResult.State.UNRECOGNIZED, JobState.UNKNOWN)
           .build();
 
-  private DefaultPipelineLauncher(Builder builder) {
+  private DefaultPipelineLauncher(DefaultPipelineLauncher.Builder builder) {
     super(
         new Dataflow(
             Utils.getDefaultTransport(),
@@ -109,8 +109,8 @@ public class DefaultPipelineLauncher extends 
AbstractPipelineLauncher {
                 : new HttpCredentialsAdapter(builder.getCredentials())));
   }
 
-  public static Builder builder(Credentials credentials) {
-    return new Builder(credentials);
+  public static DefaultPipelineLauncher.Builder builder(Credentials 
credentials) {
+    return new DefaultPipelineLauncher.Builder(credentials);
   }
 
   @Override
diff --git 
a/it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/dataflow/DirectRunnerClient.java
 
b/it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/dataflow/DirectRunnerClient.java
index 8017009ff37..57f8ad40c1b 100644
--- 
a/it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/dataflow/DirectRunnerClient.java
+++ 
b/it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/dataflow/DirectRunnerClient.java
@@ -53,8 +53,8 @@ public class DirectRunnerClient implements PipelineLauncher {
     this.mainClass = builder.getMainClass();
   }
 
-  public static Builder builder(Class<?> mainClass) {
-    return new Builder(mainClass);
+  public static DirectRunnerClient.Builder builder(Class<?> mainClass) {
+    return new DirectRunnerClient.Builder(mainClass);
   }
 
   @Override
@@ -172,7 +172,7 @@ public class DirectRunnerClient implements PipelineLauncher 
{
       return mainClass;
     }
 
-    public Builder setCredentials(Credentials value) {
+    public DirectRunnerClient.Builder setCredentials(Credentials value) {
       credentials = value;
       return this;
     }
diff --git 
a/it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/datagenerator/DataGenerator.java
 
b/it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/datagenerator/DataGenerator.java
index 99016b5dd3a..832a75defd9 100644
--- 
a/it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/datagenerator/DataGenerator.java
+++ 
b/it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/datagenerator/DataGenerator.java
@@ -61,14 +61,16 @@ public class DataGenerator {
             .build();
   }
 
-  public static Builder builderWithSchemaLocation(String testName, String 
schemaLocation) {
-    return new Builder(testName + "-data-generator")
+  public static DataGenerator.Builder builderWithSchemaLocation(
+      String testName, String schemaLocation) {
+    return new DataGenerator.Builder(testName + "-data-generator")
         .setSchemaLocation(schemaLocation)
         .setAutoscalingAlgorithm(AutoscalingAlgorithmType.THROUGHPUT_BASED);
   }
 
-  public static Builder builderWithSchemaTemplate(String testName, String 
schemaTemplate) {
-    return new Builder(testName + "-data-generator")
+  public static DataGenerator.Builder builderWithSchemaTemplate(
+      String testName, String schemaTemplate) {
+    return new DataGenerator.Builder(testName + "-data-generator")
         .setSchemaTemplate(schemaTemplate)
         .setAutoscalingAlgorithm(AutoscalingAlgorithmType.THROUGHPUT_BASED);
   }
@@ -129,27 +131,27 @@ public class DataGenerator {
       return parameters;
     }
 
-    public Builder setSchemaTemplate(String value) {
+    public DataGenerator.Builder setSchemaTemplate(String value) {
       parameters.put("schemaTemplate", value);
       return this;
     }
 
-    public Builder setSchemaLocation(String value) {
+    public DataGenerator.Builder setSchemaLocation(String value) {
       parameters.put("schemaLocation", value);
       return this;
     }
 
-    public Builder setMessagesLimit(String value) {
+    public DataGenerator.Builder setMessagesLimit(String value) {
       parameters.put(MESSAGES_LIMIT, value);
       return this;
     }
 
-    public Builder setQPS(String value) {
+    public DataGenerator.Builder setQPS(String value) {
       parameters.put("qps", value);
       return this;
     }
 
-    public Builder setSinkType(String value) {
+    public DataGenerator.Builder setSinkType(String value) {
       parameters.put("sinkType", value);
       return this;
     }
@@ -164,87 +166,87 @@ public class DataGenerator {
       return this;
     }
 
-    public Builder setMaxNumWorkers(String value) {
+    public DataGenerator.Builder setMaxNumWorkers(String value) {
       parameters.put("maxNumWorkers", value);
       return this;
     }
 
-    public Builder setAutoscalingAlgorithm(AutoscalingAlgorithmType value) {
+    public DataGenerator.Builder 
setAutoscalingAlgorithm(AutoscalingAlgorithmType value) {
       parameters.put("autoscalingAlgorithm", value.toString());
       return this;
     }
 
-    public Builder setOutputDirectory(String value) {
+    public DataGenerator.Builder setOutputDirectory(String value) {
       parameters.put("outputDirectory", value);
       return this;
     }
 
-    public Builder setOutputType(String value) {
+    public DataGenerator.Builder setOutputType(String value) {
       parameters.put("outputType", value);
       return this;
     }
 
-    public Builder setNumShards(String value) {
+    public DataGenerator.Builder setNumShards(String value) {
       parameters.put("numShards", value);
       return this;
     }
 
-    public Builder setAvroSchemaLocation(String value) {
+    public DataGenerator.Builder setAvroSchemaLocation(String value) {
       parameters.put("avroSchemaLocation", value);
       return this;
     }
 
-    public Builder setTopic(String value) {
+    public DataGenerator.Builder setTopic(String value) {
       parameters.put("topic", value);
       return this;
     }
 
-    public Builder setProjectId(String value) {
+    public DataGenerator.Builder setProjectId(String value) {
       parameters.put("projectId", value);
       return this;
     }
 
-    public Builder setSpannerInstanceName(String value) {
+    public DataGenerator.Builder setSpannerInstanceName(String value) {
       parameters.put("spannerInstanceName", value);
       return this;
     }
 
-    public Builder setSpannerDatabaseName(String value) {
+    public DataGenerator.Builder setSpannerDatabaseName(String value) {
       parameters.put("spannerDatabaseName", value);
       return this;
     }
 
-    public Builder setSpannerTableName(String value) {
+    public DataGenerator.Builder setSpannerTableName(String value) {
       parameters.put("spannerTableName", value);
       return this;
     }
 
-    public Builder setDriverClassName(String value) {
+    public DataGenerator.Builder setDriverClassName(String value) {
       parameters.put("driverClassName", value);
       return this;
     }
 
-    public Builder setConnectionUrl(String value) {
+    public DataGenerator.Builder setConnectionUrl(String value) {
       parameters.put("connectionUrl", value);
       return this;
     }
 
-    public Builder setUsername(String value) {
+    public DataGenerator.Builder setUsername(String value) {
       parameters.put("username", value);
       return this;
     }
 
-    public Builder setPassword(String value) {
+    public DataGenerator.Builder setPassword(String value) {
       parameters.put("password", value);
       return this;
     }
 
-    public Builder setConnectionProperties(String value) {
+    public DataGenerator.Builder setConnectionProperties(String value) {
       parameters.put("connectionProperties", value);
       return this;
     }
 
-    public Builder setStatement(String value) {
+    public DataGenerator.Builder setStatement(String value) {
       parameters.put("statement", value);
       return this;
     }
diff --git 
a/it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/datastore/matchers/DatastoreAsserts.java
 
b/it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/datastore/matchers/DatastoreAsserts.java
index ef67a5a5c4f..78fa7543150 100644
--- 
a/it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/datastore/matchers/DatastoreAsserts.java
+++ 
b/it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/datastore/matchers/DatastoreAsserts.java
@@ -61,7 +61,8 @@ public class DatastoreAsserts {
   /**
    * Creates a {@link RecordsSubject} to assert information within a list of 
records.
    *
-   * @param results Records in Datastore {@link Entity} format to use in the 
comparison.
+   * @param results Records in Datastore {@link 
com.google.cloud.datastore.Entity} format to use in
+   *     the comparison.
    * @return Truth subject to chain assertions.
    */
   public static RecordsSubject assertThatDatastoreRecords(Collection<Entity> 
results) {
diff --git 
a/it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/dlp/DlpResourceManager.java
 
b/it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/dlp/DlpResourceManager.java
index f59794af3e1..de818a1bbff 100644
--- 
a/it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/dlp/DlpResourceManager.java
+++ 
b/it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/dlp/DlpResourceManager.java
@@ -113,8 +113,9 @@ public class DlpResourceManager implements ResourceManager {
    * @param project the GCP project ID
    * @return a new instance of Builder
    */
-  public static Builder builder(String project, CredentialsProvider 
credentialsProvider) {
-    return new Builder(project, credentialsProvider);
+  public static DlpResourceManager.Builder builder(
+      String project, CredentialsProvider credentialsProvider) {
+    return new DlpResourceManager.Builder(project, credentialsProvider);
   }
 
   /** A builder class for creating instances of {@link DlpResourceManager}. */
diff --git 
a/it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/kms/KMSResourceManager.java
 
b/it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/kms/KMSResourceManager.java
index 7e1a403c735..2cad6d0b9fa 100644
--- 
a/it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/kms/KMSResourceManager.java
+++ 
b/it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/kms/KMSResourceManager.java
@@ -72,8 +72,9 @@ public class KMSResourceManager implements ResourceManager {
     this.keyRing = null;
   }
 
-  public static Builder builder(String projectId, CredentialsProvider 
credentialsProvider) {
-    return new Builder(projectId, credentialsProvider);
+  public static KMSResourceManager.Builder builder(
+      String projectId, CredentialsProvider credentialsProvider) {
+    return new KMSResourceManager.Builder(projectId, credentialsProvider);
   }
 
   /**
diff --git 
a/it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/monitoring/MonitoringClient.java
 
b/it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/monitoring/MonitoringClient.java
index 0fc5614a363..06591ea4fe0 100644
--- 
a/it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/monitoring/MonitoringClient.java
+++ 
b/it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/monitoring/MonitoringClient.java
@@ -150,8 +150,8 @@ public final class MonitoringClient {
     Aggregation aggregation =
         Aggregation.newBuilder()
             .setAlignmentPeriod(Duration.newBuilder().setSeconds(60).build())
-            .setPerSeriesAligner(Aligner.ALIGN_MEAN)
-            .setCrossSeriesReducer(Reducer.REDUCE_MEAN)
+            .setPerSeriesAligner(Aggregation.Aligner.ALIGN_MEAN)
+            .setCrossSeriesReducer(Aggregation.Reducer.REDUCE_MEAN)
             .addGroupByFields("resource.instance_id")
             .build();
     ListTimeSeriesRequest request =
@@ -188,7 +188,7 @@ public final class MonitoringClient {
     Aggregation aggregation =
         Aggregation.newBuilder()
             .setAlignmentPeriod(Duration.newBuilder().setSeconds(60).build())
-            .setPerSeriesAligner(Aligner.ALIGN_MEAN)
+            .setPerSeriesAligner(Aggregation.Aligner.ALIGN_MEAN)
             .setCrossSeriesReducer(Reducer.REDUCE_MAX)
             .build();
     ListTimeSeriesRequest request =
@@ -225,7 +225,7 @@ public final class MonitoringClient {
     Aggregation aggregation =
         Aggregation.newBuilder()
             .setAlignmentPeriod(Duration.newBuilder().setSeconds(60).build())
-            .setPerSeriesAligner(Aligner.ALIGN_MEAN)
+            .setPerSeriesAligner(Aggregation.Aligner.ALIGN_MEAN)
             .setCrossSeriesReducer(Reducer.REDUCE_MAX)
             .build();
     ListTimeSeriesRequest request =
@@ -269,7 +269,7 @@ public final class MonitoringClient {
     Aggregation aggregation =
         Aggregation.newBuilder()
             .setAlignmentPeriod(Duration.newBuilder().setSeconds(60).build())
-            .setPerSeriesAligner(Aligner.ALIGN_RATE)
+            .setPerSeriesAligner(Aggregation.Aligner.ALIGN_RATE)
             .build();
     ListTimeSeriesRequest request =
         ListTimeSeriesRequest.newBuilder()
@@ -312,7 +312,7 @@ public final class MonitoringClient {
     Aggregation aggregation =
         Aggregation.newBuilder()
             .setAlignmentPeriod(Duration.newBuilder().setSeconds(60).build())
-            .setPerSeriesAligner(Aligner.ALIGN_RATE)
+            .setPerSeriesAligner(Aggregation.Aligner.ALIGN_RATE)
             .build();
     ListTimeSeriesRequest request =
         ListTimeSeriesRequest.newBuilder()
diff --git 
a/it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/pubsub/PubsubResourceManager.java
 
b/it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/pubsub/PubsubResourceManager.java
index 3a684d34c04..738620c15b7 100644
--- 
a/it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/pubsub/PubsubResourceManager.java
+++ 
b/it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/pubsub/PubsubResourceManager.java
@@ -20,6 +20,7 @@ package org.apache.beam.it.gcp.pubsub;
 import static 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument;
 
 import com.google.api.gax.core.CredentialsProvider;
+import com.google.api.gax.rpc.DeadlineExceededException;
 import com.google.cloud.pubsub.v1.Publisher;
 import com.google.cloud.pubsub.v1.SchemaServiceClient;
 import com.google.cloud.pubsub.v1.SchemaServiceSettings;
@@ -42,12 +43,16 @@ import com.google.pubsub.v1.SubscriptionName;
 import com.google.pubsub.v1.Topic;
 import com.google.pubsub.v1.TopicName;
 import com.google.pubsub.v1.UpdateTopicRequest;
+import dev.failsafe.Failsafe;
+import dev.failsafe.RetryPolicy;
 import java.io.IOException;
+import java.time.Duration;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
 import org.apache.beam.it.common.ResourceManager;
+import org.apache.beam.it.common.utils.ExceptionUtils;
 import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Strings;
 import org.slf4j.Logger;
@@ -66,6 +71,12 @@ public final class PubsubResourceManager implements 
ResourceManager {
   private static final int DEFAULT_ACK_DEADLINE_SECONDS = 600;
   private static final String RESOURCE_NAME_SEPARATOR = "-";
 
+  // Retry settings for client operations
+  private static final int FAILSAFE_MAX_RETRIES = 5;
+  private static final Duration FAILSAFE_RETRY_DELAY = Duration.ofSeconds(10);
+  private static final Duration FAILSAFE_RETRY_MAX_DELAY = 
Duration.ofSeconds(60);
+  private static final double FAILSAFE_RETRY_JITTER = 0.1;
+
   private final String testId;
   private final String projectId;
   private final PubsubPublisherFactory publisherFactory;
@@ -184,11 +195,14 @@ public final class PubsubResourceManager implements 
ResourceManager {
     LOG.info("Creating subscription '{}' for topic '{}'", subscriptionName, 
topicName);
 
     Subscription subscription =
-        subscriptionAdminClient.createSubscription(
-            getSubscriptionName(subscriptionName),
-            topicName,
-            PushConfig.getDefaultInstance(),
-            DEFAULT_ACK_DEADLINE_SECONDS);
+        Failsafe.with(retryOnDeadlineExceeded())
+            .get(
+                () ->
+                    subscriptionAdminClient.createSubscription(
+                        getSubscriptionName(subscriptionName),
+                        topicName,
+                        PushConfig.getDefaultInstance(),
+                        DEFAULT_ACK_DEADLINE_SECONDS));
     SubscriptionName reference = PubsubUtils.toSubscriptionName(subscription);
     createdSubscriptions.add(getSubscriptionName(subscriptionName));
 
@@ -299,17 +313,19 @@ public final class PubsubResourceManager implements 
ResourceManager {
     try {
       for (SubscriptionName subscription : createdSubscriptions) {
         LOG.info("Deleting subscription '{}'", subscription);
-        subscriptionAdminClient.deleteSubscription(subscription);
+        Failsafe.with(retryOnDeadlineExceeded())
+            .run(() -> 
subscriptionAdminClient.deleteSubscription(subscription));
       }
 
       for (TopicName topic : createdTopics) {
         LOG.info("Deleting topic '{}'", topic);
-        topicAdminClient.deleteTopic(topic);
+        Failsafe.with(retryOnDeadlineExceeded()).run(() -> 
topicAdminClient.deleteTopic(topic));
       }
 
       for (SchemaName schemaName : createdSchemas) {
         LOG.info("Deleting schema '{}'", schemaName);
-        schemaServiceClient.deleteSchema(schemaName);
+        Failsafe.with(retryOnDeadlineExceeded())
+            .run(() -> schemaServiceClient.deleteSchema(schemaName));
       }
     } finally {
       subscriptionAdminClient.close();
@@ -342,7 +358,8 @@ public final class PubsubResourceManager implements 
ResourceManager {
   private TopicName createTopicInternal(TopicName topicName) {
     LOG.info("Creating topic '{}'...", topicName.toString());
 
-    Topic topic = topicAdminClient.createTopic(topicName);
+    Topic topic =
+        Failsafe.with(retryOnDeadlineExceeded()).get(() -> 
topicAdminClient.createTopic(topicName));
     TopicName reference = PubsubUtils.toTopicName(topic);
     createdTopics.add(reference);
 
@@ -355,6 +372,16 @@ public final class PubsubResourceManager implements 
ResourceManager {
     return topicAdminClient.isShutdown() || 
subscriptionAdminClient.isShutdown();
   }
 
+  private static <T> RetryPolicy<T> retryOnDeadlineExceeded() {
+    return RetryPolicy.<T>builder()
+        .handleIf(
+            exception -> ExceptionUtils.containsType(exception, 
DeadlineExceededException.class))
+        .withMaxRetries(FAILSAFE_MAX_RETRIES)
+        .withBackoff(FAILSAFE_RETRY_DELAY, FAILSAFE_RETRY_MAX_DELAY)
+        .withJitter(FAILSAFE_RETRY_JITTER)
+        .build();
+  }
+
   /** Builder for {@link PubsubResourceManager}. */
   public static final class Builder {
 
diff --git a/it/google-cloud-platform/src/main/resources/test-artifact.json 
b/it/google-cloud-platform/src/main/resources/test-artifact.json
new file mode 100644
index 00000000000..551c80d14a6
--- /dev/null
+++ b/it/google-cloud-platform/src/main/resources/test-artifact.json
@@ -0,0 +1 @@
+["This is a test artifact."]
\ No newline at end of file
diff --git 
a/it/google-cloud-platform/src/test/java/org/apache/beam/it/gcp/bigquery/BigQueryIOLT.java
 
b/it/google-cloud-platform/src/test/java/org/apache/beam/it/gcp/bigquery/BigQueryIOLT.java
index 03f6e8abfd4..a9ae6814277 100644
--- 
a/it/google-cloud-platform/src/test/java/org/apache/beam/it/gcp/bigquery/BigQueryIOLT.java
+++ 
b/it/google-cloud-platform/src/test/java/org/apache/beam/it/gcp/bigquery/BigQueryIOLT.java
@@ -99,12 +99,8 @@ public final class BigQueryIOLT extends IOLoadTestBase {
   private static final String READ_ELEMENT_METRIC_NAME = "read_count";
   private Configuration configuration;
   private String tempLocation;
-
   private TableSchema schema;
 
-  private static final String READ_PCOLLECTION = "Counting element.out0";
-  private static final String WRITE_PCOLLECTION = "Map records.out0";
-
   @Rule public TestPipeline writePipeline = TestPipeline.create();
   @Rule public TestPipeline readPipeline = TestPipeline.create();
 
@@ -268,7 +264,7 @@ public final class BigQueryIOLT extends IOLoadTestBase {
                 
.withCustomGcsTempLocation(ValueProvider.StaticValueProvider.of(tempLocation)));
 
     PipelineLauncher.LaunchConfig options =
-        PipelineLauncher.LaunchConfig.builder("test-bigquery-write")
+        PipelineLauncher.LaunchConfig.builder("write-bigquery")
             .setSdk(PipelineLauncher.Sdk.JAVA)
             .setPipeline(writePipeline)
             .addParameter("runner", configuration.runner)
@@ -284,7 +280,10 @@ public final class BigQueryIOLT extends IOLoadTestBase {
 
     // export metrics
     MetricsConfiguration metricsConfig =
-        
MetricsConfiguration.builder().setInputPCollection(WRITE_PCOLLECTION).build();
+        MetricsConfiguration.builder()
+            .setInputPCollection("Map records.out0")
+            .setInputPCollectionV2("Map records/ParMultiDo(MapKVToV).out0")
+            .build();
     try {
       exportMetricsToBigQuery(launchInfo, getMetrics(launchInfo, 
metricsConfig));
     } catch (ParseException | InterruptedException e) {
@@ -301,7 +300,7 @@ public final class BigQueryIOLT extends IOLoadTestBase {
         .apply("Counting element", ParDo.of(new 
CountingFn<>(READ_ELEMENT_METRIC_NAME)));
 
     PipelineLauncher.LaunchConfig options =
-        PipelineLauncher.LaunchConfig.builder("test-bigquery-read")
+        PipelineLauncher.LaunchConfig.builder("read-bigquery")
             .setSdk(PipelineLauncher.Sdk.JAVA)
             .setPipeline(readPipeline)
             .addParameter("runner", configuration.runner)
@@ -326,7 +325,10 @@ public final class BigQueryIOLT extends IOLoadTestBase {
 
     // export metrics
     MetricsConfiguration metricsConfig =
-        
MetricsConfiguration.builder().setOutputPCollection(READ_PCOLLECTION).build();
+        MetricsConfiguration.builder()
+            .setOutputPCollection("Counting element.out0")
+            .setOutputPCollectionV2("Counting 
element/ParMultiDo(Counting).out0")
+            .build();
     try {
       exportMetricsToBigQuery(launchInfo, getMetrics(launchInfo, 
metricsConfig));
     } catch (ParseException | InterruptedException e) {
diff --git 
a/it/google-cloud-platform/src/test/java/org/apache/beam/it/gcp/bigtable/BigTableIOLT.java
 
b/it/google-cloud-platform/src/test/java/org/apache/beam/it/gcp/bigtable/BigTableIOLT.java
index fc7bd87707f..e232ed31cb5 100644
--- 
a/it/google-cloud-platform/src/test/java/org/apache/beam/it/gcp/bigtable/BigTableIOLT.java
+++ 
b/it/google-cloud-platform/src/test/java/org/apache/beam/it/gcp/bigtable/BigTableIOLT.java
@@ -115,8 +115,6 @@ public class BigTableIOLT extends IOLoadTestBase {
   /** Run integration test with configurations specified by TestProperties. */
   @Test
   public void testWriteAndRead() throws IOException {
-    final String readPCollection = "Counting element.out0";
-    final String writePCollection = "Map records.out0";
 
     tableId = generateTableId(testName);
     resourceManager.createTable(
@@ -149,8 +147,10 @@ public class BigTableIOLT extends IOLoadTestBase {
     // export metrics
     MetricsConfiguration metricsConfig =
         MetricsConfiguration.builder()
-            .setInputPCollection(writePCollection)
-            .setOutputPCollection(readPCollection)
+            .setInputPCollection("Map records.out0")
+            .setInputPCollectionV2("Map 
records/ParMultiDo(MapToBigTableFormat).out0")
+            .setOutputPCollection("Counting element.out0")
+            .setOutputPCollectionV2("Counting 
element/ParMultiDo(Counting).out0")
             .build();
     try {
       exportMetricsToBigQuery(writeInfo, getMetrics(writeInfo, metricsConfig));
@@ -174,7 +174,7 @@ public class BigTableIOLT extends IOLoadTestBase {
         .apply("Write to BigTable", writeIO);
 
     PipelineLauncher.LaunchConfig options =
-        PipelineLauncher.LaunchConfig.builder("test-bigtable-write")
+        PipelineLauncher.LaunchConfig.builder("write-bigtable")
             .setSdk(PipelineLauncher.Sdk.JAVA)
             .setPipeline(writePipeline)
             .addParameter("runner", configuration.getRunner())
@@ -196,7 +196,7 @@ public class BigTableIOLT extends IOLoadTestBase {
         .apply("Counting element", ParDo.of(new 
CountingFn<>(READ_ELEMENT_METRIC_NAME)));
 
     PipelineLauncher.LaunchConfig options =
-        PipelineLauncher.LaunchConfig.builder("test-bigtable-read")
+        PipelineLauncher.LaunchConfig.builder("read-bigtable")
             .setSdk(PipelineLauncher.Sdk.JAVA)
             .setPipeline(readPipeline)
             .addParameter("runner", configuration.getRunner())
@@ -227,18 +227,18 @@ public class BigTableIOLT extends IOLoadTestBase {
 
     @AutoValue.Builder
     abstract static class Builder {
-      abstract Builder setNumRows(long numRows);
+      abstract Configuration.Builder setNumRows(long numRows);
 
-      abstract Builder setPipelineTimeout(int timeOutMinutes);
+      abstract Configuration.Builder setPipelineTimeout(int timeOutMinutes);
 
-      abstract Builder setRunner(String runner);
+      abstract Configuration.Builder setRunner(String runner);
 
-      abstract Builder setValueSizeBytes(int valueSizeBytes);
+      abstract Configuration.Builder setValueSizeBytes(int valueSizeBytes);
 
       abstract Configuration build();
     }
 
-    abstract Builder toBuilder();
+    abstract Configuration.Builder toBuilder();
   }
 
   /** Maps long number to the BigTable format record. */
diff --git 
a/it/google-cloud-platform/src/test/java/org/apache/beam/it/gcp/bigtable/BigtableResourceManagerTest.java
 
b/it/google-cloud-platform/src/test/java/org/apache/beam/it/gcp/bigtable/BigtableResourceManagerTest.java
index 65745aea49b..f8673ed696c 100644
--- 
a/it/google-cloud-platform/src/test/java/org/apache/beam/it/gcp/bigtable/BigtableResourceManagerTest.java
+++ 
b/it/google-cloud-platform/src/test/java/org/apache/beam/it/gcp/bigtable/BigtableResourceManagerTest.java
@@ -442,6 +442,7 @@ public class BigtableResourceManagerTest {
     setupReadyTable();
 
     testManager.createTable(TABLE_ID, ImmutableList.of("cf1"));
+
     
when(bigtableResourceManagerClientFactory.bigtableTableAdminClient().exists(anyString()))
         .thenReturn(true);
     testManager.readTable(TABLE_ID);
diff --git 
a/it/google-cloud-platform/src/test/java/org/apache/beam/it/gcp/dataflow/ClassicTemplateClientTest.java
 
b/it/google-cloud-platform/src/test/java/org/apache/beam/it/gcp/dataflow/ClassicTemplateClientTest.java
index cfd56e596e5..88c35589f2b 100644
--- 
a/it/google-cloud-platform/src/test/java/org/apache/beam/it/gcp/dataflow/ClassicTemplateClientTest.java
+++ 
b/it/google-cloud-platform/src/test/java/org/apache/beam/it/gcp/dataflow/ClassicTemplateClientTest.java
@@ -18,6 +18,10 @@
 package org.apache.beam.it.gcp.dataflow;
 
 import static com.google.common.truth.Truth.assertThat;
+import static 
org.apache.beam.it.gcp.dataflow.AbstractPipelineLauncher.LEGACY_RUNNER;
+import static 
org.apache.beam.it.gcp.dataflow.AbstractPipelineLauncher.PARAM_JOB_ID;
+import static 
org.apache.beam.it.gcp.dataflow.AbstractPipelineLauncher.PARAM_JOB_TYPE;
+import static 
org.apache.beam.it.gcp.dataflow.AbstractPipelineLauncher.PARAM_RUNNER;
 import static org.junit.Assert.assertThrows;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.mock;
@@ -153,8 +157,14 @@ public final class ClassicTemplateClientTest {
             .setSdk("Apache Beam Java")
             .setVersion("2.42.0")
             .setJobType("JOB_TYPE_BATCH")
-            .setRunner("Dataflow Legacy Runner")
-            .setParameters(ImmutableMap.of(PARAM_KEY, PARAM_VALUE))
+            .setRunner(AbstractPipelineLauncher.LEGACY_RUNNER)
+            .setParameters(
+                ImmutableMap.<String, String>builder()
+                    .put(PARAM_KEY, PARAM_VALUE)
+                    .put(PARAM_JOB_ID, JOB_ID)
+                    .put(PARAM_RUNNER, LEGACY_RUNNER)
+                    .put(PARAM_JOB_TYPE, "JOB_TYPE_BATCH")
+                    .build())
             .build();
     assertThat(actual).isEqualTo(expected);
   }
diff --git 
a/it/google-cloud-platform/src/test/java/org/apache/beam/it/gcp/dataflow/FlexTemplateClientTest.java
 
b/it/google-cloud-platform/src/test/java/org/apache/beam/it/gcp/dataflow/FlexTemplateClientTest.java
index 4088efe6751..06f44437414 100644
--- 
a/it/google-cloud-platform/src/test/java/org/apache/beam/it/gcp/dataflow/FlexTemplateClientTest.java
+++ 
b/it/google-cloud-platform/src/test/java/org/apache/beam/it/gcp/dataflow/FlexTemplateClientTest.java
@@ -18,6 +18,10 @@
 package org.apache.beam.it.gcp.dataflow;
 
 import static com.google.common.truth.Truth.assertThat;
+import static 
org.apache.beam.it.gcp.dataflow.AbstractPipelineLauncher.LEGACY_RUNNER;
+import static 
org.apache.beam.it.gcp.dataflow.AbstractPipelineLauncher.PARAM_JOB_ID;
+import static 
org.apache.beam.it.gcp.dataflow.AbstractPipelineLauncher.PARAM_JOB_TYPE;
+import static 
org.apache.beam.it.gcp.dataflow.AbstractPipelineLauncher.PARAM_RUNNER;
 import static org.junit.Assert.assertThrows;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.mock;
@@ -158,8 +162,14 @@ public final class FlexTemplateClientTest {
             .setSdk("Apache Beam Java")
             .setVersion("2.42.0")
             .setJobType("JOB_TYPE_BATCH")
-            .setRunner("Dataflow Legacy Runner")
-            .setParameters(ImmutableMap.of(PARAM_KEY, PARAM_VALUE))
+            .setRunner(LEGACY_RUNNER)
+            .setParameters(
+                ImmutableMap.<String, String>builder()
+                    .put(PARAM_KEY, PARAM_VALUE)
+                    .put(PARAM_JOB_ID, JOB_ID)
+                    .put(PARAM_RUNNER, LEGACY_RUNNER)
+                    .put(PARAM_JOB_TYPE, "JOB_TYPE_BATCH")
+                    .build())
             .build();
     assertThat(actual).isEqualTo(expected);
   }
diff --git 
a/it/google-cloud-platform/src/test/java/org/apache/beam/it/gcp/storage/FileBasedIOLT.java
 
b/it/google-cloud-platform/src/test/java/org/apache/beam/it/gcp/storage/FileBasedIOLT.java
index fd1bc1772f2..704f8337c66 100644
--- 
a/it/google-cloud-platform/src/test/java/org/apache/beam/it/gcp/storage/FileBasedIOLT.java
+++ 
b/it/google-cloud-platform/src/test/java/org/apache/beam/it/gcp/storage/FileBasedIOLT.java
@@ -90,7 +90,7 @@ public class FileBasedIOLT extends IOLoadTestBase {
 
   @Rule public TestPipeline readPipeline = TestPipeline.create();
 
-  private static final Map<String, Configuration> TEST_CONFIGS_PRESET;
+  private static final Map<String, FileBasedIOLT.Configuration> 
TEST_CONFIGS_PRESET;
 
   static {
     try {
@@ -160,8 +160,6 @@ public class FileBasedIOLT extends IOLoadTestBase {
 
   @Test
   public void testTextIOWriteThenRead() throws IOException {
-    final String readPCollection = "Counting element.out0";
-    final String writePCollection = "Map records.out0";
 
     TextIO.TypedWrite<String, Object> write =
         TextIO.write()
@@ -182,7 +180,7 @@ public class FileBasedIOLT extends IOLoadTestBase {
         .apply("Counting element", ParDo.of(new 
CountingFn<>(READ_ELEMENT_METRIC_NAME)));
 
     PipelineLauncher.LaunchConfig writeOptions =
-        PipelineLauncher.LaunchConfig.builder("test-textio-write")
+        PipelineLauncher.LaunchConfig.builder("write-textio")
             .setSdk(PipelineLauncher.Sdk.JAVA)
             .setPipeline(writePipeline)
             .addParameter("runner", configuration.runner)
@@ -196,7 +194,7 @@ public class FileBasedIOLT extends IOLoadTestBase {
     assertThatResult(writeResult).isLaunchFinished();
 
     PipelineLauncher.LaunchConfig readOptions =
-        PipelineLauncher.LaunchConfig.builder("test-textio-read")
+        PipelineLauncher.LaunchConfig.builder("read-textio")
             .setSdk(PipelineLauncher.Sdk.JAVA)
             .setPipeline(readPipeline)
             .addParameter("runner", configuration.runner)
@@ -222,8 +220,10 @@ public class FileBasedIOLT extends IOLoadTestBase {
     // export metrics
     MetricsConfiguration metricsConfig =
         MetricsConfiguration.builder()
-            .setInputPCollection(writePCollection)
-            .setOutputPCollection(readPCollection)
+            .setInputPCollection("Map records.out0")
+            .setInputPCollectionV2("Map 
records/ParMultiDo(MapKVToString).out0")
+            .setOutputPCollection("Counting element.out0")
+            .setOutputPCollectionV2("Counting 
element/ParMultiDo(Counting).out0")
             .build();
     try {
       exportMetricsToBigQuery(writeInfo, getMetrics(writeInfo, metricsConfig));
diff --git 
a/it/google-cloud-platform/src/test/java/org/apache/beam/it/gcp/storage/GcsResourceManagerTest.java
 
b/it/google-cloud-platform/src/test/java/org/apache/beam/it/gcp/storage/GcsResourceManagerTest.java
index 3ec96da8100..0153573feae 100644
--- 
a/it/google-cloud-platform/src/test/java/org/apache/beam/it/gcp/storage/GcsResourceManagerTest.java
+++ 
b/it/google-cloud-platform/src/test/java/org/apache/beam/it/gcp/storage/GcsResourceManagerTest.java
@@ -71,7 +71,7 @@ public final class GcsResourceManagerTest {
   @Mock private Blob blob;
   private GcsResourceManager gcsClient;
 
-  private static final String ARTIFACT_NAME = "test-artifact.txt";
+  private static final String ARTIFACT_NAME = "test-artifact.json";
   private static final Path LOCAL_PATH;
   private static final byte[] TEST_ARTIFACT_CONTENTS;
 
diff --git a/it/google-cloud-platform/src/test/resources/test-artifact.txt 
b/it/google-cloud-platform/src/test/resources/test-artifact.txt
deleted file mode 100644
index 22c4e1d122a..00000000000
--- a/it/google-cloud-platform/src/test/resources/test-artifact.txt
+++ /dev/null
@@ -1 +0,0 @@
-This is a test artifact.
\ No newline at end of file
diff --git 
a/it/jdbc/src/main/java/org/apache/beam/it/jdbc/MSSQLResourceManager.java 
b/it/jdbc/src/main/java/org/apache/beam/it/jdbc/MSSQLResourceManager.java
index 0bcb16c6109..c515b2c4844 100644
--- a/it/jdbc/src/main/java/org/apache/beam/it/jdbc/MSSQLResourceManager.java
+++ b/it/jdbc/src/main/java/org/apache/beam/it/jdbc/MSSQLResourceManager.java
@@ -61,13 +61,14 @@ public class MSSQLResourceManager
   }
 
   @VisibleForTesting
-  <T extends DefaultMSSQLServerContainer<T>> MSSQLResourceManager(T container, 
Builder builder) {
+  <T extends MSSQLResourceManager.DefaultMSSQLServerContainer<T>> 
MSSQLResourceManager(
+      T container, Builder builder) {
     super(container, builder);
     initialized = true;
   }
 
-  public static Builder builder(String testId) {
-    return new Builder(testId);
+  public static MSSQLResourceManager.Builder builder(String testId) {
+    return new MSSQLResourceManager.Builder(testId);
   }
 
   private synchronized void createDatabase(String databaseName) {
diff --git 
a/it/jdbc/src/main/java/org/apache/beam/it/jdbc/MySQLResourceManager.java 
b/it/jdbc/src/main/java/org/apache/beam/it/jdbc/MySQLResourceManager.java
index e1bf3640b53..688c26dfb56 100644
--- a/it/jdbc/src/main/java/org/apache/beam/it/jdbc/MySQLResourceManager.java
+++ b/it/jdbc/src/main/java/org/apache/beam/it/jdbc/MySQLResourceManager.java
@@ -49,8 +49,8 @@ public class MySQLResourceManager extends 
AbstractJDBCResourceManager<MySQLConta
     super(container, builder);
   }
 
-  public static Builder builder(String testId) {
-    return new Builder(testId);
+  public static MySQLResourceManager.Builder builder(String testId) {
+    return new MySQLResourceManager.Builder(testId);
   }
 
   @Override
diff --git 
a/it/jdbc/src/main/java/org/apache/beam/it/jdbc/OracleResourceManager.java 
b/it/jdbc/src/main/java/org/apache/beam/it/jdbc/OracleResourceManager.java
index f44e939936d..8054d26c33f 100644
--- a/it/jdbc/src/main/java/org/apache/beam/it/jdbc/OracleResourceManager.java
+++ b/it/jdbc/src/main/java/org/apache/beam/it/jdbc/OracleResourceManager.java
@@ -45,7 +45,7 @@ public class OracleResourceManager extends 
AbstractJDBCResourceManager<OracleCon
   private static final String DEFAULT_ORACLE_USERNAME = "testUser";
   private static final String DEFAULT_ORACLE_PASSWORD = "testPassword";
 
-  private OracleResourceManager(Builder builder) {
+  private OracleResourceManager(OracleResourceManager.Builder builder) {
     this(
         new OracleContainer(
             
DockerImageName.parse(builder.containerImageName).withTag(builder.containerImageTag)),
@@ -53,12 +53,12 @@ public class OracleResourceManager extends 
AbstractJDBCResourceManager<OracleCon
   }
 
   @VisibleForTesting
-  OracleResourceManager(OracleContainer container, Builder builder) {
+  OracleResourceManager(OracleContainer container, 
OracleResourceManager.Builder builder) {
     super(container, builder);
   }
 
-  public static Builder builder(String testId) {
-    return new Builder(testId);
+  public static OracleResourceManager.Builder builder(String testId) {
+    return new OracleResourceManager.Builder(testId);
   }
 
   @Override
diff --git 
a/it/jdbc/src/main/java/org/apache/beam/it/jdbc/PostgresResourceManager.java 
b/it/jdbc/src/main/java/org/apache/beam/it/jdbc/PostgresResourceManager.java
index e64bc596fc5..7f054dfbc5d 100644
--- a/it/jdbc/src/main/java/org/apache/beam/it/jdbc/PostgresResourceManager.java
+++ b/it/jdbc/src/main/java/org/apache/beam/it/jdbc/PostgresResourceManager.java
@@ -38,7 +38,7 @@ public class PostgresResourceManager extends 
AbstractJDBCResourceManager<Postgre
   // https://hub.docker.com/_/postgres/tags?tab=tags
   private static final String DEFAULT_POSTGRES_CONTAINER_TAG = "15.1";
 
-  private PostgresResourceManager(Builder builder) {
+  private PostgresResourceManager(PostgresResourceManager.Builder builder) {
     this(
         new PostgreSQLContainer<>(
             
DockerImageName.parse(builder.containerImageName).withTag(builder.containerImageTag)),
@@ -46,12 +46,13 @@ public class PostgresResourceManager extends 
AbstractJDBCResourceManager<Postgre
   }
 
   @VisibleForTesting
-  PostgresResourceManager(PostgreSQLContainer<?> container, Builder builder) {
+  PostgresResourceManager(
+      PostgreSQLContainer<?> container, PostgresResourceManager.Builder 
builder) {
     super(container, builder);
   }
 
-  public static Builder builder(String testId) {
-    return new Builder(testId);
+  public static PostgresResourceManager.Builder builder(String testId) {
+    return new PostgresResourceManager.Builder(testId);
   }
 
   @Override
diff --git 
a/it/kafka/src/main/java/org/apache/beam/it/kafka/KafkaResourceManager.java 
b/it/kafka/src/main/java/org/apache/beam/it/kafka/KafkaResourceManager.java
index d9a647dbeeb..7f7fb5b6956 100644
--- a/it/kafka/src/main/java/org/apache/beam/it/kafka/KafkaResourceManager.java
+++ b/it/kafka/src/main/java/org/apache/beam/it/kafka/KafkaResourceManager.java
@@ -71,13 +71,16 @@ public class KafkaResourceManager extends 
TestContainerResourceManager<GenericCo
   private final String connectionString;
   private final boolean usingStaticTopic;
 
-  private KafkaResourceManager(Builder builder) {
+  private KafkaResourceManager(KafkaResourceManager.Builder builder) {
     this(null, new DefaultKafkaContainer(builder), builder);
   }
 
   @VisibleForTesting
   @SuppressWarnings("nullness")
-  KafkaResourceManager(@Nullable AdminClient client, KafkaContainer container, 
Builder builder) {
+  KafkaResourceManager(
+      @Nullable AdminClient client,
+      KafkaContainer container,
+      KafkaResourceManager.Builder builder) {
     super(container, builder);
 
     this.usingStaticTopic = builder.topicNames.size() > 0;
@@ -102,8 +105,8 @@ public class KafkaResourceManager extends 
TestContainerResourceManager<GenericCo
             : AdminClient.create(ImmutableMap.of("bootstrap.servers", 
this.connectionString));
   }
 
-  public static Builder builder(String testId) {
-    return new Builder(testId);
+  public static KafkaResourceManager.Builder builder(String testId) {
+    return new KafkaResourceManager.Builder(testId);
   }
 
   /** Returns the kafka bootstrap server connection string. */
diff --git a/it/kafka/src/test/java/org/apache/beam/it/kafka/KafkaIOLT.java 
b/it/kafka/src/test/java/org/apache/beam/it/kafka/KafkaIOLT.java
index a03030664de..ce6ad877c37 100644
--- a/it/kafka/src/test/java/org/apache/beam/it/kafka/KafkaIOLT.java
+++ b/it/kafka/src/test/java/org/apache/beam/it/kafka/KafkaIOLT.java
@@ -175,7 +175,7 @@ public final class KafkaIOLT extends IOLoadTestBase {
         .apply("Write to Kafka", writeIO.withTopic(kafkaTopic));
 
     PipelineLauncher.LaunchConfig options =
-        PipelineLauncher.LaunchConfig.builder("test-kafka-write")
+        PipelineLauncher.LaunchConfig.builder("write-kafka")
             .setSdk(PipelineLauncher.Sdk.JAVA)
             .setPipeline(writePipeline)
             .addParameter("runner", configuration.getRunner())
@@ -195,7 +195,7 @@ public final class KafkaIOLT extends IOLoadTestBase {
         .apply("Counting element", ParDo.of(new 
CountingFn<>(READ_ELEMENT_METRIC_NAME)));
 
     PipelineLauncher.LaunchConfig options =
-        PipelineLauncher.LaunchConfig.builder("test-kafka-read")
+        PipelineLauncher.LaunchConfig.builder("read-kafka")
             .setSdk(PipelineLauncher.Sdk.JAVA)
             .setPipeline(readPipeline)
             .addParameter("runner", configuration.getRunner())
diff --git 
a/it/mongodb/src/main/java/org/apache/beam/it/mongodb/MongoDBResourceManager.java
 
b/it/mongodb/src/main/java/org/apache/beam/it/mongodb/MongoDBResourceManager.java
index ed0e556bf0d..80216b14ac0 100644
--- 
a/it/mongodb/src/main/java/org/apache/beam/it/mongodb/MongoDBResourceManager.java
+++ 
b/it/mongodb/src/main/java/org/apache/beam/it/mongodb/MongoDBResourceManager.java
@@ -69,7 +69,7 @@ public class MongoDBResourceManager extends 
TestContainerResourceManager<MongoDB
   private final String connectionString;
   private final boolean usingStaticDatabase;
 
-  private MongoDBResourceManager(Builder builder) {
+  private MongoDBResourceManager(MongoDBResourceManager.Builder builder) {
     this(
         /* mongoClient= */ null,
         new MongoDBContainer(
@@ -80,7 +80,9 @@ public class MongoDBResourceManager extends 
TestContainerResourceManager<MongoDB
   @VisibleForTesting
   @SuppressWarnings("nullness")
   MongoDBResourceManager(
-      @Nullable MongoClient mongoClient, MongoDBContainer container, Builder 
builder) {
+      @Nullable MongoClient mongoClient,
+      MongoDBContainer container,
+      MongoDBResourceManager.Builder builder) {
     super(container, builder);
 
     this.usingStaticDatabase = builder.databaseName != null;
@@ -91,8 +93,8 @@ public class MongoDBResourceManager extends 
TestContainerResourceManager<MongoDB
     this.mongoClient = mongoClient == null ? 
MongoClients.create(connectionString) : mongoClient;
   }
 
-  public static Builder builder(String testId) {
-    return new Builder(testId);
+  public static MongoDBResourceManager.Builder builder(String testId) {
+    return new MongoDBResourceManager.Builder(testId);
   }
 
   /** Returns the URI connection string to the MongoDB Database. */
diff --git 
a/it/mongodb/src/main/java/org/apache/beam/it/mongodb/matchers/MongoDBAsserts.java
 
b/it/mongodb/src/main/java/org/apache/beam/it/mongodb/matchers/MongoDBAsserts.java
index ec08854b5c6..1a1b86acf56 100644
--- 
a/it/mongodb/src/main/java/org/apache/beam/it/mongodb/matchers/MongoDBAsserts.java
+++ 
b/it/mongodb/src/main/java/org/apache/beam/it/mongodb/matchers/MongoDBAsserts.java
@@ -31,7 +31,7 @@ import org.bson.Document;
 public class MongoDBAsserts {
 
   /**
-   * Convert MongoDB {@link Document} to a list of maps.
+   * Convert MongoDB {@link org.bson.Document} to a list of maps.
    *
    * @param documents List of Documents to parse
    * @return List of maps to use in {@link RecordsSubject}
diff --git 
a/it/neo4j/src/main/java/org/apache/beam/it/neo4j/Neo4jResourceManager.java 
b/it/neo4j/src/main/java/org/apache/beam/it/neo4j/Neo4jResourceManager.java
index 835be71ce0f..97bcca9e84b 100644
--- a/it/neo4j/src/main/java/org/apache/beam/it/neo4j/Neo4jResourceManager.java
+++ b/it/neo4j/src/main/java/org/apache/beam/it/neo4j/Neo4jResourceManager.java
@@ -66,7 +66,7 @@ public class Neo4jResourceManager extends 
TestContainerResourceManager<Neo4jCont
 
   private final String adminPassword;
 
-  private Neo4jResourceManager(Builder builder) {
+  private Neo4jResourceManager(Neo4jResourceManager.Builder builder) {
     this(
         builder.driver,
         new Neo4jContainer<>(
@@ -79,7 +79,10 @@ public class Neo4jResourceManager extends 
TestContainerResourceManager<Neo4jCont
 
   @VisibleForTesting
   @SuppressWarnings("nullness")
-  Neo4jResourceManager(@Nullable Driver neo4jDriver, Neo4jContainer<?> 
container, Builder builder) {
+  Neo4jResourceManager(
+      @Nullable Driver neo4jDriver,
+      Neo4jContainer<?> container,
+      Neo4jResourceManager.Builder builder) {
     super(container, builder);
 
     this.adminPassword = builder.adminPassword;
@@ -98,8 +101,8 @@ public class Neo4jResourceManager extends 
TestContainerResourceManager<Neo4jCont
     }
   }
 
-  public static Builder builder(String testId) {
-    return new Builder(testId);
+  public static Neo4jResourceManager.Builder builder(String testId) {
+    return new Neo4jResourceManager.Builder(testId);
   }
 
   /** Returns the URI connection string to the Neo4j Database. */
diff --git 
a/it/splunk/src/main/java/org/apache/beam/it/splunk/SplunkResourceManager.java 
b/it/splunk/src/main/java/org/apache/beam/it/splunk/SplunkResourceManager.java
index 0115a791eef..1ef4726df43 100644
--- 
a/it/splunk/src/main/java/org/apache/beam/it/splunk/SplunkResourceManager.java
+++ 
b/it/splunk/src/main/java/org/apache/beam/it/splunk/SplunkResourceManager.java
@@ -85,7 +85,7 @@ public class SplunkResourceManager extends 
TestContainerResourceManager<SplunkCo
   private final SplunkClientFactory clientFactory;
 
   @SuppressWarnings("resource")
-  private SplunkResourceManager(Builder builder) {
+  private SplunkResourceManager(SplunkResourceManager.Builder builder) {
     this(
         new SplunkClientFactory(),
         new SplunkContainer(
@@ -98,7 +98,9 @@ public class SplunkResourceManager extends 
TestContainerResourceManager<SplunkCo
   @VisibleForTesting
   @SuppressWarnings("nullness")
   SplunkResourceManager(
-      SplunkClientFactory clientFactory, SplunkContainer container, Builder 
builder) {
+      SplunkClientFactory clientFactory,
+      SplunkContainer container,
+      SplunkResourceManager.Builder builder) {
     super(setup(container, builder), builder);
 
     String username = DEFAULT_SPLUNK_USERNAME;
@@ -167,8 +169,8 @@ public class SplunkResourceManager extends 
TestContainerResourceManager<SplunkCo
         .withPassword(builder.password);
   }
 
-  public static Builder builder(String testId) {
-    return new Builder(testId);
+  public static SplunkResourceManager.Builder builder(String testId) {
+    return new SplunkResourceManager.Builder(testId);
   }
 
   /**
diff --git 
a/it/testcontainers/src/main/java/org/apache/beam/it/testcontainers/TestContainerResourceManager.java
 
b/it/testcontainers/src/main/java/org/apache/beam/it/testcontainers/TestContainerResourceManager.java
index 098938a291d..77dd6da5805 100644
--- 
a/it/testcontainers/src/main/java/org/apache/beam/it/testcontainers/TestContainerResourceManager.java
+++ 
b/it/testcontainers/src/main/java/org/apache/beam/it/testcontainers/TestContainerResourceManager.java
@@ -32,11 +32,11 @@ import org.testcontainers.containers.GenericContainer;
  * resources.
  *
  * <p>Optionally, a static resource can be specified by calling the 
useStaticContainer() method in
- * the {@link Builder} class. A static resource is a pre-configured database 
or other resource that
- * is ready to be connected to by the resource manager. This could be a 
pre-existing TestContainer
- * that has not been closed, a local database instance, a remote VM, or any 
other source that can be
- * connected to. If a static container is used, the host and port must also be 
configured using the
- * Builder's setHost() and setPort() methods, respectively.
+ * the {@link TestContainerResourceManager.Builder} class. A static resource 
is a pre-configured
+ * database or other resource that is ready to be connected to by the resource 
manager. This could
+ * be a pre-existing TestContainer that has not been closed, a local database 
instance, a remote VM,
+ * or any other source that can be connected to. If a static container is 
used, the host and port
+ * must also be configured using the Builder's setHost() and setPort() 
methods, respectively.
  */
 public abstract class TestContainerResourceManager<T extends 
GenericContainer<?>>
     implements ResourceManager {
@@ -48,11 +48,12 @@ public abstract class TestContainerResourceManager<T 
extends GenericContainer<?>
   private final String host;
   protected int port;
 
-  protected <B extends Builder<?>> TestContainerResourceManager(T container, B 
builder) {
+  protected <B extends TestContainerResourceManager.Builder<?>> 
TestContainerResourceManager(
+      T container, B builder) {
     this(container, builder, null);
   }
 
-  protected <B extends Builder<?>> TestContainerResourceManager(
+  protected <B extends TestContainerResourceManager.Builder<?>> 
TestContainerResourceManager(
       T container, B builder, @Nullable Callable<Void> setup) {
     this.container = container;
     this.usingStaticContainer = builder.useStaticContainer;
diff --git 
a/it/truthmatchers/src/main/java/org/apache/beam/it/truthmatchers/LaunchInfoSubject.java
 
b/it/truthmatchers/src/main/java/org/apache/beam/it/truthmatchers/LaunchInfoSubject.java
index a496ecce944..30a27c9ad25 100644
--- 
a/it/truthmatchers/src/main/java/org/apache/beam/it/truthmatchers/LaunchInfoSubject.java
+++ 
b/it/truthmatchers/src/main/java/org/apache/beam/it/truthmatchers/LaunchInfoSubject.java
@@ -43,7 +43,7 @@ public final class LaunchInfoSubject extends Subject {
   }
 
   /**
-   * Check if the subject reflects succeeded states. A successfully {@link 
LaunchInfo} does not mean
+   * Check if the subject reflects succeeded states. A successful {@link 
LaunchInfo} does not mean
    * that the pipeline finished and no errors happened, it just means that the 
job was able to get
    * itself into an active state (RUNNING, UPDATED).
    */
diff --git 
a/it/truthmatchers/src/main/java/org/apache/beam/it/truthmatchers/RecordsSubject.java
 
b/it/truthmatchers/src/main/java/org/apache/beam/it/truthmatchers/RecordsSubject.java
index 75d5ce3a67c..39a0c0cebed 100644
--- 
a/it/truthmatchers/src/main/java/org/apache/beam/it/truthmatchers/RecordsSubject.java
+++ 
b/it/truthmatchers/src/main/java/org/apache/beam/it/truthmatchers/RecordsSubject.java
@@ -81,7 +81,7 @@ public class RecordsSubject extends Subject {
     Map<String, Object> expected = convertMapToTreeMap(subset);
     for (Map<String, Object> candidate : actual) {
       boolean match = true;
-      for (Entry<String, Object> entry : subset.entrySet()) {
+      for (Map.Entry<String, Object> entry : subset.entrySet()) {
         if (!candidate.containsKey(entry.getKey())
             || !candidate.get(entry.getKey()).equals(entry.getValue())) {
           match = false;

Reply via email to